#!/usr/bin/python3

"""
Test Segmentation
"""

import random
import string
import unittest

from bacpypes.debugging import ModuleLogger, bacpypes_debugging

from bacpypes.primitivedata import CharacterString
from bacpypes.constructeddata import Any

from bacpypes.comm import Client, bind
from bacpypes.pdu import Address, LocalBroadcast
from bacpypes.vlan import Network, Node

from bacpypes.npdu import NPDU
from bacpypes.apdu import (
    APDU, apdu_types,
    ConfirmedPrivateTransferRequest, ConfirmedPrivateTransferACK,
)

from bacpypes.app import Application
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.local.device import LocalDeviceObject

from ..state_machine import StateMachine, StateMachineGroup, TrafficLog
from ..time_machine import reset_time_machine, run_time_machine

# some debugging
_debug = 0
_log = ModuleLogger(globals())


#
#   ApplicationNetwork
#

@bacpypes_debugging
class ApplicationNetwork(StateMachineGroup):
    """State machine group holding a test device (``td``), an implementation
    under test (``iut``) and a promiscuous sniffer, all attached to one
    virtual LAN whose traffic is recorded in a shared traffic log.
    """

    def __init__(self, td_device_object, iut_device_object):
        """Build the VLAN and attach the sniffer and both application stacks.

        :param td_device_object: device object for the test device
        :param iut_device_object: device object for the implementation under test
        """
        if _debug: ApplicationNetwork._debug("__init__ %r %r", td_device_object, iut_device_object)
        StateMachineGroup.__init__(self)

        # reset the time machine
        reset_time_machine()
        if _debug: ApplicationNetwork._debug(" - time machine reset")

        # create a traffic log
        self.traffic_log = TrafficLog()

        # make a little LAN
        self.vlan = Network(broadcast_address=LocalBroadcast())
        self.vlan.traffic_log = self.traffic_log

        # sniffer
        self.sniffer = SnifferNode(self.vlan)

        # test device
        self.td = ApplicationStateMachine(td_device_object, self.vlan)
        self.append(self.td)

        # implementation under test
        self.iut = ApplicationStateMachine(iut_device_object, self.vlan)
        self.append(self.iut)

    def run(self, time_limit=60.0):
        """Run the group, let the time machine advance, then check the result.

        :param time_limit: value handed to ``run_time_machine``
        :raises AssertionError: if ``check_for_success`` does not report that
            all state machines succeeded
        """
        if _debug: ApplicationNetwork._debug("run %r", time_limit)

        # run the group
        super(ApplicationNetwork, self).run()
        if _debug: ApplicationNetwork._debug(" - group running")

        # run it for some time
        run_time_machine(time_limit)
        if _debug:
            ApplicationNetwork._debug(" - time machine finished")
            for state_machine in self.state_machines:
                ApplicationNetwork._debug(" - machine: %r", state_machine)
                for direction, pdu in state_machine.transaction_log:
                    ApplicationNetwork._debug(" %s %s", direction, str(pdu))

            # traffic log has what was processed on each vlan
            self.traffic_log.dump(ApplicationNetwork._debug)

        # check for success
        all_success, some_failed = super(ApplicationNetwork, self).check_for_success()
        # guarded like every other debug call in this module
        if _debug: ApplicationNetwork._debug(" - all_success, some_failed: %r, %r", all_success, some_failed)
        assert all_success


#
#   SnifferNode
#

@bacpypes_debugging
class SnifferNode(Client):
    """Promiscuous VLAN node that decodes and prints every PDU it receives.

    It is a plain ``Client`` (not a state machine) and refuses to send.
    """

    def __init__(self, vlan):
        """Create a promiscuous node on *vlan* and bind this client to it."""
        if _debug: SnifferNode._debug("__init__ %r", vlan)

        # save the name and give it a blank address
        self.name = "sniffer"
        self.address = Address()

        # continue with initialization
        Client.__init__(self)

        # create a promiscuous node, added to the network
        self.node = Node(self.address, vlan, promiscuous=True)
        if _debug: SnifferNode._debug(" - node: %r", self.node)

        # bind this to the node
        bind(self, self.node)

    def send(self, pdu):
        """Always raise ``RuntimeError``; sniffers only listen."""
        if _debug: SnifferNode._debug("send(%s) %r", self.name, pdu)
        raise RuntimeError("sniffers don't send")

    def confirmation(self, pdu):
        """Decode *pdu* as NPDU, then APDU, then its specific APDU type, and
        print the result to stdout.
        """
        if _debug: SnifferNode._debug("confirmation(%s) %r", self.name, pdu)

        # it's an NPDU
        npdu = NPDU()
        npdu.decode(pdu)

        # decode as a generic APDU
        apdu = APDU()
        apdu.decode(npdu)

        # "lift" the source and destination address
        if npdu.npduSADR:
            apdu.pduSource = npdu.npduSADR
        else:
            apdu.pduSource = npdu.pduSource
        if npdu.npduDADR:
            apdu.pduDestination = npdu.npduDADR
        else:
            apdu.pduDestination = npdu.pduDestination

        # make a more focused interpretation
        # NOTE(review): apdu_types.get() yields None for an unmapped type, in
        # which case the atype() call below raises TypeError
        atype = apdu_types.get(apdu.apduType)
        if _debug: SnifferNode._debug(" - atype: %r", atype)

        xpdu = apdu
        apdu = atype()
        apdu.decode(xpdu)

        print(repr(apdu))
        apdu.debug_contents()
        print("")


