From 8465f8181eda45e3d6cc5d6c3d08ca36db04763b Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Wed, 5 Jul 2023 15:30:57 +0200 Subject: tcp_server: keep track of client threads This is a major change, obviously; brought to me by Valgrind, which noticed that we don't actually clean up after cimple-client threads. For a more thorough explanation, please see the added comment in tcp_server.c. --- src/event_loop.c | 10 +- src/server.c | 8 +- src/tcp_server.c | 272 ++++++++++++++++++++++++++++++++++++++-------------- src/tcp_server.h | 7 +- src/worker.c | 9 +- src/worker_queue.c | 5 - test/CMakeLists.txt | 2 +- 7 files changed, 217 insertions(+), 96 deletions(-) diff --git a/src/event_loop.c b/src/event_loop.c index 93b67f2..75a323c 100644 --- a/src/event_loop.c +++ b/src/event_loop.c @@ -7,7 +7,6 @@ #include "event_loop.h" #include "log.h" -#include "net.h" #include "string.h" #include @@ -126,7 +125,6 @@ static void event_loop_remove(struct event_loop *loop, struct event_fd *entry) log_debug("Removing descriptor %d from event loop\n", entry->fd); SIMPLEQ_REMOVE(&loop->entries, entry, event_fd, entries); - net_close(entry->fd); event_fd_destroy(entry); --loop->nfds; } @@ -191,11 +189,15 @@ static struct pollfd *make_pollfds(const struct event_loop *loop) int event_loop_run(struct event_loop *loop) { + /* Cache the number of event descriptors so that event handlers can + * append new ones. */ + const nfds_t nfds = loop->nfds; + struct pollfd *fds = make_pollfds(loop); if (!fds) return -1; - int ret = poll(fds, loop->nfds, -1); + int ret = poll(fds, nfds, -1); if (ret < 0) { log_errno("poll"); return ret; @@ -203,7 +205,7 @@ int event_loop_run(struct event_loop *loop) ret = 0; struct event_fd *entry = SIMPLEQ_FIRST(&loop->entries); - for (nfds_t i = 0; i < loop->nfds; ++i) { + for (nfds_t i = 0; i < nfds; ++i) { struct event_fd *next = SIMPLEQ_NEXT(entry, entries); if (!fds[i].revents) diff --git a/src/server.c b/src/server.c index b48dc79..2fbb13c 100644 --- a/src/server.c +++ b/src/server.c @@ -377,15 +377,11 @@ int server_create(struct server **_server, const struct settings *settings) if (ret < 0) goto destroy_storage; - ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn, - server->cmd_dispatcher); + ret = tcp_server_create(&server->tcp_server, server->event_loop, settings->port, + cmd_dispatcher_handle_conn, server->cmd_dispatcher); if (ret < 0) goto destroy_run_queue; - ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop); - if (ret < 0) - goto destroy_tcp_server; - ret = pthread_create(&server->main_thread, NULL, server_main_thread, server); if (ret) { pthread_errno(ret, "pthread_create"); diff --git a/src/tcp_server.c b/src/tcp_server.c index 221b27e..2681ec0 100644 --- a/src/tcp_server.c +++ b/src/tcp_server.c @@ -16,96 +16,127 @@ #include #include #include +#include +#include +#include -struct tcp_server { - int fd; - tcp_server_conn_handler conn_handler; - void *arg; +/* + * This is a simple threaded TCP server implementation. Each client is handled + * in a separate thread. + * + * It used to be much simpler; basically, we have two types of client + * connections: those made by cimple-worker and cimple-client respectively. + * cimple-server would keep track of cimple-worker threads/connections, and + * clean them up when assigning tasks/on shutdown. + * + * What about cimple-client connections though? I struggled to come up with a + * scheme that would allow cimple-server to clean them up gracefully. When + * would it do the cleanup even? I didn't want to do it on shutdown, since + * there would be potentially a lot of them. + * + * One solution is to make client threads detached. This is a common advise; + * I really don't understand the merit of this approach though. Client threads + * actively work on shared data, take locks, etc. Data corruption is very + * likely after the main thread exits and all the rest are killed. + * + * Another approach is pre-threading; we make a number of threads beforehand + * and handle all client connections; I view this approach as limiting in + * principle; probably that's foolish of me. + * + * Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the + * amount of work it would require to maintain read buffers, etc. + * + * So I came up with this convoluted scheme. The TCP server adds the listening + * socket to the event loop, as before. Each client thread makes an eventfd + * descriptor that it writes to when it's about to finish. The eventfd + * descriptor is added to the event loop; once it's readable, we clean up the + * client thread quickly from the main event loop thread. The TCP server itself + * keeps track of client threads; on shutdown, it cleans up those still working. + * + * I'm _really_ not sure about this approach, it seems fishy as hell; I guess, + * we'll see. + */ + +struct client { + struct tcp_server *server; + int conn_fd; + + int cleanup_fd; + + pid_t tid; + pthread_t thread; + + SIMPLEQ_ENTRY(client) entries; }; -int tcp_server_create(struct tcp_server **_server, const char *port, - tcp_server_conn_handler conn_handler, void *arg) -{ - int ret = 0; +SIMPLEQ_HEAD(client_queue, client); - struct tcp_server *server = calloc(1, sizeof(struct tcp_server)); - if (!server) { - log_errno("calloc"); - return -1; - } +struct tcp_server { + struct event_loop *loop; - server->conn_handler = conn_handler; - server->arg = arg; + tcp_server_conn_handler conn_handler; + void *conn_handler_arg; - ret = net_bind(port); - if (ret < 0) - goto free; - server->fd = ret; + struct client_queue client_queue; - *_server = server; - return ret; + int accept_fd; +}; -free: - free(server); +static void client_destroy(struct client *client) +{ + log_debug("Cleaning up client thread %d\n", client->tid); - return ret; + SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries); + pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join"); + net_close(client->cleanup_fd); + free(client); } -void tcp_server_destroy(struct tcp_server *server) +static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd, + UNUSED short revents, void *_client) { - net_close(server->fd); - free(server); -} + struct client *client = (struct client *)_client; + log_debug("Client thread %d indicated that it's done\n", client->tid); -struct child_context { - int fd; - tcp_server_conn_handler conn_handler; - void *arg; -}; + client_destroy(client); + return 0; +} -static void *connection_thread(void *_ctx) +static void *client_thread_func(void *_client) { - struct child_context *ctx = (struct child_context *)_ctx; + struct client *client = (struct client *)_client; int ret = 0; - /* Let the child thread handle its signals except those that should be + client->tid = gettid(); + log_debug("New client thread thread %d has started\n", client->tid); + + /* Let the client thread handle its signals except those that should be * handled in the main thread. */ ret = signal_block_sigterms(); if (ret < 0) - goto close; + goto cleanup; - ctx->conn_handler(ctx->fd, ctx->arg); + client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg); + +cleanup: + log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write"); -close: - net_close(ctx->fd); - free(ctx); return NULL; } -static int create_connection_thread(int fd, tcp_server_conn_handler conn_handler, void *arg) +static int client_create_thread(struct client *client) { sigset_t old_mask; - pthread_t child; int ret = 0; - struct child_context *ctx = calloc(1, sizeof(*ctx)); - if (!ctx) { - log_errno("calloc"); - return -1; - } - - ctx->fd = fd; - ctx->conn_handler = conn_handler; - ctx->arg = arg; - - /* Block all signals (we'll unblock them later); the child thread will + /* Block all signals (we'll unblock them later); the client thread will * have all signals blocked initially. This allows the main thread to * handle SIGINT/SIGTERM/etc. */ ret = signal_block_all(&old_mask); if (ret < 0) - goto free_ctx; + return ret; - ret = pthread_create(&child, NULL, connection_thread, ctx); + ret = pthread_create(&client->thread, NULL, client_thread_func, client); if (ret) { pthread_errno(ret, "pthread_create"); goto restore_mask; @@ -116,42 +147,137 @@ restore_mask: signal_set_mask(&old_mask); return ret; +} + +static int client_create(struct tcp_server *server, int conn_fd) +{ + int ret = 0; + + struct client *client = calloc(1, sizeof(struct client)); + if (!client) { + log_errno("calloc"); + return -1; + } + + client->server = server; + client->conn_fd = conn_fd; + + ret = eventfd(0, EFD_CLOEXEC); + if (ret < 0) { + log_errno("eventfd"); + goto free; + } + client->cleanup_fd = ret; + + ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN, client_destroy_handler, + client); + if (ret < 0) + goto close_cleanup_fd; -free_ctx: - free(ctx); + SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries); + + ret = client_create_thread(client); + if (ret < 0) + goto remove_from_client_queue; + + return ret; + +remove_from_client_queue: + SIMPLEQ_REMOVE(&server->client_queue, client, client, entries); + +close_cleanup_fd: + net_close(client->cleanup_fd); + +free: + free(client); return ret; } -int tcp_server_accept(const struct tcp_server *server) +static void client_queue_create(struct client_queue *client_queue) { - int fd = -1, ret = 0; + SIMPLEQ_INIT(client_queue); +} - ret = net_accept(server->fd); +static void client_queue_destroy(struct client_queue *client_queue) +{ + struct client *entry1 = SIMPLEQ_FIRST(client_queue); + while (entry1) { + struct client *entry2 = SIMPLEQ_NEXT(entry1, entries); + client_destroy(entry1); + entry1 = entry2; + } +} + +static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd, + UNUSED short revents, void *_server) +{ + struct tcp_server *server = (struct tcp_server *)_server; + return tcp_server_accept(server); +} + +int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port, + tcp_server_conn_handler conn_handler, void *conn_handler_arg) +{ + int ret = 0; + + struct tcp_server *server = calloc(1, sizeof(struct tcp_server)); + if (!server) { + log_errno("calloc"); + return -1; + } + + server->loop = loop; + + server->conn_handler = conn_handler; + server->conn_handler_arg = conn_handler_arg; + + client_queue_create(&server->client_queue); + + ret = net_bind(port); if (ret < 0) - return ret; - fd = ret; + goto free; + server->accept_fd = ret; - ret = create_connection_thread(fd, server->conn_handler, server->arg); + ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server); if (ret < 0) - goto close_conn; + goto close; + *_server = server; return ret; -close_conn: - net_close(fd); +close: + net_close(server->accept_fd); +free: + free(server); return ret; } -static int tcp_server_event_handler(UNUSED struct event_loop *loop, UNUSED int fd, - UNUSED short revents, void *_server) +void tcp_server_destroy(struct tcp_server *server) { - struct tcp_server *server = (struct tcp_server *)_server; - return tcp_server_accept(server); + net_close(server->accept_fd); + client_queue_destroy(&server->client_queue); + free(server); } -int tcp_server_add_to_event_loop(struct tcp_server *server, struct event_loop *loop) +int tcp_server_accept(struct tcp_server *server) { - return event_loop_add(loop, server->fd, POLLIN, tcp_server_event_handler, server); + int conn_fd = -1, ret = 0; + + ret = net_accept(server->accept_fd); + if (ret < 0) + return ret; + conn_fd = ret; + + ret = client_create(server, conn_fd); + if (ret < 0) + goto close_conn; + + return ret; + +close_conn: + net_close(conn_fd); + + return ret; } diff --git a/src/tcp_server.h b/src/tcp_server.h index f96e3f1..979e6dc 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -14,11 +14,10 @@ struct tcp_server; typedef int (*tcp_server_conn_handler)(int conn_fd, void *arg); -int tcp_server_create(struct tcp_server **, const char *port, tcp_server_conn_handler, void *arg); +int tcp_server_create(struct tcp_server **, struct event_loop *, const char *port, + tcp_server_conn_handler, void *arg); void tcp_server_destroy(struct tcp_server *); -int tcp_server_accept(const struct tcp_server *); - -int tcp_server_add_to_event_loop(struct tcp_server *, struct event_loop *); +int tcp_server_accept(struct tcp_server *); #endif diff --git a/src/worker.c b/src/worker.c index d7cd610..3bfca42 100644 --- a/src/worker.c +++ b/src/worker.c @@ -237,10 +237,13 @@ int worker_main(struct worker *worker) ret = event_loop_run(worker->event_loop); if (ret < 0) goto close; - } -close: - net_close(fd); + close: + net_close(fd); + + if (ret < 0) + break; + } return ret; } diff --git a/src/worker_queue.c b/src/worker_queue.c index fd449c1..867ad90 100644 --- a/src/worker_queue.c +++ b/src/worker_queue.c @@ -9,12 +9,10 @@ #include "log.h" #include "net.h" -#include #include #include struct worker { - pthread_t thread; int fd; SIMPLEQ_ENTRY(worker) entries; }; @@ -27,7 +25,6 @@ int worker_create(struct worker **_entry, int fd) return -1; } - entry->thread = pthread_self(); entry->fd = fd; *_entry = entry; @@ -36,8 +33,6 @@ int worker_create(struct worker **_entry, int fd) void worker_destroy(struct worker *entry) { - log("Waiting for worker %d thread to exit\n", entry->fd); - pthread_errno_if(pthread_join(entry->thread, NULL), "pthread_join"); net_close(entry->fd); free(entry); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 18b49be..b8a115c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -13,7 +13,7 @@ function(add_python_tests name) add_test(NAME "${name}" WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/py" COMMAND ${ARGV}) - set_tests_properties("${name}" PROPERTIES TIMEOUT 60) + set_tests_properties("${name}" PROPERTIES TIMEOUT 300) endfunction() add_python_tests(python_tests -- cgit v1.2.3