mirror of
				https://github.com/JoelBender/bacpypes
				synced 2025-10-27 00:57:47 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			174 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			174 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| """
 | |
| This application presents a 'console' prompt to the user asking for read commands
 | |
| which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK
 | |
| and prints the value.
 | |
| """
 | |
| 
 | |
| import sys
 | |
| from collections import deque
 | |
| 
 | |
| from bacpypes.debugging import bacpypes_debugging, ModuleLogger
 | |
| from bacpypes.consolelogging import ConfigArgumentParser
 | |
| 
 | |
| from bacpypes.core import run, stop, deferred
 | |
| from bacpypes.iocb import IOCB
 | |
| 
 | |
| from bacpypes.pdu import Address
 | |
| from bacpypes.apdu import ReadPropertyRequest
 | |
| 
 | |
| from bacpypes.app import BIPSimpleApplication
 | |
| from bacpypes.object import get_object_class, get_datatype
 | |
| from bacpypes.local.device import LocalDeviceObject
 | |
| 
 | |
| # some debugging
 | |
| _debug = 0
 | |
| _log = ModuleLogger(globals())
 | |
| 
 | |
| # globals
 | |
| this_application = None
 | |
| device_address = None
 | |
| object_identifier = None
 | |
| property_list = None
 | |
| 
 | |
| #
 | |
| #   ReadPropertyApplication
 | |
| #
 | |
| 
 | |
| @bacpypes_debugging
 | |
| class ReadPropertyApplication(BIPSimpleApplication):
 | |
| 
 | |
|     def __init__(self, *args):
 | |
|         if _debug: ReadPropertyApplication._debug("__init__ %r", args)
 | |
|         BIPSimpleApplication.__init__(self, *args)
 | |
| 
 | |
|         # current property being read
 | |
|         self.property_identifier = None
 | |
| 
 | |
|     def next_request(self):
 | |
|         if _debug: ReadPropertyApplication._debug("next_request")
 | |
|         global device_address, object_identifier, property_list
 | |
| 
 | |
|         # check to see if we're done
 | |
|         if not property_list:
 | |
|             if _debug: ReadPropertyApplication._debug("    - done")
 | |
|             stop()
 | |
|             return
 | |
| 
 | |
|         # get the next request
 | |
|         self.property_identifier = property_list.popleft()
 | |
|         if _debug: ReadPropertyApplication._debug("    - property_identifier: %r", self.property_identifier)
 | |
| 
 | |
|         # build a request
 | |
|         request = ReadPropertyRequest(
 | |
|             destination=device_address,
 | |
|             objectIdentifier=object_identifier,
 | |
|             propertyIdentifier=self.property_identifier,
 | |
|             )
 | |
|         if _debug: ReadPropertyApplication._debug("    - request: %r", request)
 | |
| 
 | |
|         # make an IOCB
 | |
|         iocb = IOCB(request)
 | |
| 
 | |
|         # set a callback for the response
 | |
|         iocb.add_callback(self.complete_request)
 | |
|         if _debug: ReadPropertyApplication._debug("    - iocb: %r", iocb)
 | |
| 
 | |
|         # send the request
 | |
|         this_application.request_io(iocb)
 | |
| 
 | |
|     def complete_request(self, iocb):
 | |
|         if _debug: ReadPropertyApplication._debug("complete_request %r", iocb)
 | |
| 
 | |
|         if iocb.ioResponse:
 | |
|             apdu = iocb.ioResponse
 | |
| 
 | |
|             # find the datatype
 | |
|             datatype = get_datatype(apdu.objectIdentifier[0], self.property_identifier)
 | |
|             if _debug: ReadPropertyApplication._debug("    - datatype: %r", datatype)
 | |
|             if not datatype:
 | |
|                 raise TypeError("unknown datatype")
 | |
| 
 | |
|             # special case for array parts, others are managed by cast_out
 | |
|             value = apdu.propertyValue.cast_out(datatype)
 | |
|             if _debug: ReadPropertyApplication._debug("    - value: %r", value)
 | |
| 
 | |
|             sys.stdout.write(self.property_identifier + " = " + str(value) + '\n')
 | |
|             if hasattr(value, 'debug_contents'):
 | |
|                 value.debug_contents(file=sys.stdout)
 | |
|             sys.stdout.flush()
 | |
| 
 | |
|         if iocb.ioError:
 | |
|             if _debug: ReadPropertyApplication._debug("    - error: %r", iocb.ioError)
 | |
| 
 | |
|             # if it is an unknown property, just skip to the next one
 | |
|             if getattr(iocb.ioError, 'errorCode', '') != 'unknownProperty':
 | |
|                 sys.stdout.write(self.property_identifier + "! " + str(iocb.ioError) + '\n')
 | |
|                 sys.stdout.flush()
 | |
| 
 | |
|         # fire off another request
 | |
|         deferred(self.next_request)
 | |
| 
 | |
| 
 | |
| #
 | |
| #   __main__
 | |
| #
 | |
| 
 | |
| def main():
 | |
|     global this_application, device_address, object_identifier, property_list
 | |
| 
 | |
|     # parse the command line arguments
 | |
|     parser = ConfigArgumentParser(description=__doc__)
 | |
|     parser.add_argument(
 | |
|         "address",
 | |
|         help="device address",
 | |
|         )
 | |
|     parser.add_argument(
 | |
|         "objtype",
 | |
|         help="object types, e.g., analogInput",
 | |
|         )
 | |
|     parser.add_argument(
 | |
|         "objinstance", type=int,
 | |
|         help="object instance",
 | |
|         )
 | |
|     args = parser.parse_args()
 | |
| 
 | |
|     if _debug: _log.debug("initialization")
 | |
|     if _debug: _log.debug("    - args: %r", args)
 | |
| 
 | |
|     # interpret the address
 | |
|     device_address = Address(args.address)
 | |
|     if _debug: _log.debug("    - device_address: %r", device_address)
 | |
| 
 | |
|     # build an identifier
 | |
|     object_identifier = (args.objtype, args.objinstance)
 | |
|     if _debug: _log.debug("    - object_identifier: %r", object_identifier)
 | |
| 
 | |
|     # get the object class
 | |
|     object_class = get_object_class(args.objtype)
 | |
|     if _debug: _log.debug("    - object_class: %r", object_class)
 | |
| 
 | |
|     # make a queue of the properties
 | |
|     property_list = deque(prop.identifier for prop in object_class.properties)
 | |
|     if _debug: _log.debug("    - property_list: %r", property_list)
 | |
| 
 | |
|     # make a device object
 | |
|     this_device = LocalDeviceObject(ini=args.ini)
 | |
|     if _debug: _log.debug("    - this_device: %r", this_device)
 | |
| 
 | |
|     # make a simple application
 | |
|     this_application = ReadPropertyApplication(this_device, args.ini.address)
 | |
| 
 | |
|     # fire off a request when the core has a chance
 | |
|     deferred(this_application.next_request)
 | |
| 
 | |
|     _log.debug("running")
 | |
| 
 | |
|     run()
 | |
| 
 | |
|     _log.debug("fini")
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | 
