From 992ac5301fc8727d83017b242af2df9895eebfcc Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Wed, 15 Nov 2023 13:50:15 +0100 Subject: implement a command to list runs --- src/CMakeLists.txt | 3 ++ src/buf.c | 53 +++++++++++++++++++++++ src/buf.h | 22 ++++++++++ src/client.c | 6 ++- src/const.h | 1 + src/json.c | 33 ++++++++++++++ src/json.h | 5 +++ src/net.c | 45 ++------------------ src/net.h | 2 + src/protocol.c | 44 ++++++++++++++++--- src/protocol.h | 6 +++ src/run_queue.c | 118 +++++++++++++++++++++++++++++++++++++++++++-------- src/run_queue.h | 21 +++++++-- src/server.c | 41 +++++++++++++++--- src/storage.c | 16 ++++++- src/storage.h | 2 + src/storage_sqlite.c | 88 ++++++++++++++++++++++++++++---------- src/storage_sqlite.h | 2 + src/worker.c | 2 +- 19 files changed, 410 insertions(+), 100 deletions(-) create mode 100644 src/buf.c create mode 100644 src/buf.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c046f88..ca19416 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -55,6 +55,7 @@ endfunction() add_my_executable(server server_main.c server.c base64.c + buf.c cmd_line.c command.c const.c @@ -80,6 +81,7 @@ target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") add_my_executable(client client_main.c client.c base64.c + buf.c cmd_line.c const.c file.c @@ -94,6 +96,7 @@ target_link_libraries(client PRIVATE json-c sodium) add_my_executable(worker worker_main.c worker.c base64.c + buf.c ci.c cmd_line.c command.c diff --git a/src/buf.c b/src/buf.c new file mode 100644 index 0000000..90e33ad --- /dev/null +++ b/src/buf.c @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2023 Egor Tensin + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "buf.h" +#include "log.h" + +#include +#include +#include + +struct buf { + uint32_t size; + const void *data; +}; + +int buf_create(struct buf **_buf, const void *data, uint32_t size) +{ + struct buf *buf = malloc(sizeof(struct buf)); + if (!buf) { + log_errno("malloc"); + return -1; + } + + buf->data = data; + buf->size = size; + + *_buf = buf; + return 0; +} + +int buf_create_from_string(struct buf **buf, const char *str) +{ + return buf_create(buf, str, strlen(str) + 1); +} + +void buf_destroy(struct buf *buf) +{ + free(buf); +} + +uint32_t buf_get_size(const struct buf *buf) +{ + return buf->size; +} + +const void *buf_get_data(const struct buf *buf) +{ + return buf->data; +} diff --git a/src/buf.h b/src/buf.h new file mode 100644 index 0000000..033a54c --- /dev/null +++ b/src/buf.h @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023 Egor Tensin + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __BUF_H__ +#define __BUF_H__ + +#include + +struct buf; + +int buf_create(struct buf **, const void *, uint32_t); +int buf_create_from_string(struct buf **, const char *); +void buf_destroy(struct buf *); + +uint32_t buf_get_size(const struct buf *); +const void *buf_get_data(const struct buf *); + +#endif diff --git a/src/client.c b/src/client.c index 1594014..cb10167 100644 --- a/src/client.c +++ b/src/client.c @@ -53,13 +53,17 @@ static int make_request(struct jsonrpc_request **request, int argc, const char * return -1; struct run *run = NULL; - int ret = run_create(&run, 0, argv[1], argv[2]); + int ret = run_queued(&run, argv[1], argv[2]); if (ret < 0) return ret; ret = request_create_queue_run(request, run); run_destroy(run); return ret; + } else if (!strcmp(argv[0], CMD_GET_RUNS)) { + if (argc != 1) + return -1; + return request_create_get_runs(request); } return -1; diff --git a/src/const.h b/src/const.h index 27dd1d3..d3925c6 100644 --- a/src/const.h +++ b/src/const.h @@ -19,5 +19,6 @@ extern const char *default_sqlite_path; #define CMD_NEW_WORKER "new-worker" #define CMD_START_RUN "start-run" #define CMD_FINISHED_RUN "finished-run" +#define CMD_GET_RUNS "get-runs" #endif diff --git a/src/json.c b/src/json.c index 2909c74..ec555b9 100644 --- a/src/json.c +++ b/src/json.c @@ -6,6 +6,7 @@ */ #include "json.h" +#include "buf.h" #include "log.h" #include "net.h" @@ -108,6 +109,28 @@ destroy_buf: return result; } +int json_new_object(struct json_object **_obj) +{ + struct json_object *obj = json_object_new_object(); + if (!obj) { + json_errno("json_object_new_object"); + return -1; + } + *_obj = obj; + return 0; +} + +int json_new_array(struct json_object **_arr) +{ + struct json_object *arr = json_object_new_array(); + if (!arr) { + json_errno("json_object_new_array"); + return -1; + } + *_arr = arr; + return 0; +} + int json_has(const struct json_object *obj, const char *key) { return json_object_object_get_ex(obj, key, NULL); @@ -254,3 +277,13 @@ int json_set_int_const_key(struct json_object *obj, const char *key, int64_t val { return json_set_int_internal(obj, key, value, json_const_key_flags); } + +int json_append(struct json_object *arr, struct json_object *elem) +{ + int ret = json_object_array_add(arr, elem); + if (ret < 0) { + json_errno("json_object_array_add"); + return ret; + } + return ret; +} diff --git a/src/json.h b/src/json.h index d14fd97..92cd688 100644 --- a/src/json.h +++ b/src/json.h @@ -28,6 +28,9 @@ int json_clone(const struct json_object *, const char *key, struct json_object * int json_send(struct json_object *, int fd); struct json_object *json_recv(int fd); +int json_new_object(struct json_object **); +int json_new_array(struct json_object **); + int json_has(const struct json_object *, const char *key); int json_get(const struct json_object *, const char *key, struct json_object **value); @@ -42,4 +45,6 @@ int json_set_const_key(struct json_object *, const char *, struct json_object *v int json_set_string_const_key(struct json_object *, const char *, const char *value); int json_set_int_const_key(struct json_object *, const char *, int64_t value); +int json_append(struct json_object *arr, struct json_object *elem); + #endif diff --git a/src/net.c b/src/net.c index cb85356..20460d9 100644 --- a/src/net.c +++ b/src/net.c @@ -6,6 +6,7 @@ */ #include "net.h" +#include "buf.h" #include "file.h" #include "log.h" @@ -208,56 +209,16 @@ int net_recv(int fd, void *buf, size_t size) return 0; } -struct buf { - uint32_t size; - const void *data; -}; - -int buf_create(struct buf **_buf, const void *data, uint32_t size) -{ - struct buf *buf = malloc(sizeof(struct buf)); - if (!buf) { - log_errno("malloc"); - return -1; - } - - buf->data = data; - buf->size = size; - - *_buf = buf; - return 0; -} - -int buf_create_from_string(struct buf **buf, const char *str) -{ - return buf_create(buf, str, strlen(str) + 1); -} - -void buf_destroy(struct buf *buf) -{ - free(buf); -} - -uint32_t buf_get_size(const struct buf *buf) -{ - return buf->size; -} - -const void *buf_get_data(const struct buf *buf) -{ - return buf->data; -} - int net_send_buf(int fd, const struct buf *buf) { int ret = 0; - uint32_t size = htonl(buf->size); + uint32_t size = htonl(buf_get_size(buf)); ret = net_send(fd, &size, sizeof(size)); if (ret < 0) return ret; - ret = net_send(fd, buf->data, buf->size); + ret = net_send(fd, buf_get_data(buf), buf_get_size(buf)); if (ret < 0) return ret; diff --git a/src/net.h b/src/net.h index 68ae311..ea8aa30 100644 --- a/src/net.h +++ b/src/net.h @@ -8,6 +8,8 @@ #ifndef __NET_H__ #define __NET_H__ +#include "buf.h" + #include #include diff --git a/src/protocol.c b/src/protocol.c index 8aac63d..880c449 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -28,10 +28,10 @@ int request_create_queue_run(struct jsonrpc_request **request, const struct run ret = jsonrpc_request_create(request, jsonrpc_generate_request_id(), CMD_QUEUE_RUN, NULL); if (ret < 0) return ret; - ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_url(run)); + ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_repo_url(run)); if (ret < 0) goto free_request; - ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_rev(run)); + ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_repo_rev(run)); if (ret < 0) goto free_request; @@ -56,7 +56,7 @@ int request_parse_queue_run(const struct jsonrpc_request *request, struct run ** if (ret < 0) return ret; - return run_create(run, 0, url, rev); + return run_queued(run, url, rev); } int request_create_new_worker(struct jsonrpc_request **request) @@ -79,10 +79,10 @@ int request_create_start_run(struct jsonrpc_request **request, const struct run ret = jsonrpc_request_set_param_int(*request, run_key_id, run_get_id(run)); if (ret < 0) goto free_request; - ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_url(run)); + ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_repo_url(run)); if (ret < 0) goto free_request; - ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_rev(run)); + ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_repo_rev(run)); if (ret < 0) goto free_request; @@ -111,7 +111,7 @@ int request_parse_start_run(const struct jsonrpc_request *request, struct run ** if (ret < 0) return ret; - return run_create(run, (int)id, url, rev); + return run_created(run, (int)id, url, rev); } static const char *const finished_key_run_id = "run_id"; @@ -190,3 +190,35 @@ free_output: return ret; } + +int request_create_get_runs(struct jsonrpc_request **request) +{ + return jsonrpc_request_create(request, jsonrpc_generate_request_id(), CMD_GET_RUNS, NULL); +} + +int request_parse_get_runs(UNUSED const struct jsonrpc_request *request) +{ + return 0; +} + +int response_create_get_runs(struct jsonrpc_response **response, + const struct jsonrpc_request *request, const struct run_queue *runs) +{ + struct json_object *runs_json = NULL; + int ret = 0; + + ret = run_queue_to_json(runs, &runs_json); + if (ret < 0) + return ret; + + ret = jsonrpc_response_create(response, request, runs_json); + if (ret < 0) + goto free_json; + + return ret; + +free_json: + json_object_put(runs_json); + + return ret; +} diff --git a/src/protocol.h b/src/protocol.h index 220eead..53b4acb 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -24,4 +24,10 @@ int request_parse_start_run(const struct jsonrpc_request *, struct run **); int request_create_finished_run(struct jsonrpc_request **, int run_id, const struct proc_output *); int request_parse_finished_run(const struct jsonrpc_request *, int *run_id, struct proc_output **); +int request_create_get_runs(struct jsonrpc_request **); +int request_parse_get_runs(const struct jsonrpc_request *); + +int response_create_get_runs(struct jsonrpc_response **, const struct jsonrpc_request *, + const struct run_queue *); + #endif diff --git a/src/run_queue.c b/src/run_queue.c index 188a651..6414e6e 100644 --- a/src/run_queue.c +++ b/src/run_queue.c @@ -6,20 +6,27 @@ */ #include "run_queue.h" +#include "json.h" #include "log.h" +#include + #include #include #include struct run { int id; - char *url; - char *rev; + char *repo_url; + char *repo_rev; + int status; + int exit_code; + SIMPLEQ_ENTRY(run) entries; }; -int run_create(struct run **_entry, int id, const char *_url, const char *_rev) +int run_new(struct run **_entry, int id, const char *_repo_url, const char *_repo_rev, + enum run_status status, int exit_code) { struct run *entry = malloc(sizeof(struct run)); if (!entry) { @@ -27,27 +34,29 @@ int run_create(struct run **_entry, int id, const char *_url, const char *_rev) goto fail; } - char *url = strdup(_url); - if (!url) { + char *repo_url = strdup(_repo_url); + if (!repo_url) { log_errno("strdup"); goto free_entry; } - char *rev = strdup(_rev); - if (!rev) { + char *repo_rev = strdup(_repo_rev); + if (!repo_rev) { log_errno("strdup"); - goto free_url; + goto free_repo_url; } entry->id = id; - entry->url = url; - entry->rev = rev; + entry->repo_url = repo_url; + entry->repo_rev = repo_rev; + entry->status = status; + entry->exit_code = exit_code; *_entry = entry; return 0; -free_url: - free(url); +free_repo_url: + free(repo_url); free_entry: free(entry); @@ -58,24 +67,64 @@ fail: void run_destroy(struct run *entry) { - free(entry->rev); - free(entry->url); + free(entry->repo_rev); + free(entry->repo_url); free(entry); } +int run_queued(struct run **entry, const char *repo_url, const char *repo_rev) +{ + return run_new(entry, -1, repo_url, repo_rev, RUN_STATUS_CREATED, -1); +} + +int run_created(struct run **entry, int id, const char *repo_url, const char *repo_rev) +{ + return run_new(entry, id, repo_url, repo_rev, RUN_STATUS_CREATED, -1); +} + +int run_to_json(const struct run *entry, struct json_object **_json) +{ + struct json_object *json = NULL; + int ret = 0; + + ret = json_new_object(&json); + if (ret < 0) + return -1; + ret = json_set_int_const_key(json, "id", entry->id); + if (ret < 0) + goto free; + ret = json_set_int_const_key(json, "exit_code", entry->exit_code); + if (ret < 0) + goto free; + ret = json_set_string_const_key(json, "repo_url", entry->repo_url); + if (ret < 0) + goto free; + ret = json_set_string_const_key(json, "repo_rev", entry->repo_rev); + if (ret < 0) + goto free; + + *_json = json; + return ret; + +free: + json_object_put(json); + + return ret; +} + int run_get_id(const struct run *entry) { return entry->id; } -const char *run_get_url(const struct run *entry) +const char *run_get_repo_url(const struct run *entry) { - return entry->url; + return entry->repo_url; } -const char *run_get_rev(const struct run *entry) +const char *run_get_repo_rev(const struct run *entry) { - return entry->rev; + return entry->repo_rev; } void run_set_id(struct run *entry, int id) @@ -99,6 +148,39 @@ void run_queue_destroy(struct run_queue *queue) SIMPLEQ_INIT(queue); } +int run_queue_to_json(const struct run_queue *queue, struct json_object **_json) +{ + struct json_object *json = NULL; + int ret = 0; + + ret = json_new_array(&json); + if (ret < 0) + return ret; + + struct run *entry = NULL; + SIMPLEQ_FOREACH(entry, queue, entries) + { + struct json_object *entry_json = NULL; + ret = run_to_json(entry, &entry_json); + if (ret < 0) + goto free; + + ret = json_append(json, entry_json); + if (ret < 0) { + json_object_put(entry_json); + goto free; + } + } + + *_json = json; + return ret; + +free: + json_object_put(json); + + return ret; +} + int run_queue_is_empty(const struct run_queue *queue) { return SIMPLEQ_EMPTY(queue); diff --git a/src/run_queue.h b/src/run_queue.h index ac4554b..3c3d8e4 100644 --- a/src/run_queue.h +++ b/src/run_queue.h @@ -8,16 +8,29 @@ #ifndef __RUN_QUEUE_H__ #define __RUN_QUEUE_H__ +#include + #include +enum run_status { + RUN_STATUS_CREATED = 1, + RUN_STATUS_FINISHED = 2, +}; + struct run; -int run_create(struct run **, int id, const char *url, const char *rev); +int run_new(struct run **, int id, const char *repo_url, const char *repo_rev, enum run_status, + int exit_code); void run_destroy(struct run *); +int run_queued(struct run **, const char *repo_url, const char *repo_rev); +int run_created(struct run **, int id, const char *repo_url, const char *repo_rev); + +int run_to_json(const struct run *, struct json_object **); + int run_get_id(const struct run *); -const char *run_get_url(const struct run *); -const char *run_get_rev(const struct run *); +const char *run_get_repo_url(const struct run *); +const char *run_get_repo_rev(const struct run *); void run_set_id(struct run *, int id); @@ -26,6 +39,8 @@ SIMPLEQ_HEAD(run_queue, run); void run_queue_create(struct run_queue *); void run_queue_destroy(struct run_queue *); +int run_queue_to_json(const struct run_queue *, struct json_object **); + int run_queue_is_empty(const struct run_queue *); void run_queue_add_first(struct run_queue *, struct run *); diff --git a/src/server.c b/src/server.c index 37806c9..7ffba68 100644 --- a/src/server.c +++ b/src/server.c @@ -125,7 +125,7 @@ static int server_enqueue_run(struct server *server, struct run *run) { int ret = 0; - ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run)); + ret = storage_run_create(&server->storage, run_get_repo_url(run), run_get_repo_rev(run)); if (ret < 0) return ret; run_set_id(run, ret); @@ -136,7 +136,7 @@ static int server_enqueue_run(struct server *server, struct run *run) run_queue_add_last(&server->run_queue, run); log("Added a new run %d for repository %s to the queue\n", run_get_id(run), - run_get_url(run)); + run_get_repo_url(run)); server_notify(server); server_unlock(server); @@ -164,7 +164,8 @@ static int server_wait_for_action(struct server *server) static void server_assign_run(struct server *server) { struct run *run = run_queue_remove_first(&server->run_queue); - log("Removed run %d for repository %s from the queue\n", run_get_id(run), run_get_url(run)); + log("Removed run %d for repository %s from the queue\n", run_get_id(run), + run_get_repo_url(run)); struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); @@ -184,11 +185,11 @@ static void server_assign_run(struct server *server) exit: if (ret < 0) { log("Failed to assign run for repository %s to worker %d, requeueing\n", - run_get_url(run), worker_get_fd(worker)); + run_get_repo_url(run), worker_get_fd(worker)); run_queue_add_first(&server->run_queue, run); } else { log("Assigned run %d for repository %s to worker %d\n", run_get_id(run), - run_get_url(run), worker_get_fd(worker)); + run_get_repo_url(run), worker_get_fd(worker)); run_destroy(run); } @@ -316,10 +317,40 @@ free_output: return ret; } +static int server_handle_cmd_get_runs(const struct jsonrpc_request *request, + struct jsonrpc_response **response, void *_ctx) +{ + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int ret = 0; + + ret = request_parse_get_runs(request); + if (ret < 0) + return ret; + + struct run_queue runs; + + ret = storage_get_runs(&server->storage, &runs); + if (ret < 0) { + log_err("Failed to fetch runs\n"); + return ret; + } + + ret = response_create_get_runs(response, request, &runs); + if (ret < 0) + goto destroy_runs; + +destroy_runs: + run_queue_destroy(&runs); + + return ret; +} + static struct cmd_desc commands[] = { {CMD_NEW_WORKER, server_handle_cmd_new_worker}, {CMD_QUEUE_RUN, server_handle_cmd_queue_run}, {CMD_FINISHED_RUN, server_handle_cmd_finished_run}, + {CMD_GET_RUNS, server_handle_cmd_get_runs}, }; static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); diff --git a/src/storage.c b/src/storage.c index ebb2ce8..1916b01 100644 --- a/src/storage.c +++ b/src/storage.c @@ -19,7 +19,9 @@ typedef void (*storage_destroy_t)(struct storage *); typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev); typedef int (*storage_run_finished_t)(struct storage *, int repo_id, const struct proc_output *); -typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *); + +typedef int (*storage_get_runs_t)(struct storage *, struct run_queue *); +typedef storage_get_runs_t storage_get_run_queue_t; struct storage_api { storage_settings_destroy_t destroy_settings; @@ -28,6 +30,8 @@ struct storage_api { storage_run_create_t run_create; storage_run_finished_t run_finished; + + storage_get_runs_t get_runs; storage_get_run_queue_t get_run_queue; }; @@ -39,6 +43,8 @@ static const struct storage_api apis[] = { storage_sqlite_run_create, storage_sqlite_run_finished, + + storage_sqlite_get_runs, storage_sqlite_get_run_queue, }, }; @@ -107,6 +113,14 @@ int storage_run_finished(struct storage *storage, int run_id, const struct proc_ return api->run_finished(storage, run_id, output); } +int storage_get_runs(struct storage *storage, struct run_queue *queue) +{ + const struct storage_api *api = get_api(storage->type); + if (!api) + return -1; + return api->get_runs(storage, queue); +} + int storage_get_run_queue(struct storage *storage, struct run_queue *queue) { const struct storage_api *api = get_api(storage->type); diff --git a/src/storage.h b/src/storage.h index f7406a5..83e12f5 100644 --- a/src/storage.h +++ b/src/storage.h @@ -37,6 +37,8 @@ void storage_destroy(struct storage *); int storage_run_create(struct storage *, const char *repo_url, const char *rev); int storage_run_finished(struct storage *, int run_id, const struct proc_output *); + +int storage_get_runs(struct storage *, struct run_queue *); int storage_get_run_queue(struct storage *, struct run_queue *); #endif diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 8973fd9..143f302 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -54,11 +54,6 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings) free(settings->sqlite); } -enum run_status { - RUN_STATUS_CREATED = 1, - RUN_STATUS_FINISHED = 2, -}; - struct prepared_stmt { pthread_mutex_t mtx; sqlite3_stmt *impl; @@ -114,6 +109,8 @@ struct storage_sqlite { struct prepared_stmt stmt_repo_insert; struct prepared_stmt stmt_run_insert; struct prepared_stmt stmt_run_finished; + struct prepared_stmt stmt_get_runs; + struct prepared_stmt stmt_get_run_queue; }; static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version) @@ -209,6 +206,10 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) "INSERT INTO cimple_runs(status, exit_code, output, repo_id, repo_rev) VALUES (?, -1, x'', ?, ?) RETURNING id;"; static const char *const fmt_run_finished = "UPDATE cimple_runs SET status = ?, exit_code = ?, output = ? WHERE id = ?;"; + static const char *const fmt_get_runs = + "SELECT id, status, exit_code, repo_url, repo_rev FROM cimple_runs_view ORDER BY id DESC"; + static const char *const fmt_get_run_queue = + "SELECT id, status, exit_code, repo_url, repo_rev FROM cimple_runs_view WHERE status = ? ORDER BY id;"; int ret = 0; @@ -224,9 +225,19 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished); if (ret < 0) goto finalize_run_insert; + ret = prepared_stmt_init(&storage->stmt_get_runs, storage->db, fmt_get_runs); + if (ret < 0) + goto finalize_run_finished; + ret = prepared_stmt_init(&storage->stmt_get_run_queue, storage->db, fmt_get_run_queue); + if (ret < 0) + goto finalize_get_runs; return ret; +finalize_get_runs: + prepared_stmt_destroy(&storage->stmt_get_runs); +finalize_run_finished: + prepared_stmt_destroy(&storage->stmt_run_finished); finalize_run_insert: prepared_stmt_destroy(&storage->stmt_run_insert); finalize_repo_insert: @@ -239,6 +250,8 @@ finalize_repo_find: static void storage_sqlite_finalize_statements(struct storage_sqlite *storage) { + prepared_stmt_destroy(&storage->stmt_get_run_queue); + prepared_stmt_destroy(&storage->stmt_get_runs); prepared_stmt_destroy(&storage->stmt_run_finished); prepared_stmt_destroy(&storage->stmt_run_insert); prepared_stmt_destroy(&storage->stmt_repo_insert); @@ -433,18 +446,20 @@ static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run int ret = 0; int id = sqlite_column_int(stmt, 0); + int status = sqlite_column_int(stmt, 1); + int exit_code = sqlite_column_int(stmt, 2); char *url = NULL; - ret = sqlite_column_text(stmt, 1, &url); + ret = sqlite_column_text(stmt, 3, &url); if (ret < 0) return ret; char *rev = NULL; - ret = sqlite_column_text(stmt, 2, &rev); + ret = sqlite_column_text(stmt, 4, &rev); if (ret < 0) goto free_url; - ret = run_create(run, id, url, rev); + ret = run_new(run, id, url, rev, status, exit_code); if (ret < 0) goto free_rev; @@ -459,21 +474,10 @@ free_url: return ret; } -int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue) +static int storage_sqlite_rows_to_runs(struct sqlite3_stmt *stmt, struct run_queue *queue) { - static const char *const fmt = - "SELECT id, repo_url, repo_rev FROM cimple_runs_view WHERE status = ?;"; - - sqlite3_stmt *stmt; int ret = 0; - ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt); - if (ret < 0) - return ret; - ret = sqlite_bind_int(stmt, 1, RUN_STATUS_CREATED); - if (ret < 0) - goto finalize; - run_queue_create(queue); while (1) { @@ -492,13 +496,51 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu run_queue_add_last(queue, run); } - goto finalize; + return ret; run_queue_destroy: run_queue_destroy(queue); -finalize: - sqlite_finalize(stmt); + return ret; +} + +int storage_sqlite_get_runs(struct storage *storage, struct run_queue *queue) +{ + struct prepared_stmt *stmt = &storage->sqlite->stmt_get_runs; + int ret = 0; + + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = storage_sqlite_rows_to_runs(stmt->impl, queue); + if (ret < 0) + goto reset; + +reset: + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); + + return ret; +} + +int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue) +{ + struct prepared_stmt *stmt = &storage->sqlite->stmt_get_run_queue; + int ret = 0; + + ret = prepared_stmt_lock(stmt); + if (ret < 0) + return ret; + ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED); + if (ret < 0) + goto reset; + ret = storage_sqlite_rows_to_runs(stmt->impl, queue); + if (ret < 0) + goto reset; + +reset: + sqlite_reset(stmt->impl); + prepared_stmt_unlock(stmt); return ret; } diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h index 857b9c0..1a9deea 100644 --- a/src/storage_sqlite.h +++ b/src/storage_sqlite.h @@ -25,6 +25,8 @@ void storage_sqlite_destroy(struct storage *); int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev); int storage_sqlite_run_finished(struct storage *, int id, const struct proc_output *); + +int storage_sqlite_get_runs(struct storage *, struct run_queue *runs); int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs); #endif diff --git a/src/worker.c b/src/worker.c index 5a0804f..ce024b8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -184,7 +184,7 @@ static int worker_do_run(struct worker *worker) if (ret < 0) return ret; - ret = ci_run_git_repo(run_get_url(worker->run), run_get_rev(worker->run), result); + ret = ci_run_git_repo(run_get_repo_url(worker->run), run_get_repo_rev(worker->run), result); if (ret < 0) { log_err("Run failed with an error\n"); goto free_output; -- cgit v1.2.3