From a9ae33c4b9dbe566d2ada07affb8b5a135f9b6eb Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Thu, 28 Dec 2023 01:00:45 +0100 Subject: test/py/ -> test/src/ --- test/py/lib/__init__.py | 0 test/py/lib/db.py | 30 ------ test/py/lib/logging.py | 45 --------- test/py/lib/net.py | 15 --- test/py/lib/process.py | 201 ----------------------------------------- test/py/lib/test_repo.py | 231 ----------------------------------------------- test/py/lib/tests.py | 23 ----- 7 files changed, 545 deletions(-) delete mode 100644 test/py/lib/__init__.py delete mode 100644 test/py/lib/db.py delete mode 100644 test/py/lib/logging.py delete mode 100644 test/py/lib/net.py delete mode 100644 test/py/lib/process.py delete mode 100644 test/py/lib/test_repo.py delete mode 100644 test/py/lib/tests.py (limited to 'test/py/lib') diff --git a/test/py/lib/__init__.py b/test/py/lib/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/py/lib/db.py b/test/py/lib/db.py deleted file mode 100644 index de6960a..0000000 --- a/test/py/lib/db.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright (c) 2023 Egor Tensin -# This file is part of the "cimple" project. -# For details, see https://github.com/egor-tensin/cimple. -# Distributed under the MIT License. - -from contextlib import closing, contextmanager -import logging -import sqlite3 - - -class Database: - def __init__(self, path): - logging.info('Opening SQLite database: %s', path) - self.conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) - - def __enter__(self): - return self - - def __exit__(*args): - self.conn.close() - - @contextmanager - def get_cursor(self): - with closing(self.conn.cursor()) as cur: - yield cur - - def get_all_runs(self): - with self.get_cursor() as cur: - cur.execute('SELECT * FROM cimple_runs_view') - return cur.fetchall() diff --git a/test/py/lib/logging.py b/test/py/lib/logging.py deleted file mode 100644 index 663eb3a..0000000 --- a/test/py/lib/logging.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (c) 2023 Egor Tensin -# This file is part of the "cimple" project. -# For details, see https://github.com/egor-tensin/cimple. -# Distributed under the MIT License. - -from contextlib import contextmanager -import logging -import logging.config -import logging.handlers -import multiprocessing as mp - - -@contextmanager -def child_logging_thread(): - # Delegating logging to the parent logger. - ctx = mp.get_context('spawn') - queue = ctx.Queue() - listener = logging.handlers.QueueListener(queue, logging.getLogger()) - listener.start() - try: - yield queue - finally: - listener.stop() - - -@contextmanager -def configure_logging_in_child(queue): - config = { - 'version': 1, - 'handlers': { - 'sink': { - 'class': 'logging.handlers.QueueHandler', - 'queue': queue, - }, - }, - 'root': { - 'handlers': ['sink'], - 'level': 'DEBUG', - }, - } - logging.config.dictConfig(config) - try: - yield - except Exception as e: - logging.exception(e) diff --git a/test/py/lib/net.py b/test/py/lib/net.py deleted file mode 100644 index 06bfda0..0000000 --- a/test/py/lib/net.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (c) 2023 Egor Tensin -# This file is part of the "cimple" project. -# For details, see https://github.com/egor-tensin/cimple. -# Distributed under the MIT License. - -from contextlib import closing -import socket - - -def random_unused_port(): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('0.0.0.0', 0)) - port = sock.getsockname()[1] - return port diff --git a/test/py/lib/process.py b/test/py/lib/process.py deleted file mode 100644 index 3b1b6b9..0000000 --- a/test/py/lib/process.py +++ /dev/null @@ -1,201 +0,0 @@ -# Copyright (c) 2023 Egor Tensin -# This file is part of the "cimple" project. -# For details, see https://github.com/egor-tensin/cimple. -# Distributed under the MIT License. - -from contextlib import contextmanager -import logging -import os -import shutil -import subprocess -from threading import Event, Lock, Thread - - -class LoggingEvent(Event): - def __init__(self, timeout=10): - self.timeout = timeout - super().__init__() - - def log_line_matches(self, line): - return False - - def wait(self): - if not super().wait(self.timeout): - raise RuntimeError('timed out while waiting for an event') - - -class LoggingThread(Thread): - def __init__(self, process, events=None): - self.process = process - self.events_lock = Lock() - if events is None: - events = [] - self.events = events - - super().__init__(target=lambda: self.process_output_lines()) - self.start() - - def add_event(self, event): - with self.events_lock: - self.events.append(event) - - def process_output_lines(self): - for line in self.process.stdout: - line = line.removesuffix('\n') - logging.info('%s: %s', self.process.log_id, line) - with self.events_lock: - for event in self.events: - if event.is_set(): - continue - if not event.log_line_matches(line): - continue - event.set() - self.events = [event for event in self.events if not event.is_set()] - - -class CmdLine: - @staticmethod - def which(binary): - if os.path.split(binary)[0]: - # shutil.which('bin/bash') doesn't work. - return os.path.abspath(binary) - path = shutil.which(binary) - if path is None: - raise RuntimeError("couldn't find a binary: " + binary) - return path - - @staticmethod - def unbuffered(): - return CmdLine('stdbuf', '-o0') - - def __init__(self, binary, *args, name=None): - binary = self.which(binary) - argv = [binary] + list(args) - - self.binary = binary - self.argv = argv - - if name is None: - name = os.path.basename(binary) - self.process_name = name - - def log_line_means_process_ready(self, line): - return True - - @classmethod - def wrap(cls, outer, inner): - return cls(outer.argv[0], *outer.argv[1:], *inner.argv, name=inner.process_name) - - def run(self, *argv): - return Process.run(*self.argv, *argv) - - def try_run(self, *argv): - return Process.try_run(*self.argv, *argv) - - @contextmanager - def run_async(self, *argv): - with Process(self, *argv) as process: - yield process - - -class LoggingEventProcessReady(LoggingEvent): - def __init__(self, process): - self.process = process - super().__init__() - - def set(self): - logging.info('Process %s is ready', self.process.log_id) - super().set() - - def log_line_matches(self, line): - return self.process.cmd_line.log_line_means_process_ready(line) - - -class Process(subprocess.Popen): - _COMMON_ARGS = { - 'text': True, - 'stdin': subprocess.DEVNULL, - 'stdout': subprocess.PIPE, - 'stderr': subprocess.STDOUT, - } - - @staticmethod - def _log_process_start(argv): - logging.info('Executing command: %s', argv) - - @staticmethod - def _log_process_end(argv, ec, output): - log = logging.info - if ec: - log = logging.error - if ec: - log('Command %s exited with code %s', argv, ec) - else: - log('Command %s completed successfully', argv) - if output: - log('Output:\n%s', output) - - @staticmethod - def run(*args, **kwargs): - argv = list(args) - Process._log_process_start(argv) - try: - result = subprocess.run(argv, check=True, **Process._COMMON_ARGS, **kwargs) - ec, output = result.returncode, result.stdout - Process._log_process_end(argv, ec, output) - return output - except subprocess.CalledProcessError as e: - ec, output = e.returncode, e.stdout - Process._log_process_end(argv, ec, output) - raise - - @staticmethod - def try_run(*args, **kwargs): - try: - return 0, Process.run(*args, **kwargs) - except subprocess.CalledProcessError as e: - return e.returncode, e.stdout - - def __init__(self, cmd_line, *args): - self.cmd_line = cmd_line - - argv = cmd_line.argv + list(args) - self._log_process_start(argv) - - super().__init__(argv, **Process._COMMON_ARGS) - logging.info('Process %s has started', self.log_id) - - ready_event = LoggingEventProcessReady(self) - self.logger = LoggingThread(self, [ready_event]) - ready_event.wait() - - @property - def log_id(self): - return f'{self.pid}/{self.cmd_line.process_name}' - - def __exit__(self, *args): - try: - self.shut_down() - self.logger.join() - except Exception as e: - logging.exception(e) - # Postpone closing the pipes until after the logging thread is finished - # so that it doesn't attempt to read from closed descriptors. - super().__exit__(*args) - - SHUT_DOWN_TIMEOUT_SEC = 10 - - def shut_down(self): - ec = self.poll() - if ec is not None: - return - logging.info('Terminating process %s', self.log_id) - self.terminate() - try: - self.wait(timeout=Process.SHUT_DOWN_TIMEOUT_SEC) - return - except subprocess.TimeoutExpired: - pass - logging.info('Process %s failed to terminate in time, killing it', self.log_id) - self.kill() - self.wait(timeout=Process.SHUT_DOWN_TIMEOUT_SEC) diff --git a/test/py/lib/test_repo.py b/test/py/lib/test_repo.py deleted file mode 100644 index 1d07a4e..0000000 --- a/test/py/lib/test_repo.py +++ /dev/null @@ -1,231 +0,0 @@ -# Copyright (c) 2023 Egor Tensin -# This file is part of the "cimple" project. -# For details, see https://github.com/egor-tensin/cimple. -# Distributed under the MIT License. - -import abc -import base64 -import logging -import os -import random -import shlex -import shutil - -from .process import Process - - -class Repo: - BRANCH = 'main' - - def __init__(self, path): - self.path = os.path.abspath(path) - os.makedirs(path, exist_ok=True) - self.run('git', 'init', '-q', f'--initial-branch={Repo.BRANCH}') - self.run('git', 'config', 'user.name', 'Test User') - self.run('git', 'config', 'user.email', 'test@example.com') - - def run(self, *args, **kwargs): - Process.run(*args, cwd=self.path, **kwargs) - - -CI_SCRIPT = r'''#!/usr/bin/env bash -set -o errexit -o nounset -o pipefail -readonly runs_dir={runs_dir} -readonly run_output_template=run_XXXXXX -run_output_path="$( mktemp --tmpdir="$runs_dir" "$run_output_template" )" -touch -- "$run_output_path" -''' - - -class TestRepo(Repo): - # Prevent Pytest from discovering test cases in this class: - __test__ = False - - def __init__(self, path, ci_script='ci'): - super().__init__(path) - - self.runs_dir = os.path.join(self.path, 'runs') - os.makedirs(self.runs_dir, exist_ok=True) - - self.ci_script_path = os.path.join(self.path, ci_script) - - self.write_ci_script() - self.run('git', 'add', '--', ci_script) - self.run('git', 'commit', '-q', '-m', 'add CI script') - - @staticmethod - @abc.abstractmethod - def codename(): - pass - - @abc.abstractmethod - def run_exit_code_matches(self, ec): - pass - - @abc.abstractmethod - def run_output_matches(self, output): - pass - - def write_ci_script(self): - with open(self.ci_script_path, mode='x') as f: - f.write(self.format_ci_script()) - os.chmod(self.ci_script_path, 0o755) - - def format_ci_script(self): - runs_dir = shlex.quote(self.runs_dir) - return CI_SCRIPT.format(runs_dir=runs_dir) - - def _count_run_files(self): - return len([name for name in os.listdir(self.runs_dir) if os.path.isfile(os.path.join(self.runs_dir, name))]) - - def run_files_are_present(self, expected): - assert expected == self._count_run_files() - - -class TestRepoOutput(TestRepo, abc.ABC): - __test__ = False - - OUTPUT_SCRIPT_NAME = 'generate-output' - - def __init__(self, path): - self.output_script_path = os.path.join(path, TestRepoOutput.OUTPUT_SCRIPT_NAME) - super().__init__(path) - - self.write_output_script() - self.run('git', 'add', '--', TestRepoOutput.OUTPUT_SCRIPT_NAME) - self.run('git', 'commit', '-q', '-m', 'add output script') - - def format_ci_script(self): - script = super().format_ci_script() - added = r'{output_script} | tee -a "$run_output_path"'.format( - output_script=shlex.quote(self.output_script_path)) - return f'{script}\n{added}\n' - - def write_output_script(self): - with open(self.output_script_path, mode='x') as f: - f.write(self.format_output_script()) - os.chmod(self.output_script_path, 0o755) - - @abc.abstractmethod - def format_output_script(self): - pass - - def run_exit_code_matches(self, ec): - return ec == 0 - - -OUTPUT_SCRIPT_SIMPLE = r'''#!/bin/sh -e -timestamp="$( date --iso-8601=ns )" -echo "A CI run happened at $timestamp" -''' - - -class TestRepoOutputSimple(TestRepoOutput): - __test__ = False - - @staticmethod - def codename(): - return 'output_simple' - - def format_output_script(self): - return OUTPUT_SCRIPT_SIMPLE - - def run_output_matches(self, output): - return output.decode().startswith('A CI run happened at ') - - -OUTPUT_SCRIPT_EMPTY = r'''#!/bin/sh -''' - - -class TestRepoOutputEmpty(TestRepoOutput): - __test__ = False - - @staticmethod - def codename(): - return 'output_empty' - - def format_output_script(self): - return OUTPUT_SCRIPT_EMPTY - - def run_output_matches(self, output): - return len(output) == 0 - - -OUTPUT_SCRIPT_LONG = r'''#!/bin/sh -e -dd if=/dev/urandom count={output_len_kb} bs=1024 | base64 -''' - - -class TestRepoOutputLong(TestRepoOutput): - __test__ = False - - OUTPUT_LEN_KB = 300 - - @staticmethod - def codename(): - return 'output_long' - - def format_output_script(self): - output_len = TestRepoOutputLong.OUTPUT_LEN_KB - output_len = shlex.quote(str(output_len)) - return OUTPUT_SCRIPT_LONG.format(output_len_kb=output_len) - - def run_output_matches(self, output): - return len(output) > TestRepoOutputLong.OUTPUT_LEN_KB * 1024 - - -OUTPUT_SCRIPT_NULL = r'''#!/usr/bin/env python3 -output = {output} -import sys -sys.stdout.buffer.write(output) -sys.exit(-2) -''' - - -class TestRepoOutputNull(TestRepoOutput): - __test__ = False - - OUTPUT = b'123\x00456\x00789' - - def __init__(self, *args, **kwargs): - assert len(TestRepoOutputNull.OUTPUT) == 11 - self.output = TestRepoOutputNull.OUTPUT - super().__init__(*args, **kwargs) - - @staticmethod - def codename(): - return 'output_null' - - def format_output_script(self): - return OUTPUT_SCRIPT_NULL.format(output=repr(self.output)) - - def run_exit_code_matches(self, ec): - return ec == 254 - - def run_output_matches(self, output): - return output == self.output - - -class TestRepoSegfault(TestRepo): - __test__ = False - - def __init__(self, repo_path, prog_path): - self.prog_path = prog_path - super().__init__(repo_path) - - @staticmethod - def codename(): - return 'segfault' - - def write_ci_script(self): - shutil.copy(self.prog_path, self.ci_script_path) - - def run_exit_code_matches(self, ec): - return ec == -11 - - def run_output_matches(self, output): - return "Started the test program.\n" == output.decode() - - def run_files_are_present(self, *args): - return True diff --git a/test/py/lib/tests.py b/test/py/lib/tests.py deleted file mode 100644 index 7ccedef..0000000 --- a/test/py/lib/tests.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2023 Egor Tensin -# This file is part of the "cimple" project. -# For details, see https://github.com/egor-tensin/cimple. -# Distributed under the MIT License. - -import pytest - - -# Reference: https://github.com/pytest-dev/pytest/issues/3628 -# Automatic generation of readable test IDs. -def my_parametrize(names, values, ids=None, **kwargs): - _names = names.split(',') if isinstance(names, str) else names - if not ids: - if len(_names) == 1: - ids = [f'{names}={v}' for v in values] - else: - _values = [combination.values if hasattr(combination, 'values') else combination - for combination in values] - ids = [ - '-'.join(f'{k}={v}' for k, v in zip(_names, combination)) - for combination in _values - ] - return pytest.mark.parametrize(names, values, ids=ids, **kwargs) -- cgit v1.2.3