aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-07-04 18:29:26 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-07-04 20:51:17 +0200
commita892d9a21fb321b82cb74cba2ec617a4edd7e2c9 (patch)
tree1774a176d1fbed474e8bfeea044703461745e86a
parenttcp_server: always clean up connection descriptors (diff)
downloadcimple-a892d9a21fb321b82cb74cba2ec617a4edd7e2c9.tar.gz
cimple-a892d9a21fb321b82cb74cba2ec617a4edd7e2c9.zip
storage: requeue old runs from storage on startup
Diffstat (limited to '')
-rw-r--r--src/run_queue.c18
-rw-r--r--src/run_queue.h4
-rw-r--r--src/server.c17
-rw-r--r--src/sqlite.c8
-rw-r--r--src/sqlite.h2
-rw-r--r--src/storage.c12
-rw-r--r--src/storage.h2
-rw-r--r--src/storage_sqlite.c72
-rw-r--r--src/storage_sqlite.h3
9 files changed, 116 insertions, 22 deletions
diff --git a/src/run_queue.c b/src/run_queue.c
index 8bfa816..cb000e0 100644
--- a/src/run_queue.c
+++ b/src/run_queue.c
@@ -14,13 +14,13 @@
#include <sys/queue.h>
struct run {
+ int id;
char *url;
char *rev;
- int id;
SIMPLEQ_ENTRY(run) entries;
};
-int run_create(struct run **_entry, const char *_url, const char *_rev, int id)
+int run_create(struct run **_entry, int id, const char *_url, const char *_rev)
{
struct run *entry = malloc(sizeof(struct run));
if (!entry) {
@@ -40,9 +40,9 @@ int run_create(struct run **_entry, const char *_url, const char *_rev, int id)
goto free_url;
}
+ entry->id = id;
entry->url = url;
entry->rev = rev;
- entry->id = id;
*_entry = entry;
return 0;
@@ -69,7 +69,7 @@ int run_from_msg(struct run **run, const struct msg *msg)
const char **argv = msg_get_strings(msg);
/* We don't know the ID yet. */
- return run_create(run, argv[1], argv[2], 0);
+ return run_create(run, 0, argv[1], argv[2]);
}
void run_destroy(struct run *entry)
@@ -79,6 +79,11 @@ void run_destroy(struct run *entry)
free(entry);
}
+int run_get_id(const struct run *entry)
+{
+ return entry->id;
+}
+
const char *run_get_url(const struct run *entry)
{
return entry->url;
@@ -89,11 +94,6 @@ const char *run_get_rev(const struct run *entry)
return entry->rev;
}
-int run_get_id(const struct run *entry)
-{
- return entry->id;
-}
-
void run_set_id(struct run *entry, int id)
{
entry->id = id;
diff --git a/src/run_queue.h b/src/run_queue.h
index 7f07819..74fcca6 100644
--- a/src/run_queue.h
+++ b/src/run_queue.h
@@ -14,13 +14,13 @@
struct run;
-int run_create(struct run **, const char *url, const char *rev, int id);
+int run_create(struct run **, int id, const char *url, const char *rev);
int run_from_msg(struct run **, const struct msg *);
void run_destroy(struct run *);
+int run_get_id(const struct run *);
const char *run_get_url(const struct run *);
const char *run_get_rev(const struct run *);
-int run_get_id(const struct run *);
void run_set_id(struct run *, int id);
diff --git a/src/server.c b/src/server.c
index ee605c0..acc3756 100644
--- a/src/server.c
+++ b/src/server.c
@@ -339,21 +339,26 @@ int server_create(struct server **_server, const struct settings *settings)
goto close_signalfd;
worker_queue_create(&server->worker_queue);
- run_queue_create(&server->run_queue);
ret = storage_sqlite_settings_create(&storage_settings, settings->sqlite_path);
if (ret < 0)
- goto destroy_run_queue;
+ goto destroy_worker_queue;
ret = storage_create(&server->storage, &storage_settings);
storage_settings_destroy(&storage_settings);
if (ret < 0)
+ goto destroy_worker_queue;
+
+ run_queue_create(&server->run_queue);
+
+ ret = storage_get_run_queue(&server->storage, &server->run_queue);
+ if (ret < 0)
goto destroy_run_queue;
ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn,
server->cmd_dispatcher);
if (ret < 0)
- goto destroy_storage;
+ goto destroy_run_queue;
ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop);
if (ret < 0)
@@ -371,12 +376,12 @@ int server_create(struct server **_server, const struct settings *settings)
destroy_tcp_server:
tcp_server_destroy(server->tcp_server);
-destroy_storage:
- storage_destroy(&server->storage);
-
destroy_run_queue:
run_queue_destroy(&server->run_queue);
+ storage_destroy(&server->storage);
+
+destroy_worker_queue:
worker_queue_destroy(&server->worker_queue);
close_signalfd:
diff --git a/src/sqlite.c b/src/sqlite.c
index 88ec7bf..8078739 100644
--- a/src/sqlite.c
+++ b/src/sqlite.c
@@ -76,11 +76,11 @@ void sqlite_close(sqlite3 *db)
sqlite_errno_if(sqlite3_close(db), "sqlite3_close");
}
-int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback)
+int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback, void *arg)
{
int ret = 0;
- ret = sqlite3_exec(db, stmt, callback, NULL, NULL);
+ ret = sqlite3_exec(db, stmt, callback, arg, NULL);
if (ret) {
sqlite_errno(ret, "sqlite3_exec");
return ret;
@@ -247,7 +247,7 @@ int sqlite_exec_as_transaction(sqlite3 *db, const char *stmt)
}
snprintf(full_stmt, nb, fmt, stmt);
- ret = sqlite_exec(db, stmt, NULL);
+ ret = sqlite_exec(db, stmt, NULL, NULL);
goto free;
free:
@@ -294,5 +294,5 @@ finalize:
int sqlite_set_foreign_keys(sqlite3 *db)
{
static const char *const sql = "PRAGMA foreign_keys = ON;";
- return sqlite_exec(db, sql, NULL);
+ return sqlite_exec(db, sql, NULL, NULL);
}
diff --git a/src/sqlite.h b/src/sqlite.h
index 28c9b7a..5f5b285 100644
--- a/src/sqlite.h
+++ b/src/sqlite.h
@@ -18,7 +18,7 @@ int sqlite_open_rw(const char *path, sqlite3 **db);
int sqlite_open_ro(const char *path, sqlite3 **db);
void sqlite_close(sqlite3 *db);
-int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback);
+int sqlite_exec(sqlite3 *db, const char *stmt, sqlite3_callback callback, void *arg);
int sqlite_log_result(void *, int, char **, char **);
int sqlite_prepare(sqlite3 *db, const char *stmt, sqlite3_stmt **result);
diff --git a/src/storage.c b/src/storage.c
index 7c59776..5eaf5e7 100644
--- a/src/storage.c
+++ b/src/storage.c
@@ -7,6 +7,7 @@
#include "storage.h"
#include "log.h"
+#include "run_queue.h"
#include "storage_sqlite.h"
#include <stddef.h>
@@ -16,6 +17,7 @@ typedef int (*storage_create_t)(struct storage *, const struct storage_settings
typedef void (*storage_destroy_t)(struct storage *);
typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev);
+typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *);
struct storage_api {
storage_settings_destroy_t destroy_settings;
@@ -23,6 +25,7 @@ struct storage_api {
storage_destroy_t destroy;
storage_run_create_t run_create;
+ storage_get_run_queue_t get_run_queue;
};
static const struct storage_api apis[] = {
@@ -32,6 +35,7 @@ static const struct storage_api apis[] = {
storage_sqlite_destroy,
storage_sqlite_run_create,
+ storage_sqlite_get_run_queue,
},
};
@@ -90,3 +94,11 @@ int storage_run_create(struct storage *storage, const char *repo_url, const char
return -1;
return api->run_create(storage, repo_url, rev);
}
+
+int storage_get_run_queue(struct storage *storage, struct run_queue *queue)
+{
+ const struct storage_api *api = get_api(storage->type);
+ if (!api)
+ return -1;
+ return api->get_run_queue(storage, queue);
+}
diff --git a/src/storage.h b/src/storage.h
index 0dcd2f9..1457095 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -8,6 +8,7 @@
#ifndef __STORAGE_H__
#define __STORAGE_H__
+#include "run_queue.h"
#include "storage_sqlite.h"
enum storage_type {
@@ -34,5 +35,6 @@ int storage_create(struct storage *, const struct storage_settings *);
void storage_destroy(struct storage *);
int storage_run_create(struct storage *, const char *repo_url, const char *rev);
+int storage_get_run_queue(struct storage *, struct run_queue *);
#endif
diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c
index 887e2a5..96c58d9 100644
--- a/src/storage_sqlite.c
+++ b/src/storage_sqlite.c
@@ -7,6 +7,7 @@
#include "storage_sqlite.h"
#include "log.h"
+#include "run_queue.h"
#include "sql/sqlite_sql.h"
#include "sqlite.h"
#include "storage.h"
@@ -328,3 +329,74 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con
return ret;
}
+
+static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run)
+{
+ int ret = 0;
+
+ int id = sqlite_column_int(stmt, 0);
+
+ char *url = NULL;
+ ret = sqlite_column_text(stmt, 1, &url);
+ if (ret < 0)
+ return ret;
+
+ char *rev = NULL;
+ ret = sqlite_column_text(stmt, 2, &rev);
+ if (ret < 0)
+ goto free_url;
+
+ ret = run_create(run, id, url, rev);
+ if (ret < 0)
+ goto free_rev;
+
+ log("Adding a run %d for repository %s to the queue\n", id, url);
+
+free_rev:
+ free(rev);
+
+free_url:
+ free(url);
+
+ return ret;
+}
+
+int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue)
+{
+ /* clang-format off */
+ static const char *const fmt = "SELECT cimple_runs.id, cimple_repos.url, cimple_runs.rev FROM cimple_runs INNER JOIN cimple_repos ON cimple_runs.repo_id = cimple_repos.id WHERE cimple_runs.status = 1;";
+ /* clang-format on */
+
+ sqlite3_stmt *stmt;
+ int ret = 0;
+
+ ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt);
+ if (ret < 0)
+ return ret;
+
+ while (1) {
+ ret = sqlite_step(stmt);
+ if (!ret)
+ break;
+ if (ret < 0)
+ goto run_queue_destroy;
+
+ struct run *run = NULL;
+
+ ret = storage_sqlite_row_to_run(stmt, &run);
+ if (ret < 0)
+ goto run_queue_destroy;
+
+ run_queue_add_last(queue, run);
+ }
+
+ goto finalize;
+
+run_queue_destroy:
+ run_queue_destroy(queue);
+
+finalize:
+ sqlite_finalize(stmt);
+
+ return ret;
+}
diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h
index b133ab8..a755f51 100644
--- a/src/storage_sqlite.h
+++ b/src/storage_sqlite.h
@@ -8,6 +8,8 @@
#ifndef __STORAGE_SQLITE_H__
#define __STORAGE_SQLITE_H__
+#include "run_queue.h"
+
struct storage_settings;
struct storage_sqlite_setttings;
@@ -21,5 +23,6 @@ int storage_sqlite_create(struct storage *, const struct storage_settings *);
void storage_sqlite_destroy(struct storage *);
int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev);
+int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs);
#endif