1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-27 00:57:47 +08:00

switch pcap libraries #137

This commit is contained in:
Joel Bender
2018-03-05 14:37:36 -05:00
parent 3130ca3d9c
commit 22ce545404
3 changed files with 63 additions and 108 deletions

View File

@@ -2,6 +2,14 @@
"""
Analysis - Decoding pcap files
Before analyzing files, install libpcap-dev:
$ sudo apt install libpcap-dev
then install pypcap:
https://github.com/pynetwork/pypcap
"""
import sys
@@ -15,7 +23,7 @@ try:
except:
pass
from .debugging import ModuleLogger, DebugContents, bacpypes_debugging
from .debugging import ModuleLogger, DebugContents, bacpypes_debugging, btox
from .pdu import PDU, Address
from .bvll import BVLPDU, bvl_pdu_types, ForwardedNPDU, \
@@ -33,13 +41,6 @@ _protocols={socket.IPPROTO_TCP:'tcp',
socket.IPPROTO_UDP:'udp',
socket.IPPROTO_ICMP:'icmp'}
#
# _hexify
#
def _hexify(s, sep='.'):
return sep.join('%02X' % ord(c) for c in s)
#
# strftimestamp
#
@@ -53,11 +54,11 @@ def strftimestamp(ts):
#
def decode_ethernet(s):
if _debug: decode_ethernet._debug("decode_ethernet %s...", _hexify(s[:14]))
if _debug: decode_ethernet._debug("decode_ethernet %s...", btox(s[:14]))
d={}
d['destination_address'] = _hexify(s[0:6], ':')
d['source_address'] = _hexify(s[6:12], ':')
d['destination_address'] = btox(s[0:6], ':')
d['source_address'] = btox(s[6:12], ':')
d['type'] = struct.unpack('!H',s[12:14])[0]
d['data'] = s[14:]
@@ -70,7 +71,7 @@ bacpypes_debugging(decode_ethernet)
#
def decode_vlan(s):
if _debug: decode_vlan._debug("decode_vlan %s...", _hexify(s[:4]))
if _debug: decode_vlan._debug("decode_vlan %s...", btox(s[:4]))
d = {}
x = struct.unpack('!H',s[0:2])[0]
@@ -89,7 +90,7 @@ bacpypes_debugging(decode_vlan)
#
def decode_ip(s):
if _debug: decode_ip._debug("decode_ip %r", _hexify(s[:20]))
if _debug: decode_ip._debug("decode_ip %r", btox(s[:20]))
d = {}
d['version'] = (ord(s[0]) & 0xf0) >> 4
@@ -119,7 +120,7 @@ bacpypes_debugging(decode_ip)
#
def decode_udp(s):
if _debug: decode_udp._debug("decode_udp %s...", _hexify(s[:8]))
if _debug: decode_udp._debug("decode_udp %s...", btox(s[:8]))
d = {}
d['source_port'] = struct.unpack('!H',s[0:2])[0]
@@ -225,7 +226,7 @@ def decode_packet(data):
# check for version number
if (pdu.pduData[0] != '\x01'):
if _debug: decode_packet._debug(" - not a version 1 packet: %s...", _hexify(pdu.pduData[:30]))
if _debug: decode_packet._debug(" - not a version 1 packet: %s...", btox(pdu.pduData[:30]))
return None
# it's an NPDU
@@ -355,33 +356,7 @@ def decode_file(fname):
"""Given the name of a pcap file, open it, decode the contents and yield each packet."""
if _debug: decode_file._debug("decode_file %r", fname)
if not pcap:
raise RuntimeError("failed to import pcap")
# create a pcap object
p = pcap.pcapObject()
p.open_offline(fname)
i = 0
while 1:
# the object acts like an iterator
pkt = p.next()
if not pkt:
break
# returns a tuple
pktlen, data, timestamp = pkt
pkt = decode_packet(data)
if not pkt:
continue
# save the index and timestamp in the packet
pkt._index = i
pkt._timestamp = timestamp
yield pkt
i += 1
raise NotImplementedError("not implemented")
bacpypes_debugging(decode_file)