1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00
This commit is contained in:
Joel Bender 2018-08-07 23:42:53 -04:00
parent a1ff426686
commit d73458953c
8 changed files with 1003 additions and 53 deletions

View File

@ -11,9 +11,11 @@ from .errors import ConfigurationError
from .comm import Client, Server, bind, \
ServiceAccessPoint, ApplicationServiceElement
from .task import FunctionTask
from .pdu import Address, LocalBroadcast, LocalStation, PDU, RemoteStation
from .npdu import IAmRouterToNetwork, NPDU, WhoIsRouterToNetwork, npdu_types
from .npdu import NPDU, npdu_types, IAmRouterToNetwork, WhoIsRouterToNetwork, \
WhatIsNetworkNumber, NetworkNumberIs
from .apdu import APDU as _APDU
# some debugging
@ -163,7 +165,7 @@ bacpypes_debugging(RouterInfoCache)
class NetworkAdapter(Client, DebugContents):
_debug_contents = ('adapterSAP-', 'adapterNet')
_debug_contents = ('adapterSAP-', 'adapterNet', 'adapterNetConfigured')
def __init__(self, sap, net, cid=None):
if _debug: NetworkAdapter._debug("__init__ %s %r cid=%r", sap, net, cid)
@ -171,6 +173,12 @@ class NetworkAdapter(Client, DebugContents):
self.adapterSAP = sap
self.adapterNet = net
# record if this was 0=learned, 1=configured, None=unknown
if net is None:
self.adapterNetConfigured = None
else:
self.adapterNetConfigured = 1
def confirmation(self, pdu):
"""Decode upstream PDUs and pass them up to the service access point."""
if _debug: NetworkAdapter._debug("confirmation %r (net=%r)", pdu, self.adapterNet)
@ -201,11 +209,11 @@ bacpypes_debugging(NetworkAdapter)
class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
_debug_contents = ('adapters++', 'routers++', 'networks+'
, 'localAdapter-', 'localAddress'
_debug_contents = ('adapters++', 'pending_nets',
'local_adapter-', 'local_address',
)
def __init__(self, routerInfoCache=None, sap=None, sid=None):
def __init__(self, router_info_cache=None, sap=None, sid=None):
if _debug: NetworkServiceAccessPoint._debug("__init__ sap=%r sid=%r", sap, sid)
ServiceAccessPoint.__init__(self, sap)
Server.__init__(self, sid)
@ -214,7 +222,7 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
self.adapters = {} # net -> NetworkAdapter
# use the provided cache or make a default one
self.router_info_cache = routerInfoCache or RouterInfoCache()
self.router_info_cache = router_info_cache or RouterInfoCache()
# map to a list of application layer packets waiting for a path
self.pending_nets = {}
@ -231,18 +239,13 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
if net in self.adapters:
raise RuntimeError("already bound")
# when binding to an adapter and there is more than one, then they
# must all have network numbers and one of them will be the default
if (net is not None) and (None in self.adapters):
raise RuntimeError("default adapter bound")
# create an adapter object, add it to our map
adapter = NetworkAdapter(self, net)
self.adapters[net] = adapter
if _debug: NetworkServiceAccessPoint._debug(" - adapters[%r]: %r", net, adapter)
# if the address was given, make it the "local" one
if address:
if address and not self.localAddress:
self.local_adapter = adapter
self.local_address = address
@ -332,9 +335,17 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
# if the network matches the local adapter it's local
if (dnet == adapter.adapterNet):
### log this, the application shouldn't be sending to a remote station address
### when it's a directly connected network
raise RuntimeError("addressing problem")
if (npdu.pduDestination.addrType == Address.remoteStationAddr):
if _debug: NetworkServiceAccessPoint._debug(" - mapping remote station to local station")
npdu.pduDestination = LocalStation(npdu.pduDestination.addrAddr)
elif (npdu.pduDestination.addrType == Address.remoteBroadcastAddr):
if _debug: NetworkServiceAccessPoint._debug(" - mapping remote broadcast to local broadcast")
npdu.pduDestination = LocalBroadcast()
else:
raise RuntimeError("addressing problem")
adapter.process_npdu(npdu)
return
# get it ready to send when the path is found
npdu.pduDestination = None
@ -673,6 +684,9 @@ class NetworkServiceElement(ApplicationServiceElement):
if _debug: NetworkServiceElement._debug("__init__ eid=%r", eid)
ApplicationServiceElement.__init__(self, eid)
# network number is timeout
self.network_number_is_task = None
def indication(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("indication %r %r", adapter, npdu)
@ -889,5 +903,153 @@ class NetworkServiceElement(ApplicationServiceElement):
# reference the service access point
# sap = self.elementService
def what_is_network_number(self, adapter=None, address=None):
if _debug: NetworkServiceElement._debug("what_is_network_number %r", adapter, address)
# reference the service access point
sap = self.elementService
# a little error checking
if (adapter is None) and (address is not None):
raise RuntimeError("inconsistent parameters")
# build a request
winn = WhatIsNetworkNumber()
winn.pduDestination = LocalBroadcast()
# check for a specific adapter
if adapter:
if address is not None:
winn.pduDestination = address
adapter_list = [adapter]
else:
# send to adapters we don't know anything about
adapter_list = []
for xadapter in sap.adapters.values():
if xadapter.adapterNet is None:
adapter_list.append(xadapter)
if _debug: NetworkServiceElement._debug(" - adapter_list: %r", adapter_list)
# send it to the adapter(s)
for xadapter in adapter_list:
self.request(xadapter, winn)
def WhatIsNetworkNumber(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("WhatIsNetworkNumber %r %r", adapter, npdu)
# reference the service access point
sap = self.elementService
# check to see if the local network is known
if adapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - local network not known")
return
# if this is not a router, wait for somebody else to answer
if (npdu.pduDestination.addrType == Address.localBroadcastAddr):
if _debug: NetworkServiceElement._debug(" - local broadcast request")
if len(sap.adapters) == 1:
if _debug: NetworkServiceElement._debug(" - not a router")
if self.network_number_is_task:
if _debug: NetworkServiceElement._debug(" - already waiting")
else:
self.network_number_is_task = FunctionTask(self.network_number_is, adapter)
self.network_number_is_task.install_task(delta=10 * 1000)
return
# send out what we know
self.network_number_is(adapter)
def network_number_is(self, adapter=None):
if _debug: NetworkServiceElement._debug("network_number_is %r", adapter)
# reference the service access point
sap = self.elementService
# specific adapter, or all configured adapters
if adapter is not None:
adapter_list = [adapter]
else:
# send to adapters we are configured to know
adapter_list = []
for xadapter in sap.adapters.values():
if (xadapter.adapterNet is not None) and (xadapter.adapterNetConfigured == 1):
adapter_list.append(xadapter)
if _debug: NetworkServiceElement._debug(" - adapter_list: %r", adapter_list)
# loop through the adapter(s)
for xadapter in adapter_list:
if xadapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - unknown network: %r", xadapter)
continue
# build a broadcast annoucement
nni = NetworkNumberIs(net=xadapter.adapterNet, flag=xadapter.adapterNetConfigured)
nni.pduDestination = LocalBroadcast()
if _debug: NetworkServiceElement._debug(" - nni: %r", nni)
# send it to the adapter
self.request(xadapter, nni)
def NetworkNumberIs(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("NetworkNumberIs %r %r", adapter, npdu)
# reference the service access point
sap = self.elementService
# if this was not sent as a broadcast, ignore it
if (npdu.pduDestination.addrType != Address.localBroadcastAddr):
if _debug: NetworkServiceElement._debug(" - not broadcast")
return
# if we are waiting for someone else to say what this network number
# is, cancel that task
if self.network_number_is_task:
if _debug: NetworkServiceElement._debug(" - cancel waiting task")
self.network_number_is_task.suspend_task()
self.network_number_is_task = None
# check to see if the local network is known
if adapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - local network not known: %r", list(sap.adapters.keys()))
# delete the reference from an unknown network
del sap.adapters[None]
adapter.adapterNet = npdu.nniNet
adapter.adapterNetConfigured = 0
# we now know what network this is
sap.adapters[adapter.adapterNet] = adapter
if _debug: NetworkServiceElement._debug(" - local network learned")
###TODO: s/None/net/g in routing tables
return
# check if this matches what we have
if adapter.adapterNet == npdu.nniNet:
if _debug: NetworkServiceElement._debug(" - matches what we have")
return
# check it this matches what we know, if we know it
if adapter.adapterNetConfigured == 1:
if _debug: NetworkServiceElement._debug(" - doesn't match what we know")
return
if _debug: NetworkServiceElement._debug(" - learning something new")
# delete the reference from the old (learned) network
del sap.adapters[adapter.adapterNet]
adapter.adapterNet = npdu.nniNet
adapter.adapterNetConfigured = npdu.nniFlag
# we now know what network this is
sap.adapters[adapter.adapterNet] = adapter
###TODO: s/old/new/g in routing tables
bacpypes_debugging(NetworkServiceElement)

View File

@ -517,7 +517,7 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
bind(self.bip, self.annexj, self.mux.annexJ)
# bind the BIP stack to the network, no network number
self.nsap.bind(self.bip)
self.nsap.bind(self.bip, address=self.localAddress)
def close_socket(self):
if _debug: BIPSimpleApplication._debug("close_socket")
@ -578,7 +578,7 @@ class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWrite
bind(self.bip, self.annexj, self.mux.annexJ)
# bind the NSAP to the stack, no network number
self.nsap.bind(self.bip)
self.nsap.bind(self.bip, address=self.localAddress)
def close_socket(self):
if _debug: BIPForeignApplication._debug("close_socket")
@ -619,4 +619,5 @@ class BIPNetworkApplication(NetworkServiceElement):
bind(self.bip, self.annexj, self.mux.annexJ)
# bind the NSAP to the stack, no network number
self.nsap.bind(self.bip)
self.nsap.bind(self.bip, address=self.localAddress)

View File

@ -11,9 +11,11 @@ from .errors import ConfigurationError
from .comm import Client, Server, bind, \
ServiceAccessPoint, ApplicationServiceElement
from .task import FunctionTask
from .pdu import Address, LocalBroadcast, LocalStation, PDU, RemoteStation
from .npdu import IAmRouterToNetwork, NPDU, WhoIsRouterToNetwork, npdu_types
from .npdu import NPDU, npdu_types, IAmRouterToNetwork, WhoIsRouterToNetwork, \
WhatIsNetworkNumber, NetworkNumberIs
from .apdu import APDU as _APDU
# some debugging
@ -163,7 +165,7 @@ class RouterInfoCache:
@bacpypes_debugging
class NetworkAdapter(Client, DebugContents):
_debug_contents = ('adapterSAP-', 'adapterNet')
_debug_contents = ('adapterSAP-', 'adapterNet', 'adapterNetConfigured')
def __init__(self, sap, net, cid=None):
if _debug: NetworkAdapter._debug("__init__ %s %r cid=%r", sap, net, cid)
@ -171,6 +173,12 @@ class NetworkAdapter(Client, DebugContents):
self.adapterSAP = sap
self.adapterNet = net
# record if this was 0=learned, 1=configured, None=unknown
if net is None:
self.adapterNetConfigured = None
else:
self.adapterNetConfigured = 1
def confirmation(self, pdu):
"""Decode upstream PDUs and pass them up to the service access point."""
if _debug: NetworkAdapter._debug("confirmation %r (net=%r)", pdu, self.adapterNet)
@ -200,11 +208,11 @@ class NetworkAdapter(Client, DebugContents):
@bacpypes_debugging
class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
_debug_contents = ('adapters++', 'routers++', 'networks+'
, 'localAdapter-', 'localAddress'
_debug_contents = ('adapters++', 'pending_nets',
'local_adapter-', 'local_address',
)
def __init__(self, routerInfoCache=None, sap=None, sid=None):
def __init__(self, router_info_cache=None, sap=None, sid=None):
if _debug: NetworkServiceAccessPoint._debug("__init__ sap=%r sid=%r", sap, sid)
ServiceAccessPoint.__init__(self, sap)
Server.__init__(self, sid)
@ -213,7 +221,7 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
self.adapters = {} # net -> NetworkAdapter
# use the provided cache or make a default one
self.router_info_cache = routerInfoCache or RouterInfoCache()
self.router_info_cache = router_info_cache or RouterInfoCache()
# map to a list of application layer packets waiting for a path
self.pending_nets = {}
@ -230,18 +238,13 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
if net in self.adapters:
raise RuntimeError("already bound")
# when binding to an adapter and there is more than one, then they
# must all have network numbers and one of them will be the default
if (net is not None) and (None in self.adapters):
raise RuntimeError("default adapter bound")
# create an adapter object, add it to our map
adapter = NetworkAdapter(self, net)
self.adapters[net] = adapter
if _debug: NetworkServiceAccessPoint._debug(" - adapters[%r]: %r", net, adapter)
# if the address was given, make it the "local" one
if address:
if address and not self.localAddress:
self.local_adapter = adapter
self.local_address = address
@ -331,9 +334,17 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
# if the network matches the local adapter it's local
if (dnet == adapter.adapterNet):
### log this, the application shouldn't be sending to a remote station address
### when it's a directly connected network
raise RuntimeError("addressing problem")
if (npdu.pduDestination.addrType == Address.remoteStationAddr):
if _debug: NetworkServiceAccessPoint._debug(" - mapping remote station to local station")
npdu.pduDestination = LocalStation(npdu.pduDestination.addrAddr)
elif (npdu.pduDestination.addrType == Address.remoteBroadcastAddr):
if _debug: NetworkServiceAccessPoint._debug(" - mapping remote broadcast to local broadcast")
npdu.pduDestination = LocalBroadcast()
else:
raise RuntimeError("addressing problem")
adapter.process_npdu(npdu)
return
# get it ready to send when the path is found
npdu.pduDestination = None
@ -671,6 +682,9 @@ class NetworkServiceElement(ApplicationServiceElement):
if _debug: NetworkServiceElement._debug("__init__ eid=%r", eid)
ApplicationServiceElement.__init__(self, eid)
# network number is timeout
self.network_number_is_task = None
def indication(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("indication %r %r", adapter, npdu)
@ -887,3 +901,151 @@ class NetworkServiceElement(ApplicationServiceElement):
# reference the service access point
# sap = self.elementService
def what_is_network_number(self, adapter=None, address=None):
if _debug: NetworkServiceElement._debug("what_is_network_number %r", adapter, address)
# reference the service access point
sap = self.elementService
# a little error checking
if (adapter is None) and (address is not None):
raise RuntimeError("inconsistent parameters")
# build a request
winn = WhatIsNetworkNumber()
winn.pduDestination = LocalBroadcast()
# check for a specific adapter
if adapter:
if address is not None:
winn.pduDestination = address
adapter_list = [adapter]
else:
# send to adapters we don't know anything about
adapter_list = []
for xadapter in sap.adapters.values():
if xadapter.adapterNet is None:
adapter_list.append(xadapter)
if _debug: NetworkServiceElement._debug(" - adapter_list: %r", adapter_list)
# send it to the adapter(s)
for xadapter in adapter_list:
self.request(xadapter, winn)
def WhatIsNetworkNumber(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("WhatIsNetworkNumber %r %r", adapter, npdu)
# reference the service access point
sap = self.elementService
# check to see if the local network is known
if adapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - local network not known")
return
# if this is not a router, wait for somebody else to answer
if (npdu.pduDestination.addrType == Address.localBroadcastAddr):
if _debug: NetworkServiceElement._debug(" - local broadcast request")
if len(sap.adapters) == 1:
if _debug: NetworkServiceElement._debug(" - not a router")
if self.network_number_is_task:
if _debug: NetworkServiceElement._debug(" - already waiting")
else:
self.network_number_is_task = FunctionTask(self.network_number_is, adapter)
self.network_number_is_task.install_task(delta=10 * 1000)
return
# send out what we know
self.network_number_is(adapter)
def network_number_is(self, adapter=None):
if _debug: NetworkServiceElement._debug("network_number_is %r", adapter)
# reference the service access point
sap = self.elementService
# specific adapter, or all configured adapters
if adapter is not None:
adapter_list = [adapter]
else:
# send to adapters we are configured to know
adapter_list = []
for xadapter in sap.adapters.values():
if (xadapter.adapterNet is not None) and (xadapter.adapterNetConfigured == 1):
adapter_list.append(xadapter)
if _debug: NetworkServiceElement._debug(" - adapter_list: %r", adapter_list)
# loop through the adapter(s)
for xadapter in adapter_list:
if xadapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - unknown network: %r", xadapter)
continue
# build a broadcast annoucement
nni = NetworkNumberIs(net=xadapter.adapterNet, flag=xadapter.adapterNetConfigured)
nni.pduDestination = LocalBroadcast()
if _debug: NetworkServiceElement._debug(" - nni: %r", nni)
# send it to the adapter
self.request(xadapter, nni)
def NetworkNumberIs(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("NetworkNumberIs %r %r", adapter, npdu)
# reference the service access point
sap = self.elementService
# if this was not sent as a broadcast, ignore it
if (npdu.pduDestination.addrType != Address.localBroadcastAddr):
if _debug: NetworkServiceElement._debug(" - not broadcast")
return
# if we are waiting for someone else to say what this network number
# is, cancel that task
if self.network_number_is_task:
if _debug: NetworkServiceElement._debug(" - cancel waiting task")
self.network_number_is_task.suspend_task()
self.network_number_is_task = None
# check to see if the local network is known
if adapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - local network not known: %r", list(sap.adapters.keys()))
# delete the reference from an unknown network
del sap.adapters[None]
adapter.adapterNet = npdu.nniNet
adapter.adapterNetConfigured = 0
# we now know what network this is
sap.adapters[adapter.adapterNet] = adapter
if _debug: NetworkServiceElement._debug(" - local network learned")
###TODO: s/None/net/g in routing tables
return
# check if this matches what we have
if adapter.adapterNet == npdu.nniNet:
if _debug: NetworkServiceElement._debug(" - matches what we have")
return
# check it this matches what we know, if we know it
if adapter.adapterNetConfigured == 1:
if _debug: NetworkServiceElement._debug(" - doesn't match what we know")
return
if _debug: NetworkServiceElement._debug(" - learning something new")
# delete the reference from the old (learned) network
del sap.adapters[adapter.adapterNet]
adapter.adapterNet = npdu.nniNet
adapter.adapterNetConfigured = npdu.nniFlag
# we now know what network this is
sap.adapters[adapter.adapterNet] = adapter
###TODO: s/old/new/g in routing tables

View File

@ -11,9 +11,11 @@ from .errors import ConfigurationError
from .comm import Client, Server, bind, \
ServiceAccessPoint, ApplicationServiceElement
from .task import FunctionTask
from .pdu import Address, LocalBroadcast, LocalStation, PDU, RemoteStation
from .npdu import IAmRouterToNetwork, NPDU, WhoIsRouterToNetwork, npdu_types
from .npdu import NPDU, npdu_types, IAmRouterToNetwork, WhoIsRouterToNetwork, \
WhatIsNetworkNumber, NetworkNumberIs
from .apdu import APDU as _APDU
# some debugging
@ -163,7 +165,7 @@ class RouterInfoCache:
@bacpypes_debugging
class NetworkAdapter(Client, DebugContents):
_debug_contents = ('adapterSAP-', 'adapterNet')
_debug_contents = ('adapterSAP-', 'adapterNet', 'adapterNetConfigured')
def __init__(self, sap, net, cid=None):
if _debug: NetworkAdapter._debug("__init__ %s %r cid=%r", sap, net, cid)
@ -171,6 +173,12 @@ class NetworkAdapter(Client, DebugContents):
self.adapterSAP = sap
self.adapterNet = net
# record if this was 0=learned, 1=configured, None=unknown
if net is None:
self.adapterNetConfigured = None
else:
self.adapterNetConfigured = 1
def confirmation(self, pdu):
"""Decode upstream PDUs and pass them up to the service access point."""
if _debug: NetworkAdapter._debug("confirmation %r (net=%r)", pdu, self.adapterNet)
@ -200,11 +208,11 @@ class NetworkAdapter(Client, DebugContents):
@bacpypes_debugging
class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
_debug_contents = ('adapters++', 'routers++', 'networks+'
, 'localAdapter-', 'localAddress'
_debug_contents = ('adapters++', 'pending_nets',
'local_adapter-', 'local_address',
)
def __init__(self, routerInfoCache=None, sap=None, sid=None):
def __init__(self, router_info_cache=None, sap=None, sid=None):
if _debug: NetworkServiceAccessPoint._debug("__init__ sap=%r sid=%r", sap, sid)
ServiceAccessPoint.__init__(self, sap)
Server.__init__(self, sid)
@ -213,7 +221,7 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
self.adapters = {} # net -> NetworkAdapter
# use the provided cache or make a default one
self.router_info_cache = routerInfoCache or RouterInfoCache()
self.router_info_cache = router_info_cache or RouterInfoCache()
# map to a list of application layer packets waiting for a path
self.pending_nets = {}
@ -230,18 +238,13 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
if net in self.adapters:
raise RuntimeError("already bound")
# when binding to an adapter and there is more than one, then they
# must all have network numbers and one of them will be the default
if (net is not None) and (None in self.adapters):
raise RuntimeError("default adapter bound")
# create an adapter object, add it to our map
adapter = NetworkAdapter(self, net)
self.adapters[net] = adapter
if _debug: NetworkServiceAccessPoint._debug(" - adapters[%r]: %r", net, adapter)
# if the address was given, make it the "local" one
if address:
if address and not self.localAddress:
self.local_adapter = adapter
self.local_address = address
@ -331,9 +334,17 @@ class NetworkServiceAccessPoint(ServiceAccessPoint, Server, DebugContents):
# if the network matches the local adapter it's local
if (dnet == adapter.adapterNet):
### log this, the application shouldn't be sending to a remote station address
### when it's a directly connected network
raise RuntimeError("addressing problem")
if (npdu.pduDestination.addrType == Address.remoteStationAddr):
if _debug: NetworkServiceAccessPoint._debug(" - mapping remote station to local station")
npdu.pduDestination = LocalStation(npdu.pduDestination.addrAddr)
elif (npdu.pduDestination.addrType == Address.remoteBroadcastAddr):
if _debug: NetworkServiceAccessPoint._debug(" - mapping remote broadcast to local broadcast")
npdu.pduDestination = LocalBroadcast()
else:
raise RuntimeError("addressing problem")
adapter.process_npdu(npdu)
return
# get it ready to send when the path is found
npdu.pduDestination = None
@ -671,6 +682,9 @@ class NetworkServiceElement(ApplicationServiceElement):
if _debug: NetworkServiceElement._debug("__init__ eid=%r", eid)
ApplicationServiceElement.__init__(self, eid)
# network number is timeout
self.network_number_is_task = None
def indication(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("indication %r %r", adapter, npdu)
@ -887,3 +901,151 @@ class NetworkServiceElement(ApplicationServiceElement):
# reference the service access point
# sap = self.elementService
def what_is_network_number(self, adapter=None, address=None):
if _debug: NetworkServiceElement._debug("what_is_network_number %r", adapter, address)
# reference the service access point
sap = self.elementService
# a little error checking
if (adapter is None) and (address is not None):
raise RuntimeError("inconsistent parameters")
# build a request
winn = WhatIsNetworkNumber()
winn.pduDestination = LocalBroadcast()
# check for a specific adapter
if adapter:
if address is not None:
winn.pduDestination = address
adapter_list = [adapter]
else:
# send to adapters we don't know anything about
adapter_list = []
for xadapter in sap.adapters.values():
if xadapter.adapterNet is None:
adapter_list.append(xadapter)
if _debug: NetworkServiceElement._debug(" - adapter_list: %r", adapter_list)
# send it to the adapter(s)
for xadapter in adapter_list:
self.request(xadapter, winn)
def WhatIsNetworkNumber(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("WhatIsNetworkNumber %r %r", adapter, npdu)
# reference the service access point
sap = self.elementService
# check to see if the local network is known
if adapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - local network not known")
return
# if this is not a router, wait for somebody else to answer
if (npdu.pduDestination.addrType == Address.localBroadcastAddr):
if _debug: NetworkServiceElement._debug(" - local broadcast request")
if len(sap.adapters) == 1:
if _debug: NetworkServiceElement._debug(" - not a router")
if self.network_number_is_task:
if _debug: NetworkServiceElement._debug(" - already waiting")
else:
self.network_number_is_task = FunctionTask(self.network_number_is, adapter)
self.network_number_is_task.install_task(delta=10 * 1000)
return
# send out what we know
self.network_number_is(adapter)
def network_number_is(self, adapter=None):
if _debug: NetworkServiceElement._debug("network_number_is %r", adapter)
# reference the service access point
sap = self.elementService
# specific adapter, or all configured adapters
if adapter is not None:
adapter_list = [adapter]
else:
# send to adapters we are configured to know
adapter_list = []
for xadapter in sap.adapters.values():
if (xadapter.adapterNet is not None) and (xadapter.adapterNetConfigured == 1):
adapter_list.append(xadapter)
if _debug: NetworkServiceElement._debug(" - adapter_list: %r", adapter_list)
# loop through the adapter(s)
for xadapter in adapter_list:
if xadapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - unknown network: %r", xadapter)
continue
# build a broadcast annoucement
nni = NetworkNumberIs(net=xadapter.adapterNet, flag=xadapter.adapterNetConfigured)
nni.pduDestination = LocalBroadcast()
if _debug: NetworkServiceElement._debug(" - nni: %r", nni)
# send it to the adapter
self.request(xadapter, nni)
def NetworkNumberIs(self, adapter, npdu):
if _debug: NetworkServiceElement._debug("NetworkNumberIs %r %r", adapter, npdu)
# reference the service access point
sap = self.elementService
# if this was not sent as a broadcast, ignore it
if (npdu.pduDestination.addrType != Address.localBroadcastAddr):
if _debug: NetworkServiceElement._debug(" - not broadcast")
return
# if we are waiting for someone else to say what this network number
# is, cancel that task
if self.network_number_is_task:
if _debug: NetworkServiceElement._debug(" - cancel waiting task")
self.network_number_is_task.suspend_task()
self.network_number_is_task = None
# check to see if the local network is known
if adapter.adapterNet is None:
if _debug: NetworkServiceElement._debug(" - local network not known: %r", list(sap.adapters.keys()))
# delete the reference from an unknown network
del sap.adapters[None]
adapter.adapterNet = npdu.nniNet
adapter.adapterNetConfigured = 0
# we now know what network this is
sap.adapters[adapter.adapterNet] = adapter
if _debug: NetworkServiceElement._debug(" - local network learned")
###TODO: s/None/net/g in routing tables
return
# check if this matches what we have
if adapter.adapterNet == npdu.nniNet:
if _debug: NetworkServiceElement._debug(" - matches what we have")
return
# check it this matches what we know, if we know it
if adapter.adapterNetConfigured == 1:
if _debug: NetworkServiceElement._debug(" - doesn't match what we know")
return
if _debug: NetworkServiceElement._debug(" - learning something new")
# delete the reference from the old (learned) network
del sap.adapters[adapter.adapterNet]
adapter.adapterNet = npdu.nniNet
adapter.adapterNetConfigured = npdu.nniFlag
# we now know what network this is
sap.adapters[adapter.adapterNet] = adapter
###TODO: s/old/new/g in routing tables

View File

@ -12,7 +12,10 @@ from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, enable_sleeping
from bacpypes.pdu import Address
from bacpypes.npdu import InitializeRoutingTable, WhoIsRouterToNetwork, IAmRouterToNetwork
from bacpypes.npdu import (
WhoIsRouterToNetwork, IAmRouterToNetwork,
InitializeRoutingTable, InitializeRoutingTableAck,
)
from bacpypes.app import BIPNetworkApplication
@ -45,6 +48,11 @@ class WhoIsRouterApplication(BIPNetworkApplication):
if isinstance(npdu, IAmRouterToNetwork):
print("{} -> {}, {}".format(npdu.pduSource, npdu.pduDestination, npdu.iartnNetworkList))
elif isinstance(npdu, InitializeRoutingTableAck):
print("{} routing table".format(npdu.pduSource))
for rte in npdu.irtaTable:
print(" {} {} {}".format(rte.rtDNET, rte.rtPortID, rte.rtPortInfo))
BIPNetworkApplication.indication(self, adapter, npdu)
def response(self, adapter, npdu):
@ -76,7 +84,7 @@ class WhoIsRouterConsoleCmd(ConsoleCmd):
return
# give it to the application
this_application.request(this_application.nsap.adapters[None], request)
this_application.request(this_application.nsap.local_adapter, request)
def do_wirtn(self, args):
"""wirtn <addr> [ <net> ]"""
@ -94,7 +102,7 @@ class WhoIsRouterConsoleCmd(ConsoleCmd):
return
# give it to the application
this_application.request(this_application.nsap.adapters[None], request)
this_application.request(this_application.nsap.local_adapter, request)
#
# __main__

View File

@ -8,4 +8,5 @@ from . import test_net_1
from . import test_net_2
from . import test_net_3
from . import test_net_4
from . import test_net_6

View File

@ -109,11 +109,11 @@ class NetworkLayerStateMachine(ClientStateMachine):
# create a network layer encoder/decoder
self.codec = NPDUCodec()
if _debug: SnifferStateMachine._debug(" - codec: %r", self.codec)
if _debug: NetworkLayerStateMachine._debug(" - codec: %r", self.codec)
# create a node, added to the network
self.node = Node(self.address, vlan)
if _debug: SnifferStateMachine._debug(" - node: %r", self.node)
if _debug: NetworkLayerStateMachine._debug(" - node: %r", self.node)
# bind this to the node
bind(self, self.codec, self.node)

View File

@ -0,0 +1,454 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test Network Number Discovery
-----------------------------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, btox, xtob
from bacpypes.core import deferred
from bacpypes.comm import Client, Server, bind
from bacpypes.pdu import PDU, Address, LocalBroadcast
from bacpypes.vlan import Network
from bacpypes.npdu import (
npdu_types, NPDU,
WhoIsRouterToNetwork, IAmRouterToNetwork, ICouldBeRouterToNetwork,
RejectMessageToNetwork, RouterBusyToNetwork, RouterAvailableToNetwork,
RoutingTableEntry, InitializeRoutingTable, InitializeRoutingTableAck,
EstablishConnectionToNetwork, DisconnectConnectionToNetwork,
WhatIsNetworkNumber, NetworkNumberIs,
)
from ..state_machine import match_pdu, StateMachineGroup, TrafficLog
from ..time_machine import reset_time_machine, run_time_machine
from .helpers import SnifferStateMachine, NetworkLayerStateMachine, RouterNode
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# TNetwork1
#
@bacpypes_debugging
class TNetwork1(StateMachineGroup):
"""
Network 1 has sniffer1, the TD is on network 2 with sniffer2, network 3 has
sniffer3. Network 1 and 2 are connected with a router IUT1, network 2 and 3
are connected by router IUT2.
"""
def __init__(self):
if _debug: TNetwork1._debug("__init__")
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: TNetwork1._debug(" - time machine reset")
# create a traffic log
self.traffic_log = TrafficLog()
# implementation under test
self.iut1 = RouterNode() # router from vlan1 to vlan2
self.iut2 = RouterNode() # router from vlan2 to vlan3
# make a little LAN
self.vlan1 = Network(name="vlan1", broadcast_address=LocalBroadcast())
self.vlan1.traffic_log = self.traffic_log
# sniffer node
self.sniffer1 = NetworkLayerStateMachine("1", self.vlan1)
self.append(self.sniffer1)
# connect vlan1 to iut1
self.iut1.add_network("2", self.vlan1, 1)
# make another little LAN
self.vlan2 = Network(name="vlan2", broadcast_address=LocalBroadcast())
self.vlan2.traffic_log = self.traffic_log
# test device
self.td = NetworkLayerStateMachine("3", self.vlan2)
self.append(self.td)
# sniffer node
self.sniffer2 = NetworkLayerStateMachine("4", self.vlan2)
self.append(self.sniffer2)
# connect vlan2 to both routers
self.iut1.add_network("5", self.vlan2, 2)
self.iut2.add_network("6", self.vlan2, 2)
# make another little LAN
self.vlan3 = Network(name="vlan3", broadcast_address=LocalBroadcast())
self.vlan3.traffic_log = self.traffic_log
# sniffer node
self.sniffer3 = NetworkLayerStateMachine("7", self.vlan3)
self.append(self.sniffer3)
# connect vlan3 to the second router
self.iut2.add_network("8", self.vlan3, 3)
if _debug:
TNetwork1._debug(" - iut1.nsap: %r", self.iut1.nsap)
TNetwork1._debug(" - iut2.nsap: %r", self.iut2.nsap)
def run(self, time_limit=60.0):
if _debug: TNetwork1._debug("run %r", time_limit)
# run the group
super(TNetwork1, self).run()
# run it for some time
run_time_machine(time_limit)
if _debug:
TNetwork1._debug(" - time machine finished")
# list the state machines which shows their current state
for state_machine in self.state_machines:
TNetwork1._debug(" - machine: %r", state_machine)
# each one has a list of sent/received pdus
for direction, pdu in state_machine.transaction_log:
TNetwork1._debug(" %s %s %s",
direction,
pdu.pduSource or pdu.pduDestination,
pdu.__class__.__name__,
)
# traffic log has what was processed on each vlan
self.traffic_log.dump(TNetwork1._debug)
# check for success
all_success, some_failed = super(TNetwork1, self).check_for_success()
assert all_success
#
# TNetwork2
#
@bacpypes_debugging
class TNetwork2(StateMachineGroup):
"""
This test network is almost exactly the same as TNetwork1 with the
exception that IUT1 is connected to network 2 but doesn't know the
network number, it learns it from IUT2.
"""
def __init__(self):
if _debug: TNetwork2._debug("__init__")
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: TNetwork2._debug(" - time machine reset")
# create a traffic log
self.traffic_log = TrafficLog()
# implementation under test
self.iut1 = RouterNode() # router from vlan1 to vlan2
self.iut2 = RouterNode() # router from vlan2 to vlan3
# make a little LAN
self.vlan1 = Network(name="vlan1", broadcast_address=LocalBroadcast())
self.vlan1.traffic_log = self.traffic_log
# sniffer node
self.sniffer1 = NetworkLayerStateMachine("1", self.vlan1)
self.append(self.sniffer1)
# connect vlan1 to iut1
self.iut1.add_network("2", self.vlan1, 1)
# make another little LAN
self.vlan2 = Network(name="vlan2", broadcast_address=LocalBroadcast())
self.vlan2.traffic_log = self.traffic_log
# test device
self.td = NetworkLayerStateMachine("3", self.vlan2)
self.append(self.td)
# sniffer node
self.sniffer2 = NetworkLayerStateMachine("4", self.vlan2)
self.append(self.sniffer2)
# connect vlan2 to both routers
self.iut1.add_network("5", self.vlan2, None) ### NOT CONFIGURED
self.iut2.add_network("6", self.vlan2, 2)
# make another little LAN
self.vlan3 = Network(name="vlan3", broadcast_address=LocalBroadcast())
self.vlan3.traffic_log = self.traffic_log
# sniffer node
self.sniffer3 = NetworkLayerStateMachine("7", self.vlan3)
self.append(self.sniffer3)
# connect vlan3 to the second router
self.iut2.add_network("8", self.vlan3, 3)
if _debug:
TNetwork2._debug(" - iut1.nsap: %r", self.iut1.nsap)
TNetwork2._debug(" - iut2.nsap: %r", self.iut2.nsap)
def run(self, time_limit=60.0):
if _debug: TNetwork2._debug("run %r", time_limit)
# run the group
super(TNetwork2, self).run()
# run it for some time
run_time_machine(time_limit)
if _debug:
TNetwork2._debug(" - time machine finished")
# list the state machines which shows their current state
for state_machine in self.state_machines:
TNetwork2._debug(" - machine: %r", state_machine)
# each one has a list of sent/received pdus
for direction, pdu in state_machine.transaction_log:
TNetwork2._debug(" %s %s %s",
direction,
pdu.pduSource or pdu.pduDestination,
pdu.__class__.__name__,
)
# traffic log has what was processed on each vlan
self.traffic_log.dump(TNetwork2._debug)
# check for success
all_success, some_failed = super(TNetwork2, self).check_for_success()
assert all_success
@bacpypes_debugging
class TestSimple(unittest.TestCase):
def test_idle(self):
"""Test an idle network, nothing happens is success."""
if _debug: TestSimple._debug("test_idle")
# create a network
tnet = TNetwork1()
# all start states are successful
tnet.td.start_state.success()
tnet.sniffer1.start_state.success()
tnet.sniffer2.start_state.success()
tnet.sniffer3.start_state.success()
# run the group
tnet.run()
@bacpypes_debugging
class TestNetworkNumberIs(unittest.TestCase):
def test_1(self):
"""Test broadcasts from a router."""
if _debug: TestNetworkNumberIs._debug("test_1")
# create a network
tnet = TNetwork1()
# tell the IUT to send what it knows
deferred(tnet.iut1.nse.network_number_is)
# TD sees same traffic as sniffer2
tnet.td.start_state.success()
# network 1 sees router from 1 to 2
tnet.sniffer1.start_state.doc("1-1-0") \
.receive(NetworkNumberIs,
nniNet=1,
nniFlag=1,
).doc("1-1-1") \
.success()
# network 2 sees router from 2 to 1
tnet.sniffer2.start_state.doc("1-2-0") \
.receive(NetworkNumberIs,
nniNet=2,
nniFlag=1,
).doc("1-2-1") \
.success()
# network 3 sees nothing, message isn't routed
tnet.sniffer3.start_state.doc("1-3-0") \
.timeout(10).doc("1-3-1") \
.success()
# run the group
tnet.run()
def test_2(self):
"""Test router response to queries."""
if _debug: TestNetworkNumberIs._debug("test_2")
# create a network
tnet = TNetwork1()
# test device broadcasts a request for the network number
s211 = tnet.td.start_state.doc("2-1-0") \
.send(WhatIsNetworkNumber(
destination=LocalBroadcast(),
)).doc("2-1-1") \
# test device sees both responses
both = s211 \
.receive(NetworkNumberIs,
pduSource=Address(5),
nniNet=2,
nniFlag=1,
).doc("2-1-2-a") \
.receive(NetworkNumberIs,
pduSource=Address(6),
nniNet=2,
nniFlag=1,
).doc("2-1-2-b") \
# allow the two packets in either order
s211.receive(NetworkNumberIs,
pduSource=Address(6),
nniNet=2,
nniFlag=1,
).doc("2-1-2-c") \
.receive(NetworkNumberIs,
pduSource=Address(5),
nniNet=2,
nniFlag=1,
next_state=both,
).doc("2-1-2-d") \
# fail if anything is received after both packets
both.timeout(3).doc("2-1-3") \
.success()
# short circuit sniffers
tnet.sniffer1.start_state.success()
tnet.sniffer2.start_state.success()
tnet.sniffer3.start_state.success()
# run the group
tnet.run()
def test_3(self):
"""Test broadcasts from a router."""
if _debug: TestNetworkNumberIs._debug("test_3")
# create a network
tnet = TNetwork2()
# tell the IUT to send what it knows
deferred(tnet.iut1.nse.network_number_is)
# TD sees same traffic as sniffer2
tnet.td.start_state.success()
# network 1 sees router from 1 to 2
tnet.sniffer1.start_state.doc("3-1-0") \
.receive(NetworkNumberIs,
nniNet=1,
nniFlag=1,
).doc("3-1-1") \
.success()
# network 2 sees nothing
tnet.sniffer2.start_state.doc("3-2-0") \
.timeout(10).doc("3-2-1") \
.success()
# network 3 sees nothing
tnet.sniffer3.start_state.doc("3-3-0") \
.timeout(10).doc("3-3-1") \
.success()
# run the group
tnet.run()
def test_4(self):
"""Test router response to queries."""
if _debug: TestNetworkNumberIs._debug("test_4")
# create a network
tnet = TNetwork2()
def iut1_knows_net(net):
assert net in tnet.iut1.nsap.adapters
# test device sends request, receives one response
tnet.td.start_state.doc("4-1-0") \
.call(iut1_knows_net, None).doc("4-1-1") \
.send(WhatIsNetworkNumber(
destination=LocalBroadcast(),
)).doc("4-1-2") \
.receive(NetworkNumberIs,
pduSource=Address(6),
nniNet=2,
nniFlag=1,
).doc("4-1-3") \
.timeout(3).doc("4-1-4") \
.call(iut1_knows_net, 2).doc("4-1-5") \
.success()
# short circuit sniffers
tnet.sniffer1.start_state.success()
tnet.sniffer2.start_state.success()
tnet.sniffer3.start_state.success()
# run the group
tnet.run()
def test_5(self):
"""Test router response to queries."""
if _debug: TestNetworkNumberIs._debug("test_5")
# create a network
tnet = TNetwork2()
# tell the IUT2 to send what it knows
deferred(tnet.iut2.nse.network_number_is)
# test device receives announcement from IUT2, requests network
# number from IUT1, receives announcement from IUT1 with the
# network learned
tnet.td.start_state.doc("5-1-0") \
.receive(NetworkNumberIs,
pduSource=Address(6),
nniNet=2,
nniFlag=1,
).doc("5-1-1") \
.send(WhatIsNetworkNumber(
destination=Address(5),
)).doc("5-1-2") \
.receive(NetworkNumberIs,
pduSource=Address(5),
nniNet=2,
nniFlag=0,
).doc("5-1-3") \
.timeout(3).doc("5-1-4") \
.success()
# short circuit sniffers
tnet.sniffer1.start_state.success()
tnet.sniffer2.start_state.success()
tnet.sniffer3.start_state.success()
# run the group
tnet.run()