aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/worker.c
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-05-15 15:31:33 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-05-15 15:32:17 +0200
commit7cd83e15139447156ca915ce2d9d19295c146d56 (patch)
tree277f35dcc6c59d93cf5ef0232daa525079342f97 /src/worker.c
parentcommand: adjust order of parameters to handlers (diff)
downloadcimple-7cd83e15139447156ca915ce2d9d19295c146d56.tar.gz
cimple-7cd83e15139447156ca915ce2d9d19295c146d56.zip
rework server-worker communication
OK, this is a major rework. * tcp_server: connection threads are not detached anymore, the caller has to clean them up. This was done so that the server can clean up the threads cleanly. * run_queue: simple refactoring, run_queue_entry is called just run now. * server: worker threads are now killed when a run is assigned to a worker. * worker: the connection to server is no longer persistent. A worker sends "new-worker", waits for a task, closes the connection, and when it's done, sends the "complete" message and waits for a new task. This is supposed to improve resilience, since the worker-server connections don't have to be maintained while the worker is doing a CI run.
Diffstat (limited to '')
-rw-r--r--src/worker.c116
1 files changed, 53 insertions, 63 deletions
diff --git a/src/worker.c b/src/worker.c
index 2a258e5..5762549 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -13,27 +13,24 @@
#include "git.h"
#include "log.h"
#include "msg.h"
-#include "net.h"
#include "process.h"
+#include "run_queue.h"
#include "signal.h"
-#include <pthread.h>
#include <stdlib.h>
-#include <unistd.h>
struct worker {
- int fd;
-
- /* TODO: these are not used, but they should be! */
- pthread_mutex_t task_mtx;
- pthread_t task;
- int task_active;
+ int dummy;
};
-int worker_create(struct worker **_worker, const struct settings *settings)
+int worker_create(struct worker **_worker)
{
int ret = 0;
+ ret = signal_install_global_handler();
+ if (ret < 0)
+ return ret;
+
struct worker *worker = malloc(sizeof(struct worker));
if (!worker) {
log_errno("malloc");
@@ -44,17 +41,9 @@ int worker_create(struct worker **_worker, const struct settings *settings)
if (ret < 0)
goto free;
- ret = net_connect(settings->host, settings->port);
- if (ret < 0)
- goto git_shutdown;
- worker->fd = ret;
-
*_worker = worker;
return ret;
-git_shutdown:
- libgit_shutdown();
-
free:
free(worker);
@@ -65,14 +54,19 @@ void worker_destroy(struct worker *worker)
{
log("Shutting down\n");
- log_errno_if(close(worker->fd), "close");
libgit_shutdown();
free(worker);
}
-static int msg_send_new_worker(const struct worker *worker)
+static int worker_send_to_server(const struct settings *settings, const struct msg *request,
+ struct msg **response)
+{
+ return msg_connect_and_communicate(settings->host, settings->port, request, response);
+}
+
+static int worker_send_to_server_argv(const struct settings *settings, const char **argv,
+ struct msg **response)
{
- static const char *argv[] = {CMD_NEW_WORKER, NULL};
struct msg *msg = NULL;
int ret = 0;
@@ -80,84 +74,80 @@ static int msg_send_new_worker(const struct worker *worker)
if (ret < 0)
return ret;
- ret = msg_send(worker->fd, msg);
+ ret = worker_send_to_server(settings, msg, response);
msg_free(msg);
return ret;
}
-static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result)
+static int worker_send_new_worker(const struct settings *settings, struct msg **task)
{
- int ret = 0;
-
- ret = ci_run_git_repo(url, rev, result);
- if (ret < 0) {
- log_err("Run failed with an error\n");
- return ret;
- }
-
- proc_output_dump(result);
- return 0;
+ static const char *argv[] = {CMD_NEW_WORKER, NULL};
+ return worker_send_to_server_argv(settings, argv, task);
}
-static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, struct msg **response,
- UNUSED void *_worker)
+static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx)
{
+ struct run *run = NULL;
struct proc_output result;
int ret = 0;
- if (msg_get_length(request) != 3) {
- log_err("Invalid number of arguments for a message\n");
- msg_dump(request);
- return -1;
+ ret = run_from_msg(&run, request);
+ if (ret < 0)
+ return ret;
+
+ proc_output_init(&result);
+
+ ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result);
+ if (ret < 0) {
+ log_err("Run failed with an error\n");
+ goto free_output;
}
- const char **words = msg_get_words(request);
+ proc_output_dump(&result);
- proc_output_init(&result);
- ret = msg_ci_run_do(words[1], words[2], &result);
+ static const char *argv[] = {CMD_COMPLETE, NULL};
+ ret = msg_from_argv(response, argv);
+ if (ret < 0)
+ goto free_output;
+
+free_output:
proc_output_free(&result);
- if (ret < 0)
- return ret;
+ run_destroy(run);
- return msg_success(response);
+ return ret;
}
static struct cmd_desc cmds[] = {
- {CMD_RUN, msg_ci_run_handler},
+ {CMD_RUN, msg_run_handler},
};
-int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[])
+int worker_main(UNUSED struct worker *worker, const struct settings *settings)
{
+ struct msg *task = NULL;
struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = signal_install_global_handler();
+ ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), NULL);
if (ret < 0)
return ret;
- ret = msg_send_new_worker(worker);
- if (ret < 0)
- return ret;
+ log("Waiting for a new command\n");
- ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), worker);
+ ret = worker_send_new_worker(settings, &task);
if (ret < 0)
- return ret;
+ goto dispatcher_destroy;
while (!global_stop_flag) {
- struct msg *request = NULL;
+ struct msg *result = NULL;
- log("Waiting for a new command\n");
-
- ret = msg_recv(worker->fd, &request);
- if (ret < 0) {
- if (errno == EINVAL && global_stop_flag)
- ret = 0;
+ ret = cmd_dispatcher_handle(dispatcher, task, &result);
+ msg_free(task);
+ if (ret < 0)
goto dispatcher_destroy;
- }
- ret = cmd_dispatcher_handle_msg(dispatcher, worker->fd, request);
- msg_free(request);
+ ret = worker_send_to_server(settings, result, &task);
+ msg_free(result);
if (ret < 0)
goto dispatcher_destroy;
}