# path: root/app.py (blob 82a1616315a01f76fb7ddd1c534c659f0959eb32)
#!/usr/bin/env python3

# Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
# This file is part of the "void" project.
# For details, see https://github.com/egor-tensin/void.
# Distributed under the MIT License.

import argparse
import cgi
from enum import Enum
import http.server
import os
import sys
from threading import Lock


class Response:
    """An HTML response that can be written out as CGI output or through
    an ``http.server`` request handler.

    Attributes:
        status: HTTP status of the response (``DEFAULT_STATUS`` unless the
            caller supplied one).
        body: response text, or ``None`` for a response without a body.
    """

    # Status used when the caller doesn't pass one explicitly.
    DEFAULT_STATUS = http.server.HTTPStatus.OK

    def __init__(self, body, status=None):
        """Remember the body and the status (``DEFAULT_STATUS`` if ``None``)."""
        if status is None:
            status = Response.DEFAULT_STATUS
        self.status = status
        self.body = body

    def headers(self):
        """Yield ``(name, value)`` pairs for the response headers."""
        yield 'Content-Type', 'text/html; charset=utf-8'

    def encode_body(self):
        """Return the body as bytes in the default (UTF-8) encoding.

        Characters that cannot be encoded are replaced instead of raising.
        Must not be called when ``body`` is ``None``.
        """
        return self.body.encode(errors='replace')

    def _cgi_status_value(self):
        """Return the value for the CGI ``Status`` header, e.g. ``404 Not Found``.

        Falls back to the bare status when it is not a code known to
        ``HTTPStatus`` (so no reason phrase can be looked up).
        """
        try:
            status = http.server.HTTPStatus(self.status)
        except ValueError:
            return str(self.status)
        return f'{status.value} {status.phrase}'

    def write_as_cgi_script(self):
        """Print the complete response (headers, blank line, body) to stdout."""
        self.write_headers_as_cgi_script()
        self.write_body_as_cgi_script()

    def write_headers_as_cgi_script(self):
        """Print the header section followed by the terminating blank line.

        A CGI script reports a non-default status via the ``Status`` header;
        without it the web server assumes 200 OK.  The header is only emitted
        when the status differs from ``DEFAULT_STATUS``, so the output for
        ordinary responses is unchanged.
        """
        if self.status != Response.DEFAULT_STATUS:
            print(f'Status: {self._cgi_status_value()}')
        for name, val in self.headers():
            print(f'{name}: {val}')
        print()

    def write_body_as_cgi_script(self):
        """Print the body to stdout; does nothing when there is no body."""
        if self.body is not None:
            print(self.body)

    def write_to_request_handler(self, handler):
        """Send the status line, the headers and the body through ``handler``.

        ``handler`` is expected to provide ``send_response``, ``send_header``,
        ``end_headers`` and a writable ``wfile``.
        """
        handler.send_response(self.status)
        self.write_headers_to_request_handler(handler)
        self.write_body_to_request_handler(handler)

    def write_headers_to_request_handler(self, handler):
        """Send every header through ``handler`` and finish the header section."""
        for name, val in self.headers():
            handler.send_header(name, val)
        handler.end_headers()

    def write_body_to_request_handler(self, handler):
        """Write the encoded body to ``handler.wfile``; no-op without a body."""
        if self.body is not None:
            handler.wfile.write(self.encode_body())


