From 609fea008b0091715bcdba1239b8126069c3473e Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Mon, 11 Apr 2022 01:26:12 +0200 Subject: put things into proper directories --- app.py | 549 ----------------------------------------------------------------- 1 file changed, 549 deletions(-) delete mode 100755 app.py (limited to 'app.py') diff --git a/app.py b/app.py deleted file mode 100755 index 5af55af..0000000 --- a/app.py +++ /dev/null @@ -1,549 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2021 Egor Tensin -# This file is part of the "linux-status" project. -# For details, see https://github.com/egor-tensin/linux-status. -# Distributed under the MIT License. - -# Initially it was just index.html and a bunch of shell scripts in cgi-bin. -# The web server could be launched using just `python -m http.server --cgi`, -# and all was well. Until I decided that I want to package this little app -# of mine so that it can be easily installed and run on a variety of systems. -# There was a problem with that, however: the app was designed to be run by -# a regular user, not root (mostly due to all the `systemctl --user` calls), -# and it didn't play nicely with the whole package idea. Plus, I've long -# wanted to make it show all the systemd user instances, not just the running -# user's. In addition, I wanted to dedupe the small chunks of code in cgi-bin. -# -# So this script was born. It could handle a lot more complexity than the -# shell scripts; it could be run as root and as a regular user, etc. It was -# still a CGI script (and it still is BTW), and a separate Python instance was -# used by the web server to execute it. -# -# There was a problem, of course. The thing is, I for once had a very strict -# performance requirement for this script: it had to work nicely on my rather -# old Raspberry Pi 1 Model B+. It, being a Python script, didn't. After -# some investigation, it was obvious that the main problem were the imports -# (especially those of the cgi & logging modules); they would take more time -# than the interval between requests. So I had to find a way to make it more -# performant. -# -# I played with Lua for a bit, but couldn't quite make it work in a -# satisfactory way. Then I had an idea: I could use http.server's HTTP server -# classes, overriding them to handle the "CGI" requests using the code in this -# script. That way, the imports would only be done once for the entire life -# of the server. In addition, no more tricks with sudo were required -# (http.server runs CGI scripts as user nobody, which is a no-no for this app; -# some trickery like whitelisting nobody in /etc/sudoers.d were employed). -# -# So that's where we are now, and it works! Doesn't load my good ol' Raspberry -# no more. - -import abc -import cgi -from collections import namedtuple -from concurrent.futures import ThreadPoolExecutor -from enum import Enum -import http.server -import json -import os -import pwd -import shlex -import socket -import subprocess -from subprocess import DEVNULL, PIPE, STDOUT - - -def split_by(xs, sep): - group = [] - for x in xs: - if x == sep: - yield group - group = [] - else: - group.append(x) - yield group - - -class Response: - DEFAULT_STATUS = http.server.HTTPStatus.OK - - @staticmethod - def body_from_json(body): - return json.dumps(body, ensure_ascii=False, indent=4) - - def __init__(self, body, status=None): - if status is None: - status = Response.DEFAULT_STATUS - self.status = status - self.body = body - - def headers(self): - yield 'Content-Type', 'text/html; charset=utf-8' - - def encode_body(self): - return self.body.encode(errors='replace') - - def write_as_cgi_script(self): - self.write_headers_as_cgi_script() - self.write_body_as_cgi_script() - - def write_headers_as_cgi_script(self): - for name, val in self.headers(): - print(f'{name}: {val}') - print() - - def write_body_as_cgi_script(self): - if self.body is not None: - print(self.body) - - def write_to_request_handler(self, handler): - handler.send_response(self.status) - self.write_headers_to_request_handler(handler) - self.write_body_to_request_handler(handler) - - def write_headers_to_request_handler(self, handler): - for name, val in self.headers(): - handler.send_header(name, val) - handler.end_headers() - - def write_body_to_request_handler(self, handler): - if self.body is not None: - handler.wfile.write(self.encode_body()) - - -def run_do(*args, **kwargs): - output = subprocess.run(args, stdin=DEVNULL, stdout=PIPE, stderr=STDOUT, universal_newlines=True, **kwargs) - # Include the output in the exception's message: - try: - output.check_returncode() - except Exception as e: - raise RuntimeError("Command's output was this:\n" + output.stdout) from e - return output.stdout - - -class Task(abc.ABC): - def complete(self): - self.run() - return Response(Response.body_from_json(self.result())) - - @abc.abstractmethod - def run(self): - pass - - @abc.abstractmethod - def result(self): - pass - - -class TaskList(Task): - def __init__(self, tasks): - self.tasks = tasks - - def add(self, name, task): - if name in self.tasks: - raise RuntimeError(f'duplicate task name: {name}') - self.tasks[name] = task - - def run(self): - for task in self.tasks.values(): - task.run() - - def result(self): - return {name: task.result() for name, task in self.tasks.items()} - - -pool = ThreadPoolExecutor() - - -def run(*args, **kwargs): - return pool.submit(run_do, *args, **kwargs) - - -class Command(Task): - def __init__(self, *args): - self.args = args - self.env = None - - def run(self): - self.task = run(*self.args, env=self.env) - return self.task - - def result(self): - return self.task.result() - - def now(self): - return self.run().result() - - -class Systemd(Command): - def __init__(self, *args): - super().__init__(*args) - self.env = self.make_env() - - @staticmethod - def make_env(): - env = os.environ.copy() - env['SYSTEMD_PAGER'] = '' - env['SYSTEMD_COLORS'] = 'no' - return env - - @staticmethod - def su(user, cmd): - new = Systemd('su', '-c', shlex.join(cmd.args), user.name) - new.env = Systemd.fix_su_env(user, cmd.env.copy()) - return new - - @staticmethod - def fix_su_env(user, env): - # https://unix.stackexchange.com/q/483948 - # https://unix.stackexchange.com/q/346841 - # https://unix.stackexchange.com/q/423632 - # https://unix.stackexchange.com/q/245768 - # https://unix.stackexchange.com/q/434494 - env['XDG_RUNTIME_DIR'] = user.runtime_dir - # I'm not sure the bus part works everywhere. - bus_path = os.path.join(user.runtime_dir, 'bus') - env['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=' + bus_path - return env - - -class Ctl(Systemd): - def __init__(self, executable, *args): - super().__init__(executable, *args, '--full') - - @classmethod - def system(cls, *args): - return cls('--system', *args) - - @classmethod - def user(cls, *args): - return cls('--user', *args) - - -class Systemctl(Ctl): - def __init__(self, *args): - super().__init__('systemctl', *args) - - -class Journalctl(Ctl): - def __init__(self, *args): - super().__init__('journalctl', *args) - - -class Loginctl(Systemd): - def __init__(self, *args): - super().__init__('loginctl', *args, '--full') - - -class Docker(Command): - def __init__(self, *args): - super().__init__('docker', *args) - - -class DockerVersion(Docker): - def __init__(self, *args): - super().__init__('version') - - @staticmethod - def is_daemon_running(): - try: - DockerVersion().now() - return True - except: - return False - - -class DockerPs(Docker): - def __init__(self, *args): - super().__init__('ps', *args) - - @staticmethod - def quiet(*args): - return DockerPs('--quiet', *args) - - @staticmethod - def get_all_ids(): - cmd = DockerPs.quiet('--all') - return cmd.now().splitlines() - - -class DockerInspect(Docker): - # This is pretty cool. I wanted to separate container entries with \0, and - # this is the best I could come up with. Note that a newline still gets - # added at the end. - FORMAT = '{{printf "%s%c" (json .) 0}}' - - def __init__(self, *args): - super().__init__('inspect', f'--format={DockerInspect.FORMAT}', *args) - - -class DockerStatus(DockerInspect): - def __init__(self): - self.containers = DockerPs.get_all_ids() - super().__init__(*self.containers) - - def run(self): - if not self.containers: - # `docker inspect` requires at least one container argument. - return '' - return super().run() - - def result(self): - if not self.containers: - # `docker inspect` requires at least one container argument. - return [] - result = super().result() - result = result.split('\0') - result = [json.loads(info) for info in result if info.strip()] - result = [DockerStatus.filter_info(info) for info in result] - return result - - @staticmethod - def filter_info(info): - assert info['Name'][0] == '/' - return { - 'exit_code': info['State']['ExitCode'], - 'health': info['State'].get('Health', {}).get('Status', None), - 'image': info['Config']['Image'], - # Strip the leading /: - 'name': info['Name'][1:], - 'started_at': info['State']['StartedAt'], - 'status': info['State']['Status'], - } - -class Hostname(Task): - def run(self): - pass - - def result(self): - return socket.gethostname() - - -class Top(Command): - COMMAND = None - - def __init__(self): - super().__init__(*Top.get_command()) - - @staticmethod - def get_command(): - # On more modern versions of top, we want to enable memory scaling - # from the command line (another option is the rc file, but that's too - # complicated). For that, we simply run `top -h` once, and check if - # the output contains the flags we want to use. - if Top.COMMAND is not None: - return Top.COMMAND - help_output = run_do('top', '-h') - args = ['top', '-b', '-n', '1', '-w', '512'] - if 'Ee' in help_output: - args += ['-E', 'm', '-e', 'm'] - Top.COMMAND = args - return Top.COMMAND - - -class Reboot(Command): - def __init__(self): - super().__init__('systemctl', 'reboot') - - -class Poweroff(Command): - def __init__(self): - super().__init__('systemctl', 'poweroff') - - -class InstanceStatus(TaskList): - def __init__(self, systemctl, journalctl): - tasks = { - 'overview': systemctl('status'), - 'failed': systemctl('list-units', '--failed'), - 'timers': systemctl('list-timers', '--all'), - 'journal': journalctl('-b', '--lines=20'), - } - super().__init__(tasks) - - -class SystemStatus(InstanceStatus): - def __init__(self): - super().__init__(Systemctl.system, Journalctl.system) - if DockerVersion.is_daemon_running(): - self.add('docker', DockerStatus()) - - -class UserStatus(InstanceStatus): - def __init__(self, systemctl=Systemctl.user, journalctl=Journalctl.user): - super().__init__(systemctl, journalctl) - - @staticmethod - def su(user): - systemctl = lambda *args: Systemd.su(user, Systemctl.user(*args)) - journalctl = lambda *args: Systemd.su(user, Journalctl.user(*args)) - return UserStatus(systemctl, journalctl) - - -class UserStatusList(TaskList): - def __init__(self): - if running_as_root(): - # As root, we can query all the user instances. - tasks = {user.name: UserStatus.su(user) for user in systemd_users()} - else: - # As a regular user, we can only query ourselves. - tasks = {} - user = get_current_user() - if user_instance_active(user): - tasks[user.name] = UserStatus() - super().__init__(tasks) - - -class Status(TaskList): - def __init__(self): - tasks = { - 'hostname': Hostname(), - 'system': SystemStatus(), - 'user': UserStatusList(), - } - super().__init__(tasks) - - -User = namedtuple('User', ['uid', 'name']) -SystemdUser = namedtuple('SystemdUser', ['uid', 'name', 'runtime_dir']) - - -def running_as_root(): - # AFAIK, Python's http.server drops root privileges and executes the scripts - # as user nobody. - # It does no such thing if run as a regular user though. - return os.geteuid() == 0 or running_as_nobody() - - -def running_as_nobody(): - for user in users(): - if user.name == 'nobody': - return user.uid == os.geteuid() - return False - - -def get_current_user(): - uid = os.getuid() - entry = pwd.getpwuid(uid) - return User(entry.pw_uid, entry.pw_name) - - -def user_instance_active(user): - # I'm pretty sure this is the way to determine if the user instance is - # running? - # Source: https://www.freedesktop.org/software/systemd/man/user@.service.html - unit_name = f'user@{user.uid}.service' - cmd = Systemctl.system('is-active', unit_name, '--quiet') - try: - cmd.now() - return True - except Exception: - return False - - -# A pitiful attempt to find a list of possibly-systemd-enabled users follows -# (i.e. users that might be running a per-user systemd instance). -# I don't know of a better way than probing /run/user/UID. -# Maybe running something like `loginctl list-sessions` would be better? - -# These are the default values, see [1]. -# The actual values are specified in /etc/login.defs. -# Still, they can be overridden on the command line, so I don't think there -# actually is a way to list non-system users (imma call them "human"). -# -# [1]: https://man7.org/linux/man-pages/man8/useradd.8.html -UID_MIN = 1000 -UID_MAX = 60000 - - -def users(): - for entry in pwd.getpwall(): - yield User(entry.pw_uid, entry.pw_name) - - -def human_users(): - for user in users(): - if user.uid < UID_MIN: - continue - if user.uid > UID_MAX: - continue - yield user - - -# You know what, loginctl might just be the answer. -# Namely, `loginctl list-users`: during my testing, it did only list the users -# that were running a systemd instance. -def systemd_users(): - def list_users(): - output = Loginctl('list-users', '--no-legend').now() - lines = output.splitlines() - if not lines: - return - for line in lines: - # This assumes user names cannot contain spaces. - # loginctl list-users output must be in the UID NAME format. - info = line.lstrip().split(' ', 2) - if len(info) < 2: - raise RuntimeError(f'invalid `loginctl list-users` output:\n{output}') - uid, user = info[0], info[1] - yield User(uid, user) - - def show_users(users): - user_args = [user.name for user in users] - if not user_args: - return None - properties = 'UID', 'Name', 'RuntimePath' - prop_args = (arg for prop in properties for arg in ('-p', prop)) - output = Loginctl('show-user', *prop_args, '--value', *user_args).now() - lines = output.splitlines() - # Assuming that for muptiple users, the properties will be separated by - # an empty string. - groups = split_by(lines, '') - for group in groups: - if len(group) != len(properties): - raise RuntimeError(f'invalid `loginctl show-user` output:\n{output}') - yield SystemdUser(int(group[0]), group[1], group[2]) - - return show_users(list_users()) - - -class Request(Enum): - STATUS = 'status' - TOP = 'top' - REBOOT = 'reboot' - POWEROFF = 'poweroff' - - def __str__(self): - return self.value - - @staticmethod - def from_http_path(path): - if not path or path[0] != '/': - raise ValueError('HTTP path must start with a forward slash /') - return Request(path[1:]) - - def process(self): - if self is Request.STATUS: - return Status().complete() - if self is Request.TOP: - return Top().complete() - if self is Request.REBOOT: - return Reboot().complete() - if self is Request.POWEROFF: - return Poweroff().complete() - raise NotImplementedError(f'unknown request: {self}') - - -def process_cgi_request(): - params = cgi.FieldStorage() - what = params['what'].value - Request(what).process().write_as_cgi_script() - - -def main(): - process_cgi_request() - - -if __name__ == '__main__': - main() -- cgit v1.2.3