1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
/*
* Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
* This file is part of the "cimple" project.
* For details, see https://github.com/egor-tensin/cimple.
* Distributed under the MIT License.
*/
#include "tcp_server.h"
#include "compiler.h"
#include "event_loop.h"
#include "log.h"
#include "net.h"
#include "signal.h"
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <sys/queue.h>
#include <unistd.h>
/*
* This is a simple threaded TCP server implementation. Each client is handled
* in a separate thread.
*
* It used to be much simpler; basically, we have two types of client
* connections: those made by cimple-worker and cimple-client respectively.
* cimple-server would keep track of cimple-worker threads/connections, and
* clean them up when assigning tasks/on shutdown.
*
* What about cimple-client connections though? I struggled to come up with a
* scheme that would allow cimple-server to clean them up gracefully. When
* would it do the cleanup even? I didn't want to do it on shutdown, since
* there would be potentially a lot of them.
*
* One solution is to make client threads detached. This is common advice;
* I really don't understand the merit of this approach though. Client threads
* actively work on shared data, take locks, etc. Data corruption is very
* likely after the main thread exits and all the rest are killed.
*
* Another approach is pre-threading; we make a number of threads beforehand
* and handle all client connections; I view this approach as limiting in
* principle; probably that's foolish of me.
*
* Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the
* amount of work it would require to maintain read buffers, etc.
*
* So I came up with this convoluted scheme. The TCP server adds the listening
* socket to the event loop, as before. Each client thread makes an eventfd
* descriptor that it writes to when it's about to finish. The eventfd
* descriptor is added to the event loop; once it's readable, we clean up the
* client thread quickly from the main event loop thread. The TCP server itself
* keeps track of client threads; on shutdown, it cleans up those still working.
*
* I'm _really_ not sure about this approach, it seems fishy as hell; I guess,
* we'll see.
*/
/*
 * State kept for one accepted connection and the thread serving it.
 * Instances are allocated in client_create(), linked into the server's
 * client queue, and released by client_destroy().
 */
struct client {
/* Server that accepted the connection; gives access to the client queue
 * and to the connection handler. */
struct tcp_server *server;
/* Connection descriptor handed to the connection handler; closed in
 * client_destroy(). */
int conn_fd;
/* eventfd descriptor the client thread writes to right before it returns;
 * it is registered with the event loop so the main thread can clean up. */
int cleanup_fd;
/* Thread ID as reported by gettid(); filled in by the client thread itself
 * and only used in log messages. */
pid_t tid;
/* Handle of the client thread; joined in client_destroy(). */
pthread_t thread;
/* Linkage for the server's client queue. */
SIMPLEQ_ENTRY(client) entries;
};
/* Declares struct client_queue, a simple queue of struct client entries. */
SIMPLEQ_HEAD(client_queue, client);
/*
 * TCP server state: the listening socket, the callback run for every
 * connection, and the queue of clients that have not been cleaned up yet.
 */
struct tcp_server {
/* Event loop that the listening socket and the per-client eventfd
 * descriptors are registered with. */
struct event_loop *loop;
/* Callback invoked in the client thread with the connection descriptor. */
tcp_server_conn_handler conn_handler;
/* Opaque argument passed through to conn_handler. */
void *conn_handler_arg;
/* Clients created by client_create() and not yet destroyed. */
struct client_queue client_queue;
/* Listening socket returned by net_bind(). */
int accept_fd;
};
static void client_destroy(struct client *client)
{
log_debug("Cleaning up client thread %d\n", client->tid);
SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries);
pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join");
net_close(client->cleanup_fd);
net_close(client->conn_fd);
free(client);
}
/*
 * Event loop callback registered for a client's cleanup_fd. It is invoked
 * once the descriptor becomes readable, i.e. after the client thread has
 * written to it, and destroys the client passed as the callback argument.
 *
 * Always returns 0.
 */
static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd,
                                  UNUSED short revents, void *_client)
{
	struct client *const finished = _client;

	log_debug("Client thread %d indicated that it's done\n", finished->tid);
	client_destroy(finished);

	return 0;
}
static void *client_thread_func(void *_client)
{
struct client *client = (struct client *)_client;
int ret = 0;
client->tid = gettid();
log_debug("New client thread thread %d has started\n", client->tid);
/* Let the client thread handle its signals except those that should be
* handled in the main thread. */
ret = signal_block_sigterms();
if (ret < 0)
goto cleanup;
client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg);
cleanup:
log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write");
return NULL;
}
/*
 * Start the thread that serves `client`.
 *
 * All signals are blocked around pthread_create(), so that the new thread
 * starts with every signal blocked; the caller's signal mask is restored
 * before returning, whether or not the thread was created.
 *
 * Returns 0 on success and a negative value on failure. On failure
 * client->thread does not refer to a valid thread and must not be joined.
 */
