/* * Copyright (c) 2022 Egor Tensin * This file is part of the "cimple" project. * For details, see https://github.com/egor-tensin/cimple. * Distributed under the MIT License. */ #include "server.h" #include "command.h" #include "compiler.h" #include "const.h" #include "log.h" #include "msg.h" #include "run_queue.h" #include "signal.h" #include "storage.h" #include "storage_sqlite.h" #include "tcp_server.h" #include #include struct server { pthread_mutex_t server_mtx; pthread_cond_t server_cv; int stopping; struct storage storage; struct tcp_server *tcp_server; struct run_queue run_queue; }; int server_create(struct server **_server, const struct settings *settings) { struct storage_settings storage_settings; int ret = 0; struct server *server = malloc(sizeof(struct server)); if (!server) { log_errno("malloc"); return -1; } ret = pthread_mutex_init(&server->server_mtx, NULL); if (ret) { pthread_errno(ret, "pthread_mutex_init"); goto free; } ret = pthread_cond_init(&server->server_cv, NULL); if (ret) { pthread_errno(ret, "pthread_cond_init"); goto destroy_mtx; } server->stopping = 0; ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path); if (ret < 0) goto destroy_cv; ret = storage_create(&server->storage, &storage_settings); storage_settings_destroy(&storage_settings); if (ret < 0) goto destroy_cv; ret = tcp_server_create(&server->tcp_server, settings->port); if (ret < 0) goto destroy_storage; run_queue_create(&server->run_queue); *_server = server; return ret; destroy_storage: storage_destroy(&server->storage); destroy_cv: pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); destroy_mtx: pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); free: free(server); return ret; } void server_destroy(struct server *server) { log("Shutting down\n"); run_queue_destroy(&server->run_queue); tcp_server_destroy(server->tcp_server); storage_destroy(&server->storage); pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); free(server); } static int server_has_runs(const struct server *server) { return !run_queue_is_empty(&server->run_queue); } static int worker_ci_run(int fd, const struct run_queue_entry *ci_run) { struct msg *request = NULL, *response = NULL; int ret = 0; const char *argv[] = {CMD_RUN, run_queue_entry_get_url(ci_run), run_queue_entry_get_rev(ci_run), NULL}; ret = msg_from_argv(&request, argv); if (ret < 0) return ret; ret = msg_send_and_wait(fd, request, &response); msg_free(request); if (ret < 0) return ret; if (!msg_is_success(response)) { log_err("Failed to schedule a CI run: worker is busy?\n"); msg_dump(response); goto free_response; } /* TODO: handle the response. */ free_response: msg_free(response); return ret; } static int worker_dequeue_run(struct server *server, struct run_queue_entry **ci_run) { int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_errno(ret, "pthread_mutex_lock"); return ret; } while (!server->stopping && !server_has_runs(server)) { ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); if (ret) { pthread_errno(ret, "pthread_cond_wait"); goto unlock; } } if (server->stopping) { ret = -1; goto unlock; } *ci_run = run_queue_remove_first(&server->run_queue); log("Removed a CI run for repository %s from the queue\n", run_queue_entry_get_url(*ci_run)); goto unlock; unlock: pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } static int worker_requeue_run(struct server *server, struct run_queue_entry *ci_run) { int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_errno(ret, "pthread_mutex_lock"); return ret; } run_queue_add_first(&server->run_queue, ci_run); log("Requeued a CI run for repository %s\n", run_queue_entry_get_url(ci_run)); ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_errno(ret, "pthread_cond_signal"); ret = 0; goto unlock; } unlock: pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } static int worker_iteration(struct server *server, int fd) { struct run_queue_entry *ci_run = NULL; int ret = 0; ret = worker_dequeue_run(server, &ci_run); if (ret < 0) return ret; ret = worker_ci_run(fd, ci_run); if (ret < 0) goto requeue_run; run_queue_entry_destroy(ci_run); return ret; requeue_run: worker_requeue_run(server, ci_run); return ret; } static int worker_thread(struct server *server, int fd) { int ret = 0; while (1) { ret = worker_iteration(server, fd); if (ret < 0) return ret; } return ret; } static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request, void *_server, UNUSED struct msg **response) { return worker_thread((struct server *)_server, client_fd); } static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) { struct run_queue_entry *entry = NULL; int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_errno(ret, "pthread_mutex_lock"); return ret; } ret = run_queue_entry_create(&entry, url, rev); if (ret < 0) goto unlock; run_queue_add_last(&server->run_queue, entry); log("Added a new CI run for repository %s to the queue\n", url); ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_errno(ret, "pthread_cond_signal"); ret = 0; goto unlock; } unlock: pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request, void *_server, struct msg **response) { struct server *server = (struct server *)_server; int ret = 0; if (msg_get_length(request) != 3) { log_err("Invalid number of arguments for a message: %zu\n", msg_get_length(request)); msg_dump(request); return -1; } const char **words = msg_get_words(request); ret = msg_ci_run_queue(server, words[1], words[2]); if (ret < 0) return ret; return msg_success(response); } static struct cmd_desc commands[] = { {CMD_NEW_WORKER, msg_new_worker_handler}, {CMD_RUN, msg_ci_run_handler}, }; static int server_set_stopping(struct server *server) { int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_errno(ret, "pthread_mutex_lock"); return ret; } server->stopping = 1; ret = pthread_cond_broadcast(&server->server_cv); if (ret) { pthread_errno(ret, "pthread_cond_signal"); goto unlock; } unlock: pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } int server_main(struct server *server) { struct cmd_dispatcher *dispatcher = NULL; int ret = 0; ret = signal_install_global_handler(); if (ret < 0) return ret; ret = cmd_dispatcher_create(&dispatcher, commands, sizeof(commands) / sizeof(commands[0]), server); if (ret < 0) return ret; while (!global_stop_flag) { log("Waiting for new connections\n"); ret = tcp_server_accept(server->tcp_server, cmd_dispatcher_handle_conn, dispatcher); if (ret < 0) { if (errno == EINVAL && global_stop_flag) ret = 0; goto dispatcher_destroy; } } dispatcher_destroy: cmd_dispatcher_destroy(dispatcher); return server_set_stopping(server); }