From 48ce9170b057ddd2165b0239a92aede15849f7a3 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Mon, 12 Jun 2023 01:42:08 +0200 Subject: use signalfd to stop on SIGTERM Is this an overkill? I don't know. The thing is, correctly intercepting SIGTERM (also SIGINT, etc.) is incredibly tricky. For example, before this commit, my I/O loops in server.c and worker.c were inherently racy. This was immediately obvious if you tried to run the tests. The tests (especially the Valgrind flavour) would run a worker, wait until it prints a "Waiting for a new command" line, and try to kill it using SIGTERM. The problem is, the global_stop_flag check could have already been executed by the worker, and it would hang forever in recv(). The solution seems to be to use signalfd and select()/poll(). I've never used either before, but it seems to work well enough - at least the very same tests pass and don't hang now. --- src/worker.c | 208 ++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 140 insertions(+), 68 deletions(-) (limited to 'src/worker.c') diff --git a/src/worker.c b/src/worker.c index d7ad310..8ecaed8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -10,74 +10,78 @@ #include "command.h" #include "compiler.h" #include "const.h" +#include "event_loop.h" #include "git.h" #include "log.h" #include "msg.h" +#include "net.h" #include "process.h" #include "run_queue.h" #include "signal.h" #include +#include struct worker { - int dummy; -}; + struct settings *settings; -int worker_create(struct worker **_worker) -{ - int ret = 0; + int stopping; - ret = signal_handle_stops(); - if (ret < 0) - return ret; + struct cmd_dispatcher *cmd_dispatcher; - struct worker *worker = malloc(sizeof(struct worker)); - if (!worker) { + struct event_loop *event_loop; + int signalfd; +}; + +static struct settings *worker_settings_copy(const struct settings *src) +{ + struct settings *result = malloc(sizeof(*src)); + if (!result) { log_errno("malloc"); - return -1; + return NULL; } - ret = libgit_init(); - if (ret < 0) - goto free; - - *_worker = worker; - return ret; + result->host = strdup(src->host); + if (!result->host) { + log_errno("strdup"); + goto free_result; + } -free: - free(worker); + result->port = strdup(src->port); + if (!result->port) { + log_errno("strdup"); + goto free_host; + } - return ret; -} + return result; -void worker_destroy(struct worker *worker) -{ - log("Shutting down\n"); +free_host: + free(result->host); - libgit_shutdown(); - free(worker); -} +free_result: + free(result); -static int worker_send_to_server(const struct settings *settings, const struct msg *request, - struct msg **response) -{ - return msg_connect_and_talk(settings->host, settings->port, request, response); + return NULL; } -static int worker_send_to_server_argv(const struct settings *settings, const char **argv, - struct msg **response) +static void worker_settings_destroy(struct settings *settings) { - return msg_connect_and_talk_argv(settings->host, settings->port, argv, response); + free(settings->port); + free(settings->host); + free(settings); } -static int worker_send_new_worker(const struct settings *settings, struct msg **task) +static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents, + void *_worker) { - static const char *argv[] = {CMD_NEW_WORKER, NULL}; - return worker_send_to_server_argv(settings, argv, task); + struct worker *worker = (struct worker *)_worker; + worker->stopping = 1; + return 0; } -static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx) +static int worker_handle_run(const struct msg *request, UNUSED struct msg **response, void *_ctx) { + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct run *run = NULL; struct proc_output result; int ret = 0; @@ -96,11 +100,17 @@ static int msg_run_handler(const struct msg *request, struct msg **response, UNU proc_output_dump(&result); + struct worker *worker = (struct worker *)ctx->arg; static const char *argv[] = {CMD_COMPLETE, NULL}; - ret = msg_from_argv(response, argv); + + ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL); if (ret < 0) goto free_output; + /* Close the descriptor and remove it from the event loop. + * poll(2) is too confusing, honestly. */ + ret = EVENT_LOOP_REMOVE; + free_output: proc_output_free(&result); @@ -110,49 +120,111 @@ free_output: } static struct cmd_desc commands[] = { - {CMD_RUN, msg_run_handler}, + {CMD_RUN, worker_handle_run}, }; static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); -int worker_main(UNUSED struct worker *worker, const struct settings *settings) +int worker_create(struct worker **_worker, const struct settings *settings) { - struct msg *task = NULL; - struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, NULL); + struct worker *worker = malloc(sizeof(struct worker)); + if (!worker) { + log_errno("malloc"); + return -1; + } + + worker->settings = worker_settings_copy(settings); + if (!worker->settings) { + ret = -1; + goto free; + } + + worker->stopping = 0; + + ret = cmd_dispatcher_create(&worker->cmd_dispatcher, commands, numof_commands, worker); if (ret < 0) - return ret; + goto free_settings; - log("Waiting for a new command\n"); + ret = event_loop_create(&worker->event_loop); + if (ret < 0) + goto destroy_cmd_dispatcher; - ret = worker_send_new_worker(settings, &task); - if (ret < 0) { - if ((errno == EINTR || errno == EINVAL) && global_stop_flag) - ret = 0; - goto dispatcher_destroy; - } + ret = signalfd_listen_for_stops(); + if (ret < 0) + goto destroy_event_loop; + worker->signalfd = ret; + + ret = signalfd_add_to_event_loop(worker->signalfd, worker->event_loop, worker_set_stopping, + worker); + if (ret < 0) + goto close_signalfd; + + ret = libgit_init(); + if (ret < 0) + goto close_signalfd; + + *_worker = worker; + return ret; + +close_signalfd: + signalfd_destroy(worker->signalfd); + +destroy_event_loop: + event_loop_destroy(worker->event_loop); + +destroy_cmd_dispatcher: + cmd_dispatcher_destroy(worker->cmd_dispatcher); + +free_settings: + worker_settings_destroy(worker->settings); + +free: + free(worker); + + return ret; +} - while (!global_stop_flag) { - struct msg *result = NULL; +void worker_destroy(struct worker *worker) +{ + log("Shutting down\n"); - ret = cmd_dispatcher_handle(dispatcher, task, &result); - msg_free(task); + libgit_shutdown(); + signalfd_destroy(worker->signalfd); + event_loop_destroy(worker->event_loop); + cmd_dispatcher_destroy(worker->cmd_dispatcher); + worker_settings_destroy(worker->settings); + free(worker); +} + +int worker_main(struct worker *worker) +{ + int ret = 0; + + while (!worker->stopping) { + ret = net_connect(worker->settings->host, worker->settings->port); if (ret < 0) - goto dispatcher_destroy; - - ret = worker_send_to_server(settings, result, &task); - msg_free(result); - if (ret < 0) { - if ((errno == EINTR || errno == EINVAL) && global_stop_flag) - ret = 0; - goto dispatcher_destroy; - } - } + return ret; + + const int fd = ret; + static const char *argv[] = {CMD_NEW_WORKER, NULL}; + + ret = msg_send_argv(fd, argv); + if (ret < 0) + return ret; -dispatcher_destroy: - cmd_dispatcher_destroy(dispatcher); + ret = cmd_dispatcher_add_to_event_loop(worker->cmd_dispatcher, worker->event_loop, + fd); + if (ret < 0) + return ret; + + log("Waiting for a new command\n"); + + ret = event_loop_run(worker->event_loop); + if (ret < 0) + return ret; + } return ret; } -- cgit v1.2.3