diff options
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 223 |
1 files changed, 58 insertions, 165 deletions
diff --git a/src/worker.c b/src/worker.c index 19270db..3b48636 100644 --- a/src/worker.c +++ b/src/worker.c @@ -26,18 +26,8 @@ int worker_create(struct worker *worker, const struct settings *settings) goto git_shutdown; worker->fd = ret; - ret = pthread_mutex_init(&worker->task_mtx, NULL); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_init"); - goto close; - } - - worker->task_active = 0; return ret; -close: - check_errno(close(worker->fd), "close"); - git_shutdown: libgit_shutdown(); @@ -48,11 +38,6 @@ void worker_destroy(struct worker *worker) { print_log("Shutting down\n"); - if (worker->task_active) { - pthread_check(pthread_join(worker->task, NULL), "pthread_join"); - worker->task_active = 0; - } - pthread_check(pthread_mutex_destroy(&worker->task_mtx), "pthread_mutex_destroy"); check_errno(close(worker->fd), "close"); libgit_shutdown(); } @@ -61,200 +46,101 @@ static int msg_send_new_worker(const struct worker *worker) { static char *argv[] = {"new_worker", NULL}; struct msg msg; - int response, ret = 0; + int ret = 0; ret = msg_from_argv(&msg, argv); if (ret < 0) return ret; - ret = msg_send_and_wait(worker->fd, &msg, &response); + ret = msg_send(worker->fd, &msg); if (ret < 0) goto free_msg; - if (response < 0) { - print_error("Failed to register\n"); - ret = response; - goto free_msg; - } - free_msg: msg_free(&msg); return ret; } -typedef int (*worker_task_body)(const struct msg *); - -static int msg_body_ci_run(const struct msg *msg) +static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result) { - const char *url = msg->argv[1]; - const char *rev = msg->argv[2]; - struct proc_output result; int ret = 0; - ret = ci_run_git_repo(url, rev, &result); + ret = ci_run_git_repo(url, rev, result); if (ret < 0) { print_error("Run failed with an error\n"); return ret; } - print_log("Process exit code: %d\n", result.ec); - print_log("Process output:\n%s", result.output); - if (!result.output || !result.output_len || result.output[result.output_len - 1] != '\n') + print_log("Process exit code: %d\n", result->ec); + print_log("Process output:\n%s", result->output); + if (!result->output || !result->output_len || + result->output[result->output_len - 1] != '\n') print_log("\n"); - free(result.output); - return ret; + return 0; } -typedef worker_task_body (*worker_msg_parser)(const struct msg *); +static int msg_ci_run_handler(struct worker *worker, const struct msg *request) +{ + struct msg response; + struct proc_output result; + int ret = 0; + + ret = msg_ci_run_do(request->argv[1], request->argv[2], &result); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + proc_output_free(&result); -static worker_task_body parse_msg_ci_run(const struct msg *msg) + return msg_send(worker->fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) { if (msg->argc != 3) { print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return NULL; + return 0; } - return msg_body_ci_run; + return 1; } -struct worker_msg_parser_it { - const char *msg; - worker_msg_parser parser; -}; +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct worker *, const struct msg *); -struct worker_msg_parser_it worker_msg_parsers[] = { - {"ci_run", parse_msg_ci_run}, +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; }; -struct worker_task_context { - struct msg *msg; - worker_task_body body; +struct msg_descr messages[] = { + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, }; -static void *worker_task_wrapper(void *_ctx) -{ - struct worker_task_context *ctx = (struct worker_task_context *)_ctx; - - ctx->body(ctx->msg); - msg_free(ctx->msg); - free(ctx->msg); - free(ctx); - return NULL; -} - -static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body) -{ - struct worker_task_context *ctx; - pthread_attr_t attr; - int ret = 0; - - ctx = malloc(sizeof(*ctx)); - if (!ctx) { - print_errno("malloc"); - return -1; - } - ctx->body = body; - - ctx->msg = msg_copy(msg); - if (!ctx->msg) { - ret = -1; - goto free_ctx; - } - - ret = pthread_attr_init(&attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto free_msg; - } - - ret = signal_set_thread_attr(&attr); - if (ret < 0) - goto free_attr; - - ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto free_attr; - } - worker->task_active = 1; - pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy"); - - return ret; - -free_attr: - pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy"); - -free_msg: - msg_free(ctx->msg); - free(ctx->msg); - -free_ctx: - free(ctx); - - return ret; -} - -static int worker_msg_handler(struct worker *worker, const struct msg *msg) +static int worker_msg_handler(struct worker *worker, const struct msg *request) { - if (worker->task_active) { - int ret = pthread_tryjoin_np(worker->task, NULL); - switch (ret) { - case 0: - worker->task_active = 0; - break; - case EBUSY: - break; - default: - pthread_print_errno(ret, "pthread_tryjoin_np"); - return ret; - } - } - - if (worker->task_active) { - print_error("Worker is busy\n"); - return -1; - } - - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; - } + if (request->argc == 0) + goto unknown_request; - size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]); + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); - for (size_t i = 0; i < numof_parsers; ++i) { - const struct worker_msg_parser_it *it = &worker_msg_parsers[i]; - if (strcmp(it->msg, msg->argv[0])) + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) continue; - - worker_task_body body = it->parser(msg); - if (!body) - return -1; - - return worker_schedule_task(worker, msg, body); - } - - return msg_dump_unknown(msg); -} - -static int worker_msg_handler_lock(const struct msg *msg, void *_worker) -{ - struct worker *worker = (struct worker *)_worker; - int ret = 0; - - ret = pthread_mutex_lock(&worker->task_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - return -1; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(worker, request); } - ret = worker_msg_handler(worker, msg); - - pthread_check(pthread_mutex_unlock(&worker->task_mtx), "pthread_mutex_lock"); - - return ret; +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(worker->fd, &response); } int worker_main(struct worker *worker, int, char *[]) @@ -266,9 +152,16 @@ int worker_main(struct worker *worker, int, char *[]) return ret; while (!global_stop_flag) { + struct msg request; + print_log("Waiting for a new command\n"); - ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker); + ret = msg_recv(worker->fd, &request); + if (ret < 0) + return ret; + + ret = worker_msg_handler(worker, &request); + msg_free(&request); if (ret < 0) return ret; } |