1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Refactored

This commit is contained in:
zbeacon
2020-01-28 16:54:19 +02:00
parent 0eb1346039
commit e810bb6f9a
2 changed files with 47 additions and 70 deletions

View File

@@ -59,12 +59,16 @@ class RemoteConfigurator:
self.__update_logs_configuration()
if self.__old_configuration != decoded_configuration:
log.info("Remote configuration received: \n %s", decoded_configuration)
self.__process_connectors_configuration()
self.__old_configuration = self.__new_configuration
# self.send_current_configuration()
self.in_process = False
result = self.__process_connectors_configuration()
self.in_process = False
if result:
self.__old_configuration = self.__new_configuration
return True
else:
return False
else:
log.error("Remote configuration is already in processing")
return False
except Exception as e:
self.in_process = False
log.exception(e)
@@ -89,7 +93,7 @@ class RemoteConfigurator:
log.exception(e)
def __process_connectors_configuration(self):
log.debug("Processing remote connectors configuration...")
log.info("Processing remote connectors configuration...")
if self.__apply_new_connectors_configuration():
self.__write_new_configuration_files()
if self.__safe_apply_connection_configuration():

View File

@@ -38,10 +38,10 @@ main_handler = logging.handlers.MemoryHandler(-1)
class TBGatewayService:
def __init__(self, config_file=None):
if config_file is None:
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/', path.sep)
with open(config_file) as general_config:
config = safe_load(general_config)
self._config_dir = path.dirname(path.abspath(config_file)) + '/'
self._config_dir = path.dirname(path.abspath(config_file)) + path.sep
logging.config.fileConfig(self._config_dir + "logs.conf")
global log
log = logging.getLogger('service')
@@ -161,10 +161,11 @@ class TBGatewayService:
new_configuration = shared_attributes.get("configuration") if shared_attributes is not None and shared_attributes.get("configuration") is not None else content.get("configuration")
if new_configuration is not None and self.__remote_configurator is not None:
try:
self.__remote_configurator.process_configuration(new_configuration)
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
name="Send data to Thingsboard Thread")
self.__send_thread.start()
confirmed = self.__remote_configurator.process_configuration(new_configuration)
if confirmed:
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
name="Send data to Thingsboard Thread")
self.__send_thread.start()
self.__remote_configurator.send_current_configuration()
except Exception as e:
log.exception(e)
@@ -195,32 +196,15 @@ class TBGatewayService:
raise Exception("Configuration for connectors not found, check your config file.")
for connector in main_config['connectors']:
try:
# if connector.get('class') is not None:
# try:
# connector_class = TBUtility.check_and_import(connector['type'], connector['class'])
# self._implemented_connectors[connector['type']] = connector_class
# except Exception as e:
# log.error("Exception when loading the custom connector:")
# log.exception(e)
# elif connector.get("type") is not None and connector["type"] in self._default_connectors:
try:
connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors.get(connector["type"], connector.get("class")))
self._implemented_connectors[connector["type"]] = connector_class
except Exception as e:
log.error("Error on loading connector:")
log.exception(e)
# else:
# log.error("Connector with config %s - not found", safe_dump(connector))
connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors.get(connector["type"], connector.get("class")))
self._implemented_connectors[connector["type"]] = connector_class
with open(self._config_dir + connector['configuration'], 'r') as conf_file:
try:
connector_conf = load(conf_file)
if not self.connectors_configs.get(connector['type']):
self.connectors_configs[connector['type']] = []
self.connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}})
except Exception as e:
log.exception(e)
connector_conf = load(conf_file)
if not self.connectors_configs.get(connector['type']):
self.connectors_configs[connector['type']] = []
self.connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}})
except Exception as e:
log.error("Error on loading connector:")
log.exception(e)
def _connect_with_connectors(self):
@@ -229,21 +213,15 @@ class TBGatewayService:
for config in connector_config["config"]:
try:
connector = None
try:
connector = self._implemented_connectors[connector_type](self, connector_config["config"][config],
connector_type)
connector.setName(connector_config["name"])
self.available_connectors[connector.get_name()] = connector
connector.open()
except Exception as e:
log.exception(e)
if connector is not None:
connector.close()
connector = self._implemented_connectors[connector_type](self, connector_config["config"][config],
connector_type)
connector.setName(connector_config["name"])
self.available_connectors[connector.get_name()] = connector
connector.open()
except Exception as e:
log.exception(e)
def __send_statistic(self):
self.tb_client.client.gw_send_telemetry()
if connector is not None:
connector.close()
def send_to_storage(self, connector_name, data):
if not connector_name == self.name:
@@ -273,12 +251,19 @@ class TBGatewayService:
json_data = dumps(data)
save_result = self._event_storage.put(json_data)
if not save_result:
log.error('Data from device "%s" cannot be saved, connector name is %s.',
log.error('Data from the device "%s" cannot be saved, connector name is %s.',
data["deviceName"],
connector_name)
def check_size(self, size, devices_data_in_event_pack):
if size >= 48000:
self.__send_data(devices_data_in_event_pack)
size = 0
return size
def __read_data_from_storage(self):
devices_data_in_event_pack = {}
log.debug("Send data Thread has been started successfully.")
while True:
try:
if self.tb_client.is_connected():
@@ -298,41 +283,29 @@ class TBGatewayService:
if type(current_event["telemetry"]) == list:
for item in current_event["telemetry"]:
size += getsizeof(item)
if size >= 48000:
if not self.tb_client.is_connected(): break
self.__send_data(devices_data_in_event_pack)
size = 0
size = self.check_size(size, devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(item)
else:
if not self.tb_client.is_connected(): break
size += getsizeof(current_event["telemetry"])
if size >= 48000:
self.__send_data(devices_data_in_event_pack)
size = 0
size = self.check_size(size, devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(current_event["telemetry"])
if current_event.get("attributes"):
if type(current_event["attributes"]) == list:
for item in current_event["attributes"]:
if not self.tb_client.is_connected(): break
size += getsizeof(item)
if size >= 48000:
self.__send_data(devices_data_in_event_pack)
size = 0
devices_data_in_event_pack[current_event["deviceName"]][
"attributes"].update(
item.items())
size = self.check_size(size, devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(item.items())
else:
if not self.tb_client.is_connected(): break
size += getsizeof(current_event["attributes"].items())
if size >= 48000:
self.__send_data(devices_data_in_event_pack)
size = 0
size = self.check_size(size, devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
current_event["attributes"].items())
if devices_data_in_event_pack:
if not self.tb_client.is_connected(): break
self.__send_data(devices_data_in_event_pack)
if self.tb_client.is_connected():
success = True
while not self.__published_events.empty():
@@ -365,14 +338,14 @@ class TBGatewayService:
else:
self.__published_events.put(self.tb_client.client.gw_send_attributes(device,
devices_data_in_event_pack[
device][
"attributes"]))
device][
"attributes"]))
if devices_data_in_event_pack[device].get("telemetry"):
if device == self.name:
self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"]))
else:
self.__published_events.put(self.tb_client.client.gw_send_telemetry(device,
devices_data_in_event_pack[
devices_data_in_event_pack[
device][
"telemetry"]))
devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}}
@@ -488,4 +461,4 @@ class TBGatewayService:
if __name__ == '__main__':
TBGatewayService(path.dirname(path.dirname(path.abspath(__file__))) + path.sep + 'config' + path.sep +'tb_gateway.yaml')
TBGatewayService(path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/', path.sep))