aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/src/server.c
blob: 04a68479c2cb965e73501268a34559bf7651e540 (plain) (tree)
1
2
3
4
5
6
7
8
9






                                                          
                   
                    
                     
                  
                
                
                      
                   

                           
                       
 
                    
                   
 









                                      
                                   


                                                                           
 
                                                 

                    

                                                              


                                    
 

                                                            
                                                         
                          



                                                          
                                                        




                                 





                                                                                       
                    
                                
 



                                                                     
                                             
 
                          
                   
 


                                          
           
                                                                                           

            
                                                                                              
 


                     
                   

 
                                          
 
                               
 
                                              
                                               
                                          

                                                                                              
                     

 
                                                       
 
                                                       

 
                                                                      
 
                                                     
                    
 

                                                                       
 


                                            
 

                                                        


                           
                                        
                                                                          
                                   
                                   

         
                                        
 
              
                           
 
                   

 
                                                                                     




                                                      
                                                         


                           


                                                                                 
                                                                


                                    
 

                               


                            


                                                                  

                    
       
                                                                                            



                   
                                                                                    


                    

                                                      
                                                         


                           

                                                                                      
 

                                                      
                                                          




                            
                                                                                            





                                                          
                                              









                                                  
                                        













                                                       
                            
                                   
         
 
                   
 
 

                                                                                                 
 
                                                                  



                                                                                    
                                             

                    
                                                      
                  
                                                         


                           
                                                       


                            
                                                      
                                                                        
 
                                                      
                  
                                                          
                        



                            
                                                                                            



                   

                                                                                             
 
                                                         

                    




                                                                           

         
                                                    
 
                                                           


                           
                                     

 
                                     
                                             
                                  

  


                                                     
 
                                                      
                  
                                                         


                           
                             
 
                                                         
                  
                                                          



                            
                                                                                            


                   


                                      
                                                 

                    



                                              

                                                                                                  


                           
                                   
                                                     
 
                                                                                                    
                              
                                                                
                                        
                                                
                 

         
                   
                                           
 

                                           
/*
 * Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
 * This file is part of the "cimple" project.
 * For details, see https://github.com/egor-tensin/cimple.
 * Distributed under the MIT License.
 */

#include "server.h"
#include "command.h"
#include "compiler.h"
#include "const.h"
#include "log.h"
#include "msg.h"
#include "run_queue.h"
#include "signal.h"
#include "storage.h"
#include "storage_sqlite.h"
#include "tcp_server.h"

#include <pthread.h>
#include <stdlib.h>

/* State shared by the connection handlers of one server instance. */
struct server {
	/* Held around every access to `stopping` and `run_queue` in this file. */
	pthread_mutex_t server_mtx;
	/*
	 * Signalled when a run is added to `run_queue`, broadcast when
	 * `stopping` is set; workers wait on it for either event.
	 */
	pthread_cond_t server_cv;

	/* Non-zero once server_main() is finishing; waiting workers then bail out. */
	int stopping;

	struct storage storage;

	struct tcp_server *tcp_server;

	/* CI runs that have been requested, but not yet handed to a worker. */
	struct run_queue run_queue;
};

int server_create(struct server **_server, const struct settings *settings)
{
	struct storage_settings storage_settings;
	int ret = 0;

	struct server *server = malloc(sizeof(struct server));
	if (!server) {
		log_errno("malloc");
		return -1;
	}

	ret = pthread_mutex_init(&server->server_mtx, NULL);
	if (ret) {
		pthread_errno(ret, "pthread_mutex_init");
		goto free;
	}

	ret = pthread_cond_init(&server->server_cv, NULL);
	if (ret) {
		pthread_errno(ret, "pthread_cond_init");
		goto destroy_mtx;
	}

	server->stopping = 0;

	ret = storage_settings_create_sqlite(&storage_settings, settings->sqlite_path);
	if (ret < 0)
		goto destroy_cv;

	ret = storage_create(&server->storage, &storage_settings);
	storage_settings_destroy(&storage_settings);
	if (ret < 0)
		goto destroy_cv;

	ret = tcp_server_create(&server->tcp_server, settings->port);
	if (ret < 0)
		goto destroy_storage;

	run_queue_create(&server->run_queue);

	*_server = server;
	return ret;

destroy_storage:
	storage_destroy(&server->storage);

destroy_cv:
	pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");

destroy_mtx:
	pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");

free:
	free(server);

	return ret;
}

