diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-25 16:58:38 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-26 04:05:02 +0200 |
commit | 532b3ae9b5cd8609237e04db768cc1f750d8631d (patch) | |
tree | f65253a6ce9970d1d93e6bb6c65758d6fa98373a /src/worker.c | |
parent | cmake: ignore unused parameters for now (diff) | |
download | cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.tar.gz cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.zip |
add some more code
This adds a basic "worker" program.
You can now do something like
./server &
./worker &
./client ci_run URL REV
and the server should pass a message to worker, after which it should
clone the repository at URL, checkout REV, and try to run the CI script.
It's extremely unfinished: I need to sort out the graceful shutdown, how
the server manages workers, etc.
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/src/worker.c b/src/worker.c new file mode 100644 index 0000000..687e647 --- /dev/null +++ b/src/worker.c @@ -0,0 +1,241 @@ +#include "worker.h" +#include "ci.h" +#include "git.h" +#include "log.h" +#include "msg.h" +#include "net.h" + +#include <errno.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +int worker_create(struct worker *worker, const struct settings *settings) +{ + int ret = 0; + + ret = libgit_init(); + if (ret < 0) + return ret; + + ret = net_connect(settings->host, settings->port); + if (ret < 0) + goto git_shutdown; + worker->fd = ret; + + ret = pthread_mutex_init(&worker->task_mtx, NULL); + if (ret < 0) { + print_errno("pthread_mutex_init"); + goto close; + } + + worker->task_active = 0; + return ret; + +close: + close(worker->fd); + +git_shutdown: + libgit_shutdown(); + + return ret; +} + +void worker_destroy(struct worker *worker) +{ + if (pthread_mutex_destroy(&worker->task_mtx)) + print_errno("pthread_mutex_destroy"); + close(worker->fd); + libgit_shutdown(); +} + +static int msg_send_new_worker(const struct worker *worker) +{ + static char *argv[] = {"new_worker", NULL}; + struct msg msg; + int response, ret = 0; + + ret = msg_from_argv(&msg, argv); + if (ret < 0) + return ret; + + ret = msg_send_and_wait(worker->fd, &msg, &response); + if (ret < 0) + goto free_msg; + + if (response < 0) { + print_error("Failed to register\n"); + ret = response; + goto free_msg; + } + +free_msg: + msg_free(&msg); + + return ret; +} + +typedef int (*worker_task_body)(const struct msg *); + +static int msg_body_ci_run(const struct msg *msg) +{ + const char *url = msg->argv[1]; + const char *rev = msg->argv[2]; + int ec = 0; + + return ci_run_git_repo(url, rev, &ec); +} + +typedef worker_task_body (*worker_msg_parser)(const struct msg *); + +static worker_task_body parse_msg_ci_run(const struct msg *msg) +{ + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return NULL; + } + + return msg_body_ci_run; +} + +struct worker_msg_parser_it { + const char *msg; + worker_msg_parser parser; +}; + +struct worker_msg_parser_it worker_msg_parsers[] = { + {"ci_run", parse_msg_ci_run}, +}; + +struct worker_task_context { + struct msg *msg; + worker_task_body body; +}; + +static void *worker_task_wrapper(void *_ctx) +{ + struct worker_task_context *ctx = (struct worker_task_context *)_ctx; + + ctx->body(ctx->msg); + msg_free(ctx->msg); + free(ctx->msg); + free(ctx); + return NULL; +} + +static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body) +{ + struct worker_task_context *ctx; + int ret = 0; + + ctx = malloc(sizeof(*ctx)); + if (!ctx) { + print_errno("malloc"); + return -1; + } + ctx->body = body; + + ctx->msg = msg_copy(msg); + if (!ctx->msg) { + ret = -1; + goto free_ctx; + } + + ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx); + if (ret < 0) { + print_errno("pthread_create"); + goto free_msg; + } + worker->task_active = 1; + + return ret; + +free_msg: + msg_free(ctx->msg); + free(ctx->msg); + +free_ctx: + free(ctx); + + return ret; +} + +static int worker_msg_handler(struct worker *worker, const struct msg *msg) +{ + if (worker->task_active) { + int ret = pthread_tryjoin_np(worker->task, NULL); + switch (ret) { + case 0: + worker->task_active = 0; + break; + case EBUSY: + break; + default: + print_errno("pthread_tryjoin_np"); + return ret; + } + } + + if (worker->task_active) { + print_error("Worker is busy\n"); + return -1; + } + + if (msg->argc == 0) { + print_error("Received an empty message\n"); + return -1; + } + + size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]); + + for (size_t i = 0; i < numof_parsers; ++i) { + const struct worker_msg_parser_it *it = &worker_msg_parsers[i]; + if (strcmp(it->msg, msg->argv[0])) + continue; + + worker_task_body body = it->parser(msg); + if (!body) + return -1; + + return worker_schedule_task(worker, msg, body); + } + + return msg_dump_unknown(msg); +} + +static int worker_msg_handler_lock(const struct msg *msg, void *_worker) +{ + struct worker *worker = (struct worker *)_worker; + int ret = 0; + + ret = pthread_mutex_lock(&worker->task_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + return -1; + } + + ret = worker_msg_handler(worker, msg); + + if (pthread_mutex_unlock(&worker->task_mtx)) + print_errno("pthread_mutex_unlock"); + + return ret; +} + +int worker_main(struct worker *worker, int, char *[]) +{ + int ret = 0; + + ret = msg_send_new_worker(worker); + if (ret < 0) + return ret; + + while (1) { + print_log("Waiting for a new command\n"); + + ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker); + if (ret < 0) + return ret; + } +} |