aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/src/server.c
blob: da9cfc3f89403b437365df857dbb28a06fb922a7 (plain) (tree)
1
2
3
4
5
6
7
8
9






                                                          
                   
                    
                     
                  
                       
                 
                
                
                
                    
                     
                      
                   

                           
                       
                         
 
                 
                    
                   
 





                                   




                                              


                                         

                               
                              
 
                                      

  
                                             
 
                                                          
                  

                                                         
         

                   
 







                                                                                            
                  

                                                        
         

                   
 



                                                                                         
 

                                                                                                   
 
                                                         
                    
 
                                  
                    
                           
 
















                                                                              
                    
                           
 

                                                                           
 

                              
                   
 
 



                                                       
 


                                                                     
 


                                  
 




                                                                                       
                                                    

                                                                                   
 
                              

       
                              
 
                   

 
                                                               
 
                                                                                           

 
                                                        
 








                                                  

 
                                                    
 
                                                                     
                                                                                                    



                                                                                 

                                     
 


                                                
 





                                                               
                      

                                                                                        
                                                             
                

                                                                                        
                                 

         
                               

 
                                              
 
                                                         

                    


                                  
 


                                                     
                                    
 

                                     
 
                                          
         
 
       
                              
 

                    

 

                                                                                 
 

                                                               

                    
                                

                           
 
                           

                                     
                                         
                    
                           









                                                    


                      


                   
                                                                                              


                                                               

                    


                                           
                    
                           
 



















                                              

                                                                                              

                                                               
                                                          

                    
                                                                             
 

                                  
 
                                                            


                           

                                                                        



                                                                       




                                     

                                                   
                                               








                                                                            



                                                              

         




                                                            
 
                                                          
                  

                                                        

         
                             
 







                                                                                               
                                         



                                        

                                                                                               


                                    
                                                   
 
                                                                                       
                    
                                          
 

                                                                  
                    

                                          

                                                                          
                                     
 

                                                                                        
                    
                                       





                                                                                     
 
                          

                   

                                               
 

                                              
 
                


                                          

                                                    









                                                       






                                                                                              
 
                   
 
 
                                          
 






                                                                                  


                                                       


                                                                                              

 
                                                      


                    
                                   
                                                     
 


                                                         

         
                 
 




                                            
/*
 * Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
 * This file is part of the "cimple" project.
 * For details, see https://github.com/egor-tensin/cimple.
 * Distributed under the MIT License.
 */

#include "server.h"
#include "command.h"
#include "compiler.h"
#include "const.h"
#include "event_loop.h"
#include "file.h"
#include "log.h"
#include "msg.h"
#include "net.h"
#include "process.h"
#include "protocol.h"
#include "run_queue.h"
#include "signal.h"
#include "storage.h"
#include "storage_sqlite.h"
#include "tcp_server.h"
#include "worker_queue.h"

#include <poll.h>
#include <pthread.h>
#include <stdlib.h>

/*
 * State of the server: a listening side (event loop + TCP server + command
 * dispatcher) and a scheduling thread that pairs queued runs with queued
 * workers.
 */
struct server {
	/* Guards `stopping`, `worker_queue` and `run_queue` (see server_lock()). */
	pthread_mutex_t server_mtx;
	/* Signalled whenever a worker/run is enqueued or `stopping` is set. */
	pthread_cond_t server_cv;

	/* Set to 1 by server_set_stopping(); makes both loops terminate. */
	int stopping;

	/* Dispatches incoming messages to the handlers listed in `commands`. */
	struct cmd_dispatcher *cmd_dispatcher;

	/* Event loop driven by server_main(). */
	struct event_loop *event_loop;
	/* Descriptor returned by signalfd_create_sigterms(), watched by event_loop. */
	int signalfd;

	/* Workers waiting to be handed a run. */
	struct worker_queue worker_queue;
	/* Runs waiting for a worker; initially filled by storage_get_run_queue(). */
	struct run_queue run_queue;

	/* Storage backend used to create runs and mark them as finished. */
	struct storage storage;

	/* Thread executing server_main_thread(). */
	pthread_t main_thread;

	/* Accepts connections and passes them to cmd_dispatcher_handle_conn. */
	struct tcp_server *tcp_server;
};

/*
 * Acquire the server mutex.
 *
 * Returns 0 on success. On failure the error is reported via pthread_errno()
 * and the resulting value of `ret` is returned.
 * NOTE(review): callers test for `< 0`, so pthread_errno() is presumably
 * expected to turn the pthread error code into a negative value -- confirm
 * against its definition.
 */
