mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
merging in the latest stage before continuing with this issue
This commit is contained in:
commit
da1336bc59
|
@ -2,6 +2,18 @@
|
|||
|
||||
"""BACnet Python Package"""
|
||||
|
||||
#
|
||||
# Platform Check
|
||||
#
|
||||
|
||||
import sys as _sys
|
||||
import warnings as _warnings
|
||||
|
||||
_supported_platforms = ('linux2', 'win32', 'darwin')
|
||||
|
||||
if _sys.platform not in _supported_platforms:
|
||||
_warnings.warn("unsupported platform", RuntimeWarning)
|
||||
|
||||
#
|
||||
# Communications Core Modules
|
||||
#
|
||||
|
|
|
@ -91,7 +91,7 @@ class UDPMultiplexer:
|
|||
bind(self.direct, self.directPort)
|
||||
|
||||
# create and bind the broadcast address for non-Windows
|
||||
if specialBroadcast and (not noBroadcast) and 'win' not in sys.platform:
|
||||
if specialBroadcast and (not noBroadcast) and sys.platform in ('linux2', 'darwin'):
|
||||
self.broadcast = _MultiplexClient(self)
|
||||
self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True)
|
||||
bind(self.direct, self.broadcastPort)
|
||||
|
@ -293,7 +293,7 @@ class BIPSAP(ServiceAccessPoint):
|
|||
|
||||
def __init__(self, sap=None):
|
||||
"""A BIP service access point."""
|
||||
if _debug: BIPSimple._debug("__init__ sap=%r", sap)
|
||||
if _debug: BIPSAP._debug("__init__ sap=%r", sap)
|
||||
ServiceAccessPoint.__init__(self, sap)
|
||||
|
||||
def sap_indication(self, pdu):
|
||||
|
|
|
@ -8,7 +8,6 @@ import sys
|
|||
import types
|
||||
import os
|
||||
import gc
|
||||
import readline
|
||||
import signal
|
||||
import cmd
|
||||
import logging
|
||||
|
@ -20,6 +19,12 @@ from .consolelogging import ConsoleLogHandler
|
|||
|
||||
from . import core
|
||||
|
||||
# readline is used for history files
|
||||
try:
|
||||
import readline
|
||||
except ImportError:
|
||||
readline = None
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
@ -263,7 +268,8 @@ class ConsoleCmd(cmd.Cmd, Thread, Logging):
|
|||
cmd.Cmd.preloop(self) ## sets up command completion
|
||||
|
||||
try:
|
||||
readline.read_history_file(sys.argv[0] + ".history")
|
||||
if readline:
|
||||
readline.read_history_file(sys.argv[0] + ".history")
|
||||
except Exception, err:
|
||||
if not isinstance(err, IOError):
|
||||
self.stdout.write("history error: %s\n" % err)
|
||||
|
@ -273,7 +279,8 @@ class ConsoleCmd(cmd.Cmd, Thread, Logging):
|
|||
Despite the claims in the Cmd documentaion, Cmd.postloop() is not a stub.
|
||||
"""
|
||||
try:
|
||||
readline.write_history_file(sys.argv[0]+".history")
|
||||
if readline:
|
||||
readline.write_history_file(sys.argv[0] + ".history")
|
||||
except Exception, err:
|
||||
if not isinstance(err, IOError):
|
||||
self.stdout.write("history error: %s\n" % err)
|
||||
|
|
|
@ -21,7 +21,7 @@ _task_manager = None
|
|||
_unscheduled_tasks = []
|
||||
|
||||
# only defined for linux platforms
|
||||
if sys.platform.startswith(('linux', 'darwin')):
|
||||
if sys.platform in ('linux2', 'darwin'):
|
||||
from .event import WaitableEvent
|
||||
#
|
||||
# _Trigger
|
||||
|
|
|
@ -2,6 +2,18 @@
|
|||
|
||||
"""BACnet Python Package"""
|
||||
|
||||
#
|
||||
# Platform Check
|
||||
#
|
||||
|
||||
import sys as _sys
|
||||
import warnings as _warnings
|
||||
|
||||
_supported_platforms = ('linux2', 'win32', 'darwin')
|
||||
|
||||
if _sys.platform not in _supported_platforms:
|
||||
_warnings.warn("unsupported platform", RuntimeWarning)
|
||||
|
||||
#
|
||||
# Communications Core Modules
|
||||
#
|
||||
|
|
|
@ -92,7 +92,7 @@ class UDPMultiplexer:
|
|||
bind(self.direct, self.directPort)
|
||||
|
||||
# create and bind the broadcast address for non-Windows
|
||||
if specialBroadcast and (not noBroadcast) and 'win' not in sys.platform:
|
||||
if specialBroadcast and (not noBroadcast) and sys.platform in ('linux2', 'darwin'):
|
||||
self.broadcast = _MultiplexClient(self)
|
||||
self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True)
|
||||
bind(self.direct, self.broadcastPort)
|
||||
|
@ -291,7 +291,7 @@ class BIPSAP(ServiceAccessPoint):
|
|||
|
||||
def __init__(self, sap=None):
|
||||
"""A BIP service access point."""
|
||||
if _debug: BIPSimple._debug("__init__ sap=%r", sap)
|
||||
if _debug: BIPSAP._debug("__init__ sap=%r", sap)
|
||||
ServiceAccessPoint.__init__(self, sap)
|
||||
|
||||
def sap_indication(self, pdu):
|
||||
|
|
|
@ -8,7 +8,6 @@ import sys
|
|||
import types
|
||||
import os
|
||||
import gc
|
||||
import readline
|
||||
import signal
|
||||
import cmd
|
||||
import logging
|
||||
|
@ -20,6 +19,12 @@ from .consolelogging import ConsoleLogHandler
|
|||
|
||||
from . import core
|
||||
|
||||
# readline is used for history files
|
||||
try:
|
||||
import readline
|
||||
except ImportError:
|
||||
readline = None
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
@ -265,7 +270,8 @@ class ConsoleCmd(cmd.Cmd, Thread, Logging):
|
|||
cmd.Cmd.preloop(self) ## sets up command completion
|
||||
|
||||
try:
|
||||
readline.read_history_file(sys.argv[0] + ".history")
|
||||
if readline:
|
||||
readline.read_history_file(sys.argv[0] + ".history")
|
||||
except Exception as err:
|
||||
if not isinstance(err, IOError):
|
||||
self.stdout.write("history error: %s\n" % err)
|
||||
|
@ -275,7 +281,8 @@ class ConsoleCmd(cmd.Cmd, Thread, Logging):
|
|||
Despite the claims in the Cmd documentaion, Cmd.postloop() is not a stub.
|
||||
"""
|
||||
try:
|
||||
readline.write_history_file(sys.argv[0]+".history")
|
||||
if readline:
|
||||
readline.write_history_file(sys.argv[0] + ".history")
|
||||
except Exception as err:
|
||||
if not isinstance(err, IOError):
|
||||
self.stdout.write("history error: %s\n" % err)
|
||||
|
|
|
@ -21,7 +21,7 @@ _task_manager = None
|
|||
_unscheduled_tasks = []
|
||||
|
||||
# only defined for linux platforms
|
||||
if sys.platform.startswith(('linux', 'darwin')):
|
||||
if sys.platform in ('linux2', 'darwin'):
|
||||
from .event import WaitableEvent
|
||||
#
|
||||
# _Trigger
|
||||
|
|
|
@ -2,6 +2,18 @@
|
|||
|
||||
"""BACnet Python Package"""
|
||||
|
||||
#
|
||||
# Platform Check
|
||||
#
|
||||
|
||||
import sys as _sys
|
||||
import warnings as _warnings
|
||||
|
||||
_supported_platforms = ('linux', 'win32', 'darwin')
|
||||
|
||||
if _sys.platform not in _supported_platforms:
|
||||
_warnings.warn("unsupported platform", RuntimeWarning)
|
||||
|
||||
#
|
||||
# Communications Core Modules
|
||||
#
|
||||
|
|
|
@ -92,7 +92,7 @@ class UDPMultiplexer:
|
|||
bind(self.direct, self.directPort)
|
||||
|
||||
# create and bind the broadcast address for non-Windows
|
||||
if specialBroadcast and (not noBroadcast) and 'win' not in sys.platform:
|
||||
if specialBroadcast and (not noBroadcast) and sys.platform in ('linux', 'darwin'):
|
||||
self.broadcast = _MultiplexClient(self)
|
||||
self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True)
|
||||
bind(self.direct, self.broadcastPort)
|
||||
|
@ -289,7 +289,7 @@ class BIPSAP(ServiceAccessPoint):
|
|||
|
||||
def __init__(self, sap=None):
|
||||
"""A BIP service access point."""
|
||||
if _debug: BIPSimple._debug("__init__ sap=%r", sap)
|
||||
if _debug: BIPSAP._debug("__init__ sap=%r", sap)
|
||||
ServiceAccessPoint.__init__(self, sap)
|
||||
|
||||
def sap_indication(self, pdu):
|
||||
|
|
|
@ -8,7 +8,6 @@ import sys
|
|||
import types
|
||||
import os
|
||||
import gc
|
||||
import readline
|
||||
import signal
|
||||
import cmd
|
||||
import logging
|
||||
|
@ -20,6 +19,12 @@ from .consolelogging import ConsoleLogHandler
|
|||
|
||||
from . import core
|
||||
|
||||
# readline is used for history files
|
||||
try:
|
||||
import readline
|
||||
except ImportError:
|
||||
readline = None
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
@ -266,7 +271,8 @@ class ConsoleCmd(cmd.Cmd, Thread, Logging):
|
|||
cmd.Cmd.preloop(self) ## sets up command completion
|
||||
|
||||
try:
|
||||
readline.read_history_file(sys.argv[0] + ".history")
|
||||
if readline:
|
||||
readline.read_history_file(sys.argv[0] + ".history")
|
||||
except Exception as err:
|
||||
if not isinstance(err, IOError):
|
||||
self.stdout.write("history error: %s\n" % err)
|
||||
|
@ -276,7 +282,8 @@ class ConsoleCmd(cmd.Cmd, Thread, Logging):
|
|||
Despite the claims in the Cmd documentaion, Cmd.postloop() is not a stub.
|
||||
"""
|
||||
try:
|
||||
readline.write_history_file(sys.argv[0]+".history")
|
||||
if readline:
|
||||
readline.write_history_file(sys.argv[0] + ".history")
|
||||
except Exception as err:
|
||||
if not isinstance(err, IOError):
|
||||
self.stdout.write("history error: %s\n" % err)
|
||||
|
|
|
@ -21,7 +21,7 @@ _task_manager = None
|
|||
_unscheduled_tasks = []
|
||||
|
||||
# only defined for linux platforms
|
||||
if sys.platform.startswith(('linux', 'darwin')):
|
||||
if sys.platform in ('linux', 'darwin'):
|
||||
from .event import WaitableEvent
|
||||
#
|
||||
# _Trigger
|
||||
|
|
220
samples/WhoIsIAmForeign.py
Executable file
220
samples/WhoIsIAmForeign.py
Executable file
|
@ -0,0 +1,220 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
This application presents a 'console' prompt to the user asking for Who-Is and I-Am
|
||||
commands which create the related APDUs, then lines up the coorresponding I-Am
|
||||
for incoming traffic and prints out the contents.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
from bacpypes.consolelogging import ConfigArgumentParser
|
||||
from bacpypes.consolecmd import ConsoleCmd
|
||||
|
||||
from bacpypes.core import run
|
||||
|
||||
from bacpypes.pdu import Address, GlobalBroadcast
|
||||
from bacpypes.app import LocalDeviceObject, BIPForeignApplication
|
||||
|
||||
from bacpypes.apdu import WhoIsRequest, IAmRequest
|
||||
from bacpypes.basetypes import ServicesSupported
|
||||
from bacpypes.errors import DecodingError
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# globals
|
||||
this_device = None
|
||||
this_application = None
|
||||
this_console = None
|
||||
|
||||
#
|
||||
# WhoIsIAmApplication
|
||||
#
|
||||
|
||||
class WhoIsIAmApplication(BIPForeignApplication):
|
||||
|
||||
def __init__(self, *args):
|
||||
if _debug: WhoIsIAmApplication._debug("__init__ %r", args)
|
||||
BIPForeignApplication.__init__(self, *args)
|
||||
|
||||
# keep track of requests to line up responses
|
||||
self._request = None
|
||||
|
||||
def request(self, apdu):
|
||||
if _debug: WhoIsIAmApplication._debug("request %r", apdu)
|
||||
|
||||
# save a copy of the request
|
||||
self._request = apdu
|
||||
|
||||
# forward it along
|
||||
BIPForeignApplication.request(self, apdu)
|
||||
|
||||
def confirmation(self, apdu):
|
||||
if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu)
|
||||
|
||||
# forward it along
|
||||
BIPForeignApplication.confirmation(self, apdu)
|
||||
|
||||
def indication(self, apdu):
|
||||
if _debug: WhoIsIAmApplication._debug("indication %r", apdu)
|
||||
|
||||
if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)):
|
||||
device_type, device_instance = apdu.iAmDeviceIdentifier
|
||||
if device_type != 'device':
|
||||
raise DecodingError("invalid object type")
|
||||
|
||||
if (self._request.deviceInstanceRangeLowLimit is not None) and \
|
||||
(device_instance < self._request.deviceInstanceRangeLowLimit):
|
||||
pass
|
||||
elif (self._request.deviceInstanceRangeHighLimit is not None) and \
|
||||
(device_instance > self._request.deviceInstanceRangeHighLimit):
|
||||
pass
|
||||
else:
|
||||
# print out the contents
|
||||
sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n')
|
||||
sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n')
|
||||
sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n')
|
||||
sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n')
|
||||
sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n')
|
||||
sys.stdout.flush()
|
||||
|
||||
# forward it along
|
||||
BIPForeignApplication.indication(self, apdu)
|
||||
|
||||
bacpypes_debugging(WhoIsIAmApplication)
|
||||
|
||||
#
|
||||
# WhoIsIAmConsoleCmd
|
||||
#
|
||||
|
||||
class WhoIsIAmConsoleCmd(ConsoleCmd):
|
||||
|
||||
def do_whois(self, args):
|
||||
"""whois [ <addr>] [ <lolimit> <hilimit> ]"""
|
||||
args = args.split()
|
||||
if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args)
|
||||
|
||||
try:
|
||||
# build a request
|
||||
request = WhoIsRequest()
|
||||
if (len(args) == 1) or (len(args) == 3):
|
||||
request.pduDestination = Address(args[0])
|
||||
del args[0]
|
||||
else:
|
||||
request.pduDestination = GlobalBroadcast()
|
||||
|
||||
if len(args) == 2:
|
||||
request.deviceInstanceRangeLowLimit = int(args[0])
|
||||
request.deviceInstanceRangeHighLimit = int(args[1])
|
||||
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)
|
||||
|
||||
# give it to the application
|
||||
this_application.request(request)
|
||||
|
||||
except Exception as err:
|
||||
WhoIsIAmConsoleCmd._exception("exception: %r", err)
|
||||
|
||||
def do_iam(self, args):
|
||||
"""iam"""
|
||||
args = args.split()
|
||||
if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args)
|
||||
|
||||
try:
|
||||
# build a request
|
||||
request = IAmRequest()
|
||||
request.pduDestination = GlobalBroadcast()
|
||||
|
||||
# set the parameters from the device object
|
||||
request.iAmDeviceIdentifier = this_device.objectIdentifier
|
||||
request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted
|
||||
request.segmentationSupported = this_device.segmentationSupported
|
||||
request.vendorID = this_device.vendorIdentifier
|
||||
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)
|
||||
|
||||
# give it to the application
|
||||
this_application.request(request)
|
||||
|
||||
except Exception as err:
|
||||
WhoIsIAmConsoleCmd._exception("exception: %r", err)
|
||||
|
||||
def do_rtn(self, args):
|
||||
"""rtn <addr> <net> ... """
|
||||
args = args.split()
|
||||
if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args)
|
||||
|
||||
# safe to assume only one adapter
|
||||
adapter = this_application.nsap.adapters[0]
|
||||
if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter)
|
||||
|
||||
# provide the address and a list of network numbers
|
||||
router_address = Address(args[0])
|
||||
network_list = [int(arg) for arg in args[1:]]
|
||||
|
||||
# pass along to the service access point
|
||||
this_application.nsap.add_router_references(adapter, router_address, network_list)
|
||||
|
||||
bacpypes_debugging(WhoIsIAmConsoleCmd)
|
||||
|
||||
#
|
||||
# main
|
||||
#
|
||||
|
||||
def main():
|
||||
global this_device, this_application, this_console
|
||||
|
||||
# parse the command line arguments
|
||||
args = ConfigArgumentParser(description=__doc__).parse_args()
|
||||
|
||||
if _debug: _log.debug("initialization")
|
||||
if _debug: _log.debug(" - args: %r", args)
|
||||
|
||||
# make a device object
|
||||
this_device = LocalDeviceObject(
|
||||
objectName=args.ini.objectname,
|
||||
objectIdentifier=int(args.ini.objectidentifier),
|
||||
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
|
||||
segmentationSupported=args.ini.segmentationsupported,
|
||||
vendorIdentifier=int(args.ini.vendoridentifier),
|
||||
)
|
||||
if _debug: _log.debug(" - this_device: %r", this_device)
|
||||
|
||||
# build a bit string that knows about the bit names
|
||||
pss = ServicesSupported()
|
||||
pss['whoIs'] = 1
|
||||
pss['iAm'] = 1
|
||||
pss['readProperty'] = 1
|
||||
pss['writeProperty'] = 1
|
||||
|
||||
# set the property value to be just the bits
|
||||
this_device.protocolServicesSupported = pss.value
|
||||
|
||||
# make a simple application
|
||||
this_application = WhoIsIAmApplication(
|
||||
this_device, args.ini.address,
|
||||
Address(args.ini.foreignbbmd),
|
||||
int(args.ini.foreignttl),
|
||||
)
|
||||
if _debug: _log.debug(" - this_application: %r", this_application)
|
||||
|
||||
# get the services supported
|
||||
services_supported = this_application.get_services_supported()
|
||||
if _debug: _log.debug(" - services_supported: %r", services_supported)
|
||||
|
||||
# let the device object know
|
||||
this_device.protocolServicesSupported = services_supported.value
|
||||
|
||||
# make a console
|
||||
this_console = WhoIsIAmConsoleCmd()
|
||||
if _debug: _log.debug(" - this_console: %r", this_console)
|
||||
|
||||
_log.debug("running")
|
||||
|
||||
run()
|
||||
|
||||
_log.debug("finally")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user