diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-25 16:58:38 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-26 04:05:02 +0200 |
commit | 532b3ae9b5cd8609237e04db768cc1f750d8631d (patch) | |
tree | f65253a6ce9970d1d93e6bb6c65758d6fa98373a /src/server.c | |
parent | cmake: ignore unused parameters for now (diff) | |
download | cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.tar.gz cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.zip |
add some more code
This adds a basic "worker" program.
You can now do something like
./server &
./worker &
./client ci_run URL REV
and the server should pass a message to worker, after which it should
clone the repository at URL, checkout REV, and try to run the CI script.
It's extremely unfinished: I need to sort out the graceful shutdown, how
the server manages workers, etc.
Diffstat (limited to '')
-rw-r--r-- | src/server.c | 249 |
1 files changed, 240 insertions, 9 deletions
diff --git a/src/server.c b/src/server.c index 3e9f080..e285a9e 100644 --- a/src/server.c +++ b/src/server.c @@ -1,11 +1,97 @@ #include "server.h" +#include "ci_queue.h" +#include "log.h" #include "msg.h" -#include "net.h" #include "tcp_server.h" +#include "worker_queue.h" -#include <stdio.h> +#include <pthread.h> +#include <string.h> #include <unistd.h> +static int server_has_runs_and_workers(const struct server *server) +{ + return !ci_queue_is_empty(&server->ci_queue) && + !worker_queue_is_empty(&server->worker_queue); +} + +static int server_scheduler_iteration(struct server *server) +{ + struct worker_queue_entry *worker; + struct ci_queue_entry *ci_run; + struct msg msg; + int response, ret = 0; + + worker = worker_queue_pop(&server->worker_queue); + ci_run = ci_queue_pop(&server->ci_queue); + + char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; + + ret = msg_from_argv(&msg, argv); + if (ret < 0) + goto requeue_ci_run; + + ret = msg_send_and_wait(worker->fd, &msg, &response); + if (ret < 0) + goto free_msg; + + if (response < 0) { + print_error("Failed to schedule a CI run\n"); + } + + msg_free(&msg); + + ci_queue_entry_destroy(ci_run); + + /* FIXME: Don't mark worker as free! */ + worker_queue_push_head(&server->worker_queue, worker); + + return 0; + +free_msg: + msg_free(&msg); + +requeue_ci_run: + ci_queue_push_head(&server->ci_queue, ci_run); + + worker_queue_push_head(&server->worker_queue, worker); + + return ret; +} + +static void *server_scheduler(void *_server) +{ + struct server *server = (struct server *)_server; + int ret = 0; + + ret = pthread_mutex_lock(&server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + goto exit; + } + + while (1) { + while (!server_has_runs_and_workers(server)) { + ret = pthread_cond_wait(&server->scheduler_cv, &server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_cond_wait"); + goto unlock; + } + } + + ret = server_scheduler_iteration(server); + if (ret < 0) + goto unlock; + } + +unlock: + if (pthread_mutex_unlock(&server->scheduler_mtx)) + print_errno("pthread_mutex_unlock"); + +exit: + return NULL; +} + int server_create(struct server *server, const struct settings *settings) { int ret = 0; @@ -14,31 +100,176 @@ int server_create(struct server *server, const struct settings *settings) if (ret < 0) return ret; - return 0; + worker_queue_create(&server->worker_queue); + + ci_queue_create(&server->ci_queue); + + ret = pthread_mutex_init(&server->scheduler_mtx, NULL); + if (ret < 0) { + print_errno("pthread_mutex_init"); + goto destroy_tcp_server; + } + + ret = pthread_cond_init(&server->scheduler_cv, NULL); + if (ret < 0) { + print_errno("pthread_cond_init"); + goto destroy_scheduler_mtx; + } + + ret = pthread_create(&server->scheduler, NULL, server_scheduler, server); + if (ret < 0) { + print_errno("pthread_create"); + goto destroy_scheduler_cv; + } + + return ret; + +destroy_scheduler_cv: + if (pthread_cond_destroy(&server->scheduler_cv)) + print_errno("pthread_cond_destroy"); + +destroy_scheduler_mtx: + if (pthread_mutex_destroy(&server->scheduler_mtx)) + print_errno("pthread_mutex_destroy"); + + ci_queue_destroy(&server->ci_queue); + + worker_queue_destroy(&server->worker_queue); + +destroy_tcp_server: + tcp_server_destroy(&server->tcp_server); + + return ret; } -void server_destroy(const struct server *server) +void server_destroy(struct server *server) { + if (pthread_join(server->scheduler, NULL)) + print_errno("pthread_join"); + if (pthread_cond_destroy(&server->scheduler_cv)) + print_errno("pthread_cond_destroy"); + if (pthread_mutex_destroy(&server->scheduler_mtx)) + print_errno("pthread_mutex_destroy"); + ci_queue_destroy(&server->ci_queue); + worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); } -static int server_msg_handler(const struct msg *msg, void *) +struct msg_context { + struct server *server; + int client_fd; +}; + +static int msg_new_worker(const struct msg *, void *_ctx) +{ + struct msg_context *ctx = (struct msg_context *)_ctx; + return server_new_worker(ctx->server, ctx->client_fd); +} + +static int msg_ci_run(const struct msg *msg, void *_ctx) +{ + struct msg_context *ctx = (struct msg_context *)_ctx; + + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return -1; + } + + return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]); +} + +static int server_msg_handler(const struct msg *msg, void *ctx) { + if (msg->argc == 0) { + print_error("Received an empty message\n"); + return -1; + } + + if (!strcmp(msg->argv[0], "new_worker")) + return msg_new_worker(msg, ctx); + if (!strcmp(msg->argv[0], "ci_run")) + return msg_ci_run(msg, ctx); + return msg_dump_unknown(msg); } -static int server_conn_handler(int fd, void *) +static int server_conn_handler(int fd, void *server) { - return msg_recv_and_handle(fd, server_msg_handler, NULL); + struct msg_context ctx = {server, fd}; + return msg_recv_and_handle(fd, server_msg_handler, &ctx); } -int server_main(const struct server *server) +int server_main(struct server *server) { int ret = 0; while (1) { - ret = tcp_server_accept(&server->tcp_server, server_conn_handler, NULL); + ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); if (ret < 0) return ret; } } + +int server_new_worker(struct server *server, int fd) +{ + struct worker_queue_entry *entry; + int ret = 0; + + print_log("Registering a new worker\n"); + + ret = pthread_mutex_lock(&server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + return ret; + } + + ret = worker_queue_entry_create(&entry, fd); + if (ret < 0) + goto unlock; + + worker_queue_push(&server->worker_queue, entry); + + ret = pthread_cond_signal(&server->scheduler_cv); + if (ret < 0) { + print_errno("pthread_cond_signal"); + goto unlock; + } + +unlock: + if (pthread_mutex_unlock(&server->scheduler_mtx)) + print_errno("pthread_mutex_unlock"); + + return ret; +} + +int server_ci_run(struct server *server, const char *url, const char *rev) +{ + struct ci_queue_entry *entry; + int ret = 0; + + print_log("Scheduling a new CI run for repository %s\n", url); + + ret = pthread_mutex_lock(&server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + return ret; + } + + ret = ci_queue_entry_create(&entry, url, rev); + if (ret < 0) + goto unlock; + + ci_queue_push(&server->ci_queue, entry); + + ret = pthread_cond_signal(&server->scheduler_cv); + if (ret < 0) { + print_errno("pthread_cond_signal"); + goto unlock; + } + +unlock: + if (pthread_mutex_unlock(&server->scheduler_mtx)) + print_errno("pthread_mutex_unlock"); + + return ret; +} |