mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Some improvements and fixes for file storage.
This commit is contained in:
@@ -93,6 +93,7 @@ class MqttConnector(Connector, Thread):
|
||||
time.sleep(10)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
time.sleep(10)
|
||||
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
@@ -66,7 +66,7 @@ class TBGatewayService:
|
||||
self.__load_connectors(config)
|
||||
self.__connect_with_connectors()
|
||||
self.__load_persistent_devices()
|
||||
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True)
|
||||
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread")
|
||||
self.__event_storage = self.__event_storage_types[config["storage"]["type"]](config["storage"])
|
||||
self.tb_client.connect()
|
||||
self.tb_client.client.gw_set_server_side_rpc_request_handler(self.__rpc_request_handler)
|
||||
@@ -197,12 +197,17 @@ class TBGatewayService:
|
||||
telemetry[key] = item[key]
|
||||
else:
|
||||
telemetry = current_event["telemetry"]
|
||||
filtered_telemetry = {}
|
||||
for telemetry_key in telemetry:
|
||||
if telemetry[telemetry_key] is not None:
|
||||
filtered_telemetry[telemetry_key] = telemetry[telemetry_key]
|
||||
log.debug(telemetry)
|
||||
data_to_send = loads('{"ts": %f,"values": %s}' % (int(time.time()*1000), dumps(telemetry)))
|
||||
# data_to_send = loads('{"ts": %f,"values": {%s}}' % (int(time.time()*1000),
|
||||
# ','.join(dumps(param) for param in current_event["telemetry"])))
|
||||
self.__published_events.append(self.tb_client.client.gw_send_telemetry(current_event["deviceName"],
|
||||
data_to_send))
|
||||
if filtered_telemetry != {}:
|
||||
self.__published_events.append(self.tb_client.client.gw_send_telemetry(current_event["deviceName"],
|
||||
data_to_send))
|
||||
if current_event.get("attributes"):
|
||||
log.debug(current_event)
|
||||
attributes = {}
|
||||
@@ -212,16 +217,22 @@ class TBGatewayService:
|
||||
attributes[key] = item[key]
|
||||
else:
|
||||
attributes = current_event["attributes"]
|
||||
filtered_attributes = {}
|
||||
for attribute_key in attributes:
|
||||
if attributes[attribute_key] is not None:
|
||||
filtered_attributes[attribute_key] = attributes[attribute_key]
|
||||
log.debug(attributes)
|
||||
data_to_send = loads('%s' % dumps(attributes))
|
||||
self.__published_events.append(self.tb_client.client.gw_send_attributes(current_event["deviceName"],
|
||||
data_to_send))
|
||||
if filtered_attributes != {}:
|
||||
self.__published_events.append(self.tb_client.client.gw_send_attributes(current_event["deviceName"],
|
||||
data_to_send))
|
||||
success = True
|
||||
for event in range(len(self.__published_events)):
|
||||
result = self.__published_events[event].get()
|
||||
success = result == self.__published_events[event].TB_ERR_SUCCESS
|
||||
if success:
|
||||
self.__event_storage.event_pack_processing_done()
|
||||
log.debug("Run event pack processing done")
|
||||
else:
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class EventStorageFiles:
|
||||
def __init__(self, state_file, data_files):
|
||||
self.state_file = state_file
|
||||
|
||||
@@ -13,21 +13,19 @@
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import io
|
||||
import os
|
||||
import base64
|
||||
import json
|
||||
from time import sleep
|
||||
try:
|
||||
from json.decoder import JSONDecodeError
|
||||
except ImportError:
|
||||
from simplejson import JSONDecodeError
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log
|
||||
from thingsboard_gateway.storage.event_storage_reader_pointer import EventStorageReaderPointer
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventStorageReader:
|
||||
def __init__(self, name, files: EventStorageFiles, settings: FileEventStorageSettings):
|
||||
@@ -94,15 +92,15 @@ class EventStorageReader:
|
||||
def discard_batch(self):
|
||||
if (self.current_pos.get_line() + self.settings.get_max_read_records_count()) >= \
|
||||
self.settings.get_max_records_per_file():
|
||||
if self.buffered_reader is not None:
|
||||
if self.buffered_reader is not None and not self.buffered_reader.closed:
|
||||
self.buffered_reader.flush()
|
||||
self.buffered_reader.close()
|
||||
self.delete_read_file(self.current_pos.get_file())
|
||||
if len(self.files.get_data_files()) == 0:
|
||||
os.remove(self.settings.get_data_folder_path() + self.files.get_state_file())
|
||||
else:
|
||||
self.current_pos = copy.deepcopy(self.new_pos)
|
||||
self.write_info_to_state_file(self.current_pos)
|
||||
if len(self.files.get_data_files()) == 0:
|
||||
os.remove(self.settings.get_data_folder_path() + self.files.get_state_file())
|
||||
else:
|
||||
self.write_info_to_state_file(self.current_pos)
|
||||
self.current_pos = copy.deepcopy(self.new_pos)
|
||||
self.current_batch = None
|
||||
# TODO add logging of flushing reader with try expression
|
||||
|
||||
@@ -120,9 +118,11 @@ class EventStorageReader:
|
||||
|
||||
def get_or_init_buffered_reader(self, pointer):
|
||||
try:
|
||||
if self.buffered_reader is None:
|
||||
self.buffered_reader = io.BufferedReader(io.FileIO(
|
||||
self.settings.get_data_folder_path() + pointer.get_file(), 'r'))
|
||||
if self.buffered_reader is None or self.buffered_reader.closed:
|
||||
new_file_to_read_path = self.settings.get_data_folder_path() + pointer.get_file()
|
||||
while not os.path.exists(new_file_to_read_path):
|
||||
sleep(.1)
|
||||
self.buffered_reader = io.BufferedReader(io.FileIO(new_file_to_read_path, 'r'))
|
||||
lines_to_skip = pointer.get_line()
|
||||
if lines_to_skip > 0:
|
||||
while self.buffered_reader.readline() is not None:
|
||||
@@ -137,30 +137,33 @@ class EventStorageReader:
|
||||
raise RuntimeError("Failed to initialize buffered reader!", e)
|
||||
|
||||
def read_state_file(self):
|
||||
state_data_node = {}
|
||||
try:
|
||||
with io.BufferedReader(io.FileIO(self.settings.get_data_folder_path() +
|
||||
self.files.get_state_file(), 'r')) as br:
|
||||
state_data_node = json.load(br)
|
||||
except JSONDecodeError:
|
||||
log.error("Failed to decode JSON from state file")
|
||||
state_data_node = 0
|
||||
except IOError as e:
|
||||
log.warning("Failed to fetch info from state file!", e)
|
||||
reader_file = None
|
||||
reader_pos = 0
|
||||
if state_data_node:
|
||||
reader_pos = state_data_node['position']
|
||||
for file in sorted(self.files.get_data_files()):
|
||||
if file == state_data_node['file']:
|
||||
reader_file = file
|
||||
break
|
||||
if reader_file is None:
|
||||
reader_file = sorted(self.files.get_data_files())[0]
|
||||
state_data_node = {}
|
||||
try:
|
||||
with io.BufferedReader(io.FileIO(self.settings.get_data_folder_path() +
|
||||
self.files.get_state_file(), 'r')) as br:
|
||||
state_data_node = json.load(br)
|
||||
except JSONDecodeError:
|
||||
log.error("Failed to decode JSON from state file")
|
||||
state_data_node = 0
|
||||
except IOError as e:
|
||||
log.warning("Failed to fetch info from state file!", e)
|
||||
reader_file = None
|
||||
reader_pos = 0
|
||||
log.info("{} -- Initializing from state file: [{}:{}]".format(str(self.name) + '_reader',
|
||||
self.settings.get_data_folder_path() + reader_file, reader_pos))
|
||||
return EventStorageReaderPointer(reader_file, reader_pos)
|
||||
if state_data_node:
|
||||
reader_pos = state_data_node['position']
|
||||
for file in sorted(self.files.get_data_files()):
|
||||
if file == state_data_node['file']:
|
||||
reader_file = file
|
||||
break
|
||||
if reader_file is None:
|
||||
reader_file = sorted(self.files.get_data_files())[0]
|
||||
reader_pos = 0
|
||||
log.info("{} -- Initializing from state file: [{}:{}]".format(str(self.name) + '_reader',
|
||||
self.settings.get_data_folder_path() + reader_file, reader_pos))
|
||||
return EventStorageReaderPointer(reader_file, reader_pos)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def write_info_to_state_file(self, pointer: EventStorageReaderPointer):
|
||||
try:
|
||||
@@ -169,6 +172,8 @@ class EventStorageReader:
|
||||
json.dump(state_file_node, outfile)
|
||||
except IOError as e:
|
||||
log.warning("Failed to update state file!", e)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def delete_read_file(self, current_file):
|
||||
if os.path.exists(self.settings.get_data_folder_path() + current_file):
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class EventStorageReaderPointer:
|
||||
def __init__(self, file, line):
|
||||
self.file = file
|
||||
|
||||
@@ -13,15 +13,12 @@
|
||||
# limitations under the License.
|
||||
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
import logging
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log
|
||||
import time
|
||||
import io
|
||||
import os
|
||||
import base64
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventStorageWriter:
|
||||
def __init__(self, name, files: EventStorageFiles, settings: FileEventStorageSettings):
|
||||
@@ -100,7 +97,7 @@ class EventStorageWriter:
|
||||
|
||||
def get_or_init_buffered_writer(self, file):
|
||||
try:
|
||||
if self.buffered_writer is None:
|
||||
if self.buffered_writer is None or self.buffered_writer.closed:
|
||||
buffered_writer = io.BufferedWriter(io.FileIO(self.settings.get_data_folder_path() + file, 'a'))
|
||||
return buffered_writer
|
||||
else:
|
||||
|
||||
@@ -16,17 +16,17 @@ from thingsboard_gateway.storage.event_storage import EventStorage
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.event_storage_writer import EventStorageWriter
|
||||
from thingsboard_gateway.storage.event_storage_reader import EventStorageReader
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FileEventStorage(EventStorage):
|
||||
def __init__(self, name, config):
|
||||
def __init__(self, config, name=None):
|
||||
name = name if name is not None else ''.join(choice(ascii_lowercase) for _ in range(10))
|
||||
self.settings = FileEventStorageSettings(config)
|
||||
self.init_data_folder_if_not_exist()
|
||||
self.event_storage_files = self.init_data_files()
|
||||
@@ -36,7 +36,12 @@ class FileEventStorage(EventStorage):
|
||||
self.__reader = EventStorageReader(name, self.event_storage_files, self.settings)
|
||||
|
||||
def put(self, event):
|
||||
self.__writer.write(event)
|
||||
try:
|
||||
self.__writer.write(event)
|
||||
return True
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
return False
|
||||
|
||||
def get_event_pack(self):
|
||||
return self.__reader.read()
|
||||
|
||||
@@ -12,6 +12,11 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from logging import getLogger
|
||||
|
||||
log = getLogger('storage')
|
||||
|
||||
|
||||
class FileEventStorageSettings:
|
||||
def __init__(self, config):
|
||||
self.data_folder_path = config.get("data_folder_path", "./")
|
||||
|
||||
Reference in New Issue
Block a user