aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-07-04 20:51:29 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-07-04 20:51:29 +0200
commitd4e47fdb640c3ddce285157eee88db899461fa3a (patch)
treeec11a0df88f6db64a6017db7bc7efcaefedd04ec
parentstorage: requeue old runs from storage on startup (diff)
downloadcimple-d4e47fdb640c3ddce285157eee88db899461fa3a.tar.gz
cimple-d4e47fdb640c3ddce285157eee88db899461fa3a.zip
storage: mark completed runs as such
-rw-r--r--src/const.h3
-rw-r--r--src/run_queue.c35
-rw-r--r--src/run_queue.h4
-rw-r--r--src/server.c43
-rw-r--r--src/storage.c11
-rw-r--r--src/storage.h1
-rw-r--r--src/storage_sqlite.c35
-rw-r--r--src/storage_sqlite.h1
-rw-r--r--src/string.c24
-rw-r--r--src/string.h2
-rw-r--r--src/worker.c26
-rw-r--r--test/py/test_repo.py2
12 files changed, 161 insertions, 26 deletions
diff --git a/src/const.h b/src/const.h
index 0c0c5fc..6928235 100644
--- a/src/const.h
+++ b/src/const.h
@@ -17,6 +17,7 @@ extern const char *default_sqlite_path;
#define CMD_RUN "run"
#define CMD_NEW_WORKER "new-worker"
-#define CMD_COMPLETE "complete"
+#define CMD_START "start"
+#define CMD_FINISHED "finished"
#endif
diff --git a/src/run_queue.c b/src/run_queue.c
index cb000e0..e1fdf84 100644
--- a/src/run_queue.c
+++ b/src/run_queue.c
@@ -8,6 +8,7 @@
#include "run_queue.h"
#include "log.h"
#include "msg.h"
+#include "string.h"
#include <stdlib.h>
#include <string.h>
@@ -57,26 +58,46 @@ fail:
return -1;
}
+void run_destroy(struct run *entry)
+{
+ free(entry->rev);
+ free(entry->url);
+ free(entry);
+}
+
int run_from_msg(struct run **run, const struct msg *msg)
{
size_t msg_len = msg_get_length(msg);
- if (msg_len != 3) {
+ if (msg_len != 4) {
log_err("Invalid number of arguments for a message: %zu\n", msg_len);
msg_dump(msg);
return -1;
}
const char **argv = msg_get_strings(msg);
- /* We don't know the ID yet. */
- return run_create(run, 0, argv[1], argv[2]);
+
+ int id = 0;
+ int ret = string_to_int(argv[1], &id);
+ if (ret < 0)
+ return ret;
+
+ return run_create(run, id, argv[2], argv[3]);
}
-void run_destroy(struct run *entry)
+int run_from_msg_unknown_id(struct run **run, const struct msg *msg)
{
- free(entry->rev);
- free(entry->url);
- free(entry);
+ size_t msg_len = msg_get_length(msg);
+
+ if (msg_len != 3) {
+ log_err("Invalid number of arguments for a message: %zu\n", msg_len);
+ msg_dump(msg);
+ return -1;
+ }
+
+ const char **argv = msg_get_strings(msg);
+ /* We don't know the ID yet. */
+ return run_create(run, 0, argv[1], argv[2]);
}
int run_get_id(const struct run *entry)
diff --git a/src/run_queue.h b/src/run_queue.h
index 74fcca6..df716a4 100644
--- a/src/run_queue.h
+++ b/src/run_queue.h
@@ -15,9 +15,11 @@
struct run;
int run_create(struct run **, int id, const char *url, const char *rev);
-int run_from_msg(struct run **, const struct msg *);
void run_destroy(struct run *);
+int run_from_msg(struct run **, const struct msg *);
+int run_from_msg_unknown_id(struct run **, const struct msg *);
+
int run_get_id(const struct run *);
const char *run_get_url(const struct run *);
const char *run_get_rev(const struct run *);
diff --git a/src/server.c b/src/server.c
index acc3756..f00385f 100644
--- a/src/server.c
+++ b/src/server.c
@@ -17,6 +17,7 @@
#include "signal.h"
#include "storage.h"
#include "storage_sqlite.h"
+#include "string.h"
#include "tcp_server.h"
#include "worker_queue.h"
@@ -169,7 +170,10 @@ static void server_assign_run(struct server *server)
struct worker *worker = worker_queue_remove_first(&server->worker_queue);
log("Removed worker %d from the queue\n", worker_get_fd(worker));
- const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL};
+ char id[16];
+ snprintf(id, sizeof(id), "%d", run_get_id(run));
+
+ const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL};
int ret = msg_talk_argv(worker_get_fd(worker), argv, NULL);
if (ret < 0) {
@@ -251,7 +255,7 @@ static int server_handle_cmd_run(const struct msg *request, struct msg **respons
int ret = 0;
- ret = run_from_msg(&run, request);
+ ret = run_from_msg_unknown_id(&run, request);
if (ret < 0)
return ret;
@@ -275,14 +279,41 @@ destroy_run:
return ret;
}
-static int server_handle_cmd_complete(UNUSED const struct msg *request,
- UNUSED struct msg **response, void *_ctx)
+static int server_handle_cmd_finished(const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
int client_fd = ctx->fd;
int ret = 0;
- log("Received a \"run complete\" message from worker %d\n", client_fd);
+ log("Received a \"run finished\" message from worker %d\n", client_fd);
+
+ size_t msg_len = msg_get_length(request);
+
+ if (msg_len != 3) {
+ log_err("Invalid number of arguments for a message: %zu\n", msg_len);
+ msg_dump(request);
+ return -1;
+ }
+
+ const char **argv = msg_get_strings(request);
+
+ int run_id, ec;
+
+ ret = string_to_int(argv[1], &run_id);
+ if (ret < 0)
+ return ret;
+ ret = string_to_int(argv[2], &ec);
+ if (ret < 0)
+ return ret;
+
+ struct server *server = (struct server *)ctx->arg;
+
+ ret = storage_run_finished(&server->storage, run_id, ec);
+ if (ret < 0) {
+ log_err("Failed to mark run %d as finished\n", run_id);
+ return ret;
+ }
return ret;
}
@@ -290,7 +321,7 @@ static int server_handle_cmd_complete(UNUSED const struct msg *request,
static struct cmd_desc commands[] = {
{CMD_NEW_WORKER, server_handle_cmd_new_worker},
{CMD_RUN, server_handle_cmd_run},
- {CMD_COMPLETE, server_handle_cmd_complete},
+ {CMD_FINISHED, server_handle_cmd_finished},
};
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
diff --git a/src/storage.c b/src/storage.c
index 5eaf5e7..5df9843 100644
--- a/src/storage.c
+++ b/src/storage.c
@@ -17,6 +17,7 @@ typedef int (*storage_create_t)(struct storage *, const struct storage_settings
typedef void (*storage_destroy_t)(struct storage *);
typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev);
+typedef int (*storage_run_finished_t)(struct storage *, int repo_id, int ec);
typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *);
struct storage_api {
@@ -25,6 +26,7 @@ struct storage_api {
storage_destroy_t destroy;
storage_run_create_t run_create;
+ storage_run_finished_t run_finished;
storage_get_run_queue_t get_run_queue;
};
@@ -35,6 +37,7 @@ static const struct storage_api apis[] = {
storage_sqlite_destroy,
storage_sqlite_run_create,
+ storage_sqlite_run_finished,
storage_sqlite_get_run_queue,
},
};
@@ -95,6 +98,14 @@ int storage_run_create(struct storage *storage, const char *repo_url, const char
return api->run_create(storage, repo_url, rev);
}
+int storage_run_finished(struct storage *storage, int run_id, int ec)
+{
+ const struct storage_api *api = get_api(storage->type);
+ if (!api)
+ return -1;
+ return api->run_finished(storage, run_id, ec);
+}
+
int storage_get_run_queue(struct storage *storage, struct run_queue *queue)
{
const struct storage_api *api = get_api(storage->type);
diff --git a/src/storage.h b/src/storage.h
index 1457095..139d878 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -35,6 +35,7 @@ int storage_create(struct storage *, const struct storage_settings *);
void storage_destroy(struct storage *);
int storage_run_create(struct storage *, const char *repo_url, const char *rev);
+int storage_run_finished(struct storage *, int run_id, int ec);
int storage_get_run_queue(struct storage *, struct run_queue *);
#endif
diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c
index 96c58d9..087174e 100644
--- a/src/storage_sqlite.c
+++ b/src/storage_sqlite.c
@@ -58,6 +58,7 @@ struct storage_sqlite {
sqlite3_stmt *stmt_repo_find;
sqlite3_stmt *stmt_repo_insert;
sqlite3_stmt *stmt_run_insert;
+ sqlite3_stmt *stmt_run_finished;
};
static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version)
@@ -147,9 +148,10 @@ static int storage_sqlite_setup(struct storage_sqlite *storage)
static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
{
/* clang-format off */
- static const char *const fmt_repo_find = "SELECT id FROM cimple_repos WHERE url = ?;";
- static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) RETURNING id;";
- static const char *const fmt_run_insert = "INSERT INTO cimple_runs(status, result, output, repo_id, rev) VALUES (1, 0, x'', ?, ?) RETURNING id;";
+ static const char *const fmt_repo_find = "SELECT id FROM cimple_repos WHERE url = ?;";
+ static const char *const fmt_repo_insert = "INSERT INTO cimple_repos(url) VALUES (?) RETURNING id;";
+ static const char *const fmt_run_insert = "INSERT INTO cimple_runs(status, result, output, repo_id, rev) VALUES (1, -1, x'', ?, ?) RETURNING id;";
+ static const char *const fmt_run_finished = "UPDATE cimple_runs SET status = 2, result = ? WHERE id = ?;";
/* clang-format on */
int ret = 0;
@@ -163,9 +165,14 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
ret = sqlite_prepare(storage->db, fmt_run_insert, &storage->stmt_run_insert);
if (ret < 0)
goto finalize_repo_insert;
+ ret = sqlite_prepare(storage->db, fmt_run_finished, &storage->stmt_run_finished);
+ if (ret < 0)
+ goto finalize_run_insert;
return ret;
+finalize_run_insert:
+ sqlite_finalize(storage->stmt_run_insert);
finalize_repo_insert:
sqlite_finalize(storage->stmt_repo_insert);
finalize_repo_find:
@@ -176,6 +183,7 @@ finalize_repo_find:
static void storage_sqlite_finalize_statements(struct storage_sqlite *storage)
{
+ sqlite_finalize(storage->stmt_run_finished);
sqlite_finalize(storage->stmt_run_insert);
sqlite_finalize(storage->stmt_repo_insert);
sqlite_finalize(storage->stmt_repo_find);
@@ -330,6 +338,27 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con
return ret;
}
+int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec)
+{
+ sqlite3_stmt *stmt = storage->sqlite->stmt_run_finished;
+ int ret = 0;
+
+ ret = sqlite_bind_int(stmt, 1, ec);
+ if (ret < 0)
+ goto reset;
+ ret = sqlite_bind_int(stmt, 2, run_id);
+ if (ret < 0)
+ goto reset;
+ ret = sqlite_step(stmt);
+ if (ret < 0)
+ goto reset;
+
+reset:
+ sqlite_reset(stmt);
+
+ return ret;
+}
+
static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run)
{
int ret = 0;
diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h
index a755f51..cecf7e1 100644
--- a/src/storage_sqlite.h
+++ b/src/storage_sqlite.h
@@ -23,6 +23,7 @@ int storage_sqlite_create(struct storage *, const struct storage_settings *);
void storage_sqlite_destroy(struct storage *);
int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev);
+int storage_sqlite_run_finished(struct storage *, int id, int ec);
int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs);
#endif
diff --git a/src/string.c b/src/string.c
index 878efeb..3a52ab2 100644
--- a/src/string.c
+++ b/src/string.c
@@ -6,7 +6,10 @@
*/
#include "string.h"
+#include "log.h"
+#include <errno.h>
+#include <stdlib.h>
#include <string.h>
char *stpecpy(char *dst, char *end, const char *src)
@@ -23,3 +26,24 @@ char *stpecpy(char *dst, char *end, const char *src)
end[-1] = '\0';
return end;
}
+
+int string_to_int(const char *src, int *result)
+{
+ char *endptr = NULL;
+
+ errno = 0;
+ long ret = strtol(src, &endptr, 10);
+
+ if (errno) {
+ log_errno("strtol");
+ return -1;
+ }
+
+ if (endptr == src || *endptr != '\0') {
+ log_err("Invalid number: %s\n", src);
+ return -1;
+ }
+
+ *result = (int)ret;
+ return 0;
+}
diff --git a/src/string.h b/src/string.h
index fb77e64..b472e82 100644
--- a/src/string.h
+++ b/src/string.h
@@ -10,4 +10,6 @@
char *stpecpy(char *dst, char *end, const char *src);
+int string_to_int(const char *src, int *result);
+
#endif
diff --git a/src/worker.c b/src/worker.c
index 4b4413d..4e71f49 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -80,8 +80,23 @@ static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UN
return 0;
}
-static int worker_handle_cmd_run(const struct msg *request, UNUSED struct msg **response,
- void *_ctx)
+static int worker_send_finished(struct worker *worker, const struct run *run,
+ struct proc_output *output)
+{
+ char id[16];
+ char ec[16];
+
+ snprintf(id, sizeof(id), "%d", run_get_id(run));
+ snprintf(ec, sizeof(ec), "%d", output->ec);
+
+ const char *argv[] = {CMD_FINISHED, id, ec, NULL};
+
+ return msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv,
+ NULL);
+}
+
+static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
struct run *run = NULL;
@@ -102,10 +117,7 @@ static int worker_handle_cmd_run(const struct msg *request, UNUSED struct msg **
proc_output_dump(&result);
- struct worker *worker = (struct worker *)ctx->arg;
- static const char *argv[] = {CMD_COMPLETE, NULL};
-
- ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL);
+ ret = worker_send_finished((struct worker *)ctx->arg, run, &result);
if (ret < 0)
goto free_output;
@@ -118,7 +130,7 @@ free_output:
}
static struct cmd_desc commands[] = {
- {CMD_RUN, worker_handle_cmd_run},
+ {CMD_START, worker_handle_cmd_start},
};
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
diff --git a/test/py/test_repo.py b/test/py/test_repo.py
index f3a10d7..abc7f18 100644
--- a/test/py/test_repo.py
+++ b/test/py/test_repo.py
@@ -13,7 +13,7 @@ class LoggingEventRunComplete(LoggingEvent):
super().__init__(timeout=60)
def log_line_matches(self, line):
- return 'Received a "run complete" message from worker' in line
+ return 'Received a "run finished" message from worker' in line
def set(self):
self.counter += 1