# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
# This file is part of the "cimple" project.
# For details, see https://github.com/egor-tensin/cimple.
# Distributed under the MIT License.
from multiprocessing import Process
import re
import pytest
from lib.process import LoggingEvent
class LoggingEventRunComplete(LoggingEvent):
    """Logging event that fires once enough "run finished" lines were seen.

    Every call to ``set()`` bumps an internal counter; the underlying event
    is only set when the counter reaches ``target``.

    NOTE(review): presumably the logger calls ``log_line_matches()`` for each
    log line and ``set()`` for each match -- confirm against ``LoggingEvent``.
    """

    def __init__(self, target):
        """Prepare the matcher and the counter, then initialise the base event.

        ``target`` is the number of ``set()`` calls required before the base
        event is actually set.
        """
        # State is assigned before the base initializer runs, as in the
        # original ordering.
        self.re = re.compile(r'run \d+ as finished')
        self.target = target
        self.counter = 0
        # NOTE(review): the timeout is presumably in seconds -- confirm.
        super().__init__(timeout=150)

    def log_line_matches(self, line):
        """Return True if ``line`` contains a "run N as finished" message."""
        return self.re.search(line) is not None

    def set(self):
        """Record one more completed run; set the event on the target-th call."""
        self.counter += 1
        if self.counter != self.target:
            return
        super().set()
def _test_repo_internal(env, repo, numof_processes, runs_per_process):
    """Schedule runs from several client processes and verify the results.

    Starts ``numof_processes`` processes, each of which invokes
    ``env.client.run('run', repo.path, 'HEAD')`` ``runs_per_process`` times,
    waits for the "run complete" event, joins the processes and then asserts
    that:

    * ``repo.count_run_files()`` equals the total number of runs;
    * ``env.db.get_all_runs()`` returns that many runs;
    * every run has status ``'finished'`` and output accepted by
      ``repo.run_output_matches()``.
    """
    expected_runs = numof_processes * runs_per_process

    # Count the number of times the server receives the "run complete" message.
    run_complete = LoggingEventRunComplete(expected_runs)
    env.server.logger.add_event(run_complete)

    def client_runner():
        # Each client process issues its share of runs sequentially.
        for _ in range(runs_per_process):
            env.client.run('run', repo.path, 'HEAD')

    # NOTE(review): the target is a closure, so this presumably relies on a
    # fork-style start method (spawn would need to pickle it) -- confirm.
    workers = []
    for _ in range(numof_processes):
        workers.append(Process(target=client_runner))
    for worker in workers:
        worker.start()

    run_complete.wait()

    for worker in workers:
        worker.join()

    assert expected_runs == repo.count_run_files()

    all_runs = env.db.get_all_runs()
    assert expected_runs == len(all_runs)

    for run_id, status, _ec, output, _url, _rev in all_runs:
        assert status == 'finished', f'Invalid status for run {run_id}: {status}'
        assert repo.run_output_matches(output), f"Output doesn't match: {output}"
# Reference: https://github.com/pytest-dev/pytest/issues/3628
# Automatic generation of readable test IDs.
def my_parametrize(names, values, ids=None, **kwargs):
    """Wrap ``pytest.mark.parametrize``, generating readable ``name=value`` IDs.

    Args:
        names: Argument names, either a comma-separated string or a sequence
            of strings; passed to pytest unchanged.
        values: Iterable of argument values. Each item is a bare value when
            ``names`` is a string holding a single name, and a sequence with
            one entry per name otherwise.
        ids: Optional explicit IDs; when falsy, IDs are generated here.
        **kwargs: Forwarded to ``pytest.mark.parametrize``.

    Returns:
        The mark decorator returned by ``pytest.mark.parametrize``.
    """
    if isinstance(names, str):
        # Split the same way pytest does: strip whitespace and drop empty
        # entries, so 'a, b' doesn't produce an ID like 'a=1- b=2'.
        _names = [name.strip() for name in names.split(',') if name.strip()]
        # pytest only treats the values as bare (non-tuple) items when the
        # names are given as a string containing a single name.
        bare_values = len(_names) == 1
    else:
        _names = list(names)
        bare_values = False

    # Materialize once: values are iterated here to build the IDs and then
    # again by pytest, which would find a one-shot iterable already exhausted.
    values = list(values)

    if not ids:
        if bare_values:
            ids = [f'{_names[0]}={v}' for v in values]
        else:
            ids = [
                '-'.join(f'{k}={v}' for k, v in zip(_names, combination))
                for combination in values
            ]
    return pytest.mark.parametrize(names, values, ids=ids, **kwargs)
@my_parametrize('runs_per_client', [1, 5])
@my_parametrize('numof_clients', [1, 5])
def test_repo(env, test_repo, numof_clients, runs_per_client):
    """Check runs for each combination of 1 or 5 clients and 1 or 5 runs per client."""
    _test_repo_internal(
        env,
        test_repo,
        numof_processes=numof_clients,
        runs_per_process=runs_per_client,
    )
@pytest.mark.stress
@my_parametrize(
    ('numof_clients', 'runs_per_client'),
    [
        (10, 50),
        (1, 2000),
        (4, 500),
    ],
)
def test_repo_stress(env, test_repo, numof_clients, runs_per_client):
    """Same check as ``test_repo`` with larger loads; carries the ``stress`` mark."""
    _test_repo_internal(
        env,
        test_repo,
        numof_processes=numof_clients,
        runs_per_process=runs_per_client,
    )