diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2022-04-11 03:48:03 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2022-04-11 03:48:03 +0200 |
commit | 997196078d03e3ed7171368a4734ca16d6394f63 (patch) | |
tree | 8904f7f7d3d2b266b08bc55af0a865cef509fd99 /src/app.py | |
parent | debian: 1.2-2 (diff) | |
parent | README: update (diff) | |
download | linux-status-997196078d03e3ed7171368a4734ca16d6394f63.tar.gz linux-status-997196078d03e3ed7171368a4734ca16d6394f63.zip |
Merge tag 'v2.0' into debian
Diffstat (limited to '')
-rwxr-xr-x | src/app.py (renamed from app.py) | 251 |
1 files changed, 157 insertions, 94 deletions
@@ -65,10 +65,6 @@ def split_by(xs, sep): yield group -def hostname(): - return socket.gethostname() - - class Response: DEFAULT_STATUS = http.server.HTTPStatus.OK @@ -126,6 +122,37 @@ def run_do(*args, **kwargs): return output.stdout +class Task(abc.ABC): + def complete(self): + self.run() + return Response(Response.body_from_json(self.result())) + + @abc.abstractmethod + def run(self): + pass + + @abc.abstractmethod + def result(self): + pass + + +class TaskList(Task): + def __init__(self, tasks): + self.tasks = tasks + + def add(self, name, task): + if name in self.tasks: + raise RuntimeError(f'duplicate task name: {name}') + self.tasks[name] = task + + def run(self): + for task in self.tasks.values(): + task.run() + + def result(self): + return {name: task.result() for name, task in self.tasks.items()} + + pool = ThreadPoolExecutor() @@ -133,13 +160,20 @@ def run(*args, **kwargs): return pool.submit(run_do, *args, **kwargs) -class Command: +class Command(Task): def __init__(self, *args): self.args = args self.env = None def run(self): - return run(*self.args, env=self.env) + self.task = run(*self.args, env=self.env) + return self.task + + def result(self): + return self.task.result() + + def now(self): + return self.run().result() class Systemd(Command): @@ -202,31 +236,95 @@ class Loginctl(Systemd): super().__init__('loginctl', *args, '--full') -class Task(abc.ABC): - def complete(self): - self.run() - return Response(Response.body_from_json(self.result())) +class Docker(Command): + def __init__(self, *args): + super().__init__('docker', *args) - @abc.abstractmethod - def run(self): - pass - @abc.abstractmethod - def result(self): - pass +class DockerVersion(Docker): + def __init__(self, *args): + super().__init__('version') + + @staticmethod + def is_daemon_running(): + try: + DockerVersion().now() + return True + except: + return False -class TopTask(Task): - COMMAND = None +class DockerPs(Docker): + def __init__(self, *args): + super().__init__('ps', *args) + @staticmethod + def quiet(*args): + return DockerPs('--quiet', *args) + + @staticmethod + def get_all_ids(): + cmd = DockerPs.quiet('--all') + return cmd.now().splitlines() + + +class DockerInspect(Docker): + # This is pretty cool. I wanted to separate container entries with \0, and + # this is the best I could come up with. Note that a newline still gets + # added at the end. + FORMAT = '{{printf "%s%c" (json .) 0}}' + + def __init__(self, *args): + super().__init__('inspect', f'--format={DockerInspect.FORMAT}', *args) + + +class DockerStatus(DockerInspect): def __init__(self): - self.cmd = TopTask.get_command() + self.containers = DockerPs.get_all_ids() + super().__init__(*self.containers) def run(self): - self.task = self.cmd.run() + if not self.containers: + # `docker inspect` requires at least one container argument. + return '' + return super().run() def result(self): - return self.task.result() + if not self.containers: + # `docker inspect` requires at least one container argument. + return [] + result = super().result() + result = result.split('\0') + result = [json.loads(info) for info in result if info.strip()] + result = [DockerStatus.filter_info(info) for info in result] + return result + + @staticmethod + def filter_info(info): + assert info['Name'][0] == '/' + return { + 'exit_code': info['State']['ExitCode'], + 'health': info['State'].get('Health', {}).get('Status', None), + 'image': info['Config']['Image'], + # Strip the leading /: + 'name': info['Name'][1:], + 'started_at': info['State']['StartedAt'], + 'status': info['State']['Status'], + } + +class Hostname(Task): + def run(self): + pass + + def result(self): + return socket.gethostname() + + +class Top(Command): + COMMAND = None + + def __init__(self): + super().__init__(*Top.get_command()) @staticmethod def get_command(): @@ -234,66 +332,45 @@ class TopTask(Task): # from the command line (another option is the rc file, but that's too # complicated). For that, we simply run `top -h` once, and check if # the output contains the flags we want to use. - if TopTask.COMMAND is not None: - return TopTask.COMMAND + if Top.COMMAND is not None: + return Top.COMMAND help_output = run_do('top', '-h') args = ['top', '-b', '-n', '1', '-w', '512'] if 'Ee' in help_output: args += ['-E', 'm', '-e', 'm'] - TopTask.COMMAND = Command(*args) - return TopTask.COMMAND + Top.COMMAND = args + return Top.COMMAND -class RebootTask(Task): +class Reboot(Command): def __init__(self): - self.cmd = Command('systemctl', 'reboot') - - def run(self): - self.task = self.cmd.run() - - def result(self): - return self.task.result() + super().__init__('systemctl', 'reboot') -class PoweroffTask(Task): +class Poweroff(Command): def __init__(self): - self.cmd = Command('systemctl', 'poweroff') - - def run(self): - self.task = self.cmd.run() - - def result(self): - return self.task.result() + super().__init__('systemctl', 'poweroff') -class InstanceStatusTask(Task): +class InstanceStatus(TaskList): def __init__(self, systemctl, journalctl): - self.overview_cmd = systemctl('status') - self.failed_cmd = systemctl('list-units', '--failed') - self.timers_cmd = systemctl('list-timers', '--all') - self.journal_cmd = journalctl('-b', '--lines=20') - - def run(self): - self.overview_task = self.overview_cmd.run() - self.failed_task = self.failed_cmd.run() - self.timers_task = self.timers_cmd.run() - self.journal_task = self.journal_cmd.run() - - def result(self): - return { - 'overview': self.overview_task.result(), - 'failed': self.failed_task.result(), - 'timers': self.timers_task.result(), - 'journal': self.journal_task.result(), + tasks = { + 'overview': systemctl('status'), + 'failed': systemctl('list-units', '--failed'), + 'timers': systemctl('list-timers', '--all'), + 'journal': journalctl('-b', '--lines=20'), } + super().__init__(tasks) -class SystemInstanceStatusTask(InstanceStatusTask): +class SystemStatus(InstanceStatus): def __init__(self): super().__init__(Systemctl.system, Journalctl.system) + if DockerVersion.is_daemon_running(): + self.add('docker', DockerStatus()) -class UserInstanceStatusTask(InstanceStatusTask): +class UserStatus(InstanceStatus): def __init__(self, systemctl=Systemctl.user, journalctl=Journalctl.user): super().__init__(systemctl, journalctl) @@ -301,45 +378,31 @@ class UserInstanceStatusTask(InstanceStatusTask): def su(user): systemctl = lambda *args: Systemd.su(user, Systemctl.user(*args)) journalctl = lambda *args: Systemd.su(user, Journalctl.user(*args)) - return UserInstanceStatusTask(systemctl, journalctl) + return UserStatus(systemctl, journalctl) -class UserInstanceStatusTaskList(Task): +class UserStatusList(TaskList): def __init__(self): if running_as_root(): # As root, we can query all the user instances. - self.tasks = {user.name: UserInstanceStatusTask.su(user) for user in systemd_users()} + tasks = {user.name: UserStatus.su(user) for user in systemd_users()} else: # As a regular user, we can only query ourselves. - self.tasks = {} + tasks = {} user = get_current_user() if user_instance_active(user): - self.tasks[user.name] = UserInstanceStatusTask() + tasks[user.name] = UserStatus() + super().__init__(tasks) - def run(self): - for task in self.tasks.values(): - task.run() - - def result(self): - return {name: task.result() for name, task in self.tasks.items()} - -class StatusTask(Task): +class Status(TaskList): def __init__(self): - self.hostname = hostname() - self.system = SystemInstanceStatusTask() - self.user = UserInstanceStatusTaskList() - - def run(self): - self.system.run() - self.user.run() - - def result(self): - return { - 'hostname': self.hostname, - 'system': self.system.result(), - 'user': self.user.result(), + tasks = { + 'hostname': Hostname(), + 'system': SystemStatus(), + 'user': UserStatusList(), } + super().__init__(tasks) User = namedtuple('User', ['uid', 'name']) @@ -373,7 +436,7 @@ def user_instance_active(user): unit_name = f'user@{user.uid}.service' cmd = Systemctl.system('is-active', unit_name, '--quiet') try: - cmd.run().result() + cmd.now() return True except Exception: return False @@ -413,7 +476,7 @@ def human_users(): # that were running a systemd instance. def systemd_users(): def list_users(): - output = Loginctl('list-users', '--no-legend').run().result() + output = Loginctl('list-users', '--no-legend').now() lines = output.splitlines() if not lines: return @@ -432,7 +495,7 @@ def systemd_users(): return None properties = 'UID', 'Name', 'RuntimePath' prop_args = (arg for prop in properties for arg in ('-p', prop)) - output = Loginctl('show-user', *prop_args, '--value', *user_args).run().result() + output = Loginctl('show-user', *prop_args, '--value', *user_args).now() lines = output.splitlines() # Assuming that for muptiple users, the properties will be separated by # an empty string. @@ -462,13 +525,13 @@ class Request(Enum): def process(self): if self is Request.STATUS: - return StatusTask().complete() + return Status().complete() if self is Request.TOP: - return TopTask().complete() + return Top().complete() if self is Request.REBOOT: - return RebootTask().complete() + return Reboot().complete() if self is Request.POWEROFF: - return PoweroffTask().complete() + return Poweroff().complete() raise NotImplementedError(f'unknown request: {self}') |