From 0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Sun, 28 Aug 2022 15:14:07 +0200 Subject: holy crap, it actually kinda works now Previously, I had a stupid system where I would create a thread after every accept(), and put worker descriptors in a queue. A special "scheduler" thread would then pick them out, and give out jobs to complete. The problem was, of course, I couldn't conveniently poll job status from workers. I thought about using poll(), but that turned out to be a horribly complicated API. How do I deal with partial reads, for example? I don't honestly know. Then it hit me that I could just use the threads that handle accept()ed connections as "worker threads", which would synchronously schedule jobs and wait for them to complete. This solves every problem and removes the need for a lot of inter-thread synchronization magic. It even works now, holy crap! You can launch and terminate workers at will, and they will pick up new jobs automatically. As a side not, msg_recv_and_handle turned out to be too limiting and complicated for me, so I got rid of that, and do normal msg_recv/msg_send calls. --- src/CMakeLists.txt | 3 +- src/client.c | 18 ++- src/msg.c | 55 ++++---- src/msg.h | 12 +- src/process.c | 5 + src/process.h | 2 + src/server.c | 361 ++++++++++++++++++++++++++++------------------------- src/server.h | 7 -- src/worker.c | 223 +++++++++------------------------ src/worker_queue.c | 79 ------------ src/worker_queue.h | 26 ---- 11 files changed, 303 insertions(+), 488 deletions(-) delete mode 100644 src/worker_queue.c delete mode 100644 src/worker_queue.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c75bb49..e565d78 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,8 +17,7 @@ add_executable(server server_main.c server.c msg.c net.c signal.c - tcp_server.c - worker_queue.c) + tcp_server.c) add_executable(client client_main.c client.c msg.c diff --git a/src/client.c b/src/client.c index be46ad1..c711235 100644 --- a/src/client.c +++ b/src/client.c @@ -21,12 +21,22 @@ void client_destroy(const struct client *client) int client_main(const struct client *client, int argc, char *argv[]) { - int result, ret = 0; - struct msg msg = {argc, argv}; + struct msg request = {argc, argv}; + struct msg response; + int ret = 0; - ret = msg_send_and_wait(client->fd, &msg, &result); + ret = msg_send_and_wait(client->fd, &request, &response); if (ret < 0) return ret; - return result; + if (msg_is_error(&response)) { + print_error("Server failed to process the request\n"); + ret = -1; + goto free_response; + } + +free_response: + msg_free(&response); + + return ret; } diff --git a/src/msg.c b/src/msg.c index 863d141..c37cfb5 100644 --- a/src/msg.c +++ b/src/msg.c @@ -5,6 +5,28 @@ #include #include +void msg_success(struct msg *msg) +{ + msg->argc = 0; + msg->argv = NULL; +} + +void msg_error(struct msg *msg) +{ + msg->argc = -1; + msg->argv = NULL; +} + +int msg_is_success(const struct msg *msg) +{ + return msg->argc == 0; +} + +int msg_is_error(const struct msg *msg) +{ + return msg->argc < 0; +} + static int msg_copy_argv(struct msg *msg, char **argv) { msg->argv = calloc(msg->argc, sizeof(char *)); @@ -147,15 +169,15 @@ free_buf: return ret; } -int msg_send_and_wait(int fd, const struct msg *msg, int *result) +int msg_send_and_wait(int fd, const struct msg *request, struct msg *response) { int ret = 0; - ret = msg_send(fd, msg); + ret = msg_send(fd, request); if (ret < 0) return ret; - ret = net_recv_static(fd, result, sizeof(*result)); + ret = msg_recv(fd, response); if (ret < 0) return ret; @@ -184,32 +206,9 @@ free_buf: return ret; } -int msg_recv_and_handle(int fd, msg_handler handler, void *arg) -{ - struct msg msg; - int result; - int ret = 0; - - ret = msg_recv(fd, &msg); - if (ret < 0) - return ret; - - result = handler(&msg, arg); - - ret = net_send_buf(fd, &result, sizeof(result)); - if (ret < 0) - goto free_msg; - -free_msg: - msg_free(&msg); - - return ret; -} - -int msg_dump_unknown(const struct msg *msg) +void msg_dump(const struct msg *msg) { - print_log("Received an unknown message:\n"); + print_log("Message[%d]:\n", msg->argc); for (int i = 0; i < msg->argc; ++i) print_log("\t%s\n", msg->argv[i]); - return -1; } diff --git a/src/msg.h b/src/msg.h index a71b571..e60f2ce 100644 --- a/src/msg.h +++ b/src/msg.h @@ -6,6 +6,12 @@ struct msg { char **argv; }; +void msg_success(struct msg *); +void msg_error(struct msg *); + +int msg_is_success(const struct msg *); +int msg_is_error(const struct msg *); + struct msg *msg_copy(const struct msg *); void msg_free(const struct msg *); @@ -14,10 +20,8 @@ int msg_from_argv(struct msg *, char **argv); int msg_recv(int fd, struct msg *); int msg_send(int fd, const struct msg *); -typedef int (*msg_handler)(const struct msg *, void *arg); -int msg_send_and_wait(int fd, const struct msg *, int *result); -int msg_recv_and_handle(int fd, msg_handler, void *arg); +int msg_send_and_wait(int fd, const struct msg *, struct msg *response); -int msg_dump_unknown(const struct msg *); +void msg_dump(const struct msg *); #endif diff --git a/src/process.c b/src/process.c index c4ff307..fa15641 100644 --- a/src/process.c +++ b/src/process.c @@ -113,3 +113,8 @@ close_pipe: return ret; } + +void proc_output_free(const struct proc_output *output) +{ + free(output->output); +} diff --git a/src/process.h b/src/process.h index 455d4ec..d47536f 100644 --- a/src/process.h +++ b/src/process.h @@ -18,4 +18,6 @@ int proc_spawn(const char *args[], int *ec); * In that case, you'll need to free the output. */ int proc_capture(const char *args[], struct proc_output *result); +void proc_output_free(const struct proc_output *); + #endif diff --git a/src/server.c b/src/server.c index b5c7463..4706c71 100644 --- a/src/server.c +++ b/src/server.c @@ -4,100 +4,14 @@ #include "msg.h" #include "signal.h" #include "tcp_server.h" -#include "worker_queue.h" #include +#include #include #include -static int server_has_runs_and_workers(const struct server *server) -{ - return !ci_queue_is_empty(&server->ci_queue) && - !worker_queue_is_empty(&server->worker_queue); -} - -static int server_scheduler_iteration(struct server *server) -{ - struct worker_queue_entry *worker; - struct ci_queue_entry *ci_run; - struct msg msg; - int response, ret = 0; - - worker = worker_queue_pop(&server->worker_queue); - ci_run = ci_queue_pop(&server->ci_queue); - - char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - - ret = msg_from_argv(&msg, argv); - if (ret < 0) - goto requeue_ci_run; - - ret = msg_send_and_wait(worker->fd, &msg, &response); - if (ret < 0) - goto free_msg; - - if (response < 0) { - print_error("Failed to schedule a CI run\n"); - } - - msg_free(&msg); - - ci_queue_entry_destroy(ci_run); - - /* FIXME: Don't mark worker as free! */ - worker_queue_push_head(&server->worker_queue, worker); - - return 0; - -free_msg: - msg_free(&msg); - -requeue_ci_run: - ci_queue_push_head(&server->ci_queue, ci_run); - - worker_queue_push_head(&server->worker_queue, worker); - - return ret; -} - -static void *server_scheduler(void *_server) -{ - struct server *server = (struct server *)_server; - int ret = 0; - - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - goto exit; - } - - while (1) { - while (!server->stopping && !server_has_runs_and_workers(server)) { - ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_cond_wait"); - goto unlock; - } - } - - if (server->stopping) - goto unlock; - - ret = server_scheduler_iteration(server); - if (ret < 0) - goto unlock; - } - -unlock: - pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); - -exit: - return NULL; -} - int server_create(struct server *server, const struct settings *settings) { - pthread_attr_t scheduler_attr; int ret = 0; ret = pthread_mutex_init(&server->server_mtx, NULL); @@ -118,40 +32,10 @@ int server_create(struct server *server, const struct settings *settings) if (ret < 0) goto destroy_cv; - worker_queue_create(&server->worker_queue); - ci_queue_create(&server->ci_queue); - ret = pthread_attr_init(&scheduler_attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto destroy_ci_queue; - } - - ret = signal_set_thread_attr(&scheduler_attr); - if (ret) - goto destroy_attr; - - ret = pthread_create(&server->scheduler, &scheduler_attr, server_scheduler, server); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto destroy_attr; - } - - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - return ret; -destroy_attr: - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - -destroy_ci_queue: - ci_queue_destroy(&server->ci_queue); - - worker_queue_destroy(&server->worker_queue); - - tcp_server_destroy(&server->tcp_server); - destroy_cv: pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); @@ -166,59 +50,48 @@ void server_destroy(struct server *server) { print_log("Shutting down\n"); - pthread_check(pthread_join(server->scheduler, NULL), "pthread_join"); ci_queue_destroy(&server->ci_queue); - worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); } -struct msg_context { - struct server *server; - int client_fd; -}; - -static int msg_new_worker(const struct msg *, void *_ctx) +static int server_has_runs(const struct server *server) { - struct msg_context *ctx = (struct msg_context *)_ctx; - return server_new_worker(ctx->server, ctx->client_fd); + return !ci_queue_is_empty(&server->ci_queue); } -static int msg_ci_run(const struct msg *msg, void *_ctx) +static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run) { - struct msg_context *ctx = (struct msg_context *)_ctx; + struct msg request, response; + int ret = 0; - if (msg->argc != 3) { - print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return -1; - } + char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]); -} + ret = msg_from_argv(&request, argv); + if (ret < 0) + return ret; -static int server_msg_handler(const struct msg *msg, void *ctx) -{ - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; + ret = msg_send_and_wait(fd, &request, &response); + msg_free(&request); + if (ret < 0) + return ret; + + if (response.argc < 0) { + print_error("Failed ot schedule a CI run: worker is busy?\n"); + ret = -1; + goto free_response; } - if (!strcmp(msg->argv[0], "new_worker")) - return msg_new_worker(msg, ctx); - if (!strcmp(msg->argv[0], "ci_run")) - return msg_ci_run(msg, ctx); + /* TODO: handle the response. */ - return msg_dump_unknown(msg); -} +free_response: + msg_free(&response); -static int server_conn_handler(int fd, void *server) -{ - struct msg_context ctx = {server, fd}; - return msg_recv_and_handle(fd, server_msg_handler, &ctx); + return ret; } -static int server_set_stopping(struct server *server) +static int worker_dequeue_run(struct server *server, struct ci_queue_entry **ci_run) { int ret = 0; @@ -228,39 +101,96 @@ static int server_set_stopping(struct server *server) return ret; } - server->stopping = 1; + while (!server->stopping && !server_has_runs(server)) { + ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_cond_wait"); + goto unlock; + } + } - ret = pthread_cond_signal(&server->server_cv); - if (ret) { - pthread_print_errno(ret, "pthread_cond_signal"); + if (server->stopping) { + ret = -1; goto unlock; } + *ci_run = ci_queue_pop(&server->ci_queue); + goto unlock; + unlock: pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } -int server_main(struct server *server) +static int worker_requeue_run(struct server *server, struct ci_queue_entry *ci_run) { int ret = 0; - while (!global_stop_flag) { - ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + ret = pthread_mutex_lock(&server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_mutex_lock"); + return ret; + } + + ci_queue_push_head(&server->ci_queue, ci_run); + + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + + return ret; +} + +static int worker_iteration(struct server *server, int fd) +{ + struct ci_queue_entry *ci_run; + int ret = 0; + + ret = worker_dequeue_run(server, &ci_run); + if (ret < 0) + return ret; + + ret = worker_ci_run(fd, ci_run); + if (ret < 0) + goto requeue_run; + + ci_queue_entry_destroy(ci_run); + return ret; + +requeue_run: + worker_requeue_run(server, ci_run); + + return ret; +} + +static int worker_thread(struct server *server, int fd) +{ + int ret = 0; + + while (1) { + ret = worker_iteration(server, fd); if (ret < 0) - break; + return ret; } - return server_set_stopping(server); + return ret; } -int server_new_worker(struct server *server, int fd) +static int msg_new_worker_handler(struct server *server, int client_fd, const struct msg *) { - struct worker_queue_entry *entry; + return worker_thread(server, client_fd); +} + +static int msg_new_worker_parser(const struct msg *) +{ + return 1; +} + +static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) +{ + struct ci_queue_entry *entry; int ret = 0; - print_log("Registering a new worker\n"); + print_log("Scheduling a new CI run for repository %s\n", url); ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -268,15 +198,16 @@ int server_new_worker(struct server *server, int fd) return ret; } - ret = worker_queue_entry_create(&entry, fd); + ret = ci_queue_entry_create(&entry, url, rev); if (ret < 0) goto unlock; - worker_queue_push(&server->worker_queue, entry); + ci_queue_push(&server->ci_queue, entry); ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_print_errno(ret, "pthread_cond_signal"); + ret = 0; goto unlock; } @@ -286,12 +217,85 @@ unlock: return ret; } -int server_ci_run(struct server *server, const char *url, const char *rev) +static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg) { - struct ci_queue_entry *entry; + struct msg response; int ret = 0; - print_log("Scheduling a new CI run for repository %s\n", url); + ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + + return msg_send(client_fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) +{ + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return 0; + } + + return 1; +} + +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg); + +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; +}; + +struct msg_descr messages[] = { + {"new_worker", msg_new_worker_parser, msg_new_worker_handler}, + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, +}; + +static int server_msg_handler(struct server *server, int client_fd, const struct msg *request) +{ + if (request->argc == 0) + goto unknown_request; + + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); + + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) + continue; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(server, client_fd, request); + } + +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(client_fd, &response); +} + +static int server_conn_handler(int client_fd, void *_server) +{ + struct server *server = (struct server *)_server; + struct msg request; + int ret = 0; + + ret = msg_recv(client_fd, &request); + if (ret < 0) + return ret; + + ret = server_msg_handler(server, client_fd, &request); + msg_free(&request); + return ret; +} + +static int server_set_stopping(struct server *server) +{ + int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -299,11 +303,7 @@ int server_ci_run(struct server *server, const char *url, const char *rev) return ret; } - ret = ci_queue_entry_create(&entry, url, rev); - if (ret < 0) - goto unlock; - - ci_queue_push(&server->ci_queue, entry); + server->stopping = 1; ret = pthread_cond_signal(&server->server_cv); if (ret) { @@ -316,3 +316,18 @@ unlock: return ret; } + +int server_main(struct server *server) +{ + int ret = 0; + + while (!global_stop_flag) { + print_log("Waiting for new connections\n"); + + ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + if (ret < 0) + break; + } + + return server_set_stopping(server); +} diff --git a/src/server.h b/src/server.h index 9107b4c..ebd88a1 100644 --- a/src/server.h +++ b/src/server.h @@ -3,7 +3,6 @@ #include "ci_queue.h" #include "tcp_server.h" -#include "worker_queue.h" #include @@ -19,10 +18,7 @@ struct server { struct tcp_server tcp_server; - struct worker_queue worker_queue; struct ci_queue ci_queue; - - pthread_t scheduler; }; int server_create(struct server *, const struct settings *); @@ -30,7 +26,4 @@ void server_destroy(struct server *); int server_main(struct server *); -int server_new_worker(struct server *, int fd); -int server_ci_run(struct server *, const char *url, const char *rev); - #endif diff --git a/src/worker.c b/src/worker.c index 19270db..3b48636 100644 --- a/src/worker.c +++ b/src/worker.c @@ -26,18 +26,8 @@ int worker_create(struct worker *worker, const struct settings *settings) goto git_shutdown; worker->fd = ret; - ret = pthread_mutex_init(&worker->task_mtx, NULL); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_init"); - goto close; - } - - worker->task_active = 0; return ret; -close: - check_errno(close(worker->fd), "close"); - git_shutdown: libgit_shutdown(); @@ -48,11 +38,6 @@ void worker_destroy(struct worker *worker) { print_log("Shutting down\n"); - if (worker->task_active) { - pthread_check(pthread_join(worker->task, NULL), "pthread_join"); - worker->task_active = 0; - } - pthread_check(pthread_mutex_destroy(&worker->task_mtx), "pthread_mutex_destroy"); check_errno(close(worker->fd), "close"); libgit_shutdown(); } @@ -61,200 +46,101 @@ static int msg_send_new_worker(const struct worker *worker) { static char *argv[] = {"new_worker", NULL}; struct msg msg; - int response, ret = 0; + int ret = 0; ret = msg_from_argv(&msg, argv); if (ret < 0) return ret; - ret = msg_send_and_wait(worker->fd, &msg, &response); + ret = msg_send(worker->fd, &msg); if (ret < 0) goto free_msg; - if (response < 0) { - print_error("Failed to register\n"); - ret = response; - goto free_msg; - } - free_msg: msg_free(&msg); return ret; } -typedef int (*worker_task_body)(const struct msg *); - -static int msg_body_ci_run(const struct msg *msg) +static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result) { - const char *url = msg->argv[1]; - const char *rev = msg->argv[2]; - struct proc_output result; int ret = 0; - ret = ci_run_git_repo(url, rev, &result); + ret = ci_run_git_repo(url, rev, result); if (ret < 0) { print_error("Run failed with an error\n"); return ret; } - print_log("Process exit code: %d\n", result.ec); - print_log("Process output:\n%s", result.output); - if (!result.output || !result.output_len || result.output[result.output_len - 1] != '\n') + print_log("Process exit code: %d\n", result->ec); + print_log("Process output:\n%s", result->output); + if (!result->output || !result->output_len || + result->output[result->output_len - 1] != '\n') print_log("\n"); - free(result.output); - return ret; + return 0; } -typedef worker_task_body (*worker_msg_parser)(const struct msg *); +static int msg_ci_run_handler(struct worker *worker, const struct msg *request) +{ + struct msg response; + struct proc_output result; + int ret = 0; + + ret = msg_ci_run_do(request->argv[1], request->argv[2], &result); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + proc_output_free(&result); -static worker_task_body parse_msg_ci_run(const struct msg *msg) + return msg_send(worker->fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) { if (msg->argc != 3) { print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return NULL; + return 0; } - return msg_body_ci_run; + return 1; } -struct worker_msg_parser_it { - const char *msg; - worker_msg_parser parser; -}; +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct worker *, const struct msg *); -struct worker_msg_parser_it worker_msg_parsers[] = { - {"ci_run", parse_msg_ci_run}, +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; }; -struct worker_task_context { - struct msg *msg; - worker_task_body body; +struct msg_descr messages[] = { + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, }; -static void *worker_task_wrapper(void *_ctx) -{ - struct worker_task_context *ctx = (struct worker_task_context *)_ctx; - - ctx->body(ctx->msg); - msg_free(ctx->msg); - free(ctx->msg); - free(ctx); - return NULL; -} - -static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body) -{ - struct worker_task_context *ctx; - pthread_attr_t attr; - int ret = 0; - - ctx = malloc(sizeof(*ctx)); - if (!ctx) { - print_errno("malloc"); - return -1; - } - ctx->body = body; - - ctx->msg = msg_copy(msg); - if (!ctx->msg) { - ret = -1; - goto free_ctx; - } - - ret = pthread_attr_init(&attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto free_msg; - } - - ret = signal_set_thread_attr(&attr); - if (ret < 0) - goto free_attr; - - ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto free_attr; - } - worker->task_active = 1; - pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy"); - - return ret; - -free_attr: - pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy"); - -free_msg: - msg_free(ctx->msg); - free(ctx->msg); - -free_ctx: - free(ctx); - - return ret; -} - -static int worker_msg_handler(struct worker *worker, const struct msg *msg) +static int worker_msg_handler(struct worker *worker, const struct msg *request) { - if (worker->task_active) { - int ret = pthread_tryjoin_np(worker->task, NULL); - switch (ret) { - case 0: - worker->task_active = 0; - break; - case EBUSY: - break; - default: - pthread_print_errno(ret, "pthread_tryjoin_np"); - return ret; - } - } - - if (worker->task_active) { - print_error("Worker is busy\n"); - return -1; - } - - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; - } + if (request->argc == 0) + goto unknown_request; - size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]); + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); - for (size_t i = 0; i < numof_parsers; ++i) { - const struct worker_msg_parser_it *it = &worker_msg_parsers[i]; - if (strcmp(it->msg, msg->argv[0])) + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) continue; - - worker_task_body body = it->parser(msg); - if (!body) - return -1; - - return worker_schedule_task(worker, msg, body); - } - - return msg_dump_unknown(msg); -} - -static int worker_msg_handler_lock(const struct msg *msg, void *_worker) -{ - struct worker *worker = (struct worker *)_worker; - int ret = 0; - - ret = pthread_mutex_lock(&worker->task_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - return -1; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(worker, request); } - ret = worker_msg_handler(worker, msg); - - pthread_check(pthread_mutex_unlock(&worker->task_mtx), "pthread_mutex_lock"); - - return ret; +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(worker->fd, &response); } int worker_main(struct worker *worker, int, char *[]) @@ -266,9 +152,16 @@ int worker_main(struct worker *worker, int, char *[]) return ret; while (!global_stop_flag) { + struct msg request; + print_log("Waiting for a new command\n"); - ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker); + ret = msg_recv(worker->fd, &request); + if (ret < 0) + return ret; + + ret = worker_msg_handler(worker, &request); + msg_free(&request); if (ret < 0) return ret; } diff --git a/src/worker_queue.c b/src/worker_queue.c deleted file mode 100644 index c832746..0000000 --- a/src/worker_queue.c +++ /dev/null @@ -1,79 +0,0 @@ -#include "worker_queue.h" -#include "log.h" - -#include -#include -#include - -int worker_queue_entry_create(struct worker_queue_entry **entry, int fd) -{ - int newfd = dup(fd); - - if (newfd < 0) { - print_errno("malloc"); - return -1; - } - - *entry = malloc(sizeof(struct worker_queue_entry)); - if (!*entry) { - print_errno("malloc"); - goto close_newfd; - } - (*entry)->fd = newfd; - - return 0; - -close_newfd: - check_errno(close(newfd), "close"); - - return -1; -} - -void worker_queue_entry_destroy(struct worker_queue_entry *entry) -{ - check_errno(close(entry->fd), "close"); - free(entry); -} - -void worker_queue_create(struct worker_queue *queue) -{ - STAILQ_INIT(queue); -} - -void worker_queue_destroy(struct worker_queue *queue) -{ - struct worker_queue_entry *entry1, *entry2; - - entry1 = STAILQ_FIRST(queue); - while (entry1) { - entry2 = STAILQ_NEXT(entry1, entries); - worker_queue_entry_destroy(entry1); - entry1 = entry2; - } - STAILQ_INIT(queue); -} - -int worker_queue_is_empty(const struct worker_queue *queue) -{ - return STAILQ_EMPTY(queue); -} - -void worker_queue_push(struct worker_queue *queue, struct worker_queue_entry *entry) -{ - STAILQ_INSERT_TAIL(queue, entry, entries); -} - -void worker_queue_push_head(struct worker_queue *queue, struct worker_queue_entry *entry) -{ - STAILQ_INSERT_HEAD(queue, entry, entries); -} - -struct worker_queue_entry *worker_queue_pop(struct worker_queue *queue) -{ - struct worker_queue_entry *entry; - - entry = STAILQ_FIRST(queue); - STAILQ_REMOVE_HEAD(queue, entries); - - return entry; -} diff --git a/src/worker_queue.h b/src/worker_queue.h deleted file mode 100644 index d5e0bb2..0000000 --- a/src/worker_queue.h +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __WORKER_QUEUE_H__ -#define __WORKER_QUEUE_H__ - -#include - -struct worker_queue_entry { - int fd; - STAILQ_ENTRY(worker_queue_entry) entries; -}; - -int worker_queue_entry_create(struct worker_queue_entry **, int fd); -void worker_queue_entry_destroy(struct worker_queue_entry *); - -STAILQ_HEAD(worker_queue, worker_queue_entry); - -void worker_queue_create(struct worker_queue *); -void worker_queue_destroy(struct worker_queue *); - -int worker_queue_is_empty(const struct worker_queue *); - -void worker_queue_push(struct worker_queue *, struct worker_queue_entry *); -void worker_queue_push_head(struct worker_queue *, struct worker_queue_entry *); - -struct worker_queue_entry *worker_queue_pop(struct worker_queue *); - -#endif -- cgit v1.2.3