diff options
author | Egor Tensin <egor@tensin.name> | 2023-12-28 01:00:45 +0100 |
---|---|---|
committer | Egor Tensin <egor@tensin.name> | 2023-12-28 01:00:45 +0100 |
commit | a9ae33c4b9dbe566d2ada07affb8b5a135f9b6eb (patch) | |
tree | 72b47c96da722d5e034af789f4af432e3fdfa051 /test/src/lib/test_repo.py | |
parent | json: factor out json_object_put into json_free (diff) | |
download | cimple-a9ae33c4b9dbe566d2ada07affb8b5a135f9b6eb.tar.gz cimple-a9ae33c4b9dbe566d2ada07affb8b5a135f9b6eb.zip |
test/py/ -> test/src/
Diffstat (limited to 'test/src/lib/test_repo.py')
-rw-r--r-- | test/src/lib/test_repo.py | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/test/src/lib/test_repo.py b/test/src/lib/test_repo.py new file mode 100644 index 0000000..1d07a4e --- /dev/null +++ b/test/src/lib/test_repo.py @@ -0,0 +1,231 @@ +# Copyright (c) 2023 Egor Tensin <egor@tensin.name> +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +import abc +import base64 +import logging +import os +import random +import shlex +import shutil + +from .process import Process + + +class Repo: + BRANCH = 'main' + + def __init__(self, path): + self.path = os.path.abspath(path) + os.makedirs(path, exist_ok=True) + self.run('git', 'init', '-q', f'--initial-branch={Repo.BRANCH}') + self.run('git', 'config', 'user.name', 'Test User') + self.run('git', 'config', 'user.email', 'test@example.com') + + def run(self, *args, **kwargs): + Process.run(*args, cwd=self.path, **kwargs) + + +CI_SCRIPT = r'''#!/usr/bin/env bash +set -o errexit -o nounset -o pipefail +readonly runs_dir={runs_dir} +readonly run_output_template=run_XXXXXX +run_output_path="$( mktemp --tmpdir="$runs_dir" "$run_output_template" )" +touch -- "$run_output_path" +''' + + +class TestRepo(Repo): + # Prevent Pytest from discovering test cases in this class: + __test__ = False + + def __init__(self, path, ci_script='ci'): + super().__init__(path) + + self.runs_dir = os.path.join(self.path, 'runs') + os.makedirs(self.runs_dir, exist_ok=True) + + self.ci_script_path = os.path.join(self.path, ci_script) + + self.write_ci_script() + self.run('git', 'add', '--', ci_script) + self.run('git', 'commit', '-q', '-m', 'add CI script') + + @staticmethod + @abc.abstractmethod + def codename(): + pass + + @abc.abstractmethod + def run_exit_code_matches(self, ec): + pass + + @abc.abstractmethod + def run_output_matches(self, output): + pass + + def write_ci_script(self): + with open(self.ci_script_path, mode='x') as f: + f.write(self.format_ci_script()) + os.chmod(self.ci_script_path, 0o755) + + def format_ci_script(self): + runs_dir = shlex.quote(self.runs_dir) + return CI_SCRIPT.format(runs_dir=runs_dir) + + def _count_run_files(self): + return len([name for name in os.listdir(self.runs_dir) if os.path.isfile(os.path.join(self.runs_dir, name))]) + + def run_files_are_present(self, expected): + assert expected == self._count_run_files() + + +class TestRepoOutput(TestRepo, abc.ABC): + __test__ = False + + OUTPUT_SCRIPT_NAME = 'generate-output' + + def __init__(self, path): + self.output_script_path = os.path.join(path, TestRepoOutput.OUTPUT_SCRIPT_NAME) + super().__init__(path) + + self.write_output_script() + self.run('git', 'add', '--', TestRepoOutput.OUTPUT_SCRIPT_NAME) + self.run('git', 'commit', '-q', '-m', 'add output script') + + def format_ci_script(self): + script = super().format_ci_script() + added = r'{output_script} | tee -a "$run_output_path"'.format( + output_script=shlex.quote(self.output_script_path)) + return f'{script}\n{added}\n' + + def write_output_script(self): + with open(self.output_script_path, mode='x') as f: + f.write(self.format_output_script()) + os.chmod(self.output_script_path, 0o755) + + @abc.abstractmethod + def format_output_script(self): + pass + + def run_exit_code_matches(self, ec): + return ec == 0 + + +OUTPUT_SCRIPT_SIMPLE = r'''#!/bin/sh -e +timestamp="$( date --iso-8601=ns )" +echo "A CI run happened at $timestamp" +''' + + +class TestRepoOutputSimple(TestRepoOutput): + __test__ = False + + @staticmethod + def codename(): + return 'output_simple' + + def format_output_script(self): + return OUTPUT_SCRIPT_SIMPLE + + def run_output_matches(self, output): + return output.decode().startswith('A CI run happened at ') + + +OUTPUT_SCRIPT_EMPTY = r'''#!/bin/sh +''' + + +class TestRepoOutputEmpty(TestRepoOutput): + __test__ = False + + @staticmethod + def codename(): + return 'output_empty' + + def format_output_script(self): + return OUTPUT_SCRIPT_EMPTY + + def run_output_matches(self, output): + return len(output) == 0 + + +OUTPUT_SCRIPT_LONG = r'''#!/bin/sh -e +dd if=/dev/urandom count={output_len_kb} bs=1024 | base64 +''' + + +class TestRepoOutputLong(TestRepoOutput): + __test__ = False + + OUTPUT_LEN_KB = 300 + + @staticmethod + def codename(): + return 'output_long' + + def format_output_script(self): + output_len = TestRepoOutputLong.OUTPUT_LEN_KB + output_len = shlex.quote(str(output_len)) + return OUTPUT_SCRIPT_LONG.format(output_len_kb=output_len) + + def run_output_matches(self, output): + return len(output) > TestRepoOutputLong.OUTPUT_LEN_KB * 1024 + + +OUTPUT_SCRIPT_NULL = r'''#!/usr/bin/env python3 +output = {output} +import sys +sys.stdout.buffer.write(output) +sys.exit(-2) +''' + + +class TestRepoOutputNull(TestRepoOutput): + __test__ = False + + OUTPUT = b'123\x00456\x00789' + + def __init__(self, *args, **kwargs): + assert len(TestRepoOutputNull.OUTPUT) == 11 + self.output = TestRepoOutputNull.OUTPUT + super().__init__(*args, **kwargs) + + @staticmethod + def codename(): + return 'output_null' + + def format_output_script(self): + return OUTPUT_SCRIPT_NULL.format(output=repr(self.output)) + + def run_exit_code_matches(self, ec): + return ec == 254 + + def run_output_matches(self, output): + return output == self.output + + +class TestRepoSegfault(TestRepo): + __test__ = False + + def __init__(self, repo_path, prog_path): + self.prog_path = prog_path + super().__init__(repo_path) + + @staticmethod + def codename(): + return 'segfault' + + def write_ci_script(self): + shutil.copy(self.prog_path, self.ci_script_path) + + def run_exit_code_matches(self, ec): + return ec == -11 + + def run_output_matches(self, output): + return "Started the test program.\n" == output.decode() + + def run_files_are_present(self, *args): + return True |