1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

first commit, introduced DeviceCommunicationControlServices class

Application checks and drops packets as per Clause 16

WIP

do_DeviceCommunication implemented

Set deviceInfoCache if not set already

First iteration

removed unused import

proper simpleAckPDU

removed unnecessary comments

Instanciate SMAP in Application class

Removed smap instanciation from BIPForeignApplication

Add support so one can add properties to LocalDeviceObject

Remove adding _dcc variables to LocalDeviceObject

Support for authentication WIP

removed unwanted change

Response with error on authentication failure

Removed unused import

DeviceCommunicationService class added for py3

Device allow for attributes

Changes in Application

Appservice changes for py3

Dont attach smap instance to local device

miscc
This commit is contained in:
Amit Kumar 2017-07-20 19:20:11 +05:30
parent b1fc637193
commit 79262b29d3
7 changed files with 166 additions and 34 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@ __pycache__/
# Editor backups
*.py~
.idea/
# C extensions
*.so

View File

@ -200,7 +200,9 @@ class Application(ApplicationServiceElement, Collector):
# keep track of the local device
if localDevice:
self.smap = StateMachineAccessPoint(localDevice)
self.localDevice = localDevice
self.smap._localDevice = self.localDevice
# bind the device object to this application
localDevice._app = self
@ -224,6 +226,8 @@ class Application(ApplicationServiceElement, Collector):
# use the provided cache or make a default one
self.deviceInfoCache = deviceInfoCache or DeviceInfoCache()
if not self.smap.deviceInfoCache:
self.smap.deviceInfoCache = self.deviceInfoCache
# controllers for managing confirmed requests as a client
self.controllers = {}
@ -470,14 +474,6 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
@ -526,14 +522,6 @@ class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWrite
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()

View File

