1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added Gateway Shell

This commit is contained in:
samson0v
2022-12-05 14:36:55 +02:00
parent fa3c08c706
commit 733e98a1ff
6 changed files with 213 additions and 0 deletions

View File

@@ -0,0 +1,138 @@
import cmd
import multiprocessing.managers
import signal
import sys
from threading import Thread
from time import sleep
from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService
from thingsboard_gateway.gateway.tb_gateway_service import GatewayManager
class ShellSyntaxError(Exception):
pass
class Shell(cmd.Cmd, Thread):
prompt = '(gateway) |> '
def __init__(self, stdin=None, stdout=None):
cmd.Cmd.__init__(self, stdin=stdin, stdout=stdout)
Thread.__init__(self, name='Gateway Shell', daemon=False)
self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
try:
self.manager.connect()
GatewayManager.register('get_gateway')
self.gateway: TBGatewayService = self.manager.get_gateway()
except (FileNotFoundError, AssertionError):
self.stdout.write('Gateway is not running!\n')
return
self.connected = False
self.stdout.write('Connecting to Gateway')
while not self.connected:
try:
self.gateway.ping()
self.stdout.write('\n')
self.connected = True
self.stdout.write('Connected to Gateway\n')
except (multiprocessing.managers.RemoteError, KeyError, AttributeError):
self.stdout.write('.')
self.gateway = self.manager.get_gateway()
self.manager.connect()
sleep(1)
self.interactive = sys.__stdin__.isatty()
if hasattr(signal, 'SIGINT'):
signal.signal(signal.SIGINT, self.interrupt)
self.start()
def run(self):
self.cmdloop()
@staticmethod
def parser(arg: str, args):
input_args = arg.strip().split(' ')
opt, val = None, None
try:
(opt, val) = input_args
except ValueError:
opt = input_args[0]
try:
for keys, func in args.items():
if opt in keys:
return func(val) if val else func()
except Exception as _:
raise ShellSyntaxError('Invalid syntax (see `help storage`)')
def _print(self, *args):
for arg in args:
if isinstance(arg, list):
self.stdout.write(','.join(arg))
elif isinstance(arg, dict):
for key, value in arg.items():
self.stdout.write(str(key).capitalize() + ': ' + str(value) + '\n')
else:
self.stdout.write(arg + '\n')
def do_storage(self, arg):
"""Storage
-n/--name: name of storage
-c/--count: events in storage
"""
args = {
('-n', '--name'): self.gateway.get_storage_name,
('-c', '--count'): self.gateway.get_storage_events_count
}
try:
self._print(self.parser(arg, args))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + '\n')
def do_connector(self, arg):
"""Connector
-a/--available: list of available connectors
-s/--status <name>: get connector status
-n/--name <name>: get connector name
-c/--config <name>:
"""
args = {
('-a', '--available'): self.gateway.get_available_connectors,
('-s', '--status'): self.gateway.get_connector_status,
('-n', '--name'): self.gateway.get_connector_name,
('-c', '--config'): self.gateway.get_connector_config
}
try:
self._print(self.parser(arg, args))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + '\n')
# ---
@staticmethod
def interrupt(_, __):
sys.stderr.write("\nKeyboard interrupt trapped - use EOF to end\n")
def precmd(self, line: str) -> str:
return line.strip()
def postcmd(self, stop: bool, line: str) -> bool:
return stop
def emptyline(self) -> bool:
pass
@staticmethod
def do_exit(_):
return -1
def do_EOF(self, args):
return self.do_exit(args)
if __name__ == '__main__':
shell = Shell()

View File

@@ -15,6 +15,7 @@
import logging
import logging.config
import logging.handlers
import multiprocessing.managers
from signal import signal, SIGINT
import subprocess
from copy import deepcopy
@@ -116,7 +117,29 @@ def get_env_variables():
return converted_env_variables
class GatewayManager(multiprocessing.managers.BaseManager):
def __init__(self, address=None, authkey=b''):
super().__init__(address=address, authkey=authkey)
self.gateway = None
def has_gateway(self):
return self.gateway is not None
def add_gateway(self, gateway):
self.gateway = gateway
class TBGatewayService:
EXPOSED_GETTERS = [
'ping',
'get_storage_name',
'get_storage_events_count',
'get_available_connectors',
'get_connector_status',
'get_connector_name',
'get_connector_config'
]
def __init__(self, config_file=None):
signal(SIGINT, lambda _, __: self.__stop_gateway())
@@ -269,6 +292,18 @@ class TBGatewayService:
self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
self._watchers_thread.start()
self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
GatewayManager.register('get_gateway', self.get_gateway, exposed=self.EXPOSED_GETTERS)
self.server = self.manager.get_server()
self.server.serve_forever()
def get_gateway(self):
if self.manager.has_gateway():
return self.manager.gateway
else:
self.manager.add_gateway(self)
self.manager.register('gateway', lambda: self)
def _watchers(self):
try:
gateway_statistic_send = 0
@@ -379,6 +414,7 @@ class TBGatewayService:
log.info("The gateway has been stopped.")
self.tb_client.disconnect()
self.tb_client.stop()
self.manager.shutdown()
def __init_remote_configuration(self, force=False):
if (self.__config["thingsboard"].get("remoteConfiguration") or force) and self.__remote_configurator is None:
@@ -1268,6 +1304,31 @@ class TBGatewayService:
sleep(check_devices_idle_every_sec)
# GETTERS --------------------
def ping(self):
return self.name
# ----------------------------
# Storage --------------------
def get_storage_name(self):
return self._event_storage.__class__.__name__
def get_storage_events_count(self):
return self._event_storage.len()
# Connectors -----------------
def get_available_connectors(self):
return [con for con in self.available_connectors]
def get_connector_status(self, name):
return {'connected': self.available_connectors[name].is_connected()}
def get_connector_name(self, name):
return self.available_connectors[name].get_name()
def get_connector_config(self, name):
return self.available_connectors[name].get_config()
if __name__ == '__main__':
TBGatewayService(