aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/src/tcp_server.c
blob: ca85ff20483b1d1f9eab31143f7d2a9b9116bc31 (plain) (tree)
1
2
3
4
5
6
7
8
9
10






                                                          
                       

                       

                
                   
 
                 
                    
                   
                   


                        
 














































                                                                                

  
                                   
 

                                
 

                                             
 
                                         
 

                      
 


                                                                 
 


                                                                               
                                   
                     

 

                                                                                
 

                                                                              
 


                               
 
                                              
 
                                                         

                    



                                                                               
                                         
                                      
                    
                             
 



                                                                                        
 


                    
                                                      
 
                          
                    
 
                                                                               


                                                                             
                    
                           
 
                                                                                
                  
                                                     
                                  

         
             
                                                                                     
                                   
 
                   

























                                                                                                   
 















                                                                       
 

                   
 
                                                                  
 

                                   
 



































                                                                                             
                    

                                
 
                                                                                                 
                    
                           
 
                          

                   



                                     



                   
                                                  
 


                                                    

 
                                                
 
















                                             
 
/*
 * Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
 * This file is part of the "cimple" project.
 * For details, see https://github.com/egor-tensin/cimple.
 * Distributed under the MIT License.
 */

#include "tcp_server.h"
#include "compiler.h"
#include "event_loop.h"
#include "log.h"
#include "net.h"
#include "signal.h"

#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <sys/queue.h>
#include <unistd.h>

/*
 * This is a simple threaded TCP server implementation. Each client is handled
 * in a separate thread.
 *
 * It used to be much simpler; basically, we have two types of client
 * connections: those made by cimple-worker and cimple-client respectively.
 * cimple-server would keep track of cimple-worker threads/connections, and
 * clean them up when assigning tasks/on shutdown.
 *
 * What about cimple-client connections though? I struggled to come up with a
 * scheme that would allow cimple-server to clean them up gracefully. When
 * would it do the cleanup even? I didn't want to do it on shutdown, since
 * there would be potentially a lot of them.
 *
 * One solution is to make client threads detached. This is a common advise;
 * I really don't understand the merit of this approach though. Client threads
 * actively work on shared data, take locks, etc. Data corruption is very
 * likely after the main thread exits and all the rest are killed.
 *
 * Another approach is pre-threading; we make a number of threads beforehand
 * and handle all client connections; I view this approach as limiting in
 * principle; probably that's foolish of me.
 *
 * Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the
 * amount of work it would require to maintain read buffers, etc.
 *
 * So I came up with this convoluted scheme. The TCP server adds the listening
 * socket to the event loop, as before. Each client thread makes an eventfd
 * descriptor that it writes to when it's about to finish. The eventfd
 * descriptor is added to the event loop; once it's readable, we clean up the
 * client thread quickly from the main event loop thread. The TCP server itself
 * keeps track of client threads; on shutdown, it cleans up those still working.
 *
 * I'm _really_ not sure about this approach, it seems fishy as hell; I guess,
 * we'll see.
 */

struct client {
	struct tcp_server *server;
	int conn_fd;

	int cleanup_fd;

	pid_t tid;
	pthread_t thread;

	SIMPLEQ_ENTRY(client) entries;
};

SIMPLEQ_HEAD(client_queue, client);

struct tcp_server {
	struct event_loop *loop;

	tcp_server_conn_handler conn_handler;
	void *conn_handler_arg;

	struct client_queue client_queue;

	int accept_fd;
};

static void client_destroy(struct client *client)
{
	log_debug("Cleaning up client thread %d\n", client->tid);

	SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries);
	pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join");
	net_close(client->cleanup_fd);
	net_close(client->conn_fd);
	free(client);
}

static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd,
                                  UNUSED short revents, void *_client)
{
	struct client *client = (struct client *)_client;
	log_debug("Client thread %d indicated that it's done\n", client->tid);

	client_destroy(client);
	return 0;
}