/*
 * Tear down a server obtained from server_create() and free it.
 *
 * The members are released in the reverse of the order in which
 * server_create() sets them up. The pointer is dangling afterwards.
 */
void server_destroy(struct server *server)
{
	log("Shutting down\n");

	run_queue_destroy(&server->run_queue);
	tcp_server_destroy(server->tcp_server);
	storage_destroy(&server->storage);
	/*
	 * A failure to destroy either primitive is handed to pthread_errno_if
	 * along with the name of the failing call; nothing here acts on it
	 * any further.
	 */
	pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
	pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
	free(server);
}

/*
 * Return 1 if at least one CI run is waiting in the queue, 0 otherwise.
 * Every caller in this file holds server_mtx while calling this.
 */
static int server_has_runs(const struct server *server)
{
	if (run_queue_is_empty(&server->run_queue))
		return 0;

	return 1;
}

/*
 * Ask the worker connected on `fd` to execute the given CI run.
 *
 * A CMD_RUN message carrying the repository URL and revision is sent, and
 * the worker's reply is awaited. A negative value is returned if the message
 * couldn't be built or the exchange failed; otherwise, the result of
 * msg_send_and_wait() is returned.
 *
 * NOTE(review): a reply that is not a success is logged and dumped, but the
 * return value stays whatever msg_send_and_wait() produced, so the caller
 * treats the run as handled rather than requeuing it. Confirm this is
 * intended.
 */
static int worker_ci_run(int fd, const struct run_queue_entry *ci_run)
{
	struct msg *req = NULL;
	struct msg *reply = NULL;
	int rc = 0;

	const char *argv[] = {
	    CMD_RUN,
	    run_queue_entry_get_url(ci_run),
	    run_queue_entry_get_rev(ci_run),
	    NULL,
	};

	rc = msg_from_argv(&req, argv);
	if (rc < 0)
		return rc;

	/* The request is no longer needed once the exchange is over. */
	rc = msg_send_and_wait(fd, req, &reply);
	msg_free(req);
	if (rc < 0)
		return rc;

	if (msg_is_success(reply)) {
		/* TODO: handle the response. */
	} else {
		log_err("Failed to schedule a CI run: worker is busy?\n");
		msg_dump(reply);
	}

	msg_free(reply);

	return rc;
}

/*
 * Block until a CI run is available or the server is stopping.
 *
 * Returns 0 after storing the first queued run in *ci_run. Returns -1
 * (leaving *ci_run alone) if the server is stopping. If taking the lock or
 * waiting on the condition variable fails, the error is reported through
 * pthread_errno and the resulting value is returned; *ci_run is not written
 * in that case either.
 */
static int worker_dequeue_run(struct server *server, struct run_queue_entry **ci_run)
{
	int rc = pthread_mutex_lock(&server->server_mtx);
	if (rc != 0) {
		pthread_errno(rc, "pthread_mutex_lock");
		return rc;
	}

	/* Sleep until there's either work to do or a shutdown request. */
	for (;;) {
		if (server->stopping || server_has_runs(server))
			break;

		rc = pthread_cond_wait(&server->server_cv, &server->server_mtx);
		if (rc != 0) {
			pthread_errno(rc, "pthread_cond_wait");
			goto release;
		}
	}

	if (!server->stopping) {
		*ci_run = run_queue_remove_first(&server->run_queue);
		log("Removed a CI run for repository %s from the queue\n",
		    run_queue_entry_get_url(*ci_run));
	} else {
		/* A shutdown request wins even if runs are still queued. */
		rc = -1;
	}

release:
	pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return rc;
}

