From f926a23e2cb67044a76471af07dc30143b01f8bd Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 28 Sep 2021 15:36:46 +0300 Subject: [PATCH] Refactored sqlite storage --- tests/storage/__init__.py | 0 tests/storage/test_database.py | 129 -------- tests/storage/test_database_connector.py | 32 -- tests/storage/test_storage_handler.py | 38 --- .../gateway/tb_gateway_service.py | 110 ++----- .../storage/file/event_storage_reader.py | 8 +- .../storage/file/event_storage_writer.py | 6 +- .../storage/file/file_event_storage.py | 8 +- .../storage/sqlite/database.py | 285 ++---------------- .../storage/sqlite/database_action_type.py | 5 +- .../storage/sqlite/database_connector.py | 33 +- .../storage/sqlite/database_request.py | 6 - .../storage/sqlite/storage_handler.py | 85 ++---- .../storage/sqlite/storage_settings.py | 6 - 14 files changed, 116 insertions(+), 635 deletions(-) delete mode 100644 tests/storage/__init__.py delete mode 100644 tests/storage/test_database.py delete mode 100644 tests/storage/test_database_connector.py delete mode 100644 tests/storage/test_storage_handler.py diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/storage/test_database.py b/tests/storage/test_database.py deleted file mode 100644 index 1ff528bd..00000000 --- a/tests/storage/test_database.py +++ /dev/null @@ -1,129 +0,0 @@ -import unittest -from time import time -from sys import path -import os -from simplejson import loads -path.insert(0, '../../') -from thingsboard_gateway.new_storage.database import Database - -class TestDatabase(unittest.TestCase): - - def setUp(self): - self.config = { - "data_file_path": "./testing.db", - "max_read_records_count": 100, - "max_days_to_store_data": 1 - } - self.db = Database(self.config) - - - def test_create_device_table(self): - - device_name = "DraginoLHT" - - self.db.create_device_table(device_name) - - self.db.cur.execute("SELECT name FROM sqlite_master WHERE type='table';") - device_table = "device_" + device_name - self.assertIn(device_table, self.db.cur.fetchall()[0]) - - - def test_write_to_device_table(self): - - data = { - "deviceName": "DraginoLHT", - "ts": 16888888874, - "telemetry": [ - {"temp": 24.30}, - { "humi": 90} - ] - } - - self.db.create_device_table(data["deviceName"]) - - self.db.write(data) - - ret = self.db.readAll(data["deviceName"]) - - self.assertDictEqual(data, loads(ret[0][1])) - - def test_create_connected_devices_table(self): - - self.db.create_connected_devices_table() - - tables = self.db.get_all_tables() - - self.assertIn('connected_devices', tables) - - def test_get_connected_devices(self): - - # create table for this test - self.db.create_connected_devices_table() - returned_devices = [] - devices = [ - { - "deviceName": "device1", - "deviceType": "humiSensor", - "connector": "MQTT" - }, - { - "deviceName": "device2", - "connector": "Modbus" - }, - { - "deviceName": "device3", - "deviceType": "tempSensor", - "connector": "REST" - } - ] - for device in devices: - self.db.add_new_connecting_device(**device) - - for i, d in enumerate(devices): - if i > 0 and i < 2: - d["deviceType"] = 'default' - - returned_devices.append(tuple(d.values())) - print(returned_devices) - - connected_devices = self.db.get_connected_devices() - - - self.assertCountEqual(returned_devices, connected_devices) - - def test_read_from_to_timestamp(self): - self.db.create_device_table("testingDevice") - - initTimestamp = 1627164000000 - messageCount = 0 - - for _ in range(6): - # create messages in the span of a week where each message - # was taken each day - message = { - "deviceName": "testingDevice", - "ts": initTimestamp, - "data": "123" - } - self.db.write(message) - initTimestamp += 86400000 - messageCount += 1 - # Check data is for the whole week - wholeWeek = self.db.readAll("testingDevice") - self.assertEqual(messageCount, len(wholeWeek)) - - # Check only data 2 days since initTimestamp - twoDays = self.db.read(1627164000000, 1627336799999, "testingDevice") - print("\ntwoDays: %s" % twoDays) - self.assertEqual(2 , len(twoDays)) - - - - def tearDown(self): - # COMMENT IF YOU WANT DB TO EXIST AFTER TESTS - os.remove(self.db.settings.get_data_file_path()) - # pass - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/tests/storage/test_database_connector.py b/tests/storage/test_database_connector.py deleted file mode 100644 index 00ffa227..00000000 --- a/tests/storage/test_database_connector.py +++ /dev/null @@ -1,32 +0,0 @@ -import unittest -from simplejson import loads -import os - -from sys import path - -path.insert(0, "../../") - -from thingsboard_gateway.new_storage.database_connector import DatabaseConnector -from thingsboard_gateway.new_storage.storage_settings import StorageSettings - -class TestDatabaseConnector(unittest.TestCase): - - def setUp(self): - self.config = { - "data_file_path": "./testing.db", - "max_read_records_count": 100 - } - self.settings = StorageSettings(self.config) - - - def test_database_creation(self): - db = DatabaseConnector(self.settings) - db.connect() - - is_created = os.access(self.config["data_file_path"], os.F_OK) - - self.assertTrue(is_created) - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/tests/storage/test_storage_handler.py b/tests/storage/test_storage_handler.py deleted file mode 100644 index 4a928a3b..00000000 --- a/tests/storage/test_storage_handler.py +++ /dev/null @@ -1,38 +0,0 @@ -import unittest -from time import time -from sys import path -import os -from simplejson import loads -path.insert(0, '../../') -from thingsboard_gateway.new_storage.storage_handler import StorageHandler - -class TestStorageHandler(unittest.TestCase): - - def setUp(self): - - config = { - "data_file_path": "./testing.db", - "max_read_records_count": 100, - "max_days_to_store_data": 1 - } - - self.sh = StorageHandler(config) - - def test_get_connected_devices(self): - - devices_add = { - "testDevice": "Mqtt", - "testDevice2": "REST" - } - - for d in devices_add: - print(d) - self.sh.add_device(d, devices_add[d]) - - devices = self.sh.connected_device_names - - print(devices) - - -if __name__ == "__main__": - unittest.main() diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 31f6c47a..f5213b04 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -27,8 +27,8 @@ from simplejson import dumps, load, loads from yaml import safe_load from thingsboard_gateway.gateway.tb_client import TBClient -from thingsboard_gateway.storage.file_event_storage import FileEventStorage -from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage +from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage +from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage from thingsboard_gateway.tb_utility.tb_gateway_remote_configurator import RemoteConfigurator from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_logger import TBLoggerHandler @@ -137,16 +137,16 @@ class TBGatewayService: self.__remote_configurator = None self.__request_config_after_connect = False - if self.__config['storage']['type'] == 'sqlite': - self.__connected_devices = self._event_storage.connected_devices - else: - self.__connected_devices = {} - self.__load_persistent_devices() + # if self.__config['storage']['type'] == 'sqlite': + # self.__connected_devices = self._event_storage.connected_devices + # else: + self.__connected_devices = {} + # self.__load_persistent_devices() self.__init_remote_configuration() self._load_connectors() self._connect_with_connectors() - # self.__load_persistent_devices() + self.__load_persistent_devices() self._published_events = Queue(-1) self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread") @@ -246,7 +246,7 @@ class TBGatewayService: self.__close_connectors() if self.__config['storage']['type'] == 'sqlite': - self._event_storage.closeDB() + self._event_storage.close_db() log.info("The gateway has been stopped.") self.tb_client.disconnect() @@ -415,12 +415,8 @@ class TBGatewayService: data["telemetry"] = telemetry_with_ts else: data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry} - - if self.__config["storage"]["type"] != "sqlite": - json_data = dumps(data) - save_result = self._event_storage.put(json_data) - else: - save_result = self._event_storage.put(data) + json_data = dumps(data) + save_result = self._event_storage.put(json_data) if not save_result: log.error('Data from the device "%s" cannot be saved, connector name is %s.', data["deviceName"], @@ -438,8 +434,8 @@ class TBGatewayService: devices_data_in_event_pack = {} log.debug("Send data Thread has been started successfully.") - disconnected = False - last_disconnect = None + # disconnected = False + # last_disconnect = None while True: try: @@ -447,25 +443,8 @@ class TBGatewayService: size = getsizeof(devices_data_in_event_pack) events = [] - if self.__config["storage"]["type"] == "sqlite": - if disconnected and last_disconnect is not None: - for device in self.__connected_devices: - - data_from_storage = self._event_storage.readFrom(device, last_disconnect) - for singe_data in data_from_storage: - self._event_storage.readQueue.put(singe_data) - - # Reset data recovery upload signal - disconnected = False - - for _ in range(self._event_storage.readQueue.qsize()): - # readQueue.get() returns a json string - events.append(self._event_storage.readQueue.get()) - - # Cover Memory and File storage - else: - if self.__remote_configurator is None or not self.__remote_configurator.in_process: - events = self._event_storage.get_event_pack() + if self.__remote_configurator is None or not self.__remote_configurator.in_process: + events = self._event_storage.get_event_pack() if events: for event in events: @@ -476,6 +455,7 @@ class TBGatewayService: except Exception as e: log.exception(e) continue + if not devices_data_in_event_pack.get(current_event["deviceName"]): devices_data_in_event_pack[current_event["deviceName"]] = {"telemetry": [], "attributes": {}} @@ -534,9 +514,7 @@ class TBGatewayService: success = False sleep(.01) if success: - log.info("Event pack successfully sent!") - if self.__config["storage"]["type"] != "sqlite": - self._event_storage.event_pack_processing_done() + self._event_storage.event_pack_processing_done() del devices_data_in_event_pack devices_data_in_event_pack = {} else: @@ -545,9 +523,6 @@ class TBGatewayService: sleep(.01) else: sleep(.1) - if not disconnected: - last_disconnect = int(time() * 1000) - disconnected = True except Exception as e: log.exception(e) sleep(1) @@ -765,50 +740,21 @@ class TBGatewayService: return summary_messages def add_device(self, device_name, content, device_type=None): - if self.__config["storage"]["type"] == "file" or self.__config['storage']['type'] == 'memory': - if device_name not in self.__saved_devices: - device_type = device_type if device_type is not None else 'default' - self.__connected_devices[device_name] = {**content, "device_type": device_type} - self.__saved_devices[device_name] = {**content, "device_type": device_type} - self.__save_persistent_devices() - self.tb_client.client.gw_connect_device(device_name, device_type) - else: - if device_name not in self.__connected_devices: - log.info("Adding device: %s on connector: %s" % (device_name, str(content))) - device_type = device_type if device_type is not None else 'default' - - # Storage handler handles device connections - if isinstance(content, dict): - log.debug("content is instance of Dict") - self._event_storage.add_device(device_name, content.get("connector").get_name(), device_type) - elif isinstance(content, str): - log.debug("content is instance of String") - self._event_storage.add_device(device_name, content, device_type) - else: - log.error("Cannot find connector name in content") - - self.__connected_devices = self._event_storage.get_connected_devices() - - self.tb_client.client.gw_connect_device(device_name, device_type) - else: - log.debug("Device: %s is already added!", device_name) + if device_name not in self.__saved_devices: + device_type = device_type if device_type is not None else 'default' + self.__connected_devices[device_name] = {**content, "device_type": device_type} + self.__saved_devices[device_name] = {**content, "device_type": device_type} + self.__save_persistent_devices() + self.tb_client.client.gw_connect_device(device_name, device_type) def del_device(self, device_name): - if self.__config["storage"]["type"] == "sqlite": - self._event_storage.del_device(device_name) - self.__connected_devices = self._event_storage.get_connected_devices() - self.tb_client.client.gw_disconnect_device(device_name) - else: - del self.__connected_devices[device_name] - del self.__saved_devices[device_name] - self.tb_client.client.gw_disconnect_device(device_name) - self.__save_persistent_devices() + del self.__connected_devices[device_name] + del self.__saved_devices[device_name] + self.tb_client.client.gw_disconnect_device(device_name) + self.__save_persistent_devices() def get_devices(self): - if self.__config["storage"]["type"] == "sqlite": - return self._event_storage.get_connected_devices() - else: - return self.__connected_devices + return self.__connected_devices def __load_persistent_devices(self): devices = {} diff --git a/thingsboard_gateway/storage/file/event_storage_reader.py b/thingsboard_gateway/storage/file/event_storage_reader.py index 552afc0d..596562a9 100644 --- a/thingsboard_gateway/storage/file/event_storage_reader.py +++ b/thingsboard_gateway/storage/file/event_storage_reader.py @@ -19,10 +19,10 @@ from os.path import exists from simplejson import JSONDecodeError, dumps, load -from thingsboard_gateway.storage.event_storage_files import EventStorageFiles -from thingsboard_gateway.storage.event_storage_reader_pointer import EventStorageReaderPointer -from thingsboard_gateway.storage.file_event_storage import log -from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings +from thingsboard_gateway.storage.file.event_storage_files import EventStorageFiles +from thingsboard_gateway.storage.file.event_storage_reader_pointer import EventStorageReaderPointer +from thingsboard_gateway.storage.file.file_event_storage import log +from thingsboard_gateway.storage.file.file_event_storage_settings import FileEventStorageSettings class EventStorageReader: diff --git a/thingsboard_gateway/storage/file/event_storage_writer.py b/thingsboard_gateway/storage/file/event_storage_writer.py index f79bf832..92408975 100644 --- a/thingsboard_gateway/storage/file/event_storage_writer.py +++ b/thingsboard_gateway/storage/file/event_storage_writer.py @@ -18,9 +18,9 @@ from os import O_CREAT, O_EXCL, close as os_close, linesep, open as os_open from os.path import exists from time import time -from thingsboard_gateway.storage.event_storage_files import EventStorageFiles -from thingsboard_gateway.storage.file_event_storage import log -from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings +from thingsboard_gateway.storage.file.event_storage_files import EventStorageFiles +from thingsboard_gateway.storage.file.file_event_storage import log +from thingsboard_gateway.storage.file.file_event_storage_settings import FileEventStorageSettings class DataFileCountError(Exception): diff --git a/thingsboard_gateway/storage/file/file_event_storage.py b/thingsboard_gateway/storage/file/file_event_storage.py index 96c1a0dc..fc23221c 100644 --- a/thingsboard_gateway/storage/file/file_event_storage.py +++ b/thingsboard_gateway/storage/file/file_event_storage.py @@ -18,10 +18,10 @@ import time from simplejson import dump from thingsboard_gateway.storage.event_storage import EventStorage, log -from thingsboard_gateway.storage.event_storage_files import EventStorageFiles -from thingsboard_gateway.storage.event_storage_reader import EventStorageReader -from thingsboard_gateway.storage.event_storage_writer import DataFileCountError, EventStorageWriter -from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings +from thingsboard_gateway.storage.file.event_storage_files import EventStorageFiles +from thingsboard_gateway.storage.file.event_storage_reader import EventStorageReader +from thingsboard_gateway.storage.file.event_storage_writer import DataFileCountError, EventStorageWriter +from thingsboard_gateway.storage.file.file_event_storage_settings import FileEventStorageSettings class FileEventStorage(EventStorage): diff --git a/thingsboard_gateway/storage/sqlite/database.py b/thingsboard_gateway/storage/sqlite/database.py index d0055c2b..1ea475b2 100644 --- a/thingsboard_gateway/storage/sqlite/database.py +++ b/thingsboard_gateway/storage/sqlite/database.py @@ -13,21 +13,19 @@ # limitations under the License. from os.path import exists -from simplejson import dumps from time import time -from datetime import datetime, timedelta -from hashlib import sha1 from logging import getLogger +from threading import Thread +from queue import Queue from thingsboard_gateway.storage.sqlite.database_connector import DatabaseConnector from thingsboard_gateway.storage.sqlite.database_action_type import DatabaseActionType -from thingsboard_gateway.storage.sqlite.database_request import DatabaseRequest from thingsboard_gateway.storage.sqlite.storage_settings import StorageSettings log = getLogger("database") -class Database: +class Database(Thread): """ What this component does: - abstracts creating tables for devices. @@ -37,7 +35,9 @@ class Database: ------------- ALL OF THIS IN AN ATOMIC WAY --------- """ - def __init__(self, config): + def __init__(self, config, processing_queue: Queue): + super().__init__() + self.setDaemon(True) self.settings = StorageSettings(config) if not exists(self.settings.data_folder_path): @@ -49,10 +49,10 @@ class Database: self.db.connect() - self.cur = self.db.get_cursor() + # self.cur = self.db.get_cursor() # process Queue - self.processQueue = None + self.processQueue = processing_queue # Response Queue self.readQueue = None @@ -60,160 +60,24 @@ class Database: self.__processing = False self.msg_counter = 0 + self.start() - def add_new_connecting_device(self, deviceName=None, connector=None, deviceType=None): - + def init_table(self): try: - if connector is None: - log.error("Connector was not specified") - return - - if isinstance(connector, dict): - connector_name = connector["connector"] - elif isinstance(connector, str): - connector_name = connector - - dataSavedIndex = 1 - dataUploadedIndex = 0 - - log.debug("Inserting new connecting device to DB") - self.cur.execute( - '''INSERT INTO connected_devices(deviceName,deviceType,connector,dataSavedIndex,dataUploadedIndex) VALUES(?,?,?,?,?);''', - [deviceName, deviceType, connector_name, dataSavedIndex, dataUploadedIndex]) - - self.db.commit() - - except Exception as e: - self.db.rollback() - log.exception(e) - - def del_connected_device(self, deviceName): - try: - self.cur.execute(''' - DELETE FROM connected_devices WHERE deviceName = ? ;''', [deviceName]) - self.db.commit() - - except Exception as e: - self.db.rollback() - log.exception(e) - - """ - TODO: update_connected_device - - update device in database - don't know why but its in legacy API - """ - - def update_device_data_index(self, deviceName, dataIndex): - try: - log.debug("Updating device %s storage data index to: %d" % (deviceName, dataIndex)) - self.cur.execute(''' - UPDATE connected_devices SET dataSavedIndex = ? WHERE deviceName = ?''', [dataIndex, deviceName]) - + self.db.execute('''CREATE TABLE IF NOT EXISTS messages (timestamp INTEGER, message TEXT); ''') self.db.commit() except Exception as e: self.db.rollback() log.exception(e) - def create_connected_devices_table(self): - try: - self.cur.execute(''' - CREATE TABLE IF NOT EXISTS connected_devices (deviceName TEXT, deviceType TEXT, connector TEXT, dataSavedIndex INTEGER, dataUploadedIndex INTEGER);''') + def run(self): + while True: + self.process() - # dataIndex is a rowid of the actual device table that was read to cluster - - self.db.commit() - - except Exception as e: - self.db.rollback() - log.exception(e) - - def create_device_table(self, deviceName): - """ - Params: - deviceName: str - Desc: - Creates a table for each connected device. - Where connectors can store converted messages for backup - Device name is Hashed so that SQLite doesn't have a problem with - special symbols like: "-" "." "_" ... - """ - - try: - - h = sha1() - h.update(bytes(deviceName, 'utf-8')) - - device_table = h.hexdigest()[:10].upper() - device_table = "_" + device_table - - self.cur.execute(''' - CREATE TABLE IF NOT EXISTS ''' + device_table + - ''' (dataIndex INTEGER PRIMARY KEY, timestamp INTEGER, message TEXT); ''') - - self.db.commit() - - except Exception as e: - self.db.rollback() - log.exception(e) - - def get_all_tables(self): - """ - Return list of all tables - """ - try: - self.cur.execute("SELECT name FROM sqlite_master WHERE type='table';") - - return self.cur.fetchall()[0] - - except Exception as e: - self.db.rollback() - log.exception(e) - - def get_connected_devices(self): - """ - Returns a list of connected devices in database - """ - try: - self.cur.execute("SELECT * FROM connected_devices;") - - return self.cur.fetchall() - - except Exception as e: - self.db.rollback() - log.exception(e) - - def delete_old_storage_data(self): - try: - today = datetime.today() - days_to_store_data = float(self.settings.get_max_days_to_store_data()) - log.debug("days to store data: %s" % str(days_to_store_data)) - older_than = timedelta(days=days_to_store_data) - - old_after = (today - older_than).timestamp() * 1000 - - # get all device tables and for each delete older rows - # than config specifies - device_tables = self.get_connected_devices() - log.debug("Deleting data older than %d" % old_after) - for device in device_tables: - h = sha1() - h.update(bytes(device[0], 'utf-8')) - - device_table = h.hexdigest()[:10].upper() - device_table = "_" + device_table - self.cur.execute("DELETE FROM " + device_table + " WHERE timestamp <= ?", [str(old_after)]) - self.db.commit() - - except Exception as e: - self.db.rollback() - log.exception(e) - - # TESTED - # PROCESSING def process(self): try: # Signalization so that we can spam call process() - if not self.__processing: + if not self.__processing and self.processQueue: self.__processing = True while self.processQueue.qsize() > 0: @@ -222,132 +86,33 @@ class Database: log.debug("Processing %s" % req.type) if req.type is DatabaseActionType.WRITE_DATA_STORAGE: - message = req.get_data() + message = req.data - h = sha1() - h.update(bytes(message.get("deviceName"), 'utf-8')) + timestamp = time() - device_table = h.hexdigest()[:10].upper() - device_table = "_" + device_table - - timestamp = message.get("ts", message.get("timestamp", int(time()) * 1000)) - - self.cur.execute(''' - INSERT INTO ''' + device_table + '''(timestamp, message) VALUES (?, ?);''', - [timestamp, dumps(message)]) + self.db.execute('''INSERT INTO messages (timestamp, message) VALUES (?, ?);''', + [timestamp, message]) self.db.commit() - self.readQueue.put(dumps(message)) - - self.msg_counter += 1 - - # We are checking old data every N messages - if self.msg_counter >= self.settings.check_data_freshness_in_messages: - # Deleting old data base on how many days were defined in - # tb_gateway.yaml config - _type = DatabaseActionType.DELETE_OLD_DATA - data = None - req = DatabaseRequest(_type, data) - self.processQueue.put(req) - self.msg_counter = 0 - - continue - - if req.type is DatabaseActionType.WRITE_STORAGE_INDEX: - # 0 - deviceName - # 1 - storageIndex - data = req.get_data() - - log.debug("%s" % str(data)) - # log.debug("Updating device %s storage data index to: %d" % (data[0], data[1])) - self.cur.execute(''' - UPDATE connected_devices SET dataSavedIndex = ? WHERE deviceName = ?''', - [data[1], data[0]]) - - self.db.commit() - continue - - if req.type is DatabaseActionType.READ_DEVICE: - # Expects 2 arguments: - # - DeviceName - # - ts (timestamp from which to read to present) - - data = req.get_data() - - deviceName = data.get("deviceName") - ts = data.get("ts") - - h = sha1() - h.update(bytes(deviceName, 'utf-8')) - - device_table = h.hexdigest()[:10].upper() - device_table = "_" + device_table - self.cur.execute(''' - SELECT message FROM ''' + device_table + " WHERE timestamp >= ? ;", [ts]) - - data_pack = self.cur.fetchall() - log.debug(str(data_pack)) - - for single_data in data_pack: - self.readQueue.put(single_data[0]) - continue - - if req.type is DatabaseActionType.READ_CONNECTED_DEVICES: - - # here req.get_data() returns handle from storage_handler.py to set self.connected_devices - data = req.get_data() - - connected_devices_querry = self.get_connected_devices() - - devices = {} - for device in connected_devices_querry: - devices[device[0]] = {"connector": device[2], "device_type": device[1], - 'data_saved_index': device[3], 'data_uploaded_index': device[4]} - log.debug("Appending device %s to return connected_devices" % device[0]) - log.debug("Returning %s" % str(devices)) - - data.connected_devices = devices - continue - - if req.type is DatabaseActionType.DELETE_OLD_DATA: - self.delete_old_storage_data() - continue - self.__processing = False except Exception as e: self.db.rollback() log.exception(e) - def readAll(self, deviceName): + def read_data(self, ts): try: - h = sha1() - h.update(bytes(deviceName, 'utf-8')) - - device_table = h.hexdigest()[:10].upper() - device_table = "_" + device_table - self.cur.execute(''' - SELECT message FROM ''' + device_table + ";") - - return self.cur.fetchall() - + data = self.db.execute('''SELECT message FROM messages WHERE timestamp >= ? ;''', [ts]) + return data except Exception as e: self.db.rollback() log.exception(e) - def readFrom(self, deviceName, ts): + def delete_data(self, ts): try: - h = sha1() - h.update(bytes(deviceName, 'utf-8')) - - device_table = h.hexdigest()[:10].upper() - device_table = "_" + device_table - self.cur.execute(''' - SELECT message FROM ''' + device_table + " WHERE timestamp >= ? ;", [ts]) - - return self.cur.fetchall() - + data = self.db.execute('''DELETE FROM messages WHERE timestamp >= ? ;''', [ts]) + return data except Exception as e: self.db.rollback() log.exception(e) diff --git a/thingsboard_gateway/storage/sqlite/database_action_type.py b/thingsboard_gateway/storage/sqlite/database_action_type.py index d1cb99dc..a6871073 100644 --- a/thingsboard_gateway/storage/sqlite/database_action_type.py +++ b/thingsboard_gateway/storage/sqlite/database_action_type.py @@ -17,7 +17,4 @@ from enum import Enum, auto class DatabaseActionType(Enum): WRITE_DATA_STORAGE = auto() # Writes do not require a response on the request - WRITE_STORAGE_INDEX = auto() - READ_CONNECTED_DEVICES = auto() # Reads need response to get requested data - READ_DEVICE = auto() # RPC CALL - DELETE_OLD_DATA = auto() + diff --git a/thingsboard_gateway/storage/sqlite/database_connector.py b/thingsboard_gateway/storage/sqlite/database_connector.py index a6d2803b..1e55f005 100644 --- a/thingsboard_gateway/storage/sqlite/database_connector.py +++ b/thingsboard_gateway/storage/sqlite/database_connector.py @@ -12,7 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from sqlite3 import connect +import sqlite3 +from sqlite3 import connect, Connection +from threading import RLock +from typing import Optional + from thingsboard_gateway.storage.sqlite.storage_settings import StorageSettings @@ -23,8 +27,9 @@ log = getLogger("storage") class DatabaseConnector: def __init__(self, settings: StorageSettings): - self.data_file_path = settings.get_data_file_path() - self.connection = None + self.data_file_path = settings.data_folder_path + self.connection: Optional[Connection] = None + self.lock = RLock() def connect(self): """ @@ -41,18 +46,33 @@ class DatabaseConnector: """ log.debug("Committing changes to DB") try: - self.connection.commit() + with self.lock: + self.connection.commit() except Exception as e: log.exception(e) + def execute(self, *args): + """ + Execute changes + """ + # log.debug("Execute %s to DB", str(args)) + try: + with self.lock: + return self.connection.execute(*args) + except sqlite3.ProgrammingError: + pass + except Exception as e: + log.exception(e) + def rollback(self): """ Rollback changes after exception """ log.debug("Rollback transaction") try: - self.connection.rollback() + with self.lock: + self.connection.rollback() except Exception as e: log.exception(e) @@ -62,7 +82,8 @@ class DatabaseConnector: Closes database file """ try: - self.connection.close() + with self.lock: + self.connection.close() except Exception as e: log.exception(e) diff --git a/thingsboard_gateway/storage/sqlite/database_request.py b/thingsboard_gateway/storage/sqlite/database_request.py index 86159440..90185ce2 100644 --- a/thingsboard_gateway/storage/sqlite/database_request.py +++ b/thingsboard_gateway/storage/sqlite/database_request.py @@ -21,9 +21,3 @@ class DatabaseRequest: def __init__(self, _type: DatabaseActionType, data): self.type = _type self.data = data - - def get_type(self): - return self.type - - def get_data(self): - return self.data diff --git a/thingsboard_gateway/storage/sqlite/storage_handler.py b/thingsboard_gateway/storage/sqlite/storage_handler.py index 2bc095dd..feb1327c 100644 --- a/thingsboard_gateway/storage/sqlite/storage_handler.py +++ b/thingsboard_gateway/storage/sqlite/storage_handler.py @@ -11,7 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import sqlite3 +from time import time +from thingsboard_gateway.storage.event_storage import EventStorage from thingsboard_gateway.storage.sqlite.database import Database from queue import Queue from thingsboard_gateway.storage.sqlite.database_request import DatabaseRequest @@ -24,96 +27,56 @@ from logging import getLogger log = getLogger("storage") -class StorageHandler: +class StorageHandler(EventStorage): """ HIGH level api for thingsboard_gateway main loop """ def __init__(self, config): log.info("Sqlite Storage initializing...") - - self.db = Database(config) - - # We need queues to stay atomic when multiple connectors/Threads are - # trying to write or read from database log.info("Initializing read and process queues") self.processQueue = Queue(-1) self.readQueue = Queue(-1) + self.db = Database(config, self.processQueue) + self.db.setReadQueue(self.readQueue) self.db.setProcessQueue(self.processQueue) - # Create table if not exists for connected devices - self.db.create_connected_devices_table() - self.connected_devices = self.get_connected_devices() + self.db.init_table() log.info("Sqlite storage initialized!") + self.delete_time_point = None + self.last_read = time() + self.closed = False - def get_connected_devices(self): - """ - Util func, to only parse and store connected devices names in a list - """ - _type = DatabaseActionType.READ_CONNECTED_DEVICES - data = self - req = DatabaseRequest(_type, data) - self.processQueue.put(req) + def get_event_pack(self): + self.delete_time_point = self.last_read + data_from_storage = self.read_data(self.last_read) + self.last_read = time() - self.db.process() + return [item[0] for item in data_from_storage or []] - return self.connected_devices + def event_pack_processing_done(self): + self.delete_data(self.delete_time_point) - def readAll(self, deviceName): - return self.db.readAll(deviceName) + def read_data(self, ts): + return self.db.read_data(ts) - def readFrom(self, deviceName, ts): - return self.db.readFrom(deviceName, ts) + def delete_data(self, ts): + return self.db.delete_data(ts) def put(self, message): try: - - device_name = message.get("deviceName") - - if device_name is not None and device_name not in self.connected_devices: - self.db.create_device_table(device_name) - _type = DatabaseActionType.WRITE_DATA_STORAGE request = DatabaseRequest(_type, message) log.info("Sending data to storage") self.processQueue.put(request) - - # Left for discussion - log.debug("data %s from device %s " % (str(self.connected_devices[device_name]), device_name)) - self.connected_devices[device_name]["data_saved_index"] += 1 - - storageIndex = self.connected_devices[device_name]["data_saved_index"] - data = (device_name, storageIndex) - _type = DatabaseActionType.WRITE_STORAGE_INDEX - log.debug("Index request data: %s" % str(data)) - index_request = DatabaseRequest(_type, data) - - log.debug("Updating device storage index") - self.processQueue.put(index_request) - - self.db.process() # This call is necessary + self.db.process() return True except Exception as e: log.exception(e) - def add_device(self, deviceName, connector, deviceType=None): - - self.db.add_new_connecting_device(deviceName, connector, deviceType) - # Update connected devices list - self.connected_devices = self.get_connected_devices() - - # Create device table - self.db.create_device_table(deviceName) - - def del_device(self, device_name): - - self.db.del_connected_device(device_name) - - # Update connected devices list - self.connected_devices = self.get_connected_devices() - - def closeDB(self): + def close_db(self): self.db.closeDB() + self.closed = True diff --git a/thingsboard_gateway/storage/sqlite/storage_settings.py b/thingsboard_gateway/storage/sqlite/storage_settings.py index 30b79210..be150be7 100644 --- a/thingsboard_gateway/storage/sqlite/storage_settings.py +++ b/thingsboard_gateway/storage/sqlite/storage_settings.py @@ -17,9 +17,3 @@ class StorageSettings: self.data_folder_path = config.get("data_file_path", "./") self.max_days_to_store_data = config.get("max_days_to_store_data", 7) self.check_data_freshness_in_messages = config.get('check_data_freshness_in_messages', 10) - - def get_data_file_path(self): - return self.data_folder_path - - def get_max_days_to_store_data(self): - return self.max_days_to_store_data