aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2023-05-13 08:59:43 +0200
committerEgor Tensin <Egor.Tensin@gmail.com>2023-05-13 11:37:46 +0200
commitf471fbdf27462b82febe4e8db8358ab3380d2a28 (patch)
tree6c41abf4e32214cd8bdb0f8a377f93b3c7c0d830
parentci_queue: fix a broken getter (diff)
downloadcimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.tar.gz
cimple-f471fbdf27462b82febe4e8db8358ab3380d2a28.zip
add command module to handle request-response communications
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/client.c19
-rw-r--r--src/client.h2
-rw-r--r--src/client_main.c2
-rw-r--r--src/command.c157
-rw-r--r--src/command.h35
-rw-r--r--src/msg.c143
-rw-r--r--src/msg.h24
-rw-r--r--src/server.c129
-rw-r--r--src/worker.c106
10 files changed, 395 insertions, 224 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6f2cc6b..545fd82 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -36,6 +36,7 @@ generate_sql_header(sqlite)
add_my_executable(server server_main.c server.c
ci_queue.c
cmd_line.c
+ command.c
file.c
msg.c
net.c
@@ -57,6 +58,7 @@ add_my_executable(client client_main.c client.c
add_my_executable(worker worker_main.c worker.c
ci.c
cmd_line.c
+ command.c
file.c
git.c
msg.c
diff --git a/src/client.c b/src/client.c
index 739c612..14eab14 100644
--- a/src/client.c
+++ b/src/client.c
@@ -47,24 +47,31 @@ void client_destroy(struct client *client)
free(client);
}
-int client_main(const struct client *client, int argc, char *argv[])
+int client_main(const struct client *client, const char **argv)
{
- struct msg request = {argc, argv};
- struct msg response;
+ struct msg *request;
+ struct msg *response;
int ret = 0;
- ret = msg_send_and_wait(client->fd, &request, &response);
+ ret = msg_from_argv(&request, argv);
if (ret < 0)
return ret;
- if (msg_is_error(&response)) {
+ ret = msg_send_and_wait(client->fd, request, &response);
+ if (ret < 0)
+ goto free_request;
+
+ if (msg_is_error(response)) {
log_err("Server failed to process the request\n");
ret = -1;
goto free_response;
}
free_response:
- msg_free(&response);
+ msg_free(response);
+
+free_request:
+ msg_free(request);
return ret;
}
diff --git a/src/client.h b/src/client.h
index 141c6be..5bc3f5c 100644
--- a/src/client.h
+++ b/src/client.h
@@ -18,6 +18,6 @@ struct client;
int client_create(struct client **, const struct settings *);
void client_destroy(struct client *);
-int client_main(const struct client *, int argc, char *argv[]);
+int client_main(const struct client *, const char **argv);
#endif
diff --git a/src/client_main.c b/src/client_main.c
index a895ccf..dd22508 100644
--- a/src/client_main.c
+++ b/src/client_main.c
@@ -73,7 +73,7 @@ int main(int argc, char *argv[])
if (ret < 0)
return ret;
- ret = client_main(client, argc - optind, argv + optind);
+ ret = client_main(client, (const char **)argv + optind);
if (ret < 0)
goto destroy_client;
diff --git a/src/command.c b/src/command.c
new file mode 100644
index 0000000..b1204a9
--- /dev/null
+++ b/src/command.c
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#include "command.h"
+#include "log.h"
+#include "msg.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+struct command_dispatcher {
+ struct command_def *defs;
+ size_t numof_defs;
+ void *ctx;
+};
+
+static int copy_def(struct command_def *dest, const struct command_def *src)
+{
+ dest->name = strdup(src->name);
+ if (!dest->name) {
+ log_errno("strdup");
+ return -1;
+ }
+ dest->processor = src->processor;
+ return 0;
+}
+
+static void free_def(struct command_def *def)
+{
+ free(def->name);
+}
+
+static int copy_defs(struct command_def *dest, const struct command_def *src, size_t numof_defs)
+{
+ size_t numof_copied, i;
+ int ret = 0;
+
+ for (numof_copied = 0; numof_copied < numof_defs; ++numof_copied) {
+ ret = copy_def(&dest[numof_copied], &src[numof_copied]);
+ if (ret < 0)
+ goto free;
+ }
+
+ return 0;
+
+free:
+ for (i = 0; i < numof_copied; ++i)
+ free_def(&dest[numof_copied]);
+
+ return -1;
+}
+
+static void free_defs(struct command_def *defs, size_t numof_defs)
+{
+ size_t i;
+
+ for (i = 0; i < numof_defs; ++i)
+ free_def(&defs[i]);
+}
+
+int command_dispatcher_create(struct command_dispatcher **_dispatcher, struct command_def *defs,
+ size_t numof_defs, void *ctx)
+{
+ struct command_dispatcher *dispatcher;
+ int ret = 0;
+
+ *_dispatcher = malloc(sizeof(struct command_dispatcher));
+ if (!*_dispatcher) {
+ log_errno("malloc");
+ return -1;
+ }
+ dispatcher = *_dispatcher;
+
+ dispatcher->defs = malloc(sizeof(struct command_def) * numof_defs);
+ if (!dispatcher->defs) {
+ log_errno("malloc");
+ goto free;
+ }
+ dispatcher->numof_defs = numof_defs;
+
+ ret = copy_defs(dispatcher->defs, defs, numof_defs);
+ if (ret < 0)
+ goto free_defs;
+
+ dispatcher->ctx = ctx;
+ return 0;
+
+free_defs:
+ free(dispatcher->defs);
+
+free:
+ free(dispatcher);
+
+ return -1;
+}
+
+void command_dispatcher_destroy(struct command_dispatcher *dispatcher)
+{
+ free_defs(dispatcher->defs, dispatcher->numof_defs);
+ free(dispatcher->defs);
+ free(dispatcher);
+}
+
+int command_dispatcher_msg_handler(const struct command_dispatcher *dispatcher, int conn_fd,
+ const struct msg *request)
+{
+ struct msg *response = NULL;
+ int ret = 0;
+
+ size_t numof_words = msg_get_length(request);
+ if (numof_words == 0)
+ goto unknown;
+
+ const char **words = msg_get_words(request);
+
+ for (size_t i = 0; i < dispatcher->numof_defs; ++i) {
+ struct command_def *def = &dispatcher->defs[i];
+
+ if (strcmp(def->name, words[0]))
+ continue;
+
+ ret = def->processor(conn_fd, request, dispatcher->ctx, &response);
+ goto exit;
+ }
+
+unknown:
+ log_err("Received an unknown command\n");
+ ret = -1;
+ msg_dump(request);
+ goto exit;
+
+exit:
+ if (ret < 0 && !response)
+ msg_error(&response);
+ if (response)
+ return msg_send(conn_fd, response);
+ return ret;
+}
+
+int command_dispatcher_conn_handler(int conn_fd, void *_dispatcher)
+{
+ struct command_dispatcher *dispatcher = (struct command_dispatcher *)_dispatcher;
+ struct msg *request;
+ int ret = 0;
+
+ ret = msg_recv(conn_fd, &request);
+ if (ret < 0)
+ return ret;
+
+ ret = command_dispatcher_msg_handler(dispatcher, conn_fd, request);
+ msg_free(request);
+ return ret;
+}
diff --git a/src/command.h b/src/command.h
new file mode 100644
index 0000000..b4c3b40
--- /dev/null
+++ b/src/command.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com>
+ * This file is part of the "cimple" project.
+ * For details, see https://github.com/egor-tensin/cimple.
+ * Distributed under the MIT License.
+ */
+
+#ifndef __COMMAND_H__
+#define __COMMAND_H__
+
+#include "msg.h"
+
+#include <stdlib.h>
+
+typedef int (*command_processor)(int conn_fd, const struct msg *request, void *ctx,
+ struct msg **response);
+
+struct command_def {
+ char *name;
+ command_processor processor;
+};
+
+struct command_dispatcher;
+
+int command_dispatcher_create(struct command_dispatcher **, struct command_def *, size_t numof_defs,
+ void *ctx);
+void command_dispatcher_destroy(struct command_dispatcher *);
+
+int command_dispatcher_msg_handler(const struct command_dispatcher *, int conn_fd,
+ const struct msg *);
+
+/* This is supposed to be used as an argument to tcp_server_accept. */
+int command_dispatcher_conn_handler(int conn_fd, void *dispatcher);
+
+#endif
diff --git a/src/msg.c b/src/msg.c
index 0c8dfc7..18e4c25 100644
--- a/src/msg.c
+++ b/src/msg.c
@@ -13,13 +13,28 @@
#include <stdlib.h>
#include <string.h>
-int msg_success(struct msg *msg)
+struct msg {
+ size_t argc;
+ const char **argv;
+};
+
+size_t msg_get_length(const struct msg *msg)
+{
+ return msg->argc;
+}
+
+const char **msg_get_words(const struct msg *msg)
+{
+ return msg->argv;
+}
+
+int msg_success(struct msg **msg)
{
const char *argv[] = {"success", NULL};
return msg_from_argv(msg, argv);
}
-int msg_error(struct msg *msg)
+int msg_error(struct msg **msg)
{
const char *argv[] = {"error", NULL};
return msg_from_argv(msg, argv);
@@ -37,82 +52,107 @@ int msg_is_error(const struct msg *msg)
static int msg_copy_argv(struct msg *msg, const char **argv)
{
- msg->argv = calloc(msg->argc, sizeof(const char *));
+ size_t copied = 0;
+ msg->argv = calloc(msg->argc + 1, sizeof(const char *));
if (!msg->argv) {
log_errno("calloc");
return -1;
}
- for (int i = 0; i < msg->argc; ++i) {
- msg->argv[i] = strdup(argv[i]);
- if (!msg->argv[i]) {
+ for (copied = 0; copied < msg->argc; ++copied) {
+ msg->argv[copied] = strdup(argv[copied]);
+ if (!msg->argv[copied]) {
log_errno("strdup");
- goto free;
+ goto free_copied;
}
}
return 0;
-free:
- msg_free(msg);
+free_copied:
+ for (size_t i = 0; i < copied; ++i) {
+ free((char *)msg->argv[i]);
+ }
+
+ free(msg->argv);
return -1;
}
-struct msg *msg_copy(const struct msg *src)
+int msg_copy(struct msg **_dest, const struct msg *src)
{
- struct msg *dest;
+ struct msg *dest = NULL;
int ret = 0;
- dest = malloc(sizeof(*dest));
- if (!dest) {
- log_errno("calloc");
- return NULL;
+ *_dest = malloc(sizeof(struct msg));
+ if (!_dest) {
+ log_errno("malloc");
+ return -1;
}
+ dest = *_dest;
+
dest->argc = src->argc;
ret = msg_copy_argv(dest, (const char **)src->argv);
if (ret < 0)
goto free;
- return dest;
+ return 0;
free:
free(dest);
- return NULL;
+ return -1;
}
-void msg_free(const struct msg *msg)
+void msg_free(struct msg *msg)
{
- for (int i = 0; i < msg->argc; ++i)
+ for (size_t i = 0; i < msg->argc; ++i)
free((char *)msg->argv[i]);
free(msg->argv);
+ free(msg);
}
-int msg_from_argv(struct msg *msg, const char **argv)
+int msg_from_argv(struct msg **_msg, const char **argv)
{
- int argc = 0;
+ struct msg *msg = NULL;
+ int ret = 0;
+
+ *_msg = malloc(sizeof(struct msg));
+ if (!*_msg) {
+ log_errno("malloc");
+ return -1;
+ }
+ msg = *_msg;
+ msg->argc = 0;
for (const char **s = argv; *s; ++s)
- ++argc;
+ ++msg->argc;
+
+ ret = msg_copy_argv(msg, argv);
+ if (ret < 0)
+ goto free;
+
+ return 0;
+
+free:
+ free(msg);
- msg->argc = argc;
- return msg_copy_argv(msg, argv);
+ return -1;
}
static uint32_t calc_buf_size(const struct msg *msg)
{
uint32_t len = 0;
- for (int i = 0; i < msg->argc; ++i)
+ for (size_t i = 0; i < msg->argc; ++i)
len += strlen(msg->argv[i]) + 1;
return len;
}
-static int calc_argv_len(const void *buf, size_t len)
+static size_t calc_argv_len(const void *buf, size_t len)
{
- int argc = 0;
+ size_t argc = 0;
for (const char *it = buf; it < (const char *)buf + len; it += strlen(it) + 1)
++argc;
return argc;
@@ -120,7 +160,7 @@ static int calc_argv_len(const void *buf, size_t len)
static void argv_pack(char *dest, const struct msg *msg)
{
- for (int i = 0; i < msg->argc; ++i) {
+ for (size_t i = 0; i < msg->argc; ++i) {
strcpy(dest, msg->argv[i]);
dest += strlen(msg->argv[i]) + 1;
}
@@ -128,28 +168,31 @@ static void argv_pack(char *dest, const struct msg *msg)
static int argv_unpack(struct msg *msg, const char *src)
{
- msg->argv = calloc(msg->argc, sizeof(char *));
+ size_t copied = 0;
+
+ msg->argv = calloc(msg->argc + 1, sizeof(const char *));
if (!msg->argv) {
log_errno("calloc");
return -1;
}
- for (int i = 0; i < msg->argc; ++i) {
- size_t len = strlen(src);
-
- msg->argv[i] = malloc(len + 1);
- if (!msg->argv[i]) {
- log_errno("malloc");
+ for (copied = 0; copied < msg->argc; ++copied) {
+ msg->argv[copied] = strdup(src);
+ if (!msg->argv[copied]) {
+ log_errno("strdup");
goto free;
}
- strcpy(msg->argv[i], src);
- src += len + 1;
+ src += strlen(msg->argv[copied]) + 1;
}
return 0;
free:
+ for (size_t i = 0; i < copied; ++i) {
+ free((char *)msg->argv[i]);
+ }
+
msg_free(msg);
return -1;
@@ -185,7 +228,7 @@ free_data:
return ret;
}
-int msg_send_and_wait(int fd, const struct msg *request, struct msg *response)
+int msg_send_and_wait(int fd, const struct msg *request, struct msg **response)
{
int ret = 0;
@@ -200,20 +243,34 @@ int msg_send_and_wait(int fd, const struct msg *request, struct msg *response)
return ret;
}
-int msg_recv(int fd, struct msg *msg)
+int msg_recv(int fd, struct msg **_msg)
{
- struct buf *buf;
+ struct msg *msg = NULL;
+ struct buf *buf = NULL;
int ret = 0;
ret = net_recv_buf(fd, &buf);
if (ret < 0)
return ret;
+ *_msg = malloc(sizeof(struct msg));
+ if (!*_msg) {
+ log_errno("malloc");
+ ret = -1;
+ goto destroy_buf;
+ }
+ msg = *_msg;
+
msg->argc = calc_argv_len(buf_get_data(buf), buf_get_size(buf));
ret = argv_unpack(msg, buf_get_data(buf));
if (ret < 0)
- goto destroy_buf;
+ goto free_msg;
+
+ goto destroy_buf;
+
+free_msg:
+ free(msg);
destroy_buf:
buf_destroy(buf);
@@ -223,7 +280,7 @@ destroy_buf:
void msg_dump(const struct msg *msg)
{
- log("Message[%d]:\n", msg->argc);
- for (int i = 0; i < msg->argc; ++i)
+ log("Message[%zu]:\n", msg->argc);
+ for (size_t i = 0; i < msg->argc; ++i)
log("\t%s\n", msg->argv[i]);
}
diff --git a/src/msg.h b/src/msg.h
index 2260016..2f631fc 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -8,26 +8,28 @@
#ifndef __MSG_H__
#define __MSG_H__
-struct msg {
- int argc;
- char **argv;
-};
+#include <stdlib.h>
-int msg_success(struct msg *);
-int msg_error(struct msg *);
+struct msg;
+
+size_t msg_get_length(const struct msg *);
+const char **msg_get_words(const struct msg *);
+
+int msg_success(struct msg **);
+int msg_error(struct msg **);
int msg_is_success(const struct msg *);
int msg_is_error(const struct msg *);
-struct msg *msg_copy(const struct msg *);
-void msg_free(const struct msg *);
+int msg_copy(struct msg **, const struct msg *);
+void msg_free(struct msg *);
-int msg_from_argv(struct msg *, const char **argv);
+int msg_from_argv(struct msg **, const char **argv);
-int msg_recv(int fd, struct msg *);
+int msg_recv(int fd, struct msg **);
int msg_send(int fd, const struct msg *);
-int msg_send_and_wait(int fd, const struct msg *, struct msg *response);
+int msg_send_and_wait(int fd, const struct msg *, struct msg **response);
void msg_dump(const struct msg *);
diff --git a/src/server.c b/src/server.c
index 22a9a86..806da11 100644
--- a/src/server.c
+++ b/src/server.c
@@ -7,6 +7,7 @@
#include "server.h"
#include "ci_queue.h"
+#include "command.h"
#include "compiler.h"
#include "const.h"
#include "log.h"
@@ -18,7 +19,6 @@
#include <pthread.h>
#include <stdlib.h>
-#include <string.h>
struct server {
pthread_mutex_t server_mtx;
@@ -111,7 +111,7 @@ static int server_has_runs(const struct server *server)
static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run)
{
- struct msg request, response;
+ struct msg *request, *response;
int ret = 0;
const char *argv[] = {CMD_CI_RUN, ci_queue_entry_get_url(ci_run),
@@ -121,21 +121,21 @@ static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run)
if (ret < 0)
return ret;
- ret = msg_send_and_wait(fd, &request, &response);
- msg_free(&request);
+ ret = msg_send_and_wait(fd, request, &response);
+ msg_free(request);
if (ret < 0)
return ret;
- if (response.argc < 0) {
+ if (!msg_is_success(response)) {
log_err("Failed to schedule a CI run: worker is busy?\n");
- ret = -1;
+ msg_dump(response);
goto free_response;
}
/* TODO: handle the response. */
free_response:
- msg_free(&response);
+ msg_free(response);
return ret;
}
@@ -234,15 +234,10 @@ static int worker_thread(struct server *server, int fd)
return ret;
}
-static int msg_new_worker_handler(struct server *server, int client_fd,
- UNUSED const struct msg *request)
+static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request, void *_server,
+ UNUSED struct msg **response)
{
- return worker_thread(server, client_fd);
-}
-
-static int msg_new_worker_parser(UNUSED const struct msg *msg)
-{
- return 1;
+ return worker_thread((struct server *)_server, client_fd);
}
static int msg_ci_run_queue(struct server *server, const char *url, const char *rev)
@@ -276,87 +271,33 @@ unlock:
return ret;
}
-static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg)
+static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request, void *_server,
+ struct msg **response)
{
- struct msg response;
+ struct server *server = (struct server *)_server;
int ret = 0;
- ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]);
- if (ret < 0)
- ret = msg_error(&response);
- else
- ret = msg_success(&response);
-
- if (ret < 0)
- return ret;
-
- ret = msg_send(client_fd, &response);
- msg_free(&response);
- return ret;
-}
-
-static int msg_ci_run_parser(const struct msg *msg)
-{
- if (msg->argc != 3) {
- log_err("Invalid number of arguments for a message: %d\n", msg->argc);
- return 0;
- }
-
- return 1;
-}
-
-typedef int (*msg_parser)(const struct msg *msg);
-typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg);
-
-struct msg_descr {
- const char *cmd;
- msg_parser parser;
- msg_handler handler;
-};
-
-struct msg_descr messages[] = {
- {CMD_NEW_WORKER, msg_new_worker_parser, msg_new_worker_handler},
- {CMD_CI_RUN, msg_ci_run_parser, msg_ci_run_handler},
-};
-
-static int server_msg_handler(struct server *server, int client_fd, const struct msg *request)
-{
- if (request->argc == 0)
- goto unknown_request;
-
- size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
-
- for (size_t i = 0; i < numof_messages; ++i) {
- if (strcmp(messages[i].cmd, request->argv[0]))
- continue;
- if (!messages[i].parser(request))
- continue;
- return messages[i].handler(server, client_fd, request);
+ if (msg_get_length(request) != 3) {
+ log_err("Invalid number of arguments for a message: %zu\n",
+ msg_get_length(request));
+ msg_dump(request);
+ return -1;
}
-unknown_request:
- log_err("Received an unknown message\n");
- msg_dump(request);
- struct msg response;
- msg_error(&response);
- return msg_send(client_fd, &response);
-}
-
-static int server_conn_handler(int client_fd, void *_server)
-{
- struct server *server = (struct server *)_server;
- struct msg request;
- int ret = 0;
+ const char **words = msg_get_words(request);
- ret = msg_recv(client_fd, &request);
+ ret = msg_ci_run_queue(server, words[1], words[2]);
if (ret < 0)
return ret;
- ret = server_msg_handler(server, client_fd, &request);
- msg_free(&request);
- return ret;
+ return msg_success(response);
}
+static struct command_def commands[] = {
+ {CMD_NEW_WORKER, msg_new_worker_handler},
+ {CMD_CI_RUN, msg_ci_run_handler},
+};
+
static int server_set_stopping(struct server *server)
{
int ret = 0;
@@ -383,24 +324,32 @@ unlock:
int server_main(struct server *server)
{
+ struct command_dispatcher *dispatcher = NULL;
int ret = 0;
ret = signal_install_global_handler();
if (ret < 0)
return ret;
+ ret = command_dispatcher_create(&dispatcher, commands,
+ sizeof(commands) / sizeof(commands[0]), server);
+ if (ret < 0)
+ return ret;
+
while (!global_stop_flag) {
log("Waiting for new connections\n");
- ret = tcp_server_accept(server->tcp_server, server_conn_handler, server);
+ ret = tcp_server_accept(server->tcp_server, command_dispatcher_conn_handler,
+ dispatcher);
if (ret < 0) {
- if (errno == EINVAL && global_stop_flag) {
+ if (errno == EINVAL && global_stop_flag)
ret = 0;
- break;
- }
- return ret;
+ goto dispatcher_destroy;
}
}
+dispatcher_destroy:
+ command_dispatcher_destroy(dispatcher);
+
return server_set_stopping(server);
}
diff --git a/src/worker.c b/src/worker.c
index ed3d7e5..ae9a578 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -7,6 +7,7 @@
#include "worker.h"
#include "ci.h"
+#include "command.h"
#include "compiler.h"
#include "const.h"
#include "git.h"
@@ -16,8 +17,8 @@
#include "process.h"
#include "signal.h"
+#include <pthread.h>
#include <stdlib.h>
-#include <string.h>
#include <unistd.h>
struct worker {
@@ -73,20 +74,15 @@ void worker_destroy(struct worker *worker)
static int msg_send_new_worker(const struct worker *worker)
{
static const char *argv[] = {CMD_NEW_WORKER, NULL};
- struct msg msg;
+ struct msg *msg;
int ret = 0;
ret = msg_from_argv(&msg, argv);
if (ret < 0)
return ret;
- ret = msg_send(worker->fd, &msg);
- if (ret < 0)
- goto free_msg;
-
-free_msg:
- msg_free(&msg);
-
+ ret = msg_send(worker->fd, msg);
+ msg_free(msg);
return ret;
}
@@ -109,77 +105,37 @@ static int msg_ci_run_do(const char *url, const char *rev, struct proc_output *r
return 0;
}
-static int msg_ci_run_handler(struct worker *worker, const struct msg *request)
+static int msg_ci_run_handler(UNUSED int conn_fd, const struct msg *request, UNUSED void *_worker,
+ struct msg **response)
{
- struct msg response;
struct proc_output result;
int ret = 0;
+ if (msg_get_length(request) != 3) {
+ log_err("Invalid number of arguments for a message\n");
+ msg_dump(request);
+ return -1;
+ }
+
+ const char **words = msg_get_words(request);
+
proc_output_init(&result);
- ret = msg_ci_run_do(request->argv[1], request->argv[2], &result);
+ ret = msg_ci_run_do(words[1], words[2], &result);
proc_output_free(&result);
if (ret < 0)
- ret = msg_error(&response);
- else
- ret = msg_success(&response);
-
- if (ret < 0)
return ret;
- ret = msg_send(worker->fd, &response);
- msg_free(&response);
- return ret;
-}
-
-static int msg_ci_run_parser(const struct msg *msg)
-{
- if (msg->argc != 3) {
- log_err("Invalid number of arguments for a message: %d\n", msg->argc);
- return 0;
- }
-
- return 1;
+ return msg_success(response);
}
-typedef int (*msg_parser)(const struct msg *msg);
-typedef int (*msg_handler)(struct worker *, const struct msg *);
-
-struct msg_descr {
- const char *cmd;
- msg_parser parser;
- msg_handler handler;
-};
-
-struct msg_descr messages[] = {
- {CMD_CI_RUN, msg_ci_run_parser, msg_ci_run_handler},
+static struct command_def commands[] = {
+ {CMD_CI_RUN, msg_ci_run_handler},
};
-static int worker_msg_handler(struct worker *worker, const struct msg *request)
-{
- if (request->argc == 0)
- goto unknown_request;
-
- size_t numof_messages = sizeof(messages) / sizeof(messages[0]);
-
- for (size_t i = 0; i < numof_messages; ++i) {
- if (strcmp(messages[i].cmd, request->argv[0]))
- continue;
- if (!messages[i].parser(request))
- continue;
- return messages[i].handler(worker, request);
- }
-
-unknown_request:
- log_err("Received an unknown message\n");
- msg_dump(request);
- struct msg response;
- msg_error(&response);
- return msg_send(worker->fd, &response);
-}
-
int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[])
{
+ struct command_dispatcher *dispatcher = NULL;
int ret = 0;
ret = signal_install_global_handler();
@@ -190,25 +146,31 @@ int worker_main(struct worker *worker, UNUSED int argc, UNUSED char *argv[])
if (ret < 0)
return ret;
+ ret = command_dispatcher_create(&dispatcher, commands,
+ sizeof(commands) / sizeof(commands[0]), worker);
+ if (ret < 0)
+ return ret;
+
while (!global_stop_flag) {
- struct msg request;
+ struct msg *request;
log("Waiting for a new command\n");
ret = msg_recv(worker->fd, &request);
if (ret < 0) {
- if (errno == EINVAL && global_stop_flag) {
+ if (errno == EINVAL && global_stop_flag)
ret = 0;
- break;
- }
- return ret;
+ goto dispatcher_destroy;
}
- ret = worker_msg_handler(worker, &request);
- msg_free(&request);
+ ret = command_dispatcher_msg_handler(dispatcher, worker->fd, request);
+ msg_free(request);
if (ret < 0)
- return ret;
+ goto dispatcher_destroy;
}
+dispatcher_destroy:
+ command_dispatcher_destroy(dispatcher);
+
return ret;
}