aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/server.c23
-rw-r--r--src/sqlite/v01.sql6
-rw-r--r--src/storage_sqlite.c162
3 files changed, 137 insertions, 54 deletions
diff --git a/src/server.c b/src/server.c
index da9cfc3..a4358d5 100644
--- a/src/server.c
+++ b/src/server.c
@@ -125,24 +125,21 @@ static int server_enqueue_run(struct server *server, struct run *run)
{
int ret = 0;
- ret = server_lock(server);
+ ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
if (ret < 0)
return ret;
+ run_set_id(run, ret);
- ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
+ ret = server_lock(server);
if (ret < 0)
- goto unlock;
- run_set_id(run, ret);
+ return ret;
run_queue_add_last(&server->run_queue, run);
log("Added a new run %d for repository %s to the queue\n", run_get_id(run),
run_get_url(run));
server_notify(server);
-
-unlock:
server_unlock(server);
-
return ret;
}
@@ -297,9 +294,7 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m
struct server *server = (struct server *)ctx->arg;
int ret = 0;
- log("Received a \"run finished\" message from worker %d\n", ctx->fd);
-
- int run_id;
+ int run_id = 0;
struct proc_output output;
ret = msg_finished_parse(request, &run_id, &output);
@@ -307,12 +302,16 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m
return ret;
ret = storage_run_finished(&server->storage, run_id, output.ec);
- proc_output_free(&output);
if (ret < 0) {
log_err("Failed to mark run %d as finished\n", run_id);
- return ret;
+ goto free_output;
}
+ log("Marked run %d as finished\n", run_id);
+
+free_output:
+ proc_output_free(&output);
+
return ret;
}
diff --git a/src/sqlite/v01.sql b/src/sqlite/v01.sql
index 454c19e..b37e486 100644
--- a/src/sqlite/v01.sql
+++ b/src/sqlite/v01.sql
@@ -29,3 +29,9 @@ CREATE TABLE cimple_runs (
CREATE INDEX cimple_runs_index_status ON cimple_runs(status);
CREATE INDEX cimple_runs_index_repo_id ON cimple_runs(repo_id);
+
+CREATE VIEW cimple_runs_readable AS
+ SELECT run.id, status.label, run.ec, run.output, repo.url, run.rev
+ FROM cimple_runs AS run
+ INNER JOIN cimple_run_status as status ON run.status = status.id
+ INNER JOIN cimple_repos as repo ON run.repo_id = repo.id;
diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c
index 8de9c38..2ac8b30 100644
--- a/src/storage_sqlite.c
+++ b/src/storage_sqlite.c
@@ -14,6 +14,7 @@
#include <sqlite3.h>
+#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -52,13 +53,66 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings)
free(settings->sqlite);
}
+enum run_status {
+ RUN_STATUS_CREATED = 1,
+ RUN_STATUS_FINISHED = 2,
+};
+
+struct prepared_stmt {
+ pthread_mutex_t mtx;
+ sqlite3_stmt *impl;
+};
+
+static int prepared_stmt_init(struct prepared_stmt *stmt, sqlite3 *db, const char *sql)
+{
+ int ret = 0;
+
+ ret = pthread_mutex_init(&stmt->mtx, NULL);
+ if (ret) {
+ pthread_errno(ret, "pthread_mutex_init");
+ return ret;
+ }
+
+ ret = sqlite_prepare(db, sql, &stmt->impl);
+ if (ret < 0)
+ goto destroy_mtx;
+
+ return ret;
+
+destroy_mtx:
+ pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy");
+
+ return ret;
+}
+
+static void prepared_stmt_destroy(struct prepared_stmt *stmt)
+{
+ pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy");
+ sqlite_finalize(stmt->impl);
+}
+
+static int prepared_stmt_lock(struct prepared_stmt *stmt)
+{
+ int ret = pthread_mutex_lock(&stmt->mtx);
+ if (ret) {
+ pthread_errno(ret, "pthread_mutex_unlock");
+ return ret;
+ }
+ return ret;
+}
+
+static void prepared_stmt_unlock(struct prepared_stmt *stmt)
+{
+ pthread_errno_if(pthread_mutex_unlock(&stmt->mtx), "pthread_mutex_unlock");
+}
+
struct storage_sqlite {
sqlite3 *db;
- sqlite3_stmt *stmt_repo_find;
- sqlite3_stmt *stmt_repo_insert;
- sqlite3_stmt *stmt_run_insert;
- sqlite3_stmt *stmt_run_finished;
+ struct prepared_stmt stmt_repo_find;
+ struct prepared_stmt stmt_repo_insert;
+ struct prepared_stmt stmt_run_insert;
+ struct prepared_stmt stmt_run_finished;
};
static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version)
@@ -151,43 +205,43 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
static const char *const fmt_repo_insert =
"INSERT INTO cimple_repos(url) VALUES (?) ON CONFLICT(url) DO NOTHING;";
static const char *const fmt_run_insert =
- "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (1, -1, x'', ?, ?) RETURNING id;";
+ "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (?, -1, x'', ?, ?) RETURNING id;";
static const char *const fmt_run_finished =
- "UPDATE cimple_runs SET status = 2, ec = ? WHERE id = ?;";
+ "UPDATE cimple_runs SET status = ?, ec = ? WHERE id = ?;";
int ret = 0;
- ret = sqlite_prepare(storage->db, fmt_repo_find, &storage->stmt_repo_find);
+ ret = prepared_stmt_init(&storage->stmt_repo_find, storage->db, fmt_repo_find);
if (ret < 0)
return ret;
- ret = sqlite_prepare(storage->db, fmt_repo_insert, &storage->stmt_repo_insert);
+ ret = prepared_stmt_init(&storage->stmt_repo_insert, storage->db, fmt_repo_insert);
if (ret < 0)
goto finalize_repo_find;
- ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert);
+ ret = prepared_stmt_init(&storage->stmt_run_insert, storage->db, fmt_run_insert);
if (ret < 0)
goto finalize_repo_insert;
- ret = sqlite_prepare(storage->db, fmt_run_finished, &storage->stmt_run_finished);
+ ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished);
if (ret < 0)
goto finalize_run_insert;
return ret;
finalize_run_insert:
- sqlite_finalize(storage->stmt_run_insert);
+ prepared_stmt_destroy(&storage->stmt_run_insert);
finalize_repo_insert:
- sqlite_finalize(storage->stmt_repo_insert);
+ prepared_stmt_destroy(&storage->stmt_repo_insert);
finalize_repo_find:
- sqlite_finalize(storage->stmt_repo_find);
+ prepared_stmt_destroy(&storage->stmt_repo_find);
return ret;
}
static void storage_sqlite_finalize_statements(struct storage_sqlite *storage)
{
- sqlite_finalize(storage->stmt_run_finished);
- sqlite_finalize(storage->stmt_run_insert);
- sqlite_finalize(storage->stmt_repo_insert);
- sqlite_finalize(storage->stmt_repo_find);
+ prepared_stmt_destroy(&storage->stmt_run_finished);
+ prepared_stmt_destroy(&storage->stmt_run_insert);
+ prepared_stmt_destroy(&storage->stmt_repo_insert);
+ prepared_stmt_destroy(&storage->stmt_repo_find);
}
int storage_sqlite_create(struct storage *storage, const struct storage_settings *settings)
@@ -238,62 +292,75 @@ void storage_sqlite_destroy(struct storage *storage)
static int storage_sqlite_find_repo(struct storage_sqlite *storage, const char *url)
{
- sqlite3_stmt *stmt = storage->stmt_repo_find;
+ struct prepared_stmt *stmt = &storage->stmt_repo_find;
int ret = 0;
- ret = sqlite_bind_text(stmt, 1, url);
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_text(stmt->impl, 1, url);
if (ret < 0)
goto reset;
- ret = sqlite_step(stmt);
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
if (!ret)
goto reset;
- ret = sqlite_column_int(stmt, 0);
+ ret = sqlite_column_int(stmt->impl, 0);
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
static int storage_sqlite_insert_repo(struct storage_sqlite *storage, const char *url)
{
- sqlite3_stmt *stmt = storage->stmt_repo_insert;
+ struct prepared_stmt *stmt = &storage->stmt_repo_insert;
int ret = 0;
- ret = sqlite_bind_text(stmt, 1, url);
+ ret = prepared_stmt_lock(stmt);
if (ret < 0)
- goto reset;
- ret = sqlite_step(stmt);
+ return ret;
+ ret = sqlite_bind_text(stmt->impl, 1, url);
if (ret < 0)
goto reset;
-
- ret = storage_sqlite_find_repo(storage, url);
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
- return ret;
+ if (ret < 0)
+ return ret;
+
+ return storage_sqlite_find_repo(storage, url);
}
static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id, const char *rev)
{
- sqlite3_stmt *stmt = storage->stmt_run_insert;
+ struct prepared_stmt *stmt = &storage->stmt_run_insert;
int ret = 0;
- ret = sqlite_bind_int(stmt, 1, repo_id);
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED);
+ if (ret < 0)
+ goto reset;
+ ret = sqlite_bind_int(stmt->impl, 2, repo_id);
if (ret < 0)
goto reset;
- ret = sqlite_bind_text(stmt, 2, rev);
+ ret = sqlite_bind_text(stmt->impl, 3, rev);
if (ret < 0)
goto reset;
- ret = sqlite_step(stmt);
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
@@ -303,11 +370,12 @@ static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id
goto reset;
}
- ret = sqlite_column_int(stmt, 0);
+ ret = sqlite_column_int(stmt->impl, 0);
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
@@ -329,21 +397,28 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con
int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec)
{
- sqlite3_stmt *stmt = storage->sqlite->stmt_run_finished;
+ struct prepared_stmt *stmt = &storage->sqlite->stmt_run_finished;
int ret = 0;
- ret = sqlite_bind_int(stmt, 1, ec);
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_FINISHED);
if (ret < 0)
goto reset;
- ret = sqlite_bind_int(stmt, 2, run_id);
+ ret = sqlite_bind_int(stmt->impl, 2, ec);
if (ret < 0)
goto reset;
- ret = sqlite_step(stmt);
+ ret = sqlite_bind_int(stmt->impl, 3, run_id);
+ if (ret < 0)
+ goto reset;
+ ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
reset:
- sqlite_reset(stmt);
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
@@ -383,7 +458,7 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu
{
static const char *const fmt =
"SELECT cimple_runs.id, cimple_repos.url, cimple_runs.rev FROM cimple_runs"
- " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = 1;";
+ " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = ?;";
sqlite3_stmt *stmt;
int ret = 0;
@@ -391,6 +466,9 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu
ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt);
if (ret < 0)
return ret;
+ ret = sqlite_bind_int(stmt, 1, RUN_STATUS_CREATED);
+ if (ret < 0)
+ goto finalize;
run_queue_create(queue);