aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/test/lib/process.py
blob: bdfe3c30cdeae2d5d1a6968bf98d998524b1c470 (plain) (tree)
1
2
3
4
5
6
7
8
9







                                                         
             
                 
                                   









                                


                                
                                  
 


                                                              


                               
                            

                                          
                                                             
                                                                                                  

                                                                        
 
 













                                                                   

                            
                        
 



                                           
                                         















                                                                                                        
                                                       
                                                           
                                         


                     
                                                         



                              







                                                                               







                                                           
                                                            


                                         
                                                                                       
                   
                                                        

 



















                                                       
 





                                                              


                                          


                         
                    






                                                      
 

                                                  
 

                                                        
# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
# This file is part of the "cimple" project.
# For details, see https://github.com/egor-tensin/cimple.
# Distributed under the MIT License.

from contextlib import contextmanager
import logging
import os
import shutil
import subprocess
from threading import Event, Thread


# Keyword arguments shared by every subprocess.run()/Popen() call in this
# module.
_COMMON_ARGS = dict(
    # Work with str lines instead of bytes.
    text=True,
    # Child processes get no input.
    stdin=subprocess.DEVNULL,
    # Capture stdout; stderr is merged into the same stream.
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)


class LoggingThread(Thread):
    """Thread that copies a process's output into the log, line by line.

    The constructor starts the thread and then blocks until the process is
    considered ready, i.e. until `process.cmd_line.log_line_means_ready()`
    returns a true value for one of the output lines.  If the output ends
    (or reading it fails) before that happens, the constructor is unblocked
    anyway instead of waiting forever; callers that need to tell the two
    cases apart should check whether the process is still running.

    `process` must provide `stdout` (an iterable of text lines), `log_id`
    and `cmd_line`.
    """

    def __init__(self, process):
        self.process = process
        self.ready_event = Event()

        super().__init__(target=self.consume, args=[process.stdout])

        self.start()
        self.ready_event.wait()

    def consume(self, pipe):
        """Log every line read from `pipe`; signal readiness when detected.

        Runs in the background thread until `pipe` is exhausted.
        """
        try:
            for line in pipe:
                line = line.removesuffix('\n')
                logging.info('%s: %s', self.process.log_id, line)
                if not self.ready_event.is_set() and self.process.cmd_line.log_line_means_ready(line):
                    logging.info('Process %s is ready', self.process.log_id)
                    self.ready_event.set()
        finally:
            # The process may exit (or close its output) without ever
            # printing a "ready" line, or reading may fail.  Release the
            # constructor in that case; otherwise it would block forever.
            if not self.ready_event.is_set():
                self.ready_event.set()
                logging.warning('Process %s closed its output before becoming ready', self.process.log_id)


class CmdLine:
    """A command line: a resolved binary path plus its arguments.

    Attributes:
        binary: path to the executable, as returned by which().
        argv: full argument vector, starting with `binary`.
        process_name: short name used to identify the process in logs;
            defaults to the base name of `binary`.
    """

    @staticmethod
    def which(binary):
        """Resolve `binary` to a path.

        A value with a directory component is simply made absolute; a bare
        name is looked up using shutil.which().  Raises RuntimeError if the
        lookup finds nothing.
        """
        directory, _ = os.path.split(binary)
        if directory:
            # shutil.which('bin/bash') doesn't work.
            return os.path.abspath(binary)
        resolved = shutil.which(binary)
        if resolved is not None:
            return resolved
        raise RuntimeError("couldn't find a binary: " + binary)

    def __init__(self, binary, *args, name=None):
        self.binary = self.which(binary)
        self.argv = [self.binary, *args]
        self.process_name = os.path.basename(self.binary) if name is None else name

    def log_line_means_ready(self, line):
        """Tell whether output line `line` means the process is ready.

        This implementation treats any line as a sign of readiness.
        """
        return True

    @classmethod
    def wrap(cls, outer, inner):
        """Build a command line that runs `inner` as the arguments of `outer`.

        The result keeps the process name of `inner`.
        """
        wrapper_binary = outer.argv[0]
        remaining = [*outer.argv[1:], *inner.argv]
        return cls(wrapper_binary, *remaining, name=inner.process_name)

    def log_process_start(self):
        """Log the binary (and its arguments, if any) about to be executed."""
        arguments = self.argv[1:]
        if not arguments:
            logging.info('Executing binary %s', self.binary)
            return
        logging.info('Executing binary %s with arguments: %s', self.binary, ' '.join(arguments))


