aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--src/event_loop.c10
-rw-r--r--src/server.c8
-rw-r--r--src/tcp_server.c272
-rw-r--r--src/tcp_server.h7
-rw-r--r--src/worker.c9
-rw-r--r--src/worker_queue.c5
-rw-r--r--test/CMakeLists.txt2
7 files changed, 217 insertions, 96 deletions
diff --git a/src/event_loop.c b/src/event_loop.c
index 93b67f2..75a323c 100644
--- a/src/event_loop.c
+++ b/src/event_loop.c
@@ -7,7 +7,6 @@
#include "event_loop.h"
#include "log.h"
-#include "net.h"
#include "string.h"
#include <poll.h>
@@ -126,7 +125,6 @@ static void event_loop_remove(struct event_loop *loop, struct event_fd *entry)
log_debug("Removing descriptor %d from event loop\n", entry->fd);
SIMPLEQ_REMOVE(&loop->entries, entry, event_fd, entries);
- net_close(entry->fd);
event_fd_destroy(entry);
--loop->nfds;
}
@@ -191,11 +189,15 @@ static struct pollfd *make_pollfds(const struct event_loop *loop)
int event_loop_run(struct event_loop *loop)
{
+ /* Cache the number of event descriptors so that event handlers can
+ * append new ones. */
+ const nfds_t nfds = loop->nfds;
+
struct pollfd *fds = make_pollfds(loop);
if (!fds)
return -1;
- int ret = poll(fds, loop->nfds, -1);
+ int ret = poll(fds, nfds, -1);
if (ret < 0) {
log_errno("poll");
return ret;
@@ -203,7 +205,7 @@ int event_loop_run(struct event_loop *loop)
ret = 0;
struct event_fd *entry = SIMPLEQ_FIRST(&loop->entries);
- for (nfds_t i = 0; i < loop->nfds; ++i) {
+ for (nfds_t i = 0; i < nfds; ++i) {
struct event_fd *next = SIMPLEQ_NEXT(entry, entries);
if (!fds[i].revents)
diff --git a/src/server.c b/src/server.c
index b48dc79..2fbb13c 100644
--- a/src/server.c
+++ b/src/server.c
@@ -377,15 +377,11 @@ int server_create(struct server **_server, const struct settings *settings)
if (ret < 0)
goto destroy_storage;
- ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn,
- server->cmd_dispatcher);
+ ret = tcp_server_create(&server->tcp_server, server->event_loop, settings->port,
+ cmd_dispatcher_handle_conn, server->cmd_dispatcher);
if (ret < 0)
goto destroy_run_queue;
- ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop);
- if (ret < 0)
- goto destroy_tcp_server;
-
ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
if (ret) {
pthread_errno(ret, "pthread_create");
diff --git a/src/tcp_server.c b/src/tcp_server.c
index 221b27e..2681ec0 100644
--- a/src/tcp_server.c
+++ b/src/tcp_server.c
@@ -16,96 +16,127 @@
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
+#include <sys/eventfd.h>
+#include <sys/queue.h>
+#include <unistd.h>
-struct tcp_server {
- int fd;
- tcp_server_conn_handler conn_handler;
- void *arg;
+/*
+ * This is a simple threaded TCP server implementation. Each client is handled
+ * in a separate thread.
+ *
+ * It used to be much simpler; basically, we have two types of client
+ * connections: those made by cimple-worker and cimple-client respectively.
+ * cimple-server would keep track of cimple-worker threads/connections, and
+ * clean them up when assigning tasks/on shutdown.
+ *
+ * What about cimple-client connections though? I struggled to come up with a
+ * scheme that would allow cimple-server to clean them up gracefully. When
+ * would it do the cleanup even? I didn't want to do it on shutdown, since
+ * there would be potentially a lot of them.
+ *
+ * One solution is to make client threads detached. This is a common advise;
+ * I really don't understand the merit of this approach though. Client threads
+ * actively work on shared data, take locks, etc. Data corruption is very
+ * likely after the main thread exits and all the rest are killed.
+ *
+ * Another approach is pre-threading; we make a number of threads beforehand
+ * and handle all client connections; I view this approach as limiting in
+ * principle; probably that's foolish of me.
+ *
+ * Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the
+ * amount of work it would require to maintain read buffers, etc.
+ *
+ * So I came up with this convoluted scheme. The TCP server adds the listening
+ * socket to the event loop, as before. Each client thread makes an eventfd
+ * descriptor that it writes to when it's about to finish. The eventfd
+ * descriptor is added to the event loop; once it's readable, we clean up the
+ * client thread quickly from the main event loop thread. The TCP server itself
+ * keeps track of client threads; on shutdown, it cleans up those still working.
+ *
+ * I'm _really_ not sure about this approach, it seems fishy as hell; I guess,
+ * we'll see.
+ */
+
+struct client {
+ struct tcp_server *server;
+ int conn_fd;
+
+ int cleanup_fd;
+
+ pid_t tid;
+ pthread_t thread;
+
+ SIMPLEQ_ENTRY(client) entries;
};
-int tcp_server_create(struct tcp_server **_server, const char *port,
- tcp_server_conn_handler conn_handler, void *arg)
-{
- int ret = 0;
+SIMPLEQ_HEAD(client_queue, client);
- struct tcp_server *server = calloc(1, sizeof(struct tcp_server));
- if (!server) {
- log_errno("calloc");
- return -1;
- }
+struct tcp_server {
+ struct event_loop *loop;
- server->conn_handler = conn_handler;
- server->arg = arg;
+ tcp_server_conn_handler conn_handler;
+ void *conn_handler_arg;
- ret = net_bind(port);
- if (ret < 0)
- goto free;
- server->fd = ret;
+ struct client_queue client_queue;
- *_server = server;
- return ret;
+ int accept_fd;
+};
-free:
- free(server);
+static void client_destroy(struct client *client)
+{
+ log_debug("Cleaning up client thread %d\n", client->tid);
- return ret;
+ SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries);
+ pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join");
+ net_close(client->cleanup_fd);
+ free(client);
}
-void tcp_server_destroy(struct tcp_server *server)
+static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd,
+ UNUSED short revents, void *_client)
{
- net_close(server->fd);
- free(server);
-}
+ struct client *client = (struct client *)_client;
+ log_debug("Client thread %d indicated that it's done\n", client->tid);
-struct child_context {
- int fd;
- tcp_server_conn_handler conn_handler;
- void *arg;
-};
+ client_destroy(client);
+ return 0;
+}
-static void *connection_thread(void *_ctx)
+static void *client_thread_func(void *_client)
{
- struct child_context *ctx = (struct child_context *)_ctx;
+ struct client *client = (struct client *)_client;
int ret = 0;
- /* Let the child thread handle its signals except those that should be
+ client->tid = gettid();
+ log_debug("New client thread thread %d has started\n", client->tid);
+
+ /* Let the client thread handle its signals except those that should be
* handled in the main thread. */
ret = signal_block_sigterms();
if (ret < 0)
- goto close;
+ goto cleanup;
- ctx->conn_handler(ctx->fd, ctx->arg);
+ client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg);
+
+cleanup:
+ log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write");
-close:
- net_close(ctx->fd);
- free(ctx);
return NULL;
}
-static int create_connection_thread(int fd, tcp_server_conn_handler conn_handler, void *arg)
+static int client_create_thread(struct client *client)
{
sigset_t old_mask;
- pthread_t child;
int ret = 0;
- struct child_context *ctx = calloc(1, sizeof(*ctx));
- if (!ctx) {
- log_errno("calloc");
- return -1;
- }
-
- ctx->fd = fd;
- ctx->conn_handler = conn_handler;
- ctx->arg = arg;
-
- /* Block all signals (we'll unblock them later); the child thread will
+ /* Block all signals (we'll unblock them later); the client thread will
* have all signals blocked initially. This allows the main thread to
* handle SIGINT/SIGTERM/etc. */
ret = signal_block_all(&old_mask);
if (ret < 0)
- goto free_ctx;
+ return ret;
- ret = pthread_create(&child, NULL, connection_thread, ctx);
+ ret = pthread_create(&client->thread, NULL, client_thread_func, client);
if (ret) {
pthread_errno(ret, "pthread_create");
goto restore_mask;
@@ -116,42 +147,137 @@ restore_mask:
signal_set_mask(&old_mask);
return ret;
+}
+
+static int client_create(struct tcp_server *server, int conn_fd)
+{
+ int ret = 0;
+
+ struct client *client = calloc(1, sizeof(struct client));
+ if (!client) {
+ log_errno("calloc");
+ return -1;
+ }
+
+ client->server = server;
+ client->conn_fd = conn_fd;
+
+ ret = eventfd(0, EFD_CLOEXEC);
+ if (ret < 0) {
+ log_errno("eventfd");
+ goto free;
+ }
+ client->cleanup_fd = ret;
+
+ ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN, client_destroy_handler,
+ client);
+ if (ret < 0)
+ goto close_cleanup_fd;
-free_ctx:
- free(ctx);
+ SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries);
+
+ ret = client_create_thread(client);
+ if (ret < 0)
+ goto remove_from_client_queue;
+
+ return ret;
+
+remove_from_client_queue:
+ SIMPLEQ_REMOVE(&server->client_queue, client, client, entries);
+
+close_cleanup_fd:
+ net_close(client->cleanup_fd);
+
+free:
+ free(client);
return ret;
}
-int tcp_server_accept(const struct tcp_server *server)
+static void client_queue_create(struct client_queue *client_queue)
{
- int fd = -1, ret = 0;
+ SIMPLEQ_INIT(client_queue);
+}
- ret = net_accept(server->fd);
+static void client_queue_destroy(struct client_queue *client_queue)
+{
+ struct client *entry1 = SIMPLEQ_FIRST(client_queue);
+ while (entry1) {
+ struct client *entry2 = SIMPLEQ_NEXT(entry1, entries);
+ client_destroy(entry1);
+ entry1 = entry2;
+ }
+}
+
+static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd,
+ UNUSED short revents, void *_server)
+{
+ struct tcp_server *server = (struct tcp_server *)_server;
+ return tcp_server_accept(server);
+}
+
+int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port,
+ tcp_server_conn_handler conn_handler, void *conn_handler_arg)
+{
+ int ret = 0;
+
+ struct tcp_server *server = calloc(1, sizeof(struct tcp_server));
+ if (!server) {
+ log_errno("calloc");
+ return -1;
+ }
+
+ server->loop = loop;
+
+ server->conn_handler = conn_handler;
+ server->conn_handler_arg = conn_handler_arg;
+
+ client_queue_create(&server->client_queue);
+
+ ret = net_bind(port);
if (ret < 0)
- return ret;
- fd = ret;
+ goto free;
+ server->accept_fd = ret;
- ret = create_connection_thread(fd, server->conn_handler, server->arg);
+ ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server);
if (ret < 0)
- goto close_conn;
+ goto close;
+ *_server = server;
return ret;
-close_conn:
- net_close(fd);
+close:
+ net_close(server->accept_fd);
+free:
+ free(server);
return ret;
}
-static int tcp_server_event_handler(UNUSED struct event_loop *loop, UNUSED int fd,
- UNUSED short revents, void *_server)
+void tcp_server_destroy(struct tcp_server *server)
{
- struct tcp_server *server = (struct tcp_server *)_server;
- return tcp_server_accept(server);
+ net_close(server->accept_fd);
+ client_queue_destroy(&server->client_queue);
+ free(server);
}
-int tcp_server_add_to_event_loop(struct tcp_server *server, struct event_loop *loop)
+int tcp_server_accept(struct tcp_server *server)
{
- return event_loop_add(loop, server->fd, POLLIN, tcp_server_event_handler, server);
+ int conn_fd = -1, ret = 0;
+
+ ret = net_accept(server->accept_fd);
+ if (ret < 0)
+ return ret;
+ conn_fd = ret;
+
+ ret = client_create(server, conn_fd);
+ if (ret < 0)
+ goto close_conn;
+
+ return ret;
+
+close_conn:
+ net_close(conn_fd);
+
+ return ret;
}
diff --git a/src/tcp_server.h b/src/tcp_server.h
index f96e3f1..979e6dc 100644
--- a/src/tcp_server.h
+++ b/src/tcp_server.h
@@ -14,11 +14,10 @@ struct tcp_server;
typedef int (*tcp_server_conn_handler)(int conn_fd, void *arg);
-int tcp_server_create(struct tcp_server **, const char *port, tcp_server_conn_handler, void *arg);
+int tcp_server_create(struct tcp_server **, struct event_loop *, const char *port,
+ tcp_server_conn_handler, void *arg);
void tcp_server_destroy(struct tcp_server *);
-int tcp_server_accept(const struct tcp_server *);
-
-int tcp_server_add_to_event_loop(struct tcp_server *, struct event_loop *);
+int tcp_server_accept(struct tcp_server *);
#endif
diff --git a/src/worker.c b/src/worker.c
index d7cd610..3bfca42 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -237,10 +237,13 @@ int worker_main(struct worker *worker)
ret = event_loop_run(worker->event_loop);
if (ret < 0)
goto close;
- }
-close:
- net_close(fd);
+ close:
+ net_close(fd);
+
+ if (ret < 0)
+ break;
+ }
return ret;
}
diff --git a/src/worker_queue.c b/src/worker_queue.c
index fd449c1..867ad90 100644
--- a/src/worker_queue.c
+++ b/src/worker_queue.c
@@ -9,12 +9,10 @@
#include "log.h"
#include "net.h"
-#include <pthread.h>
#include <stdlib.h>
#include <sys/queue.h>
struct worker {
- pthread_t thread;
int fd;
SIMPLEQ_ENTRY(worker) entries;
};
@@ -27,7 +25,6 @@ int worker_create(struct worker **_entry, int fd)
return -1;
}
- entry->thread = pthread_self();
entry->fd = fd;
*_entry = entry;
@@ -36,8 +33,6 @@ int worker_create(struct worker **_entry, int fd)
void worker_destroy(struct worker *entry)
{
- log("Waiting for worker %d thread to exit\n", entry->fd);
- pthread_errno_if(pthread_join(entry->thread, NULL), "pthread_join");
net_close(entry->fd);
free(entry);
}
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 18b49be..b8a115c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -13,7 +13,7 @@ function(add_python_tests name)
add_test(NAME "${name}"
WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/py"
COMMAND ${ARGV})
- set_tests_properties("${name}" PROPERTIES TIMEOUT 60)
+ set_tests_properties("${name}" PROPERTIES TIMEOUT 300)
endfunction()
add_python_tests(python_tests