diff options
-rw-r--r-- | src/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/command.c | 65 | ||||
-rw-r--r-- | src/command.h | 11 | ||||
-rw-r--r-- | src/const.h | 1 | ||||
-rw-r--r-- | src/run_queue.c | 47 | ||||
-rw-r--r-- | src/run_queue.h | 21 | ||||
-rw-r--r-- | src/server.c | 430 | ||||
-rw-r--r-- | src/tcp_server.c | 28 | ||||
-rw-r--r-- | src/worker.c | 116 | ||||
-rw-r--r-- | src/worker.h | 4 | ||||
-rw-r--r-- | src/worker_main.c | 4 | ||||
-rw-r--r-- | src/worker_queue.c | 84 | ||||
-rw-r--r-- | src/worker_queue.h | 32 |
13 files changed, 521 insertions, 326 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f8b2a7c..053ca9e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -48,7 +48,8 @@ add_my_executable(server server_main.c server.c sqlite.c storage.c storage_sqlite.c - tcp_server.c) + tcp_server.c + worker_queue.c) target_link_libraries(server PRIVATE pthread sqlite3) target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") @@ -67,5 +68,6 @@ add_my_executable(worker worker_main.c worker.c msg.c net.c process.c + run_queue.c signal.c) target_link_libraries(worker PRIVATE git2 pthread) diff --git a/src/command.c b/src/command.c index 3c75e69..581b319 100644 --- a/src/command.c +++ b/src/command.c @@ -103,17 +103,10 @@ void cmd_dispatcher_destroy(struct cmd_dispatcher *dispatcher) free(dispatcher); } -int cmd_dispatcher_handle_msg(const struct cmd_dispatcher *dispatcher, int conn_fd, - const struct msg *request) +int cmd_dispatcher_handle(const struct cmd_dispatcher *dispatcher, const struct msg *command, + struct msg **result) { - struct msg *response = NULL; - int ret = 0; - - size_t numof_words = msg_get_length(request); - if (numof_words == 0) - goto unknown; - - const char *actual_cmd = msg_get_first_word(request); + const char *actual_cmd = msg_get_first_word(command); for (size_t i = 0; i < dispatcher->numof_cmds; ++i) { struct cmd_desc *cmd = &dispatcher->cmds[i]; @@ -121,35 +114,55 @@ int cmd_dispatcher_handle_msg(const struct cmd_dispatcher *dispatcher, int conn_ if (strcmp(cmd->name, actual_cmd)) continue; - ret = cmd->handler(conn_fd, request, &response, dispatcher->ctx); - goto exit; + return cmd->handler(command, result, dispatcher->ctx); } -unknown: log_err("Received an unknown command\n"); - ret = -1; - msg_dump(request); - goto exit; - -exit: - if (ret < 0 && !response) - msg_error(&response); - if (response) - return msg_send(conn_fd, response); - return ret; + msg_dump(command); + return -1; } int cmd_dispatcher_handle_conn(int conn_fd, void *_dispatcher) { struct cmd_dispatcher *dispatcher = (struct cmd_dispatcher *)_dispatcher; - struct msg *request = NULL; + struct msg *request = NULL, *response = NULL; int ret = 0; + struct cmd_conn_ctx *new_ctx = malloc(sizeof(struct cmd_conn_ctx)); + if (!new_ctx) { + log_errno("malloc"); + return -1; + } + + new_ctx->fd = conn_fd; + new_ctx->arg = dispatcher->ctx; + ret = msg_recv(conn_fd, &request); if (ret < 0) - return ret; + goto free_ctx; + + void *old_ctx = dispatcher->ctx; + dispatcher->ctx = new_ctx; + ret = cmd_dispatcher_handle(dispatcher, request, &response); + dispatcher->ctx = old_ctx; + + if (ret < 0) + goto free_response; + + if (response) { + ret = msg_send(conn_fd, response); + if (ret < 0) + goto free_response; + } + +free_response: + if (response) + msg_free(response); - ret = cmd_dispatcher_handle_msg(dispatcher, conn_fd, request); msg_free(request); + +free_ctx: + free(new_ctx); + return ret; } diff --git a/src/command.h b/src/command.h index 0ab44c1..90facbb 100644 --- a/src/command.h +++ b/src/command.h @@ -12,8 +12,7 @@ #include <stddef.h> -typedef int (*cmd_handler)(int conn_fd, const struct msg *request, struct msg **response, - void *ctx); +typedef int (*cmd_handler)(const struct msg *request, struct msg **response, void *ctx); struct cmd_desc { char *name; @@ -26,7 +25,13 @@ int cmd_dispatcher_create(struct cmd_dispatcher **, struct cmd_desc *, size_t nu void *ctx); void cmd_dispatcher_destroy(struct cmd_dispatcher *); -int cmd_dispatcher_handle_msg(const struct cmd_dispatcher *, int conn_fd, const struct msg *); +int cmd_dispatcher_handle(const struct cmd_dispatcher *, const struct msg *command, + struct msg **response); + +struct cmd_conn_ctx { + int fd; + void *arg; +}; /* This is supposed to be used as an argument to tcp_server_accept. */ int cmd_dispatcher_handle_conn(int conn_fd, void *dispatcher); diff --git a/src/const.h b/src/const.h index f53ef7e..2e7054b 100644 --- a/src/const.h +++ b/src/const.h @@ -16,5 +16,6 @@ #define CMD_RUN "run" #define CMD_NEW_WORKER "new-worker" +#define CMD_COMPLETE "complete" #endif diff --git a/src/run_queue.c b/src/run_queue.c index 8b5052b..0455c92 100644 --- a/src/run_queue.c +++ b/src/run_queue.c @@ -7,20 +7,21 @@ #include "run_queue.h" #include "log.h" +#include "msg.h" #include <stdlib.h> #include <string.h> #include <sys/queue.h> -struct run_queue_entry { +struct run { char *url; char *rev; - STAILQ_ENTRY(run_queue_entry) entries; + STAILQ_ENTRY(run) entries; }; -int run_queue_entry_create(struct run_queue_entry **_entry, const char *_url, const char *_rev) +int run_create(struct run **_entry, const char *_url, const char *_rev) { - struct run_queue_entry *entry = malloc(sizeof(struct run_queue_entry)); + struct run *entry = malloc(sizeof(struct run)); if (!entry) { log_errno("malloc"); goto fail; @@ -54,19 +55,33 @@ fail: return -1; } -void run_queue_entry_destroy(struct run_queue_entry *entry) +int run_from_msg(struct run **run, const struct msg *msg) +{ + size_t msg_len = msg_get_length(msg); + + if (msg_len != 3) { + log_err("Invalid number of arguments for a message: %zu\n", msg_len); + msg_dump(msg); + return -1; + } + + const char **words = msg_get_words(msg); + return run_create(run, words[1], words[2]); +} + +void run_destroy(struct run *entry) { free(entry->rev); free(entry->url); free(entry); } -const char *run_queue_entry_get_url(const struct run_queue_entry *entry) +const char *run_get_url(const struct run *entry) { return entry->url; } -const char *run_queue_entry_get_rev(const struct run_queue_entry *entry) +const char *run_get_rev(const struct run *entry) { return entry->rev; } @@ -78,10 +93,10 @@ void run_queue_create(struct run_queue *queue) void run_queue_destroy(struct run_queue *queue) { - struct run_queue_entry *entry1 = STAILQ_FIRST(queue); + struct run *entry1 = STAILQ_FIRST(queue); while (entry1) { - struct run_queue_entry *entry2 = STAILQ_NEXT(entry1, entries); - run_queue_entry_destroy(entry1); + struct run *entry2 = STAILQ_NEXT(entry1, entries); + run_destroy(entry1); entry1 = entry2; } STAILQ_INIT(queue); @@ -92,19 +107,19 @@ int run_queue_is_empty(const struct run_queue *queue) return STAILQ_EMPTY(queue); } -void run_queue_add_last(struct run_queue *queue, struct run_queue_entry *entry) +void run_queue_add_first(struct run_queue *queue, struct run *entry) { - STAILQ_INSERT_TAIL(queue, entry, entries); + STAILQ_INSERT_HEAD(queue, entry, entries); } -void run_queue_add_first(struct run_queue *queue, struct run_queue_entry *entry) +void run_queue_add_last(struct run_queue *queue, struct run *entry) { - STAILQ_INSERT_HEAD(queue, entry, entries); + STAILQ_INSERT_TAIL(queue, entry, entries); } -struct run_queue_entry *run_queue_remove_first(struct run_queue *queue) +struct run *run_queue_remove_first(struct run_queue *queue) { - struct run_queue_entry *entry = STAILQ_FIRST(queue); + struct run *entry = STAILQ_FIRST(queue); STAILQ_REMOVE_HEAD(queue, entries); return entry; } diff --git a/src/run_queue.h b/src/run_queue.h index 629a8e0..eca071e 100644 --- a/src/run_queue.h +++ b/src/run_queue.h @@ -8,26 +8,29 @@ #ifndef __RUN_QUEUE_H__ #define __RUN_QUEUE_H__ +#include "msg.h" + #include <sys/queue.h> -struct run_queue_entry; +struct run; -int run_queue_entry_create(struct run_queue_entry **, const char *url, const char *rev); -void run_queue_entry_destroy(struct run_queue_entry *); +int run_create(struct run **, const char *url, const char *rev); +int run_from_msg(struct run **, const struct msg *); +void run_destroy(struct run *); -const char *run_queue_entry_get_url(const struct run_queue_entry *); -const char *run_queue_entry_get_rev(const struct run_queue_entry *); +const char *run_get_url(const struct run *); +const char *run_get_rev(const struct run *); -STAILQ_HEAD(run_queue, run_queue_entry); +STAILQ_HEAD(run_queue, run); void run_queue_create(struct run_queue *); void run_queue_destroy(struct run_queue *); int run_queue_is_empty(const struct run_queue *); -void run_queue_add_first(struct run_queue *, struct run_queue_entry *); -void run_queue_add_last(struct run_queue *, struct run_queue_entry *); +void run_queue_add_first(struct run_queue *, struct run *); +void run_queue_add_last(struct run_queue *, struct run *); -struct run_queue_entry *run_queue_remove_first(struct run_queue *); +struct run *run_queue_remove_first(struct run_queue *); #endif diff --git a/src/server.c b/src/server.c index 2e5fc18..bf6a967 100644 --- a/src/server.c +++ b/src/server.c @@ -16,6 +16,7 @@ #include "storage.h" #include "storage_sqlite.h" #include "tcp_server.h" +#include "worker_queue.h" #include <pthread.h> #include <stdlib.h> @@ -26,311 +27,373 @@ struct server { int stopping; + struct worker_queue worker_queue; + struct run_queue run_queue; + struct storage storage; - struct tcp_server *tcp_server; + pthread_t main_thread; - struct run_queue run_queue; + struct tcp_server *tcp_server; }; -int server_create(struct server **_server, const struct settings *settings) +static int server_lock(struct server *server) { - struct storage_settings storage_settings; - int ret = 0; - - struct server *server = malloc(sizeof(struct server)); - if (!server) { - log_errno("malloc"); - return -1; - } - - ret = pthread_mutex_init(&server->server_mtx, NULL); + int ret = pthread_mutex_lock(&server->server_mtx); if (ret) { - pthread_errno(ret, "pthread_mutex_init"); - goto free; + pthread_errno(ret, "pthread_mutex_lock"); + return ret; } + return ret; +} - ret = pthread_cond_init(&server->server_cv, NULL); +static void server_unlock(struct server *server) +{ + pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); +} + +static int server_wait(struct server *server) +{ + int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); if (ret) { - pthread_errno(ret, "pthread_cond_init"); - goto destroy_mtx; + pthread_errno(ret, "pthread_cond_wait"); + return ret; } + return ret; +} - server->stopping = 0; +static void server_notify(struct server *server) +{ + pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal"); +} - ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path); - if (ret < 0) - goto destroy_cv; +static int server_set_stopping(struct server *server) +{ + int ret = 0; - ret = storage_create(&server->storage, &storage_settings); - storage_settings_destroy(&storage_settings); + ret = server_lock(server); if (ret < 0) - goto destroy_cv; + return ret; - ret = tcp_server_create(&server->tcp_server, settings->port); + server->stopping = 1; + + server_notify(server); + server_unlock(server); + return ret; +} + +static int server_has_workers(const struct server *server) +{ + return !worker_queue_is_empty(&server->worker_queue); +} + +static int server_enqueue_worker(struct server *server, struct worker *worker) +{ + int ret = 0; + + ret = server_lock(server); if (ret < 0) - goto destroy_storage; + return ret; - run_queue_create(&server->run_queue); + worker_queue_add_last(&server->worker_queue, worker); + log("Added a new worker %d to the queue\n", worker_get_fd(worker)); - *_server = server; + server_notify(server); + server_unlock(server); return ret; +} -destroy_storage: - storage_destroy(&server->storage); +static int server_has_runs(const struct server *server) +{ + return !run_queue_is_empty(&server->run_queue); +} -destroy_cv: - pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); +static int server_enqueue_run(struct server *server, struct run *run) +{ + int ret = 0; -destroy_mtx: - pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + ret = server_lock(server); + if (ret < 0) + return ret; -free: - free(server); + run_queue_add_last(&server->run_queue, run); + log("Added a new CI run for repository %s to the queue\n", run_get_url(run)); + server_notify(server); + server_unlock(server); return ret; } -void server_destroy(struct server *server) +static int server_ready_for_action(const struct server *server) { - log("Shutting down\n"); - - run_queue_destroy(&server->run_queue); - tcp_server_destroy(server->tcp_server); - storage_destroy(&server->storage); - pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); - pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); - free(server); + return server->stopping || (server_has_runs(server) && server_has_workers(server)); } -static int server_has_runs(const struct server *server) +static int server_wait_for_action(struct server *server) { - return !run_queue_is_empty(&server->run_queue); + int ret = 0; + + while (!server_ready_for_action(server)) { + ret = server_wait(server); + if (ret < 0) + return ret; + } + + return ret; } -static int worker_ci_run(int fd, const struct run_queue_entry *ci_run) +static int server_assign_run(struct server *server) { - struct msg *request = NULL, *response = NULL; int ret = 0; - const char *argv[] = {CMD_RUN, run_queue_entry_get_url(ci_run), - run_queue_entry_get_rev(ci_run), NULL}; + struct run *run = run_queue_remove_first(&server->run_queue); + log("Removed a CI run for repository %s from the queue\n", run_get_url(run)); + + struct worker *worker = worker_queue_remove_first(&server->worker_queue); + log("Removed worker %d from the queue\n", worker_get_fd(worker)); + + const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL}; + + struct msg *request = NULL; ret = msg_from_argv(&request, argv); - if (ret < 0) + if (ret < 0) { + worker_queue_add_first(&server->worker_queue, worker); + run_queue_add_first(&server->run_queue, run); return ret; + } - ret = msg_communicate(fd, request, &response); + ret = msg_communicate(worker_get_fd(worker), request, NULL); msg_free(request); - if (ret < 0) + if (ret < 0) { + /* Failed to communicate with the worker, requeue the run + * and forget about the worker. */ + worker_destroy(worker); + run_queue_add_first(&server->run_queue, run); return ret; - - if (!msg_is_success(response)) { - log_err("Failed to schedule a CI run: worker is busy?\n"); - msg_dump(response); - goto free_response; } - /* TODO: handle the response. */ - -free_response: - msg_free(response); - + /* Send the run to the worker, forget about both of them for a while. */ + worker_destroy(worker); + run_destroy(run); return ret; } -static int worker_dequeue_run(struct server *server, struct run_queue_entry **ci_run) +static void *server_main_thread(void *_server) { + struct server *server = (struct server *)_server; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); - return ret; - } + ret = server_lock(server); + if (ret < 0) + goto exit; - while (!server->stopping && !server_has_runs(server)) { - ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_cond_wait"); + while (1) { + ret = server_wait_for_action(server); + if (ret < 0) goto unlock; - } - } - if (server->stopping) { - ret = -1; - goto unlock; - } + if (server->stopping) + goto unlock; - *ci_run = run_queue_remove_first(&server->run_queue); - log("Removed a CI run for repository %s from the queue\n", - run_queue_entry_get_url(*ci_run)); - goto unlock; + ret = server_assign_run(server); + if (ret < 0) + goto unlock; + } unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + server_unlock(server); - return ret; +exit: + return NULL; } -static int worker_requeue_run(struct server *server, struct run_queue_entry *ci_run) +int server_create(struct server **_server, const struct settings *settings) { + struct storage_settings storage_settings; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); + ret = signal_install_global_handler(); + if (ret < 0) return ret; + + struct server *server = malloc(sizeof(struct server)); + if (!server) { + log_errno("malloc"); + return -1; } - run_queue_add_first(&server->run_queue, ci_run); - log("Requeued a CI run for repository %s\n", run_queue_entry_get_url(ci_run)); + ret = pthread_mutex_init(&server->server_mtx, NULL); + if (ret) { + pthread_errno(ret, "pthread_mutex_init"); + goto free; + } - ret = pthread_cond_signal(&server->server_cv); + ret = pthread_cond_init(&server->server_cv, NULL); if (ret) { - pthread_errno(ret, "pthread_cond_signal"); - ret = 0; - goto unlock; + pthread_errno(ret, "pthread_cond_init"); + goto destroy_mtx; } -unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + server->stopping = 0; - return ret; -} + worker_queue_create(&server->worker_queue); + run_queue_create(&server->run_queue); -static int worker_iteration(struct server *server, int fd) -{ - struct run_queue_entry *ci_run = NULL; - int ret = 0; + ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path); + if (ret < 0) + goto destroy_run_queue; - ret = worker_dequeue_run(server, &ci_run); + ret = storage_create(&server->storage, &storage_settings); + storage_settings_destroy(&storage_settings); if (ret < 0) - return ret; + goto destroy_run_queue; - ret = worker_ci_run(fd, ci_run); + ret = tcp_server_create(&server->tcp_server, settings->port); if (ret < 0) - goto requeue_run; + goto destroy_storage; + + ret = pthread_create(&server->main_thread, NULL, server_main_thread, server); + if (ret) { + pthread_errno(ret, "pthread_create"); + goto destroy_tcp_server; + } - run_queue_entry_destroy(ci_run); + *_server = server; return ret; -requeue_run: - worker_requeue_run(server, ci_run); +destroy_tcp_server: + tcp_server_destroy(server->tcp_server); - return ret; -} +destroy_storage: + storage_destroy(&server->storage); -static int worker_thread(struct server *server, int fd) -{ - int ret = 0; +destroy_run_queue: + run_queue_destroy(&server->run_queue); - while (1) { - ret = worker_iteration(server, fd); - if (ret < 0) - return ret; - } + worker_queue_destroy(&server->worker_queue); + + pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); + +destroy_mtx: + pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + +free: + free(server); return ret; } -static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request, - UNUSED struct msg **response, void *_server) +void server_destroy(struct server *server) { - return worker_thread((struct server *)_server, client_fd); + log("Shutting down\n"); + + pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join"); + tcp_server_destroy(server->tcp_server); + storage_destroy(&server->storage); + run_queue_destroy(&server->run_queue); + worker_queue_destroy(&server->worker_queue); + pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); + pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + free(server); } -static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) +static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) { - struct run_queue_entry *entry = NULL; + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int client_fd = ctx->fd; + + struct worker *worker = NULL; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); + ret = worker_create(&worker, client_fd); + if (ret < 0) return ret; - } - ret = run_queue_entry_create(&entry, url, rev); + ret = server_enqueue_worker(server, worker); if (ret < 0) - goto unlock; + goto destroy_worker; - run_queue_add_last(&server->run_queue, entry); - log("Added a new CI run for repository %s to the queue\n", url); - - ret = pthread_cond_signal(&server->server_cv); - if (ret) { - pthread_errno(ret, "pthread_cond_signal"); - ret = 0; - goto unlock; - } + return ret; -unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); +destroy_worker: + worker_destroy(worker); return ret; } -static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request, - struct msg **response, void *_server) +static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) { - struct server *server = (struct server *)_server; + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + struct run *run = NULL; + int ret = 0; - if (msg_get_length(request) != 3) { - log_err("Invalid number of arguments for a message: %zu\n", - msg_get_length(request)); - msg_dump(request); - return -1; - } + ret = run_from_msg(&run, request); + if (ret < 0) + return ret; - const char **words = msg_get_words(request); + ret = msg_success(response); + if (ret < 0) + goto destroy_run; - ret = msg_ci_run_queue(server, words[1], words[2]); + ret = server_enqueue_run(server, run); if (ret < 0) - return ret; + goto free_response; - return msg_success(response); -} + return ret; -static struct cmd_desc commands[] = { - {CMD_NEW_WORKER, msg_new_worker_handler}, - {CMD_RUN, msg_ci_run_handler}, -}; +free_response: + msg_free(*response); -static int server_set_stopping(struct server *server) +destroy_run: + run_destroy(run); + + return ret; +} + +static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response, + void *_ctx) { + struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; + struct server *server = (struct server *)ctx->arg; + int client_fd = ctx->fd; + + struct worker *worker = NULL; int ret = 0; - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_errno(ret, "pthread_mutex_lock"); + log("Received a \"run complete\" message from worker %d\n", client_fd); + + ret = worker_create(&worker, client_fd); + if (ret < 0) return ret; - } - server->stopping = 1; + ret = server_enqueue_worker(server, worker); + if (ret < 0) + goto destroy_worker; - ret = pthread_cond_broadcast(&server->server_cv); - if (ret) { - pthread_errno(ret, "pthread_cond_signal"); - goto unlock; - } + return ret; -unlock: - pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); +destroy_worker: + worker_destroy(worker); - return ret; + return 0; } -int server_main(struct server *server) +static struct cmd_desc commands[] = { + {CMD_NEW_WORKER, handle_cmd_new_worker}, + {CMD_RUN, handle_cmd_run}, + {CMD_COMPLETE, handle_cmd_complete}, +}; + +static int server_listen_thread(struct server *server) { struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = signal_install_global_handler(); - if (ret < 0) - return ret; - ret = cmd_dispatcher_create(&dispatcher, commands, sizeof(commands) / sizeof(commands[0]), server); if (ret < 0) @@ -352,3 +415,8 @@ dispatcher_destroy: return server_set_stopping(server); } + +int server_main(struct server *server) +{ + return server_listen_thread(server); +} diff --git a/src/tcp_server.c b/src/tcp_server.c index 25a74a7..d840414 100644 --- a/src/tcp_server.c +++ b/src/tcp_server.c @@ -62,19 +62,18 @@ static void *connection_thread(void *_ctx) ret = signal_block_child(); if (ret < 0) - goto close; + goto free_ctx; ctx->handler(ctx->fd, ctx->arg); -close: - log_errno_if(close(ctx->fd), "close"); +free_ctx: free(ctx); + return NULL; } int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler handler, void *arg) { - pthread_attr_t child_attr; sigset_t old_mask; pthread_t child; int ret = 0; @@ -93,23 +92,11 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h goto free_ctx; ctx->fd = ret; - ret = pthread_attr_init(&child_attr); - if (ret) { - pthread_errno(ret, "pthread_attr_init"); - goto close_conn; - } - - ret = pthread_attr_setdetachstate(&child_attr, PTHREAD_CREATE_DETACHED); - if (ret) { - pthread_errno(ret, "pthread_attr_setdetachstate"); - goto destroy_attr; - } - ret = signal_block_parent(&old_mask); if (ret < 0) - goto destroy_attr; + goto close_conn; - ret = pthread_create(&child, &child_attr, connection_thread, ctx); + ret = pthread_create(&child, NULL, connection_thread, ctx); if (ret) { pthread_errno(ret, "pthread_create"); goto restore_mask; @@ -117,16 +104,11 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h signal_set(&old_mask, NULL); - pthread_errno_if(pthread_attr_destroy(&child_attr), "pthread_attr_destroy"); - return ret; restore_mask: signal_set(&old_mask, NULL); -destroy_attr: - pthread_errno_if(pthread_attr_destroy(&child_attr), "pthread_attr_destroy"); - close_conn: log_errno_if(close(ctx->fd), "close"); diff --git a/src/worker.c b/src/worker.c index 2a258e5..5762549 100644 --- a/src/worker.c +++ b/src/worker.c @@ -13,27 +13,24 @@ #include "git.h" #include "log.h" #include "msg.h" -#include "net.h" #include "process.h" +#include "run_queue.h" #include "signal.h" -#include <pthread.h> #include <stdlib.h> -#include <unistd.h> struct worker { - int fd; - - /* TODO: these are not used, but they should be! */ - pthread_mutex_t task_mtx; - pthread_t task; - int task_active; + int dummy; }; -int worker_create(struct worker **_worker, const struct settings *settings) +int worker_create(struct worker **_worker) { int ret = 0; + ret = signal_install_global_handler(); + if (ret < 0) + return ret; + struct worker *worker = malloc(sizeof(struct worker)); if (!worker) { log_errno("malloc"); @@ -44,17 +41,9 @@ int worker_create(struct worker **_worker, const struct settings *settings) if (ret < 0) goto free; - ret = net_connect(settings->host, settings->port); - if (ret < 0) - goto git_shutdown; - worker->fd = ret; - *_worker = worker; return ret; -git_shutdown: - libgit_shutdown(); - free: free(worker); @@ -65,14 +54,19 @@ void worker_destroy(struct worker *worker) { log("Shutting down\n"); - log_errno_if(close(worker->fd), "close"); libgit_shutdown(); free(worker); } -static int msg_send_new_worker(const struct worker *worker) +static int worker_send_to_server(const struct settings *settings, const struct msg *request, + struct msg **response) +{ + return msg_connect_and_communicate(settings->host, settings->port, request, response); +} + +static int worker_send_to_server_argv(const struct settings *settings, const char **argv, + struct msg **response) { - static const char *argv[] = {CMD_NEW_WORKER, NULL}; struct msg *msg = NULL; int ret = 0; @@ -80,84 +74,80 @@ static int msg_send_new_worker(const struct worker *worker) if (ret < 0) return ret; - ret = msg_send(worker->fd, msg); + ret = worker_send_to_server(settings, msg, response); msg_free(msg); return ret; } -static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result) +static int worker_send_new_worker(const struct settings *settings, struct msg **task) { - int ret = 0; - - ret = ci_run_git_repo(url, rev, result); - if (ret < 0) { - log_err("Run failed with an error\n"); - return ret; - } - - proc_output_dump(result); - return 0; + static const char *argv[] = {CMD_NEW_WORKER, NULL}; + return worker_send_to_server_argv(settings, argv, task); } -static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, struct msg **response, - UNUSED void *_worker) +static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx) { + struct run *run = NULL; struct proc_output result; int ret = 0; - if (msg_get_length(request) != 3) { - log_err("Invalid number of arguments for a message\n"); - msg_dump(request); - return -1; + ret = run_from_msg(&run, request); + if (ret < 0) + return ret; + + proc_output_init(&result); + + ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result); + if (ret < 0) { + log_err("Run failed with an error\n"); + goto free_output; } - const char **words = msg_get_words(request); + proc_output_dump(&result); - proc_output_init(&result); - ret = msg_ci_run_do(words[1], words[2], &result); + static const char *argv[] = {CMD_COMPLETE, NULL}; + ret = msg_from_argv(response, argv); + if (ret < 0) + goto free_output; + +free_output: proc_output_free(&result); - if (ret < 0) - return ret; + run_destroy(run); - return msg_success(response); + return ret; } static struct cmd_desc cmds[] = { - {CMD_RUN, msg_ci_run_handler}, + {CMD_RUN, msg_run_handler}, }; -int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[]) +int worker_main(UNUSED struct worker *worker, const struct settings *settings) { + struct msg *task = NULL; struct cmd_dispatcher *dispatcher = NULL; int ret = 0; - ret = signal_install_global_handler(); + ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), NULL); if (ret < 0) return ret; - ret = msg_send_new_worker(worker); - if (ret < 0) - return ret; + log("Waiting for a new command\n"); - ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), worker); + ret = worker_send_new_worker(settings, &task); if (ret < 0) - return ret; + goto dispatcher_destroy; while (!global_stop_flag) { - struct msg *request = NULL; + struct msg *result = NULL; - log("Waiting for a new command\n"); - - ret = msg_recv(worker->fd, &request); - if (ret < 0) { - if (errno == EINVAL && global_stop_flag) - ret = 0; + ret = cmd_dispatcher_handle(dispatcher, task, &result); + msg_free(task); + if (ret < 0) goto dispatcher_destroy; - } - ret = cmd_dispatcher_handle_msg(dispatcher, worker->fd, request); - msg_free(request); + ret = worker_send_to_server(settings, result, &task); + msg_free(result); if (ret < 0) goto dispatcher_destroy; } diff --git a/src/worker.h b/src/worker.h index 736c8b4..bf603df 100644 --- a/src/worker.h +++ b/src/worker.h @@ -15,9 +15,9 @@ struct settings { struct worker; -int worker_create(struct worker **, const struct settings *); +int worker_create(struct worker **); void worker_destroy(struct worker *); -int worker_main(struct worker *, int argc, char *argv[]); +int worker_main(struct worker *, const struct settings *); #endif diff --git a/src/worker_main.c b/src/worker_main.c index c046ba0..b9e9b9b 100644 --- a/src/worker_main.c +++ b/src/worker_main.c @@ -70,11 +70,11 @@ int main(int argc, char *argv[]) if (ret < 0) return ret; - ret = worker_create(&worker, &settings); + ret = worker_create(&worker); if (ret < 0) return ret; - ret = worker_main(worker, argc - optind, argv + optind); + ret = worker_main(worker, &settings); if (ret < 0) goto destroy_worker; diff --git a/src/worker_queue.c b/src/worker_queue.c new file mode 100644 index 0000000..3e207e3 --- /dev/null +++ b/src/worker_queue.c @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "worker_queue.h" +#include "log.h" + +#include <pthread.h> +#include <stdlib.h> +#include <sys/queue.h> + +struct worker { + pthread_t thread; + int fd; + STAILQ_ENTRY(worker) entries; +}; + +int worker_create(struct worker **_entry, int fd) +{ + struct worker *entry = malloc(sizeof(struct worker)); + if (!entry) { + log_errno("malloc"); + return -1; + } + + entry->thread = pthread_self(); + entry->fd = fd; + + *_entry = entry; + return 0; +} + +void worker_destroy(struct worker *entry) +{ + log("Waiting for worker %d thread to exit\n", entry->fd); + pthread_errno_if(pthread_join(entry->thread, NULL), "pthread_join"); + free(entry); +} + +int worker_get_fd(const struct worker *entry) +{ + return entry->fd; +} + +void worker_queue_create(struct worker_queue *queue) +{ + STAILQ_INIT(queue); +} + +void worker_queue_destroy(struct worker_queue *queue) +{ + struct worker *entry1 = STAILQ_FIRST(queue); + while (entry1) { + struct worker *entry2 = STAILQ_NEXT(entry1, entries); + worker_destroy(entry1); + entry1 = entry2; + } + STAILQ_INIT(queue); +} + +int worker_queue_is_empty(const struct worker_queue *queue) +{ + return STAILQ_EMPTY(queue); +} + +void worker_queue_add_first(struct worker_queue *queue, struct worker *entry) +{ + STAILQ_INSERT_HEAD(queue, entry, entries); +} + +void worker_queue_add_last(struct worker_queue *queue, struct worker *entry) +{ + STAILQ_INSERT_HEAD(queue, entry, entries); +} + +struct worker *worker_queue_remove_first(struct worker_queue *queue) +{ + struct worker *entry = STAILQ_FIRST(queue); + STAILQ_REMOVE_HEAD(queue, entries); + return entry; +} diff --git a/src/worker_queue.h b/src/worker_queue.h new file mode 100644 index 0000000..826cf65 --- /dev/null +++ b/src/worker_queue.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __WORKER_QUEUE_H__ +#define __WORKER_QUEUE_H__ + +#include <sys/queue.h> + +struct worker; + +int worker_create(struct worker **, int fd); +void worker_destroy(struct worker *); + +int worker_get_fd(const struct worker *); + +STAILQ_HEAD(worker_queue, worker); + +void worker_queue_create(struct worker_queue *); +void worker_queue_destroy(struct worker_queue *); + +int worker_queue_is_empty(const struct worker_queue *); + +void worker_queue_add_first(struct worker_queue *, struct worker *); +void worker_queue_add_last(struct worker_queue *, struct worker *); + +struct worker *worker_queue_remove_first(struct worker_queue *); + +#endif |