1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

merging #147 for next release

This commit is contained in:
Joel Bender 2017-12-06 20:56:52 -05:00
commit 44e9c33b6b
21 changed files with 1187 additions and 15 deletions

View File

@ -1472,7 +1472,18 @@ class Reliability(Enumerated):
, 'multiStateFault':9
, 'configurationError':10
, 'communicationFailure':12
, 'numberFault':13
, 'memberFault': 13
, 'monitoredObjectFault': 14
, 'tripped': 15
, 'lampFailure': 16
, 'activationFailure': 17
, 'renewDHCPFailure': 18
, 'renewFDRegistration-failure': 19
, 'restartAutoNegotiationFailure': 20
, 'restartFailure': 21
, 'proprietaryCommandFailure': 22
, 'faultsListed': 23
, 'referencedObjectFault': 24
}
class RestartReason(Enumerated):

View File

@ -565,7 +565,7 @@ def ArrayOf(klass):
def __setitem__(self, item, value):
# no wrapping index
if (item < 1) or (item > self.value[0]):
if (item < 0) or (item > self.value[0]):
raise IndexError("index out of range")
# special length handling for index 0
@ -575,7 +575,11 @@ def ArrayOf(klass):
self.value = self.value[0:value + 1]
elif value > self.value[0]:
# extend
self.value.extend( [None] * (value - self.value[0]) )
if issubclass(self.subtype, Atomic):
self.value.extend( [self.subtype().value] * (value - self.value[0]) )
else:
for i in range(value - self.value[0]):
self.value.append(self.subtype())
else:
return
self.value[0] = value

View File

