aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt4
-rw-r--r--src/command.c65
-rw-r--r--src/command.h11
-rw-r--r--src/const.h1
-rw-r--r--src/run_queue.c47
-rw-r--r--src/run_queue.h21
-rw-r--r--src/server.c430
-rw-r--r--src/tcp_server.c28
-rw-r--r--src/worker.c116
-rw-r--r--src/worker.h4
-rw-r--r--src/worker_main.c4
-rw-r--r--src/worker_queue.c84
-rw-r--r--src/worker_queue.h32
13 files changed, 521 insertions, 326 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f8b2a7c..053ca9e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -48,7 +48,8 @@ add_my_executable(server server_main.c server.c
sqlite.c
storage.c
storage_sqlite.c
- tcp_server.c)
+ tcp_server.c
+ worker_queue.c)
target_link_libraries(server PRIVATE pthread sqlite3)
target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}")
@@ -67,5 +68,6 @@ add_my_executable(worker worker_main.c worker.c
msg.c
net.c
process.c
+ run_queue.c
signal.c)
target_link_libraries(worker PRIVATE git2 pthread)
diff --git a/src/command.c b/src/command.c
index 3c75e69..581b319 100644
--- a/src/command.c
+++ b/src/command.c
@@ -103,17 +103,10 @@ void cmd_dispatcher_destroy(struct cmd_dispatcher *dispatcher)
free(dispatcher);
}
-int cmd_dispatcher_handle_msg(const struct cmd_dispatcher *dispatcher, int conn_fd,
- const struct msg *request)
+int cmd_dispatcher_handle(const struct cmd_dispatcher *dispatcher, const struct msg *command,
+ struct msg **result)
{
- struct msg *response = NULL;
- int ret = 0;
-
- size_t numof_words = msg_get_length(request);
- if (numof_words == 0)
- goto unknown;
-
- const char *actual_cmd = msg_get_first_word(request);
+ const char *actual_cmd = msg_get_first_word(command);
for (size_t i = 0; i < dispatcher->numof_cmds; ++i) {
struct cmd_desc *cmd = &dispatcher->cmds[i];
@@ -121,35 +114,55 @@ int cmd_dispatcher_handle_msg(const struct cmd_dispatcher *dispatcher, int conn_
if (strcmp(cmd->name, actual_cmd))
continue;
- ret = cmd->handler(conn_fd, request, &response, dispatcher->ctx);
- goto exit;
+ return cmd->handler(command, result, dispatcher->ctx);
}
-unknown:
log_err("Received an unknown command\n");
- ret = -1;
- msg_dump(request);
- goto exit;
-
-exit:
- if (ret < 0 && !response)
- msg_error(&response);
- if (response)
- return msg_send(conn_fd, response);
- return ret;
+ msg_dump(command);
+ return -1;
}
int cmd_dispatcher_handle_conn(int conn_fd, void *_dispatcher)
{
struct cmd_dispatcher *dispatcher = (struct cmd_dispatcher *)_dispatcher;
- struct msg *request = NULL;
+ struct msg *request = NULL, *response = NULL;
int ret = 0;
+ struct cmd_conn_ctx *new_ctx = malloc(sizeof(struct cmd_conn_ctx));
+ if (!new_ctx) {
+ log_errno("malloc");
+ return -1;
+ }
+
+ new_ctx->fd = conn_fd;
+ new_ctx->arg = dispatcher->ctx;
+
ret = msg_recv(conn_fd, &request);
if (ret < 0)
- return ret;
+ goto free_ctx;
+
+ void *old_ctx = dispatcher->ctx;
+ dispatcher->ctx = new_ctx;
+ ret = cmd_dispatcher_handle(dispatcher, request, &response);
+ dispatcher->ctx = old_ctx;
+
+ if (ret < 0)
+ goto free_response;
+
+ if (response) {
+ ret = msg_send(conn_fd, response);
+ if (ret < 0)
+ goto free_response;
+ }
+
+free_response:
+ if (response)
+ msg_free(response);
- ret = cmd_dispatcher_handle_msg(dispatcher, conn_fd, request);
msg_free(request);
+
+free_ctx:
+ free(new_ctx);
+
return ret;
}
diff --git a/src/command.h b/src/command.h
index 0ab44c1..90facbb 100644
--- a/src/command.h
+++ b/src/command.h
@@ -12,8 +12,7 @@
#include <stddef.h>
-typedef int (*cmd_handler)(int conn_fd, const struct msg *request, struct msg **response,
- void *ctx);
+typedef int (*cmd_handler)(const struct msg *request, struct msg **response, void *ctx);
struct cmd_desc {
char *name;
@@ -26,7 +25,13 @@ int cmd_dispatcher_create(struct cmd_dispatcher **, struct cmd_desc *, size_t nu
void *ctx);
void cmd_dispatcher_destroy(struct cmd_dispatcher *);
-int cmd_dispatcher_handle_msg(const struct cmd_dispatcher *, int conn_fd, const struct msg *);
+int cmd_dispatcher_handle(const struct cmd_dispatcher *, const struct msg *command,
+ struct msg **response);
+
+struct cmd_conn_ctx {
+ int fd;
+ void *arg;
+};
/* This is supposed to be used as an argument to tcp_server_accept. */
int cmd_dispatcher_handle_conn(int conn_fd, void *dispatcher);
diff --git a/src/const.h b/src/const.h
index f53ef7e..2e7054b 100644
--- a/src/const.h
+++ b/src/const.h
@@ -16,5 +16,6 @@
#define CMD_RUN "run"
#define CMD_NEW_WORKER "new-worker"
+#define CMD_COMPLETE "complete"
#endif
diff --git a/src/run_queue.c b/src/run_queue.c
index 8b5052b..0455c92 100644
--- a/src/run_queue.c
+++ b/src/run_queue.c
@@ -7,20 +7,21 @@
#include "run_queue.h"
#include "log.h"
+#include "msg.h"
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
-struct run_queue_entry {
+struct run {
char *url;
char *rev;
- STAILQ_ENTRY(run_queue_entry) entries;
+ STAILQ_ENTRY(run) entries;
};
-int run_queue_entry_create(struct run_queue_entry **_entry, const char *_url, const char *_rev)
+int run_create(struct run **_entry, const char *_url, const char *_rev)
{
- struct run_queue_entry *entry = malloc(sizeof(struct run_queue_entry));
+ struct run *entry = malloc(sizeof(struct run));
if (!entry) {
log_errno("malloc");
goto fail;
@@ -54,19 +55,33 @@ fail:
return -1;
}
-void run_queue_entry_destroy(struct run_queue_entry *entry)
+int run_from_msg(struct run **run, const struct msg *msg)
+{
+ size_t msg_len = msg_get_length(msg);
+
+ if (msg_len != 3) {
+ log_err("Invalid number of arguments for a message: %zu\n", msg_len);
+ msg_dump(msg);
+ return -1;
+ }
+
+ const char **words = msg_get_words(msg);
+ return run_create(run, words[1], words[2]);
+}
+
+void run_destroy(struct run *entry)
{
free(entry->rev);
free(entry->url);
free(entry);
}
-const char *run_queue_entry_get_url(const struct run_queue_entry *entry)
+const char *run_get_url(const struct run *entry)
{
return entry->url;
}
-const char *run_queue_entry_get_rev(const struct run_queue_entry *entry)
+const char *run_get_rev(const struct run *entry)
{
return entry->rev;
}
@@ -78,10 +93,10 @@ void run_queue_create(struct run_queue *queue)
void run_queue_destroy(struct run_queue *queue)
{
- struct run_queue_entry *entry1 = STAILQ_FIRST(queue);
+ struct run *entry1 = STAILQ_FIRST(queue);
while (entry1) {
- struct run_queue_entry *entry2 = STAILQ_NEXT(entry1, entries);
- run_queue_entry_destroy(entry1);
+ struct run *entry2 = STAILQ_NEXT(entry1, entries);
+ run_destroy(entry1);
entry1 = entry2;
}
STAILQ_INIT(queue);
@@ -92,19 +107,19 @@ int run_queue_is_empty(const struct run_queue *queue)
return STAILQ_EMPTY(queue);
}
-void run_queue_add_last(struct run_queue *queue, struct run_queue_entry *entry)
+void run_queue_add_first(struct run_queue *queue, struct run *entry)
{
- STAILQ_INSERT_TAIL(queue, entry, entries);
+ STAILQ_INSERT_HEAD(queue, entry, entries);
}
-void run_queue_add_first(struct run_queue *queue, struct run_queue_entry *entry)
+void run_queue_add_last(struct run_queue *queue, struct run *entry)
{
- STAILQ_INSERT_HEAD(queue, entry, entries);
+ STAILQ_INSERT_TAIL(queue, entry, entries);
}
-struct run_queue_entry *run_queue_remove_first(struct run_queue *queue)
+struct run *run_queue_remove_first(struct run_queue *queue)
{
- struct run_queue_entry *entry = STAILQ_FIRST(queue);
+ struct run *entry = STAILQ_FIRST(queue);
STAILQ_REMOVE_HEAD(queue, entries);
return entry;
}
diff --git a/src/run_queue.h b/src/run_queue.h
index 629a8e0..eca071e 100644
--- a/src/run_queue.h
+++ b/src/run_queue.h
@@ -8,26 +8,29 @@
#ifndef __RUN_QUEUE_H__
#define __RUN_QUEUE_H__
+#include "msg.h"
+
#include <sys/queue.h>
-struct run_queue_entry;
+struct run;
-int run_queue_entry_create(struct run_queue_entry **, const char *url, const char *rev);
-void run_queue_entry_destroy(struct run_queue_entry *);
+int run_create(struct run **, const char *url, const char *rev);
+int run_from_msg(struct run **, const struct msg *);
+void run_destroy(struct run *);
-const char *run_queue_entry_get_url(const struct run_queue_entry *);
-const char *run_queue_entry_get_rev(const struct run_queue_entry *);
+const char *run_get_url(const struct run *);
+const char *run_get_rev(const struct run *);
-STAILQ_HEAD(run_queue, run_queue_entry);
+STAILQ_HEAD(run_queue, run);
void run_queue_create(struct run_queue *);
void run_queue_destroy(struct run_queue *);
int run_queue_is_empty(const struct run_queue *);
-void run_queue_add_first(struct run_queue *, struct run_queue_entry *);
-void run_queue_add_last(struct run_queue *, struct run_queue_entry *);
+void run_queue_add_first(struct run_queue *, struct run *);
+void run_queue_add_last(struct run_queue *, struct run *);
-struct run_queue_entry *run_queue_remove_first(struct run_queue *);
+struct run *run_queue_remove_first(struct run_queue *);
#endif
diff --git a/src/server.c b/src/server.c
index 2e5fc18..bf6a967 100644
--- a/src/server.c
+++ b/src/server.c
@@ -16,6 +16,7 @@
#include "storage.h"
#include "storage_sqlite.h"
#include "tcp_server.h"
+#include "worker_queue.h"
#include <pthread.h>
#include <stdlib.h>
@@ -26,311 +27,373 @@ struct server {
int stopping;
+ struct worker_queue worker_queue;
+ struct run_queue run_queue;
+
struct storage storage;
- struct tcp_server *tcp_server;
+ pthread_t main_thread;
- struct run_queue run_queue;
+ struct tcp_server *tcp_server;
};
-int server_create(struct server **_server, const struct settings *settings)
+static int server_lock(struct server *server)
{
- struct storage_settings storage_settings;
- int ret = 0;
-
- struct server *server = malloc(sizeof(struct server));
- if (!server) {
- log_errno("malloc");
- return -1;
- }
-
- ret = pthread_mutex_init(&server->server_mtx, NULL);
+ int ret = pthread_mutex_lock(&server->server_mtx);
if (ret) {
- pthread_errno(ret, "pthread_mutex_init");
- goto free;
+ pthread_errno(ret, "pthread_mutex_lock");
+ return ret;
}
+ return ret;
+}
- ret = pthread_cond_init(&server->server_cv, NULL);
+static void server_unlock(struct server *server)
+{
+ pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+}
+
+static int server_wait(struct server *server)
+{
+ int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
if (ret) {
- pthread_errno(ret, "pthread_cond_init");
- goto destroy_mtx;
+ pthread_errno(ret, "pthread_cond_wait");
+ return ret;
}
+ return ret;
+}
- server->stopping = 0;
+static void server_notify(struct server *server)
+{
+ pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal");
+}
- ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path);
- if (ret < 0)
- goto destroy_cv;
+static int server_set_stopping(struct server *server)
+{
+ int ret = 0;
- ret = storage_create(&server->storage, &storage_settings);
- storage_settings_destroy(&storage_settings);
+ ret = server_lock(server);
if (ret < 0)
- goto destroy_cv;
+ return ret;
- ret = tcp_server_create(&server->tcp_server, settings->port);
+ server->stopping = 1;
+
+ server_notify(server);
+ server_unlock(server);
+ return ret;
+}
+
+static int server_has_workers(const struct server *server)
+{
+ return !worker_queue_is_empty(&server->worker_queue);
+}
+
+static int server_enqueue_worker(struct server *server, struct worker *worker)
+{
+ int ret = 0;
+
+ ret = server_lock(server);
if (ret < 0)
- goto destroy_storage;
+ return ret;
- run_queue_create(&server->run_queue);
+ worker_queue_add_last(&server->worker_queue, worker);
+ log("Added a new worker %d to the queue\n", worker_get_fd(worker));
- *_server = server;
+ server_notify(server);
+ server_unlock(server);
return ret;
+}
-destroy_storage:
- storage_destroy(&server->storage);
+static int server_has_runs(const struct server *server)
+{
+ return !run_queue_is_empty(&server->run_queue);
+}
-destroy_cv:
- pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
+static int server_enqueue_run(struct server *server, struct run *run)
+{
+ int ret = 0;
-destroy_mtx:
- pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
+ ret = server_lock(server);
+ if (ret < 0)
+ return ret;
-free:
- free(server);
+ run_queue_add_last(&server->run_queue, run);
+ log("Added a new CI run for repository %s to the queue\n", run_get_url(run));
+ server_notify(server);
+ server_unlock(server);
return ret;
}
-void server_destroy(struct server *server)
+static int server_ready_for_action(const struct server *server)
{
- log("Shutting down\n");
-
- run_queue_destroy(&server->run_queue);
- tcp_server_destroy(server->tcp_server);
- storage_destroy(&server->storage);
- pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
- pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
- free(server);
+ return server->stopping || (server_has_runs(server) && server_has_workers(server));
}
-static int server_has_runs(const struct server *server)
+static int server_wait_for_action(struct server *server)
{
- return !run_queue_is_empty(&server->run_queue);
+ int ret = 0;
+
+ while (!server_ready_for_action(server)) {
+ ret = server_wait(server);
+ if (ret < 0)
+ return ret;
+ }
+
+ return ret;
}
-static int worker_ci_run(int fd, const struct run_queue_entry *ci_run)
+static int server_assign_run(struct server *server)
{
- struct msg *request = NULL, *response = NULL;
int ret = 0;
- const char *argv[] = {CMD_RUN, run_queue_entry_get_url(ci_run),
- run_queue_entry_get_rev(ci_run), NULL};
+ struct run *run = run_queue_remove_first(&server->run_queue);
+ log("Removed a CI run for repository %s from the queue\n", run_get_url(run));
+
+ struct worker *worker = worker_queue_remove_first(&server->worker_queue);
+ log("Removed worker %d from the queue\n", worker_get_fd(worker));
+
+ const char *argv[] = {CMD_RUN, run_get_url(run), run_get_rev(run), NULL};
+
+ struct msg *request = NULL;
ret = msg_from_argv(&request, argv);
- if (ret < 0)
+ if (ret < 0) {
+ worker_queue_add_first(&server->worker_queue, worker);
+ run_queue_add_first(&server->run_queue, run);
return ret;
+ }
- ret = msg_communicate(fd, request, &response);
+ ret = msg_communicate(worker_get_fd(worker), request, NULL);
msg_free(request);
- if (ret < 0)
+ if (ret < 0) {
+ /* Failed to communicate with the worker, requeue the run
+ * and forget about the worker. */
+ worker_destroy(worker);
+ run_queue_add_first(&server->run_queue, run);
return ret;
-
- if (!msg_is_success(response)) {
- log_err("Failed to schedule a CI run: worker is busy?\n");
- msg_dump(response);
- goto free_response;
}
- /* TODO: handle the response. */
-
-free_response:
- msg_free(response);
-
+ /* Send the run to the worker, forget about both of them for a while. */
+ worker_destroy(worker);
+ run_destroy(run);
return ret;
}
-static int worker_dequeue_run(struct server *server, struct run_queue_entry **ci_run)
+static void *server_main_thread(void *_server)
{
+ struct server *server = (struct server *)_server;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
- return ret;
- }
+ ret = server_lock(server);
+ if (ret < 0)
+ goto exit;
- while (!server->stopping && !server_has_runs(server)) {
- ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_cond_wait");
+ while (1) {
+ ret = server_wait_for_action(server);
+ if (ret < 0)
goto unlock;
- }
- }
- if (server->stopping) {
- ret = -1;
- goto unlock;
- }
+ if (server->stopping)
+ goto unlock;
- *ci_run = run_queue_remove_first(&server->run_queue);
- log("Removed a CI run for repository %s from the queue\n",
- run_queue_entry_get_url(*ci_run));
- goto unlock;
+ ret = server_assign_run(server);
+ if (ret < 0)
+ goto unlock;
+ }
unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+ server_unlock(server);
- return ret;
+exit:
+ return NULL;
}
-static int worker_requeue_run(struct server *server, struct run_queue_entry *ci_run)
+int server_create(struct server **_server, const struct settings *settings)
{
+ struct storage_settings storage_settings;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
+ ret = signal_install_global_handler();
+ if (ret < 0)
return ret;
+
+ struct server *server = malloc(sizeof(struct server));
+ if (!server) {
+ log_errno("malloc");
+ return -1;
}
- run_queue_add_first(&server->run_queue, ci_run);
- log("Requeued a CI run for repository %s\n", run_queue_entry_get_url(ci_run));
+ ret = pthread_mutex_init(&server->server_mtx, NULL);
+ if (ret) {
+ pthread_errno(ret, "pthread_mutex_init");
+ goto free;
+ }
- ret = pthread_cond_signal(&server->server_cv);
+ ret = pthread_cond_init(&server->server_cv, NULL);
if (ret) {
- pthread_errno(ret, "pthread_cond_signal");
- ret = 0;
- goto unlock;
+ pthread_errno(ret, "pthread_cond_init");
+ goto destroy_mtx;
}
-unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+ server->stopping = 0;
- return ret;
-}
+ worker_queue_create(&server->worker_queue);
+ run_queue_create(&server->run_queue);
-static int worker_iteration(struct server *server, int fd)
-{
- struct run_queue_entry *ci_run = NULL;
- int ret = 0;
+ ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path);
+ if (ret < 0)
+ goto destroy_run_queue;
- ret = worker_dequeue_run(server, &ci_run);
+ ret = storage_create(&server->storage, &storage_settings);
+ storage_settings_destroy(&storage_settings);
if (ret < 0)
- return ret;
+ goto destroy_run_queue;
- ret = worker_ci_run(fd, ci_run);
+ ret = tcp_server_create(&server->tcp_server, settings->port);
if (ret < 0)
- goto requeue_run;
+ goto destroy_storage;
+
+ ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
+ if (ret) {
+ pthread_errno(ret, "pthread_create");
+ goto destroy_tcp_server;
+ }
- run_queue_entry_destroy(ci_run);
+ *_server = server;
return ret;
-requeue_run:
- worker_requeue_run(server, ci_run);
+destroy_tcp_server:
+ tcp_server_destroy(server->tcp_server);
- return ret;
-}
+destroy_storage:
+ storage_destroy(&server->storage);
-static int worker_thread(struct server *server, int fd)
-{
- int ret = 0;
+destroy_run_queue:
+ run_queue_destroy(&server->run_queue);
- while (1) {
- ret = worker_iteration(server, fd);
- if (ret < 0)
- return ret;
- }
+ worker_queue_destroy(&server->worker_queue);
+
+ pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
+
+destroy_mtx:
+ pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
+
+free:
+ free(server);
return ret;
}
-static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request,
- UNUSED struct msg **response, void *_server)
+void server_destroy(struct server *server)
{
- return worker_thread((struct server *)_server, client_fd);
+ log("Shutting down\n");
+
+ pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join");
+ tcp_server_destroy(server->tcp_server);
+ storage_destroy(&server->storage);
+ run_queue_destroy(&server->run_queue);
+ worker_queue_destroy(&server->worker_queue);
+ pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
+ pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
+ free(server);
}
-static int msg_ci_run_queue(struct server *server, const char *url, const char *rev)
+static int handle_cmd_new_worker(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
- struct run_queue_entry *entry = NULL;
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int client_fd = ctx->fd;
+
+ struct worker *worker = NULL;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
+ ret = worker_create(&worker, client_fd);
+ if (ret < 0)
return ret;
- }
- ret = run_queue_entry_create(&entry, url, rev);
+ ret = server_enqueue_worker(server, worker);
if (ret < 0)
- goto unlock;
+ goto destroy_worker;
- run_queue_add_last(&server->run_queue, entry);
- log("Added a new CI run for repository %s to the queue\n", url);
-
- ret = pthread_cond_signal(&server->server_cv);
- if (ret) {
- pthread_errno(ret, "pthread_cond_signal");
- ret = 0;
- goto unlock;
- }
+ return ret;
-unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+destroy_worker:
+ worker_destroy(worker);
return ret;
}
-static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request,
- struct msg **response, void *_server)
+static int handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
{
- struct server *server = (struct server *)_server;
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ struct run *run = NULL;
+
int ret = 0;
- if (msg_get_length(request) != 3) {
- log_err("Invalid number of arguments for a message: %zu\n",
- msg_get_length(request));
- msg_dump(request);
- return -1;
- }
+ ret = run_from_msg(&run, request);
+ if (ret < 0)
+ return ret;
- const char **words = msg_get_words(request);
+ ret = msg_success(response);
+ if (ret < 0)
+ goto destroy_run;
- ret = msg_ci_run_queue(server, words[1], words[2]);
+ ret = server_enqueue_run(server, run);
if (ret < 0)
- return ret;
+ goto free_response;
- return msg_success(response);
-}
+ return ret;
-static struct cmd_desc commands[] = {
- {CMD_NEW_WORKER, msg_new_worker_handler},
- {CMD_RUN, msg_ci_run_handler},
-};
+free_response:
+ msg_free(*response);
-static int server_set_stopping(struct server *server)
+destroy_run:
+ run_destroy(run);
+
+ return ret;
+}
+
+static int handle_cmd_complete(UNUSED const struct msg *request, UNUSED struct msg **response,
+ void *_ctx)
{
+ struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
+ struct server *server = (struct server *)ctx->arg;
+ int client_fd = ctx->fd;
+
+ struct worker *worker = NULL;
int ret = 0;
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_errno(ret, "pthread_mutex_lock");
+ log("Received a \"run complete\" message from worker %d\n", client_fd);
+
+ ret = worker_create(&worker, client_fd);
+ if (ret < 0)
return ret;
- }
- server->stopping = 1;
+ ret = server_enqueue_worker(server, worker);
+ if (ret < 0)
+ goto destroy_worker;
- ret = pthread_cond_broadcast(&server->server_cv);
- if (ret) {
- pthread_errno(ret, "pthread_cond_signal");
- goto unlock;
- }
+ return ret;
-unlock:
- pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+destroy_worker:
+ worker_destroy(worker);
- return ret;
+ return 0;
}
-int server_main(struct server *server)
+static struct cmd_desc commands[] = {
+ {CMD_NEW_WORKER, handle_cmd_new_worker},
+ {CMD_RUN, handle_cmd_run},
+ {CMD_COMPLETE, handle_cmd_complete},
+};
+
+static int server_listen_thread(struct server *server)
{
struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = signal_install_global_handler();
- if (ret < 0)
- return ret;
-
ret = cmd_dispatcher_create(&dispatcher, commands, sizeof(commands) / sizeof(commands[0]),
server);
if (ret < 0)
@@ -352,3 +415,8 @@ dispatcher_destroy:
return server_set_stopping(server);
}
+
+int server_main(struct server *server)
+{
+ return server_listen_thread(server);
+}
diff --git a/src/tcp_server.c b/src/tcp_server.c
index 25a74a7..d840414 100644
--- a/src/tcp_server.c
+++ b/src/tcp_server.c
@@ -62,19 +62,18 @@ static void *connection_thread(void *_ctx)
ret = signal_block_child();
if (ret < 0)
- goto close;
+ goto free_ctx;
ctx->handler(ctx->fd, ctx->arg);
-close:
- log_errno_if(close(ctx->fd), "close");
+free_ctx:
free(ctx);
+
return NULL;
}
int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler handler, void *arg)
{
- pthread_attr_t child_attr;
sigset_t old_mask;
pthread_t child;
int ret = 0;
@@ -93,23 +92,11 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h
goto free_ctx;
ctx->fd = ret;
- ret = pthread_attr_init(&child_attr);
- if (ret) {
- pthread_errno(ret, "pthread_attr_init");
- goto close_conn;
- }
-
- ret = pthread_attr_setdetachstate(&child_attr, PTHREAD_CREATE_DETACHED);
- if (ret) {
- pthread_errno(ret, "pthread_attr_setdetachstate");
- goto destroy_attr;
- }
-
ret = signal_block_parent(&old_mask);
if (ret < 0)
- goto destroy_attr;
+ goto close_conn;
- ret = pthread_create(&child, &child_attr, connection_thread, ctx);
+ ret = pthread_create(&child, NULL, connection_thread, ctx);
if (ret) {
pthread_errno(ret, "pthread_create");
goto restore_mask;
@@ -117,16 +104,11 @@ int tcp_server_accept(const struct tcp_server *server, tcp_server_conn_handler h
signal_set(&old_mask, NULL);
- pthread_errno_if(pthread_attr_destroy(&child_attr), "pthread_attr_destroy");
-
return ret;
restore_mask:
signal_set(&old_mask, NULL);
-destroy_attr:
- pthread_errno_if(pthread_attr_destroy(&child_attr), "pthread_attr_destroy");
-
close_conn:
log_errno_if(close(ctx->fd), "close");
diff --git a/src/worker.c b/src/worker.c
index 2a258e5..5762549 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -13,27 +13,24 @@
#include "git.h"
#include "log.h"
#include "msg.h"
-#include "net.h"
#include "process.h"
+#include "run_queue.h"
#include "signal.h"
-#include <pthread.h>
#include <stdlib.h>
-#include <unistd.h>
struct worker {
- int fd;
-
- /* TODO: these are not used, but they should be! */
- pthread_mutex_t task_mtx;
- pthread_t task;
- int task_active;
+ int dummy;
};
-int worker_create(struct worker **_worker, const struct settings *settings)
+int worker_create(struct worker **_worker)
{
int ret = 0;
+ ret = signal_install_global_handler();
+ if (ret < 0)
+ return ret;
+
struct worker *worker = malloc(sizeof(struct worker));
if (!worker) {
log_errno("malloc");
@@ -44,17 +41,9 @@ int worker_create(struct worker **_worker, const struct settings *settings)
if (ret < 0)
goto free;
- ret = net_connect(settings->host, settings->port);
- if (ret < 0)
- goto git_shutdown;
- worker->fd = ret;
-
*_worker = worker;
return ret;
-git_shutdown:
- libgit_shutdown();
-
free:
free(worker);
@@ -65,14 +54,19 @@ void worker_destroy(struct worker *worker)
{
log("Shutting down\n");
- log_errno_if(close(worker->fd), "close");
libgit_shutdown();
free(worker);
}
-static int msg_send_new_worker(const struct worker *worker)
+static int worker_send_to_server(const struct settings *settings, const struct msg *request,
+ struct msg **response)
+{
+ return msg_connect_and_communicate(settings->host, settings->port, request, response);
+}
+
+static int worker_send_to_server_argv(const struct settings *settings, const char **argv,
+ struct msg **response)
{
- static const char *argv[] = {CMD_NEW_WORKER, NULL};
struct msg *msg = NULL;
int ret = 0;
@@ -80,84 +74,80 @@ static int msg_send_new_worker(const struct worker *worker)
if (ret < 0)
return ret;
- ret = msg_send(worker->fd, msg);
+ ret = worker_send_to_server(settings, msg, response);
msg_free(msg);
return ret;
}
-static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result)
+static int worker_send_new_worker(const struct settings *settings, struct msg **task)
{
- int ret = 0;
-
- ret = ci_run_git_repo(url, rev, result);
- if (ret < 0) {
- log_err("Run failed with an error\n");
- return ret;
- }
-
- proc_output_dump(result);
- return 0;
+ static const char *argv[] = {CMD_NEW_WORKER, NULL};
+ return worker_send_to_server_argv(settings, argv, task);
}
-static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, struct msg **response,
- UNUSED void *_worker)
+static int msg_run_handler(const struct msg *request, struct msg **response, UNUSED void *ctx)
{
+ struct run *run = NULL;
struct proc_output result;
int ret = 0;
- if (msg_get_length(request) != 3) {
- log_err("Invalid number of arguments for a message\n");
- msg_dump(request);
- return -1;
+ ret = run_from_msg(&run, request);
+ if (ret < 0)
+ return ret;
+
+ proc_output_init(&result);
+
+ ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result);
+ if (ret < 0) {
+ log_err("Run failed with an error\n");
+ goto free_output;
}
- const char **words = msg_get_words(request);
+ proc_output_dump(&result);
- proc_output_init(&result);
- ret = msg_ci_run_do(words[1], words[2], &result);
+ static const char *argv[] = {CMD_COMPLETE, NULL};
+ ret = msg_from_argv(response, argv);
+ if (ret < 0)
+ goto free_output;
+
+free_output:
proc_output_free(&result);
- if (ret < 0)
- return ret;
+ run_destroy(run);
- return msg_success(response);
+ return ret;
}
static struct cmd_desc cmds[] = {
- {CMD_RUN, msg_ci_run_handler},
+ {CMD_RUN, msg_run_handler},
};
-int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[])
+int worker_main(UNUSED struct worker *worker, const struct settings *settings)
{
+ struct msg *task = NULL;
struct cmd_dispatcher *dispatcher = NULL;
int ret = 0;
- ret = signal_install_global_handler();
+ ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), NULL);
if (ret < 0)
return ret;
- ret = msg_send_new_worker(worker);
- if (ret < 0)
- return ret;
+ log("Waiting for a new command\n");
- ret = cmd_dispatcher_create(&dispatcher, cmds, sizeof(cmds) / sizeof(cmds[0]), worker);
+ ret = worker_send_new_worker(settings, &task);
if (ret < 0)
- return ret;
+ goto dispatcher_destroy;
while (!global_stop_flag) {
- struct msg *request = NULL;
+ struct msg *result = NULL;
- log("Waiting for a new command\n");
-
- ret = msg_recv(worker->fd, &request);
- if (ret < 0) {
- if (errno == EINVAL && global_stop_flag)
- ret = 0;
+ ret = cmd_dispatcher_handle(dispatcher, task, &result);
+ msg_free(task);
+ if (ret < 0)
goto dispatcher_destroy;
- }
- ret = cmd_dispatcher_handle_msg(dispatcher, worker->fd, request);
- msg_free(request);
+ ret = worker_send_to_server(settings, result, &task);
+ msg_free(result);
if (ret < 0)
goto dispatcher_destroy;
}
diff --git a/src/worker.h b/src/worker.h
index 736c8b4..bf603df 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -15,9 +15,9 @@ struct settings {
struct worker;
-int worker_create(struct worker **, const struct settings *);
+int worker_create(struct worker **);
void worker_destroy(struct worker *);
-int worker_main(struct worker *, int argc, char *argv[]);
+int worker_main(struct worker *, const struct settings *);
#endif
diff --git a/src/worker_main.c b/src/worker_main.c
index c046ba0..b9e9b9b 100644
--- a/src/worker_main.c
+++ b/src/worker_main.c
@@ -70,11 +70,11 @@ int main(int argc, char *argv[])
if (ret < 0)
return ret;
- ret = worker_create(&worker, &settings);
+ ret = worker_create(&worker);
if (ret < 0)
return ret;
- ret = worker_main(worker, argc - optind, argv + optind);
+ ret = worker_main(worker, &settings);
if (ret < 0)
goto destroy_worker;
diff --git a/src/worker_queue.c b/src/worker_queue.c
new file mode 100644
index 0000000..3e207e3
--- /dev/null
+++ b/src/worker_queue.c
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#include "worker_queue.h"
+#include "log.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <sys/queue.h>
+
+struct worker {
+ pthread_t thread;
+ int fd;
+ STAILQ_ENTRY(worker) entries;
+};
+
+int worker_create(struct worker **_entry, int fd)
+{
+ struct worker *entry = malloc(sizeof(struct worker));
+ if (!entry) {
+ log_errno("malloc");
+ return -1;
+ }
+
+ entry->thread = pthread_self();
+ entry->fd = fd;
+
+ *_entry = entry;
+ return 0;
+}
+
+void worker_destroy(struct worker *entry)
+{
+ log("Waiting for worker %d thread to exit\n", entry->fd);
+ pthread_errno_if(pthread_join(entry->thread, NULL), "pthread_join");
+ free(entry);
+}
+
+int worker_get_fd(const struct worker *entry)
+{
+ return entry->fd;
+}
+
+void worker_queue_create(struct worker_queue *queue)
+{
+ STAILQ_INIT(queue);
+}
+
+void worker_queue_destroy(struct worker_queue *queue)
+{
+ struct worker *entry1 = STAILQ_FIRST(queue);
+ while (entry1) {
+ struct worker *entry2 = STAILQ_NEXT(entry1, entries);
+ worker_destroy(entry1);
+ entry1 = entry2;
+ }
+ STAILQ_INIT(queue);
+}
+
+int worker_queue_is_empty(const struct worker_queue *queue)
+{
+ return STAILQ_EMPTY(queue);
+}
+
+void worker_queue_add_first(struct worker_queue *queue, struct worker *entry)
+{
+ STAILQ_INSERT_HEAD(queue, entry, entries);
+}
+
+void worker_queue_add_last(struct worker_queue *queue, struct worker *entry)
+{
+ STAILQ_INSERT_HEAD(queue, entry, entries);
+}
+
+struct worker *worker_queue_remove_first(struct worker_queue *queue)
+{
+ struct worker *entry = STAILQ_FIRST(queue);
+ STAILQ_REMOVE_HEAD(queue, entries);
+ return entry;
+}
diff --git a/src/worker_queue.h b/src/worker_queue.h
new file mode 100644
index 0000000..826cf65
--- /dev/null
+++ b/src/worker_queue.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#ifndef __WORKER_QUEUE_H__
+#define __WORKER_QUEUE_H__
+
+#include <sys/queue.h>
+
+struct worker;
+
+int worker_create(struct worker **, int fd);
+void worker_destroy(struct worker *);
+
+int worker_get_fd(const struct worker *);
+
+STAILQ_HEAD(worker_queue, worker);
+
+void worker_queue_create(struct worker_queue *);
+void worker_queue_destroy(struct worker_queue *);
+
+int worker_queue_is_empty(const struct worker_queue *);
+
+void worker_queue_add_first(struct worker_queue *, struct worker *);
+void worker_queue_add_last(struct worker_queue *, struct worker *);
+
+struct worker *worker_queue_remove_first(struct worker_queue *);
+
+#endif