mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Updated Shell
This commit is contained in:
@@ -1,16 +1,18 @@
|
||||
import cmd
|
||||
import multiprocessing.managers
|
||||
import signal
|
||||
import sys
|
||||
import signal
|
||||
import multiprocessing.managers
|
||||
from re import findall
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService
|
||||
# from thingsboard_gateway.gateway.shell_manager import ShellManager
|
||||
from thingsboard_gateway.gateway.tb_gateway_service import GatewayManager
|
||||
|
||||
|
||||
class ShellSyntaxError(Exception):
|
||||
pass
|
||||
def __str__(self):
|
||||
return 'Invalid syntax'
|
||||
|
||||
|
||||
class Shell(cmd.Cmd, Thread):
|
||||
@@ -20,28 +22,61 @@ class Shell(cmd.Cmd, Thread):
|
||||
cmd.Cmd.__init__(self, stdin=stdin, stdout=stdout)
|
||||
Thread.__init__(self, name='Gateway Shell', daemon=False)
|
||||
|
||||
self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
|
||||
try:
|
||||
self.manager.connect()
|
||||
GatewayManager.register('get_gateway')
|
||||
self.gateway: TBGatewayService = self.manager.get_gateway()
|
||||
except (FileNotFoundError, AssertionError):
|
||||
self.stdout.write('Gateway Shell\n')
|
||||
self.stdout.write('=============\n')
|
||||
|
||||
# self.gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
|
||||
# try:
|
||||
# self.gateway_manager.connect()
|
||||
# GatewayManager.register('get_gateway')
|
||||
# except (FileNotFoundError, AssertionError):
|
||||
# self.stdout.write('Gateway is not running!\n')
|
||||
# return
|
||||
|
||||
# self.connected = False
|
||||
# self.gateway = None
|
||||
# try:
|
||||
# self.connect()
|
||||
# except Exception as e:
|
||||
# self.stdout.write(e.__str__())
|
||||
|
||||
self.gateway_manager, self.gateway = self.create_manager()
|
||||
if self.gateway_manager is None or self.gateway is None:
|
||||
self.stdout.write('Gateway is not running!\n')
|
||||
return
|
||||
|
||||
self.connected = False
|
||||
self.stdout.write('Connecting to Gateway')
|
||||
while not self.connected:
|
||||
try:
|
||||
self.gateway.ping()
|
||||
self.stdout.write('\n')
|
||||
self.connected = True
|
||||
self.stdout.write('Connected to Gateway\n')
|
||||
except (multiprocessing.managers.RemoteError, KeyError, AttributeError):
|
||||
self.stdout.write('.')
|
||||
self.gateway = self.manager.get_gateway()
|
||||
self.manager.connect()
|
||||
sleep(1)
|
||||
self.command_config = {
|
||||
'storage': [
|
||||
{
|
||||
'arg': ('-n', '--name'),
|
||||
'func': self.gateway.get_storage_name
|
||||
},
|
||||
{
|
||||
'arg': ('-c', '--count'),
|
||||
'func': self.gateway.get_storage_events_count
|
||||
}
|
||||
],
|
||||
'connector': [
|
||||
{
|
||||
'arg': ('-l', '--list'),
|
||||
'func': self.gateway.get_available_connectors
|
||||
},
|
||||
{
|
||||
'arg': ('-s', '--status'),
|
||||
'func': self.gateway.get_connector_status
|
||||
},
|
||||
{
|
||||
'arg': ('-c', '--config'),
|
||||
'func': self.gateway.get_connector_config
|
||||
}
|
||||
],
|
||||
'gateway': [
|
||||
{
|
||||
'arg': ('-s', '--status'),
|
||||
'func': self.gateway.get_status
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
self.interactive = sys.__stdin__.isatty()
|
||||
|
||||
@@ -50,72 +85,130 @@ class Shell(cmd.Cmd, Thread):
|
||||
|
||||
self.start()
|
||||
|
||||
def create_manager(self, timeout=5):
|
||||
elapsed_time = 0
|
||||
gateway_manager = None
|
||||
gateway = None
|
||||
self.stdout.write('Establish connection...\n')
|
||||
while elapsed_time <= timeout:
|
||||
try:
|
||||
gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
|
||||
gateway_manager.connect()
|
||||
GatewayManager.register('get_gateway')
|
||||
GatewayManager.register('gateway')
|
||||
gateway = self.connect(gateway_manager)
|
||||
except (FileNotFoundError, AssertionError):
|
||||
sleep(1)
|
||||
continue
|
||||
else:
|
||||
break
|
||||
finally:
|
||||
elapsed_time += 1
|
||||
|
||||
return gateway_manager, gateway
|
||||
|
||||
def connect(self, manager, timeout=5):
|
||||
elapsed_time = 0
|
||||
gateway = None
|
||||
self.connected = False
|
||||
self.stdout.write('Connecting to Gateway')
|
||||
while not self.connected and elapsed_time <= timeout:
|
||||
try:
|
||||
gateway = manager.get_gateway()
|
||||
gateway.ping()
|
||||
self.connected = True
|
||||
except (multiprocessing.managers.RemoteError, KeyError, AttributeError, BrokenPipeError):
|
||||
self.stdout.write('.')
|
||||
gateway = manager.get_gateway()
|
||||
self.connected = True
|
||||
sleep(1)
|
||||
except FileNotFoundError:
|
||||
self.stdout.write('.')
|
||||
sleep(1)
|
||||
continue
|
||||
finally:
|
||||
elapsed_time += 1
|
||||
|
||||
if not self.connected:
|
||||
raise RuntimeError('Timeout error')
|
||||
|
||||
self.stdout.write('\n')
|
||||
return gateway
|
||||
|
||||
def run(self):
|
||||
self.cmdloop()
|
||||
|
||||
@staticmethod
|
||||
def parser(arg: str, args):
|
||||
input_args = arg.strip().split(' ')
|
||||
opt, val = None, None
|
||||
try:
|
||||
(opt, val) = input_args
|
||||
except ValueError:
|
||||
opt = input_args[0]
|
||||
def parser(arg, args):
|
||||
input_args = findall(r'(-{1,2}[A-Za-z]+)[?^\s]?([A-Za-z\d\s]*)', arg.strip())
|
||||
|
||||
try:
|
||||
for keys, func in args.items():
|
||||
if opt in keys:
|
||||
return func(val) if val else func()
|
||||
except Exception as _:
|
||||
raise ShellSyntaxError('Invalid syntax (see `help storage`)')
|
||||
for input_arg in input_args:
|
||||
for arg in args:
|
||||
opt, val = None, None
|
||||
try:
|
||||
(opt, val) = input_arg
|
||||
except ValueError:
|
||||
opt = input_arg[0]
|
||||
|
||||
if opt in arg['arg']:
|
||||
return arg['func'](val.strip()) if val else arg['func']()
|
||||
|
||||
raise ShellSyntaxError()
|
||||
|
||||
def _print(self, *args):
|
||||
for arg in args:
|
||||
if isinstance(arg, list):
|
||||
self.stdout.write(','.join(arg))
|
||||
self.stdout.write(','.join(arg) + '\n')
|
||||
elif isinstance(arg, dict):
|
||||
for key, value in arg.items():
|
||||
self.stdout.write(str(key).capitalize() + ': ' + str(value) + '\n')
|
||||
else:
|
||||
self.stdout.write(arg + '\n')
|
||||
self.stdout.write(str(arg) + '\n')
|
||||
|
||||
# ---
|
||||
def do_gateway(self, arg):
|
||||
"""Gateway
|
||||
-s/--status: get gateway status
|
||||
"""
|
||||
try:
|
||||
self._print(self.parser(arg, self.command_config['gateway']))
|
||||
except ShellSyntaxError as e:
|
||||
self.stdout.write(e.__str__() + ' (see `help gateway`)\n')
|
||||
self.do_help('gateway')
|
||||
except (FileNotFoundError, AttributeError, BrokenPipeError):
|
||||
self.stdout.write('Connection is lost...\n')
|
||||
self.gateway_manager, self.gateway = self.create_manager()
|
||||
|
||||
def do_storage(self, arg):
|
||||
"""Storage
|
||||
-n/--name: name of storage
|
||||
-c/--count: events in storage
|
||||
"""
|
||||
args = {
|
||||
('-n', '--name'): self.gateway.get_storage_name,
|
||||
('-c', '--count'): self.gateway.get_storage_events_count
|
||||
}
|
||||
try:
|
||||
self._print(self.parser(arg, args))
|
||||
self._print(self.parser(arg, self.command_config['storage']))
|
||||
except ShellSyntaxError as e:
|
||||
self.stdout.write(e.__str__() + '\n')
|
||||
self.stdout.write(e.__str__() + ' (see `help storage`)\n')
|
||||
self.do_help('storage')
|
||||
|
||||
def do_connector(self, arg):
|
||||
"""Connector
|
||||
-a/--available: list of available connectors
|
||||
-l/--list: list of available connectors
|
||||
-s/--status <name>: get connector status
|
||||
-n/--name <name>: get connector name
|
||||
-c/--config <name>:
|
||||
-c/--config <name>: get connector config
|
||||
"""
|
||||
args = {
|
||||
('-a', '--available'): self.gateway.get_available_connectors,
|
||||
('-s', '--status'): self.gateway.get_connector_status,
|
||||
('-n', '--name'): self.gateway.get_connector_name,
|
||||
('-c', '--config'): self.gateway.get_connector_config
|
||||
}
|
||||
|
||||
try:
|
||||
self._print(self.parser(arg, args))
|
||||
self._print(self.parser(arg, self.command_config['connector']))
|
||||
except ShellSyntaxError as e:
|
||||
self.stdout.write(e.__str__() + '\n')
|
||||
self.stdout.write(e.__str__() + ' (see `help connector`)\n')
|
||||
self.do_help('connector')
|
||||
except (FileNotFoundError, AttributeError):
|
||||
self.stdout.write('Connection is lost...\n')
|
||||
self.connect()
|
||||
|
||||
# ---
|
||||
@staticmethod
|
||||
def interrupt(_, __):
|
||||
sys.stderr.write("\nKeyboard interrupt trapped - use EOF to end\n")
|
||||
sys.stderr.write("\nKeyboard interrupt trapped - use Ctrl+d or Cmd+d to end\n")
|
||||
|
||||
def precmd(self, line: str) -> str:
|
||||
return line.strip()
|
||||
@@ -128,9 +221,11 @@ class Shell(cmd.Cmd, Thread):
|
||||
|
||||
@staticmethod
|
||||
def do_exit(_):
|
||||
"""Exit"""
|
||||
return -1
|
||||
|
||||
def do_EOF(self, args):
|
||||
"""EOF"""
|
||||
return self.do_exit(args)
|
||||
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
|
||||
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
|
||||
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME
|
||||
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
|
||||
# from thingsboard_gateway.gateway.shell_manager import ShellManager
|
||||
from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
|
||||
@@ -132,6 +133,7 @@ class GatewayManager(multiprocessing.managers.BaseManager):
|
||||
class TBGatewayService:
|
||||
EXPOSED_GETTERS = [
|
||||
'ping',
|
||||
'get_status',
|
||||
'get_storage_name',
|
||||
'get_storage_events_count',
|
||||
'get_available_connectors',
|
||||
@@ -414,6 +416,8 @@ class TBGatewayService:
|
||||
log.info("The gateway has been stopped.")
|
||||
self.tb_client.disconnect()
|
||||
self.tb_client.stop()
|
||||
# self.shell_manager.connect()
|
||||
# self.shell_manager.gateway_stopped()
|
||||
self.manager.shutdown()
|
||||
|
||||
def __init_remote_configuration(self, force=False):
|
||||
@@ -1318,7 +1322,7 @@ class TBGatewayService:
|
||||
|
||||
# Connectors -----------------
|
||||
def get_available_connectors(self):
|
||||
return [con for con in self.available_connectors]
|
||||
return {num + 1: name for (num, name) in enumerate(self.available_connectors)}
|
||||
|
||||
def get_connector_status(self, name):
|
||||
return {'connected': self.available_connectors[name].is_connected()}
|
||||
@@ -1329,6 +1333,10 @@ class TBGatewayService:
|
||||
def get_connector_config(self, name):
|
||||
return self.available_connectors[name].get_config()
|
||||
|
||||
# Gateway ----------------------
|
||||
def get_status(self):
|
||||
return {'connected': self.tb_client.is_connected()}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TBGatewayService(
|
||||
|
||||
Reference in New Issue
Block a user