From 0600cacfadf00e916340f2394f1d3bfc173a3d0b Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Sun, 9 Jul 2023 15:53:11 +0200 Subject: store process output in SQLite --- src/CMakeLists.txt | 6 +++-- src/base64.c | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/base64.h | 16 ++++++++++++++ src/file.c | 11 +++++----- src/file.h | 2 +- src/process.c | 33 +++++++++++++++++----------- src/process.h | 8 +++---- src/protocol.c | 48 +++++++++++++++++++++++++++++++++------- src/protocol.h | 2 +- src/server.c | 6 ++--- src/sqlite.c | 14 ++++++++++++ src/sqlite.h | 3 +++ src/storage.c | 7 +++--- src/storage.h | 3 ++- src/storage_sqlite.c | 13 +++++++---- src/storage_sqlite.h | 3 ++- src/worker.c | 14 +++++++----- 17 files changed, 198 insertions(+), 53 deletions(-) create mode 100644 src/base64.c create mode 100644 src/base64.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c26fe31..c85ffdf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -56,6 +56,7 @@ function(add_my_executable name) endfunction() add_my_executable(server server_main.c server.c + base64.c cmd_line.c command.c const.c @@ -75,7 +76,7 @@ add_my_executable(server server_main.c server.c string.c tcp_server.c worker_queue.c) -target_link_libraries(server PRIVATE pthread sqlite3) +target_link_libraries(server PRIVATE pthread sodium sqlite3) target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") add_my_executable(client client_main.c client.c @@ -87,6 +88,7 @@ add_my_executable(client client_main.c client.c net.c) add_my_executable(worker worker_main.c worker.c + base64.c ci.c cmd_line.c command.c @@ -102,4 +104,4 @@ add_my_executable(worker worker_main.c worker.c run_queue.c signal.c string.c) -target_link_libraries(worker PRIVATE git2 pthread) +target_link_libraries(worker PRIVATE git2 pthread sodium) diff --git a/src/base64.c b/src/base64.c new file mode 100644 index 0000000..f7fa3c4 --- /dev/null +++ b/src/base64.c @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023 Egor Tensin + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "base64.h" +#include "log.h" + +#include + +#include +#include +#include + +static const int base64_variant = sodium_base64_VARIANT_ORIGINAL; + +int base64_encode(const unsigned char *src, size_t src_len, char **_dst) +{ + const size_t dst_len = sodium_base64_encoded_len(src_len, base64_variant); + + char *dst = calloc(dst_len, 1); + if (!dst) { + log_errno("calloc"); + return -1; + } + + sodium_bin2base64(dst, dst_len, src, src_len, base64_variant); + + *_dst = dst; + return 0; +} + +int base64_decode(const char *src, unsigned char **_dst, size_t *_dst_len) +{ + const size_t src_len = strlen(src); + const size_t dst_max_len = src_len / 4 * 3; + size_t dst_len = 0; + + unsigned char *dst = calloc(dst_max_len, 1); + if (!dst) { + log_errno("calloc"); + return -1; + } + + int ret = + sodium_base642bin(dst, dst_max_len, src, src_len, NULL, &dst_len, NULL, base64_variant); + if (ret < 0) { + log_err("Couldn't parse base64-encoded string\n"); + goto free; + } + + *_dst = dst; + *_dst_len = dst_len; + return ret; + +free: + free(dst); + + return ret; +} diff --git a/src/base64.h b/src/base64.h new file mode 100644 index 0000000..f86fa56 --- /dev/null +++ b/src/base64.h @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2023 Egor Tensin + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __BASE64_H__ +#define __BASE64_H__ + +#include + +int base64_encode(const unsigned char *src, size_t src_len, char **dst); +int base64_decode(const char *src, unsigned char **dst, size_t *dst_len); + +#endif diff --git a/src/file.c b/src/file.c index 27709be..6bb2683 100644 --- a/src/file.c +++ b/src/file.c @@ -124,14 +124,14 @@ int file_exists(const char *path) return !ret && S_ISREG(stat.st_mode); } -int file_read(int fd, char **_contents, size_t *_size) +int file_read(int fd, unsigned char **_contents, size_t *_size) { size_t alloc_size = 256; - char *contents = NULL; + unsigned char *contents = NULL; size_t size = 0; while (1) { - char *tmp_contents = realloc(contents, alloc_size); + unsigned char *tmp_contents = realloc(contents, alloc_size); if (!tmp_contents) { log_errno("realloc"); free(contents); @@ -139,7 +139,7 @@ int file_read(int fd, char **_contents, size_t *_size) } contents = tmp_contents; - ssize_t read_size = read(fd, contents + size, alloc_size - size - 1); + ssize_t read_size = read(fd, contents + size, alloc_size - size); if (read_size < 0) { log_errno("read"); @@ -154,9 +154,8 @@ int file_read(int fd, char **_contents, size_t *_size) } size += read_size; - contents[size] = '\0'; - if (size == alloc_size - 1) { + if (size == alloc_size) { alloc_size *= 2; } } diff --git a/src/file.h b/src/file.h index b4bba62..a6d9dc1 100644 --- a/src/file.h +++ b/src/file.h @@ -19,6 +19,6 @@ int file_dup(int fd); void file_close(int fd); int file_exists(const char *path); -int file_read(int fd, char **output, size_t *size); +int file_read(int fd, unsigned char **output, size_t *size); #endif diff --git a/src/process.c b/src/process.c index f800ad4..ee387bb 100644 --- a/src/process.c +++ b/src/process.c @@ -106,18 +106,18 @@ int proc_capture(const char *args[], const char *envp[], struct proc_output *res file_close(pipe_fds[1]); - ret = file_read(pipe_fds[0], &result->output, &result->output_len); + ret = file_read(pipe_fds[0], &result->data, &result->data_size); if (ret < 0) goto close_pipe; ret = wait_for_child(child_pid, &result->ec); if (ret < 0) - goto free_output; + goto free_data; goto close_pipe; -free_output: - free(result->output); +free_data: + free(result->data); close_pipe: file_close(pipe_fds[0]); @@ -127,23 +127,30 @@ close_pipe: return ret; } -void proc_output_init(struct proc_output *output) +int proc_output_create(struct proc_output **_output) { + struct proc_output *output = calloc(1, sizeof(struct proc_output)); + if (!output) { + log_errno("calloc"); + return -1; + } + output->ec = 0; - output->output = NULL; - output->output_len = 0; + output->data = NULL; + output->data_size = 0; + + *_output = output; + return 0; } -void proc_output_free(const struct proc_output *output) +void proc_output_destroy(struct proc_output *output) { - free(output->output); + free(output->data); + free(output); } void proc_output_dump(const struct proc_output *output) { log("Process exit code: %d\n", output->ec); - log("Process output:\n%s", output->output); - if (!output->output || !output->output_len || - output->output[output->output_len - 1] != '\n') - log("\n"); + log("Process output: %zu bytes\n", output->data_size); } diff --git a/src/process.h b/src/process.h index 5726fd3..f0aca1d 100644 --- a/src/process.h +++ b/src/process.h @@ -12,8 +12,8 @@ struct proc_output { int ec; - char *output; - size_t output_len; + unsigned char *data; + size_t data_size; }; /* The exit code is only valid if the functions returns a non-negative number. */ @@ -25,8 +25,8 @@ int proc_spawn(const char *args[], const char *envp[], int *ec); * In that case, you'll need to free the output. */ int proc_capture(const char *args[], const char *envp[], struct proc_output *result); -void proc_output_init(struct proc_output *); -void proc_output_free(const struct proc_output *); +int proc_output_create(struct proc_output **); +void proc_output_destroy(struct proc_output *); void proc_output_dump(const struct proc_output *); diff --git a/src/protocol.c b/src/protocol.c index 0b6e74e..5d1903a 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -6,6 +6,7 @@ */ #include "protocol.h" +#include "base64.h" #include "const.h" #include "log.h" #include "msg.h" @@ -15,6 +16,7 @@ #include #include +#include static int check_msg_length(const struct msg *msg, size_t expected) { @@ -77,35 +79,65 @@ int msg_start_parse(const struct msg *msg, struct run **run) int msg_finished_create(struct msg **msg, int run_id, const struct proc_output *output) { + int ret = 0; + char id[16]; char ec[16]; snprintf(id, sizeof(id), "%d", run_id); snprintf(ec, sizeof(ec), "%d", output->ec); - const char *argv[] = {CMD_FINISHED, id, ec, NULL}; + char *b64data = NULL; - return msg_from_argv(msg, argv); + ret = base64_encode(output->data, output->data_size, &b64data); + if (ret < 0) + return ret; + + const char *argv[] = {CMD_FINISHED, id, ec, b64data, NULL}; + + ret = msg_from_argv(msg, argv); + if (ret < 0) + goto free_b64data; + +free_b64data: + free(b64data); + + return ret; } -int msg_finished_parse(const struct msg *msg, int *run_id, struct proc_output *output) +int msg_finished_parse(const struct msg *msg, int *run_id, struct proc_output **_output) { int ret = 0; - ret = check_msg_length(msg, 3); + ret = check_msg_length(msg, 4); if (ret < 0) return ret; const char **argv = msg_get_strings(msg); - proc_output_init(output); + struct proc_output *output = NULL; + ret = proc_output_create(&output); + if (ret < 0) + return ret; ret = string_to_int(argv[1], run_id); if (ret < 0) - return ret; + goto free_output; ret = string_to_int(argv[2], &output->ec); if (ret < 0) - return ret; + goto free_output; - return 0; + const char *b64data = argv[3]; + + ret = base64_decode(b64data, &output->data, &output->data_size); + if (ret < 0) + goto free_output; + + *_output = output; + return ret; + +free_output: + proc_output_destroy(output); + + return ret; } diff --git a/src/protocol.h b/src/protocol.h index cde2417..99ec3ee 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -20,6 +20,6 @@ int msg_start_create(struct msg **, const struct run *); int msg_start_parse(const struct msg *, struct run **); int msg_finished_create(struct msg **, int run_id, const struct proc_output *); -int msg_finished_parse(const struct msg *, int *run_id, struct proc_output *); +int msg_finished_parse(const struct msg *, int *run_id, struct proc_output **); #endif diff --git a/src/server.c b/src/server.c index a4358d5..8e2fab3 100644 --- a/src/server.c +++ b/src/server.c @@ -295,13 +295,13 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m int ret = 0; int run_id = 0; - struct proc_output output; + struct proc_output *output; ret = msg_finished_parse(request, &run_id, &output); if (ret < 0) return ret; - ret = storage_run_finished(&server->storage, run_id, output.ec); + ret = storage_run_finished(&server->storage, run_id, output); if (ret < 0) { log_err("Failed to mark run %d as finished\n", run_id); goto free_output; @@ -310,7 +310,7 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m log("Marked run %d as finished\n", run_id); free_output: - proc_output_free(&output); + proc_output_destroy(output); return ret; } diff --git a/src/sqlite.c b/src/sqlite.c index 7080928..f064280 100644 --- a/src/sqlite.c +++ b/src/sqlite.c @@ -11,6 +11,7 @@ #include +#include #include #include #include @@ -231,6 +232,19 @@ int sqlite_bind_text(sqlite3_stmt *stmt, int index, const char *value) return ret; } +int sqlite_bind_blob(sqlite3_stmt *stmt, int index, unsigned char *value, size_t nb) +{ + int ret = 0; + + ret = sqlite3_bind_blob64(stmt, index, value, nb, SQLITE_STATIC); + if (ret) { + sqlite_errno(ret, "sqlite3_bind_blob64"); + return ret; + } + + return ret; +} + int sqlite_exec_as_transaction(sqlite3 *db, const char *stmt) { static const char *const fmt = "BEGIN; %s COMMIT;"; diff --git a/src/sqlite.h b/src/sqlite.h index 20386d4..556684f 100644 --- a/src/sqlite.h +++ b/src/sqlite.h @@ -10,6 +10,8 @@ #include +#include + int sqlite_init(void); void sqlite_destroy(void); @@ -27,6 +29,7 @@ int sqlite_step(sqlite3_stmt *); int sqlite_bind_int(sqlite3_stmt *, int column_index, int value); int sqlite_bind_text(sqlite3_stmt *, int column_index, const char *value); +int sqlite_bind_blob(sqlite3_stmt *, int column_index, unsigned char *value, size_t nb); int sqlite_column_int(sqlite3_stmt *, int column_index); int sqlite_column_text(sqlite3_stmt *, int column_index, char **result); diff --git a/src/storage.c b/src/storage.c index 5df9843..ebb2ce8 100644 --- a/src/storage.c +++ b/src/storage.c @@ -7,6 +7,7 @@ #include "storage.h" #include "log.h" +#include "process.h" #include "run_queue.h" #include "storage_sqlite.h" @@ -17,7 +18,7 @@ typedef int (*storage_create_t)(struct storage *, const struct storage_settings typedef void (*storage_destroy_t)(struct storage *); typedef int (*storage_run_create_t)(struct storage *, const char *repo_url, const char *rev); -typedef int (*storage_run_finished_t)(struct storage *, int repo_id, int ec); +typedef int (*storage_run_finished_t)(struct storage *, int repo_id, const struct proc_output *); typedef int (*storage_get_run_queue_t)(struct storage *, struct run_queue *); struct storage_api { @@ -98,12 +99,12 @@ int storage_run_create(struct storage *storage, const char *repo_url, const char return api->run_create(storage, repo_url, rev); } -int storage_run_finished(struct storage *storage, int run_id, int ec) +int storage_run_finished(struct storage *storage, int run_id, const struct proc_output *output) { const struct storage_api *api = get_api(storage->type); if (!api) return -1; - return api->run_finished(storage, run_id, ec); + return api->run_finished(storage, run_id, output); } int storage_get_run_queue(struct storage *storage, struct run_queue *queue) diff --git a/src/storage.h b/src/storage.h index 139d878..f7406a5 100644 --- a/src/storage.h +++ b/src/storage.h @@ -8,6 +8,7 @@ #ifndef __STORAGE_H__ #define __STORAGE_H__ +#include "process.h" #include "run_queue.h" #include "storage_sqlite.h" @@ -35,7 +36,7 @@ int storage_create(struct storage *, const struct storage_settings *); void storage_destroy(struct storage *); int storage_run_create(struct storage *, const char *repo_url, const char *rev); -int storage_run_finished(struct storage *, int run_id, int ec); +int storage_run_finished(struct storage *, int run_id, const struct proc_output *); int storage_get_run_queue(struct storage *, struct run_queue *); #endif diff --git a/src/storage_sqlite.c b/src/storage_sqlite.c index 2ac8b30..981a5dd 100644 --- a/src/storage_sqlite.c +++ b/src/storage_sqlite.c @@ -7,6 +7,7 @@ #include "storage_sqlite.h" #include "log.h" +#include "process.h" #include "run_queue.h" #include "sql/sqlite_sql.h" #include "sqlite.h" @@ -207,7 +208,7 @@ static int storage_sqlite_prepare_statements(struct storage_sqlite *storage) static const char *const fmt_run_insert = "INSERT INTO cimple_runs(status, ec, output, repo_id, rev) VALUES (?, -1, x'', ?, ?) RETURNING id;"; static const char *const fmt_run_finished = - "UPDATE cimple_runs SET status = ?, ec = ? WHERE id = ?;"; + "UPDATE cimple_runs SET status = ?, ec = ?, output = ? WHERE id = ?;"; int ret = 0; @@ -395,7 +396,8 @@ int storage_sqlite_run_create(struct storage *storage, const char *repo_url, con return ret; } -int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec) +int storage_sqlite_run_finished(struct storage *storage, int run_id, + const struct proc_output *output) { struct prepared_stmt *stmt = &storage->sqlite->stmt_run_finished; int ret = 0; @@ -406,10 +408,13 @@ int storage_sqlite_run_finished(struct storage *storage, int run_id, int ec) ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_FINISHED); if (ret < 0) goto reset; - ret = sqlite_bind_int(stmt->impl, 2, ec); + ret = sqlite_bind_int(stmt->impl, 2, output->ec); if (ret < 0) goto reset; - ret = sqlite_bind_int(stmt->impl, 3, run_id); + ret = sqlite_bind_blob(stmt->impl, 3, output->data, output->data_size); + if (ret < 0) + goto reset; + ret = sqlite_bind_int(stmt->impl, 4, run_id); if (ret < 0) goto reset; ret = sqlite_step(stmt->impl); diff --git a/src/storage_sqlite.h b/src/storage_sqlite.h index cecf7e1..857b9c0 100644 --- a/src/storage_sqlite.h +++ b/src/storage_sqlite.h @@ -8,6 +8,7 @@ #ifndef __STORAGE_SQLITE_H__ #define __STORAGE_SQLITE_H__ +#include "process.h" #include "run_queue.h" struct storage_settings; @@ -23,7 +24,7 @@ int storage_sqlite_create(struct storage *, const struct storage_settings *); void storage_sqlite_destroy(struct storage *); int storage_sqlite_run_create(struct storage *, const char *repo_url, const char *rev); -int storage_sqlite_run_finished(struct storage *, int id, int ec); +int storage_sqlite_run_finished(struct storage *, int id, const struct proc_output *); int storage_sqlite_get_run_queue(struct storage *, struct run_queue *runs); #endif diff --git a/src/worker.c b/src/worker.c index b75ec4e..e45bfba 100644 --- a/src/worker.c +++ b/src/worker.c @@ -180,20 +180,22 @@ static int worker_do_run(struct worker *worker) { int ret = 0; - struct proc_output result; - proc_output_init(&result); + struct proc_output *result = NULL; + ret = proc_output_create(&result); + if (ret < 0) + return ret; - ret = ci_run_git_repo(run_get_url(worker->run), run_get_rev(worker->run), &result); + ret = ci_run_git_repo(run_get_url(worker->run), run_get_rev(worker->run), result); if (ret < 0) { log_err("Run failed with an error\n"); goto free_output; } - proc_output_dump(&result); + proc_output_dump(result); struct msg *finished_msg = NULL; - ret = msg_finished_create(&finished_msg, run_get_id(worker->run), &result); + ret = msg_finished_create(&finished_msg, run_get_id(worker->run), result); if (ret < 0) goto free_output; @@ -206,7 +208,7 @@ free_finished_msg: msg_free(finished_msg); free_output: - proc_output_free(&result); + proc_output_destroy(result); run_destroy(worker->run); -- cgit v1.2.3