From c0068e035ca825579e8d41f60d3864eeb6df0c88 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Wed, 3 Mar 2021 23:25:52 +0300 Subject: no more CGI The server.py script now launches a web server and handles all the requests internally, without delegating anything to external scripts. --- cgi-bin/get.py | 404 --------------------------------------------------------- cgi-bin/get.sh | 17 --- 2 files changed, 421 deletions(-) delete mode 100755 cgi-bin/get.py delete mode 100755 cgi-bin/get.sh (limited to 'cgi-bin') diff --git a/cgi-bin/get.py b/cgi-bin/get.py deleted file mode 100755 index e852561..0000000 --- a/cgi-bin/get.py +++ /dev/null @@ -1,404 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2021 Egor Tensin -# This file is part of the "linux-status" project. -# For details, see https://github.com/egor-tensin/linux-status. -# Distributed under the MIT License. - -import abc -import cgi -from collections import namedtuple -from concurrent.futures import ThreadPoolExecutor -from enum import Enum -import http.server -import json -import os -import pwd -import shlex -import socket -import subprocess -from subprocess import DEVNULL, PIPE, STDOUT - - -def split_by(xs, sep): - group = [] - for x in xs: - if x == sep: - yield group - group = [] - else: - group.append(x) - yield group - - -def hostname(): - return socket.gethostname() - - -class Response: - def __init__(self, data): - self.data = data - - def headers(self): - yield 'Content-Type', 'text/html; charset=utf-8' - - @staticmethod - def dump_json(data): - return json.dumps(data, ensure_ascii=False) - - def body(self): - return self.dump_json(self.data) - - def write_as_cgi_script(self): - self.write_headers_as_cgi_script() - self.write_body_as_cgi_script() - - def write_headers_as_cgi_script(self): - for name, val in self.headers(): - print(f'{name}: {val}') - print() - - def write_body_as_cgi_script(self): - if self.data is not None: - print(self.body()) - - def write_as_request_handler(self, handler): - handler.send_response(http.server.HTTPStatus.OK) - self.write_headers_as_request_handler(handler) - self.write_body_as_request_handler(handler) - - def write_headers_as_request_handler(self, handler): - for name, val in self.headers(): - handler.send_header(name, val) - handler.end_headers() - - def write_body_as_request_handler(self, handler): - if self.data is not None: - handler.wfile.write(self.body().encode(errors='replace')) - - -def run_do(*args, **kwargs): - output = subprocess.run(args, stdin=DEVNULL, stdout=PIPE, stderr=STDOUT, universal_newlines=True, **kwargs) - return output.stdout - - -pool = ThreadPoolExecutor() - - -def run(*args, **kwargs): - return pool.submit(run_do, *args, **kwargs) - - -class Command: - def __init__(self, *args): - self.args = args - self.env = None - - def run(self): - return run(*self.args, env=self.env) - - -class Systemd(Command): - def __init__(self, *args): - super().__init__(*args) - self.env = self.make_env() - - @staticmethod - def make_env(): - env = os.environ.copy() - env['SYSTEMD_PAGER'] = '' - env['SYSTEMD_COLORS'] = 'no' - return env - - @staticmethod - def su(user, cmd): - new = Systemd('su', '-c', shlex.join(cmd.args), user.name) - new.env = Systemd.fix_su_env(user, cmd.env.copy()) - return new - - @staticmethod - def fix_su_env(user, env): - # https://unix.stackexchange.com/q/483948 - # https://unix.stackexchange.com/q/346841 - # https://unix.stackexchange.com/q/423632 - # https://unix.stackexchange.com/q/245768 - # https://unix.stackexchange.com/q/434494 - env['XDG_RUNTIME_DIR'] = user.runtime_dir - # I'm not sure the bus part works everywhere. - bus_path = os.path.join(user.runtime_dir, 'bus') - env['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=' + bus_path - return env - - -class Systemctl(Systemd): - def __init__(self, *args): - super().__init__('systemctl', *args, '--full') - - @staticmethod - def system(*args): - return Systemctl('--system', *args) - - @staticmethod - def user(*args): - return Systemctl('--user', *args) - - -class Loginctl(Systemd): - def __init__(self, *args): - super().__init__('loginctl', *args) - - -class Task(abc.ABC): - def complete(self): - self.run() - return Response(self.result()) - - @abc.abstractmethod - def run(self): - pass - - @abc.abstractmethod - def result(self): - pass - - -class TopTask(Task): - def __init__(self): - self.cmd = Command('top', '-b', '-n', '1', '-w', '512') - - def run(self): - self.task = self.cmd.run() - - def result(self): - return self.task.result() - - -class RebootTask(Task): - def __init__(self): - self.cmd = Command('systemctl', 'reboot') - - def run(self): - self.task = self.cmd.run() - - def result(self): - return self.task.result() - - -class PoweroffTask(Task): - def __init__(self): - self.cmd = Command('systemctl', 'poweroff') - - def run(self): - self.task = self.cmd.run() - - def result(self): - return self.task.result() - - -class InstanceStatusTask(Task): - def __init__(self, systemctl): - self.overview_cmd = systemctl('status') - self.failed_cmd = systemctl('list-units', '--failed') - self.timers_cmd = systemctl('list-timers', '--all') - - def run(self): - self.overview_task = self.overview_cmd.run() - self.failed_task = self.failed_cmd.run() - self.timers_task = self.timers_cmd.run() - - def result(self): - return { - 'overview': self.overview_task.result(), - 'failed': self.failed_task.result(), - 'timers': self.timers_task.result(), - } - - -class SystemInstanceStatusTask(InstanceStatusTask): - def __init__(self): - super().__init__(Systemctl.system) - - -class UserInstanceStatusTask(InstanceStatusTask): - def __init__(self, systemctl=Systemctl.user): - super().__init__(systemctl) - - @staticmethod - def su(user): - systemctl = lambda *args: Systemd.su(user.name, Systemctl.user(*args)) - return UserInstanceStatusTask(systemctl) - - -class UserInstanceStatusTaskList(Task): - def __init__(self): - if running_as_root(): - # As root, we can query all the user instances. - self.tasks = {user.name: UserInstanceStatusTask.su(user) for user in systemd_users()} - else: - # As a regular user, we can only query ourselves. - self.tasks = {get_current_user().name: UserInstanceStatusTask()} - - def run(self): - for task in self.tasks.values(): - task.run() - - def result(self): - return {name: task.result() for name, task in self.tasks.items()} - - -class StatusTask(Task): - def __init__(self): - self.hostname = hostname() - self.top = TopTask() - self.system = SystemInstanceStatusTask() - self.user = UserInstanceStatusTaskList() - - def run(self): - self.top.run() - self.system.run() - self.user.run() - - def result(self): - return { - 'hostname': self.hostname, - 'top': self.top.result(), - 'system': self.system.result(), - 'user': self.user.result(), - } - - -class TimersTask(StatusTask): - # TODO: I'm going to remove the timers-only endpoint completely. - pass - - -User = namedtuple('User', ['uid', 'name']) -SystemdUser = namedtuple('SystemdUser', ['uid', 'name', 'runtime_dir']) - - -def running_as_root(): - # AFAIK, Python's http.server drops root privileges and executes the scripts - # as user nobody. - # It does no such thing if run as a regular user though. - return os.geteuid() == 0 or running_as_nobody() - - -def running_as_nobody(): - for user in users(): - if user.name == 'nobody': - return user.uid == os.geteuid() - return False - - -def get_current_user(): - uid = os.getuid() - entry = pwd.getpwuid(uid) - return User(entry.pw_uid, entry.pw_name) - - -# A pitiful attempt to find a list of possibly-systemd-enabled users follows -# (i.e. users that might be running a per-user systemd instance). -# I don't know of a better way than probing /run/user/UID. -# Maybe running something like `loginctl list-sessions` would be better? - -# These are the default values, see [1]. -# The actual values are specified in /etc/login.defs. -# Still, they can be overridden on the command line, so I don't think there -# actually is a way to list non-system users (imma call them "human"). -# -# [1]: https://man7.org/linux/man-pages/man8/useradd.8.html -UID_MIN = 1000 -UID_MAX = 60000 - - -def users(): - for entry in pwd.getpwall(): - yield User(entry.pw_uid, entry.pw_name) - - -def human_users(): - for user in users(): - if user.uid < UID_MIN: - continue - if user.uid > UID_MAX: - continue - yield user - - -# You know what, loginctl might just be the answer. -# Namely, `loginctl list-users`: during my testing, it did only list the users -# that were running a systemd instance. -def systemd_users(): - def list_users(): - output = Loginctl('list-users', '--no-legend').run().result() - lines = output.splitlines() - if not lines: - return - for line in lines: - # This assumes user names cannot contain spaces. - # loginctl list-users output must be in the UID NAME format. - info = line.split(' ', 2) - if len(info) < 2: - raise RuntimeError(f'invalid `loginctl list-users` output:\n{output}') - uid, user = info[0], info[1] - yield User(uid, user) - - def show_users(users): - properties = 'UID', 'Name', 'RuntimePath' - prop_args = (arg for prop in properties for arg in ('-p', prop)) - user_args = (user.name for user in users) - output = Loginctl('show-user', *prop_args, '--value', *user_args).run().result() - lines = output.splitlines() - # Assuming that for muptiple users, the properties will be separated by - # an empty string. - groups = split_by(lines, '') - for group in groups: - if len(group) != len(properties): - raise RuntimeError(f'invalid `loginctl show-user` output:\n{output}') - yield SystemdUser(int(group[0]), group[1], group[2]) - - return show_users(list_users()) - - -class Request(Enum): - STATUS = 'status' - TIMERS = 'timers' - TOP = 'top' - REBOOT = 'reboot' - POWEROFF = 'poweroff' - - def __str__(self): - return self.value - - @staticmethod - def from_http_path(path): - if not path or path[0] != '/': - raise ValueError('HTTP path must start with a forward slash /') - return Request(path[1:]) - - def process(self): - if self is Request.STATUS: - return StatusTask().complete() - if self is Request.TIMERS: - return TimersTask().complete() - if self is Request.TOP: - return TopTask().complete() - if self is Request.REBOOT: - return RebootTask().complete() - if self is Request.POWEROFF: - return PoweroffTask().complete() - raise NotImplementedError(f'unknown request: {self}') - - -def process_cgi_request(): - params = cgi.FieldStorage() - what = params['what'].value - Request(what).process().write_as_cgi_script() - - -def main(): - process_cgi_request() - - -if __name__ == '__main__': - main() diff --git a/cgi-bin/get.sh b/cgi-bin/get.sh deleted file mode 100755 index ea4f4ce..0000000 --- a/cgi-bin/get.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash - -set -o errexit -o nounset -o pipefail - -script_dir="$( dirname -- "${BASH_SOURCE[0]}" )" -script_dir="$( cd -- "$script_dir" && pwd )" -readonly script_dir - -# Python's http.server runs CGI scripts under user nobody. -# This is not what we want unfortunately. -# The best solution I could find so far is to create an entry in -# /etc/sudoers.d, allowing the nobody user to run the real scripts w/ sudo. -if [ "$( id --user --name )" == nobody ]; then - sudo --non-interactive --preserve-env "$script_dir/get.py" -else - "$script_dir/get.py" -fi -- cgit v1.2.3