1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

more tests

This commit is contained in:
Joel Bender 2018-06-28 16:43:43 -04:00
parent 352b92da92
commit 9db07dc8d5

View File

@ -18,10 +18,15 @@ from bacpypes.basetypes import (
)
from bacpypes.apdu import (
SubscribeCOVRequest,
ReadPropertyRequest, ReadPropertyACK,
ConfirmedCOVNotificationRequest, UnconfirmedCOVNotificationRequest,
SimpleAckPDU, Error, RejectPDU, AbortPDU,
)
from bacpypes.service.object import (
ReadWritePropertyServices,
)
from bacpypes.service.cov import (
ChangeOfValueServices,
)
@ -112,6 +117,184 @@ class TestBasic(unittest.TestCase):
@bacpypes_debugging
class TestBinaryValue(unittest.TestCase):
def test_8_10_1(self):
"""Confirmed Notifications Subscription"""
if _debug: TestBinaryValue._debug("test_8_10_1")
# create a network
anet = ApplicationNetwork("test_8_10_1")
# add the service capability to the IUT
anet.td.add_capability(COVTestClientServices)
anet.iut.add_capability(ChangeOfValueServices)
anet.iut.add_capability(ReadWritePropertyServices)
# make a binary value object
test_bv = BinaryValueObject(
objectIdentifier=('binaryValue', 1),
objectName='bv',
presentValue='inactive',
statusFlags=[0, 0, 0, 0],
)
# an easy way to change the present value
write_test_bv = lambda v: setattr(test_bv, 'presentValue', v)
# add it to the implementation
anet.iut.add_object(test_bv)
# tell the TD how to respond to confirmed notifications
anet.td.test_ack = True
anet.td.test_reject = None
anet.td.test_abort = None
# wait for the subscription
anet.iut.start_state.doc("8.10.1-1-0") \
.receive(SubscribeCOVRequest).doc("8.10.1-1-1") \
.success()
# send the subscription, wait for the ack
anet.td.start_state.doc("8.10.1-2-0") \
.send(SubscribeCOVRequest(
destination=anet.iut.address,
subscriberProcessIdentifier=1,
monitoredObjectIdentifier=('binaryValue', 1),
issueConfirmedNotifications=True,
lifetime=30,
)).doc("8.10.1-2-1") \
.receive(SimpleAckPDU).doc("8.10.1-2-2") \
.send(ReadPropertyRequest(
destination=anet.iut.address,
objectIdentifier=anet.iut_device_object.objectIdentifier,
propertyIdentifier='activeCovSubscriptions',
)).doc("8.10.1-2-3") \
.receive(ReadPropertyACK).doc("8.10.1-2-3") \
.success()
# run the group
anet.run()
def test_8_10_2(self):
"""Unconfirmed Notifications Subscription"""
if _debug: TestBinaryValue._debug("test_8_10_2")
# create a network
anet = ApplicationNetwork("test_8_10_2")
# add the service capability to the IUT
anet.td.add_capability(COVTestClientServices)
anet.iut.add_capability(ChangeOfValueServices)
# make a binary value object
test_bv = BinaryValueObject(
objectIdentifier=('binaryValue', 1),
objectName='bv',
presentValue='inactive',
statusFlags=[0, 0, 0, 0],
)
# an easy way to change the present value
write_test_bv = lambda v: setattr(test_bv, 'presentValue', v)
# add it to the implementation
anet.iut.add_object(test_bv)
# tell the TD how to respond to confirmed notifications
anet.td.test_ack = True
anet.td.test_reject = None
anet.td.test_abort = None
# wait for the subscription
anet.iut.start_state.doc("8.10.2-1-0") \
.receive(SubscribeCOVRequest).doc("8.10.2-1-1") \
.success()
# send the subscription, wait for the ack
anet.td.start_state.doc("8.10.2-2-0") \
.send(SubscribeCOVRequest(
destination=anet.iut.address,
subscriberProcessIdentifier=1,
monitoredObjectIdentifier=('binaryValue', 1),
issueConfirmedNotifications=False,
lifetime=30,
)).doc("8.10.2-2-1") \
.receive(SimpleAckPDU).doc("8.10.2-2-2") \
.success()
# run the group, cut the time limit short
anet.run(time_limit=5.0)
# check that the IUT still has the detection
if _debug: TestBinaryValue._debug(" - detections: %r", anet.iut.cov_detections)
assert len(anet.iut.cov_detections) == 1
# pop out the subscription list and criteria
obj_ref, criteria = anet.iut.cov_detections.popitem()
if _debug: TestBinaryValue._debug(" - criteria: %r", criteria)
# get the list of subscriptions from the criteria
subscriptions = criteria.cov_subscriptions.cov_subscriptions
if _debug: TestBinaryValue._debug(" - subscriptions: %r", subscriptions)
assert len(subscriptions) == 1
def test_8_10_3(self):
"""Canceling a Subscription"""
if _debug: TestBinaryValue._debug("test_8_10_3")
# create a network
anet = ApplicationNetwork("test_8_10_3")
# add the service capability to the IUT
anet.td.add_capability(COVTestClientServices)
anet.iut.add_capability(ChangeOfValueServices)
# make a binary value object
test_bv = BinaryValueObject(
objectIdentifier=('binaryValue', 1),
objectName='bv',
presentValue='inactive',
statusFlags=[0, 0, 0, 0],
)
# an easy way to change the present value
write_test_bv = lambda v: setattr(test_bv, 'presentValue', v)
# add it to the implementation
anet.iut.add_object(test_bv)
# tell the TD how to respond to confirmed notifications
anet.td.test_ack = True
anet.td.test_reject = None
anet.td.test_abort = None
# wait for the subscription
anet.iut.start_state.doc("8.10.3-1-0") \
.receive(SubscribeCOVRequest).doc("8.10.3-1-1") \
.receive(SubscribeCOVRequest).doc("8.10.3-1-1") \
.success()
# send the subscription, wait for the ack, then send the cancelation
# and wait for the ack
anet.td.start_state.doc("8.10.3-2-0") \
.send(SubscribeCOVRequest(
destination=anet.iut.address,
subscriberProcessIdentifier=1,
monitoredObjectIdentifier=('binaryValue', 1),
issueConfirmedNotifications=False,
lifetime=30,
)).doc("8.10.3-2-1") \
.receive(SimpleAckPDU).doc("8.10.3-2-2") \
.send(SubscribeCOVRequest(
destination=anet.iut.address,
subscriberProcessIdentifier=1,
monitoredObjectIdentifier=('binaryValue', 1),
)).doc("8.10.3-2-1") \
.receive(SimpleAckPDU).doc("8.10.3-2-2") \
.success()
# run the group
anet.run()
def test_no_traffic(self):
"""Test basic configuration of a network."""
if _debug: TestBinaryValue._debug("test_no_traffic")