#
#   ApplicationStateMachine
#

@bacpypes_debugging
class ApplicationStateMachine(Application, StateMachine):
    """An application with its own protocol stack on the VLAN that is also a
    state machine, so tests can script what it sends and expects to receive.
    """

    def __init__(self, localDevice, vlan):
        """Assemble the stack (ASAP, SMAP, NSAP, node) for *localDevice* on *vlan*.

        :param localDevice: device object; element ``[1]`` of its
            ``objectIdentifier`` is used to build the node address and its
            ``objectName`` becomes the state machine name
        :param vlan: network the node is added to
        """
        if _debug: ApplicationStateMachine._debug("__init__ %r %r", localDevice, vlan)

        # build an address and save it
        self.address = Address(localDevice.objectIdentifier[1])
        if _debug: ApplicationStateMachine._debug(" - address: %r", self.address)

        # continue with initialization
        Application.__init__(self, localDevice)
        StateMachine.__init__(self, name=localDevice.objectName)

        # include a application decoder
        self.asap = ApplicationServiceAccessPoint()

        # pass the device object to the state machine access point so it
        # can know if it should support segmentation
        self.smap = StateMachineAccessPoint(localDevice)

        # the segmentation state machines need access to the same device
        # information cache as the application
        self.smap.deviceInfoCache = self.deviceInfoCache

        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()

        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElement()
        bind(self.nse, self.nsap)

        # bind the top layers
        bind(self, self.asap, self.smap, self.nsap)

        # create a node, added to the network
        self.node = Node(self.address, vlan)

        # bind the network service to the node, no network number
        self.nsap.bind(self.node)

        # placeholder for the result block
        self.confirmed_private_result = None

    def send(self, apdu):
        """Send *apdu* down the stack as a request."""
        if _debug: ApplicationStateMachine._debug("send(%s) %r", self.name, apdu)

        # send the apdu down the stack
        self.request(apdu)

    def indication(self, apdu):
        """Report the incoming request to the state machine, then let the
        application process it.
        """
        if _debug: ApplicationStateMachine._debug("indication(%s) %r", self.name, apdu)

        # let the state machine know the request was received
        self.receive(apdu)

        # allow the application to process it
        super(ApplicationStateMachine, self).indication(apdu)

    def confirmation(self, apdu):
        """Forward a confirmation to the state machine."""
        if _debug: ApplicationStateMachine._debug("confirmation(%s) %r", self.name, apdu)

        # forward the confirmation to the state machine
        self.receive(apdu)

    def do_ConfirmedPrivateTransferRequest(self, apdu):
        """Answer a confirmed private transfer with an ACK whose result block
        is ``self.confirmed_private_result``.
        """
        if _debug: ApplicationStateMachine._debug("do_ConfirmedPrivateTransferRequest(%s) %r", self.name, apdu)

        # simple ack
        xapdu = ConfirmedPrivateTransferACK(context=apdu)
        xapdu.vendorID = 999
        xapdu.serviceNumber = 1
        xapdu.resultBlock = self.confirmed_private_result

        if _debug: ApplicationStateMachine._debug(" - xapdu: %r", xapdu)

        # send the response back
        self.response(xapdu)


#
#   SegmentationTest
#

def _random_character_string(length):
    """Return ``Any(CharacterString(...))`` of *length* random lowercase ASCII
    letters, or ``None`` when *length* is zero.
    """
    if not length:
        return None

    # string.ascii_lowercase exists on Python 2 and 3 and is locale independent
    letters = string.ascii_lowercase
    return Any(
        CharacterString(
            ''.join(random.choice(letters) for _ in range(length))
        )
    )


