aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/src/server.c
blob: 4706c71a77e9e1d7e2457b33f1f6262769f7b9a4 (plain) (tree)
1
2
3
4
5
6
7
8
9
                   

                     
                
                   
                       
 
                    
                   
                   



                                                                         

                    













                                                               

                                                                     
                                
 

                                           
                   
 






                                                                                           
                   

 
                                          
 

                                     
                                            
                                                

                                                                                           

 
                                                       
 
                                                     

 
                                                                     
 

                                     
 
                                                                  
 


                                            
 








                                                                              

         
                                        
 

                            
 
                   

 
                                                                                    








                                                               






                                                                                 
 

                               


                            


                                                  





                                                                                         
                                                                                   


                    








































                                                                                         
                            
                                   
         
 
                   
 
 
                                                                                           
 










                                                                                    

                    
                                                                      
 
                                                      

                                                               


                           
                                                      


                            
                                                
 
                                                      

                                                                
                        



                            
                                                                                         



                   
                                                                                          
 
                            

                    









































































                                                                                              
 
                                                      

                                                               


                           
                             
 
                                                      

                                                                



                            
                                                                                         


                   














                                                                                          
#include "server.h"
#include "ci_queue.h"
#include "log.h"
#include "msg.h"
#include "signal.h"
#include "tcp_server.h"

#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Set up every component of a server instance.
 *
 * The mutex and the condition variable are initialised first, the stopping
 * flag is cleared, the TCP server is created on settings->port and finally
 * the CI run queue is created.  When a step fails, whatever had already been
 * set up is torn down again in reverse order.
 *
 * On success the result of tcp_server_create() is returned.  On failure the
 * error value of the failing step is returned (for the pthread calls, this is
 * whatever pthread_print_errno() leaves in `ret`).
 */
int server_create(struct server *server, const struct settings *settings)
{
	int ret;

	ret = pthread_mutex_init(&server->server_mtx, NULL);
	if (ret) {
		pthread_print_errno(ret, "pthread_mutex_init");
		/* Nothing has been set up yet, so there is nothing to undo. */
		return ret;
	}

	ret = pthread_cond_init(&server->server_cv, NULL);
	if (ret) {
		pthread_print_errno(ret, "pthread_cond_init");
		goto undo_mutex;
	}

	server->stopping = 0;

	ret = tcp_server_create(&server->tcp_server, settings->port);
	if (ret < 0)
		goto undo_cond;

	ci_queue_create(&server->ci_queue);
	return ret;

undo_cond:
	pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");

undo_mutex:
	pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");

	return ret;
}

/*
 * Release everything server_create() set up, in the opposite order: the CI
 * run queue, the TCP server, the condition variable and then the mutex.
 */
