diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-25 16:58:38 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-26 04:05:02 +0200 |
commit | 532b3ae9b5cd8609237e04db768cc1f750d8631d (patch) | |
tree | f65253a6ce9970d1d93e6bb6c65758d6fa98373a /src | |
parent | cmake: ignore unused parameters for now (diff) | |
download | cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.tar.gz cimple-532b3ae9b5cd8609237e04db768cc1f750d8631d.zip |
add some more code
This adds a basic "worker" program.
You can now do something like
./server &
./worker &
./client ci_run URL REV
and the server should pass a message to worker, after which it should
clone the repository at URL, checkout REV, and try to run the CI script.
It's extremely unfinished: I need to sort out the graceful shutdown, how
the server manages workers, etc.
Diffstat (limited to '')
-rw-r--r-- | src/CMakeLists.txt | 21 | ||||
-rw-r--r-- | src/ci.c | 101 | ||||
-rw-r--r-- | src/ci.h | 28 | ||||
-rw-r--r-- | src/ci_queue.c | 92 | ||||
-rw-r--r-- | src/ci_queue.h | 27 | ||||
-rw-r--r-- | src/file.c | 61 | ||||
-rw-r--r-- | src/file.h | 10 | ||||
-rw-r--r-- | src/git.c | 109 | ||||
-rw-r--r-- | src/git.h | 19 | ||||
-rw-r--r-- | src/process.c | 46 | ||||
-rw-r--r-- | src/process.h | 6 | ||||
-rw-r--r-- | src/server.c | 249 | ||||
-rw-r--r-- | src/server.h | 18 | ||||
-rw-r--r-- | src/worker.c | 241 | ||||
-rw-r--r-- | src/worker.h | 24 | ||||
-rw-r--r-- | src/worker_main.c | 86 | ||||
-rw-r--r-- | src/worker_queue.c | 79 | ||||
-rw-r--r-- | src/worker_queue.h | 26 |
18 files changed, 1230 insertions, 13 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1cd508c..c2333a8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,7 +12,24 @@ add_compile_definitions(_GNU_SOURCE) add_compile_definitions(VERSION="${PROJECT_VERSION}") -add_executable(server server_main.c server.c msg.c net.c tcp_server.c) -add_executable(client client_main.c client.c msg.c net.c) +add_executable(server server_main.c server.c + ci_queue.c + msg.c + net.c + tcp_server.c + worker_queue.c) + +add_executable(client client_main.c client.c + msg.c + net.c) + +add_executable(worker worker_main.c worker.c + ci.c + file.c + git.c + msg.c + net.c + process.c) target_link_libraries(server PRIVATE pthread) +target_link_libraries(worker PRIVATE git2 pthread) diff --git a/src/ci.c b/src/ci.c new file mode 100644 index 0000000..1932008 --- /dev/null +++ b/src/ci.c @@ -0,0 +1,101 @@ +#include "ci.h" +#include "file.h" +#include "git.h" +#include "log.h" +#include "process.h" + +#include <git2.h> +#include <stddef.h> +#include <stdlib.h> + +/* clang-format off */ +static const char *ci_scripts[] = { + "./.ci.sh", + "./.ci", + "./ci.sh", + "./ci", + NULL, +}; +/* clang-format on */ + +static run_result ci_run_script(const char *script, int *ec) +{ + const char *args[] = {script, NULL}; + int ret = 0; + + ret = proc_spawn(args, ec); + if (ret < 0) + return RUN_ERROR; + if (*ec) + return RUN_FAILURE; + return RUN_SUCCESS; +} + +run_result ci_run(int *ec) +{ + for (const char **script = ci_scripts; *script; ++script) { + if (!file_exists(*script)) + continue; + print_log("Going to run: %s\n", *script); + return ci_run_script(*script, ec); + } + + print_log("Couldn't find any CI scripts to run\n"); + return RUN_NO; +} + +static void ci_cleanup_git_repo(git_repository *repo) +{ + rm_rf(git_repository_workdir(repo)); + libgit_repository_free(repo); +} + +static int ci_prepare_git_repo(git_repository **repo, const char *url, const char *rev) +{ + int ret = 0; + + ret = libgit_clone_to_tmp(repo, url); + if (ret < 0) + return ret; + + ret = libgit_checkout(*repo, rev); + if (ret < 0) + goto free_repo; + + return ret; + +free_repo: + ci_cleanup_git_repo(*repo); + + return ret; +} + +run_result ci_run_git_repo(const char *url, const char *rev, int *ec) +{ + char *oldpwd; + git_repository *repo; + run_result result = RUN_ERROR; + int ret = 0; + + ret = ci_prepare_git_repo(&repo, url, rev); + if (ret < 0) + goto exit; + + ret = my_chdir(git_repository_workdir(repo), &oldpwd); + if (ret < 0) + goto free_repo; + + result = ci_run(ec); + if (result < 0) + goto oldpwd; + +oldpwd: + my_chdir(oldpwd, NULL); + free(oldpwd); + +free_repo: + ci_cleanup_git_repo(repo); + +exit: + return result; +} diff --git a/src/ci.h b/src/ci.h new file mode 100644 index 0000000..4267b89 --- /dev/null +++ b/src/ci.h @@ -0,0 +1,28 @@ +#ifndef __CI_H__ +#define __CI_H__ + +typedef enum { + RUN_ERROR = -1, + RUN_SUCCESS, + RUN_FAILURE, + RUN_NO, +} run_result; + +run_result ci_run(int *ec); + +/* + * This is a high-level function. It's basically equivalent to the following + * sequence in bash: + * + * dir="$( mktemp -d )" + * git clone --no-checkout "$url" "$dir" + * pushd "$dir" + * git checkout "$rev" + * ./ci + * popd + * rm -rf "$dir" + * + */ +run_result ci_run_git_repo(const char *url, const char *rev, int *ec); + +#endif diff --git a/src/ci_queue.c b/src/ci_queue.c new file mode 100644 index 0000000..d87d3c0 --- /dev/null +++ b/src/ci_queue.c @@ -0,0 +1,92 @@ +#include "ci_queue.h" +#include "log.h" + +#include <stdlib.h> +#include <string.h> +#include <sys/queue.h> + +int ci_queue_entry_create(struct ci_queue_entry **entry, const char *_url, const char *_rev) +{ + char *url, *rev; + + url = strdup(_url); + if (!url) { + print_errno("strdup"); + goto fail; + } + + rev = strdup(_rev); + if (!rev) { + print_errno("strdup"); + goto free_url; + } + + *entry = malloc(sizeof(struct ci_queue_entry)); + if (!*entry) { + print_errno("malloc"); + goto free_rev; + } + (*entry)->url = url; + (*entry)->rev = rev; + + return 0; + +free_rev: + free(rev); + +free_url: + free(url); + +fail: + return -1; +} + +void ci_queue_entry_destroy(struct ci_queue_entry *entry) +{ + free(entry->rev); + free(entry->url); + free(entry); +} + +void ci_queue_create(struct ci_queue *queue) +{ + STAILQ_INIT(queue); +} + +void ci_queue_destroy(struct ci_queue *queue) +{ + struct ci_queue_entry *entry1, *entry2; + + entry1 = STAILQ_FIRST(queue); + while (entry1) { + entry2 = STAILQ_NEXT(entry1, entries); + ci_queue_entry_destroy(entry1); + entry1 = entry2; + } + STAILQ_INIT(queue); +} + +int ci_queue_is_empty(const struct ci_queue *queue) +{ + return STAILQ_EMPTY(queue); +} + +void ci_queue_push(struct ci_queue *queue, struct ci_queue_entry *entry) +{ + STAILQ_INSERT_TAIL(queue, entry, entries); +} + +void ci_queue_push_head(struct ci_queue *queue, struct ci_queue_entry *entry) +{ + STAILQ_INSERT_HEAD(queue, entry, entries); +} + +struct ci_queue_entry *ci_queue_pop(struct ci_queue *queue) +{ + struct ci_queue_entry *entry; + + entry = STAILQ_FIRST(queue); + STAILQ_REMOVE_HEAD(queue, entries); + + return entry; +} diff --git a/src/ci_queue.h b/src/ci_queue.h new file mode 100644 index 0000000..d036376 --- /dev/null +++ b/src/ci_queue.h @@ -0,0 +1,27 @@ +#ifndef __CI_QUEUE_H__ +#define __CI_QUEUE_H__ + +#include <sys/queue.h> + +struct ci_queue_entry { + char *url; + char *rev; + STAILQ_ENTRY(ci_queue_entry) entries; +}; + +int ci_queue_entry_create(struct ci_queue_entry **, const char *url, const char *rev); +void ci_queue_entry_destroy(struct ci_queue_entry *); + +STAILQ_HEAD(ci_queue, ci_queue_entry); + +void ci_queue_create(struct ci_queue *); +void ci_queue_destroy(struct ci_queue *); + +int ci_queue_is_empty(const struct ci_queue *); + +void ci_queue_push(struct ci_queue *, struct ci_queue_entry *); +void ci_queue_push_head(struct ci_queue *, struct ci_queue_entry *); + +struct ci_queue_entry *ci_queue_pop(struct ci_queue *); + +#endif diff --git a/src/file.c b/src/file.c new file mode 100644 index 0000000..ede640b --- /dev/null +++ b/src/file.c @@ -0,0 +1,61 @@ +#include "file.h" +#include "log.h" + +#include <ftw.h> +#include <stdio.h> +#include <stdlib.h> +#include <sys/stat.h> +#include <unistd.h> + +static int unlink_cb(const char *fpath, const struct stat *, int, struct FTW *) +{ + int ret = 0; + + ret = remove(fpath); + if (ret < 0) { + print_errno("remove"); + return ret; + } + + return ret; +} + +int rm_rf(const char *dir) +{ + print_log("Recursively removing directory: %s\n", dir); + return nftw(dir, unlink_cb, 64, FTW_DEPTH | FTW_PHYS); +} + +int my_chdir(const char *dir, char **old) +{ + int ret = 0; + + if (old) { + *old = get_current_dir_name(); + if (!*old) { + print_errno("get_current_dir_name"); + return -1; + } + } + + ret = chdir(dir); + if (ret < 0) { + print_errno("chdir"); + goto free_old; + } + + return ret; + +free_old: + if (old) + free(*old); + + return ret; +} + +int file_exists(const char *path) +{ + struct stat stat; + int ret = lstat(path, &stat); + return !ret && S_ISREG(stat.st_mode); +} diff --git a/src/file.h b/src/file.h new file mode 100644 index 0000000..b89fca1 --- /dev/null +++ b/src/file.h @@ -0,0 +1,10 @@ +#ifndef __FILE_H__ +#define __FILE_H__ + +int rm_rf(const char *dir); + +int my_chdir(const char *dir, char **old); + +int file_exists(const char *path); + +#endif diff --git a/src/git.c b/src/git.c new file mode 100644 index 0000000..3572d67 --- /dev/null +++ b/src/git.c @@ -0,0 +1,109 @@ +#include "git.h" +#include "file.h" +#include "log.h" + +#include <git2.h> + +#define git_print_error(fn) \ + { \ + const git_error *error = git_error_last(); \ + const char *msg = error && error->message ? error->message : "???"; \ + print_error("%s: %s\n", fn, msg); \ + } + +int libgit_init() +{ + int ret = 0; + + ret = git_libgit2_init(); + if (ret < 0) { + git_print_error("git_libgit2_init"); + return ret; + } + + return 0; +} + +void libgit_shutdown() +{ + git_libgit2_shutdown(); +} + +int libgit_clone(git_repository **repo, const char *url, const char *dir) +{ + git_clone_options opts; + int ret = 0; + + print_log("Cloning git repository from %s to %s\n", url, dir); + + ret = git_clone_options_init(&opts, GIT_CLONE_OPTIONS_VERSION); + if (ret < 0) { + git_print_error("git_clone_options_init"); + return ret; + } + opts.checkout_opts.checkout_strategy = GIT_CHECKOUT_NONE; + + ret = git_clone(repo, url, dir, &opts); + if (ret < 0) { + git_print_error("git_clone"); + return ret; + } + + return 0; +} + +int libgit_clone_to_tmp(git_repository **repo, const char *url) +{ + char dir[] = "/tmp/git.XXXXXX"; + + if (!mkdtemp(dir)) { + print_errno("mkdtemp"); + return -1; + } + + return libgit_clone(repo, url, dir); +} + +void libgit_repository_free(git_repository *repo) +{ + git_repository_free(repo); +} + +int libgit_checkout(git_repository *repo, const char *rev) +{ + git_checkout_options opts; + git_object *obj; + int ret = 0; + + print_log("Checking out revision %s\n", rev); + + ret = git_revparse_single(&obj, repo, rev); + if (ret < 0) { + git_print_error("git_revparse_single"); + return ret; + } + + ret = git_checkout_options_init(&opts, GIT_CHECKOUT_OPTIONS_VERSION); + if (ret < 0) { + git_print_error("git_checkout_options_init"); + goto free_obj; + } + opts.checkout_strategy = GIT_CHECKOUT_FORCE; + + ret = git_checkout_tree(repo, obj, &opts); + if (ret < 0) { + git_print_error("git_checkout_tree"); + goto free_obj; + } + + ret = git_repository_set_head_detached(repo, git_object_id(obj)); + if (ret < 0) { + git_print_error("git_repository_set_head_detached"); + goto free_obj; + } + +free_obj: + git_object_free(obj); + + return ret; +} diff --git a/src/git.h b/src/git.h new file mode 100644 index 0000000..83b9c49 --- /dev/null +++ b/src/git.h @@ -0,0 +1,19 @@ +#ifndef __GIT_H__ +#define __GIT_H__ + +#include <git2.h> + +int libgit_init(); +void libgit_shutdown(); + +/* These never check out any files (so that we don't have to do 2 checkouts). */ +int libgit_clone(git_repository **, const char *url, const char *dir); +int libgit_clone_to_tmp(git_repository **, const char *url); + +/* Free a cloned repository. */ +void libgit_repository_free(git_repository *); + +/* I tried to make this an equivalent of `git checkout`. */ +int libgit_checkout(git_repository *, const char *rev); + +#endif diff --git a/src/process.c b/src/process.c new file mode 100644 index 0000000..9d95037 --- /dev/null +++ b/src/process.c @@ -0,0 +1,46 @@ +#include "process.h" +#include "log.h" + +#include <stdlib.h> +#include <sys/wait.h> +#include <unistd.h> + +static int wait_for_child(pid_t pid, int *ec) +{ + int status; + + pid_t ret = waitpid(pid, &status, 0); + if (ret < 0) { + print_errno("waitpid"); + return ret; + } + + if (WIFEXITED(status)) + *ec = WEXITSTATUS(status); + else + *ec = -1; + + return 0; +} + +int proc_spawn(const char *args[], int *ec) +{ + int ret = 0; + + pid_t child_pid = fork(); + if (child_pid < 0) { + print_errno("fork"); + return child_pid; + } + + if (child_pid) + return wait_for_child(child_pid, ec); + + ret = execv(args[0], (char *const *)args); + if (ret < 0) { + print_errno("execv"); + exit(ret); + } + + return 0; +} diff --git a/src/process.h b/src/process.h new file mode 100644 index 0000000..da0c421 --- /dev/null +++ b/src/process.h @@ -0,0 +1,6 @@ +#ifndef __PROCESS_H__ +#define __PROCESS_H__ + +int proc_spawn(const char *args[], int *ec); + +#endif diff --git a/src/server.c b/src/server.c index 3e9f080..e285a9e 100644 --- a/src/server.c +++ b/src/server.c @@ -1,11 +1,97 @@ #include "server.h" +#include "ci_queue.h" +#include "log.h" #include "msg.h" -#include "net.h" #include "tcp_server.h" +#include "worker_queue.h" -#include <stdio.h> +#include <pthread.h> +#include <string.h> #include <unistd.h> +static int server_has_runs_and_workers(const struct server *server) +{ + return !ci_queue_is_empty(&server->ci_queue) && + !worker_queue_is_empty(&server->worker_queue); +} + +static int server_scheduler_iteration(struct server *server) +{ + struct worker_queue_entry *worker; + struct ci_queue_entry *ci_run; + struct msg msg; + int response, ret = 0; + + worker = worker_queue_pop(&server->worker_queue); + ci_run = ci_queue_pop(&server->ci_queue); + + char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; + + ret = msg_from_argv(&msg, argv); + if (ret < 0) + goto requeue_ci_run; + + ret = msg_send_and_wait(worker->fd, &msg, &response); + if (ret < 0) + goto free_msg; + + if (response < 0) { + print_error("Failed to schedule a CI run\n"); + } + + msg_free(&msg); + + ci_queue_entry_destroy(ci_run); + + /* FIXME: Don't mark worker as free! */ + worker_queue_push_head(&server->worker_queue, worker); + + return 0; + +free_msg: + msg_free(&msg); + +requeue_ci_run: + ci_queue_push_head(&server->ci_queue, ci_run); + + worker_queue_push_head(&server->worker_queue, worker); + + return ret; +} + +static void *server_scheduler(void *_server) +{ + struct server *server = (struct server *)_server; + int ret = 0; + + ret = pthread_mutex_lock(&server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + goto exit; + } + + while (1) { + while (!server_has_runs_and_workers(server)) { + ret = pthread_cond_wait(&server->scheduler_cv, &server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_cond_wait"); + goto unlock; + } + } + + ret = server_scheduler_iteration(server); + if (ret < 0) + goto unlock; + } + +unlock: + if (pthread_mutex_unlock(&server->scheduler_mtx)) + print_errno("pthread_mutex_unlock"); + +exit: + return NULL; +} + int server_create(struct server *server, const struct settings *settings) { int ret = 0; @@ -14,31 +100,176 @@ int server_create(struct server *server, const struct settings *settings) if (ret < 0) return ret; - return 0; + worker_queue_create(&server->worker_queue); + + ci_queue_create(&server->ci_queue); + + ret = pthread_mutex_init(&server->scheduler_mtx, NULL); + if (ret < 0) { + print_errno("pthread_mutex_init"); + goto destroy_tcp_server; + } + + ret = pthread_cond_init(&server->scheduler_cv, NULL); + if (ret < 0) { + print_errno("pthread_cond_init"); + goto destroy_scheduler_mtx; + } + + ret = pthread_create(&server->scheduler, NULL, server_scheduler, server); + if (ret < 0) { + print_errno("pthread_create"); + goto destroy_scheduler_cv; + } + + return ret; + +destroy_scheduler_cv: + if (pthread_cond_destroy(&server->scheduler_cv)) + print_errno("pthread_cond_destroy"); + +destroy_scheduler_mtx: + if (pthread_mutex_destroy(&server->scheduler_mtx)) + print_errno("pthread_mutex_destroy"); + + ci_queue_destroy(&server->ci_queue); + + worker_queue_destroy(&server->worker_queue); + +destroy_tcp_server: + tcp_server_destroy(&server->tcp_server); + + return ret; } -void server_destroy(const struct server *server) +void server_destroy(struct server *server) { + if (pthread_join(server->scheduler, NULL)) + print_errno("pthread_join"); + if (pthread_cond_destroy(&server->scheduler_cv)) + print_errno("pthread_cond_destroy"); + if (pthread_mutex_destroy(&server->scheduler_mtx)) + print_errno("pthread_mutex_destroy"); + ci_queue_destroy(&server->ci_queue); + worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); } -static int server_msg_handler(const struct msg *msg, void *) +struct msg_context { + struct server *server; + int client_fd; +}; + +static int msg_new_worker(const struct msg *, void *_ctx) +{ + struct msg_context *ctx = (struct msg_context *)_ctx; + return server_new_worker(ctx->server, ctx->client_fd); +} + +static int msg_ci_run(const struct msg *msg, void *_ctx) +{ + struct msg_context *ctx = (struct msg_context *)_ctx; + + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return -1; + } + + return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]); +} + +static int server_msg_handler(const struct msg *msg, void *ctx) { + if (msg->argc == 0) { + print_error("Received an empty message\n"); + return -1; + } + + if (!strcmp(msg->argv[0], "new_worker")) + return msg_new_worker(msg, ctx); + if (!strcmp(msg->argv[0], "ci_run")) + return msg_ci_run(msg, ctx); + return msg_dump_unknown(msg); } -static int server_conn_handler(int fd, void *) +static int server_conn_handler(int fd, void *server) { - return msg_recv_and_handle(fd, server_msg_handler, NULL); + struct msg_context ctx = {server, fd}; + return msg_recv_and_handle(fd, server_msg_handler, &ctx); } -int server_main(const struct server *server) +int server_main(struct server *server) { int ret = 0; while (1) { - ret = tcp_server_accept(&server->tcp_server, server_conn_handler, NULL); + ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); if (ret < 0) return ret; } } + +int server_new_worker(struct server *server, int fd) +{ + struct worker_queue_entry *entry; + int ret = 0; + + print_log("Registering a new worker\n"); + + ret = pthread_mutex_lock(&server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + return ret; + } + + ret = worker_queue_entry_create(&entry, fd); + if (ret < 0) + goto unlock; + + worker_queue_push(&server->worker_queue, entry); + + ret = pthread_cond_signal(&server->scheduler_cv); + if (ret < 0) { + print_errno("pthread_cond_signal"); + goto unlock; + } + +unlock: + if (pthread_mutex_unlock(&server->scheduler_mtx)) + print_errno("pthread_mutex_unlock"); + + return ret; +} + +int server_ci_run(struct server *server, const char *url, const char *rev) +{ + struct ci_queue_entry *entry; + int ret = 0; + + print_log("Scheduling a new CI run for repository %s\n", url); + + ret = pthread_mutex_lock(&server->scheduler_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + return ret; + } + + ret = ci_queue_entry_create(&entry, url, rev); + if (ret < 0) + goto unlock; + + ci_queue_push(&server->ci_queue, entry); + + ret = pthread_cond_signal(&server->scheduler_cv); + if (ret < 0) { + print_errno("pthread_cond_signal"); + goto unlock; + } + +unlock: + if (pthread_mutex_unlock(&server->scheduler_mtx)) + print_errno("pthread_mutex_unlock"); + + return ret; +} diff --git a/src/server.h b/src/server.h index 11c409b..37fdb13 100644 --- a/src/server.h +++ b/src/server.h @@ -1,7 +1,11 @@ #ifndef __SERVER_H__ #define __SERVER_H__ +#include "ci_queue.h" #include "tcp_server.h" +#include "worker_queue.h" + +#include <pthread.h> struct settings { const char *port; @@ -9,11 +13,21 @@ struct settings { struct server { struct tcp_server tcp_server; + + struct worker_queue worker_queue; + struct ci_queue ci_queue; + + pthread_mutex_t scheduler_mtx; + pthread_cond_t scheduler_cv; + pthread_t scheduler; }; int server_create(struct server *, const struct settings *); -void server_destroy(const struct server *); +void server_destroy(struct server *); + +int server_main(struct server *); -int server_main(const struct server *); +int server_new_worker(struct server *, int fd); +int server_ci_run(struct server *, const char *url, const char *rev); #endif diff --git a/src/worker.c b/src/worker.c new file mode 100644 index 0000000..687e647 --- /dev/null +++ b/src/worker.c @@ -0,0 +1,241 @@ +#include "worker.h" +#include "ci.h" +#include "git.h" +#include "log.h" +#include "msg.h" +#include "net.h" + +#include <errno.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +int worker_create(struct worker *worker, const struct settings *settings) +{ + int ret = 0; + + ret = libgit_init(); + if (ret < 0) + return ret; + + ret = net_connect(settings->host, settings->port); + if (ret < 0) + goto git_shutdown; + worker->fd = ret; + + ret = pthread_mutex_init(&worker->task_mtx, NULL); + if (ret < 0) { + print_errno("pthread_mutex_init"); + goto close; + } + + worker->task_active = 0; + return ret; + +close: + close(worker->fd); + +git_shutdown: + libgit_shutdown(); + + return ret; +} + +void worker_destroy(struct worker *worker) +{ + if (pthread_mutex_destroy(&worker->task_mtx)) + print_errno("pthread_mutex_destroy"); + close(worker->fd); + libgit_shutdown(); +} + +static int msg_send_new_worker(const struct worker *worker) +{ + static char *argv[] = {"new_worker", NULL}; + struct msg msg; + int response, ret = 0; + + ret = msg_from_argv(&msg, argv); + if (ret < 0) + return ret; + + ret = msg_send_and_wait(worker->fd, &msg, &response); + if (ret < 0) + goto free_msg; + + if (response < 0) { + print_error("Failed to register\n"); + ret = response; + goto free_msg; + } + +free_msg: + msg_free(&msg); + + return ret; +} + +typedef int (*worker_task_body)(const struct msg *); + +static int msg_body_ci_run(const struct msg *msg) +{ + const char *url = msg->argv[1]; + const char *rev = msg->argv[2]; + int ec = 0; + + return ci_run_git_repo(url, rev, &ec); +} + +typedef worker_task_body (*worker_msg_parser)(const struct msg *); + +static worker_task_body parse_msg_ci_run(const struct msg *msg) +{ + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return NULL; + } + + return msg_body_ci_run; +} + +struct worker_msg_parser_it { + const char *msg; + worker_msg_parser parser; +}; + +struct worker_msg_parser_it worker_msg_parsers[] = { + {"ci_run", parse_msg_ci_run}, +}; + +struct worker_task_context { + struct msg *msg; + worker_task_body body; +}; + +static void *worker_task_wrapper(void *_ctx) +{ + struct worker_task_context *ctx = (struct worker_task_context *)_ctx; + + ctx->body(ctx->msg); + msg_free(ctx->msg); + free(ctx->msg); + free(ctx); + return NULL; +} + +static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body) +{ + struct worker_task_context *ctx; + int ret = 0; + + ctx = malloc(sizeof(*ctx)); + if (!ctx) { + print_errno("malloc"); + return -1; + } + ctx->body = body; + + ctx->msg = msg_copy(msg); + if (!ctx->msg) { + ret = -1; + goto free_ctx; + } + + ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx); + if (ret < 0) { + print_errno("pthread_create"); + goto free_msg; + } + worker->task_active = 1; + + return ret; + +free_msg: + msg_free(ctx->msg); + free(ctx->msg); + +free_ctx: + free(ctx); + + return ret; +} + +static int worker_msg_handler(struct worker *worker, const struct msg *msg) +{ + if (worker->task_active) { + int ret = pthread_tryjoin_np(worker->task, NULL); + switch (ret) { + case 0: + worker->task_active = 0; + break; + case EBUSY: + break; + default: + print_errno("pthread_tryjoin_np"); + return ret; + } + } + + if (worker->task_active) { + print_error("Worker is busy\n"); + return -1; + } + + if (msg->argc == 0) { + print_error("Received an empty message\n"); + return -1; + } + + size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]); + + for (size_t i = 0; i < numof_parsers; ++i) { + const struct worker_msg_parser_it *it = &worker_msg_parsers[i]; + if (strcmp(it->msg, msg->argv[0])) + continue; + + worker_task_body body = it->parser(msg); + if (!body) + return -1; + + return worker_schedule_task(worker, msg, body); + } + + return msg_dump_unknown(msg); +} + +static int worker_msg_handler_lock(const struct msg *msg, void *_worker) +{ + struct worker *worker = (struct worker *)_worker; + int ret = 0; + + ret = pthread_mutex_lock(&worker->task_mtx); + if (ret < 0) { + print_errno("pthread_mutex_lock"); + return -1; + } + + ret = worker_msg_handler(worker, msg); + + if (pthread_mutex_unlock(&worker->task_mtx)) + print_errno("pthread_mutex_unlock"); + + return ret; +} + +int worker_main(struct worker *worker, int, char *[]) +{ + int ret = 0; + + ret = msg_send_new_worker(worker); + if (ret < 0) + return ret; + + while (1) { + print_log("Waiting for a new command\n"); + + ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker); + if (ret < 0) + return ret; + } +} diff --git a/src/worker.h b/src/worker.h new file mode 100644 index 0000000..5231594 --- /dev/null +++ b/src/worker.h @@ -0,0 +1,24 @@ +#ifndef __WORKER_H__ +#define __WORKER_H__ + +#include <pthread.h> + +struct settings { + char *host; + char *port; +}; + +struct worker { + int fd; + + pthread_mutex_t task_mtx; + pthread_t task; + int task_active; +}; + +int worker_create(struct worker *, const struct settings *); +void worker_destroy(struct worker *); + +int worker_main(struct worker *, int argc, char *argv[]); + +#endif diff --git a/src/worker_main.c b/src/worker_main.c new file mode 100644 index 0000000..d88071d --- /dev/null +++ b/src/worker_main.c @@ -0,0 +1,86 @@ +#include "const.h" +#include "worker.h" + +#include <getopt.h> +#include <stdio.h> +#include <stdlib.h> + +static struct settings default_settings() +{ + struct settings settings = {DEFAULT_HOST, DEFAULT_PORT}; + return settings; +} + +static void print_usage(const char *argv0) +{ + printf("usage: %s [-h|--help] [-V|--version] [-H|--host HOST] [-p|--port PORT]\n", argv0); +} + +static void print_version() +{ + printf("%s\n", VERSION); +} + +static int parse_settings(struct settings *settings, int argc, char *argv[]) +{ + int opt, longind; + + *settings = default_settings(); + + static struct option long_options[] = { + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'V'}, + {"host", required_argument, 0, 'H'}, + {"port", required_argument, 0, 'p'}, + {0, 0, 0, 0}, + }; + + while ((opt = getopt_long(argc, argv, "hVH:p:", long_options, &longind)) != -1) { + switch (opt) { + case 'h': + print_usage(argv[0]); + exit(0); + break; + case 'V': + print_version(); + exit(0); + break; + case 'H': + settings->host = optarg; + break; + case 'p': + settings->port = optarg; + break; + default: + print_usage(argv[0]); + exit(1); + break; + } + } + + return 0; +} + +int main(int argc, char *argv[]) +{ + struct settings settings; + struct worker worker; + int ret = 0; + + ret = parse_settings(&settings, argc, argv); + if (ret < 0) + return ret; + + ret = worker_create(&worker, &settings); + if (ret < 0) + return ret; + + ret = worker_main(&worker, argc - optind, argv + optind); + if (ret < 0) + goto destroy_worker; + +destroy_worker: + worker_destroy(&worker); + + return ret; +} diff --git a/src/worker_queue.c b/src/worker_queue.c new file mode 100644 index 0000000..bb0077b --- /dev/null +++ b/src/worker_queue.c @@ -0,0 +1,79 @@ +#include "worker_queue.h" +#include "log.h" + +#include <stdlib.h> +#include <sys/queue.h> +#include <unistd.h> + +int worker_queue_entry_create(struct worker_queue_entry **entry, int fd) +{ + int newfd = dup(fd); + + if (newfd < 0) { + print_errno("malloc"); + return -1; + } + + *entry = malloc(sizeof(struct worker_queue_entry)); + if (!*entry) { + print_errno("malloc"); + goto close_newfd; + } + (*entry)->fd = newfd; + + return 0; + +close_newfd: + close(newfd); + + return -1; +} + +void worker_queue_entry_destroy(struct worker_queue_entry *entry) +{ + close(entry->fd); + free(entry); +} + +void worker_queue_create(struct worker_queue *queue) +{ + STAILQ_INIT(queue); +} + +void worker_queue_destroy(struct worker_queue *queue) +{ + struct worker_queue_entry *entry1, *entry2; + + entry1 = STAILQ_FIRST(queue); + while (entry1) { + entry2 = STAILQ_NEXT(entry1, entries); + worker_queue_entry_destroy(entry1); + entry1 = entry2; + } + STAILQ_INIT(queue); +} + +int worker_queue_is_empty(const struct worker_queue *queue) +{ + return STAILQ_EMPTY(queue); +} + +void worker_queue_push(struct worker_queue *queue, struct worker_queue_entry *entry) +{ + STAILQ_INSERT_TAIL(queue, entry, entries); +} + +void worker_queue_push_head(struct worker_queue *queue, struct worker_queue_entry *entry) +{ + STAILQ_INSERT_HEAD(queue, entry, entries); +} + +struct worker_queue_entry *worker_queue_pop(struct worker_queue *queue) +{ + struct worker_queue_entry *entry; + + entry = STAILQ_FIRST(queue); + STAILQ_REMOVE_HEAD(queue, entries); + + return entry; +} diff --git a/src/worker_queue.h b/src/worker_queue.h new file mode 100644 index 0000000..d5e0bb2 --- /dev/null +++ b/src/worker_queue.h @@ -0,0 +1,26 @@ +#ifndef __WORKER_QUEUE_H__ +#define __WORKER_QUEUE_H__ + +#include <sys/queue.h> + +struct worker_queue_entry { + int fd; + STAILQ_ENTRY(worker_queue_entry) entries; +}; + +int worker_queue_entry_create(struct worker_queue_entry **, int fd); +void worker_queue_entry_destroy(struct worker_queue_entry *); + +STAILQ_HEAD(worker_queue, worker_queue_entry); + +void worker_queue_create(struct worker_queue *); +void worker_queue_destroy(struct worker_queue *); + +int worker_queue_is_empty(const struct worker_queue *); + +void worker_queue_push(struct worker_queue *, struct worker_queue_entry *); +void worker_queue_push_head(struct worker_queue *, struct worker_queue_entry *); + +struct worker_queue_entry *worker_queue_pop(struct worker_queue *); + +#endif |