aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c223
1 files changed, 58 insertions, 165 deletions
diff --git a/src/worker.c b/src/worker.c
index 19270db..3b48636 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -26,18 +26,8 @@ int worker_create(struct worker *worker, const struct settings *settings)
goto git_shutdown;
worker->fd = ret;
- ret = pthread_mutex_init(&worker->task_mtx, NULL);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_init");
- goto close;
- }
-
- worker->task_active = 0;
return ret;
-close:
- check_errno(close(worker->fd), "close");
-
git_shutdown:
libgit_shutdown();
@@ -48,11 +38,6 @@ void worker_destroy(struct worker *worker)
{
print_log("Shutting down\n");
- if (worker->task_active) {
- pthread_check(pthread_join(worker->task, NULL), "pthread_join");
- worker->task_active = 0;
- }
- pthread_check(pthread_mutex_destroy(&worker->task_mtx), "pthread_mutex_destroy");
check_errno(close(worker->fd), "close");
libgit_shutdown();
}
@@ -61,200 +46,101 @@ static int msg_send_new_worker(const struct worker *worker)
{
static char *argv[] = {"new_worker", NULL};
struct msg msg;
- int response, ret = 0;
+ int ret = 0;
ret = msg_from_argv(&msg, argv);
if (ret < 0)
return ret;
- ret = msg_send_and_wait(worker->fd, &msg, &response);
+ ret = msg_send(worker->fd, &msg);
if (ret < 0)
goto free_msg;
- if (response < 0) {
- print_error("Failed to register\n");
- ret = response;
- goto free_msg;
- }
-
free_msg:
msg_free(&msg);
return ret;
}
-typedef int (*worker_task_body)(const struct msg *);
-
-static int msg_body_ci_run(const struct msg *msg)
+static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result)
{
- const char *url = msg->argv[1];
- const char *rev = msg->argv[2];
- struct proc_output result;
int ret = 0;
- ret = ci_run_git_repo(url, rev, &result);
+ ret = ci_run_git_repo(url, rev, result);
if (ret < 0) {
print_error("Run failed with an error\n");
return ret;
}
- print_log("Process exit code: %d\n", result.ec);
- print_log("Process output:\n%s", result.output);
- if (!result.output || !result.output_len || result.output[result.output_len - 1] != '\n')
+ print_log("Process exit code: %d\n", result->ec);
+ print_log("Process output:\n%s", result->output);
+ if (!result->output || !result->output_len ||
+ result->output[result->output_len - 1] != '\n')
print_log("\n");
- free(result.output);
- return ret;
+ return 0;
}
-typedef worker_task_body (*worker_msg_parser)(const struct msg *);
+static int msg_ci_run_handler(struct worker *worker, const struct msg *request)
+{
+ struct msg response;
+ struct proc_output result;
+ int ret = 0;
+
+ ret = msg_ci_run_do(request->argv[1], request->argv[2], &result);
+ if (ret < 0)
+ msg_error(&response);
+ else
+ msg_success(&response);
+ proc_output_free(&result);
-static worker_task_body parse_msg_ci_run(const struct msg *msg)
+ return msg_send(worker->fd, &response);
+}
+
+static int msg_ci_run_parser(const struct msg *msg)
{
if (msg->argc != 3) {
print_error("Invalid number of arguments for a message: %d\n", msg->argc);
- return NULL;
+ return 0;
}
- return msg_body_ci_run;
+ return 1;
}
-struct worker_msg_parser_it {
- const char *msg;
- worker_msg_parser parser;
-};
+typedef int (*msg_parser)(const struct msg *msg);
+typedef int (*msg_handler)(struct worker *, const struct msg *);
-struct worker_msg_parser_it worker_msg_parsers[] = {
- {"ci_run", parse_msg_ci_run},
+struct msg_descr {
+ const char *cmd;
+ msg_parser parser;
+ msg_handler handler;
};
-struct worker_task_context {
- struct msg *msg;
- worker_task_body body;
+struct msg_descr messages[] = {
+ {"ci_run", msg_ci_run_parser, msg_ci_run_handler},
};
-static void *worker_task_wrapper(void *_ctx)
-{
- struct worker_task_context *ctx = (struct worker_task_context *)_ctx;
-
- ctx->body(ctx->msg);
- msg_free(ctx->msg);
- free(ctx->msg);
- free(ctx);
- return NULL;
-}
-
-static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body)
-{
- struct worker_task_context *ctx;
- pthread_attr_t attr;
- int ret = 0;
-
- ctx = malloc(sizeof(*ctx));
- if (!ctx) {
- print_errno("malloc");
- return -1;
- }
- ctx->body = body;
-
- ctx->msg = msg_copy(msg);
- if (!ctx->msg) {
- ret = -1;
- goto free_ctx;
- }
-
- ret = pthread_attr_init(&attr);
- if (ret) {
- pthread_print_errno(ret, "pthread_attr_init");
- goto free_msg;
- }
-
- ret = signal_set_thread_attr(&attr);
- if (ret < 0)
- goto free_attr;
-
- ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx);
- if (ret) {
- pthread_print_errno(ret, "pthread_create");
- goto free_attr;
- }
- worker->task_active = 1;
- pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy");
-
- return ret;
-
-free_attr:
- pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy");
-
-free_msg:
- msg_free(ctx->msg);
- free(ctx->msg);
-
-free_ctx:
- free(ctx);
-
- return ret;
-}
-
-static int worker_msg_handler(struct worker *worker, const struct msg *msg)
+static int worker_msg_handler(struct worker *worker, const struct msg *request)
{
- if (worker->task_active) {
- int ret = pthread_tryjoin_np(worker->task, NULL);
- switch (ret) {
- case 0:
- worker->task_active = 0;
- break;
- case EBUSY:
- break;
- default:
- pthread_print_errno(ret, "pthread_tryjoin_np");
- return ret;
- }
- }
-
- if (worker->task_active) {
- print_error("Worker is busy\n");
- return -1;
- }
-
- if (msg->argc == 0) {
- print_error("Received an empty message\n");
- return -1;
- }
+ if (request->argc == 0)
+ goto unknown_request;
- size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]);
+ size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
- for (size_t i = 0; i < numof_parsers; ++i) {
- const struct worker_msg_parser_it *it = &worker_msg_parsers[i];
- if (strcmp(it->msg, msg->argv[0]))
+ for (size_t i = 0; i < numof_messages; ++i) {
+ if (strcmp(messages[i].cmd, request->argv[0]))
continue;
-
- worker_task_body body = it->parser(msg);
- if (!body)
- return -1;
-
- return worker_schedule_task(worker, msg, body);
- }
-
- return msg_dump_unknown(msg);
-}
-
-static int worker_msg_handler_lock(const struct msg *msg, void *_worker)
-{
- struct worker *worker = (struct worker *)_worker;
- int ret = 0;
-
- ret = pthread_mutex_lock(&worker->task_mtx);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_lock");
- return -1;
+ if (!messages[i].parser(request))
+ continue;
+ return messages[i].handler(worker, request);
}
- ret = worker_msg_handler(worker, msg);
-
- pthread_check(pthread_mutex_unlock(&worker->task_mtx), "pthread_mutex_lock");
-
- return ret;
+unknown_request:
+ print_error("Received an unknown message\n");
+ msg_dump(request);
+ struct msg response;
+ msg_error(&response);
+ return msg_send(worker->fd, &response);
}
int worker_main(struct worker *worker, int, char *[])
@@ -266,9 +152,16 @@ int worker_main(struct worker *worker, int, char *[])
return ret;
while (!global_stop_flag) {
+ struct msg request;
+
print_log("Waiting for a new command\n");
- ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker);
+ ret = msg_recv(worker->fd, &request);
+ if (ret < 0)
+ return ret;
+
+ ret = worker_msg_handler(worker, &request);
+ msg_free(&request);
if (ret < 0)
return ret;
}