diff options
Diffstat (limited to 'src/app.py')
-rwxr-xr-x | src/app.py | 37 |
1 files changed, 32 insertions, 5 deletions
@@ -40,12 +40,10 @@ # no more. import abc -import cgi from collections import namedtuple from concurrent.futures import ThreadPoolExecutor from enum import Enum from http import HTTPStatus -import http.server import json import os import pwd @@ -53,6 +51,8 @@ import shlex import socket import subprocess from subprocess import DEVNULL, PIPE, STDOUT +import traceback +import urllib.parse def split_by(xs, sep): @@ -510,6 +510,17 @@ def systemd_users(): return show_users(list_users()) +def cgi_one_value(params, name, default=None): + values = params.get(name, []) + if not values: + if default is None: + raise ValueError(f'must have at least one value: {name}') + return default + if len(values) > 1: + raise ValueError(f'multiple values are not supported: {name}') + return values[0] + + class Request(Enum): STATUS = 'status' TOP = 'top' @@ -525,22 +536,38 @@ class Request(Enum): raise ValueError('HTTP path must start with a forward slash /') return Request(path[1:]) + @staticmethod + def from_query_string(qs): + params = urllib.parse.parse_qs(qs, strict_parsing=True) + request = cgi_one_value(params, 'what') + request = Request(request) + request.disable_power = int(cgi_one_value(params, 'disable_power', '0')) + return request + def process(self): if self is Request.STATUS: return Status().complete() if self is Request.TOP: return Top().complete() + + if self in [Request.REBOOT, Request.POWEROFF] and self.disable_power: + return Response(None, HTTPStatus.FORBIDDEN) if self is Request.REBOOT: return Reboot().complete() if self is Request.POWEROFF: return Poweroff().complete() + raise NotImplementedError(f'unknown request: {self}') def process_cgi_request(): - params = cgi.FieldStorage() - what = params['what'].value - Request(what).process().write_as_cgi_script() + try: + request = Request.from_query_string(os.environ['QUERY_STRING']) + request.process().write_as_cgi_script() + except: + status = HTTPStatus.INTERNAL_SERVER_ERROR + response = Response(traceback.format_exc(), status) + response.write_as_cgi_script() def main(): |