From d4e47fdb640c3ddce285157eee88db899461fa3a Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Tue, 4 Jul 2023 20:51:29 +0200 Subject: storage: mark completed runs as such --- src/const.h | 3 ++- src/run_queue.c | 35 ++++++++++++++++++++++++++++------- src/run_queue.h | 4 +++- src/server.c | 43 +++++++++++++++++++++++++++++++++++++------ src/storage.c | 11 +++++++++++ src/storage.h | 1 + src/storage_sqlite.c | 35 ++++++++++++++++++++++++++++++++--- src/storage_sqlite.h | 1 + src/string.c | 24 ++++++++++++++++++++++++ src/string.h | 2 ++ src/worker.c | 26 +++++++++++++++++++------- 11 files changed, 160 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/const.h b/src/const.h index 0c0c5fc..6928235 100644 --- a/src/const.h +++ b/src/const.h @@ -17,6 +17,7 @@ extern const char *default_sqlite_path; #define CMD_RUN "run" #define CMD_NEW_WORKER "new-worker" -#define CMD_COMPLETE "complete" +#define CMD_START "start" +#define CMD_FINISHED "finished" #endif diff --git a/src/run_queue.c b/src/run_queue.c index cb000e0..e1fdf84 100644 --- a/src/run_queue.c +++ b/src/run_queue.c @@ -8,6 +8,7 @@ #include "run_queue.h" #include "log.h" #include "msg.h" +#include "string.h" #include #include @@ -57,26 +58,46 @@ fail: return -1; } +void run_destroy(struct run *entry) +{ + free(entry->rev); + free(entry->url); + free(entry); +} + int run_from_msg(struct run **run, const struct msg *msg) { size_t msg_len = msg_get_length(msg); - if (msg_len != 3) { + if (msg_len != 4) { log_err("Invalid number of arguments for a message: %zu\n", msg_len); msg_dump(msg); return -1; } const char **argv = msg_get_strings(msg); - /* We don't know the ID yet. */ - return run_create(run, 0, argv[1], argv[2]); + + int id = 0; + int ret = string_to_int(argv[1], &id); + if (ret < 0) + return ret; + + return run_create(run, id, argv[2], argv[3]); } -void run_destroy(struct run *entry) +int run_from_msg_unknown_id(struct run **run, const struct msg *msg) { - free(entry->rev); - free(entry->url); - free(entry); + size_t msg_len = msg_get_length(msg); + + if (msg_len != 3) { + log_err("Invalid number of arguments for a message: %zu\n", msg_len); + msg_dump(msg); + return -1; + } + + const char **argv = msg_get_strings(msg); + /* We don't know the ID yet. */ + return run_create(run, 0, argv[1], argv[2]); } int run_get_id(const struct run *entry) diff --git a/src/run_queue.h b/src/run_queue.h index 74fcca6..df716a4 100644 --- a/src/run_queue.h +++ b/src/run_queue.h @@ -15,9 +15,11 @@ struct run; int run_create(struct run **, int id, const char *url, const char *rev); -int run_from_msg(struct run **, const struct msg *); void run_destroy(struct run *); +int run_from_msg(struct run **, const struct msg *); +int run_from_msg_unknown_id(struct run **, const struct msg *); + int run_get_id(const struct run *); const char *run_get_url(const struct run *); const char *run_get_rev(const struct run *); diff --git a/src/server.c b/src/server.c index acc3756..f00385f 100644 --- a/src/server.c +++ b/src/server.c @@ -17,6 +17,7 @@ #include "signal.h" #include "storage.h" #include "storage_sqlite.h" +#include "string.h" #include "tcp_server.h" #include "worker_queue.h" @@ -169,7 +170,10 @@ static void server_assign_run(struct server *server) struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); - const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL}; + char id[16]; + snprintf(id, sizeof(id), "%d", run_get_id(run)); + + const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL}; int ret = msg_talk_argv(worker_get_fd(worker), argv, NULL); if (ret < 0) { @@ -251,7 +255,7 @@ static int server_handle_cmd_run(const struct msg *request, struct msg **respons int ret = 0; - ret = run_from_msg(&run, request); + ret = run_from_msg_unknown_id(&run, request); if (ret < 0) return ret; @@ -275,14 +279,41 @@ destroy_run: return ret; } -static int server_handle_cmd_complete(UNUSED const struct msg *request, - UNUSED struct msg **response, void *_ctx) +static int server_handle_cmd_finished(const struct msg *request, UNUSED struct msg **response, + void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; int client_fd = ctx->fd; int ret = 0; - log("Received a \"run complete\" message from worker %d\n", client_fd); + log("Received a \"run finished\" message from worker %d\n", client_fd); + + size_t msg_len = msg_get_length(request); + + if (msg_len != 3) { + log_err("Invalid number of arguments for a message: %zu\n", msg_len); + msg_dump(request); + return -1; + } + + const char **argv = msg_get_strings(request); + + int run_id, ec; + + ret = string_to_int(argv[1], &run_id); + if (ret < 0) + return ret; + ret = string_to_int(argv[2], &ec); + if (ret < 0) + return ret; + + struct server *server = (struct server *)ctx->arg; + + ret = storage_run_finished(&server->storage, run_id, ec); + if (ret < 0) { + log_err("Failed to mark run %d as finished\n", run_id); + return ret; + } return ret; } @@ -290,7 +321,7 @@ static int server_handle_cmd_complete(UNUSED const struct msg *request, static struct cmd_desc commands[] = { {CMD_NEW_WORKER, server_handle_cmd_new_worker}, {CMD_RUN, server_handle_cmd_run}, - {CMD_COMPLETE, server_handle_cmd_complete}, + {CMD_FINISHED, server_handle_cmd_finished}, }; static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); diff --git a/src/storage.c b/src/storage.c index 5eaf5e7..5df9843 100644 --- a/src/storage.c +++ b/src/storage.c @@ -17,6 +17,7 @@ typedef int (*storage_create_t)(struct storage *, const struct storage_settings typedef void (*storage_destroy_t)(struct storage *); typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev); +typedef int (*storage_run_finished_t)(struct storage *, int repo_id, int ec); typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *); struct storage_api { @@ -25,6 +26,7 @@ struct storage_api { storage_destroy_t destroy; storage_run_create_t run_create; + storage_run_finished_t run_finished; storage_get_run_queue_t get_run_queue; }; @@ -35,6 +37,7 @@ static const struct storage_api apis[] = { storage_sqlite_destroy, storage_sqlite_run_create, + storage_sqlite_run_finished, storage_sqlite_get_run_queue, }, }; @@ -95,6 +98,14 @@ int storage_run_create(struct storage *storage, const char *repo_url, const char return api->run_create(storage, repo_url, rev); } +int storage_run_finished(struct storage *storage, int run_id, int ec) +{ + const struct storage_api *api = get_api(storage->type); + if (!api) + return -1; + return api->run_finished(storage, run_id, ec); +} + int storage_get_run_queue(struct storage *storage, struct run_queue *queue) { const struct storage_api *api = get_api(storage->type); diff --git a/src/storage.h b/src/storage.h index 1457095..139d878 100644 --- a/src/storage.h +++ b/src/storage.h @@ -35,6 +35,7 @@ int storage_create(struct storage *, const struct storage_settings *); void storage_destroy(struct storage *); int storage_run_create(struct storage *, const char *repo_url, const char *rev); +int storage_run_finished(struct storage *, int run_id, int ec); int storage_get_run_queue(struct storage *, struct run_queue *); #endif diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 96c58d9..087174e 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -58,6 +58,7 @@ struct storage_sqlite { sqlite3_stmt *stmt_repo_find; sqlite3_stmt *stmt_repo_insert; sqlite3_stmt *stmt_run_insert; + sqlite3_stmt *stmt_run_finished; }; static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version) @@ -147,9 +148,10 @@ static int storage_sqlite_setup(struct storage_sqlite *storage) static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) { /* clang-format off */ - static const char *const fmt_repo_find = "SELECT id FROM cimple_repos WHERE url = ?;"; - static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) RETURNING id;"; - static const char *const fmt_run_insert = "INSERT INTO cimple_runs(status, result, output, repo_id, rev) VALUES (1, 0, x'', ?, ?) RETURNING id;"; + static const char *const fmt_repo_find = "SELECT id FROM cimple_repos WHERE url = ?;"; + static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) RETURNING id;"; + static const char *const fmt_run_insert = "INSERT INTO cimple_runs(status, result, output, repo_id, rev) VALUES (1, -1, x'', ?, ?) RETURNING id;"; + static const char *const fmt_run_finished = "UPDATE cimple_runs SET status = 2, result = ? WHERE id = ?;"; /* clang-format on */ int ret = 0; @@ -163,9 +165,14 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert); if (ret < 0) goto finalize_repo_insert; + ret = sqlite_prepare(storage->db, fmt_run_finished, &storage->stmt_run_finished); + if (ret < 0) + goto finalize_run_insert; return ret; +finalize_run_insert: + sqlite_finalize(storage->stmt_run_insert); finalize_repo_insert: sqlite_finalize(storage->stmt_repo_insert); finalize_repo_find: @@ -176,6 +183,7 @@ finalize_repo_find: static void storage_sqlite_finalize_statements(struct storage_sqlite *storage) { + sqlite_finalize(storage->stmt_run_finished); sqlite_finalize(storage->stmt_run_insert); sqlite_finalize(storage->stmt_repo_insert); sqlite_finalize(storage->stmt_repo_find); @@ -330,6 +338,27 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con return ret; } +int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec) +{ + sqlite3_stmt *stmt = storage->sqlite->stmt_run_finished; + int ret = 0; + + ret = sqlite_bind_int(stmt, 1, ec); + if (ret < 0) + goto reset; + ret = sqlite_bind_int(stmt, 2, run_id); + if (ret < 0) + goto reset; + ret = sqlite_step(stmt); + if (ret < 0) + goto reset; + +reset: + sqlite_reset(stmt); + + return ret; +} + static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run) { int ret = 0; diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h index a755f51..cecf7e1 100644 --- a/src/storage_sqlite.h +++ b/src/storage_sqlite.h @@ -23,6 +23,7 @@ int storage_sqlite_create(struct storage *, const struct storage_settings *); void storage_sqlite_destroy(struct storage *); int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev); +int storage_sqlite_run_finished(struct storage *, int id, int ec); int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs); #endif diff --git a/src/string.c b/src/string.c index 878efeb..3a52ab2 100644 --- a/src/string.c +++ b/src/string.c @@ -6,7 +6,10 @@ */ #include "string.h" +#include "log.h" +#include +#include #include char *stpecpy(char *dst, char *end, const char *src) @@ -23,3 +26,24 @@ char *stpecpy(char *dst, char *end, const char *src) end[-1] = '\0'; return end; } + +int string_to_int(const char *src, int *result) +{ + char *endptr = NULL; + + errno = 0; + long ret = strtol(src, &endptr, 10); + + if (errno) { + log_errno("strtol"); + return -1; + } + + if (endptr == src || *endptr != '\0') { + log_err("Invalid number: %s\n", src); + return -1; + } + + *result = (int)ret; + return 0; +} diff --git a/src/string.h b/src/string.h index fb77e64..b472e82 100644 --- a/src/string.h +++ b/src/string.h @@ -10,4 +10,6 @@ char *stpecpy(char *dst, char *end, const char *src); +int string_to_int(const char *src, int *result); + #endif diff --git a/src/worker.c b/src/worker.c index 4b4413d..4e71f49 100644 --- a/src/worker.c +++ b/src/worker.c @@ -80,8 +80,23 @@ static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UN return 0; } -static int worker_handle_cmd_run(const struct msg *request, UNUSED struct msg **response, - void *_ctx) +static int worker_send_finished(struct worker *worker, const struct run *run, + struct proc_output *output) +{ + char id[16]; + char ec[16]; + + snprintf(id, sizeof(id), "%d", run_get_id(run)); + snprintf(ec, sizeof(ec), "%d", output->ec); + + const char *argv[] = {CMD_FINISHED, id, ec, NULL}; + + return msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, + NULL); +} + +static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg **response, + void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct run *run = NULL; @@ -102,10 +117,7 @@ static int worker_handle_cmd_run(const struct msg *request, UNUSED struct msg ** proc_output_dump(&result); - struct worker *worker = (struct worker *)ctx->arg; - static const char *argv[] = {CMD_COMPLETE, NULL}; - - ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL); + ret = worker_send_finished((struct worker *)ctx->arg, run, &result); if (ret < 0) goto free_output; @@ -118,7 +130,7 @@ free_output: } static struct cmd_desc commands[] = { - {CMD_RUN, worker_handle_cmd_run}, + {CMD_START, worker_handle_cmd_start}, }; static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); -- cgit v1.2.3