static int server_lock(struct server *server)
{
	int ret = pthread_mutex_lock(&server->server_mtx);

	if (ret) {
		pthread_errno(ret, "pthread_mutex_lock");
	}

	return ret;
}

/* Release the server mutex; a failure is only reported, never returned. */
static void server_unlock(struct server *server)
{
	pthread_mutex_t *const mtx = &server->server_mtx;

	pthread_errno_if(pthread_mutex_unlock(mtx), "pthread_mutex_unlock");
}

/*
 * Block on the server condition variable. The server mutex must be held by
 * the caller; it is passed to pthread_cond_wait().
 *
 * Returns 0 on success. On failure the error is reported via pthread_errno()
 * and the resulting value of `ret` is returned.
 */
static int server_wait(struct server *server)
{
	int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);

	if (ret) {
		pthread_errno(ret, "pthread_cond_wait");
	}

	return ret;
}

/* Wake up one waiter on the server condition variable; failures are only reported. */
static void server_notify(struct server *server)
{
	pthread_cond_t *const cv = &server->server_cv;

	pthread_errno_if(pthread_cond_signal(cv), "pthread_cond_signal");
}

/*
 * Event loop callback registered for the signalfd in server_create().
 *
 * Raises the `stopping` flag under the server mutex and wakes up the waiter on
 * the condition variable. The loop, descriptor and revents arguments are
 * unused; `_server` is the struct server the callback was registered with.
 *
 * Returns the result of server_lock().
 */
static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
                               void *_server)
{
	struct server *server = (struct server *)_server;

	const int ret = server_lock(server);
	if (ret < 0)
		return ret;

	server->stopping = 1;
	server_notify(server);

	server_unlock(server);
	return ret;
}

/* Non-zero if at least one worker is waiting in the worker queue. */
static int server_has_workers(const struct server *server)
{
	const int empty = worker_queue_is_empty(&server->worker_queue);

	return !empty;
}

/*
 * Append a worker to the end of the worker queue and wake up the waiter on
 * the condition variable. Everything happens under the server mutex.
 *
 * Returns the result of server_lock(); the worker is not enqueued if locking
 * fails.
 */
static int server_enqueue_worker(struct server *server, struct worker *worker)
{
	const int ret = server_lock(server);
	if (ret < 0)
		return ret;

	worker_queue_add_last(&server->worker_queue, worker);
	log("Added a new worker %d to the queue\n", worker_get_fd(worker));
	server_notify(server);

	server_unlock(server);
	return ret;
}

/* Non-zero if at least one run is waiting in the run queue. */
static int server_has_runs(const struct server *server)
{
	const int empty = run_queue_is_empty(&server->run_queue);

	return !empty;
}

/*
 * Register a run in storage, assign it the resulting ID, append it to the end
 * of the run queue and wake up the waiter on the condition variable.
 * Everything happens under the server mutex.
 *
 * Returns a negative value if locking or storage_run_create() fails (the run
 * is then not enqueued); otherwise returns the value of storage_run_create(),
 * which is also used as the run ID.
 */
static int server_enqueue_run(struct server *server, struct run *run)
{
	int ret = server_lock(server);
	if (ret < 0)
		return ret;

	ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
	if (ret >= 0) {
		/* The storage layer hands back the ID of the freshly created run. */
		run_set_id(run, ret);

		run_queue_add_last(&server->run_queue, run);
		log("Added a new run %d for repository %s to the queue\n", run_get_id(run),
		    run_get_url(run));

		server_notify(server);
	}

	server_unlock(server);
	return ret;
}

/*
 * Tell whether the main thread has something to do: either the server is
 * stopping, or there is both a queued run and a queued worker to pair up.
 * Returns 1 if so, 0 otherwise.
 */
static int server_ready_for_action(const struct server *server)
{
	if (server->stopping)
		return 1;

	if (!server_has_runs(server))
		return 0;

	return server_has_workers(server) != 0;
}

/*
 * Wait on the condition variable until server_ready_for_action() holds.
 *
 * Returns a negative value as soon as server_wait() reports one; otherwise
 * returns the result of the last server_wait() call (0 if no wait was needed).
 */
static int server_wait_for_action(struct server *server)
{
	int ret = 0;

	for (;;) {
		if (server_ready_for_action(server))
			break;

		ret = server_wait(server);
		if (ret < 0)
			break;
	}

	return ret;
}

/*
 * Take the first run and the first worker off their queues and send the
 * worker a "start" message describing the run.
 *
 * If the message cannot be created or delivered, the run is put back at the
 * front of the run queue; otherwise the run object is destroyed. The worker
 * object is destroyed in both cases.
 */
