diff options
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/server.c | 113 | ||||
-rw-r--r-- | src/server.h | 7 | ||||
-rw-r--r-- | src/tcp_server.c | 9 |
4 files changed, 94 insertions, 36 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d0bacc2..c75bb49 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,7 @@ add_executable(server server_main.c server.c ci_queue.c msg.c net.c + signal.c tcp_server.c worker_queue.c) diff --git a/src/server.c b/src/server.c index 509d144..b5c7463 100644 --- a/src/server.c +++ b/src/server.c @@ -2,6 +2,7 @@ #include "ci_queue.h" #include "log.h" #include "msg.h" +#include "signal.h" #include "tcp_server.h" #include "worker_queue.h" @@ -64,28 +65,31 @@ static void *server_scheduler(void *_server) struct server *server = (struct server *)_server; int ret = 0; - ret = pthread_mutex_lock(&server->scheduler_mtx); + ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_print_errno(ret, "pthread_mutex_lock"); goto exit; } while (1) { - while (!server_has_runs_and_workers(server)) { - ret = pthread_cond_wait(&server->scheduler_cv, &server->scheduler_mtx); + while (!server->stopping && !server_has_runs_and_workers(server)) { + ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); if (ret) { pthread_print_errno(ret, "pthread_cond_wait"); goto unlock; } } + if (server->stopping) + goto unlock; + ret = server_scheduler_iteration(server); if (ret < 0) goto unlock; } unlock: - pthread_check(pthread_mutex_unlock(&server->scheduler_mtx), "pthread_mutex_unlock"); + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); exit: return NULL; @@ -93,60 +97,81 @@ exit: int server_create(struct server *server, const struct settings *settings) { + pthread_attr_t scheduler_attr; int ret = 0; + ret = pthread_mutex_init(&server->server_mtx, NULL); + if (ret) { + pthread_print_errno(ret, "pthread_mutex_init"); + goto fail; + } + + ret = pthread_cond_init(&server->server_cv, NULL); + if (ret) { + pthread_print_errno(ret, "pthread_cond_init"); + goto destroy_mtx; + } + + server->stopping = 0; + ret = tcp_server_create(&server->tcp_server, settings->port); if (ret < 0) - return ret; + goto destroy_cv; worker_queue_create(&server->worker_queue); ci_queue_create(&server->ci_queue); - ret = pthread_mutex_init(&server->scheduler_mtx, NULL); + ret = pthread_attr_init(&scheduler_attr); if (ret) { - pthread_print_errno(ret, "pthread_mutex_init"); - goto destroy_tcp_server; + pthread_print_errno(ret, "pthread_attr_init"); + goto destroy_ci_queue; } - ret = pthread_cond_init(&server->scheduler_cv, NULL); - if (ret) { - pthread_print_errno(ret, "pthread_cond_init"); - goto destroy_scheduler_mtx; - } + ret = signal_set_thread_attr(&scheduler_attr); + if (ret) + goto destroy_attr; - ret = pthread_create(&server->scheduler, NULL, server_scheduler, server); + ret = pthread_create(&server->scheduler, &scheduler_attr, server_scheduler, server); if (ret) { pthread_print_errno(ret, "pthread_create"); - goto destroy_scheduler_cv; + goto destroy_attr; } - return ret; + pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); -destroy_scheduler_cv: - pthread_check(pthread_cond_destroy(&server->scheduler_cv), "pthread_cond_destroy"); + return ret; -destroy_scheduler_mtx: - pthread_check(pthread_mutex_destroy(&server->scheduler_mtx), "pthread_mutex_destroy"); +destroy_attr: + pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); +destroy_ci_queue: ci_queue_destroy(&server->ci_queue); worker_queue_destroy(&server->worker_queue); -destroy_tcp_server: tcp_server_destroy(&server->tcp_server); +destroy_cv: + pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); + +destroy_mtx: + pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); + +fail: return ret; } void server_destroy(struct server *server) { + print_log("Shutting down\n"); + pthread_check(pthread_join(server->scheduler, NULL), "pthread_join"); - pthread_check(pthread_cond_destroy(&server->scheduler_cv), "pthread_cond_destroy"); - pthread_check(pthread_mutex_destroy(&server->scheduler_mtx), "pthread_mutex_destroy"); ci_queue_destroy(&server->ci_queue); worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); + pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); + pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); } struct msg_context { @@ -193,15 +218,41 @@ static int server_conn_handler(int fd, void *server) return msg_recv_and_handle(fd, server_msg_handler, &ctx); } +static int server_set_stopping(struct server *server) +{ + int ret = 0; + + ret = pthread_mutex_lock(&server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_mutex_lock"); + return ret; + } + + server->stopping = 1; + + ret = pthread_cond_signal(&server->server_cv); + if (ret) { + pthread_print_errno(ret, "pthread_cond_signal"); + goto unlock; + } + +unlock: + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + + return ret; +} + int server_main(struct server *server) { int ret = 0; - while (1) { + while (!global_stop_flag) { ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); if (ret < 0) - return ret; + break; } + + return server_set_stopping(server); } int server_new_worker(struct server *server, int fd) @@ -211,7 +262,7 @@ int server_new_worker(struct server *server, int fd) print_log("Registering a new worker\n"); - ret = pthread_mutex_lock(&server->scheduler_mtx); + ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_print_errno(ret, "pthread_mutex_lock"); return ret; @@ -223,14 +274,14 @@ int server_new_worker(struct server *server, int fd) worker_queue_push(&server->worker_queue, entry); - ret = pthread_cond_signal(&server->scheduler_cv); + ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_print_errno(ret, "pthread_cond_signal"); goto unlock; } unlock: - pthread_check(pthread_mutex_unlock(&server->scheduler_mtx), "pthread_mutex_unlock"); + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } @@ -242,7 +293,7 @@ int server_ci_run(struct server *server, const char *url, const char *rev) print_log("Scheduling a new CI run for repository %s\n", url); - ret = pthread_mutex_lock(&server->scheduler_mtx); + ret = pthread_mutex_lock(&server->server_mtx); if (ret) { pthread_print_errno(ret, "pthread_mutex_lock"); return ret; @@ -254,14 +305,14 @@ int server_ci_run(struct server *server, const char *url, const char *rev) ci_queue_push(&server->ci_queue, entry); - ret = pthread_cond_signal(&server->scheduler_cv); + ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_print_errno(ret, "pthread_cond_signal"); goto unlock; } unlock: - pthread_check(pthread_mutex_unlock(&server->scheduler_mtx), "pthread_mutex_unlock"); + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } diff --git a/src/server.h b/src/server.h index 37fdb13..9107b4c 100644 --- a/src/server.h +++ b/src/server.h @@ -12,13 +12,16 @@ struct settings { }; struct server { + pthread_mutex_t server_mtx; + pthread_cond_t server_cv; + + int stopping; + struct tcp_server tcp_server; struct worker_queue worker_queue; struct ci_queue ci_queue; - pthread_mutex_t scheduler_mtx; - pthread_cond_t scheduler_cv; pthread_t scheduler; }; diff --git a/src/tcp_server.c b/src/tcp_server.c index 633b7f2..47317e7 100644 --- a/src/tcp_server.c +++ b/src/tcp_server.c @@ -1,6 +1,7 @@ #include "tcp_server.h" #include "log.h" #include "net.h" +#include "signal.h" #include <pthread.h> #include <stdlib.h> @@ -43,10 +44,8 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h int conn_fd, ret = 0; ret = net_accept(server->fd); - if (ret < 0) { - print_errno("accept"); + if (ret < 0) return ret; - } conn_fd = ret; ctx = malloc(sizeof(*ctx)); @@ -69,6 +68,10 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h goto destroy_attr; } + ret = signal_set_thread_attr(&child_attr); + if (ret < 0) + goto destroy_attr; + ret = pthread_create(&child, &child_attr, connection_thread, ctx); if (ret) { pthread_print_errno(ret, "pthread_create"); |