diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2022-01-25 02:36:46 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2022-01-25 02:36:46 +0300 |
commit | 8d71b8b95ee96d3727ddac9c270977c6f90732de (patch) | |
tree | e4b255483253e2247986efcbf90358947ee5d951 /app.py | |
parent | debian: 1.1.0-1 (diff) | |
parent | workflows/ci: run tests as root also (diff) | |
download | linux-status-8d71b8b95ee96d3727ddac9c270977c6f90732de.tar.gz linux-status-8d71b8b95ee96d3727ddac9c270977c6f90732de.zip |
Merge tag 'v1.2' into debian
Diffstat (limited to 'app.py')
-rwxr-xr-x | app.py | 72 |
1 files changed, 50 insertions, 22 deletions
@@ -70,18 +70,23 @@ def hostname(): class Response: - def __init__(self, data): - self.data = data + DEFAULT_STATUS = http.server.HTTPStatus.OK + + @staticmethod + def body_from_json(body): + return json.dumps(body, ensure_ascii=False, indent=4) + + def __init__(self, body, status=None): + if status is None: + status = Response.DEFAULT_STATUS + self.status = status + self.body = body def headers(self): yield 'Content-Type', 'text/html; charset=utf-8' - @staticmethod - def dump_json(data): - return json.dumps(data, ensure_ascii=False, indent=4) - - def body(self): - return self.dump_json(self.data) + def encode_body(self): + return self.body.encode(errors='replace') def write_as_cgi_script(self): self.write_headers_as_cgi_script() @@ -93,26 +98,31 @@ class Response: print() def write_body_as_cgi_script(self): - if self.data is not None: - print(self.body()) + if self.body is not None: + print(self.body) - def write_as_request_handler(self, handler): - handler.send_response(http.server.HTTPStatus.OK) - self.write_headers_as_request_handler(handler) - self.write_body_as_request_handler(handler) + def write_to_request_handler(self, handler): + handler.send_response(self.status) + self.write_headers_to_request_handler(handler) + self.write_body_to_request_handler(handler) - def write_headers_as_request_handler(self, handler): + def write_headers_to_request_handler(self, handler): for name, val in self.headers(): handler.send_header(name, val) handler.end_headers() - def write_body_as_request_handler(self, handler): - if self.data is not None: - handler.wfile.write(self.body().encode(errors='replace')) + def write_body_to_request_handler(self, handler): + if self.body is not None: + handler.wfile.write(self.encode_body()) def run_do(*args, **kwargs): output = subprocess.run(args, stdin=DEVNULL, stdout=PIPE, stderr=STDOUT, universal_newlines=True, **kwargs) + # Include the output in the exception's message: + try: + output.check_returncode() + except Exception as e: + raise RuntimeError("Command's output was this:\n" + output.stdout) from e return output.stdout @@ -195,7 +205,7 @@ class Loginctl(Systemd): class Task(abc.ABC): def complete(self): self.run() - return Response(self.result()) + return Response(Response.body_from_json(self.result())) @abc.abstractmethod def run(self): @@ -301,7 +311,10 @@ class UserInstanceStatusTaskList(Task): self.tasks = {user.name: UserInstanceStatusTask.su(user) for user in systemd_users()} else: # As a regular user, we can only query ourselves. - self.tasks = {get_current_user().name: UserInstanceStatusTask()} + self.tasks = {} + user = get_current_user() + if user_instance_active(user): + self.tasks[user.name] = UserInstanceStatusTask() def run(self): for task in self.tasks.values(): @@ -353,6 +366,19 @@ def get_current_user(): return User(entry.pw_uid, entry.pw_name) +def user_instance_active(user): + # I'm pretty sure this is the way to determine if the user instance is + # running? + # Source: https://www.freedesktop.org/software/systemd/man/user@.service.html + unit_name = f'user@{user.uid}.service' + cmd = Systemctl.system('is-active', unit_name, '--quiet') + try: + cmd.run().result() + return True + except Exception: + return False + + # A pitiful attempt to find a list of possibly-systemd-enabled users follows # (i.e. users that might be running a per-user systemd instance). # I don't know of a better way than probing /run/user/UID. @@ -394,16 +420,18 @@ def systemd_users(): for line in lines: # This assumes user names cannot contain spaces. # loginctl list-users output must be in the UID NAME format. - info = line.split(' ', 2) + info = line.lstrip().split(' ', 2) if len(info) < 2: raise RuntimeError(f'invalid `loginctl list-users` output:\n{output}') uid, user = info[0], info[1] yield User(uid, user) def show_users(users): + user_args = [user.name for user in users] + if not user_args: + return None properties = 'UID', 'Name', 'RuntimePath' prop_args = (arg for prop in properties for arg in ('-p', prop)) - user_args = (user.name for user in users) output = Loginctl('show-user', *prop_args, '--value', *user_args).run().result() lines = output.splitlines() # Assuming that for muptiple users, the properties will be separated by |