GCC Code Coverage Report


Directory: src/
File: src/server.c
Date: 2023-08-28 07:33:56
Exec Total Coverage
Lines: 182 255 71.4%
Branches: 58 138 42.0%

Line Branch Exec Source
1 /*
2 * Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com>
3 * This file is part of the "cimple" project.
4 * For details, see https://github.com/egor-tensin/cimple.
5 * Distributed under the MIT License.
6 */
7
8 #include "server.h"
9 #include "command.h"
10 #include "compiler.h"
11 #include "const.h"
12 #include "event_loop.h"
13 #include "file.h"
14 #include "json_rpc.h"
15 #include "log.h"
16 #include "net.h"
17 #include "process.h"
18 #include "protocol.h"
19 #include "run_queue.h"
20 #include "signal.h"
21 #include "storage.h"
22 #include "storage_sqlite.h"
23 #include "tcp_server.h"
24 #include "worker_queue.h"
25
26 #include <poll.h>
27 #include <pthread.h>
28 #include <stdlib.h>
29
30 struct server {
31 pthread_mutex_t server_mtx;
32 pthread_cond_t server_cv;
33
34 int stopping;
35
36 struct cmd_dispatcher *cmd_dispatcher;
37
38 struct event_loop *event_loop;
39 int signalfd;
40
41 struct worker_queue worker_queue;
42 struct run_queue run_queue;
43
44 struct storage storage;
45
46 pthread_t main_thread;
47
48 struct tcp_server *tcp_server;
49 };
50
51 18472 static int server_lock(struct server *server)
52 {
53 18472 int ret = pthread_mutex_lock(&server->server_mtx);
54
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18472 times.
18472 if (ret) {
55 pthread_errno(ret, "pthread_mutex_lock");
56 return ret;
57 }
58 18472 return ret;
59 }
60
61 18472 static void server_unlock(struct server *server)
62 {
63
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 18472 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
18472 pthread_errno_if(pthread_mutex_unlock(&server->server_mtx), "pthread_mutex_unlock");
64 18472 }
65
66 18195 static int server_wait(struct server *server)
67 {
68 18195 int ret = pthread_cond_wait(&server->server_cv, &server->server_mtx);
69
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18195 times.
18195 if (ret) {
70 pthread_errno(ret, "pthread_cond_wait");
71 return ret;
72 }
73 18195 return ret;
74 }
75
76 18443 static void server_notify(struct server *server)
77 {
78
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 18443 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
18443 pthread_errno_if(pthread_cond_signal(&server->server_cv), "pthread_cond_signal");
79 18443 }
80
81 29 static int server_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UNUSED short revents,
82 void *_server)
83 {
84 29 struct server *server = (struct server *)_server;
85 29 int ret = 0;
86
87 29 ret = server_lock(server);
88
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
89 return ret;
90
91 29 server->stopping = 1;
92
93 29 server_notify(server);
94 29 server_unlock(server);
95 29 return ret;
96 }
97
98 27169 static int server_has_workers(const struct server *server)
99 {
100 27169 return !worker_queue_is_empty(&server->worker_queue);
101 }
102
103 9234 static int server_enqueue_worker(struct server *server, struct worker *worker)
104 {
105 9234 int ret = 0;
106
107 9234 ret = server_lock(server);
108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
109 return ret;
110
111 9234 worker_queue_add_last(&server->worker_queue, worker);
112
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9234 times.
9234 log("Added a new worker %d to the queue\n", worker_get_fd(worker));
113
114 9234 server_notify(server);
115 9234 server_unlock(server);
116 9234 return ret;
117 }
118
119 27375 static int server_has_runs(const struct server *server)
120 {
121 27375 return !run_queue_is_empty(&server->run_queue);
122 }
123
124 9180 static int server_enqueue_run(struct server *server, struct run *run)
125 {
126 9180 int ret = 0;
127
128 9180 ret = storage_run_create(&server->storage, run_get_url(run), run_get_rev(run));
129
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
130 return ret;
131 9180 run_set_id(run, ret);
132
133 9180 ret = server_lock(server);
134
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
135 return ret;
136
137 9180 run_queue_add_last(&server->run_queue, run);
138
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Added a new run %d for repository %s to the queue\n", run_get_id(run),
139 run_get_url(run));
140
141 9180 server_notify(server);
142 9180 server_unlock(server);
143 9180 return ret;
144 }
145
146 27404 static int server_ready_for_action(const struct server *server)
147 {
148
6/6
✓ Branch 0 taken 27375 times.
✓ Branch 1 taken 29 times.
✓ Branch 3 taken 27169 times.
✓ Branch 4 taken 206 times.
✓ Branch 6 taken 9180 times.
✓ Branch 7 taken 17989 times.
27404 return server->stopping || (server_has_runs(server) && server_has_workers(server));
149 }
150
151 9209 static int server_wait_for_action(struct server *server)
152 {
153 9209 int ret = 0;
154
155
2/2
✓ Branch 1 taken 18195 times.
✓ Branch 2 taken 9209 times.
27404 while (!server_ready_for_action(server)) {
156 18195 ret = server_wait(server);
157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18195 times.
18195 if (ret < 0)
158 return ret;
159 }
160
161 9209 return ret;
162 }
163
164 9180 static void server_assign_run(struct server *server)
165 {
166 9180 struct run *run = run_queue_remove_first(&server->run_queue);
167
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Removed run %d for repository %s from the queue\n", run_get_id(run), run_get_url(run));
168
169 9180 struct worker *worker = worker_queue_remove_first(&server->worker_queue);
170
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Removed worker %d from the queue\n", worker_get_fd(worker));
171
172 9180 struct jsonrpc_request *start_request = NULL;
173 9180 int ret = 0;
174
175 9180 ret = start_request_create(&start_request, run);
176
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
177 goto exit;
178
179 9180 ret = jsonrpc_request_send(start_request, worker_get_fd(worker));
180 9180 jsonrpc_request_destroy(start_request);
181
1/2
✓ Branch 0 taken 9180 times.
✗ Branch 1 not taken.
9180 if (ret < 0)
182 goto exit;
183
184 9180 exit:
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0) {
186 log("Failed to assign run for repository %s to worker %d, requeueing\n",
187 run_get_url(run), worker_get_fd(worker));
188 run_queue_add_first(&server->run_queue, run);
189 } else {
190
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Assigned run %d for repository %s to worker %d\n", run_get_id(run),
191 run_get_url(run), worker_get_fd(worker));
192 9180 run_destroy(run);
193 }
194
195 9180 worker_destroy(worker);
196 9180 }
197
198 29 static void *server_main_thread(void *_server)
199 {
200 29 struct server *server = (struct server *)_server;
201 29 int ret = 0;
202
203 29 ret = server_lock(server);
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
205 goto exit;
206
207 while (1) {
208 9209 ret = server_wait_for_action(server);
209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9209 times.
9209 if (ret < 0)
210 goto unlock;
211
212
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 9180 times.
9209 if (server->stopping)
213 29 goto unlock;
214
215 9180 server_assign_run(server);
216 }
217
218 29 unlock:
219 29 server_unlock(server);
220
221 29 exit:
222 29 return NULL;
223 }
224
225 9234 static int server_handle_cmd_new_worker(UNUSED const struct jsonrpc_request *request,
226 UNUSED struct jsonrpc_response **response, void *_ctx)
227 {
228 9234 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
229 9234 struct server *server = (struct server *)ctx->arg;
230 9234 int ret = 0;
231
232 9234 ret = file_dup(ctx->fd);
233
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
234 return ret;
235
236 9234 const int fd = ret;
237 9234 struct worker *worker = NULL;
238
239 9234 ret = worker_create(&worker, fd);
240
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
241 goto close;
242
243 9234 ret = server_enqueue_worker(server, worker);
244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9234 times.
9234 if (ret < 0)
245 goto destroy_worker;
246
247 9234 return ret;
248
249 destroy_worker:
250 worker_destroy(worker);
251
252 close:
253 net_close(fd);
254
255 return ret;
256 }
257
258 9180 static int server_handle_cmd_run(const struct jsonrpc_request *request,
259 struct jsonrpc_response **response, void *_ctx)
260 {
261 9180 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
262 9180 struct server *server = (struct server *)ctx->arg;
263 9180 int ret = 0;
264
265 9180 struct run *run = NULL;
266
267 9180 ret = run_request_parse(request, &run);
268
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
269 return ret;
270
271 9180 ret = jsonrpc_response_create(response, request, NULL);
272
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
273 goto destroy_run;
274
275 9180 ret = server_enqueue_run(server, run);
276
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
277 goto free_response;
278
279 9180 return ret;
280
281 free_response:
282 jsonrpc_response_destroy(*response);
283 *response = NULL;
284
285 destroy_run:
286 run_destroy(run);
287
288 return ret;
289 }
290
291 9180 static int server_handle_cmd_finished(const struct jsonrpc_request *request,
292 UNUSED struct jsonrpc_response **response, void *_ctx)
293 {
294 9180 struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx;
295 9180 struct server *server = (struct server *)ctx->arg;
296 9180 int ret = 0;
297
298 9180 int run_id = 0;
299 struct proc_output *output;
300
301 9180 ret = finished_request_parse(request, &run_id, &output);
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0)
303 return ret;
304
305 9180 ret = storage_run_finished(&server->storage, run_id, output);
306
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9180 times.
9180 if (ret < 0) {
307 log_err("Failed to mark run %d as finished\n", run_id);
308 goto free_output;
309 }
310
311
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9180 times.
9180 log("Marked run %d as finished\n", run_id);
312
313 9180 free_output:
314 9180 proc_output_destroy(output);
315
316 9180 return ret;
317 }
318
319 static struct cmd_desc commands[] = {
320 {CMD_NEW_WORKER, server_handle_cmd_new_worker},
321 {CMD_RUN, server_handle_cmd_run},
322 {CMD_FINISHED, server_handle_cmd_finished},
323 };
324
325 static const size_t numof_commands = sizeof(commands) / sizeof(commands[0]);
326
327 29 int server_create(struct server **_server, const struct settings *settings)
328 {
329 struct storage_settings storage_settings;
330 29 int ret = 0;
331
332 29 struct server *server = malloc(sizeof(struct server));
333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (!server) {
334 log_errno("malloc");
335 return -1;
336 }
337
338 29 ret = pthread_mutex_init(&server->server_mtx, NULL);
339
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret) {
340 pthread_errno(ret, "pthread_mutex_init");
341 goto free;
342 }
343
344 29 ret = pthread_cond_init(&server->server_cv, NULL);
345
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret) {
346 pthread_errno(ret, "pthread_cond_init");
347 goto destroy_mtx;
348 }
349
350 29 server->stopping = 0;
351
352 29 ret = cmd_dispatcher_create(&server->cmd_dispatcher, commands, numof_commands, server);
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
354 goto destroy_cv;
355
356 29 ret = event_loop_create(&server->event_loop);
357
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
358 goto destroy_cmd_dispatcher;
359
360 29 ret = signalfd_create_sigterms();
361
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
362 goto destroy_event_loop;
363 29 server->signalfd = ret;
364
365 29 ret = event_loop_add(server->event_loop, server->signalfd, POLLIN, server_set_stopping,
366 server);
367
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
368 goto close_signalfd;
369
370 29 worker_queue_create(&server->worker_queue);
371
372 29 ret = storage_sqlite_settings_create(&storage_settings, settings->sqlite_path);
373
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
374 goto destroy_worker_queue;
375
376 29 ret = storage_create(&server->storage, &storage_settings);
377 29 storage_settings_destroy(&storage_settings);
378
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
379 goto destroy_worker_queue;
380
381 29 ret = storage_get_run_queue(&server->storage, &server->run_queue);
382
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
383 goto destroy_storage;
384
385 29 ret = tcp_server_create(&server->tcp_server, server->event_loop, settings->port,
386 29 cmd_dispatcher_handle_conn, server->cmd_dispatcher);
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret < 0)
388 goto destroy_run_queue;
389
390 29 ret = pthread_create(&server->main_thread, NULL, server_main_thread, server);
391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 if (ret) {
392 pthread_errno(ret, "pthread_create");
393 goto destroy_tcp_server;
394 }
395
396 29 *_server = server;
397 29 return ret;
398
399 destroy_tcp_server:
400 tcp_server_destroy(server->tcp_server);
401
402 destroy_run_queue:
403 run_queue_destroy(&server->run_queue);
404
405 destroy_storage:
406 storage_destroy(&server->storage);
407
408 destroy_worker_queue:
409 worker_queue_destroy(&server->worker_queue);
410
411 close_signalfd:
412 signalfd_destroy(server->signalfd);
413
414 destroy_event_loop:
415 event_loop_destroy(server->event_loop);
416
417 destroy_cmd_dispatcher:
418 cmd_dispatcher_destroy(server->cmd_dispatcher);
419
420 destroy_cv:
421 pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
422
423 destroy_mtx:
424 pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
425
426 free:
427 free(server);
428
429 return ret;
430 }
431
432 29 void server_destroy(struct server *server)
433 {
434
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
29 log("Shutting down\n");
435
436
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
29 pthread_errno_if(pthread_join(server->main_thread, NULL), "pthread_join");
437 29 tcp_server_destroy(server->tcp_server);
438 29 storage_destroy(&server->storage);
439 29 run_queue_destroy(&server->run_queue);
440 29 worker_queue_destroy(&server->worker_queue);
441 29 signalfd_destroy(server->signalfd);
442 29 event_loop_destroy(server->event_loop);
443 29 cmd_dispatcher_destroy(server->cmd_dispatcher);
444
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
29 pthread_errno_if(pthread_cond_destroy(&server->server_cv), "pthread_cond_destroy");
445
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 29 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
29 pthread_errno_if(pthread_mutex_destroy(&server->server_mtx), "pthread_mutex_destroy");
446 29 free(server);
447 29 }
448
449 29 static int server_listen_thread(struct server *server)
450 {
451 29 int ret = 0;
452
453
2/2
✓ Branch 0 taken 53910 times.
✓ Branch 1 taken 29 times.
53939 while (!server->stopping) {
454
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 53910 times.
53910 log("Waiting for new connections\n");
455
456 53910 ret = event_loop_run(server->event_loop);
457
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 53910 times.
53910 if (ret < 0)
458 return ret;
459 }
460
461 29 return 0;
462 }
463
464 29 int server_main(struct server *server)
465 {
466 29 return server_listen_thread(server);
467 }
468