1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

simplify xtob

This commit is contained in:
Joel Bender 2017-09-12 22:06:22 -04:00
parent 3e1f813f71
commit ab70b4a267
6 changed files with 47 additions and 59 deletions

View File

@ -5,6 +5,7 @@ Debugging
"""
import sys
import re
import logging
import binascii
from cStringIO import StringIO
@ -40,9 +41,8 @@ def btox(data, sep=''):
def xtob(data, sep=''):
"""Interpret the hex encoding of a blob (string)."""
# if there is a separator, remove it
if sep:
data = ''.join(data.split(sep))
# remove the non-hex characters
data = re.sub("[^0-9a-fA-F]", '', data)
# interpret the hex
return binascii.unhexlify(data)

View File

@ -5,6 +5,7 @@ Debugging
"""
import sys
import re
import logging
import binascii
from cStringIO import StringIO
@ -40,9 +41,8 @@ def btox(data, sep=''):
def xtob(data, sep=''):
"""Interpret the hex encoding of a blob (string)."""
# if there is a separator, remove it
if sep:
data = ''.join(data.split(sep))
# remove the non-hex characters
data = re.sub("[^0-9a-fA-F]", '', data)
# interpret the hex
return binascii.unhexlify(data)

View File

@ -5,6 +5,7 @@ Debugging
"""
import sys
import re
import logging
import binascii
from io import StringIO
@ -40,9 +41,8 @@ def btox(data, sep=''):
def xtob(data, sep=''):
"""Interpret the hex encoding of a blob (byte string)."""
# if there is a separator, remove it
if sep:
data = ''.join(data.split(sep))
# remove the non-hex characters
data = re.sub("[^0-9a-fA-F]", '', data)
# interpret the hex
return binascii.unhexlify(data)

View File

@ -30,10 +30,6 @@ _debug = 0
_log = ModuleLogger(globals())
# extended form of xtob that first removes whitespace and period seperators
xxtob = lambda s: xtob(''.join(s.split()).replace('.', ''))
@bacpypes_debugging
class TestAnnexJCodec(unittest.TestCase):
@ -69,7 +65,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_codec_01")
# Request successful
pdu_bytes = xxtob('81.00.0006.0000')
pdu_bytes = xtob('81.00.0006.0000')
self.request(Result(0))
self.indication(pduData=pdu_bytes)
@ -78,7 +74,7 @@ class TestAnnexJCodec(unittest.TestCase):
self.confirmation(Result, bvlciResultCode=0)
# Request error condition
pdu_bytes = xxtob('81.00.0006.0001')
pdu_bytes = xtob('81.00.0006.0001')
self.request(Result(1))
self.indication(pduData=pdu_bytes)
@ -91,7 +87,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_write_broadcast_distribution_table")
# write an empty table
pdu_bytes = xxtob('81.01.0004')
pdu_bytes = xtob('81.01.0004')
self.request(WriteBroadcastDistributionTable([]))
self.indication(pduData=pdu_bytes)
@ -101,7 +97,7 @@ class TestAnnexJCodec(unittest.TestCase):
# write a table with an element
addr = Address('192.168.0.254/24')
pdu_bytes = xxtob('81.01.000e'
pdu_bytes = xtob('81.01.000e'
'c0.a8.00.fe.ba.c0 ff.ff.ff.00' # address and mask
)
@ -116,7 +112,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table")
# read the table
pdu_bytes = xxtob('81.02.0004')
pdu_bytes = xtob('81.02.0004')
self.request(ReadBroadcastDistributionTable())
self.indication(pduData=pdu_bytes)
@ -129,7 +125,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table_ack")
# read returns an empty table
pdu_bytes = xxtob('81.03.0004')
pdu_bytes = xtob('81.03.0004')
self.request(ReadBroadcastDistributionTableAck([]))
self.indication(pduData=pdu_bytes)
@ -139,7 +135,7 @@ class TestAnnexJCodec(unittest.TestCase):
# read returns a table with an element
addr = Address('192.168.0.254/24')
pdu_bytes = xxtob('81.03.000e' # bvlci
pdu_bytes = xtob('81.03.000e' # bvlci
'c0.a8.00.fe.ba.c0 ff.ff.ff.00' # address and mask
)
@ -155,8 +151,8 @@ class TestAnnexJCodec(unittest.TestCase):
# read returns a table with an element
addr = Address('192.168.0.1')
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.04.000e' # bvlci
xpdu = xtob('deadbeef')
pdu_bytes = xtob('81.04.000e' # bvlci
'c0.a8.00.01.ba.c0' # original source address
'deadbeef' # forwarded PDU
)
@ -172,7 +168,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_register_foreign_device")
# register as a foreign device with a 30 second time-to-live
pdu_bytes = xxtob('81.05.0006' # bvlci
pdu_bytes = xtob('81.05.0006' # bvlci
'001e' # time-to-live
)
@ -187,7 +183,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table")
# read returns an empty table
pdu_bytes = xxtob('81.06.0004')
pdu_bytes = xtob('81.06.0004')
self.request(ReadForeignDeviceTable())
self.indication(pduData=pdu_bytes)
@ -200,7 +196,7 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table_ack")
# read returns an empty table
pdu_bytes = xxtob('81.07.0004')
pdu_bytes = xtob('81.07.0004')
self.request(ReadForeignDeviceTableAck([]))
self.indication(pduData=pdu_bytes)
@ -213,7 +209,7 @@ class TestAnnexJCodec(unittest.TestCase):
fdte.fdAddress = Address("192.168.0.10")
fdte.fdTTL = 30
fdte.fdRemain = 15
pdu_bytes = xxtob('81.07.000e' # bvlci
pdu_bytes = xtob('81.07.000e' # bvlci
'c0.a8.00.0a.ba.c0' # address
'001e.000f' # ttl and remaining
)
@ -230,7 +226,7 @@ class TestAnnexJCodec(unittest.TestCase):
# delete an element
addr = Address('192.168.0.11/24')
pdu_bytes = xxtob('81.08.000a' # bvlci
pdu_bytes = xtob('81.08.000a' # bvlci
'c0.a8.00.0b.ba.c0' # address of entry to be deleted
)
@ -245,8 +241,8 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_distribute_broadcast_to_network")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.09.0008' # bvlci
xpdu = xtob('deadbeef')
pdu_bytes = xtob('81.09.0008' # bvlci
'deadbeef' # PDU to broadcast
)
@ -261,8 +257,8 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_original_unicast_npdu")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.0a.0008' # bvlci
xpdu = xtob('deadbeef')
pdu_bytes = xtob('81.0a.0008' # bvlci
'deadbeef' # PDU being unicast
)
@ -277,8 +273,8 @@ class TestAnnexJCodec(unittest.TestCase):
if _debug: TestAnnexJCodec._debug("test_original_broadcast_npdu")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.0b.0008' # bvlci
xpdu = xtob('deadbeef')
pdu_bytes = xtob('81.0b.0008' # bvlci
'deadbeef' # PDU being broadcast
)

View File

@ -24,10 +24,6 @@ _debug = 0
_log = ModuleLogger(globals())
# extended form of xtob that first removes whitespace and period seperators
xxtob = lambda s: xtob(''.join(s.split()).replace('.', ''))
#
# TNetwork
#
@ -119,10 +115,10 @@ class TestForeign(unittest.TestCase):
.success()
# sniffer pieces
registration_request = xxtob('81.05.0006' # bvlci
registration_request = xtob('81.05.0006' # bvlci
'001e' # time-to-live
)
registration_ack = xxtob('81.00.0006.0000') # simple ack
registration_ack = xtob('81.00.0006.0000') # simple ack
# remote sniffer sees registration
remote_sniffer.start_state.doc("1-1-0") \
@ -142,8 +138,8 @@ class TestForeign(unittest.TestCase):
.success()
# the tnode reads the registration table
read_fdt_request = xxtob('81.06.0004') # bvlci
read_fdt_ack = xxtob('81.07.000e' # read-ack
read_fdt_request = xtob('81.06.0004') # bvlci
read_fdt_ack = xtob('81.07.000e' # read-ack
'c0.a8.06.02.ba.c0 001e 0023' # address, ttl, remaining
)
@ -178,10 +174,10 @@ class TestForeign(unittest.TestCase):
tnet.append(remote_sniffer)
# sniffer pieces
registration_request = xxtob('81.05.0006' # bvlci
registration_request = xtob('81.05.0006' # bvlci
'000a' # time-to-live
)
registration_ack = xxtob('81.00.0006.0000') # simple ack
registration_ack = xtob('81.00.0006.0000') # simple ack
# remote sniffer sees registration
remote_sniffer.start_state.doc("2-1-0") \
@ -202,7 +198,7 @@ class TestForeign(unittest.TestCase):
tnet = TNetwork()
# make a PDU from node 1 to node 2
pdu_data = xxtob('dead.beef')
pdu_data = xtob('dead.beef')
pdu = PDU(pdu_data, source=tnet.fd.address, destination=tnet.bbmd.address)
if _debug: TestForeign._debug(" - pdu: %r", pdu)
@ -223,11 +219,11 @@ class TestForeign(unittest.TestCase):
tnet.append(remote_sniffer)
# sniffer pieces
registration_request = xxtob('81.05.0006' # bvlci
registration_request = xtob('81.05.0006' # bvlci
'003c' # time-to-live (60)
)
registration_ack = xxtob('81.00.0006.0000') # simple ack
unicast_pdu = xxtob('81.0a.0008' # original unicast bvlci
registration_ack = xtob('81.00.0006.0000') # simple ack
unicast_pdu = xtob('81.0a.0008' # original unicast bvlci
'dead.beef' # PDU being unicast
)
@ -250,7 +246,7 @@ class TestForeign(unittest.TestCase):
tnet = TNetwork()
# make a broadcast pdu
pdu_data = xxtob('dead.beef')
pdu_data = xtob('dead.beef')
pdu = PDU(pdu_data, destination=LocalBroadcast())
if _debug: TestForeign._debug(" - pdu: %r", pdu)
@ -280,11 +276,11 @@ class TestForeign(unittest.TestCase):
tnet.append(remote_sniffer)
# sniffer pieces
registration_request = xxtob('81.05.0006' # bvlci
registration_request = xtob('81.05.0006' # bvlci
'003c' # time-to-live (60)
)
registration_ack = xxtob('81.00.0006.0000') # simple ack
distribute_pdu = xxtob('81.09.0008' # bvlci
registration_ack = xtob('81.00.0006.0000') # simple ack
distribute_pdu = xtob('81.09.0008' # bvlci
'deadbeef' # PDU to broadcast
)

View File

@ -23,10 +23,6 @@ _debug = 0
_log = ModuleLogger(globals())
# extended form of xtob that first removes whitespace and period seperators
xxtob = lambda s: xtob(''.join(s.split()).replace('.', ''))
#
# TNetwork
#
@ -104,7 +100,7 @@ class TestSimple(unittest.TestCase):
tnet = TNetwork()
# make a PDU from node 1 to node 2
pdu_data = xxtob('dead.beef')
pdu_data = xtob('dead.beef')
pdu = PDU(pdu_data, source=tnet.td.address, destination=tnet.iut.address)
if _debug: TestSimple._debug(" - pdu: %r", pdu)
@ -116,7 +112,7 @@ class TestSimple(unittest.TestCase):
tnet.sniffer.start_state.receive(PDU,
pduSource=tnet.td.address.addrTuple,
pduDestination=tnet.iut.address.addrTuple,
pduData=xxtob('81.0a.0008' # original unicast bvlci
pduData=xtob('81.0a.0008' # original unicast bvlci
'deadbeef' # PDU being unicast
),
).timeout(1.0).success()
@ -132,7 +128,7 @@ class TestSimple(unittest.TestCase):
tnet = TNetwork()
# make a PDU from node 1 to node 2
pdu_data = xxtob('dead.beef')
pdu_data = xtob('dead.beef')
pdu = PDU(pdu_data, source=tnet.td.address, destination=LocalBroadcast())
if _debug: TestSimple._debug(" - pdu: %r", pdu)
@ -144,7 +140,7 @@ class TestSimple(unittest.TestCase):
tnet.sniffer.start_state.receive(PDU,
pduSource=tnet.td.address.addrTuple,
pduDestination=('192.168.4.255', 47808),
pduData=xxtob('81.0b.0008' # original broadcast bvlci
pduData=xtob('81.0b.0008' # original broadcast bvlci
'deadbeef' # PDU being unicast
),
).timeout(1.0).success()