mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
200 lines
4.8 KiB
Python
Executable File
200 lines
4.8 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
"""
|
|
BACpypes Test
|
|
-------------
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
|
|
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
|
|
from bacpypes.consolelogging import ArgumentParser
|
|
|
|
from bacpypes.pdu import Address
|
|
from bacpypes.comm import bind
|
|
|
|
from bacpypes.apdu import APDU, ReadPropertyRequest, \
|
|
ComplexAckPDU, RejectPDU, AbortPDU
|
|
|
|
from bacpypes.app import LocalDeviceObject, Application
|
|
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint
|
|
|
|
from tests.state_machine import Server, StateMachine
|
|
|
|
# module-level debugging switch and logger; the "if _debug:" guards
# throughout this file test this flag
_debug = 0
_log = ModuleLogger(globals())

# fallbacks used by setUp() when the environment variables of the same
# names are unset or empty
BACPYPES_TEST = ""
BACPYPES_TEST_OPTION = ""

# result of argument parsing, filled in by setUp()
test_options = None
|
|
|
|
|
|
#
|
|
# TestApplication
|
|
#
|
|
|
|
@bacpypes_debugging
class TestApplication(Application):
    """Application subclass used by this test script.

    It adds nothing to Application beyond a debug trace of the
    constructor arguments.
    """

    def __init__(self, localDevice, localAddress, aseID=None):
        """Trace the arguments and initialize the base Application.

        localDevice  -- the local device object handed to Application
        localAddress -- the local address handed to Application
        aseID        -- optional element identifier, forwarded unchanged
        """
        if _debug: TestApplication._debug("__init__ %r %r aseID=%r", localDevice, localAddress, aseID)

        # forward aseID by keyword: the third positional parameter of
        # Application.__init__ is not aseID in every BACpypes release
        # (NOTE(review): newer releases put deviceInfoCache there), so a
        # positional pass could bind the value to the wrong parameter
        Application.__init__(self, localDevice, localAddress, aseID=aseID)
|
|
|
|
#
|
|
# setUp
|
|
#
|
|
|
|
@bacpypes_debugging
def setUp(argv=None):
    """Parse the test options and store them in the global ``test_options``.

    argv -- list of command line arguments; when it is None or empty the
            arguments are taken from the BACPYPES_TEST environment
            variable (or the module default of the same name), split on
            whitespace.
    """
    global test_options

    # the environment wins over the module-level defaults
    option_default = os.getenv("BACPYPES_TEST_OPTION") or BACPYPES_TEST_OPTION
    fallback_args = os.getenv("BACPYPES_TEST") or BACPYPES_TEST

    # build the parser, the module docstring doubles as the description
    parser = ArgumentParser(description=__doc__)
    parser.add_argument(
        '--option',
        help="this is an option",
        default=option_default,
        )

    # explicit arguments take precedence over the environment string
    if argv:
        args_to_parse = argv
    else:
        args_to_parse = fallback_args.split()
    test_options = parser.parse_args(args_to_parse)

    if _debug:
        setUp._debug("setUp")
        setUp._debug("    - test_options: %r", test_options)
|
|
|
|
#
|
|
# tearDown
|
|
#
|
|
|
|
@bacpypes_debugging
def tearDown():
    """Counterpart of setUp(); only emits a debug message."""
    if not _debug:
        return

    tearDown._debug("tearDown")
|
|
|
|
|
|
#
|
|
# MatchingStateMachine
|
|
#
|
|
|
|
|
|
@bacpypes_debugging
class MatchingStateMachine(Server, StateMachine):
    """State machine that sits at the bottom of a stack as a Server.

    PDUs the state machine sends are passed to response(), PDUs arriving
    through indication() are passed to receive(), and a received PDU
    matches a transition when it is an instance of the transition's class.
    """

    def __init__(self):
        """Initialize both base classes, Server first."""
        if _debug:
            MatchingStateMachine._debug("__init__")

        for base_class in (Server, StateMachine):
            base_class.__init__(self)

    def send(self, pdu):
        """Pass a PDU sent by the state machine to response()."""
        if _debug:
            MatchingStateMachine._debug("send %r", pdu)

        self.response(pdu)

    def indication(self, pdu):
        """Pass a PDU delivered to this server to receive()."""
        if _debug:
            MatchingStateMachine._debug("indication %r", pdu)

        self.receive(pdu)

    def match_pdu(self, pdu, transition_pdu):
        """Return True when pdu is an instance of transition_pdu.

        transition_pdu is used as the second argument of isinstance(), so
        it is expected to be a class; the PDU contents are not compared.
        """
        if _debug:
            MatchingStateMachine._debug("match_pdu %r %r", pdu, transition_pdu)

        # a type match is sufficient, no field-by-field comparison
        matched = isinstance(pdu, transition_pdu)
        if _debug:
            MatchingStateMachine._debug("    - is_instance: %r", matched)

        return matched
|
|
|
|
#
|
|
# main
|
|
#
|
|
|
|
# parse the command line arguments
setUp(sys.argv[1:])

# make a device object
test_device = LocalDeviceObject(
    objectName='test_device',
    objectIdentifier=100,
    maxApduLengthAccepted=1024,
    segmentationSupported='segmentedBoth',
    vendorIdentifier=15,
    )

# make a test address
test_address = Address(1)

# create the application under test for that device and address
test_application = TestApplication(test_device, test_address)

print("objects: %s" % (test_application.objectIdentifier,))

# get the services supported
services_supported = test_application.get_services_supported()
if _debug:
    _log.debug("    - services_supported: %r", services_supported)

# let the device object know
test_device.protocolServicesSupported = services_supported.value

# include an application decoder
test_asap = ApplicationServiceAccessPoint()

# pass the device object to the state machine access point so it
# can know if it should support segmentation
test_smap = StateMachineAccessPoint(test_device)

# the state machine acts as the server at the bottom of the stack
test_server = MatchingStateMachine()

# bind everything together, application on top, state machine at the bottom
bind(test_application, test_asap, test_smap, test_server)
|
|
|
|
# ==============================================================================
|
|
|
|
# build the request: read the objectName property of ('device', 100)
read_property_request = ReadPropertyRequest(
    objectIdentifier=('device', 100),
    propertyIdentifier='objectName',
    )
read_property_request.pduSource = Address(2)
# read_property_request.pduDestination = Address(1)
read_property_request.apduInvokeID = 1

print("read_property_request")
read_property_request.debug_contents()
print("")

# encode the request into a generic APDU, which is what gets sent
test_apdu = APDU()
read_property_request.encode(test_apdu)

print("test_apdu")
test_apdu.debug_contents()
print("")

# disabled experiment: truncate the encoded data to its first five octets
if 0:
    print("modify test_apdu")
    test_apdu.pduData = test_apdu.pduData[:5]
    test_apdu.debug_contents()
    print("")

# make a send transition from start to success: send the APDU, then
# expect a ComplexAckPDU back
start_state = test_server.start_state
start_state = start_state.doc("start_state")

response_state = start_state.send(test_apdu)
response_state = response_state.doc("response_state")

success_state = response_state.receive(ComplexAckPDU)
success_state = success_state.doc("success_state")
success_state.success()
|
|
|
|
# run the machine and verify the outcome; the try/finally makes sure
# tearDown() is called even when run() raises or a check fails, the
# exception itself still propagates
try:
    print("running")
    test_server.run()
    print("")

    # ==============================================================================

    # check for success: the machine must have stopped, and must have
    # stopped in a state that was marked as a success state
    assert not test_server.running
    assert test_server.current_state.is_success_state
finally:
    tearDown()
|