class Process(subprocess.Popen):
    """A child process whose output is forwarded to the log.

    The constructor starts the process described by `cmd_line` and then
    creates a LoggingThread for it.  When used as a context manager, the
    process is shut down on exit.
    """

    # How long to wait for the process to exit, in seconds: first after
    # terminate(), then once more after kill().
    SHUT_DOWN_TIMEOUT_SEC = 3

    def __init__(self, cmd_line):
        self.cmd_line = cmd_line
        super().__init__(cmd_line.argv, **_COMMON_ARGS)
        logging.info('Process %s has started', self.log_id)
        self.logger = LoggingThread(self)

    @property
    def log_id(self):
        """Identifier used in log messages: "<pid>/<process name>"."""
        return '{}/{}'.format(self.pid, self.cmd_line.process_name)

    def __exit__(self, *exc_info):
        # Failures during shutdown are logged rather than propagated, so
        # that the base class cleanup below always runs.
        try:
            self.shut_down()
            self.logger.join()
        except Exception as exc:
            logging.exception(exc)
        # Postpone closing the pipes until after the logging thread is finished
        # so that it doesn't attempt to read from closed descriptors.
        super().__exit__(*exc_info)

    def shut_down(self):
        """Stop the process if it's still running.

        The process is asked to terminate first; if it hasn't exited within
        SHUT_DOWN_TIMEOUT_SEC, it's killed and waited for once more (that
        second wait may raise subprocess.TimeoutExpired).
        """
        if self.poll() is not None:
            # Already exited, nothing to do.
            return
        logging.info('Terminating process %s', self.log_id)
        self.terminate()
        try:
            self.wait(timeout=Process.SHUT_DOWN_TIMEOUT_SEC)
        except subprocess.TimeoutExpired:
            pass
        else:
            return
        logging.info('Process %s failed to terminate in time, killing it', self.log_id)
        self.kill()
        self.wait(timeout=Process.SHUT_DOWN_TIMEOUT_SEC)


class Runner:
    """Runs command lines, each wrapped in a list of wrapper command lines.

    A new Runner has a single wrapper, the one returned by unbuffered().
    """

    @staticmethod
    def unbuffered():
        """Return the `stdbuf -o0` wrapper command line."""
        return CmdLine('stdbuf', '-o0')

    def __init__(self):
        self.wrappers = []
        self.add_wrapper(self.unbuffered())

    def add_wrapper(self, cmd_line):
        """Register `cmd_line` as one more wrapper."""
        self.wrappers.append(cmd_line)

    def _wrap(self, cmd_line):
        # Wrappers are applied in the order they were added, so the one
        # added last ends up outermost.
        wrapped = cmd_line
        for outer in self.wrappers:
            wrapped = wrapped.wrap(outer, wrapped)
        return wrapped

    def run(self, cmd_line):
        """Run `cmd_line` to completion.

        Returns a tuple of the exit code and the captured output.
        """
        wrapped = self._wrap(cmd_line)
        wrapped.log_process_start()

        completed = subprocess.run(wrapped.argv, **_COMMON_ARGS)
        return completed.returncode, completed.stdout

    @contextmanager
    def run_async(self, cmd_line):
        """Context manager that starts `cmd_line` and yields its Process.

        Leaving the context runs the exit handler of Process.
        """
        wrapped = self._wrap(cmd_line)
        wrapped.log_process_start()

        with Process(wrapped) as running:
            yield running


class CmdLineRunner:
    """Binds a runner to a binary and a fixed list of leading arguments.

    Arguments passed to run()/run_async() are appended after the fixed
    ones.
    """

    def __init__(self, runner, binary, *args):
        self.runner = runner
        self.binary = binary
        self.args = [*args]

    def _build(self, *args):
        # Fixed arguments come first, per-call arguments after them.
        arguments = [*self.args, *args]
        return CmdLine(self.binary, *arguments)

    def run(self, *args):
        """Run the binary using the runner's run(); return its result."""
        cmd_line = self._build(*args)
        return self.runner.run(cmd_line)

    def run_async(self, *args):
        """Run the binary using the runner's run_async(); return its result."""
        cmd_line = self._build(*args)
        return self.runner.run_async(cmd_line)