@bacpypes_debugging
def SegmentationTest(prefix, c_len, s_len):
    """Exchange one ConfirmedPrivateTransfer between a test device and an IUT.

    :param prefix: label prefix for the documented states (``prefix-0`` ...)
    :param c_len: number of characters in the request's service parameters;
        zero sends no service parameters
    :param s_len: number of characters in the result block the IUT returns;
        zero returns no result block
    :raises AssertionError: if the state machines do not all succeed
    """
    if _debug: SegmentationTest._debug("SegmentationTest %r %r %r", prefix, c_len, s_len)

    # client device object
    td_device_object = LocalDeviceObject(
        objectName="td",
        objectIdentifier=("device", 10),
        maxApduLengthAccepted=206,
        segmentationSupported='segmentedBoth',
        maxSegmentsAccepted=4,
        vendorIdentifier=999,
    )

    # server device object
    iut_device_object = LocalDeviceObject(
        objectName="iut",
        objectIdentifier=("device", 20),
        maxApduLengthAccepted=206,
        segmentationSupported='segmentedBoth',
        maxSegmentsAccepted=99,
        vendorIdentifier=999,
    )

    # create a network
    anet = ApplicationNetwork(td_device_object, iut_device_object)

    # network settings (only used by the disabled blocks below)
    c_npdu_len = 50
    s_npdu_len = 50

    # tell the device info cache of the client about the server
    # (currently disabled)
    if 0:
        iut_device_info = anet.td.deviceInfoCache.get_device_info(anet.iut.address)

        # update the rest of the values
        iut_device_info.maxApduLengthAccepted = iut_device_object.maxApduLengthAccepted
        iut_device_info.segmentationSupported = iut_device_object.segmentationSupported
        iut_device_info.vendorID = iut_device_object.vendorIdentifier
        iut_device_info.maxSegmentsAccepted = iut_device_object.maxSegmentsAccepted
        iut_device_info.maxNpduLength = s_npdu_len

    # tell the device info cache of the server device about the client
    # (currently disabled)
    if 0:
        td_device_info = anet.iut.deviceInfoCache.get_device_info(anet.td.address)

        # update the rest of the values
        td_device_info.maxApduLengthAccepted = td_device_object.maxApduLengthAccepted
        td_device_info.segmentationSupported = td_device_object.segmentationSupported
        td_device_info.vendorID = td_device_object.vendorIdentifier
        td_device_info.maxSegmentsAccepted = td_device_object.maxSegmentsAccepted
        td_device_info.maxNpduLength = c_npdu_len

    # build a request string
    request_string = _random_character_string(c_len)

    # response string is stuffed into the server
    anet.iut.confirmed_private_result = _random_character_string(s_len)

    # send the request, get it acked
    anet.td.start_state.doc(prefix + "-0") \
        .send(ConfirmedPrivateTransferRequest(
            vendorID=999, serviceNumber=1,
            serviceParameters=request_string,
            destination=anet.iut.address,
            )).doc(prefix + "-1") \
        .receive(ConfirmedPrivateTransferACK).doc(prefix + "-2") \
        .success()

    # no IUT application layer matching
    anet.iut.start_state.success()

    # run the group
    anet.run()


@bacpypes_debugging
class TestSegmentation(unittest.TestCase):
    # Each test runs SegmentationTest with a different pair of request and
    # response payload lengths (in characters).

    def test_1(self):
        # empty request, empty response
        if _debug: TestSegmentation._debug("test_1")
        SegmentationTest("7-1", 0, 0)

    def test_2(self):
        # 10 character request, empty response
        if _debug: TestSegmentation._debug("test_2")
        SegmentationTest("7-2", 10, 0)

    def test_3(self):
        # 100 character request, empty response
        if _debug: TestSegmentation._debug("test_3")
        SegmentationTest("7-3", 100, 0)

    def test_4(self):
        # 200 character request, empty response
        if _debug: TestSegmentation._debug("test_4")
        SegmentationTest("7-4", 200, 0)

    def test_5(self):
        # empty request, 10 character response
        if _debug: TestSegmentation._debug("test_5")
        SegmentationTest("7-5", 0, 10)

    def test_6(self):
        # empty request, 200 character response
        if _debug: TestSegmentation._debug("test_6")
        SegmentationTest("7-6", 0, 200)

    def test_7(self):
        # 300 character request, empty response
        if _debug: TestSegmentation._debug("test_7")
        SegmentationTest("7-7", 300, 0)

    def test_8(self):
        # 300 character request, 300 character response
        if _debug: TestSegmentation._debug("test_8")
        SegmentationTest("7-8", 300, 300)

    def test_9(self):
        # 600 character request, 600 character response
        if _debug: TestSegmentation._debug("test_9")
        SegmentationTest("7-9", 600, 600)