@ -1118,6 +1118,13 @@ class StateMachineAccessPoint(Client, ServiceAccessPoint):
def confirmation(self, pdu):
"""Packets coming up the stack are APDU's."""
if _debug: StateMachineAccessPoint._debug("confirmation %r", pdu)
if pdu.apduService != 17:
if _debug: StateMachineAccessPoint._debug("DeviceCommunicationRequest")
if getattr(self._localDevice, "_dcc_disable_all", None):
raise RuntimeError("All communications disabled")
if getattr(self._localDevice, "_dcc_disable", None):
if pdu.apduService != 8:
raise RuntimeError("All communications disabled except indication for Who-IS.")
# make a more focused interpretation
atype = apdu_types.get(pdu.apduType)
@ -1217,6 +1224,13 @@ class StateMachineAccessPoint(Client, ServiceAccessPoint):
a new transaction as a client."""
if _debug: StateMachineAccessPoint._debug("sap_indication %r", apdu)
if apdu.apduService != 17:
if getattr(self._localDevice, "_dcc_disable_all", None):
raise RuntimeError("All communications disabled.")
if getattr(self._localDevice, "_dcc_disable", None):
if apdu.apduService != 0:
raise RuntimeError("All communications disabled except indication for Who-IS.")
if isinstance(apdu, UnconfirmedRequestPDU):
# deliver to the device
self.request(apdu)

View File

@ -7,11 +7,16 @@ from ..pdu import GlobalBroadcast
from ..primitivedata import Date, Time, ObjectIdentifier
from ..constructeddata import ArrayOf
from ..apdu import WhoIsRequest, IAmRequest, IHaveRequest
from ..apdu import WhoIsRequest, IAmRequest, IHaveRequest, SimpleAckPDU, Error
from ..errors import ExecutionError, InconsistentParameters, \
MissingRequiredParameter, ParameterOutOfRange
from ..object import register_object_type, registered_object_types, \
Property, DeviceObject
from ..task import FunctionTask
from ..basetypes import ErrorClass, ErrorCode
from time import time as _time
# some debugging
_debug = 0
@ -90,6 +95,11 @@ class LocalDeviceObject(DeviceObject):
if attr not in kwargs:
kwargs[attr] = value
for key, value in kwargs.items():
if key.startswith("_"):
setattr(self, key, value)
del kwargs[key]
# check for registration
if self.__class__ not in registered_object_types.values():
if 'vendorIdentifier' not in kwargs:
@ -125,6 +135,10 @@ class LocalDeviceObject(DeviceObject):
if 'objectList' not in self.propertyList:
self.propertyList.append('objectList')
def enable_communications(self):
self._dcc_disable_all = False
self._dcc_disable = False
#
# Who-Is I-Am Services
#
@ -361,3 +375,53 @@ class WhoHasIHaveServices(Capability):
raise MissingRequiredParameter("objectName required")
### check to see if the application is looking for this object
@bacpypes_debugging
class DeviceCommunicationControlServices(Capability):
def __init__(self):
if _debug: DeviceCommunicationControlServices._debug("__init__")
Capability.__init__(self)
self._enable_task = None
def do_DeviceCommunicationControlRequest(self, apdu):
if _debug: DeviceCommunicationControlServices._debug("do_CommunicationControlRequest, %r", apdu)
_response = SimpleAckPDU(context=apdu)
security_error = False
if getattr(self.localDevice, "_dcc_password", None):
if not apdu.password or apdu.password != getattr(self.localDevice, "_dcc_password"):
_response = Error(errorClass=ErrorClass("security"), errorCode=ErrorCode("passwordFailure"), context=apdu)
security_error = True
self.localDevice._app.smap.sap_confirmation(_response)
if security_error:
return
time_duration = apdu.timeDuration
start_time = None
if apdu.enableDisable == "enable":
if self._enable_task:
self._enable_task.suspend_task()
self._enable_communications()
elif apdu.enableDisable == "disable":
self.localDevice._dcc_disable_all = True
self.localDevice._dcc_disable = True
start_time = _time()
else:
self.localDevice._dcc_disable_all = False
self.localDevice._dcc_disable = True
start_time = _time()
if time_duration:
self._enable_task = FunctionTask(self._enable_communications)
self._enable_task.install_task(start_time + time_duration)
def _enable_communications(self):
if _debug: DeviceCommunicationControlServices._debug("enable_communications")
self.localDevice.enable_communications()

View File

@ -200,7 +200,9 @@ class Application(ApplicationServiceElement, Collector):
# keep track of the local device
if localDevice:
self.smap = StateMachineAccessPoint(localDevice)
self.localDevice = localDevice
self.smap._localDevice = self.localDevice
# bind the device object to this application
localDevice._app = self
@ -224,6 +226,8 @@ class Application(ApplicationServiceElement, Collector):
# use the provided cache or make a default one
self.deviceInfoCache = deviceInfoCache or DeviceInfoCache()
if not self.smap.deviceInfoCache:
self.smap.deviceInfoCache = self.deviceInfoCache
# controllers for managing confirmed requests as a client
self.controllers = {}
@ -470,14 +474,6 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
@ -526,14 +522,6 @@ class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWrite
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()

View File

@ -1118,6 +1118,13 @@ class StateMachineAccessPoint(Client, ServiceAccessPoint):
def confirmation(self, pdu):
"""Packets coming up the stack are APDU's."""
if _debug: StateMachineAccessPoint._debug("confirmation %r", pdu)
if pdu.apduService != 17:
if _debug: StateMachineAccessPoint._debug("DeviceCommunicationRequest")
if getattr(self._localDevice, "_dcc_disable_all", None):
raise RuntimeError("All communications disabled")
if getattr(self._localDevice, "_dcc_disable", None):
if pdu.apduService != 8:
raise RuntimeError("All communications disabled except indication for Who-IS.")
# make a more focused interpretation
atype = apdu_types.get(pdu.apduType)
@ -1217,6 +1224,13 @@ class StateMachineAccessPoint(Client, ServiceAccessPoint):
a new transaction as a client."""
if _debug: StateMachineAccessPoint._debug("sap_indication %r", apdu)
if apdu.apduService != 17:
if getattr(self._localDevice, "_dcc_disable_all", None):
raise RuntimeError("All communications disabled.")
if getattr(self._localDevice, "_dcc_disable", None):
if apdu.apduService != 0:
raise RuntimeError("All communications disabled except indication for Who-IS.")
if isinstance(apdu, UnconfirmedRequestPDU):
# deliver to the device
self.request(apdu)

View File

@ -7,11 +7,16 @@ from ..pdu import GlobalBroadcast
from ..primitivedata import Date, Time, ObjectIdentifier
from ..constructeddata import ArrayOf
from ..apdu import WhoIsRequest, IAmRequest, IHaveRequest
from ..apdu import WhoIsRequest, IAmRequest, IHaveRequest, SimpleAckPDU, Error
from ..errors import ExecutionError, InconsistentParameters, \
MissingRequiredParameter, ParameterOutOfRange
from ..object import register_object_type, registered_object_types, \
Property, DeviceObject
from ..task import FunctionTask
from ..basetypes import ErrorClass, ErrorCode
from time import time as _time
# some debugging
_debug = 0
@ -90,6 +95,11 @@ class LocalDeviceObject(DeviceObject):
if attr not in kwargs:
kwargs[attr] = value
for key, value in kwargs.items():
if key.startswith("_"):
setattr(self, key, value)
del kwargs[key]
# check for registration
if self.__class__ not in registered_object_types.values():
if 'vendorIdentifier' not in kwargs:
@ -125,6 +135,9 @@ class LocalDeviceObject(DeviceObject):
if 'objectList' not in self.propertyList:
self.propertyList.append('objectList')
def enable_communications(self):
self._dcc_disable_all = False
self._dcc_disable = False
#
# Who-Is I-Am Services
#
@ -337,3 +350,53 @@ class WhoHasIHaveServices(Capability):
raise MissingRequiredParameter("objectName required")
### check to see if the application is looking for this object
@bacpypes_debugging
class DeviceCommunicationControlServices(Capability):
def __init__(self):
if _debug: DeviceCommunicationControlServices._debug("__init__")
Capability.__init__(self)
self._enable_task = None
def do_DeviceCommunicationControlRequest(self, apdu):
if _debug: DeviceCommunicationControlServices._debug("do_CommunicationControlRequest, %r", apdu)
_response = SimpleAckPDU(context=apdu)
security_error = False
if getattr(self.localDevice, "_dcc_password", None):
if not apdu.password or apdu.password != getattr(self.localDevice, "_dcc_password"):
_response = Error(errorClass=ErrorClass("security"), errorCode=ErrorCode("passwordFailure"), context=apdu)
security_error = True
self.localDevice_app.smap.sap_confirmation(_response)
if security_error:
return
time_duration = apdu.timeDuration
start_time = None
if apdu.enableDisable == "enable":
if self._enable_task:
self._enable_task.suspend_task()
self._enable_communications()
elif apdu.enableDisable == "disable":
self.localDevice._dcc_disable_all = True
self.localDevice._dcc_disable = True
start_time = _time()
else:
self.localDevice._dcc_disable_all = False
self.localDevice._dcc_disable = True
start_time = _time()
if time_duration:
self._enable_task = FunctionTask(self._enable_communications)
self._enable_task.install_task(start_time + time_duration)
def _enable_communications(self):
if _debug: DeviceCommunicationControlServices._debug("enable_communications")
self.localDevice.enable_communications()