From 32bea4675dd751c0d07aa1f348b1b7201794d884 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Tue, 4 Jul 2023 16:19:32 +0200 Subject: sqlite: store new runs in SQLite --- src/run_queue.c | 17 +++++- src/run_queue.h | 5 +- src/server.c | 17 ++++-- src/sqlite.c | 38 +++++++++++++ src/sqlite.h | 4 ++ src/sqlite/v01.sql | 17 +++--- src/storage.c | 14 +++++ src/storage.h | 2 + src/storage_sqlite.c | 152 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/storage_sqlite.h | 2 + 10 files changed, 251 insertions(+), 17 deletions(-) diff --git a/src/run_queue.c b/src/run_queue.c index a00f986..8bfa816 100644 --- a/src/run_queue.c +++ b/src/run_queue.c @@ -16,10 +16,11 @@ struct run { char *url; char *rev; + int id; SIMPLEQ_ENTRY(run) entries; }; -int run_create(struct run **_entry, const char *_url, const char *_rev) +int run_create(struct run **_entry, const char *_url, const char *_rev, int id) { struct run *entry = malloc(sizeof(struct run)); if (!entry) { @@ -41,6 +42,7 @@ int run_create(struct run **_entry, const char *_url, const char *_rev) entry->url = url; entry->rev = rev; + entry->id = id; *_entry = entry; return 0; @@ -66,7 +68,8 @@ int run_from_msg(struct run **run, const struct msg *msg) } const char **argv = msg_get_strings(msg); - return run_create(run, argv[1], argv[2]); + /* We don't know the ID yet. */ + return run_create(run, argv[1], argv[2], 0); } void run_destroy(struct run *entry) @@ -86,6 +89,16 @@ const char *run_get_rev(const struct run *entry) return entry->rev; } +int run_get_id(const struct run *entry) +{ + return entry->id; +} + +void run_set_id(struct run *entry, int id) +{ + entry->id = id; +} + void run_queue_create(struct run_queue *queue) { SIMPLEQ_INIT(queue); diff --git a/src/run_queue.h b/src/run_queue.h index 59bc71e..7f07819 100644 --- a/src/run_queue.h +++ b/src/run_queue.h @@ -14,12 +14,15 @@ struct run; -int run_create(struct run **, const char *url, const char *rev); +int run_create(struct run **, const char *url, const char *rev, int id); int run_from_msg(struct run **, const struct msg *); void run_destroy(struct run *); const char *run_get_url(const struct run *); const char *run_get_rev(const struct run *); +int run_get_id(const struct run *); + +void run_set_id(struct run *, int id); SIMPLEQ_HEAD(run_queue, run); diff --git a/src/server.c b/src/server.c index 18e8391..678f1c9 100644 --- a/src/server.c +++ b/src/server.c @@ -125,11 +125,20 @@ static int server_enqueue_run(struct server *server, struct run *run) if (ret < 0) return ret; + ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run)); + if (ret < 0) + goto unlock; + run_set_id(run, ret); + run_queue_add_last(&server->run_queue, run); - log("Added a new run for repository %s to the queue\n", run_get_url(run)); + log("Added a new run %d for repository %s to the queue\n", run_get_id(run), + run_get_url(run)); server_notify(server); + +unlock: server_unlock(server); + return ret; } @@ -154,7 +163,7 @@ static int server_wait_for_action(struct server *server) static void server_assign_run(struct server *server) { struct run *run = run_queue_remove_first(&server->run_queue); - log("Removed run for repository %s from the queue\n", run_get_url(run)); + log("Removed run %d for repository %s from the queue\n", run_get_id(run), run_get_url(run)); struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); @@ -167,8 +176,8 @@ static void server_assign_run(struct server *server) run_get_url(run), worker_get_fd(worker)); run_queue_add_first(&server->run_queue, run); } else { - log("Assigned run for repository %s to worker %d\n", run_get_url(run), - worker_get_fd(worker)); + log("Assigned run %d for repository %s to worker %d\n", run_get_id(run), + run_get_url(run), worker_get_fd(worker)); run_destroy(run); } diff --git a/src/sqlite.c b/src/sqlite.c index 7a2e482..88ec7bf 100644 --- a/src/sqlite.c +++ b/src/sqlite.c @@ -111,6 +111,12 @@ int sqlite_prepare(sqlite3 *db, const char *stmt, sqlite3_stmt **result) return ret; } +void sqlite_reset(sqlite3_stmt *stmt) +{ + sqlite_errno_if(sqlite3_reset(stmt), "sqlite3_reset"); + sqlite_errno_if(sqlite3_clear_bindings(stmt), "sqlite3_clear_bindings"); +} + void sqlite_finalize(sqlite3_stmt *stmt) { sqlite_errno_if(sqlite3_finalize(stmt), "sqlite3_finalize"); @@ -124,6 +130,7 @@ int sqlite_step(sqlite3_stmt *stmt) switch (ret) { case SQLITE_ROW: + return 1; case SQLITE_DONE: return 0; @@ -198,6 +205,32 @@ int sqlite_column_blob(sqlite3_stmt *stmt, int index, unsigned char **_result) return 0; } +int sqlite_bind_int(sqlite3_stmt *stmt, int index, int value) +{ + int ret = 0; + + ret = sqlite3_bind_int(stmt, index, value); + if (ret) { + sqlite_errno(ret, "sqlite3_bind_int"); + return ret; + } + + return ret; +} + +int sqlite_bind_text(sqlite3_stmt *stmt, int index, const char *value) +{ + int ret = 0; + + ret = sqlite3_bind_text(stmt, index, value, -1, SQLITE_STATIC); + if (ret) { + sqlite_errno(ret, "sqlite3_bind_text"); + return ret; + } + + return ret; +} + int sqlite_exec_as_transaction(sqlite3 *db, const char *stmt) { static const char *const fmt = "BEGIN; %s COMMIT;"; @@ -236,6 +269,11 @@ int sqlite_get_user_version(sqlite3 *db, unsigned int *output) ret = sqlite_step(stmt); if (ret < 0) goto finalize; + if (!ret) { + ret = -1; + log_err("Failed to read database version\n"); + goto finalize; + } result = sqlite_column_int(stmt, 0); if (result < 0) { diff --git a/src/sqlite.h b/src/sqlite.h index 5a6a548..28c9b7a 100644 --- a/src/sqlite.h +++ b/src/sqlite.h @@ -22,9 +22,13 @@ int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback); int sqlite_log_result(void *, int, char **, char **); int sqlite_prepare(sqlite3 *db, const char *stmt, sqlite3_stmt **result); +void sqlite_reset(sqlite3_stmt *); void sqlite_finalize(sqlite3_stmt *); int sqlite_step(sqlite3_stmt *); +int sqlite_bind_int(sqlite3_stmt *, int column_index, int value); +int sqlite_bind_text(sqlite3_stmt *, int column_index, const char *value); + int sqlite_column_int(sqlite3_stmt *, int column_index); int sqlite_column_text(sqlite3_stmt *, int column_index, char **result); int sqlite_column_blob(sqlite3_stmt *, int column_index, unsigned char **result); diff --git a/src/sqlite/v01.sql b/src/sqlite/v01.sql index 2793b8b..44c5116 100644 --- a/src/sqlite/v01.sql +++ b/src/sqlite/v01.sql @@ -1,19 +1,19 @@ -CREATE TABLE cimple_repositories ( +CREATE TABLE cimple_repos ( id INTEGER PRIMARY KEY, url TEXT NOT NULL ) STRICT; -CREATE UNIQUE INDEX cimple_repositories_url_index ON cimple_repositories(url); +CREATE UNIQUE INDEX cimple_repos_index_url ON cimple_repos(url); CREATE TABLE cimple_run_status ( id INTEGER PRIMARY KEY, label TEXT NOT NULL ) STRICT; -CREATE UNIQUE INDEX cimple_run_status_label_index ON cimple_run_status(label); +CREATE UNIQUE INDEX cimple_run_status_index_label ON cimple_run_status(label); -INSERT INTO cimple_run_status(id, label) VALUES (0, 'created'); -INSERT INTO cimple_run_status(id, label) VALUES (1, 'finished'); +INSERT INTO cimple_run_status(id, label) VALUES (1, 'created'); +INSERT INTO cimple_run_status(id, label) VALUES (2, 'finished'); CREATE TABLE cimple_runs ( id INTEGER PRIMARY KEY, @@ -21,10 +21,11 @@ CREATE TABLE cimple_runs ( result INTEGER NOT NULL, output BLOB NOT NULL, repo_id INTEGER NOT NULL, + rev TEXT NOT NULL, FOREIGN KEY (status) REFERENCES cimple_run_status(id), - FOREIGN KEY (repo_id) REFERENCES cimple_repositories(id) + FOREIGN KEY (repo_id) REFERENCES cimple_repos(id) ON DELETE CASCADE ON UPDATE CASCADE ) STRICT; -CREATE INDEX cimple_runs_status_index ON cimple_runs(status); -CREATE INDEX cimple_runs_repo_id_index ON cimple_runs(repo_id); +CREATE INDEX cimple_runs_index_status ON cimple_runs(status); +CREATE INDEX cimple_runs_index_repo_id ON cimple_runs(repo_id); diff --git a/src/storage.c b/src/storage.c index a273030..7c59776 100644 --- a/src/storage.c +++ b/src/storage.c @@ -15,10 +15,14 @@ typedef void (*storage_settings_destroy_t)(const struct storage_settings *); typedef int (*storage_create_t)(struct storage *, const struct storage_settings *); typedef void (*storage_destroy_t)(struct storage *); +typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev); + struct storage_api { storage_settings_destroy_t destroy_settings; storage_create_t create; storage_destroy_t destroy; + + storage_run_create_t run_create; }; static const struct storage_api apis[] = { @@ -26,6 +30,8 @@ static const struct storage_api apis[] = { storage_sqlite_settings_destroy, storage_sqlite_create, storage_sqlite_destroy, + + storage_sqlite_run_create, }, }; @@ -76,3 +82,11 @@ void storage_destroy(struct storage *storage) return; api->destroy(storage); } + +int storage_run_create(struct storage *storage, const char *repo_url, const char *rev) +{ + const struct storage_api *api = get_api(storage->type); + if (!api) + return -1; + return api->run_create(storage, repo_url, rev); +} diff --git a/src/storage.h b/src/storage.h index 139dcdd..0dcd2f9 100644 --- a/src/storage.h +++ b/src/storage.h @@ -33,4 +33,6 @@ struct storage { int storage_create(struct storage *, const struct storage_settings *); void storage_destroy(struct storage *); +int storage_run_create(struct storage *, const char *repo_url, const char *rev); + #endif diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 3483bae..887e2a5 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -53,6 +53,10 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings) struct storage_sqlite { sqlite3 *db; + + sqlite3_stmt *stmt_repo_find; + sqlite3_stmt *stmt_repo_insert; + sqlite3_stmt *stmt_run_insert; }; static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version) @@ -124,7 +128,7 @@ static int storage_sqlite_upgrade(struct storage_sqlite *storage) return storage_sqlite_upgrade_from_to(storage, current_version, newest_version); } -static int storage_sqlite_prepare(struct storage_sqlite *storage) +static int storage_sqlite_setup(struct storage_sqlite *storage) { int ret = 0; @@ -139,6 +143,43 @@ static int storage_sqlite_prepare(struct storage_sqlite *storage) return ret; } +static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) +{ + /* clang-format off */ + static const char *const fmt_repo_find = "SELECT id FROM cimple_repos WHERE url = ?;"; + static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) RETURNING id;"; + static const char *const fmt_run_insert = "INSERT INTO cimple_runs(status, result, output, repo_id, rev) VALUES (1, 0, x'', ?, ?) RETURNING id;"; + /* clang-format on */ + + int ret = 0; + + ret = sqlite_prepare(storage->db, fmt_repo_find, &storage->stmt_repo_find); + if (ret < 0) + return ret; + ret = sqlite_prepare(storage->db, fmt_repo_insert, &storage->stmt_repo_insert); + if (ret < 0) + goto finalize_repo_find; + ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert); + if (ret < 0) + goto finalize_repo_insert; + + return ret; + +finalize_repo_insert: + sqlite_finalize(storage->stmt_repo_insert); +finalize_repo_find: + sqlite_finalize(storage->stmt_repo_find); + + return ret; +} + +static void storage_sqlite_finalize_statements(struct storage_sqlite *storage) +{ + sqlite_finalize(storage->stmt_run_insert); + sqlite_finalize(storage->stmt_repo_insert); + sqlite_finalize(storage->stmt_repo_find); +} + int storage_sqlite_create(struct storage *storage, const struct storage_settings *settings) { int ret = 0; @@ -157,7 +198,10 @@ int storage_sqlite_create(struct storage *storage, const struct storage_settings ret = sqlite_open_rw(settings->sqlite->path, &sqlite->db); if (ret < 0) goto destroy; - ret = storage_sqlite_prepare(sqlite); + ret = storage_sqlite_setup(sqlite); + if (ret < 0) + goto close; + ret = storage_sqlite_prepare_statements(sqlite); if (ret < 0) goto close; @@ -176,7 +220,111 @@ free: void storage_sqlite_destroy(struct storage *storage) { + storage_sqlite_finalize_statements(storage->sqlite); sqlite_close(storage->sqlite->db); sqlite_destroy(); free(storage->sqlite); } + +static int storage_sqlite_find_repo(struct storage_sqlite *storage, const char *url) +{ + sqlite3_stmt *stmt = storage->stmt_repo_find; + int ret = 0; + + ret = sqlite_bind_text(stmt, 1, url); + if (ret < 0) + goto reset; + ret = sqlite_step(stmt); + if (ret < 0) + goto reset; + + if (!ret) + goto reset; + + ret = sqlite_column_int(stmt, 0); + goto reset; + +reset: + sqlite_reset(stmt); + + return ret; +} + +static int storage_sqlite_insert_repo(struct storage_sqlite *storage, const char *url) +{ + sqlite3_stmt *stmt = storage->stmt_repo_insert; + int ret = 0; + + ret = storage_sqlite_find_repo(storage, url); + if (ret < 0) + return ret; + + if (ret) + return ret; + + ret = sqlite_bind_text(stmt, 1, url); + if (ret < 0) + goto reset; + ret = sqlite_step(stmt); + if (ret < 0) + goto reset; + + if (!ret) { + ret = -1; + log_err("Failed to insert a repository\n"); + goto reset; + } + + ret = sqlite_column_int(stmt, 0); + goto reset; + +reset: + sqlite_reset(stmt); + + return ret; +} + +static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id, const char *rev) +{ + sqlite3_stmt *stmt = storage->stmt_run_insert; + int ret = 0; + + ret = sqlite_bind_int(stmt, 1, repo_id); + if (ret < 0) + goto reset; + ret = sqlite_bind_text(stmt, 2, rev); + if (ret < 0) + goto reset; + ret = sqlite_step(stmt); + if (ret < 0) + goto reset; + + if (!ret) { + ret = -1; + log_err("Failed to insert a run\n"); + goto reset; + } + + ret = sqlite_column_int(stmt, 0); + goto reset; + +reset: + sqlite_reset(stmt); + + return ret; +} + +int storage_sqlite_run_create(struct storage *storage, const char *repo_url, const char *rev) +{ + int ret = 0; + + ret = storage_sqlite_insert_repo(storage->sqlite, repo_url); + if (ret < 0) + return ret; + + ret = storage_sqlite_insert_run(storage->sqlite, ret, rev); + if (ret < 0) + return ret; + + return ret; +} diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h index e9e0581..b133ab8 100644 --- a/src/storage_sqlite.h +++ b/src/storage_sqlite.h @@ -20,4 +20,6 @@ void storage_sqlite_settings_destroy(const struct storage_settings *); int storage_sqlite_create(struct storage *, const struct storage_settings *); void storage_sqlite_destroy(struct storage *); +int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev); + #endif -- cgit v1.2.3