# Copyright (c) 2023 Egor Tensin
# This file is part of the "cimple" project.
# For details, see https://github.com/egor-tensin/cimple.
# Distributed under the MIT License.

from contextlib import contextmanager
import logging
import os
import shutil
import subprocess
from threading import Event, Thread


# Keyword arguments shared by every subprocess started from this module:
# text-mode pipes, no stdin, and stderr merged into the captured stdout.
_COMMON_ARGS = {
    'text': True,
    'stdin': subprocess.DEVNULL,
    'stdout': subprocess.PIPE,
    'stderr': subprocess.STDOUT,
}


class LoggingThread(Thread):
    """Thread that copies a process's output to the log, line by line.

    The constructor starts the thread and then blocks until ``ready_event``
    is set.  That happens when ``process.cmd_line.log_line_means_ready``
    accepts a line, or when the output stream ends, whichever comes first.
    """

    def __init__(self, process):
        """Start consuming ``process.stdout`` and wait for readiness.

        ``process`` must provide ``stdout`` (an iterable of text lines),
        ``log_id`` and ``cmd_line.log_line_means_ready(line)``.
        """
        self.process = process
        self.ready_event = Event()

        super().__init__(target=self.consume, args=[process.stdout])
        self.start()
        self.ready_event.wait()

    def consume(self, pipe):
        """Log every line read from ``pipe`` and signal readiness."""
        try:
            for line in pipe:
                line = line.removesuffix('\n')
                logging.info('%s: %s', self.process.log_id, line)
                if self.ready_event.is_set():
                    continue
                if self.process.cmd_line.log_line_means_ready(line):
                    logging.info('Process %s is ready', self.process.log_id)
                    self.ready_event.set()
        finally:
            # The stream ended (or reading failed).  If nothing marked the
            # process as ready, the constructor would otherwise wait forever,
            # so release it first and only then report the situation.
            was_ready = self.ready_event.is_set()
            self.ready_event.set()
            if not was_ready:
                logging.warning(
                    'Process %s closed its output before becoming ready',
                    self.process.log_id)


class CmdLine:
    """A resolved binary path plus its arguments and a display name."""

    @staticmethod
    def which(binary):
        """Return the path of ``binary``.

        A name with a directory component is made absolute; a bare name is
        looked up with ``shutil.which``.

        Raises:
            RuntimeError: if a bare name cannot be found.
        """
        if os.path.split(binary)[0]:
            # shutil.which('bin/bash') doesn't work.
            return os.path.abspath(binary)
        path = shutil.which(binary)
        if path is None:
            raise RuntimeError("couldn't find a binary: " + binary)
        return path

    def __init__(self, binary, *args, name=None):
        """Resolve ``binary`` and store the full argument vector.

        ``name`` is the process name used in log messages; it defaults to
        the base name of the resolved binary.
        """
        binary = self.which(binary)
        argv = [binary] + list(args)

        self.binary = binary
        self.argv = argv

        if name is None:
            name = os.path.basename(binary)
        self.process_name = name

    def log_line_means_ready(self, line):
        """Tell whether ``line`` of output means the process is ready.

        This implementation accepts any line.
        """
        return True

    @classmethod
    def wrap(cls, outer, inner):
        """Return a command line running ``inner`` as arguments of ``outer``.

        The result keeps the process name of ``inner``.
        """
        return cls(outer.argv[0], *outer.argv[1:], *inner.argv, name=inner.process_name)

    def log_process_start(self):
        """Log the binary and, if there are any, its arguments."""
        if len(self.argv) > 1:
            # Arguments may be path-like objects rather than strings;
            # stringify them so that logging can't fail on str.join().
            arguments = ' '.join(str(arg) for arg in self.argv[1:])
            logging.info('Executing binary %s with arguments: %s', self.binary, arguments)
        else:
            logging.info('Executing binary %s', self.binary)


