mirror of
https://github.com/FreeOpcUa/opcua-asyncio
synced 2025-10-29 17:07:18 +08:00
fix receive_buffer and subscription tests
This commit is contained in:
committed by
oroulet
parent
b005e09d73
commit
59f1dd2b35
@@ -79,6 +79,8 @@ class UASocketProtocol(asyncio.Protocol):
|
||||
self._process_received_message(msg)
|
||||
if not buf:
|
||||
return
|
||||
# Buffer still has bytes left, try to process again
|
||||
data = bytes(buf)
|
||||
except Exception:
|
||||
self.logger.exception('Exception raised while parsing message from server')
|
||||
self.disconnect_socket()
|
||||
|
||||
@@ -26,8 +26,8 @@ class SocketClosedException(UaError):
|
||||
|
||||
class Buffer:
|
||||
"""
|
||||
alternative to io.BytesIO making debug easier
|
||||
and added a few convenience methods
|
||||
Alternative to io.BytesIO making debug easier
|
||||
and added a few convenience methods.
|
||||
"""
|
||||
|
||||
def __init__(self, data, start_pos=0, size=-1):
|
||||
@@ -38,9 +38,7 @@ class Buffer:
|
||||
self._size = size
|
||||
|
||||
def __str__(self):
|
||||
return "Buffer(size:{0}, data:{1})".format(
|
||||
self._size,
|
||||
self._data[self._cur_pos:self._cur_pos + self._size])
|
||||
return f"Buffer(size:{self._size}, data:{self._data[self._cur_pos:self._cur_pos + self._size]})"
|
||||
__repr__ = __str__
|
||||
|
||||
def __len__(self):
|
||||
@@ -49,23 +47,24 @@ class Buffer:
|
||||
def __bool__(self):
|
||||
return self._size > 0
|
||||
|
||||
def __bytes__(self):
|
||||
"""Return remains of buffer as bytes."""
|
||||
return self._data[self._cur_pos:]
|
||||
|
||||
def read(self, size):
|
||||
"""
|
||||
read and pop number of bytes for buffer
|
||||
"""
|
||||
if size > self._size:
|
||||
raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
|
||||
# self.logger.debug("Request for %s bytes, from %s", size, self)
|
||||
raise NotEnoughData(f"Not enough data left in buffer, request for {size}, we have {self._size}")
|
||||
self._size -= size
|
||||
pos = self._cur_pos
|
||||
self._cur_pos += size
|
||||
data = self._data[pos:self._cur_pos]
|
||||
# self.logger.debug("Returning: %s ", data)
|
||||
return data
|
||||
return self._data[pos:self._cur_pos]
|
||||
|
||||
def copy(self, size=-1):
|
||||
"""
|
||||
return a shadow copy, optionnaly only copy 'size' bytes
|
||||
return a shadow copy, optionally only copy 'size' bytes
|
||||
"""
|
||||
if size == -1 or size > self._size:
|
||||
size = self._size
|
||||
@@ -76,7 +75,7 @@ class Buffer:
|
||||
skip size bytes in buffer
|
||||
"""
|
||||
if size > self._size:
|
||||
raise NotEnoughData("Not enough data left in buffer, request for {0}, we have {1}".format(size, self))
|
||||
raise NotEnoughData(f"Not enough data left in buffer, request for {size}, we have {self._size}")
|
||||
self._size -= size
|
||||
self._cur_pos += size
|
||||
|
||||
|
||||
@@ -352,7 +352,7 @@ async def test_unsubscribe_two_objects_simultaneously(opc):
|
||||
opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)),
|
||||
]
|
||||
sub = await opc.opc.create_subscription(100, handler)
|
||||
handles = await sub.subscribe_data_change(nodes)
|
||||
handles = await sub.subscribe_data_change(nodes, queuesize=1)
|
||||
await handler.done()
|
||||
assert handler.results[0][0] == nodes[0]
|
||||
assert handler.results[1][0] == nodes[1]
|
||||
@@ -371,7 +371,7 @@ async def test_unsubscribe_two_objects_consecutively(opc):
|
||||
opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)),
|
||||
]
|
||||
sub = await opc.opc.create_subscription(100, handler)
|
||||
handles = await sub.subscribe_data_change(nodes)
|
||||
handles = await sub.subscribe_data_change(nodes, queuesize=1)
|
||||
assert type(handles) is list
|
||||
await handler.done()
|
||||
for handle in handles:
|
||||
|
||||
Reference in New Issue
Block a user