diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2023-06-12 01:42:08 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2023-06-13 01:37:08 +0200 |
commit | 48ce9170b057ddd2165b0239a92aede15849f7a3 (patch) | |
tree | c9928af6202081d9521107f1dc0ae362f54a6adc | |
parent | log: refactoring (diff) | |
download | cimple-48ce9170b057ddd2165b0239a92aede15849f7a3.tar.gz cimple-48ce9170b057ddd2165b0239a92aede15849f7a3.zip |
use signalfd to stop on SIGTERM
Is this an overkill? I don't know.
The thing is, correctly intercepting SIGTERM (also SIGINT, etc.) is
incredibly tricky. For example, before this commit, my I/O loops in
server.c and worker.c were inherently racy.
This was immediately obvious if you tried to run the tests. The tests
(especially the Valgrind flavour) would run a worker, wait until it
prints a "Waiting for a new command" line, and try to kill it using
SIGTERM. The problem is, the global_stop_flag check could have already
been executed by the worker, and it would hang forever in recv().
The solution seems to be to use signalfd and select()/poll(). I've never
used either before, but it seems to work well enough - at least the very
same tests pass and don't hang now.
-rw-r--r-- | src/CMakeLists.txt | 6 | ||||
-rw-r--r-- | src/command.c | 79 | ||||
-rw-r--r-- | src/command.h | 3 | ||||
-rw-r--r-- | src/event_loop.c | 204 | ||||
-rw-r--r-- | src/event_loop.h | 36 | ||||
-rw-r--r-- | src/server.c | 261 | ||||
-rw-r--r-- | src/signal.c | 57 | ||||
-rw-r--r-- | src/signal.h | 9 | ||||
-rw-r--r-- | src/string.c | 25 | ||||
-rw-r--r-- | src/string.h | 13 | ||||
-rw-r--r-- | src/tcp_server.c | 76 | ||||
-rw-r--r-- | src/tcp_server.h | 11 | ||||
-rw-r--r-- | src/worker.c | 208 | ||||
-rw-r--r-- | src/worker.h | 4 | ||||
-rw-r--r-- | src/worker_main.c | 4 |
15 files changed, 766 insertions, 230 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index af6e9fb..fb94b3a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -39,6 +39,7 @@ generate_sql_header(sqlite) add_my_executable(server server_main.c server.c cmd_line.c command.c + event_loop.c file.c msg.c net.c @@ -48,6 +49,7 @@ add_my_executable(server server_main.c server.c sqlite.c storage.c storage_sqlite.c + string.c tcp_server.c worker_queue.c) target_link_libraries(server PRIVATE pthread sqlite3) @@ -63,11 +65,13 @@ add_my_executable(worker worker_main.c worker.c ci.c cmd_line.c command.c + event_loop.c file.c git.c msg.c net.c process.c run_queue.c - signal.c) + signal.c + string.c) target_link_libraries(worker PRIVATE git2 pthread) diff --git a/src/command.c b/src/command.c index bd58f05..6306f7e 100644 --- a/src/command.c +++ b/src/command.c @@ -6,6 +6,8 @@ */ #include "command.h" +#include "compiler.h" +#include "event_loop.h" #include "log.h" #include "msg.h" @@ -128,20 +130,29 @@ int cmd_dispatcher_handle(const struct cmd_dispatcher *dispatcher, const struct return cmd_dispatcher_handle_internal(dispatcher, command, result, dispatcher->ctx); } +static struct cmd_conn_ctx *make_conn_ctx(int fd, void *arg) +{ + struct cmd_conn_ctx *ctx = malloc(sizeof(struct cmd_conn_ctx)); + if (!ctx) { + log_errno("malloc"); + return NULL; + } + + ctx->fd = fd; + ctx->arg = arg; + + return ctx; +} + int cmd_dispatcher_handle_conn(int conn_fd, void *_dispatcher) { struct cmd_dispatcher *dispatcher = (struct cmd_dispatcher *)_dispatcher; struct msg *request = NULL, *response = NULL; int ret = 0; - struct cmd_conn_ctx *new_ctx = malloc(sizeof(struct cmd_conn_ctx)); - if (!new_ctx) { - log_errno("malloc"); + struct cmd_conn_ctx *new_ctx = make_conn_ctx(conn_fd, dispatcher->ctx); + if (!new_ctx) return -1; - } - - new_ctx->fd = conn_fd; - new_ctx->arg = dispatcher->ctx; ret = msg_recv(conn_fd, &request); if (ret < 0) @@ -168,3 +179,57 @@ free_ctx: return ret; } + +static int cmd_dispatcher_handle_event(UNUSED struct event_loop *loop, int fd, short revents, + void *_dispatcher) +{ + struct cmd_dispatcher *dispatcher = (struct cmd_dispatcher *)_dispatcher; + struct msg *request = NULL, *response = NULL; + int ret = 0; + + if (!(revents & POLLIN)) { + log_err("Descriptor %d is not readable\n", fd); + return -1; + } + + struct cmd_conn_ctx *new_ctx = make_conn_ctx(fd, dispatcher->ctx); + if (!new_ctx) + return -1; + + ret = msg_recv(fd, &request); + if (ret < 0) + goto free_ctx; + + ret = cmd_dispatcher_handle_internal(dispatcher, request, &response, new_ctx); + if (ret < 0) + goto free_response; + + if (response) { + ret = msg_send(fd, response); + if (ret < 0) + goto free_response; + } + +free_response: + if (response) + msg_free(response); + + msg_free(request); + +free_ctx: + free(new_ctx); + + return ret; +} + +int cmd_dispatcher_add_to_event_loop(const struct cmd_dispatcher *dispatcher, + struct event_loop *loop, int fd) +{ + struct event_fd entry = { + .fd = fd, + .events = POLLIN, + .handler = cmd_dispatcher_handle_event, + .arg = (void *)dispatcher, + }; + return event_loop_add(loop, &entry); +} diff --git a/src/command.h b/src/command.h index 90facbb..3d13abd 100644 --- a/src/command.h +++ b/src/command.h @@ -8,6 +8,7 @@ #ifndef __COMMAND_H__ #define __COMMAND_H__ +#include "event_loop.h" #include "msg.h" #include <stddef.h> @@ -28,6 +29,8 @@ void cmd_dispatcher_destroy(struct cmd_dispatcher *); int cmd_dispatcher_handle(const struct cmd_dispatcher *, const struct msg *command, struct msg **response); +int cmd_dispatcher_add_to_event_loop(const struct cmd_dispatcher *, struct event_loop *, int fd); + struct cmd_conn_ctx { int fd; void *arg; diff --git a/src/event_loop.c b/src/event_loop.c new file mode 100644 index 0000000..8d91480 --- /dev/null +++ b/src/event_loop.c @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "event_loop.h" +#include "log.h" +#include "net.h" +#include "string.h" + +#include <poll.h> +#include <stddef.h> +#include <stdlib.h> +#include <sys/queue.h> + +SIMPLEQ_HEAD(event_fd_queue, event_fd); + +static struct event_fd *event_fd_copy(const struct event_fd *src) +{ + struct event_fd *res = malloc(sizeof(*src)); + if (!res) { + log_errno("malloc"); + return NULL; + } + *res = *src; + return res; +} + +static void event_fd_destroy(struct event_fd *entry) +{ + free(entry); +} + +static void event_fd_queue_create(struct event_fd_queue *queue) +{ + SIMPLEQ_INIT(queue); +} + +static void event_fd_queue_destroy(struct event_fd_queue *queue) +{ + struct event_fd *entry1 = SIMPLEQ_FIRST(queue); + while (entry1) { + struct event_fd *entry2 = SIMPLEQ_NEXT(entry1, entries); + event_fd_destroy(entry1); + entry1 = entry2; + } + SIMPLEQ_INIT(queue); +} + +struct event_loop { + nfds_t nfds; + struct event_fd_queue entries; +}; + +int event_loop_create(struct event_loop **_loop) +{ + int ret = 0; + + struct event_loop *loop = calloc(1, sizeof(struct event_loop)); + if (!loop) { + log_errno("calloc"); + return -1; + } + *_loop = loop; + + event_fd_queue_create(&loop->entries); + + return ret; +} + +void event_loop_destroy(struct event_loop *loop) +{ + event_fd_queue_destroy(&loop->entries); + free(loop); +} + +int event_loop_add(struct event_loop *loop, const struct event_fd *entry) +{ + log("Adding descriptor %d to event loop\n", entry->fd); + + struct event_fd *copied = event_fd_copy(entry); + if (!copied) + return -1; + + nfds_t nfds = loop->nfds + 1; + SIMPLEQ_INSERT_TAIL(&loop->entries, copied, entries); + loop->nfds = nfds; + + return 0; +} + +static void event_loop_remove(struct event_loop *loop, struct event_fd *entry) +{ + log("Removing descriptor %d from event loop\n", entry->fd); + + SIMPLEQ_REMOVE(&loop->entries, entry, event_fd, entries); + net_close(entry->fd); + event_fd_destroy(entry); + --loop->nfds; +} + +static char *append_event(char *buf, size_t sz, char *ptr, const char *event) +{ + if (ptr > buf) + ptr = stpecpy(ptr, buf + sz, ","); + return stpecpy(ptr, buf + sz, event); +} + +static char *events_to_string(short events) +{ + const size_t sz = 128; + char *buf = calloc(1, sz); + if (!buf) + return NULL; + + char *ptr = buf; + + if (events & POLLNVAL) + ptr = append_event(buf, sz, ptr, "POLLNVAL"); + if (events & POLLERR) + ptr = append_event(buf, sz, ptr, "POLLERR"); + if (events & POLLHUP) + ptr = append_event(buf, sz, ptr, "POLLHUP"); + if (events & POLLIN) + ptr = append_event(buf, sz, ptr, "POLLIN"); + if (events & POLLOUT) + ptr = append_event(buf, sz, ptr, "POLLOUT"); + + return buf; +} + +static struct pollfd *make_pollfds(const struct event_loop *loop) +{ + struct pollfd *fds = calloc(loop->nfds, sizeof(struct pollfd)); + if (!fds) { + log_errno("calloc"); + return NULL; + } + + struct event_fd *entry = SIMPLEQ_FIRST(&loop->entries); + for (nfds_t i = 0; i < loop->nfds; ++i, entry = SIMPLEQ_NEXT(entry, entries)) { + fds[i].fd = entry->fd; + fds[i].events = entry->events; + } + + log("Descriptors:\n"); + for (nfds_t i = 0; i < loop->nfds; ++i) { + char *events = events_to_string(fds[i].events); + log(" %d (%s)\n", fds[i].fd, events ? events : ""); + free(events); + } + + return fds; +} + +int event_loop_run(struct event_loop *loop) +{ + struct pollfd *fds = make_pollfds(loop); + if (!fds) + return -1; + + int ret = poll(fds, loop->nfds, -1); + if (ret < 0) { + log_errno("poll"); + return ret; + } + ret = 0; + + struct event_fd *entry = SIMPLEQ_FIRST(&loop->entries); + for (nfds_t i = 0; i < loop->nfds; ++i) { + struct event_fd *next = SIMPLEQ_NEXT(entry, entries); + + if (!fds[i].revents) + goto next; + + char *events = events_to_string(fds[i].revents); + log("Descriptor %d is ready: %s\n", fds[i].fd, events ? events : ""); + free(events); + + /* Execute all handlers but notice if any of them fail. */ + const int handler_ret = entry->handler(loop, fds[i].fd, fds[i].revents, entry->arg); + switch (handler_ret) { + case 0: + goto next; + case EVENT_LOOP_REMOVE: + goto remove; + default: + break; + } + + remove: + event_loop_remove(loop, entry); + goto next; + + next: + entry = next; + continue; + } + + free(fds); + return ret; +} diff --git a/src/event_loop.h b/src/event_loop.h new file mode 100644 index 0000000..eb5ff05 --- /dev/null +++ b/src/event_loop.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __EVENT_LOOP_H__ +#define __EVENT_LOOP_H__ + +#include <poll.h> +#include <sys/queue.h> + +struct event_loop; + +int event_loop_create(struct event_loop **); +void event_loop_destroy(struct event_loop *); + +int event_loop_run(struct event_loop *); + +#define EVENT_LOOP_REMOVE 1 + +typedef int (*event_loop_handler)(struct event_loop *, int fd, short revents, void *arg); + +struct event_fd { + int fd; + short events; + event_loop_handler handler; + void *arg; + + SIMPLEQ_ENTRY(event_fd) entries; +}; + +int event_loop_add(struct event_loop *, const struct event_fd *); + +#endif diff --git a/src/server.c b/src/server.c index a5319e0..2839a61 100644 --- a/src/server.c +++ b/src/server.c @@ -9,6 +9,7 @@ #include "command.h" #include "compiler.h" #include "const.h" +#include "event_loop.h" #include "log.h" #include "msg.h" #include "run_queue.h" @@ -27,6 +28,11 @@ struct server { int stopping; + struct cmd_dispatcher *cmd_dispatcher; + + struct event_loop *event_loop; + int signalfd; + struct worker_queue worker_queue; struct run_queue run_queue; @@ -67,8 +73,10 @@ static void server_notify(struct server *server) pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal"); } -static int server_set_stopping(struct server *server) +static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents, + void *_server) { + struct server *server = (struct server *)_server; int ret = 0; ret = server_lock(server); @@ -117,7 +125,7 @@ static int server_enqueue_run(struct server *server, struct run *run) return ret; run_queue_add_last(&server->run_queue, run); - log("Added a new CI run for repository %s to the queue\n", run_get_url(run)); + log("Added a new run for repository %s to the queue\n", run_get_url(run)); server_notify(server); server_unlock(server); @@ -147,25 +155,26 @@ static int server_assign_run(struct server *server) int ret = 0; struct run *run = run_queue_remove_first(&server->run_queue); - log("Removed a CI run for repository %s from the queue\n", run_get_url(run)); + log("Removed run for repository %s from the queue\n", run_get_url(run)); struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL}; - ret = msg_talk_argv(worker_get_fd(worker), argv, NULL); + if (ret < 0) { - /* Failed to communicate with the worker, requeue the run - * and forget about the worker. */ - worker_destroy(worker); + log("Failed to assign run for repository %s to worker %d, requeueing\n", + run_get_url(run), worker_get_fd(worker)); run_queue_add_first(&server->run_queue, run); - return ret; + } else { + log("Assigned run for repository %s to worker %d\n", run_get_url(run), + worker_get_fd(worker)); + run_destroy(run); } - /* Send the run to the worker, forget about both of them for a while. */ worker_destroy(worker); - run_destroy(run); + return ret; } @@ -198,15 +207,89 @@ exit: return NULL; } -int server_create(struct server **_server, const struct settings *settings) +static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) { - struct storage_settings storage_settings; + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int client_fd = ctx->fd; + + struct worker *worker = NULL; + int ret = 0; + + ret = worker_create(&worker, client_fd); + if (ret < 0) + return ret; + + ret = server_enqueue_worker(server, worker); + if (ret < 0) + goto destroy_worker; + + return ret; + +destroy_worker: + worker_destroy(worker); + + return ret; +} + +static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) +{ + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + struct run *run = NULL; + int ret = 0; - ret = signal_handle_stops(); + ret = run_from_msg(&run, request); if (ret < 0) return ret; + ret = msg_success(response); + if (ret < 0) + goto destroy_run; + + ret = server_enqueue_run(server, run); + if (ret < 0) + goto free_response; + + return ret; + +free_response: + msg_free(*response); + *response = NULL; + +destroy_run: + run_destroy(run); + + return ret; +} + +static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) +{ + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + int client_fd = ctx->fd; + int ret = 0; + + log("Received a \"run complete\" message from worker %d\n", client_fd); + + return ret; +} + +static struct cmd_desc commands[] = { + {CMD_NEW_WORKER, handle_cmd_new_worker}, + {CMD_RUN, handle_cmd_run}, + {CMD_COMPLETE, handle_cmd_complete}, +}; + +static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); + +int server_create(struct server **_server, const struct settings *settings) +{ + struct storage_settings storage_settings; + int ret = 0; + struct server *server = malloc(sizeof(struct server)); if (!server) { log_errno("malloc"); @@ -227,6 +310,24 @@ int server_create(struct server **_server, const struct settings *settings) server->stopping = 0; + ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server); + if (ret < 0) + goto destroy_cv; + + ret = event_loop_create(&server->event_loop); + if (ret < 0) + goto destroy_cmd_dispatcher; + + ret = signalfd_listen_for_stops(); + if (ret < 0) + goto destroy_event_loop; + server->signalfd = ret; + + ret = signalfd_add_to_event_loop(server->signalfd, server->event_loop, server_set_stopping, + server); + if (ret < 0) + goto close_signalfd; + worker_queue_create(&server->worker_queue); run_queue_create(&server->run_queue); @@ -239,10 +340,15 @@ int server_create(struct server **_server, const struct settings *settings) if (ret < 0) goto destroy_run_queue; - ret = tcp_server_create(&server->tcp_server, settings->port); + ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn, + server->cmd_dispatcher); if (ret < 0) goto destroy_storage; + ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop); + if (ret < 0) + goto destroy_tcp_server; + ret = pthread_create(&server->main_thread, NULL, server_main_thread, server); if (ret) { pthread_errno(ret, "pthread_create"); @@ -263,6 +369,16 @@ destroy_run_queue: worker_queue_destroy(&server->worker_queue); +close_signalfd: + signalfd_destroy(server->signalfd); + +destroy_event_loop: + event_loop_destroy(server->event_loop); + +destroy_cmd_dispatcher: + cmd_dispatcher_destroy(server->cmd_dispatcher); + +destroy_cv: pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); destroy_mtx: @@ -283,128 +399,27 @@ void server_destroy(struct server *server) storage_destroy(&server->storage); run_queue_destroy(&server->run_queue); worker_queue_destroy(&server->worker_queue); + signalfd_destroy(server->signalfd); + event_loop_destroy(server->event_loop); + cmd_dispatcher_destroy(server->cmd_dispatcher); pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); free(server); } -static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response, - void *_ctx) -{ - struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct server *server = (struct server *)ctx->arg; - int client_fd = ctx->fd; - - struct worker *worker = NULL; - int ret = 0; - - ret = worker_create(&worker, client_fd); - if (ret < 0) - return ret; - - ret = server_enqueue_worker(server, worker); - if (ret < 0) - goto destroy_worker; - - return ret; - -destroy_worker: - worker_destroy(worker); - - return ret; -} - -static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) -{ - struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct server *server = (struct server *)ctx->arg; - struct run *run = NULL; - - int ret = 0; - - ret = run_from_msg(&run, request); - if (ret < 0) - return ret; - - ret = msg_success(response); - if (ret < 0) - goto destroy_run; - - ret = server_enqueue_run(server, run); - if (ret < 0) - goto free_response; - - return ret; - -free_response: - msg_free(*response); - -destroy_run: - run_destroy(run); - - return ret; -} - -static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response, - void *_ctx) -{ - struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; - struct server *server = (struct server *)ctx->arg; - int client_fd = ctx->fd; - - struct worker *worker = NULL; - int ret = 0; - - log("Received a \"run complete\" message from worker %d\n", client_fd); - - ret = worker_create(&worker, client_fd); - if (ret < 0) - return ret; - - ret = server_enqueue_worker(server, worker); - if (ret < 0) - goto destroy_worker; - - return ret; - -destroy_worker: - worker_destroy(worker); - - return 0; -} - -static struct cmd_desc commands[] = { - {CMD_NEW_WORKER, handle_cmd_new_worker}, - {CMD_RUN, handle_cmd_run}, - {CMD_COMPLETE, handle_cmd_complete}, -}; - -static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); - static int server_listen_thread(struct server *server) { - struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, server); - if (ret < 0) - return ret; - - while (!global_stop_flag) { + while (!server->stopping) { log("Waiting for new connections\n"); - ret = tcp_server_accept(server->tcp_server, cmd_dispatcher_handle_conn, dispatcher); - if (ret < 0) { - if ((errno == EINTR || errno == EINVAL) && global_stop_flag) - ret = 0; - goto dispatcher_destroy; - } + ret = event_loop_run(server->event_loop); + if (ret < 0) + return ret; } -dispatcher_destroy: - cmd_dispatcher_destroy(dispatcher); - - return server_set_stopping(server); + return 0; } int server_main(struct server *server) diff --git a/src/signal.c b/src/signal.c index 7d00bc9..9f788b2 100644 --- a/src/signal.c +++ b/src/signal.c @@ -7,14 +7,25 @@ #include "signal.h" #include "compiler.h" +#include "event_loop.h" +#include "file.h" #include "log.h" #include <signal.h> #include <stddef.h> #include <string.h> +#include <sys/signalfd.h> +#include <unistd.h> static int stop_signals[] = {SIGINT, SIGTERM, SIGQUIT}; +static void stops_set(sigset_t *set) +{ + sigemptyset(set); + for (size_t i = 0; i < sizeof(stop_signals) / sizeof(stop_signals[0]); ++i) + sigaddset(set, stop_signals[i]); +} + volatile sig_atomic_t global_stop_flag = 0; static void set_global_stop_flag(UNUSED int signum) @@ -78,11 +89,7 @@ int signal_block_all(sigset_t *old) int signal_block_stops(void) { sigset_t set; - sigemptyset(&set); - - for (size_t i = 0; i < sizeof(stop_signals) / sizeof(stop_signals[0]); ++i) - sigaddset(&set, stop_signals[i]); - + stops_set(&set); return signal_set(&set, NULL); } @@ -90,3 +97,43 @@ int signal_restore(const sigset_t *new) { return signal_set(new, NULL); } + +int signalfd_create(const sigset_t *set) +{ + sigset_t old; + int ret = 0; + + ret = signal_set(set, &old); + if (ret < 0) + return ret; + + ret = signalfd(-1, set, SFD_CLOEXEC); + if (ret < 0) + goto restore; + + return ret; + +restore: + signal_set(&old, NULL); + + return ret; +} + +int signalfd_listen_for_stops(void) +{ + sigset_t set; + stops_set(&set); + return signalfd_create(&set); +} + +void signalfd_destroy(int fd) +{ + file_close(fd); +} + +int signalfd_add_to_event_loop(int fd, struct event_loop *loop, event_loop_handler handler, + void *arg) +{ + struct event_fd entry = {.fd = fd, .events = POLLIN, .handler = handler, .arg = arg}; + return event_loop_add(loop, &entry); +} diff --git a/src/signal.h b/src/signal.h index 4f1c280..e3f5897 100644 --- a/src/signal.h +++ b/src/signal.h @@ -8,6 +8,8 @@ #ifndef __SIGNAL_H__ #define __SIGNAL_H__ +#include "event_loop.h" + #include <signal.h> extern volatile sig_atomic_t global_stop_flag; @@ -18,4 +20,11 @@ int signal_block_stops(void); int signal_restore(const sigset_t *new); +int signalfd_create(const sigset_t *); +void signalfd_destroy(int fd); + +int signalfd_add_to_event_loop(int fd, struct event_loop *, event_loop_handler handler, void *arg); + +int signalfd_listen_for_stops(void); + #endif diff --git a/src/string.c b/src/string.c new file mode 100644 index 0000000..878efeb --- /dev/null +++ b/src/string.c @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "string.h" + +#include <string.h> + +char *stpecpy(char *dst, char *end, const char *src) +{ + if (!dst) + return NULL; + if (dst == end) + return end; + + char *p = memccpy(dst, src, '\0', end - dst); + if (p) + return p - 1; + + end[-1] = '\0'; + return end; +} diff --git a/src/string.h b/src/string.h new file mode 100644 index 0000000..fb77e64 --- /dev/null +++ b/src/string.h @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __STRING_H__ +#define __STRING_H__ + +char *stpecpy(char *dst, char *end, const char *src); + +#endif diff --git a/src/tcp_server.c b/src/tcp_server.c index 2e65bc5..a6263f7 100644 --- a/src/tcp_server.c +++ b/src/tcp_server.c @@ -6,6 +6,8 @@ */ #include "tcp_server.h" +#include "compiler.h" +#include "event_loop.h" #include "log.h" #include "net.h" #include "signal.h" @@ -16,18 +18,24 @@ struct tcp_server { int fd; + tcp_server_conn_handler conn_handler; + void *arg; }; -int tcp_server_create(struct tcp_server **_server, const char *port) +int tcp_server_create(struct tcp_server **_server, const char *port, + tcp_server_conn_handler conn_handler, void *arg) { int ret = 0; - struct tcp_server *server = malloc(sizeof(struct tcp_server)); + struct tcp_server *server = calloc(1, sizeof(struct tcp_server)); if (!server) { log_errno("malloc"); return -1; } + server->conn_handler = conn_handler; + server->arg = arg; + ret = net_bind(port); if (ret < 0) goto free; @@ -50,7 +58,7 @@ void tcp_server_destroy(struct tcp_server *server) struct child_context { int fd; - tcp_server_conn_handler handler; + tcp_server_conn_handler conn_handler; void *arg; }; @@ -65,7 +73,7 @@ static void *connection_thread(void *_ctx) if (ret < 0) goto free_ctx; - ctx->handler(ctx->fd, ctx->arg); + ctx->conn_handler(ctx->fd, ctx->arg); free_ctx: free(ctx); @@ -73,7 +81,7 @@ free_ctx: return NULL; } -int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler handler, void *arg) +static int create_connection_thread(int fd, tcp_server_conn_handler conn_handler, void *arg) { sigset_t old_mask; pthread_t child; @@ -81,24 +89,20 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h struct child_context *ctx = calloc(1, sizeof(*ctx)); if (!ctx) { - log_errno("malloc"); + log_errno("calloc"); return -1; } - ctx->handler = handler; + ctx->fd = fd; + ctx->conn_handler = conn_handler; ctx->arg = arg; - ret = net_accept(server->fd); - if (ret < 0) - goto free_ctx; - ctx->fd = ret; - /* Block all signals (we'll unblock them later); the child thread will * have all signals blocked initially. This allows the main thread to * handle SIGINT/SIGTERM/etc. */ ret = signal_block_all(&old_mask); if (ret < 0) - goto close_conn; + goto free_ctx; ret = pthread_create(&child, NULL, connection_thread, ctx); if (ret) { @@ -106,19 +110,53 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h goto restore_mask; } +restore_mask: /* Restore the previously-enabled signals for handling in the main thread. */ signal_restore(&old_mask); return ret; -restore_mask: - signal_restore(&old_mask); - -close_conn: - net_close(ctx->fd); - free_ctx: free(ctx); return ret; } + +int tcp_server_accept(const struct tcp_server *server) +{ + int fd = -1, ret = 0; + + ret = net_accept(server->fd); + if (ret < 0) + return ret; + fd = ret; + + ret = create_connection_thread(fd, server->conn_handler, server->arg); + if (ret < 0) + goto close_conn; + + return ret; + +close_conn: + net_close(fd); + + return ret; +} + +static int tcp_server_event_loop_handler(UNUSED struct event_loop *loop, UNUSED int fd, + UNUSED short revents, void *_server) +{ + struct tcp_server *server = (struct tcp_server *)_server; + return tcp_server_accept(server); +} + +int tcp_server_add_to_event_loop(struct tcp_server *server, struct event_loop *loop) +{ + struct event_fd entry = { + .fd = server->fd, + .events = POLLIN, + .handler = tcp_server_event_loop_handler, + .arg = server, + }; + return event_loop_add(loop, &entry); +} diff --git a/src/tcp_server.h b/src/tcp_server.h index 0f377d8..f96e3f1 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -8,12 +8,17 @@ #ifndef __TCP_SERVER_H__ #define __TCP_SERVER_H__ +#include "event_loop.h" + struct tcp_server; -int tcp_server_create(struct tcp_server **, const char *port); +typedef int (*tcp_server_conn_handler)(int conn_fd, void *arg); + +int tcp_server_create(struct tcp_server **, const char *port, tcp_server_conn_handler, void *arg); void tcp_server_destroy(struct tcp_server *); -typedef int (*tcp_server_conn_handler)(int conn_fd, void *arg); -int tcp_server_accept(const struct tcp_server *, tcp_server_conn_handler, void *arg); +int tcp_server_accept(const struct tcp_server *); + +int tcp_server_add_to_event_loop(struct tcp_server *, struct event_loop *); #endif diff --git a/src/worker.c b/src/worker.c index d7ad310..8ecaed8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -10,74 +10,78 @@ #include "command.h" #include "compiler.h" #include "const.h" +#include "event_loop.h" #include "git.h" #include "log.h" #include "msg.h" +#include "net.h" #include "process.h" #include "run_queue.h" #include "signal.h" #include <stdlib.h> +#include <string.h> struct worker { - int dummy; -}; + struct settings *settings; -int worker_create(struct worker **_worker) -{ - int ret = 0; + int stopping; - ret = signal_handle_stops(); - if (ret < 0) - return ret; + struct cmd_dispatcher *cmd_dispatcher; - struct worker *worker = malloc(sizeof(struct worker)); - if (!worker) { + struct event_loop *event_loop; + int signalfd; +}; + +static struct settings *worker_settings_copy(const struct settings *src) +{ + struct settings *result = malloc(sizeof(*src)); + if (!result) { log_errno("malloc"); - return -1; + return NULL; } - ret = libgit_init(); - if (ret < 0) - goto free; - - *_worker = worker; - return ret; + result->host = strdup(src->host); + if (!result->host) { + log_errno("strdup"); + goto free_result; + } -free: - free(worker); + result->port = strdup(src->port); + if (!result->port) { + log_errno("strdup"); + goto free_host; + } - return ret; -} + return result; -void worker_destroy(struct worker *worker) -{ - log("Shutting down\n"); +free_host: + free(result->host); - libgit_shutdown(); - free(worker); -} +free_result: + free(result); -static int worker_send_to_server(const struct settings *settings, const struct msg *request, - struct msg **response) -{ - return msg_connect_and_talk(settings->host, settings->port, request, response); + return NULL; } -static int worker_send_to_server_argv(const struct settings *settings, const char **argv, - struct msg **response) +static void worker_settings_destroy(struct settings *settings) { - return msg_connect_and_talk_argv(settings->host, settings->port, argv, response); + free(settings->port); + free(settings->host); + free(settings); } -static int worker_send_new_worker(const struct settings *settings, struct msg **task) +static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents, + void *_worker) { - static const char *argv[] = {CMD_NEW_WORKER, NULL}; - return worker_send_to_server_argv(settings, argv, task); + struct worker *worker = (struct worker *)_worker; + worker->stopping = 1; + return 0; } -static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx) +static int worker_handle_run(const struct msg *request, UNUSED struct msg **response, void *_ctx) { + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct run *run = NULL; struct proc_output result; int ret = 0; @@ -96,11 +100,17 @@ static int msg_run_handler(const struct msg *request, struct msg **response, UNU proc_output_dump(&result); + struct worker *worker = (struct worker *)ctx->arg; static const char *argv[] = {CMD_COMPLETE, NULL}; - ret = msg_from_argv(response, argv); + + ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL); if (ret < 0) goto free_output; + /* Close the descriptor and remove it from the event loop. + * poll(2) is too confusing, honestly. */ + ret = EVENT_LOOP_REMOVE; + free_output: proc_output_free(&result); @@ -110,49 +120,111 @@ free_output: } static struct cmd_desc commands[] = { - {CMD_RUN, msg_run_handler}, + {CMD_RUN, worker_handle_run}, }; static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]); -int worker_main(UNUSED struct worker *worker, const struct settings *settings) +int worker_create(struct worker **_worker, const struct settings *settings) { - struct msg *task = NULL; - struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, NULL); + struct worker *worker = malloc(sizeof(struct worker)); + if (!worker) { + log_errno("malloc"); + return -1; + } + + worker->settings = worker_settings_copy(settings); + if (!worker->settings) { + ret = -1; + goto free; + } + + worker->stopping = 0; + + ret = cmd_dispatcher_create(&worker->cmd_dispatcher, commands, numof_commands, worker); if (ret < 0) - return ret; + goto free_settings; - log("Waiting for a new command\n"); + ret = event_loop_create(&worker->event_loop); + if (ret < 0) + goto destroy_cmd_dispatcher; - ret = worker_send_new_worker(settings, &task); - if (ret < 0) { - if ((errno == EINTR || errno == EINVAL) && global_stop_flag) - ret = 0; - goto dispatcher_destroy; - } + ret = signalfd_listen_for_stops(); + if (ret < 0) + goto destroy_event_loop; + worker->signalfd = ret; + + ret = signalfd_add_to_event_loop(worker->signalfd, worker->event_loop, worker_set_stopping, + worker); + if (ret < 0) + goto close_signalfd; + + ret = libgit_init(); + if (ret < 0) + goto close_signalfd; + + *_worker = worker; + return ret; + +close_signalfd: + signalfd_destroy(worker->signalfd); + +destroy_event_loop: + event_loop_destroy(worker->event_loop); + +destroy_cmd_dispatcher: + cmd_dispatcher_destroy(worker->cmd_dispatcher); + +free_settings: + worker_settings_destroy(worker->settings); + +free: + free(worker); + + return ret; +} - while (!global_stop_flag) { - struct msg *result = NULL; +void worker_destroy(struct worker *worker) +{ + log("Shutting down\n"); - ret = cmd_dispatcher_handle(dispatcher, task, &result); - msg_free(task); + libgit_shutdown(); + signalfd_destroy(worker->signalfd); + event_loop_destroy(worker->event_loop); + cmd_dispatcher_destroy(worker->cmd_dispatcher); + worker_settings_destroy(worker->settings); + free(worker); +} + +int worker_main(struct worker *worker) +{ + int ret = 0; + + while (!worker->stopping) { + ret = net_connect(worker->settings->host, worker->settings->port); if (ret < 0) - goto dispatcher_destroy; - - ret = worker_send_to_server(settings, result, &task); - msg_free(result); - if (ret < 0) { - if ((errno == EINTR || errno == EINVAL) && global_stop_flag) - ret = 0; - goto dispatcher_destroy; - } - } + return ret; + + const int fd = ret; + static const char *argv[] = {CMD_NEW_WORKER, NULL}; + + ret = msg_send_argv(fd, argv); + if (ret < 0) + return ret; -dispatcher_destroy: - cmd_dispatcher_destroy(dispatcher); + ret = cmd_dispatcher_add_to_event_loop(worker->cmd_dispatcher, worker->event_loop, + fd); + if (ret < 0) + return ret; + + log("Waiting for a new command\n"); + + ret = event_loop_run(worker->event_loop); + if (ret < 0) + return ret; + } return ret; } diff --git a/src/worker.h b/src/worker.h index bf603df..8eb325c 100644 --- a/src/worker.h +++ b/src/worker.h @@ -15,9 +15,9 @@ struct settings { struct worker; -int worker_create(struct worker **); +int worker_create(struct worker **, const struct settings *); void worker_destroy(struct worker *); -int worker_main(struct worker *, const struct settings *); +int worker_main(struct worker *); #endif diff --git a/src/worker_main.c b/src/worker_main.c index b9e9b9b..dcfee47 100644 --- a/src/worker_main.c +++ b/src/worker_main.c @@ -70,11 +70,11 @@ int main(int argc, char *argv[]) if (ret < 0) return ret; - ret = worker_create(&worker); + ret = worker_create(&worker, &settings); if (ret < 0) return ret; - ret = worker_main(worker, &settings); + ret = worker_main(worker); if (ret < 0) goto destroy_worker; |