aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--src/CMakeLists.txt3
-rw-r--r--src/client.c18
-rw-r--r--src/msg.c55
-rw-r--r--src/msg.h12
-rw-r--r--src/process.c5
-rw-r--r--src/process.h2
-rw-r--r--src/server.c361
-rw-r--r--src/server.h7
-rw-r--r--src/worker.c223
-rw-r--r--src/worker_queue.c79
-rw-r--r--src/worker_queue.h26
11 files changed, 303 insertions, 488 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c75bb49..e565d78 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -17,8 +17,7 @@ add_executable(server server_main.c server.c
msg.c
net.c
signal.c
- tcp_server.c
- worker_queue.c)
+ tcp_server.c)
add_executable(client client_main.c client.c
msg.c
diff --git a/src/client.c b/src/client.c
index be46ad1..c711235 100644
--- a/src/client.c
+++ b/src/client.c
@@ -21,12 +21,22 @@ void client_destroy(const struct client *client)
int client_main(const struct client *client, int argc, char *argv[])
{
- int result, ret = 0;
- struct msg msg = {argc, argv};
+ struct msg request = {argc, argv};
+ struct msg response;
+ int ret = 0;
- ret = msg_send_and_wait(client->fd, &msg, &result);
+ ret = msg_send_and_wait(client->fd, &request, &response);
if (ret < 0)
return ret;
- return result;
+ if (msg_is_error(&response)) {
+ print_error("Server failed to process the request\n");
+ ret = -1;
+ goto free_response;
+ }
+
+free_response:
+ msg_free(&response);
+
+ return ret;
}
diff --git a/src/msg.c b/src/msg.c
index 863d141..c37cfb5 100644
--- a/src/msg.c
+++ b/src/msg.c
@@ -5,6 +5,28 @@
#include <stdlib.h>
#include <string.h>
+void msg_success(struct msg *msg)
+{
+ msg->argc = 0;
+ msg->argv = NULL;
+}
+
+void msg_error(struct msg *msg)
+{
+ msg->argc = -1;
+ msg->argv = NULL;
+}
+
+int msg_is_success(const struct msg *msg)
+{
+ return msg->argc == 0;
+}
+
+int msg_is_error(const struct msg *msg)
+{
+ return msg->argc < 0;
+}
+
static int msg_copy_argv(struct msg *msg, char **argv)
{
msg->argv = calloc(msg->argc, sizeof(char *));
@@ -147,15 +169,15 @@ free_buf:
return ret;
}
-int msg_send_and_wait(int fd, const struct msg *msg, int *result)
+int msg_send_and_wait(int fd, const struct msg *request, struct msg *response)
{
int ret = 0;
- ret = msg_send(fd, msg);
+ ret = msg_send(fd, request);
if (ret < 0)
return ret;
- ret = net_recv_static(fd, result, sizeof(*result));
+ ret = msg_recv(fd, response);
if (ret < 0)
return ret;
@@ -184,32 +206,9 @@ free_buf:
return ret;
}
-int msg_recv_and_handle(int fd, msg_handler handler, void *arg)
-{
- struct msg msg;
- int result;
- int ret = 0;
-
- ret = msg_recv(fd, &msg);
- if (ret < 0)
- return ret;
-
- result = handler(&msg, arg);
-
- ret = net_send_buf(fd, &result, sizeof(result));
- if (ret < 0)
- goto free_msg;
-
-free_msg:
- msg_free(&msg);
-
- return ret;
-}
-
-int msg_dump_unknown(const struct msg *msg)
+void msg_dump(const struct msg *msg)
{
- print_log("Received an unknown message:\n");
+ print_log("Message[%d]:\n", msg->argc);
for (int i = 0; i < msg->argc; ++i)
print_log("\t%s\n", msg->argv[i]);
- return -1;
}
diff --git a/src/msg.h b/src/msg.h
index a71b571..e60f2ce 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -6,6 +6,12 @@ struct msg {
char **argv;
};
+void msg_success(struct msg *);
+void msg_error(struct msg *);
+
+int msg_is_success(const struct msg *);
+int msg_is_error(const struct msg *);
+
struct msg *msg_copy(const struct msg *);
void msg_free(const struct msg *);
@@ -14,10 +20,8 @@ int msg_from_argv(struct msg *, char **argv);
int msg_recv(int fd, struct msg *);
int msg_send(int fd, const struct msg *);
-typedef int (*msg_handler)(const struct msg *, void *arg);
-int msg_send_and_wait(int fd, const struct msg *, int *result);
-int msg_recv_and_handle(int fd, msg_handler, void *arg);
+int msg_send_and_wait(int fd, const struct msg *, struct msg *response);
-int msg_dump_unknown(const struct msg *);
+void msg_dump(const struct msg *);
#endif
diff --git a/src/process.c b/src/process.c
index c4ff307..fa15641 100644
--- a/src/process.c
+++ b/src/process.c
@@ -113,3 +113,8 @@ close_pipe:
return ret;
}
+
+void proc_output_free(const struct proc_output *output)
+{
+ free(output->output);
+}
diff --git a/src/process.h b/src/process.h
index 455d4ec..d47536f 100644
--- a/src/process.h
+++ b/src/process.h
@@ -18,4 +18,6 @@ int proc_spawn(const char *args[], int *ec);
* In that case, you'll need to free the output. */
int proc_capture(const char *args[], struct proc_output *result);
+void proc_output_free(const struct proc_output *);
+
#endif
diff --git a/src/server.c b/src/server.c
index b5c7463..4706c71 100644
--- a/src/server.c
+++ b/src/server.c
@@ -4,100 +4,14 @@
#include "msg.h"
#include "signal.h"
#include "tcp_server.h"
-#include "worker_queue.h"
#include <pthread.h>
+#include <stdlib.h>
#include <string.h>
#include <unistd.h>
-static int server_has_runs_and_workers(const struct server *server)
-{
- return !ci_queue_is_empty(&server->ci_queue) &&
- !worker_queue_is_empty(&server->worker_queue);
-}
-
-static int server_scheduler_iteration(struct server *server)
-{
- struct worker_queue_entry *worker;
- struct ci_queue_entry *ci_run;
- struct msg msg;
- int response, ret = 0;
-
- worker = worker_queue_pop(&server->worker_queue);
- ci_run = ci_queue_pop(&server->ci_queue);
-
- char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL};
-
- ret = msg_from_argv(&msg, argv);
- if (ret < 0)
- goto requeue_ci_run;
-
- ret = msg_send_and_wait(worker->fd, &msg, &response);
- if (ret < 0)
- goto free_msg;
-
- if (response < 0) {
- print_error("Failed to schedule a CI run\n");
- }
-
- msg_free(&msg);
-
- ci_queue_entry_destroy(ci_run);
-
- /* FIXME: Don't mark worker as free! */
- worker_queue_push_head(&server->worker_queue, worker);
-
- return 0;
-
-free_msg:
- msg_free(&msg);
-
-requeue_ci_run:
- ci_queue_push_head(&server->ci_queue, ci_run);
-
- worker_queue_push_head(&server->worker_queue, worker);
-
- return ret;
-}
-
-static void *server_scheduler(void *_server)
-{
- struct server *server = (struct server *)_server;
- int ret = 0;
-
- ret = pthread_mutex_lock(&server->server_mtx);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_lock");
- goto exit;
- }
-
- while (1) {
- while (!server->stopping && !server_has_runs_and_workers(server)) {
- ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
- if (ret) {
- pthread_print_errno(ret, "pthread_cond_wait");
- goto unlock;
- }
- }
-
- if (server->stopping)
- goto unlock;
-
- ret = server_scheduler_iteration(server);
- if (ret < 0)
- goto unlock;
- }
-
-unlock:
- pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
-
-exit:
- return NULL;
-}
-
int server_create(struct server *server, const struct settings *settings)
{
- pthread_attr_t scheduler_attr;
int ret = 0;
ret = pthread_mutex_init(&server->server_mtx, NULL);
@@ -118,40 +32,10 @@ int server_create(struct server *server, const struct settings *settings)
if (ret < 0)
goto destroy_cv;
- worker_queue_create(&server->worker_queue);
-
ci_queue_create(&server->ci_queue);
- ret = pthread_attr_init(&scheduler_attr);
- if (ret) {
- pthread_print_errno(ret, "pthread_attr_init");
- goto destroy_ci_queue;
- }
-
- ret = signal_set_thread_attr(&scheduler_attr);
- if (ret)
- goto destroy_attr;
-
- ret = pthread_create(&server->scheduler, &scheduler_attr, server_scheduler, server);
- if (ret) {
- pthread_print_errno(ret, "pthread_create");
- goto destroy_attr;
- }
-
- pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy");
-
return ret;
-destroy_attr:
- pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy");
-
-destroy_ci_queue:
- ci_queue_destroy(&server->ci_queue);
-
- worker_queue_destroy(&server->worker_queue);
-
- tcp_server_destroy(&server->tcp_server);
-
destroy_cv:
pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
@@ -166,59 +50,48 @@ void server_destroy(struct server *server)
{
print_log("Shutting down\n");
- pthread_check(pthread_join(server->scheduler, NULL), "pthread_join");
ci_queue_destroy(&server->ci_queue);
- worker_queue_destroy(&server->worker_queue);
tcp_server_destroy(&server->tcp_server);
pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
}
-struct msg_context {
- struct server *server;
- int client_fd;
-};
-
-static int msg_new_worker(const struct msg *, void *_ctx)
+static int server_has_runs(const struct server *server)
{
- struct msg_context *ctx = (struct msg_context *)_ctx;
- return server_new_worker(ctx->server, ctx->client_fd);
+ return !ci_queue_is_empty(&server->ci_queue);
}
-static int msg_ci_run(const struct msg *msg, void *_ctx)
+static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run)
{
- struct msg_context *ctx = (struct msg_context *)_ctx;
+ struct msg request, response;
+ int ret = 0;
- if (msg->argc != 3) {
- print_error("Invalid number of arguments for a message: %d\n", msg->argc);
- return -1;
- }
+ char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL};
- return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]);
-}
+ ret = msg_from_argv(&request, argv);
+ if (ret < 0)
+ return ret;
-static int server_msg_handler(const struct msg *msg, void *ctx)
-{
- if (msg->argc == 0) {
- print_error("Received an empty message\n");
- return -1;
+ ret = msg_send_and_wait(fd, &request, &response);
+ msg_free(&request);
+ if (ret < 0)
+ return ret;
+
+ if (response.argc < 0) {
+ print_error("Failed ot schedule a CI run: worker is busy?\n");
+ ret = -1;
+ goto free_response;
}
- if (!strcmp(msg->argv[0], "new_worker"))
- return msg_new_worker(msg, ctx);
- if (!strcmp(msg->argv[0], "ci_run"))
- return msg_ci_run(msg, ctx);
+ /* TODO: handle the response. */
- return msg_dump_unknown(msg);
-}
+free_response:
+ msg_free(&response);
-static int server_conn_handler(int fd, void *server)
-{
- struct msg_context ctx = {server, fd};
- return msg_recv_and_handle(fd, server_msg_handler, &ctx);
+ return ret;
}
-static int server_set_stopping(struct server *server)
+static int worker_dequeue_run(struct server *server, struct ci_queue_entry **ci_run)
{
int ret = 0;
@@ -228,39 +101,96 @@ static int server_set_stopping(struct server *server)
return ret;
}
- server->stopping = 1;
+ while (!server->stopping && !server_has_runs(server)) {
+ ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
+ if (ret) {
+ pthread_print_errno(ret, "pthread_cond_wait");
+ goto unlock;
+ }
+ }
- ret = pthread_cond_signal(&server->server_cv);
- if (ret) {
- pthread_print_errno(ret, "pthread_cond_signal");
+ if (server->stopping) {
+ ret = -1;
goto unlock;
}
+ *ci_run = ci_queue_pop(&server->ci_queue);
+ goto unlock;
+
unlock:
pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
return ret;
}
-int server_main(struct server *server)
+static int worker_requeue_run(struct server *server, struct ci_queue_entry *ci_run)
{
int ret = 0;
- while (!global_stop_flag) {
- ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server);
+ ret = pthread_mutex_lock(&server->server_mtx);
+ if (ret) {
+ pthread_print_errno(ret, "pthread_mutex_lock");
+ return ret;
+ }
+
+ ci_queue_push_head(&server->ci_queue, ci_run);
+
+ pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
+
+ return ret;
+}
+
+static int worker_iteration(struct server *server, int fd)
+{
+ struct ci_queue_entry *ci_run;
+ int ret = 0;
+
+ ret = worker_dequeue_run(server, &ci_run);
+ if (ret < 0)
+ return ret;
+
+ ret = worker_ci_run(fd, ci_run);
+ if (ret < 0)
+ goto requeue_run;
+
+ ci_queue_entry_destroy(ci_run);
+ return ret;
+
+requeue_run:
+ worker_requeue_run(server, ci_run);
+
+ return ret;
+}
+
+static int worker_thread(struct server *server, int fd)
+{
+ int ret = 0;
+
+ while (1) {
+ ret = worker_iteration(server, fd);
if (ret < 0)
- break;
+ return ret;
}
- return server_set_stopping(server);
+ return ret;
}
-int server_new_worker(struct server *server, int fd)
+static int msg_new_worker_handler(struct server *server, int client_fd, const struct msg *)
{
- struct worker_queue_entry *entry;
+ return worker_thread(server, client_fd);
+}
+
+static int msg_new_worker_parser(const struct msg *)
+{
+ return 1;
+}
+
+static int msg_ci_run_queue(struct server *server, const char *url, const char *rev)
+{
+ struct ci_queue_entry *entry;
int ret = 0;
- print_log("Registering a new worker\n");
+ print_log("Scheduling a new CI run for repository %s\n", url);
ret = pthread_mutex_lock(&server->server_mtx);
if (ret) {
@@ -268,15 +198,16 @@ int server_new_worker(struct server *server, int fd)
return ret;
}
- ret = worker_queue_entry_create(&entry, fd);
+ ret = ci_queue_entry_create(&entry, url, rev);
if (ret < 0)
goto unlock;
- worker_queue_push(&server->worker_queue, entry);
+ ci_queue_push(&server->ci_queue, entry);
ret = pthread_cond_signal(&server->server_cv);
if (ret) {
pthread_print_errno(ret, "pthread_cond_signal");
+ ret = 0;
goto unlock;
}
@@ -286,12 +217,85 @@ unlock:
return ret;
}
-int server_ci_run(struct server *server, const char *url, const char *rev)
+static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg)
{
- struct ci_queue_entry *entry;
+ struct msg response;
int ret = 0;
- print_log("Scheduling a new CI run for repository %s\n", url);
+ ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]);
+ if (ret < 0)
+ msg_error(&response);
+ else
+ msg_success(&response);
+
+ return msg_send(client_fd, &response);
+}
+
+static int msg_ci_run_parser(const struct msg *msg)
+{
+ if (msg->argc != 3) {
+ print_error("Invalid number of arguments for a message: %d\n", msg->argc);
+ return 0;
+ }
+
+ return 1;
+}
+
+typedef int (*msg_parser)(const struct msg *msg);
+typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg);
+
+struct msg_descr {
+ const char *cmd;
+ msg_parser parser;
+ msg_handler handler;
+};
+
+struct msg_descr messages[] = {
+ {"new_worker", msg_new_worker_parser, msg_new_worker_handler},
+ {"ci_run", msg_ci_run_parser, msg_ci_run_handler},
+};
+
+static int server_msg_handler(struct server *server, int client_fd, const struct msg *request)
+{
+ if (request->argc == 0)
+ goto unknown_request;
+
+ size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
+
+ for (size_t i = 0; i < numof_messages; ++i) {
+ if (strcmp(messages[i].cmd, request->argv[0]))
+ continue;
+ if (!messages[i].parser(request))
+ continue;
+ return messages[i].handler(server, client_fd, request);
+ }
+
+unknown_request:
+ print_error("Received an unknown message\n");
+ msg_dump(request);
+ struct msg response;
+ msg_error(&response);
+ return msg_send(client_fd, &response);
+}
+
+static int server_conn_handler(int client_fd, void *_server)
+{
+ struct server *server = (struct server *)_server;
+ struct msg request;
+ int ret = 0;
+
+ ret = msg_recv(client_fd, &request);
+ if (ret < 0)
+ return ret;
+
+ ret = server_msg_handler(server, client_fd, &request);
+ msg_free(&request);
+ return ret;
+}
+
+static int server_set_stopping(struct server *server)
+{
+ int ret = 0;
ret = pthread_mutex_lock(&server->server_mtx);
if (ret) {
@@ -299,11 +303,7 @@ int server_ci_run(struct server *server, const char *url, const char *rev)
return ret;
}
- ret = ci_queue_entry_create(&entry, url, rev);
- if (ret < 0)
- goto unlock;
-
- ci_queue_push(&server->ci_queue, entry);
+ server->stopping = 1;
ret = pthread_cond_signal(&server->server_cv);
if (ret) {
@@ -316,3 +316,18 @@ unlock:
return ret;
}
+
+int server_main(struct server *server)
+{
+ int ret = 0;
+
+ while (!global_stop_flag) {
+ print_log("Waiting for new connections\n");
+
+ ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server);
+ if (ret < 0)
+ break;
+ }
+
+ return server_set_stopping(server);
+}
diff --git a/src/server.h b/src/server.h
index 9107b4c..ebd88a1 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3,7 +3,6 @@
#include "ci_queue.h"
#include "tcp_server.h"
-#include "worker_queue.h"
#include <pthread.h>
@@ -19,10 +18,7 @@ struct server {
struct tcp_server tcp_server;
- struct worker_queue worker_queue;
struct ci_queue ci_queue;
-
- pthread_t scheduler;
};
int server_create(struct server *, const struct settings *);
@@ -30,7 +26,4 @@ void server_destroy(struct server *);
int server_main(struct server *);
-int server_new_worker(struct server *, int fd);
-int server_ci_run(struct server *, const char *url, const char *rev);
-
#endif
diff --git a/src/worker.c b/src/worker.c
index 19270db..3b48636 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -26,18 +26,8 @@ int worker_create(struct worker *worker, const struct settings *settings)
goto git_shutdown;
worker->fd = ret;
- ret = pthread_mutex_init(&worker->task_mtx, NULL);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_init");
- goto close;
- }
-
- worker->task_active = 0;
return ret;
-close:
- check_errno(close(worker->fd), "close");
-
git_shutdown:
libgit_shutdown();
@@ -48,11 +38,6 @@ void worker_destroy(struct worker *worker)
{
print_log("Shutting down\n");
- if (worker->task_active) {
- pthread_check(pthread_join(worker->task, NULL), "pthread_join");
- worker->task_active = 0;
- }
- pthread_check(pthread_mutex_destroy(&worker->task_mtx), "pthread_mutex_destroy");
check_errno(close(worker->fd), "close");
libgit_shutdown();
}
@@ -61,200 +46,101 @@ static int msg_send_new_worker(const struct worker *worker)
{
static char *argv[] = {"new_worker", NULL};
struct msg msg;
- int response, ret = 0;
+ int ret = 0;
ret = msg_from_argv(&msg, argv);
if (ret < 0)
return ret;
- ret = msg_send_and_wait(worker->fd, &msg, &response);
+ ret = msg_send(worker->fd, &msg);
if (ret < 0)
goto free_msg;
- if (response < 0) {
- print_error("Failed to register\n");
- ret = response;
- goto free_msg;
- }
-
free_msg:
msg_free(&msg);
return ret;
}
-typedef int (*worker_task_body)(const struct msg *);
-
-static int msg_body_ci_run(const struct msg *msg)
+static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *result)
{
- const char *url = msg->argv[1];
- const char *rev = msg->argv[2];
- struct proc_output result;
int ret = 0;
- ret = ci_run_git_repo(url, rev, &result);
+ ret = ci_run_git_repo(url, rev, result);
if (ret < 0) {
print_error("Run failed with an error\n");
return ret;
}
- print_log("Process exit code: %d\n", result.ec);
- print_log("Process output:\n%s", result.output);
- if (!result.output || !result.output_len || result.output[result.output_len - 1] != '\n')
+ print_log("Process exit code: %d\n", result->ec);
+ print_log("Process output:\n%s", result->output);
+ if (!result->output || !result->output_len ||
+ result->output[result->output_len - 1] != '\n')
print_log("\n");
- free(result.output);
- return ret;
+ return 0;
}
-typedef worker_task_body (*worker_msg_parser)(const struct msg *);
+static int msg_ci_run_handler(struct worker *worker, const struct msg *request)
+{
+ struct msg response;
+ struct proc_output result;
+ int ret = 0;
+
+ ret = msg_ci_run_do(request->argv[1], request->argv[2], &result);
+ if (ret < 0)
+ msg_error(&response);
+ else
+ msg_success(&response);
+ proc_output_free(&result);
-static worker_task_body parse_msg_ci_run(const struct msg *msg)
+ return msg_send(worker->fd, &response);
+}
+
+static int msg_ci_run_parser(const struct msg *msg)
{
if (msg->argc != 3) {
print_error("Invalid number of arguments for a message: %d\n", msg->argc);
- return NULL;
+ return 0;
}
- return msg_body_ci_run;
+ return 1;
}
-struct worker_msg_parser_it {
- const char *msg;
- worker_msg_parser parser;
-};
+typedef int (*msg_parser)(const struct msg *msg);
+typedef int (*msg_handler)(struct worker *, const struct msg *);
-struct worker_msg_parser_it worker_msg_parsers[] = {
- {"ci_run", parse_msg_ci_run},
+struct msg_descr {
+ const char *cmd;
+ msg_parser parser;
+ msg_handler handler;
};
-struct worker_task_context {
- struct msg *msg;
- worker_task_body body;
+struct msg_descr messages[] = {
+ {"ci_run", msg_ci_run_parser, msg_ci_run_handler},
};
-static void *worker_task_wrapper(void *_ctx)
-{
- struct worker_task_context *ctx = (struct worker_task_context *)_ctx;
-
- ctx->body(ctx->msg);
- msg_free(ctx->msg);
- free(ctx->msg);
- free(ctx);
- return NULL;
-}
-
-static int worker_schedule_task(struct worker *worker, const struct msg *msg, worker_task_body body)
-{
- struct worker_task_context *ctx;
- pthread_attr_t attr;
- int ret = 0;
-
- ctx = malloc(sizeof(*ctx));
- if (!ctx) {
- print_errno("malloc");
- return -1;
- }
- ctx->body = body;
-
- ctx->msg = msg_copy(msg);
- if (!ctx->msg) {
- ret = -1;
- goto free_ctx;
- }
-
- ret = pthread_attr_init(&attr);
- if (ret) {
- pthread_print_errno(ret, "pthread_attr_init");
- goto free_msg;
- }
-
- ret = signal_set_thread_attr(&attr);
- if (ret < 0)
- goto free_attr;
-
- ret = pthread_create(&worker->task, NULL, worker_task_wrapper, ctx);
- if (ret) {
- pthread_print_errno(ret, "pthread_create");
- goto free_attr;
- }
- worker->task_active = 1;
- pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy");
-
- return ret;
-
-free_attr:
- pthread_check(pthread_attr_destroy(&attr), "pthread_attr_destroy");
-
-free_msg:
- msg_free(ctx->msg);
- free(ctx->msg);
-
-free_ctx:
- free(ctx);
-
- return ret;
-}
-
-static int worker_msg_handler(struct worker *worker, const struct msg *msg)
+static int worker_msg_handler(struct worker *worker, const struct msg *request)
{
- if (worker->task_active) {
- int ret = pthread_tryjoin_np(worker->task, NULL);
- switch (ret) {
- case 0:
- worker->task_active = 0;
- break;
- case EBUSY:
- break;
- default:
- pthread_print_errno(ret, "pthread_tryjoin_np");
- return ret;
- }
- }
-
- if (worker->task_active) {
- print_error("Worker is busy\n");
- return -1;
- }
-
- if (msg->argc == 0) {
- print_error("Received an empty message\n");
- return -1;
- }
+ if (request->argc == 0)
+ goto unknown_request;
- size_t numof_parsers = sizeof(worker_msg_parsers) / sizeof(worker_msg_parsers[0]);
+ size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
- for (size_t i = 0; i < numof_parsers; ++i) {
- const struct worker_msg_parser_it *it = &worker_msg_parsers[i];
- if (strcmp(it->msg, msg->argv[0]))
+ for (size_t i = 0; i < numof_messages; ++i) {
+ if (strcmp(messages[i].cmd, request->argv[0]))
continue;
-
- worker_task_body body = it->parser(msg);
- if (!body)
- return -1;
-
- return worker_schedule_task(worker, msg, body);
- }
-
- return msg_dump_unknown(msg);
-}
-
-static int worker_msg_handler_lock(const struct msg *msg, void *_worker)
-{
- struct worker *worker = (struct worker *)_worker;
- int ret = 0;
-
- ret = pthread_mutex_lock(&worker->task_mtx);
- if (ret) {
- pthread_print_errno(ret, "pthread_mutex_lock");
- return -1;
+ if (!messages[i].parser(request))
+ continue;
+ return messages[i].handler(worker, request);
}
- ret = worker_msg_handler(worker, msg);
-
- pthread_check(pthread_mutex_unlock(&worker->task_mtx), "pthread_mutex_lock");
-
- return ret;
+unknown_request:
+ print_error("Received an unknown message\n");
+ msg_dump(request);
+ struct msg response;
+ msg_error(&response);
+ return msg_send(worker->fd, &response);
}
int worker_main(struct worker *worker, int, char *[])
@@ -266,9 +152,16 @@ int worker_main(struct worker *worker, int, char *[])
return ret;
while (!global_stop_flag) {
+ struct msg request;
+
print_log("Waiting for a new command\n");
- ret = msg_recv_and_handle(worker->fd, worker_msg_handler_lock, worker);
+ ret = msg_recv(worker->fd, &request);
+ if (ret < 0)
+ return ret;
+
+ ret = worker_msg_handler(worker, &request);
+ msg_free(&request);
if (ret < 0)
return ret;
}
diff --git a/src/worker_queue.c b/src/worker_queue.c
deleted file mode 100644
index c832746..0000000
--- a/src/worker_queue.c
+++ /dev/null
@@ -1,79 +0,0 @@
-#include "worker_queue.h"
-#include "log.h"
-
-#include <stdlib.h>
-#include <sys/queue.h>
-#include <unistd.h>
-
-int worker_queue_entry_create(struct worker_queue_entry **entry, int fd)
-{
- int newfd = dup(fd);
-
- if (newfd < 0) {
- print_errno("malloc");
- return -1;
- }
-
- *entry = malloc(sizeof(struct worker_queue_entry));
- if (!*entry) {
- print_errno("malloc");
- goto close_newfd;
- }
- (*entry)->fd = newfd;
-
- return 0;
-
-close_newfd:
- check_errno(close(newfd), "close");
-
- return -1;
-}
-
-void worker_queue_entry_destroy(struct worker_queue_entry *entry)
-{
- check_errno(close(entry->fd), "close");
- free(entry);
-}
-
-void worker_queue_create(struct worker_queue *queue)
-{
- STAILQ_INIT(queue);
-}
-
-void worker_queue_destroy(struct worker_queue *queue)
-{
- struct worker_queue_entry *entry1, *entry2;
-
- entry1 = STAILQ_FIRST(queue);
- while (entry1) {
- entry2 = STAILQ_NEXT(entry1, entries);
- worker_queue_entry_destroy(entry1);
- entry1 = entry2;
- }
- STAILQ_INIT(queue);
-}
-
-int worker_queue_is_empty(const struct worker_queue *queue)
-{
- return STAILQ_EMPTY(queue);
-}
-
-void worker_queue_push(struct worker_queue *queue, struct worker_queue_entry *entry)
-{
- STAILQ_INSERT_TAIL(queue, entry, entries);
-}
-
-void worker_queue_push_head(struct worker_queue *queue, struct worker_queue_entry *entry)
-{
- STAILQ_INSERT_HEAD(queue, entry, entries);
-}
-
-struct worker_queue_entry *worker_queue_pop(struct worker_queue *queue)
-{
- struct worker_queue_entry *entry;
-
- entry = STAILQ_FIRST(queue);
- STAILQ_REMOVE_HEAD(queue, entries);
-
- return entry;
-}
diff --git a/src/worker_queue.h b/src/worker_queue.h
deleted file mode 100644
index d5e0bb2..0000000
--- a/src/worker_queue.h
+++ /dev/null
@@ -1,26 +0,0 @@
-#ifndef __WORKER_QUEUE_H__
-#define __WORKER_QUEUE_H__
-
-#include <sys/queue.h>
-
-struct worker_queue_entry {
- int fd;
- STAILQ_ENTRY(worker_queue_entry) entries;
-};
-
-int worker_queue_entry_create(struct worker_queue_entry **, int fd);
-void worker_queue_entry_destroy(struct worker_queue_entry *);
-
-STAILQ_HEAD(worker_queue, worker_queue_entry);
-
-void worker_queue_create(struct worker_queue *);
-void worker_queue_destroy(struct worker_queue *);
-
-int worker_queue_is_empty(const struct worker_queue *);
-
-void worker_queue_push(struct worker_queue *, struct worker_queue_entry *);
-void worker_queue_push_head(struct worker_queue *, struct worker_queue_entry *);
-
-struct worker_queue_entry *worker_queue_pop(struct worker_queue *);
-
-#endif