1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

migration to aiosqlite

This commit is contained in:
Christian Bergmiller 2019-06-21 08:32:06 +02:00
parent 18f4713f33
commit 0a5ca1e44c
2 changed files with 37 additions and 34 deletions

View File

@ -1,4 +1,5 @@
import logging
import aiosqlite
from typing import Iterable, Optional
from datetime import timedelta
from datetime import datetime
@ -25,40 +26,38 @@ class HistorySQLite(HistoryStorageInterface):
self._datachanges_period = {}
self._db_file = path
self._event_fields = {}
self._conn: Optional[sqlite3.Connection] = None
self._db: aiosqlite.Connection = None
self._loop = loop or get_event_loop()
async def init(self):
self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
self._db = aiosqlite.connect(self._db_file, loop=self._loop)
async def stop(self):
await self._loop.run_in_executor(None, self._conn.close)
self._conn = None
await self._db.close()
self.logger.info('Historizing SQL connection closed')
async def _execute_sql(self, sql: str, params: Iterable = None):
return await self._loop.run_in_executor(None, self._conn.execute, sql, params or ())
async def new_historized_node(self, node_id, period, count=0):
table = self._get_table_name(node_id)
self._datachanges_period[node_id] = period, count
# create a table for the node which will store attributes of the DataValue object
# note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
try:
await self._execute_sql(f'CREATE TABLE "{table}" (_Id INTEGER PRIMARY KEY NOT NULL,'
' ServerTimestamp TIMESTAMP,'
' SourceTimestamp TIMESTAMP,'
' StatusCode INTEGER,'
' Value TEXT,'
' VariantType TEXT,'
' VariantBinary BLOB)', None)
await self._db.execute(f'CREATE TABLE "{table}" (_Id INTEGER PRIMARY KEY NOT NULL,'
' ServerTimestamp TIMESTAMP,'
' SourceTimestamp TIMESTAMP,'
' StatusCode INTEGER,'
' Value TEXT,'
' VariantType TEXT,'
' VariantBinary BLOB)', None)
await self._db.commit()
except sqlite3.Error as e:
self.logger.info("Historizing SQL Table Creation Error for %s: %s", node_id, e)
async def execute_sql_delete(self, condition: str, args: Iterable, table: str, node_id):
try:
await self._execute_sql(f'DELETE FROM "{table}" WHERE {condition}', args)
await self._db.execute(f'DELETE FROM "{table}" WHERE {condition}', args)
await self._db.commit()
except sqlite3.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for %s: %s", node_id, e)
@ -66,7 +65,7 @@ class HistorySQLite(HistoryStorageInterface):
table = self._get_table_name(node_id)
# insert the data change into the database
try:
await self._execute_sql(f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)', (
await self._db.execute(f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)', (
datavalue.ServerTimestamp,
datavalue.SourceTimestamp,
datavalue.StatusCode.value,
@ -74,6 +73,7 @@ class HistorySQLite(HistoryStorageInterface):
datavalue.Value.VariantType.name,
sqlite3.Binary(variant_to_binary(datavalue.Value))
))
await self._db.commit()
except sqlite3.Error as e:
self.logger.error("Historizing SQL Insert Error for %s: %s", node_id, e)
# get this node's period from the period dict and calculate the limit
@ -89,7 +89,7 @@ class HistorySQLite(HistoryStorageInterface):
f'THEN MIN(SourceTimestamp) ELSE NULL END FROM "{table}")', (count,), table, node_id)
async def read_node_history(self, node_id, start, end, nb_values):
table = self._get_table_name(node_id)
start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
cont = None
@ -97,8 +97,8 @@ class HistorySQLite(HistoryStorageInterface):
# select values from the database; recreate UA Variant from binary
try:
rows = await self._execute_sql(
f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
f'ORDER BY "_Id" {order} LIMIT ?', (start_time, end_time, limit,)
f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
f'ORDER BY "_Id" {order} LIMIT ?', (start_time, end_time, limit,)
)
for row in rows:
# rebuild the data value object
@ -128,11 +128,12 @@ class HistorySQLite(HistoryStorageInterface):
# note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
# properties with these names
try:
self._execute_sql(
await self._db.execute(
f'CREATE TABLE "{table}" '
f'(_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {columns})',
None
)
await self._db.commit()
except sqlite3.Error as e:
self.logger.info("Historizing SQL Table Creation Error for events from %s: %s", source_id, e)
@ -142,11 +143,12 @@ class HistorySQLite(HistoryStorageInterface):
event_type = event.EventType # useful for troubleshooting database
# insert the event into the database
try:
await self._execute_sql(
await self._db.execute(
f'INSERT INTO "{table}" ("_Id", "_Timestamp", "_EventTypeName", {columns}) '
f'VALUES (NULL, "{event.Time}", "{event_type}", {placeholders})',
evtup
)
await self._db.commit()
except sqlite3.Error as e:
self.logger.error("Historizing SQL Insert Error for events from %s: %s", event.SourceNode, e)
# get this node's period from the period dict and calculate the limit
@ -155,7 +157,8 @@ class HistorySQLite(HistoryStorageInterface):
# after the insert, if a period was specified delete all records older than period
date_limit = datetime.utcnow() - period
try:
await self._execute_sql(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
await self._db.execute(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
await self._db.commit()
except sqlite3.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.SourceNode, e)
@ -168,18 +171,19 @@ class HistorySQLite(HistoryStorageInterface):
results = []
# select events from the database; SQL select clause is built from EventFilter and available fields
try:
for row in await self._execute_sql(
async with self._db.execute(
f'SELECT "_Timestamp", {clauses_str} FROM "{table}" '
f'WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {order} LIMIT ?',
(start_time, end_time, limit)):
fdict = {}
cont_timestamps.append(row[0])
for i, field in enumerate(row[1:]):
if field is not None:
fdict[clauses[i]] = variant_from_binary(Buffer(field))
else:
fdict[clauses[i]] = ua.Variant(None)
results.append(Event.from_field_dict(fdict))
(start_time, end_time, limit)) as cursor:
async for row in cursor:
fdict = {}
cont_timestamps.append(row[0])
for i, field in enumerate(row[1:]):
if field is not None:
fdict[clauses[i]] = variant_from_binary(Buffer(field))
else:
fdict[clauses[i]] = ua.Variant(None)
results.append(Event.from_field_dict(fdict))
except sqlite3.Error as e:
self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, e)
if nb_values:
@ -277,4 +281,3 @@ class HistorySQLite(HistoryStorageInterface):
def _list_to_sql_str(ls, quotes=True):
items = [f'"{item}"' if quotes else str(item) for item in ls]
return ", ".join(items)

View File

@ -10,7 +10,7 @@ setup(
packages=find_packages(),
provides=["asyncua"],
license="GNU Lesser General Public License v3 or later",
install_requires=["aiofiles", "python-dateutil", "pytz", "lxml", 'cryptography'],
install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", "lxml", 'cryptography'],
classifiers=[
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",