aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2022-08-25 16:58:38 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2022-08-26 04:05:02 +0200
commit532b3ae9b5cd8609237e04db768cc1f750d8631d (patch)
treef65253a6ce9970d1d93e6bb6c65758d6fa98373a /src
parentcmake: ignore unused parameters for now (diff)
downloadcimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.tar.gz
cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.zip
add some more code
This adds a basic "worker" program. You can now do something like ./server & ./worker & ./client ci_run URL REV and the server should pass a message to worker, after which it should clone the repository at URL, checkout REV, and try to run the CI script. It's extremely unfinished: I need to sort out the graceful shutdown, how the server manages workers, etc.
Diffstat (limited to '')
-rw-r--r--src/CMakeLists.txt21
-rw-r--r--src/ci.c101
-rw-r--r--src/ci.h28
-rw-r--r--src/ci_queue.c92
-rw-r--r--src/ci_queue.h27
-rw-r--r--src/file.c61
-rw-r--r--src/file.h10
-rw-r--r--src/git.c109
-rw-r--r--src/git.h19
-rw-r--r--src/process.c46
-rw-r--r--src/process.h6
-rw-r--r--src/server.c249
-rw-r--r--src/server.h18
-rw-r--r--src/worker.c241
-rw-r--r--src/worker.h24
-rw-r--r--src/worker_main.c86
-rw-r--r--src/worker_queue.c79
-rw-r--r--src/worker_queue.h26
18 files changed, 1230 insertions, 13 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1cd508c..c2333a8 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -12,7 +12,24 @@ add_compile_definitions(_GNU_SOURCE)
add_compile_definitions(VERSION="${PROJECT_VERSION}")
-add_executable(server server_main.c server.c msg.c net.c tcp_server.c)
-add_executable(client client_main.c client.c msg.c net.c)
+add_executable(server server_main.c server.c
+ ci_queue.c
+ msg.c
+ net.c
+ tcp_server.c
+ worker_queue.c)
+
+add_executable(client client_main.c client.c
+ msg.c
+ net.c)
+
+add_executable(worker worker_main.c worker.c
+ ci.c
+ file.c
+ git.c
+ msg.c
+ net.c
+ process.c)
target_link_libraries(server PRIVATE pthread)
+target_link_libraries(worker PRIVATE git2 pthread)
diff --git a/src/ci.c b/src/ci.c
new file mode 100644
index 0000000..1932008
--- /dev/null
+++ b/src/ci.c
@@ -0,0 +1,101 @@
+#include "ci.h"
+#include "file.h"
+#include "git.h"
+#include "log.h"
+#include "process.h"
+
+#include <git2.h>
+#include <stddef.h>
+#include <stdlib.h>
+
+/* clang-format off */
+static const char *ci_scripts[] = {
+ "./.ci.sh",
+ "./.ci",
+ "./ci.sh",
+ "./ci",
+ NULL,
+};
+/* clang-format on */
+
+static run_result ci_run_script(const char *script, int *ec)
+{
+ const char *args[] = {script, NULL};
+ int ret = 0;
+
+ ret = proc_spawn(args, ec);
+ if (ret < 0)
+ return RUN_ERROR;
+ if (*ec)
+ return RUN_FAILURE;
+ return RUN_SUCCESS;
+}
+
+run_result ci_run(int *ec)
+{
+ for (const char **script = ci_scripts; *script; ++script) {
+ if (!file_exists(*script))
+ continue;
+ print_log("Going to run: %s\n", *script);
+ return ci_run_script(*script, ec);
+ }
+
+ print_log("Couldn't find any CI scripts to run\n");
+ return RUN_NO;
+}
+
+static void ci_cleanup_git_repo(git_repository *repo)
+{
+ rm_rf(git_repository_workdir(repo));
+ libgit_repository_free(repo);
+}
+
+static int ci_prepare_git_repo(git_repository **repo, const char *url, const char *rev)
+{
+ int ret = 0;
+
+ ret = libgit_clone_to_tmp(repo, url);
+ if (ret < 0)
+ return ret;
+
+ ret = libgit_checkout(*repo, rev);
+ if (ret < 0)
+ goto free_repo;
+
+ return ret;
+
+free_repo:
+ ci_cleanup_git_repo(*repo);
+
+ return ret;
+}
+
+run_result ci_run_git_repo(const char *url, const char *rev, int *ec)
+{
+ char *oldpwd;
+ git_repository *repo;
+ run_result result = RUN_ERROR;
+ int ret = 0;
+
+ ret = ci_prepare_git_repo(&repo, url, rev);
+ if (ret < 0)
+ goto exit;
+
+ ret = my_chdir(git_repository_workdir(repo), &oldpwd);
+ if (ret < 0)
+ goto free_repo;
+
+ result = ci_run(ec);
+ if (result < 0)
+ goto oldpwd;
+
+oldpwd:
+ my_chdir(oldpwd, NULL);
+ free(oldpwd);
+
+free_repo:
+ ci_cleanup_git_repo(repo);
+
+exit:
+ return result;
+}
diff --git a/src/ci.h b/src/ci.h
new file mode 100644
index 0000000..4267b89
--- /dev/null
+++ b/src/ci.h
@@ -0,0 +1,28 @@
+#ifndef __CI_H__
+#define __CI_H__
+
+typedef enum {
+ RUN_ERROR = -1,
+ RUN_SUCCESS,
+ RUN_FAILURE,
+ RUN_NO,
+} run_result;
+
+run_result ci_run(int *ec);
+
+/*
+ * This is a high-level function. It's basically equivalent to the following
+ * sequence in bash:
+ *
+ * dir="$( mktemp -d )"
+ * git clone --no-checkout "$url" "$dir"
+ * pushd "$dir"
+ * git checkout "$rev"
+ * ./ci
+ * popd
+ * rm -rf "$dir"
+ *
+ */
+run_result ci_run_git_repo(const char *url, const char *rev, int *ec);
+
+#endif
diff --git a/src/ci_queue.c b/src/ci_queue.c
new file mode 100644
index 0000000..d87d3c0
--- /dev/null
+++ b/src/ci_queue.c
@@ -0,0 +1,92 @@
+#include "ci_queue.h"
+#include "log.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <sys/queue.h>
+
+int ci_queue_entry_create(struct ci_queue_entry **entry, const char *_url, const char *_rev)
+{
+ char *url, *rev;
+
+ url = strdup(_url);
+ if (!url) {
+ print_errno("strdup");
+ goto fail;
+ }
+
+ rev = strdup(_rev);
+ if (!rev) {
+ print_errno("strdup");
+ goto free_url;
+ }
+
+ *entry = malloc(sizeof(struct ci_queue_entry));
+ if (!*entry) {
+ print_errno("malloc");
+ goto free_rev;
+ }
+ (*entry)->url = url;
+ (*entry)->rev = rev;
+
+ return 0;
+
+free_rev:
+ free(rev);
+
+free_url:
+ free(url);
+
+fail:
+ return -1;
+}
+
+void ci_queue_entry_destroy(struct ci_queue_entry *entry)
+{
+ free(entry->rev);
+ free(entry->url);
+ free(entry);
+}
+
+void ci_queue_create(struct ci_queue *queue)
+{
+ STAILQ_INIT(queue);
+}
+
+void ci_queue_destroy(struct ci_queue *queue)
+{
+ struct ci_queue_entry *entry1, *entry2;
+
+ entry1 = STAILQ_FIRST(queue);
+ while (entry1) {
+ entry2 = STAILQ_NEXT(entry1, entries);
+ ci_queue_entry_destroy(entry1);
+ entry1 = entry2;
+ }
+ STAILQ_INIT(queue);
+}
+
+int ci_queue_is_empty(const struct ci_queue *queue)
+{
+ return STAILQ_EMPTY(queue);
+}
+
+void ci_queue_push(struct ci_queue *queue, struct ci_queue_entry *entry)
+{
+ STAILQ_INSERT_TAIL(queue, entry, entries);
+}
+
+void ci_queue_push_head(struct ci_queue *queue, struct ci_queue_entry *entry)
+{
+ STAILQ_INSERT_HEAD(queue, entry, entries);
+}
+
+struct ci_queue_entry *ci_queue_pop(struct ci_queue *queue)
+{
+ struct ci_queue_entry *entry;
+
+ entry = STAILQ_FIRST(queue);
+ STAILQ_REMOVE_HEAD(queue, entries);
+
+ return entry;
+}
diff --git a/src/ci_queue.h b/src/ci_queue.h
new file mode 100644
index 0000000..d036376
--- /dev/null
+++ b/src/ci_queue.h
@@ -0,0 +1,27 @@
+#ifndef __CI_QUEUE_H__
+#define __CI_QUEUE_H__
+
+#include <sys/queue.h>
+
+struct ci_queue_entry {
+ char *url;
+ char *rev;
+ STAILQ_ENTRY(ci_queue_entry) entries;
+};
+
+int ci_queue_entry_create(struct ci_queue_entry **, const char *url, const char *rev);
+void ci_queue_entry_destroy(struct ci_queue_entry *);
+
+STAILQ_HEAD(ci_queue, ci_queue_entry);
+
+void ci_queue_create(struct ci_queue *);
+void ci_queue_destroy(struct ci_queue *);
+
+int ci_queue_is_empty(const struct ci_queue *);
+
+void ci_queue_push(struct ci_queue *, struct ci_queue_entry *);
+void ci_queue_push_head(struct ci_queue *, struct ci_queue_entry *);
+
+struct ci_queue_entry *ci_queue_pop(struct ci_queue *);
+
+#endif
diff --git a/src/file.c b/src/file.c
new file mode 100644
index 0000000..ede640b
--- /dev/null
+++ b/src/file.c
@@ -0,0 +1,61 @@
+#include "file.h"
+#include "log.h"
+
+#include <ftw.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+static int unlink_cb(const char *fpath, const struct stat *, int, struct FTW *)
+{
+ int ret = 0;
+
+ ret = remove(fpath);
+ if (ret < 0) {
+ print_errno("remove");
+ return ret;
+ }
+
+ return ret;
+}
+
+int rm_rf(const char *dir)
+{
+ print_log("Recursively removing directory: %s\n", dir);
+ return nftw(dir, unlink_cb, 64, FTW_DEPTH | FTW_PHYS);
+}
+
+int my_chdir(const char *dir, char **old)
+{
+ int ret = 0;
+
+ if (old) {
+ *old = get_current_dir_name();
+ if (!*old) {
+ print_errno("get_current_dir_name");
+ return -1;
+ }
+ }
+
+ ret = chdir(dir);
+ if (ret < 0) {
+ print_errno("chdir");
+ goto free_old;
+ }
+
+ return ret;
+
+free_old:
+ if (old)
+ free(*old);
+
+ return ret;
+}
+
+int file_exists(const char *path)
+{
+ struct stat stat;
+ int ret = lstat(path, &stat);
+ return !ret && S_ISREG(stat.st_mode);
+}
diff --git a/src/file.h b/src/file.h
new file mode 100644
index 0000000..b89fca1
--- /dev/null
+++ b/src/file.h
@@ -0,0 +1,10 @@
+#ifndef __FILE_H__
+#define __FILE_H__
+
+int rm_rf(const char *dir);
+
+int my_chdir(const char *dir, char **old);
+
+int file_exists(const char *path);
+
+#endif
diff --git a/src/git.c b/src/git.c
new file mode 100644
index 0000000..3572d67
--- /dev/null
+++ b/src/git.c
@@ -0,0 +1,109 @@
+#include "git.h"
+#include "file.h"
+#include "log.h"
+
+#include <git2.h>
+
+#define git_print_error(fn) \
+ { \
+ const git_error *error = git_error_last(); \
+ const char *msg = error && error->message ? error->message : "???"; \
+ print_error("%s: %s\n", fn, msg); \
+ }
+
+int libgit_init()
+{
+ int ret = 0;
+
+ ret = git_libgit2_init();
+ if (ret < 0) {
+ git_print_error("git_libgit2_init");
+ return ret;
+ }
+
+ return 0;
+}
+
+void libgit_shutdown()
+{
+ git_libgit2_shutdown();
+}
+
+int libgit_clone(git_repository **repo, const char *url, const char *dir)
+{
+ git_clone_options opts;
+ int ret = 0;
+
+ print_log("Cloning git repository from %s to %s\n", url, dir);
+
+ ret = git_clone_options_init(&opts, GIT_CLONE_OPTIONS_VERSION);
+ if (ret < 0) {
+ git_print_error("git_clone_options_init");
+ return ret;
+ }
+ opts.checkout_opts.checkout_strategy = GIT_CHECKOUT_NONE;
+
+ ret = git_clone(repo, url, dir, &opts);
+ if (ret < 0) {
+ git_print_error("git_clone");
+ return ret;
+ }
+
+ return 0;
+}
+
+int libgit_clone_to_tmp(git_repository **repo, const char *url)
+{
+ char dir[] = "/tmp/git.XXXXXX";
+
+ if (!mkdtemp(dir)) {
+ print_errno("mkdtemp");
+ return -1;
+ }
+
+ return libgit_clone(repo, url, dir);
+}
+
+void libgit_repository_free(git_repository *repo)
+{
+ git_repository_free(repo);
+}
+
+int libgit_checkout(git_repository *repo, const char *rev)
+{
+ git_checkout_options opts;
+ git_object *obj;
+ int ret = 0;
+
+ print_log("Checking out revision %s\n", rev);
+
+ ret = git_revparse_single(&obj, repo, rev);
+ if (ret < 0) {
+ git_print_error("git_revparse_single");
+ return ret;
+ }
+
+ ret = git_checkout_options_init(&opts, GIT_CHECKOUT_OPTIONS_VERSION);
+ if (ret < 0) {
+ git_print_error("git_checkout_options_init");
+ goto free_obj;
+ }
+ opts.checkout_strategy = GIT_CHECKOUT_FORCE;
+
+ ret = git_checkout_tree(repo, obj, &opts);
+ if (ret < 0) {
+ git_print_error("git_checkout_tree");
+ goto free_obj;
+ }
+
+ ret = git_repository_set_head_detached(repo, git_object_id(obj));
+ if (ret < 0) {
+ git_print_error("git_repository_set_head_detached");
+ goto free_obj;
+ }
+
+free_obj:
+ git_object_free(obj);
+
+ return ret;
+}
diff --git a/src/git.h b/src/git.h
new file mode 100644
index 0000000..83b9c49
--- /dev/null
+++ b/src/git.h
@@ -0,0 +1,19 @@
+#ifndef __GIT_H__
+#define __GIT_H__
+
+#include <git2.h>
+
+int libgit_init();
+void libgit_shutdown();
+
+/* These never check out any files (so that we don't have to do 2 checkouts). */
+int libgit_clone(git_repository **, const char *url, const char *dir);
+int libgit_clone_to_tmp(git_repository **, const char *url);
+
+/* Free a cloned repository. */
+void libgit_repository_free(git_repository *);
+
+/* I tried to make this an equivalent of `git checkout`. */
+int libgit_checkout(git_repository *, const char *rev);
+
+#endif
diff --git a/src/process.c b/src/process.c
new file mode 100644
index 0000000..9d95037
--- /dev/null
+++ b/src/process.c
@@ -0,0 +1,46 @@
+#include "process.h"
+#include "log.h"
+
+#include <stdlib.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+static int wait_for_child(pid_t pid, int *ec)
+{
+ int status;
+
+ pid_t ret = waitpid(pid, &status, 0);
+ if (ret < 0) {
+ print_errno("waitpid");
+ return ret;
+ }
+
+ if (WIFEXITED(status))
+ *ec = WEXITSTATUS(status);
+ else
+ *ec = -1;
+
+ return 0;
+}
+
+int proc_spawn(const char *args[], int *ec)
+{
+ int ret = 0;
+
+ pid_t child_pid = fork();
+ if (child_pid < 0) {
+ print_errno("fork");
+ return child_pid;
+ }
+
+ if (child_pid)
+ return wait_for_child(child_pid, ec);
+
+ ret = execv(args[0], (char *const *)args);
+ if (ret < 0) {
+ print_errno("execv");
+ exit(ret);
+ }
+
+ return 0;
+}
diff --git a/src/process.h b/src/process.h
new file mode 100644
index 0000000..da0c421
--- /dev/null
+++ b/src/process.h
@@ -0,0 +1,6 @@
+#ifndef __PROCESS_H__
+#define __PROCESS_H__
+
+int proc_spawn(const char *args[], int *ec);
+
+#endif
diff --git a/src/server.c b/src/server.c
index 3e9f080..e285a9e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1,11 +1,97 @@
#include "server.h"
+#include "ci_queue.h"
+#include "log.h"
#include "msg.h"
-#include "net.h"
#include "tcp_server.h"
+#include "worker_queue.h"
-#include <stdio.h>
+#include <pthread.h>
+#include <string.h>
#include <unistd.h>
+static int server_has_runs_and_workers(const struct server *server)
+{
+ return !ci_queue_is_empty(&server->ci_queue) &&
+ !worker_queue_is_empty(&server->worker_queue);
+}
+
+static int server_scheduler_iteration(struct server *server)
+{
+ struct worker_queue_entry *worker;
+ struct ci_queue_entry *ci_run;
+ struct msg msg;
+ int response, ret = 0;
+
+ worker = worker_queue_pop(&server->worker_queue);
+ ci_run = ci_queue_pop(&server->ci_queue);
+
+ char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL};
+
+ ret = msg_from_argv(&msg, argv);
+ if (ret < 0)
+ goto requeue_ci_run;
+
+ ret = msg_send_and_wait(worker->fd, &msg, &response);
+ if (ret < 0)
+ goto free_msg;
+
+ if (response < 0) {
+ print_error("Failed to schedule a CI run\n");
+ }
+
+ msg_free(&msg);
+
+ ci_queue_entry_destroy(ci_run);
+
+ /* FIXME: Don't mark worker as free! */
+ worker_queue_push_head(&server->worker_queue, worker);
+
+ return 0;
+
+free_msg:
+ msg_free(&msg);
+
+requeue_ci_run:
+ ci_queue_push_head(&server->ci_queue, ci_run);
+
+ worker_queue_push_head(&server->worker_queue, worker);
+
+ return ret;
+}
+
+static void *server_scheduler(void *_server)
+{
+ struct server *server = (struct server *)_server;
+ int ret = 0;
+
+ ret = pthread_mutex_lock(&server->scheduler_mtx);
+ if (ret < 0) {
+ print_errno("pthread_mutex_lock");
+ goto exit;
+ }
+
+ while (1) {
+ while (!server_has_runs_and_workers(server)) {
+ ret = pthread_cond_wait(&server->scheduler_cv, &server->scheduler_mtx);
+ if (ret < 0) {
+ print_errno("pthread_cond_wait");
+ goto unlock;
+ }
+ }
+
+ ret = server_scheduler_iteration(server);
+ if (ret < 0)
+ goto unlock;
+ }
+
+unlock:
+ if (pthread_mutex_unlock(&server->scheduler_mtx))
+ print_errno("pthread_mutex_unlock");
+
+exit:
+ return NULL;
+}
+
int server_create(struct server *server, const struct settings *settings)
{
int ret = 0;
@@ -14,31 +100,176 @@ int server_create(struct server *server, const struct settings *settings)
if (ret < 0)
return ret;
- return 0;
+ worker_queue_create(&server->worker_queue);
+
+ ci_queue_create(&server->ci_queue);
+
+ ret = pthread_mutex_init(&server->scheduler_mtx, NULL);
+ if (ret < 0) {
+ print_errno("pthread_mutex_init");
+ goto destroy_tcp_server;
+ }
+
+ ret = pthread_cond_init(&server->scheduler_cv, NULL);
+ if (ret < 0) {
+ print_errno("pthread_cond_init");
+ goto destroy_scheduler_mtx;
+ }
+
+ ret = pthread_create(&server->scheduler, NULL, server_scheduler, server);
+ if (ret < 0) {
+ print_errno("pthread_create");
+ goto destroy_scheduler_cv;
+ }
+
+ return ret;
+
+destroy_scheduler_cv:
+ if (pthread_cond_destroy(&server->scheduler_cv))
+ print_errno("pthread_cond_destroy");
+
+destroy_scheduler_mtx:
+ if (pthread_mutex_destroy(&server->scheduler_mtx))
+ print_errno("pthread_mutex_destroy");
+
+ ci_queue_destroy(&server->ci_queue);
+
+ worker_queue_destroy(&server->worker_queue);
+
+destroy_tcp_server:
+ tcp_server_destroy(&server->tcp_server);
+
+ return ret;
}
-void server_destroy(const struct server *server)
+void server_destroy(struct server *server)
{
+ if (pthread_join(server->scheduler, NULL))
+ print_errno("pthread_join");
+ if (pthread_cond_destroy(&server->scheduler_cv))
+ print_errno("pthread_cond_destroy");
+ if (pthread_mutex_destroy(&server->scheduler_mtx))
+ print_errno("pthread_mutex_destroy");
+ ci_queue_destroy(&server->ci_queue);
+ worker_queue_destroy(&server->worker_queue);
tcp_server_destroy(&server->tcp_server);
}
-static int server_msg_handler(const struct msg *msg, void *)
+struct msg_context {
+ struct server *server;
+ int client_fd;
+};
+
+static int msg_new_worker(const struct msg *, void *_ctx)
+{
+ struct msg_context *ctx = (struct msg_context *)_ctx;
+ return server_new_worker(ctx->server, ctx->client_fd);
+}
+
+static int msg_ci_run(const struct msg *msg, void *_ctx)
+{
+ struct msg_context *ctx = (struct msg_context *)_ctx;
+
+ if (msg->argc != 3) {
+ print_error("Invalid number of arguments for a message: %d\n", msg->argc);
+ return -1;
+ }
+
+ return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]);
+}
+
+static int server_msg_handler(const struct msg *msg, void *ctx)
{
+ if (msg->argc == 0) {
+ print_error("Received an empty message\n");
+ return -1;
+ }
+
+ if (!strcmp(msg->argv[0], "new_worker"))
+ return msg_new_worker(msg, ctx);
+ if (!strcmp(msg->argv[0], "ci_run"))
+ return msg_ci_run(msg, ctx);
+
return msg_dump_unknown(msg);
}
-static int server_conn_handler(int fd, void *)
+static int server_conn_handler(int fd, void *server)
{
- return msg_recv_and_handle(fd, server_msg_handler, NULL);
+ struct msg_context ctx = {server, fd};
+ return msg_recv_and_handle(fd, server_msg_handler, &ctx);
}
-int server_main(const struct server *server)
+int server_main(struct server *server)
{
int ret = 0;
while (1) {
- ret = tcp_server_accept(&server->tcp_server, server_conn_handler, NULL);
+ ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server);
if (ret < 0)
return ret;
}
}
+
+int server_new_worker(struct server *server, int fd)
+{
+ struct worker_queue_entry *entry;
+ int ret = 0;
+
+ print_log("Registering a new worker\n");
+
+ ret = pthread_mutex_lock(&server->scheduler_mtx);
+ if (ret < 0) {
+ print_errno("pthread_mutex_lock");
+ return ret;
+ }
+
+ ret = worker_queue_entry_create(&entry, fd);
+ if (ret < 0)
+ goto unlock;
+
+ worker_queue_push(&server->worker_queue, entry);
+
+ ret = pthread_cond_signal(&server->scheduler_cv);
+ if (ret < 0) {
+ print_errno("pthread_cond_signal");
+ goto unlock;
+ }
+
+unlock:
+ if (pthread_mutex_unlock(&server->scheduler_mtx))
+ print_errno("pthread_mutex_unlock");
+
+ return ret;
+}
+
+int server_ci_run(struct server *server, const char *url, const char *rev)
+{
+ struct ci_queue_entry *entry;
+ int ret = 0;
+
+ print_log("Scheduling a new CI run for repository %s\n", url);
+
+ ret = pthread_mutex_lock(&server->scheduler_mtx);
+ if (ret < 0) {
+ print_errno("pthread_mutex_lock");
+ return ret;
+ }
+
+ ret = ci_queue_entry_create(&entry, url, rev);
+ if (ret < 0)
+ goto unlock;
+
+ ci_queue_push(&server->ci_queue, entry);
+
+ ret = pthread_cond_signal(&server->scheduler_cv);
+ if (ret < 0) {
+ print_errno("pthread_cond_signal");
+ goto unlock;
+ }
+
+unlock:
+ if (pthread_mutex_unlock(&server->scheduler_mtx))
+ print_errno("pthread_mutex_unlock");
+
+ return ret;
+}
diff --git a/src/server.h b/src/server.h
index 11c409b..37fdb13 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1,7 +1,11 @@
#ifndef __SERVER_H__
#define __SERVER_H__
+#include "ci_queue.h"
#include "tcp_server.h"
+#include "worker_queue.h"
+
+#include <pthread.h>
struct settings {
const char *port;
@@ -9,11 +13,21 @@ struct settings {
struct server {
struct tcp_server tcp_server;
+
+ struct worker_queue worker_queue;
+ struct ci_queue ci_queue;
+
+ pthread_mutex_t scheduler_mtx;
+ pthread_cond_t scheduler_cv;
+ pthread_t scheduler;
};
int server_create(struct server *, const struct settings *);
-void server_destroy(const struct server *);
+void server_destroy(struct server *);
+
+int server_main(struct server *);
-int server_main(const struct server *);
+int server_new_worker(struct server *, int fd);
+int server_ci_run(struct server *, const char *url, const char *rev);
#endif
diff --git a/src/worker.c b/src/worker.c
new file mode 100644
index 0000000..687e647
--- /dev/null
+++ b/src/worker.c
@@ -0,0 +1,241 @@
+#include "worker.h"
+#include "ci.h"
+#include "git.h"
+#include "log.h"
+#include "msg.h"
+#include "net.h"
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+int worker_create(struct worker *worker, const struct settings *settings)
+{
+ int ret = 0;
+
+ ret = libgit_init();
+ if (ret < 0)
+ return ret;
+
+ ret = net_connect(settings->host, settings->port);
+ if (ret < 0)
+ goto git_shutdown;
+ worker->fd = ret;
+
+ ret = pthread_mutex_init(&worker->task_mtx, NULL);
+ if (ret < 0) {
+ print_errno("pthread_mutex_init");
+ goto close;
+ }
+
+ worker->task_active = 0;
+ return ret;
+
+close:
+ close(worker->fd);
+
+git_shutdown:
+ libgit_shutdown();
+
+ return ret;
+}
+
+void worker_destroy(struct worker *worker)
+{
+ if (pthread_mutex_destroy(&worker->task_mtx))
+ print_errno("pthread_mutex_destroy");
+ close(worker->fd);
+ libgit_shutdown();
+}
+
+static int msg_send_new_worker(const struct worker *worker)
+{
+ static char *argv[] = {"new_worker", NULL};
+ struct msg msg;
+ int response, ret = 0;
+
+ ret = msg_from_argv(&msg, argv);
+ if (ret < 0)
+ return ret;
+
+ ret = msg_send_and_wait(worker->fd, &msg, &response);
+ if (ret < 0)
+ goto free_msg;
+
+ if (response < 0) {
+ print_error("Failed to register\n");
+ ret = response;
+ goto free_msg;
+ }
+
+free_msg:
+ msg_free(&msg);
+
+ return ret;
+}
+
+typedef int (*worker_task_body)(const struct msg *);
+
+static int msg_body_ci_run(const struct msg *msg)
+{
+ const char *url = msg->argv[1];
+ const char *rev = msg->argv[2];
+ int ec = 0;
+
+ return ci_run_git_repo(url, rev, &ec);
+}
+
+typedef worker_task_body (*worker_msg_parser)(const struct msg *);
+
+static worker_task_body parse_msg_ci_run(const struct msg *msg)
+{
+ if (msg->argc != 3) {
+ print_error("Invalid number of arguments for a message: %d\n", msg->argc);
+ return NULL;
+ }
+
+ return msg_body_ci_run;
+}
+
+struct worker_msg_parser_it {
+ const char *msg;
+ worker_msg_parser parser;
+};
+
+struct worker_msg_parser_it worker_msg_parsers[] = {
+ {"ci_run", parse_msg_ci_run},
+};
+
+struct worker_task_context {
+ struct msg *msg;
+ worker_task_body body;
+};
+
+static void *worker_task_wrapper(void *_ctx)
+{
+ struct worker_task_context *ctx = (struct worker_task_context *)_ctx;
+
+ ctx->body(ctx->msg);
+ msg_free(ctx->msg);
+ free(ctx->msg);
+ free(ctx);
+ return NULL;
+}
+
+static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body)
+{
+ struct worker_task_context *ctx;
+ int ret = 0;
+
+ ctx = malloc(sizeof(*ctx));
+ if (!ctx) {
+ print_errno("malloc");
+ return -1;
+ }
+ ctx->body = body;
+
+ ctx->msg = msg_copy(msg);
+ if (!ctx->msg) {
+ ret = -1;
+ goto free_ctx;
+ }
+
+ ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx);
+ if (ret < 0) {
+ print_errno("pthread_create");
+ goto free_msg;
+ }
+ worker->task_active = 1;
+
+ return ret;
+
+free_msg:
+ msg_free(ctx->msg);
+ free(ctx->msg);
+
+free_ctx:
+ free(ctx);
+
+ return ret;
+}
+
+static int worker_msg_handler(struct worker *worker, const struct msg *msg)
+{
+ if (worker->task_active) {
+ int ret = pthread_tryjoin_np(worker->task, NULL);
+ switch (ret) {
+ case 0:
+ worker->task_active = 0;
+ break;
+ case EBUSY:
+ break;
+ default:
+ print_errno("pthread_tryjoin_np");
+ return ret;
+ }
+ }
+
+ if (worker->task_active) {
+ print_error("Worker is busy\n");
+ return -1;
+ }
+
+ if (msg->argc == 0) {
+ print_error("Received an empty message\n");
+ return -1;
+ }
+
+ size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]);
+
+ for (size_t i = 0; i < numof_parsers; ++i) {
+ const struct worker_msg_parser_it *it = &worker_msg_parsers[i];
+ if (strcmp(it->msg, msg->argv[0]))
+ continue;
+
+ worker_task_body body = it->parser(msg);
+ if (!body)
+ return -1;
+
+ return worker_schedule_task(worker, msg, body);
+ }
+
+ return msg_dump_unknown(msg);
+}
+
+static int worker_msg_handler_lock(const struct msg *msg, void *_worker)
+{
+ struct worker *worker = (struct worker *)_worker;
+ int ret = 0;
+
+ ret = pthread_mutex_lock(&worker->task_mtx);
+ if (ret < 0) {
+ print_errno("pthread_mutex_lock");
+ return -1;
+ }
+
+ ret = worker_msg_handler(worker, msg);
+
+ if (pthread_mutex_unlock(&worker->task_mtx))
+ print_errno("pthread_mutex_unlock");
+
+ return ret;
+}
+
+int worker_main(struct worker *worker, int, char *[])
+{
+ int ret = 0;
+
+ ret = msg_send_new_worker(worker);
+ if (ret < 0)
+ return ret;
+
+ while (1) {
+ print_log("Waiting for a new command\n");
+
+ ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker);
+ if (ret < 0)
+ return ret;
+ }
+}
diff --git a/src/worker.h b/src/worker.h
new file mode 100644
index 0000000..5231594
--- /dev/null
+++ b/src/worker.h
@@ -0,0 +1,24 @@
+#ifndef __WORKER_H__
+#define __WORKER_H__
+
+#include <pthread.h>
+
+struct settings {
+ char *host;
+ char *port;
+};
+
+struct worker {
+ int fd;
+
+ pthread_mutex_t task_mtx;
+ pthread_t task;
+ int task_active;
+};
+
+int worker_create(struct worker *, const struct settings *);
+void worker_destroy(struct worker *);
+
+int worker_main(struct worker *, int argc, char *argv[]);
+
+#endif
diff --git a/src/worker_main.c b/src/worker_main.c
new file mode 100644
index 0000000..d88071d
--- /dev/null
+++ b/src/worker_main.c
@@ -0,0 +1,86 @@
+#include "const.h"
+#include "worker.h"
+
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+static struct settings default_settings()
+{
+ struct settings settings = {DEFAULT_HOST, DEFAULT_PORT};
+ return settings;
+}
+
+static void print_usage(const char *argv0)
+{
+ printf("usage: %s [-h|--help] [-V|--version] [-H|--host HOST] [-p|--port PORT]\n", argv0);
+}
+
+static void print_version()
+{
+ printf("%s\n", VERSION);
+}
+
+static int parse_settings(struct settings *settings, int argc, char *argv[])
+{
+ int opt, longind;
+
+ *settings = default_settings();
+
+ static struct option long_options[] = {
+ {"help", no_argument, 0, 'h'},
+ {"version", no_argument, 0, 'V'},
+ {"host", required_argument, 0, 'H'},
+ {"port", required_argument, 0, 'p'},
+ {0, 0, 0, 0},
+ };
+
+ while ((opt = getopt_long(argc, argv, "hVH:p:", long_options, &longind)) != -1) {
+ switch (opt) {
+ case 'h':
+ print_usage(argv[0]);
+ exit(0);
+ break;
+ case 'V':
+ print_version();
+ exit(0);
+ break;
+ case 'H':
+ settings->host = optarg;
+ break;
+ case 'p':
+ settings->port = optarg;
+ break;
+ default:
+ print_usage(argv[0]);
+ exit(1);
+ break;
+ }
+ }
+
+ return 0;
+}
+
+int main(int argc, char *argv[])
+{
+ struct settings settings;
+ struct worker worker;
+ int ret = 0;
+
+ ret = parse_settings(&settings, argc, argv);
+ if (ret < 0)
+ return ret;
+
+ ret = worker_create(&worker, &settings);
+ if (ret < 0)
+ return ret;
+
+ ret = worker_main(&worker, argc - optind, argv + optind);
+ if (ret < 0)
+ goto destroy_worker;
+
+destroy_worker:
+ worker_destroy(&worker);
+
+ return ret;
+}
diff --git a/src/worker_queue.c b/src/worker_queue.c
new file mode 100644
index 0000000..bb0077b
--- /dev/null
+++ b/src/worker_queue.c
@@ -0,0 +1,79 @@
+#include "worker_queue.h"
+#include "log.h"
+
+#include <stdlib.h>
+#include <sys/queue.h>
+#include <unistd.h>
+
+int worker_queue_entry_create(struct worker_queue_entry **entry, int fd)
+{
+ int newfd = dup(fd);
+
+ if (newfd < 0) {
+ print_errno("malloc");
+ return -1;
+ }
+
+ *entry = malloc(sizeof(struct worker_queue_entry));
+ if (!*entry) {
+ print_errno("malloc");
+ goto close_newfd;
+ }
+ (*entry)->fd = newfd;
+
+ return 0;
+
+close_newfd:
+ close(newfd);
+
+ return -1;
+}
+
+void worker_queue_entry_destroy(struct worker_queue_entry *entry)
+{
+ close(entry->fd);
+ free(entry);
+}
+
+void worker_queue_create(struct worker_queue *queue)
+{
+ STAILQ_INIT(queue);
+}
+
+void worker_queue_destroy(struct worker_queue *queue)
+{
+ struct worker_queue_entry *entry1, *entry2;
+
+ entry1 = STAILQ_FIRST(queue);
+ while (entry1) {
+ entry2 = STAILQ_NEXT(entry1, entries);
+ worker_queue_entry_destroy(entry1);
+ entry1 = entry2;
+ }
+ STAILQ_INIT(queue);
+}
+
+int worker_queue_is_empty(const struct worker_queue *queue)
+{
+ return STAILQ_EMPTY(queue);
+}
+
+void worker_queue_push(struct worker_queue *queue, struct worker_queue_entry *entry)
+{
+ STAILQ_INSERT_TAIL(queue, entry, entries);
+}
+
+void worker_queue_push_head(struct worker_queue *queue, struct worker_queue_entry *entry)
+{
+ STAILQ_INSERT_HEAD(queue, entry, entries);
+}
+
+struct worker_queue_entry *worker_queue_pop(struct worker_queue *queue)
+{
+ struct worker_queue_entry *entry;
+
+ entry = STAILQ_FIRST(queue);
+ STAILQ_REMOVE_HEAD(queue, entries);
+
+ return entry;
+}
diff --git a/src/worker_queue.h b/src/worker_queue.h
new file mode 100644
index 0000000..d5e0bb2
--- /dev/null
+++ b/src/worker_queue.h
@@ -0,0 +1,26 @@
+#ifndef __WORKER_QUEUE_H__
+#define __WORKER_QUEUE_H__
+
+#include <sys/queue.h>
+
+struct worker_queue_entry {
+ int fd;
+ STAILQ_ENTRY(worker_queue_entry) entries;
+};
+
+int worker_queue_entry_create(struct worker_queue_entry **, int fd);
+void worker_queue_entry_destroy(struct worker_queue_entry *);
+
+STAILQ_HEAD(worker_queue, worker_queue_entry);
+
+void worker_queue_create(struct worker_queue *);
+void worker_queue_destroy(struct worker_queue *);
+
+int worker_queue_is_empty(const struct worker_queue *);
+
+void worker_queue_push(struct worker_queue *, struct worker_queue_entry *);
+void worker_queue_push_head(struct worker_queue *, struct worker_queue_entry *);
+
+struct worker_queue_entry *worker_queue_pop(struct worker_queue *);
+
+#endif