aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.c')
-rw-r--r--src/server.c430
1 files changed, 249 insertions, 181 deletions
diff --git a/src/server.c b/src/server.c
index 2e5fc18..bf6a967 100644
--- a/src/server.c
+++ b/src/server.c
@@ -16,6 +16,7 @@
#include "storage.h"
#include "storage_sqlite.h"
#include "tcp_server.h"
+#include "worker_queue.h"
#include <pthread.h>
#include <stdlib.h>
@@ -26,311 +27,373 @@ struct server {
int stopping;
+ struct worker_queue worker_queue;
+ struct run_queue run_queue;
+
struct storage storage;
- struct tcp_server *tcp_server;
+ pthread_t main_thread;
- struct run_queue run_queue;
+ struct tcp_server *tcp_server;
};
-int server_create(struct server **_server, const struct settings *settings)
+static int server_lock(struct server *server)
{
- struct storage_settings storage_settings;
- int ret = 0;
-
- struct server *server = malloc(sizeof(struct server));
- if (!server) {
- log_errno("malloc");
- return -1;
- }
-
- ret = pthread_mutex_init(&server->server_mtx, NULL);
+ int ret = pthread_mutex_lock(&server->server_mtx);
if (ret) {
- pthread_errno(ret, "pthread_mutex_init");
- goto free;
+ pthread_errno(ret, "pthread_mutex_lock");
+ return ret;
}
+ return ret;
+}
- ret = pthread_cond_init(&server->server_cv, NULL);
+static void server_unlock(struct server *server)
+{
+ pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+}
+
+static int server_wait(struct server *server)
+{
+ int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
if (ret) {
- pthread_errno(ret, "pthread_cond_init");
- goto destroy_mtx;
+ pthread_errno(ret, "pthread_cond_wait");
+ return ret;
}
+ return ret;
+}
- server->stopping = 0;
+static void server_notify(struct server *server)
+{
+ pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal");
+}
- ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path);
- if (ret < 0)
- goto destroy_cv;
+static int server_set_stopping(struct server *server)
+{
+ int ret = 0;
- ret = storage_create(&server->storage, &storage_settings);
- storage_settings_destroy(&storage_settings);
+ ret = server_lock(server);
if (ret < 0)
- goto destroy_cv;
+ return ret;
- ret = tcp_server_create(&server->tcp_server, settings->port);
+ server->stopping = 1;
+
+ server_notify(server);
+ server_unlock(server);
+ return ret;
+}
+
+static int server_has_workers(const struct server *server)
+{
+ return !worker_queue_is_empty(&server->worker_queue);
+}
+
+static int server_enqueue_worker(struct server *server, struct worker *worker)
+{
+ int ret = 0;
+
+ ret = server_lock(server);
if (ret < 0)
- goto destroy_storage;
+ return ret;
- run_queue_create(&server->run_queue);
+ worker_queue_add_last(&server->worker_queue, worker);
+ log("Added a new worker %d to the queue\n", worker_get_fd(worker));
- *_server = server;
+ server_notify(server);
+ server_unlock(server);
return ret;
+}
-destroy_storage:
- storage_destroy(&server->storage);
+static int server_has_runs(const struct server *server)
+{
+ return !run_queue_is_empty(&server->run_queue);
+}
-destroy_cv:
- pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
+static int server_enqueue_run(struct server *server, struct run *run)
+{
+ int ret = 0;
-destroy_mtx:
- pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
+ ret = server_lock(server);
+ if (ret < 0)
+ return ret;
-free:
- free(server);
+ run_queue_add_last(&server->run_queue, run);
+ log("Added a new CI run for repository %s to the queue\n", run_get_url(run));
+ server_notify(server);
+ server_unlock(server);
return ret;
}
-void server_destroy(struct server *server)
+static int server_ready_for_action(const struct server *server)
{
- log("Shutting down\n");
-
- run_queue_destroy(&server->run_queue);
- tcp_server_destroy(server->tcp_server);
- storage_destroy(&server->storage);
- pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
- pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
- free(server);
+ return server->stopping || (server_has_runs(server) && server_has_workers(server));
}
-static int server_has_runs(const struct server *server)
+static int server_wait_for_action(struct server *server)
{
- return !run_queue_is_empty(&server->run_queue);
+ int ret = 0;
+
+ while (!server_ready_for_action(server)) {
+ ret = server_wait(server);
+ if (ret < 0)
+ return ret;
+ }
+
+ return ret;
}
-static int worker_ci_run(int fd, const struct run_queue_entry *ci_run)
+static int server_assign_run(struct server *server)
{
- struct msg *request = NULL, *response = NULL;
int ret = 0;
- const char *argv[] = {CMD_RUN, run_queue_entry_get_url(ci_run),
- run_queue_entry_get_rev(ci_run), NULL};
+ struct run *run = run_queue_remove_first(&server->run_queue);
+ log("Removed a CI run for repository %s from the queue\n", run_get_url(run));
+
+ struct worker *worker = worker_queue_remove_first(&server->worker_queue);
+ log("Removed worker %d from the queue\n", worker_get_fd(worker));
+
+ const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL};
+
+ struct msg *request = NULL;
ret = msg_from_argv(&request, argv);
- if (ret < 0)
+ if (ret < 0) {
+ worker_queue_add_first(&server->worker_queue, worker);
+ run_queue_add_first(&server->run_queue, run);
return ret;
+ }
- ret = msg_communicate(fd, request, &response);
+ ret = msg_communicate(worker_get_fd(worker), request, NULL);
msg_free(request);
- if (ret < 0)
+ if (ret < 0) {
+ /* Failed to communicate with the worker, requeue the run
+ * and forget about the worker. */
+ worker_destroy(worker);
+ run_queue_add_first(&server->run_queue, run);
return ret;
-
- if (!msg_is_success(response)) {
- log_err("Failed to schedule a CI run: worker is busy?\n");
- msg_dump(response);
- goto free_response;
}
- /* TODO: handle the response. */
-
-free_response:
- msg_free(response);
-
+ /* Send the run to the worker, forget about both of them for a while. */
+ worker_destroy(worker);
+ run_destroy(run);
return ret;
}
-static int worker_dequeue_run(struct server *server, struct run_queue_entry **ci_run)
+static void *server_main_thread(void *_server)
{
+ struct server *server = (struct server *)_server;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
- return ret;
- }
+ ret = server_lock(server);
+ if (ret < 0)
+ goto exit;
- while (!server->stopping && !server_has_runs(server)) {
- ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_cond_wait");
+ while (1) {
+ ret = server_wait_for_action(server);
+ if (ret < 0)
goto unlock;
- }
- }
- if (server->stopping) {
- ret = -1;
- goto unlock;
- }
+ if (server->stopping)
+ goto unlock;
- *ci_run = run_queue_remove_first(&server->run_queue);
- log("Removed a CI run for repository %s from the queue\n",
- run_queue_entry_get_url(*ci_run));
- goto unlock;
+ ret = server_assign_run(server);
+ if (ret < 0)
+ goto unlock;
+ }
unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+ server_unlock(server);
- return ret;
+exit:
+ return NULL;
}
-static int worker_requeue_run(struct server *server, struct run_queue_entry *ci_run)
+int server_create(struct server **_server, const struct settings *settings)
{
+ struct storage_settings storage_settings;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
+ ret = signal_install_global_handler();
+ if (ret < 0)
return ret;
+
+ struct server *server = malloc(sizeof(struct server));
+ if (!server) {
+ log_errno("malloc");
+ return -1;
}
- run_queue_add_first(&server->run_queue, ci_run);
- log("Requeued a CI run for repository %s\n", run_queue_entry_get_url(ci_run));
+ ret = pthread_mutex_init(&server->server_mtx, NULL);
+ if (ret) {
+ pthread_errno(ret, "pthread_mutex_init");
+ goto free;
+ }
- ret = pthread_cond_signal(&server->server_cv);
+ ret = pthread_cond_init(&server->server_cv, NULL);
if (ret) {
- pthread_errno(ret, "pthread_cond_signal");
- ret = 0;
- goto unlock;
+ pthread_errno(ret, "pthread_cond_init");
+ goto destroy_mtx;
}
-unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+ server->stopping = 0;
- return ret;
-}
+ worker_queue_create(&server->worker_queue);
+ run_queue_create(&server->run_queue);
-static int worker_iteration(struct server *server, int fd)
-{
- struct run_queue_entry *ci_run = NULL;
- int ret = 0;
+ ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path);
+ if (ret < 0)
+ goto destroy_run_queue;
- ret = worker_dequeue_run(server, &ci_run);
+ ret = storage_create(&server->storage, &storage_settings);
+ storage_settings_destroy(&storage_settings);
if (ret < 0)
- return ret;
+ goto destroy_run_queue;
- ret = worker_ci_run(fd, ci_run);
+ ret = tcp_server_create(&server->tcp_server, settings->port);
if (ret < 0)
- goto requeue_run;
+ goto destroy_storage;
+
+ ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
+ if (ret) {
+ pthread_errno(ret, "pthread_create");
+ goto destroy_tcp_server;
+ }
- run_queue_entry_destroy(ci_run);
+ *_server = server;
return ret;
-requeue_run:
- worker_requeue_run(server, ci_run);
+destroy_tcp_server:
+ tcp_server_destroy(server->tcp_server);
- return ret;
-}
+destroy_storage:
+ storage_destroy(&server->storage);
-static int worker_thread(struct server *server, int fd)
-{
- int ret = 0;
+destroy_run_queue:
+ run_queue_destroy(&server->run_queue);
- while (1) {
- ret = worker_iteration(server, fd);
- if (ret < 0)
- return ret;
- }
+ worker_queue_destroy(&server->worker_queue);
+
+ pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
+
+destroy_mtx:
+ pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
+
+free:
+ free(server);
return ret;
}
-static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request,
- UNUSED struct msg **response, void *_server)
+void server_destroy(struct server *server)
{
- return worker_thread((struct server *)_server, client_fd);
+ log("Shutting down\n");
+
+ pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join");
+ tcp_server_destroy(server->tcp_server);
+ storage_destroy(&server->storage);
+ run_queue_destroy(&server->run_queue);
+ worker_queue_destroy(&server->worker_queue);
+ pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
+ pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
+ free(server);
}
-static int msg_ci_run_queue(struct server *server, const char *url, const char *rev)
+static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
- struct run_queue_entry *entry = NULL;
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int client_fd = ctx->fd;
+
+ struct worker *worker = NULL;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
+ ret = worker_create(&worker, client_fd);
+ if (ret < 0)
return ret;
- }
- ret = run_queue_entry_create(&entry, url, rev);
+ ret = server_enqueue_worker(server, worker);
if (ret < 0)
- goto unlock;
+ goto destroy_worker;
- run_queue_add_last(&server->run_queue, entry);
- log("Added a new CI run for repository %s to the queue\n", url);
-
- ret = pthread_cond_signal(&server->server_cv);
- if (ret) {
- pthread_errno(ret, "pthread_cond_signal");
- ret = 0;
- goto unlock;
- }
+ return ret;
-unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+destroy_worker:
+ worker_destroy(worker);
return ret;
}
-static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request,
- struct msg **response, void *_server)
+static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
{
- struct server *server = (struct server *)_server;
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ struct run *run = NULL;
+
int ret = 0;
- if (msg_get_length(request) != 3) {
- log_err("Invalid number of arguments for a message: %zu\n",
- msg_get_length(request));
- msg_dump(request);
- return -1;
- }
+ ret = run_from_msg(&run, request);
+ if (ret < 0)
+ return ret;
- const char **words = msg_get_words(request);
+ ret = msg_success(response);
+ if (ret < 0)
+ goto destroy_run;
- ret = msg_ci_run_queue(server, words[1], words[2]);
+ ret = server_enqueue_run(server, run);
if (ret < 0)
- return ret;
+ goto free_response;
- return msg_success(response);
-}
+ return ret;
-static struct cmd_desc commands[] = {
- {CMD_NEW_WORKER, msg_new_worker_handler},
- {CMD_RUN, msg_ci_run_handler},
-};
+free_response:
+ msg_free(*response);
-static int server_set_stopping(struct server *server)
+destroy_run:
+ run_destroy(run);
+
+ return ret;
+}
+
+static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int client_fd = ctx->fd;
+
+ struct worker *worker = NULL;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
+ log("Received a \"run complete\" message from worker %d\n", client_fd);
+
+ ret = worker_create(&worker, client_fd);
+ if (ret < 0)
return ret;
- }
- server->stopping = 1;
+ ret = server_enqueue_worker(server, worker);
+ if (ret < 0)
+ goto destroy_worker;
- ret = pthread_cond_broadcast(&server->server_cv);
- if (ret) {
- pthread_errno(ret, "pthread_cond_signal");
- goto unlock;
- }
+ return ret;
-unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+destroy_worker:
+ worker_destroy(worker);
- return ret;
+ return 0;
}
-int server_main(struct server *server)
+static struct cmd_desc commands[] = {
+ {CMD_NEW_WORKER, handle_cmd_new_worker},
+ {CMD_RUN, handle_cmd_run},
+ {CMD_COMPLETE, handle_cmd_complete},
+};
+
+static int server_listen_thread(struct server *server)
{
struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = signal_install_global_handler();
- if (ret < 0)
- return ret;
-
ret = cmd_dispatcher_create(&dispatcher, commands, sizeof(commands) / sizeof(commands[0]),
server);
if (ret < 0)
@@ -352,3 +415,8 @@ dispatcher_destroy:
return server_set_stopping(server);
}
+
+int server_main(struct server *server)
+{
+ return server_listen_thread(server);
+}