1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

Merge pull request #65 from FreeOpcUa/fix_60_aiosqlite

Fix 60 aiosqlite
This commit is contained in:
cbergmiller 2019-09-01 08:13:01 +02:00 committed by GitHub
commit 52f1453b77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 73 deletions

View File

@ -7,7 +7,7 @@ install:
- pip install pytz
- pip install lxml
- pip install aiofiles
- pip install asyncio-contextmanager
- pip install aiosqlite
- pip install pytest --upgrade
- pip install pytest-asyncio
- pip install cryptography

View File

@ -13,11 +13,11 @@ class UaNodeAlreadyHistorizedError(ua.UaError):
class HistoryStorageInterface:
"""
Interface of a history backend.
Must be implemented by backends
"""
async def init(self):
"""
Async. initialization.
@ -185,7 +185,7 @@ class HistoryDict(HistoryStorageInterface):
class SubHandler:
def __init__(self, storage, loop):
self.storage = storage
self.loop = loop
self.loop = loop
def datachange_notification(self, node, val, data):
self.loop.create_task(
@ -274,7 +274,7 @@ class HistoryManager:
"""
if node in self._handlers:
await self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
del (self._handlers[node])
else:
self.logger.error("History Manager isn't subscribed to %s", node)
@ -329,9 +329,9 @@ class HistoryManager:
starttime = ua.ua_binary.Primitives.DateTime.unpack(Buffer(rv.ContinuationPoint))
dv, cont = await self.storage.read_node_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode)
starttime,
details.EndTime,
details.NumValuesPerNode)
if cont:
cont = ua.ua_binary.Primitives.DateTime.pack(cont)
# rv.IndexRange
@ -348,10 +348,10 @@ class HistoryManager:
starttime = ua.ua_binary.Primitives.DateTime.unpack(Buffer(rv.ContinuationPoint))
evts, cont = await self.storage.read_event_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
results = []
for ev in evts:
field_list = ua.HistoryEventFieldList()

View File

