From f3de377a8fa8be11481a2976120a1a89a24d499f Mon Sep 17 00:00:00 2001 From: oroulet Date: Sat, 5 Sep 2020 19:39:32 +0200 Subject: [PATCH] Joey faulkner bugfix/server memory leak (#272) * implement oroulet changes * closing_task must be a task not onl run once * do not catch CancelleError in closing_Task * make sure we use the correct loop in asyncio binary_server Co-authored-by: Joey Faulkner --- asyncua/server/binary_server_asyncio.py | 40 +++++++++++++++++-------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/asyncua/server/binary_server_asyncio.py b/asyncua/server/binary_server_asyncio.py index 29e0ac80..0be29e2d 100644 --- a/asyncua/server/binary_server_asyncio.py +++ b/asyncua/server/binary_server_asyncio.py @@ -18,7 +18,7 @@ class OPCUAProtocol(asyncio.Protocol): Instantiated for every connection. """ - def __init__(self, iserver: InternalServer, policies, clients, protocol_tasks): + def __init__(self, iserver: InternalServer, policies, clients, closing_tasks): self.peer_name = None self.transport = None self.processor = None @@ -26,7 +26,7 @@ class OPCUAProtocol(asyncio.Protocol): self.iserver: InternalServer = iserver self.policies = policies self.clients = clients - self.protocol_tasks = protocol_tasks + self.closing_tasks = closing_tasks self.messages = asyncio.Queue() self._task = None @@ -50,12 +50,11 @@ class OPCUAProtocol(asyncio.Protocol): self.transport.close() self.iserver.asyncio_transports.remove(self.transport) closing_task = self.iserver.loop.create_task(self.processor.close()) - self.protocol_tasks.append(closing_task) + self.closing_tasks.append(closing_task) if self in self.clients: self.clients.remove(self) self.messages.put_nowait((None, None)) self._task.cancel() - self.protocol_tasks.append(self._task) def data_received(self, data): self._buffer += data @@ -111,7 +110,8 @@ class BinaryServer: self._server: Optional[asyncio.AbstractServer] = None self._policies = [] self.clients = [] - self.protocol_tasks = [] + self.closing_tasks = [] + self.cleanup_task = None def set_policies(self, policies): self._policies = policies @@ -122,7 +122,7 @@ class BinaryServer: iserver=self.iserver, policies=self._policies, clients=self.clients, - protocol_tasks=self.protocol_tasks, + closing_tasks=self.closing_tasks, ) async def start(self): @@ -136,19 +136,35 @@ class BinaryServer: self.hostname = sockname[0] self.port = sockname[1] self.logger.info('Listening on %s:%s', self.hostname, self.port) + self.cleanup_task = self.iserver.loop.create_task(self._await_closing_tasks()) async def stop(self): self.logger.info('Closing asyncio socket server') for transport in self.iserver.asyncio_transports: transport.close() - # Wait for all transport closing tasks to complete - results = await asyncio.gather(*self.protocol_tasks, return_exceptions=True) - for result in results: - if isinstance(result, Exception): - self.logger.error(f"An error ocurred while closing a transport: {result}") - self.protocol_tasks = [] + # stop cleanup process and run it a last time + self.cleanup_task.cancel() + try: + await self.cleanup_task + except asyncio.CancelledError: + pass + await self._await_closing_tasks(recursive=False) if self._server: self.iserver.loop.call_soon(self._server.close) await self._server.wait_closed() + + async def _await_closing_tasks(self, recursive=True): + while self.closing_tasks: + task = self.closing_tasks.pop() + try: + await task + except asyncio.CancelledError: + # this means a stop request has been sent, it should not be catched + raise + except Exception: + logger.exception("Unexpected crash in BinaryServer._await_closing_tasks") + if recursive: + await asyncio.sleep(10) + await self._await_closing_tasks()