1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

first crack at supporting interface names

This commit is contained in:
Joel Bender 2017-03-10 00:39:35 -05:00
parent 665ee4be99
commit ec01d22eae
3 changed files with 158 additions and 0 deletions

View File

@ -8,6 +8,11 @@ import re
import socket
import struct
try:
import netifaces
except ImportError:
netifaces = None
from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob
from .comm import PCI as _PCI, PDUData
@ -202,6 +207,55 @@ class Address:
self.addrAddr = xtob(addr[2:-1])
self.addrLen = len(self.addrAddr)
elif netifaces and interface_re.match(addr):
if _debug: Address._debug(" - interface name with optional port")
interface, port = interface_re.match(addr).groups()
if port is not None:
self.addrPort = int(port)
else:
self.addrPort = 47808
interfaces = netifaces.interfaces()
if interface not in interfaces:
raise ValueError("not an interface: %s" % (interface,))
if _debug: Address._debug(" - interfaces: %r", interfaces)
ifaddresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET not in ifaddresses:
raise ValueError("interface does not support IPv4: %s" % (interface,))
ipv4addresses = ifaddresses[netifaces.AF_INET]
if len(ipv4addresses) > 1:
raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,))
ifaddress = ipv4addresses[0]
if _debug: Address._debug(" - ifaddress: %r", ifaddress)
addr = ifaddress['addr']
self.addrTuple = (addr, self.addrPort)
if _debug: Address._debug(" - addrTuple: %r", self.addrTuple)
addrstr = socket.inet_aton(addr)
self.addrIP = struct.unpack('!L', addrstr)[0]
if 'netmask' in ifaddress:
maskstr = socket.inet_aton(ifaddress['netmask'])
self.addrMask = struct.unpack('!L', maskstr)[0]
else:
self.addrMask = _long_mask
self.addrHost = (self.addrIP & ~self.addrMask)
self.addrSubnet = (self.addrIP & self.addrMask)
if 'broadcast' in ifaddress:
self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort)
else:
self.addrBroadcastTuple = None
if _debug: Address._debug(" - addrBroadcastTuple: %r", self.addrBroadcastTuple)
self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask)
self.addrLen = 6
else:
raise ValueError("unrecognized format")

View File

@ -8,6 +8,11 @@ import re
import socket
import struct
try:
import netifaces
except ImportError:
netifaces = None
from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob
from .comm import PCI as _PCI, PDUData
@ -25,6 +30,7 @@ _log = ModuleLogger(globals())
ip_address_mask_port_re = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$')
ethernet_re = re.compile(r'^([0-9A-Fa-f][0-9A-Fa-f][:]){5}([0-9A-Fa-f][0-9A-Fa-f])$' )
interface_re = re.compile(r'^(?:([\w]+))(?::(\d+))?$')
@bacpypes_debugging
class Address:
@ -203,6 +209,49 @@ class Address:
self.addrAddr = xtob(addr[2:-1])
self.addrLen = len(self.addrAddr)
elif netifaces and interface_re.match(addr):
interface, port = interface_re.match(addr).groups()
if port is not None:
self.addrPort = int(port)
else:
self.addrPort = 47808
interfaces = netifaces.interfaces()
if interface not in interfaces:
raise ValueError("not an interface: %s" % (interface,))
ifaddresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET not in ifaddresses:
raise ValueError("interface does not support IPv4: %s" % (interface,))
ipv4addresses = ifaddresses[netifaces.AF_INET]
if len(ipv4addresses) > 1:
raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,))
ifaddress = ipv4addresses[0]
addr = ifaddress['addr']
self.addrTuple = (addr, self.addrPort)
addrstr = socket.inet_aton(addr)
self.addrIP = struct.unpack('!L', addrstr)[0]
if 'netmask' in ifaddress:
maskstr = socket.inet_aton(ifaddress['netmask'])
self.addrMask = struct.unpack('!L', maskstr)[0]
else:
self.addrMask = _long_mask
self.addrHost = (self.addrIP & ~self.addrMask)
self.addrSubnet = (self.addrIP & self.addrMask)
if 'broadcast' in ifaddress:
self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort)
else:
self.addrBroadcastTuple = None
self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask)
self.addrLen = 6
else:
raise ValueError("unrecognized format")

View File

@ -8,6 +8,11 @@ import re
import socket
import struct
try:
import netifaces
except ImportError:
netifaces = None
from .debugging import ModuleLogger, bacpypes_debugging, btox, xtob
from .comm import PCI as _PCI, PDUData
@ -25,6 +30,7 @@ _log = ModuleLogger(globals())
ip_address_mask_port_re = re.compile(r'^(?:(\d+):)?(\d+\.\d+\.\d+\.\d+)(?:/(\d+))?(?::(\d+))?$')
ethernet_re = re.compile(r'^([0-9A-Fa-f][0-9A-Fa-f][:]){5}([0-9A-Fa-f][0-9A-Fa-f])$' )
interface_re = re.compile(r'^(?:([\w]+))(?::(\d+))?$')
@bacpypes_debugging
class Address:
@ -219,6 +225,55 @@ class Address:
self.addrAddr = xtob(addr[2:-1])
self.addrLen = len(self.addrAddr)
elif netifaces and interface_re.match(addr):
if _debug: Address._debug(" - interface name with optional port")
interface, port = interface_re.match(addr).groups()
if port is not None:
self.addrPort = int(port)
else:
self.addrPort = 47808
interfaces = netifaces.interfaces()
if interface not in interfaces:
raise ValueError("not an interface: %s" % (interface,))
if _debug: Address._debug(" - interfaces: %r", interfaces)
ifaddresses = netifaces.ifaddresses(interface)
if netifaces.AF_INET not in ifaddresses:
raise ValueError("interface does not support IPv4: %s" % (interface,))
ipv4addresses = ifaddresses[netifaces.AF_INET]
if len(ipv4addresses) > 1:
raise ValueError("interface supports multiple IPv4 addresses: %s" % (interface,))
ifaddress = ipv4addresses[0]
if _debug: Address._debug(" - ifaddress: %r", ifaddress)
addr = ifaddress['addr']
self.addrTuple = (addr, self.addrPort)
if _debug: Address._debug(" - addrTuple: %r", self.addrTuple)
addrstr = socket.inet_aton(addr)
self.addrIP = struct.unpack('!L', addrstr)[0]
if 'netmask' in ifaddress:
maskstr = socket.inet_aton(ifaddress['netmask'])
self.addrMask = struct.unpack('!L', maskstr)[0]
else:
self.addrMask = _long_mask
self.addrHost = (self.addrIP & ~self.addrMask)
self.addrSubnet = (self.addrIP & self.addrMask)
if 'broadcast' in ifaddress:
self.addrBroadcastTuple = (ifaddress['broadcast'], self.addrPort)
else:
self.addrBroadcastTuple = None
if _debug: Address._debug(" - addrBroadcastTuple: %r", self.addrBroadcastTuple)
self.addrAddr = addrstr + struct.pack('!H', self.addrPort & _short_mask)
self.addrLen = 6
else:
raise ValueError("unrecognized format")