aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/server.c
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-06-12 01:42:08 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-06-13 01:37:08 +0200
commit48ce9170b057ddd2165b0239a92aede15849f7a3 (patch)
treec9928af6202081d9521107f1dc0ae362f54a6adc /src/server.c
parentlog: refactoring (diff)
downloadcimple-48ce9170b057ddd2165b0239a92aede15849f7a3.tar.gz
cimple-48ce9170b057ddd2165b0239a92aede15849f7a3.zip
use signalfd to stop on SIGTERM
Is this overkill? I don't know. The thing is, correctly intercepting SIGTERM (and SIGINT, etc.) is incredibly tricky. For example, before this commit, the I/O loops in server.c and worker.c were inherently racy, which was immediately obvious if you tried to run the tests. The tests (especially the Valgrind flavour) would run a worker, wait until it printed a "Waiting for a new command" line, and then try to kill it using SIGTERM. The problem is that the worker could have already executed its global_stop_flag check before the signal arrived, in which case it would hang forever in recv(). The solution seems to be to use signalfd together with select()/poll(). I've never used either before, but it seems to work well enough - at least the very same tests pass and no longer hang.
Diffstat (limited to 'src/server.c')
-rw-r--r--src/server.c261
1 files changed, 138 insertions, 123 deletions
diff --git a/src/server.c b/src/server.c
index a5319e0..2839a61 100644
--- a/src/server.c
+++ b/src/server.c
@@ -9,6 +9,7 @@
#include "command.h"
#include "compiler.h"
#include "const.h"
+#include "event_loop.h"
#include "log.h"
#include "msg.h"
#include "run_queue.h"
@@ -27,6 +28,11 @@ struct server {
int stopping;
+ struct cmd_dispatcher *cmd_dispatcher;
+
+ struct event_loop *event_loop;
+ int signalfd;
+
struct worker_queue worker_queue;
struct run_queue run_queue;
@@ -67,8 +73,10 @@ static void server_notify(struct server *server)
pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal");
}
-static int server_set_stopping(struct server *server)
+static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
+ void *_server)
{
+ struct server *server = (struct server *)_server;
int ret = 0;
ret = server_lock(server);
@@ -117,7 +125,7 @@ static int server_enqueue_run(struct server *server, struct run *run)
return ret;
run_queue_add_last(&server->run_queue, run);
- log("Added a new CI run for repository %s to the queue\n", run_get_url(run));
+ log("Added a new run for repository %s to the queue\n", run_get_url(run));
server_notify(server);
server_unlock(server);
@@ -147,25 +155,26 @@ static int server_assign_run(struct server *server)
int ret = 0;
struct run *run = run_queue_remove_first(&server->run_queue);
- log("Removed a CI run for repository %s from the queue\n", run_get_url(run));
+ log("Removed run for repository %s from the queue\n", run_get_url(run));
struct worker *worker = worker_queue_remove_first(&server->worker_queue);
log("Removed worker %d from the queue\n", worker_get_fd(worker));
const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL};
-
ret = msg_talk_argv(worker_get_fd(worker), argv, NULL);
+
if (ret < 0) {
- /* Failed to communicate with the worker, requeue the run
- * and forget about the worker. */
- worker_destroy(worker);
+ log("Failed to assign run for repository %s to worker %d, requeueing\n",
+ run_get_url(run), worker_get_fd(worker));
run_queue_add_first(&server->run_queue, run);
- return ret;
+ } else {
+ log("Assigned run for repository %s to worker %d\n", run_get_url(run),
+ worker_get_fd(worker));
+ run_destroy(run);
}
- /* Send the run to the worker, forget about both of them for a while. */
worker_destroy(worker);
- run_destroy(run);
+
return ret;
}
@@ -198,15 +207,89 @@ exit:
return NULL;
}
-int server_create(struct server **_server, const struct settings *settings)
+static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
- struct storage_settings storage_settings;
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int client_fd = ctx->fd;
+
+ struct worker *worker = NULL;
+ int ret = 0;
+
+ ret = worker_create(&worker, client_fd);
+ if (ret < 0)
+ return ret;
+
+ ret = server_enqueue_worker(server, worker);
+ if (ret < 0)
+ goto destroy_worker;
+
+ return ret;
+
+destroy_worker:
+ worker_destroy(worker);
+
+ return ret;
+}
+
+static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
+{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ struct run *run = NULL;
+
int ret = 0;
- ret = signal_handle_stops();
+ ret = run_from_msg(&run, request);
if (ret < 0)
return ret;
+ ret = msg_success(response);
+ if (ret < 0)
+ goto destroy_run;
+
+ ret = server_enqueue_run(server, run);
+ if (ret < 0)
+ goto free_response;
+
+ return ret;
+
+free_response:
+ msg_free(*response);
+ *response = NULL;
+
+destroy_run:
+ run_destroy(run);
+
+ return ret;
+}
+
+static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
+{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ int client_fd = ctx->fd;
+ int ret = 0;
+
+ log("Received a \"run complete\" message from worker %d\n", client_fd);
+
+ return ret;
+}
+
+static struct cmd_desc commands[] = {
+ {CMD_NEW_WORKER, handle_cmd_new_worker},
+ {CMD_RUN, handle_cmd_run},
+ {CMD_COMPLETE, handle_cmd_complete},
+};
+
+static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
+
+int server_create(struct server **_server, const struct settings *settings)
+{
+ struct storage_settings storage_settings;
+ int ret = 0;
+
struct server *server = malloc(sizeof(struct server));
if (!server) {
log_errno("malloc");
@@ -227,6 +310,24 @@ int server_create(struct server **_server, const struct settings *settings)
server->stopping = 0;
+ ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server);
+ if (ret < 0)
+ goto destroy_cv;
+
+ ret = event_loop_create(&server->event_loop);
+ if (ret < 0)
+ goto destroy_cmd_dispatcher;
+
+ ret = signalfd_listen_for_stops();
+ if (ret < 0)
+ goto destroy_event_loop;
+ server->signalfd = ret;
+
+ ret = signalfd_add_to_event_loop(server->signalfd, server->event_loop, server_set_stopping,
+ server);
+ if (ret < 0)
+ goto close_signalfd;
+
worker_queue_create(&server->worker_queue);
run_queue_create(&server->run_queue);
@@ -239,10 +340,15 @@ int server_create(struct server **_server, const struct settings *settings)
if (ret < 0)
goto destroy_run_queue;
- ret = tcp_server_create(&server->tcp_server, settings->port);
+ ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn,
+ server->cmd_dispatcher);
if (ret < 0)
goto destroy_storage;
+ ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop);
+ if (ret < 0)
+ goto destroy_tcp_server;
+
ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
if (ret) {
pthread_errno(ret, "pthread_create");
@@ -263,6 +369,16 @@ destroy_run_queue:
worker_queue_destroy(&server->worker_queue);
+close_signalfd:
+ signalfd_destroy(server->signalfd);
+
+destroy_event_loop:
+ event_loop_destroy(server->event_loop);
+
+destroy_cmd_dispatcher:
+ cmd_dispatcher_destroy(server->cmd_dispatcher);
+
+destroy_cv:
pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
destroy_mtx:
@@ -283,128 +399,27 @@ void server_destroy(struct server *server)
storage_destroy(&server->storage);
run_queue_destroy(&server->run_queue);
worker_queue_destroy(&server->worker_queue);
+ signalfd_destroy(server->signalfd);
+ event_loop_destroy(server->event_loop);
+ cmd_dispatcher_destroy(server->cmd_dispatcher);
pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
free(server);
}
-static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response,
- void *_ctx)
-{
- struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct server *server = (struct server *)ctx->arg;
- int client_fd = ctx->fd;
-
- struct worker *worker = NULL;
- int ret = 0;
-
- ret = worker_create(&worker, client_fd);
- if (ret < 0)
- return ret;
-
- ret = server_enqueue_worker(server, worker);
- if (ret < 0)
- goto destroy_worker;
-
- return ret;
-
-destroy_worker:
- worker_destroy(worker);
-
- return ret;
-}
-
-static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
-{
- struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct server *server = (struct server *)ctx->arg;
- struct run *run = NULL;
-
- int ret = 0;
-
- ret = run_from_msg(&run, request);
- if (ret < 0)
- return ret;
-
- ret = msg_success(response);
- if (ret < 0)
- goto destroy_run;
-
- ret = server_enqueue_run(server, run);
- if (ret < 0)
- goto free_response;
-
- return ret;
-
-free_response:
- msg_free(*response);
-
-destroy_run:
- run_destroy(run);
-
- return ret;
-}
-
-static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response,
- void *_ctx)
-{
- struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct server *server = (struct server *)ctx->arg;
- int client_fd = ctx->fd;
-
- struct worker *worker = NULL;
- int ret = 0;
-
- log("Received a \"run complete\" message from worker %d\n", client_fd);
-
- ret = worker_create(&worker, client_fd);
- if (ret < 0)
- return ret;
-
- ret = server_enqueue_worker(server, worker);
- if (ret < 0)
- goto destroy_worker;
-
- return ret;
-
-destroy_worker:
- worker_destroy(worker);
-
- return 0;
-}
-
-static struct cmd_desc commands[] = {
- {CMD_NEW_WORKER, handle_cmd_new_worker},
- {CMD_RUN, handle_cmd_run},
- {CMD_COMPLETE, handle_cmd_complete},
-};
-
-static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
-
static int server_listen_thread(struct server *server)
{
- struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, server);
- if (ret < 0)
- return ret;
-
- while (!global_stop_flag) {
+ while (!server->stopping) {
log("Waiting for new connections\n");
- ret = tcp_server_accept(server->tcp_server, cmd_dispatcher_handle_conn, dispatcher);
- if (ret < 0) {
- if ((errno == EINTR || errno == EINVAL) && global_stop_flag)
- ret = 0;
- goto dispatcher_destroy;
- }
+ ret = event_loop_run(server->event_loop);
+ if (ret < 0)
+ return ret;
}
-dispatcher_destroy:
- cmd_dispatcher_destroy(dispatcher);
-
- return server_set_stopping(server);
+ return 0;
}
int server_main(struct server *server)