/*
 * Put a CI run back at the front of the queue and wake up one waiter.
 *
 * Returns 0 once the run is queued, even if signalling the condition
 * variable failed afterwards (that failure is only reported). If the lock
 * can't be taken, the run is NOT queued and the value left by pthread_errno
 * is returned.
 */
static int worker_requeue_run(struct server *server, struct run_queue_entry *ci_run)
{
	int rc = pthread_mutex_lock(&server->server_mtx);
	if (rc != 0) {
		pthread_errno(rc, "pthread_mutex_lock");
		return rc;
	}

	run_queue_add_first(&server->run_queue, ci_run);
	log("Requeued a CI run for repository %s\n", run_queue_entry_get_url(ci_run));

	rc = pthread_cond_signal(&server->server_cv);
	if (rc != 0) {
		pthread_errno(rc, "pthread_cond_signal");
		/* The run is in the queue already, so this is not a failure. */
		rc = 0;
	}

	pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return rc;
}

/*
 * Take one CI run off the queue and hand it to the worker connected on `fd`.
 *
 * Returns a negative value if no run could be dequeued (which includes the
 * server shutting down) or if handing the run to the worker failed. In the
 * latter case, the run is put back at the front of the queue so that it
 * isn't lost. Otherwise, the run is destroyed and the result of
 * worker_ci_run() is returned.
 */
static int worker_iteration(struct server *server, int fd)
{
	struct run_queue_entry *ci_run = NULL;
	int ret = 0;

	ret = worker_dequeue_run(server, &ci_run);
	if (ret < 0)
		return ret;

	ret = worker_ci_run(fd, ci_run);
	if (ret < 0)
		goto requeue_run;

	run_queue_entry_destroy(ci_run);
	return ret;

requeue_run:
	/*
	 * worker_requeue_run() only returns non-zero when it couldn't take the
	 * lock, in which case the entry was never added to the queue and is
	 * still owned by this function. Free it instead of leaking it.
	 */
	if (worker_requeue_run(server, ci_run) != 0) {
		log_err("Failed to requeue a CI run for repository %s\n",
		        run_queue_entry_get_url(ci_run));
		run_queue_entry_destroy(ci_run);
	}

	return ret;
}

/*
 * Keep feeding CI runs to the worker connected on `fd` until an iteration
 * fails; that iteration's negative result is returned.
 */
static int worker_thread(struct server *server, int fd)
{
	int rc = 0;

	do {
		rc = worker_iteration(server, fd);
	} while (rc >= 0);

	return rc;
}

/*
 * Handler for CMD_NEW_WORKER: turns the connection into a worker loop.
 *
 * The request and the response slot are not used; this only returns when
 * worker_thread() does, with its result.
 */
static int msg_new_worker_handler(int client_fd, UNUSED const struct msg *request, void *_server,
                                  UNUSED struct msg **response)
{
	struct server *server = _server;

	return worker_thread(server, client_fd);
}

/*
 * Append a new CI run for `url` at revision `rev` to the queue and wake up
 * one waiting worker.
 *
 * Returns 0 once the run is queued; a failure to signal the condition
 * variable afterwards is reported, but not treated as an error. Returns the
 * error from run_queue_entry_create() if the entry couldn't be created, or
 * the value left by pthread_errno if the lock couldn't be taken.
 */
static int msg_ci_run_queue(struct server *server, const char *url, const char *rev)
{
	struct run_queue_entry *new_run = NULL;
	int rc = 0;

	rc = pthread_mutex_lock(&server->server_mtx);
	if (rc != 0) {
		pthread_errno(rc, "pthread_mutex_lock");
		return rc;
	}

	rc = run_queue_entry_create(&new_run, url, rev);
	if (rc >= 0) {
		run_queue_add_last(&server->run_queue, new_run);
		log("Added a new CI run for repository %s to the queue\n", url);

		rc = pthread_cond_signal(&server->server_cv);
		if (rc != 0) {
			pthread_errno(rc, "pthread_cond_signal");
			/* The run is in the queue already, so this is not a failure. */
			rc = 0;
		}
	}

	pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return rc;
}

