aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/app.py
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2022-01-25 02:36:46 +0300
committerEgor Tensin <Egor.Tensin@gmail.com>2022-01-25 02:36:46 +0300
commit8d71b8b95ee96d3727ddac9c270977c6f90732de (patch)
treee4b255483253e2247986efcbf90358947ee5d951 /app.py
parentdebian: 1.1.0-1 (diff)
parentworkflows/ci: run tests as root also (diff)
downloadlinux-status-8d71b8b95ee96d3727ddac9c270977c6f90732de.tar.gz
linux-status-8d71b8b95ee96d3727ddac9c270977c6f90732de.zip
Merge tag 'v1.2' into debian
Diffstat (limited to '')
-rwxr-xr-xapp.py72
1 files changed, 50 insertions, 22 deletions
diff --git a/app.py b/app.py
index 1ae9760..dd3edfc 100755
--- a/app.py
+++ b/app.py
@@ -70,18 +70,23 @@ def hostname():
class Response:
- def __init__(self, data):
- self.data = data
+ DEFAULT_STATUS = http.server.HTTPStatus.OK
+
+ @staticmethod
+ def body_from_json(body):
+ return json.dumps(body, ensure_ascii=False, indent=4)
+
+ def __init__(self, body, status=None):
+ if status is None:
+ status = Response.DEFAULT_STATUS
+ self.status = status
+ self.body = body
def headers(self):
yield 'Content-Type', 'text/html; charset=utf-8'
- @staticmethod
- def dump_json(data):
- return json.dumps(data, ensure_ascii=False, indent=4)
-
- def body(self):
- return self.dump_json(self.data)
+ def encode_body(self):
+ return self.body.encode(errors='replace')
def write_as_cgi_script(self):
self.write_headers_as_cgi_script()
@@ -93,26 +98,31 @@ class Response:
print()
def write_body_as_cgi_script(self):
- if self.data is not None:
- print(self.body())
+ if self.body is not None:
+ print(self.body)
- def write_as_request_handler(self, handler):
- handler.send_response(http.server.HTTPStatus.OK)
- self.write_headers_as_request_handler(handler)
- self.write_body_as_request_handler(handler)
+ def write_to_request_handler(self, handler):
+ handler.send_response(self.status)
+ self.write_headers_to_request_handler(handler)
+ self.write_body_to_request_handler(handler)
- def write_headers_as_request_handler(self, handler):
+ def write_headers_to_request_handler(self, handler):
for name, val in self.headers():
handler.send_header(name, val)
handler.end_headers()
- def write_body_as_request_handler(self, handler):
- if self.data is not None:
- handler.wfile.write(self.body().encode(errors='replace'))
+ def write_body_to_request_handler(self, handler):
+ if self.body is not None:
+ handler.wfile.write(self.encode_body())
def run_do(*args, **kwargs):
output = subprocess.run(args, stdin=DEVNULL, stdout=PIPE, stderr=STDOUT, universal_newlines=True, **kwargs)
+ # Include the output in the exception's message:
+ try:
+ output.check_returncode()
+ except Exception as e:
+ raise RuntimeError("Command's output was this:\n" + output.stdout) from e
return output.stdout
@@ -195,7 +205,7 @@ class Loginctl(Systemd):
class Task(abc.ABC):
def complete(self):
self.run()
- return Response(self.result())
+ return Response(Response.body_from_json(self.result()))
@abc.abstractmethod
def run(self):
@@ -301,7 +311,10 @@ class UserInstanceStatusTaskList(Task):
self.tasks = {user.name: UserInstanceStatusTask.su(user) for user in systemd_users()}
else:
# As a regular user, we can only query ourselves.
- self.tasks = {get_current_user().name: UserInstanceStatusTask()}
+ self.tasks = {}
+ user = get_current_user()
+ if user_instance_active(user):
+ self.tasks[user.name] = UserInstanceStatusTask()
def run(self):
for task in self.tasks.values():
@@ -353,6 +366,19 @@ def get_current_user():
return User(entry.pw_uid, entry.pw_name)
+def user_instance_active(user):
+ # I'm pretty sure this is the way to determine if the user instance is
+ # running?
+ # Source: https://www.freedesktop.org/software/systemd/man/user@.service.html
+ unit_name = f'user@{user.uid}.service'
+ cmd = Systemctl.system('is-active', unit_name, '--quiet')
+ try:
+ cmd.run().result()
+ return True
+ except Exception:
+ return False
+
+
# A pitiful attempt to find a list of possibly-systemd-enabled users follows
# (i.e. users that might be running a per-user systemd instance).
# I don't know of a better way than probing /run/user/UID.
@@ -394,16 +420,18 @@ def systemd_users():
for line in lines:
# This assumes user names cannot contain spaces.
# loginctl list-users output must be in the UID NAME format.
- info = line.split(' ', 2)
+ info = line.lstrip().split(' ', 2)
if len(info) < 2:
raise RuntimeError(f'invalid `loginctl list-users` output:\n{output}')
uid, user = info[0], info[1]
yield User(uid, user)
def show_users(users):
+ user_args = [user.name for user in users]
+ if not user_args:
+ return None
properties = 'UID', 'Name', 'RuntimePath'
prop_args = (arg for prop in properties for arg in ('-p', prop))
- user_args = (user.name for user in users)
output = Loginctl('show-user', *prop_args, '--value', *user_args).run().result()
lines = output.splitlines()
# Assuming that for muptiple users, the properties will be separated by