diff options
Diffstat (limited to '')
-rw-r--r-- | src/worker.c | 116 |
1 files changed, 53 insertions, 63 deletions
diff --git a/src/worker.c b/src/worker.c index 2a258e5..5762549 100644 --- a/src/worker.c +++ b/src/worker.c @@ -13,27 +13,24 @@ #include "git.h" #include "log.h" #include "msg.h" -#include "net.h" #include "process.h" +#include "run_queue.h" #include "signal.h" -#include <pthread.h> #include <stdlib.h> -#include <unistd.h> struct worker { - int fd; - - /* TODO: these are not used, but they should be! */ - pthread_mutex_t task_mtx; - pthread_t task; - int task_active; + int dummy; }; -int worker_create(struct worker **_worker, const struct settings *settings) +int worker_create(struct worker **_worker) { int ret = 0; + ret = signal_install_global_handler(); + if (ret < 0) + return ret; + struct worker *worker = malloc(sizeof(struct worker)); if (!worker) { log_errno("malloc"); @@ -44,17 +41,9 @@ int worker_create(struct worker **_worker, const struct settings *settings) if (ret < 0) goto free; - ret = net_connect(settings->host, settings->port); - if (ret < 0) - goto git_shutdown; - worker->fd = ret; - *_worker = worker; return ret; -git_shutdown: - libgit_shutdown(); - free: free(worker); @@ -65,14 +54,19 @@ void worker_destroy(struct worker *worker) { log("Shutting down\n"); - log_errno_if(close(worker->fd), "close"); libgit_shutdown(); free(worker); } -static int msg_send_new_worker(const struct worker *worker) +static int worker_send_to_server(const struct settings *settings, const struct msg *request, + struct msg **response) +{ + return msg_connect_and_communicate(settings->host, settings->port, request, response); +} + +static int worker_send_to_server_argv(const struct settings *settings, const char **argv, + struct msg **response) { - static const char *argv[] = {CMD_NEW_WORKER, NULL}; struct msg *msg = NULL; int ret = 0; @@ -80,84 +74,80 @@ static int msg_send_new_worker(const struct worker *worker) if (ret < 0) return ret; - ret = msg_send(worker->fd, msg); + ret = worker_send_to_server(settings, msg, response); msg_free(msg); return ret; } -static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result) +static int worker_send_new_worker(const struct settings *settings, struct msg **task) { - int ret = 0; - - ret = ci_run_git_repo(url, rev, result); - if (ret < 0) { - log_err("Run failed with an error\n"); - return ret; - } - - proc_output_dump(result); - return 0; + static const char *argv[] = {CMD_NEW_WORKER, NULL}; + return worker_send_to_server_argv(settings, argv, task); } -static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, struct msg **response, - UNUSED void *_worker) +static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx) { + struct run *run = NULL; struct proc_output result; int ret = 0; - if (msg_get_length(request) != 3) { - log_err("Invalid number of arguments for a message\n"); - msg_dump(request); - return -1; + ret = run_from_msg(&run, request); + if (ret < 0) + return ret; + + proc_output_init(&result); + + ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result); + if (ret < 0) { + log_err("Run failed with an error\n"); + goto free_output; } - const char **words = msg_get_words(request); + proc_output_dump(&result); - proc_output_init(&result); - ret = msg_ci_run_do(words[1], words[2], &result); + static const char *argv[] = {CMD_COMPLETE, NULL}; + ret = msg_from_argv(response, argv); + if (ret < 0) + goto free_output; + +free_output: proc_output_free(&result); - if (ret < 0) - return ret; + run_destroy(run); - return msg_success(response); + return ret; } static struct cmd_desc cmds[] = { - {CMD_RUN, msg_ci_run_handler}, + {CMD_RUN, msg_run_handler}, }; -int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[]) +int worker_main(UNUSED struct worker *worker, const struct settings *settings) { + struct msg *task = NULL; struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = signal_install_global_handler(); + ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), NULL); if (ret < 0) return ret; - ret = msg_send_new_worker(worker); - if (ret < 0) - return ret; + log("Waiting for a new command\n"); - ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), worker); + ret = worker_send_new_worker(settings, &task); if (ret < 0) - return ret; + goto dispatcher_destroy; while (!global_stop_flag) { - struct msg *request = NULL; + struct msg *result = NULL; - log("Waiting for a new command\n"); - - ret = msg_recv(worker->fd, &request); - if (ret < 0) { - if (errno == EINVAL && global_stop_flag) - ret = 0; + ret = cmd_dispatcher_handle(dispatcher, task, &result); + msg_free(task); + if (ret < 0) goto dispatcher_destroy; - } - ret = cmd_dispatcher_handle_msg(dispatcher, worker->fd, request); - msg_free(request); + ret = worker_send_to_server(settings, result, &task); + msg_free(result); if (ret < 0) goto dispatcher_destroy; } |