@ -167,6 +167,7 @@ class Property(Logging):
# get the value
value = obj._values[self.identifier]
if _debug: Property._debug(" - value: %r", value)
# access an array
if arrayIndex is not None:
@ -200,14 +201,61 @@ class Property(Logging):
if not self.mutable:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
# if changing the length of the array, the value is unsigned
if arrayIndex == 0:
if not Unsigned.is_valid(value):
raise InvalidParameterDatatype("length of %s must be unsigned" % (
self.identifier,
))
# if it's atomic, make sure it's valid
if issubclass(self.datatype, Atomic):
elif issubclass(self.datatype, Atomic):
if _debug: Property._debug(" - property is atomic, checking value")
if not self.datatype.is_valid(value):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.__name__,
))
# if it's an array, make sure it's valid regarding arrayIndex provided
elif issubclass(self.datatype, Array):
if _debug: Property._debug(" - property is array, checking subtype and index")
# changing a single element
if arrayIndex is not None:
# if it's atomic, make sure it's valid
if issubclass(self.datatype.subtype, Atomic):
if _debug: Property._debug(" - subtype is atomic, checking value")
if not self.datatype.subtype.is_valid(value):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.__name__,
))
# constructed type
elif not isinstance(value, self.datatype.subtype):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__
))
# replacing the array
elif isinstance(value, list):
# check validity regarding subtype
for item in value:
# if it's atomic, make sure it's valid
if issubclass(self.datatype.subtype, Atomic):
if _debug: Property._debug(" - subtype is atomic, checking value")
if not self.datatype.subtype.is_valid(item):
raise InvalidParameterDatatype("elements of %s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__,
))
# constructed type
elif not isinstance(item, self.datatype.subtype):
raise InvalidParameterDatatype("elements of %s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__
))
# value is mutated into a new array
value = self.datatype(value)
# some kind of constructed data
elif not isinstance(value, self.datatype):
if _debug: Property._debug(" - property is not atomic and wrong type")
raise InvalidParameterDatatype("%s must be of type %s" % (

View File

@ -1471,7 +1471,18 @@ class Reliability(Enumerated):
, 'multiStateFault':9
, 'configurationError':10
, 'communicationFailure':12
, 'numberFault':13
, 'memberFault': 13
, 'monitoredObjectFault': 14
, 'tripped': 15
, 'lampFailure': 16
, 'activationFailure': 17
, 'renewDHCPFailure': 18
, 'renewFDRegistration-failure': 19
, 'restartAutoNegotiationFailure': 20
, 'restartFailure': 21
, 'proprietaryCommandFailure': 22
, 'faultsListed': 23
, 'referencedObjectFault': 24
}
class RestartReason(Enumerated):

View File

@ -563,7 +563,7 @@ def ArrayOf(klass):
def __setitem__(self, item, value):
# no wrapping index
if (item < 1) or (item > self.value[0]):
if (item < 0) or (item > self.value[0]):
raise IndexError("index out of range")
# special length handling for index 0
@ -573,7 +573,11 @@ def ArrayOf(klass):
self.value = self.value[0:value + 1]
elif value > self.value[0]:
# extend
self.value.extend( [None] * (value - self.value[0]) )
if issubclass(self.subtype, Atomic):
self.value.extend( [self.subtype().value] * (value - self.value[0]) )
else:
for i in range(value - self.value[0]):
self.value.append(self.subtype())
else:
return
self.value[0] = value

View File

@ -168,6 +168,7 @@ class Property:
# get the value
value = obj._values[self.identifier]
if _debug: Property._debug(" - value: %r", value)
# access an array
if arrayIndex is not None:
@ -201,14 +202,61 @@ class Property:
if not self.mutable:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
# if changing the length of the array, the value is unsigned
if arrayIndex == 0:
if not Unsigned.is_valid(value):
raise InvalidParameterDatatype("length of %s must be unsigned" % (
self.identifier,
))
# if it's atomic, make sure it's valid
if issubclass(self.datatype, Atomic):
elif issubclass(self.datatype, Atomic):
if _debug: Property._debug(" - property is atomic, checking value")
if not self.datatype.is_valid(value):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.__name__,
))
# if it's an array, make sure it's valid regarding arrayIndex provided
elif issubclass(self.datatype, Array):
if _debug: Property._debug(" - property is array, checking subtype and index")
# changing a single element
if arrayIndex is not None:
# if it's atomic, make sure it's valid
if issubclass(self.datatype.subtype, Atomic):
if _debug: Property._debug(" - subtype is atomic, checking value")
if not self.datatype.subtype.is_valid(value):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.__name__,
))
# constructed type
elif not isinstance(value, self.datatype.subtype):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__
))
# replacing the array
elif isinstance(value, list):
# check validity regarding subtype
for item in value:
# if it's atomic, make sure it's valid
if issubclass(self.datatype.subtype, Atomic):
if _debug: Property._debug(" - subtype is atomic, checking value")
if not self.datatype.subtype.is_valid(item):
raise InvalidParameterDatatype("elements of %s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__,
))
# constructed type
elif not isinstance(item, self.datatype.subtype):
raise InvalidParameterDatatype("elements of %s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__
))
# value is mutated into a new array
value = self.datatype(value)
# some kind of constructed data
elif not isinstance(value, self.datatype):
if _debug: Property._debug(" - property is not atomic and wrong type")
raise InvalidParameterDatatype("%s must be of type %s" % (

View File

@ -1472,7 +1472,18 @@ class Reliability(Enumerated):
, 'multiStateFault':9
, 'configurationError':10
, 'communicationFailure':12
, 'numberFault':13
, 'memberFault': 13
, 'monitoredObjectFault': 14
, 'tripped': 15
, 'lampFailure': 16
, 'activationFailure': 17
, 'renewDHCPFailure': 18
, 'renewFDRegistration-failure': 19
, 'restartAutoNegotiationFailure': 20
, 'restartFailure': 21
, 'proprietaryCommandFailure': 22
, 'faultsListed': 23
, 'referencedObjectFault': 24
}
class RestartReason(Enumerated):

View File

@ -563,7 +563,7 @@ def ArrayOf(klass):
def __setitem__(self, item, value):
# no wrapping index
if (item < 1) or (item > self.value[0]):
if (item < 0) or (item > self.value[0]):
raise IndexError("index out of range")
# special length handling for index 0
@ -573,7 +573,11 @@ def ArrayOf(klass):
self.value = self.value[0:value + 1]
elif value > self.value[0]:
# extend
self.value.extend( [None] * (value - self.value[0]) )
if issubclass(self.subtype, Atomic):
self.value.extend( [self.subtype().value] * (value - self.value[0]) )
else:
for i in range(value - self.value[0]):
self.value.append(self.subtype())
else:
return
self.value[0] = value

View File

@ -168,6 +168,7 @@ class Property:
# get the value
value = obj._values[self.identifier]
if _debug: Property._debug(" - value: %r", value)
# access an array
if arrayIndex is not None:
@ -201,14 +202,61 @@ class Property:
if not self.mutable:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
# if changing the length of the array, the value is unsigned
if arrayIndex == 0:
if not Unsigned.is_valid(value):
raise InvalidParameterDatatype("length of %s must be unsigned" % (
self.identifier,
))
# if it's atomic, make sure it's valid
if issubclass(self.datatype, Atomic):
elif issubclass(self.datatype, Atomic):
if _debug: Property._debug(" - property is atomic, checking value")
if not self.datatype.is_valid(value):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.__name__,
))
# if it's an array, make sure it's valid regarding arrayIndex provided
elif issubclass(self.datatype, Array):
if _debug: Property._debug(" - property is array, checking subtype and index")
# changing a single element
if arrayIndex is not None:
# if it's atomic, make sure it's valid
if issubclass(self.datatype.subtype, Atomic):
if _debug: Property._debug(" - subtype is atomic, checking value")
if not self.datatype.subtype.is_valid(value):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.__name__,
))
# constructed type
elif not isinstance(value, self.datatype.subtype):
raise InvalidParameterDatatype("%s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__
))
# replacing the array
elif isinstance(value, list):
# check validity regarding subtype
for item in value:
# if it's atomic, make sure it's valid
if issubclass(self.datatype.subtype, Atomic):
if _debug: Property._debug(" - subtype is atomic, checking value")
if not self.datatype.subtype.is_valid(item):
raise InvalidParameterDatatype("elements of %s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__,
))
# constructed type
elif not isinstance(item, self.datatype.subtype):
raise InvalidParameterDatatype("elements of %s must be of type %s" % (
self.identifier, self.datatype.subtype.__name__
))
# value is mutated into a new array
value = self.datatype(value)
# some kind of constructed data
elif not isinstance(value, self.datatype):
if _debug: Property._debug(" - property is not atomic and wrong type")
raise InvalidParameterDatatype("%s must be of type %s" % (

View File

@ -12,7 +12,8 @@ from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Real
from bacpypes.primitivedata import Real, CharacterString
from bacpypes.constructeddata import ArrayOf
from bacpypes.object import AnalogValueObject, Property, register_object_type
from bacpypes.errors import ExecutionError
@ -73,6 +74,7 @@ class RandomAnalogValueObject(AnalogValueObject):
properties = [
RandomValueProperty('presentValue'),
Property('eventMessageTexts', ArrayOf(CharacterString), mutable=True),
]
def __init__(self, **kwargs):
@ -107,7 +109,8 @@ def main():
# make a random input object
ravo1 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Random1'
objectIdentifier=('analogValue', 1), objectName='Random1',
eventMessageTexts=ArrayOf(CharacterString)(["hello"]),
)
_log.debug(" - ravo1: %r", ravo1)

View File

@ -0,0 +1,265 @@
#!/usr/bin/env python
"""
This application is similar to ReadWriteProperty but it is just for beating on
the event message texts, an array of character strings.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, enable_sleeping
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address
from bacpypes.object import get_datatype
from bacpypes.apdu import SimpleAckPDU, \
ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest
from bacpypes.primitivedata import Unsigned, CharacterString
from bacpypes.constructeddata import Array, ArrayOf, Any
from bacpypes.app import BIPSimpleApplication
from bacpypes.service.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_application = None
context = None
#
# ReadWritePropertyConsoleCmd
#
@bacpypes_debugging
class ReadWritePropertyConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read [ <indx> ]"""
args = args.split()
if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args)
global context
try:
addr, obj_type, obj_inst = context
prop_id = 'eventMessageTexts'
datatype = get_datatype(obj_type, prop_id)
if not datatype:
raise ValueError("invalid property for object type")
# build a request
request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 1:
request.propertyArrayIndex = int(args[0])
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug: ReadWritePropertyConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
this_application.request_io(iocb)
# wait for it to complete
iocb.wait()
# do something for success
if iocb.ioResponse:
apdu = iocb.ioResponse
# should be an ack
if not isinstance(apdu, ReadPropertyACK):
if _debug: ReadWritePropertyConsoleCmd._debug(" - not an ack")
return
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug: ReadWritePropertyConsoleCmd._debug(" - value: %r", value)
sys.stdout.write(str(value) + '\n')
if hasattr(value, 'debug_contents'):
value.debug_contents(file=sys.stdout)
sys.stdout.flush()
# do something for error/reject/abort
if iocb.ioError:
sys.stdout.write(str(iocb.ioError) + '\n')
except Exception as error:
ReadWritePropertyConsoleCmd._exception("exception: %r", error)
def do_write(self, args):
"""
write <indx> <value>
write 0 <len>
write [ <value> ]...
"""
args = args.split()
ReadWritePropertyConsoleCmd._debug("do_write %r", args)
try:
addr, obj_type, obj_inst = context
prop_id = 'eventMessageTexts'
indx = None
if args and args[0].isdigit():
indx = int(args[0])
if indx == 0:
value = Unsigned(int(args[1]))
else:
value = CharacterString(args[1])
else:
value = ArrayOf(CharacterString)(args[0:])
# build a request
request = WritePropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id
)
request.pduDestination = Address(addr)
# save the value
request.propertyValue = Any()
try:
request.propertyValue.cast_in(value)
except Exception as error:
ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", error)
# optional array index
if indx is not None:
request.propertyArrayIndex = indx
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug: ReadWritePropertyConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
this_application.request_io(iocb)
# wait for it to complete
iocb.wait()
# do something for success
if iocb.ioResponse:
# should be an ack
if not isinstance(iocb.ioResponse, SimpleAckPDU):
if _debug: ReadWritePropertyConsoleCmd._debug(" - not an ack")
return
sys.stdout.write("ack\n")
# do something for error/reject/abort
if iocb.ioError:
sys.stdout.write(str(iocb.ioError) + '\n')
except Exception as error:
ReadWritePropertyConsoleCmd._exception("exception: %r", error)
def do_rtn(self, args):
"""rtn <addr> <net> ... """
args = args.split()
if _debug: ReadWritePropertyConsoleCmd._debug("do_rtn %r", args)
# safe to assume only one adapter
adapter = this_application.nsap.adapters[0]
if _debug: ReadWritePropertyConsoleCmd._debug(" - adapter: %r", adapter)
# provide the address and a list of network numbers
router_address = Address(args[0])
network_list = [int(arg) for arg in args[1:]]
# pass along to the service access point
this_application.nsap.add_router_references(adapter, router_address, network_list)
#
# __main__
#
def main():
global this_application, context
# parse the command line arguments
parser = ConfigArgumentParser(description=__doc__)
parser.add_argument(
"address",
help="address of server",
)
parser.add_argument(
"objtype",
help="object type",
)
parser.add_argument(
"objinst", type=int,
help="object instance",
)
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# set the context, the collection of the above parameters
context = args.address, args.objtype, args.objinst
if _debug: _log.debug(" - context: %r", context)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a simple application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# get the services supported
services_supported = this_application.get_services_supported()
if _debug: _log.debug(" - services_supported: %r", services_supported)
# let the device object know
this_device.protocolServicesSupported = services_supported.value
# make a console
this_console = ReadWritePropertyConsoleCmd()
if _debug: _log.debug(" - this_console: %r", this_console)
# enable sleeping will help with threads
enable_sleeping()
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()

