1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Improvements for connectors/converters import

This commit is contained in:
zbeacon
2020-01-28 16:13:25 +02:00
parent 95c1c0c2af
commit 0eb1346039
2 changed files with 57 additions and 61 deletions

View File

@@ -75,7 +75,7 @@ class TBGatewayService:
"file": FileEventStorage,
}
self._event_storage = self._event_storage_types[config["storage"]["type"]](config["storage"])
self._connectors_configs = {}
self.connectors_configs = {}
self._load_connectors(config)
self._connect_with_connectors()
self.__remote_configurator = None
@@ -109,12 +109,11 @@ class TBGatewayService:
except Exception as e:
log.exception(e)
break
if not self.__request_config_after_connect and \
if not self.__request_config_after_connect and \
self.tb_client.is_connected() and not self.tb_client.client.get_subscriptions_in_progress():
self.__request_config_after_connect = True
self.__check_shared_attributes()
if cur_time - gateway_statistic_send > 60.0 and self.tb_client.is_connected():
summary_messages = {"eventsProduced": 0, "eventsSent": 0}
telemetry = {}
@@ -133,7 +132,7 @@ class TBGatewayService:
self.tb_client.client.send_telemetry(summary_messages)
gateway_statistic_send = time.time()
# self.__check_shared_attributes()
except KeyboardInterrupt as e:
except KeyboardInterrupt:
log.info("Stopping...")
self.__close_connectors()
log.info("The gateway has been stopped.")
@@ -155,6 +154,7 @@ class TBGatewayService:
def _attributes_parse(self, content, *args):
try:
log.debug("Received data: %s", content)
log.debug(args)
if content is not None:
shared_attributes = content.get("shared")
client_attributes = content.get("client")
@@ -190,33 +190,33 @@ class TBGatewayService:
self.tb_client.client.request_attributes(callback=self._attributes_parse)
def _load_connectors(self, main_config):
self._connectors_configs = {}
self.connectors_configs = {}
if not main_config.get("connectors"):
raise Exception("Configuration for connectors not found, check your config file.")
for connector in main_config['connectors']:
try:
if connector.get('class') is not None:
try:
connector_class = TBUtility.check_and_import(connector['type'], connector['class'])
self._implemented_connectors[connector['type']] = connector_class
except Exception as e:
log.error("Exception when loading the custom connector:")
log.exception(e)
elif connector.get("type") is not None and connector["type"] in self._default_connectors:
try:
connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors[connector["type"]], default=True)
self._implemented_connectors[connector["type"]] = connector_class
except Exception as e:
log.error("Error on loading default connector:")
log.exception(e)
else:
log.error("Connector with config %s - not found", safe_dump(connector))
# if connector.get('class') is not None:
# try:
# connector_class = TBUtility.check_and_import(connector['type'], connector['class'])
# self._implemented_connectors[connector['type']] = connector_class
# except Exception as e:
# log.error("Exception when loading the custom connector:")
# log.exception(e)
# elif connector.get("type") is not None and connector["type"] in self._default_connectors:
try:
connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors.get(connector["type"], connector.get("class")))
self._implemented_connectors[connector["type"]] = connector_class
except Exception as e:
log.error("Error on loading connector:")
log.exception(e)
# else:
# log.error("Connector with config %s - not found", safe_dump(connector))
with open(self._config_dir + connector['configuration'], 'r') as conf_file:
try:
connector_conf = load(conf_file)
if not self._connectors_configs.get(connector['type']):
self._connectors_configs[connector['type']] = []
self._connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}})
if not self.connectors_configs.get(connector['type']):
self.connectors_configs[connector['type']] = []
self.connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}})
except Exception as e:
log.exception(e)
@@ -224,8 +224,8 @@ class TBGatewayService:
log.exception(e)
def _connect_with_connectors(self):
for connector_type in self._connectors_configs:
for connector_config in self._connectors_configs[connector_type]:
for connector_type in self.connectors_configs:
for connector_config in self.connectors_configs[connector_type]:
for config in connector_config["config"]:
try:
connector = None
@@ -246,9 +246,6 @@ class TBGatewayService:
self.tb_client.client.gw_send_telemetry()
def send_to_storage(self, connector_name, data):
self._send_to_storage(connector_name, data)
def _send_to_storage(self, connector_name, data):
if not connector_name == self.name:
if not TBUtility.validate_converted_data(data):
log.error("Data from %s connector is invalid.", connector_name)
@@ -355,7 +352,6 @@ class TBGatewayService:
time.sleep(.01)
else:
time.sleep(.1)
# self.__request_config_after_connect = False
except Exception as e:
log.exception(e)
time.sleep(1)
@@ -391,7 +387,7 @@ class TBGatewayService:
if connector is not None:
connector.server_side_rpc_handler(content)
else:
log.error("Received RPC request but connector for device %s not found. Request data: \n %s",
log.error("Received RPC request but connector for the device %s not found. Request data: \n %s",
content["device"],
dumps(content))
else:
@@ -417,6 +413,7 @@ class TBGatewayService:
def _attribute_update_callback(self, content, *args):
log.debug("Attribute request received with content: \"%s\"", content)
log.debug(args)
if content.get('device') is not None:
try:
self.__connected_devices[content["device"]]["connector"].on_attributes_update(content)
@@ -491,4 +488,4 @@ class TBGatewayService:
if __name__ == '__main__':
TBGatewayService(path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml')
TBGatewayService(path.dirname(path.dirname(path.abspath(__file__))) + path.sep + 'config' + path.sep +'tb_gateway.yaml')