diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/server.c | 23 | ||||
-rw-r--r-- | src/sqlite/v01.sql | 6 | ||||
-rw-r--r-- | src/storage_sqlite.c | 162 |
3 files changed, 137 insertions, 54 deletions
diff --git a/src/server.c b/src/server.c index da9cfc3..a4358d5 100644 --- a/src/server.c +++ b/src/server.c @@ -125,24 +125,21 @@ static int server_enqueue_run(struct server *server, struct run *run) { int ret = 0; - ret = server_lock(server); + ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run)); if (ret < 0) return ret; + run_set_id(run, ret); - ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run)); + ret = server_lock(server); if (ret < 0) - goto unlock; - run_set_id(run, ret); + return ret; run_queue_add_last(&server->run_queue, run); log("Added a new run %d for repository %s to the queue\n", run_get_id(run), run_get_url(run)); server_notify(server); - -unlock: server_unlock(server); - return ret; } @@ -297,9 +294,7 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m struct server *server = (struct server *)ctx->arg; int ret = 0; - log("Received a \"run finished\" message from worker %d\n", ctx->fd); - - int run_id; + int run_id = 0; struct proc_output output; ret = msg_finished_parse(request, &run_id, &output); @@ -307,12 +302,16 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m return ret; ret = storage_run_finished(&server->storage, run_id, output.ec); - proc_output_free(&output); if (ret < 0) { log_err("Failed to mark run %d as finished\n", run_id); - return ret; + goto free_output; } + log("Marked run %d as finished\n", run_id); + +free_output: + proc_output_free(&output); + return ret; } diff --git a/src/sqlite/v01.sql b/src/sqlite/v01.sql index 454c19e..b37e486 100644 --- a/src/sqlite/v01.sql +++ b/src/sqlite/v01.sql @@ -29,3 +29,9 @@ CREATE TABLE cimple_runs ( CREATE INDEX cimple_runs_index_status ON cimple_runs(status); CREATE INDEX cimple_runs_index_repo_id ON cimple_runs(repo_id); + +CREATE VIEW cimple_runs_readable AS + SELECT run.id, status.label, run.ec, run.output, repo.url, run.rev + FROM cimple_runs AS run + INNER JOIN cimple_run_status as status ON run.status = status.id + INNER JOIN cimple_repos as repo ON run.repo_id = repo.id; diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 8de9c38..2ac8b30 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -14,6 +14,7 @@ #include <sqlite3.h> +#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -52,13 +53,66 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings) free(settings->sqlite); } +enum run_status { + RUN_STATUS_CREATED = 1, + RUN_STATUS_FINISHED = 2, +}; + +struct prepared_stmt { + pthread_mutex_t mtx; + sqlite3_stmt *impl; +}; + +static int prepared_stmt_init(struct prepared_stmt *stmt, sqlite3 *db, const char *sql) +{ + int ret = 0; + + ret = pthread_mutex_init(&stmt->mtx, NULL); + if (ret) { + pthread_errno(ret, "pthread_mutex_init"); + return ret; + } + + ret = sqlite_prepare(db, sql, &stmt->impl); + if (ret < 0) + goto destroy_mtx; + + return ret; + +destroy_mtx: + pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy"); + + return ret; +} + +static void prepared_stmt_destroy(struct prepared_stmt *stmt) +{ + pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy"); + sqlite_finalize(stmt->impl); +} + +static int prepared_stmt_lock(struct prepared_stmt *stmt) +{ + int ret = pthread_mutex_lock(&stmt->mtx); + if (ret) { + pthread_errno(ret, "pthread_mutex_unlock"); + return ret; + } + return ret; +} + +static void prepared_stmt_unlock(struct prepared_stmt *stmt) +{ + pthread_errno_if(pthread_mutex_unlock(&stmt->mtx), "pthread_mutex_unlock"); +} + struct storage_sqlite { sqlite3 *db; - sqlite3_stmt *stmt_repo_find; - sqlite3_stmt *stmt_repo_insert; - sqlite3_stmt *stmt_run_insert; - sqlite3_stmt *stmt_run_finished; + struct prepared_stmt stmt_repo_find; + struct prepared_stmt stmt_repo_insert; + struct prepared_stmt stmt_run_insert; + struct prepared_stmt stmt_run_finished; }; static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version) @@ -151,43 +205,43 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) ON CONFLICT(url) DO NOTHING;"; static const char *const fmt_run_insert = - "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (1, -1, x'', ?, ?) RETURNING id;"; + "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (?, -1, x'', ?, ?) RETURNING id;"; static const char *const fmt_run_finished = - "UPDATE cimple_runs SET status = 2, ec = ? WHERE id = ?;"; + "UPDATE cimple_runs SET status = ?, ec = ? WHERE id = ?;"; int ret = 0; - ret = sqlite_prepare(storage->db, fmt_repo_find, &storage->stmt_repo_find); + ret = prepared_stmt_init(&storage->stmt_repo_find, storage->db, fmt_repo_find); if (ret < 0) return ret; - ret = sqlite_prepare(storage->db, fmt_repo_insert, &storage->stmt_repo_insert); + ret = prepared_stmt_init(&storage->stmt_repo_insert, storage->db, fmt_repo_insert); if (ret < 0) goto finalize_repo_find; - ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert); + ret = prepared_stmt_init(&storage->stmt_run_insert, storage->db, fmt_run_insert); if (ret < 0) goto finalize_repo_insert; - ret = sqlite_prepare(storage->db, fmt_run_finished, &storage->stmt_run_finished); + ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished); if (ret < 0) goto finalize_run_insert; return ret; finalize_run_insert: - sqlite_finalize(storage->stmt_run_insert); + prepared_stmt_destroy(&storage->stmt_run_insert); finalize_repo_insert: - sqlite_finalize(storage->stmt_repo_insert); + prepared_stmt_destroy(&storage->stmt_repo_insert); finalize_repo_find: - sqlite_finalize(storage->stmt_repo_find); + prepared_stmt_destroy(&storage->stmt_repo_find); return ret; } static void storage_sqlite_finalize_statements(struct storage_sqlite *storage) { - sqlite_finalize(storage->stmt_run_finished); - sqlite_finalize(storage->stmt_run_insert); - sqlite_finalize(storage->stmt_repo_insert); - sqlite_finalize(storage->stmt_repo_find); + prepared_stmt_destroy(&storage->stmt_run_finished); + prepared_stmt_destroy(&storage->stmt_run_insert); + prepared_stmt_destroy(&storage->stmt_repo_insert); + prepared_stmt_destroy(&storage->stmt_repo_find); } int storage_sqlite_create(struct storage *storage, const struct storage_settings *settings) @@ -238,62 +292,75 @@ void storage_sqlite_destroy(struct storage *storage) static int storage_sqlite_find_repo(struct storage_sqlite *storage, const char *url) { - sqlite3_stmt *stmt = storage->stmt_repo_find; + struct prepared_stmt *stmt = &storage->stmt_repo_find; int ret = 0; - ret = sqlite_bind_text(stmt, 1, url); + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_text(stmt->impl, 1, url); if (ret < 0) goto reset; - ret = sqlite_step(stmt); + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; if (!ret) goto reset; - ret = sqlite_column_int(stmt, 0); + ret = sqlite_column_int(stmt->impl, 0); goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } static int storage_sqlite_insert_repo(struct storage_sqlite *storage, const char *url) { - sqlite3_stmt *stmt = storage->stmt_repo_insert; + struct prepared_stmt *stmt = &storage->stmt_repo_insert; int ret = 0; - ret = sqlite_bind_text(stmt, 1, url); + ret = prepared_stmt_lock(stmt); if (ret < 0) - goto reset; - ret = sqlite_step(stmt); + return ret; + ret = sqlite_bind_text(stmt->impl, 1, url); if (ret < 0) goto reset; - - ret = storage_sqlite_find_repo(storage, url); + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); - return ret; + if (ret < 0) + return ret; + + return storage_sqlite_find_repo(storage, url); } static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id, const char *rev) { - sqlite3_stmt *stmt = storage->stmt_run_insert; + struct prepared_stmt *stmt = &storage->stmt_run_insert; int ret = 0; - ret = sqlite_bind_int(stmt, 1, repo_id); + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED); + if (ret < 0) + goto reset; + ret = sqlite_bind_int(stmt->impl, 2, repo_id); if (ret < 0) goto reset; - ret = sqlite_bind_text(stmt, 2, rev); + ret = sqlite_bind_text(stmt->impl, 3, rev); if (ret < 0) goto reset; - ret = sqlite_step(stmt); + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; @@ -303,11 +370,12 @@ static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id goto reset; } - ret = sqlite_column_int(stmt, 0); + ret = sqlite_column_int(stmt->impl, 0); goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } @@ -329,21 +397,28 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec) { - sqlite3_stmt *stmt = storage->sqlite->stmt_run_finished; + struct prepared_stmt *stmt = &storage->sqlite->stmt_run_finished; int ret = 0; - ret = sqlite_bind_int(stmt, 1, ec); + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_FINISHED); if (ret < 0) goto reset; - ret = sqlite_bind_int(stmt, 2, run_id); + ret = sqlite_bind_int(stmt->impl, 2, ec); if (ret < 0) goto reset; - ret = sqlite_step(stmt); + ret = sqlite_bind_int(stmt->impl, 3, run_id); + if (ret < 0) + goto reset; + ret = sqlite_step(stmt->impl); if (ret < 0) goto reset; reset: - sqlite_reset(stmt); + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } @@ -383,7 +458,7 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu { static const char *const fmt = "SELECT cimple_runs.id, cimple_repos.url, cimple_runs.rev FROM cimple_runs" - " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = 1;"; + " INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = ?;"; sqlite3_stmt *stmt; int ret = 0; @@ -391,6 +466,9 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt); if (ret < 0) return ret; + ret = sqlite_bind_int(stmt, 1, RUN_STATUS_CREATED); + if (ret < 0) + goto finalize; run_queue_create(queue); |