1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

upgrade sample to add writing a BDT #451

This commit is contained in:
Joel Bender 2022-03-24 23:50:29 -04:00
parent 7cd1a1a175
commit 00bd5fe5d9

View File

@ -13,9 +13,14 @@ from bacpypes.comm import bind, Client
from bacpypes.core import run, enable_sleeping
from bacpypes.pdu import Address
from bacpypes.bvll import ReadBroadcastDistributionTable, \
ReadBroadcastDistributionTableAck, ReadForeignDeviceTable, \
ReadForeignDeviceTableAck, Result
from bacpypes.bvll import (
ReadBroadcastDistributionTable,
ReadBroadcastDistributionTableAck,
ReadForeignDeviceTable,
ReadForeignDeviceTableAck,
WriteBroadcastDistributionTable,
Result,
)
from bacpypes.bvllservice import AnnexJCodec, UDPMultiplexer
# some debugging
@ -27,33 +32,63 @@ this_application = None
@bacpypes_debugging
class ReadBBMDConsoleClient(ConsoleCmd, Client):
class ReadWriteBBMDConsoleClient(ConsoleCmd, Client):
def do_readbdt(self, args):
"""readbdt <addr>"""
args = args.split()
if _debug:
ReadBBMDConsoleClient._debug("do_readbdt %r", args)
ReadWriteBBMDConsoleClient._debug("do_readbdt %r", args)
# build a request and send it downstream
read_bdt = ReadBroadcastDistributionTable(destination=Address(args[0]))
if _debug:
ReadWriteBBMDConsoleClient._debug(" - read_bdt: %r", read_bdt)
self.request(read_bdt)
def do_readfdt(self, args):
"""readfdt <addr>"""
args = args.split()
if _debug:
ReadBBMDConsoleClient._debug("do_readfdt %r", args)
ReadWriteBBMDConsoleClient._debug("do_readfdt %r", args)
# build a request and send it downstream
read_fdt = ReadForeignDeviceTable(destination=Address(args[0]))
if _debug:
ReadWriteBBMDConsoleClient._debug(" - read_fdt: %r", read_fdt)
self.request(read_fdt)
def do_writebdt(self, args):
"""writebdt <addr> <entry> ..."""
args = args.split()
if _debug:
ReadWriteBBMDConsoleClient._debug("do_writebdt %r", args)
# build a list of broadcast distribution table entries which just so
# happen to be BACpypes IPv4 addresses
bdt = []
for addr in args[1:]:
bdte = Address(addr)
bdt.append(bdte)
# build a request and send it downstream
write_bdt = WriteBroadcastDistributionTable(
destination=Address(args[0]), bdt=bdt
)
if _debug:
ReadWriteBBMDConsoleClient._debug(" - write_bdt: %r", write_bdt)
self.request(write_bdt)
def confirmation(self, pdu):
"""Filter for the acks and errors."""
if _debug:
ReadBBMDConsoleClient._debug("confirmation %r", pdu)
ReadWriteBBMDConsoleClient._debug("confirmation %r", pdu)
if isinstance(pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result)):
if isinstance(
pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result)
):
pdu.debug_contents()
@ -65,7 +100,7 @@ def main():
parser.add_argument(
"local_address",
help="IPv4 address",
)
)
args = parser.parse_args()
if _debug:
@ -77,7 +112,7 @@ def main():
_log.debug(" - local_address: %r", local_address)
# make a console
this_console = ReadBBMDConsoleClient()
this_console = ReadWriteBBMDConsoleClient()
if _debug:
_log.debug(" - this_console: %r", this_console)