aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/worker.c
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-05-13 08:59:43 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-05-13 11:37:46 +0200
commitf471fbdf27462b82febe4e8db8358ab3380d2a28 (patch)
tree6c41abf4e32214cd8bdb0f8a377f93b3c7c0d830 /src/worker.c
parentci_queue: fix a broken getter (diff)
downloadcimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.tar.gz
cimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.zip
add command module to handle request-response communications
Diffstat (limited to '')
-rw-r--r--src/worker.c106
1 files changed, 34 insertions, 72 deletions
diff --git a/src/worker.c b/src/worker.c
index ed3d7e5..ae9a578 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -7,6 +7,7 @@
#include "worker.h"
#include "ci.h"
+#include "command.h"
#include "compiler.h"
#include "const.h"
#include "git.h"
@@ -16,8 +17,8 @@
#include "process.h"
#include "signal.h"
+#include <pthread.h>
#include <stdlib.h>
-#include <string.h>
#include <unistd.h>
struct worker {
@@ -73,20 +74,15 @@ void worker_destroy(struct worker *worker)
static int msg_send_new_worker(const struct worker *worker)
{
static const char *argv[] = {CMD_NEW_WORKER, NULL};
- struct msg msg;
+ struct msg *msg;
int ret = 0;
ret = msg_from_argv(&msg, argv);
if (ret < 0)
return ret;
- ret = msg_send(worker->fd, &msg);
- if (ret < 0)
- goto free_msg;
-
-free_msg:
- msg_free(&msg);
-
+ ret = msg_send(worker->fd, msg);
+ msg_free(msg);
return ret;
}
@@ -109,77 +105,37 @@ static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *r
return 0;
}
-static int msg_ci_run_handler(struct worker *worker, const struct msg *request)
+static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, UNUSED void *_worker,
+ struct msg **response)
{
- struct msg response;
struct proc_output result;
int ret = 0;
+ if (msg_get_length(request) != 3) {
+ log_err("Invalid number of arguments for a message\n");
+ msg_dump(request);
+ return -1;
+ }
+
+ const char **words = msg_get_words(request);
+
proc_output_init(&result);
- ret = msg_ci_run_do(request->argv[1], request->argv[2], &result);
+ ret = msg_ci_run_do(words[1], words[2], &result);
proc_output_free(&result);
if (ret < 0)
- ret = msg_error(&response);
- else
- ret = msg_success(&response);
-
- if (ret < 0)
return ret;
- ret = msg_send(worker->fd, &response);
- msg_free(&response);
- return ret;
-}
-
-static int msg_ci_run_parser(const struct msg *msg)
-{
- if (msg->argc != 3) {
- log_err("Invalid number of arguments for a message: %d\n", msg->argc);
- return 0;
- }
-
- return 1;
+ return msg_success(response);
}
-typedef int (*msg_parser)(const struct msg *msg);
-typedef int (*msg_handler)(struct worker *, const struct msg *);
-
-struct msg_descr {
- const char *cmd;
- msg_parser parser;
- msg_handler handler;
-};
-
-struct msg_descr messages[] = {
- {CMD_CI_RUN, msg_ci_run_parser, msg_ci_run_handler},
+static struct command_def commands[] = {
+ {CMD_CI_RUN, msg_ci_run_handler},
};
-static int worker_msg_handler(struct worker *worker, const struct msg *request)
-{
- if (request->argc == 0)
- goto unknown_request;
-
- size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
-
- for (size_t i = 0; i < numof_messages; ++i) {
- if (strcmp(messages[i].cmd, request->argv[0]))
- continue;
- if (!messages[i].parser(request))
- continue;
- return messages[i].handler(worker, request);
- }
-
-unknown_request:
- log_err("Received an unknown message\n");
- msg_dump(request);
- struct msg response;
- msg_error(&response);
- return msg_send(worker->fd, &response);
-}
-
int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[])
{
+ struct command_dispatcher *dispatcher = NULL;
int ret = 0;
ret = signal_install_global_handler();
@@ -190,25 +146,31 @@ int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[])
if (ret < 0)
return ret;
+ ret = command_dispatcher_create(&dispatcher, commands,
+ sizeof(commands) / sizeof(commands[0]), worker);
+ if (ret < 0)
+ return ret;
+
while (!global_stop_flag) {
- struct msg request;
+ struct msg *request;
log("Waiting for a new command\n");
ret = msg_recv(worker->fd, &request);
if (ret < 0) {
- if (errno == EINVAL && global_stop_flag) {
+ if (errno == EINVAL && global_stop_flag)
ret = 0;
- break;
- }
- return ret;
+ goto dispatcher_destroy;
}
- ret = worker_msg_handler(worker, &request);
- msg_free(&request);
+ ret = command_dispatcher_msg_handler(dispatcher, worker->fd, request);
+ msg_free(request);
if (ret < 0)
- return ret;
+ goto dispatcher_destroy;
}
+dispatcher_destroy:
+ command_dispatcher_destroy(dispatcher);
+
return ret;
}