Directory: | src/ |
---|---|
File: | src/tcp_server.c |
Date: | 2024-04-25 03:45:42 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 87 | 122 | 71.3% |
Branches: | 27 | 66 | 40.9% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /* | ||
2 | * Copyright (c) 2022 Egor Tensin <egor@tensin.name> | ||
3 | * This file is part of the "cimple" project. | ||
4 | * For details, see https://github.com/egor-tensin/cimple. | ||
5 | * Distributed under the MIT License. | ||
6 | */ | ||
7 | |||
8 | #include "tcp_server.h" | ||
9 | #include "compiler.h" | ||
10 | #include "event_loop.h" | ||
11 | #include "file.h" | ||
12 | #include "log.h" | ||
13 | #include "net.h" | ||
14 | #include "signal.h" | ||
15 | |||
16 | #include <poll.h> | ||
17 | #include <pthread.h> | ||
18 | #include <signal.h> | ||
19 | #include <stdlib.h> | ||
20 | #include <sys/eventfd.h> | ||
21 | #include <sys/queue.h> | ||
22 | #include <unistd.h> | ||
23 | |||
24 | /* | ||
25 | * This is a simple threaded TCP server implementation. Each client is handled | ||
26 | * in a separate thread. | ||
27 | * | ||
28 | * It used to be much simpler; basically, we have two types of client | ||
29 | * connections: those made by cimple-worker and cimple-client respectively. | ||
30 | * cimple-server would keep track of cimple-worker threads/connections, and | ||
31 | * clean them up when assigning tasks/on shutdown. | ||
32 | * | ||
33 | * What about cimple-client connections though? I struggled to come up with a | ||
34 | * scheme that would allow cimple-server to clean them up gracefully. When | ||
35 | * would it do the cleanup even? I didn't want to do it on shutdown, since | ||
36 | * there would be potentially a lot of them. | ||
37 | * | ||
38 | * One solution is to make client threads detached. This is common advice; | ||
39 | * I really don't understand the merit of this approach though. Client threads | ||
40 | * actively work on shared data, take locks, etc. Data corruption is very | ||
41 | * likely after the main thread exits and all the rest are killed. | ||
42 | * | ||
43 | * Another approach is pre-threading; we make a number of threads beforehand | ||
44 | * and handle all client connections with them; I view this approach as limiting in | ||
45 | * principle; probably that's foolish of me. | ||
46 | * | ||
47 | * Finally, I cannot bring myself to do non-blocking I/O. I honestly fear the | ||
48 | * amount of work it would require to maintain read buffers, etc. | ||
49 | * | ||
50 | * So I came up with this convoluted scheme. The TCP server adds the listening | ||
51 | * socket to the event loop, as before. Each client thread makes an eventfd | ||
52 | * descriptor that it writes to when it's about to finish. The eventfd | ||
53 | * descriptor is added to the event loop; once it's readable, we clean up the | ||
54 | * client thread quickly from the main event loop thread. The TCP server itself | ||
55 | * keeps track of client threads; on shutdown, it cleans up those still working. | ||
56 | * | ||
57 | * I'm _really_ not sure about this approach, it seems fishy as hell; I guess, | ||
58 | * we'll see. | ||
59 | */ | ||
60 | |||
61 | struct client { | ||
62 | struct tcp_server *server; | ||
63 | int conn_fd; | ||
64 | |||
65 | int cleanup_fd; | ||
66 | |||
67 | pid_t tid; | ||
68 | pthread_t thread; | ||
69 | |||
70 | SIMPLEQ_ENTRY(client) entries; | ||
71 | }; | ||
72 | |||
73 | SIMPLEQ_HEAD(client_queue, client); | ||
74 | |||
75 | struct tcp_server { | ||
76 | struct event_loop *loop; | ||
77 | |||
78 | tcp_server_conn_handler conn_handler; | ||
79 | void *conn_handler_arg; | ||
80 | |||
81 | struct client_queue client_queue; | ||
82 | |||
83 | int accept_fd; | ||
84 | }; | ||
85 | |||
86 | 27620 | static void client_destroy(struct client *client) | |
87 | { | ||
88 |
1/2✓ Branch 1 taken 27620 times.
✗ Branch 2 not taken.
|
27620 | log_debug("Cleaning up client thread %d\n", client->tid); |
89 | |||
90 |
8/8✓ Branch 0 taken 14983 times.
✓ Branch 1 taken 12637 times.
✓ Branch 2 taken 6743 times.
✓ Branch 3 taken 8240 times.
✓ Branch 4 taken 8078 times.
✓ Branch 5 taken 12637 times.
✓ Branch 6 taken 9276 times.
✓ Branch 7 taken 3361 times.
|
35698 | SIMPLEQ_REMOVE(&client->server->client_queue, client, client, entries); |
91 |
1/4✗ Branch 1 not taken.
✓ Branch 2 taken 27620 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
27620 | pthread_errno_if(pthread_join(client->thread, NULL), "pthread_join"); |
92 | 27620 | file_close(client->cleanup_fd); | |
93 | 27620 | net_close(client->conn_fd); | |
94 | 27620 | free(client); | |
95 | 27620 | } | |
96 | |||
97 | 27620 | static int client_destroy_handler(UNUSED struct event_loop *loop, UNUSED int fd, | |
98 | UNUSED short revents, void *_client) | ||
99 | { | ||
100 | 27620 | struct client *client = (struct client *)_client; | |
101 |
1/2✓ Branch 1 taken 27620 times.
✗ Branch 2 not taken.
|
27620 | log_debug("Client thread %d indicated that it's done\n", client->tid); |
102 | |||
103 | 27620 | client_destroy(client); | |
104 | 27620 | return 0; | |
105 | } | ||
106 | |||
107 | 27620 | static void *client_thread_func(void *_client) | |
108 | { | ||
109 | 27620 | struct client *client = (struct client *)_client; | |
110 | 27620 | int ret = 0; | |
111 | |||
112 | 27620 | client->tid = gettid(); | |
113 |
1/2✓ Branch 1 taken 27620 times.
✗ Branch 2 not taken.
|
27620 | log_debug("New client thread thread %d has started\n", client->tid); |
114 | |||
115 | /* Let the client thread handle its signals except those that should be | ||
116 | * handled in the main thread. */ | ||
117 | 27620 | ret = signal_block_sigterms(); | |
118 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) |
119 | ✗ | goto cleanup; | |
120 | |||
121 | 27620 | ret = client->server->conn_handler(client->conn_fd, client->server->conn_handler_arg); | |
122 |
1/2✓ Branch 0 taken 27620 times.
✗ Branch 1 not taken.
|
27620 | if (ret < 0) |
123 | ✗ | goto cleanup; | |
124 | |||
125 | 27620 | cleanup: | |
126 |
1/4✗ Branch 1 not taken.
✓ Branch 2 taken 27620 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
27620 | log_errno_if(eventfd_write(client->cleanup_fd, 1), "eventfd_write"); |
127 | |||
128 | 27620 | return NULL; | |
129 | } | ||
130 | |||
131 | 27620 | static int client_create_thread(struct client *client) | |
132 | { | ||
133 | sigset_t old_mask; | ||
134 | 27620 | int ret = 0; | |
135 | |||
136 | /* Block all signals (we'll unblock them later); the client thread will | ||
137 | * have all signals blocked initially. This allows the main thread to | ||
138 | * handle SIGINT/SIGTERM/etc. */ | ||
139 | 27620 | ret = signal_block_all(&old_mask); | |
140 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) |
141 | ✗ | return ret; | |
142 | |||
143 | 27620 | ret = pthread_create(&client->thread, NULL, client_thread_func, client); | |
144 |
1/2✓ Branch 0 taken 27620 times.
✗ Branch 1 not taken.
|
27620 | if (ret) { |
145 | ✗ | pthread_errno(ret, "pthread_create"); | |
146 | ✗ | goto restore_mask; | |
147 | } | ||
148 | |||
149 | 27620 | restore_mask: | |
150 | /* Restore the previously-enabled signals for handling in the main thread. */ | ||
151 | 27620 | signal_set_mask(&old_mask); | |
152 | |||
153 | 27620 | return ret; | |
154 | } | ||
155 | |||
156 | 27620 | static int client_create(struct tcp_server *server, int conn_fd) | |
157 | { | ||
158 | 27620 | int ret = 0; | |
159 | |||
160 | 27620 | struct client *client = calloc(1, sizeof(struct client)); | |
161 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (!client) { |
162 | ✗ | log_errno("calloc"); | |
163 | ✗ | return -1; | |
164 | } | ||
165 | |||
166 | 27620 | client->server = server; | |
167 | 27620 | client->conn_fd = conn_fd; | |
168 | |||
169 | 27620 | ret = eventfd(0, EFD_CLOEXEC); | |
170 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) { |
171 | ✗ | log_errno("eventfd"); | |
172 | ✗ | goto free; | |
173 | } | ||
174 | 27620 | client->cleanup_fd = ret; | |
175 | |||
176 | 27620 | ret = event_loop_add_once(server->loop, client->cleanup_fd, POLLIN, client_destroy_handler, | |
177 | client); | ||
178 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) |
179 | ✗ | goto close_cleanup_fd; | |
180 | |||
181 | 27620 | SIMPLEQ_INSERT_TAIL(&server->client_queue, client, entries); | |
182 | |||
183 | 27620 | ret = client_create_thread(client); | |
184 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) |
185 | ✗ | goto remove_from_client_queue; | |
186 | |||
187 | 27620 | return ret; | |
188 | |||
189 | ✗ | remove_from_client_queue: | |
190 | ✗ | SIMPLEQ_REMOVE(&server->client_queue, client, client, entries); | |
191 | |||
192 | ✗ | close_cleanup_fd: | |
193 | ✗ | file_close(client->cleanup_fd); | |
194 | |||
195 | ✗ | free: | |
196 | ✗ | free(client); | |
197 | |||
198 | ✗ | return ret; | |
199 | } | ||
200 | |||
201 | 29 | static void client_queue_create(struct client_queue *client_queue) | |
202 | { | ||
203 | 29 | SIMPLEQ_INIT(client_queue); | |
204 | 29 | } | |
205 | |||
206 | 29 | static void client_queue_destroy(struct client_queue *client_queue) | |
207 | { | ||
208 | 29 | struct client *entry1 = SIMPLEQ_FIRST(client_queue); | |
209 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | while (entry1) { |
210 | ✗ | struct client *entry2 = SIMPLEQ_NEXT(entry1, entries); | |
211 | ✗ | client_destroy(entry1); | |
212 | ✗ | entry1 = entry2; | |
213 | } | ||
214 | 29 | } | |
215 | |||
216 | 27620 | static int tcp_server_accept_handler(UNUSED struct event_loop *loop, UNUSED int fd, | |
217 | UNUSED short revents, void *_server) | ||
218 | { | ||
219 | 27620 | struct tcp_server *server = (struct tcp_server *)_server; | |
220 | 27620 | return tcp_server_accept(server); | |
221 | } | ||
222 | |||
223 | 29 | int tcp_server_create(struct tcp_server **_server, struct event_loop *loop, const char *port, | |
224 | tcp_server_conn_handler conn_handler, void *conn_handler_arg) | ||
225 | { | ||
226 | 29 | int ret = 0; | |
227 | |||
228 | 29 | struct tcp_server *server = calloc(1, sizeof(struct tcp_server)); | |
229 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | if (!server) { |
230 | ✗ | log_errno("calloc"); | |
231 | ✗ | return -1; | |
232 | } | ||
233 | |||
234 | 29 | server->loop = loop; | |
235 | |||
236 | 29 | server->conn_handler = conn_handler; | |
237 | 29 | server->conn_handler_arg = conn_handler_arg; | |
238 | |||
239 | 29 | client_queue_create(&server->client_queue); | |
240 | |||
241 | 29 | ret = net_bind(port); | |
242 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | if (ret < 0) |
243 | ✗ | goto free; | |
244 | 29 | server->accept_fd = ret; | |
245 | |||
246 | 29 | ret = event_loop_add(loop, server->accept_fd, POLLIN, tcp_server_accept_handler, server); | |
247 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | if (ret < 0) |
248 | ✗ | goto close; | |
249 | |||
250 | 29 | *_server = server; | |
251 | 29 | return ret; | |
252 | |||
253 | ✗ | close: | |
254 | ✗ | net_close(server->accept_fd); | |
255 | ✗ | free: | |
256 | ✗ | free(server); | |
257 | |||
258 | ✗ | return ret; | |
259 | } | ||
260 | |||
261 | 29 | void tcp_server_destroy(struct tcp_server *server) | |
262 | { | ||
263 | 29 | net_close(server->accept_fd); | |
264 | 29 | client_queue_destroy(&server->client_queue); | |
265 | 29 | free(server); | |
266 | 29 | } | |
267 | |||
268 | 27620 | int tcp_server_accept(struct tcp_server *server) | |
269 | { | ||
270 | 27620 | int conn_fd = -1, ret = 0; | |
271 | |||
272 | 27620 | ret = net_accept(server->accept_fd); | |
273 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) |
274 | ✗ | return ret; | |
275 | 27620 | conn_fd = ret; | |
276 | |||
277 | 27620 | ret = client_create(server, conn_fd); | |
278 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 27620 times.
|
27620 | if (ret < 0) |
279 | ✗ | goto close_conn; | |
280 | |||
281 | 27620 | return ret; | |
282 | |||
283 | ✗ | close_conn: | |
284 | ✗ | net_close(conn_fd); | |
285 | |||
286 | ✗ | return ret; | |
287 | } | ||
288 |