aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/app.py
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2022-04-11 03:48:03 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2022-04-11 03:48:03 +0200
commit997196078d03e3ed7171368a4734ca16d6394f63 (patch)
tree8904f7f7d3d2b266b08bc55af0a865cef509fd99 /src/app.py
parentdebian: 1.2-2 (diff)
parentREADME: update (diff)
downloadlinux-status-997196078d03e3ed7171368a4734ca16d6394f63.tar.gz
linux-status-997196078d03e3ed7171368a4734ca16d6394f63.zip
Merge tag 'v2.0' into debian
Diffstat (limited to 'src/app.py')
-rwxr-xr-xsrc/app.py549
1 files changed, 549 insertions, 0 deletions
diff --git a/src/app.py b/src/app.py
new file mode 100755
index 0000000..5af55af
--- /dev/null
+++ b/src/app.py
@@ -0,0 +1,549 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2021 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "linux-status" project.
+# For details, see https://github.com/egor-tensin/linux-status.
+# Distributed under the MIT License.
+
+# Initially it was just index.html and a bunch of shell scripts in cgi-bin.
+# The web server could be launched using just `python -m http.server --cgi`,
+# and all was well. Until I decided that I want to package this little app
+# of mine so that it can be easily installed and run on a variety of systems.
+# There was a problem with that, however: the app was designed to be run by
+# a regular user, not root (mostly due to all the `systemctl --user` calls),
+# and it didn't play nicely with the whole package idea. Plus, I've long
+# wanted to make it show all the systemd user instances, not just the running
+# user's. In addition, I wanted to dedupe the small chunks of code in cgi-bin.
+#
+# So this script was born. It could handle a lot more complexity than the
+# shell scripts; it could be run as root and as a regular user, etc. It was
+# still a CGI script (and it still is BTW), and a separate Python instance was
+# used by the web server to execute it.
+#
+# There was a problem, of course. The thing is, I for once had a very strict
+# performance requirement for this script: it had to work nicely on my rather
+# old Raspberry Pi 1 Model B+. It, being a Python script, didn't. After
+# some investigation, it was obvious that the main problem were the imports
+# (especially those of the cgi & logging modules); they would take more time
+# than the interval between requests. So I had to find a way to make it more
+# performant.
+#
+# I played with Lua for a bit, but couldn't quite make it work in a
+# satisfactory way. Then I had an idea: I could use http.server's HTTP server
+# classes, overriding them to handle the "CGI" requests using the code in this
+# script. That way, the imports would only be done once for the entire life
+# of the server. In addition, no more tricks with sudo were required
+# (http.server runs CGI scripts as user nobody, which is a no-no for this app;
+# some trickery like whitelisting nobody in /etc/sudoers.d were employed).
+#
+# So that's where we are now, and it works! Doesn't load my good ol' Raspberry
+# no more.
+
+import abc
+import cgi
+from collections import namedtuple
+from concurrent.futures import ThreadPoolExecutor
+from enum import Enum
+import http.server
+import json
+import os
+import pwd
+import shlex
+import socket
+import subprocess
+from subprocess import DEVNULL, PIPE, STDOUT
+
+
+def split_by(xs, sep):
+ group = []
+ for x in xs:
+ if x == sep:
+ yield group
+ group = []
+ else:
+ group.append(x)
+ yield group
+
+
+class Response:
+ DEFAULT_STATUS = http.server.HTTPStatus.OK
+
+ @staticmethod
+ def body_from_json(body):
+ return json.dumps(body, ensure_ascii=False, indent=4)
+
+ def __init__(self, body, status=None):
+ if status is None:
+ status = Response.DEFAULT_STATUS
+ self.status = status
+ self.body = body
+
+ def headers(self):
+ yield 'Content-Type', 'text/html; charset=utf-8'
+
+ def encode_body(self):
+ return self.body.encode(errors='replace')
+
+ def write_as_cgi_script(self):
+ self.write_headers_as_cgi_script()
+ self.write_body_as_cgi_script()
+
+ def write_headers_as_cgi_script(self):
+ for name, val in self.headers():
+ print(f'{name}: {val}')
+ print()
+
+ def write_body_as_cgi_script(self):
+ if self.body is not None:
+ print(self.body)
+
+ def write_to_request_handler(self, handler):
+ handler.send_response(self.status)
+ self.write_headers_to_request_handler(handler)
+ self.write_body_to_request_handler(handler)
+
+ def write_headers_to_request_handler(self, handler):
+ for name, val in self.headers():
+ handler.send_header(name, val)
+ handler.end_headers()
+
+ def write_body_to_request_handler(self, handler):
+ if self.body is not None:
+ handler.wfile.write(self.encode_body())
+
+
+def run_do(*args, **kwargs):
+ output = subprocess.run(args, stdin=DEVNULL, stdout=PIPE, stderr=STDOUT, universal_newlines=True, **kwargs)
+ # Include the output in the exception's message:
+ try:
+ output.check_returncode()
+ except Exception as e:
+ raise RuntimeError("Command's output was this:\n" + output.stdout) from e
+ return output.stdout
+
+
+class Task(abc.ABC):
+ def complete(self):
+ self.run()
+ return Response(Response.body_from_json(self.result()))
+
+ @abc.abstractmethod
+ def run(self):
+ pass
+
+ @abc.abstractmethod
+ def result(self):
+ pass
+
+
+class TaskList(Task):
+ def __init__(self, tasks):
+ self.tasks = tasks
+
+ def add(self, name, task):
+ if name in self.tasks:
+ raise RuntimeError(f'duplicate task name: {name}')
+ self.tasks[name] = task
+
+ def run(self):
+ for task in self.tasks.values():
+ task.run()
+
+ def result(self):
+ return {name: task.result() for name, task in self.tasks.items()}
+
+
+pool = ThreadPoolExecutor()
+
+
+def run(*args, **kwargs):
+ return pool.submit(run_do, *args, **kwargs)
+
+
+class Command(Task):
+ def __init__(self, *args):
+ self.args = args
+ self.env = None
+
+ def run(self):
+ self.task = run(*self.args, env=self.env)
+ return self.task
+
+ def result(self):
+ return self.task.result()
+
+ def now(self):
+ return self.run().result()
+
+
+class Systemd(Command):
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.env = self.make_env()
+
+ @staticmethod
+ def make_env():
+ env = os.environ.copy()
+ env['SYSTEMD_PAGER'] = ''
+ env['SYSTEMD_COLORS'] = 'no'
+ return env
+
+ @staticmethod
+ def su(user, cmd):
+ new = Systemd('su', '-c', shlex.join(cmd.args), user.name)
+ new.env = Systemd.fix_su_env(user, cmd.env.copy())
+ return new
+
+ @staticmethod
+ def fix_su_env(user, env):
+ # https://unix.stackexchange.com/q/483948
+ # https://unix.stackexchange.com/q/346841
+ # https://unix.stackexchange.com/q/423632
+ # https://unix.stackexchange.com/q/245768
+ # https://unix.stackexchange.com/q/434494
+ env['XDG_RUNTIME_DIR'] = user.runtime_dir
+ # I'm not sure the bus part works everywhere.
+ bus_path = os.path.join(user.runtime_dir, 'bus')
+ env['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=' + bus_path
+ return env
+
+
+class Ctl(Systemd):
+ def __init__(self, executable, *args):
+ super().__init__(executable, *args, '--full')
+
+ @classmethod
+ def system(cls, *args):
+ return cls('--system', *args)
+
+ @classmethod
+ def user(cls, *args):
+ return cls('--user', *args)
+
+
+class Systemctl(Ctl):
+ def __init__(self, *args):
+ super().__init__('systemctl', *args)
+
+
+class Journalctl(Ctl):
+ def __init__(self, *args):
+ super().__init__('journalctl', *args)
+
+
+class Loginctl(Systemd):
+ def __init__(self, *args):
+ super().__init__('loginctl', *args, '--full')
+
+
+class Docker(Command):
+ def __init__(self, *args):
+ super().__init__('docker', *args)
+
+
+class DockerVersion(Docker):
+ def __init__(self, *args):
+ super().__init__('version')
+
+ @staticmethod
+ def is_daemon_running():
+ try:
+ DockerVersion().now()
+ return True
+ except:
+ return False
+
+
+class DockerPs(Docker):
+ def __init__(self, *args):
+ super().__init__('ps', *args)
+
+ @staticmethod
+ def quiet(*args):
+ return DockerPs('--quiet', *args)
+
+ @staticmethod
+ def get_all_ids():
+ cmd = DockerPs.quiet('--all')
+ return cmd.now().splitlines()
+
+
+class DockerInspect(Docker):
+ # This is pretty cool. I wanted to separate container entries with \0, and
+ # this is the best I could come up with. Note that a newline still gets
+ # added at the end.
+ FORMAT = '{{printf "%s%c" (json .) 0}}'
+
+ def __init__(self, *args):
+ super().__init__('inspect', f'--format={DockerInspect.FORMAT}', *args)
+
+
+class DockerStatus(DockerInspect):
+ def __init__(self):
+ self.containers = DockerPs.get_all_ids()
+ super().__init__(*self.containers)
+
+ def run(self):
+ if not self.containers:
+ # `docker inspect` requires at least one container argument.
+ return ''
+ return super().run()
+
+ def result(self):
+ if not self.containers:
+ # `docker inspect` requires at least one container argument.
+ return []
+ result = super().result()
+ result = result.split('\0')
+ result = [json.loads(info) for info in result if info.strip()]
+ result = [DockerStatus.filter_info(info) for info in result]
+ return result
+
+ @staticmethod
+ def filter_info(info):
+ assert info['Name'][0] == '/'
+ return {
+ 'exit_code': info['State']['ExitCode'],
+ 'health': info['State'].get('Health', {}).get('Status', None),
+ 'image': info['Config']['Image'],
+ # Strip the leading /:
+ 'name': info['Name'][1:],
+ 'started_at': info['State']['StartedAt'],
+ 'status': info['State']['Status'],
+ }
+
+class Hostname(Task):
+ def run(self):
+ pass
+
+ def result(self):
+ return socket.gethostname()
+
+
+class Top(Command):
+ COMMAND = None
+
+ def __init__(self):
+ super().__init__(*Top.get_command())
+
+ @staticmethod
+ def get_command():
+ # On more modern versions of top, we want to enable memory scaling
+ # from the command line (another option is the rc file, but that's too
+ # complicated). For that, we simply run `top -h` once, and check if
+ # the output contains the flags we want to use.
+ if Top.COMMAND is not None:
+ return Top.COMMAND
+ help_output = run_do('top', '-h')
+ args = ['top', '-b', '-n', '1', '-w', '512']
+ if 'Ee' in help_output:
+ args += ['-E', 'm', '-e', 'm']
+ Top.COMMAND = args
+ return Top.COMMAND
+
+
+class Reboot(Command):
+ def __init__(self):
+ super().__init__('systemctl', 'reboot')
+
+
+class Poweroff(Command):
+ def __init__(self):
+ super().__init__('systemctl', 'poweroff')
+
+
+class InstanceStatus(TaskList):
+ def __init__(self, systemctl, journalctl):
+ tasks = {
+ 'overview': systemctl('status'),
+ 'failed': systemctl('list-units', '--failed'),
+ 'timers': systemctl('list-timers', '--all'),
+ 'journal': journalctl('-b', '--lines=20'),
+ }
+ super().__init__(tasks)
+
+
+class SystemStatus(InstanceStatus):
+ def __init__(self):
+ super().__init__(Systemctl.system, Journalctl.system)
+ if DockerVersion.is_daemon_running():
+ self.add('docker', DockerStatus())
+
+
+class UserStatus(InstanceStatus):
+ def __init__(self, systemctl=Systemctl.user, journalctl=Journalctl.user):
+ super().__init__(systemctl, journalctl)
+
+ @staticmethod
+ def su(user):
+ systemctl = lambda *args: Systemd.su(user, Systemctl.user(*args))
+ journalctl = lambda *args: Systemd.su(user, Journalctl.user(*args))
+ return UserStatus(systemctl, journalctl)
+
+
+class UserStatusList(TaskList):
+ def __init__(self):
+ if running_as_root():
+ # As root, we can query all the user instances.
+ tasks = {user.name: UserStatus.su(user) for user in systemd_users()}
+ else:
+ # As a regular user, we can only query ourselves.
+ tasks = {}
+ user = get_current_user()
+ if user_instance_active(user):
+ tasks[user.name] = UserStatus()
+ super().__init__(tasks)
+
+
+class Status(TaskList):
+ def __init__(self):
+ tasks = {
+ 'hostname': Hostname(),
+ 'system': SystemStatus(),
+ 'user': UserStatusList(),
+ }
+ super().__init__(tasks)
+
+
+User = namedtuple('User', ['uid', 'name'])
+SystemdUser = namedtuple('SystemdUser', ['uid', 'name', 'runtime_dir'])
+
+
+def running_as_root():
+ # AFAIK, Python's http.server drops root privileges and executes the scripts
+ # as user nobody.
+ # It does no such thing if run as a regular user though.
+ return os.geteuid() == 0 or running_as_nobody()
+
+
+def running_as_nobody():
+ for user in users():
+ if user.name == 'nobody':
+ return user.uid == os.geteuid()
+ return False
+
+
+def get_current_user():
+ uid = os.getuid()
+ entry = pwd.getpwuid(uid)
+ return User(entry.pw_uid, entry.pw_name)
+
+
+def user_instance_active(user):
+ # I'm pretty sure this is the way to determine if the user instance is
+ # running?
+ # Source: https://www.freedesktop.org/software/systemd/man/user@.service.html
+ unit_name = f'user@{user.uid}.service'
+ cmd = Systemctl.system('is-active', unit_name, '--quiet')
+ try:
+ cmd.now()
+ return True
+ except Exception:
+ return False
+
+
+# A pitiful attempt to find a list of possibly-systemd-enabled users follows
+# (i.e. users that might be running a per-user systemd instance).
+# I don't know of a better way than probing /run/user/UID.
+# Maybe running something like `loginctl list-sessions` would be better?
+
+# These are the default values, see [1].
+# The actual values are specified in /etc/login.defs.
+# Still, they can be overridden on the command line, so I don't think there
+# actually is a way to list non-system users (imma call them "human").
+#
+# [1]: https://man7.org/linux/man-pages/man8/useradd.8.html
+UID_MIN = 1000
+UID_MAX = 60000
+
+
+def users():
+ for entry in pwd.getpwall():
+ yield User(entry.pw_uid, entry.pw_name)
+
+
+def human_users():
+ for user in users():
+ if user.uid < UID_MIN:
+ continue
+ if user.uid > UID_MAX:
+ continue
+ yield user
+
+
+# You know what, loginctl might just be the answer.
+# Namely, `loginctl list-users`: during my testing, it did only list the users
+# that were running a systemd instance.
+def systemd_users():
+ def list_users():
+ output = Loginctl('list-users', '--no-legend').now()
+ lines = output.splitlines()
+ if not lines:
+ return
+ for line in lines:
+ # This assumes user names cannot contain spaces.
+ # loginctl list-users output must be in the UID NAME format.
+ info = line.lstrip().split(' ', 2)
+ if len(info) < 2:
+ raise RuntimeError(f'invalid `loginctl list-users` output:\n{output}')
+ uid, user = info[0], info[1]
+ yield User(uid, user)
+
+ def show_users(users):
+ user_args = [user.name for user in users]
+ if not user_args:
+ return None
+ properties = 'UID', 'Name', 'RuntimePath'
+ prop_args = (arg for prop in properties for arg in ('-p', prop))
+ output = Loginctl('show-user', *prop_args, '--value', *user_args).now()
+ lines = output.splitlines()
+ # Assuming that for muptiple users, the properties will be separated by
+ # an empty string.
+ groups = split_by(lines, '')
+ for group in groups:
+ if len(group) != len(properties):
+ raise RuntimeError(f'invalid `loginctl show-user` output:\n{output}')
+ yield SystemdUser(int(group[0]), group[1], group[2])
+
+ return show_users(list_users())
+
+
+class Request(Enum):
+ STATUS = 'status'
+ TOP = 'top'
+ REBOOT = 'reboot'
+ POWEROFF = 'poweroff'
+
+ def __str__(self):
+ return self.value
+
+ @staticmethod
+ def from_http_path(path):
+ if not path or path[0] != '/':
+ raise ValueError('HTTP path must start with a forward slash /')
+ return Request(path[1:])
+
+ def process(self):
+ if self is Request.STATUS:
+ return Status().complete()
+ if self is Request.TOP:
+ return Top().complete()
+ if self is Request.REBOOT:
+ return Reboot().complete()
+ if self is Request.POWEROFF:
+ return Poweroff().complete()
+ raise NotImplementedError(f'unknown request: {self}')
+
+
+def process_cgi_request():
+ params = cgi.FieldStorage()
+ what = params['what'].value
+ Request(what).process().write_as_cgi_script()
+
+
+def main():
+ process_cgi_request()
+
+
+if __name__ == '__main__':
+ main()