aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/test/py/lib/test_repo.py
blob: 0d722bef8331dab92100b659fede18eef24bdab5 (plain) (tree)
1
2
3
4
5
6
7
8
9




                                                         
          
             

              
             
            

             
                            







                                         
                                                                        



                                                                   
                                                   

 
                            
                            
                                       
                                                                         



                           



                                                               








                                                       
 















                                                                                                                     


                              
 










                                                                                            
 




                                         
                                                                     






                                                               
                                       
                                   










                                           
                                                                  

 
                                   










                                          



                                                                       
                                               
                 



















                                                                       

 
                                               
                 







                                         
                                 

                                        
                                                   



                                               



                                                                  
# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
# This file is part of the "cimple" project.
# For details, see https://github.com/egor-tensin/cimple.
# Distributed under the MIT License.

import abc
import base64
import logging
import os
import random
import shlex
import shutil

from .process import Process


class Repo:
    BRANCH = 'main'

    def __init__(self, path):
        self.path = os.path.abspath(path)
        os.makedirs(path, exist_ok=True)
        self.run('git', 'init', '-q', f'--initial-branch={Repo.BRANCH}')
        self.run('git', 'config', 'user.name', 'Test User')
        self.run('git', 'config', 'user.email', 'test@example.com')

    def run(self, *args, **kwargs):
        Process.run(*args, cwd=self.path, **kwargs)


CI_SCRIPT = r'''#!/bin/sh -e
readonly runs_dir={runs_dir}
readonly run_output_template=run_XXXXXX
run_output_path="$( mktemp --tmpdir="$runs_dir" "$run_output_template" )"
touch -- "$run_output_path"
'''


class TestRepo(Repo):
    # Prevent Pytest from discovering test cases in this class:
    __test__ = False

    def _format_ci_script(self):
        runs_dir = shlex.quote(self.runs_dir)
        return CI_SCRIPT.format(runs_dir=runs_dir)

    def __init__(self, path, ci_script='ci.sh'):
        super().__init__(path)

        self.runs_dir = os.path.join(self.path, 'runs')
        os.makedirs(self.runs_dir, exist_ok=True)

        self.ci_script_path = os.path.join(self.path, ci_script)
        with open(self.ci_script_path, mode='x') as f:
            f.write(self._format_ci_script())
        os.chmod(self.ci_script_path, 0o755)

        self.run('git', 'add', '--', ci_script)
        self.run('git', 'commit', '-q', '-m', 'add CI script')

    def count_run_files(self):
        return len([name for name in os.listdir(self.runs_dir) if os.path.isfile(os.path.join(self.runs_dir, name))])


class TestRepoOutput(TestRepo, abc.ABC):
    __test__ = False

    OUTPUT_SCRIPT_NAME = 'generate-output'

    def __init__(self, path):
        super().__init__(path)

        self.output_script_path = os.path.join(self.path, TestRepoOutput.OUTPUT_SCRIPT_NAME)
        with open(self.output_script_path, mode='x') as f:
            f.write(self._format_output_script())
        os.chmod(self.output_script_path, 0o755)

        with open(self.ci_script_path, mode='a') as f:
            f.write(self._format_ci_script_addition())

        self.run('git', 'add', '--', TestRepoOutput.OUTPUT_SCRIPT_NAME)
        self.run('git', 'add', '-u')
        self.run('git', 'commit', '-q', '-m', 'add output script')

    @abc.abstractmethod
    def _format_output_script(self):
        pass

    def _format_ci_script_addition(self):
        return r'{output_script} | tee -a "$run_output_path"'.format(
            output_script=shlex.quote(self.output_script_path))

    @abc.abstractmethod
    def run_output_matches(self, output):
        pass


OUTPUT_SCRIPT_SIMPLE = r'''#!/bin/sh -e
timestamp="$( date --iso-8601=ns )"
echo "A CI run happened at $timestamp"
'''


class TestRepoOutputSimple(TestRepoOutput):
    __test__ = False

    def _format_output_script(self):
        return OUTPUT_SCRIPT_SIMPLE

    def run_output_matches(self, output):
        return output.decode().startswith('A CI run happened at ')


OUTPUT_SCRIPT_EMPTY = r'''#!/bin/sh
'''


class TestRepoOutputEmpty(TestRepoOutput):
    __test__ = False

    def _format_output_script(self):
        return OUTPUT_SCRIPT_EMPTY

    def run_output_matches(self, output):
        return len(output) == 0


# Making it a bash script introduces way too much overhead with all the
# argument expansions; it slows things down considerably.
OUTPUT_SCRIPT_LONG = r'''#!/usr/bin/env python3
output = {output}
import sys
sys.stdout.write(output)
'''


class TestRepoOutputLong(TestRepoOutput):
    __test__ = False

    OUTPUT_LEN_KB = 300

    def __init__(self, *args, **kwargs):
        nb = TestRepoOutputLong.OUTPUT_LEN_KB * 1024
        self.output = base64.encodebytes(random.randbytes(nb)).decode()
        super().__init__(*args, **kwargs)

    def _format_output_script(self):
        return OUTPUT_SCRIPT_LONG.format(output=repr(self.output))

    def run_output_matches(self, output):
        return output.decode() == self.output


OUTPUT_SCRIPT_NULL = r'''#!/usr/bin/env python3
output = {output}
import sys
sys.stdout.buffer.write(output)
'''


class TestRepoOutputNull(TestRepoOutput):
    __test__ = False

    OUTPUT = b'123\x00456\x00789'

    def __init__(self, *args, **kwargs):
        assert len(TestRepoOutputNull.OUTPUT) == 11
        self.output = TestRepoOutputNull.OUTPUT
        super().__init__(*args, **kwargs)

    def _format_output_script(self):
        return OUTPUT_SCRIPT_NULL.format(output=repr(self.output))

    def run_output_matches(self, output):
        return output == self.output