mirror of
https://github.com/FreeOpcUa/opcua-asyncio
synced 2025-10-29 17:07:18 +08:00
Await connection lost tasks when stopping internal server (#98)
Await connection lost tasks when stopping internal server
This commit is contained in:
@@ -18,7 +18,7 @@ class OPCUAProtocol(asyncio.Protocol):
|
||||
Instantiated for every connection.
|
||||
"""
|
||||
|
||||
def __init__(self, iserver: InternalServer, policies, clients):
|
||||
def __init__(self, iserver: InternalServer, policies, clients, protocol_tasks):
|
||||
self.peer_name = None
|
||||
self.transport = None
|
||||
self.processor = None
|
||||
@@ -26,6 +26,7 @@ class OPCUAProtocol(asyncio.Protocol):
|
||||
self.iserver: InternalServer = iserver
|
||||
self.policies = policies
|
||||
self.clients = clients
|
||||
self.protocol_tasks = protocol_tasks
|
||||
self.messages = asyncio.Queue()
|
||||
self._task = None
|
||||
|
||||
@@ -48,11 +49,13 @@ class OPCUAProtocol(asyncio.Protocol):
|
||||
logger.info('Lost connection from %s, %s', self.peer_name, ex)
|
||||
self.transport.close()
|
||||
self.iserver.asyncio_transports.remove(self.transport)
|
||||
self.iserver.loop.create_task(self.processor.close())
|
||||
closing_task = self.iserver.loop.create_task(self.processor.close())
|
||||
self.protocol_tasks.append(closing_task)
|
||||
if self in self.clients:
|
||||
self.clients.remove(self)
|
||||
self.messages.put_nowait((None, None))
|
||||
self._task.cancel()
|
||||
self.protocol_tasks.append(self._task)
|
||||
|
||||
def data_received(self, data):
|
||||
self._buffer += data
|
||||
@@ -108,13 +111,19 @@ class BinaryServer:
|
||||
self._server: Optional[asyncio.AbstractServer] = None
|
||||
self._policies = []
|
||||
self.clients = []
|
||||
self.protocol_tasks = []
|
||||
|
||||
def set_policies(self, policies):
|
||||
self._policies = policies
|
||||
|
||||
def _make_protocol(self):
|
||||
"""Protocol Factory"""
|
||||
return OPCUAProtocol(iserver=self.iserver, policies=self._policies, clients=self.clients)
|
||||
return OPCUAProtocol(
|
||||
iserver=self.iserver,
|
||||
policies=self._policies,
|
||||
clients=self.clients,
|
||||
protocol_tasks=self.protocol_tasks,
|
||||
)
|
||||
|
||||
async def start(self):
|
||||
self._server = await self.iserver.loop.create_server(self._make_protocol, self.hostname, self.port)
|
||||
@@ -132,6 +141,14 @@ class BinaryServer:
|
||||
self.logger.info('Closing asyncio socket server')
|
||||
for transport in self.iserver.asyncio_transports:
|
||||
transport.close()
|
||||
|
||||
# Wait for all transport closing tasks to complete
|
||||
results = await asyncio.gather(*self.protocol_tasks, return_exceptions=True)
|
||||
for result in results:
|
||||
if isinstance(result, Exception):
|
||||
self.logger.error(f"An error ocurred while closing a transport: {result}")
|
||||
self.protocol_tasks = []
|
||||
|
||||
if self._server:
|
||||
self.iserver.loop.call_soon(self._server.close)
|
||||
await self._server.wait_closed()
|
||||
|
||||
Reference in New Issue
Block a user