diff --git a/py34/bacpypes/primitivedata.py b/py34/bacpypes/primitivedata.py index bb46f08..62773e8 100755 --- a/py34/bacpypes/primitivedata.py +++ b/py34/bacpypes/primitivedata.py @@ -25,37 +25,50 @@ _log = ModuleLogger(globals()) # Tag # + class Tag(object): - applicationTagClass = 0 - contextTagClass = 1 - openingTagClass = 2 - closingTagClass = 3 + applicationTagClass = 0 + contextTagClass = 1 + openingTagClass = 2 + closingTagClass = 3 - nullAppTag = 0 - booleanAppTag = 1 - unsignedAppTag = 2 - integerAppTag = 3 - realAppTag = 4 - doubleAppTag = 5 - octetStringAppTag = 6 - characterStringAppTag = 7 - bitStringAppTag = 8 - enumeratedAppTag = 9 - dateAppTag = 10 - timeAppTag = 11 - objectIdentifierAppTag = 12 - reservedAppTag13 = 13 - reservedAppTag14 = 14 - reservedAppTag15 = 15 + nullAppTag = 0 + booleanAppTag = 1 + unsignedAppTag = 2 + integerAppTag = 3 + realAppTag = 4 + doubleAppTag = 5 + octetStringAppTag = 6 + characterStringAppTag = 7 + bitStringAppTag = 8 + enumeratedAppTag = 9 + dateAppTag = 10 + timeAppTag = 11 + objectIdentifierAppTag = 12 + reservedAppTag13 = 13 + reservedAppTag14 = 14 + reservedAppTag15 = 15 - _app_tag_name = \ - [ 'null', 'boolean', 'unsigned', 'integer' - , 'real', 'double', 'octetString', 'characterString' - , 'bitString', 'enumerated', 'date', 'time' - , 'objectIdentifier', 'reserved13', 'reserved14', 'reserved15' - ] - _app_tag_class = [] # defined later + _app_tag_name = [ + "null", + "boolean", + "unsigned", + "integer", + "real", + "double", + "octetString", + "characterString", + "bitString", + "enumerated", + "date", + "time", + "objectIdentifier", + "reserved13", + "reserved14", + "reserved15", + ] + _app_tag_class = [] # defined later def __init__(self, *args): self.tagClass = None @@ -66,12 +79,12 @@ class Tag(object): if args: if (len(args) == 1) and isinstance(args[0], PDUData): self.decode(args[0]) - elif (len(args) >= 2): + elif len(args) >= 2: self.set(*args) else: raise ValueError("invalid Tag ctor arguments") - def set(self, tclass, tnum, tlvt=0, tdata=b''): + def set(self, tclass, tnum, tlvt=0, tdata=b""): """set the values of the tag.""" if isinstance(tdata, bytearray): tdata = bytes(tdata) @@ -98,42 +111,42 @@ class Tag(object): def encode(self, pdu): """Encode a tag on the end of the PDU.""" # check for special encoding - if (self.tagClass == Tag.contextTagClass): + if self.tagClass == Tag.contextTagClass: data = 0x08 - elif (self.tagClass == Tag.openingTagClass): + elif self.tagClass == Tag.openingTagClass: data = 0x0E - elif (self.tagClass == Tag.closingTagClass): + elif self.tagClass == Tag.closingTagClass: data = 0x0F else: data = 0x00 # encode the tag number part - if (self.tagNumber < 15): - data += (self.tagNumber << 4) + if self.tagNumber < 15: + data += self.tagNumber << 4 else: data += 0xF0 # encode the length/value/type part - if (self.tagLVT < 5): + if self.tagLVT < 5: data += self.tagLVT else: data += 0x05 # save this and the extended tag value - pdu.put( data ) - if (self.tagNumber >= 15): + pdu.put(data) + if self.tagNumber >= 15: pdu.put(self.tagNumber) # really short lengths are already done - if (self.tagLVT >= 5): - if (self.tagLVT <= 253): - pdu.put( self.tagLVT ) - elif (self.tagLVT <= 65535): - pdu.put( 254 ) - pdu.put_short( self.tagLVT ) + if self.tagLVT >= 5: + if self.tagLVT <= 253: + pdu.put(self.tagLVT) + elif self.tagLVT <= 65535: + pdu.put(254) + pdu.put_short(self.tagLVT) else: - pdu.put( 255 ) - pdu.put_long( self.tagLVT ) + pdu.put(255) + pdu.put_long(self.tagLVT) # now put the data pdu.put_data(self.tagData) @@ -147,29 +160,31 @@ class Tag(object): self.tagClass = (tag >> 3) & 0x01 # extract the tag number - self.tagNumber = (tag >> 4) - if (self.tagNumber == 0x0F): + self.tagNumber = tag >> 4 + if self.tagNumber == 0x0F: self.tagNumber = pdu.get() # extract the length self.tagLVT = tag & 0x07 - if (self.tagLVT == 5): + if self.tagLVT == 5: self.tagLVT = pdu.get() - if (self.tagLVT == 254): + if self.tagLVT == 254: self.tagLVT = pdu.get_short() - elif (self.tagLVT == 255): + elif self.tagLVT == 255: self.tagLVT = pdu.get_long() - elif (self.tagLVT == 6): + elif self.tagLVT == 6: self.tagClass = Tag.openingTagClass self.tagLVT = 0 - elif (self.tagLVT == 7): + elif self.tagLVT == 7: self.tagClass = Tag.closingTagClass self.tagLVT = 0 # application tagged boolean has no more data - if (self.tagClass == Tag.applicationTagClass) and (self.tagNumber == Tag.booleanAppTag): + if (self.tagClass == Tag.applicationTagClass) and ( + self.tagNumber == Tag.booleanAppTag + ): # tagLVT contains value - self.tagData = b'' + self.tagData = b"" else: # tagLVT contains length self.tagData = pdu.get_data(self.tagLVT) @@ -182,7 +197,7 @@ class Tag(object): raise ValueError("application tag required") # application tagged boolean now has data - if (self.tagNumber == Tag.booleanAppTag): + if self.tagNumber == Tag.booleanAppTag: return ContextTag(context, bytearray([self.tagLVT])) else: return ContextTag(context, self.tagData) @@ -193,8 +208,13 @@ class Tag(object): raise ValueError("context tag required") # context booleans have value in data - if (dataType == Tag.booleanAppTag): - return Tag(Tag.applicationTagClass, Tag.booleanAppTag, struct.unpack('B', self.tagData)[0], b'') + if dataType == Tag.booleanAppTag: + return Tag( + Tag.applicationTagClass, + Tag.booleanAppTag, + struct.unpack("B", self.tagData)[0], + b"", + ) else: return ApplicationTag(dataType, self.tagData) @@ -212,7 +232,7 @@ class Tag(object): return klass(self) def __repr__(self): - sname = self.__module__ + '.' + self.__class__.__name__ + sname = self.__module__ + "." + self.__class__.__name__ try: if self.tagClass == Tag.openingTagClass: desc = "(open(%d))" % (self.tagNumber,) @@ -227,15 +247,17 @@ class Tag(object): except: desc = "(?)" - return '<' + sname + desc + ' instance at 0x%08x' % (id(self),) + '>' + return "<" + sname + desc + " instance at 0x%08x" % (id(self),) + ">" def __eq__(self, tag): - return (self.tagClass == tag.tagClass) \ - and (self.tagNumber == tag.tagNumber) \ - and (self.tagLVT == tag.tagLVT) \ + return ( + (self.tagClass == tag.tagClass) + and (self.tagNumber == tag.tagNumber) + and (self.tagLVT == tag.tagLVT) and (self.tagData == tag.tagData) + ) - def __ne__(self,arg): + def __ne__(self, arg): return not self.__eq__(arg) def debug_contents(self, indent=1, file=sys.stdout, _ids=None): @@ -245,11 +267,16 @@ class Tag(object): # tag class msg = "%stagClass = %s " % (" " * indent, self.tagClass) - if self.tagClass == Tag.applicationTagClass: msg += 'application' - elif self.tagClass == Tag.contextTagClass: msg += 'context' - elif self.tagClass == Tag.openingTagClass: msg += 'opening' - elif self.tagClass == Tag.closingTagClass: msg += 'closing' - else: msg += "?" + if self.tagClass == Tag.applicationTagClass: + msg += "application" + elif self.tagClass == Tag.contextTagClass: + msg += "context" + elif self.tagClass == Tag.openingTagClass: + msg += "opening" + elif self.tagClass == Tag.closingTagClass: + msg += "closing" + else: + msg += "?" file.write(msg + "\n") # tag number @@ -258,21 +285,22 @@ class Tag(object): try: msg += self._app_tag_name[self.tagNumber] except: - msg += '?' + msg += "?" file.write(msg + "\n") # length, value, type file.write("%stagLVT = %s\n" % (" " * indent, self.tagLVT)) # data - file.write("%stagData = '%s'\n" % (" " * indent, btox(self.tagData,'.'))) + file.write("%stagData = '%s'\n" % (" " * indent, btox(self.tagData, "."))) + # # ApplicationTag # -class ApplicationTag(Tag): +class ApplicationTag(Tag): def __init__(self, *args): if len(args) == 1 and isinstance(args[0], PDUData): Tag.__init__(self, args[0]) @@ -284,12 +312,13 @@ class ApplicationTag(Tag): else: raise ValueError("ApplicationTag ctor requires a type and data or PDUData") + # # ContextTag # -class ContextTag(Tag): +class ContextTag(Tag): def __init__(self, *args): if len(args) == 1 and isinstance(args[0], PDUData): Tag.__init__(self, args[0]) @@ -301,12 +330,13 @@ class ContextTag(Tag): else: raise ValueError("ContextTag ctor requires a type and data or PDUData") + # # OpeningTag # -class OpeningTag(Tag): +class OpeningTag(Tag): def __init__(self, context): if isinstance(context, PDUData): Tag.__init__(self, context) @@ -317,12 +347,13 @@ class OpeningTag(Tag): else: raise TypeError("OpeningTag ctor requires an integer or PDUData") + # # ClosingTag # -class ClosingTag(Tag): +class ClosingTag(Tag): def __init__(self, context): if isinstance(context, PDUData): Tag.__init__(self, context) @@ -333,12 +364,13 @@ class ClosingTag(Tag): else: raise TypeError("ClosingTag ctor requires an integer or PDUData") + # # TagList # -class TagList(object): +class TagList(object): def __init__(self, arg=None): self.tagList = [] @@ -412,7 +444,8 @@ class TagList(object): lvl += 1 elif tag.tagClass == Tag.closingTagClass: lvl -= 1 - if lvl < 0: break + if lvl < 0: + break rslt.append(tag) i += 1 @@ -441,16 +474,18 @@ class TagList(object): def decode(self, pdu): """decode the tags from a PDU.""" while pdu.pduData: - self.tagList.append( Tag(pdu) ) + self.tagList.append(Tag(pdu)) def debug_contents(self, indent=1, file=sys.stdout, _ids=None): for tag in self.tagList: - tag.debug_contents(indent+1, file, _ids) + tag.debug_contents(indent + 1, file, _ids) + # # Atomic # + class Atomic(object): _app_tag = None @@ -463,9 +498,9 @@ class Atomic(object): other = self.__class__(other) # now compare the values - if (self.value < other.value): + if self.value < other.value: return -1 - elif (self.value > other.value): + elif self.value > other.value: return 1 else: return 0 @@ -478,7 +513,7 @@ class Atomic(object): other = self.__class__(other) # now compare the values - return (self.value < other.value) + return self.value < other.value def __eq__(self, other): # sys.stderr.write("__eq__ %r %r\n" % (self, other)) @@ -503,10 +538,12 @@ class Atomic(object): """Return True if arg is valid value for the class.""" raise NotImplementedError("call on a derived class of Atomic") + # # Null # + class Null(Atomic): _app_tag = Tag.nullAppTag @@ -527,10 +564,12 @@ class Null(Atomic): raise TypeError("invalid constructor datatype") def encode(self, tag): - tag.set_app_data(Tag.nullAppTag, b'') + tag.set_app_data(Tag.nullAppTag, b"") def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.nullAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.nullAppTag + ): raise InvalidTag("null application tag required") if len(tag.tagData) != 0: raise InvalidTag("invalid tag length") @@ -545,10 +584,12 @@ class Null(Atomic): def __str__(self): return "Null" + # # Boolean # + class Boolean(Atomic): _app_tag = Tag.booleanAppTag @@ -564,20 +605,22 @@ class Boolean(Atomic): self.value = arg elif isinstance(arg, Boolean): self.value = arg.value - elif str(arg) == 'True' or str(arg) == 'true': + elif str(arg) == "True" or str(arg) == "true": self.value = True - elif str(arg) == 'False' or str(arg) == 'false': + elif str(arg) == "False" or str(arg) == "false": self.value = False else: raise TypeError("invalid constructor datatype") def encode(self, tag): - tag.set(Tag.applicationTagClass, Tag.booleanAppTag, int(self.value), b'') + tag.set(Tag.applicationTagClass, Tag.booleanAppTag, int(self.value), b"") def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.booleanAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.booleanAppTag + ): raise InvalidTag("boolean application tag required") - if (tag.tagLVT > 1): + if tag.tagLVT > 1: raise InvalidTag("invalid tag value") # get the data @@ -589,17 +632,19 @@ class Boolean(Atomic): return isinstance(arg, bool) def __str__(self): - return "Boolean(%s)" % (str(self.value), ) + return "Boolean(%s)" % (str(self.value),) + # # Unsigned # + class Unsigned(Atomic): _app_tag = Tag.unsignedAppTag - def __init__(self,arg = None): + def __init__(self, arg=None): self.value = 0 if arg is None: @@ -607,7 +652,7 @@ class Unsigned(Atomic): elif isinstance(arg, Tag): self.decode(arg) elif isinstance(arg, int): - if (arg < 0): + if arg < 0: raise ValueError("unsigned integer required") self.value = arg elif isinstance(arg, Unsigned): @@ -617,7 +662,7 @@ class Unsigned(Atomic): def encode(self, tag): # rip apart the number - data = bytearray(struct.pack('>L', self.value)) + data = bytearray(struct.pack(">L", self.value)) # reduce the value to the smallest number of octets while (len(data) > 1) and (data[0] == 0): @@ -627,7 +672,9 @@ class Unsigned(Atomic): tag.set_app_data(Tag.unsignedAppTag, data) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.unsignedAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.unsignedAppTag + ): raise InvalidTag("unsigned application tag required") if len(tag.tagData) == 0: raise InvalidTag("invalid tag length") @@ -646,17 +693,19 @@ class Unsigned(Atomic): return isinstance(arg, int) and (not isinstance(arg, bool)) and (arg >= 0) def __str__(self): - return "Unsigned(%s)" % (self.value, ) + return "Unsigned(%s)" % (self.value,) + # # Integer # + class Integer(Atomic): _app_tag = Tag.integerAppTag - def __init__(self,arg = None): + def __init__(self, arg=None): self.value = 0 if arg is None: @@ -672,22 +721,22 @@ class Integer(Atomic): def encode(self, tag): # rip apart the number - data = bytearray(struct.pack('>I', self.value & 0xFFFFFFFF)) + data = bytearray(struct.pack(">I", self.value & 0xFFFFFFFF)) # reduce the value to the smallest number of bytes, be # careful about sign extension if self.value < 0: - while (len(data) > 1): - if (data[0] != 255): + while len(data) > 1: + if data[0] != 255: break - if (data[1] < 128): + if data[1] < 128: break del data[0] else: - while (len(data) > 1): - if (data[0] != 0): + while len(data) > 1: + if data[0] != 0: break - if (data[1] >= 128): + if data[1] >= 128: break del data[0] @@ -695,7 +744,9 @@ class Integer(Atomic): tag.set_app_data(Tag.integerAppTag, data) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.integerAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.integerAppTag + ): raise InvalidTag("integer application tag required") if len(tag.tagData) == 0: raise InvalidTag("invalid tag length") @@ -719,12 +770,14 @@ class Integer(Atomic): return isinstance(arg, int) and (not isinstance(arg, bool)) def __str__(self): - return "Integer(%s)" % (self.value, ) + return "Integer(%s)" % (self.value,) + # # Real # + class Real(Atomic): _app_tag = Tag.realAppTag @@ -747,16 +800,18 @@ class Real(Atomic): def encode(self, tag): # encode the tag - tag.set_app_data(Tag.realAppTag, struct.pack('>f',self.value)) + tag.set_app_data(Tag.realAppTag, struct.pack(">f", self.value)) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.realAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.realAppTag + ): raise InvalidTag("real application tag required") if len(tag.tagData) != 4: raise InvalidTag("invalid tag length") # extract the data - self.value = struct.unpack('>f',tag.tagData)[0] + self.value = struct.unpack(">f", tag.tagData)[0] @classmethod def is_valid(cls, arg): @@ -766,15 +821,17 @@ class Real(Atomic): def __str__(self): return "Real(%g)" % (self.value,) + # # Double # + class Double(Atomic): _app_tag = Tag.doubleAppTag - def __init__(self,arg = None): + def __init__(self, arg=None): self.value = 0.0 if arg is None: @@ -792,16 +849,18 @@ class Double(Atomic): def encode(self, tag): # encode the tag - tag.set_app_data(Tag.doubleAppTag, struct.pack('>d',self.value)) + tag.set_app_data(Tag.doubleAppTag, struct.pack(">d", self.value)) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.doubleAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.doubleAppTag + ): raise InvalidTag("double application tag required") if len(tag.tagData) != 8: raise InvalidTag("invalid tag length") # extract the data - self.value = struct.unpack('>d',tag.tagData)[0] + self.value = struct.unpack(">d", tag.tagData)[0] @classmethod def is_valid(cls, arg): @@ -811,10 +870,12 @@ class Double(Atomic): def __str__(self): return "Double(%g)" % (self.value,) + # # OctetString # + class OctetString(Atomic): _app_tag = Tag.octetStringAppTag @@ -838,7 +899,9 @@ class OctetString(Atomic): tag.set_app_data(Tag.octetStringAppTag, self.value) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.octetStringAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.octetStringAppTag + ): raise InvalidTag("octet string application tag required") self.value = tag.tagData @@ -851,18 +914,20 @@ class OctetString(Atomic): def __str__(self): return "OctetString(X'" + btox(self.value) + "')" + # # CharacterString # + class CharacterString(Atomic): _app_tag = Tag.characterStringAppTag def __init__(self, arg=None): - self.value = '' + self.value = "" self.strEncoding = 0 - self.strValue = b'' + self.strValue = b"" if arg is None: pass @@ -870,7 +935,7 @@ class CharacterString(Atomic): self.decode(arg) elif isinstance(arg, str): self.value = arg - self.strValue = arg.encode('utf-8') + self.strValue = arg.encode("utf-8") elif isinstance(arg, CharacterString): self.value = arg.value self.strEncoding = arg.strEncoding @@ -880,10 +945,14 @@ class CharacterString(Atomic): def encode(self, tag): # encode the tag - tag.set_app_data(Tag.characterStringAppTag, bytes([self.strEncoding]) + self.strValue) + tag.set_app_data( + Tag.characterStringAppTag, bytes([self.strEncoding]) + self.strValue + ) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.characterStringAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.characterStringAppTag + ): raise InvalidTag("character string application tag required") if len(tag.tagData) == 0: raise InvalidTag("invalid tag length") @@ -896,16 +965,26 @@ class CharacterString(Atomic): self.strValue = tag_data[1:] # normalize the value - if (self.strEncoding == 0): - self.value = self.strValue.decode('utf-8') - elif (self.strEncoding == 3): - self.value = self.strValue.decode('utf_32be') - elif (self.strEncoding == 4): - self.value = self.strValue.decode('utf_16be') - elif (self.strEncoding == 5): - self.value = self.strValue.decode('latin_1') - else: - self.value = '### unknown encoding: %d ###' % (self.strEncoding,) + try: + if self.strEncoding == 0: + try: + self.value = self.strValue.decode("utf-8", "strict") + except UnicodeDecodeError: + # Wrong encoding... trying with latin-1 as + # we probably face a Windows software encoding issue + try: + self.value = self.strValue.decode("latin-1") + except UnicodeDecodeError: + raise + + elif self.strEncoding == 3: + self.value = self.strValue.decode("utf_32be") + elif self.strEncoding == 4: + self.value = self.strValue.decode("utf_16be") + elif self.strEncoding == 5: + self.value = self.strValue.decode("latin_1") + except UnicodeDecodeError: + self.value = "### unknown encoding: %d ###" % (self.strEncoding,) @classmethod def is_valid(cls, arg): @@ -915,17 +994,19 @@ class CharacterString(Atomic): def __str__(self): return "CharacterString(%d,X'%s')" % (self.strEncoding, btox(self.strValue)) + # # BitString # + class BitString(Atomic): _app_tag = Tag.bitStringAppTag bitNames = {} bitLen = 0 - def __init__(self, arg = None): + def __init__(self, arg=None): self.value = [0] * self.bitLen if arg is None: @@ -948,7 +1029,7 @@ class BitString(Atomic): self.value[bit] = 1 else: raise TypeError("invalid constructor list element(s)") - elif isinstance(arg,BitString): + elif isinstance(arg, BitString): self.value = arg.value[:] else: raise TypeError("invalid constructor datatype") @@ -963,9 +1044,9 @@ class BitString(Atomic): # build and append each packed octet bits = self.value + [0] * unused - for i in range(0,len(bits),8): + for i in range(0, len(bits), 8): x = 0 - for j in range(0,8): + for j in range(0, 8): x |= bits[i + j] << (7 - j) data.append(x) @@ -973,7 +1054,9 @@ class BitString(Atomic): tag.set_app_data(Tag.bitStringAppTag, data) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.bitStringAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.bitStringAppTag + ): raise InvalidTag("bit string application tag required") if len(tag.tagData) == 0: raise InvalidTag("invalid tag length") @@ -988,9 +1071,9 @@ class BitString(Atomic): for x in tag_data[1:]: for i in range(8): if (x & (1 << (7 - i))) != 0: - data.append( 1 ) + data.append(1) else: - data.append( 0 ) + data.append(0) # trim off the unused bits if unused: @@ -1019,17 +1102,17 @@ class BitString(Atomic): # build a list of values and/or names valueList = [] - for value, index in zip(self.value,range(len(self.value))): + for value, index in zip(self.value, range(len(self.value))): if index in bitNames: if value: valueList.append(bitNames[index]) else: - valueList.append('!' + bitNames[index]) + valueList.append("!" + bitNames[index]) else: valueList.append(str(value)) # bundle it together - return "BitString(" + ','.join(valueList) + ")" + return "BitString(" + ",".join(valueList) + ")" def __getitem__(self, bit): if isinstance(bit, int): @@ -1048,15 +1131,17 @@ class BitString(Atomic): return self.value[bit] def __setitem__(self, bit, value): - if isinstance(bit,int): + if isinstance(bit, int): pass - elif isinstance(bit,str): + elif isinstance(bit, str): if not bit in self.bitNames: raise IndexError("unknown bit name '%s'" % (bit,)) bit = self.bitNames[bit] else: - raise TypeError("bit index must be an integer or bit name : %s is %s" % (bit,type(bit))) + raise TypeError( + "bit index must be an integer or bit name : %s is %s" % (bit, type(bit)) + ) if (bit < 0) or (bit > len(self.value)): raise IndexError("list index out of range") @@ -1064,10 +1149,12 @@ class BitString(Atomic): # funny cast to a bit self.value[bit] = value and 1 or 0 + # # Enumerated # + class Enumerated(Atomic): _app_tag = Tag.enumeratedAppTag @@ -1079,7 +1166,7 @@ class Enumerated(Atomic): self.value = int(0) # see if the class has a translate table - if '_xlate_table' not in self.__class__.__dict__: + if "_xlate_table" not in self.__class__.__dict__: expand_enumerations(self.__class__) # initialize the object @@ -1088,7 +1175,7 @@ class Enumerated(Atomic): elif isinstance(arg, Tag): self.decode(arg) elif isinstance(arg, int): - if (arg < 0): + if arg < 0: raise ValueError("unsigned integer required") # convert it to a string if you can @@ -1112,7 +1199,9 @@ class Enumerated(Atomic): elif isinstance(self.value, str): return int(self._xlate_table[self.value]) else: - raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) + raise TypeError( + "%s is an invalid enumeration value datatype" % (type(self.value),) + ) def keylist(self): """Return a list of names in order by value.""" @@ -1141,9 +1230,9 @@ class Enumerated(Atomic): b = other.get_long() # now compare the values - if (a < b): + if a < b: return -1 - elif (a > b): + elif a > b: return 1 else: return 0 @@ -1154,10 +1243,12 @@ class Enumerated(Atomic): elif isinstance(self.value, str): value = self._xlate_table[self.value] else: - raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) + raise TypeError( + "%s is an invalid enumeration value datatype" % (type(self.value),) + ) # rip apart the number - data = bytearray(struct.pack('>L', value)) + data = bytearray(struct.pack(">L", value)) # reduce the value to the smallest number of octets while (len(data) > 1) and (data[0] == 0): @@ -1167,7 +1258,9 @@ class Enumerated(Atomic): tag.set_app_data(Tag.enumeratedAppTag, data) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.enumeratedAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.enumeratedAppTag + ): raise InvalidTag("enumerated application tag required") if len(tag.tagData) == 0: raise InvalidTag("invalid tag length") @@ -1188,22 +1281,23 @@ class Enumerated(Atomic): """Return True if arg is valid value for the class. If the string value is wrong for the enumeration, the encoding will fail. """ - return (isinstance(arg, int) and (arg >= 0)) or \ - isinstance(arg, str) + return (isinstance(arg, int) and (arg >= 0)) or isinstance(arg, str) def __str__(self): return "%s(%s)" % (self.__class__.__name__, self.value) + # # expand_enumerations # + def expand_enumerations(klass): # build a value dictionary xlateTable = {} for c in klass.__mro__: - enumerations = getattr(c, 'enumerations', {}) + enumerations = getattr(c, "enumerations", {}) if enumerations: for name, value in enumerations.items(): # save the results @@ -1214,30 +1308,52 @@ def expand_enumerations(klass): setattr(klass, name, value) # save the dictionary in the class - setattr(klass, '_xlate_table', xlateTable) + setattr(klass, "_xlate_table", xlateTable) + # # Date # -_mm = r'(?P0?[1-9]|1[0-4]|odd|even|255|[*])' -_dd = r'(?P[0-3]?\d|last|odd|even|255|[*])' -_yy = r'(?P\d{2}|255|[*])' -_yyyy = r'(?P\d{4}|255|[*])' -_dow = r'(?P[1-7]|mon|tue|wed|thu|fri|sat|sun|255|[*])' -_special_mon = {'*': 255, 'odd': 13, 'even': 14, None: 255} -_special_mon_inv = {255: '*', 13: 'odd', 14: 'even'} +_mm = r"(?P0?[1-9]|1[0-4]|odd|even|255|[*])" +_dd = r"(?P[0-3]?\d|last|odd|even|255|[*])" +_yy = r"(?P\d{2}|255|[*])" +_yyyy = r"(?P\d{4}|255|[*])" +_dow = r"(?P[1-7]|mon|tue|wed|thu|fri|sat|sun|255|[*])" -_special_day = {'*': 255, 'last': 32, 'odd': 33, 'even': 34, None: 255} -_special_day_inv = {255: '*', 32: 'last', 33: 'odd', 34: 'even'} +_special_mon = {"*": 255, "odd": 13, "even": 14, None: 255} +_special_mon_inv = {255: "*", 13: "odd", 14: "even"} + +_special_day = {"*": 255, "last": 32, "odd": 33, "even": 34, None: 255} +_special_day_inv = {255: "*", 32: "last", 33: "odd", 34: "even"} + +_special_dow = { + "*": 255, + "mon": 1, + "tue": 2, + "wed": 3, + "thu": 4, + "fri": 5, + "sat": 6, + "sun": 7, +} +_special_dow_inv = { + 255: "*", + 1: "mon", + 2: "tue", + 3: "wed", + 4: "thu", + 5: "fri", + 6: "sat", + 7: "sun", +} -_special_dow = {'*': 255, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 7} -_special_dow_inv = {255: '*', 1: 'mon', 2: 'tue', 3: 'wed', 4: 'thu', 5: 'fri', 6: 'sat', 7: 'sun'} def _merge(*args): """Create a composite pattern and compile it.""" - return re.compile(r'^' + r'[/-]'.join(args) + r'(?:\s+' + _dow + ')?$') + return re.compile(r"^" + r"[/-]".join(args) + r"(?:\s+" + _dow + ")?$") + # make a list of compiled patterns _date_patterns = [ @@ -1247,7 +1363,7 @@ _date_patterns = [ _merge(_yy, _mm, _dd), _merge(_mm, _dd, _yy), _merge(_dd, _mm, _yy), - ] +] class Date(Atomic): @@ -1284,7 +1400,7 @@ class Date(Atomic): match = matches[0] else: # check to see if they really are the same - for a, b in zip(matches[:-1],matches[1:]): + for a, b in zip(matches[:-1], matches[1:]): if a != b: raise ValueError("ambiguous") break @@ -1292,12 +1408,12 @@ class Date(Atomic): match = matches[0] # extract the year and normalize - year = match['year'] - if (year == '*') or (not year): + year = match["year"] + if (year == "*") or (not year): year = 255 else: year = int(year) - if (year == 255): + if year == 255: pass elif year < 35: year += 2000 @@ -1307,36 +1423,36 @@ class Date(Atomic): raise ValueError("invalid year") # extract the month and normalize - month = match['month'] + month = match["month"] if month in _special_mon: month = _special_mon[month] else: month = int(month) - if (month == 255): + if month == 255: pass elif (month == 0) or (month > 14): raise ValueError("invalid month") # extract the day and normalize - day = match['day'] + day = match["day"] if day in _special_day: day = _special_day[day] else: day = int(day) - if (day == 255): + if day == 255: pass elif (day == 0) or (day > 34): raise ValueError("invalid day") # extract the day-of-week and normalize - day_of_week = match['dow'] + day_of_week = match["dow"] if day_of_week in _special_dow: day_of_week = _special_dow[day_of_week] elif not day_of_week: pass else: day_of_week = int(day_of_week) - if (day_of_week == 255): + if day_of_week == 255: pass elif day_of_week > 7: raise ValueError("invalid day of week") @@ -1378,7 +1494,7 @@ class Date(Atomic): pass else: try: - today = time.mktime( (year + 1900, month, day, 0, 0, 0, 0, 0, -1) ) + today = time.mktime((year + 1900, month, day, 0, 0, 0, 0, 0, -1)) day_of_week = time.gmtime(today)[6] + 1 except OverflowError: pass @@ -1395,7 +1511,7 @@ class Date(Atomic): when = _TaskManager().get_time() tup = time.localtime(when) - self.value = (tup[0]-1900, tup[1], tup[2], tup[6] + 1) + self.value = (tup[0] - 1900, tup[1], tup[2], tup[6] + 1) return self @@ -1409,14 +1525,16 @@ class Date(Atomic): raise ValueError("no wildcard values") # convert to time.time() value - return time.mktime( (year + 1900, month, day, 0, 0, 0, 0, 0, -1) ) + return time.mktime((year + 1900, month, day, 0, 0, 0, 0, 0, -1)) def encode(self, tag): # encode the tag tag.set_app_data(Tag.dateAppTag, bytearray(self.value)) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.dateAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.dateAppTag + ): raise InvalidTag("date application tag required") if len(tag.tagData) != 4: raise InvalidTag("invalid tag length") @@ -1443,17 +1561,26 @@ class Date(Atomic): day = _special_day_inv.get(day, str(day)) day_of_week = _special_dow_inv.get(day_of_week, str(day_of_week)) - return "%s(%s-%s-%s %s)" % (self.__class__.__name__, year, month, day, day_of_week) + return "%s(%s-%s-%s %s)" % ( + self.__class__.__name__, + year, + month, + day, + day_of_week, + ) # # Time # + class Time(Atomic): _app_tag = Tag.timeAppTag - _time_regex = re.compile("^([*]|[0-9]+)[:]([*]|[0-9]+)(?:[:]([*]|[0-9]+)(?:[.]([*]|[0-9]+))?)?$") + _time_regex = re.compile( + "^([*]|[0-9]+)[:]([*]|[0-9]+)(?:[:]([*]|[0-9]+)(?:[.]([*]|[0-9]+))?)?$" + ) DONT_CARE = 255 @@ -1475,10 +1602,10 @@ class Time(Atomic): tup_list = [] tup_items = list(tup_match.groups()) for s in tup_items: - if s == '*': + if s == "*": tup_list.append(255) elif s is None: - if '*' in tup_items: + if "*" in tup_items: tup_list.append(255) else: tup_list.append(0) @@ -1529,7 +1656,9 @@ class Time(Atomic): tag.set_app_data(Tag.timeAppTag, bytearray(self.value)) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.timeAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.timeAppTag + ): raise InvalidTag("time application tag required") if len(tag.tagData) != 4: raise InvalidTag("invalid tag length") @@ -1566,67 +1695,70 @@ class Time(Atomic): return rslt + # # ObjectType # + class ObjectType(Enumerated): vendor_range = (128, 1023) - enumerations = \ - { 'accessDoor':30 - , 'accessPoint':33 - , 'accessRights':34 - , 'accessUser':35 - , 'accessZone':36 - , 'accumulator':23 - , 'alertEnrollment':52 - , 'analogInput':0 - , 'analogOutput':1 - , 'analogValue':2 - , 'averaging':18 - , 'binaryInput':3 - , 'binaryOutput':4 - , 'binaryValue':5 - , 'bitstringValue':39 - , 'calendar':6 - , 'channel':53 - , 'characterstringValue':40 - , 'command':7 - , 'credentialDataInput':37 - , 'datePatternValue':41 - , 'dateValue':42 - , 'datetimePatternValue':43 - , 'datetimeValue':44 - , 'device':8 - , 'eventEnrollment':9 - , 'eventLog':25 - , 'file':10 - , 'globalGroup':26 - , 'group':11 - , 'integerValue':45 - , 'largeAnalogValue':46 - , 'lifeSafetyPoint':21 - , 'lifeSafetyZone':22 - , 'lightingOutput':54 - , 'loadControl':28 - , 'loop':12 - , 'multiStateInput':13 - , 'multiStateOutput':14 - , 'multiStateValue':19 - , 'networkSecurity':38 - , 'notificationClass':15 - , 'notificationForwarder':51 - , 'octetstringValue':47 - , 'positiveIntegerValue':48 - , 'program':16 - , 'pulseConverter':24 - , 'schedule':17 - , 'structuredView':29 - , 'timePatternValue':49 - , 'timeValue':50 - , 'trendLog':20 - , 'trendLogMultiple':27 - } + enumerations = { + "accessDoor": 30, + "accessPoint": 33, + "accessRights": 34, + "accessUser": 35, + "accessZone": 36, + "accumulator": 23, + "alertEnrollment": 52, + "analogInput": 0, + "analogOutput": 1, + "analogValue": 2, + "averaging": 18, + "binaryInput": 3, + "binaryOutput": 4, + "binaryValue": 5, + "bitstringValue": 39, + "calendar": 6, + "channel": 53, + "characterstringValue": 40, + "command": 7, + "credentialDataInput": 37, + "datePatternValue": 41, + "dateValue": 42, + "datetimePatternValue": 43, + "datetimeValue": 44, + "device": 8, + "eventEnrollment": 9, + "eventLog": 25, + "file": 10, + "globalGroup": 26, + "group": 11, + "integerValue": 45, + "largeAnalogValue": 46, + "lifeSafetyPoint": 21, + "lifeSafetyZone": 22, + "lightingOutput": 54, + "loadControl": 28, + "loop": 12, + "multiStateInput": 13, + "multiStateOutput": 14, + "multiStateValue": 19, + "networkSecurity": 38, + "notificationClass": 15, + "notificationForwarder": 51, + "octetstringValue": 47, + "positiveIntegerValue": 48, + "program": 16, + "pulseConverter": 24, + "schedule": 17, + "structuredView": 29, + "timePatternValue": 49, + "timeValue": 50, + "trendLog": 20, + "trendLogMultiple": 27, + } + expand_enumerations(ObjectType) @@ -1634,6 +1766,7 @@ expand_enumerations(ObjectType) # ObjectIdentifier # + class ObjectIdentifier(Atomic): _app_tag = Tag.objectIdentifierAppTag @@ -1642,7 +1775,7 @@ class ObjectIdentifier(Atomic): maximum_instance_number = 0x003FFFFF def __init__(self, *args): - self.value = ('analogInput', 0) + self.value = ("analogInput", 0) if len(args) == 0: pass @@ -1654,7 +1787,7 @@ class ObjectIdentifier(Atomic): self.set_long(arg) elif isinstance(arg, str): try: - objType, objInstance = arg.split(':') + objType, objInstance = arg.split(":") if objType.isdigit(): objType = int(objType) objInstance = int(objInstance) @@ -1683,10 +1816,14 @@ class ObjectIdentifier(Atomic): if objType not in self.objectTypeClass._xlate_table: raise ValueError("unrecognized object type '%s'" % (objType,)) else: - raise TypeError("invalid datatype for objType: %r, %r" % (type(objType), objType)) + raise TypeError( + "invalid datatype for objType: %r, %r" % (type(objType), objType) + ) # check for valid instance number - if (objInstance < 0) or (objInstance > ObjectIdentifier.maximum_instance_number): + if (objInstance < 0) or ( + objInstance > ObjectIdentifier.maximum_instance_number + ): raise ValueError("instance number out of range") # pack the components together @@ -1725,20 +1862,22 @@ class ObjectIdentifier(Atomic): objType, objInstance = self.get_tuple() # pack the components together - return ((objType << 22) + objInstance) + return (objType << 22) + objInstance def encode(self, tag): # encode the tag - tag.set_app_data(Tag.objectIdentifierAppTag, struct.pack('>L', self.get_long())) + tag.set_app_data(Tag.objectIdentifierAppTag, struct.pack(">L", self.get_long())) def decode(self, tag): - if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.objectIdentifierAppTag): + if (tag.tagClass != Tag.applicationTagClass) or ( + tag.tagNumber != Tag.objectIdentifierAppTag + ): raise InvalidTag("object identifier application tag required") if len(tag.tagData) != 4: raise InvalidTag("invalid tag length") # extract the data - self.set_long(struct.unpack('>L',tag.tagData)[0]) + self.set_long(struct.unpack(">L", tag.tagData)[0]) @classmethod def is_valid(cls, arg): @@ -1755,7 +1894,7 @@ class ObjectIdentifier(Atomic): typestr = "Bad %d" % (objType,) elif objType in self.objectTypeClass._xlate_table: typestr = self.objectTypeClass._xlate_table[objType] - elif (objType < 128): + elif objType < 128: typestr = "Reserved %d" % (objType,) else: typestr = "Vendor %d" % (objType,) @@ -1776,7 +1915,8 @@ class ObjectIdentifier(Atomic): b = other.get_long() # now compare the values - return (a < b) + return a < b + # # Application Tag Classes @@ -1787,10 +1927,22 @@ class ObjectIdentifier(Atomic): # classes aren't defined yet. # -Tag._app_tag_class = \ - [ Null, Boolean, Unsigned, Integer - , Real, Double, OctetString, CharacterString - , BitString, Enumerated, Date, Time - , ObjectIdentifier, None, None, None - ] +Tag._app_tag_class = [ + Null, + Boolean, + Unsigned, + Integer, + Real, + Double, + OctetString, + CharacterString, + BitString, + Enumerated, + Date, + Time, + ObjectIdentifier, + None, + None, + None, +]