1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-05 22:18:16 +08:00

decode the response value, add a favicon

This commit is contained in:
Joel Bender 2018-11-08 21:15:31 -05:00
parent 54e9e9c88f
commit 87fe8e46fe

View File

@ -6,8 +6,11 @@ HTTPServer
import threading
import json
import zlib
from collections import OrderedDict
from urlparse import urlparse, parse_qs
import SocketServer
import SimpleHTTPServer
@ -19,7 +22,8 @@ from bacpypes.iocb import IOCB
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.apdu import ReadPropertyRequest, WhoIsRequest
from bacpypes.primitivedata import ObjectIdentifier
from bacpypes.primitivedata import Unsigned, ObjectIdentifier
from bacpypes.constructeddata import Array
from bacpypes.app import BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
@ -33,6 +37,29 @@ _log = ModuleLogger(globals())
this_application = None
server = None
# favorite icon
favicon = zlib.decompress(
'x\x9c\xb5\x93\xcdN\xdb@\x14\x85\x07\x95\x07\xc8\x8amYv\xc9#\xe4\x11x\x04\x96}'
'\x8c\x88\x1dl\xa0\x9b\xb6A\xa2)\x0bVTB\xa9"\xa5?*I\x16\xad"\x84d\x84DE\x93'
'\x14;v\xc01M\xe2$\x988\xb1l\x9d\xde;v\\\x03\x89TU\xea\xb5N\xe4\xb9\x9a\xef'
'\x1c\xcfO\x84X\xa0\'\x95\x12\xf4\xbb,\x9e/\n\xb1$\x84xF\xa2\x16u\xc2>WzQ\xfc'
'\xf7\xca\xad\xafo\x91T\xd2\x1ai\xe5\x1fx[\xf9\xf4\x01\xc57\xbb\xd8\xdf\xd8'
'\x00\x8d\x11\xf9\x95\x12\xda\x9a\xc3\xae\xe5_\xbdDpk\x03\xc3\xaeT\xd0\xb3\xd0'
'>?\x83Z\xfd\x86Z\xa5\x84\x1fG_\xa4\xe7\x1c^\xa9W\xbfJ\xfe\xb4\xf0\x0e^\xdb'
'\x88}0 \xafA\x0f\xa3+c&O\xbd\xf4\xc1\xf6\xb6d\x9d\xc6\x05\xdcVSz\xb0x\x1c\x10'
'\x0fo\x02\xc7\xd0\xe7\xf1%\xe5\xf3\xc78\xdb\xf9Y\x93\x1eI\x1f\xf8>\xfa\xb5'
'\x8bG<\x8dW\x0f^\x84\xd9\xee\xb5~\x8f\xe1w\xaf{\x83\x80\xb2\xbd\xe1\x10\x83'
'\x88\'\xa5\x12\xbcZ?9\x8e\xb3%\xd3\xeb`\xd4\xd2\xffdS\xb9\x96\x89!}W!\xfb\x9a'
'\xf9t\xc4f\x8aos\x92\x9dtn\xe0\xe8Z\xcc\xc8=\xec\xf7d6\x97\xa3]\xc2Q\x1b(\xec'
'd\x99_\x8dx\xd4\x15%\xce\x96\xf9\xbf\xacP\xd1:\xfc\xf1\x18\xbe\xeb\xe2\xaey'
'\x89;]\xc5\xf1\xfb<\xf3\x99\xe9\x99\xefon\xa2\xdb6\xe5\x1c\xbb^\x8b}FV\x1b'
'\x9es+\xb3\xbd\x81M\xeb\xd1\xe0^5\xf1\xbd|\xc4\xfca\xf2\xde\xf0w\x9cW\xabr.'
'\xe7\xd9\x8dFx\x0e\xa6){\x93\x8e\x85\xf1\xb5\x81\x89\xd9\x82\xa1\x9c\xc8;\xf9'
'\xe0\x0cV\xb8W\xdc\xdb\x83\xa9i\xb1O@g\xa6T*\xd3=O\xeaP\xcc(^\x17\xfb\xe4\xb3'
'Y\xc9\xb1\x17{N\xf7\xfbo\x8b\xf7\x97\x94\xe3;\xcd\xff)\xd2\xf2\xacy\xa0\x9b'
'\xd4g=\x11B\x8bT\x8e\x94Y\x08%\x12\xe2q\x99\xd4\x7f*\x84O\xfa\r\xb5\x916R'
)
#
# ThreadedHTTPRequestHandler
#
@ -42,6 +69,7 @@ class ThreadedHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
if _debug: ThreadedHTTPRequestHandler._debug("do_GET")
global favicon
# get the thread
cur_thread = threading.current_thread()
@ -61,6 +89,14 @@ class ThreadedHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
self.do_read(args[2:])
elif (args[1] == 'whois'):
self.do_whois(args[2:])
if args[1] == 'favicon.ico':
self.send_response(200)
self.send_header("Content-type", 'image/x-icon')
self.send_header("Content-Length", len(favicon))
self.end_headers()
self.wfile.write(favicon)
else:
return "'read' or 'whois' expected"
def do_read(self, args):
if _debug: ThreadedHTTPRequestHandler._debug("do_read %r", args)
@ -113,7 +149,29 @@ class ThreadedHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
result = { "error": str(iocb.ioError) }
else:
if _debug: ThreadedHTTPRequestHandler._debug(" - response: %r", iocb.ioResponse)
result = { "value": iocb.ioResponse }
apdu = iocb.ioResponse
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ThreadedHTTPRequestHandler._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
datatype = Unsigned
else:
datatype = datatype.subtype
if _debug: ThreadedHTTPRequestHandler._debug(" - datatype: %r", datatype)
# convert the value to a dict if possible
value = apdu.propertyValue.cast_out(datatype)
if hasattr(value, 'dict_contents'):
value = value.dict_contents(as_class=OrderedDict)
if _debug: ThreadedHTTPRequestHandler._debug(" - value: %r", value)
result = { "value": value }
except Exception as err:
ThreadedHTTPRequestHandler._exception("exception: %r", err)