/*
 * Handler for CMD_RUN: queue a CI run requested by a client.
 *
 * The request must consist of exactly three words; the second and the third
 * are passed on as the repository URL and the revision. Returns -1 for a
 * malformed request, the negative error if queueing failed, and the result
 * of msg_success() otherwise.
 */
static int msg_ci_run_handler(UNUSED int client_fd, const struct msg *request, void *_server,
                              struct msg **response)
{
	struct server *server = _server;
	const char **words = NULL;
	int rc = 0;

	if (msg_get_length(request) != 3) {
		log_err("Invalid number of arguments for a message: %zu\n",
		        msg_get_length(request));
		msg_dump(request);
		return -1;
	}

	/* words[0] is the command itself. */
	words = msg_get_words(request);

	rc = msg_ci_run_queue(server, words[1], words[2]);

	return rc < 0 ? rc : msg_success(response);
}

/*
 * Commands understood by the server, each paired with its handler. The table
 * is passed to cmd_dispatcher_create() in server_main().
 */
static struct cmd_desc commands[] = {
    {CMD_NEW_WORKER, msg_new_worker_handler},
    {CMD_RUN, msg_ci_run_handler},
};

/*
 * Mark the server as stopping and wake up every thread waiting on the
 * condition variable, so that workers blocked in worker_dequeue_run() can
 * notice the flag and exit.
 *
 * Returns 0 on success. If the lock can't be taken, the flag is not set; if
 * the broadcast fails, the flag is set, but waiters may not have been woken
 * up. In both cases, the error is reported through pthread_errno and the
 * resulting value is returned.
 */
static int server_set_stopping(struct server *server)
{
	int ret = 0;

	ret = pthread_mutex_lock(&server->server_mtx);
	if (ret) {
		pthread_errno(ret, "pthread_mutex_lock");
		return ret;
	}

	server->stopping = 1;

	ret = pthread_cond_broadcast(&server->server_cv);
	if (ret) {
		/* Report the call that actually failed. */
		pthread_errno(ret, "pthread_cond_broadcast");
	}

	pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return ret;
}

/*
 * Run the server: install the global signal handler, then accept
 * connections and dispatch their commands until global_stop_flag is set or
 * accepting fails.
 *
 * Before returning, the dispatcher is destroyed and the server is marked as
 * stopping so that worker loops can wind down.
 *
 * Returns a negative value if setup fails or if accepting a connection fails
 * for any reason other than a requested shutdown; otherwise, returns the
 * result of server_set_stopping().
 */
int server_main(struct server *server)
{
	struct cmd_dispatcher *dispatcher = NULL;
	int stop_ret = 0;
	int ret = 0;

	ret = signal_install_global_handler();
	if (ret < 0)
		return ret;

	ret = cmd_dispatcher_create(&dispatcher, commands, sizeof(commands) / sizeof(commands[0]),
	                            server);
	if (ret < 0)
		return ret;

	while (!global_stop_flag) {
		log("Waiting for new connections\n");

		ret = tcp_server_accept(server->tcp_server, cmd_dispatcher_handle_conn, dispatcher);
		if (ret < 0) {
			/* EINVAL while the stop flag is set is a shutdown, not an error. */
			if (errno == EINVAL && global_stop_flag)
				ret = 0;
			break;
		}
	}

	cmd_dispatcher_destroy(dispatcher);

	/*
	 * Workers must be told to stop regardless of how the loop ended, but
	 * that must not hide a genuine accept failure from the caller.
	 */
	stop_ret = server_set_stopping(server);

	return ret < 0 ? ret : stop_ret;
}