View File

@ -17,6 +17,7 @@ from . import trapped_classes
from . import test_comm
from . import test_pdu
from . import test_primitive_data
from . import test_constructed_data
from . import test_utilities
from . import test_vlan

View File

@ -0,0 +1,14 @@
#!/usr/bin/python
"""
Test Constructed Data Module
"""
from . import test_sequence
from . import test_sequence_of
from . import test_array_of
from . import test_choice
from . import test_any
from . import test_any_atomic

View File

@ -0,0 +1,88 @@
#!/usr/bin/python
"""
Helper classes for constructed data tests.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.errors import MissingRequiredParameter
from bacpypes.primitivedata import Boolean, Integer, Tag, TagList
from bacpypes.constructeddata import Element, Sequence
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class SequenceEquality:
"""
This mixin class adds an equality function for matching values for all of
the elements, even if they are optional. It will raise an exception for
missing elements, even if they are missing in both objects.
"""
def __eq__(self, other):
if _debug: SequenceEquality._debug("__eq__ %r", other)
# loop through this sequences elements
for element in self.sequenceElements:
self_value = getattr(self, element.name, None)
other_value = getattr(other, element.name, None)
if (not element.optional) and ((self_value is None) or (other_value is None)):
raise MissingRequiredParameter("%s is a missing required element of %s" % (element.name, self.__class__.__name__))
if not (self_value == other_value):
return False
# success
return True
@bacpypes_debugging
class EmptySequence(Sequence, SequenceEquality):
def __init__(self, *args, **kwargs):
if _debug: EmptySequence._debug("__init__ %r %r", args, kwargs)
Sequence.__init__(self, *args, **kwargs)
@bacpypes_debugging
class SimpleSequence(Sequence, SequenceEquality):
sequenceElements = [
Element('hydrogen', Boolean),
]
def __init__(self, *args, **kwargs):
if _debug: SimpleSequence._debug("__init__ %r %r", args, kwargs)
Sequence.__init__(self, *args, **kwargs)
@bacpypes_debugging
class CompoundSequence1(Sequence, SequenceEquality):
sequenceElements = [
Element('hydrogen', Boolean),
Element('helium', Integer),
]
def __init__(self, *args, **kwargs):
if _debug: CompoundSequence1._debug("__init__ %r %r", args, kwargs)
Sequence.__init__(self, *args, **kwargs)
@bacpypes_debugging
class CompoundSequence2(Sequence, SequenceEquality):
sequenceElements = [
Element('lithium', Boolean, optional=True),
Element('beryllium', Integer),
]
def __init__(self, *args, **kwargs):
if _debug: CompoundSequence2._debug("__init__ %r %r", args, kwargs)
Sequence.__init__(self, *args, **kwargs)

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# placeholder

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# placeholder

View File

@ -0,0 +1,198 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test Array
----------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.errors import MissingRequiredParameter
from bacpypes.primitivedata import Integer, Tag, TagList
from bacpypes.constructeddata import Element, Sequence, ArrayOf
from .helpers import SimpleSequence
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# array of integers
IntegerArray = ArrayOf(Integer)
@bacpypes_debugging
class TestIntegerArray(unittest.TestCase):
def test_empty_array(self):
if _debug: TestIntegerArray._debug("test_empty_array")
# create an empty array
ary = IntegerArray()
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# array sematics
assert len(ary) == 0
assert ary[0] == 0
# encode it in a tag list
tag_list = TagList()
ary.encode(tag_list)
if _debug: TestIntegerArray._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
ary = IntegerArray()
ary.decode(tag_list)
if _debug: TestIntegerArray._debug(" - seq: %r", seq)
def test_append(self):
if _debug: TestIntegerArray._debug("test_append")
# create an empty array
ary = IntegerArray()
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# append an integer
ary.append(2)
assert len(ary) == 1
assert ary[0] == 1
assert ary[1] == 2
def test_delete_item(self):
if _debug: TestIntegerArray._debug("test_delete_item")
# create an array
ary = IntegerArray([1, 2, 3])
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# delete something
del ary[2]
assert len(ary) == 2
assert ary[0] == 2
assert ary.value[1:] == [1, 3]
def test_index_item(self):
if _debug: TestIntegerArray._debug("test_index_item")
# create an array
ary = IntegerArray([1, 2, 3])
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# find something
assert ary.index(3) == 3
# not find something
with self.assertRaises(ValueError):
indx = ary.index(4)
def test_remove_item(self):
if _debug: TestIntegerArray._debug("test_remove_item")
# create an array
ary = IntegerArray([1, 2, 3])
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# remove something
ary.remove(2)
assert ary.value[1:] == [1, 3]
# not remove something
with self.assertRaises(ValueError):
ary.remove(4)
def test_resize(self):
if _debug: TestIntegerArray._debug("test_resize")
# create an array
ary = IntegerArray([1, 2, 3])
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# make it shorter
ary[0] = 2
assert ary.value[1:] == [1, 2]
# make it longer
ary[0] = 4
assert ary.value[1:] == [1, 2, 0, 0]
def test_get_item(self):
if _debug: TestIntegerArray._debug("test_get_item")
# create an array
ary = IntegerArray([1, 2, 3])
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# BACnet semantics
assert ary[1] == 1
def test_set_item(self):
if _debug: TestIntegerArray._debug("test_set_item")
# create an array
ary = IntegerArray([1, 2, 3])
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# BACnet semantics, no type checking
ary[1] = 10
assert ary[1] == 10
def test_codec(self):
if _debug: TestIntegerArray._debug("test_codec")
# test array contents
ary_value = [1, 2, 3]
# create an array
ary = IntegerArray(ary_value)
if _debug: TestIntegerArray._debug(" - ary: %r", ary)
# encode it in a tag list
tag_list = TagList()
ary.encode(tag_list)
if _debug: TestIntegerArray._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
ary = IntegerArray()
ary.decode(tag_list)
if _debug: TestIntegerArray._debug(" - ary %r", ary)
# value matches
assert ary.value[1:] == ary_value
# array of a sequence
SimpleSequenceArray = ArrayOf(SimpleSequence)
@bacpypes_debugging
class TestSimpleSequenceArray(unittest.TestCase):
def test_codec(self):
if _debug: TestSimpleSequenceArray._debug("test_codec")
# test array contents
ary_value = [
SimpleSequence(hydrogen=True),
SimpleSequence(hydrogen=False),
SimpleSequence(hydrogen=True),
]
# create an array
ary = SimpleSequenceArray(ary_value)
if _debug: TestSimpleSequenceArray._debug(" - ary: %r", ary)
# encode it in a tag list
tag_list = TagList()
ary.encode(tag_list)
if _debug: TestSimpleSequenceArray._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
ary = SimpleSequenceArray()
ary.decode(tag_list)
if _debug: TestSimpleSequenceArray._debug(" - ary %r", ary)
# value matches
assert ary.value[1:] == ary_value

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# placeholder

View File

@ -0,0 +1,204 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test Constructed Data Sequence
------------------------------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.errors import MissingRequiredParameter
from bacpypes.primitivedata import Boolean, Integer, Tag, TagList
from bacpypes.constructeddata import Element, Sequence
from .helpers import EmptySequence, SimpleSequence, CompoundSequence1, \
CompoundSequence2
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class TestEmptySequence(unittest.TestCase):
def test_empty_sequence(self):
if _debug: TestEmptySequence._debug("test_empty_sequence")
# create a sequence
seq = EmptySequence()
if _debug: TestEmptySequence._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
seq.encode(tag_list)
if _debug: TestEmptySequence._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
seq = EmptySequence()
seq.decode(tag_list)
if _debug: TestEmptySequence._debug(" - seq: %r", seq)
def test_no_elements(self):
if _debug: TestEmptySequence._debug("test_no_elements")
# create a sequence with an undefined element
with self.assertRaises(TypeError):
seq = EmptySequence(some_element=None)
@bacpypes_debugging
class TestSimpleSequence(unittest.TestCase):
def test_missing_element(self):
if _debug: TestSimpleSequence._debug("test_missing_element")
# create a sequence with a missing required element
seq = SimpleSequence()
if _debug: TestSimpleSequence._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
with self.assertRaises(MissingRequiredParameter):
seq.encode(tag_list)
def test_wrong_type(self):
if _debug: TestSimpleSequence._debug("test_wrong_type")
# create a sequence with wrong element value type
seq = SimpleSequence(hydrogen=12)
with self.assertRaises(TypeError):
tag_list = TagList()
seq.encode(tag_list)
def test_codec(self):
if _debug: TestSimpleSequence._debug("test_codec")
# create a sequence
seq = SimpleSequence(hydrogen=False)
if _debug: TestSimpleSequence._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
seq.encode(tag_list)
if _debug: TestSimpleSequence._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
seq = SimpleSequence()
seq.decode(tag_list)
if _debug: TestSimpleSequence._debug(" - seq: %r", seq)
@bacpypes_debugging
class TestCompoundSequence1(unittest.TestCase):
def test_missing_element(self):
if _debug: TestCompoundSequence1._debug("test_missing_element")
# create a sequence with a missing required element
seq = CompoundSequence1()
if _debug: TestSimpleSequence._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
with self.assertRaises(MissingRequiredParameter):
seq.encode(tag_list)
# create a sequence with a missing required element
seq = CompoundSequence1(hydrogen=True)
if _debug: TestSimpleSequence._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
with self.assertRaises(MissingRequiredParameter):
seq.encode(tag_list)
# create a sequence with a missing required element
seq = CompoundSequence1(helium=2)
if _debug: TestSimpleSequence._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
with self.assertRaises(MissingRequiredParameter):
seq.encode(tag_list)
def test_codec(self):
if _debug: TestCompoundSequence1._debug("test_codec")
# create a sequence
seq = CompoundSequence1(hydrogen=True, helium=2)
if _debug: TestCompoundSequence1._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
seq.encode(tag_list)
if _debug: TestCompoundSequence1._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
seq = CompoundSequence1()
seq.decode(tag_list)
if _debug: TestCompoundSequence1._debug(" - seq: %r", seq)
@bacpypes_debugging
class TestCompoundSequence2(unittest.TestCase):
def test_missing_element(self):
if _debug: TestCompoundSequence2._debug("test_missing_element")
# create a sequence with a missing required element
seq = CompoundSequence2()
if _debug: TestCompoundSequence2._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
with self.assertRaises(MissingRequiredParameter):
seq.encode(tag_list)
# create a sequence with a missing required element
seq = CompoundSequence2(lithium=True)
if _debug: TestCompoundSequence2._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
with self.assertRaises(MissingRequiredParameter):
seq.encode(tag_list)
def test_codec_1(self):
if _debug: TestCompoundSequence2._debug("test_codec_1")
# create a sequence
seq = CompoundSequence2(beryllium=2)
if _debug: TestCompoundSequence2._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
seq.encode(tag_list)
if _debug: TestCompoundSequence2._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
seq = CompoundSequence2()
seq.decode(tag_list)
if _debug: TestCompoundSequence2._debug(" - seq: %r", seq)
def test_codec_2(self):
if _debug: TestCompoundSequence2._debug("test_codec_2")
# create a sequence
seq = CompoundSequence2(lithium=True, beryllium=3)
if _debug: TestCompoundSequence2._debug(" - seq: %r", seq)
# encode it in a tag list
tag_list = TagList()
seq.encode(tag_list)
if _debug: TestCompoundSequence2._debug(" - tag_list: %r", tag_list)
# create another sequence and decode the tag list
seq = CompoundSequence2()
seq.decode(tag_list)
if _debug: TestCompoundSequence2._debug(" - seq: %r", seq)

View File

@ -0,0 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# placeholder

View File

@ -1 +1,195 @@
# placeholder
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test Object Services
--------------------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.errors import ExecutionError, InvalidParameterDatatype
from bacpypes.primitivedata import CharacterString
from bacpypes.constructeddata import ArrayOf
from bacpypes.object import register_object_type, ReadableProperty, \
WritableProperty, Object
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class TestBasic(unittest.TestCase):
def test_basic(self):
"""Test basic configuration of a network."""
if _debug: TestBasic._debug("test_basic")
# create an object, no properties
obj = Object()
@bacpypes_debugging
@register_object_type(vendor_id=999)
class SampleReadableLocation(Object):
objectType = 'sampleReadableLocation'
properties = [
ReadableProperty('location', CharacterString),
]
def __init__(self, **kwargs):
if _debug: SampleReadableLocation._debug("__init__ %r", kwargs)
Object.__init__(self, **kwargs)
@bacpypes_debugging
class TestReadableLocation(unittest.TestCase):
def test_sample(self):
"""Test basic configuration of a network."""
if _debug: TestReadableLocation._debug("test_sample")
# create an object, default property value is None
obj = SampleReadableLocation()
assert obj.location == None
# create an object with a location
obj = SampleReadableLocation(location="home")
assert obj.ReadProperty('location') == "home"
# not an array, write access denied
with self.assertRaises(ExecutionError):
obj.ReadProperty('location', 0)
with self.assertRaises(ExecutionError):
obj.WriteProperty('location', "work")
@bacpypes_debugging
@register_object_type(vendor_id=999)
class SampleWritableLocation(Object):
objectType = 'sampleWritableLocation'
properties = [
WritableProperty('location', CharacterString),
]
def __init__(self, **kwargs):
if _debug: SampleWritableLocation._debug("__init__ %r", kwargs)
Object.__init__(self, **kwargs)
@bacpypes_debugging
class TestWritableLocation(unittest.TestCase):
def test_sample(self):
"""Test basic configuration of a network."""
if _debug: TestWritableLocation._debug("test_sample")
# create an object with a location
obj = SampleWritableLocation(location="home")
assert obj.ReadProperty('location') == "home"
# not an array, write access denied
with self.assertRaises(ExecutionError):
obj.ReadProperty('location', 0)
# write access successful
obj.WriteProperty('location', "work")
assert obj.location == "work"
# wrong data type
with self.assertRaises(InvalidParameterDatatype):
obj.WriteProperty('location', 12)
# array of character strings
ArrayOfCharacterString = ArrayOf(CharacterString)
@bacpypes_debugging
@register_object_type(vendor_id=999)
class SampleWritableArray(Object):
objectType = 'sampleWritableLocation'
properties = [
WritableProperty('location', ArrayOfCharacterString),
]
def __init__(self, **kwargs):
if _debug: SampleWritableArray._debug("__init__ %r", kwargs)
Object.__init__(self, **kwargs)
@bacpypes_debugging
class TestWritableArray(unittest.TestCase):
def test_empty_array(self):
"""Test basic configuration of a network."""
if _debug: TestWritableArray._debug("test_empty_array")
# create an object with a location
obj = SampleWritableArray(location=ArrayOfCharacterString())
if _debug: TestWritableArray._debug(" - obj.location: %r", obj.location)
assert len(obj.location) == 0
assert obj.location[0] == 0
def test_short_array(self):
if _debug: TestWritableArray._debug("test_short_array")
# create an object with a location
obj = SampleWritableArray(location=ArrayOfCharacterString(["home"]))
if _debug: TestWritableArray._debug(" - obj.location: %r", obj.location)
assert obj.ReadProperty('location', 0) == 1
assert obj.ReadProperty('location', 1) == "home"
def test_changing_length(self):
if _debug: TestWritableArray._debug("test_changing_length")
# create an object with a location
obj = SampleWritableArray(location=ArrayOfCharacterString(["home"]))
if _debug: TestWritableArray._debug(" - obj.location: %r", obj.location)
# change the length of the array
obj.WriteProperty('location', 2, arrayIndex=0)
assert obj.ReadProperty('location', 0) == 2
# array extended with none, should get property default value
assert obj.ReadProperty('location', 2) == ""
# wrong datatype
with self.assertRaises(InvalidParameterDatatype):
obj.WriteProperty('location', "nope", arrayIndex=0)
def test_changing_item(self):
if _debug: TestWritableArray._debug("test_changing_item")
# create an object with a location
obj = SampleWritableArray(location=ArrayOfCharacterString(["home"]))
if _debug: TestWritableArray._debug(" - obj.location: %r", obj.location)
# change the element
obj.WriteProperty('location', "work", arrayIndex=1)
assert obj.ReadProperty('location', 1) == "work"
# wrong datatype
with self.assertRaises(InvalidParameterDatatype):
obj.WriteProperty('location', 12, arrayIndex=1)
def test_replacing_array(self):
if _debug: TestWritableArray._debug("test_replacing_array")
# create an object with a location
obj = SampleWritableArray()
if _debug: TestWritableArray._debug(" - obj.location: %r", obj.location)
# replace the array
obj.WriteProperty('location', ArrayOfCharacterString(["home", "work"]))
assert obj.ReadProperty('location', 0) == 2
assert obj.ReadProperty('location', 1) == "home"
assert obj.ReadProperty('location', 2) == "work"