diff options
Diffstat (limited to '')
-rw-r--r-- | src/server.c | 361 |
1 files changed, 188 insertions, 173 deletions
diff --git a/src/server.c b/src/server.c index b5c7463..4706c71 100644 --- a/src/server.c +++ b/src/server.c @@ -4,100 +4,14 @@ #include "msg.h" #include "signal.h" #include "tcp_server.h" -#include "worker_queue.h" #include <pthread.h> +#include <stdlib.h> #include <string.h> #include <unistd.h> -static int server_has_runs_and_workers(const struct server *server) -{ - return !ci_queue_is_empty(&server->ci_queue) && - !worker_queue_is_empty(&server->worker_queue); -} - -static int server_scheduler_iteration(struct server *server) -{ - struct worker_queue_entry *worker; - struct ci_queue_entry *ci_run; - struct msg msg; - int response, ret = 0; - - worker = worker_queue_pop(&server->worker_queue); - ci_run = ci_queue_pop(&server->ci_queue); - - char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - - ret = msg_from_argv(&msg, argv); - if (ret < 0) - goto requeue_ci_run; - - ret = msg_send_and_wait(worker->fd, &msg, &response); - if (ret < 0) - goto free_msg; - - if (response < 0) { - print_error("Failed to schedule a CI run\n"); - } - - msg_free(&msg); - - ci_queue_entry_destroy(ci_run); - - /* FIXME: Don't mark worker as free! */ - worker_queue_push_head(&server->worker_queue, worker); - - return 0; - -free_msg: - msg_free(&msg); - -requeue_ci_run: - ci_queue_push_head(&server->ci_queue, ci_run); - - worker_queue_push_head(&server->worker_queue, worker); - - return ret; -} - -static void *server_scheduler(void *_server) -{ - struct server *server = (struct server *)_server; - int ret = 0; - - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - goto exit; - } - - while (1) { - while (!server->stopping && !server_has_runs_and_workers(server)) { - ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_cond_wait"); - goto unlock; - } - } - - if (server->stopping) - goto unlock; - - ret = server_scheduler_iteration(server); - if (ret < 0) - goto unlock; - } - -unlock: - pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); - -exit: - return NULL; -} - int server_create(struct server *server, const struct settings *settings) { - pthread_attr_t scheduler_attr; int ret = 0; ret = pthread_mutex_init(&server->server_mtx, NULL); @@ -118,40 +32,10 @@ int server_create(struct server *server, const struct settings *settings) if (ret < 0) goto destroy_cv; - worker_queue_create(&server->worker_queue); - ci_queue_create(&server->ci_queue); - ret = pthread_attr_init(&scheduler_attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto destroy_ci_queue; - } - - ret = signal_set_thread_attr(&scheduler_attr); - if (ret) - goto destroy_attr; - - ret = pthread_create(&server->scheduler, &scheduler_attr, server_scheduler, server); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto destroy_attr; - } - - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - return ret; -destroy_attr: - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - -destroy_ci_queue: - ci_queue_destroy(&server->ci_queue); - - worker_queue_destroy(&server->worker_queue); - - tcp_server_destroy(&server->tcp_server); - destroy_cv: pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); @@ -166,59 +50,48 @@ void server_destroy(struct server *server) { print_log("Shutting down\n"); - pthread_check(pthread_join(server->scheduler, NULL), "pthread_join"); ci_queue_destroy(&server->ci_queue); - worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); } -struct msg_context { - struct server *server; - int client_fd; -}; - -static int msg_new_worker(const struct msg *, void *_ctx) +static int server_has_runs(const struct server *server) { - struct msg_context *ctx = (struct msg_context *)_ctx; - return server_new_worker(ctx->server, ctx->client_fd); + return !ci_queue_is_empty(&server->ci_queue); } -static int msg_ci_run(const struct msg *msg, void *_ctx) +static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run) { - struct msg_context *ctx = (struct msg_context *)_ctx; + struct msg request, response; + int ret = 0; - if (msg->argc != 3) { - print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return -1; - } + char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]); -} + ret = msg_from_argv(&request, argv); + if (ret < 0) + return ret; -static int server_msg_handler(const struct msg *msg, void *ctx) -{ - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; + ret = msg_send_and_wait(fd, &request, &response); + msg_free(&request); + if (ret < 0) + return ret; + + if (response.argc < 0) { + print_error("Failed ot schedule a CI run: worker is busy?\n"); + ret = -1; + goto free_response; } - if (!strcmp(msg->argv[0], "new_worker")) - return msg_new_worker(msg, ctx); - if (!strcmp(msg->argv[0], "ci_run")) - return msg_ci_run(msg, ctx); + /* TODO: handle the response. */ - return msg_dump_unknown(msg); -} +free_response: + msg_free(&response); -static int server_conn_handler(int fd, void *server) -{ - struct msg_context ctx = {server, fd}; - return msg_recv_and_handle(fd, server_msg_handler, &ctx); + return ret; } -static int server_set_stopping(struct server *server) +static int worker_dequeue_run(struct server *server, struct ci_queue_entry **ci_run) { int ret = 0; @@ -228,39 +101,96 @@ static int server_set_stopping(struct server *server) return ret; } - server->stopping = 1; + while (!server->stopping && !server_has_runs(server)) { + ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_cond_wait"); + goto unlock; + } + } - ret = pthread_cond_signal(&server->server_cv); - if (ret) { - pthread_print_errno(ret, "pthread_cond_signal"); + if (server->stopping) { + ret = -1; goto unlock; } + *ci_run = ci_queue_pop(&server->ci_queue); + goto unlock; + unlock: pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } -int server_main(struct server *server) +static int worker_requeue_run(struct server *server, struct ci_queue_entry *ci_run) { int ret = 0; - while (!global_stop_flag) { - ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + ret = pthread_mutex_lock(&server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_mutex_lock"); + return ret; + } + + ci_queue_push_head(&server->ci_queue, ci_run); + + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + + return ret; +} + +static int worker_iteration(struct server *server, int fd) +{ + struct ci_queue_entry *ci_run; + int ret = 0; + + ret = worker_dequeue_run(server, &ci_run); + if (ret < 0) + return ret; + + ret = worker_ci_run(fd, ci_run); + if (ret < 0) + goto requeue_run; + + ci_queue_entry_destroy(ci_run); + return ret; + +requeue_run: + worker_requeue_run(server, ci_run); + + return ret; +} + +static int worker_thread(struct server *server, int fd) +{ + int ret = 0; + + while (1) { + ret = worker_iteration(server, fd); if (ret < 0) - break; + return ret; } - return server_set_stopping(server); + return ret; } -int server_new_worker(struct server *server, int fd) +static int msg_new_worker_handler(struct server *server, int client_fd, const struct msg *) { - struct worker_queue_entry *entry; + return worker_thread(server, client_fd); +} + +static int msg_new_worker_parser(const struct msg *) +{ + return 1; +} + +static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) +{ + struct ci_queue_entry *entry; int ret = 0; - print_log("Registering a new worker\n"); + print_log("Scheduling a new CI run for repository %s\n", url); ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -268,15 +198,16 @@ int server_new_worker(struct server *server, int fd) return ret; } - ret = worker_queue_entry_create(&entry, fd); + ret = ci_queue_entry_create(&entry, url, rev); if (ret < 0) goto unlock; - worker_queue_push(&server->worker_queue, entry); + ci_queue_push(&server->ci_queue, entry); ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_print_errno(ret, "pthread_cond_signal"); + ret = 0; goto unlock; } @@ -286,12 +217,85 @@ unlock: return ret; } -int server_ci_run(struct server *server, const char *url, const char *rev) +static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg) { - struct ci_queue_entry *entry; + struct msg response; int ret = 0; - print_log("Scheduling a new CI run for repository %s\n", url); + ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + + return msg_send(client_fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) +{ + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return 0; + } + + return 1; +} + +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg); + +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; +}; + +struct msg_descr messages[] = { + {"new_worker", msg_new_worker_parser, msg_new_worker_handler}, + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, +}; + +static int server_msg_handler(struct server *server, int client_fd, const struct msg *request) +{ + if (request->argc == 0) + goto unknown_request; + + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); + + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) + continue; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(server, client_fd, request); + } + +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(client_fd, &response); +} + +static int server_conn_handler(int client_fd, void *_server) +{ + struct server *server = (struct server *)_server; + struct msg request; + int ret = 0; + + ret = msg_recv(client_fd, &request); + if (ret < 0) + return ret; + + ret = server_msg_handler(server, client_fd, &request); + msg_free(&request); + return ret; +} + +static int server_set_stopping(struct server *server) +{ + int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -299,11 +303,7 @@ int server_ci_run(struct server *server, const char *url, const char *rev) return ret; } - ret = ci_queue_entry_create(&entry, url, rev); - if (ret < 0) - goto unlock; - - ci_queue_push(&server->ci_queue, entry); + server->stopping = 1; ret = pthread_cond_signal(&server->server_cv); if (ret) { @@ -316,3 +316,18 @@ unlock: return ret; } + +int server_main(struct server *server) +{ + int ret = 0; + + while (!global_stop_flag) { + print_log("Waiting for new connections\n"); + + ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + if (ret < 0) + break; + } + + return server_set_stopping(server); +} |