static int client_create_thread(struct client *client)
{
	sigset_t old_mask;
	int ret = 0;

	/* Block all signals (we'll unblock them later); the client thread will
	 * have all signals blocked initially. This allows the main thread to
	 * handle SIGINT/SIGTERM/etc. */
	ret = signal_block_all(&old_mask);
	if (ret < 0)
		return ret;

	ret = pthread_create(&client->thread, NULL, client_thread_func, client);
	if (ret) {
		pthread_errno(ret, "pthread_create");
		/* pthread_create() reports failure with a _positive_ error number,
		 * while callers test for `ret < 0`. Convert it, so that a failed
		 * thread creation isn't mistaken for success. */
		ret = -1;
	}

	/* Restore the previously-enabled signals for handling in the main thread. */
	signal_set_mask(&old_mask);

	return ret;
}
/*
 * Set up everything needed to serve a freshly accepted connection: allocate
 * the client structure, create its eventfd descriptor, register that
 * descriptor with the server's event loop, link the client into the server's
 * queue and start the client thread.
 *
 * Returns a negative value on failure; everything allocated here is released
 * again, but conn_fd is left open for the caller to close. Otherwise returns
 * the result of client_create_thread().
 */
static int client_create(struct tcp_server *server, int conn_fd)
{
	struct client *client = NULL;
	int ret = 0;

	client = calloc(1, sizeof(*client));
	if (!client) {
		log_errno("calloc");
		return -1;
	}

	client->server = server;
	client->conn_fd = conn_fd;

	/* The client thread writes to this descriptor when it's about to exit. */
	ret = eventfd(0, EFD_CLOEXEC);
	if (ret < 0) {
		log_errno("eventfd");
		goto err_free_client;
	}
	client->cleanup_fd = ret;

	ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN,
	                          client_destroy_handler, client);
	if (ret < 0)
		goto err_close_eventfd;

	/* The client must be in the queue before its thread starts, so that
	 * client_destroy() can always unlink it. */
	SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries);

	ret = client_create_thread(client);
	if (ret < 0)
		goto err_unlink_client;

	return ret;

err_unlink_client:
	/* NOTE(review): cleanup_fd is not unregistered from the event loop on
	 * this path; confirm the event loop copes with a descriptor that has
	 * been closed behind its back. */
	SIMPLEQ_REMOVE(&server->client_queue, client, client, entries);
err_close_eventfd:
	net_close(client->cleanup_fd);
err_free_client:
	free(client);

	return ret;
}
/* Initialize `client_queue` as an empty queue. */
static void client_queue_create(struct client_queue *client_queue)
{
	SIMPLEQ_INIT(client_queue);
}
/*
 * Destroy every client still linked into `client_queue`, in queue order.
 *
 * Each client_destroy() call joins the corresponding thread, so this blocks
 * until all of the remaining client threads have returned.
 */
static void client_queue_destroy(struct client_queue *client_queue)
{
	struct client *cur = NULL;
	struct client *next = NULL;

	for (cur = SIMPLEQ_FIRST(client_queue); cur; cur = next) {
		/* client_destroy() frees the entry, so grab its successor first. */
		next = SIMPLEQ_NEXT(cur, entries);
		client_destroy(cur);
	}
}
/*
 * Event loop callback registered for the listening socket; accepts one
 * connection on the server passed as the callback argument and returns
 * whatever tcp_server_accept() returns.
 */
static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd,
                                     UNUSED short revents, void *_server)
{
	struct tcp_server *const listener = _server;

	return tcp_server_accept(listener);
}
/*
 * Create a TCP server bound to `port` and register its listening socket with
 * `loop`. For every connection, `conn_handler` is eventually invoked with the
 * connection descriptor and `conn_handler_arg`.
 *
 * On success, stores the new server in *_server and returns the result of
 * event_loop_add(). On failure, returns a negative value, releases everything
 * acquired so far and leaves *_server untouched.
 */
int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port,
                      tcp_server_conn_handler conn_handler, void *conn_handler_arg)
{
	struct tcp_server *server = NULL;
	int ret = 0;

	server = calloc(1, sizeof(*server));
	if (!server) {
		log_errno("calloc");
		return -1;
	}

	server->loop = loop;
	server->conn_handler = conn_handler;
	server->conn_handler_arg = conn_handler_arg;
	client_queue_create(&server->client_queue);

	ret = net_bind(port);
	if (ret < 0) {
		free(server);
		return ret;
	}
	server->accept_fd = ret;

	/* New connections are picked up from the event loop. */
	ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server);
	if (ret < 0) {
		net_close(server->accept_fd);
		free(server);
		return ret;
	}

	*_server = server;
	return ret;
}
/*
 * Shut the server down: close the listening socket, destroy all clients that
 * are still in the queue (which joins their threads) and free the server.
 * The pointer must not be used after this call.
 */
void tcp_server_destroy(struct tcp_server *server)
{
	struct client_queue *remaining = &server->client_queue;

	net_close(server->accept_fd);
	client_queue_destroy(remaining);

	free(server);
}
/*
 * Accept one connection on the server's listening socket and hand it over to
 * a new client thread.
 *
 * A negative result from net_accept() is returned as is. If the client cannot
 * be set up, the accepted connection is closed and the negative result of
 * client_create() is returned. Otherwise returns the result of
 * client_create().
 */
int tcp_server_accept(struct tcp_server *server)
{
	int conn_fd = -1;
	int ret = 0;

	conn_fd = net_accept(server->accept_fd);
	if (conn_fd < 0)
		return conn_fd;

	ret = client_create(server, conn_fd);
	if (ret < 0) {
		/* client_create() doesn't take over the descriptor on failure. */
		net_close(conn_fd);
	}

	return ret;
}
|