aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/tcp_server.c
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-07-05 15:30:57 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-07-05 16:18:05 +0200
commit8465f8181eda45e3d6cc5d6c3d08ca36db04763b (patch)
tree1ca1d03cfe373a1ca28ee021d9b3ec73781fb487 /src/tcp_server.c
parentsanitize #include-s (diff)
downloadcimple-8465f8181eda45e3d6cc5d6c3d08ca36db04763b.tar.gz
cimple-8465f8181eda45e3d6cc5d6c3d08ca36db04763b.zip
tcp_server: keep track of client threads
This is a major change, obviously; brought to me by Valgrind, which noticed that we don't actually clean up after cimple-client threads. For a more thorough explanation, please see the added comment in tcp_server.c.
Diffstat (limited to '')
-rw-r--r--src/tcp_server.c272
1 files changed, 199 insertions, 73 deletions
diff --git a/src/tcp_server.c b/src/tcp_server.c
index 221b27e..2681ec0 100644
--- a/src/tcp_server.c
+++ b/src/tcp_server.c
@@ -16,96 +16,127 @@
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
+#include <sys/eventfd.h>
+#include <sys/queue.h>
+#include <unistd.h>
-struct tcp_server {
- int fd;
- tcp_server_conn_handler conn_handler;
- void *arg;
+/*
+ * This is a simple threaded TCP server implementation. Each client is handled
+ * in a separate thread.
+ *
+ * It used to be much simpler; basically, we have two types of client
+ * connections: those made by cimple-worker and cimple-client respectively.
+ * cimple-server would keep track of cimple-worker threads/connections, and
+ * clean them up when assigning tasks/on shutdown.
+ *
+ * What about cimple-client connections though? I struggled to come up with a
+ * scheme that would allow cimple-server to clean them up gracefully. When
+ * would it do the cleanup even? I didn't want to do it on shutdown, since
+ * there would be potentially a lot of them.
+ *
+ * One solution is to make client threads detached. This is a common advise;
+ * I really don't understand the merit of this approach though. Client threads
+ * actively work on shared data, take locks, etc. Data corruption is very
+ * likely after the main thread exits and all the rest are killed.
+ *
+ * Another approach is pre-threading; we make a number of threads beforehand
+ * and handle all client connections; I view this approach as limiting in
+ * principle; probably that's foolish of me.
+ *
+ * Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the
+ * amount of work it would require to maintain read buffers, etc.
+ *
+ * So I came up with this convoluted scheme. The TCP server adds the listening
+ * socket to the event loop, as before. Each client thread makes an eventfd
+ * descriptor that it writes to when it's about to finish. The eventfd
+ * descriptor is added to the event loop; once it's readable, we clean up the
+ * client thread quickly from the main event loop thread. The TCP server itself
+ * keeps track of client threads; on shutdown, it cleans up those still working.
+ *
+ * I'm _really_ not sure about this approach, it seems fishy as hell; I guess,
+ * we'll see.
+ */
+
+struct client {
+ struct tcp_server *server;
+ int conn_fd;
+
+ int cleanup_fd;
+
+ pid_t tid;
+ pthread_t thread;
+
+ SIMPLEQ_ENTRY(client) entries;
};
-int tcp_server_create(struct tcp_server **_server, const char *port,
- tcp_server_conn_handler conn_handler, void *arg)
-{
- int ret = 0;
+SIMPLEQ_HEAD(client_queue, client);
- struct tcp_server *server = calloc(1, sizeof(struct tcp_server));
- if (!server) {
- log_errno("calloc");
- return -1;
- }
+struct tcp_server {
+ struct event_loop *loop;
- server->conn_handler = conn_handler;
- server->arg = arg;
+ tcp_server_conn_handler conn_handler;
+ void *conn_handler_arg;
- ret = net_bind(port);
- if (ret < 0)
- goto free;
- server->fd = ret;
+ struct client_queue client_queue;
- *_server = server;
- return ret;
+ int accept_fd;
+};
-free:
- free(server);
+static void client_destroy(struct client *client)
+{
+ log_debug("Cleaning up client thread %d\n", client->tid);
- return ret;
+ SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries);
+ pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join");
+ net_close(client->cleanup_fd);
+ free(client);
}
-void tcp_server_destroy(struct tcp_server *server)
+static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd,
+ UNUSED short revents, void *_client)
{
- net_close(server->fd);
- free(server);
-}
+ struct client *client = (struct client *)_client;
+ log_debug("Client thread %d indicated that it's done\n", client->tid);
-struct child_context {
- int fd;
- tcp_server_conn_handler conn_handler;
- void *arg;
-};
+ client_destroy(client);
+ return 0;
+}
-static void *connection_thread(void *_ctx)
+static void *client_thread_func(void *_client)
{
- struct child_context *ctx = (struct child_context *)_ctx;
+ struct client *client = (struct client *)_client;
int ret = 0;
- /* Let the child thread handle its signals except those that should be
+ client->tid = gettid();
+ log_debug("New client thread thread %d has started\n", client->tid);
+
+ /* Let the client thread handle its signals except those that should be
* handled in the main thread. */
ret = signal_block_sigterms();
if (ret < 0)
- goto close;
+ goto cleanup;
- ctx->conn_handler(ctx->fd, ctx->arg);
+ client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg);
+
+cleanup:
+ log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write");
-close:
- net_close(ctx->fd);
- free(ctx);
return NULL;
}
-static int create_connection_thread(int fd, tcp_server_conn_handler conn_handler, void *arg)
+static int client_create_thread(struct client *client)
{
sigset_t old_mask;
- pthread_t child;
int ret = 0;
- struct child_context *ctx = calloc(1, sizeof(*ctx));
- if (!ctx) {
- log_errno("calloc");
- return -1;
- }
-
- ctx->fd = fd;
- ctx->conn_handler = conn_handler;
- ctx->arg = arg;
-
- /* Block all signals (we'll unblock them later); the child thread will
+ /* Block all signals (we'll unblock them later); the client thread will
* have all signals blocked initially. This allows the main thread to
* handle SIGINT/SIGTERM/etc. */
ret = signal_block_all(&old_mask);
if (ret < 0)
- goto free_ctx;
+ return ret;
- ret = pthread_create(&child, NULL, connection_thread, ctx);
+ ret = pthread_create(&client->thread, NULL, client_thread_func, client);
if (ret) {
pthread_errno(ret, "pthread_create");
goto restore_mask;
@@ -116,42 +147,137 @@ restore_mask:
signal_set_mask(&old_mask);
return ret;
+}
+
+static int client_create(struct tcp_server *server, int conn_fd)
+{
+ int ret = 0;
+
+ struct client *client = calloc(1, sizeof(struct client));
+ if (!client) {
+ log_errno("calloc");
+ return -1;
+ }
+
+ client->server = server;
+ client->conn_fd = conn_fd;
+
+ ret = eventfd(0, EFD_CLOEXEC);
+ if (ret < 0) {
+ log_errno("eventfd");
+ goto free;
+ }
+ client->cleanup_fd = ret;
+
+ ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN, client_destroy_handler,
+ client);
+ if (ret < 0)
+ goto close_cleanup_fd;
-free_ctx:
- free(ctx);
+ SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries);
+
+ ret = client_create_thread(client);
+ if (ret < 0)
+ goto remove_from_client_queue;
+
+ return ret;
+
+remove_from_client_queue:
+ SIMPLEQ_REMOVE(&server->client_queue, client, client, entries);
+
+close_cleanup_fd:
+ net_close(client->cleanup_fd);
+
+free:
+ free(client);
return ret;
}
-int tcp_server_accept(const struct tcp_server *server)
+static void client_queue_create(struct client_queue *client_queue)
{
- int fd = -1, ret = 0;
+ SIMPLEQ_INIT(client_queue);
+}
- ret = net_accept(server->fd);
+static void client_queue_destroy(struct client_queue *client_queue)
+{
+ struct client *entry1 = SIMPLEQ_FIRST(client_queue);
+ while (entry1) {
+ struct client *entry2 = SIMPLEQ_NEXT(entry1, entries);
+ client_destroy(entry1);
+ entry1 = entry2;
+ }
+}
+
+static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd,
+ UNUSED short revents, void *_server)
+{
+ struct tcp_server *server = (struct tcp_server *)_server;
+ return tcp_server_accept(server);
+}
+
+int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port,
+ tcp_server_conn_handler conn_handler, void *conn_handler_arg)
+{
+ int ret = 0;
+
+ struct tcp_server *server = calloc(1, sizeof(struct tcp_server));
+ if (!server) {
+ log_errno("calloc");
+ return -1;
+ }
+
+ server->loop = loop;
+
+ server->conn_handler = conn_handler;
+ server->conn_handler_arg = conn_handler_arg;
+
+ client_queue_create(&server->client_queue);
+
+ ret = net_bind(port);
if (ret < 0)
- return ret;
- fd = ret;
+ goto free;
+ server->accept_fd = ret;
- ret = create_connection_thread(fd, server->conn_handler, server->arg);
+ ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server);
if (ret < 0)
- goto close_conn;
+ goto close;
+ *_server = server;
return ret;
-close_conn:
- net_close(fd);
+close:
+ net_close(server->accept_fd);
+free:
+ free(server);
return ret;
}
-static int tcp_server_event_handler(UNUSED struct event_loop *loop, UNUSED int fd,
- UNUSED short revents, void *_server)
+void tcp_server_destroy(struct tcp_server *server)
{
- struct tcp_server *server = (struct tcp_server *)_server;
- return tcp_server_accept(server);
+ net_close(server->accept_fd);
+ client_queue_destroy(&server->client_queue);
+ free(server);
}
-int tcp_server_add_to_event_loop(struct tcp_server *server, struct event_loop *loop)
+int tcp_server_accept(struct tcp_server *server)
{
- return event_loop_add(loop, server->fd, POLLIN, tcp_server_event_handler, server);
+ int conn_fd = -1, ret = 0;
+
+ ret = net_accept(server->accept_fd);
+ if (ret < 0)
+ return ret;
+ conn_fd = ret;
+
+ ret = client_create(server, conn_fd);
+ if (ret < 0)
+ goto close_conn;
+
+ return ret;
+
+close_conn:
+ net_close(conn_fd);
+
+ return ret;
}