/*
* Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
* This file is part of the "cimple" project.
* For details, see https://github.com/egor-tensin/cimple.
* Distributed under the MIT License.
*/
#include "worker.h"
#include "ci.h"
#include "command.h"
#include "compiler.h"
#include "const.h"
#include "event_loop.h"
#include "git.h"
#include "log.h"
#include "msg.h"
#include "net.h"
#include "process.h"
#include "run_queue.h"
#include "signal.h"
#include <stdlib.h>
#include <string.h>
struct worker {
struct settings *settings;
int stopping;
struct cmd_dispatcher *cmd_dispatcher;
struct event_loop *event_loop;
int signalfd;
};
static struct settings *worker_settings_copy(const struct settings *src)
{
struct settings *result = malloc(sizeof(*src));
if (!result) {
log_errno("malloc");
return NULL;
}
result->host = strdup(src->host);
if (!result->host) {
log_errno("strdup");
goto free_result;
}
result->port = strdup(src->port);
if (!result->port) {
log_errno("strdup");
goto free_host;
}
return result;
free_host:
free(result->host);
free_result:
free(result);
return NULL;
}
static void worker_settings_destroy(struct settings *settings)
{
free(settings->port);
free(settings->host);
free(settings);
}
static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
void *_worker)
{
struct worker *worker = (struct worker *)_worker;
worker->stopping = 1;
return 0;
}
static int worker_handle_run(const struct msg *request, UNUSED struct msg **response, void *_ctx)
{
struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
struct run *run = NULL;
struct proc_output result;
int ret = 0;
ret = run_from_msg(&run, request);
if (ret < 0)
return ret;
proc_output_init(&result);
ret = ci_run_git_repo(run_get_url(run), run_get_rev(run), &result);
if (ret < 0) {
log_err("Run failed with an error\n");
goto free_output;
}
proc_output_dump(&result);
struct worker *worker = (struct worker *)ctx->arg;
static const char *argv[] = {CMD_COMPLETE, NULL};
ret = msg_connect_and_talk_argv(worker->settings->host, worker->settings->port, argv, NULL);
if (ret < 0)
goto free_output;
/* Close the descriptor and remove it from the event loop.
* poll(2) is too confusing, honestly. */
ret = EVENT_LOOP_REMOVE;
free_output:
proc_output_free(&result);
run_destroy(run);
return ret;
}
static struct cmd_desc commands[] = {
{CMD_RUN, worker_handle_run},
};
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
int worker_create(struct worker **_worker, const struct settings *settings)
{
int ret = 0;
struct worker *worker = malloc(sizeof(struct worker));
if (!worker) {
log_errno("malloc");
return -1;
}
worker->settings = worker_settings_copy(settings);
if (!worker->settings) {
ret = -1;
goto free;
}
worker->stopping = 0;
ret = cmd_dispatcher_create(&worker->cmd_dispatcher, commands, numof_commands, worker);
if (ret < 0)
goto free_settings;
ret = event_loop_create(&worker->event_loop);
if (ret < 0)
goto destroy_cmd_dispatcher;
ret = signalfd_listen_for_stops();
if (ret < 0)
goto destroy_event_loop;
worker->signalfd = ret;
ret = signalfd_add_to_event_loop(worker->signalfd, worker->event_loop, worker_set_stopping,
worker);
if (ret < 0)
goto close_signalfd;
ret = libgit_init();
if (ret < 0)
goto close_signalfd;
*_worker = worker;
return ret;
close_signalfd:
signalfd_destroy(worker->signalfd);
destroy_event_loop:
event_loop_destroy(worker->event_loop);
destroy_cmd_dispatcher:
cmd_dispatcher_destroy(worker->cmd_dispatcher);
free_settings:
worker_settings_destroy(worker->settings);
free:
free(worker);
return ret;
}
void worker_destroy(struct worker *worker)
{
log("Shutting down\n");
libgit_shutdown();
signalfd_destroy(worker->signalfd);
event_loop_destroy(worker->event_loop);
cmd_dispatcher_destroy(worker->cmd_dispatcher);
worker_settings_destroy(worker->settings);
free(worker);
}
int worker_main(struct worker *worker)
{
int ret = 0;
while (!worker->stopping) {
ret = net_connect(worker->settings->host, worker->settings->port);
if (ret < 0)
return ret;
const int fd = ret;
static const char *argv[] = {CMD_NEW_WORKER, NULL};
ret = msg_send_argv(fd, argv);
if (ret < 0)
return ret;
ret = cmd_dispatcher_add_to_event_loop(worker->cmd_dispatcher, worker->event_loop,
fd);
if (ret < 0)
return ret;
log("Waiting for a new command\n");
ret = event_loop_run(worker->event_loop);
if (ret < 0)
return ret;
}
return ret;
}