diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2023-05-13 08:59:43 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2023-05-13 11:37:46 +0200 |
commit | f471fbdf27462b82febe4e8db8358ab3380d2a28 (patch) | |
tree | 6c41abf4e32214cd8bdb0f8a377f93b3c7c0d830 /src/worker.c | |
parent | ci_queue: fix a broken getter (diff) | |
download | cimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.tar.gz cimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.zip |
add command module to handle request-response communications
Diffstat (limited to '')
-rw-r--r-- | src/worker.c | 106 |
1 files changed, 34 insertions, 72 deletions
diff --git a/src/worker.c b/src/worker.c index ed3d7e5..ae9a578 100644 --- a/src/worker.c +++ b/src/worker.c @@ -7,6 +7,7 @@ #include "worker.h" #include "ci.h" +#include "command.h" #include "compiler.h" #include "const.h" #include "git.h" @@ -16,8 +17,8 @@ #include "process.h" #include "signal.h" +#include <pthread.h> #include <stdlib.h> -#include <string.h> #include <unistd.h> struct worker { @@ -73,20 +74,15 @@ void worker_destroy(struct worker *worker) static int msg_send_new_worker(const struct worker *worker) { static const char *argv[] = {CMD_NEW_WORKER, NULL}; - struct msg msg; + struct msg *msg; int ret = 0; ret = msg_from_argv(&msg, argv); if (ret < 0) return ret; - ret = msg_send(worker->fd, &msg); - if (ret < 0) - goto free_msg; - -free_msg: - msg_free(&msg); - + ret = msg_send(worker->fd, msg); + msg_free(msg); return ret; } @@ -109,77 +105,37 @@ static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *r return 0; } -static int msg_ci_run_handler(struct worker *worker, const struct msg *request) +static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, UNUSED void *_worker, + struct msg **response) { - struct msg response; struct proc_output result; int ret = 0; + if (msg_get_length(request) != 3) { + log_err("Invalid number of arguments for a message\n"); + msg_dump(request); + return -1; + } + + const char **words = msg_get_words(request); + proc_output_init(&result); - ret = msg_ci_run_do(request->argv[1], request->argv[2], &result); + ret = msg_ci_run_do(words[1], words[2], &result); proc_output_free(&result); if (ret < 0) - ret = msg_error(&response); - else - ret = msg_success(&response); - - if (ret < 0) return ret; - ret = msg_send(worker->fd, &response); - msg_free(&response); - return ret; -} - -static int msg_ci_run_parser(const struct msg *msg) -{ - if (msg->argc != 3) { - log_err("Invalid number of arguments for a message: %d\n", msg->argc); - return 0; - } - - return 1; + return msg_success(response); } -typedef int (*msg_parser)(const struct msg *msg); -typedef int (*msg_handler)(struct worker *, const struct msg *); - -struct msg_descr { - const char *cmd; - msg_parser parser; - msg_handler handler; -}; - -struct msg_descr messages[] = { - {CMD_CI_RUN, msg_ci_run_parser, msg_ci_run_handler}, +static struct command_def commands[] = { + {CMD_CI_RUN, msg_ci_run_handler}, }; -static int worker_msg_handler(struct worker *worker, const struct msg *request) -{ - if (request->argc == 0) - goto unknown_request; - - size_t numof_messages = sizeof(messages) / sizeof(messages[0]); - - for (size_t i = 0; i < numof_messages; ++i) { - if (strcmp(messages[i].cmd, request->argv[0])) - continue; - if (!messages[i].parser(request)) - continue; - return messages[i].handler(worker, request); - } - -unknown_request: - log_err("Received an unknown message\n"); - msg_dump(request); - struct msg response; - msg_error(&response); - return msg_send(worker->fd, &response); -} - int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[]) { + struct command_dispatcher *dispatcher = NULL; int ret = 0; ret = signal_install_global_handler(); @@ -190,25 +146,31 @@ int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[]) if (ret < 0) return ret; + ret = command_dispatcher_create(&dispatcher, commands, + sizeof(commands) / sizeof(commands[0]), worker); + if (ret < 0) + return ret; + while (!global_stop_flag) { - struct msg request; + struct msg *request; log("Waiting for a new command\n"); ret = msg_recv(worker->fd, &request); if (ret < 0) { - if (errno == EINVAL && global_stop_flag) { + if (errno == EINVAL && global_stop_flag) ret = 0; - break; - } - return ret; + goto dispatcher_destroy; } - ret = worker_msg_handler(worker, &request); - msg_free(&request); + ret = command_dispatcher_msg_handler(dispatcher, worker->fd, request); + msg_free(request); if (ret < 0) - return ret; + goto dispatcher_destroy; } +dispatcher_destroy: + command_dispatcher_destroy(dispatcher); + return ret; } |