class Process(subprocess.Popen):
    """A child process whose output is logged by a ``LoggingThread``.

    When used as a context manager, the process is shut down on exit.
    """

    def __init__(self, cmd_line):
        """Start ``cmd_line`` and block until its logger reports readiness."""
        self.cmd_line = cmd_line

        super().__init__(cmd_line.argv, **_COMMON_ARGS)
        logging.info('Process %s has started', self.log_id)
        self.logger = LoggingThread(self)

    @property
    def log_id(self):
        """Identifier used in log messages: ``<pid>/<process name>``."""
        return f'{self.pid}/{self.cmd_line.process_name}'

    def __exit__(self, *args):
        """Shut the process down, join the logger, then close the pipes.

        Exceptions raised while shutting down are logged, not propagated.
        """
        try:
            self.shut_down()
            self.logger.join()
        except Exception as e:
            logging.exception(e)
        # Postpone closing the pipes until after the logging thread is finished
        # so that it doesn't attempt to read from closed descriptors.
        super().__exit__(*args)

    # How long to wait after terminate(), and again after kill().
    SHUT_DOWN_TIMEOUT_SEC = 3

    def shut_down(self):
        """Terminate the process if it is still running; kill it if needed.

        Raises:
            subprocess.TimeoutExpired: if the process is still running after
                being killed and waited for.
        """
        ec = self.poll()
        if ec is not None:
            return
        logging.info('Terminating process %s', self.log_id)
        self.terminate()
        try:
            # Read the timeout through self so that overrides are honoured.
            self.wait(timeout=self.SHUT_DOWN_TIMEOUT_SEC)
            return
        except subprocess.TimeoutExpired:
            pass
        logging.info('Process %s failed to terminate in time, killing it', self.log_id)
        self.kill()
        self.wait(timeout=self.SHUT_DOWN_TIMEOUT_SEC)


class Runner:
    """Runs command lines after wrapping them in the registered wrappers."""

    @staticmethod
    def unbuffered():
        """Return the ``stdbuf -o0`` command line used as a default wrapper."""
        return CmdLine('stdbuf', '-o0')

    def __init__(self):
        """Create a runner with ``unbuffered()`` as its only wrapper."""
        self.wrappers = []
        self.add_wrapper(self.unbuffered())

    def add_wrapper(self, cmd_line):
        """Register ``cmd_line`` as one more wrapper."""
        self.wrappers.append(cmd_line)

    def _wrap(self, cmd_line):
        """Wrap ``cmd_line`` in every registered wrapper.

        Wrappers are applied in registration order, so the one added last
        ends up outermost.
        """
        for wrapper in self.wrappers:
            cmd_line = cmd_line.wrap(wrapper, cmd_line)
        return cmd_line

    def run(self, cmd_line):
        """Run ``cmd_line`` to completion.

        Returns:
            A ``(returncode, output)`` tuple; ``output`` is the captured
            stdout with stderr merged into it.
        """
        cmd_line = self._wrap(cmd_line)
        cmd_line.log_process_start()
        result = subprocess.run(cmd_line.argv, **_COMMON_ARGS)
        return result.returncode, result.stdout

    @contextmanager
    def run_async(self, cmd_line):
        """Context manager that starts ``cmd_line`` and yields the ``Process``.

        The process is shut down when the ``with`` block is left.
        """
        cmd_line = self._wrap(cmd_line)
        cmd_line.log_process_start()
        with Process(cmd_line) as process:
            yield process


class CmdLineRunner:
    """Binds a runner to a binary and a fixed list of leading arguments."""

    def __init__(self, runner, binary, *args):
        """Remember ``runner``, ``binary`` and the leading ``args``."""
        self.runner = runner
        self.binary = binary
        self.args = list(args)

    def _build(self, *args):
        """Build a ``CmdLine`` from the stored arguments followed by ``args``."""
        return CmdLine(self.binary, *self.args, *args)

    def run(self, *args):
        """Run the binary with extra ``args``; see ``Runner.run``."""
        return self.runner.run(self._build(*args))

    def run_async(self, *args):
        """Start the binary with extra ``args``; see ``Runner.run_async``."""
        return self.runner.run_async(self._build(*args))