diff options
-rw-r--r-- | src/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/client.c | 18 | ||||
-rw-r--r-- | src/msg.c | 55 | ||||
-rw-r--r-- | src/msg.h | 12 | ||||
-rw-r--r-- | src/process.c | 5 | ||||
-rw-r--r-- | src/process.h | 2 | ||||
-rw-r--r-- | src/server.c | 361 | ||||
-rw-r--r-- | src/server.h | 7 | ||||
-rw-r--r-- | src/worker.c | 223 | ||||
-rw-r--r-- | src/worker_queue.c | 79 | ||||
-rw-r--r-- | src/worker_queue.h | 26 |
11 files changed, 303 insertions, 488 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c75bb49..e565d78 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,8 +17,7 @@ add_executable(server server_main.c server.c msg.c net.c signal.c - tcp_server.c - worker_queue.c) + tcp_server.c) add_executable(client client_main.c client.c msg.c diff --git a/src/client.c b/src/client.c index be46ad1..c711235 100644 --- a/src/client.c +++ b/src/client.c @@ -21,12 +21,22 @@ void client_destroy(const struct client *client) int client_main(const struct client *client, int argc, char *argv[]) { - int result, ret = 0; - struct msg msg = {argc, argv}; + struct msg request = {argc, argv}; + struct msg response; + int ret = 0; - ret = msg_send_and_wait(client->fd, &msg, &result); + ret = msg_send_and_wait(client->fd, &request, &response); if (ret < 0) return ret; - return result; + if (msg_is_error(&response)) { + print_error("Server failed to process the request\n"); + ret = -1; + goto free_response; + } + +free_response: + msg_free(&response); + + return ret; } @@ -5,6 +5,28 @@ #include <stdlib.h> #include <string.h> +void msg_success(struct msg *msg) +{ + msg->argc = 0; + msg->argv = NULL; +} + +void msg_error(struct msg *msg) +{ + msg->argc = -1; + msg->argv = NULL; +} + +int msg_is_success(const struct msg *msg) +{ + return msg->argc == 0; +} + +int msg_is_error(const struct msg *msg) +{ + return msg->argc < 0; +} + static int msg_copy_argv(struct msg *msg, char **argv) { msg->argv = calloc(msg->argc, sizeof(char *)); @@ -147,15 +169,15 @@ free_buf: return ret; } -int msg_send_and_wait(int fd, const struct msg *msg, int *result) +int msg_send_and_wait(int fd, const struct msg *request, struct msg *response) { int ret = 0; - ret = msg_send(fd, msg); + ret = msg_send(fd, request); if (ret < 0) return ret; - ret = net_recv_static(fd, result, sizeof(*result)); + ret = msg_recv(fd, response); if (ret < 0) return ret; @@ -184,32 +206,9 @@ free_buf: return ret; } -int msg_recv_and_handle(int fd, msg_handler handler, void *arg) -{ - struct msg msg; - int result; - int ret = 0; - - ret = msg_recv(fd, &msg); - if (ret < 0) - return ret; - - result = handler(&msg, arg); - - ret = net_send_buf(fd, &result, sizeof(result)); - if (ret < 0) - goto free_msg; - -free_msg: - msg_free(&msg); - - return ret; -} - -int msg_dump_unknown(const struct msg *msg) +void msg_dump(const struct msg *msg) { - print_log("Received an unknown message:\n"); + print_log("Message[%d]:\n", msg->argc); for (int i = 0; i < msg->argc; ++i) print_log("\t%s\n", msg->argv[i]); - return -1; } @@ -6,6 +6,12 @@ struct msg { char **argv; }; +void msg_success(struct msg *); +void msg_error(struct msg *); + +int msg_is_success(const struct msg *); +int msg_is_error(const struct msg *); + struct msg *msg_copy(const struct msg *); void msg_free(const struct msg *); @@ -14,10 +20,8 @@ int msg_from_argv(struct msg *, char **argv); int msg_recv(int fd, struct msg *); int msg_send(int fd, const struct msg *); -typedef int (*msg_handler)(const struct msg *, void *arg); -int msg_send_and_wait(int fd, const struct msg *, int *result); -int msg_recv_and_handle(int fd, msg_handler, void *arg); +int msg_send_and_wait(int fd, const struct msg *, struct msg *response); -int msg_dump_unknown(const struct msg *); +void msg_dump(const struct msg *); #endif diff --git a/src/process.c b/src/process.c index c4ff307..fa15641 100644 --- a/src/process.c +++ b/src/process.c @@ -113,3 +113,8 @@ close_pipe: return ret; } + +void proc_output_free(const struct proc_output *output) +{ + free(output->output); +} diff --git a/src/process.h b/src/process.h index 455d4ec..d47536f 100644 --- a/src/process.h +++ b/src/process.h @@ -18,4 +18,6 @@ int proc_spawn(const char *args[], int *ec); * In that case, you'll need to free the output. */ int proc_capture(const char *args[], struct proc_output *result); +void proc_output_free(const struct proc_output *); + #endif diff --git a/src/server.c b/src/server.c index b5c7463..4706c71 100644 --- a/src/server.c +++ b/src/server.c @@ -4,100 +4,14 @@ #include "msg.h" #include "signal.h" #include "tcp_server.h" -#include "worker_queue.h" #include <pthread.h> +#include <stdlib.h> #include <string.h> #include <unistd.h> -static int server_has_runs_and_workers(const struct server *server) -{ - return !ci_queue_is_empty(&server->ci_queue) && - !worker_queue_is_empty(&server->worker_queue); -} - -static int server_scheduler_iteration(struct server *server) -{ - struct worker_queue_entry *worker; - struct ci_queue_entry *ci_run; - struct msg msg; - int response, ret = 0; - - worker = worker_queue_pop(&server->worker_queue); - ci_run = ci_queue_pop(&server->ci_queue); - - char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - - ret = msg_from_argv(&msg, argv); - if (ret < 0) - goto requeue_ci_run; - - ret = msg_send_and_wait(worker->fd, &msg, &response); - if (ret < 0) - goto free_msg; - - if (response < 0) { - print_error("Failed to schedule a CI run\n"); - } - - msg_free(&msg); - - ci_queue_entry_destroy(ci_run); - - /* FIXME: Don't mark worker as free! */ - worker_queue_push_head(&server->worker_queue, worker); - - return 0; - -free_msg: - msg_free(&msg); - -requeue_ci_run: - ci_queue_push_head(&server->ci_queue, ci_run); - - worker_queue_push_head(&server->worker_queue, worker); - - return ret; -} - -static void *server_scheduler(void *_server) -{ - struct server *server = (struct server *)_server; - int ret = 0; - - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - goto exit; - } - - while (1) { - while (!server->stopping && !server_has_runs_and_workers(server)) { - ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_cond_wait"); - goto unlock; - } - } - - if (server->stopping) - goto unlock; - - ret = server_scheduler_iteration(server); - if (ret < 0) - goto unlock; - } - -unlock: - pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); - -exit: - return NULL; -} - int server_create(struct server *server, const struct settings *settings) { - pthread_attr_t scheduler_attr; int ret = 0; ret = pthread_mutex_init(&server->server_mtx, NULL); @@ -118,40 +32,10 @@ int server_create(struct server *server, const struct settings *settings) if (ret < 0) goto destroy_cv; - worker_queue_create(&server->worker_queue); - ci_queue_create(&server->ci_queue); - ret = pthread_attr_init(&scheduler_attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto destroy_ci_queue; - } - - ret = signal_set_thread_attr(&scheduler_attr); - if (ret) - goto destroy_attr; - - ret = pthread_create(&server->scheduler, &scheduler_attr, server_scheduler, server); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto destroy_attr; - } - - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - return ret; -destroy_attr: - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - -destroy_ci_queue: - ci_queue_destroy(&server->ci_queue); - - worker_queue_destroy(&server->worker_queue); - - tcp_server_destroy(&server->tcp_server); - destroy_cv: pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); @@ -166,59 +50,48 @@ void server_destroy(struct server *server) { print_log("Shutting down\n"); - pthread_check(pthread_join(server->scheduler, NULL), "pthread_join"); ci_queue_destroy(&server->ci_queue); - worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); } -struct msg_context { - struct server *server; - int client_fd; -}; - -static int msg_new_worker(const struct msg *, void *_ctx) +static int server_has_runs(const struct server *server) { - struct msg_context *ctx = (struct msg_context *)_ctx; - return server_new_worker(ctx->server, ctx->client_fd); + return !ci_queue_is_empty(&server->ci_queue); } -static int msg_ci_run(const struct msg *msg, void *_ctx) +static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run) { - struct msg_context *ctx = (struct msg_context *)_ctx; + struct msg request, response; + int ret = 0; - if (msg->argc != 3) { - print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return -1; - } + char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]); -} + ret = msg_from_argv(&request, argv); + if (ret < 0) + return ret; -static int server_msg_handler(const struct msg *msg, void *ctx) -{ - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; + ret = msg_send_and_wait(fd, &request, &response); + msg_free(&request); + if (ret < 0) + return ret; + + if (response.argc < 0) { + print_error("Failed ot schedule a CI run: worker is busy?\n"); + ret = -1; + goto free_response; } - if (!strcmp(msg->argv[0], "new_worker")) - return msg_new_worker(msg, ctx); - if (!strcmp(msg->argv[0], "ci_run")) - return msg_ci_run(msg, ctx); + /* TODO: handle the response. */ - return msg_dump_unknown(msg); -} +free_response: + msg_free(&response); -static int server_conn_handler(int fd, void *server) -{ - struct msg_context ctx = {server, fd}; - return msg_recv_and_handle(fd, server_msg_handler, &ctx); + return ret; } -static int server_set_stopping(struct server *server) +static int worker_dequeue_run(struct server *server, struct ci_queue_entry **ci_run) { int ret = 0; @@ -228,39 +101,96 @@ static int server_set_stopping(struct server *server) return ret; } - server->stopping = 1; + while (!server->stopping && !server_has_runs(server)) { + ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_cond_wait"); + goto unlock; + } + } - ret = pthread_cond_signal(&server->server_cv); - if (ret) { - pthread_print_errno(ret, "pthread_cond_signal"); + if (server->stopping) { + ret = -1; goto unlock; } + *ci_run = ci_queue_pop(&server->ci_queue); + goto unlock; + unlock: pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } -int server_main(struct server *server) +static int worker_requeue_run(struct server *server, struct ci_queue_entry *ci_run) { int ret = 0; - while (!global_stop_flag) { - ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + ret = pthread_mutex_lock(&server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_mutex_lock"); + return ret; + } + + ci_queue_push_head(&server->ci_queue, ci_run); + + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + + return ret; +} + +static int worker_iteration(struct server *server, int fd) +{ + struct ci_queue_entry *ci_run; + int ret = 0; + + ret = worker_dequeue_run(server, &ci_run); + if (ret < 0) + return ret; + + ret = worker_ci_run(fd, ci_run); + if (ret < 0) + goto requeue_run; + + ci_queue_entry_destroy(ci_run); + return ret; + +requeue_run: + worker_requeue_run(server, ci_run); + + return ret; +} + +static int worker_thread(struct server *server, int fd) +{ + int ret = 0; + + while (1) { + ret = worker_iteration(server, fd); if (ret < 0) - break; + return ret; } - return server_set_stopping(server); + return ret; } -int server_new_worker(struct server *server, int fd) +static int msg_new_worker_handler(struct server *server, int client_fd, const struct msg *) { - struct worker_queue_entry *entry; + return worker_thread(server, client_fd); +} + +static int msg_new_worker_parser(const struct msg *) +{ + return 1; +} + +static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) +{ + struct ci_queue_entry *entry; int ret = 0; - print_log("Registering a new worker\n"); + print_log("Scheduling a new CI run for repository %s\n", url); ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -268,15 +198,16 @@ int server_new_worker(struct server *server, int fd) return ret; } - ret = worker_queue_entry_create(&entry, fd); + ret = ci_queue_entry_create(&entry, url, rev); if (ret < 0) goto unlock; - worker_queue_push(&server->worker_queue, entry); + ci_queue_push(&server->ci_queue, entry); ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_print_errno(ret, "pthread_cond_signal"); + ret = 0; goto unlock; } @@ -286,12 +217,85 @@ unlock: return ret; } -int server_ci_run(struct server *server, const char *url, const char *rev) +static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg) { - struct ci_queue_entry *entry; + struct msg response; int ret = 0; - print_log("Scheduling a new CI run for repository %s\n", url); + ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + + return msg_send(client_fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) +{ + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return 0; + } + + return 1; +} + +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg); + +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; +}; + +struct msg_descr messages[] = { + {"new_worker", msg_new_worker_parser, msg_new_worker_handler}, + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, +}; + +static int server_msg_handler(struct server *server, int client_fd, const struct msg *request) +{ + if (request->argc == 0) + goto unknown_request; + + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); + + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) + continue; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(server, client_fd, request); + } + +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(client_fd, &response); +} + +static int server_conn_handler(int client_fd, void *_server) +{ + struct server *server = (struct server *)_server; + struct msg request; + int ret = 0; + + ret = msg_recv(client_fd, &request); + if (ret < 0) + return ret; + + ret = server_msg_handler(server, client_fd, &request); + msg_free(&request); + return ret; +} + +static int server_set_stopping(struct server *server) +{ + int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -299,11 +303,7 @@ int server_ci_run(struct server *server, const char *url, const char *rev) return ret; } - ret = ci_queue_entry_create(&entry, url, rev); - if (ret < 0) - goto unlock; - - ci_queue_push(&server->ci_queue, entry); + server->stopping = 1; ret = pthread_cond_signal(&server->server_cv); if (ret) { @@ -316,3 +316,18 @@ unlock: return ret; } + +int server_main(struct server *server) +{ + int ret = 0; + + while (!global_stop_flag) { + print_log("Waiting for new connections\n"); + + ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + if (ret < 0) + break; + } + + return server_set_stopping(server); +} diff --git a/src/server.h b/src/server.h index 9107b4c..ebd88a1 100644 --- a/src/server.h +++ b/src/server.h @@ -3,7 +3,6 @@ #include "ci_queue.h" #include "tcp_server.h" -#include "worker_queue.h" #include <pthread.h> @@ -19,10 +18,7 @@ struct server { struct tcp_server tcp_server; - struct worker_queue worker_queue; struct ci_queue ci_queue; - - pthread_t scheduler; }; int server_create(struct server *, const struct settings *); @@ -30,7 +26,4 @@ void server_destroy(struct server *); int server_main(struct server *); -int server_new_worker(struct server *, int fd); -int server_ci_run(struct server *, const char *url, const char *rev); - #endif diff --git a/src/worker.c b/src/worker.c index 19270db..3b48636 100644 --- a/src/worker.c +++ b/src/worker.c @@ -26,18 +26,8 @@ int worker_create(struct worker *worker, const struct settings *settings) goto git_shutdown; worker->fd = ret; - ret = pthread_mutex_init(&worker->task_mtx, NULL); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_init"); - goto close; - } - - worker->task_active = 0; return ret; -close: - check_errno(close(worker->fd), "close"); - git_shutdown: libgit_shutdown(); @@ -48,11 +38,6 @@ void worker_destroy(struct worker *worker) { print_log("Shutting down\n"); - if (worker->task_active) { - pthread_check(pthread_join(worker->task, NULL), "pthread_join"); - worker->task_active = 0; - } - pthread_check(pthread_mutex_destroy(&worker->task_mtx), "pthread_mutex_destroy"); check_errno(close(worker->fd), "close"); libgit_shutdown(); } @@ -61,200 +46,101 @@ static int msg_send_new_worker(const struct worker *worker) { static char *argv[] = {"new_worker", NULL}; struct msg msg; - int response, ret = 0; + int ret = 0; ret = msg_from_argv(&msg, argv); if (ret < 0) return ret; - ret = msg_send_and_wait(worker->fd, &msg, &response); + ret = msg_send(worker->fd, &msg); if (ret < 0) goto free_msg; - if (response < 0) { - print_error("Failed to register\n"); - ret = response; - goto free_msg; - } - free_msg: msg_free(&msg); return ret; } -typedef int (*worker_task_body)(const struct msg *); - -static int msg_body_ci_run(const struct msg *msg) +static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result) { - const char *url = msg->argv[1]; - const char *rev = msg->argv[2]; - struct proc_output result; int ret = 0; - ret = ci_run_git_repo(url, rev, &result); + ret = ci_run_git_repo(url, rev, result); if (ret < 0) { print_error("Run failed with an error\n"); return ret; } - print_log("Process exit code: %d\n", result.ec); - print_log("Process output:\n%s", result.output); - if (!result.output || !result.output_len || result.output[result.output_len - 1] != '\n') + print_log("Process exit code: %d\n", result->ec); + print_log("Process output:\n%s", result->output); + if (!result->output || !result->output_len || + result->output[result->output_len - 1] != '\n') print_log("\n"); - free(result.output); - return ret; + return 0; } -typedef worker_task_body (*worker_msg_parser)(const struct msg *); +static int msg_ci_run_handler(struct worker *worker, const struct msg *request) +{ + struct msg response; + struct proc_output result; + int ret = 0; + + ret = msg_ci_run_do(request->argv[1], request->argv[2], &result); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + proc_output_free(&result); -static worker_task_body parse_msg_ci_run(const struct msg *msg) + return msg_send(worker->fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) { if (msg->argc != 3) { print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return NULL; + return 0; } - return msg_body_ci_run; + return 1; } -struct worker_msg_parser_it { - const char *msg; - worker_msg_parser parser; -}; +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct worker *, const struct msg *); -struct worker_msg_parser_it worker_msg_parsers[] = { - {"ci_run", parse_msg_ci_run}, +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; }; -struct worker_task_context { - struct msg *msg; - worker_task_body body; +struct msg_descr messages[] = { + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, }; -static void *worker_task_wrapper(void *_ctx) -{ - struct worker_task_context *ctx = (struct worker_task_context *)_ctx; - - ctx->body(ctx->msg); - msg_free(ctx->msg); - free(ctx->msg); - free(ctx); - return NULL; -} - -static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body) -{ - struct worker_task_context *ctx; - pthread_attr_t attr; - int ret = 0; - - ctx = malloc(sizeof(*ctx)); - if (!ctx) { - print_errno("malloc"); - return -1; - } - ctx->body = body; - - ctx->msg = msg_copy(msg); - if (!ctx->msg) { - ret = -1; - goto free_ctx; - } - - ret = pthread_attr_init(&attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto free_msg; - } - - ret = signal_set_thread_attr(&attr); - if (ret < 0) - goto free_attr; - - ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto free_attr; - } - worker->task_active = 1; - pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy"); - - return ret; - -free_attr: - pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy"); - -free_msg: - msg_free(ctx->msg); - free(ctx->msg); - -free_ctx: - free(ctx); - - return ret; -} - -static int worker_msg_handler(struct worker *worker, const struct msg *msg) +static int worker_msg_handler(struct worker *worker, const struct msg *request) { - if (worker->task_active) { - int ret = pthread_tryjoin_np(worker->task, NULL); - switch (ret) { - case 0: - worker->task_active = 0; - break; - case EBUSY: - break; - default: - pthread_print_errno(ret, "pthread_tryjoin_np"); - return ret; - } - } - - if (worker->task_active) { - print_error("Worker is busy\n"); - return -1; - } - - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; - } + if (request->argc == 0) + goto unknown_request; - size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]); + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); - for (size_t i = 0; i < numof_parsers; ++i) { - const struct worker_msg_parser_it *it = &worker_msg_parsers[i]; - if (strcmp(it->msg, msg->argv[0])) + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) continue; - - worker_task_body body = it->parser(msg); - if (!body) - return -1; - - return worker_schedule_task(worker, msg, body); - } - - return msg_dump_unknown(msg); -} - -static int worker_msg_handler_lock(const struct msg *msg, void *_worker) -{ - struct worker *worker = (struct worker *)_worker; - int ret = 0; - - ret = pthread_mutex_lock(&worker->task_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - return -1; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(worker, request); } - ret = worker_msg_handler(worker, msg); - - pthread_check(pthread_mutex_unlock(&worker->task_mtx), "pthread_mutex_lock"); - - return ret; +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(worker->fd, &response); } int worker_main(struct worker *worker, int, char *[]) @@ -266,9 +152,16 @@ int worker_main(struct worker *worker, int, char *[]) return ret; while (!global_stop_flag) { + struct msg request; + print_log("Waiting for a new command\n"); - ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker); + ret = msg_recv(worker->fd, &request); + if (ret < 0) + return ret; + + ret = worker_msg_handler(worker, &request); + msg_free(&request); if (ret < 0) return ret; } diff --git a/src/worker_queue.c b/src/worker_queue.c deleted file mode 100644 index c832746..0000000 --- a/src/worker_queue.c +++ /dev/null @@ -1,79 +0,0 @@ -#include "worker_queue.h" -#include "log.h" - -#include <stdlib.h> -#include <sys/queue.h> -#include <unistd.h> - -int worker_queue_entry_create(struct worker_queue_entry **entry, int fd) -{ - int newfd = dup(fd); - - if (newfd < 0) { - print_errno("malloc"); - return -1; - } - - *entry = malloc(sizeof(struct worker_queue_entry)); - if (!*entry) { - print_errno("malloc"); - goto close_newfd; - } - (*entry)->fd = newfd; - - return 0; - -close_newfd: - check_errno(close(newfd), "close"); - - return -1; -} - -void worker_queue_entry_destroy(struct worker_queue_entry *entry) -{ - check_errno(close(entry->fd), "close"); - free(entry); -} - -void worker_queue_create(struct worker_queue *queue) -{ - STAILQ_INIT(queue); -} - -void worker_queue_destroy(struct worker_queue *queue) -{ - struct worker_queue_entry *entry1, *entry2; - - entry1 = STAILQ_FIRST(queue); - while (entry1) { - entry2 = STAILQ_NEXT(entry1, entries); - worker_queue_entry_destroy(entry1); - entry1 = entry2; - } - STAILQ_INIT(queue); -} - -int worker_queue_is_empty(const struct worker_queue *queue) -{ - return STAILQ_EMPTY(queue); -} - -void worker_queue_push(struct worker_queue *queue, struct worker_queue_entry *entry) -{ - STAILQ_INSERT_TAIL(queue, entry, entries); -} - -void worker_queue_push_head(struct worker_queue *queue, struct worker_queue_entry *entry) -{ - STAILQ_INSERT_HEAD(queue, entry, entries); -} - -struct worker_queue_entry *worker_queue_pop(struct worker_queue *queue) -{ - struct worker_queue_entry *entry; - - entry = STAILQ_FIRST(queue); - STAILQ_REMOVE_HEAD(queue, entries); - - return entry; -} diff --git a/src/worker_queue.h b/src/worker_queue.h deleted file mode 100644 index d5e0bb2..0000000 --- a/src/worker_queue.h +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __WORKER_QUEUE_H__ -#define __WORKER_QUEUE_H__ - -#include <sys/queue.h> - -struct worker_queue_entry { - int fd; - STAILQ_ENTRY(worker_queue_entry) entries; -}; - -int worker_queue_entry_create(struct worker_queue_entry **, int fd); -void worker_queue_entry_destroy(struct worker_queue_entry *); - -STAILQ_HEAD(worker_queue, worker_queue_entry); - -void worker_queue_create(struct worker_queue *); -void worker_queue_destroy(struct worker_queue *); - -int worker_queue_is_empty(const struct worker_queue *); - -void worker_queue_push(struct worker_queue *, struct worker_queue_entry *); -void worker_queue_push_head(struct worker_queue *, struct worker_queue_entry *); - -struct worker_queue_entry *worker_queue_pop(struct worker_queue *); - -#endif |