mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Refactored sqlite storage
This commit is contained in:
@@ -1,129 +0,0 @@
|
||||
import unittest
|
||||
from time import time
|
||||
from sys import path
|
||||
import os
|
||||
from simplejson import loads
|
||||
path.insert(0, '../../')
|
||||
from thingsboard_gateway.new_storage.database import Database
|
||||
|
||||
class TestDatabase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.config = {
|
||||
"data_file_path": "./testing.db",
|
||||
"max_read_records_count": 100,
|
||||
"max_days_to_store_data": 1
|
||||
}
|
||||
self.db = Database(self.config)
|
||||
|
||||
|
||||
def test_create_device_table(self):
|
||||
|
||||
device_name = "DraginoLHT"
|
||||
|
||||
self.db.create_device_table(device_name)
|
||||
|
||||
self.db.cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
|
||||
device_table = "device_" + device_name
|
||||
self.assertIn(device_table, self.db.cur.fetchall()[0])
|
||||
|
||||
|
||||
def test_write_to_device_table(self):
|
||||
|
||||
data = {
|
||||
"deviceName": "DraginoLHT",
|
||||
"ts": 16888888874,
|
||||
"telemetry": [
|
||||
{"temp": 24.30},
|
||||
{ "humi": 90}
|
||||
]
|
||||
}
|
||||
|
||||
self.db.create_device_table(data["deviceName"])
|
||||
|
||||
self.db.write(data)
|
||||
|
||||
ret = self.db.readAll(data["deviceName"])
|
||||
|
||||
self.assertDictEqual(data, loads(ret[0][1]))
|
||||
|
||||
def test_create_connected_devices_table(self):
|
||||
|
||||
self.db.create_connected_devices_table()
|
||||
|
||||
tables = self.db.get_all_tables()
|
||||
|
||||
self.assertIn('connected_devices', tables)
|
||||
|
||||
def test_get_connected_devices(self):
|
||||
|
||||
# create table for this test
|
||||
self.db.create_connected_devices_table()
|
||||
returned_devices = []
|
||||
devices = [
|
||||
{
|
||||
"deviceName": "device1",
|
||||
"deviceType": "humiSensor",
|
||||
"connector": "MQTT"
|
||||
},
|
||||
{
|
||||
"deviceName": "device2",
|
||||
"connector": "Modbus"
|
||||
},
|
||||
{
|
||||
"deviceName": "device3",
|
||||
"deviceType": "tempSensor",
|
||||
"connector": "REST"
|
||||
}
|
||||
]
|
||||
for device in devices:
|
||||
self.db.add_new_connecting_device(**device)
|
||||
|
||||
for i, d in enumerate(devices):
|
||||
if i > 0 and i < 2:
|
||||
d["deviceType"] = 'default'
|
||||
|
||||
returned_devices.append(tuple(d.values()))
|
||||
print(returned_devices)
|
||||
|
||||
connected_devices = self.db.get_connected_devices()
|
||||
|
||||
|
||||
self.assertCountEqual(returned_devices, connected_devices)
|
||||
|
||||
def test_read_from_to_timestamp(self):
|
||||
self.db.create_device_table("testingDevice")
|
||||
|
||||
initTimestamp = 1627164000000
|
||||
messageCount = 0
|
||||
|
||||
for _ in range(6):
|
||||
# create messages in the span of a week where each message
|
||||
# was taken each day
|
||||
message = {
|
||||
"deviceName": "testingDevice",
|
||||
"ts": initTimestamp,
|
||||
"data": "123"
|
||||
}
|
||||
self.db.write(message)
|
||||
initTimestamp += 86400000
|
||||
messageCount += 1
|
||||
# Check data is for the whole week
|
||||
wholeWeek = self.db.readAll("testingDevice")
|
||||
self.assertEqual(messageCount, len(wholeWeek))
|
||||
|
||||
# Check only data 2 days since initTimestamp
|
||||
twoDays = self.db.read(1627164000000, 1627336799999, "testingDevice")
|
||||
print("\ntwoDays: %s" % twoDays)
|
||||
self.assertEqual(2 , len(twoDays))
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# COMMENT IF YOU WANT DB TO EXIST AFTER TESTS
|
||||
os.remove(self.db.settings.get_data_file_path())
|
||||
# pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,32 +0,0 @@
|
||||
import unittest
|
||||
from simplejson import loads
|
||||
import os
|
||||
|
||||
from sys import path
|
||||
|
||||
path.insert(0, "../../")
|
||||
|
||||
from thingsboard_gateway.new_storage.database_connector import DatabaseConnector
|
||||
from thingsboard_gateway.new_storage.storage_settings import StorageSettings
|
||||
|
||||
class TestDatabaseConnector(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.config = {
|
||||
"data_file_path": "./testing.db",
|
||||
"max_read_records_count": 100
|
||||
}
|
||||
self.settings = StorageSettings(self.config)
|
||||
|
||||
|
||||
def test_database_creation(self):
|
||||
db = DatabaseConnector(self.settings)
|
||||
db.connect()
|
||||
|
||||
is_created = os.access(self.config["data_file_path"], os.F_OK)
|
||||
|
||||
self.assertTrue(is_created)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,38 +0,0 @@
|
||||
import unittest
|
||||
from time import time
|
||||
from sys import path
|
||||
import os
|
||||
from simplejson import loads
|
||||
path.insert(0, '../../')
|
||||
from thingsboard_gateway.new_storage.storage_handler import StorageHandler
|
||||
|
||||
class TestStorageHandler(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
config = {
|
||||
"data_file_path": "./testing.db",
|
||||
"max_read_records_count": 100,
|
||||
"max_days_to_store_data": 1
|
||||
}
|
||||
|
||||
self.sh = StorageHandler(config)
|
||||
|
||||
def test_get_connected_devices(self):
|
||||
|
||||
devices_add = {
|
||||
"testDevice": "Mqtt",
|
||||
"testDevice2": "REST"
|
||||
}
|
||||
|
||||
for d in devices_add:
|
||||
print(d)
|
||||
self.sh.add_device(d, devices_add[d])
|
||||
|
||||
devices = self.sh.connected_device_names
|
||||
|
||||
print(devices)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -27,8 +27,8 @@ from simplejson import dumps, load, loads
|
||||
from yaml import safe_load
|
||||
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.storage.file_event_storage import FileEventStorage
|
||||
from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage
|
||||
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
|
||||
from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage
|
||||
from thingsboard_gateway.tb_utility.tb_gateway_remote_configurator import RemoteConfigurator
|
||||
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
|
||||
from thingsboard_gateway.tb_utility.tb_logger import TBLoggerHandler
|
||||
@@ -137,16 +137,16 @@ class TBGatewayService:
|
||||
self.__remote_configurator = None
|
||||
self.__request_config_after_connect = False
|
||||
|
||||
if self.__config['storage']['type'] == 'sqlite':
|
||||
self.__connected_devices = self._event_storage.connected_devices
|
||||
else:
|
||||
self.__connected_devices = {}
|
||||
self.__load_persistent_devices()
|
||||
# if self.__config['storage']['type'] == 'sqlite':
|
||||
# self.__connected_devices = self._event_storage.connected_devices
|
||||
# else:
|
||||
self.__connected_devices = {}
|
||||
# self.__load_persistent_devices()
|
||||
|
||||
self.__init_remote_configuration()
|
||||
self._load_connectors()
|
||||
self._connect_with_connectors()
|
||||
# self.__load_persistent_devices()
|
||||
self.__load_persistent_devices()
|
||||
self._published_events = Queue(-1)
|
||||
self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
name="Send data to Thingsboard Thread")
|
||||
@@ -246,7 +246,7 @@ class TBGatewayService:
|
||||
self.__close_connectors()
|
||||
|
||||
if self.__config['storage']['type'] == 'sqlite':
|
||||
self._event_storage.closeDB()
|
||||
self._event_storage.close_db()
|
||||
|
||||
log.info("The gateway has been stopped.")
|
||||
self.tb_client.disconnect()
|
||||
@@ -415,12 +415,8 @@ class TBGatewayService:
|
||||
data["telemetry"] = telemetry_with_ts
|
||||
else:
|
||||
data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry}
|
||||
|
||||
if self.__config["storage"]["type"] != "sqlite":
|
||||
json_data = dumps(data)
|
||||
save_result = self._event_storage.put(json_data)
|
||||
else:
|
||||
save_result = self._event_storage.put(data)
|
||||
json_data = dumps(data)
|
||||
save_result = self._event_storage.put(json_data)
|
||||
if not save_result:
|
||||
log.error('Data from the device "%s" cannot be saved, connector name is %s.',
|
||||
data["deviceName"],
|
||||
@@ -438,8 +434,8 @@ class TBGatewayService:
|
||||
devices_data_in_event_pack = {}
|
||||
log.debug("Send data Thread has been started successfully.")
|
||||
|
||||
disconnected = False
|
||||
last_disconnect = None
|
||||
# disconnected = False
|
||||
# last_disconnect = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -447,25 +443,8 @@ class TBGatewayService:
|
||||
size = getsizeof(devices_data_in_event_pack)
|
||||
events = []
|
||||
|
||||
if self.__config["storage"]["type"] == "sqlite":
|
||||
if disconnected and last_disconnect is not None:
|
||||
for device in self.__connected_devices:
|
||||
|
||||
data_from_storage = self._event_storage.readFrom(device, last_disconnect)
|
||||
for singe_data in data_from_storage:
|
||||
self._event_storage.readQueue.put(singe_data)
|
||||
|
||||
# Reset data recovery upload signal
|
||||
disconnected = False
|
||||
|
||||
for _ in range(self._event_storage.readQueue.qsize()):
|
||||
# readQueue.get() returns a json string
|
||||
events.append(self._event_storage.readQueue.get())
|
||||
|
||||
# Cover Memory and File storage
|
||||
else:
|
||||
if self.__remote_configurator is None or not self.__remote_configurator.in_process:
|
||||
events = self._event_storage.get_event_pack()
|
||||
if self.__remote_configurator is None or not self.__remote_configurator.in_process:
|
||||
events = self._event_storage.get_event_pack()
|
||||
|
||||
if events:
|
||||
for event in events:
|
||||
@@ -476,6 +455,7 @@ class TBGatewayService:
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
continue
|
||||
|
||||
if not devices_data_in_event_pack.get(current_event["deviceName"]):
|
||||
devices_data_in_event_pack[current_event["deviceName"]] = {"telemetry": [],
|
||||
"attributes": {}}
|
||||
@@ -534,9 +514,7 @@ class TBGatewayService:
|
||||
success = False
|
||||
sleep(.01)
|
||||
if success:
|
||||
log.info("Event pack successfully sent!")
|
||||
if self.__config["storage"]["type"] != "sqlite":
|
||||
self._event_storage.event_pack_processing_done()
|
||||
self._event_storage.event_pack_processing_done()
|
||||
del devices_data_in_event_pack
|
||||
devices_data_in_event_pack = {}
|
||||
else:
|
||||
@@ -545,9 +523,6 @@ class TBGatewayService:
|
||||
sleep(.01)
|
||||
else:
|
||||
sleep(.1)
|
||||
if not disconnected:
|
||||
last_disconnect = int(time() * 1000)
|
||||
disconnected = True
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
sleep(1)
|
||||
@@ -765,50 +740,21 @@ class TBGatewayService:
|
||||
return summary_messages
|
||||
|
||||
def add_device(self, device_name, content, device_type=None):
|
||||
if self.__config["storage"]["type"] == "file" or self.__config['storage']['type'] == 'memory':
|
||||
if device_name not in self.__saved_devices:
|
||||
device_type = device_type if device_type is not None else 'default'
|
||||
self.__connected_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__saved_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__save_persistent_devices()
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type)
|
||||
else:
|
||||
if device_name not in self.__connected_devices:
|
||||
log.info("Adding device: %s on connector: %s" % (device_name, str(content)))
|
||||
device_type = device_type if device_type is not None else 'default'
|
||||
|
||||
# Storage handler handles device connections
|
||||
if isinstance(content, dict):
|
||||
log.debug("content is instance of Dict")
|
||||
self._event_storage.add_device(device_name, content.get("connector").get_name(), device_type)
|
||||
elif isinstance(content, str):
|
||||
log.debug("content is instance of String")
|
||||
self._event_storage.add_device(device_name, content, device_type)
|
||||
else:
|
||||
log.error("Cannot find connector name in content")
|
||||
|
||||
self.__connected_devices = self._event_storage.get_connected_devices()
|
||||
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type)
|
||||
else:
|
||||
log.debug("Device: %s is already added!", device_name)
|
||||
if device_name not in self.__saved_devices:
|
||||
device_type = device_type if device_type is not None else 'default'
|
||||
self.__connected_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__saved_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__save_persistent_devices()
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type)
|
||||
|
||||
def del_device(self, device_name):
|
||||
if self.__config["storage"]["type"] == "sqlite":
|
||||
self._event_storage.del_device(device_name)
|
||||
self.__connected_devices = self._event_storage.get_connected_devices()
|
||||
self.tb_client.client.gw_disconnect_device(device_name)
|
||||
else:
|
||||
del self.__connected_devices[device_name]
|
||||
del self.__saved_devices[device_name]
|
||||
self.tb_client.client.gw_disconnect_device(device_name)
|
||||
self.__save_persistent_devices()
|
||||
del self.__connected_devices[device_name]
|
||||
del self.__saved_devices[device_name]
|
||||
self.tb_client.client.gw_disconnect_device(device_name)
|
||||
self.__save_persistent_devices()
|
||||
|
||||
def get_devices(self):
|
||||
if self.__config["storage"]["type"] == "sqlite":
|
||||
return self._event_storage.get_connected_devices()
|
||||
else:
|
||||
return self.__connected_devices
|
||||
return self.__connected_devices
|
||||
|
||||
def __load_persistent_devices(self):
|
||||
devices = {}
|
||||
|
||||
@@ -19,10 +19,10 @@ from os.path import exists
|
||||
|
||||
from simplejson import JSONDecodeError, dumps, load
|
||||
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.event_storage_reader_pointer import EventStorageReaderPointer
|
||||
from thingsboard_gateway.storage.file_event_storage import log
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from thingsboard_gateway.storage.file.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file.event_storage_reader_pointer import EventStorageReaderPointer
|
||||
from thingsboard_gateway.storage.file.file_event_storage import log
|
||||
from thingsboard_gateway.storage.file.file_event_storage_settings import FileEventStorageSettings
|
||||
|
||||
|
||||
class EventStorageReader:
|
||||
|
||||
@@ -18,9 +18,9 @@ from os import O_CREAT, O_EXCL, close as os_close, linesep, open as os_open
|
||||
from os.path import exists
|
||||
from time import time
|
||||
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file_event_storage import log
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from thingsboard_gateway.storage.file.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file.file_event_storage import log
|
||||
from thingsboard_gateway.storage.file.file_event_storage_settings import FileEventStorageSettings
|
||||
|
||||
|
||||
class DataFileCountError(Exception):
|
||||
|
||||
@@ -18,10 +18,10 @@ import time
|
||||
from simplejson import dump
|
||||
|
||||
from thingsboard_gateway.storage.event_storage import EventStorage, log
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.event_storage_reader import EventStorageReader
|
||||
from thingsboard_gateway.storage.event_storage_writer import DataFileCountError, EventStorageWriter
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from thingsboard_gateway.storage.file.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file.event_storage_reader import EventStorageReader
|
||||
from thingsboard_gateway.storage.file.event_storage_writer import DataFileCountError, EventStorageWriter
|
||||
from thingsboard_gateway.storage.file.file_event_storage_settings import FileEventStorageSettings
|
||||
|
||||
|
||||
class FileEventStorage(EventStorage):
|
||||
|
||||
@@ -13,21 +13,19 @@
|
||||
# limitations under the License.
|
||||
|
||||
from os.path import exists
|
||||
from simplejson import dumps
|
||||
from time import time
|
||||
from datetime import datetime, timedelta
|
||||
from hashlib import sha1
|
||||
from logging import getLogger
|
||||
from threading import Thread
|
||||
from queue import Queue
|
||||
|
||||
from thingsboard_gateway.storage.sqlite.database_connector import DatabaseConnector
|
||||
from thingsboard_gateway.storage.sqlite.database_action_type import DatabaseActionType
|
||||
from thingsboard_gateway.storage.sqlite.database_request import DatabaseRequest
|
||||
from thingsboard_gateway.storage.sqlite.storage_settings import StorageSettings
|
||||
|
||||
log = getLogger("database")
|
||||
|
||||
|
||||
class Database:
|
||||
class Database(Thread):
|
||||
"""
|
||||
What this component does:
|
||||
- abstracts creating tables for devices.
|
||||
@@ -37,7 +35,9 @@ class Database:
|
||||
------------- ALL OF THIS IN AN ATOMIC WAY ---------
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
def __init__(self, config, processing_queue: Queue):
|
||||
super().__init__()
|
||||
self.setDaemon(True)
|
||||
self.settings = StorageSettings(config)
|
||||
|
||||
if not exists(self.settings.data_folder_path):
|
||||
@@ -49,10 +49,10 @@ class Database:
|
||||
|
||||
self.db.connect()
|
||||
|
||||
self.cur = self.db.get_cursor()
|
||||
# self.cur = self.db.get_cursor()
|
||||
|
||||
# process Queue
|
||||
self.processQueue = None
|
||||
self.processQueue = processing_queue
|
||||
# Response Queue
|
||||
self.readQueue = None
|
||||
|
||||
@@ -60,160 +60,24 @@ class Database:
|
||||
self.__processing = False
|
||||
|
||||
self.msg_counter = 0
|
||||
self.start()
|
||||
|
||||
def add_new_connecting_device(self, deviceName=None, connector=None, deviceType=None):
|
||||
|
||||
def init_table(self):
|
||||
try:
|
||||
if connector is None:
|
||||
log.error("Connector was not specified")
|
||||
return
|
||||
|
||||
if isinstance(connector, dict):
|
||||
connector_name = connector["connector"]
|
||||
elif isinstance(connector, str):
|
||||
connector_name = connector
|
||||
|
||||
dataSavedIndex = 1
|
||||
dataUploadedIndex = 0
|
||||
|
||||
log.debug("Inserting new connecting device to DB")
|
||||
self.cur.execute(
|
||||
'''INSERT INTO connected_devices(deviceName,deviceType,connector,dataSavedIndex,dataUploadedIndex) VALUES(?,?,?,?,?);''',
|
||||
[deviceName, deviceType, connector_name, dataSavedIndex, dataUploadedIndex])
|
||||
|
||||
self.db.commit()
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def del_connected_device(self, deviceName):
|
||||
try:
|
||||
self.cur.execute('''
|
||||
DELETE FROM connected_devices WHERE deviceName = ? ;''', [deviceName])
|
||||
self.db.commit()
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
"""
|
||||
TODO: update_connected_device
|
||||
- update device in database
|
||||
don't know why but its in legacy API
|
||||
"""
|
||||
|
||||
def update_device_data_index(self, deviceName, dataIndex):
|
||||
try:
|
||||
log.debug("Updating device %s storage data index to: %d" % (deviceName, dataIndex))
|
||||
self.cur.execute('''
|
||||
UPDATE connected_devices SET dataSavedIndex = ? WHERE deviceName = ?''', [dataIndex, deviceName])
|
||||
|
||||
self.db.execute('''CREATE TABLE IF NOT EXISTS messages (timestamp INTEGER, message TEXT); ''')
|
||||
self.db.commit()
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def create_connected_devices_table(self):
|
||||
try:
|
||||
self.cur.execute('''
|
||||
CREATE TABLE IF NOT EXISTS connected_devices (deviceName TEXT, deviceType TEXT, connector TEXT, dataSavedIndex INTEGER, dataUploadedIndex INTEGER);''')
|
||||
def run(self):
|
||||
while True:
|
||||
self.process()
|
||||
|
||||
# dataIndex is a rowid of the actual device table that was read to cluster
|
||||
|
||||
self.db.commit()
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def create_device_table(self, deviceName):
|
||||
"""
|
||||
Params:
|
||||
deviceName: str
|
||||
Desc:
|
||||
Creates a table for each connected device.
|
||||
Where connectors can store converted messages for backup
|
||||
Device name is Hashed so that SQLite doesn't have a problem with
|
||||
special symbols like: "-" "." "_" ...
|
||||
"""
|
||||
|
||||
try:
|
||||
|
||||
h = sha1()
|
||||
h.update(bytes(deviceName, 'utf-8'))
|
||||
|
||||
device_table = h.hexdigest()[:10].upper()
|
||||
device_table = "_" + device_table
|
||||
|
||||
self.cur.execute('''
|
||||
CREATE TABLE IF NOT EXISTS ''' + device_table +
|
||||
''' (dataIndex INTEGER PRIMARY KEY, timestamp INTEGER, message TEXT); ''')
|
||||
|
||||
self.db.commit()
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def get_all_tables(self):
|
||||
"""
|
||||
Return list of all tables
|
||||
"""
|
||||
try:
|
||||
self.cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
|
||||
|
||||
return self.cur.fetchall()[0]
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def get_connected_devices(self):
|
||||
"""
|
||||
Returns a list of connected devices in database
|
||||
"""
|
||||
try:
|
||||
self.cur.execute("SELECT * FROM connected_devices;")
|
||||
|
||||
return self.cur.fetchall()
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def delete_old_storage_data(self):
|
||||
try:
|
||||
today = datetime.today()
|
||||
days_to_store_data = float(self.settings.get_max_days_to_store_data())
|
||||
log.debug("days to store data: %s" % str(days_to_store_data))
|
||||
older_than = timedelta(days=days_to_store_data)
|
||||
|
||||
old_after = (today - older_than).timestamp() * 1000
|
||||
|
||||
# get all device tables and for each delete older rows
|
||||
# than config specifies
|
||||
device_tables = self.get_connected_devices()
|
||||
log.debug("Deleting data older than %d" % old_after)
|
||||
for device in device_tables:
|
||||
h = sha1()
|
||||
h.update(bytes(device[0], 'utf-8'))
|
||||
|
||||
device_table = h.hexdigest()[:10].upper()
|
||||
device_table = "_" + device_table
|
||||
self.cur.execute("DELETE FROM " + device_table + " WHERE timestamp <= ?", [str(old_after)])
|
||||
self.db.commit()
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
# TESTED
|
||||
# PROCESSING
|
||||
def process(self):
|
||||
try:
|
||||
# Signalization so that we can spam call process()
|
||||
if not self.__processing:
|
||||
if not self.__processing and self.processQueue:
|
||||
self.__processing = True
|
||||
while self.processQueue.qsize() > 0:
|
||||
|
||||
@@ -222,132 +86,33 @@ class Database:
|
||||
log.debug("Processing %s" % req.type)
|
||||
if req.type is DatabaseActionType.WRITE_DATA_STORAGE:
|
||||
|
||||
message = req.get_data()
|
||||
message = req.data
|
||||
|
||||
h = sha1()
|
||||
h.update(bytes(message.get("deviceName"), 'utf-8'))
|
||||
timestamp = time()
|
||||
|
||||
device_table = h.hexdigest()[:10].upper()
|
||||
device_table = "_" + device_table
|
||||
|
||||
timestamp = message.get("ts", message.get("timestamp", int(time()) * 1000))
|
||||
|
||||
self.cur.execute('''
|
||||
INSERT INTO ''' + device_table + '''(timestamp, message) VALUES (?, ?);''',
|
||||
[timestamp, dumps(message)])
|
||||
self.db.execute('''INSERT INTO messages (timestamp, message) VALUES (?, ?);''',
|
||||
[timestamp, message])
|
||||
|
||||
self.db.commit()
|
||||
|
||||
self.readQueue.put(dumps(message))
|
||||
|
||||
self.msg_counter += 1
|
||||
|
||||
# We are checking old data every N messages
|
||||
if self.msg_counter >= self.settings.check_data_freshness_in_messages:
|
||||
# Deleting old data base on how many days were defined in
|
||||
# tb_gateway.yaml config
|
||||
_type = DatabaseActionType.DELETE_OLD_DATA
|
||||
data = None
|
||||
req = DatabaseRequest(_type, data)
|
||||
self.processQueue.put(req)
|
||||
self.msg_counter = 0
|
||||
|
||||
continue
|
||||
|
||||
if req.type is DatabaseActionType.WRITE_STORAGE_INDEX:
|
||||
# 0 - deviceName
|
||||
# 1 - storageIndex
|
||||
data = req.get_data()
|
||||
|
||||
log.debug("%s" % str(data))
|
||||
# log.debug("Updating device %s storage data index to: %d" % (data[0], data[1]))
|
||||
self.cur.execute('''
|
||||
UPDATE connected_devices SET dataSavedIndex = ? WHERE deviceName = ?''',
|
||||
[data[1], data[0]])
|
||||
|
||||
self.db.commit()
|
||||
continue
|
||||
|
||||
if req.type is DatabaseActionType.READ_DEVICE:
|
||||
# Expects 2 arguments:
|
||||
# - DeviceName
|
||||
# - ts (timestamp from which to read to present)
|
||||
|
||||
data = req.get_data()
|
||||
|
||||
deviceName = data.get("deviceName")
|
||||
ts = data.get("ts")
|
||||
|
||||
h = sha1()
|
||||
h.update(bytes(deviceName, 'utf-8'))
|
||||
|
||||
device_table = h.hexdigest()[:10].upper()
|
||||
device_table = "_" + device_table
|
||||
self.cur.execute('''
|
||||
SELECT message FROM ''' + device_table + " WHERE timestamp >= ? ;", [ts])
|
||||
|
||||
data_pack = self.cur.fetchall()
|
||||
log.debug(str(data_pack))
|
||||
|
||||
for single_data in data_pack:
|
||||
self.readQueue.put(single_data[0])
|
||||
continue
|
||||
|
||||
if req.type is DatabaseActionType.READ_CONNECTED_DEVICES:
|
||||
|
||||
# here req.get_data() returns handle from storage_handler.py to set self.connected_devices
|
||||
data = req.get_data()
|
||||
|
||||
connected_devices_querry = self.get_connected_devices()
|
||||
|
||||
devices = {}
|
||||
for device in connected_devices_querry:
|
||||
devices[device[0]] = {"connector": device[2], "device_type": device[1],
|
||||
'data_saved_index': device[3], 'data_uploaded_index': device[4]}
|
||||
log.debug("Appending device %s to return connected_devices" % device[0])
|
||||
log.debug("Returning %s" % str(devices))
|
||||
|
||||
data.connected_devices = devices
|
||||
continue
|
||||
|
||||
if req.type is DatabaseActionType.DELETE_OLD_DATA:
|
||||
self.delete_old_storage_data()
|
||||
continue
|
||||
|
||||
self.__processing = False
|
||||
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def readAll(self, deviceName):
|
||||
def read_data(self, ts):
|
||||
try:
|
||||
h = sha1()
|
||||
h.update(bytes(deviceName, 'utf-8'))
|
||||
|
||||
device_table = h.hexdigest()[:10].upper()
|
||||
device_table = "_" + device_table
|
||||
self.cur.execute('''
|
||||
SELECT message FROM ''' + device_table + ";")
|
||||
|
||||
return self.cur.fetchall()
|
||||
|
||||
data = self.db.execute('''SELECT message FROM messages WHERE timestamp >= ? ;''', [ts])
|
||||
return data
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
def readFrom(self, deviceName, ts):
|
||||
def delete_data(self, ts):
|
||||
try:
|
||||
h = sha1()
|
||||
h.update(bytes(deviceName, 'utf-8'))
|
||||
|
||||
device_table = h.hexdigest()[:10].upper()
|
||||
device_table = "_" + device_table
|
||||
self.cur.execute('''
|
||||
SELECT message FROM ''' + device_table + " WHERE timestamp >= ? ;", [ts])
|
||||
|
||||
return self.cur.fetchall()
|
||||
|
||||
data = self.db.execute('''DELETE FROM messages WHERE timestamp >= ? ;''', [ts])
|
||||
return data
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
log.exception(e)
|
||||
|
||||
@@ -17,7 +17,4 @@ from enum import Enum, auto
|
||||
|
||||
class DatabaseActionType(Enum):
|
||||
WRITE_DATA_STORAGE = auto() # Writes do not require a response on the request
|
||||
WRITE_STORAGE_INDEX = auto()
|
||||
READ_CONNECTED_DEVICES = auto() # Reads need response to get requested data
|
||||
READ_DEVICE = auto() # RPC CALL
|
||||
DELETE_OLD_DATA = auto()
|
||||
|
||||
|
||||
@@ -12,7 +12,11 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sqlite3 import connect
|
||||
import sqlite3
|
||||
from sqlite3 import connect, Connection
|
||||
from threading import RLock
|
||||
from typing import Optional
|
||||
|
||||
from thingsboard_gateway.storage.sqlite.storage_settings import StorageSettings
|
||||
|
||||
|
||||
@@ -23,8 +27,9 @@ log = getLogger("storage")
|
||||
|
||||
class DatabaseConnector:
|
||||
def __init__(self, settings: StorageSettings):
|
||||
self.data_file_path = settings.get_data_file_path()
|
||||
self.connection = None
|
||||
self.data_file_path = settings.data_folder_path
|
||||
self.connection: Optional[Connection] = None
|
||||
self.lock = RLock()
|
||||
|
||||
def connect(self):
|
||||
"""
|
||||
@@ -41,18 +46,33 @@ class DatabaseConnector:
|
||||
"""
|
||||
log.debug("Committing changes to DB")
|
||||
try:
|
||||
self.connection.commit()
|
||||
with self.lock:
|
||||
self.connection.commit()
|
||||
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def execute(self, *args):
|
||||
"""
|
||||
Execute changes
|
||||
"""
|
||||
# log.debug("Execute %s to DB", str(args))
|
||||
try:
|
||||
with self.lock:
|
||||
return self.connection.execute(*args)
|
||||
except sqlite3.ProgrammingError:
|
||||
pass
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def rollback(self):
|
||||
"""
|
||||
Rollback changes after exception
|
||||
"""
|
||||
log.debug("Rollback transaction")
|
||||
try:
|
||||
self.connection.rollback()
|
||||
with self.lock:
|
||||
self.connection.rollback()
|
||||
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
@@ -62,7 +82,8 @@ class DatabaseConnector:
|
||||
Closes database file
|
||||
"""
|
||||
try:
|
||||
self.connection.close()
|
||||
with self.lock:
|
||||
self.connection.close()
|
||||
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
@@ -21,9 +21,3 @@ class DatabaseRequest:
|
||||
def __init__(self, _type: DatabaseActionType, data):
|
||||
self.type = _type
|
||||
self.data = data
|
||||
|
||||
def get_type(self):
|
||||
return self.type
|
||||
|
||||
def get_data(self):
|
||||
return self.data
|
||||
|
||||
@@ -11,7 +11,10 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import sqlite3
|
||||
from time import time
|
||||
|
||||
from thingsboard_gateway.storage.event_storage import EventStorage
|
||||
from thingsboard_gateway.storage.sqlite.database import Database
|
||||
from queue import Queue
|
||||
from thingsboard_gateway.storage.sqlite.database_request import DatabaseRequest
|
||||
@@ -24,96 +27,56 @@ from logging import getLogger
|
||||
log = getLogger("storage")
|
||||
|
||||
|
||||
class StorageHandler:
|
||||
class StorageHandler(EventStorage):
|
||||
"""
|
||||
HIGH level api for thingsboard_gateway main loop
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
log.info("Sqlite Storage initializing...")
|
||||
|
||||
self.db = Database(config)
|
||||
|
||||
# We need queues to stay atomic when multiple connectors/Threads are
|
||||
# trying to write or read from database
|
||||
log.info("Initializing read and process queues")
|
||||
self.processQueue = Queue(-1)
|
||||
self.readQueue = Queue(-1)
|
||||
|
||||
self.db = Database(config, self.processQueue)
|
||||
|
||||
self.db.setReadQueue(self.readQueue)
|
||||
self.db.setProcessQueue(self.processQueue)
|
||||
|
||||
# Create table if not exists for connected devices
|
||||
self.db.create_connected_devices_table()
|
||||
self.connected_devices = self.get_connected_devices()
|
||||
self.db.init_table()
|
||||
log.info("Sqlite storage initialized!")
|
||||
self.delete_time_point = None
|
||||
self.last_read = time()
|
||||
self.closed = False
|
||||
|
||||
def get_connected_devices(self):
|
||||
"""
|
||||
Util func, to only parse and store connected devices names in a list
|
||||
"""
|
||||
_type = DatabaseActionType.READ_CONNECTED_DEVICES
|
||||
data = self
|
||||
req = DatabaseRequest(_type, data)
|
||||
self.processQueue.put(req)
|
||||
def get_event_pack(self):
|
||||
self.delete_time_point = self.last_read
|
||||
data_from_storage = self.read_data(self.last_read)
|
||||
self.last_read = time()
|
||||
|
||||
self.db.process()
|
||||
return [item[0] for item in data_from_storage or []]
|
||||
|
||||
return self.connected_devices
|
||||
def event_pack_processing_done(self):
|
||||
self.delete_data(self.delete_time_point)
|
||||
|
||||
def readAll(self, deviceName):
|
||||
return self.db.readAll(deviceName)
|
||||
def read_data(self, ts):
|
||||
return self.db.read_data(ts)
|
||||
|
||||
def readFrom(self, deviceName, ts):
|
||||
return self.db.readFrom(deviceName, ts)
|
||||
def delete_data(self, ts):
|
||||
return self.db.delete_data(ts)
|
||||
|
||||
def put(self, message):
|
||||
try:
|
||||
|
||||
device_name = message.get("deviceName")
|
||||
|
||||
if device_name is not None and device_name not in self.connected_devices:
|
||||
self.db.create_device_table(device_name)
|
||||
|
||||
_type = DatabaseActionType.WRITE_DATA_STORAGE
|
||||
request = DatabaseRequest(_type, message)
|
||||
|
||||
log.info("Sending data to storage")
|
||||
self.processQueue.put(request)
|
||||
|
||||
# Left for discussion
|
||||
log.debug("data %s from device %s " % (str(self.connected_devices[device_name]), device_name))
|
||||
self.connected_devices[device_name]["data_saved_index"] += 1
|
||||
|
||||
storageIndex = self.connected_devices[device_name]["data_saved_index"]
|
||||
data = (device_name, storageIndex)
|
||||
_type = DatabaseActionType.WRITE_STORAGE_INDEX
|
||||
log.debug("Index request data: %s" % str(data))
|
||||
index_request = DatabaseRequest(_type, data)
|
||||
|
||||
log.debug("Updating device storage index")
|
||||
self.processQueue.put(index_request)
|
||||
|
||||
self.db.process() # This call is necessary
|
||||
self.db.process()
|
||||
return True
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def add_device(self, deviceName, connector, deviceType=None):
|
||||
|
||||
self.db.add_new_connecting_device(deviceName, connector, deviceType)
|
||||
# Update connected devices list
|
||||
self.connected_devices = self.get_connected_devices()
|
||||
|
||||
# Create device table
|
||||
self.db.create_device_table(deviceName)
|
||||
|
||||
def del_device(self, device_name):
|
||||
|
||||
self.db.del_connected_device(device_name)
|
||||
|
||||
# Update connected devices list
|
||||
self.connected_devices = self.get_connected_devices()
|
||||
|
||||
def closeDB(self):
|
||||
def close_db(self):
|
||||
self.db.closeDB()
|
||||
self.closed = True
|
||||
|
||||
@@ -17,9 +17,3 @@ class StorageSettings:
|
||||
self.data_folder_path = config.get("data_file_path", "./")
|
||||
self.max_days_to_store_data = config.get("max_days_to_store_data", 7)
|
||||
self.check_data_freshness_in_messages = config.get('check_data_freshness_in_messages', 10)
|
||||
|
||||
def get_data_file_path(self):
|
||||
return self.data_folder_path
|
||||
|
||||
def get_max_days_to_store_data(self):
|
||||
return self.max_days_to_store_data
|
||||
|
||||
Reference in New Issue
Block a user