class Void:
    """Lock-protected scream counter with an optional backup file.

    Used as a context manager: on entry the counter is loaded from the
    backup file (if a path was given and the file exists); on exit it is
    written back, creating the parent directory when necessary.
    """

    def __init__(self, backup_path):
        """Start at zero; ``backup_path`` may be ``None`` to disable backups."""
        self.backup_path = (
            None if backup_path is None else os.path.abspath(backup_path)
        )
        self.cnt = 0
        self.lck = Lock()

    def __enter__(self):
        backup = self.backup_path
        if backup is None:
            return self
        if os.path.exists(backup):
            self.restore(backup)
        return self

    def __exit__(self, type, value, traceback):
        backup = self.backup_path
        if backup is None:
            return
        parent_dir = os.path.dirname(backup)
        os.makedirs(parent_dir, exist_ok=True)
        self.save(backup)

    def _current_count(self):
        """Return the counter value, read while holding the lock."""
        with self.lck:
            return self.cnt

    def write(self, fd):
        """Write the counter to ``fd`` as decimal text."""
        text = str(self._current_count())
        fd.write(text)

    def read(self, fd):
        """Replace the counter with the integer read from ``fd``."""
        parsed = int(fd.read())
        with self.lck:
            self.cnt = parsed

    def save(self, path):
        """Write the counter to the file at ``path`` (truncating it)."""
        with open(path, 'w') as out:
            self.write(out)

    def restore(self, path):
        """Load the counter from the file at ``path``."""
        with open(path) as src:
            self.read(src)

    def scream_once(self):
        """Increment the counter and return the new value as a Response."""
        with self.lck:
            self.cnt += 1
            total = self.cnt
        return Response(str(total))

    def get_numof_screams(self):
        """Return the current counter value as a Response."""
        return Response(str(self._current_count()))


class Request(Enum):
    """Kinds of requests, identified by the name used on the wire."""

    SCREAM_ONCE = 'scream'
    HOW_MANY_SCREAMS = 'screams'

    def __str__(self):
        return self.value

    @staticmethod
    def from_http_path(path):
        """Map an HTTP path such as ``/scream`` to the matching request.

        Raises ValueError when the path is empty, doesn't start with a
        slash, or doesn't name a known request.
        """
        has_leading_slash = bool(path) and path[0] == '/'
        if not has_leading_slash:
            raise ValueError('HTTP path must start with a forward slash /')
        name = path[1:]
        return Request(name)

    def process(self, void):
        """Run this request against ``void`` and return what it returns."""
        # Name of the method on ``void`` that serves each request kind.
        method_names = {
            Request.SCREAM_ONCE: 'scream_once',
            Request.HOW_MANY_SCREAMS: 'get_numof_screams',
        }
        method_name = method_names.get(self)
        if method_name is None:
            raise NotImplementedError(f'unknown request: {self}')
        return getattr(void, method_name)()


def process_cgi_request(void):
    """Serve the current CGI request and print the response to stdout.

    The kind of request is taken from the ``what`` form field and run
    against ``void``.

    Raises:
        KeyError: the ``what`` field is missing.
        ValueError: the field's value doesn't name a known request.
    """
    params = cgi.FieldStorage()
    # getfirst() returns a single value even when the field is repeated
    # (e.g. ``?what=scream&what=scream``).  Indexing the storage returns a
    # list in that case, and looking up ``.value`` on it would fail with an
    # AttributeError.
    what = params.getfirst('what')
    if what is None:
        # Keep raising what indexing a missing field raises.
        raise KeyError('what')
    Request(what).process(void).write_as_cgi_script()


def parse_args(args=None):
    """Parse the command line and return the resulting namespace.

    ``args`` is the list of arguments to parse; when ``None``, the
    process's own arguments (``sys.argv[1:]``) are used.  The namespace
    has one attribute, ``backup``: the value of ``-v``/``--void``, or
    ``None`` when the option was not given.
    """
    argv = sys.argv[1:] if args is None else args
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-v', '--void',
        dest='backup',
        metavar='PATH',
        help='set path to the void',
    )
    return parser.parse_args(argv)


def main(args=None):
    """Script entry point: serve one CGI request.

    ``args`` is passed on to ``parse_args``.  The request is processed
    inside the ``Void`` context manager, constructed with the path given
    on the command line (or ``None`` when no path was given).
    """
    options = parse_args(args)
    backup_path = options.backup
    with Void(backup_path) as void:
        process_cgi_request(void)


# Run main() only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()