mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
fix the names of the encode and decode functions
This commit is contained in:
parent
5b448d860c
commit
aecb06c727
|
@ -53,34 +53,41 @@ def register_error_type(klass):
|
|||
error_types[klass.serviceChoice] = klass
|
||||
|
||||
#
|
||||
# encode_max_apdu_segments/decode_max_apdu_segments
|
||||
# encode_max_segments_accepted/decode_max_segments_accepted
|
||||
#
|
||||
|
||||
def encode_max_apdu_segments(arg):
|
||||
if (arg > 64): return 7
|
||||
return {None:0, 0:0, 2:1, 4:2, 8:3, 16:4, 32:5, 64:6}.get(arg)
|
||||
def encode_max_segments_accepted(arg):
|
||||
"""Encode the maximum number of segments the device will accept, Section
|
||||
20.1.2.4"""
|
||||
w = 0
|
||||
while (arg and not arg & 1):
|
||||
w += 1
|
||||
arg = (arg >> 1)
|
||||
return w
|
||||
|
||||
def decode_max_apdu_segments(arg):
|
||||
if (arg >= 7): return 128
|
||||
return {0:None, 1:2, 2:4, 3:8, 4:16, 5:32, 6:64}.get(arg)
|
||||
def decode_max_segments_accepted(arg):
|
||||
"""Decode the maximum number of segments the device will accept, Section
|
||||
20.1.2.4"""
|
||||
return arg and (1 << arg) or None
|
||||
|
||||
#
|
||||
# encode_max_apdu_response/decode_max_apdu_response
|
||||
# encode_max_apdu_length_accepted/decode_max_apdu_length_accepted
|
||||
#
|
||||
|
||||
_max_apdu_response_encoding = {0:50, 1:128, 2:206, 3:480, 4:1024, 5:1476}
|
||||
_max_apdu_response_encoding = [50, 128, 206, 480, 1024, 1476, None, None,
|
||||
None, None, None, None, None, None, None, None]
|
||||
|
||||
def encode_max_apdu_response(arg):
|
||||
encodings = _max_apdu_response_encoding.items()
|
||||
encodings.sort(lambda x, y: y[1] - x[1])
|
||||
for i, v in encodings:
|
||||
def encode_max_apdu_length_accepted(arg):
|
||||
for i, v in enumerate(_max_apdu_response_encoding):
|
||||
if (v <= arg):
|
||||
return i
|
||||
|
||||
raise ValueError("invalid max APDU response encoding: %d" % (arg,))
|
||||
raise ValueError("invalid max APDU length accepted: {0}".format(arg))
|
||||
|
||||
def decode_max_apdu_response(arg):
|
||||
return _max_apdu_response_encoding.get(arg)
|
||||
def decode_max_apdu_length_accepted(arg):
|
||||
v = _max_apdu_response_encoding[arg]
|
||||
if not v:
|
||||
raise ValueError("invalid max APDU length accepted: {0}".format(arg))
|
||||
|
||||
#
|
||||
# APCI
|
||||
|
@ -162,7 +169,7 @@ class APCI(PCI, DebugContents):
|
|||
if self.apduSA:
|
||||
buff += 0x02
|
||||
pdu.put(buff)
|
||||
pdu.put((encode_max_apdu_segments(self.apduMaxSegs) << 4) + encode_max_apdu_response(self.apduMaxResp))
|
||||
pdu.put((encode_max_segments_accepted(self.apduMaxSegs) << 4) + encode_max_apdu_length_accepted(self.apduMaxResp))
|
||||
pdu.put(self.apduInvokeID)
|
||||
if self.apduSeg:
|
||||
pdu.put(self.apduSeq)
|
||||
|
@ -243,8 +250,8 @@ class APCI(PCI, DebugContents):
|
|||
self.apduMor = ((buff & 0x04) != 0)
|
||||
self.apduSA = ((buff & 0x02) != 0)
|
||||
buff = pdu.get()
|
||||
self.apduMaxSegs = decode_max_apdu_segments( (buff >> 4) & 0x07 )
|
||||
self.apduMaxResp = decode_max_apdu_response( buff & 0x0F )
|
||||
self.apduMaxSegs = decode_max_segments_accepted( (buff >> 4) & 0x07 )
|
||||
self.apduMaxResp = decode_max_apdu_length_accepted( buff & 0x0F )
|
||||
self.apduInvokeID = pdu.get()
|
||||
if self.apduSeg:
|
||||
self.apduSeq = pdu.get()
|
||||
|
|
|
@ -53,34 +53,41 @@ def register_error_type(klass):
|
|||
error_types[klass.serviceChoice] = klass
|
||||
|
||||
#
|
||||
# encode_max_apdu_segments/decode_max_apdu_segments
|
||||
# encode_max_segments_accepted/decode_max_segments_accepted
|
||||
#
|
||||
|
||||
def encode_max_apdu_segments(arg):
|
||||
if (arg > 64): return 7
|
||||
return {None:0, 0:0, 2:1, 4:2, 8:3, 16:4, 32:5, 64:6}.get(arg)
|
||||
def encode_max_segments_accepted(arg):
|
||||
"""Encode the maximum number of segments the device will accept, Section
|
||||
20.1.2.4"""
|
||||
w = 0
|
||||
while (arg and not arg & 1):
|
||||
w += 1
|
||||
arg = (arg >> 1)
|
||||
return w
|
||||
|
||||
def decode_max_apdu_segments(arg):
|
||||
if (arg >= 7): return 128
|
||||
return {0:None, 1:2, 2:4, 3:8, 4:16, 5:32, 6:64}.get(arg)
|
||||
def decode_max_segments_accepted(arg):
|
||||
"""Decode the maximum number of segments the device will accept, Section
|
||||
20.1.2.4"""
|
||||
return arg and (1 << arg) or None
|
||||
|
||||
#
|
||||
# encode_max_apdu_response/decode_max_apdu_response
|
||||
# encode_max_apdu_length_accepted/decode_max_apdu_length_accepted
|
||||
#
|
||||
|
||||
_max_apdu_response_encoding = {0:50, 1:128, 2:206, 3:480, 4:1024, 5:1476}
|
||||
_max_apdu_response_encoding = [50, 128, 206, 480, 1024, 1476, None, None,
|
||||
None, None, None, None, None, None, None, None]
|
||||
|
||||
def encode_max_apdu_response(arg):
|
||||
encodings = _max_apdu_response_encoding.items()
|
||||
encodings.sort(lambda x, y: y[1] - x[1])
|
||||
for i, v in encodings:
|
||||
def encode_max_apdu_length_accepted(arg):
|
||||
for i, v in enumerate(_max_apdu_response_encoding):
|
||||
if (v <= arg):
|
||||
return i
|
||||
|
||||
raise ValueError("invalid max APDU response encoding: {0}".format(arg))
|
||||
raise ValueError("invalid max APDU length accepted: {0}".format(arg))
|
||||
|
||||
def decode_max_apdu_response(arg):
|
||||
return _max_apdu_response_encoding.get(arg)
|
||||
def decode_max_apdu_length_accepted(arg):
|
||||
v = _max_apdu_response_encoding[arg]
|
||||
if not v:
|
||||
raise ValueError("invalid max APDU length accepted: {0}".format(arg))
|
||||
|
||||
#
|
||||
# APCI
|
||||
|
@ -163,7 +170,7 @@ class APCI(PCI, DebugContents):
|
|||
if self.apduSA:
|
||||
buff += 0x02
|
||||
pdu.put(buff)
|
||||
pdu.put((encode_max_apdu_segments(self.apduMaxSegs) << 4) + encode_max_apdu_response(self.apduMaxResp))
|
||||
pdu.put((encode_max_segments_accepted(self.apduMaxSegs) << 4) + encode_max_apdu_length_accepted(self.apduMaxResp))
|
||||
pdu.put(self.apduInvokeID)
|
||||
if self.apduSeg:
|
||||
pdu.put(self.apduSeq)
|
||||
|
@ -244,8 +251,8 @@ class APCI(PCI, DebugContents):
|
|||
self.apduMor = ((buff & 0x04) != 0)
|
||||
self.apduSA = ((buff & 0x02) != 0)
|
||||
buff = pdu.get()
|
||||
self.apduMaxSegs = decode_max_apdu_segments( (buff >> 4) & 0x07 )
|
||||
self.apduMaxResp = decode_max_apdu_response( buff & 0x0F )
|
||||
self.apduMaxSegs = decode_max_segments_accepted( (buff >> 4) & 0x07 )
|
||||
self.apduMaxResp = decode_max_apdu_length_accepted( buff & 0x0F )
|
||||
self.apduInvokeID = pdu.get()
|
||||
if self.apduSeg:
|
||||
self.apduSeq = pdu.get()
|
||||
|
|
|
@ -53,34 +53,41 @@ def register_error_type(klass):
|
|||
error_types[klass.serviceChoice] = klass
|
||||
|
||||
#
|
||||
# encode_max_apdu_segments/decode_max_apdu_segments
|
||||
# encode_max_segments_accepted/decode_max_segments_accepted
|
||||
#
|
||||
|
||||
def encode_max_apdu_segments(arg):
|
||||
if (arg > 64): return 7
|
||||
return {None:0, 0:0, 2:1, 4:2, 8:3, 16:4, 32:5, 64:6}.get(arg)
|
||||
def encode_max_segments_accepted(arg):
|
||||
"""Encode the maximum number of segments the device will accept, Section
|
||||
20.1.2.4"""
|
||||
w = 0
|
||||
while (arg and not arg & 1):
|
||||
w += 1
|
||||
arg = (arg >> 1)
|
||||
return w
|
||||
|
||||
def decode_max_apdu_segments(arg):
|
||||
if (arg >= 7): return 128
|
||||
return {0:None, 1:2, 2:4, 3:8, 4:16, 5:32, 6:64}.get(arg)
|
||||
def decode_max_segments_accepted(arg):
|
||||
"""Decode the maximum number of segments the device will accept, Section
|
||||
20.1.2.4"""
|
||||
return arg and (1 << arg) or None
|
||||
|
||||
#
|
||||
# encode_max_apdu_response/decode_max_apdu_response
|
||||
# encode_max_apdu_length_accepted/decode_max_apdu_length_accepted
|
||||
#
|
||||
|
||||
_max_apdu_response_encoding = {0:50, 1:128, 2:206, 3:480, 4:1024, 5:1476}
|
||||
_max_apdu_response_encoding = [50, 128, 206, 480, 1024, 1476, None, None,
|
||||
None, None, None, None, None, None, None, None]
|
||||
|
||||
def encode_max_apdu_response(arg):
|
||||
encodings = _max_apdu_response_encoding.items()
|
||||
encodings.sort(lambda x, y: y[1] - x[1])
|
||||
for i, v in encodings:
|
||||
def encode_max_apdu_length_accepted(arg):
|
||||
for i, v in enumerate(_max_apdu_response_encoding):
|
||||
if (v <= arg):
|
||||
return i
|
||||
|
||||
raise ValueError("invalid max APDU response encoding: {0}".format(arg))
|
||||
raise ValueError("invalid max APDU length accepted: {0}".format(arg))
|
||||
|
||||
def decode_max_apdu_response(arg):
|
||||
return _max_apdu_response_encoding.get(arg)
|
||||
def decode_max_apdu_length_accepted(arg):
|
||||
v = _max_apdu_response_encoding[arg]
|
||||
if not v:
|
||||
raise ValueError("invalid max APDU length accepted: {0}".format(arg))
|
||||
|
||||
#
|
||||
# APCI
|
||||
|
@ -163,7 +170,7 @@ class APCI(PCI, DebugContents):
|
|||
if self.apduSA:
|
||||
buff += 0x02
|
||||
pdu.put(buff)
|
||||
pdu.put((encode_max_apdu_segments(self.apduMaxSegs) << 4) + encode_max_apdu_response(self.apduMaxResp))
|
||||
pdu.put((encode_max_segments_accepted(self.apduMaxSegs) << 4) + encode_max_apdu_length_accepted(self.apduMaxResp))
|
||||
pdu.put(self.apduInvokeID)
|
||||
if self.apduSeg:
|
||||
pdu.put(self.apduSeq)
|
||||
|
@ -244,8 +251,8 @@ class APCI(PCI, DebugContents):
|
|||
self.apduMor = ((buff & 0x04) != 0)
|
||||
self.apduSA = ((buff & 0x02) != 0)
|
||||
buff = pdu.get()
|
||||
self.apduMaxSegs = decode_max_apdu_segments( (buff >> 4) & 0x07 )
|
||||
self.apduMaxResp = decode_max_apdu_response( buff & 0x0F )
|
||||
self.apduMaxSegs = decode_max_segments_accepted( (buff >> 4) & 0x07 )
|
||||
self.apduMaxResp = decode_max_apdu_length_accepted( buff & 0x0F )
|
||||
self.apduInvokeID = pdu.get()
|
||||
if self.apduSeg:
|
||||
self.apduSeq = pdu.get()
|
||||
|
|
|
@ -4,4 +4,4 @@
|
|||
Test BACpypes APDU Module
|
||||
"""
|
||||
|
||||
from . import test_max_apdu_response
|
||||
from . import test_max_apdu_length_accepted
|
||||
|
|
25
tests/test_apdu/test_max_apdu_length_accepted.py
Normal file
25
tests/test_apdu/test_max_apdu_length_accepted.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Test Max APDU Length Accepted
|
||||
-----------------------------
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class TestMaxAPDULengthAccepted(unittest.TestCase):
|
||||
|
||||
def test_max_apdu_length_accepted_encode(self):
|
||||
if _debug: TestMaxAPDULengthAccepted._debug("test_max_apdu_length_accepted_encode")
|
||||
|
||||
def test_max_apdu_length_accepted_decode(self):
|
||||
if _debug: TestMaxAPDULengthAccepted._debug("test_max_apdu_length_accepted_decode")
|
|
@ -1,25 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
Test Max APDU Response
|
||||
----------------------
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class TestMaxAPDUResponse(unittest.TestCase):
|
||||
|
||||
def test_max_apdu_response_encode(self):
|
||||
if _debug: TestAddress._debug("test_max_apdu_response_encode")
|
||||
|
||||
def test_max_apdu_response_decode(self):
|
||||
if _debug: TestAddress._debug("test_max_apdu_response_decode")
|
Loading…
Reference in New Issue
Block a user