From c0068e035ca825579e8d41f60d3864eeb6df0c88 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Wed, 3 Mar 2021 23:25:52 +0300 Subject: no more CGI The server.py script now launches a web server and handles all the requests internally, without delegating anything to external scripts. --- app.py | 404 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 404 insertions(+) create mode 100755 app.py (limited to 'app.py') diff --git a/app.py b/app.py new file mode 100755 index 0000000..e852561 --- /dev/null +++ b/app.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2021 Egor Tensin +# This file is part of the "linux-status" project. +# For details, see https://github.com/egor-tensin/linux-status. +# Distributed under the MIT License. + +import abc +import cgi +from collections import namedtuple +from concurrent.futures import ThreadPoolExecutor +from enum import Enum +import http.server +import json +import os +import pwd +import shlex +import socket +import subprocess +from subprocess import DEVNULL, PIPE, STDOUT + + +def split_by(xs, sep): + group = [] + for x in xs: + if x == sep: + yield group + group = [] + else: + group.append(x) + yield group + + +def hostname(): + return socket.gethostname() + + +class Response: + def __init__(self, data): + self.data = data + + def headers(self): + yield 'Content-Type', 'text/html; charset=utf-8' + + @staticmethod + def dump_json(data): + return json.dumps(data, ensure_ascii=False) + + def body(self): + return self.dump_json(self.data) + + def write_as_cgi_script(self): + self.write_headers_as_cgi_script() + self.write_body_as_cgi_script() + + def write_headers_as_cgi_script(self): + for name, val in self.headers(): + print(f'{name}: {val}') + print() + + def write_body_as_cgi_script(self): + if self.data is not None: + print(self.body()) + + def write_as_request_handler(self, handler): + handler.send_response(http.server.HTTPStatus.OK) + self.write_headers_as_request_handler(handler) + self.write_body_as_request_handler(handler) + + def write_headers_as_request_handler(self, handler): + for name, val in self.headers(): + handler.send_header(name, val) + handler.end_headers() + + def write_body_as_request_handler(self, handler): + if self.data is not None: + handler.wfile.write(self.body().encode(errors='replace')) + + +def run_do(*args, **kwargs): + output = subprocess.run(args, stdin=DEVNULL, stdout=PIPE, stderr=STDOUT, universal_newlines=True, **kwargs) + return output.stdout + + +pool = ThreadPoolExecutor() + + +def run(*args, **kwargs): + return pool.submit(run_do, *args, **kwargs) + + +class Command: + def __init__(self, *args): + self.args = args + self.env = None + + def run(self): + return run(*self.args, env=self.env) + + +class Systemd(Command): + def __init__(self, *args): + super().__init__(*args) + self.env = self.make_env() + + @staticmethod + def make_env(): + env = os.environ.copy() + env['SYSTEMD_PAGER'] = '' + env['SYSTEMD_COLORS'] = 'no' + return env + + @staticmethod + def su(user, cmd): + new = Systemd('su', '-c', shlex.join(cmd.args), user.name) + new.env = Systemd.fix_su_env(user, cmd.env.copy()) + return new + + @staticmethod + def fix_su_env(user, env): + # https://unix.stackexchange.com/q/483948 + # https://unix.stackexchange.com/q/346841 + # https://unix.stackexchange.com/q/423632 + # https://unix.stackexchange.com/q/245768 + # https://unix.stackexchange.com/q/434494 + env['XDG_RUNTIME_DIR'] = user.runtime_dir + # I'm not sure the bus part works everywhere. + bus_path = os.path.join(user.runtime_dir, 'bus') + env['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=' + bus_path + return env + + +class Systemctl(Systemd): + def __init__(self, *args): + super().__init__('systemctl', *args, '--full') + + @staticmethod + def system(*args): + return Systemctl('--system', *args) + + @staticmethod + def user(*args): + return Systemctl('--user', *args) + + +class Loginctl(Systemd): + def __init__(self, *args): + super().__init__('loginctl', *args) + + +class Task(abc.ABC): + def complete(self): + self.run() + return Response(self.result()) + + @abc.abstractmethod + def run(self): + pass + + @abc.abstractmethod + def result(self): + pass + + +class TopTask(Task): + def __init__(self): + self.cmd = Command('top', '-b', '-n', '1', '-w', '512') + + def run(self): + self.task = self.cmd.run() + + def result(self): + return self.task.result() + + +class RebootTask(Task): + def __init__(self): + self.cmd = Command('systemctl', 'reboot') + + def run(self): + self.task = self.cmd.run() + + def result(self): + return self.task.result() + + +class PoweroffTask(Task): + def __init__(self): + self.cmd = Command('systemctl', 'poweroff') + + def run(self): + self.task = self.cmd.run() + + def result(self): + return self.task.result() + + +class InstanceStatusTask(Task): + def __init__(self, systemctl): + self.overview_cmd = systemctl('status') + self.failed_cmd = systemctl('list-units', '--failed') + self.timers_cmd = systemctl('list-timers', '--all') + + def run(self): + self.overview_task = self.overview_cmd.run() + self.failed_task = self.failed_cmd.run() + self.timers_task = self.timers_cmd.run() + + def result(self): + return { + 'overview': self.overview_task.result(), + 'failed': self.failed_task.result(), + 'timers': self.timers_task.result(), + } + + +class SystemInstanceStatusTask(InstanceStatusTask): + def __init__(self): + super().__init__(Systemctl.system) + + +class UserInstanceStatusTask(InstanceStatusTask): + def __init__(self, systemctl=Systemctl.user): + super().__init__(systemctl) + + @staticmethod + def su(user): + systemctl = lambda *args: Systemd.su(user.name, Systemctl.user(*args)) + return UserInstanceStatusTask(systemctl) + + +class UserInstanceStatusTaskList(Task): + def __init__(self): + if running_as_root(): + # As root, we can query all the user instances. + self.tasks = {user.name: UserInstanceStatusTask.su(user) for user in systemd_users()} + else: + # As a regular user, we can only query ourselves. + self.tasks = {get_current_user().name: UserInstanceStatusTask()} + + def run(self): + for task in self.tasks.values(): + task.run() + + def result(self): + return {name: task.result() for name, task in self.tasks.items()} + + +class StatusTask(Task): + def __init__(self): + self.hostname = hostname() + self.top = TopTask() + self.system = SystemInstanceStatusTask() + self.user = UserInstanceStatusTaskList() + + def run(self): + self.top.run() + self.system.run() + self.user.run() + + def result(self): + return { + 'hostname': self.hostname, + 'top': self.top.result(), + 'system': self.system.result(), + 'user': self.user.result(), + } + + +class TimersTask(StatusTask): + # TODO: I'm going to remove the timers-only endpoint completely. + pass + + +User = namedtuple('User', ['uid', 'name']) +SystemdUser = namedtuple('SystemdUser', ['uid', 'name', 'runtime_dir']) + + +def running_as_root(): + # AFAIK, Python's http.server drops root privileges and executes the scripts + # as user nobody. + # It does no such thing if run as a regular user though. + return os.geteuid() == 0 or running_as_nobody() + + +def running_as_nobody(): + for user in users(): + if user.name == 'nobody': + return user.uid == os.geteuid() + return False + + +def get_current_user(): + uid = os.getuid() + entry = pwd.getpwuid(uid) + return User(entry.pw_uid, entry.pw_name) + + +# A pitiful attempt to find a list of possibly-systemd-enabled users follows +# (i.e. users that might be running a per-user systemd instance). +# I don't know of a better way than probing /run/user/UID. +# Maybe running something like `loginctl list-sessions` would be better? + +# These are the default values, see [1]. +# The actual values are specified in /etc/login.defs. +# Still, they can be overridden on the command line, so I don't think there +# actually is a way to list non-system users (imma call them "human"). +# +# [1]: https://man7.org/linux/man-pages/man8/useradd.8.html +UID_MIN = 1000 +UID_MAX = 60000 + + +def users(): + for entry in pwd.getpwall(): + yield User(entry.pw_uid, entry.pw_name) + + +def human_users(): + for user in users(): + if user.uid < UID_MIN: + continue + if user.uid > UID_MAX: + continue + yield user + + +# You know what, loginctl might just be the answer. +# Namely, `loginctl list-users`: during my testing, it did only list the users +# that were running a systemd instance. +def systemd_users(): + def list_users(): + output = Loginctl('list-users', '--no-legend').run().result() + lines = output.splitlines() + if not lines: + return + for line in lines: + # This assumes user names cannot contain spaces. + # loginctl list-users output must be in the UID NAME format. + info = line.split(' ', 2) + if len(info) < 2: + raise RuntimeError(f'invalid `loginctl list-users` output:\n{output}') + uid, user = info[0], info[1] + yield User(uid, user) + + def show_users(users): + properties = 'UID', 'Name', 'RuntimePath' + prop_args = (arg for prop in properties for arg in ('-p', prop)) + user_args = (user.name for user in users) + output = Loginctl('show-user', *prop_args, '--value', *user_args).run().result() + lines = output.splitlines() + # Assuming that for muptiple users, the properties will be separated by + # an empty string. + groups = split_by(lines, '') + for group in groups: + if len(group) != len(properties): + raise RuntimeError(f'invalid `loginctl show-user` output:\n{output}') + yield SystemdUser(int(group[0]), group[1], group[2]) + + return show_users(list_users()) + + +class Request(Enum): + STATUS = 'status' + TIMERS = 'timers' + TOP = 'top' + REBOOT = 'reboot' + POWEROFF = 'poweroff' + + def __str__(self): + return self.value + + @staticmethod + def from_http_path(path): + if not path or path[0] != '/': + raise ValueError('HTTP path must start with a forward slash /') + return Request(path[1:]) + + def process(self): + if self is Request.STATUS: + return StatusTask().complete() + if self is Request.TIMERS: + return TimersTask().complete() + if self is Request.TOP: + return TopTask().complete() + if self is Request.REBOOT: + return RebootTask().complete() + if self is Request.POWEROFF: + return PoweroffTask().complete() + raise NotImplementedError(f'unknown request: {self}') + + +def process_cgi_request(): + params = cgi.FieldStorage() + what = params['what'].value + Request(what).process().write_as_cgi_script() + + +def main(): + process_cgi_request() + + +if __name__ == '__main__': + main() -- cgit v1.2.3