static void *client_thread_func(void *_client)
{
	struct client *client = (struct client *)_client;
	int ret = 0;

	client->tid = gettid();
	log_debug("New client thread thread %d has started\n", client->tid);

	/* Let the client thread handle its signals except those that should be
	 * handled in the main thread. */
	ret = signal_block_sigterms();
	if (ret < 0)
		goto cleanup;

	client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg);

cleanup:
	log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write");

	return NULL;
}

static int client_create_thread(struct client *client)
{
	sigset_t old_mask;
	int ret = 0;

	/* Block all signals (we'll unblock them later); the client thread will
	 * have all signals blocked initially. This allows the main thread to
	 * handle SIGINT/SIGTERM/etc. */
	ret = signal_block_all(&old_mask);
	if (ret < 0)
		return ret;

	ret = pthread_create(&client->thread, NULL, client_thread_func, client);
	if (ret) {
		pthread_errno(ret, "pthread_create");
		goto restore_mask;
	}

restore_mask:
	/* Restore the previously-enabled signals for handling in the main thread. */
	signal_set_mask(&old_mask);

	return ret;
}

static int client_create(struct tcp_server *server, int conn_fd)
{
	int ret = 0;

	struct client *client = calloc(1, sizeof(struct client));
	if (!client) {
		log_errno("calloc");
		return -1;
	}

	client->server = server;
	client->conn_fd = conn_fd;

	ret = eventfd(0, EFD_CLOEXEC);
	if (ret < 0) {
		log_errno("eventfd");
		goto free;
	}
	client->cleanup_fd = ret;

	ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN, client_destroy_handler,
	                          client);
	if (ret < 0)
		goto close_cleanup_fd;

	SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries);

	ret = client_create_thread(client);
	if (ret < 0)
		goto remove_from_client_queue;

	return ret;

remove_from_client_queue:
	SIMPLEQ_REMOVE(&server->client_queue, client, client, entries);

close_cleanup_fd:
	net_close(client->cleanup_fd);

free:
	free(client);

	return ret;
}

static void client_queue_create(struct client_queue *client_queue)
{
	SIMPLEQ_INIT(client_queue);
}

static void client_queue_destroy(struct client_queue *client_queue)
{
	struct client *entry1 = SIMPLEQ_FIRST(client_queue);
	while (entry1) {
		struct client *entry2 = SIMPLEQ_NEXT(entry1, entries);
		client_destroy(entry1);
		entry1 = entry2;
	}
}

static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd,
                                     UNUSED short revents, void *_server)
{
	struct tcp_server *server = (struct tcp_server *)_server;
	return tcp_server_accept(server);
}

int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port,
                      tcp_server_conn_handler conn_handler, void *conn_handler_arg)
{
	int ret = 0;

	struct tcp_server *server = calloc(1, sizeof(struct tcp_server));
	if (!server) {
		log_errno("calloc");
		return -1;
	}

	server->loop = loop;

	server->conn_handler = conn_handler;
	server->conn_handler_arg = conn_handler_arg;

	client_queue_create(&server->client_queue);

	ret = net_bind(port);
	if (ret < 0)
		goto free;
	server->accept_fd = ret;

	ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server);
	if (ret < 0)
		goto close;

	*_server = server;
	return ret;

close:
	net_close(server->accept_fd);
free:
	free(server);

	return ret;
}

void tcp_server_destroy(struct tcp_server *server)
{
	net_close(server->accept_fd);
	client_queue_destroy(&server->client_queue);
	free(server);
}

int tcp_server_accept(struct tcp_server *server)
{
	int conn_fd = -1, ret = 0;

	ret = net_accept(server->accept_fd);
	if (ret < 0)
		return ret;
	conn_fd = ret;

	ret = client_create(server, conn_fd);
	if (ret < 0)
		goto close_conn;

	return ret;

close_conn:
	net_close(conn_fd);

	return ret;
}