From a9ae33c4b9dbe566d2ada07affb8b5a135f9b6eb Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Thu, 28 Dec 2023 01:00:45 +0100 Subject: test/py/ -> test/src/ --- test/src/conftest.py | 232 ++++++++++++++++++++++++++++++++++++++++++++++ test/src/lib/__init__.py | 0 test/src/lib/db.py | 30 ++++++ test/src/lib/logging.py | 45 +++++++++ test/src/lib/net.py | 15 +++ test/src/lib/process.py | 201 +++++++++++++++++++++++++++++++++++++++ test/src/lib/test_repo.py | 231 +++++++++++++++++++++++++++++++++++++++++++++ test/src/lib/tests.py | 23 +++++ test/src/test_basic.py | 69 ++++++++++++++ test/src/test_repo.py | 110 ++++++++++++++++++++++ 10 files changed, 956 insertions(+) create mode 100644 test/src/conftest.py create mode 100644 test/src/lib/__init__.py create mode 100644 test/src/lib/db.py create mode 100644 test/src/lib/logging.py create mode 100644 test/src/lib/net.py create mode 100644 test/src/lib/process.py create mode 100644 test/src/lib/test_repo.py create mode 100644 test/src/lib/tests.py create mode 100644 test/src/test_basic.py create mode 100644 test/src/test_repo.py (limited to 'test/src') diff --git a/test/src/conftest.py b/test/src/conftest.py new file mode 100644 index 0000000..6c34f4d --- /dev/null +++ b/test/src/conftest.py @@ -0,0 +1,232 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +from collections import namedtuple +import logging +import os + +from pytest import fixture + +from lib import test_repo as repo +from lib.db import Database +from lib.net import random_unused_port +from lib.process import CmdLine + + +class Param: + def __init__(self, codename, help_string, required=True): + self.codename = codename + self.help_string = help_string + self.required = required + + @property + def cmd_line(self): + return f"--{self.codename.replace('_', '-')}" + + def add_to_parser(self, parser): + parser.addoption(self.cmd_line, required=self.required, help=self.help_string) + + +PARAMS = [ + Param(f'{name}', f'cimple-{name} binary path') + for name in ('server', 'worker', 'client') +] +PARAMS += [ + Param('sigsegv', 'sigsegv binary path'), + Param('project_version', 'project version'), + Param('valgrind', 'path to valgrind.sh', required=False), + Param('flamegraph', 'path to flamegraph.sh', required=False), + Param('flame_graphs_dir', 'directory to store flame graphs', required=False), +] + + +def pytest_addoption(parser): + for opt in PARAMS: + opt.add_to_parser(parser) + + +class Params: + def __init__(self, pytestconfig): + for opt in PARAMS: + setattr(self, opt.codename, None) + for opt in PARAMS: + path = pytestconfig.getoption(opt.codename) + if path is None: + continue + logging.info("'%s' parameter value: %s", opt.codename, path) + setattr(self, opt.codename, path) + + +@fixture(scope='session') +def params(pytestconfig): + return Params(pytestconfig) + + +class CmdLineValgrind(CmdLine): + def __init__(self, binary): + # Signal to Valgrind that ci scripts should obviously be exempt from + # memory leak checking: + super().__init__(binary, '--trace-children-skip=*/ci', '--') + + +@fixture(scope='session') +def base_cmd_line(params): + cmd_line = CmdLine.unbuffered() + valgrind = params.valgrind + if valgrind is not None: + cmd_line = CmdLine.wrap(CmdLineValgrind(valgrind), cmd_line) + return cmd_line + + +@fixture(scope='session') +def version(params): + return params.project_version + + +@fixture(scope='session') +def server_port(): + return str(random_unused_port()) + + +@fixture +def sqlite_path(tmp_path): + return os.path.join(tmp_path, 'cimple.sqlite') + + +@fixture +def sqlite_db(server, sqlite_path): + return Database(sqlite_path) + + +class CmdLineServer(CmdLine): + def log_line_means_process_ready(self, line): + return line.endswith('Waiting for new connections') + + +class CmdLineWorker(CmdLine): + def log_line_means_process_ready(self, line): + return line.endswith('Waiting for a new command') + + +@fixture +def server_exe(params): + return CmdLineServer(params.server) + + +@fixture +def worker_exe(params): + return CmdLineWorker(params.worker) + + +@fixture +def client_exe(params): + return CmdLine(params.client) + + +@fixture +def server_cmd(base_cmd_line, params, server_port, sqlite_path): + args = ['--port', server_port, '--sqlite', sqlite_path] + return CmdLineServer.wrap(base_cmd_line, CmdLine(params.server, *args)) + + +@fixture +def worker_cmd(base_cmd_line, params, server_port): + args = ['--host', '127.0.0.1', '--port', server_port] + return CmdLineWorker.wrap(base_cmd_line, CmdLine(params.worker, *args)) + + +@fixture +def client(base_cmd_line, params, server_port): + args = ['--host', '127.0.0.1', '--port', server_port] + return CmdLine.wrap(base_cmd_line, CmdLine(params.client, *args)) + + +@fixture +def sigsegv(params): + return CmdLine(params.sigsegv) + + +@fixture +def server(server_cmd): + with server_cmd.run_async() as server: + yield server + assert server.returncode == 0 + + +@fixture +def workers(worker_cmd): + with worker_cmd.run_async() as worker1, \ + worker_cmd.run_async() as worker2: + yield [worker1, worker2] + assert worker1.returncode == 0 + assert worker2.returncode == 0 + + +@fixture +def flame_graph_svg(params, tmp_path, flame_graph_repo): + dir = params.flame_graphs_dir + if dir is None: + return os.path.join(tmp_path, 'flame_graph.svg') + os.makedirs(dir, exist_ok=True) + return os.path.join(dir, f'flame_graph_{flame_graph_repo.codename()}.svg') + + +@fixture +def profiler(params, server, workers, flame_graph_svg): + pids = [server.pid] + [worker.pid for worker in workers] + pids = map(str, pids) + cmd_line = CmdLine(params.flamegraph, flame_graph_svg, *pids) + with cmd_line.run_async() as proc: + yield + assert proc.returncode == 0 + + +@fixture +def repo_path(tmp_path): + return os.path.join(tmp_path, 'repo') + + +TEST_REPOS = [ + repo.TestRepoOutputSimple, + repo.TestRepoOutputEmpty, + repo.TestRepoOutputLong, + repo.TestRepoOutputNull, + repo.TestRepoSegfault, +] + +STRESS_TEST_REPOS = [ + repo.TestRepoOutputSimple, + repo.TestRepoOutputLong, +] + + +def _make_repo(repo_path, params, cls): + args = [repo_path] + if cls is repo.TestRepoSegfault: + args += [params.sigsegv] + return cls(*args) + + +@fixture(params=TEST_REPOS, ids=[repo.codename() for repo in TEST_REPOS]) +def test_repo(repo_path, params, request): + return _make_repo(repo_path, params, request.param) + + +@fixture(params=STRESS_TEST_REPOS, ids=[repo.codename() for repo in STRESS_TEST_REPOS]) +def stress_test_repo(repo_path, params, request): + return _make_repo(repo_path, params, request.param) + + +@fixture +def flame_graph_repo(stress_test_repo): + return stress_test_repo + + +Env = namedtuple('Env', ['server', 'workers', 'client', 'db']) + + +@fixture +def env(server, workers, client, sqlite_db): + return Env(server, workers, client, sqlite_db) diff --git a/test/src/lib/__init__.py b/test/src/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/src/lib/db.py b/test/src/lib/db.py new file mode 100644 index 0000000..de6960a --- /dev/null +++ b/test/src/lib/db.py @@ -0,0 +1,30 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +from contextlib import closing, contextmanager +import logging +import sqlite3 + + +class Database: + def __init__(self, path): + logging.info('Opening SQLite database: %s', path) + self.conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) + + def __enter__(self): + return self + + def __exit__(*args): + self.conn.close() + + @contextmanager + def get_cursor(self): + with closing(self.conn.cursor()) as cur: + yield cur + + def get_all_runs(self): + with self.get_cursor() as cur: + cur.execute('SELECT * FROM cimple_runs_view') + return cur.fetchall() diff --git a/test/src/lib/logging.py b/test/src/lib/logging.py new file mode 100644 index 0000000..663eb3a --- /dev/null +++ b/test/src/lib/logging.py @@ -0,0 +1,45 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +from contextlib import contextmanager +import logging +import logging.config +import logging.handlers +import multiprocessing as mp + + +@contextmanager +def child_logging_thread(): + # Delegating logging to the parent logger. + ctx = mp.get_context('spawn') + queue = ctx.Queue() + listener = logging.handlers.QueueListener(queue, logging.getLogger()) + listener.start() + try: + yield queue + finally: + listener.stop() + + +@contextmanager +def configure_logging_in_child(queue): + config = { + 'version': 1, + 'handlers': { + 'sink': { + 'class': 'logging.handlers.QueueHandler', + 'queue': queue, + }, + }, + 'root': { + 'handlers': ['sink'], + 'level': 'DEBUG', + }, + } + logging.config.dictConfig(config) + try: + yield + except Exception as e: + logging.exception(e) diff --git a/test/src/lib/net.py b/test/src/lib/net.py new file mode 100644 index 0000000..06bfda0 --- /dev/null +++ b/test/src/lib/net.py @@ -0,0 +1,15 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +from contextlib import closing +import socket + + +def random_unused_port(): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('0.0.0.0', 0)) + port = sock.getsockname()[1] + return port diff --git a/test/src/lib/process.py b/test/src/lib/process.py new file mode 100644 index 0000000..3b1b6b9 --- /dev/null +++ b/test/src/lib/process.py @@ -0,0 +1,201 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +from contextlib import contextmanager +import logging +import os +import shutil +import subprocess +from threading import Event, Lock, Thread + + +class LoggingEvent(Event): + def __init__(self, timeout=10): + self.timeout = timeout + super().__init__() + + def log_line_matches(self, line): + return False + + def wait(self): + if not super().wait(self.timeout): + raise RuntimeError('timed out while waiting for an event') + + +class LoggingThread(Thread): + def __init__(self, process, events=None): + self.process = process + self.events_lock = Lock() + if events is None: + events = [] + self.events = events + + super().__init__(target=lambda: self.process_output_lines()) + self.start() + + def add_event(self, event): + with self.events_lock: + self.events.append(event) + + def process_output_lines(self): + for line in self.process.stdout: + line = line.removesuffix('\n') + logging.info('%s: %s', self.process.log_id, line) + with self.events_lock: + for event in self.events: + if event.is_set(): + continue + if not event.log_line_matches(line): + continue + event.set() + self.events = [event for event in self.events if not event.is_set()] + + +class CmdLine: + @staticmethod + def which(binary): + if os.path.split(binary)[0]: + # shutil.which('bin/bash') doesn't work. + return os.path.abspath(binary) + path = shutil.which(binary) + if path is None: + raise RuntimeError("couldn't find a binary: " + binary) + return path + + @staticmethod + def unbuffered(): + return CmdLine('stdbuf', '-o0') + + def __init__(self, binary, *args, name=None): + binary = self.which(binary) + argv = [binary] + list(args) + + self.binary = binary + self.argv = argv + + if name is None: + name = os.path.basename(binary) + self.process_name = name + + def log_line_means_process_ready(self, line): + return True + + @classmethod + def wrap(cls, outer, inner): + return cls(outer.argv[0], *outer.argv[1:], *inner.argv, name=inner.process_name) + + def run(self, *argv): + return Process.run(*self.argv, *argv) + + def try_run(self, *argv): + return Process.try_run(*self.argv, *argv) + + @contextmanager + def run_async(self, *argv): + with Process(self, *argv) as process: + yield process + + +class LoggingEventProcessReady(LoggingEvent): + def __init__(self, process): + self.process = process + super().__init__() + + def set(self): + logging.info('Process %s is ready', self.process.log_id) + super().set() + + def log_line_matches(self, line): + return self.process.cmd_line.log_line_means_process_ready(line) + + +class Process(subprocess.Popen): + _COMMON_ARGS = { + 'text': True, + 'stdin': subprocess.DEVNULL, + 'stdout': subprocess.PIPE, + 'stderr': subprocess.STDOUT, + } + + @staticmethod + def _log_process_start(argv): + logging.info('Executing command: %s', argv) + + @staticmethod + def _log_process_end(argv, ec, output): + log = logging.info + if ec: + log = logging.error + if ec: + log('Command %s exited with code %s', argv, ec) + else: + log('Command %s completed successfully', argv) + if output: + log('Output:\n%s', output) + + @staticmethod + def run(*args, **kwargs): + argv = list(args) + Process._log_process_start(argv) + try: + result = subprocess.run(argv, check=True, **Process._COMMON_ARGS, **kwargs) + ec, output = result.returncode, result.stdout + Process._log_process_end(argv, ec, output) + return output + except subprocess.CalledProcessError as e: + ec, output = e.returncode, e.stdout + Process._log_process_end(argv, ec, output) + raise + + @staticmethod + def try_run(*args, **kwargs): + try: + return 0, Process.run(*args, **kwargs) + except subprocess.CalledProcessError as e: + return e.returncode, e.stdout + + def __init__(self, cmd_line, *args): + self.cmd_line = cmd_line + + argv = cmd_line.argv + list(args) + self._log_process_start(argv) + + super().__init__(argv, **Process._COMMON_ARGS) + logging.info('Process %s has started', self.log_id) + + ready_event = LoggingEventProcessReady(self) + self.logger = LoggingThread(self, [ready_event]) + ready_event.wait() + + @property + def log_id(self): + return f'{self.pid}/{self.cmd_line.process_name}' + + def __exit__(self, *args): + try: + self.shut_down() + self.logger.join() + except Exception as e: + logging.exception(e) + # Postpone closing the pipes until after the logging thread is finished + # so that it doesn't attempt to read from closed descriptors. + super().__exit__(*args) + + SHUT_DOWN_TIMEOUT_SEC = 10 + + def shut_down(self): + ec = self.poll() + if ec is not None: + return + logging.info('Terminating process %s', self.log_id) + self.terminate() + try: + self.wait(timeout=Process.SHUT_DOWN_TIMEOUT_SEC) + return + except subprocess.TimeoutExpired: + pass + logging.info('Process %s failed to terminate in time, killing it', self.log_id) + self.kill() + self.wait(timeout=Process.SHUT_DOWN_TIMEOUT_SEC) diff --git a/test/src/lib/test_repo.py b/test/src/lib/test_repo.py new file mode 100644 index 0000000..1d07a4e --- /dev/null +++ b/test/src/lib/test_repo.py @@ -0,0 +1,231 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +import abc +import base64 +import logging +import os +import random +import shlex +import shutil + +from .process import Process + + +class Repo: + BRANCH = 'main' + + def __init__(self, path): + self.path = os.path.abspath(path) + os.makedirs(path, exist_ok=True) + self.run('git', 'init', '-q', f'--initial-branch={Repo.BRANCH}') + self.run('git', 'config', 'user.name', 'Test User') + self.run('git', 'config', 'user.email', 'test@example.com') + + def run(self, *args, **kwargs): + Process.run(*args, cwd=self.path, **kwargs) + + +CI_SCRIPT = r'''#!/usr/bin/env bash +set -o errexit -o nounset -o pipefail +readonly runs_dir={runs_dir} +readonly run_output_template=run_XXXXXX +run_output_path="$( mktemp --tmpdir="$runs_dir" "$run_output_template" )" +touch -- "$run_output_path" +''' + + +class TestRepo(Repo): + # Prevent Pytest from discovering test cases in this class: + __test__ = False + + def __init__(self, path, ci_script='ci'): + super().__init__(path) + + self.runs_dir = os.path.join(self.path, 'runs') + os.makedirs(self.runs_dir, exist_ok=True) + + self.ci_script_path = os.path.join(self.path, ci_script) + + self.write_ci_script() + self.run('git', 'add', '--', ci_script) + self.run('git', 'commit', '-q', '-m', 'add CI script') + + @staticmethod + @abc.abstractmethod + def codename(): + pass + + @abc.abstractmethod + def run_exit_code_matches(self, ec): + pass + + @abc.abstractmethod + def run_output_matches(self, output): + pass + + def write_ci_script(self): + with open(self.ci_script_path, mode='x') as f: + f.write(self.format_ci_script()) + os.chmod(self.ci_script_path, 0o755) + + def format_ci_script(self): + runs_dir = shlex.quote(self.runs_dir) + return CI_SCRIPT.format(runs_dir=runs_dir) + + def _count_run_files(self): + return len([name for name in os.listdir(self.runs_dir) if os.path.isfile(os.path.join(self.runs_dir, name))]) + + def run_files_are_present(self, expected): + assert expected == self._count_run_files() + + +class TestRepoOutput(TestRepo, abc.ABC): + __test__ = False + + OUTPUT_SCRIPT_NAME = 'generate-output' + + def __init__(self, path): + self.output_script_path = os.path.join(path, TestRepoOutput.OUTPUT_SCRIPT_NAME) + super().__init__(path) + + self.write_output_script() + self.run('git', 'add', '--', TestRepoOutput.OUTPUT_SCRIPT_NAME) + self.run('git', 'commit', '-q', '-m', 'add output script') + + def format_ci_script(self): + script = super().format_ci_script() + added = r'{output_script} | tee -a "$run_output_path"'.format( + output_script=shlex.quote(self.output_script_path)) + return f'{script}\n{added}\n' + + def write_output_script(self): + with open(self.output_script_path, mode='x') as f: + f.write(self.format_output_script()) + os.chmod(self.output_script_path, 0o755) + + @abc.abstractmethod + def format_output_script(self): + pass + + def run_exit_code_matches(self, ec): + return ec == 0 + + +OUTPUT_SCRIPT_SIMPLE = r'''#!/bin/sh -e +timestamp="$( date --iso-8601=ns )" +echo "A CI run happened at $timestamp" +''' + + +class TestRepoOutputSimple(TestRepoOutput): + __test__ = False + + @staticmethod + def codename(): + return 'output_simple' + + def format_output_script(self): + return OUTPUT_SCRIPT_SIMPLE + + def run_output_matches(self, output): + return output.decode().startswith('A CI run happened at ') + + +OUTPUT_SCRIPT_EMPTY = r'''#!/bin/sh +''' + + +class TestRepoOutputEmpty(TestRepoOutput): + __test__ = False + + @staticmethod + def codename(): + return 'output_empty' + + def format_output_script(self): + return OUTPUT_SCRIPT_EMPTY + + def run_output_matches(self, output): + return len(output) == 0 + + +OUTPUT_SCRIPT_LONG = r'''#!/bin/sh -e +dd if=/dev/urandom count={output_len_kb} bs=1024 | base64 +''' + + +class TestRepoOutputLong(TestRepoOutput): + __test__ = False + + OUTPUT_LEN_KB = 300 + + @staticmethod + def codename(): + return 'output_long' + + def format_output_script(self): + output_len = TestRepoOutputLong.OUTPUT_LEN_KB + output_len = shlex.quote(str(output_len)) + return OUTPUT_SCRIPT_LONG.format(output_len_kb=output_len) + + def run_output_matches(self, output): + return len(output) > TestRepoOutputLong.OUTPUT_LEN_KB * 1024 + + +OUTPUT_SCRIPT_NULL = r'''#!/usr/bin/env python3 +output = {output} +import sys +sys.stdout.buffer.write(output) +sys.exit(-2) +''' + + +class TestRepoOutputNull(TestRepoOutput): + __test__ = False + + OUTPUT = b'123\x00456\x00789' + + def __init__(self, *args, **kwargs): + assert len(TestRepoOutputNull.OUTPUT) == 11 + self.output = TestRepoOutputNull.OUTPUT + super().__init__(*args, **kwargs) + + @staticmethod + def codename(): + return 'output_null' + + def format_output_script(self): + return OUTPUT_SCRIPT_NULL.format(output=repr(self.output)) + + def run_exit_code_matches(self, ec): + return ec == 254 + + def run_output_matches(self, output): + return output == self.output + + +class TestRepoSegfault(TestRepo): + __test__ = False + + def __init__(self, repo_path, prog_path): + self.prog_path = prog_path + super().__init__(repo_path) + + @staticmethod + def codename(): + return 'segfault' + + def write_ci_script(self): + shutil.copy(self.prog_path, self.ci_script_path) + + def run_exit_code_matches(self, ec): + return ec == -11 + + def run_output_matches(self, output): + return "Started the test program.\n" == output.decode() + + def run_files_are_present(self, *args): + return True diff --git a/test/src/lib/tests.py b/test/src/lib/tests.py new file mode 100644 index 0000000..7ccedef --- /dev/null +++ b/test/src/lib/tests.py @@ -0,0 +1,23 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +import pytest + + +# Reference: https://github.com/pytest-dev/pytest/issues/3628 +# Automatic generation of readable test IDs. +def my_parametrize(names, values, ids=None, **kwargs): + _names = names.split(',') if isinstance(names, str) else names + if not ids: + if len(_names) == 1: + ids = [f'{names}={v}' for v in values] + else: + _values = [combination.values if hasattr(combination, 'values') else combination + for combination in values] + ids = [ + '-'.join(f'{k}={v}' for k, v in zip(_names, combination)) + for combination in _values + ] + return pytest.mark.parametrize(names, values, ids=ids, **kwargs) diff --git a/test/src/test_basic.py b/test/src/test_basic.py new file mode 100644 index 0000000..26732fe --- /dev/null +++ b/test/src/test_basic.py @@ -0,0 +1,69 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +import re + + +def _test_cmd_line_version_internal(cmd_line, name, version): + for flag in ('--version', '-V'): + output = cmd_line.run(flag).removesuffix('\n') + match = re.match(r'^cimple-(\w+) v(\d+\.\d+\.\d+) \([0-9a-f]{40,}\)$', output) + assert match, f'Invalid {flag} output:\n{output}' + assert match.group(1) == name + assert match.group(2) == version + + +def test_cmd_line_version(server_exe, worker_exe, client_exe, version): + _test_cmd_line_version_internal(server_exe, 'server', version) + _test_cmd_line_version_internal(worker_exe, 'worker', version) + _test_cmd_line_version_internal(client_exe, 'client', version) + + +def _test_cmd_line_help_internal(cmd_line, name): + for flag in ('--help', '-h'): + output = cmd_line.run(flag).removesuffix('\n') + match = re.match(r'^usage: cimple-(\w+) ', output) + assert match, f'Invalid {flag} output:\n{output}' + assert match.group(1) == name + + +def test_cmd_line_help(server_exe, worker_exe, client_exe): + _test_cmd_line_help_internal(server_exe, 'server') + _test_cmd_line_help_internal(worker_exe, 'worker') + _test_cmd_line_help_internal(client_exe, 'client') + + +def _test_cmd_line_invalid_option_internal(cmd_line, name): + for args in (['-x'], ['--invalid', 'value']): + ec, output = cmd_line.try_run(*args) + assert ec != 0, f'Invalid exit code {ec}, output:\n{output}' + + +def test_cmd_line_invalid_option(server_exe, worker_exe, client_exe): + _test_cmd_line_invalid_option_internal(server_exe, 'server') + _test_cmd_line_invalid_option_internal(worker_exe, 'worker') + _test_cmd_line_invalid_option_internal(client_exe, 'client') + + +def test_run_client_no_action(client): + ec, output = client.try_run() + assert ec != 0, f'Invalid exit code {ec}, output:\n{output}' + prefix = 'usage error: no action specified\n' + assert output.startswith(prefix), f'Invalid output:\n{output}' + + +def test_run_client_invalid_request(server, client): + ec, output = client.try_run('hello') + assert ec != 0, f'Invalid exit code {ec}, output:\n{output}' + prefix = 'usage error: invalid request\n' + assert output.startswith(prefix), f'Invalid output:\n{output}' + + +def test_run_noop_server(server): + pass + + +def test_run_noop_server_and_workers(server, workers): + pass diff --git a/test/src/test_repo.py b/test/src/test_repo.py new file mode 100644 index 0000000..d68cd4a --- /dev/null +++ b/test/src/test_repo.py @@ -0,0 +1,110 @@ +# Copyright (c) 2023 Egor Tensin +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +import json +import logging +import multiprocessing as mp +import re + +import pytest + +from lib.logging import child_logging_thread, configure_logging_in_child +from lib.process import LoggingEvent +from lib.tests import my_parametrize + + +def test_sigsegv(sigsegv): + ec, output = sigsegv.try_run() + assert ec == -11 + assert output == 'Started the test program.\n' + + +class LoggingEventRunComplete(LoggingEvent): + def __init__(self, target): + self.counter = 0 + self.target = target + self.re = re.compile(r'run \d+ as finished') + super().__init__(timeout=150) + + def log_line_matches(self, line): + return bool(self.re.search(line)) + + def set(self): + self.counter += 1 + if self.counter == self.target: + super().set() + + +def client_runner_process(log_queue, client, runs_per_process, repo): + with configure_logging_in_child(log_queue): + logging.info('Executing %s clients', runs_per_process) + for i in range(runs_per_process): + client.run('queue-run', repo.path, 'HEAD') + + +def _test_repo_internal(env, repo, numof_processes, runs_per_process): + numof_runs = numof_processes * runs_per_process + + event = LoggingEventRunComplete(numof_runs) + # Count the number of times the server receives the "run complete" message. + env.server.logger.add_event(event) + + with child_logging_thread() as log_queue: + ctx = mp.get_context('spawn') + args = (log_queue, env.client, runs_per_process, repo) + processes = [ctx.Process(target=client_runner_process, args=args) for i in range(numof_processes)] + + for proc in processes: + proc.start() + for proc in processes: + proc.join() + event.wait() + + repo.run_files_are_present(numof_runs) + + runs = env.db.get_all_runs() + assert numof_runs == len(runs) + + for id, status, ec, output, url, rev in runs: + assert status == 'finished', f'Invalid status for run {id}: {status}' + assert repo.run_exit_code_matches(ec), f"Exit code doesn't match: {ec}" + assert repo.run_output_matches(output), f"Output doesn't match: {output}" + + runs = env.client.run('get-runs') + runs = json.loads(runs)['result'] + assert len(runs) == numof_runs + + for run in runs: + id = run['id'] + ec = run['exit_code'] + + assert repo.run_exit_code_matches(ec), f"Exit code doesn't match: {ec}" + # Not implemented yet: + assert 'status' not in run + assert 'output' not in run + + +@my_parametrize('runs_per_client', [1, 5]) +@my_parametrize('numof_clients', [1, 5]) +def test_repo(env, test_repo, numof_clients, runs_per_client): + _test_repo_internal(env, test_repo, numof_clients, runs_per_client) + + +@pytest.mark.stress +@my_parametrize('numof_clients,runs_per_client', + [ + (10, 50), + (1, 2000), + (4, 500), + ]) +def test_repo_stress(env, stress_test_repo, numof_clients, runs_per_client): + _test_repo_internal(env, stress_test_repo, numof_clients, runs_per_client) + + +# Nice workaround to skip tests by default: https://stackoverflow.com/a/43938191 +@pytest.mark.skipif("not config.getoption('flamegraph')") +@pytest.mark.flame_graph +def test_repo_flame_graph(env, profiler, flame_graph_repo): + _test_repo_internal(env, flame_graph_repo, 4, 500) -- cgit v1.2.3