@ -1,9 +1,10 @@
import logging
from typing import Iterable, Optional
import aiosqlite
import sqlite3
from typing import Iterable
from datetime import timedelta
from datetime import datetime
from asyncio import get_event_loop
import sqlite3
from asyncua import ua
from ..ua.ua_binary import variant_from_binary, variant_to_binary
@ -25,48 +26,45 @@ class HistorySQLite(HistoryStorageInterface):
self._datachanges_period = {}
self._db_file = path
self._event_fields = {}
self._conn: Optional[sqlite3.Connection] = None
self._db: aiosqlite.Connection = None
self._loop = loop or get_event_loop()
async def init(self):
self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
self._db = await aiosqlite.connect(self._db_file, loop=self._loop, detect_types=sqlite3.PARSE_DECLTYPES)
async def stop(self):
await self._loop.run_in_executor(None, self._conn.close)
self._conn = None
await self._db.close()
self.logger.info('Historizing SQL connection closed')
async def _execute_sql(self, sql: str, params: Iterable = None):
return await self._loop.run_in_executor(None, self._conn.execute, sql, params or ())
async def new_historized_node(self, node_id, period, count=0):
table = self._get_table_name(node_id)
self._datachanges_period[node_id] = period, count
# create a table for the node which will store attributes of the DataValue object
# note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
try:
await self._execute_sql(f'CREATE TABLE "{table}" (_Id INTEGER PRIMARY KEY NOT NULL,'
' ServerTimestamp TIMESTAMP,'
' SourceTimestamp TIMESTAMP,'
' StatusCode INTEGER,'
' Value TEXT,'
' VariantType TEXT,'
' VariantBinary BLOB)', None)
except sqlite3.Error as e:
await self._db.execute(f'CREATE TABLE "{table}" (_Id INTEGER PRIMARY KEY NOT NULL,'
' ServerTimestamp TIMESTAMP,'
' SourceTimestamp TIMESTAMP,'
' StatusCode INTEGER,'
' Value TEXT,'
' VariantType TEXT,'
' VariantBinary BLOB)', None)
await self._db.commit()
except aiosqlite.Error as e:
self.logger.info("Historizing SQL Table Creation Error for %s: %s", node_id, e)
async def execute_sql_delete(self, condition: str, args: Iterable, table: str, node_id):
try:
await self._execute_sql(f'DELETE FROM "{table}" WHERE {condition}', args)
except sqlite3.Error as e:
await self._db.execute(f'DELETE FROM "{table}" WHERE {condition}', args)
await self._db.commit()
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for %s: %s", node_id, e)
async def save_node_value(self, node_id, datavalue):
table = self._get_table_name(node_id)
# insert the data change into the database
try:
await self._execute_sql(f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)', (
await self._db.execute(f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)', (
datavalue.ServerTimestamp,
datavalue.SourceTimestamp,
datavalue.StatusCode.value,
@ -74,7 +72,8 @@ class HistorySQLite(HistoryStorageInterface):
datavalue.Value.VariantType.name,
sqlite3.Binary(variant_to_binary(datavalue.Value))
))
except sqlite3.Error as e:
await self._db.commit()
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Insert Error for %s: %s", node_id, e)
# get this node's period from the period dict and calculate the limit
period, count = self._datachanges_period[node_id]
@ -89,28 +88,24 @@ class HistorySQLite(HistoryStorageInterface):
f'THEN MIN(SourceTimestamp) ELSE NULL END FROM "{table}")', (count,), table, node_id)
async def read_node_history(self, node_id, start, end, nb_values):
table = self._get_table_name(node_id)
start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
cont = None
results = []
# select values from the database; recreate UA Variant from binary
try:
rows = await self._execute_sql(
async with self._db.execute(
f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
f'ORDER BY "_Id" {order} LIMIT ?', (start_time, end_time, limit,)
)
for row in rows:
# rebuild the data value object
dv = ua.DataValue(variant_from_binary(Buffer(row[6])))
dv.ServerTimestamp = row[1]
dv.SourceTimestamp = row[2]
dv.StatusCode = ua.StatusCode(row[3])
results.append(dv)
except sqlite3.Error as e:
f'ORDER BY "_Id" {order} LIMIT ?', (start_time, end_time, limit,)) as cursor:
async for row in cursor:
# rebuild the data value object
dv = ua.DataValue(variant_from_binary(Buffer(row[6])))
dv.ServerTimestamp = row[1]
dv.SourceTimestamp = row[2]
dv.StatusCode = ua.StatusCode(row[3])
results.append(dv)
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Read Error for %s: %s", node_id, e)
if nb_values:
if len(results) > nb_values:
cont = results[nb_values].SourceTimestamp
@ -128,12 +123,13 @@ class HistorySQLite(HistoryStorageInterface):
# note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
# properties with these names
try:
self._execute_sql(
await self._db.execute(
f'CREATE TABLE "{table}" '
f'(_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {columns})',
None
)
except sqlite3.Error as e:
await self._db.commit()
except aiosqlite.Error as e:
self.logger.info("Historizing SQL Table Creation Error for events from %s: %s", source_id, e)
async def save_event(self, event):
@ -142,12 +138,13 @@ class HistorySQLite(HistoryStorageInterface):
event_type = event.EventType # useful for troubleshooting database
# insert the event into the database
try:
await self._execute_sql(
await self._db.execute(
f'INSERT INTO "{table}" ("_Id", "_Timestamp", "_EventTypeName", {columns}) '
f'VALUES (NULL, "{event.Time}", "{event_type}", {placeholders})',
evtup
)
except sqlite3.Error as e:
await self._db.commit()
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Insert Error for events from %s: %s", event.SourceNode, e)
# get this node's period from the period dict and calculate the limit
period = self._datachanges_period[event.SourceNode]
@ -155,8 +152,9 @@ class HistorySQLite(HistoryStorageInterface):
# after the insert, if a period was specified delete all records older than period
date_limit = datetime.utcnow() - period
try:
await self._execute_sql(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
except sqlite3.Error as e:
await self._db.execute(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
await self._db.commit()
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.SourceNode, e)
async def read_event_history(self, source_id, start, end, nb_values, evfilter):
@ -168,19 +166,20 @@ class HistorySQLite(HistoryStorageInterface):
results = []
# select events from the database; SQL select clause is built from EventFilter and available fields
try:
for row in await self._execute_sql(
async with self._db.execute(
f'SELECT "_Timestamp", {clauses_str} FROM "{table}" '
f'WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {order} LIMIT ?',
(start_time, end_time, limit)):
fdict = {}
cont_timestamps.append(row[0])
for i, field in enumerate(row[1:]):
if field is not None:
fdict[clauses[i]] = variant_from_binary(Buffer(field))
else:
fdict[clauses[i]] = ua.Variant(None)
results.append(Event.from_field_dict(fdict))
except sqlite3.Error as e:
(start_time, end_time, limit)) as cursor:
async for row in cursor:
fdict = {}
cont_timestamps.append(row[0])
for i, field in enumerate(row[1:]):
if field is not None:
fdict[clauses[i]] = variant_from_binary(Buffer(field))
else:
fdict[clauses[i]] = ua.Variant(None)
results.append(Event.from_field_dict(fdict))
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, e)
if nb_values:
if len(results) > nb_values: # start > ua.get_win_epoch() and
@ -277,4 +276,3 @@ class HistorySQLite(HistoryStorageInterface):
def _list_to_sql_str(ls, quotes=True):
items = [f'"{item}"' if quotes else str(item) for item in ls]
return ", ".join(items)

View File

@ -36,7 +36,7 @@ class UTC(tzinfo):
return timedelta(0)
def datetime_to_win_epoch(dt):
def datetime_to_win_epoch(dt: datetime):
"""method copied from David Buxton <david@gasmark6.com> sample code"""
if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
dt = dt.replace(tzinfo=UTC())

View File

@ -10,7 +10,7 @@ setup(
packages=find_packages(),
provides=["asyncua"],
license="GNU Lesser General Public License v3 or later",
install_requires=["aiofiles", "python-dateutil", "pytz", "lxml", 'cryptography'],
install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", "lxml", 'cryptography'],
classifiers=[
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",

View File

@ -19,13 +19,9 @@ def pytest_generate_tests(metafunc):
if 'opc' in metafunc.fixturenames:
metafunc.parametrize('opc', ['client', 'server'], indirect=True)
elif 'history' in metafunc.fixturenames:
#metafunc.parametrize('history', ['dict', 'sqlite'], indirect=True)
#FIXME: disable sqlite backend, it breaks
metafunc.parametrize('history', ['dict'], indirect=True)
metafunc.parametrize('history', ['dict', 'sqlite'], indirect=True)
elif 'history_server' in metafunc.fixturenames:
#FIXME: disable sqlite backend, it breaks
#metafunc.parametrize('history_server', ['dict', 'sqlite'], indirect=True)
metafunc.parametrize('history_server', ['dict'], indirect=True)
metafunc.parametrize('history_server', ['dict', 'sqlite'], indirect=True)
@pytest.yield_fixture(scope='module')