diff options
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 430 |
1 files changed, 249 insertions, 181 deletions
diff --git a/src/server.c b/src/server.c index 2e5fc18..bf6a967 100644 --- a/src/server.c +++ b/src/server.c @@ -16,6 +16,7 @@ #include "storage.h" #include "storage_sqlite.h" #include "tcp_server.h" +#include "worker_queue.h" #include <pthread.h> #include <stdlib.h> @@ -26,311 +27,373 @@ struct server { int stopping; + struct worker_queue worker_queue; + struct run_queue run_queue; + struct storage storage; - struct tcp_server *tcp_server; + pthread_t main_thread; - struct run_queue run_queue; + struct tcp_server *tcp_server; }; -int server_create(struct server **_server, const struct settings *settings) +static int server_lock(struct server *server) { - struct storage_settings storage_settings; - int ret = 0; - - struct server *server = malloc(sizeof(struct server)); - if (!server) { - log_errno("malloc"); - return -1; - } - - ret = pthread_mutex_init(&server->server_mtx, NULL); + int ret = pthread_mutex_lock(&server->server_mtx); if (ret) { - pthread_errno(ret, "pthread_mutex_init"); - goto free; + pthread_errno(ret, "pthread_mutex_lock"); + return ret; } + return ret; +} - ret = pthread_cond_init(&server->server_cv, NULL); +static void server_unlock(struct server *server) +{ + pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); +} + +static int server_wait(struct server *server) +{ + int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); if (ret) { - pthread_errno(ret, "pthread_cond_init"); - goto destroy_mtx; + pthread_errno(ret, "pthread_cond_wait"); + return ret; } + return ret; +} - server->stopping = 0; +static void server_notify(struct server *server) +{ + pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal"); +} - ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path); - if (ret < 0) - goto destroy_cv; +static int server_set_stopping(struct server *server) +{ + int ret = 0; - ret = storage_create(&server->storage, &storage_settings); - storage_settings_destroy(&storage_settings); + ret = server_lock(server); if (ret < 0) - goto destroy_cv; + return ret; - ret = tcp_server_create(&server->tcp_server, settings->port); + server->stopping = 1; + + server_notify(server); + server_unlock(server); + return ret; +} + +static int server_has_workers(const struct server *server) +{ + return !worker_queue_is_empty(&server->worker_queue); +} + +static int server_enqueue_worker(struct server *server, struct worker *worker) +{ + int ret = 0; + + ret = server_lock(server); if (ret < 0) - goto destroy_storage; + return ret; - run_queue_create(&server->run_queue); + worker_queue_add_last(&server->worker_queue, worker); + log("Added a new worker %d to the queue\n", worker_get_fd(worker)); - *_server = server; + server_notify(server); + server_unlock(server); return ret; +} -destroy_storage: - storage_destroy(&server->storage); +static int server_has_runs(const struct server *server) +{ + return !run_queue_is_empty(&server->run_queue); +} -destroy_cv: - pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); +static int server_enqueue_run(struct server *server, struct run *run) +{ + int ret = 0; -destroy_mtx: - pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + ret = server_lock(server); + if (ret < 0) + return ret; -free: - free(server); + run_queue_add_last(&server->run_queue, run); + log("Added a new CI run for repository %s to the queue\n", run_get_url(run)); + server_notify(server); + server_unlock(server); return ret; } -void server_destroy(struct server *server) +static int server_ready_for_action(const struct server *server) { - log("Shutting down\n"); - - run_queue_destroy(&server->run_queue); - tcp_server_destroy(server->tcp_server); - storage_destroy(&server->storage); - pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); - pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); - free(server); + return server->stopping || (server_has_runs(server) && server_has_workers(server)); } -static int server_has_runs(const struct server *server) +static int server_wait_for_action(struct server *server) { - return !run_queue_is_empty(&server->run_queue); + int ret = 0; + + while (!server_ready_for_action(server)) { + ret = server_wait(server); + if (ret < 0) + return ret; + } + + return ret; } -static int worker_ci_run(int fd, const struct run_queue_entry *ci_run) +static int server_assign_run(struct server *server) { - struct msg *request = NULL, *response = NULL; int ret = 0; - const char *argv[] = {CMD_RUN, run_queue_entry_get_url(ci_run), - run_queue_entry_get_rev(ci_run), NULL}; + struct run *run = run_queue_remove_first(&server->run_queue); + log("Removed a CI run for repository %s from the queue\n", run_get_url(run)); + + struct worker *worker = worker_queue_remove_first(&server->worker_queue); + log("Removed worker %d from the queue\n", worker_get_fd(worker)); + + const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL}; + + struct msg *request = NULL; ret = msg_from_argv(&request, argv); - if (ret < 0) + if (ret < 0) { + worker_queue_add_first(&server->worker_queue, worker); + run_queue_add_first(&server->run_queue, run); return ret; + } - ret = msg_communicate(fd, request, &response); + ret = msg_communicate(worker_get_fd(worker), request, NULL); msg_free(request); - if (ret < 0) + if (ret < 0) { + /* Failed to communicate with the worker, requeue the run + * and forget about the worker. */ + worker_destroy(worker); + run_queue_add_first(&server->run_queue, run); return ret; - - if (!msg_is_success(response)) { - log_err("Failed to schedule a CI run: worker is busy?\n"); - msg_dump(response); - goto free_response; } - /* TODO: handle the response. */ - -free_response: - msg_free(response); - + /* Send the run to the worker, forget about both of them for a while. */ + worker_destroy(worker); + run_destroy(run); return ret; } -static int worker_dequeue_run(struct server *server, struct run_queue_entry **ci_run) +static void *server_main_thread(void *_server) { + struct server *server = (struct server *)_server; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); - return ret; - } + ret = server_lock(server); + if (ret < 0) + goto exit; - while (!server->stopping && !server_has_runs(server)) { - ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_cond_wait"); + while (1) { + ret = server_wait_for_action(server); + if (ret < 0) goto unlock; - } - } - if (server->stopping) { - ret = -1; - goto unlock; - } + if (server->stopping) + goto unlock; - *ci_run = run_queue_remove_first(&server->run_queue); - log("Removed a CI run for repository %s from the queue\n", - run_queue_entry_get_url(*ci_run)); - goto unlock; + ret = server_assign_run(server); + if (ret < 0) + goto unlock; + } unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + server_unlock(server); - return ret; +exit: + return NULL; } -static int worker_requeue_run(struct server *server, struct run_queue_entry *ci_run) +int server_create(struct server **_server, const struct settings *settings) { + struct storage_settings storage_settings; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); + ret = signal_install_global_handler(); + if (ret < 0) return ret; + + struct server *server = malloc(sizeof(struct server)); + if (!server) { + log_errno("malloc"); + return -1; } - run_queue_add_first(&server->run_queue, ci_run); - log("Requeued a CI run for repository %s\n", run_queue_entry_get_url(ci_run)); + ret = pthread_mutex_init(&server->server_mtx, NULL); + if (ret) { + pthread_errno(ret, "pthread_mutex_init"); + goto free; + } - ret = pthread_cond_signal(&server->server_cv); + ret = pthread_cond_init(&server->server_cv, NULL); if (ret) { - pthread_errno(ret, "pthread_cond_signal"); - ret = 0; - goto unlock; + pthread_errno(ret, "pthread_cond_init"); + goto destroy_mtx; } -unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + server->stopping = 0; - return ret; -} + worker_queue_create(&server->worker_queue); + run_queue_create(&server->run_queue); -static int worker_iteration(struct server *server, int fd) -{ - struct run_queue_entry *ci_run = NULL; - int ret = 0; + ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path); + if (ret < 0) + goto destroy_run_queue; - ret = worker_dequeue_run(server, &ci_run); + ret = storage_create(&server->storage, &storage_settings); + storage_settings_destroy(&storage_settings); if (ret < 0) - return ret; + goto destroy_run_queue; - ret = worker_ci_run(fd, ci_run); + ret = tcp_server_create(&server->tcp_server, settings->port); if (ret < 0) - goto requeue_run; + goto destroy_storage; + + ret = pthread_create(&server->main_thread, NULL, server_main_thread, server); + if (ret) { + pthread_errno(ret, "pthread_create"); + goto destroy_tcp_server; + } - run_queue_entry_destroy(ci_run); + *_server = server; return ret; -requeue_run: - worker_requeue_run(server, ci_run); +destroy_tcp_server: + tcp_server_destroy(server->tcp_server); - return ret; -} +destroy_storage: + storage_destroy(&server->storage); -static int worker_thread(struct server *server, int fd) -{ - int ret = 0; +destroy_run_queue: + run_queue_destroy(&server->run_queue); - while (1) { - ret = worker_iteration(server, fd); - if (ret < 0) - return ret; - } + worker_queue_destroy(&server->worker_queue); + + pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); + +destroy_mtx: + pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + +free: + free(server); return ret; } -static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request, - UNUSED struct msg **response, void *_server) +void server_destroy(struct server *server) { - return worker_thread((struct server *)_server, client_fd); + log("Shutting down\n"); + + pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join"); + tcp_server_destroy(server->tcp_server); + storage_destroy(&server->storage); + run_queue_destroy(&server->run_queue); + worker_queue_destroy(&server->worker_queue); + pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); + pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + free(server); } -static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) +static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) { - struct run_queue_entry *entry = NULL; + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int client_fd = ctx->fd; + + struct worker *worker = NULL; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); + ret = worker_create(&worker, client_fd); + if (ret < 0) return ret; - } - ret = run_queue_entry_create(&entry, url, rev); + ret = server_enqueue_worker(server, worker); if (ret < 0) - goto unlock; + goto destroy_worker; - run_queue_add_last(&server->run_queue, entry); - log("Added a new CI run for repository %s to the queue\n", url); - - ret = pthread_cond_signal(&server->server_cv); - if (ret) { - pthread_errno(ret, "pthread_cond_signal"); - ret = 0; - goto unlock; - } + return ret; -unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); +destroy_worker: + worker_destroy(worker); return ret; } -static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request, - struct msg **response, void *_server) +static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) { - struct server *server = (struct server *)_server; + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + struct run *run = NULL; + int ret = 0; - if (msg_get_length(request) != 3) { - log_err("Invalid number of arguments for a message: %zu\n", - msg_get_length(request)); - msg_dump(request); - return -1; - } + ret = run_from_msg(&run, request); + if (ret < 0) + return ret; - const char **words = msg_get_words(request); + ret = msg_success(response); + if (ret < 0) + goto destroy_run; - ret = msg_ci_run_queue(server, words[1], words[2]); + ret = server_enqueue_run(server, run); if (ret < 0) - return ret; + goto free_response; - return msg_success(response); -} + return ret; -static struct cmd_desc commands[] = { - {CMD_NEW_WORKER, msg_new_worker_handler}, - {CMD_RUN, msg_ci_run_handler}, -}; +free_response: + msg_free(*response); -static int server_set_stopping(struct server *server) +destroy_run: + run_destroy(run); + + return ret; +} + +static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) { + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int client_fd = ctx->fd; + + struct worker *worker = NULL; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); + log("Received a \"run complete\" message from worker %d\n", client_fd); + + ret = worker_create(&worker, client_fd); + if (ret < 0) return ret; - } - server->stopping = 1; + ret = server_enqueue_worker(server, worker); + if (ret < 0) + goto destroy_worker; - ret = pthread_cond_broadcast(&server->server_cv); - if (ret) { - pthread_errno(ret, "pthread_cond_signal"); - goto unlock; - } + return ret; -unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); +destroy_worker: + worker_destroy(worker); - return ret; + return 0; } -int server_main(struct server *server) +static struct cmd_desc commands[] = { + {CMD_NEW_WORKER, handle_cmd_new_worker}, + {CMD_RUN, handle_cmd_run}, + {CMD_COMPLETE, handle_cmd_complete}, +}; + +static int server_listen_thread(struct server *server) { struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = signal_install_global_handler(); - if (ret < 0) - return ret; - ret = cmd_dispatcher_create(&dispatcher, commands, sizeof(commands) / sizeof(commands[0]), server); if (ret < 0) @@ -352,3 +415,8 @@ dispatcher_destroy: return server_set_stopping(server); } + +int server_main(struct server *server) +{ + return server_listen_thread(server); +} |