aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/app.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rwxr-xr-xsrc/app.py (renamed from app.py)251
1 files changed, 157 insertions, 94 deletions
diff --git a/app.py b/src/app.py
index dd3edfc..5af55af 100755
--- a/app.py
+++ b/src/app.py
@@ -65,10 +65,6 @@ def split_by(xs, sep):
yield group
-def hostname():
- return socket.gethostname()
-
-
class Response:
DEFAULT_STATUS = http.server.HTTPStatus.OK
@@ -126,6 +122,37 @@ def run_do(*args, **kwargs):
return output.stdout
+class Task(abc.ABC):
+ def complete(self):
+ self.run()
+ return Response(Response.body_from_json(self.result()))
+
+ @abc.abstractmethod
+ def run(self):
+ pass
+
+ @abc.abstractmethod
+ def result(self):
+ pass
+
+
+class TaskList(Task):
+ def __init__(self, tasks):
+ self.tasks = tasks
+
+ def add(self, name, task):
+ if name in self.tasks:
+ raise RuntimeError(f'duplicate task name: {name}')
+ self.tasks[name] = task
+
+ def run(self):
+ for task in self.tasks.values():
+ task.run()
+
+ def result(self):
+ return {name: task.result() for name, task in self.tasks.items()}
+
+
pool = ThreadPoolExecutor()
@@ -133,13 +160,20 @@ def run(*args, **kwargs):
return pool.submit(run_do, *args, **kwargs)
-class Command:
+class Command(Task):
def __init__(self, *args):
self.args = args
self.env = None
def run(self):
- return run(*self.args, env=self.env)
+ self.task = run(*self.args, env=self.env)
+ return self.task
+
+ def result(self):
+ return self.task.result()
+
+ def now(self):
+ return self.run().result()
class Systemd(Command):
@@ -202,31 +236,95 @@ class Loginctl(Systemd):
super().__init__('loginctl', *args, '--full')
-class Task(abc.ABC):
- def complete(self):
- self.run()
- return Response(Response.body_from_json(self.result()))
+class Docker(Command):
+ def __init__(self, *args):
+ super().__init__('docker', *args)
- @abc.abstractmethod
- def run(self):
- pass
- @abc.abstractmethod
- def result(self):
- pass
+class DockerVersion(Docker):
+ def __init__(self, *args):
+ super().__init__('version')
+
+ @staticmethod
+ def is_daemon_running():
+ try:
+ DockerVersion().now()
+ return True
+ except:
+ return False
-class TopTask(Task):
- COMMAND = None
+class DockerPs(Docker):
+ def __init__(self, *args):
+ super().__init__('ps', *args)
+ @staticmethod
+ def quiet(*args):
+ return DockerPs('--quiet', *args)
+
+ @staticmethod
+ def get_all_ids():
+ cmd = DockerPs.quiet('--all')
+ return cmd.now().splitlines()
+
+
+class DockerInspect(Docker):
+ # This is pretty cool. I wanted to separate container entries with \0, and
+ # this is the best I could come up with. Note that a newline still gets
+ # added at the end.
+ FORMAT = '{{printf "%s%c" (json .) 0}}'
+
+ def __init__(self, *args):
+ super().__init__('inspect', f'--format={DockerInspect.FORMAT}', *args)
+
+
+class DockerStatus(DockerInspect):
def __init__(self):
- self.cmd = TopTask.get_command()
+ self.containers = DockerPs.get_all_ids()
+ super().__init__(*self.containers)
def run(self):
- self.task = self.cmd.run()
+ if not self.containers:
+ # `docker inspect` requires at least one container argument.
+ return ''
+ return super().run()
def result(self):
- return self.task.result()
+ if not self.containers:
+ # `docker inspect` requires at least one container argument.
+ return []
+ result = super().result()
+ result = result.split('\0')
+ result = [json.loads(info) for info in result if info.strip()]
+ result = [DockerStatus.filter_info(info) for info in result]
+ return result
+
+ @staticmethod
+ def filter_info(info):
+ assert info['Name'][0] == '/'
+ return {
+ 'exit_code': info['State']['ExitCode'],
+ 'health': info['State'].get('Health', {}).get('Status', None),
+ 'image': info['Config']['Image'],
+ # Strip the leading /:
+ 'name': info['Name'][1:],
+ 'started_at': info['State']['StartedAt'],
+ 'status': info['State']['Status'],
+ }
+
+class Hostname(Task):
+ def run(self):
+ pass
+
+ def result(self):
+ return socket.gethostname()
+
+
+class Top(Command):
+ COMMAND = None
+
+ def __init__(self):
+ super().__init__(*Top.get_command())
@staticmethod
def get_command():
@@ -234,66 +332,45 @@ class TopTask(Task):
# from the command line (another option is the rc file, but that's too
# complicated). For that, we simply run `top -h` once, and check if
# the output contains the flags we want to use.
- if TopTask.COMMAND is not None:
- return TopTask.COMMAND
+ if Top.COMMAND is not None:
+ return Top.COMMAND
help_output = run_do('top', '-h')
args = ['top', '-b', '-n', '1', '-w', '512']
if 'Ee' in help_output:
args += ['-E', 'm', '-e', 'm']
- TopTask.COMMAND = Command(*args)
- return TopTask.COMMAND
+ Top.COMMAND = args
+ return Top.COMMAND
-class RebootTask(Task):
+class Reboot(Command):
def __init__(self):
- self.cmd = Command('systemctl', 'reboot')
-
- def run(self):
- self.task = self.cmd.run()
-
- def result(self):
- return self.task.result()
+ super().__init__('systemctl', 'reboot')
-class PoweroffTask(Task):
+class Poweroff(Command):
def __init__(self):
- self.cmd = Command('systemctl', 'poweroff')
-
- def run(self):
- self.task = self.cmd.run()
-
- def result(self):
- return self.task.result()
+ super().__init__('systemctl', 'poweroff')
-class InstanceStatusTask(Task):
+class InstanceStatus(TaskList):
def __init__(self, systemctl, journalctl):
- self.overview_cmd = systemctl('status')
- self.failed_cmd = systemctl('list-units', '--failed')
- self.timers_cmd = systemctl('list-timers', '--all')
- self.journal_cmd = journalctl('-b', '--lines=20')
-
- def run(self):
- self.overview_task = self.overview_cmd.run()
- self.failed_task = self.failed_cmd.run()
- self.timers_task = self.timers_cmd.run()
- self.journal_task = self.journal_cmd.run()
-
- def result(self):
- return {
- 'overview': self.overview_task.result(),
- 'failed': self.failed_task.result(),
- 'timers': self.timers_task.result(),
- 'journal': self.journal_task.result(),
+ tasks = {
+ 'overview': systemctl('status'),
+ 'failed': systemctl('list-units', '--failed'),
+ 'timers': systemctl('list-timers', '--all'),
+ 'journal': journalctl('-b', '--lines=20'),
}
+ super().__init__(tasks)
-class SystemInstanceStatusTask(InstanceStatusTask):
+class SystemStatus(InstanceStatus):
def __init__(self):
super().__init__(Systemctl.system, Journalctl.system)
+ if DockerVersion.is_daemon_running():
+ self.add('docker', DockerStatus())
-class UserInstanceStatusTask(InstanceStatusTask):
+class UserStatus(InstanceStatus):
def __init__(self, systemctl=Systemctl.user, journalctl=Journalctl.user):
super().__init__(systemctl, journalctl)
@@ -301,45 +378,31 @@ class UserInstanceStatusTask(InstanceStatusTask):
def su(user):
systemctl = lambda *args: Systemd.su(user, Systemctl.user(*args))
journalctl = lambda *args: Systemd.su(user, Journalctl.user(*args))
- return UserInstanceStatusTask(systemctl, journalctl)
+ return UserStatus(systemctl, journalctl)
-class UserInstanceStatusTaskList(Task):
+class UserStatusList(TaskList):
def __init__(self):
if running_as_root():
# As root, we can query all the user instances.
- self.tasks = {user.name: UserInstanceStatusTask.su(user) for user in systemd_users()}
+ tasks = {user.name: UserStatus.su(user) for user in systemd_users()}
else:
# As a regular user, we can only query ourselves.
- self.tasks = {}
+ tasks = {}
user = get_current_user()
if user_instance_active(user):
- self.tasks[user.name] = UserInstanceStatusTask()
+ tasks[user.name] = UserStatus()
+ super().__init__(tasks)
- def run(self):
- for task in self.tasks.values():
- task.run()
-
- def result(self):
- return {name: task.result() for name, task in self.tasks.items()}
-
-class StatusTask(Task):
+class Status(TaskList):
def __init__(self):
- self.hostname = hostname()
- self.system = SystemInstanceStatusTask()
- self.user = UserInstanceStatusTaskList()
-
- def run(self):
- self.system.run()
- self.user.run()
-
- def result(self):
- return {
- 'hostname': self.hostname,
- 'system': self.system.result(),
- 'user': self.user.result(),
+ tasks = {
+ 'hostname': Hostname(),
+ 'system': SystemStatus(),
+ 'user': UserStatusList(),
}
+ super().__init__(tasks)
User = namedtuple('User', ['uid', 'name'])
@@ -373,7 +436,7 @@ def user_instance_active(user):
unit_name = f'user@{user.uid}.service'
cmd = Systemctl.system('is-active', unit_name, '--quiet')
try:
- cmd.run().result()
+ cmd.now()
return True
except Exception:
return False
@@ -413,7 +476,7 @@ def human_users():
# that were running a systemd instance.
def systemd_users():
def list_users():
- output = Loginctl('list-users', '--no-legend').run().result()
+ output = Loginctl('list-users', '--no-legend').now()
lines = output.splitlines()
if not lines:
return
@@ -432,7 +495,7 @@ def systemd_users():
return None
properties = 'UID', 'Name', 'RuntimePath'
prop_args = (arg for prop in properties for arg in ('-p', prop))
- output = Loginctl('show-user', *prop_args, '--value', *user_args).run().result()
+ output = Loginctl('show-user', *prop_args, '--value', *user_args).now()
lines = output.splitlines()
# Assuming that for muptiple users, the properties will be separated by
# an empty string.
@@ -462,13 +525,13 @@ class Request(Enum):
def process(self):
if self is Request.STATUS:
- return StatusTask().complete()
+ return Status().complete()
if self is Request.TOP:
- return TopTask().complete()
+ return Top().complete()
if self is Request.REBOOT:
- return RebootTask().complete()
+ return Reboot().complete()
if self is Request.POWEROFF:
- return PoweroffTask().complete()
+ return Poweroff().complete()
raise NotImplementedError(f'unknown request: {self}')