aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-07-04 22:27:21 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-07-04 22:27:21 +0200
commit933ea998820c9697ea0d43370366edd24e443b65 (patch)
treedb480f3d69eed2ef6bce0ad8921cc0f2f30ab5f6
parentstorage_sqlite: refactoring (diff)
downloadcimple-933ea998820c9697ea0d43370366edd24e443b65.tar.gz
cimple-933ea998820c9697ea0d43370366edd24e443b65.zip
move custom message parsing to a separate module
Diffstat (limited to '')
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/protocol.c111
-rw-r--r--src/protocol.h24
-rw-r--r--src/run_queue.c35
-rw-r--r--src/run_queue.h3
-rw-r--r--src/server.c59
-rw-r--r--src/worker.c45
7 files changed, 188 insertions, 92 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6006b7d..5413976 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -52,6 +52,8 @@ add_my_executable(server server_main.c server.c
log.c
msg.c
net.c
+ process.c
+ protocol.c
run_queue.c
signal.c
sql/sqlite_sql.h
@@ -84,6 +86,7 @@ add_my_executable(worker worker_main.c worker.c
msg.c
net.c
process.c
+ protocol.c
run_queue.c
signal.c
string.c)
diff --git a/src/protocol.c b/src/protocol.c
new file mode 100644
index 0000000..0b6e74e
--- /dev/null
+++ b/src/protocol.c
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#include "protocol.h"
+#include "const.h"
+#include "log.h"
+#include "msg.h"
+#include "process.h"
+#include "run_queue.h"
+#include "string.h"
+
+#include <stddef.h>
+#include <stdio.h>
+
+static int check_msg_length(const struct msg *msg, size_t expected)
+{
+ size_t actual = msg_get_length(msg);
+
+ if (actual != expected) {
+ log_err("Invalid number of arguments for a message: %zu\n", actual);
+ msg_dump(msg);
+ return -1;
+ }
+
+ return 0;
+}
+
+int msg_run_parse(const struct msg *msg, struct run **run)
+{
+ int ret = check_msg_length(msg, 3);
+ if (ret < 0)
+ return ret;
+
+ const char **argv = msg_get_strings(msg);
+ /* We don't know the ID yet. */
+ return run_create(run, 0, argv[1], argv[2]);
+}
+
+int msg_new_worker_create(struct msg **msg)
+{
+ static const char *argv[] = {CMD_NEW_WORKER, NULL};
+ return msg_from_argv(msg, argv);
+}
+
+int msg_start_create(struct msg **msg, const struct run *run)
+{
+ char id[16];
+ snprintf(id, sizeof(id), "%d", run_get_id(run));
+
+ const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL};
+
+ return msg_from_argv(msg, argv);
+}
+
+int msg_start_parse(const struct msg *msg, struct run **run)
+{
+ int ret = 0;
+
+ ret = check_msg_length(msg, 4);
+ if (ret < 0)
+ return ret;
+
+ const char **argv = msg_get_strings(msg);
+
+ int id = 0;
+
+ ret = string_to_int(argv[1], &id);
+ if (ret < 0)
+ return ret;
+
+ return run_create(run, id, argv[2], argv[3]);
+}
+
+int msg_finished_create(struct msg **msg, int run_id, const struct proc_output *output)
+{
+ char id[16];
+ char ec[16];
+
+ snprintf(id, sizeof(id), "%d", run_id);
+ snprintf(ec, sizeof(ec), "%d", output->ec);
+
+ const char *argv[] = {CMD_FINISHED, id, ec, NULL};
+
+ return msg_from_argv(msg, argv);
+}
+
+int msg_finished_parse(const struct msg *msg, int *run_id, struct proc_output *output)
+{
+ int ret = 0;
+
+ ret = check_msg_length(msg, 3);
+ if (ret < 0)
+ return ret;
+
+ const char **argv = msg_get_strings(msg);
+
+ proc_output_init(output);
+
+ ret = string_to_int(argv[1], run_id);
+ if (ret < 0)
+ return ret;
+ ret = string_to_int(argv[2], &output->ec);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
diff --git a/src/protocol.h b/src/protocol.h
new file mode 100644
index 0000000..77d800b
--- /dev/null
+++ b/src/protocol.h
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#ifndef __PROTOCOL_H__
+#define __PROTOCOL_H__
+
+#include "process.h"
+#include "run_queue.h"
+
+int msg_run_parse(const struct msg *, struct run **);
+
+int msg_new_worker_create(struct msg **);
+
+int msg_start_create(struct msg **, const struct run *);
+int msg_start_parse(const struct msg *, struct run **);
+
+int msg_finished_create(struct msg **, int run_id, const struct proc_output *);
+int msg_finished_parse(const struct msg *, int *run_id, struct proc_output *);
+
+#endif
diff --git a/src/run_queue.c b/src/run_queue.c
index e1fdf84..184dae5 100644
--- a/src/run_queue.c
+++ b/src/run_queue.c
@@ -65,41 +65,6 @@ void run_destroy(struct run *entry)
free(entry);
}
-int run_from_msg(struct run **run, const struct msg *msg)
-{
- size_t msg_len = msg_get_length(msg);
-
- if (msg_len != 4) {
- log_err("Invalid number of arguments for a message: %zu\n", msg_len);
- msg_dump(msg);
- return -1;
- }
-
- const char **argv = msg_get_strings(msg);
-
- int id = 0;
- int ret = string_to_int(argv[1], &id);
- if (ret < 0)
- return ret;
-
- return run_create(run, id, argv[2], argv[3]);
-}
-
-int run_from_msg_unknown_id(struct run **run, const struct msg *msg)
-{
- size_t msg_len = msg_get_length(msg);
-
- if (msg_len != 3) {
- log_err("Invalid number of arguments for a message: %zu\n", msg_len);
- msg_dump(msg);
- return -1;
- }
-
- const char **argv = msg_get_strings(msg);
- /* We don't know the ID yet. */
- return run_create(run, 0, argv[1], argv[2]);
-}
-
int run_get_id(const struct run *entry)
{
return entry->id;
diff --git a/src/run_queue.h b/src/run_queue.h
index df716a4..5447bd4 100644
--- a/src/run_queue.h
+++ b/src/run_queue.h
@@ -17,9 +17,6 @@ struct run;
int run_create(struct run **, int id, const char *url, const char *rev);
void run_destroy(struct run *);
-int run_from_msg(struct run **, const struct msg *);
-int run_from_msg_unknown_id(struct run **, const struct msg *);
-
int run_get_id(const struct run *);
const char *run_get_url(const struct run *);
const char *run_get_rev(const struct run *);
diff --git a/src/server.c b/src/server.c
index a1bbbe8..6f82888 100644
--- a/src/server.c
+++ b/src/server.c
@@ -13,6 +13,7 @@
#include "file.h"
#include "log.h"
#include "msg.h"
+#include "protocol.h"
#include "run_queue.h"
#include "signal.h"
#include "storage.h"
@@ -170,12 +171,19 @@ static void server_assign_run(struct server *server)
struct worker *worker = worker_queue_remove_first(&server->worker_queue);
log("Removed worker %d from the queue\n", worker_get_fd(worker));
- char id[16];
- snprintf(id, sizeof(id), "%d", run_get_id(run));
+ struct msg *start_msg = NULL;
+ int ret = 0;
- const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL};
- int ret = msg_talk_argv(worker_get_fd(worker), argv, NULL);
+ ret = msg_start_create(&start_msg, run);
+ if (ret < 0)
+ goto exit;
+ ret = msg_talk(worker_get_fd(worker), start_msg, NULL);
+ msg_free(start_msg);
+ if (ret < 0)
+ goto exit;
+
+exit:
if (ret < 0) {
log("Failed to assign run for repository %s to worker %d, requeueing\n",
run_get_url(run), worker_get_fd(worker));
@@ -221,17 +229,15 @@ static int server_handle_cmd_new_worker(UNUSED const struct msg *request,
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
struct server *server = (struct server *)ctx->arg;
- int client_fd = ctx->fd;
-
- struct worker *worker = NULL;
int ret = 0;
- ret = file_dup(client_fd);
+ ret = file_dup(ctx->fd);
if (ret < 0)
return ret;
- client_fd = ret;
- ret = worker_create(&worker, client_fd);
+ struct worker *worker = NULL;
+
+ ret = worker_create(&worker, ret);
if (ret < 0)
return ret;
@@ -251,11 +257,11 @@ static int server_handle_cmd_run(const struct msg *request, struct msg **respons
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
struct server *server = (struct server *)ctx->arg;
- struct run *run = NULL;
-
int ret = 0;
- ret = run_from_msg_unknown_id(&run, request);
+ struct run *run = NULL;
+
+ ret = msg_run_parse(request, &run);
if (ret < 0)
return ret;
@@ -283,33 +289,20 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m
void *_ctx)
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- int client_fd = ctx->fd;
+ struct server *server = (struct server *)ctx->arg;
int ret = 0;
- log("Received a \"run finished\" message from worker %d\n", client_fd);
-
- size_t msg_len = msg_get_length(request);
-
- if (msg_len != 3) {
- log_err("Invalid number of arguments for a message: %zu\n", msg_len);
- msg_dump(request);
- return -1;
- }
-
- const char **argv = msg_get_strings(request);
+ log("Received a \"run finished\" message from worker %d\n", ctx->fd);
- int run_id, ec;
+ int run_id;
+ struct proc_output output;
- ret = string_to_int(argv[1], &run_id);
- if (ret < 0)
- return ret;
- ret = string_to_int(argv[2], &ec);
+ ret = msg_finished_parse(request, &run_id, &output);
if (ret < 0)
return ret;
- struct server *server = (struct server *)ctx->arg;
-
- ret = storage_run_finished(&server->storage, run_id, ec);
+ ret = storage_run_finished(&server->storage, run_id, output.ec);
+ proc_output_free(&output);
if (ret < 0) {
log_err("Failed to mark run %d as finished\n", run_id);
return ret;
diff --git a/src/worker.c b/src/worker.c
index 4e71f49..3bd2155 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -16,6 +16,7 @@
#include "msg.h"
#include "net.h"
#include "process.h"
+#include "protocol.h"
#include "run_queue.h"
#include "signal.h"
@@ -80,33 +81,20 @@ static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UN
return 0;
}
-static int worker_send_finished(struct worker *worker, const struct run *run,
- struct proc_output *output)
-{
- char id[16];
- char ec[16];
-
- snprintf(id, sizeof(id), "%d", run_get_id(run));
- snprintf(ec, sizeof(ec), "%d", output->ec);
-
- const char *argv[] = {CMD_FINISHED, id, ec, NULL};
-
- return msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv,
- NULL);
-}
-
static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg **response,
void *_ctx)
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct run *run = NULL;
- struct proc_output result;
+ struct worker *worker = (struct worker *)ctx->arg;
int ret = 0;
- ret = run_from_msg(&run, request);
+ struct run *run = NULL;
+
+ ret = msg_start_parse(request, &run);
if (ret < 0)
return ret;
+ struct proc_output result;
proc_output_init(&result);
ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result);
@@ -117,10 +105,20 @@ static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg
proc_output_dump(&result);
- ret = worker_send_finished((struct worker *)ctx->arg, run, &result);
+ struct msg *finished_msg = NULL;
+
+ ret = msg_finished_create(&finished_msg, run_get_id(run), &result);
if (ret < 0)
goto free_output;
+ ret = msg_connect_and_talk(worker->settings->host, worker->settings->port, finished_msg,
+ NULL);
+ if (ret < 0)
+ goto free_finished_msg;
+
+free_finished_msg:
+ msg_free(finished_msg);
+
free_output:
proc_output_free(&result);
@@ -218,9 +216,14 @@ int worker_main(struct worker *worker)
return ret;
const int fd = ret;
- static const char *argv[] = {CMD_NEW_WORKER, NULL};
+ struct msg *new_worker_msg = NULL;
+
+ ret = msg_new_worker_create(&new_worker_msg);
+ if (ret < 0)
+ return ret;
- ret = msg_send_argv(fd, argv);
+ ret = msg_send(fd, new_worker_msg);
+ msg_free(new_worker_msg);
if (ret < 0)
return ret;