diff --git a/py25/bacpypes/apdu.py b/py25/bacpypes/apdu.py index 6879c28..ca5beb8 100755 --- a/py25/bacpypes/apdu.py +++ b/py25/bacpypes/apdu.py @@ -53,34 +53,41 @@ def register_error_type(klass): error_types[klass.serviceChoice] = klass # -# encode_max_apdu_segments/decode_max_apdu_segments +# encode_max_segments_accepted/decode_max_segments_accepted # -def encode_max_apdu_segments(arg): - if (arg > 64): return 7 - return {None:0, 0:0, 2:1, 4:2, 8:3, 16:4, 32:5, 64:6}.get(arg) +def encode_max_segments_accepted(arg): + """Encode the maximum number of segments the device will accept, Section + 20.1.2.4""" + w = 0 + while (arg and not arg & 1): + w += 1 + arg = (arg >> 1) + return w -def decode_max_apdu_segments(arg): - if (arg >= 7): return 128 - return {0:None, 1:2, 2:4, 3:8, 4:16, 5:32, 6:64}.get(arg) +def decode_max_segments_accepted(arg): + """Decode the maximum number of segments the device will accept, Section + 20.1.2.4""" + return arg and (1 << arg) or None # -# encode_max_apdu_response/decode_max_apdu_response +# encode_max_apdu_length_accepted/decode_max_apdu_length_accepted # -_max_apdu_response_encoding = {0:50, 1:128, 2:206, 3:480, 4:1024, 5:1476} +_max_apdu_response_encoding = [50, 128, 206, 480, 1024, 1476, None, None, + None, None, None, None, None, None, None, None] -def encode_max_apdu_response(arg): - encodings = _max_apdu_response_encoding.items() - encodings.sort(lambda x, y: y[1] - x[1]) - for i, v in encodings: +def encode_max_apdu_length_accepted(arg): + for i, v in enumerate(_max_apdu_response_encoding): if (v <= arg): return i - raise ValueError("invalid max APDU response encoding: %d" % (arg,)) + raise ValueError("invalid max APDU length accepted: {0}".format(arg)) -def decode_max_apdu_response(arg): - return _max_apdu_response_encoding.get(arg) +def decode_max_apdu_length_accepted(arg): + v = _max_apdu_response_encoding[arg] + if not v: + raise ValueError("invalid max APDU length accepted: {0}".format(arg)) # # APCI @@ -162,7 +169,7 @@ class APCI(PCI, DebugContents): if self.apduSA: buff += 0x02 pdu.put(buff) - pdu.put((encode_max_apdu_segments(self.apduMaxSegs) << 4) + encode_max_apdu_response(self.apduMaxResp)) + pdu.put((encode_max_segments_accepted(self.apduMaxSegs) << 4) + encode_max_apdu_length_accepted(self.apduMaxResp)) pdu.put(self.apduInvokeID) if self.apduSeg: pdu.put(self.apduSeq) @@ -243,8 +250,8 @@ class APCI(PCI, DebugContents): self.apduMor = ((buff & 0x04) != 0) self.apduSA = ((buff & 0x02) != 0) buff = pdu.get() - self.apduMaxSegs = decode_max_apdu_segments( (buff >> 4) & 0x07 ) - self.apduMaxResp = decode_max_apdu_response( buff & 0x0F ) + self.apduMaxSegs = decode_max_segments_accepted( (buff >> 4) & 0x07 ) + self.apduMaxResp = decode_max_apdu_length_accepted( buff & 0x0F ) self.apduInvokeID = pdu.get() if self.apduSeg: self.apduSeq = pdu.get() diff --git a/py27/bacpypes/apdu.py b/py27/bacpypes/apdu.py index 872f11c..6582bea 100755 --- a/py27/bacpypes/apdu.py +++ b/py27/bacpypes/apdu.py @@ -53,34 +53,41 @@ def register_error_type(klass): error_types[klass.serviceChoice] = klass # -# encode_max_apdu_segments/decode_max_apdu_segments +# encode_max_segments_accepted/decode_max_segments_accepted # -def encode_max_apdu_segments(arg): - if (arg > 64): return 7 - return {None:0, 0:0, 2:1, 4:2, 8:3, 16:4, 32:5, 64:6}.get(arg) +def encode_max_segments_accepted(arg): + """Encode the maximum number of segments the device will accept, Section + 20.1.2.4""" + w = 0 + while (arg and not arg & 1): + w += 1 + arg = (arg >> 1) + return w -def decode_max_apdu_segments(arg): - if (arg >= 7): return 128 - return {0:None, 1:2, 2:4, 3:8, 4:16, 5:32, 6:64}.get(arg) +def decode_max_segments_accepted(arg): + """Decode the maximum number of segments the device will accept, Section + 20.1.2.4""" + return arg and (1 << arg) or None # -# encode_max_apdu_response/decode_max_apdu_response +# encode_max_apdu_length_accepted/decode_max_apdu_length_accepted # -_max_apdu_response_encoding = {0:50, 1:128, 2:206, 3:480, 4:1024, 5:1476} +_max_apdu_response_encoding = [50, 128, 206, 480, 1024, 1476, None, None, + None, None, None, None, None, None, None, None] -def encode_max_apdu_response(arg): - encodings = _max_apdu_response_encoding.items() - encodings.sort(lambda x, y: y[1] - x[1]) - for i, v in encodings: +def encode_max_apdu_length_accepted(arg): + for i, v in enumerate(_max_apdu_response_encoding): if (v <= arg): return i - raise ValueError("invalid max APDU response encoding: {0}".format(arg)) + raise ValueError("invalid max APDU length accepted: {0}".format(arg)) -def decode_max_apdu_response(arg): - return _max_apdu_response_encoding.get(arg) +def decode_max_apdu_length_accepted(arg): + v = _max_apdu_response_encoding[arg] + if not v: + raise ValueError("invalid max APDU length accepted: {0}".format(arg)) # # APCI @@ -163,7 +170,7 @@ class APCI(PCI, DebugContents): if self.apduSA: buff += 0x02 pdu.put(buff) - pdu.put((encode_max_apdu_segments(self.apduMaxSegs) << 4) + encode_max_apdu_response(self.apduMaxResp)) + pdu.put((encode_max_segments_accepted(self.apduMaxSegs) << 4) + encode_max_apdu_length_accepted(self.apduMaxResp)) pdu.put(self.apduInvokeID) if self.apduSeg: pdu.put(self.apduSeq) @@ -244,8 +251,8 @@ class APCI(PCI, DebugContents): self.apduMor = ((buff & 0x04) != 0) self.apduSA = ((buff & 0x02) != 0) buff = pdu.get() - self.apduMaxSegs = decode_max_apdu_segments( (buff >> 4) & 0x07 ) - self.apduMaxResp = decode_max_apdu_response( buff & 0x0F ) + self.apduMaxSegs = decode_max_segments_accepted( (buff >> 4) & 0x07 ) + self.apduMaxResp = decode_max_apdu_length_accepted( buff & 0x0F ) self.apduInvokeID = pdu.get() if self.apduSeg: self.apduSeq = pdu.get() diff --git a/py34/bacpypes/apdu.py b/py34/bacpypes/apdu.py index 5637732..b3c1ca8 100755 --- a/py34/bacpypes/apdu.py +++ b/py34/bacpypes/apdu.py @@ -53,34 +53,41 @@ def register_error_type(klass): error_types[klass.serviceChoice] = klass # -# encode_max_apdu_segments/decode_max_apdu_segments +# encode_max_segments_accepted/decode_max_segments_accepted # -def encode_max_apdu_segments(arg): - if (arg > 64): return 7 - return {None:0, 0:0, 2:1, 4:2, 8:3, 16:4, 32:5, 64:6}.get(arg) +def encode_max_segments_accepted(arg): + """Encode the maximum number of segments the device will accept, Section + 20.1.2.4""" + w = 0 + while (arg and not arg & 1): + w += 1 + arg = (arg >> 1) + return w -def decode_max_apdu_segments(arg): - if (arg >= 7): return 128 - return {0:None, 1:2, 2:4, 3:8, 4:16, 5:32, 6:64}.get(arg) +def decode_max_segments_accepted(arg): + """Decode the maximum number of segments the device will accept, Section + 20.1.2.4""" + return arg and (1 << arg) or None # -# encode_max_apdu_response/decode_max_apdu_response +# encode_max_apdu_length_accepted/decode_max_apdu_length_accepted # -_max_apdu_response_encoding = {0:50, 1:128, 2:206, 3:480, 4:1024, 5:1476} +_max_apdu_response_encoding = [50, 128, 206, 480, 1024, 1476, None, None, + None, None, None, None, None, None, None, None] -def encode_max_apdu_response(arg): - encodings = _max_apdu_response_encoding.items() - encodings.sort(lambda x, y: y[1] - x[1]) - for i, v in encodings: +def encode_max_apdu_length_accepted(arg): + for i, v in enumerate(_max_apdu_response_encoding): if (v <= arg): return i - raise ValueError("invalid max APDU response encoding: {0}".format(arg)) + raise ValueError("invalid max APDU length accepted: {0}".format(arg)) -def decode_max_apdu_response(arg): - return _max_apdu_response_encoding.get(arg) +def decode_max_apdu_length_accepted(arg): + v = _max_apdu_response_encoding[arg] + if not v: + raise ValueError("invalid max APDU length accepted: {0}".format(arg)) # # APCI @@ -163,7 +170,7 @@ class APCI(PCI, DebugContents): if self.apduSA: buff += 0x02 pdu.put(buff) - pdu.put((encode_max_apdu_segments(self.apduMaxSegs) << 4) + encode_max_apdu_response(self.apduMaxResp)) + pdu.put((encode_max_segments_accepted(self.apduMaxSegs) << 4) + encode_max_apdu_length_accepted(self.apduMaxResp)) pdu.put(self.apduInvokeID) if self.apduSeg: pdu.put(self.apduSeq) @@ -244,8 +251,8 @@ class APCI(PCI, DebugContents): self.apduMor = ((buff & 0x04) != 0) self.apduSA = ((buff & 0x02) != 0) buff = pdu.get() - self.apduMaxSegs = decode_max_apdu_segments( (buff >> 4) & 0x07 ) - self.apduMaxResp = decode_max_apdu_response( buff & 0x0F ) + self.apduMaxSegs = decode_max_segments_accepted( (buff >> 4) & 0x07 ) + self.apduMaxResp = decode_max_apdu_length_accepted( buff & 0x0F ) self.apduInvokeID = pdu.get() if self.apduSeg: self.apduSeq = pdu.get() diff --git a/tests/test_apdu/__init__.py b/tests/test_apdu/__init__.py index e4bf8f2..6ebaef7 100644 --- a/tests/test_apdu/__init__.py +++ b/tests/test_apdu/__init__.py @@ -4,4 +4,4 @@ Test BACpypes APDU Module """ -from . import test_max_apdu_response +from . import test_max_apdu_length_accepted diff --git a/tests/test_apdu/test_max_apdu_length_accepted.py b/tests/test_apdu/test_max_apdu_length_accepted.py new file mode 100644 index 0000000..126ec39 --- /dev/null +++ b/tests/test_apdu/test_max_apdu_length_accepted.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Test Max APDU Length Accepted +----------------------------- +""" + +import unittest + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +@bacpypes_debugging +class TestMaxAPDULengthAccepted(unittest.TestCase): + + def test_max_apdu_length_accepted_encode(self): + if _debug: TestMaxAPDULengthAccepted._debug("test_max_apdu_length_accepted_encode") + + def test_max_apdu_length_accepted_decode(self): + if _debug: TestMaxAPDULengthAccepted._debug("test_max_apdu_length_accepted_decode") diff --git a/tests/test_apdu/test_max_apdu_response.py b/tests/test_apdu/test_max_apdu_response.py deleted file mode 100644 index d35c5ed..0000000 --- a/tests/test_apdu/test_max_apdu_response.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -Test Max APDU Response ----------------------- -""" - -import unittest - -from bacpypes.debugging import bacpypes_debugging, ModuleLogger - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - - -@bacpypes_debugging -class TestMaxAPDUResponse(unittest.TestCase): - - def test_max_apdu_response_encode(self): - if _debug: TestAddress._debug("test_max_apdu_response_encode") - - def test_max_apdu_response_decode(self): - if _debug: TestAddress._debug("test_max_apdu_response_decode")