mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
rename these so they dont match test*.py or *test.py
This commit is contained in:
parent
c123a2707b
commit
42fc5cde0b
151
sandbox/i_am_reject_test_x.py
Executable file
151
sandbox/i_am_reject_test_x.py
Executable file
|
@ -0,0 +1,151 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
BACpypes Test
|
||||
-------------
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
|
||||
from bacpypes.consolelogging import ArgumentParser
|
||||
|
||||
from bacpypes.pdu import Address
|
||||
from bacpypes.comm import bind
|
||||
|
||||
from bacpypes.apdu import APDU, IAmRequest
|
||||
|
||||
from bacpypes.app import LocalDeviceObject, Application
|
||||
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint
|
||||
|
||||
from tests.state_machine import ServerStateMachine
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# defaults for testing
|
||||
BACPYPES_TEST = ""
|
||||
BACPYPES_TEST_OPTION = ""
|
||||
|
||||
# parsed test options
|
||||
test_options = None
|
||||
|
||||
|
||||
#
|
||||
# TestApplication
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class TestApplication(Application):
|
||||
|
||||
def __init__(self, localDevice, localAddress, aseID=None):
|
||||
if _debug: TestApplication._debug("__init__ %r %r aseID=%r", localDevice, localAddress, aseID)
|
||||
Application.__init__(self, localDevice, localAddress, aseID)
|
||||
|
||||
#
|
||||
# setUp
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
def setUp(argv=None):
|
||||
global test_options
|
||||
|
||||
# create an argument parser
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
|
||||
# add an option
|
||||
parser.add_argument(
|
||||
'--option', help="this is an option",
|
||||
default=os.getenv("BACPYPES_TEST_OPTION") or BACPYPES_TEST_OPTION,
|
||||
)
|
||||
|
||||
# get the debugging args and parse them
|
||||
arg_str = os.getenv("BACPYPES_TEST") or BACPYPES_TEST
|
||||
test_options = parser.parse_args(argv or arg_str.split())
|
||||
|
||||
if _debug: setUp._debug("setUp")
|
||||
if _debug: setUp._debug(" - test_options: %r", test_options)
|
||||
|
||||
#
|
||||
# tearDown
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
def tearDown():
|
||||
if _debug: tearDown._debug("tearDown")
|
||||
|
||||
#
|
||||
# main
|
||||
#
|
||||
|
||||
setUp(sys.argv[1:])
|
||||
|
||||
# make a device object
|
||||
test_device = LocalDeviceObject(
|
||||
objectName='test_device',
|
||||
objectIdentifier=599,
|
||||
maxApduLengthAccepted=1024,
|
||||
segmentationSupported='segmentedBoth',
|
||||
vendorIdentifier=15,
|
||||
)
|
||||
|
||||
# make a test address
|
||||
test_address = Address(1)
|
||||
|
||||
# create a client state machine, trapped server, and bind them together
|
||||
test_application = TestApplication(test_device, test_address)
|
||||
|
||||
# include a application decoder
|
||||
test_asap = ApplicationServiceAccessPoint()
|
||||
|
||||
# pass the device object to the state machine access point so it
|
||||
# can know if it should support segmentation
|
||||
test_smap = StateMachineAccessPoint(test_device)
|
||||
|
||||
# state machine
|
||||
test_server = ServerStateMachine()
|
||||
|
||||
# bind everything together
|
||||
bind(test_application, test_asap, test_smap, test_server)
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
i_am_request = IAmRequest(
|
||||
iAmDeviceIdentifier=('device', 100),
|
||||
maxAPDULengthAccepted=1024,
|
||||
segmentationSupported='segmentedBoth',
|
||||
vendorID=15,
|
||||
)
|
||||
print("i_am_request")
|
||||
i_am_request.debug_contents()
|
||||
print("")
|
||||
|
||||
test_apdu = APDU()
|
||||
i_am_request.encode(test_apdu)
|
||||
|
||||
print("test_apdu")
|
||||
test_apdu.debug_contents()
|
||||
print("")
|
||||
|
||||
print("modify test_apdu")
|
||||
test_apdu.pduData = test_apdu.pduData[5:]
|
||||
test_apdu.debug_contents()
|
||||
print("")
|
||||
|
||||
# make a send transition from start to success
|
||||
test_server.start_state.send(test_apdu).success()
|
||||
|
||||
# run the machine
|
||||
print("running")
|
||||
test_server.run()
|
||||
print("")
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
# check for success
|
||||
assert not test_server.running
|
||||
assert test_server.current_state.is_success_state
|
||||
|
||||
tearDown()
|
199
sandbox/read_property_reject_test_x.py
Executable file
199
sandbox/read_property_reject_test_x.py
Executable file
|
@ -0,0 +1,199 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
BACpypes Test
|
||||
-------------
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
|
||||
from bacpypes.consolelogging import ArgumentParser
|
||||
|
||||
from bacpypes.pdu import Address
|
||||
from bacpypes.comm import bind
|
||||
|
||||
from bacpypes.apdu import APDU, ReadPropertyRequest, \
|
||||
ComplexAckPDU, RejectPDU, AbortPDU
|
||||
|
||||
from bacpypes.app import LocalDeviceObject, Application
|
||||
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint
|
||||
|
||||
from tests.state_machine import Server, StateMachine
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# defaults for testing
|
||||
BACPYPES_TEST = ""
|
||||
BACPYPES_TEST_OPTION = ""
|
||||
|
||||
# parsed test options
|
||||
test_options = None
|
||||
|
||||
|
||||
#
|
||||
# TestApplication
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class TestApplication(Application):
|
||||
|
||||
def __init__(self, localDevice, localAddress, aseID=None):
|
||||
if _debug: TestApplication._debug("__init__ %r %r aseID=%r", localDevice, localAddress, aseID)
|
||||
Application.__init__(self, localDevice, localAddress, aseID)
|
||||
|
||||
#
|
||||
# setUp
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
def setUp(argv=None):
|
||||
global test_options
|
||||
|
||||
# create an argument parser
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
|
||||
# add an option
|
||||
parser.add_argument(
|
||||
'--option', help="this is an option",
|
||||
default=os.getenv("BACPYPES_TEST_OPTION") or BACPYPES_TEST_OPTION,
|
||||
)
|
||||
|
||||
# get the debugging args and parse them
|
||||
arg_str = os.getenv("BACPYPES_TEST") or BACPYPES_TEST
|
||||
test_options = parser.parse_args(argv or arg_str.split())
|
||||
|
||||
if _debug: setUp._debug("setUp")
|
||||
if _debug: setUp._debug(" - test_options: %r", test_options)
|
||||
|
||||
#
|
||||
# tearDown
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
def tearDown():
|
||||
if _debug: tearDown._debug("tearDown")
|
||||
|
||||
|
||||
#
|
||||
# MatchingStateMachine
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class MatchingStateMachine(Server, StateMachine):
|
||||
|
||||
def __init__(self):
|
||||
if _debug: MatchingStateMachine._debug("__init__")
|
||||
|
||||
Server.__init__(self)
|
||||
StateMachine.__init__(self)
|
||||
|
||||
def send(self, pdu):
|
||||
if _debug: MatchingStateMachine._debug("send %r", pdu)
|
||||
self.response(pdu)
|
||||
|
||||
def indication(self, pdu):
|
||||
if _debug: MatchingStateMachine._debug("indication %r", pdu)
|
||||
self.receive(pdu)
|
||||
|
||||
def match_pdu(self, pdu, transition_pdu):
|
||||
if _debug: MatchingStateMachine._debug("match_pdu %r %r", pdu, transition_pdu)
|
||||
|
||||
# instance types match is enough
|
||||
is_instance = isinstance(pdu, transition_pdu)
|
||||
if _debug: MatchingStateMachine._debug(" - is_instance: %r", is_instance)
|
||||
|
||||
return is_instance
|
||||
|
||||
#
|
||||
# main
|
||||
#
|
||||
|
||||
setUp(sys.argv[1:])
|
||||
|
||||
# make a device object
|
||||
test_device = LocalDeviceObject(
|
||||
objectName='test_device',
|
||||
objectIdentifier=100,
|
||||
maxApduLengthAccepted=1024,
|
||||
segmentationSupported='segmentedBoth',
|
||||
vendorIdentifier=15,
|
||||
)
|
||||
|
||||
# make a test address
|
||||
test_address = Address(1)
|
||||
|
||||
# create a client state machine, trapped server, and bind them together
|
||||
test_application = TestApplication(test_device, test_address)
|
||||
|
||||
print("objects: " + str(test_application.objectIdentifier))
|
||||
|
||||
# get the services supported
|
||||
services_supported = test_application.get_services_supported()
|
||||
if _debug: _log.debug(" - services_supported: %r", services_supported)
|
||||
|
||||
# let the device object know
|
||||
test_device.protocolServicesSupported = services_supported.value
|
||||
|
||||
# include a application decoder
|
||||
test_asap = ApplicationServiceAccessPoint()
|
||||
|
||||
# pass the device object to the state machine access point so it
|
||||
# can know if it should support segmentation
|
||||
test_smap = StateMachineAccessPoint(test_device)
|
||||
|
||||
# state machine
|
||||
test_server = MatchingStateMachine()
|
||||
|
||||
# bind everything together
|
||||
bind(test_application, test_asap, test_smap, test_server)
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
read_property_request = ReadPropertyRequest(
|
||||
objectIdentifier=('device', 100),
|
||||
propertyIdentifier='objectName',
|
||||
)
|
||||
read_property_request.pduSource = Address(2)
|
||||
# read_property_request.pduDestination = Address(1)
|
||||
read_property_request.apduInvokeID = 1
|
||||
|
||||
print("read_property_request")
|
||||
read_property_request.debug_contents()
|
||||
print("")
|
||||
|
||||
test_apdu = APDU()
|
||||
read_property_request.encode(test_apdu)
|
||||
|
||||
print("test_apdu")
|
||||
test_apdu.debug_contents()
|
||||
print("")
|
||||
|
||||
if 0:
|
||||
print("modify test_apdu")
|
||||
test_apdu.pduData = test_apdu.pduData[:5]
|
||||
test_apdu.debug_contents()
|
||||
print("")
|
||||
|
||||
# make a send transition from start to success
|
||||
start_state = test_server.start_state.doc("start_state")
|
||||
response_state = start_state.send(test_apdu).doc("response_state")
|
||||
success_state = response_state.receive(ComplexAckPDU).doc("success_state")
|
||||
success_state.success()
|
||||
|
||||
# run the machine
|
||||
print("running")
|
||||
test_server.run()
|
||||
print("")
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
# check for success
|
||||
assert not test_server.running
|
||||
assert test_server.current_state.is_success_state
|
||||
|
||||
tearDown()
|
150
sandbox/who_is_reject_test_x.py
Executable file
150
sandbox/who_is_reject_test_x.py
Executable file
|
@ -0,0 +1,150 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
BACpypes Test
|
||||
-------------
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
|
||||
from bacpypes.consolelogging import ArgumentParser
|
||||
|
||||
from bacpypes.pdu import Address
|
||||
from bacpypes.comm import bind
|
||||
|
||||
from bacpypes.apdu import APDU, WhoIsRequest
|
||||
|
||||
from bacpypes.app import LocalDeviceObject, Application
|
||||
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint
|
||||
|
||||
from tests.state_machine import ServerStateMachine
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# defaults for testing
|
||||
BACPYPES_TEST = ""
|
||||
BACPYPES_TEST_OPTION = ""
|
||||
|
||||
# parsed test options
|
||||
test_options = None
|
||||
|
||||
|
||||
#
|
||||
# TestApplication
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class TestApplication(Application):
|
||||
|
||||
def __init__(self, localDevice, localAddress, aseID=None):
|
||||
if _debug: TestApplication._debug("__init__ %r %r aseID=%r", localDevice, localAddress, aseID)
|
||||
Application.__init__(self, localDevice, localAddress, aseID)
|
||||
|
||||
#
|
||||
# setUp
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
def setUp(argv=None):
|
||||
global test_options
|
||||
|
||||
# create an argument parser
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
|
||||
# add an option
|
||||
parser.add_argument(
|
||||
'--option', help="this is an option",
|
||||
default=os.getenv("BACPYPES_TEST_OPTION") or BACPYPES_TEST_OPTION,
|
||||
)
|
||||
|
||||
# get the debugging args and parse them
|
||||
arg_str = os.getenv("BACPYPES_TEST") or BACPYPES_TEST
|
||||
test_options = parser.parse_args(argv or arg_str.split())
|
||||
|
||||
if _debug: setUp._debug("setUp")
|
||||
if _debug: setUp._debug(" - test_options: %r", test_options)
|
||||
|
||||
#
|
||||
# tearDown
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
def tearDown():
|
||||
if _debug: tearDown._debug("tearDown")
|
||||
|
||||
#
|
||||
# main
|
||||
#
|
||||
|
||||
setUp(sys.argv[1:])
|
||||
|
||||
# make a device object
|
||||
test_device = LocalDeviceObject(
|
||||
objectName='test_device',
|
||||
objectIdentifier=100,
|
||||
maxApduLengthAccepted=1024,
|
||||
segmentationSupported='segmentedBoth',
|
||||
vendorIdentifier=15,
|
||||
)
|
||||
|
||||
# make a test address
|
||||
test_address = Address(1)
|
||||
|
||||
# create a client state machine, trapped server, and bind them together
|
||||
test_application = TestApplication(test_device, test_address)
|
||||
|
||||
# include a application decoder
|
||||
test_asap = ApplicationServiceAccessPoint()
|
||||
|
||||
# pass the device object to the state machine access point so it
|
||||
# can know if it should support segmentation
|
||||
test_smap = StateMachineAccessPoint(test_device)
|
||||
|
||||
# state machine
|
||||
test_server = ServerStateMachine()
|
||||
|
||||
# bind everything together
|
||||
bind(test_application, test_asap, test_smap, test_server)
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
who_is_request = WhoIsRequest(
|
||||
deviceInstanceRangeLowLimit=0,
|
||||
deviceInstanceRangeHighLimit=4194303,
|
||||
)
|
||||
print("who_is_request")
|
||||
who_is_request.debug_contents()
|
||||
print("")
|
||||
|
||||
test_apdu = APDU()
|
||||
who_is_request.encode(test_apdu)
|
||||
|
||||
print("test_apdu")
|
||||
test_apdu.debug_contents()
|
||||
print("")
|
||||
|
||||
print("modify test_apdu")
|
||||
test_apdu.pduData = test_apdu.pduData[:-1]
|
||||
# test_apdu.pduData = xtob('7509006869207468657265') # CharacterString("hi there")
|
||||
test_apdu.debug_contents()
|
||||
print("")
|
||||
|
||||
# make a send transition from start to success
|
||||
test_server.start_state.send(test_apdu).success()
|
||||
|
||||
# run the machine
|
||||
print("running")
|
||||
test_server.run()
|
||||
print("")
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
# check for success
|
||||
assert not test_server.running
|
||||
assert test_server.current_state.is_success_state
|
||||
|
||||
tearDown()
|
103
sandbox/xtest_io_client.py
Normal file
103
sandbox/xtest_io_client.py
Normal file
|
@ -0,0 +1,103 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
test_io_client
|
||||
"""
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
|
||||
from bacpypes.console import ConsoleClient
|
||||
from bacpypes.consolelogging import ArgumentParser
|
||||
from bacpypes.comm import PDU, Server, bind
|
||||
|
||||
from bacpypes.core import run
|
||||
|
||||
from io import IOProxy, IOCB, COMPLETED, ABORTED
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# defaults
|
||||
SERVER = "127.0.0.1"
|
||||
CONTROLLER = "test"
|
||||
|
||||
# globals
|
||||
testServer = None
|
||||
|
||||
#
|
||||
# MiddleMan
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class MiddleMan(Server):
|
||||
|
||||
def indication(self, pdu):
|
||||
MiddleMan._debug('indication %r', pdu)
|
||||
|
||||
try:
|
||||
iocb = IOCB(int(pdu.pduData))
|
||||
except:
|
||||
print "Integer please."
|
||||
return
|
||||
|
||||
iocb.set_timeout(5)
|
||||
iocb.add_callback(self.complete)
|
||||
|
||||
# send the request
|
||||
testServer.request_io(iocb)
|
||||
|
||||
def complete(self, iocb):
|
||||
MiddleMan._debug('complete %r', iocb)
|
||||
|
||||
# if this has completed successfully, pass it up
|
||||
if iocb.ioState == COMPLETED:
|
||||
self.response(PDU(str(iocb.ioResponse) + '\n'))
|
||||
|
||||
# if this aborted, pass that up too
|
||||
elif iocb.ioState == ABORTED:
|
||||
self.response(PDU(repr(iocb.ioError) + '\n'))
|
||||
|
||||
else:
|
||||
raise RuntimeError, "invalid state: %r" % (iocb.ioState,)
|
||||
|
||||
#
|
||||
# __main__
|
||||
#
|
||||
|
||||
try:
|
||||
# create a parser
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
|
||||
# add an option to pick a server
|
||||
parser.add_argument('--server',
|
||||
help="server name or address",
|
||||
default=SERVER,
|
||||
)
|
||||
|
||||
# add an option to pick a controller
|
||||
parser.add_argument('--controller',
|
||||
help="controller name",
|
||||
default=CONTROLLER,
|
||||
)
|
||||
|
||||
# parse the command line arguments
|
||||
args = parser.parse_args()
|
||||
|
||||
if _debug: _log.debug("initialization")
|
||||
if _debug: _log.debug(" - args: %r", args)
|
||||
|
||||
# create a proxy for the real server
|
||||
testServer = IOProxy(args.controller, args.server)
|
||||
|
||||
console = ConsoleClient()
|
||||
middleMan = MiddleMan()
|
||||
bind(console, middleMan)
|
||||
|
||||
_log.debug("running")
|
||||
run()
|
||||
|
||||
except Exception, e:
|
||||
_log.exception("an error has occurred: %s", e)
|
||||
finally:
|
||||
_log.debug("finally")
|
76
sandbox/xtest_io_server.py
Normal file
76
sandbox/xtest_io_server.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
Test IO Server
|
||||
"""
|
||||
|
||||
import random
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
from bacpypes.consolelogging import ArgumentParser
|
||||
|
||||
from bacpypes.core import run
|
||||
|
||||
from io import IOController, IOServer
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# defaults
|
||||
CONTROLLER = "test"
|
||||
|
||||
#
|
||||
# TestController
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class TestController(IOController):
|
||||
|
||||
def __init__(self, name):
|
||||
if _debug: TestController._debug('__init__')
|
||||
IOController.__init__(self, name)
|
||||
|
||||
def process_io(self, iocb):
|
||||
if _debug: TestController._debug('process_io %r', iocb)
|
||||
|
||||
# some random data
|
||||
rslt = random.random() * iocb.args[0]
|
||||
|
||||
# send back the result
|
||||
self.complete_io(iocb, rslt)
|
||||
|
||||
#
|
||||
# __main__
|
||||
#
|
||||
|
||||
try:
|
||||
# create a parser
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
|
||||
# add an option to pick a controller
|
||||
parser.add_argument('--controller',
|
||||
help="controller name",
|
||||
default=CONTROLLER,
|
||||
)
|
||||
|
||||
# parse the command line arguments
|
||||
args = parser.parse_args()
|
||||
|
||||
if _debug: _log.debug("initialization")
|
||||
if _debug: _log.debug(" - args: %r", args)
|
||||
|
||||
# this is an IO server
|
||||
IOServer()
|
||||
|
||||
# create a test controller
|
||||
TestController(args.controller)
|
||||
|
||||
_log.debug("running")
|
||||
run()
|
||||
|
||||
except Exception, e:
|
||||
_log.exception("an error has occurred: %s", e)
|
||||
finally:
|
||||
_log.debug("finally")
|
||||
|
313
sandbox/xtest_issue_45.py
Executable file
313
sandbox/xtest_issue_45.py
Executable file
|
@ -0,0 +1,313 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
test_issue_45
|
||||
|
||||
This sample application builds a VLAN with two application layer nodes and
|
||||
uses a console prompt to send packets.
|
||||
"""
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
from bacpypes.consolelogging import ArgumentParser
|
||||
from bacpypes.consolecmd import ConsoleCmd
|
||||
|
||||
from bacpypes.core import run
|
||||
from bacpypes.comm import bind
|
||||
|
||||
from bacpypes.pdu import Address, GlobalBroadcast
|
||||
|
||||
from bacpypes.vlan import Network, Node
|
||||
|
||||
from bacpypes.app import LocalDeviceObject, Application
|
||||
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
|
||||
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
|
||||
|
||||
from bacpypes.apdu import WhoIsRequest, IAmRequest, ReadPropertyRequest, WritePropertyRequest
|
||||
|
||||
from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real
|
||||
from bacpypes.constructeddata import Array, Any
|
||||
from bacpypes.object import get_object_class, get_datatype
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# globals
|
||||
vlan_app_1 = None
|
||||
vlan_app_2 = None
|
||||
|
||||
#
|
||||
# VLANApplication
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class VLANApplication(Application):
|
||||
|
||||
def __init__(self, vlan_device, vlan_address, aseID=None):
|
||||
if _debug: VLANApplication._debug("__init__ %r %r aseID=%r", vlan_device, vlan_address, aseID)
|
||||
Application.__init__(self, vlan_device, vlan_address, aseID)
|
||||
|
||||
# include a application decoder
|
||||
self.asap = ApplicationServiceAccessPoint()
|
||||
|
||||
# pass the device object to the state machine access point so it
|
||||
# can know if it should support segmentation
|
||||
self.smap = StateMachineAccessPoint(vlan_device)
|
||||
|
||||
# a network service access point will be needed
|
||||
self.nsap = NetworkServiceAccessPoint()
|
||||
|
||||
# give the NSAP a generic network layer service element
|
||||
self.nse = NetworkServiceElement()
|
||||
bind(self.nse, self.nsap)
|
||||
|
||||
# bind the top layers
|
||||
bind(self, self.asap, self.smap, self.nsap)
|
||||
|
||||
# create a vlan node at the assigned address
|
||||
self.vlan_node = Node(vlan_address)
|
||||
|
||||
# bind the stack to the node, no network number
|
||||
self.nsap.bind(self.vlan_node)
|
||||
|
||||
def request(self, apdu):
|
||||
if _debug: VLANApplication._debug("[%s]request %r", self.vlan_node.address, apdu)
|
||||
Application.request(self, apdu)
|
||||
|
||||
def indication(self, apdu):
|
||||
if _debug: VLANApplication._debug("[%s]indication %r", self.vlan_node.address, apdu)
|
||||
Application.indication(self, apdu)
|
||||
|
||||
def response(self, apdu):
|
||||
if _debug: VLANApplication._debug("[%s]response %r", self.vlan_node.address, apdu)
|
||||
Application.response(self, apdu)
|
||||
|
||||
def confirmation(self, apdu):
|
||||
if _debug: VLANApplication._debug("[%s]confirmation %r", self.vlan_node.address, apdu)
|
||||
Application.confirmation(self, apdu)
|
||||
|
||||
#
|
||||
# TestConsoleCmd
|
||||
#
|
||||
|
||||
class TestConsoleCmd(ConsoleCmd):
|
||||
|
||||
def do_whois(self, args):
|
||||
"""whois [ <addr>] [ <lolimit> <hilimit> ]"""
|
||||
args = args.split()
|
||||
if _debug: TestConsoleCmd._debug("do_whois %r", args)
|
||||
|
||||
try:
|
||||
# build a request
|
||||
request = WhoIsRequest()
|
||||
if (len(args) == 1) or (len(args) == 3):
|
||||
request.pduDestination = Address(args[0])
|
||||
del args[0]
|
||||
else:
|
||||
request.pduDestination = GlobalBroadcast()
|
||||
|
||||
if len(args) == 2:
|
||||
request.deviceInstanceRangeLowLimit = int(args[0])
|
||||
request.deviceInstanceRangeHighLimit = int(args[1])
|
||||
if _debug: TestConsoleCmd._debug(" - request: %r", request)
|
||||
|
||||
# give it to the application
|
||||
vlan_app_1.request(request)
|
||||
|
||||
except Exception, e:
|
||||
TestConsoleCmd._exception("exception: %r", e)
|
||||
|
||||
def do_iam(self, args):
|
||||
"""iam"""
|
||||
args = args.split()
|
||||
if _debug: TestConsoleCmd._debug("do_iam %r", args)
|
||||
|
||||
try:
|
||||
# build a request
|
||||
request = IAmRequest()
|
||||
request.pduDestination = GlobalBroadcast()
|
||||
|
||||
# set the parameters from the device object
|
||||
request.iAmDeviceIdentifier = vlan_device_1.objectIdentifier
|
||||
request.maxAPDULengthAccepted = vlan_device_1.maxApduLengthAccepted
|
||||
request.segmentationSupported = vlan_device_1.segmentationSupported
|
||||
request.vendorID = vlan_device_1.vendorIdentifier
|
||||
if _debug: TestConsoleCmd._debug(" - request: %r", request)
|
||||
|
||||
# give it to the application
|
||||
vlan_app_1.request(request)
|
||||
|
||||
except Exception, e:
|
||||
TestConsoleCmd._exception("exception: %r", e)
|
||||
|
||||
def do_read(self, args):
|
||||
"""read <addr> <type> <inst> <prop> [ <indx> ]"""
|
||||
args = args.split()
|
||||
if _debug: TestConsoleCmd._debug("do_read %r", args)
|
||||
|
||||
try:
|
||||
addr, obj_type, obj_inst, prop_id = args[:4]
|
||||
|
||||
if obj_type.isdigit():
|
||||
obj_type = int(obj_type)
|
||||
elif not get_object_class(obj_type):
|
||||
raise ValueError, "unknown object type"
|
||||
|
||||
obj_inst = int(obj_inst)
|
||||
|
||||
datatype = get_datatype(obj_type, prop_id)
|
||||
if not datatype:
|
||||
raise ValueError, "invalid property for object type"
|
||||
|
||||
# build a request
|
||||
request = ReadPropertyRequest(
|
||||
objectIdentifier=(obj_type, obj_inst),
|
||||
propertyIdentifier=prop_id,
|
||||
)
|
||||
request.pduDestination = Address(addr)
|
||||
|
||||
if len(args) == 5:
|
||||
request.propertyArrayIndex = int(args[4])
|
||||
if _debug: TestConsoleCmd._debug(" - request: %r", request)
|
||||
|
||||
# give it to the application
|
||||
vlan_app_1.request(request)
|
||||
|
||||
except Exception, e:
|
||||
TestConsoleCmd._exception("exception: %r", e)
|
||||
|
||||
def do_write(self, args):
|
||||
"""write <addr> <type> <inst> <prop> <value> [ <indx> ] [ <priority> ]"""
|
||||
args = args.split()
|
||||
if _debug: TestConsoleCmd._debug("do_write %r", args)
|
||||
|
||||
try:
|
||||
addr, obj_type, obj_inst, prop_id = args[:4]
|
||||
if obj_type.isdigit():
|
||||
obj_type = int(obj_type)
|
||||
obj_inst = int(obj_inst)
|
||||
value = args[4]
|
||||
|
||||
indx = None
|
||||
if len(args) >= 6:
|
||||
if args[5] != "-":
|
||||
indx = int(args[5])
|
||||
if _debug: TestConsoleCmd._debug(" - indx: %r", indx)
|
||||
|
||||
priority = None
|
||||
if len(args) >= 7:
|
||||
priority = int(args[6])
|
||||
if _debug: TestConsoleCmd._debug(" - priority: %r", priority)
|
||||
|
||||
# get the datatype
|
||||
datatype = get_datatype(obj_type, prop_id)
|
||||
if _debug: TestConsoleCmd._debug(" - datatype: %r", datatype)
|
||||
|
||||
# change atomic values into something encodeable, null is a special case
|
||||
if (value == 'null'):
|
||||
value = Null()
|
||||
elif issubclass(datatype, Atomic):
|
||||
if datatype is Integer:
|
||||
value = int(value)
|
||||
elif datatype is Real:
|
||||
value = float(value)
|
||||
elif datatype is Unsigned:
|
||||
value = int(value)
|
||||
value = datatype(value)
|
||||
elif issubclass(datatype, Array) and (indx is not None):
|
||||
if indx == 0:
|
||||
value = Integer(value)
|
||||
elif issubclass(datatype.subtype, Atomic):
|
||||
value = datatype.subtype(value)
|
||||
elif not isinstance(value, datatype.subtype):
|
||||
raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,)
|
||||
elif not isinstance(value, datatype):
|
||||
raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,)
|
||||
if _debug: TestConsoleCmd._debug(" - encodeable value: %r %s", value, type(value))
|
||||
|
||||
# build a request
|
||||
request = WritePropertyRequest(
|
||||
objectIdentifier=(obj_type, obj_inst),
|
||||
propertyIdentifier=prop_id
|
||||
)
|
||||
request.pduDestination = Address(addr)
|
||||
|
||||
# save the value
|
||||
request.propertyValue = Any()
|
||||
try:
|
||||
request.propertyValue.cast_in(value)
|
||||
except Exception, e:
|
||||
TestConsoleCmd._exception("WriteProperty cast error: %r", e)
|
||||
|
||||
# optional array index
|
||||
if indx is not None:
|
||||
request.propertyArrayIndex = indx
|
||||
|
||||
# optional priority
|
||||
if priority is not None:
|
||||
request.priority = priority
|
||||
|
||||
if _debug: TestConsoleCmd._debug(" - request: %r", request)
|
||||
|
||||
# give it to the application
|
||||
vlan_app_1.request(request)
|
||||
|
||||
except Exception, e:
|
||||
TestConsoleCmd._exception("exception: %r", e)
|
||||
|
||||
bacpypes_debugging(TestConsoleCmd)
|
||||
|
||||
#
|
||||
# __main__
|
||||
#
|
||||
|
||||
try:
|
||||
# parse the command line arguments
|
||||
args = ArgumentParser(description=__doc__).parse_args()
|
||||
|
||||
if _debug: _log.debug("initialization")
|
||||
if _debug: _log.debug(" - args: %r", args)
|
||||
|
||||
# create a VLAN
|
||||
vlan = Network()
|
||||
|
||||
# create the first device
|
||||
vlan_device_1 = \
|
||||
LocalDeviceObject(
|
||||
objectName="VLAN Node 1",
|
||||
objectIdentifier=('device', 1),
|
||||
maxApduLengthAccepted=1024,
|
||||
segmentationSupported='noSegmentation',
|
||||
vendorIdentifier=15,
|
||||
)
|
||||
|
||||
# create the application stack, add it to the network
|
||||
vlan_app_1 = VLANApplication(vlan_device_1, Address(1))
|
||||
vlan.add_node(vlan_app_1.vlan_node)
|
||||
|
||||
# create the second device
|
||||
vlan_device_2 = \
|
||||
LocalDeviceObject(
|
||||
objectName="VLAN Node 2",
|
||||
objectIdentifier=('device', 2),
|
||||
maxApduLengthAccepted=1024,
|
||||
segmentationSupported='noSegmentation',
|
||||
vendorIdentifier=15,
|
||||
)
|
||||
|
||||
# create the application stack, add it to the network
|
||||
vlan_app_2 = VLANApplication(vlan_device_2, Address(2))
|
||||
vlan.add_node(vlan_app_2.vlan_node)
|
||||
|
||||
# make a console
|
||||
this_console = TestConsoleCmd()
|
||||
|
||||
_log.debug("running")
|
||||
|
||||
run()
|
||||
|
||||
except Exception, e:
|
||||
_log.exception("an error has occurred: %s", e)
|
||||
finally:
|
||||
_log.debug("finally")
|
||||
|
Loading…
Reference in New Issue
Block a user