From 607c31bd7695defb1350a54e0d1b1f3dd2adf194 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Fri, 11 Mar 2022 21:15:01 +0500 Subject: add app.py & server.py Quite obviously inspired (a.k.a. copied from) my other project, linux-status. The same goes for test/. --- app.py | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100755 app.py (limited to 'app.py') diff --git a/app.py b/app.py new file mode 100755 index 0000000..82a1616 --- /dev/null +++ b/app.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Egor Tensin +# This file is part of the "void" project. +# For details, see https://github.com/egor-tensin/void. +# Distributed under the MIT License. + +import argparse +import cgi +from enum import Enum +import http.server +import os +import sys +from threading import Lock + + +class Response: + DEFAULT_STATUS = http.server.HTTPStatus.OK + + def __init__(self, body, status=None): + if status is None: + status = Response.DEFAULT_STATUS + self.status = status + self.body = body + + def headers(self): + yield 'Content-Type', 'text/html; charset=utf-8' + + def encode_body(self): + return self.body.encode(errors='replace') + + def write_as_cgi_script(self): + self.write_headers_as_cgi_script() + self.write_body_as_cgi_script() + + def write_headers_as_cgi_script(self): + for name, val in self.headers(): + print(f'{name}: {val}') + print() + + def write_body_as_cgi_script(self): + if self.body is not None: + print(self.body) + + def write_to_request_handler(self, handler): + handler.send_response(self.status) + self.write_headers_to_request_handler(handler) + self.write_body_to_request_handler(handler) + + def write_headers_to_request_handler(self, handler): + for name, val in self.headers(): + handler.send_header(name, val) + handler.end_headers() + + def write_body_to_request_handler(self, handler): + if self.body is not None: + handler.wfile.write(self.encode_body()) + + +class Void: + def __init__(self, backup_path): + self.lck = Lock() + self.cnt = 0 + self.backup_path = backup_path + if backup_path is not None: + self.backup_path = os.path.abspath(backup_path) + + def __enter__(self): + if self.backup_path is not None and os.path.exists(self.backup_path): + self.restore(self.backup_path) + return self + + def __exit__(self, type, value, traceback): + if self.backup_path is not None: + os.makedirs(os.path.dirname(self.backup_path), exist_ok=True) + self.save(self.backup_path) + + def write(self, fd): + with self.lck: + cnt = self.cnt + fd.write(str(cnt)) + + def read(self, fd): + src = fd.read() + cnt = int(src) + with self.lck: + self.cnt = cnt + + def save(self, path): + with open(path, 'w') as fd: + self.write(fd) + + def restore(self, path): + with open(path) as fd: + self.read(fd) + + def scream_once(self): + with self.lck: + self.cnt += 1 + cnt = self.cnt + return Response(str(cnt)) + + def get_numof_screams(self): + with self.lck: + cnt = self.cnt + return Response(str(cnt)) + + +class Request(Enum): + SCREAM_ONCE = 'scream' + HOW_MANY_SCREAMS = 'screams' + + def __str__(self): + return self.value + + @staticmethod + def from_http_path(path): + if not path or path[0] != '/': + raise ValueError('HTTP path must start with a forward slash /') + return Request(path[1:]) + + def process(self, void): + if self is Request.SCREAM_ONCE: + return void.scream_once() + if self is Request.HOW_MANY_SCREAMS: + return void.get_numof_screams() + raise NotImplementedError(f'unknown request: {self}') + + +def process_cgi_request(void): + params = cgi.FieldStorage() + what = params['what'].value + Request(what).process(void).write_as_cgi_script() + + +def parse_args(args=None): + if args is None: + args = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--void', metavar='PATH', dest='backup', + help='set path to the void') + return parser.parse_args(args) + + +def main(args=None): + args = parse_args(args) + with Void(args.backup) as void: + process_cgi_request(void) + + +if __name__ == '__main__': + main() -- cgit v1.2.3