From 629d0584993e938a93e9fca4590a3fc76a03769a Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Fri, 8 Mar 2019 11:11:42 -0500 Subject: [PATCH] additional sample for network application registered as a foreign device --- samples/WhoIsRouterForeign.py | 142 ++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100755 samples/WhoIsRouterForeign.py diff --git a/samples/WhoIsRouterForeign.py b/samples/WhoIsRouterForeign.py new file mode 100755 index 0000000..af77f40 --- /dev/null +++ b/samples/WhoIsRouterForeign.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python + +""" +This sample application has just a network stack, not a full application, +and is a way to create InitializeRoutingTable and WhoIsRouterToNetwork requests. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run, enable_sleeping + +from bacpypes.pdu import Address +from bacpypes.npdu import ( + WhoIsRouterToNetwork, IAmRouterToNetwork, + InitializeRoutingTable, InitializeRoutingTableAck, + ) + +from bacpypes.app import BIPNetworkApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_application = None +this_console = None + +# +# WhoIsRouterApplication +# + +@bacpypes_debugging +class WhoIsRouterApplication(BIPNetworkApplication): + + def __init__(self, *args): + if _debug: WhoIsRouterApplication._debug("__init__ %r", args) + BIPNetworkApplication.__init__(self, *args) + + def request(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("request %r %r", adapter, npdu) + BIPNetworkApplication.request(self, adapter, npdu) + + def indication(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("indication %r %r", adapter, npdu) + + if isinstance(npdu, IAmRouterToNetwork): + print("{} -> {}, {}".format(npdu.pduSource, npdu.pduDestination, npdu.iartnNetworkList)) + + elif isinstance(npdu, InitializeRoutingTableAck): + print("{} routing table".format(npdu.pduSource)) + for rte in npdu.irtaTable: + print(" {} {} {}".format(rte.rtDNET, rte.rtPortID, rte.rtPortInfo)) + + BIPNetworkApplication.indication(self, adapter, npdu) + + def response(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("response %r %r", adapter, npdu) + BIPNetworkApplication.response(self, adapter, npdu) + + def confirmation(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("confirmation %r %r", adapter, npdu) + BIPNetworkApplication.confirmation(self, adapter, npdu) + +# +# WhoIsRouterConsoleCmd +# + +@bacpypes_debugging +class WhoIsRouterConsoleCmd(ConsoleCmd): + + def do_irt(self, args): + """irt """ + args = args.split() + if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args) + + # build a request + try: + request = InitializeRoutingTable() + request.pduDestination = Address(args[0]) + except: + print("invalid arguments") + return + + # give it to the application + this_application.request(this_application.nsap.local_adapter, request) + + def do_wirtn(self, args): + """wirtn [ ]""" + args = args.split() + if _debug: WhoIsRouterConsoleCmd._debug("do_wirtn %r", args) + + # build a request + try: + request = WhoIsRouterToNetwork() + request.pduDestination = Address(args[0]) + if (len(args) > 1): + request.wirtnNetwork = int(args[1]) + except: + print("invalid arguments") + return + + # give it to the application + this_application.request(this_application.nsap.local_adapter, request) + +# +# __main__ +# + +def main(): + global this_application + + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a simple application + this_application = WhoIsRouterApplication( + args.ini.address, + Address(args.ini.foreignbbmd), + int(args.ini.foreignttl), + ) + if _debug: _log.debug(" - this_application: %r", this_application) + + # make a console + this_console = WhoIsRouterConsoleCmd() + if _debug: _log.debug(" - this_console: %r", this_console) + + # enable sleeping will help with threads + enable_sleeping() + + _log.debug("running") + + run() + + _log.debug("fini") + +if __name__ == "__main__": + main()