diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2023-05-13 08:59:43 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2023-05-13 11:37:46 +0200 |
commit | f471fbdf27462b82febe4e8db8358ab3380d2a28 (patch) | |
tree | 6c41abf4e32214cd8bdb0f8a377f93b3c7c0d830 /src | |
parent | ci_queue: fix a broken getter (diff) | |
download | cimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.tar.gz cimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.zip |
add command module to handle request-response communications
Diffstat (limited to '')
-rw-r--r-- | src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/client.c | 19 | ||||
-rw-r--r-- | src/client.h | 2 | ||||
-rw-r--r-- | src/client_main.c | 2 | ||||
-rw-r--r-- | src/command.c | 157 | ||||
-rw-r--r-- | src/command.h | 35 | ||||
-rw-r--r-- | src/msg.c | 143 | ||||
-rw-r--r-- | src/msg.h | 24 | ||||
-rw-r--r-- | src/server.c | 129 | ||||
-rw-r--r-- | src/worker.c | 106 |
10 files changed, 395 insertions, 224 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6f2cc6b..545fd82 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,6 +36,7 @@ generate_sql_header(sqlite) add_my_executable(server server_main.c server.c ci_queue.c cmd_line.c + command.c file.c msg.c net.c @@ -57,6 +58,7 @@ add_my_executable(client client_main.c client.c add_my_executable(worker worker_main.c worker.c ci.c cmd_line.c + command.c file.c git.c msg.c diff --git a/src/client.c b/src/client.c index 739c612..14eab14 100644 --- a/src/client.c +++ b/src/client.c @@ -47,24 +47,31 @@ void client_destroy(struct client *client) free(client); } -int client_main(const struct client *client, int argc, char *argv[]) +int client_main(const struct client *client, const char **argv) { - struct msg request = {argc, argv}; - struct msg response; + struct msg *request; + struct msg *response; int ret = 0; - ret = msg_send_and_wait(client->fd, &request, &response); + ret = msg_from_argv(&request, argv); if (ret < 0) return ret; - if (msg_is_error(&response)) { + ret = msg_send_and_wait(client->fd, request, &response); + if (ret < 0) + goto free_request; + + if (msg_is_error(response)) { log_err("Server failed to process the request\n"); ret = -1; goto free_response; } free_response: - msg_free(&response); + msg_free(response); + +free_request: + msg_free(request); return ret; } diff --git a/src/client.h b/src/client.h index 141c6be..5bc3f5c 100644 --- a/src/client.h +++ b/src/client.h @@ -18,6 +18,6 @@ struct client; int client_create(struct client **, const struct settings *); void client_destroy(struct client *); -int client_main(const struct client *, int argc, char *argv[]); +int client_main(const struct client *, const char **argv); #endif diff --git a/src/client_main.c b/src/client_main.c index a895ccf..dd22508 100644 --- a/src/client_main.c +++ b/src/client_main.c @@ -73,7 +73,7 @@ int main(int argc, char *argv[]) if (ret < 0) return ret; - ret = client_main(client, argc - optind, argv + optind); + ret = client_main(client, (const char **)argv + optind); if (ret < 0) goto destroy_client; diff --git a/src/command.c b/src/command.c new file mode 100644 index 0000000..b1204a9 --- /dev/null +++ b/src/command.c @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "command.h" +#include "log.h" +#include "msg.h" + +#include <stdlib.h> +#include <string.h> + +struct command_dispatcher { + struct command_def *defs; + size_t numof_defs; + void *ctx; +}; + +static int copy_def(struct command_def *dest, const struct command_def *src) +{ + dest->name = strdup(src->name); + if (!dest->name) { + log_errno("strdup"); + return -1; + } + dest->processor = src->processor; + return 0; +} + +static void free_def(struct command_def *def) +{ + free(def->name); +} + +static int copy_defs(struct command_def *dest, const struct command_def *src, size_t numof_defs) +{ + size_t numof_copied, i; + int ret = 0; + + for (numof_copied = 0; numof_copied < numof_defs; ++numof_copied) { + ret = copy_def(&dest[numof_copied], &src[numof_copied]); + if (ret < 0) + goto free; + } + + return 0; + +free: + for (i = 0; i < numof_copied; ++i) + free_def(&dest[numof_copied]); + + return -1; +} + +static void free_defs(struct command_def *defs, size_t numof_defs) +{ + size_t i; + + for (i = 0; i < numof_defs; ++i) + free_def(&defs[i]); +} + +int command_dispatcher_create(struct command_dispatcher **_dispatcher, struct command_def *defs, + size_t numof_defs, void *ctx) +{ + struct command_dispatcher *dispatcher; + int ret = 0; + + *_dispatcher = malloc(sizeof(struct command_dispatcher)); + if (!*_dispatcher) { + log_errno("malloc"); + return -1; + } + dispatcher = *_dispatcher; + + dispatcher->defs = malloc(sizeof(struct command_def) * numof_defs); + if (!dispatcher->defs) { + log_errno("malloc"); + goto free; + } + dispatcher->numof_defs = numof_defs; + + ret = copy_defs(dispatcher->defs, defs, numof_defs); + if (ret < 0) + goto free_defs; + + dispatcher->ctx = ctx; + return 0; + +free_defs: + free(dispatcher->defs); + +free: + free(dispatcher); + + return -1; +} + +void command_dispatcher_destroy(struct command_dispatcher *dispatcher) +{ + free_defs(dispatcher->defs, dispatcher->numof_defs); + free(dispatcher->defs); + free(dispatcher); +} + +int command_dispatcher_msg_handler(const struct command_dispatcher *dispatcher, int conn_fd, + const struct msg *request) +{ + struct msg *response = NULL; + int ret = 0; + + size_t numof_words = msg_get_length(request); + if (numof_words == 0) + goto unknown; + + const char **words = msg_get_words(request); + + for (size_t i = 0; i < dispatcher->numof_defs; ++i) { + struct command_def *def = &dispatcher->defs[i]; + + if (strcmp(def->name, words[0])) + continue; + + ret = def->processor(conn_fd, request, dispatcher->ctx, &response); + goto exit; + } + +unknown: + log_err("Received an unknown command\n"); + ret = -1; + msg_dump(request); + goto exit; + +exit: + if (ret < 0 && !response) + msg_error(&response); + if (response) + return msg_send(conn_fd, response); + return ret; +} + +int command_dispatcher_conn_handler(int conn_fd, void *_dispatcher) +{ + struct command_dispatcher *dispatcher = (struct command_dispatcher *)_dispatcher; + struct msg *request; + int ret = 0; + + ret = msg_recv(conn_fd, &request); + if (ret < 0) + return ret; + + ret = command_dispatcher_msg_handler(dispatcher, conn_fd, request); + msg_free(request); + return ret; +} diff --git a/src/command.h b/src/command.h new file mode 100644 index 0000000..b4c3b40 --- /dev/null +++ b/src/command.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __COMMAND_H__ +#define __COMMAND_H__ + +#include "msg.h" + +#include <stdlib.h> + +typedef int (*command_processor)(int conn_fd, const struct msg *request, void *ctx, + struct msg **response); + +struct command_def { + char *name; + command_processor processor; +}; + +struct command_dispatcher; + +int command_dispatcher_create(struct command_dispatcher **, struct command_def *, size_t numof_defs, + void *ctx); +void command_dispatcher_destroy(struct command_dispatcher *); + +int command_dispatcher_msg_handler(const struct command_dispatcher *, int conn_fd, + const struct msg *); + +/* This is supposed to be used as an argument to tcp_server_accept. */ +int command_dispatcher_conn_handler(int conn_fd, void *dispatcher); + +#endif @@ -13,13 +13,28 @@ #include <stdlib.h> #include <string.h> -int msg_success(struct msg *msg) +struct msg { + size_t argc; + const char **argv; +}; + +size_t msg_get_length(const struct msg *msg) +{ + return msg->argc; +} + +const char **msg_get_words(const struct msg *msg) +{ + return msg->argv; +} + +int msg_success(struct msg **msg) { const char *argv[] = {"success", NULL}; return msg_from_argv(msg, argv); } -int msg_error(struct msg *msg) +int msg_error(struct msg **msg) { const char *argv[] = {"error", NULL}; return msg_from_argv(msg, argv); @@ -37,82 +52,107 @@ int msg_is_error(const struct msg *msg) static int msg_copy_argv(struct msg *msg, const char **argv) { - msg->argv = calloc(msg->argc, sizeof(const char *)); + size_t copied = 0; + msg->argv = calloc(msg->argc + 1, sizeof(const char *)); if (!msg->argv) { log_errno("calloc"); return -1; } - for (int i = 0; i < msg->argc; ++i) { - msg->argv[i] = strdup(argv[i]); - if (!msg->argv[i]) { + for (copied = 0; copied < msg->argc; ++copied) { + msg->argv[copied] = strdup(argv[copied]); + if (!msg->argv[copied]) { log_errno("strdup"); - goto free; + goto free_copied; } } return 0; -free: - msg_free(msg); +free_copied: + for (size_t i = 0; i < copied; ++i) { + free((char *)msg->argv[i]); + } + + free(msg->argv); return -1; } -struct msg *msg_copy(const struct msg *src) +int msg_copy(struct msg **_dest, const struct msg *src) { - struct msg *dest; + struct msg *dest = NULL; int ret = 0; - dest = malloc(sizeof(*dest)); - if (!dest) { - log_errno("calloc"); - return NULL; + *_dest = malloc(sizeof(struct msg)); + if (!_dest) { + log_errno("malloc"); + return -1; } + dest = *_dest; + dest->argc = src->argc; ret = msg_copy_argv(dest, (const char **)src->argv); if (ret < 0) goto free; - return dest; + return 0; free: free(dest); - return NULL; + return -1; } -void msg_free(const struct msg *msg) +void msg_free(struct msg *msg) { - for (int i = 0; i < msg->argc; ++i) + for (size_t i = 0; i < msg->argc; ++i) free((char *)msg->argv[i]); free(msg->argv); + free(msg); } -int msg_from_argv(struct msg *msg, const char **argv) +int msg_from_argv(struct msg **_msg, const char **argv) { - int argc = 0; + struct msg *msg = NULL; + int ret = 0; + + *_msg = malloc(sizeof(struct msg)); + if (!*_msg) { + log_errno("malloc"); + return -1; + } + msg = *_msg; + msg->argc = 0; for (const char **s = argv; *s; ++s) - ++argc; + ++msg->argc; + + ret = msg_copy_argv(msg, argv); + if (ret < 0) + goto free; + + return 0; + +free: + free(msg); - msg->argc = argc; - return msg_copy_argv(msg, argv); + return -1; } static uint32_t calc_buf_size(const struct msg *msg) { uint32_t len = 0; - for (int i = 0; i < msg->argc; ++i) + for (size_t i = 0; i < msg->argc; ++i) len += strlen(msg->argv[i]) + 1; return len; } -static int calc_argv_len(const void *buf, size_t len) +static size_t calc_argv_len(const void *buf, size_t len) { - int argc = 0; + size_t argc = 0; for (const char *it = buf; it < (const char *)buf + len; it += strlen(it) + 1) ++argc; return argc; @@ -120,7 +160,7 @@ static int calc_argv_len(const void *buf, size_t len) static void argv_pack(char *dest, const struct msg *msg) { - for (int i = 0; i < msg->argc; ++i) { + for (size_t i = 0; i < msg->argc; ++i) { strcpy(dest, msg->argv[i]); dest += strlen(msg->argv[i]) + 1; } @@ -128,28 +168,31 @@ static void argv_pack(char *dest, const struct msg *msg) static int argv_unpack(struct msg *msg, const char *src) { - msg->argv = calloc(msg->argc, sizeof(char *)); + size_t copied = 0; + + msg->argv = calloc(msg->argc + 1, sizeof(const char *)); if (!msg->argv) { log_errno("calloc"); return -1; } - for (int i = 0; i < msg->argc; ++i) { - size_t len = strlen(src); - - msg->argv[i] = malloc(len + 1); - if (!msg->argv[i]) { - log_errno("malloc"); + for (copied = 0; copied < msg->argc; ++copied) { + msg->argv[copied] = strdup(src); + if (!msg->argv[copied]) { + log_errno("strdup"); goto free; } - strcpy(msg->argv[i], src); - src += len + 1; + src += strlen(msg->argv[copied]) + 1; } return 0; free: + for (size_t i = 0; i < copied; ++i) { + free((char *)msg->argv[i]); + } + msg_free(msg); return -1; @@ -185,7 +228,7 @@ free_data: return ret; } -int msg_send_and_wait(int fd, const struct msg *request, struct msg *response) +int msg_send_and_wait(int fd, const struct msg *request, struct msg **response) { int ret = 0; @@ -200,20 +243,34 @@ int msg_send_and_wait(int fd, const struct msg *request, struct msg *response) return ret; } -int msg_recv(int fd, struct msg *msg) +int msg_recv(int fd, struct msg **_msg) { - struct buf *buf; + struct msg *msg = NULL; + struct buf *buf = NULL; int ret = 0; ret = net_recv_buf(fd, &buf); if (ret < 0) return ret; + *_msg = malloc(sizeof(struct msg)); + if (!*_msg) { + log_errno("malloc"); + ret = -1; + goto destroy_buf; + } + msg = *_msg; + msg->argc = calc_argv_len(buf_get_data(buf), buf_get_size(buf)); ret = argv_unpack(msg, buf_get_data(buf)); if (ret < 0) - goto destroy_buf; + goto free_msg; + + goto destroy_buf; + +free_msg: + free(msg); destroy_buf: buf_destroy(buf); @@ -223,7 +280,7 @@ destroy_buf: void msg_dump(const struct msg *msg) { - log("Message[%d]:\n", msg->argc); - for (int i = 0; i < msg->argc; ++i) + log("Message[%zu]:\n", msg->argc); + for (size_t i = 0; i < msg->argc; ++i) log("\t%s\n", msg->argv[i]); } @@ -8,26 +8,28 @@ #ifndef __MSG_H__ #define __MSG_H__ -struct msg { - int argc; - char **argv; -}; +#include <stdlib.h> -int msg_success(struct msg *); -int msg_error(struct msg *); +struct msg; + +size_t msg_get_length(const struct msg *); +const char **msg_get_words(const struct msg *); + +int msg_success(struct msg **); +int msg_error(struct msg **); int msg_is_success(const struct msg *); int msg_is_error(const struct msg *); -struct msg *msg_copy(const struct msg *); -void msg_free(const struct msg *); +int msg_copy(struct msg **, const struct msg *); +void msg_free(struct msg *); -int msg_from_argv(struct msg *, const char **argv); +int msg_from_argv(struct msg **, const char **argv); -int msg_recv(int fd, struct msg *); +int msg_recv(int fd, struct msg **); int msg_send(int fd, const struct msg *); -int msg_send_and_wait(int fd, const struct msg *, struct msg *response); +int msg_send_and_wait(int fd, const struct msg *, struct msg **response); void msg_dump(const struct msg *); diff --git a/src/server.c b/src/server.c index 22a9a86..806da11 100644 --- a/src/server.c +++ b/src/server.c @@ -7,6 +7,7 @@ #include "server.h" #include "ci_queue.h" +#include "command.h" #include "compiler.h" #include "const.h" #include "log.h" @@ -18,7 +19,6 @@ #include <pthread.h> #include <stdlib.h> -#include <string.h> struct server { pthread_mutex_t server_mtx; @@ -111,7 +111,7 @@ static int server_has_runs(const struct server *server) static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run) { - struct msg request, response; + struct msg *request, *response; int ret = 0; const char *argv[] = {CMD_CI_RUN, ci_queue_entry_get_url(ci_run), @@ -121,21 +121,21 @@ static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run) if (ret < 0) return ret; - ret = msg_send_and_wait(fd, &request, &response); - msg_free(&request); + ret = msg_send_and_wait(fd, request, &response); + msg_free(request); if (ret < 0) return ret; - if (response.argc < 0) { + if (!msg_is_success(response)) { log_err("Failed to schedule a CI run: worker is busy?\n"); - ret = -1; + msg_dump(response); goto free_response; } /* TODO: handle the response. */ free_response: - msg_free(&response); + msg_free(response); return ret; } @@ -234,15 +234,10 @@ static int worker_thread(struct server *server, int fd) return ret; } -static int msg_new_worker_handler(struct server *server, int client_fd, - UNUSED const struct msg *request) +static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request, void *_server, + UNUSED struct msg **response) { - return worker_thread(server, client_fd); -} - -static int msg_new_worker_parser(UNUSED const struct msg *msg) -{ - return 1; + return worker_thread((struct server *)_server, client_fd); } static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) @@ -276,87 +271,33 @@ unlock: return ret; } -static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg) +static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request, void *_server, + struct msg **response) { - struct msg response; + struct server *server = (struct server *)_server; int ret = 0; - ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]); - if (ret < 0) - ret = msg_error(&response); - else - ret = msg_success(&response); - - if (ret < 0) - return ret; - - ret = msg_send(client_fd, &response); - msg_free(&response); - return ret; -} - -static int msg_ci_run_parser(const struct msg *msg) -{ - if (msg->argc != 3) { - log_err("Invalid number of arguments for a message: %d\n", msg->argc); - return 0; - } - - return 1; -} - -typedef int (*msg_parser)(const struct msg *msg); -typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg); - -struct msg_descr { - const char *cmd; - msg_parser parser; - msg_handler handler; -}; - -struct msg_descr messages[] = { - {CMD_NEW_WORKER, msg_new_worker_parser, msg_new_worker_handler}, - {CMD_CI_RUN, msg_ci_run_parser, msg_ci_run_handler}, -}; - -static int server_msg_handler(struct server *server, int client_fd, const struct msg *request) -{ - if (request->argc == 0) - goto unknown_request; - - size_t numof_messages = sizeof(messages) / sizeof(messages[0]); - - for (size_t i = 0; i < numof_messages; ++i) { - if (strcmp(messages[i].cmd, request->argv[0])) - continue; - if (!messages[i].parser(request)) - continue; - return messages[i].handler(server, client_fd, request); + if (msg_get_length(request) != 3) { + log_err("Invalid number of arguments for a message: %zu\n", + msg_get_length(request)); + msg_dump(request); + return -1; } -unknown_request: - log_err("Received an unknown message\n"); - msg_dump(request); - struct msg response; - msg_error(&response); - return msg_send(client_fd, &response); -} - -static int server_conn_handler(int client_fd, void *_server) -{ - struct server *server = (struct server *)_server; - struct msg request; - int ret = 0; + const char **words = msg_get_words(request); - ret = msg_recv(client_fd, &request); + ret = msg_ci_run_queue(server, words[1], words[2]); if (ret < 0) return ret; - ret = server_msg_handler(server, client_fd, &request); - msg_free(&request); - return ret; + return msg_success(response); } +static struct command_def commands[] = { + {CMD_NEW_WORKER, msg_new_worker_handler}, + {CMD_CI_RUN, msg_ci_run_handler}, +}; + static int server_set_stopping(struct server *server) { int ret = 0; @@ -383,24 +324,32 @@ unlock: int server_main(struct server *server) { + struct command_dispatcher *dispatcher = NULL; int ret = 0; ret = signal_install_global_handler(); if (ret < 0) return ret; + ret = command_dispatcher_create(&dispatcher, commands, + sizeof(commands) / sizeof(commands[0]), server); + if (ret < 0) + return ret; + while (!global_stop_flag) { log("Waiting for new connections\n"); - ret = tcp_server_accept(server->tcp_server, server_conn_handler, server); + ret = tcp_server_accept(server->tcp_server, command_dispatcher_conn_handler, + dispatcher); if (ret < 0) { - if (errno == EINVAL && global_stop_flag) { + if (errno == EINVAL && global_stop_flag) ret = 0; - break; - } - return ret; + goto dispatcher_destroy; } } +dispatcher_destroy: + command_dispatcher_destroy(dispatcher); + return server_set_stopping(server); } diff --git a/src/worker.c b/src/worker.c index ed3d7e5..ae9a578 100644 --- a/src/worker.c +++ b/src/worker.c @@ -7,6 +7,7 @@ #include "worker.h" #include "ci.h" +#include "command.h" #include "compiler.h" #include "const.h" #include "git.h" @@ -16,8 +17,8 @@ #include "process.h" #include "signal.h" +#include <pthread.h> #include <stdlib.h> -#include <string.h> #include <unistd.h> struct worker { @@ -73,20 +74,15 @@ void worker_destroy(struct worker *worker) static int msg_send_new_worker(const struct worker *worker) { static const char *argv[] = {CMD_NEW_WORKER, NULL}; - struct msg msg; + struct msg *msg; int ret = 0; ret = msg_from_argv(&msg, argv); if (ret < 0) return ret; - ret = msg_send(worker->fd, &msg); - if (ret < 0) - goto free_msg; - -free_msg: - msg_free(&msg); - + ret = msg_send(worker->fd, msg); + msg_free(msg); return ret; } @@ -109,77 +105,37 @@ static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *r return 0; } -static int msg_ci_run_handler(struct worker *worker, const struct msg *request) +static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, UNUSED void *_worker, + struct msg **response) { - struct msg response; struct proc_output result; int ret = 0; + if (msg_get_length(request) != 3) { + log_err("Invalid number of arguments for a message\n"); + msg_dump(request); + return -1; + } + + const char **words = msg_get_words(request); + proc_output_init(&result); - ret = msg_ci_run_do(request->argv[1], request->argv[2], &result); + ret = msg_ci_run_do(words[1], words[2], &result); proc_output_free(&result); if (ret < 0) - ret = msg_error(&response); - else - ret = msg_success(&response); - - if (ret < 0) return ret; - ret = msg_send(worker->fd, &response); - msg_free(&response); - return ret; -} - -static int msg_ci_run_parser(const struct msg *msg) -{ - if (msg->argc != 3) { - log_err("Invalid number of arguments for a message: %d\n", msg->argc); - return 0; - } - - return 1; + return msg_success(response); } -typedef int (*msg_parser)(const struct msg *msg); -typedef int (*msg_handler)(struct worker *, const struct msg *); - -struct msg_descr { - const char *cmd; - msg_parser parser; - msg_handler handler; -}; - -struct msg_descr messages[] = { - {CMD_CI_RUN, msg_ci_run_parser, msg_ci_run_handler}, +static struct command_def commands[] = { + {CMD_CI_RUN, msg_ci_run_handler}, }; -static int worker_msg_handler(struct worker *worker, const struct msg *request) -{ - if (request->argc == 0) - goto unknown_request; - - size_t numof_messages = sizeof(messages) / sizeof(messages[0]); - - for (size_t i = 0; i < numof_messages; ++i) { - if (strcmp(messages[i].cmd, request->argv[0])) - continue; - if (!messages[i].parser(request)) - continue; - return messages[i].handler(worker, request); - } - -unknown_request: - log_err("Received an unknown message\n"); - msg_dump(request); - struct msg response; - msg_error(&response); - return msg_send(worker->fd, &response); -} - int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[]) { + struct command_dispatcher *dispatcher = NULL; int ret = 0; ret = signal_install_global_handler(); @@ -190,25 +146,31 @@ int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[]) if (ret < 0) return ret; + ret = command_dispatcher_create(&dispatcher, commands, + sizeof(commands) / sizeof(commands[0]), worker); + if (ret < 0) + return ret; + while (!global_stop_flag) { - struct msg request; + struct msg *request; log("Waiting for a new command\n"); ret = msg_recv(worker->fd, &request); if (ret < 0) { - if (errno == EINVAL && global_stop_flag) { + if (errno == EINVAL && global_stop_flag) ret = 0; - break; - } - return ret; + goto dispatcher_destroy; } - ret = worker_msg_handler(worker, &request); - msg_free(&request); + ret = command_dispatcher_msg_handler(dispatcher, worker->fd, request); + msg_free(request); if (ret < 0) - return ret; + goto dispatcher_destroy; } +dispatcher_destroy: + command_dispatcher_destroy(dispatcher); + return ret; } |