From 933ea998820c9697ea0d43370366edd24e443b65 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Tue, 4 Jul 2023 22:27:21 +0200 Subject: move custom message parsing to a separate module --- src/CMakeLists.txt | 3 ++ src/protocol.c | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/protocol.h | 24 ++++++++++++ src/run_queue.c | 35 ----------------- src/run_queue.h | 3 -- src/server.c | 59 +++++++++++++--------------- src/worker.c | 45 ++++++++++++---------- 7 files changed, 188 insertions(+), 92 deletions(-) create mode 100644 src/protocol.c create mode 100644 src/protocol.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6006b7d..5413976 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -52,6 +52,8 @@ add_my_executable(server server_main.c server.c log.c msg.c net.c + process.c + protocol.c run_queue.c signal.c sql/sqlite_sql.h @@ -84,6 +86,7 @@ add_my_executable(worker worker_main.c worker.c msg.c net.c process.c + protocol.c run_queue.c signal.c string.c) diff --git a/src/protocol.c b/src/protocol.c new file mode 100644 index 0000000..0b6e74e --- /dev/null +++ b/src/protocol.c @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2023 Egor Tensin + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "protocol.h" +#include "const.h" +#include "log.h" +#include "msg.h" +#include "process.h" +#include "run_queue.h" +#include "string.h" + +#include +#include + +static int check_msg_length(const struct msg *msg, size_t expected) +{ + size_t actual = msg_get_length(msg); + + if (actual != expected) { + log_err("Invalid number of arguments for a message: %zu\n", actual); + msg_dump(msg); + return -1; + } + + return 0; +} + +int msg_run_parse(const struct msg *msg, struct run **run) +{ + int ret = check_msg_length(msg, 3); + if (ret < 0) + return ret; + + const char **argv = msg_get_strings(msg); + /* We don't know the ID yet. */ + return run_create(run, 0, argv[1], argv[2]); +} + +int msg_new_worker_create(struct msg **msg) +{ + static const char *argv[] = {CMD_NEW_WORKER, NULL}; + return msg_from_argv(msg, argv); +} + +int msg_start_create(struct msg **msg, const struct run *run) +{ + char id[16]; + snprintf(id, sizeof(id), "%d", run_get_id(run)); + + const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL}; + + return msg_from_argv(msg, argv); +} + +int msg_start_parse(const struct msg *msg, struct run **run) +{ + int ret = 0; + + ret = check_msg_length(msg, 4); + if (ret < 0) + return ret; + + const char **argv = msg_get_strings(msg); + + int id = 0; + + ret = string_to_int(argv[1], &id); + if (ret < 0) + return ret; + + return run_create(run, id, argv[2], argv[3]); +} + +int msg_finished_create(struct msg **msg, int run_id, const struct proc_output *output) +{ + char id[16]; + char ec[16]; + + snprintf(id, sizeof(id), "%d", run_id); + snprintf(ec, sizeof(ec), "%d", output->ec); + + const char *argv[] = {CMD_FINISHED, id, ec, NULL}; + + return msg_from_argv(msg, argv); +} + +int msg_finished_parse(const struct msg *msg, int *run_id, struct proc_output *output) +{ + int ret = 0; + + ret = check_msg_length(msg, 3); + if (ret < 0) + return ret; + + const char **argv = msg_get_strings(msg); + + proc_output_init(output); + + ret = string_to_int(argv[1], run_id); + if (ret < 0) + return ret; + ret = string_to_int(argv[2], &output->ec); + if (ret < 0) + return ret; + + return 0; +} diff --git a/src/protocol.h b/src/protocol.h new file mode 100644 index 0000000..77d800b --- /dev/null +++ b/src/protocol.h @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023 Egor Tensin + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __PROTOCOL_H__ +#define __PROTOCOL_H__ + +#include "process.h" +#include "run_queue.h" + +int msg_run_parse(const struct msg *, struct run **); + +int msg_new_worker_create(struct msg **); + +int msg_start_create(struct msg **, const struct run *); +int msg_start_parse(const struct msg *, struct run **); + +int msg_finished_create(struct msg **, int run_id, const struct proc_output *); +int msg_finished_parse(const struct msg *, int *run_id, struct proc_output *); + +#endif diff --git a/src/run_queue.c b/src/run_queue.c index e1fdf84..184dae5 100644 --- a/src/run_queue.c +++ b/src/run_queue.c @@ -65,41 +65,6 @@ void run_destroy(struct run *entry) free(entry); } -int run_from_msg(struct run **run, const struct msg *msg) -{ - size_t msg_len = msg_get_length(msg); - - if (msg_len != 4) { - log_err("Invalid number of arguments for a message: %zu\n", msg_len); - msg_dump(msg); - return -1; - } - - const char **argv = msg_get_strings(msg); - - int id = 0; - int ret = string_to_int(argv[1], &id); - if (ret < 0) - return ret; - - return run_create(run, id, argv[2], argv[3]); -} - -int run_from_msg_unknown_id(struct run **run, const struct msg *msg) -{ - size_t msg_len = msg_get_length(msg); - - if (msg_len != 3) { - log_err("Invalid number of arguments for a message: %zu\n", msg_len); - msg_dump(msg); - return -1; - } - - const char **argv = msg_get_strings(msg); - /* We don't know the ID yet. */ - return run_create(run, 0, argv[1], argv[2]); -} - int run_get_id(const struct run *entry) { return entry->id; diff --git a/src/run_queue.h b/src/run_queue.h index df716a4..5447bd4 100644 --- a/src/run_queue.h +++ b/src/run_queue.h @@ -17,9 +17,6 @@ struct run; int run_create(struct run **, int id, const char *url, const char *rev); void run_destroy(struct run *); -int run_from_msg(struct run **, const struct msg *); -int run_from_msg_unknown_id(struct run **, const struct msg *); - int run_get_id(const struct run *); const char *run_get_url(const struct run *); const char *run_get_rev(const struct run *); diff --git a/src/server.c b/src/server.c index a1bbbe8..6f82888 100644 --- a/src/server.c +++ b/src/server.c @@ -13,6 +13,7 @@ #include "file.h" #include "log.h" #include "msg.h" +#include "protocol.h" #include "run_queue.h" #include "signal.h" #include "storage.h" @@ -170,12 +171,19 @@ static void server_assign_run(struct server *server) struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); - char id[16]; - snprintf(id, sizeof(id), "%d", run_get_id(run)); + struct msg *start_msg = NULL; + int ret = 0; - const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL}; - int ret = msg_talk_argv(worker_get_fd(worker), argv, NULL); + ret = msg_start_create(&start_msg, run); + if (ret < 0) + goto exit; + ret = msg_talk(worker_get_fd(worker), start_msg, NULL); + msg_free(start_msg); + if (ret < 0) + goto exit; + +exit: if (ret < 0) { log("Failed to assign run for repository %s to worker %d, requeueing\n", run_get_url(run), worker_get_fd(worker)); @@ -221,17 +229,15 @@ static int server_handle_cmd_new_worker(UNUSED const struct msg *request, { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct server *server = (struct server *)ctx->arg; - int client_fd = ctx->fd; - - struct worker *worker = NULL; int ret = 0; - ret = file_dup(client_fd); + ret = file_dup(ctx->fd); if (ret < 0) return ret; - client_fd = ret; - ret = worker_create(&worker, client_fd); + struct worker *worker = NULL; + + ret = worker_create(&worker, ret); if (ret < 0) return ret; @@ -251,11 +257,11 @@ static int server_handle_cmd_run(const struct msg *request, struct msg **respons { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct server *server = (struct server *)ctx->arg; - struct run *run = NULL; - int ret = 0; - ret = run_from_msg_unknown_id(&run, request); + struct run *run = NULL; + + ret = msg_run_parse(request, &run); if (ret < 0) return ret; @@ -283,33 +289,20 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - int client_fd = ctx->fd; + struct server *server = (struct server *)ctx->arg; int ret = 0; - log("Received a \"run finished\" message from worker %d\n", client_fd); - - size_t msg_len = msg_get_length(request); - - if (msg_len != 3) { - log_err("Invalid number of arguments for a message: %zu\n", msg_len); - msg_dump(request); - return -1; - } - - const char **argv = msg_get_strings(request); + log("Received a \"run finished\" message from worker %d\n", ctx->fd); - int run_id, ec; + int run_id; + struct proc_output output; - ret = string_to_int(argv[1], &run_id); - if (ret < 0) - return ret; - ret = string_to_int(argv[2], &ec); + ret = msg_finished_parse(request, &run_id, &output); if (ret < 0) return ret; - struct server *server = (struct server *)ctx->arg; - - ret = storage_run_finished(&server->storage, run_id, ec); + ret = storage_run_finished(&server->storage, run_id, output.ec); + proc_output_free(&output); if (ret < 0) { log_err("Failed to mark run %d as finished\n", run_id); return ret; diff --git a/src/worker.c b/src/worker.c index 4e71f49..3bd2155 100644 --- a/src/worker.c +++ b/src/worker.c @@ -16,6 +16,7 @@ #include "msg.h" #include "net.h" #include "process.h" +#include "protocol.h" #include "run_queue.h" #include "signal.h" @@ -80,33 +81,20 @@ static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UN return 0; } -static int worker_send_finished(struct worker *worker, const struct run *run, - struct proc_output *output) -{ - char id[16]; - char ec[16]; - - snprintf(id, sizeof(id), "%d", run_get_id(run)); - snprintf(ec, sizeof(ec), "%d", output->ec); - - const char *argv[] = {CMD_FINISHED, id, ec, NULL}; - - return msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, - NULL); -} - static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg **response, void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct run *run = NULL; - struct proc_output result; + struct worker *worker = (struct worker *)ctx->arg; int ret = 0; - ret = run_from_msg(&run, request); + struct run *run = NULL; + + ret = msg_start_parse(request, &run); if (ret < 0) return ret; + struct proc_output result; proc_output_init(&result); ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result); @@ -117,10 +105,20 @@ static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg proc_output_dump(&result); - ret = worker_send_finished((struct worker *)ctx->arg, run, &result); + struct msg *finished_msg = NULL; + + ret = msg_finished_create(&finished_msg, run_get_id(run), &result); if (ret < 0) goto free_output; + ret = msg_connect_and_talk(worker->settings->host, worker->settings->port, finished_msg, + NULL); + if (ret < 0) + goto free_finished_msg; + +free_finished_msg: + msg_free(finished_msg); + free_output: proc_output_free(&result); @@ -218,9 +216,14 @@ int worker_main(struct worker *worker) return ret; const int fd = ret; - static const char *argv[] = {CMD_NEW_WORKER, NULL}; + struct msg *new_worker_msg = NULL; + + ret = msg_new_worker_create(&new_worker_msg); + if (ret < 0) + return ret; - ret = msg_send_argv(fd, argv); + ret = msg_send(fd, new_worker_msg); + msg_free(new_worker_msg); if (ret < 0) return ret; -- cgit v1.2.3