diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-28 15:14:07 +0200 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2022-08-28 15:14:07 +0200 |
commit | 0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0 (patch) | |
tree | 2b0d1fb32b09988f652228a40508dbcef9c1032e /src/server.c | |
parent | net: use MSG_NOSIGNAL (diff) | |
download | cimple-0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0.tar.gz cimple-0ff63a9ceff4c8fcd679b52cb1c03d96675f52f0.zip |
holy crap, it actually kinda works now
Previously, I had a stupid system where I would create a thread after
every accept(), and put worker descriptors in a queue. A special
"scheduler" thread would then pick them out, and give out jobs to
complete.
The problem was, of course, I couldn't conveniently poll job status from
workers. I thought about using poll(), but that turned out to be a
horribly complicated API. How do I deal with partial reads, for example?
I don't honestly know.
Then it hit me that I could just use the threads that handle accept()ed
connections as "worker threads", which would synchronously schedule jobs
and wait for them to complete. This solves every problem and removes the
need for a lot of inter-thread synchronization magic. It even works now,
holy crap! You can launch and terminate workers at will, and they will
pick up new jobs automatically.
As a side not, msg_recv_and_handle turned out to be too limiting and
complicated for me, so I got rid of that, and do normal
msg_recv/msg_send calls.
Diffstat (limited to '')
-rw-r--r-- | src/server.c | 361 |
1 files changed, 188 insertions, 173 deletions
diff --git a/src/server.c b/src/server.c index b5c7463..4706c71 100644 --- a/src/server.c +++ b/src/server.c @@ -4,100 +4,14 @@ #include "msg.h" #include "signal.h" #include "tcp_server.h" -#include "worker_queue.h" #include <pthread.h> +#include <stdlib.h> #include <string.h> #include <unistd.h> -static int server_has_runs_and_workers(const struct server *server) -{ - return !ci_queue_is_empty(&server->ci_queue) && - !worker_queue_is_empty(&server->worker_queue); -} - -static int server_scheduler_iteration(struct server *server) -{ - struct worker_queue_entry *worker; - struct ci_queue_entry *ci_run; - struct msg msg; - int response, ret = 0; - - worker = worker_queue_pop(&server->worker_queue); - ci_run = ci_queue_pop(&server->ci_queue); - - char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - - ret = msg_from_argv(&msg, argv); - if (ret < 0) - goto requeue_ci_run; - - ret = msg_send_and_wait(worker->fd, &msg, &response); - if (ret < 0) - goto free_msg; - - if (response < 0) { - print_error("Failed to schedule a CI run\n"); - } - - msg_free(&msg); - - ci_queue_entry_destroy(ci_run); - - /* FIXME: Don't mark worker as free! */ - worker_queue_push_head(&server->worker_queue, worker); - - return 0; - -free_msg: - msg_free(&msg); - -requeue_ci_run: - ci_queue_push_head(&server->ci_queue, ci_run); - - worker_queue_push_head(&server->worker_queue, worker); - - return ret; -} - -static void *server_scheduler(void *_server) -{ - struct server *server = (struct server *)_server; - int ret = 0; - - ret = pthread_mutex_lock(&server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_mutex_lock"); - goto exit; - } - - while (1) { - while (!server->stopping && !server_has_runs_and_workers(server)) { - ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); - if (ret) { - pthread_print_errno(ret, "pthread_cond_wait"); - goto unlock; - } - } - - if (server->stopping) - goto unlock; - - ret = server_scheduler_iteration(server); - if (ret < 0) - goto unlock; - } - -unlock: - pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); - -exit: - return NULL; -} - int server_create(struct server *server, const struct settings *settings) { - pthread_attr_t scheduler_attr; int ret = 0; ret = pthread_mutex_init(&server->server_mtx, NULL); @@ -118,40 +32,10 @@ int server_create(struct server *server, const struct settings *settings) if (ret < 0) goto destroy_cv; - worker_queue_create(&server->worker_queue); - ci_queue_create(&server->ci_queue); - ret = pthread_attr_init(&scheduler_attr); - if (ret) { - pthread_print_errno(ret, "pthread_attr_init"); - goto destroy_ci_queue; - } - - ret = signal_set_thread_attr(&scheduler_attr); - if (ret) - goto destroy_attr; - - ret = pthread_create(&server->scheduler, &scheduler_attr, server_scheduler, server); - if (ret) { - pthread_print_errno(ret, "pthread_create"); - goto destroy_attr; - } - - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - return ret; -destroy_attr: - pthread_check(pthread_attr_destroy(&scheduler_attr), "pthread_attr_destroy"); - -destroy_ci_queue: - ci_queue_destroy(&server->ci_queue); - - worker_queue_destroy(&server->worker_queue); - - tcp_server_destroy(&server->tcp_server); - destroy_cv: pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); @@ -166,59 +50,48 @@ void server_destroy(struct server *server) { print_log("Shutting down\n"); - pthread_check(pthread_join(server->scheduler, NULL), "pthread_join"); ci_queue_destroy(&server->ci_queue); - worker_queue_destroy(&server->worker_queue); tcp_server_destroy(&server->tcp_server); pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy"); pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy"); } -struct msg_context { - struct server *server; - int client_fd; -}; - -static int msg_new_worker(const struct msg *, void *_ctx) +static int server_has_runs(const struct server *server) { - struct msg_context *ctx = (struct msg_context *)_ctx; - return server_new_worker(ctx->server, ctx->client_fd); + return !ci_queue_is_empty(&server->ci_queue); } -static int msg_ci_run(const struct msg *msg, void *_ctx) +static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run) { - struct msg_context *ctx = (struct msg_context *)_ctx; + struct msg request, response; + int ret = 0; - if (msg->argc != 3) { - print_error("Invalid number of arguments for a message: %d\n", msg->argc); - return -1; - } + char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL}; - return server_ci_run(ctx->server, msg->argv[1], msg->argv[2]); -} + ret = msg_from_argv(&request, argv); + if (ret < 0) + return ret; -static int server_msg_handler(const struct msg *msg, void *ctx) -{ - if (msg->argc == 0) { - print_error("Received an empty message\n"); - return -1; + ret = msg_send_and_wait(fd, &request, &response); + msg_free(&request); + if (ret < 0) + return ret; + + if (response.argc < 0) { + print_error("Failed ot schedule a CI run: worker is busy?\n"); + ret = -1; + goto free_response; } - if (!strcmp(msg->argv[0], "new_worker")) - return msg_new_worker(msg, ctx); - if (!strcmp(msg->argv[0], "ci_run")) - return msg_ci_run(msg, ctx); + /* TODO: handle the response. */ - return msg_dump_unknown(msg); -} +free_response: + msg_free(&response); -static int server_conn_handler(int fd, void *server) -{ - struct msg_context ctx = {server, fd}; - return msg_recv_and_handle(fd, server_msg_handler, &ctx); + return ret; } -static int server_set_stopping(struct server *server) +static int worker_dequeue_run(struct server *server, struct ci_queue_entry **ci_run) { int ret = 0; @@ -228,39 +101,96 @@ static int server_set_stopping(struct server *server) return ret; } - server->stopping = 1; + while (!server->stopping && !server_has_runs(server)) { + ret = pthread_cond_wait(&server->server_cv, &server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_cond_wait"); + goto unlock; + } + } - ret = pthread_cond_signal(&server->server_cv); - if (ret) { - pthread_print_errno(ret, "pthread_cond_signal"); + if (server->stopping) { + ret = -1; goto unlock; } + *ci_run = ci_queue_pop(&server->ci_queue); + goto unlock; + unlock: pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); return ret; } -int server_main(struct server *server) +static int worker_requeue_run(struct server *server, struct ci_queue_entry *ci_run) { int ret = 0; - while (!global_stop_flag) { - ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + ret = pthread_mutex_lock(&server->server_mtx); + if (ret) { + pthread_print_errno(ret, "pthread_mutex_lock"); + return ret; + } + + ci_queue_push_head(&server->ci_queue, ci_run); + + pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock"); + + return ret; +} + +static int worker_iteration(struct server *server, int fd) +{ + struct ci_queue_entry *ci_run; + int ret = 0; + + ret = worker_dequeue_run(server, &ci_run); + if (ret < 0) + return ret; + + ret = worker_ci_run(fd, ci_run); + if (ret < 0) + goto requeue_run; + + ci_queue_entry_destroy(ci_run); + return ret; + +requeue_run: + worker_requeue_run(server, ci_run); + + return ret; +} + +static int worker_thread(struct server *server, int fd) +{ + int ret = 0; + + while (1) { + ret = worker_iteration(server, fd); if (ret < 0) - break; + return ret; } - return server_set_stopping(server); + return ret; } -int server_new_worker(struct server *server, int fd) +static int msg_new_worker_handler(struct server *server, int client_fd, const struct msg *) { - struct worker_queue_entry *entry; + return worker_thread(server, client_fd); +} + +static int msg_new_worker_parser(const struct msg *) +{ + return 1; +} + +static int msg_ci_run_queue(struct server *server, const char *url, const char *rev) +{ + struct ci_queue_entry *entry; int ret = 0; - print_log("Registering a new worker\n"); + print_log("Scheduling a new CI run for repository %s\n", url); ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -268,15 +198,16 @@ int server_new_worker(struct server *server, int fd) return ret; } - ret = worker_queue_entry_create(&entry, fd); + ret = ci_queue_entry_create(&entry, url, rev); if (ret < 0) goto unlock; - worker_queue_push(&server->worker_queue, entry); + ci_queue_push(&server->ci_queue, entry); ret = pthread_cond_signal(&server->server_cv); if (ret) { pthread_print_errno(ret, "pthread_cond_signal"); + ret = 0; goto unlock; } @@ -286,12 +217,85 @@ unlock: return ret; } -int server_ci_run(struct server *server, const char *url, const char *rev) +static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg) { - struct ci_queue_entry *entry; + struct msg response; int ret = 0; - print_log("Scheduling a new CI run for repository %s\n", url); + ret = msg_ci_run_queue(server, msg->argv[1], msg->argv[2]); + if (ret < 0) + msg_error(&response); + else + msg_success(&response); + + return msg_send(client_fd, &response); +} + +static int msg_ci_run_parser(const struct msg *msg) +{ + if (msg->argc != 3) { + print_error("Invalid number of arguments for a message: %d\n", msg->argc); + return 0; + } + + return 1; +} + +typedef int (*msg_parser)(const struct msg *msg); +typedef int (*msg_handler)(struct server *, int client_fd, const struct msg *msg); + +struct msg_descr { + const char *cmd; + msg_parser parser; + msg_handler handler; +}; + +struct msg_descr messages[] = { + {"new_worker", msg_new_worker_parser, msg_new_worker_handler}, + {"ci_run", msg_ci_run_parser, msg_ci_run_handler}, +}; + +static int server_msg_handler(struct server *server, int client_fd, const struct msg *request) +{ + if (request->argc == 0) + goto unknown_request; + + size_t numof_messages = sizeof(messages) / sizeof(messages[0]); + + for (size_t i = 0; i < numof_messages; ++i) { + if (strcmp(messages[i].cmd, request->argv[0])) + continue; + if (!messages[i].parser(request)) + continue; + return messages[i].handler(server, client_fd, request); + } + +unknown_request: + print_error("Received an unknown message\n"); + msg_dump(request); + struct msg response; + msg_error(&response); + return msg_send(client_fd, &response); +} + +static int server_conn_handler(int client_fd, void *_server) +{ + struct server *server = (struct server *)_server; + struct msg request; + int ret = 0; + + ret = msg_recv(client_fd, &request); + if (ret < 0) + return ret; + + ret = server_msg_handler(server, client_fd, &request); + msg_free(&request); + return ret; +} + +static int server_set_stopping(struct server *server) +{ + int ret = 0; ret = pthread_mutex_lock(&server->server_mtx); if (ret) { @@ -299,11 +303,7 @@ int server_ci_run(struct server *server, const char *url, const char *rev) return ret; } - ret = ci_queue_entry_create(&entry, url, rev); - if (ret < 0) - goto unlock; - - ci_queue_push(&server->ci_queue, entry); + server->stopping = 1; ret = pthread_cond_signal(&server->server_cv); if (ret) { @@ -316,3 +316,18 @@ unlock: return ret; } + +int server_main(struct server *server) +{ + int ret = 0; + + while (!global_stop_flag) { + print_log("Waiting for new connections\n"); + + ret = tcp_server_accept(&server->tcp_server, server_conn_handler, server); + if (ret < 0) + break; + } + + return server_set_stopping(server); +} |