aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--src/server.c23
-rw-r--r--src/sqlite/v01.sql6
-rw-r--r--src/storage_sqlite.c162
-rw-r--r--test/py/conftest.py6
-rw-r--r--test/py/lib/db.py28
-rw-r--r--test/py/test_repo.py32
6 files changed, 191 insertions, 66 deletions
diff --git a/src/server.c b/src/server.c
index da9cfc3..a4358d5 100644
--- a/src/server.c
+++ b/src/server.c
@@ -125,24 +125,21 @@ static int server_enqueue_run(struct server *server, struct run *run)
{
int ret = 0;
- ret = server_lock(server);
+ ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
if (ret < 0)
return ret;
+ run_set_id(run, ret);
- ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
+ ret = server_lock(server);
if (ret < 0)
- goto unlock;
- run_set_id(run, ret);
+ return ret;
run_queue_add_last(&server->run_queue, run);
log("Added a new run %d for repository %s to the queue\n", run_get_id(run),
run_get_url(run));
server_notify(server);
-
-unlock:
server_unlock(server);
-
return ret;
}
@@ -297,9 +294,7 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m
struct server *server = (struct server *)ctx->arg;
int ret = 0;
- log("Received a \"run finished\" message from worker %d\n", ctx->fd);
-
- int run_id;
+ int run_id = 0;
struct proc_output output;
ret = msg_finished_parse(request, &run_id, &output);
@@ -307,12 +302,16 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m
return ret;
ret = storage_run_finished(&server->storage, run_id, output.ec);
- proc_output_free(&output);
if (ret < 0) {
log_err("Failed to mark run %d as finished\n", run_id);
- return ret;
+ goto free_output;
}
+ log("Marked run %d as finished\n", run_id);
+
+free_output:
+ proc_output_free(&output);
+
return ret;
}
diff --git a/src/sqlite/v01.sql b/src/sqlite/v01.sql
index 454c19e..b37e486 100644
--- a/src/sqlite/v01.sql
+++ b/src/sqlite/v01.sql
@@ -29,3 +29,9 @@ CREATE TABLE cimple_runs (
CREATE INDEX cimple_runs_index_status ON cimple_runs(status);
CREATE INDEX cimple_runs_index_repo_id ON cimple_runs(repo_id);
+
+CREATE VIEW cimple_runs_readable AS
+ SELECT run.id, status.label, run.ec, run.output, repo.url, run.rev
+ FROM cimple_runs AS run
+ INNER JOIN cimple_run_status as status ON run.status = status.id
+ INNER JOIN cimple_repos as repo ON run.repo_id = repo.id;
diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c
index 8de9c38..2ac8b30 100644
--- a/src/storage_sqlite.c
+++ b/src/storage_sqlite.c
@@ -14,6 +14,7 @@
#include <sqlite3.h>
+#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -52,13 +53,66 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings)
free(settings->sqlite);
}
+enum run_status {
+ RUN_STATUS_CREATED = 1,
+ RUN_STATUS_FINISHED = 2,
+};
+
+struct prepared_stmt {
+ pthread_mutex_t mtx;
+ sqlite3_stmt *impl;
+};
+
+static int prepared_stmt_init(struct prepared_stmt *stmt, sqlite3 *db, const char *sql)
+{
+ int ret = 0;
+
+ ret = pthread_mutex_init(&stmt->mtx, NULL);
+ if (ret) {
+ pthread_errno(ret, "pthread_mutex_init");
+ return ret;
+ }
+
+ ret = sqlite_prepare(db, sql, &stmt->impl);
+ if (ret < 0)
+ goto destroy_mtx;
+
+ return ret;
+
+destroy_mtx:
+ pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy");
+
+ return ret;
+}
+
+static void prepared_stmt_destroy(struct prepared_stmt *stmt)
+{
+ pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy");
+ sqlite_finalize(stmt->impl);
+}
+
+static int prepared_stmt_lock(struct prepared_stmt *stmt)
+{
+ int ret = pthread_mutex_lock(&stmt->mtx);
+ if (ret) {
+ pthread_errno(ret, "pthread_mutex_unlock");
+ return ret;
+ }
+ return ret;
+}
+
+static void prepared_stmt_unlock(struct prepared_stmt *stmt)
+{
+ pthread_errno_if(pthread_mutex_unlock(&stmt->mtx), "pthread_mutex_unlock");
+}
+
struct storage_sqlite {
sqlite3 *db;
- sqlite3_stmt *stmt_repo_find;
- sqlite3_stmt *stmt_repo_insert;
- sqlite3_stmt *stmt_run_insert;
- sqlite3_stmt *stmt_run_finished;
+ struct prepared_stmt stmt_repo_find;
+ struct prepared_stmt stmt_repo_insert;
+ struct prepared_stmt stmt_run_insert;
+ struct prepared_stmt stmt_run_finished;
};
static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version)
@@ -151,43 +205,43 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
static const char *const fmt_repo_insert =
"INSERT INTO cimple_repos(url) VALUES (?) ON CONFLICT(url) DO NOTHING;";
static const char *const fmt_run_insert =
- "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (1, -1, x'', ?, ?) RETURNING id;";
+ "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (?, -1, x'', ?, ?) RETURNING id;";
static const char *const fmt_run_finished =
- "UPDATE cimple_runs SET status = 2, ec = ? WHERE id = ?;";
+ "UPDATE cimple_runs SET status = ?, ec = ? WHERE id = ?;";
int ret = 0;
- ret = sqlite_prepare(storage->db, fmt_repo_find, &storage->stmt_repo_find);
+ ret = prepared_stmt_init(&storage->stmt_repo_find, storage->db, fmt_repo_find);
if (ret < 0)
return ret;
- ret = sqlite_prepare(storage->db, fmt_repo_insert, &storage->stmt_repo_insert);
+ ret = prepared_stmt_init(&storage->stmt_repo_insert, storage->db, fmt_repo_insert);
if (ret < 0)
goto finalize_repo_find;
- ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert);
+ ret = prepared_stmt_init(&storage->stmt_run_insert, storage->db, fmt_run_insert);
if (ret < 0)
goto finalize_repo_insert;
- ret = sqlite_prepare(storage->db, fmt_run_finished, &storage->stmt_run_finished);
+ ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished);
if (ret < 0)
goto finalize_run_insert;
return ret;
finalize_run_insert:
- sqlite_finalize(storage->stmt_run_insert);
+ prepared_stmt_destroy(&storage->stmt_run_insert);
finalize_repo_insert:
- sqlite_finalize(storage->stmt_repo_insert);
+ prepared_stmt_destroy(&storage->stmt_repo_insert);
finalize_repo_find:
- sqlite_finalize(storage->stmt_repo_find);
+ prepared_stmt_destroy(&storage->stmt_repo_find);
return ret;
}
static void storage_sqlite_finalize_statements(struct storage_sqlite *storage)
{
- sqlite_finalize(storage->stmt_run_finished);
- sqlite_finalize(storage->stmt_run_insert);
- sqlite_finalize(storage->stmt_repo_insert);
- sqlite_finalize(storage->stmt_repo_find);
+ prepared_stmt_destroy(&storage->stmt_run_finished);
+ prepared_stmt_destroy(&storage->stmt_run_insert);
+ prepared_stmt_destroy(&storage->stmt_repo_insert);
+ prepared_stmt_destroy(&storage->stmt_repo_find);
}
int storage_sqlite_create(struct storage *storage, const struct storage_settings *settings)
@@ -238,62 +292,75 @@ void storage_sqlite_destroy(struct storage *storage)
static int storage_sqlite_find_repo(struct storage_sqlite *storage, const char *url)
{
- sqlite3_stmt *stmt = storage->stmt_repo_find;
+ struct prepared_stmt *stmt = &storage->stmt_repo_find;
int ret = 0;
- ret = sqlite_bind_text(stmt, 1, url);
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_text(stmt->impl, 1, url);
if (ret < 0)
goto reset;
- ret = sqlite_step(stmt);
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
if (!ret)
goto reset;
- ret = sqlite_column_int(stmt, 0);
+ ret = sqlite_column_int(stmt->impl, 0);
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
static int storage_sqlite_insert_repo(struct storage_sqlite *storage, const char *url)
{
- sqlite3_stmt *stmt = storage->stmt_repo_insert;
+ struct prepared_stmt *stmt = &storage->stmt_repo_insert;
int ret = 0;
- ret = sqlite_bind_text(stmt, 1, url);
+ ret = prepared_stmt_lock(stmt);
if (ret < 0)
- goto reset;
- ret = sqlite_step(stmt);
+ return ret;
+ ret = sqlite_bind_text(stmt->impl, 1, url);
if (ret < 0)
goto reset;
-
- ret = storage_sqlite_find_repo(storage, url);
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
- return ret;
+ if (ret < 0)
+ return ret;
+
+ return storage_sqlite_find_repo(storage, url);
}
static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id, const char *rev)
{
- sqlite3_stmt *stmt = storage->stmt_run_insert;
+ struct prepared_stmt *stmt = &storage->stmt_run_insert;
int ret = 0;
- ret = sqlite_bind_int(stmt, 1, repo_id);
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED);
+ if (ret < 0)
+ goto reset;
+ ret = sqlite_bind_int(stmt->impl, 2, repo_id);
if (ret < 0)
goto reset;
- ret = sqlite_bind_text(stmt, 2, rev);
+ ret = sqlite_bind_text(stmt->impl, 3, rev);
if (ret < 0)
goto reset;
- ret = sqlite_step(stmt);
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
@@ -303,11 +370,12 @@ static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id
goto reset;
}
- ret = sqlite_column_int(stmt, 0);
+ ret = sqlite_column_int(stmt->impl, 0);
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
@@ -329,21 +397,28 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con
int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec)
{
- sqlite3_stmt *stmt = storage->sqlite->stmt_run_finished;
+ struct prepared_stmt *stmt = &storage->sqlite->stmt_run_finished;
int ret = 0;
- ret = sqlite_bind_int(stmt, 1, ec);
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_FINISHED);
if (ret < 0)
goto reset;
- ret = sqlite_bind_int(stmt, 2, run_id);
+ ret = sqlite_bind_int(stmt->impl, 2, ec);
if (ret < 0)
goto reset;
- ret = sqlite_step(stmt);
+ ret = sqlite_bind_int(stmt->impl, 3, run_id);
+ if (ret < 0)
+ goto reset;
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
@@ -383,7 +458,7 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu
{
static const char *const fmt =
"SELECT cimple_runs.id, cimple_repos.url, cimple_runs.rev FROM cimple_runs"
- " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = 1;";
+ " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = ?;";
sqlite3_stmt *stmt;
int ret = 0;
@@ -391,6 +466,9 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu
ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt);
if (ret < 0)
return ret;
+ ret = sqlite_bind_int(stmt, 1, RUN_STATUS_CREATED);
+ if (ret < 0)
+ goto finalize;
run_queue_create(queue);
diff --git a/test/py/conftest.py b/test/py/conftest.py
index 750e4e8..ead09d1 100644
--- a/test/py/conftest.py
+++ b/test/py/conftest.py
@@ -8,6 +8,7 @@ import os
from pytest import fixture
+from lib.db import Database
from lib.net import random_unused_port
from lib.process import CmdLine
from lib.test_repo import TestRepo
@@ -190,3 +191,8 @@ def client(client_cmd):
@fixture
def test_repo(tmp_path):
return TestRepo(tmp_path)
+
+
+@fixture
+def sqlite_db(server, sqlite_path):
+ return Database(sqlite_path)
diff --git a/test/py/lib/db.py b/test/py/lib/db.py
new file mode 100644
index 0000000..b9059c5
--- /dev/null
+++ b/test/py/lib/db.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "cimple" project.
+# For details, see https://github.com/egor-tensin/cimple.
+# Distributed under the MIT License.
+
+from contextlib import closing, contextmanager
+import sqlite3
+
+
+class Database:
+ def __init__(self, path):
+ self.conn = sqlite3.connect(f'file:{path}?mode=ro')
+
+ def __enter__(self):
+ return self
+
+ def __exit__(*args):
+ self.conn.close()
+
+ @contextmanager
+ def get_cursor(self):
+ with closing(self.conn.cursor()) as cur:
+ yield cur
+
+ def get_all_runs(self):
+ with self.get_cursor() as cur:
+ cur.execute('SELECT * FROM cimple_runs_readable')
+ return cur.fetchall()
diff --git a/test/py/test_repo.py b/test/py/test_repo.py
index 782240b..1cc1434 100644
--- a/test/py/test_repo.py
+++ b/test/py/test_repo.py
@@ -4,6 +4,7 @@
# Distributed under the MIT License.
from multiprocessing import Process
+import re
import pytest
@@ -14,10 +15,11 @@ class LoggingEventRunComplete(LoggingEvent):
def __init__(self, target):
self.counter = 0
self.target = target
+ self.re = re.compile(r'run \d+ as finished')
super().__init__(timeout=60)
def log_line_matches(self, line):
- return 'Received a "run finished" message from worker' in line
+ return bool(self.re.search(line))
def set(self):
self.counter += 1
@@ -25,7 +27,7 @@ class LoggingEventRunComplete(LoggingEvent):
super().set()
-def _test_repo_internal(server_and_workers, test_repo, client, numof_processes, runs_per_process):
+def _test_repo_internal(server_and_workers, test_repo, client, numof_processes, runs_per_process, db):
numof_runs = numof_processes * runs_per_process
server, workers = server_and_workers
@@ -47,24 +49,30 @@ def _test_repo_internal(server_and_workers, test_repo, client, numof_processes,
event.wait()
assert numof_runs == test_repo.count_ci_output_files()
+ runs = db.get_all_runs()
+ assert numof_runs == len(runs)
-def test_repo_1_client_1_run(server_and_workers, test_repo, client):
- _test_repo_internal(server_and_workers, test_repo, client, 1, 1)
+ for id, status, ec, output, url, rev in runs:
+ assert status == 'finished', f'Invalid status for run {id}: {status}'
-def test_repo_1_client_2_runs(server_and_workers, test_repo, client):
- _test_repo_internal(server_and_workers, test_repo, client, 1, 2)
+def test_repo_1_client_1_run(server_and_workers, test_repo, client, sqlite_db):
+ _test_repo_internal(server_and_workers, test_repo, client, 1, 1, sqlite_db)
-def test_repo_1_client_10_runs(server_and_workers, test_repo, client):
- _test_repo_internal(server_and_workers, test_repo, client, 1, 10)
+def test_repo_1_client_2_runs(server_and_workers, test_repo, client, sqlite_db):
+ _test_repo_internal(server_and_workers, test_repo, client, 1, 2, sqlite_db)
+
+
+def test_repo_1_client_10_runs(server_and_workers, test_repo, client, sqlite_db):
+ _test_repo_internal(server_and_workers, test_repo, client, 1, 10, sqlite_db)
@pytest.mark.stress
-def test_repo_1_client_2000_runs(server_and_workers, test_repo, client):
- _test_repo_internal(server_and_workers, test_repo, client, 1, 2000)
+def test_repo_1_client_2000_runs(server_and_workers, test_repo, client, sqlite_db):
+ _test_repo_internal(server_and_workers, test_repo, client, 1, 2000, sqlite_db)
@pytest.mark.stress
-def test_repo_4_clients_500_runs(server_and_workers, test_repo, client):
- _test_repo_internal(server_and_workers, test_repo, client, 4, 500)
+def test_repo_4_clients_500_runs(server_and_workers, test_repo, client, sqlite_db):
+ _test_repo_internal(server_and_workers, test_repo, client, 4, 500, sqlite_db)