diff options
-rw-r--r-- | src/server.c | 23 | ||||
-rw-r--r-- | src/sqlite/v01.sql | 6 | ||||
-rw-r--r-- | src/storage_sqlite.c | 162 | ||||
-rw-r--r-- | test/py/conftest.py | 6 | ||||
-rw-r--r-- | test/py/lib/db.py | 28 | ||||
-rw-r--r-- | test/py/test_repo.py | 32 |
6 files changed, 191 insertions, 66 deletions
diff --git a/src/server.c b/src/server.c index da9cfc3..a4358d5 100644 --- a/src/server.c +++ b/src/server.c @@ -125,24 +125,21 @@ static int server_enqueue_run(struct server *server, struct run *run) { int ret = 0; - ret = server_lock(server); + ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run)); if (ret < 0) return ret; + run_set_id(run, ret); - ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run)); + ret = server_lock(server); if (ret < 0) - goto unlock; - run_set_id(run, ret); + return ret; run_queue_add_last(&server->run_queue, run); log("Added a new run %d for repository %s to the queue\n", run_get_id(run), run_get_url(run)); server_notify(server); - -unlock: server_unlock(server); - return ret; } @@ -297,9 +294,7 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m struct server *server = (struct server *)ctx->arg; int ret = 0; - log("Received a \"run finished\" message from worker %d\n", ctx->fd); - - int run_id; + int run_id = 0; struct proc_output output; ret = msg_finished_parse(request, &run_id, &output); @@ -307,12 +302,16 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m return ret; ret = storage_run_finished(&server->storage, run_id, output.ec); - proc_output_free(&output); if (ret < 0) { log_err("Failed to mark run %d as finished\n", run_id); - return ret; + goto free_output; } + log("Marked run %d as finished\n", run_id); + +free_output: + proc_output_free(&output); + return ret; } diff --git a/src/sqlite/v01.sql b/src/sqlite/v01.sql index 454c19e..b37e486 100644 --- a/src/sqlite/v01.sql +++ b/src/sqlite/v01.sql @@ -29,3 +29,9 @@ CREATE TABLE cimple_runs ( CREATE INDEX cimple_runs_index_status ON cimple_runs(status); CREATE INDEX cimple_runs_index_repo_id ON cimple_runs(repo_id); + +CREATE VIEW cimple_runs_readable AS + SELECT run.id, status.label, run.ec, run.output, repo.url, run.rev + FROM cimple_runs AS run + INNER JOIN cimple_run_status as status ON run.status = status.id + INNER JOIN cimple_repos as repo ON run.repo_id = repo.id; diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 8de9c38..2ac8b30 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -14,6 +14,7 @@ #include <sqlite3.h> +#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -52,13 +53,66 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings) free(settings->sqlite); } +enum run_status { + RUN_STATUS_CREATED = 1, + RUN_STATUS_FINISHED = 2, +}; + +struct prepared_stmt { + pthread_mutex_t mtx; + sqlite3_stmt *impl; +}; + +static int prepared_stmt_init(struct prepared_stmt *stmt, sqlite3 *db, const char *sql) +{ + int ret = 0; + + ret = pthread_mutex_init(&stmt->mtx, NULL); + if (ret) { + pthread_errno(ret, "pthread_mutex_init"); + return ret; + } + + ret = sqlite_prepare(db, sql, &stmt->impl); + if (ret < 0) + goto destroy_mtx; + + return ret; + +destroy_mtx: + pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy"); + + return ret; +} + +static void prepared_stmt_destroy(struct prepared_stmt *stmt) +{ + pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy"); + sqlite_finalize(stmt->impl); +} + +static int prepared_stmt_lock(struct prepared_stmt *stmt) +{ + int ret = pthread_mutex_lock(&stmt->mtx); + if (ret) { + pthread_errno(ret, "pthread_mutex_unlock"); + return ret; + } + return ret; +} + +static void prepared_stmt_unlock(struct prepared_stmt *stmt) +{ + pthread_errno_if(pthread_mutex_unlock(&stmt->mtx), "pthread_mutex_unlock"); +} + struct storage_sqlite { sqlite3 *db; - sqlite3_stmt *stmt_repo_find; - sqlite3_stmt *stmt_repo_insert; - sqlite3_stmt *stmt_run_insert; - sqlite3_stmt *stmt_run_finished; + struct prepared_stmt stmt_repo_find; + struct prepared_stmt stmt_repo_insert; + struct prepared_stmt stmt_run_insert; + struct prepared_stmt stmt_run_finished; }; static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version) @@ -151,43 +205,43 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) ON CONFLICT(url) DO NOTHING;"; static const char *const fmt_run_insert = - "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (1, -1, x'', ?, ?) RETURNING id;"; + "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (?, -1, x'', ?, ?) RETURNING id;"; static const char *const fmt_run_finished = - "UPDATE cimple_runs SET status = 2, ec = ? WHERE id = ?;"; + "UPDATE cimple_runs SET status = ?, ec = ? WHERE id = ?;"; int ret = 0; - ret = sqlite_prepare(storage->db, fmt_repo_find, &storage->stmt_repo_find); + ret = prepared_stmt_init(&storage->stmt_repo_find, storage->db, fmt_repo_find); if (ret < 0) return ret; - ret = sqlite_prepare(storage->db, fmt_repo_insert, &storage->stmt_repo_insert); + ret = prepared_stmt_init(&storage->stmt_repo_insert, storage->db, fmt_repo_insert); if (ret < 0) goto finalize_repo_find; - ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert); + ret = prepared_stmt_init(&storage->stmt_run_insert, storage->db, fmt_run_insert); if (ret < 0) goto finalize_repo_insert; - ret = sqlite_prepare(storage->db, fmt_run_finished, &storage->stmt_run_finished); + ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished); if (ret < 0) goto finalize_run_insert; return ret; finalize_run_insert: - sqlite_finalize(storage->stmt_run_insert); + prepared_stmt_destroy(&storage->stmt_run_insert); finalize_repo_insert: - sqlite_finalize(storage->stmt_repo_insert); + prepared_stmt_destroy(&storage->stmt_repo_insert); finalize_repo_find: - sqlite_finalize(storage->stmt_repo_find); + prepared_stmt_destroy(&storage->stmt_repo_find); return ret; } static void storage_sqlite_finalize_statements(struct storage_sqlite *storage) { - sqlite_finalize(storage->stmt_run_finished); - sqlite_finalize(storage->stmt_run_insert); - sqlite_finalize(storage->stmt_repo_insert); - sqlite_finalize(storage->stmt_repo_find); + prepared_stmt_destroy(&storage->stmt_run_finished); + prepared_stmt_destroy(&storage->stmt_run_insert); + prepared_stmt_destroy(&storage->stmt_repo_insert); + prepared_stmt_destroy(&storage->stmt_repo_find); } int storage_sqlite_create(struct storage *storage, const struct storage_settings *settings) @@ -238,62 +292,75 @@ void storage_sqlite_destroy(struct storage *storage) static int storage_sqlite_find_repo(struct storage_sqlite *storage, const char *url) { - sqlite3_stmt *stmt = storage->stmt_repo_find; + struct prepared_stmt *stmt = &storage->stmt_repo_find; int ret = 0; - ret = sqlite_bind_text(stmt, 1, url); + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_text(stmt->impl, 1, url); if (ret < 0) goto reset; - ret = sqlite_step(stmt); + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; if (!ret) goto reset; - ret = sqlite_column_int(stmt, 0); + ret = sqlite_column_int(stmt->impl, 0); goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } static int storage_sqlite_insert_repo(struct storage_sqlite *storage, const char *url) { - sqlite3_stmt *stmt = storage->stmt_repo_insert; + struct prepared_stmt *stmt = &storage->stmt_repo_insert; int ret = 0; - ret = sqlite_bind_text(stmt, 1, url); + ret = prepared_stmt_lock(stmt); if (ret < 0) - goto reset; - ret = sqlite_step(stmt); + return ret; + ret = sqlite_bind_text(stmt->impl, 1, url); if (ret < 0) goto reset; - - ret = storage_sqlite_find_repo(storage, url); + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); - return ret; + if (ret < 0) + return ret; + + return storage_sqlite_find_repo(storage, url); } static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id, const char *rev) { - sqlite3_stmt *stmt = storage->stmt_run_insert; + struct prepared_stmt *stmt = &storage->stmt_run_insert; int ret = 0; - ret = sqlite_bind_int(stmt, 1, repo_id); + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED); + if (ret < 0) + goto reset; + ret = sqlite_bind_int(stmt->impl, 2, repo_id); if (ret < 0) goto reset; - ret = sqlite_bind_text(stmt, 2, rev); + ret = sqlite_bind_text(stmt->impl, 3, rev); if (ret < 0) goto reset; - ret = sqlite_step(stmt); + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; @@ -303,11 +370,12 @@ static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id goto reset; } - ret = sqlite_column_int(stmt, 0); + ret = sqlite_column_int(stmt->impl, 0); goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } @@ -329,21 +397,28 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec) { - sqlite3_stmt *stmt = storage->sqlite->stmt_run_finished; + struct prepared_stmt *stmt = &storage->sqlite->stmt_run_finished; int ret = 0; - ret = sqlite_bind_int(stmt, 1, ec); + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_FINISHED); if (ret < 0) goto reset; - ret = sqlite_bind_int(stmt, 2, run_id); + ret = sqlite_bind_int(stmt->impl, 2, ec); if (ret < 0) goto reset; - ret = sqlite_step(stmt); + ret = sqlite_bind_int(stmt->impl, 3, run_id); + if (ret < 0) + goto reset; + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } @@ -383,7 +458,7 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu { static const char *const fmt = "SELECT cimple_runs.id, cimple_repos.url, cimple_runs.rev FROM cimple_runs" - " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = 1;"; + " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = ?;"; sqlite3_stmt *stmt; int ret = 0; @@ -391,6 +466,9 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt); if (ret < 0) return ret; + ret = sqlite_bind_int(stmt, 1, RUN_STATUS_CREATED); + if (ret < 0) + goto finalize; run_queue_create(queue); diff --git a/test/py/conftest.py b/test/py/conftest.py index 750e4e8..ead09d1 100644 --- a/test/py/conftest.py +++ b/test/py/conftest.py @@ -8,6 +8,7 @@ import os from pytest import fixture +from lib.db import Database from lib.net import random_unused_port from lib.process import CmdLine from lib.test_repo import TestRepo @@ -190,3 +191,8 @@ def client(client_cmd): @fixture def test_repo(tmp_path): return TestRepo(tmp_path) + + +@fixture +def sqlite_db(server, sqlite_path): + return Database(sqlite_path) diff --git a/test/py/lib/db.py b/test/py/lib/db.py new file mode 100644 index 0000000..b9059c5 --- /dev/null +++ b/test/py/lib/db.py @@ -0,0 +1,28 @@ +# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "cimple" project. +# For details, see https://github.com/egor-tensin/cimple. +# Distributed under the MIT License. + +from contextlib import closing, contextmanager +import sqlite3 + + +class Database: + def __init__(self, path): + self.conn = sqlite3.connect(f'file:{path}?mode=ro') + + def __enter__(self): + return self + + def __exit__(*args): + self.conn.close() + + @contextmanager + def get_cursor(self): + with closing(self.conn.cursor()) as cur: + yield cur + + def get_all_runs(self): + with self.get_cursor() as cur: + cur.execute('SELECT * FROM cimple_runs_readable') + return cur.fetchall() diff --git a/test/py/test_repo.py b/test/py/test_repo.py index 782240b..1cc1434 100644 --- a/test/py/test_repo.py +++ b/test/py/test_repo.py @@ -4,6 +4,7 @@ # Distributed under the MIT License. from multiprocessing import Process +import re import pytest @@ -14,10 +15,11 @@ class LoggingEventRunComplete(LoggingEvent): def __init__(self, target): self.counter = 0 self.target = target + self.re = re.compile(r'run \d+ as finished') super().__init__(timeout=60) def log_line_matches(self, line): - return 'Received a "run finished" message from worker' in line + return bool(self.re.search(line)) def set(self): self.counter += 1 @@ -25,7 +27,7 @@ class LoggingEventRunComplete(LoggingEvent): super().set() -def _test_repo_internal(server_and_workers, test_repo, client, numof_processes, runs_per_process): +def _test_repo_internal(server_and_workers, test_repo, client, numof_processes, runs_per_process, db): numof_runs = numof_processes * runs_per_process server, workers = server_and_workers @@ -47,24 +49,30 @@ def _test_repo_internal(server_and_workers, test_repo, client, numof_processes, event.wait() assert numof_runs == test_repo.count_ci_output_files() + runs = db.get_all_runs() + assert numof_runs == len(runs) -def test_repo_1_client_1_run(server_and_workers, test_repo, client): - _test_repo_internal(server_and_workers, test_repo, client, 1, 1) + for id, status, ec, output, url, rev in runs: + assert status == 'finished', f'Invalid status for run {id}: {status}' -def test_repo_1_client_2_runs(server_and_workers, test_repo, client): - _test_repo_internal(server_and_workers, test_repo, client, 1, 2) +def test_repo_1_client_1_run(server_and_workers, test_repo, client, sqlite_db): + _test_repo_internal(server_and_workers, test_repo, client, 1, 1, sqlite_db) -def test_repo_1_client_10_runs(server_and_workers, test_repo, client): - _test_repo_internal(server_and_workers, test_repo, client, 1, 10) +def test_repo_1_client_2_runs(server_and_workers, test_repo, client, sqlite_db): + _test_repo_internal(server_and_workers, test_repo, client, 1, 2, sqlite_db) + + +def test_repo_1_client_10_runs(server_and_workers, test_repo, client, sqlite_db): + _test_repo_internal(server_and_workers, test_repo, client, 1, 10, sqlite_db) @pytest.mark.stress -def test_repo_1_client_2000_runs(server_and_workers, test_repo, client): - _test_repo_internal(server_and_workers, test_repo, client, 1, 2000) +def test_repo_1_client_2000_runs(server_and_workers, test_repo, client, sqlite_db): + _test_repo_internal(server_and_workers, test_repo, client, 1, 2000, sqlite_db) @pytest.mark.stress -def test_repo_4_clients_500_runs(server_and_workers, test_repo, client): - _test_repo_internal(server_and_workers, test_repo, client, 4, 500) +def test_repo_4_clients_500_runs(server_and_workers, test_repo, client, sqlite_db): + _test_repo_internal(server_and_workers, test_repo, client, 4, 500, sqlite_db) |