aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/CMakeLists.txt6
-rw-r--r--src/command.c79
-rw-r--r--src/command.h3
-rw-r--r--src/event_loop.c204
-rw-r--r--src/event_loop.h36
-rw-r--r--src/server.c261
-rw-r--r--src/signal.c57
-rw-r--r--src/signal.h9
-rw-r--r--src/string.c25
-rw-r--r--src/string.h13
-rw-r--r--src/tcp_server.c76
-rw-r--r--src/tcp_server.h11
-rw-r--r--src/worker.c208
-rw-r--r--src/worker.h4
-rw-r--r--src/worker_main.c4
15 files changed, 766 insertions, 230 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index af6e9fb..fb94b3a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,6 +39,7 @@ generate_sql_header(sqlite)
add_my_executable(server server_main.c server.c
cmd_line.c
command.c
+ event_loop.c
file.c
msg.c
net.c
@@ -48,6 +49,7 @@ add_my_executable(server server_main.c server.c
sqlite.c
storage.c
storage_sqlite.c
+ string.c
tcp_server.c
worker_queue.c)
target_link_libraries(server PRIVATE pthread sqlite3)
@@ -63,11 +65,13 @@ add_my_executable(worker worker_main.c worker.c
ci.c
cmd_line.c
command.c
+ event_loop.c
file.c
git.c
msg.c
net.c
process.c
run_queue.c
- signal.c)
+ signal.c
+ string.c)
target_link_libraries(worker PRIVATE git2 pthread)
diff --git a/src/command.c b/src/command.c
index bd58f05..6306f7e 100644
--- a/src/command.c
+++ b/src/command.c
@@ -6,6 +6,8 @@
*/
#include "command.h"
+#include "compiler.h"
+#include "event_loop.h"
#include "log.h"
#include "msg.h"
@@ -128,20 +130,29 @@ int cmd_dispatcher_handle(const struct cmd_dispatcher *dispatcher, const struct
return cmd_dispatcher_handle_internal(dispatcher, command, result, dispatcher->ctx);
}
+static struct cmd_conn_ctx *make_conn_ctx(int fd, void *arg)
+{
+ struct cmd_conn_ctx *ctx = malloc(sizeof(struct cmd_conn_ctx));
+ if (!ctx) {
+ log_errno("malloc");
+ return NULL;
+ }
+
+ ctx->fd = fd;
+ ctx->arg = arg;
+
+ return ctx;
+}
+
int cmd_dispatcher_handle_conn(int conn_fd, void *_dispatcher)
{
struct cmd_dispatcher *dispatcher = (struct cmd_dispatcher *)_dispatcher;
struct msg *request = NULL, *response = NULL;
int ret = 0;
- struct cmd_conn_ctx *new_ctx = malloc(sizeof(struct cmd_conn_ctx));
- if (!new_ctx) {
- log_errno("malloc");
+ struct cmd_conn_ctx *new_ctx = make_conn_ctx(conn_fd, dispatcher->ctx);
+ if (!new_ctx)
return -1;
- }
-
- new_ctx->fd = conn_fd;
- new_ctx->arg = dispatcher->ctx;
ret = msg_recv(conn_fd, &request);
if (ret < 0)
@@ -168,3 +179,57 @@ free_ctx:
return ret;
}
+
+static int cmd_dispatcher_handle_event(UNUSED struct event_loop *loop, int fd, short revents,
+ void *_dispatcher)
+{
+ struct cmd_dispatcher *dispatcher = (struct cmd_dispatcher *)_dispatcher;
+ struct msg *request = NULL, *response = NULL;
+ int ret = 0;
+
+ if (!(revents & POLLIN)) {
+ log_err("Descriptor %d is not readable\n", fd);
+ return -1;
+ }
+
+ struct cmd_conn_ctx *new_ctx = make_conn_ctx(fd, dispatcher->ctx);
+ if (!new_ctx)
+ return -1;
+
+ ret = msg_recv(fd, &request);
+ if (ret < 0)
+ goto free_ctx;
+
+ ret = cmd_dispatcher_handle_internal(dispatcher, request, &response, new_ctx);
+ if (ret < 0)
+ goto free_response;
+
+ if (response) {
+ ret = msg_send(fd, response);
+ if (ret < 0)
+ goto free_response;
+ }
+
+free_response:
+ if (response)
+ msg_free(response);
+
+ msg_free(request);
+
+free_ctx:
+ free(new_ctx);
+
+ return ret;
+}
+
+int cmd_dispatcher_add_to_event_loop(const struct cmd_dispatcher *dispatcher,
+ struct event_loop *loop, int fd)
+{
+ struct event_fd entry = {
+ .fd = fd,
+ .events = POLLIN,
+ .handler = cmd_dispatcher_handle_event,
+ .arg = (void *)dispatcher,
+ };
+ return event_loop_add(loop, &entry);
+}
diff --git a/src/command.h b/src/command.h
index 90facbb..3d13abd 100644
--- a/src/command.h
+++ b/src/command.h
@@ -8,6 +8,7 @@
#ifndef __COMMAND_H__
#define __COMMAND_H__
+#include "event_loop.h"
#include "msg.h"
#include <stddef.h>
@@ -28,6 +29,8 @@ void cmd_dispatcher_destroy(struct cmd_dispatcher *);
int cmd_dispatcher_handle(const struct cmd_dispatcher *, const struct msg *command,
struct msg **response);
+int cmd_dispatcher_add_to_event_loop(const struct cmd_dispatcher *, struct event_loop *, int fd);
+
struct cmd_conn_ctx {
int fd;
void *arg;
diff --git a/src/event_loop.c b/src/event_loop.c
new file mode 100644
index 0000000..8d91480
--- /dev/null
+++ b/src/event_loop.c
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#include "event_loop.h"
+#include "log.h"
+#include "net.h"
+#include "string.h"
+
+#include <poll.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <sys/queue.h>
+
+SIMPLEQ_HEAD(event_fd_queue, event_fd);
+
+static struct event_fd *event_fd_copy(const struct event_fd *src)
+{
+ struct event_fd *res = malloc(sizeof(*src));
+ if (!res) {
+ log_errno("malloc");
+ return NULL;
+ }
+ *res = *src;
+ return res;
+}
+
+static void event_fd_destroy(struct event_fd *entry)
+{
+ free(entry);
+}
+
+static void event_fd_queue_create(struct event_fd_queue *queue)
+{
+ SIMPLEQ_INIT(queue);
+}
+
+static void event_fd_queue_destroy(struct event_fd_queue *queue)
+{
+ struct event_fd *entry1 = SIMPLEQ_FIRST(queue);
+ while (entry1) {
+ struct event_fd *entry2 = SIMPLEQ_NEXT(entry1, entries);
+ event_fd_destroy(entry1);
+ entry1 = entry2;
+ }
+ SIMPLEQ_INIT(queue);
+}
+
+struct event_loop {
+ nfds_t nfds;
+ struct event_fd_queue entries;
+};
+
+int event_loop_create(struct event_loop **_loop)
+{
+ int ret = 0;
+
+ struct event_loop *loop = calloc(1, sizeof(struct event_loop));
+ if (!loop) {
+ log_errno("calloc");
+ return -1;
+ }
+ *_loop = loop;
+
+ event_fd_queue_create(&loop->entries);
+
+ return ret;
+}
+
+void event_loop_destroy(struct event_loop *loop)
+{
+ event_fd_queue_destroy(&loop->entries);
+ free(loop);
+}
+
+int event_loop_add(struct event_loop *loop, const struct event_fd *entry)
+{
+ log("Adding descriptor %d to event loop\n", entry->fd);
+
+ struct event_fd *copied = event_fd_copy(entry);
+ if (!copied)
+ return -1;
+
+ nfds_t nfds = loop->nfds + 1;
+ SIMPLEQ_INSERT_TAIL(&loop->entries, copied, entries);
+ loop->nfds = nfds;
+
+ return 0;
+}
+
+static void event_loop_remove(struct event_loop *loop, struct event_fd *entry)
+{
+ log("Removing descriptor %d from event loop\n", entry->fd);
+
+ SIMPLEQ_REMOVE(&loop->entries, entry, event_fd, entries);
+ net_close(entry->fd);
+ event_fd_destroy(entry);
+ --loop->nfds;
+}
+
+static char *append_event(char *buf, size_t sz, char *ptr, const char *event)
+{
+ if (ptr > buf)
+ ptr = stpecpy(ptr, buf + sz, ",");
+ return stpecpy(ptr, buf + sz, event);
+}
+
+static char *events_to_string(short events)
+{
+ const size_t sz = 128;
+ char *buf = calloc(1, sz);
+ if (!buf)
+ return NULL;
+
+ char *ptr = buf;
+
+ if (events & POLLNVAL)
+ ptr = append_event(buf, sz, ptr, "POLLNVAL");
+ if (events & POLLERR)
+ ptr = append_event(buf, sz, ptr, "POLLERR");
+ if (events & POLLHUP)
+ ptr = append_event(buf, sz, ptr, "POLLHUP");
+ if (events & POLLIN)
+ ptr = append_event(buf, sz, ptr, "POLLIN");
+ if (events & POLLOUT)
+ ptr = append_event(buf, sz, ptr, "POLLOUT");
+
+ return buf;
+}
+
+static struct pollfd *make_pollfds(const struct event_loop *loop)
+{
+ struct pollfd *fds = calloc(loop->nfds, sizeof(struct pollfd));
+ if (!fds) {
+ log_errno("calloc");
+ return NULL;
+ }
+
+ struct event_fd *entry = SIMPLEQ_FIRST(&loop->entries);
+ for (nfds_t i = 0; i < loop->nfds; ++i, entry = SIMPLEQ_NEXT(entry, entries)) {
+ fds[i].fd = entry->fd;
+ fds[i].events = entry->events;
+ }
+
+ log("Descriptors:\n");
+ for (nfds_t i = 0; i < loop->nfds; ++i) {
+ char *events = events_to_string(fds[i].events);
+ log(" %d (%s)\n", fds[i].fd, events ? events : "");
+ free(events);
+ }
+
+ return fds;
+}
+
+int event_loop_run(struct event_loop *loop)
+{
+ struct pollfd *fds = make_pollfds(loop);
+ if (!fds)
+ return -1;
+
+ int ret = poll(fds, loop->nfds, -1);
+ if (ret < 0) {
+ log_errno("poll");
+ return ret;
+ }
+ ret = 0;
+
+ struct event_fd *entry = SIMPLEQ_FIRST(&loop->entries);
+ for (nfds_t i = 0; i < loop->nfds; ++i) {
+ struct event_fd *next = SIMPLEQ_NEXT(entry, entries);
+
+ if (!fds[i].revents)
+ goto next;
+
+ char *events = events_to_string(fds[i].revents);
+ log("Descriptor %d is ready: %s\n", fds[i].fd, events ? events : "");
+ free(events);
+
+ /* Execute all handlers but notice if any of them fail. */
+ const int handler_ret = entry->handler(loop, fds[i].fd, fds[i].revents, entry->arg);
+ switch (handler_ret) {
+ case 0:
+ goto next;
+ case EVENT_LOOP_REMOVE:
+ goto remove;
+ default:
+ break;
+ }
+
+ remove:
+ event_loop_remove(loop, entry);
+ goto next;
+
+ next:
+ entry = next;
+ continue;
+ }
+
+ free(fds);
+ return ret;
+}
diff --git a/src/event_loop.h b/src/event_loop.h
new file mode 100644
index 0000000..eb5ff05
--- /dev/null
+++ b/src/event_loop.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#ifndef __EVENT_LOOP_H__
+#define __EVENT_LOOP_H__
+
+#include <poll.h>
+#include <sys/queue.h>
+
+struct event_loop;
+
+int event_loop_create(struct event_loop **);
+void event_loop_destroy(struct event_loop *);
+
+int event_loop_run(struct event_loop *);
+
+#define EVENT_LOOP_REMOVE 1
+
+typedef int (*event_loop_handler)(struct event_loop *, int fd, short revents, void *arg);
+
+struct event_fd {
+ int fd;
+ short events;
+ event_loop_handler handler;
+ void *arg;
+
+ SIMPLEQ_ENTRY(event_fd) entries;
+};
+
+int event_loop_add(struct event_loop *, const struct event_fd *);
+
+#endif
diff --git a/src/server.c b/src/server.c
index a5319e0..2839a61 100644
--- a/src/server.c
+++ b/src/server.c
@@ -9,6 +9,7 @@
#include "command.h"
#include "compiler.h"
#include "const.h"
+#include "event_loop.h"
#include "log.h"
#include "msg.h"
#include "run_queue.h"
@@ -27,6 +28,11 @@ struct server {
int stopping;
+ struct cmd_dispatcher *cmd_dispatcher;
+
+ struct event_loop *event_loop;
+ int signalfd;
+
struct worker_queue worker_queue;
struct run_queue run_queue;
@@ -67,8 +73,10 @@ static void server_notify(struct server *server)
pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal");
}
-static int server_set_stopping(struct server *server)
+static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
+ void *_server)
{
+ struct server *server = (struct server *)_server;
int ret = 0;
ret = server_lock(server);
@@ -117,7 +125,7 @@ static int server_enqueue_run(struct server *server, struct run *run)
return ret;
run_queue_add_last(&server->run_queue, run);
- log("Added a new CI run for repository %s to the queue\n", run_get_url(run));
+ log("Added a new run for repository %s to the queue\n", run_get_url(run));
server_notify(server);
server_unlock(server);
@@ -147,25 +155,26 @@ static int server_assign_run(struct server *server)
int ret = 0;
struct run *run = run_queue_remove_first(&server->run_queue);
- log("Removed a CI run for repository %s from the queue\n", run_get_url(run));
+ log("Removed run for repository %s from the queue\n", run_get_url(run));
struct worker *worker = worker_queue_remove_first(&server->worker_queue);
log("Removed worker %d from the queue\n", worker_get_fd(worker));
const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL};
-
ret = msg_talk_argv(worker_get_fd(worker), argv, NULL);
+
if (ret < 0) {
- /* Failed to communicate with the worker, requeue the run
- * and forget about the worker. */
- worker_destroy(worker);
+ log("Failed to assign run for repository %s to worker %d, requeueing\n",
+ run_get_url(run), worker_get_fd(worker));
run_queue_add_first(&server->run_queue, run);
- return ret;
+ } else {
+ log("Assigned run for repository %s to worker %d\n", run_get_url(run),
+ worker_get_fd(worker));
+ run_destroy(run);
}
- /* Send the run to the worker, forget about both of them for a while. */
worker_destroy(worker);
- run_destroy(run);
+
return ret;
}
@@ -198,15 +207,89 @@ exit:
return NULL;
}
-int server_create(struct server **_server, const struct settings *settings)
+static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
- struct storage_settings storage_settings;
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int client_fd = ctx->fd;
+
+ struct worker *worker = NULL;
+ int ret = 0;
+
+ ret = worker_create(&worker, client_fd);
+ if (ret < 0)
+ return ret;
+
+ ret = server_enqueue_worker(server, worker);
+ if (ret < 0)
+ goto destroy_worker;
+
+ return ret;
+
+destroy_worker:
+ worker_destroy(worker);
+
+ return ret;
+}
+
+static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
+{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ struct run *run = NULL;
+
int ret = 0;
- ret = signal_handle_stops();
+ ret = run_from_msg(&run, request);
if (ret < 0)
return ret;
+ ret = msg_success(response);
+ if (ret < 0)
+ goto destroy_run;
+
+ ret = server_enqueue_run(server, run);
+ if (ret < 0)
+ goto free_response;
+
+ return ret;
+
+free_response:
+ msg_free(*response);
+ *response = NULL;
+
+destroy_run:
+ run_destroy(run);
+
+ return ret;
+}
+
+static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
+{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ int client_fd = ctx->fd;
+ int ret = 0;
+
+ log("Received a \"run complete\" message from worker %d\n", client_fd);
+
+ return ret;
+}
+
+static struct cmd_desc commands[] = {
+ {CMD_NEW_WORKER, handle_cmd_new_worker},
+ {CMD_RUN, handle_cmd_run},
+ {CMD_COMPLETE, handle_cmd_complete},
+};
+
+static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
+
+int server_create(struct server **_server, const struct settings *settings)
+{
+ struct storage_settings storage_settings;
+ int ret = 0;
+
struct server *server = malloc(sizeof(struct server));
if (!server) {
log_errno("malloc");
@@ -227,6 +310,24 @@ int server_create(struct server **_server, const struct settings *settings)
server->stopping = 0;
+ ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server);
+ if (ret < 0)
+ goto destroy_cv;
+
+ ret = event_loop_create(&server->event_loop);
+ if (ret < 0)
+ goto destroy_cmd_dispatcher;
+
+ ret = signalfd_listen_for_stops();
+ if (ret < 0)
+ goto destroy_event_loop;
+ server->signalfd = ret;
+
+ ret = signalfd_add_to_event_loop(server->signalfd, server->event_loop, server_set_stopping,
+ server);
+ if (ret < 0)
+ goto close_signalfd;
+
worker_queue_create(&server->worker_queue);
run_queue_create(&server->run_queue);
@@ -239,10 +340,15 @@ int server_create(struct server **_server, const struct settings *settings)
if (ret < 0)
goto destroy_run_queue;
- ret = tcp_server_create(&server->tcp_server, settings->port);
+ ret = tcp_server_create(&server->tcp_server, settings->port, cmd_dispatcher_handle_conn,
+ server->cmd_dispatcher);
if (ret < 0)
goto destroy_storage;
+ ret = tcp_server_add_to_event_loop(server->tcp_server, server->event_loop);
+ if (ret < 0)
+ goto destroy_tcp_server;
+
ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
if (ret) {
pthread_errno(ret, "pthread_create");
@@ -263,6 +369,16 @@ destroy_run_queue:
worker_queue_destroy(&server->worker_queue);
+close_signalfd:
+ signalfd_destroy(server->signalfd);
+
+destroy_event_loop:
+ event_loop_destroy(server->event_loop);
+
+destroy_cmd_dispatcher:
+ cmd_dispatcher_destroy(server->cmd_dispatcher);
+
+destroy_cv:
pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
destroy_mtx:
@@ -283,128 +399,27 @@ void server_destroy(struct server *server)
storage_destroy(&server->storage);
run_queue_destroy(&server->run_queue);
worker_queue_destroy(&server->worker_queue);
+ signalfd_destroy(server->signalfd);
+ event_loop_destroy(server->event_loop);
+ cmd_dispatcher_destroy(server->cmd_dispatcher);
pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
free(server);
}
-static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response,
- void *_ctx)
-{
- struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct server *server = (struct server *)ctx->arg;
- int client_fd = ctx->fd;
-
- struct worker *worker = NULL;
- int ret = 0;
-
- ret = worker_create(&worker, client_fd);
- if (ret < 0)
- return ret;
-
- ret = server_enqueue_worker(server, worker);
- if (ret < 0)
- goto destroy_worker;
-
- return ret;
-
-destroy_worker:
- worker_destroy(worker);
-
- return ret;
-}
-
-static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
-{
- struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct server *server = (struct server *)ctx->arg;
- struct run *run = NULL;
-
- int ret = 0;
-
- ret = run_from_msg(&run, request);
- if (ret < 0)
- return ret;
-
- ret = msg_success(response);
- if (ret < 0)
- goto destroy_run;
-
- ret = server_enqueue_run(server, run);
- if (ret < 0)
- goto free_response;
-
- return ret;
-
-free_response:
- msg_free(*response);
-
-destroy_run:
- run_destroy(run);
-
- return ret;
-}
-
-static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response,
- void *_ctx)
-{
- struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
- struct server *server = (struct server *)ctx->arg;
- int client_fd = ctx->fd;
-
- struct worker *worker = NULL;
- int ret = 0;
-
- log("Received a \"run complete\" message from worker %d\n", client_fd);
-
- ret = worker_create(&worker, client_fd);
- if (ret < 0)
- return ret;
-
- ret = server_enqueue_worker(server, worker);
- if (ret < 0)
- goto destroy_worker;
-
- return ret;
-
-destroy_worker:
- worker_destroy(worker);
-
- return 0;
-}
-
-static struct cmd_desc commands[] = {
- {CMD_NEW_WORKER, handle_cmd_new_worker},
- {CMD_RUN, handle_cmd_run},
- {CMD_COMPLETE, handle_cmd_complete},
-};
-
-static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
-
static int server_listen_thread(struct server *server)
{
- struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, server);
- if (ret < 0)
- return ret;
-
- while (!global_stop_flag) {
+ while (!server->stopping) {
log("Waiting for new connections\n");
- ret = tcp_server_accept(server->tcp_server, cmd_dispatcher_handle_conn, dispatcher);
- if (ret < 0) {
- if ((errno == EINTR || errno == EINVAL) && global_stop_flag)
- ret = 0;
- goto dispatcher_destroy;
- }
+ ret = event_loop_run(server->event_loop);
+ if (ret < 0)
+ return ret;
}
-dispatcher_destroy:
- cmd_dispatcher_destroy(dispatcher);
-
- return server_set_stopping(server);
+ return 0;
}
int server_main(struct server *server)
diff --git a/src/signal.c b/src/signal.c
index 7d00bc9..9f788b2 100644
--- a/src/signal.c
+++ b/src/signal.c
@@ -7,14 +7,25 @@
#include "signal.h"
#include "compiler.h"
+#include "event_loop.h"
+#include "file.h"
#include "log.h"
#include <signal.h>
#include <stddef.h>
#include <string.h>
+#include <sys/signalfd.h>
+#include <unistd.h>
static int stop_signals[] = {SIGINT, SIGTERM, SIGQUIT};
+static void stops_set(sigset_t *set)
+{
+ sigemptyset(set);
+ for (size_t i = 0; i < sizeof(stop_signals) / sizeof(stop_signals[0]); ++i)
+ sigaddset(set, stop_signals[i]);
+}
+
volatile sig_atomic_t global_stop_flag = 0;
static void set_global_stop_flag(UNUSED int signum)
@@ -78,11 +89,7 @@ int signal_block_all(sigset_t *old)
int signal_block_stops(void)
{
sigset_t set;
- sigemptyset(&set);
-
- for (size_t i = 0; i < sizeof(stop_signals) / sizeof(stop_signals[0]); ++i)
- sigaddset(&set, stop_signals[i]);
-
+ stops_set(&set);
return signal_set(&set, NULL);
}
@@ -90,3 +97,43 @@ int signal_restore(const sigset_t *new)
{
return signal_set(new, NULL);
}
+
+int signalfd_create(const sigset_t *set)
+{
+ sigset_t old;
+ int ret = 0;
+
+ ret = signal_set(set, &old);
+ if (ret < 0)
+ return ret;
+
+ ret = signalfd(-1, set, SFD_CLOEXEC);
+ if (ret < 0)
+ goto restore;
+
+ return ret;
+
+restore:
+ signal_set(&old, NULL);
+
+ return ret;
+}
+
+int signalfd_listen_for_stops(void)
+{
+ sigset_t set;
+ stops_set(&set);
+ return signalfd_create(&set);
+}
+
+void signalfd_destroy(int fd)
+{
+ file_close(fd);
+}
+
+int signalfd_add_to_event_loop(int fd, struct event_loop *loop, event_loop_handler handler,
+ void *arg)
+{
+ struct event_fd entry = {.fd = fd, .events = POLLIN, .handler = handler, .arg = arg};
+ return event_loop_add(loop, &entry);
+}
diff --git a/src/signal.h b/src/signal.h
index 4f1c280..e3f5897 100644
--- a/src/signal.h
+++ b/src/signal.h
@@ -8,6 +8,8 @@
#ifndef __SIGNAL_H__
#define __SIGNAL_H__
+#include "event_loop.h"
+
#include <signal.h>
extern volatile sig_atomic_t global_stop_flag;
@@ -18,4 +20,11 @@ int signal_block_stops(void);
int signal_restore(const sigset_t *new);
+int signalfd_create(const sigset_t *);
+void signalfd_destroy(int fd);
+
+int signalfd_add_to_event_loop(int fd, struct event_loop *, event_loop_handler handler, void *arg);
+
+int signalfd_listen_for_stops(void);
+
#endif
diff --git a/src/string.c b/src/string.c
new file mode 100644
index 0000000..878efeb
--- /dev/null
+++ b/src/string.c
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#include "string.h"
+
+#include <string.h>
+
+char *stpecpy(char *dst, char *end, const char *src)
+{
+ if (!dst)
+ return NULL;
+ if (dst == end)
+ return end;
+
+ char *p = memccpy(dst, src, '\0', end - dst);
+ if (p)
+ return p - 1;
+
+ end[-1] = '\0';
+ return end;
+}
diff --git a/src/string.h b/src/string.h
new file mode 100644
index 0000000..fb77e64
--- /dev/null
+++ b/src/string.h
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#ifndef __STRING_H__
+#define __STRING_H__
+
+char *stpecpy(char *dst, char *end, const char *src);
+
+#endif
diff --git a/src/tcp_server.c b/src/tcp_server.c
index 2e65bc5..a6263f7 100644
--- a/src/tcp_server.c
+++ b/src/tcp_server.c
@@ -6,6 +6,8 @@
*/
#include "tcp_server.h"
+#include "compiler.h"
+#include "event_loop.h"
#include "log.h"
#include "net.h"
#include "signal.h"
@@ -16,18 +18,24 @@
struct tcp_server {
int fd;
+ tcp_server_conn_handler conn_handler;
+ void *arg;
};
-int tcp_server_create(struct tcp_server **_server, const char *port)
+int tcp_server_create(struct tcp_server **_server, const char *port,
+ tcp_server_conn_handler conn_handler, void *arg)
{
int ret = 0;
- struct tcp_server *server = malloc(sizeof(struct tcp_server));
+ struct tcp_server *server = calloc(1, sizeof(struct tcp_server));
if (!server) {
log_errno("malloc");
return -1;
}
+ server->conn_handler = conn_handler;
+ server->arg = arg;
+
ret = net_bind(port);
if (ret < 0)
goto free;
@@ -50,7 +58,7 @@ void tcp_server_destroy(struct tcp_server *server)
struct child_context {
int fd;
- tcp_server_conn_handler handler;
+ tcp_server_conn_handler conn_handler;
void *arg;
};
@@ -65,7 +73,7 @@ static void *connection_thread(void *_ctx)
if (ret < 0)
goto free_ctx;
- ctx->handler(ctx->fd, ctx->arg);
+ ctx->conn_handler(ctx->fd, ctx->arg);
free_ctx:
free(ctx);
@@ -73,7 +81,7 @@ free_ctx:
return NULL;
}
-int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler handler, void *arg)
+static int create_connection_thread(int fd, tcp_server_conn_handler conn_handler, void *arg)
{
sigset_t old_mask;
pthread_t child;
@@ -81,24 +89,20 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h
struct child_context *ctx = calloc(1, sizeof(*ctx));
if (!ctx) {
- log_errno("malloc");
+ log_errno("calloc");
return -1;
}
- ctx->handler = handler;
+ ctx->fd = fd;
+ ctx->conn_handler = conn_handler;
ctx->arg = arg;
- ret = net_accept(server->fd);
- if (ret < 0)
- goto free_ctx;
- ctx->fd = ret;
-
/* Block all signals (we'll unblock them later); the child thread will
* have all signals blocked initially. This allows the main thread to
* handle SIGINT/SIGTERM/etc. */
ret = signal_block_all(&old_mask);
if (ret < 0)
- goto close_conn;
+ goto free_ctx;
ret = pthread_create(&child, NULL, connection_thread, ctx);
if (ret) {
@@ -106,19 +110,53 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h
goto restore_mask;
}
+restore_mask:
/* Restore the previously-enabled signals for handling in the main thread. */
signal_restore(&old_mask);
return ret;
-restore_mask:
- signal_restore(&old_mask);
-
-close_conn:
- net_close(ctx->fd);
-
free_ctx:
free(ctx);
return ret;
}
+
+int tcp_server_accept(const struct tcp_server *server)
+{
+ int fd = -1, ret = 0;
+
+ ret = net_accept(server->fd);
+ if (ret < 0)
+ return ret;
+ fd = ret;
+
+ ret = create_connection_thread(fd, server->conn_handler, server->arg);
+ if (ret < 0)
+ goto close_conn;
+
+ return ret;
+
+close_conn:
+ net_close(fd);
+
+ return ret;
+}
+
+static int tcp_server_event_loop_handler(UNUSED struct event_loop *loop, UNUSED int fd,
+ UNUSED short revents, void *_server)
+{
+ struct tcp_server *server = (struct tcp_server *)_server;
+ return tcp_server_accept(server);
+}
+
+int tcp_server_add_to_event_loop(struct tcp_server *server, struct event_loop *loop)
+{
+ struct event_fd entry = {
+ .fd = server->fd,
+ .events = POLLIN,
+ .handler = tcp_server_event_loop_handler,
+ .arg = server,
+ };
+ return event_loop_add(loop, &entry);
+}
diff --git a/src/tcp_server.h b/src/tcp_server.h
index 0f377d8..f96e3f1 100644
--- a/src/tcp_server.h
+++ b/src/tcp_server.h
@@ -8,12 +8,17 @@
#ifndef __TCP_SERVER_H__
#define __TCP_SERVER_H__
+#include "event_loop.h"
+
struct tcp_server;
-int tcp_server_create(struct tcp_server **, const char *port);
+typedef int (*tcp_server_conn_handler)(int conn_fd, void *arg);
+
+int tcp_server_create(struct tcp_server **, const char *port, tcp_server_conn_handler, void *arg);
void tcp_server_destroy(struct tcp_server *);
-typedef int (*tcp_server_conn_handler)(int conn_fd, void *arg);
-int tcp_server_accept(const struct tcp_server *, tcp_server_conn_handler, void *arg);
+int tcp_server_accept(const struct tcp_server *);
+
+int tcp_server_add_to_event_loop(struct tcp_server *, struct event_loop *);
#endif
diff --git a/src/worker.c b/src/worker.c
index d7ad310..8ecaed8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -10,74 +10,78 @@
#include "command.h"
#include "compiler.h"
#include "const.h"
+#include "event_loop.h"
#include "git.h"
#include "log.h"
#include "msg.h"
+#include "net.h"
#include "process.h"
#include "run_queue.h"
#include "signal.h"
#include <stdlib.h>
+#include <string.h>
struct worker {
- int dummy;
-};
+ struct settings *settings;
-int worker_create(struct worker **_worker)
-{
- int ret = 0;
+ int stopping;
- ret = signal_handle_stops();
- if (ret < 0)
- return ret;
+ struct cmd_dispatcher *cmd_dispatcher;
- struct worker *worker = malloc(sizeof(struct worker));
- if (!worker) {
+ struct event_loop *event_loop;
+ int signalfd;
+};
+
+static struct settings *worker_settings_copy(const struct settings *src)
+{
+ struct settings *result = malloc(sizeof(*src));
+ if (!result) {
log_errno("malloc");
- return -1;
+ return NULL;
}
- ret = libgit_init();
- if (ret < 0)
- goto free;
-
- *_worker = worker;
- return ret;
+ result->host = strdup(src->host);
+ if (!result->host) {
+ log_errno("strdup");
+ goto free_result;
+ }
-free:
- free(worker);
+ result->port = strdup(src->port);
+ if (!result->port) {
+ log_errno("strdup");
+ goto free_host;
+ }
- return ret;
-}
+ return result;
-void worker_destroy(struct worker *worker)
-{
- log("Shutting down\n");
+free_host:
+ free(result->host);
- libgit_shutdown();
- free(worker);
-}
+free_result:
+ free(result);
-static int worker_send_to_server(const struct settings *settings, const struct msg *request,
- struct msg **response)
-{
- return msg_connect_and_talk(settings->host, settings->port, request, response);
+ return NULL;
}
-static int worker_send_to_server_argv(const struct settings *settings, const char **argv,
- struct msg **response)
+static void worker_settings_destroy(struct settings *settings)
{
- return msg_connect_and_talk_argv(settings->host, settings->port, argv, response);
+ free(settings->port);
+ free(settings->host);
+ free(settings);
}
-static int worker_send_new_worker(const struct settings *settings, struct msg **task)
+static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
+ void *_worker)
{
- static const char *argv[] = {CMD_NEW_WORKER, NULL};
- return worker_send_to_server_argv(settings, argv, task);
+ struct worker *worker = (struct worker *)_worker;
+ worker->stopping = 1;
+ return 0;
}
-static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx)
+static int worker_handle_run(const struct msg *request, UNUSED struct msg **response, void *_ctx)
{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
struct run *run = NULL;
struct proc_output result;
int ret = 0;
@@ -96,11 +100,17 @@ static int msg_run_handler(const struct msg *request, struct msg **response, UNU
proc_output_dump(&result);
+ struct worker *worker = (struct worker *)ctx->arg;
static const char *argv[] = {CMD_COMPLETE, NULL};
- ret = msg_from_argv(response, argv);
+
+ ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL);
if (ret < 0)
goto free_output;
+ /* Close the descriptor and remove it from the event loop.
+ * poll(2) is too confusing, honestly. */
+ ret = EVENT_LOOP_REMOVE;
+
free_output:
proc_output_free(&result);
@@ -110,49 +120,111 @@ free_output:
}
static struct cmd_desc commands[] = {
- {CMD_RUN, msg_run_handler},
+ {CMD_RUN, worker_handle_run},
};
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
-int worker_main(UNUSED struct worker *worker, const struct settings *settings)
+int worker_create(struct worker **_worker, const struct settings *settings)
{
- struct msg *task = NULL;
- struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = cmd_dispatcher_create(&dispatcher, commands, numof_commands, NULL);
+ struct worker *worker = malloc(sizeof(struct worker));
+ if (!worker) {
+ log_errno("malloc");
+ return -1;
+ }
+
+ worker->settings = worker_settings_copy(settings);
+ if (!worker->settings) {
+ ret = -1;
+ goto free;
+ }
+
+ worker->stopping = 0;
+
+ ret = cmd_dispatcher_create(&worker->cmd_dispatcher, commands, numof_commands, worker);
if (ret < 0)
- return ret;
+ goto free_settings;
- log("Waiting for a new command\n");
+ ret = event_loop_create(&worker->event_loop);
+ if (ret < 0)
+ goto destroy_cmd_dispatcher;
- ret = worker_send_new_worker(settings, &task);
- if (ret < 0) {
- if ((errno == EINTR || errno == EINVAL) && global_stop_flag)
- ret = 0;
- goto dispatcher_destroy;
- }
+ ret = signalfd_listen_for_stops();
+ if (ret < 0)
+ goto destroy_event_loop;
+ worker->signalfd = ret;
+
+ ret = signalfd_add_to_event_loop(worker->signalfd, worker->event_loop, worker_set_stopping,
+ worker);
+ if (ret < 0)
+ goto close_signalfd;
+
+ ret = libgit_init();
+ if (ret < 0)
+ goto close_signalfd;
+
+ *_worker = worker;
+ return ret;
+
+close_signalfd:
+ signalfd_destroy(worker->signalfd);
+
+destroy_event_loop:
+ event_loop_destroy(worker->event_loop);
+
+destroy_cmd_dispatcher:
+ cmd_dispatcher_destroy(worker->cmd_dispatcher);
+
+free_settings:
+ worker_settings_destroy(worker->settings);
+
+free:
+ free(worker);
+
+ return ret;
+}
- while (!global_stop_flag) {
- struct msg *result = NULL;
+void worker_destroy(struct worker *worker)
+{
+ log("Shutting down\n");
- ret = cmd_dispatcher_handle(dispatcher, task, &result);
- msg_free(task);
+ libgit_shutdown();
+ signalfd_destroy(worker->signalfd);
+ event_loop_destroy(worker->event_loop);
+ cmd_dispatcher_destroy(worker->cmd_dispatcher);
+ worker_settings_destroy(worker->settings);
+ free(worker);
+}
+
+int worker_main(struct worker *worker)
+{
+ int ret = 0;
+
+ while (!worker->stopping) {
+ ret = net_connect(worker->settings->host, worker->settings->port);
if (ret < 0)
- goto dispatcher_destroy;
-
- ret = worker_send_to_server(settings, result, &task);
- msg_free(result);
- if (ret < 0) {
- if ((errno == EINTR || errno == EINVAL) && global_stop_flag)
- ret = 0;
- goto dispatcher_destroy;
- }
- }
+ return ret;
+
+ const int fd = ret;
+ static const char *argv[] = {CMD_NEW_WORKER, NULL};
+
+ ret = msg_send_argv(fd, argv);
+ if (ret < 0)
+ return ret;
-dispatcher_destroy:
- cmd_dispatcher_destroy(dispatcher);
+ ret = cmd_dispatcher_add_to_event_loop(worker->cmd_dispatcher, worker->event_loop,
+ fd);
+ if (ret < 0)
+ return ret;
+
+ log("Waiting for a new command\n");
+
+ ret = event_loop_run(worker->event_loop);
+ if (ret < 0)
+ return ret;
+ }
return ret;
}
diff --git a/src/worker.h b/src/worker.h
index bf603df..8eb325c 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -15,9 +15,9 @@ struct settings {
struct worker;
-int worker_create(struct worker **);
+int worker_create(struct worker **, const struct settings *);
void worker_destroy(struct worker *);
-int worker_main(struct worker *, const struct settings *);
+int worker_main(struct worker *);
#endif
diff --git a/src/worker_main.c b/src/worker_main.c
index b9e9b9b..dcfee47 100644
--- a/src/worker_main.c
+++ b/src/worker_main.c
@@ -70,11 +70,11 @@ int main(int argc, char *argv[])
if (ret < 0)
return ret;
- ret = worker_create(&worker);
+ ret = worker_create(&worker, &settings);
if (ret < 0)
return ret;
- ret = worker_main(worker, &settings);
+ ret = worker_main(worker);
if (ret < 0)
goto destroy_worker;