Mirror of https://github.com/thingsboard/thingsboard-gateway

Commit: Some fixes for file storage.

The diff below touches TBGatewayService and the file-based event storage classes (EventStorageFiles, EventStorageReader, EventStorageWriter).
@@ -181,15 +181,8 @@ class TBGatewayService:
                 if events:
                     for event in events:
                         current_event = loads(event)
-                        if current_event["deviceName"] not in self.get_devices():
-                            self.add_device(current_event["deviceName"],
-                                            {"current_event": current_event["deviceName"]}, wait_for_publish=True)
-                        else:
-                            self.update_device(current_event["deviceName"],
-                                               "current_event",
-                                               current_event["deviceName"])
                         if current_event.get("telemetry"):
-                            log.debug(current_event)
+                            # log.debug(current_event)
                             telemetry = {}
                             if type(current_event["telemetry"]) == list:
                                 for item in current_event["telemetry"]:
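Note: the hunk above drops the per-event add_device/update_device bookkeeping from the storage read loop and silences the per-event debug log. For orientation, a minimal sketch of the stored-event shape this loop consumes, using only the fields visible in the diff (the full schema is an assumption):

from json import loads

# Hypothetical stored event; field names are taken from the code above.
event = '{"deviceName": "sensor-1", "telemetry": [{"temperature": 23.4}], "attributes": [{"fw": "1.0"}]}'

current_event = loads(event)
print(current_event["deviceName"])     # routes the event to a gateway device session
print(current_event.get("telemetry"))  # optional list of telemetry dictionaries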
@@ -201,15 +194,15 @@ class TBGatewayService:
                                 for telemetry_key in telemetry:
                                     if telemetry[telemetry_key] is not None:
                                         filtered_telemetry[telemetry_key] = telemetry[telemetry_key]
-                            log.debug(telemetry)
-                            data_to_send = loads('{"ts": %f,"values": %s}' % (int(time.time()*1000), dumps(telemetry)))
+                            # log.debug(telemetry)
+                            data_to_send = loads('{"ts": %f,"values": %s}' % (int(time.time()*1000), dumps(filtered_telemetry)))
                             # data_to_send = loads('{"ts": %f,"values": {%s}}' % (int(time.time()*1000),
                             #                      ','.join(dumps(param) for param in current_event["telemetry"])))
                             if filtered_telemetry != {}:
                                 self.__published_events.append(self.tb_client.client.gw_send_telemetry(current_event["deviceName"],
                                                                                                        data_to_send))
                         if current_event.get("attributes"):
-                            log.debug(current_event)
+                            # log.debug(current_event)
                             attributes = {}
                             if type(current_event["attributes"]) == list:
                                 for item in current_event["attributes"]:
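The actual fix here is data_to_send now serializing filtered_telemetry rather than the raw telemetry dict, so None values collected from connectors are no longer published. A sketch of the same filtering, building the payload as a dict directly instead of the %-format-then-loads round trip (which also keeps ts an integer; the %f placeholder above renders it as a float):

import time

telemetry = {"temperature": 23.4, "humidity": None}

# Drop keys whose value is None so unreadable datapoints are not published.
filtered_telemetry = {key: value for key, value in telemetry.items() if value is not None}

if filtered_telemetry:
    data_to_send = {"ts": int(time.time() * 1000), "values": filtered_telemetry}
    print(data_to_send)  # {'ts': 1570000000000, 'values': {'temperature': 23.4}}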
@@ -221,8 +214,8 @@ class TBGatewayService:
                                 for attribute_key in attributes:
                                     if attributes[attribute_key] is not None:
                                         filtered_attributes[attribute_key] = attributes[attribute_key]
-                            log.debug(attributes)
-                            data_to_send = loads('%s' % dumps(attributes))
+                            # log.debug(attributes)
+                            data_to_send = loads('%s' % dumps(filtered_attributes))
                             if filtered_attributes != {}:
                                 self.__published_events.append(self.tb_client.client.gw_send_attributes(current_event["deviceName"],
                                                                                                         data_to_send))
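The attributes branch gets the same fix: dumps(filtered_attributes) replaces dumps(attributes). Unlike telemetry, the attribute payload used here is a flat key/value dict with no ts wrapper, as a quick round trip shows:

from json import dumps, loads

filtered_attributes = {"firmwareVersion": "1.0"}

# Flat dict in, flat dict out: no {"ts": ..., "values": ...} envelope.
print(loads(dumps(filtered_attributes)))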
@@ -290,17 +283,18 @@ class TBGatewayService:
         log.debug(content)

     def add_device(self, device_name, content, wait_for_publish=False):
-        self.__connected_devices[device_name] = content
-        if wait_for_publish:
-            self.tb_client.client.gw_connect_device(device_name).wait_for_publish()
-        else:
-            self.tb_client.client.gw_connect_device(device_name)
-        self.__save_persistent_devices()
+        if device_name not in self.__connected_devices:
+            self.__connected_devices[device_name] = content
+            if wait_for_publish:
+                self.tb_client.client.gw_connect_device(device_name).wait_for_publish()
+            else:
+                self.tb_client.client.gw_connect_device(device_name)
+            self.__save_persistent_devices()

     def update_device(self, device_name, event, content):
-        self.__connected_devices[device_name][event] = content
-        if event == 'connector':
+        if event == 'connector' and self.__connected_devices[device_name].get(event) != content:
             self.__save_persistent_devices()
+        self.__connected_devices[device_name][event] = content

     def del_device(self, device_name):
         del self.__connected_devices[device_name]
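Both changes are guard clauses: add_device now skips devices that are already connected instead of reconnecting and re-persisting them, and update_device rewrites the persistent device file only when a connector assignment actually changes. A standalone sketch of the pattern, with a stub standing in for __save_persistent_devices:

connected_devices = {}

def save_persistent_devices():
    print("persisted:", connected_devices)  # stub for the real file write

def add_device(device_name, content):
    if device_name in connected_devices:     # already connected: do nothing
        return
    connected_devices[device_name] = content
    save_persistent_devices()

def update_device(device_name, event, content):
    if event == 'connector' and connected_devices[device_name].get(event) != content:
        save_persistent_devices()            # persist only on a real change
    connected_devices[device_name][event] = content

add_device("sensor-1", {})
add_device("sensor-1", {})                   # second call is a no-op
update_device("sensor-1", "connector", "mqtt")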
@@ -22,5 +22,5 @@ class EventStorageFiles:
         return self.state_file

     def get_data_files(self):
-        return self.data_files
+        return sorted(self.data_files)
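get_data_files() now returns a sorted copy, so the reader walks data files in creation order regardless of how the directory listing was produced. This works because the writer names files data_<millisecond-timestamp>.txt (see the last hunk below) and those timestamps currently all have the same digit count, so lexicographic order matches chronological order:

data_files = ["data_1570000000002.txt", "data_1569999999991.txt", "data_1570000000100.txt"]
print(sorted(data_files))
# ['data_1569999999991.txt', 'data_1570000000002.txt', 'data_1570000000100.txt']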
@@ -39,8 +39,8 @@ class EventStorageReader:

     def read(self):
         log.debug("{} -- [{}:{}] Check for new messages in storage".format(str(self.name) +
-                  '_reader', self.settings.get_data_folder_path() + self.new_pos.get_file(), self.new_pos.get_line()))
-        if self.current_batch is not None and self.current_pos != self.new_pos:
+                  '_reader', self.settings.get_data_folder_path() + self.current_pos.get_file(), self.current_pos.get_line()))
+        if self.current_batch is not None and self.current_batch:
             log.debug("The previous batch was not discarded!")
             return self.current_batch
         self.current_batch = []
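The new condition returns the pending batch whenever one is non-empty, instead of inferring that from a pointer comparison. The contract it enforces: read() keeps returning the same batch until the caller acknowledges it with discard_batch(). A minimal sketch of that contract:

class BatchReader:
    def __init__(self, records):
        self.records = records
        self.current_batch = None

    def read(self, size=2):
        if self.current_batch:          # previous batch was not discarded
            return self.current_batch
        self.current_batch = self.records[:size]
        self.records = self.records[size:]
        return self.current_batch

    def discard_batch(self):            # caller acknowledges delivery
        self.current_batch = None

reader = BatchReader(["a", "b", "c"])
print(reader.read())    # ['a', 'b']
print(reader.read())    # ['a', 'b'] again: not discarded yet
reader.discard_batch()
print(reader.read())    # ['c']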
@@ -49,29 +49,30 @@ class EventStorageReader:
         while records_to_read > 0:
             try:
                 current_line_in_file = self.new_pos.get_line()
-                reader = self.get_or_init_buffered_reader(self.new_pos)
-                line = reader.readline()
+                self.buffered_reader = self.get_or_init_buffered_reader(self.new_pos)
+                line = self.buffered_reader.readline()
                 while line != b'':
                     try:
                         self.current_batch.append(base64.b64decode(line).decode("utf-8"))
                         records_to_read -= 1
                     except IOError as e:
-                        log.warning("Could not parse line [{}] to uplink message!".format(line), e)
+                        log.warning("Could not parse line [%s] to uplink message! %s", line, e)
                     finally:
                         current_line_in_file += 1
                         if records_to_read > 0:
-                            line = reader.readline()
+                            line = self.buffered_reader.readline()
                     self.new_pos.set_line(current_line_in_file)
                     if records_to_read == 0:
                         break

-                if current_line_in_file >= self.settings.get_max_records_per_file():
+                if current_line_in_file >= self.settings.get_max_records_per_file()-1:
                     next_file = self.get_next_file(self.files, self.new_pos)
                     if next_file is not None:
                         if self.buffered_reader is not None:
                             self.buffered_reader.close()
                         self.buffered_reader = None
                         self.new_pos = EventStorageReaderPointer(next_file, 0)
                         self.write_info_to_state_file(self.new_pos)
                     else:
                         # No more records to read for now
                         break
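The reader now keeps its file handle in self.buffered_reader (so other methods can close it) and switches log.warning to lazy %-style arguments, which is the logging-friendly form. The storage format itself is one base64-encoded record per line, which is why readline()-based decoding and line counting work; a self-contained round trip with an illustrative record:

import base64

record = '{"deviceName": "sensor-1", "telemetry": [{"temperature": 23.4}]}'

# One record per line: base64 guarantees the payload cannot contain a
# newline that would break the line-oriented file format.
line = base64.b64encode(record.encode("utf-8")) + b"\n"
print(base64.b64decode(line).decode("utf-8"))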
@@ -86,33 +87,39 @@ class EventStorageReader:
             except IOError as e:
                 log.warning("[{}] Failed to read file!".format(self.new_pos.get_file(), e))
                 break
-        log.debug("{} -- Got {} messages from storage".format(str(self.name) + '_reader', len(self.current_batch)))
+            except Exception as e:
+                log.exception(e)
+        # log.debug("{} -- Got {} messages from storage".format(str(self.name) + '_reader', len(self.current_batch)))
         return self.current_batch

     def discard_batch(self):
-        if self.current_pos.get_line() == self.settings.get_max_records_per_file()-1:
-            if self.buffered_reader is not None and not self.buffered_reader.closed:
-                self.buffered_reader.flush()
-                self.buffered_reader.close()
-            self.delete_read_file(self.current_pos.get_file())
-            if len(self.files.get_data_files()) == 0:
-                os.remove(self.settings.get_data_folder_path() + self.files.get_state_file())
-        self.write_info_to_state_file(self.current_pos)
-        self.current_pos = copy.deepcopy(self.new_pos)
-        self.current_batch = None
+        try:
+            # if self.current_pos.get_line() == self.settings.get_max_records_per_file():
+            if self.current_pos.get_line() >= self.settings.get_max_records_per_file():
+                if self.buffered_reader is not None and not self.buffered_reader.closed:
+                    self.buffered_reader.close()
+                self.delete_read_file(self.current_pos.get_file())
+                if len(self.files.get_data_files()) == 0:
+                    os.remove(self.settings.get_data_folder_path() + self.files.get_state_file())
+                # self.write_info_to_state_file(self.current_pos)
+            self.current_pos = copy.deepcopy(self.new_pos)
+            self.current_batch = None
+        except Exception as e:
+            log.exception(e)
         # TODO add logging of flushing reader with try expression

     def get_next_file(self, files: EventStorageFiles, new_pos: EventStorageReaderPointer):
         found = False
-        for file in files.get_data_files():
+        data_files = files.get_data_files()
+        target_file = None
+        for file_index in range(len(data_files)):
             if found:
-                return file
-            if file == new_pos.get_file():
+                target_file = data_files[file_index]
+                break
+            if data_files[file_index] == new_pos.get_file():
                 found = True
-        if found:
-            return None
-        else:
-            return files.get_data_files()[0]
+        return target_file

     def get_or_init_buffered_reader(self, pointer):
         try:
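Besides wrapping read() and discard_batch() in broad exception handlers, this hunk changes get_next_file's fallback behavior: when the current file is the last one, it now returns None instead of wrapping around to the first data file, which could have re-read old records. The new lookup, reduced to a standalone function:

def get_next_file(data_files, current_file):
    found = False
    for name in data_files:
        if found:             # previous iteration matched: this is the next file
            return name
        if name == current_file:
            found = True
    return None               # current file is the last (or unknown): nothing to advance to

print(get_next_file(["data_1.txt", "data_2.txt"], "data_1.txt"))  # data_2.txt
print(get_next_file(["data_1.txt", "data_2.txt"], "data_2.txt"))  # None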
@@ -167,20 +174,23 @@ class EventStorageReader:
         try:
             state_file_node = {'file': pointer.get_file(), 'position': pointer.get_line()}
             with open(self.settings.get_data_folder_path() + self.files.get_state_file(), 'w') as outfile:
-                json.dump(state_file_node, outfile)
+                outfile.write(json.dumps(state_file_node))
         except IOError as e:
             log.warning("Failed to update state file!", e)
+        except Exception as e:
+            log.exception(e)

     def delete_read_file(self, current_file):
-        if os.path.exists(self.settings.get_data_folder_path() + current_file):
-            os.remove(self.settings.get_data_folder_path() + current_file)
-        try:
-            self.files.get_data_files().pop(0)
-        except Exception as e:
-            log.exception(e)
-        log.info("{} -- Cleanup old data file: {}!".format(str(self.name) + '_reader', self.settings.get_data_folder_path() + current_file))
+        data_files = self.files.get_data_files()
+        if len(data_files) > 2:
+            for current_file in data_files[:-2]:
+                if os.path.exists(self.settings.get_data_folder_path() + current_file):
+                    os.remove(self.settings.get_data_folder_path() + current_file)
+                try:
+                    data_files.pop(0)
+                except Exception as e:
+                    log.exception(e)
+                log.info("{} -- Cleanup old data file: {}!".format(str(self.name) + '_reader', self.settings.get_data_folder_path() + current_file))

     def destroy(self):
         if self.buffered_reader is not None:
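delete_read_file is now a retention policy rather than a single-file deletion: every data file except the two newest is removed, and the in-memory index is popped from the front so it stays in sync with the sorted, oldest-first file list. A sketch of the same policy under those assumptions:

import os

def cleanup_old_files(folder, data_files):
    # Keep the two newest files; data_files is assumed sorted oldest-first.
    while len(data_files) > 2:
        oldest = data_files.pop(0)
        path = os.path.join(folder, oldest)
        if os.path.exists(path):
            os.remove(path)
        print("Cleanup old data file:", path)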
@@ -55,7 +55,7 @@ class EventStorageWriter:
             self.files.get_data_files().append(self.current_file)
             self.current_file_records_count = 0
             try:
-                if self.buffered_writer is not None or self.buffered_writer.closed is False:
+                if self.buffered_writer is not None and self.buffered_writer.closed is False:
                     self.buffered_writer.close()
             except IOError as e:
                 log.warning("Failed to close buffered writer!", e)
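The one-word fix matters because of short-circuit evaluation: with or, a None writer makes the left operand False, so the right operand is evaluated and None.closed raises AttributeError. With and, evaluation stops at the None check:

buffered_writer = None

# "is not None or ...closed" would evaluate the right-hand operand for a
# None writer and raise AttributeError; "and" short-circuits past it.
if buffered_writer is not None and buffered_writer.closed is False:
    buffered_writer.close()
print("no writer to close")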
@@ -107,7 +107,11 @@ class EventStorageWriter:
             raise RuntimeError("Failed to initialize buffered writer!", e)

     def create_datafile(self):
-        return self.create_file('data_', str(round(time.time() * 1000)))
+        prefix = 'data_'
+        datafile_name = str(round(time.time() * 1000))
+
+        self.files.data_files.append(prefix + datafile_name + '.txt')
+        return self.create_file(prefix, datafile_name)

     def create_file(self, prefix, filename):
         file_path = self.settings.get_data_folder_path() + prefix + filename + '.txt'
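create_datafile now registers the new file name in the writer's data_files index as well as creating the file, so the list the reader sorts (and the cleanup above trims) sees new files immediately. The naming scheme is just the data_ prefix plus a millisecond timestamp:

import time

prefix = 'data_'
datafile_name = str(round(time.time() * 1000))

print(prefix + datafile_name + '.txt')  # e.g. data_1570000000000.txt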