1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00
bacpypes/tests/test_bvll/test_codec.py

291 lines
10 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test BVLL Encoding and Decoding
-------------------------------
"""
import string
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, btox, xtob
from bacpypes.comm import bind
from bacpypes.pdu import PDU, Address, LocalBroadcast
from bacpypes.bvll import Result, WriteBroadcastDistributionTable, \
ReadBroadcastDistributionTable, ReadBroadcastDistributionTableAck, \
ForwardedNPDU, RegisterForeignDevice, ReadForeignDeviceTable, \
ReadForeignDeviceTableAck, FDTEntry, DeleteForeignDeviceTableEntry, \
DistributeBroadcastToNetwork, OriginalUnicastNPDU, \
OriginalBroadcastNPDU
from bacpypes.bvllservice import AnnexJCodec
from ..trapped_classes import TrappedClient, TrappedServer
from ..state_machine import match_pdu
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# extended form of xtob that first removes whitespace and period seperators
xxtob = lambda s: xtob(''.join(s.split()).replace('.', ''))
@bacpypes_debugging
class TestAnnexJCodec(unittest.TestCase):
def setup_method(self, method):
"""This function is called before each test method is called as is
given a reference to the test method."""
if _debug: TestAnnexJCodec._debug("setup_method %r", method)
# minature trapped stack
self.client = TrappedClient()
self.codec = AnnexJCodec()
self.server = TrappedServer()
bind(self.client, self.codec, self.server)
def request(self, pdu):
"""Pass the PDU to the client to send down the stack."""
self.client.request(pdu)
def indication(self, pdu_type=None, **pdu_attrs):
"""Check what the server received."""
assert match_pdu(self.server.indication_received, pdu_type, **pdu_attrs)
def response(self, pdu):
"""Pass the PDU to the server to send up the stack."""
self.server.response(pdu)
def confirmation(self, pdu_type=None, **pdu_attrs):
"""Check what the client received."""
assert match_pdu(self.client.confirmation_received, pdu_type, **pdu_attrs)
def test_result(self):
"""Test the Result encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_codec_01")
# Request successful
pdu_bytes = xxtob('81.00.0006.0000')
self.request(Result(0))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(Result, bvlciResultCode=0)
# Request error condition
pdu_bytes = xxtob('81.00.0006.0001')
self.request(Result(1))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(Result, bvlciResultCode=1)
def test_write_broadcast_distribution_table(self):
"""Test the WriteBroadcastDistributionTable encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_write_broadcast_distribution_table")
# write an empty table
pdu_bytes = xxtob('81.01.0004')
self.request(WriteBroadcastDistributionTable([]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[])
# write a table with an element
addr = Address('192.168.0.254/24')
pdu_bytes = xxtob('81.01.000e'
'c0.a8.00.fe.ba.c0 ff.ff.ff.00' # address and mask
)
self.request(WriteBroadcastDistributionTable([addr]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[addr])
def test_read_broadcast_distribution_table(self):
"""Test the ReadBroadcastDistributionTable encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table")
# read the table
pdu_bytes = xxtob('81.02.0004')
self.request(ReadBroadcastDistributionTable())
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadBroadcastDistributionTable)
def test_read_broadcast_distribution_table_ack(self):
"""Test the ReadBroadcastDistributionTableAck encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table_ack")
# read returns an empty table
pdu_bytes = xxtob('81.03.0004')
self.request(ReadBroadcastDistributionTableAck([]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[])
# read returns a table with an element
addr = Address('192.168.0.254/24')
pdu_bytes = xxtob('81.03.000e' # bvlci
'c0.a8.00.fe.ba.c0 ff.ff.ff.00' # address and mask
)
self.request(ReadBroadcastDistributionTableAck([addr]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[addr])
def test_forwarded_npdu(self):
"""Test the ForwardedNPDU encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_forwarded_npdu")
# read returns a table with an element
addr = Address('192.168.0.1')
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.04.000e' # bvlci
'c0.a8.00.01.ba.c0' # original source address
'deadbeef' # forwarded PDU
)
self.request(ForwardedNPDU(addr, xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ForwardedNPDU, bvlciAddress=addr, pduData=xpdu)
def test_register_foreign_device(self):
"""Test the RegisterForeignDevice encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_register_foreign_device")
# register as a foreign device with a 30 second time-to-live
pdu_bytes = xxtob('81.05.0006' # bvlci
'001e' # time-to-live
)
self.request(RegisterForeignDevice(30))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(RegisterForeignDevice, bvlciTimeToLive=30)
def test_read_foreign_device_table(self):
"""Test the ReadForeignDeviceTable encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table")
# read returns an empty table
pdu_bytes = xxtob('81.06.0004')
self.request(ReadForeignDeviceTable())
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadForeignDeviceTable)
def test_read_foreign_device_table_ack(self):
"""Test the ReadForeignDeviceTableAck encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table_ack")
# read returns an empty table
pdu_bytes = xxtob('81.07.0004')
self.request(ReadForeignDeviceTableAck([]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[])
# read returns a table with one entry
fdte = FDTEntry()
fdte.fdAddress = Address("192.168.0.10")
fdte.fdTTL = 30
fdte.fdRemain = 15
pdu_bytes = xxtob('81.07.000e' # bvlci
'c0.a8.00.0a.ba.c0' # address
'001e.000f' # ttl and remaining
)
self.request(ReadForeignDeviceTableAck([fdte]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[fdte])
def test_delete_foreign_device_table_entry(self):
"""Test the DeleteForeignDeviceTableEntry encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_delete_foreign_device_table_entry")
# delete an element
addr = Address('192.168.0.11/24')
pdu_bytes = xxtob('81.08.000a' # bvlci
'c0.a8.00.0b.ba.c0' # address of entry to be deleted
)
self.request(DeleteForeignDeviceTableEntry(addr))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(DeleteForeignDeviceTableEntry, bvlciAddress=addr)
def test_distribute_broadcast_to_network(self):
"""Test the DistributeBroadcastToNetwork encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_distribute_broadcast_to_network")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.09.0008' # bvlci
'deadbeef' # PDU to broadcast
)
self.request(DistributeBroadcastToNetwork(xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(DistributeBroadcastToNetwork, pduData=xpdu)
def test_original_unicast_npdu(self):
"""Test the OriginalUnicastNPDU encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_original_unicast_npdu")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.0a.0008' # bvlci
'deadbeef' # PDU being unicast
)
self.request(OriginalUnicastNPDU(xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(OriginalUnicastNPDU, pduData=xpdu)
def test_original_broadcast_npdu(self):
"""Test the OriginalBroadcastNPDU encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_original_broadcast_npdu")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.0b.0008' # bvlci
'deadbeef' # PDU being broadcast
)
self.request(OriginalBroadcastNPDU(xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(OriginalBroadcastNPDU, pduData=xpdu)