static void server_assign_run(struct server *server)
{
	struct run *run = run_queue_remove_first(&server->run_queue);
	log("Removed run %d for repository %s from the queue\n", run_get_id(run), run_get_url(run));

	struct worker *worker = worker_queue_remove_first(&server->worker_queue);
	log("Removed worker %d from the queue\n", worker_get_fd(worker));

	struct msg *start_msg = NULL;

	int ret = msg_start_create(&start_msg, run);
	if (ret >= 0) {
		ret = msg_talk(worker_get_fd(worker), start_msg, NULL);
		/* The message is no longer needed whether or not it was delivered. */
		msg_free(start_msg);
	}

	if (ret >= 0) {
		log("Assigned run %d for repository %s to worker %d\n", run_get_id(run),
		    run_get_url(run), worker_get_fd(worker));
		run_destroy(run);
	} else {
		log("Failed to assign run for repository %s to worker %d, requeueing\n",
		    run_get_url(run), worker_get_fd(worker));
		run_queue_add_first(&server->run_queue, run);
	}

	worker_destroy(worker);
}

/*
 * Entry point of the scheduling thread started by server_create().
 *
 * Holds the server mutex (released only while waiting on the condition
 * variable) and keeps assigning runs to workers until either `stopping` is set
 * or waiting fails. Always returns NULL.
 */
static void *server_main_thread(void *_server)
{
	struct server *server = (struct server *)_server;

	if (server_lock(server) < 0)
		return NULL;

	for (;;) {
		if (server_wait_for_action(server) < 0)
			break;

		if (server->stopping)
			break;

		server_assign_run(server);
	}

	server_unlock(server);
	return NULL;
}

/*
 * Handler for CMD_NEW_WORKER.
 *
 * Duplicates the connection descriptor, wraps the duplicate in a worker object
 * and adds that worker to the worker queue. The request and response are not
 * used.
 *
 * Returns a negative value if any step fails; otherwise the result of
 * server_enqueue_worker().
 */
static int server_handle_cmd_new_worker(UNUSED const struct msg *request,
                                        UNUSED struct msg **response, void *_ctx)
{
	struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
	struct server *server = (struct server *)ctx->arg;

	/* Work on a duplicate of the connection descriptor rather than ctx->fd itself. */
	const int fd = file_dup(ctx->fd);
	if (fd < 0)
		return fd;

	struct worker *worker = NULL;

	int ret = worker_create(&worker, fd);
	if (ret >= 0) {
		ret = server_enqueue_worker(server, worker);
		if (ret >= 0)
			return ret;

		/*
		 * NOTE(review): if worker_destroy() closes the worker's descriptor,
		 * the net_close() below closes it a second time -- confirm.
		 */
		worker_destroy(worker);
	}

	net_close(fd);
	return ret;
}

/*
 * Handler for CMD_RUN.
 *
 * Parses the requested run, prepares a "success" response and adds the run to
 * the run queue. If enqueueing fails, the response is freed and reset to NULL;
 * on any failure after parsing, the run is destroyed.
 *
 * Returns a negative value on failure; otherwise the result of
 * server_enqueue_run().
 */
static int server_handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx)
{
	struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
	struct server *server = (struct server *)ctx->arg;

	struct run *run = NULL;

	int ret = msg_run_parse(request, &run);
	if (ret < 0)
		return ret;

	ret = msg_success(response);
	if (ret >= 0) {
		ret = server_enqueue_run(server, run);
		if (ret >= 0)
			return ret;

		/* Don't hand a success response back for a run that wasn't queued. */
		msg_free(*response);
		*response = NULL;
	}

	run_destroy(run);
	return ret;
}

/*
 * Handler for CMD_FINISHED.
 *
 * Parses the run ID and process output from the request and records the exit
 * code (output.ec) for that run in storage. The response is not used.
 *
 * Returns a negative value if parsing or the storage update fails; otherwise
 * the result of storage_run_finished().
 */
static int server_handle_cmd_finished(const struct msg *request, UNUSED struct msg **response,
                                      void *_ctx)
{
	struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
	struct server *server = (struct server *)ctx->arg;

	log("Received a \"run finished\" message from worker %d\n", ctx->fd);

	int run_id;
	struct proc_output output;

	int ret = msg_finished_parse(request, &run_id, &output);
	if (ret < 0)
		return ret;

	ret = storage_run_finished(&server->storage, run_id, output.ec);
	/* The parsed output is released before the storage result is examined. */
	proc_output_free(&output);

	if (ret < 0) {
		log_err("Failed to mark run %d as finished\n", run_id);
	}

	return ret;
}

