aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/worker.c
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-06-12 01:42:08 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-06-13 01:37:08 +0200
commit48ce9170b057ddd2165b0239a92aede15849f7a3 (patch)
treec9928af6202081d9521107f1dc0ae362f54a6adc /src/worker.c
parentlog: refactoring (diff)
downloadcimple-48ce9170b057ddd2165b0239a92aede15849f7a3.tar.gz
cimple-48ce9170b057ddd2165b0239a92aede15849f7a3.zip
use signalfd to stop on SIGTERM
Is this an overkill? I don't know. The thing is, correctly intercepting SIGTERM (also SIGINT, etc.) is incredibly tricky. For example, before this commit, my I/O loops in server.c and worker.c were inherently racy. This was immediately obvious if you tried to run the tests. The tests (especially the Valgrind flavour) would run a worker, wait until it prints a "Waiting for a new command" line, and try to kill it using SIGTERM. The problem is, the global_stop_flag check could have already been executed by the worker, and it would hang forever in recv(). The solution seems to be to use signalfd and select()/poll(). I've never used either before, but it seems to work well enough - at least the very same tests pass and don't hang now.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c208
1 files changed, 140 insertions, 68 deletions
diff --git a/src/worker.c b/src/worker.c
index d7ad310..8ecaed8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -10,74 +10,78 @@
#include "command.h"
#include "compiler.h"
#include "const.h"
+#include "event_loop.h"
#include "git.h"
#include "log.h"
#include "msg.h"
+#include "net.h"
#include "process.h"
#include "run_queue.h"
#include "signal.h"
#include <stdlib.h>
+#include <string.h>
struct worker {
- int dummy;
-};
+ struct settings *settings;
-int worker_create(struct worker **_worker)
-{
- int ret = 0;
+ int stopping;
- ret = signal_handle_stops();
- if (ret < 0)
- return ret;
+ struct cmd_dispatcher *cmd_dispatcher;
- struct worker *worker = malloc(sizeof(struct worker));
- if (!worker) {
+ struct event_loop *event_loop;
+ int signalfd;
+};
+
+static struct settings *worker_settings_copy(const struct settings *src)
+{
+ struct settings *result = malloc(sizeof(*src));
+ if (!result) {
log_errno("malloc");
- return -1;
+ return NULL;
}
- ret = libgit_init();
- if (ret < 0)
- goto free;
-
- *_worker = worker;
- return ret;
+ result->host = strdup(src->host);
+ if (!result->host) {
+ log_errno("strdup");
+ goto free_result;
+ }
-free:
- free(worker);
+ result->port = strdup(src->port);
+ if (!result->port) {
+ log_errno("strdup");
+ goto free_host;
+ }
- return ret;
-}
+ return result;
-void worker_destroy(struct worker *worker)
-{
- log("Shutting down\n");
+free_host:
+ free(result->host);
- libgit_shutdown();
- free(worker);
-}
+free_result:
+ free(result);
-static int worker_send_to_server(const struct settings *settings, const struct msg *request,
- struct msg **response)
-{
- return msg_connect_and_talk(settings->host, settings->port, request, response);
+ return NULL;
}
-static int worker_send_to_server_argv(const struct settings *settings, const char **argv,
- struct msg **response)
+static void worker_settings_destroy(struct settings *settings)
{
- return msg_connect_and_talk_argv(settings->host, settings->port, argv, response);
+ free(settings->port);
+ free(settings->host);
+ free(settings);
}
-static int worker_send_new_worker(const struct settings *settings, struct msg **task)
+static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
+ void *_worker)
{
- static const char *argv[] = {CMD_NEW_WORKER, NULL};
- return worker_send_to_server_argv(settings, argv, task);
+ struct worker *worker = (struct worker *)_worker;
+ worker->stopping = 1;
+ return 0;
}
-static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx)
+static int worker_handle_run(const struct msg *request, UNUSED struct msg **response, void *_ctx)
{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
struct run *run = NULL;
struct proc_output result;
int ret = 0;
@@ -96,11 +100,17 @@ static int msg_run_handler(const struct msg *request, struct msg **response, UNU
proc_output_dump(&result);
+ struct worker *worker = (struct worker *)ctx->arg;
static const char *argv[] = {CMD_COMPLETE, NULL};
- ret = msg_from_argv(response, argv);
+
+ ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL);
if (ret < 0)
goto free_output;
+ /* Close the descriptor and remove it from the event loop.
+ * poll(2) is too confusing, honestly. */
+ ret = EVENT_LOOP_REMOVE;
+
free_output:
proc_output_free(&result);
@@ -110,49 +120,111 @@ free_output:
}
static struct cmd_desc commands[] = {
- {CMD_RUN, msg_run_handler},
+ {CMD_RUN, worker_handle_run},
};
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
-int worker_main(UNUSED struct worker *worker, const struct settings *settings)
+int worker_create(struct worker **_worker, const struct settings *settings)
{
- struct msg *task = NULL;
- struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, NULL);
+ struct worker *worker = malloc(sizeof(struct worker));
+ if (!worker) {
+ log_errno("malloc");
+ return -1;
+ }
+
+ worker->settings = worker_settings_copy(settings);
+ if (!worker->settings) {
+ ret = -1;
+ goto free;
+ }
+
+ worker->stopping = 0;
+
+ ret = cmd_dispatcher_create(&worker->cmd_dispatcher, commands, numof_commands, worker);
if (ret < 0)
- return ret;
+ goto free_settings;
- log("Waiting for a new command\n");
+ ret = event_loop_create(&worker->event_loop);
+ if (ret < 0)
+ goto destroy_cmd_dispatcher;
- ret = worker_send_new_worker(settings, &task);
- if (ret < 0) {
- if ((errno == EINTR || errno == EINVAL) && global_stop_flag)
- ret = 0;
- goto dispatcher_destroy;
- }
+ ret = signalfd_listen_for_stops();
+ if (ret < 0)
+ goto destroy_event_loop;
+ worker->signalfd = ret;
+
+ ret = signalfd_add_to_event_loop(worker->signalfd, worker->event_loop, worker_set_stopping,
+ worker);
+ if (ret < 0)
+ goto close_signalfd;
+
+ ret = libgit_init();
+ if (ret < 0)
+ goto close_signalfd;
+
+ *_worker = worker;
+ return ret;
+
+close_signalfd:
+ signalfd_destroy(worker->signalfd);
+
+destroy_event_loop:
+ event_loop_destroy(worker->event_loop);
+
+destroy_cmd_dispatcher:
+ cmd_dispatcher_destroy(worker->cmd_dispatcher);
+
+free_settings:
+ worker_settings_destroy(worker->settings);
+
+free:
+ free(worker);
+
+ return ret;
+}
- while (!global_stop_flag) {
- struct msg *result = NULL;
+void worker_destroy(struct worker *worker)
+{
+ log("Shutting down\n");
- ret = cmd_dispatcher_handle(dispatcher, task, &result);
- msg_free(task);
+ libgit_shutdown();
+ signalfd_destroy(worker->signalfd);
+ event_loop_destroy(worker->event_loop);
+ cmd_dispatcher_destroy(worker->cmd_dispatcher);
+ worker_settings_destroy(worker->settings);
+ free(worker);
+}
+
+int worker_main(struct worker *worker)
+{
+ int ret = 0;
+
+ while (!worker->stopping) {
+ ret = net_connect(worker->settings->host, worker->settings->port);
if (ret < 0)
- goto dispatcher_destroy;
-
- ret = worker_send_to_server(settings, result, &task);
- msg_free(result);
- if (ret < 0) {
- if ((errno == EINTR || errno == EINVAL) && global_stop_flag)
- ret = 0;
- goto dispatcher_destroy;
- }
- }
+ return ret;
+
+ const int fd = ret;
+ static const char *argv[] = {CMD_NEW_WORKER, NULL};
+
+ ret = msg_send_argv(fd, argv);
+ if (ret < 0)
+ return ret;
-dispatcher_destroy:
- cmd_dispatcher_destroy(dispatcher);
+ ret = cmd_dispatcher_add_to_event_loop(worker->cmd_dispatcher, worker->event_loop,
+ fd);
+ if (ret < 0)
+ return ret;
+
+ log("Waiting for a new command\n");
+
+ ret = event_loop_run(worker->event_loop);
+ if (ret < 0)
+ return ret;
+ }
return ret;
}