void server_destroy(struct server *server)
{
	print_log("Shutting down\n");

	/* Higher-level components go first... */
	ci_queue_destroy(&server->ci_queue);
	tcp_server_destroy(&server->tcp_server);

	/* ...followed by the synchronisation primitives. */
	pthread_check(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
	pthread_check(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
}

/*
 * Return 1 if at least one CI run is waiting in the queue, 0 otherwise.
 *
 * The function does no locking itself; the visible caller invokes it with
 * server_mtx held.
 */
static int server_has_runs(const struct server *server)
{
	if (ci_queue_is_empty(&server->ci_queue))
		return 0;

	return 1;
}

/*
 * Ask the worker connected on `fd` to execute a single CI run.
 *
 * A "ci_run <url> <rev>" request is built from the queue entry, sent to the
 * worker, and the worker's response is awaited.
 *
 * Returns a negative value if the request could not be built, if the
 * exchange failed, or if the response carries a negative argc.  Otherwise
 * the result of msg_send_and_wait() is returned.
 */
static int worker_ci_run(int fd, const struct ci_queue_entry *ci_run)
{
	struct msg request, response;
	int ret = 0;

	char *argv[] = {"ci_run", ci_run->url, ci_run->rev, NULL};

	ret = msg_from_argv(&request, argv);
	if (ret < 0)
		return ret;

	ret = msg_send_and_wait(fd, &request, &response);
	/* The request is released whatever the outcome of the exchange was. */
	msg_free(&request);
	if (ret < 0)
		return ret;

	if (response.argc < 0) {
		print_error("Failed to schedule a CI run: worker is busy?\n");
		ret = -1;
		goto free_response;
	}

	/* TODO: handle the response. */

free_response:
	msg_free(&response);

	return ret;
}

/*
 * Block until a CI run is available or the server is stopping.
 *
 * With server_mtx held, the function sleeps on server_cv for as long as the
 * server is not stopping and the queue is empty.  If a run is available, it
 * is popped from the queue and stored in *ci_run.
 *
 * Returns 0 once *ci_run has been filled in, -1 if the server is stopping,
 * or the error value of a failed pthread call.  *ci_run is left untouched on
 * every path except the successful one.
 */
static int worker_dequeue_run(struct server *server, struct ci_queue_entry **ci_run)
{
	int ret = 0;

	ret = pthread_mutex_lock(&server->server_mtx);
	if (ret) {
		pthread_print_errno(ret, "pthread_mutex_lock");
		return ret;
	}

	for (;;) {
		/* Re-check both conditions after every wake-up. */
		if (server->stopping || server_has_runs(server))
			break;

		ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
		if (ret) {
			pthread_print_errno(ret, "pthread_cond_wait");
			goto release;
		}
	}

	/* Stopping takes precedence over any runs still sitting in the queue. */
	if (server->stopping)
		ret = -1;
	else
		*ci_run = ci_queue_pop(&server->ci_queue);

release:
	pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return ret;
}

/*
 * Put a CI run back at the head of the queue.
 *
 * After the run has been pushed, server_cv is signalled so that a worker
 * sleeping in worker_dequeue_run() can pick the run up straight away instead
 * of waiting for an unrelated new run to be scheduled.
 *
 * Returns 0 if the run was queued.  If the mutex could not be locked, the
 * run is NOT queued and the (non-zero) error value is returned; the caller
 * keeps ownership of the entry in that case.
 */
static int worker_requeue_run(struct server *server, struct ci_queue_entry *ci_run)
{
	int ret = 0;

	ret = pthread_mutex_lock(&server->server_mtx);
	if (ret) {
		pthread_print_errno(ret, "pthread_mutex_lock");
		return ret;
	}

	ci_queue_push_head(&server->ci_queue, ci_run);

	ret = pthread_cond_signal(&server->server_cv);
	if (ret) {
		pthread_print_errno(ret, "pthread_cond_signal");
		/* The run is in the queue regardless, so this is not a failure. */
		ret = 0;
	}

	pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return ret;
}

/*
 * Run one cycle of a worker connection: take a CI run off the queue and hand
 * it to the worker connected on `fd`.
 *
 * If the worker could not be given the run, the run is put back at the head
 * of the queue.  If even that fails, the entry is destroyed here, since
 * nobody else holds a reference to it any more.
 *
 * Returns a negative value if no run could be dequeued or if handing it over
 * failed; otherwise returns the result of worker_ci_run().
 */
static int worker_iteration(struct server *server, int fd)
{
	struct ci_queue_entry *ci_run = NULL;
	int ret = 0;

	ret = worker_dequeue_run(server, &ci_run);
	if (ret < 0)
		return ret;

	ret = worker_ci_run(fd, ci_run);
	if (ret < 0)
		goto requeue_run;

	ci_queue_entry_destroy(ci_run);
	return ret;

requeue_run:
	/* A non-zero result means the entry never made it back into the queue. */
	if (worker_requeue_run(server, ci_run))
		ci_queue_entry_destroy(ci_run);

	return ret;
}

/*
 * Serve a worker connection: keep running worker iterations on `fd` until
 * one of them reports a negative result, and return that result.
 */
static int worker_thread(struct server *server, int fd)
{
	int ret;

	do {
		ret = worker_iteration(server, fd);
	} while (ret >= 0);

	return ret;
}

/*
 * Handler for the "new_worker" message.
 *
 * The connection that sent the message is treated as a worker from now on:
 * the worker loop is run on client_fd and this function only returns once
 * that loop ends.  The message itself carries nothing the handler needs.
 */
static int msg_new_worker_handler(struct server *server, int client_fd, const struct msg *msg)
{
	/* Named rather than omitted: unnamed parameters in definitions need C23. */
	(void)msg;

	return worker_thread(server, client_fd);
}

/*
 * Parser for the "new_worker" message.
 *
 * The message is accepted unconditionally; its contents are not inspected.
 * Always returns 1.
 */
static int msg_new_worker_parser(const struct msg *msg)
{
	/* Named rather than omitted: unnamed parameters in definitions need C23. */
	(void)msg;

	return 1;
}

/*
 * Schedule a new CI run for repository `url` at revision `rev`.
 *
 * With server_mtx held, a queue entry is created and appended to the queue,
 * and server_cv is signalled so that a waiting worker can pick it up.  A
 * failure to signal is logged but not reported to the caller, since the run
 * is already in the queue by then.
 *
 * Returns the error value of a failed lock, the negative result of
 * ci_queue_entry_create() if the entry could not be created, and 0 once the
 * run has been queued.
 */
static int msg_ci_run_queue(struct server *server, const char *url, const char *rev)
{
	struct ci_queue_entry *entry;
	int ret = 0;

	print_log("Scheduling a new CI run for repository %s\n", url);

	ret = pthread_mutex_lock(&server->server_mtx);
	if (ret) {
		pthread_print_errno(ret, "pthread_mutex_lock");
		return ret;
	}

	ret = ci_queue_entry_create(&entry, url, rev);
	if (ret >= 0) {
		ci_queue_push(&server->ci_queue, entry);

		ret = pthread_cond_signal(&server->server_cv);
		if (ret) {
			pthread_print_errno(ret, "pthread_cond_signal");
			ret = 0;
		}
	}

	pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return ret;
}

/*
 * Handler for the "ci_run" message.
 *
 * Queues a run for the repository in argv[1] at the revision in argv[2] and
 * replies to the client with an error response if queueing reported a
 * negative result, or with a success response otherwise.
 *
 * Returns the result of sending the response.
 */
static int msg_ci_run_handler(struct server *server, int client_fd, const struct msg *msg)
{
	struct msg response;

	if (msg_ci_run_queue(server, msg->argv[1], msg->argv[2]) < 0)
		msg_error(&response);
	else
		msg_success(&response);

	return msg_send(client_fd, &response);
}

/*
 * Parser for the "ci_run" message.
 *
 * The message is valid only if it has exactly three arguments (the handler
 * reads argv[1] and argv[2]).  Returns 1 if it is valid; otherwise logs an
 * error and returns 0.
 */
static int msg_ci_run_parser(const struct msg *msg)
{
	if (msg->argc == 3)
		return 1;

	print_error("Invalid number of arguments for a message: %d\n", msg->argc);
	return 0;
}

/* Validates a received message; a zero result means the message is rejected. */
typedef int (*msg_parser)(const struct msg *);

/* Processes a validated message received on the given client descriptor. */
typedef int (*msg_handler)(struct server *, int, const struct msg *);

/* Ties a command name to the functions that validate and process it. */
struct msg_descr {
	/* Command name, compared against the first argument of a message. */
	const char *cmd;
	/* Called first; the handler only runs if this returns non-zero. */
	msg_parser parser;
	/* Called for messages the parser has accepted. */
	msg_handler handler;
};

/* Every message the server understands, looked up by command name. */
struct msg_descr messages[] = {
	{
		.cmd = "new_worker",
		.parser = msg_new_worker_parser,
		.handler = msg_new_worker_handler,
	},
	{
		.cmd = "ci_run",
		.parser = msg_ci_run_parser,
		.handler = msg_ci_run_handler,
	},
};

static int server_msg_handler(struct server *server, int client_fd, const struct msg *request)
{
	if (request->argc == 0)
		goto unknown_request;

	size_t numof_messages = sizeof(messages) / sizeof(messages[0]);

	for (size_t i = 0; i < numof_messages; ++i) {
		if (strcmp(messages[i].cmd, request->argv[0]))
			continue;
		if (!messages[i].parser(request))
			continue;
		return messages[i].handler(server, client_fd, request);
	}

unknown_request:
	print_error("Received an unknown message\n");
	msg_dump(request);
	struct msg response;
	msg_error(&response);
	return msg_send(client_fd, &response);
}

/*
 * Connection callback passed to tcp_server_accept().
 *
 * Receives one request from client_fd, dispatches it, and frees the request
 * afterwards.  `_server` is the struct server pointer given to
 * tcp_server_accept().
 *
 * Returns the negative result of msg_recv() if receiving failed, otherwise
 * the result of dispatching the request.
 */
static int server_conn_handler(int client_fd, void *_server)
{
	struct server *server = _server;
	struct msg request;
	int ret;

	ret = msg_recv(client_fd, &request);
	if (ret >= 0) {
		ret = server_msg_handler(server, client_fd, &request);
		msg_free(&request);
	}

	return ret;
}

/*
 * Mark the server as stopping and wake up everybody waiting for CI runs.
 *
 * The flag is set with server_mtx held, and server_cv is then broadcast
 * rather than signalled: a signal wakes at most one waiter, so with more
 * than one worker blocked in worker_dequeue_run() the others would never
 * notice that the server is stopping.
 *
 * Returns 0 on success or the error value of the failed pthread call.
 */
static int server_set_stopping(struct server *server)
{
	int ret = 0;

	ret = pthread_mutex_lock(&server->server_mtx);
	if (ret) {
		pthread_print_errno(ret, "pthread_mutex_lock");
		return ret;
	}

	server->stopping = 1;

	ret = pthread_cond_broadcast(&server->server_cv);
	if (ret)
		pthread_print_errno(ret, "pthread_cond_broadcast");

	pthread_check(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");

	return ret;
}

/*
 * Main loop of the server.
 *
 * Accepts connections, handing each one to server_conn_handler(), until
 * either global_stop_flag is set or tcp_server_accept() reports a negative
 * result.  The server is then marked as stopping.
 *
 * The accept result itself is not propagated; the return value is that of
 * server_set_stopping().
 */
int server_main(struct server *server)
{
	while (!global_stop_flag) {
		print_log("Waiting for new connections\n");

		if (tcp_server_accept(&server->tcp_server, server_conn_handler, server) < 0)
			break;
	}

	return server_set_stopping(server);
}