aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/worker.c
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2022-08-28 15:14:07 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2022-08-28 15:14:07 +0200
commit0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0 (patch)
tree2b0d1fb32b09988f652228a40508dbcef9c1032e /src/worker.c
parentnet: use MSG_NOSIGNAL (diff)
downloadcimple-0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0.tar.gz
cimple-0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0.zip
holy crap, it actually kinda works now
Previously, I had a stupid system where I would create a thread after every accept(), and put worker descriptors in a queue. A special "scheduler" thread would then pick them out, and give out jobs to complete. The problem was, of course, I couldn't conveniently poll job status from workers. I thought about using poll(), but that turned out to be a horribly complicated API. How do I deal with partial reads, for example? I don't honestly know. Then it hit me that I could just use the threads that handle accept()ed connections as "worker threads", which would synchronously schedule jobs and wait for them to complete. This solves every problem and removes the need for a lot of inter-thread synchronization magic. It even works now, holy crap! You can launch and terminate workers at will, and they will pick up new jobs automatically. As a side not, msg_recv_and_handle turned out to be too limiting and complicated for me, so I got rid of that, and do normal msg_recv/msg_send calls.
Diffstat (limited to '')
-rw-r--r--src/worker.c223
1 files changed, 58 insertions, 165 deletions
diff --git a/src/worker.c b/src/worker.c
index 19270db..3b48636 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -26,18 +26,8 @@ int worker_create(struct worker *worker, const struct settings *settings)
goto git_shutdown;
worker->fd = ret;
- ret = pthread_mutex_init(&worker->task_mtx, NULL);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_init");
- goto close;
- }
-
- worker->task_active = 0;
return ret;
-close:
- check_errno(close(worker->fd), "close");
-
git_shutdown:
libgit_shutdown();
@@ -48,11 +38,6 @@ void worker_destroy(struct worker *worker)
{
print_log("Shutting down\n");
- if (worker->task_active) {
- pthread_check(pthread_join(worker->task, NULL), "pthread_join");
- worker->task_active = 0;
- }
- pthread_check(pthread_mutex_destroy(&worker->task_mtx), "pthread_mutex_destroy");
check_errno(close(worker->fd), "close");
libgit_shutdown();
}
@@ -61,200 +46,101 @@ static int msg_send_new_worker(const struct worker *worker)
{
static char *argv[] = {"new_worker", NULL};
struct msg msg;
- int response, ret = 0;
+ int ret = 0;
ret = msg_from_argv(&msg, argv);
if (ret < 0)
return ret;
- ret = msg_send_and_wait(worker->fd, &msg, &response);
+ ret = msg_send(worker->fd, &msg);
if (ret < 0)
goto free_msg;
- if (response < 0) {
- print_error("Failed to register\n");
- ret = response;
- goto free_msg;
- }
-
free_msg:
msg_free(&msg);
return ret;
}
-typedef int (*worker_task_body)(const struct msg *);
-
-static int msg_body_ci_run(const struct msg *msg)
+static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result)
{
- const char *url = msg->argv[1];
- const char *rev = msg->argv[2];
- struct proc_output result;
int ret = 0;
- ret = ci_run_git_repo(url, rev, &result);
+ ret = ci_run_git_repo(url, rev, result);
if (ret < 0) {
print_error("Run failed with an error\n");
return ret;
}
- print_log("Process exit code: %d\n", result.ec);
- print_log("Process output:\n%s", result.output);
- if (!result.output || !result.output_len || result.output[result.output_len - 1] != '\n')
+ print_log("Process exit code: %d\n", result->ec);
+ print_log("Process output:\n%s", result->output);
+ if (!result->output || !result->output_len ||
+ result->output[result->output_len - 1] != '\n')
print_log("\n");
- free(result.output);
- return ret;
+ return 0;
}
-typedef worker_task_body (*worker_msg_parser)(const struct msg *);
+static int msg_ci_run_handler(struct worker *worker, const struct msg *request)
+{
+ struct msg response;
+ struct proc_output result;
+ int ret = 0;
+
+ ret = msg_ci_run_do(request->argv[1], request->argv[2], &result);
+ if (ret < 0)
+ msg_error(&response);
+ else
+ msg_success(&response);
+ proc_output_free(&result);
-static worker_task_body parse_msg_ci_run(const struct msg *msg)
+ return msg_send(worker->fd, &response);
+}
+
+static int msg_ci_run_parser(const struct msg *msg)
{
if (msg->argc != 3) {
print_error("Invalid number of arguments for a message: %d\n", msg->argc);
- return NULL;
+ return 0;
}
- return msg_body_ci_run;
+ return 1;
}
-struct worker_msg_parser_it {
- const char *msg;
- worker_msg_parser parser;
-};
+typedef int (*msg_parser)(const struct msg *msg);
+typedef int (*msg_handler)(struct worker *, const struct msg *);
-struct worker_msg_parser_it worker_msg_parsers[] = {
- {"ci_run", parse_msg_ci_run},
+struct msg_descr {
+ const char *cmd;
+ msg_parser parser;
+ msg_handler handler;
};
-struct worker_task_context {
- struct msg *msg;
- worker_task_body body;
+struct msg_descr messages[] = {
+ {"ci_run", msg_ci_run_parser, msg_ci_run_handler},
};
-static void *worker_task_wrapper(void *_ctx)
-{
- struct worker_task_context *ctx = (struct worker_task_context *)_ctx;
-
- ctx->body(ctx->msg);
- msg_free(ctx->msg);
- free(ctx->msg);
- free(ctx);
- return NULL;
-}
-
-static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body)
-{
- struct worker_task_context *ctx;
- pthread_attr_t attr;
- int ret = 0;
-
- ctx = malloc(sizeof(*ctx));
- if (!ctx) {
- print_errno("malloc");
- return -1;
- }
- ctx->body = body;
-
- ctx->msg = msg_copy(msg);
- if (!ctx->msg) {
- ret = -1;
- goto free_ctx;
- }
-
- ret = pthread_attr_init(&attr);
- if (ret) {
- pthread_print_errno(ret, "pthread_attr_init");
- goto free_msg;
- }
-
- ret = signal_set_thread_attr(&attr);
- if (ret < 0)
- goto free_attr;
-
- ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx);
- if (ret) {
- pthread_print_errno(ret, "pthread_create");
- goto free_attr;
- }
- worker->task_active = 1;
- pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy");
-
- return ret;
-
-free_attr:
- pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy");
-
-free_msg:
- msg_free(ctx->msg);
- free(ctx->msg);
-
-free_ctx:
- free(ctx);
-
- return ret;
-}
-
-static int worker_msg_handler(struct worker *worker, const struct msg *msg)
+static int worker_msg_handler(struct worker *worker, const struct msg *request)
{
- if (worker->task_active) {
- int ret = pthread_tryjoin_np(worker->task, NULL);
- switch (ret) {
- case 0:
- worker->task_active = 0;
- break;
- case EBUSY:
- break;
- default:
- pthread_print_errno(ret, "pthread_tryjoin_np");
- return ret;
- }
- }
-
- if (worker->task_active) {
- print_error("Worker is busy\n");
- return -1;
- }
-
- if (msg->argc == 0) {
- print_error("Received an empty message\n");
- return -1;
- }
+ if (request->argc == 0)
+ goto unknown_request;
- size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]);
+ size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
- for (size_t i = 0; i < numof_parsers; ++i) {
- const struct worker_msg_parser_it *it = &worker_msg_parsers[i];
- if (strcmp(it->msg, msg->argv[0]))
+ for (size_t i = 0; i < numof_messages; ++i) {
+ if (strcmp(messages[i].cmd, request->argv[0]))
continue;
-
- worker_task_body body = it->parser(msg);
- if (!body)
- return -1;
-
- return worker_schedule_task(worker, msg, body);
- }
-
- return msg_dump_unknown(msg);
-}
-
-static int worker_msg_handler_lock(const struct msg *msg, void *_worker)
-{
- struct worker *worker = (struct worker *)_worker;
- int ret = 0;
-
- ret = pthread_mutex_lock(&worker->task_mtx);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_lock");
- return -1;
+ if (!messages[i].parser(request))
+ continue;
+ return messages[i].handler(worker, request);
}
- ret = worker_msg_handler(worker, msg);
-
- pthread_check(pthread_mutex_unlock(&worker->task_mtx), "pthread_mutex_lock");
-
- return ret;
+unknown_request:
+ print_error("Received an unknown message\n");
+ msg_dump(request);
+ struct msg response;
+ msg_error(&response);
+ return msg_send(worker->fd, &response);
}
int worker_main(struct worker *worker, int, char *[])
@@ -266,9 +152,16 @@ int worker_main(struct worker *worker, int, char *[])
return ret;
while (!global_stop_flag) {
+ struct msg request;
+
print_log("Waiting for a new command\n");
- ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker);
+ ret = msg_recv(worker->fd, &request);
+ if (ret < 0)
+ return ret;
+
+ ret = worker_msg_handler(worker, &request);
+ msg_free(&request);
if (ret < 0)
return ret;
}