diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2023-06-12 01:42:08 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2023-06-13 01:37:08 +0200 |
commit | 48ce9170b057ddd2165b0239a92aede15849f7a3 (patch) | |
tree | c9928af6202081d9521107f1dc0ae362f54a6adc /src/server.c | |
parent | log: refactoring (diff) | |
download | cimple-48ce9170b057ddd2165b0239a92aede15849f7a3.tar.gz cimple-48ce9170b057ddd2165b0239a92aede15849f7a3.zip |
use signalfd to stop on SIGTERM
Is this an overkill? I don't know.
The thing is, correctly intercepting SIGTERM (also SIGINT, etc.) is
incredibly tricky. For example, before this commit, my I/O loops in
server.c and worker.c were inherently racy.
This was immediately obvious if you tried to run the tests. The tests
(especially the Valgrind flavour) would run a worker, wait until it
prints a "Waiting for a new command" line, and try to kill it using
SIGTERM. The problem is, the global_stop_flag check could have already
been executed by the worker, and it would hang forever in recv().
The solution seems to be to use signalfd and select()/poll(). I've never
used either before, but it seems to work well enough - at least the very
same tests pass and don't hang now.
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 261 |
1 files changed, 138 insertions, 123 deletions
diff --git a/src/server.c b/src/server.c index a5319e0..2839a61 100644 --- a/src/server.c +++ b/src/server.c @@ -9,6 +9,7 @@ #include "command.h" #include "compiler.h" #include "const.h" +#include "event_loop.h" #include "log.h" #include "msg.h" #include "run_queue.h" @@ -27,6 +28,11 @@ struct server { int stopping; + struct cmd_dispatcher *cmd_dispatcher; + + struct event_loop *event_loop; + int signalfd; + struct worker_queue worker_queue; struct run_queue run_queue; @@ -67,8 +73,10 @@ static void server_notify(struct server *server) pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal"); } -static int server_set_stopping(struct server *server) +static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents, + void *_server) { + struct server *server = (struct server *)_server; int ret = 0; ret = server_lock(server); @@ -117,7 +125,7 @@ static int server_enqueue_run(struct server *server, struct run *run) return ret; run_queue_add_last(&server->run_queue, run); - log("Added a new CI run for repository %s to the queue\n", run_get_url(run)); + log("Added a new run for repository %s to the queue\n", run_get_url(run)); server_notify(server); server_unlock(server); @@ -147,25 +155,26 @@ static int server_assign_run(struct server *server) int ret = 0; struct run *run = run_queue_remove_first(&server->run_queue); - log("Removed a CI run for repository %s from the queue\n", run_get_url(run)); + log("Removed run for repository %s from the queue\n", run_get_url(run)); struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL}; - ret = msg_talk_argv(worker_get_fd(worker), argv, NULL); + if (ret < 0) { - /* Failed to communicate with the worker, requeue the run - * and forget about the worker. */ - worker_destroy(worker); + log("Failed to assign run for repository %s to worker %d, requeueing\n", + run_get_url(run), worker_get_fd(worker)); run_queue_add_first(&server->run_queue, run); - return ret; + } else { + log("Assigned run for repository %s to worker %d\n", run_get_url(run), + worker_get_fd(worker)); + run_destroy(run); } - /* Send the run to the worker, forget about both of them for a while. */ worker_destroy(worker); - run_destroy(run); + return ret; } @@ -198,15 +207,89 @@ exit: return NULL; } -int server_create(struct server **_server, const struct settings *settings) +static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) { - struct storage_settings storage_settings; + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int client_fd = ctx->fd; + + struct worker *worker = NULL; + int ret = 0; + + ret = worker_create(&worker, client_fd); + if (ret < 0) + return ret; + + ret = server_enqueue_worker(server, worker); + if (ret < 0) + goto destroy_worker; + + return ret; + +destroy_worker: + worker_destroy(worker); + + return ret; +} + +static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) +{ + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + struct run *run = NULL; + int ret = 0; - ret = signal_handle_stops(); + ret = run_from_msg(&run, request); if (ret < 0) return ret; + ret = msg_success(response); + if (ret < 0) + goto destroy_run; + + ret = server_enqueue_run(server, run); + if (ret < 0) + goto free_response; + + return ret; + +free_response: + msg_free(*response); + *response = NULL; + +destroy_run: + run_destroy(run); + + return ret; +} + +static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) +{ + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + int client_fd = ctx->fd; + int ret = 0; + + log("Received a \"run complete\" message from worker %d\n", client_fd); + + return ret; +} + +static struct cmd_desc commands[] = { + {CMD_NEW_WORKER, handle_cmd_new_worker}, + {CMD_RUN, handle_cmd_run}, + {CMD_COMPLETE, handle_cmd_complete}, +}; + +static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); + +int server_create(struct server **_server, const struct settings *settings) +{ + struct storage_settings storage_settings; + int ret = 0; + struct server *server = malloc(sizeof(struct server)); if (!server) { log_errno("malloc"); @@ -227,6 +310,24 @@ int server_create(struct server **_server, const struct settings *settings) server->stopping = 0; + ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server); + if (ret < 0) + goto destroy_cv; + + ret = event_loop_create(&server->event_loop); + if (ret < 0) + goto destroy_cmd_dispatcher; + + ret = signalfd_listen_for_stops(); + if (ret < 0) + goto destroy_event_loop; + server->signalfd = ret; + + ret = signalfd_add_to_event_loop(server->signalfd, server->event_loop, server_set_stopping, + server); + if (ret < 0) + goto close_signalfd; + worker_queue_create(&server->worker_queue); run_queue_create(&server->run_queue); @@ -239,10 +340,15 @@ int server_create(struct server **_server, const struct settings *settings) if (ret < 0) goto destroy_run_queue; - ret = tcp_server_create(&server->tcp_server, settings->port); + ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn, + server->cmd_dispatcher); if (ret < 0) goto destroy_storage; + ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop); + if (ret < 0) + goto destroy_tcp_server; + ret = pthread_create(&server->main_thread, NULL, server_main_thread, server); if (ret) { pthread_errno(ret, "pthread_create"); @@ -263,6 +369,16 @@ destroy_run_queue: worker_queue_destroy(&server->worker_queue); +close_signalfd: + signalfd_destroy(server->signalfd); + +destroy_event_loop: + event_loop_destroy(server->event_loop); + +destroy_cmd_dispatcher: + cmd_dispatcher_destroy(server->cmd_dispatcher); + +destroy_cv: pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); destroy_mtx: @@ -283,128 +399,27 @@ void server_destroy(struct server *server) storage_destroy(&server->storage); run_queue_destroy(&server->run_queue); worker_queue_destroy(&server->worker_queue); + signalfd_destroy(server->signalfd); + event_loop_destroy(server->event_loop); + cmd_dispatcher_destroy(server->cmd_dispatcher); pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); free(server); } -static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response, - void *_ctx) -{ - struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct server *server = (struct server *)ctx->arg; - int client_fd = ctx->fd; - - struct worker *worker = NULL; - int ret = 0; - - ret = worker_create(&worker, client_fd); - if (ret < 0) - return ret; - - ret = server_enqueue_worker(server, worker); - if (ret < 0) - goto destroy_worker; - - return ret; - -destroy_worker: - worker_destroy(worker); - - return ret; -} - -static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) -{ - struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct server *server = (struct server *)ctx->arg; - struct run *run = NULL; - - int ret = 0; - - ret = run_from_msg(&run, request); - if (ret < 0) - return ret; - - ret = msg_success(response); - if (ret < 0) - goto destroy_run; - - ret = server_enqueue_run(server, run); - if (ret < 0) - goto free_response; - - return ret; - -free_response: - msg_free(*response); - -destroy_run: - run_destroy(run); - - return ret; -} - -static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response, - void *_ctx) -{ - struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct server *server = (struct server *)ctx->arg; - int client_fd = ctx->fd; - - struct worker *worker = NULL; - int ret = 0; - - log("Received a \"run complete\" message from worker %d\n", client_fd); - - ret = worker_create(&worker, client_fd); - if (ret < 0) - return ret; - - ret = server_enqueue_worker(server, worker); - if (ret < 0) - goto destroy_worker; - - return ret; - -destroy_worker: - worker_destroy(worker); - - return 0; -} - -static struct cmd_desc commands[] = { - {CMD_NEW_WORKER, handle_cmd_new_worker}, - {CMD_RUN, handle_cmd_run}, - {CMD_COMPLETE, handle_cmd_complete}, -}; - -static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); - static int server_listen_thread(struct server *server) { - struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, server); - if (ret < 0) - return ret; - - while (!global_stop_flag) { + while (!server->stopping) { log("Waiting for new connections\n"); - ret = tcp_server_accept(server->tcp_server, cmd_dispatcher_handle_conn, dispatcher); - if (ret < 0) { - if ((errno == EINTR || errno == EINVAL) && global_stop_flag) - ret = 0; - goto dispatcher_destroy; - } + ret = event_loop_run(server->event_loop); + if (ret < 0) + return ret; } -dispatcher_destroy: - cmd_dispatcher_destroy(dispatcher); - - return server_set_stopping(server); + return 0; } int server_main(struct server *server) |