From a892d9a21fb321b82cb74cba2ec617a4edd7e2c9 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Tue, 4 Jul 2023 18:29:26 +0200 Subject: storage: requeue old runs from storage on startup --- src/run_queue.c | 18 ++++++------- src/run_queue.h | 4 +-- src/server.c | 17 ++++++++----- src/sqlite.c | 8 +++--- src/sqlite.h | 2 +- src/storage.c | 12 +++++++++ src/storage.h | 2 ++ src/storage_sqlite.c | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/storage_sqlite.h | 3 +++ 9 files changed, 116 insertions(+), 22 deletions(-) diff --git a/src/run_queue.c b/src/run_queue.c index 8bfa816..cb000e0 100644 --- a/src/run_queue.c +++ b/src/run_queue.c @@ -14,13 +14,13 @@ #include struct run { + int id; char *url; char *rev; - int id; SIMPLEQ_ENTRY(run) entries; }; -int run_create(struct run **_entry, const char *_url, const char *_rev, int id) +int run_create(struct run **_entry, int id, const char *_url, const char *_rev) { struct run *entry = malloc(sizeof(struct run)); if (!entry) { @@ -40,9 +40,9 @@ int run_create(struct run **_entry, const char *_url, const char *_rev, int id) goto free_url; } + entry->id = id; entry->url = url; entry->rev = rev; - entry->id = id; *_entry = entry; return 0; @@ -69,7 +69,7 @@ int run_from_msg(struct run **run, const struct msg *msg) const char **argv = msg_get_strings(msg); /* We don't know the ID yet. */ - return run_create(run, argv[1], argv[2], 0); + return run_create(run, 0, argv[1], argv[2]); } void run_destroy(struct run *entry) @@ -79,6 +79,11 @@ void run_destroy(struct run *entry) free(entry); } +int run_get_id(const struct run *entry) +{ + return entry->id; +} + const char *run_get_url(const struct run *entry) { return entry->url; @@ -89,11 +94,6 @@ const char *run_get_rev(const struct run *entry) return entry->rev; } -int run_get_id(const struct run *entry) -{ - return entry->id; -} - void run_set_id(struct run *entry, int id) { entry->id = id; diff --git a/src/run_queue.h b/src/run_queue.h index 7f07819..74fcca6 100644 --- a/src/run_queue.h +++ b/src/run_queue.h @@ -14,13 +14,13 @@ struct run; -int run_create(struct run **, const char *url, const char *rev, int id); +int run_create(struct run **, int id, const char *url, const char *rev); int run_from_msg(struct run **, const struct msg *); void run_destroy(struct run *); +int run_get_id(const struct run *); const char *run_get_url(const struct run *); const char *run_get_rev(const struct run *); -int run_get_id(const struct run *); void run_set_id(struct run *, int id); diff --git a/src/server.c b/src/server.c index ee605c0..acc3756 100644 --- a/src/server.c +++ b/src/server.c @@ -339,21 +339,26 @@ int server_create(struct server **_server, const struct settings *settings) goto close_signalfd; worker_queue_create(&server->worker_queue); - run_queue_create(&server->run_queue); ret = storage_sqlite_settings_create(&storage_settings, settings->sqlite_path); if (ret < 0) - goto destroy_run_queue; + goto destroy_worker_queue; ret = storage_create(&server->storage, &storage_settings); storage_settings_destroy(&storage_settings); + if (ret < 0) + goto destroy_worker_queue; + + run_queue_create(&server->run_queue); + + ret = storage_get_run_queue(&server->storage, &server->run_queue); if (ret < 0) goto destroy_run_queue; ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn, server->cmd_dispatcher); if (ret < 0) - goto destroy_storage; + goto destroy_run_queue; ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop); if (ret < 0) @@ -371,12 +376,12 @@ int server_create(struct server **_server, const struct settings *settings) destroy_tcp_server: tcp_server_destroy(server->tcp_server); -destroy_storage: - storage_destroy(&server->storage); - destroy_run_queue: run_queue_destroy(&server->run_queue); + storage_destroy(&server->storage); + +destroy_worker_queue: worker_queue_destroy(&server->worker_queue); close_signalfd: diff --git a/src/sqlite.c b/src/sqlite.c index 88ec7bf..8078739 100644 --- a/src/sqlite.c +++ b/src/sqlite.c @@ -76,11 +76,11 @@ void sqlite_close(sqlite3 *db) sqlite_errno_if(sqlite3_close(db), "sqlite3_close"); } -int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback) +int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback, void *arg) { int ret = 0; - ret = sqlite3_exec(db, stmt, callback, NULL, NULL); + ret = sqlite3_exec(db, stmt, callback, arg, NULL); if (ret) { sqlite_errno(ret, "sqlite3_exec"); return ret; @@ -247,7 +247,7 @@ int sqlite_exec_as_transaction(sqlite3 *db, const char *stmt) } snprintf(full_stmt, nb, fmt, stmt); - ret = sqlite_exec(db, stmt, NULL); + ret = sqlite_exec(db, stmt, NULL, NULL); goto free; free: @@ -294,5 +294,5 @@ finalize: int sqlite_set_foreign_keys(sqlite3 *db) { static const char *const sql = "PRAGMA foreign_keys = ON;"; - return sqlite_exec(db, sql, NULL); + return sqlite_exec(db, sql, NULL, NULL); } diff --git a/src/sqlite.h b/src/sqlite.h index 28c9b7a..5f5b285 100644 --- a/src/sqlite.h +++ b/src/sqlite.h @@ -18,7 +18,7 @@ int sqlite_open_rw(const char *path, sqlite3 **db); int sqlite_open_ro(const char *path, sqlite3 **db); void sqlite_close(sqlite3 *db); -int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback); +int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback, void *arg); int sqlite_log_result(void *, int, char **, char **); int sqlite_prepare(sqlite3 *db, const char *stmt, sqlite3_stmt **result); diff --git a/src/storage.c b/src/storage.c index 7c59776..5eaf5e7 100644 --- a/src/storage.c +++ b/src/storage.c @@ -7,6 +7,7 @@ #include "storage.h" #include "log.h" +#include "run_queue.h" #include "storage_sqlite.h" #include @@ -16,6 +17,7 @@ typedef int (*storage_create_t)(struct storage *, const struct storage_settings typedef void (*storage_destroy_t)(struct storage *); typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev); +typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *); struct storage_api { storage_settings_destroy_t destroy_settings; @@ -23,6 +25,7 @@ struct storage_api { storage_destroy_t destroy; storage_run_create_t run_create; + storage_get_run_queue_t get_run_queue; }; static const struct storage_api apis[] = { @@ -32,6 +35,7 @@ static const struct storage_api apis[] = { storage_sqlite_destroy, storage_sqlite_run_create, + storage_sqlite_get_run_queue, }, }; @@ -90,3 +94,11 @@ int storage_run_create(struct storage *storage, const char *repo_url, const char return -1; return api->run_create(storage, repo_url, rev); } + +int storage_get_run_queue(struct storage *storage, struct run_queue *queue) +{ + const struct storage_api *api = get_api(storage->type); + if (!api) + return -1; + return api->get_run_queue(storage, queue); +} diff --git a/src/storage.h b/src/storage.h index 0dcd2f9..1457095 100644 --- a/src/storage.h +++ b/src/storage.h @@ -8,6 +8,7 @@ #ifndef __STORAGE_H__ #define __STORAGE_H__ +#include "run_queue.h" #include "storage_sqlite.h" enum storage_type { @@ -34,5 +35,6 @@ int storage_create(struct storage *, const struct storage_settings *); void storage_destroy(struct storage *); int storage_run_create(struct storage *, const char *repo_url, const char *rev); +int storage_get_run_queue(struct storage *, struct run_queue *); #endif diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 887e2a5..96c58d9 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -7,6 +7,7 @@ #include "storage_sqlite.h" #include "log.h" +#include "run_queue.h" #include "sql/sqlite_sql.h" #include "sqlite.h" #include "storage.h" @@ -328,3 +329,74 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con return ret; } + +static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run) +{ + int ret = 0; + + int id = sqlite_column_int(stmt, 0); + + char *url = NULL; + ret = sqlite_column_text(stmt, 1, &url); + if (ret < 0) + return ret; + + char *rev = NULL; + ret = sqlite_column_text(stmt, 2, &rev); + if (ret < 0) + goto free_url; + + ret = run_create(run, id, url, rev); + if (ret < 0) + goto free_rev; + + log("Adding a run %d for repository %s to the queue\n", id, url); + +free_rev: + free(rev); + +free_url: + free(url); + + return ret; +} + +int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue) +{ + /* clang-format off */ + static const char *const fmt = "SELECT cimple_runs.id, cimple_repos.url, cimple_runs.rev FROM cimple_runs INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = 1;"; + /* clang-format on */ + + sqlite3_stmt *stmt; + int ret = 0; + + ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt); + if (ret < 0) + return ret; + + while (1) { + ret = sqlite_step(stmt); + if (!ret) + break; + if (ret < 0) + goto run_queue_destroy; + + struct run *run = NULL; + + ret = storage_sqlite_row_to_run(stmt, &run); + if (ret < 0) + goto run_queue_destroy; + + run_queue_add_last(queue, run); + } + + goto finalize; + +run_queue_destroy: + run_queue_destroy(queue); + +finalize: + sqlite_finalize(stmt); + + return ret; +} diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h index b133ab8..a755f51 100644 --- a/src/storage_sqlite.h +++ b/src/storage_sqlite.h @@ -8,6 +8,8 @@ #ifndef __STORAGE_SQLITE_H__ #define __STORAGE_SQLITE_H__ +#include "run_queue.h" + struct storage_settings; struct storage_sqlite_setttings; @@ -21,5 +23,6 @@ int storage_sqlite_create(struct storage *, const struct storage_settings *); void storage_sqlite_destroy(struct storage *); int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev); +int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs); #endif -- cgit v1.2.3