1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-27 00:57:47 +08:00

sync the python versions

This commit is contained in:
Joel Bender
2017-08-31 23:59:53 -04:00
parent 4e052ada85
commit aee547d050
2 changed files with 153 additions and 6 deletions

View File

@@ -5,6 +5,8 @@ Virtual Local Area Network
"""
import random
import socket
import struct
from copy import deepcopy
from .errors import ConfigurationError
@@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging
from .core import deferred
from .pdu import Address
from .comm import Server
from .comm import Client, Server, bind
# some debugging
_debug = 0
@@ -60,10 +62,12 @@ class Network:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
def __len__(self):
@@ -165,7 +169,7 @@ class IPNode(Node):
tuple and a broadcast tuple that would be used for socket communications.
"""
def __init__(self, addr, lan=None):
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan)
# make sure it's an Address that has appropriate pieces
@@ -178,7 +182,77 @@ class IPNode(Node):
self.addrBroadcastTuple = addr.addrBroadcastTuple
# continue initializing
Node.__init__(self, addr.addrTuple, lan=lan)
Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid)
bacpypes_debugging(IPNode)
#
# IPRouterNode
#
class IPRouterNode(Client):
def __init__(self, router, addr, lan=None):
if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan)
# save the reference to the router
self.router = router
# make ourselves an IPNode and bind to it
self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True)
bind(self, self.node)
# save our mask and subnet
self.addrMask = addr.addrMask
self.addrSubnet = addr.addrSubnet
def confirmation(self, pdu):
if _debug: IPRouterNode._debug("confirmation %r", pdu)
self.router.process_pdu(self, pdu)
def process_pdu(self, pdu):
if _debug: IPRouterNode._debug("process_pdu %r", pdu)
# pass it downstream
self.request(pdu)
bacpypes_debugging(IPRouterNode)
#
# IPRouter
#
class IPRouter:
def __init__(self):
if _debug: IPRouter._debug("__init__")
# connected network nodes
self.nodes = []
def add_network(self, addr, lan):
if _debug: IPRouter._debug("add_network %r %r", addr, lan)
node = IPRouterNode(self, addr, lan)
if _debug: IPRouter._debug(" - node: %r", node)
self.nodes.append(node)
def process_pdu(self, node, pdu):
if _debug: IPRouter._debug("process_pdu %r %r", node, pdu)
# unpack the address part of the destination
addrstr = socket.inet_aton(pdu.pduDestination[0])
ipaddr = struct.unpack('!L', addrstr)[0]
if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr)
# loop through the other nodes
for inode in self.nodes:
if inode is not node:
if (ipaddr & inode.addrMask) == inode.addrSubnet:
if _debug: IPRouter._debug(" - inode: %r", inode)
inode.process_pdu(pdu)
bacpypes_debugging(IPRouter)