diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2023-07-05 15:30:57 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2023-07-05 16:18:05 +0200 |
commit | 8465f8181eda45e3d6cc5d6c3d08ca36db04763b (patch) | |
tree | 1ca1d03cfe373a1ca28ee021d9b3ec73781fb487 /src/tcp_server.c | |
parent | sanitize #include-s (diff) | |
download | cimple-8465f8181eda45e3d6cc5d6c3d08ca36db04763b.tar.gz cimple-8465f8181eda45e3d6cc5d6c3d08ca36db04763b.zip |
tcp_server: keep track of client threads
This is a major change, obviously; brought to me by Valgrind, which
noticed that we don't actually clean up after cimple-client threads.
For a more thorough explanation, please see the added comment in
tcp_server.c.
Diffstat (limited to 'src/tcp_server.c')
-rw-r--r-- | src/tcp_server.c | 272 |
1 files changed, 199 insertions, 73 deletions
diff --git a/src/tcp_server.c b/src/tcp_server.c index 221b27e..2681ec0 100644 --- a/src/tcp_server.c +++ b/src/tcp_server.c @@ -16,96 +16,127 @@ #include <pthread.h> #include <signal.h> #include <stdlib.h> +#include <sys/eventfd.h> +#include <sys/queue.h> +#include <unistd.h> -struct tcp_server { - int fd; - tcp_server_conn_handler conn_handler; - void *arg; +/* + * This is a simple threaded TCP server implementation. Each client is handled + * in a separate thread. + * + * It used to be much simpler; basically, we have two types of client + * connections: those made by cimple-worker and cimple-client respectively. + * cimple-server would keep track of cimple-worker threads/connections, and + * clean them up when assigning tasks/on shutdown. + * + * What about cimple-client connections though? I struggled to come up with a + * scheme that would allow cimple-server to clean them up gracefully. When + * would it do the cleanup even? I didn't want to do it on shutdown, since + * there would be potentially a lot of them. + * + * One solution is to make client threads detached. This is a common advise; + * I really don't understand the merit of this approach though. Client threads + * actively work on shared data, take locks, etc. Data corruption is very + * likely after the main thread exits and all the rest are killed. + * + * Another approach is pre-threading; we make a number of threads beforehand + * and handle all client connections; I view this approach as limiting in + * principle; probably that's foolish of me. + * + * Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the + * amount of work it would require to maintain read buffers, etc. + * + * So I came up with this convoluted scheme. The TCP server adds the listening + * socket to the event loop, as before. Each client thread makes an eventfd + * descriptor that it writes to when it's about to finish. The eventfd + * descriptor is added to the event loop; once it's readable, we clean up the + * client thread quickly from the main event loop thread. The TCP server itself + * keeps track of client threads; on shutdown, it cleans up those still working. + * + * I'm _really_ not sure about this approach, it seems fishy as hell; I guess, + * we'll see. + */ + +struct client { + struct tcp_server *server; + int conn_fd; + + int cleanup_fd; + + pid_t tid; + pthread_t thread; + + SIMPLEQ_ENTRY(client) entries; }; -int tcp_server_create(struct tcp_server **_server, const char *port, - tcp_server_conn_handler conn_handler, void *arg) -{ - int ret = 0; +SIMPLEQ_HEAD(client_queue, client); - struct tcp_server *server = calloc(1, sizeof(struct tcp_server)); - if (!server) { - log_errno("calloc"); - return -1; - } +struct tcp_server { + struct event_loop *loop; - server->conn_handler = conn_handler; - server->arg = arg; + tcp_server_conn_handler conn_handler; + void *conn_handler_arg; - ret = net_bind(port); - if (ret < 0) - goto free; - server->fd = ret; + struct client_queue client_queue; - *_server = server; - return ret; + int accept_fd; +}; -free: - free(server); +static void client_destroy(struct client *client) +{ + log_debug("Cleaning up client thread %d\n", client->tid); - return ret; + SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries); + pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join"); + net_close(client->cleanup_fd); + free(client); } -void tcp_server_destroy(struct tcp_server *server) +static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd, + UNUSED short revents, void *_client) { - net_close(server->fd); - free(server); -} + struct client *client = (struct client *)_client; + log_debug("Client thread %d indicated that it's done\n", client->tid); -struct child_context { - int fd; - tcp_server_conn_handler conn_handler; - void *arg; -}; + client_destroy(client); + return 0; +} -static void *connection_thread(void *_ctx) +static void *client_thread_func(void *_client) { - struct child_context *ctx = (struct child_context *)_ctx; + struct client *client = (struct client *)_client; int ret = 0; - /* Let the child thread handle its signals except those that should be + client->tid = gettid(); + log_debug("New client thread thread %d has started\n", client->tid); + + /* Let the client thread handle its signals except those that should be * handled in the main thread. */ ret = signal_block_sigterms(); if (ret < 0) - goto close; + goto cleanup; - ctx->conn_handler(ctx->fd, ctx->arg); + client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg); + +cleanup: + log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write"); -close: - net_close(ctx->fd); - free(ctx); return NULL; } -static int create_connection_thread(int fd, tcp_server_conn_handler conn_handler, void *arg) +static int client_create_thread(struct client *client) { sigset_t old_mask; - pthread_t child; int ret = 0; - struct child_context *ctx = calloc(1, sizeof(*ctx)); - if (!ctx) { - log_errno("calloc"); - return -1; - } - - ctx->fd = fd; - ctx->conn_handler = conn_handler; - ctx->arg = arg; - - /* Block all signals (we'll unblock them later); the child thread will + /* Block all signals (we'll unblock them later); the client thread will * have all signals blocked initially. This allows the main thread to * handle SIGINT/SIGTERM/etc. */ ret = signal_block_all(&old_mask); if (ret < 0) - goto free_ctx; + return ret; - ret = pthread_create(&child, NULL, connection_thread, ctx); + ret = pthread_create(&client->thread, NULL, client_thread_func, client); if (ret) { pthread_errno(ret, "pthread_create"); goto restore_mask; @@ -116,42 +147,137 @@ restore_mask: signal_set_mask(&old_mask); return ret; +} + +static int client_create(struct tcp_server *server, int conn_fd) +{ + int ret = 0; + + struct client *client = calloc(1, sizeof(struct client)); + if (!client) { + log_errno("calloc"); + return -1; + } + + client->server = server; + client->conn_fd = conn_fd; + + ret = eventfd(0, EFD_CLOEXEC); + if (ret < 0) { + log_errno("eventfd"); + goto free; + } + client->cleanup_fd = ret; + + ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN, client_destroy_handler, + client); + if (ret < 0) + goto close_cleanup_fd; -free_ctx: - free(ctx); + SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries); + + ret = client_create_thread(client); + if (ret < 0) + goto remove_from_client_queue; + + return ret; + +remove_from_client_queue: + SIMPLEQ_REMOVE(&server->client_queue, client, client, entries); + +close_cleanup_fd: + net_close(client->cleanup_fd); + +free: + free(client); return ret; } -int tcp_server_accept(const struct tcp_server *server) +static void client_queue_create(struct client_queue *client_queue) { - int fd = -1, ret = 0; + SIMPLEQ_INIT(client_queue); +} - ret = net_accept(server->fd); +static void client_queue_destroy(struct client_queue *client_queue) +{ + struct client *entry1 = SIMPLEQ_FIRST(client_queue); + while (entry1) { + struct client *entry2 = SIMPLEQ_NEXT(entry1, entries); + client_destroy(entry1); + entry1 = entry2; + } +} + +static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd, + UNUSED short revents, void *_server) +{ + struct tcp_server *server = (struct tcp_server *)_server; + return tcp_server_accept(server); +} + +int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port, + tcp_server_conn_handler conn_handler, void *conn_handler_arg) +{ + int ret = 0; + + struct tcp_server *server = calloc(1, sizeof(struct tcp_server)); + if (!server) { + log_errno("calloc"); + return -1; + } + + server->loop = loop; + + server->conn_handler = conn_handler; + server->conn_handler_arg = conn_handler_arg; + + client_queue_create(&server->client_queue); + + ret = net_bind(port); if (ret < 0) - return ret; - fd = ret; + goto free; + server->accept_fd = ret; - ret = create_connection_thread(fd, server->conn_handler, server->arg); + ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server); if (ret < 0) - goto close_conn; + goto close; + *_server = server; return ret; -close_conn: - net_close(fd); +close: + net_close(server->accept_fd); +free: + free(server); return ret; } -static int tcp_server_event_handler(UNUSED struct event_loop *loop, UNUSED int fd, - UNUSED short revents, void *_server) +void tcp_server_destroy(struct tcp_server *server) { - struct tcp_server *server = (struct tcp_server *)_server; - return tcp_server_accept(server); + net_close(server->accept_fd); + client_queue_destroy(&server->client_queue); + free(server); } -int tcp_server_add_to_event_loop(struct tcp_server *server, struct event_loop *loop) +int tcp_server_accept(struct tcp_server *server) { - return event_loop_add(loop, server->fd, POLLIN, tcp_server_event_handler, server); + int conn_fd = -1, ret = 0; + + ret = net_accept(server->accept_fd); + if (ret < 0) + return ret; + conn_fd = ret; + + ret = client_create(server, conn_fd); + if (ret < 0) + goto close_conn; + + return ret; + +close_conn: + net_close(conn_fd); + + return ret; } |