/* Commands understood by the server, paired with their handlers. */
static struct cmd_desc commands[] = {
    {CMD_NEW_WORKER, server_handle_cmd_new_worker},
    {CMD_RUN, server_handle_cmd_run},
    {CMD_FINISHED, server_handle_cmd_finished},
};

/* Number of entries in `commands`; passed to cmd_dispatcher_create(). */
static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);

int server_create(struct server **_server, const struct settings *settings)
{
	struct storage_settings storage_settings;
	int ret = 0;

	struct server *server = malloc(sizeof(struct server));
	if (!server) {
		log_errno("malloc");
		return -1;
	}

	ret = pthread_mutex_init(&server->server_mtx, NULL);
	if (ret) {
		pthread_errno(ret, "pthread_mutex_init");
		goto free;
	}

	ret = pthread_cond_init(&server->server_cv, NULL);
	if (ret) {
		pthread_errno(ret, "pthread_cond_init");
		goto destroy_mtx;
	}

	server->stopping = 0;

	ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server);
	if (ret < 0)
		goto destroy_cv;

	ret = event_loop_create(&server->event_loop);
	if (ret < 0)
		goto destroy_cmd_dispatcher;

	ret = signalfd_create_sigterms();
	if (ret < 0)
		goto destroy_event_loop;
	server->signalfd = ret;

	ret = event_loop_add(server->event_loop, server->signalfd, POLLIN, server_set_stopping,
	                     server);
	if (ret < 0)
		goto close_signalfd;

	worker_queue_create(&server->worker_queue);

	ret = storage_sqlite_settings_create(&storage_settings, settings->sqlite_path);
	if (ret < 0)
		goto destroy_worker_queue;

	ret = storage_create(&server->storage, &storage_settings);
	storage_settings_destroy(&storage_settings);
	if (ret < 0)
		goto destroy_worker_queue;

	ret = storage_get_run_queue(&server->storage, &server->run_queue);
	if (ret < 0)
		goto destroy_storage;

	ret = tcp_server_create(&server->tcp_server, server->event_loop, settings->port,
	                        cmd_dispatcher_handle_conn, server->cmd_dispatcher);
	if (ret < 0)
		goto destroy_run_queue;

	ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
	if (ret) {
		pthread_errno(ret, "pthread_create");
		goto destroy_tcp_server;
	}

	*_server = server;
	return ret;

destroy_tcp_server:
	tcp_server_destroy(server->tcp_server);

destroy_run_queue:
	run_queue_destroy(&server->run_queue);

destroy_storage:
	storage_destroy(&server->storage);

destroy_worker_queue:
	worker_queue_destroy(&server->worker_queue);

close_signalfd:
	signalfd_destroy(server->signalfd);

destroy_event_loop:
	event_loop_destroy(server->event_loop);

destroy_cmd_dispatcher:
	cmd_dispatcher_destroy(server->cmd_dispatcher);

destroy_cv:
	pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");

destroy_mtx:
	pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");

free:
	free(server);

	return ret;
}

/*
 * Stop the scheduling thread and release everything owned by the server,
 * including the server object itself.
 *
 * The scheduling thread only leaves its loop once `stopping` is set (or a wait
 * fails), and `stopping` is otherwise set solely by the signalfd callback. If
 * server_main() returned because event_loop_run() failed, the flag is still
 * clear and the thread is blocked on the condition variable, so joining it
 * would never return. Therefore the flag is raised and the thread is woken up
 * here before the join; this is harmless if the server is already stopping.
 */
void server_destroy(struct server *server)
{
	log("Shutting down\n");

	/* server_lock() returns 0 only if the mutex was actually acquired. */
	if (server_lock(server) == 0) {
		server->stopping = 1;
		server_notify(server);
		server_unlock(server);
	}

	pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join");
	tcp_server_destroy(server->tcp_server);
	storage_destroy(&server->storage);
	run_queue_destroy(&server->run_queue);
	worker_queue_destroy(&server->worker_queue);
	signalfd_destroy(server->signalfd);
	event_loop_destroy(server->event_loop);
	cmd_dispatcher_destroy(server->cmd_dispatcher);
	pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
	pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
	free(server);
}

/*
 * Drive the event loop until the `stopping` flag is set.
 *
 * Returns 0 once the server is stopping, or the negative result of
 * event_loop_run() as soon as it fails.
 */
static int server_listen_thread(struct server *server)
{
	while (!server->stopping) {
		log("Waiting for new connections\n");

		const int ret = event_loop_run(server->event_loop);
		if (ret < 0)
			return ret;
	}

	return 0;
}

/*
 * Run the server's listening loop on the calling thread.
 * Returns the result of server_listen_thread().
 */
int server_main(struct server *server)
{
	const int ret = server_listen_thread(server);

	return ret;
}