aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/buf.c53
-rw-r--r--src/buf.h22
-rw-r--r--src/client.c6
-rw-r--r--src/const.h1
-rw-r--r--src/json.c33
-rw-r--r--src/json.h5
-rw-r--r--src/net.c45
-rw-r--r--src/net.h2
-rw-r--r--src/protocol.c44
-rw-r--r--src/protocol.h6
-rw-r--r--src/run_queue.c118
-rw-r--r--src/run_queue.h21
-rw-r--r--src/server.c41
-rw-r--r--src/storage.c16
-rw-r--r--src/storage.h2
-rw-r--r--src/storage_sqlite.c88
-rw-r--r--src/storage_sqlite.h2
-rw-r--r--src/worker.c2
-rw-r--r--test/py/test_repo.py14
20 files changed, 424 insertions, 100 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c046f88..ca19416 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -55,6 +55,7 @@ endfunction()
add_my_executable(server server_main.c server.c
base64.c
+ buf.c
cmd_line.c
command.c
const.c
@@ -80,6 +81,7 @@ target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
add_my_executable(client client_main.c client.c
base64.c
+ buf.c
cmd_line.c
const.c
file.c
@@ -94,6 +96,7 @@ target_link_libraries(client PRIVATE json-c sodium)
add_my_executable(worker worker_main.c worker.c
base64.c
+ buf.c
ci.c
cmd_line.c
command.c
diff --git a/src/buf.c b/src/buf.c
new file mode 100644
index 0000000..90e33ad
--- /dev/null
+++ b/src/buf.c
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#include "buf.h"
+#include "log.h"
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct buf {
+ uint32_t size;
+ const void *data;
+};
+
+int buf_create(struct buf **_buf, const void *data, uint32_t size)
+{
+ struct buf *buf = malloc(sizeof(struct buf));
+ if (!buf) {
+ log_errno("malloc");
+ return -1;
+ }
+
+ buf->data = data;
+ buf->size = size;
+
+ *_buf = buf;
+ return 0;
+}
+
+int buf_create_from_string(struct buf **buf, const char *str)
+{
+ return buf_create(buf, str, strlen(str) + 1);
+}
+
+void buf_destroy(struct buf *buf)
+{
+ free(buf);
+}
+
+uint32_t buf_get_size(const struct buf *buf)
+{
+ return buf->size;
+}
+
+const void *buf_get_data(const struct buf *buf)
+{
+ return buf->data;
+}
diff --git a/src/buf.h b/src/buf.h
new file mode 100644
index 0000000..033a54c
--- /dev/null
+++ b/src/buf.h
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#ifndef __BUF_H__
+#define __BUF_H__
+
+#include <stdint.h>
+
+struct buf;
+
+int buf_create(struct buf **, const void *, uint32_t);
+int buf_create_from_string(struct buf **, const char *);
+void buf_destroy(struct buf *);
+
+uint32_t buf_get_size(const struct buf *);
+const void *buf_get_data(const struct buf *);
+
+#endif
diff --git a/src/client.c b/src/client.c
index 1594014..cb10167 100644
--- a/src/client.c
+++ b/src/client.c
@@ -53,13 +53,17 @@ static int make_request(struct jsonrpc_request **request, int argc, const char *
return -1;
struct run *run = NULL;
- int ret = run_create(&run, 0, argv[1], argv[2]);
+ int ret = run_queued(&run, argv[1], argv[2]);
if (ret < 0)
return ret;
ret = request_create_queue_run(request, run);
run_destroy(run);
return ret;
+ } else if (!strcmp(argv[0], CMD_GET_RUNS)) {
+ if (argc != 1)
+ return -1;
+ return request_create_get_runs(request);
}
return -1;
diff --git a/src/const.h b/src/const.h
index 27dd1d3..d3925c6 100644
--- a/src/const.h
+++ b/src/const.h
@@ -19,5 +19,6 @@ extern const char *default_sqlite_path;
#define CMD_NEW_WORKER "new-worker"
#define CMD_START_RUN "start-run"
#define CMD_FINISHED_RUN "finished-run"
+#define CMD_GET_RUNS "get-runs"
#endif
diff --git a/src/json.c b/src/json.c
index 2909c74..ec555b9 100644
--- a/src/json.c
+++ b/src/json.c
@@ -6,6 +6,7 @@
*/
#include "json.h"
+#include "buf.h"
#include "log.h"
#include "net.h"
@@ -108,6 +109,28 @@ destroy_buf:
return result;
}
+int json_new_object(struct json_object **_obj)
+{
+ struct json_object *obj = json_object_new_object();
+ if (!obj) {
+ json_errno("json_object_new_object");
+ return -1;
+ }
+ *_obj = obj;
+ return 0;
+}
+
+int json_new_array(struct json_object **_arr)
+{
+ struct json_object *arr = json_object_new_array();
+ if (!arr) {
+ json_errno("json_object_new_array");
+ return -1;
+ }
+ *_arr = arr;
+ return 0;
+}
+
int json_has(const struct json_object *obj, const char *key)
{
return json_object_object_get_ex(obj, key, NULL);
@@ -254,3 +277,13 @@ int json_set_int_const_key(struct json_object *obj, const char *key, int64_t val
{
return json_set_int_internal(obj, key, value, json_const_key_flags);
}
+
+int json_append(struct json_object *arr, struct json_object *elem)
+{
+ int ret = json_object_array_add(arr, elem);
+ if (ret < 0) {
+ json_errno("json_object_array_add");
+ return ret;
+ }
+ return ret;
+}
diff --git a/src/json.h b/src/json.h
index d14fd97..92cd688 100644
--- a/src/json.h
+++ b/src/json.h
@@ -28,6 +28,9 @@ int json_clone(const struct json_object *, const char *key, struct json_object *
int json_send(struct json_object *, int fd);
struct json_object *json_recv(int fd);
+int json_new_object(struct json_object **);
+int json_new_array(struct json_object **);
+
int json_has(const struct json_object *, const char *key);
int json_get(const struct json_object *, const char *key, struct json_object **value);
@@ -42,4 +45,6 @@ int json_set_const_key(struct json_object *, const char *, struct json_object *v
int json_set_string_const_key(struct json_object *, const char *, const char *value);
int json_set_int_const_key(struct json_object *, const char *, int64_t value);
+int json_append(struct json_object *arr, struct json_object *elem);
+
#endif
diff --git a/src/net.c b/src/net.c
index cb85356..20460d9 100644
--- a/src/net.c
+++ b/src/net.c
@@ -6,6 +6,7 @@
*/
#include "net.h"
+#include "buf.h"
#include "file.h"
#include "log.h"
@@ -208,56 +209,16 @@ int net_recv(int fd, void *buf, size_t size)
return 0;
}
-struct buf {
- uint32_t size;
- const void *data;
-};
-
-int buf_create(struct buf **_buf, const void *data, uint32_t size)
-{
- struct buf *buf = malloc(sizeof(struct buf));
- if (!buf) {
- log_errno("malloc");
- return -1;
- }
-
- buf->data = data;
- buf->size = size;
-
- *_buf = buf;
- return 0;
-}
-
-int buf_create_from_string(struct buf **buf, const char *str)
-{
- return buf_create(buf, str, strlen(str) + 1);
-}
-
-void buf_destroy(struct buf *buf)
-{
- free(buf);
-}
-
-uint32_t buf_get_size(const struct buf *buf)
-{
- return buf->size;
-}
-
-const void *buf_get_data(const struct buf *buf)
-{
- return buf->data;
-}
-
int net_send_buf(int fd, const struct buf *buf)
{
int ret = 0;
- uint32_t size = htonl(buf->size);
+ uint32_t size = htonl(buf_get_size(buf));
ret = net_send(fd, &size, sizeof(size));
if (ret < 0)
return ret;
- ret = net_send(fd, buf->data, buf->size);
+ ret = net_send(fd, buf_get_data(buf), buf_get_size(buf));
if (ret < 0)
return ret;
diff --git a/src/net.h b/src/net.h
index 68ae311..ea8aa30 100644
--- a/src/net.h
+++ b/src/net.h
@@ -8,6 +8,8 @@
#ifndef __NET_H__
#define __NET_H__
+#include "buf.h"
+
#include <stddef.h>
#include <stdint.h>
diff --git a/src/protocol.c b/src/protocol.c
index 8aac63d..880c449 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -28,10 +28,10 @@ int request_create_queue_run(struct jsonrpc_request **request, const struct run
ret = jsonrpc_request_create(request, jsonrpc_generate_request_id(), CMD_QUEUE_RUN, NULL);
if (ret < 0)
return ret;
- ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_url(run));
+ ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_repo_url(run));
if (ret < 0)
goto free_request;
- ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_rev(run));
+ ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_repo_rev(run));
if (ret < 0)
goto free_request;
@@ -56,7 +56,7 @@ int request_parse_queue_run(const struct jsonrpc_request *request, struct run **
if (ret < 0)
return ret;
- return run_create(run, 0, url, rev);
+ return run_queued(run, url, rev);
}
int request_create_new_worker(struct jsonrpc_request **request)
@@ -79,10 +79,10 @@ int request_create_start_run(struct jsonrpc_request **request, const struct run
ret = jsonrpc_request_set_param_int(*request, run_key_id, run_get_id(run));
if (ret < 0)
goto free_request;
- ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_url(run));
+ ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_repo_url(run));
if (ret < 0)
goto free_request;
- ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_rev(run));
+ ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_repo_rev(run));
if (ret < 0)
goto free_request;
@@ -111,7 +111,7 @@ int request_parse_start_run(const struct jsonrpc_request *request, struct run **
if (ret < 0)
return ret;
- return run_create(run, (int)id, url, rev);
+ return run_created(run, (int)id, url, rev);
}
static const char *const finished_key_run_id = "run_id";
@@ -190,3 +190,35 @@ free_output:
return ret;
}
+
+int request_create_get_runs(struct jsonrpc_request **request)
+{
+ return jsonrpc_request_create(request, jsonrpc_generate_request_id(), CMD_GET_RUNS, NULL);
+}
+
+int request_parse_get_runs(UNUSED const struct jsonrpc_request *request)
+{
+ return 0;
+}
+
+int response_create_get_runs(struct jsonrpc_response **response,
+ const struct jsonrpc_request *request, const struct run_queue *runs)
+{
+ struct json_object *runs_json = NULL;
+ int ret = 0;
+
+ ret = run_queue_to_json(runs, &runs_json);
+ if (ret < 0)
+ return ret;
+
+ ret = jsonrpc_response_create(response, request, runs_json);
+ if (ret < 0)
+ goto free_json;
+
+ return ret;
+
+free_json:
+ json_object_put(runs_json);
+
+ return ret;
+}
diff --git a/src/protocol.h b/src/protocol.h
index 220eead..53b4acb 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -24,4 +24,10 @@ int request_parse_start_run(const struct jsonrpc_request *, struct run **);
int request_create_finished_run(struct jsonrpc_request **, int run_id, const struct proc_output *);
int request_parse_finished_run(const struct jsonrpc_request *, int *run_id, struct proc_output **);
+int request_create_get_runs(struct jsonrpc_request **);
+int request_parse_get_runs(const struct jsonrpc_request *);
+
+int response_create_get_runs(struct jsonrpc_response **, const struct jsonrpc_request *,
+ const struct run_queue *);
+
#endif
diff --git a/src/run_queue.c b/src/run_queue.c
index 188a651..6414e6e 100644
--- a/src/run_queue.c
+++ b/src/run_queue.c
@@ -6,20 +6,27 @@
*/
#include "run_queue.h"
+#include "json.h"
#include "log.h"
+#include <json-c/json_object.h>
+
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
struct run {
int id;
- char *url;
- char *rev;
+ char *repo_url;
+ char *repo_rev;
+ int status;
+ int exit_code;
+
SIMPLEQ_ENTRY(run) entries;
};
-int run_create(struct run **_entry, int id, const char *_url, const char *_rev)
+int run_new(struct run **_entry, int id, const char *_repo_url, const char *_repo_rev,
+ enum run_status status, int exit_code)
{
struct run *entry = malloc(sizeof(struct run));
if (!entry) {
@@ -27,27 +34,29 @@ int run_create(struct run **_entry, int id, const char *_url, const char *_rev)
goto fail;
}
- char *url = strdup(_url);
- if (!url) {
+ char *repo_url = strdup(_repo_url);
+ if (!repo_url) {
log_errno("strdup");
goto free_entry;
}
- char *rev = strdup(_rev);
- if (!rev) {
+ char *repo_rev = strdup(_repo_rev);
+ if (!repo_rev) {
log_errno("strdup");
- goto free_url;
+ goto free_repo_url;
}
entry->id = id;
- entry->url = url;
- entry->rev = rev;
+ entry->repo_url = repo_url;
+ entry->repo_rev = repo_rev;
+ entry->status = status;
+ entry->exit_code = exit_code;
*_entry = entry;
return 0;
-free_url:
- free(url);
+free_repo_url:
+ free(repo_url);
free_entry:
free(entry);
@@ -58,24 +67,64 @@ fail:
void run_destroy(struct run *entry)
{
- free(entry->rev);
- free(entry->url);
+ free(entry->repo_rev);
+ free(entry->repo_url);
free(entry);
}
+int run_queued(struct run **entry, const char *repo_url, const char *repo_rev)
+{
+ return run_new(entry, -1, repo_url, repo_rev, RUN_STATUS_CREATED, -1);
+}
+
+int run_created(struct run **entry, int id, const char *repo_url, const char *repo_rev)
+{
+ return run_new(entry, id, repo_url, repo_rev, RUN_STATUS_CREATED, -1);
+}
+
+int run_to_json(const struct run *entry, struct json_object **_json)
+{
+ struct json_object *json = NULL;
+ int ret = 0;
+
+ ret = json_new_object(&json);
+ if (ret < 0)
+ return -1;
+ ret = json_set_int_const_key(json, "id", entry->id);
+ if (ret < 0)
+ goto free;
+ ret = json_set_int_const_key(json, "exit_code", entry->exit_code);
+ if (ret < 0)
+ goto free;
+ ret = json_set_string_const_key(json, "repo_url", entry->repo_url);
+ if (ret < 0)
+ goto free;
+ ret = json_set_string_const_key(json, "repo_rev", entry->repo_rev);
+ if (ret < 0)
+ goto free;
+
+ *_json = json;
+ return ret;
+
+free:
+ json_object_put(json);
+
+ return ret;
+}
+
int run_get_id(const struct run *entry)
{
return entry->id;
}
-const char *run_get_url(const struct run *entry)
+const char *run_get_repo_url(const struct run *entry)
{
- return entry->url;
+ return entry->repo_url;
}
-const char *run_get_rev(const struct run *entry)
+const char *run_get_repo_rev(const struct run *entry)
{
- return entry->rev;
+ return entry->repo_rev;
}
void run_set_id(struct run *entry, int id)
@@ -99,6 +148,39 @@ void run_queue_destroy(struct run_queue *queue)
SIMPLEQ_INIT(queue);
}
+int run_queue_to_json(const struct run_queue *queue, struct json_object **_json)
+{
+ struct json_object *json = NULL;
+ int ret = 0;
+
+ ret = json_new_array(&json);
+ if (ret < 0)
+ return ret;
+
+ struct run *entry = NULL;
+ SIMPLEQ_FOREACH(entry, queue, entries)
+ {
+ struct json_object *entry_json = NULL;
+ ret = run_to_json(entry, &entry_json);
+ if (ret < 0)
+ goto free;
+
+ ret = json_append(json, entry_json);
+ if (ret < 0) {
+ json_object_put(entry_json);
+ goto free;
+ }
+ }
+
+ *_json = json;
+ return ret;
+
+free:
+ json_object_put(json);
+
+ return ret;
+}
+
int run_queue_is_empty(const struct run_queue *queue)
{
return SIMPLEQ_EMPTY(queue);
diff --git a/src/run_queue.h b/src/run_queue.h
index ac4554b..3c3d8e4 100644
--- a/src/run_queue.h
+++ b/src/run_queue.h
@@ -8,16 +8,29 @@
#ifndef __RUN_QUEUE_H__
#define __RUN_QUEUE_H__
+#include <json-c/json_object.h>
+
#include <sys/queue.h>
+enum run_status {
+ RUN_STATUS_CREATED = 1,
+ RUN_STATUS_FINISHED = 2,
+};
+
struct run;
-int run_create(struct run **, int id, const char *url, const char *rev);
+int run_new(struct run **, int id, const char *repo_url, const char *repo_rev, enum run_status,
+ int exit_code);
void run_destroy(struct run *);
+int run_queued(struct run **, const char *repo_url, const char *repo_rev);
+int run_created(struct run **, int id, const char *repo_url, const char *repo_rev);
+
+int run_to_json(const struct run *, struct json_object **);
+
int run_get_id(const struct run *);
-const char *run_get_url(const struct run *);
-const char *run_get_rev(const struct run *);
+const char *run_get_repo_url(const struct run *);
+const char *run_get_repo_rev(const struct run *);
void run_set_id(struct run *, int id);
@@ -26,6 +39,8 @@ SIMPLEQ_HEAD(run_queue, run);
void run_queue_create(struct run_queue *);
void run_queue_destroy(struct run_queue *);
+int run_queue_to_json(const struct run_queue *, struct json_object **);
+
int run_queue_is_empty(const struct run_queue *);
void run_queue_add_first(struct run_queue *, struct run *);
diff --git a/src/server.c b/src/server.c
index 37806c9..7ffba68 100644
--- a/src/server.c
+++ b/src/server.c
@@ -125,7 +125,7 @@ static int server_enqueue_run(struct server *server, struct run *run)
{
int ret = 0;
- ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
+ ret = storage_run_create(&server->storage, run_get_repo_url(run), run_get_repo_rev(run));
if (ret < 0)
return ret;
run_set_id(run, ret);
@@ -136,7 +136,7 @@ static int server_enqueue_run(struct server *server, struct run *run)
run_queue_add_last(&server->run_queue, run);
log("Added a new run %d for repository %s to the queue\n", run_get_id(run),
- run_get_url(run));
+ run_get_repo_url(run));
server_notify(server);
server_unlock(server);
@@ -164,7 +164,8 @@ static int server_wait_for_action(struct server *server)
static void server_assign_run(struct server *server)
{
struct run *run = run_queue_remove_first(&server->run_queue);
- log("Removed run %d for repository %s from the queue\n", run_get_id(run), run_get_url(run));
+ log("Removed run %d for repository %s from the queue\n", run_get_id(run),
+ run_get_repo_url(run));
struct worker *worker = worker_queue_remove_first(&server->worker_queue);
log("Removed worker %d from the queue\n", worker_get_fd(worker));
@@ -184,11 +185,11 @@ static void server_assign_run(struct server *server)
exit:
if (ret < 0) {
log("Failed to assign run for repository %s to worker %d, requeueing\n",
- run_get_url(run), worker_get_fd(worker));
+ run_get_repo_url(run), worker_get_fd(worker));
run_queue_add_first(&server->run_queue, run);
} else {
log("Assigned run %d for repository %s to worker %d\n", run_get_id(run),
- run_get_url(run), worker_get_fd(worker));
+ run_get_repo_url(run), worker_get_fd(worker));
run_destroy(run);
}
@@ -316,10 +317,40 @@ free_output:
return ret;
}
+static int server_handle_cmd_get_runs(const struct jsonrpc_request *request,
+ struct jsonrpc_response **response, void *_ctx)
+{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int ret = 0;
+
+ ret = request_parse_get_runs(request);
+ if (ret < 0)
+ return ret;
+
+ struct run_queue runs;
+
+ ret = storage_get_runs(&server->storage, &runs);
+ if (ret < 0) {
+ log_err("Failed to fetch runs\n");
+ return ret;
+ }
+
+ ret = response_create_get_runs(response, request, &runs);
+ if (ret < 0)
+ goto destroy_runs;
+
+destroy_runs:
+ run_queue_destroy(&runs);
+
+ return ret;
+}
+
static struct cmd_desc commands[] = {
{CMD_NEW_WORKER, server_handle_cmd_new_worker},
{CMD_QUEUE_RUN, server_handle_cmd_queue_run},
{CMD_FINISHED_RUN, server_handle_cmd_finished_run},
+ {CMD_GET_RUNS, server_handle_cmd_get_runs},
};
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
diff --git a/src/storage.c b/src/storage.c
index ebb2ce8..1916b01 100644
--- a/src/storage.c
+++ b/src/storage.c
@@ -19,7 +19,9 @@ typedef void (*storage_destroy_t)(struct storage *);
typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev);
typedef int (*storage_run_finished_t)(struct storage *, int repo_id, const struct proc_output *);
-typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *);
+
+typedef int (*storage_get_runs_t)(struct storage *, struct run_queue *);
+typedef storage_get_runs_t storage_get_run_queue_t;
struct storage_api {
storage_settings_destroy_t destroy_settings;
@@ -28,6 +30,8 @@ struct storage_api {
storage_run_create_t run_create;
storage_run_finished_t run_finished;
+
+ storage_get_runs_t get_runs;
storage_get_run_queue_t get_run_queue;
};
@@ -39,6 +43,8 @@ static const struct storage_api apis[] = {
storage_sqlite_run_create,
storage_sqlite_run_finished,
+
+ storage_sqlite_get_runs,
storage_sqlite_get_run_queue,
},
};
@@ -107,6 +113,14 @@ int storage_run_finished(struct storage *storage, int run_id, const struct proc_
return api->run_finished(storage, run_id, output);
}
+int storage_get_runs(struct storage *storage, struct run_queue *queue)
+{
+ const struct storage_api *api = get_api(storage->type);
+ if (!api)
+ return -1;
+ return api->get_runs(storage, queue);
+}
+
int storage_get_run_queue(struct storage *storage, struct run_queue *queue)
{
const struct storage_api *api = get_api(storage->type);
diff --git a/src/storage.h b/src/storage.h
index f7406a5..83e12f5 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -37,6 +37,8 @@ void storage_destroy(struct storage *);
int storage_run_create(struct storage *, const char *repo_url, const char *rev);
int storage_run_finished(struct storage *, int run_id, const struct proc_output *);
+
+int storage_get_runs(struct storage *, struct run_queue *);
int storage_get_run_queue(struct storage *, struct run_queue *);
#endif
diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c
index 8973fd9..143f302 100644
--- a/src/storage_sqlite.c
+++ b/src/storage_sqlite.c
@@ -54,11 +54,6 @@ void storage_sqlite_settings_destroy(const struct storage_settings *settings)
free(settings->sqlite);
}
-enum run_status {
- RUN_STATUS_CREATED = 1,
- RUN_STATUS_FINISHED = 2,
-};
-
struct prepared_stmt {
pthread_mutex_t mtx;
sqlite3_stmt *impl;
@@ -114,6 +109,8 @@ struct storage_sqlite {
struct prepared_stmt stmt_repo_insert;
struct prepared_stmt stmt_run_insert;
struct prepared_stmt stmt_run_finished;
+ struct prepared_stmt stmt_get_runs;
+ struct prepared_stmt stmt_get_run_queue;
};
static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version)
@@ -209,6 +206,10 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
"INSERT INTO cimple_runs(status, exit_code, output, repo_id, repo_rev) VALUES (?, -1, x'', ?, ?) RETURNING id;";
static const char *const fmt_run_finished =
"UPDATE cimple_runs SET status = ?, exit_code = ?, output = ? WHERE id = ?;";
+ static const char *const fmt_get_runs =
+ "SELECT id, status, exit_code, repo_url, repo_rev FROM cimple_runs_view ORDER BY id DESC";
+ static const char *const fmt_get_run_queue =
+ "SELECT id, status, exit_code, repo_url, repo_rev FROM cimple_runs_view WHERE status = ? ORDER BY id;";
int ret = 0;
@@ -224,9 +225,19 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished);
if (ret < 0)
goto finalize_run_insert;
+ ret = prepared_stmt_init(&storage->stmt_get_runs, storage->db, fmt_get_runs);
+ if (ret < 0)
+ goto finalize_run_finished;
+ ret = prepared_stmt_init(&storage->stmt_get_run_queue, storage->db, fmt_get_run_queue);
+ if (ret < 0)
+ goto finalize_get_runs;
return ret;
+finalize_get_runs:
+ prepared_stmt_destroy(&storage->stmt_get_runs);
+finalize_run_finished:
+ prepared_stmt_destroy(&storage->stmt_run_finished);
finalize_run_insert:
prepared_stmt_destroy(&storage->stmt_run_insert);
finalize_repo_insert:
@@ -239,6 +250,8 @@ finalize_repo_find:
static void storage_sqlite_finalize_statements(struct storage_sqlite *storage)
{
+ prepared_stmt_destroy(&storage->stmt_get_run_queue);
+ prepared_stmt_destroy(&storage->stmt_get_runs);
prepared_stmt_destroy(&storage->stmt_run_finished);
prepared_stmt_destroy(&storage->stmt_run_insert);
prepared_stmt_destroy(&storage->stmt_repo_insert);
@@ -433,18 +446,20 @@ static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run
int ret = 0;
int id = sqlite_column_int(stmt, 0);
+ int status = sqlite_column_int(stmt, 1);
+ int exit_code = sqlite_column_int(stmt, 2);
char *url = NULL;
- ret = sqlite_column_text(stmt, 1, &url);
+ ret = sqlite_column_text(stmt, 3, &url);
if (ret < 0)
return ret;
char *rev = NULL;
- ret = sqlite_column_text(stmt, 2, &rev);
+ ret = sqlite_column_text(stmt, 4, &rev);
if (ret < 0)
goto free_url;
- ret = run_create(run, id, url, rev);
+ ret = run_new(run, id, url, rev, status, exit_code);
if (ret < 0)
goto free_rev;
@@ -459,21 +474,10 @@ free_url:
return ret;
}
-int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue)
+static int storage_sqlite_rows_to_runs(struct sqlite3_stmt *stmt, struct run_queue *queue)
{
- static const char *const fmt =
- "SELECT id, repo_url, repo_rev FROM cimple_runs_view WHERE status = ?;";
-
- sqlite3_stmt *stmt;
int ret = 0;
- ret = sqlite_prepare(storage->sqlite->db, fmt, &stmt);
- if (ret < 0)
- return ret;
- ret = sqlite_bind_int(stmt, 1, RUN_STATUS_CREATED);
- if (ret < 0)
- goto finalize;
-
run_queue_create(queue);
while (1) {
@@ -492,13 +496,51 @@ int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queu
run_queue_add_last(queue, run);
}
- goto finalize;
+ return ret;
run_queue_destroy:
run_queue_destroy(queue);
-finalize:
- sqlite_finalize(stmt);
+ return ret;
+}
+
+int storage_sqlite_get_runs(struct storage *storage, struct run_queue *queue)
+{
+ struct prepared_stmt *stmt = &storage->sqlite->stmt_get_runs;
+ int ret = 0;
+
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = storage_sqlite_rows_to_runs(stmt->impl, queue);
+ if (ret < 0)
+ goto reset;
+
+reset:
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
+
+ return ret;
+}
+
+int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue)
+{
+ struct prepared_stmt *stmt = &storage->sqlite->stmt_get_run_queue;
+ int ret = 0;
+
+ ret = prepared_stmt_lock(stmt);
+ if (ret < 0)
+ return ret;
+ ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED);
+ if (ret < 0)
+ goto reset;
+ ret = storage_sqlite_rows_to_runs(stmt->impl, queue);
+ if (ret < 0)
+ goto reset;
+
+reset:
+ sqlite_reset(stmt->impl);
+ prepared_stmt_unlock(stmt);
return ret;
}
diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h
index 857b9c0..1a9deea 100644
--- a/src/storage_sqlite.h
+++ b/src/storage_sqlite.h
@@ -25,6 +25,8 @@ void storage_sqlite_destroy(struct storage *);
int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev);
int storage_sqlite_run_finished(struct storage *, int id, const struct proc_output *);
+
+int storage_sqlite_get_runs(struct storage *, struct run_queue *runs);
int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs);
#endif
diff --git a/src/worker.c b/src/worker.c
index 5a0804f..ce024b8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -184,7 +184,7 @@ static int worker_do_run(struct worker *worker)
if (ret < 0)
return ret;
- ret = ci_run_git_repo(run_get_url(worker->run), run_get_rev(worker->run), result);
+ ret = ci_run_git_repo(run_get_repo_url(worker->run), run_get_repo_rev(worker->run), result);
if (ret < 0) {
log_err("Run failed with an error\n");
goto free_output;
diff --git a/test/py/test_repo.py b/test/py/test_repo.py
index e500b9f..e5845fe 100644
--- a/test/py/test_repo.py
+++ b/test/py/test_repo.py
@@ -3,6 +3,7 @@
# For details, see https://github.com/egor-tensin/cimple.
# Distributed under the MIT License.
+import json
import logging
import multiprocessing as mp
import re
@@ -71,6 +72,19 @@ def _test_repo_internal(env, repo, numof_processes, runs_per_process):
assert repo.run_exit_code_matches(ec), f"Exit code doesn't match: {ec}"
assert repo.run_output_matches(output), f"Output doesn't match: {output}"
+ runs = env.client.run('get-runs')
+ runs = json.loads(runs)['result']
+ assert len(runs) == numof_runs
+
+ for run in runs:
+ id = run['id']
+ ec = run['exit_code']
+
+ assert repo.run_exit_code_matches(ec), f"Exit code doesn't match: {ec}"
+ # Not implemented yet:
+ assert 'status' not in run
+ assert 'output' not in run
+
@my_parametrize('runs_per_client', [1, 5])
@my_parametrize('numof_clients', [1, 5])