diff options
Diffstat (limited to '')
-rw-r--r-- | src/CMakeLists.txt | 20 | ||||
-rw-r--r-- | src/client.c | 73 | ||||
-rw-r--r-- | src/client_main.c | 5 | ||||
-rw-r--r-- | src/command.c | 65 | ||||
-rw-r--r-- | src/command.h | 9 | ||||
-rw-r--r-- | src/json.c | 256 | ||||
-rw-r--r-- | src/json.h | 44 | ||||
-rw-r--r-- | src/json_rpc.c | 587 | ||||
-rw-r--r-- | src/json_rpc.h | 57 | ||||
-rw-r--r-- | src/msg.c | 308 | ||||
-rw-r--r-- | src/msg.h | 44 | ||||
-rw-r--r-- | src/protocol.c | 163 | ||||
-rw-r--r-- | src/protocol.h | 16 | ||||
-rw-r--r-- | src/server.c | 29 | ||||
-rw-r--r-- | src/worker.c | 37 |
15 files changed, 1223 insertions, 490 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d994943..a6a42df 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -60,8 +60,9 @@ add_my_executable(server server_main.c server.c const.c event_loop.c file.c + json.c + json_rpc.c log.c - msg.c net.c process.c protocol.c @@ -74,16 +75,22 @@ add_my_executable(server server_main.c server.c string.c tcp_server.c worker_queue.c) -target_link_libraries(server PRIVATE pthread sodium sqlite3) +target_link_libraries(server PRIVATE json-c pthread sodium sqlite3) target_include_directories(server PRIVATE "${CMAKE_CURRENT_BINARY_DIR}") add_my_executable(client client_main.c client.c + base64.c cmd_line.c const.c file.c + json.c + json_rpc.c log.c - msg.c - net.c) + net.c + process.c + protocol.c + run_queue.c) +target_link_libraries(client PRIVATE json-c sodium) add_my_executable(worker worker_main.c worker.c base64.c @@ -94,12 +101,13 @@ add_my_executable(worker worker_main.c worker.c event_loop.c file.c git.c + json.c + json_rpc.c log.c - msg.c net.c process.c protocol.c run_queue.c signal.c string.c) -target_link_libraries(worker PRIVATE git2 pthread sodium) +target_link_libraries(worker PRIVATE git2 json-c pthread sodium) diff --git a/src/client.c b/src/client.c index 5259d52..8c6c597 100644 --- a/src/client.c +++ b/src/client.c @@ -8,10 +8,15 @@ #include "client.h" #include "cmd_line.h" #include "compiler.h" +#include "const.h" +#include "json_rpc.h" #include "log.h" -#include "msg.h" +#include "net.h" +#include "protocol.h" +#include "run_queue.h" #include <stdlib.h> +#include <string.h> struct client { int dummy; @@ -36,30 +41,70 @@ void client_destroy(struct client *client) free(client); } +static int make_request(struct jsonrpc_request **request, int argc, const char **argv) +{ + if (argc < 1) { + exit_with_usage_err("no action specified"); + return -1; + } + + if (!strcmp(argv[0], CMD_RUN)) { + if (argc != 3) + return -1; + + struct run *run = NULL; + int ret = run_create(&run, 0, argv[1], argv[2]); + if (ret < 0) + return ret; + + ret = run_request_create(request, run); + run_destroy(run); + return ret; + } + + return -1; +} + int client_main(UNUSED const struct client *client, const struct settings *settings, int argc, const char **argv) { - struct msg *response = NULL; int ret = 0; - if (argc < 1) { - exit_with_usage_err("no message to send to the server"); - return -1; + struct jsonrpc_request *request = NULL; + ret = make_request(&request, argc, argv); + if (ret < 0) { + exit_with_usage_err("invalid request"); + return ret; } - ret = msg_connect_and_talk_argv(settings->host, settings->port, argv, &response); - if (ret < 0 || !response || !msg_is_success(response)) { - log_err("Failed to connect to server or it couldn't process the request\n"); - if (response) - msg_dump(response); - if (!ret) - ret = -1; + ret = net_connect(settings->host, settings->port); + if (ret < 0) + goto free_request; + int fd = ret; + + ret = jsonrpc_request_send(request, fd); + if (ret < 0) + goto close; + + struct jsonrpc_response *response = NULL; + ret = jsonrpc_response_recv(&response, fd); + if (ret < 0) + goto close; + + if (jsonrpc_response_is_error(response)) { + log_err("server failed to process the request\n"); + ret = -1; goto free_response; } free_response: - if (response) - msg_free(response); + jsonrpc_response_destroy(response); + +close: + net_close(fd); + +free_request: + jsonrpc_request_destroy(request); return ret; } diff --git a/src/client_main.c b/src/client_main.c index 40d7995..15ad449 100644 --- a/src/client_main.c +++ b/src/client_main.c @@ -24,7 +24,10 @@ static struct settings default_settings(void) const char *get_usage_string(void) { - return "[-h|--help] [-V|--version] [-v|--verbose] [-H|--host HOST] [-p|--port PORT] ACTION [ARG...]"; + return "[-h|--help] [-V|--version] [-v|--verbose] [-H|--host HOST] [-p|--port PORT] ACTION [ARG...]\n\ +\n\ +available actions:\n\ +\t" CMD_RUN " URL REV - schedule a CI run of repository at URL, revision REV"; } static int parse_settings(struct settings *settings, int argc, char *argv[]) diff --git a/src/command.c b/src/command.c index 47280fe..9946c13 100644 --- a/src/command.c +++ b/src/command.c @@ -8,8 +8,8 @@ #include "command.h" #include "compiler.h" #include "event_loop.h" +#include "json_rpc.h" #include "log.h" -#include "msg.h" #include <poll.h> #include <stdlib.h> @@ -107,9 +107,10 @@ void cmd_dispatcher_destroy(struct cmd_dispatcher *dispatcher) } static int cmd_dispatcher_handle_internal(const struct cmd_dispatcher *dispatcher, - const struct msg *command, struct msg **result, void *arg) + const struct jsonrpc_request *request, + struct jsonrpc_response **result, void *arg) { - const char *actual_cmd = msg_get_first_string(command); + const char *actual_cmd = jsonrpc_request_get_method(request); for (size_t i = 0; i < dispatcher->numof_cmds; ++i) { struct cmd_desc *cmd = &dispatcher->cmds[i]; @@ -117,16 +118,15 @@ static int cmd_dispatcher_handle_internal(const struct cmd_dispatcher *dispatche if (strcmp(cmd->name, actual_cmd)) continue; - return cmd->handler(command, result, arg); + return cmd->handler(request, result, arg); } - log_err("Received an unknown command\n"); - msg_dump(command); + log_err("Received an unknown command: %s\n", actual_cmd); return -1; } -int cmd_dispatcher_handle(const struct cmd_dispatcher *dispatcher, const struct msg *command, - struct msg **result) +int cmd_dispatcher_handle(const struct cmd_dispatcher *dispatcher, + const struct jsonrpc_request *command, struct jsonrpc_response **result) { return cmd_dispatcher_handle_internal(dispatcher, command, result, dispatcher->ctx); } @@ -147,32 +147,59 @@ static struct cmd_conn_ctx *make_conn_ctx(int fd, void *arg) static int cmd_dispatcher_handle_conn_internal(int conn_fd, struct cmd_dispatcher *dispatcher) { - struct msg *request = NULL, *response = NULL; int ret = 0; struct cmd_conn_ctx *new_ctx = make_conn_ctx(conn_fd, dispatcher->ctx); if (!new_ctx) return -1; - ret = msg_recv(conn_fd, &request); + struct jsonrpc_request *request = NULL; + ret = jsonrpc_request_recv(&request, conn_fd); if (ret < 0) goto free_ctx; - ret = cmd_dispatcher_handle_internal(dispatcher, request, &response, new_ctx); - if (ret < 0) - goto free_response; + const int requires_response = !jsonrpc_request_is_notification(request); + + struct jsonrpc_response *default_response = NULL; + if (requires_response) { + ret = jsonrpc_response_create(&default_response, request, NULL); + if (ret < 0) + goto free_request; + } - if (response) { - ret = msg_send(conn_fd, response); + struct jsonrpc_response *default_error = NULL; + if (requires_response) { + ret = jsonrpc_error_create(&default_error, request, -1, "An error occured"); if (ret < 0) - goto free_response; + goto free_default_response; + } + + struct jsonrpc_response *response = NULL; + ret = cmd_dispatcher_handle_internal(dispatcher, request, &response, new_ctx); + + if (requires_response) { + struct jsonrpc_response *actual_response = response; + if (!actual_response) { + actual_response = ret < 0 ? default_error : default_response; + } + if (ret < 0 && !jsonrpc_response_is_error(actual_response)) { + actual_response = default_error; + } + ret = jsonrpc_response_send(actual_response, conn_fd) < 0 ? -1 : ret; } -free_response: if (response) - msg_free(response); + jsonrpc_response_destroy(response); + + if (default_error) + jsonrpc_response_destroy(default_error); + +free_default_response: + if (default_response) + jsonrpc_response_destroy(default_response); - msg_free(request); +free_request: + jsonrpc_request_destroy(request); free_ctx: free(new_ctx); diff --git a/src/command.h b/src/command.h index 8daa912..0edba98 100644 --- a/src/command.h +++ b/src/command.h @@ -9,11 +9,12 @@ #define __COMMAND_H__ #include "event_loop.h" -#include "msg.h" +#include "json_rpc.h" #include <stddef.h> -typedef int (*cmd_handler)(const struct msg *request, struct msg **response, void *ctx); +typedef int (*cmd_handler)(const struct jsonrpc_request *request, + struct jsonrpc_response **response, void *ctx); struct cmd_desc { char *name; @@ -26,8 +27,8 @@ int cmd_dispatcher_create(struct cmd_dispatcher **, struct cmd_desc *, size_t nu void *ctx); void cmd_dispatcher_destroy(struct cmd_dispatcher *); -int cmd_dispatcher_handle(const struct cmd_dispatcher *, const struct msg *command, - struct msg **response); +int cmd_dispatcher_handle(const struct cmd_dispatcher *, const struct jsonrpc_request *command, + struct jsonrpc_response **response); struct cmd_conn_ctx { int fd; diff --git a/src/json.c b/src/json.c new file mode 100644 index 0000000..a01aae6 --- /dev/null +++ b/src/json.c @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "json.h" +#include "log.h" +#include "net.h" + +#include <json-c/json_object.h> +#include <json-c/json_tokener.h> + +#include <errno.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> + +char *json_to_string(struct json_object *obj) +{ + const char *result = json_object_to_json_string(obj); + if (!result) { + json_errno("json_object_to_json_string"); + return NULL; + } + + char *_result = strdup(result); + if (!_result) { + log_errno("strdup"); + return NULL; + } + + return _result; +} + +struct json_object *json_from_string(const char *src) +{ + enum json_tokener_error error; + + struct json_object *result = json_tokener_parse_verbose(src, &error); + if (!result) { + json_errno("json_tokener_parse_verbose"); + log_err("JSON: parsing failed: %s\n", json_tokener_error_desc(error)); + return NULL; + } + + return result; +} + +int json_clone(const struct json_object *obj, const char *key, struct json_object **_value) +{ + int ret = 0; + + struct json_object *old_value = NULL; + ret = json_get(obj, key, &old_value); + if (ret < 0) + return ret; + + struct json_object *new_value = NULL; + ret = json_object_deep_copy(old_value, &new_value, NULL); + if (ret < 0) + return ret; + + *_value = new_value; + return ret; +} + +int json_send(struct json_object *obj, int fd) +{ + int ret = 0; + + const char *str = json_to_string(obj); + if (!str) + return -1; + + struct buf *buf = NULL; + ret = buf_pack_strings(&buf, 1, &str); + free((char *)str); + if (ret < 0) + return ret; + + ret = net_send_buf(fd, buf); + buf_destroy(buf); + if (ret < 0) + return ret; + + return ret; +} + +struct json_object *json_recv(int fd) +{ + struct json_object *result = NULL; + int ret = 0; + + struct buf *buf = NULL; + ret = net_recv_buf(fd, &buf); + if (ret < 0) + return NULL; + + result = json_from_string((const char *)buf_get_data(buf)); + if (!result) + goto destroy_buf; + +destroy_buf: + buf_destroy(buf); + + return result; +} + +int json_has(const struct json_object *obj, const char *key) +{ + return json_object_object_get_ex(obj, key, NULL); +} + +int json_get(const struct json_object *obj, const char *key, struct json_object **value) +{ + if (!json_has(obj, key)) { + log_err("JSON: key is missing: %s\n", key); + return -1; + } + + return json_object_object_get_ex(obj, key, value); +} + +int json_get_string(const struct json_object *obj, const char *key, const char **_value) +{ + struct json_object *value = NULL; + + int ret = json_get(obj, key, &value); + if (ret < 0) + return ret; + + if (!json_object_is_type(value, json_type_string)) { + log_err("JSON: key is not a string: %s\n", key); + return -1; + } + + *_value = json_object_get_string(value); + return 0; +} + +int json_get_int(const struct json_object *obj, const char *key, int64_t *_value) +{ + struct json_object *value = NULL; + + int ret = json_get(obj, key, &value); + if (ret < 0) + return ret; + + if (!json_object_is_type(value, json_type_int)) { + log_err("JSON: key is not an integer: %s\n", key); + return -1; + } + + errno = 0; + int64_t tmp = json_object_get_int64(value); + if (errno) { + log_err("JSON: failed to parse integer from key: %s\n", key); + return -1; + } + + *_value = tmp; + return 0; +} + +static int json_set_internal(struct json_object *obj, const char *key, struct json_object *value, + unsigned flags) +{ + int ret = 0; + + ret = json_object_object_add_ex(obj, key, value, flags); + if (ret < 0) { + json_errno("json_object_object_add_ex"); + return ret; + } + + return 0; +} + +static int json_set_string_internal(struct json_object *obj, const char *key, const char *_value, + unsigned flags) +{ + struct json_object *value = json_object_new_string(_value); + if (!value) { + json_errno("json_object_new_string"); + return -1; + } + + int ret = json_set_internal(obj, key, value, flags); + if (ret < 0) + goto free_value; + + return ret; + +free_value: + json_object_put(value); + + return ret; +} + +static int json_set_int_internal(struct json_object *obj, const char *key, int64_t _value, + unsigned flags) +{ + struct json_object *value = json_object_new_int64(_value); + if (!value) { + json_errno("json_object_new_int"); + return -1; + } + + int ret = json_set_internal(obj, key, value, flags); + if (ret < 0) + goto free_value; + + return ret; + +free_value: + json_object_put(value); + + return ret; +} + +int json_set(struct json_object *obj, const char *key, struct json_object *value) +{ + return json_set_internal(obj, key, value, 0); +} + +int json_set_string(struct json_object *obj, const char *key, const char *value) +{ + return json_set_string_internal(obj, key, value, 0); +} + +int json_set_int(struct json_object *obj, const char *key, int64_t value) +{ + return json_set_int_internal(obj, key, value, 0); +} + +#ifndef JSON_C_OBJECT_ADD_CONSTANT_KEY +#define JSON_C_OBJECT_ADD_CONSTANT_KEY JSON_C_OBJECT_KEY_IS_CONSTANT +#endif +static const unsigned json_const_key_flags = JSON_C_OBJECT_ADD_CONSTANT_KEY; + +int json_set_const_key(struct json_object *obj, const char *key, struct json_object *value) +{ + return json_set_internal(obj, key, value, json_const_key_flags); +} + +int json_set_string_const_key(struct json_object *obj, const char *key, const char *value) +{ + return json_set_string_internal(obj, key, value, json_const_key_flags); +} + +int json_set_int_const_key(struct json_object *obj, const char *key, int64_t value) +{ + return json_set_int_internal(obj, key, value, json_const_key_flags); +} diff --git a/src/json.h b/src/json.h new file mode 100644 index 0000000..77bfde4 --- /dev/null +++ b/src/json.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __JSON_H__ +#define __JSON_H__ + +#include "log.h" + +#include <json-c/json_object.h> + +#include <stdint.h> + +#define json_errno(fn) \ + do { \ + log_err("JSON: %s failed\n", fn); \ + } while (0) + +char *json_to_string(struct json_object *); +struct json_object *json_from_string(const char *); + +int json_clone(const struct json_object *, const char *key, struct json_object **value); + +int json_send(struct json_object *, int fd); +struct json_object *json_recv(int fd); + +int json_has(const struct json_object *, const char *key); + +int json_get(const struct json_object *, const char *key, struct json_object **value); +int json_get_string(const struct json_object *, const char *key, const char **value); +int json_get_int(const struct json_object *, const char *key, int64_t *value); + +int json_set(struct json_object *, const char *key, struct json_object *value); +int json_set_string(struct json_object *, const char *key, const char *value); +int json_set_int(struct json_object *, const char *key, int64_t value); + +int json_set_const_key(struct json_object *, const char *, struct json_object *value); +int json_set_string_const_key(struct json_object *, const char *, const char *value); +int json_set_int_const_key(struct json_object *, const char *, int64_t value); + +#endif diff --git a/src/json_rpc.c b/src/json_rpc.c new file mode 100644 index 0000000..538cff0 --- /dev/null +++ b/src/json_rpc.c @@ -0,0 +1,587 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#include "json_rpc.h" +#include "json.h" +#include "log.h" + +#include <json-c/json_object.h> + +#include <stdint.h> +#include <stdlib.h> +#include <string.h> + +struct jsonrpc_request { + struct json_object *impl; +}; + +struct jsonrpc_response { + struct json_object *impl; +}; + +static const char *const jsonrpc_key_version = "jsonrpc"; +static const char *const jsonrpc_key_id = "id"; +static const char *const jsonrpc_key_method = "method"; +static const char *const jsonrpc_key_params = "params"; + +static const char *const jsonrpc_value_version = "2.0"; + +static int jsonrpc_check_version(struct json_object *obj) +{ + const char *key = jsonrpc_key_version; + const char *version = NULL; + + int ret = json_get_string(obj, key, &version); + if (ret < 0) + return ret; + + if (strcmp(version, jsonrpc_value_version)) { + log_err("JSON-RPC: invalid '%s' value: %s\n", key, version); + return -1; + } + + return 0; +} + +static int jsonrpc_set_version(struct json_object *obj) +{ + return json_set_string_const_key(obj, jsonrpc_key_version, jsonrpc_value_version); +} + +static int jsonrpc_check_id_type(struct json_object *id) +{ + if (!json_object_is_type(id, json_type_string) && !json_object_is_type(id, json_type_int)) { + log_err("JSON-RPC: key '%s' must be either an integer or a string\n", + jsonrpc_key_id); + return -1; + } + return 0; +} + +static int jsonrpc_check_id(struct json_object *obj, int required) +{ + const char *key = jsonrpc_key_id; + + if (!json_has(obj, key)) { + if (!required) + return 0; + log_err("JSON-RPC: key is missing: %s\n", key); + return -1; + } + + struct json_object *id = NULL; + + int ret = json_get(obj, key, &id); + if (ret < 0) + return ret; + return jsonrpc_check_id_type(id); +} + +static int jsonrpc_set_id(struct json_object *obj, int id) +{ + return json_set_int_const_key(obj, jsonrpc_key_id, id); +} + +static int jsonrpc_check_method(struct json_object *obj) +{ + const char *key = jsonrpc_key_method; + const char *method = NULL; + return json_get_string(obj, key, &method); +} + +static int jsonrpc_set_method(struct json_object *obj, const char *method) +{ + return json_set_string_const_key(obj, jsonrpc_key_method, method); +} + +static int jsonrpc_check_params_type(struct json_object *params) +{ + if (!json_object_is_type(params, json_type_object) && + !json_object_is_type(params, json_type_array)) { + log_err("JSON-RPC: key '%s' must be either an object or an array\n", + jsonrpc_key_params); + return -1; + } + return 0; +} + +static int jsonrpc_check_params(struct json_object *obj) +{ + const char *key = jsonrpc_key_params; + + if (!json_has(obj, key)) + return 0; + + struct json_object *params = NULL; + + int ret = json_get(obj, key, ¶ms); + if (ret < 0) + return ret; + return jsonrpc_check_params_type(params); +} + +static int jsonrpc_set_params(struct json_object *obj, struct json_object *params) +{ + const char *key = jsonrpc_key_params; + + int ret = jsonrpc_check_params_type(params); + if (ret < 0) + return ret; + return json_set_const_key(obj, key, params); +} + +static const char *const jsonrpc_key_result = "result"; +static const char *const jsonrpc_key_error = "error"; + +static const char *const jsonrpc_key_code = "code"; +static const char *const jsonrpc_key_message = "message"; + +static int jsonrpc_check_error(struct json_object *obj) +{ + const char *key = jsonrpc_key_error; + struct json_object *error = NULL; + + int ret = json_get(obj, key, &error); + if (ret < 0) + return ret; + + int64_t code = -1; + + ret = json_get_int(error, jsonrpc_key_code, &code); + if (ret < 0) { + log_err("JSON-RPC: key is missing or not an integer: %s\n", jsonrpc_key_code); + return -1; + } + + const char *message = NULL; + + ret = json_get_string(error, jsonrpc_key_message, &message); + if (ret < 0) { + log_err("JSON-RPC: key is missing or not a string: %s\n", jsonrpc_key_message); + return -1; + } + + return ret; +} + +static int jsonrpc_check_result_or_error(struct json_object *obj) +{ + const char *key_result = jsonrpc_key_result; + const char *key_error = jsonrpc_key_error; + + if (!json_has(obj, key_result) && !json_has(obj, key_error)) { + log_err("JSON-RPC: either '%s' or '%s' must be present\n", key_result, key_error); + return -1; + } + + if (json_has(obj, key_result)) + return 0; + + return jsonrpc_check_error(obj); +} + +static int jsonrpc_request_create_internal(struct jsonrpc_request **_request, int *id, + const char *method, struct json_object *params) +{ + int ret = 0; + + struct jsonrpc_request *request = malloc(sizeof(struct jsonrpc_request)); + if (!request) { + log_errno("malloc"); + ret = -1; + goto exit; + } + + request->impl = json_object_new_object(); + if (!request->impl) { + json_errno("json_object_new_object"); + ret = -1; + goto free; + } + + ret = jsonrpc_set_version(request->impl); + if (ret < 0) + goto free_impl; + + if (id) { + ret = jsonrpc_set_id(request->impl, *id); + if (ret < 0) + goto free_impl; + } + + ret = jsonrpc_set_method(request->impl, method); + if (ret < 0) + goto free_impl; + + if (params) { + ret = jsonrpc_set_params(request->impl, params); + if (ret < 0) + goto free_impl; + } + + *_request = request; + goto exit; + +free_impl: + json_object_put(request->impl); +free: + free(request); +exit: + return ret; +} + +int jsonrpc_request_create(struct jsonrpc_request **_request, int id, const char *method, + struct json_object *params) +{ + return jsonrpc_request_create_internal(_request, &id, method, params); +} + +void jsonrpc_request_destroy(struct jsonrpc_request *request) +{ + json_object_put(request->impl); + free(request); +} + +int jsonrpc_notification_create(struct jsonrpc_request **_request, const char *method, + struct json_object *params) +{ + return jsonrpc_request_create_internal(_request, NULL, method, params); +} + +int jsonrpc_request_is_notification(const struct jsonrpc_request *request) +{ + return !json_has(request->impl, jsonrpc_key_id); +} + +const char *jsonrpc_request_to_string(struct jsonrpc_request *request) +{ + return json_to_string(request->impl); +} + +static int jsonrpc_request_from_json(struct jsonrpc_request **_request, struct json_object *impl) +{ + int ret = 0; + + ret = jsonrpc_check_version(impl); + if (ret < 0) + return ret; + ret = jsonrpc_check_id(impl, 0); + if (ret < 0) + return ret; + ret = jsonrpc_check_method(impl); + if (ret < 0) + return ret; + ret = jsonrpc_check_params(impl); + if (ret < 0) + return ret; + + struct jsonrpc_request *request = malloc(sizeof(struct jsonrpc_request)); + if (!request) { + log_errno("malloc"); + return -1; + } + request->impl = impl; + + *_request = request; + return ret; +} + +int jsonrpc_request_parse(struct jsonrpc_request **_request, const char *src) +{ + struct json_object *impl = json_from_string(src); + if (!impl) { + log_err("JSON-RPC: failed to parse request\n"); + return -1; + } + + int ret = jsonrpc_request_from_json(_request, impl); + if (ret < 0) + goto free_impl; + + return ret; + +free_impl: + json_object_put(impl); + + return ret; +} + +int jsonrpc_request_send(const struct jsonrpc_request *request, int fd) +{ + return json_send(request->impl, fd); +} + +int jsonrpc_request_recv(struct jsonrpc_request **request, int fd) +{ + struct json_object *impl = json_recv(fd); + if (!impl) { + log_err("JSON-RPC: failed to receive request\n"); + return -1; + } + + int ret = jsonrpc_request_from_json(request, impl); + if (ret < 0) + goto free_impl; + + return ret; + +free_impl: + json_object_put(impl); + + return ret; +} + +const char *jsonrpc_request_get_method(const struct jsonrpc_request *request) +{ + const char *method = NULL; + int ret = json_get_string(request->impl, jsonrpc_key_method, &method); + if (ret < 0) { + /* Should never happen. */ + return NULL; + } + return method; +} + +static struct json_object *jsonrpc_request_create_params(struct jsonrpc_request *request) +{ + const char *const key = jsonrpc_key_params; + + if (!json_has(request->impl, key)) { + struct json_object *params = json_object_new_object(); + if (!params) { + json_errno("json_object_new_object"); + return NULL; + } + int ret = json_set(request->impl, key, params); + if (ret < 0) { + json_object_put(params); + return NULL; + } + return params; + } + + struct json_object *params = NULL; + int ret = json_get(request->impl, key, ¶ms); + if (ret < 0) + return NULL; + return params; +} + +int jsonrpc_request_get_param_string(const struct jsonrpc_request *request, const char *name, + const char **value) +{ + struct json_object *params = NULL; + int ret = json_get(request->impl, jsonrpc_key_params, ¶ms); + if (ret < 0) + return ret; + return json_get_string(params, name, value); +} + +int jsonrpc_request_set_param_string(struct jsonrpc_request *request, const char *name, + const char *value) +{ + struct json_object *params = jsonrpc_request_create_params(request); + if (!params) + return -1; + return json_set_string(params, name, value); +} + +int jsonrpc_request_get_param_int(const struct jsonrpc_request *request, const char *name, + int64_t *value) +{ + struct json_object *params = NULL; + int ret = json_get(request->impl, jsonrpc_key_params, ¶ms); + if (ret < 0) + return ret; + return json_get_int(params, name, value); +} + +int jsonrpc_request_set_param_int(struct jsonrpc_request *request, const char *name, int64_t value) +{ + struct json_object *params = jsonrpc_request_create_params(request); + if (!params) + return -1; + return json_set_int(params, name, value); +} + +int jsonrpc_response_create_internal(struct jsonrpc_response **_response, + const struct jsonrpc_request *request, + struct json_object *result, struct json_object *error) +{ + int ret = 0; + + struct jsonrpc_response *response = malloc(sizeof(struct jsonrpc_response)); + if (!response) { + log_errno("malloc"); + ret = -1; + goto exit; + } + + response->impl = json_object_new_object(); + if (!response->impl) { + json_errno("json_object_new_object"); + ret = -1; + goto free; + } + + ret = jsonrpc_set_version(response->impl); + if (ret < 0) + goto free_impl; + + struct json_object *id = NULL; + ret = json_clone(request->impl, jsonrpc_key_id, &id); + if (ret < 0) + goto free_impl; + + ret = json_set(response->impl, jsonrpc_key_id, id); + if (ret < 0) { + json_object_put(id); + goto free_impl; + } + + if (error) { + ret = json_set_const_key(response->impl, jsonrpc_key_error, error); + if (ret < 0) + goto free_impl; + } else { + ret = json_set_const_key(response->impl, jsonrpc_key_result, result); + if (ret < 0) + goto free_impl; + } + + *_response = response; + goto exit; + +free_impl: + json_object_put(response->impl); +free: + free(response); +exit: + return ret; +} + +int jsonrpc_response_create(struct jsonrpc_response **response, + const struct jsonrpc_request *request, struct json_object *result) +{ + return jsonrpc_response_create_internal(response, request, result, NULL); +} + +void jsonrpc_response_destroy(struct jsonrpc_response *response) +{ + json_object_put(response->impl); + free(response); +} + +int jsonrpc_error_create(struct jsonrpc_response **response, struct jsonrpc_request *request, + int code, const char *message) +{ + struct json_object *error = json_object_new_object(); + if (!error) { + json_errno("json_object_new_object"); + return -1; + } + + int ret = 0; + + ret = json_set_int_const_key(error, jsonrpc_key_code, code); + if (ret < 0) + goto free; + ret = json_set_string_const_key(error, jsonrpc_key_message, message); + if (ret < 0) + goto free; + + ret = jsonrpc_response_create_internal(response, request, NULL, error); + if (ret < 0) + goto free; + + return ret; + +free: + json_object_put(error); + + return ret; +} + +int jsonrpc_response_is_error(const struct jsonrpc_response *response) +{ + return json_has(response->impl, jsonrpc_key_error); +} + +const char *jsonrpc_response_to_string(struct jsonrpc_response *response) +{ + return json_to_string(response->impl); +} + +static int jsonrpc_response_from_json(struct jsonrpc_response **_response, struct json_object *impl) +{ + int ret = 0; + + ret = jsonrpc_check_version(impl); + if (ret < 0) + return ret; + ret = jsonrpc_check_id(impl, 1); + if (ret < 0) + return ret; + ret = jsonrpc_check_result_or_error(impl); + if (ret < 0) + return ret; + + struct jsonrpc_response *response = malloc(sizeof(struct jsonrpc_response)); + if (!response) { + log_errno("malloc"); + return -1; + } + response->impl = impl; + + *_response = response; + return ret; +} + +int jsonrpc_response_parse(struct jsonrpc_response **_response, const char *src) +{ + struct json_object *impl = json_from_string(src); + if (!impl) { + log_err("JSON-RPC: failed to parse response\n"); + return -1; + } + + int ret = jsonrpc_response_from_json(_response, impl); + if (ret < 0) + goto free_impl; + + return ret; + +free_impl: + json_object_put(impl); + + return ret; +} + +int jsonrpc_response_send(const struct jsonrpc_response *response, int fd) +{ + return json_send(response->impl, fd); +} + +int jsonrpc_response_recv(struct jsonrpc_response **response, int fd) +{ + struct json_object *impl = json_recv(fd); + if (!impl) { + log_err("JSON-RPC: failed to receive response\n"); + return -1; + } + + int ret = jsonrpc_response_from_json(response, impl); + if (ret < 0) + goto free_impl; + + return ret; + +free_impl: + json_object_put(impl); + + return ret; +} diff --git a/src/json_rpc.h b/src/json_rpc.h new file mode 100644 index 0000000..6e2be04 --- /dev/null +++ b/src/json_rpc.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2023 Egor Tensin <Egor.Tensin@gmail.com> + * This file is part of the "cimple" project. + * For details, see https://github.com/egor-tensin/cimple. + * Distributed under the MIT License. + */ + +#ifndef __JSON_RPC_H__ +#define __JSON_RPC_H__ + +/* This attempts to adhere to the format described in https://www.jsonrpc.org/specification. */ + +#include <json-c/json_object.h> + +#include <stdint.h> + +struct jsonrpc_request; + +int jsonrpc_request_create(struct jsonrpc_request **, int id, const char *method, + struct json_object *params); +void jsonrpc_request_destroy(struct jsonrpc_request *); + +int jsonrpc_notification_create(struct jsonrpc_request **, const char *method, + struct json_object *params); +int jsonrpc_request_is_notification(const struct jsonrpc_request *); + +const char *jsonrpc_request_to_string(struct jsonrpc_request *); +int jsonrpc_request_parse(struct jsonrpc_request **, const char *src); + +int jsonrpc_request_send(const struct jsonrpc_request *, int fd); +int jsonrpc_request_recv(struct jsonrpc_request **, int fd); + +const char *jsonrpc_request_get_method(const struct jsonrpc_request *); + +int jsonrpc_request_get_param_string(const struct jsonrpc_request *, const char *name, + const char **); +int jsonrpc_request_set_param_string(struct jsonrpc_request *, const char *name, const char *); +int jsonrpc_request_get_param_int(const struct jsonrpc_request *, const char *name, int64_t *); +int jsonrpc_request_set_param_int(struct jsonrpc_request *, const char *name, int64_t); + +struct jsonrpc_response; + +int jsonrpc_response_create(struct jsonrpc_response **, const struct jsonrpc_request *, + struct json_object *result); +void jsonrpc_response_destroy(struct jsonrpc_response *); + +int jsonrpc_error_create(struct jsonrpc_response **, struct jsonrpc_request *, int code, + const char *message); +int jsonrpc_response_is_error(const struct jsonrpc_response *); + +const char *jsonrpc_response_to_string(struct jsonrpc_response *); +int jsonrpc_response_parse(struct jsonrpc_response **, const char *src); + +int jsonrpc_response_send(const struct jsonrpc_response *, int fd); +int jsonrpc_response_recv(struct jsonrpc_response **, int fd); + +#endif diff --git a/src/msg.c b/src/msg.c deleted file mode 100644 index 0de8a13..0000000 --- a/src/msg.c +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com> - * This file is part of the "cimple" project. - * For details, see https://github.com/egor-tensin/cimple. - * Distributed under the MIT License. - */ - -#include "msg.h" -#include "log.h" -#include "net.h" - -#include <stdlib.h> -#include <string.h> - -struct msg { - size_t argc; - const char **argv; -}; - -size_t msg_get_length(const struct msg *msg) -{ - return msg->argc; -} - -const char **msg_get_strings(const struct msg *msg) -{ - return msg->argv; -} - -const char *msg_get_first_string(const struct msg *msg) -{ - return msg->argv[0]; -} - -int msg_success(struct msg **msg) -{ - static const char *argv[] = {"success", NULL}; - return msg_from_argv(msg, argv); -} - -int msg_error(struct msg **msg) -{ - static const char *argv[] = {"error", NULL}; - return msg_from_argv(msg, argv); -} - -int msg_is_success(const struct msg *msg) -{ - return msg->argc == 1 && !strcmp(msg->argv[0], "success"); -} - -int msg_is_error(const struct msg *msg) -{ - return msg->argc == 1 && !strcmp(msg->argv[0], "error"); -} - -static int msg_copy_argv(struct msg *msg, const char **argv) -{ - size_t copied = 0; - - msg->argv = calloc(msg->argc + 1, sizeof(const char *)); - if (!msg->argv) { - log_errno("calloc"); - return -1; - } - - for (copied = 0; copied < msg->argc; ++copied) { - msg->argv[copied] = strdup(argv[copied]); - if (!msg->argv[copied]) { - log_errno("strdup"); - goto free_copied; - } - } - - return 0; - -free_copied: - for (size_t i = 0; i < copied; ++i) { - free((char *)msg->argv[i]); - } - - free(msg->argv); - - return -1; -} - -int msg_copy(struct msg **_dest, const struct msg *src) -{ - int ret = 0; - - struct msg *dest = malloc(sizeof(struct msg)); - if (!dest) { - log_errno("malloc"); - return -1; - } - - dest->argc = src->argc; - - ret = msg_copy_argv(dest, (const char **)src->argv); - if (ret < 0) - goto free; - - *_dest = dest; - return 0; - -free: - free(dest); - - return -1; -} - -void msg_free(struct msg *msg) -{ - for (size_t i = 0; i < msg->argc; ++i) - free((char *)msg->argv[i]); - free(msg->argv); - free(msg); -} - -int msg_from_argv(struct msg **_msg, const char **argv) -{ - int ret = 0; - - struct msg *msg = malloc(sizeof(struct msg)); - if (!msg) { - log_errno("malloc"); - return -1; - } - - msg->argc = 0; - for (const char **s = argv; *s; ++s) - ++msg->argc; - - if (!msg->argc) { - log_err("A message must contain at least one string\n"); - goto free; - } - - ret = msg_copy_argv(msg, argv); - if (ret < 0) - goto free; - - *_msg = msg; - return 0; - -free: - free(msg); - - return -1; -} - -int msg_send(int fd, const struct msg *msg) -{ - struct buf *buf = NULL; - int ret = 0; - - ret = buf_pack_strings(&buf, msg->argc, msg->argv); - if (ret < 0) - return ret; - - ret = net_send_buf(fd, buf); - if (ret < 0) - goto destroy_buf; - -destroy_buf: - buf_destroy(buf); - - return ret; -} - -int msg_send_argv(int fd, const char **argv) -{ - struct msg *msg = NULL; - int ret = 0; - - ret = msg_from_argv(&msg, argv); - if (ret < 0) - return ret; - - ret = msg_send(fd, msg); - msg_free(msg); - if (ret < 0) - return ret; - - return ret; -} - -int msg_recv(int fd, struct msg **_msg) -{ - struct buf *buf = NULL; - int ret = 0; - - ret = net_recv_buf(fd, &buf); - if (ret < 0) - return ret; - - struct msg *msg = malloc(sizeof(struct msg)); - if (!msg) { - log_errno("malloc"); - ret = -1; - goto destroy_buf; - } - - ret = buf_unpack_strings(buf, &msg->argc, &msg->argv); - if (ret < 0) - goto free_msg; - - if (!msg->argc) { - log_err("A message must contain at least one string\n"); - goto free_msg; - } - - *_msg = msg; - goto destroy_buf; - -free_msg: - free(msg); - -destroy_buf: - buf_destroy(buf); - - return ret; -} - -int msg_talk(int fd, const struct msg *request, struct msg **response) -{ - int ret = 0; - - if (!request && !response) { - log_err("For communication, there must be at least a request and/or response\n"); - return -1; - } - - if (request) { - ret = msg_send(fd, request); - if (ret < 0) - return ret; - } - - if (response) { - ret = msg_recv(fd, response); - if (ret < 0) - return ret; - } - - return ret; -} - -int msg_talk_argv(int fd, const char **argv, struct msg **response) -{ - struct msg *request = NULL; - int ret = 0; - - ret = msg_from_argv(&request, argv); - if (ret < 0) - return ret; - - ret = msg_talk(fd, request, response); - msg_free(request); - if (ret < 0) - return ret; - - return ret; -} - -int msg_connect_and_talk(const char *host, const char *port, const struct msg *request, - struct msg **response) -{ - int fd = -1, ret = 0; - - fd = net_connect(host, port); - if (fd < 0) - return fd; - - ret = msg_talk(fd, request, response); - if (ret < 0) - goto close; - -close: - net_close(fd); - - return ret; -} - -int msg_connect_and_talk_argv(const char *host, const char *port, const char **argv, - struct msg **response) -{ - struct msg *request = NULL; - int ret = 0; - - ret = msg_from_argv(&request, argv); - if (ret < 0) - return ret; - - ret = msg_connect_and_talk(host, port, request, response); - msg_free(request); - if (ret < 0) - return ret; - - return ret; -} - -void msg_dump(const struct msg *msg) -{ - log("Message[%zu]:\n", msg->argc); - for (size_t i = 0; i < msg->argc; ++i) - log("\t%s\n", msg->argv[i]); -} diff --git a/src/msg.h b/src/msg.h deleted file mode 100644 index 44ebf44..0000000 --- a/src/msg.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 Egor Tensin <Egor.Tensin@gmail.com> - * This file is part of the "cimple" project. - * For details, see https://github.com/egor-tensin/cimple. - * Distributed under the MIT License. - */ - -#ifndef __MSG_H__ -#define __MSG_H__ - -#include <stddef.h> - -struct msg; - -int msg_from_argv(struct msg **, const char **argv); -void msg_free(struct msg *); - -int msg_copy(struct msg **, const struct msg *); - -size_t msg_get_length(const struct msg *); -const char **msg_get_strings(const struct msg *); -const char *msg_get_first_string(const struct msg *); - -int msg_success(struct msg **); -int msg_error(struct msg **); - -int msg_is_success(const struct msg *); -int msg_is_error(const struct msg *); - -int msg_recv(int fd, struct msg **); - -int msg_send(int fd, const struct msg *); -int msg_send_argv(int fd, const char **argv); - -int msg_talk(int fd, const struct msg *, struct msg **response); -int msg_talk_argv(int fd, const char **argv, struct msg **response); - -int msg_connect_and_talk(const char *host, const char *port, const struct msg *, struct msg **); -int msg_connect_and_talk_argv(const char *host, const char *port, const char **argv, - struct msg **response); - -void msg_dump(const struct msg *); - -#endif diff --git a/src/protocol.c b/src/protocol.c index 5d1903a..8ee57e1 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -7,132 +7,181 @@ #include "protocol.h" #include "base64.h" +#include "compiler.h" #include "const.h" -#include "log.h" -#include "msg.h" +#include "json_rpc.h" #include "process.h" #include "run_queue.h" -#include "string.h" #include <stddef.h> -#include <stdio.h> +#include <stdint.h> #include <stdlib.h> -static int check_msg_length(const struct msg *msg, size_t expected) +static const char *const run_key_id = "id"; +static const char *const run_key_url = "url"; +static const char *const run_key_rev = "rev"; + +int run_request_create(struct jsonrpc_request **request, const struct run *run) { - size_t actual = msg_get_length(msg); + int ret = 0; - if (actual != expected) { - log_err("Invalid number of arguments for a message: %zu\n", actual); - msg_dump(msg); - return -1; - } + ret = jsonrpc_request_create(request, 1, CMD_RUN, NULL); + if (ret < 0) + return ret; + ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_url(run)); + if (ret < 0) + goto free_request; + ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_rev(run)); + if (ret < 0) + goto free_request; - return 0; + return ret; + +free_request: + jsonrpc_request_destroy(*request); + + return ret; } -int msg_run_parse(const struct msg *msg, struct run **run) +int run_request_parse(const struct jsonrpc_request *request, struct run **run) { - int ret = check_msg_length(msg, 3); + int ret = 0; + + const char *url = NULL; + ret = jsonrpc_request_get_param_string(request, run_key_url, &url); + if (ret < 0) + return ret; + const char *rev = NULL; + ret = jsonrpc_request_get_param_string(request, run_key_rev, &rev); if (ret < 0) return ret; - const char **argv = msg_get_strings(msg); - /* We don't know the ID yet. */ - return run_create(run, 0, argv[1], argv[2]); + return run_create(run, 0, url, rev); } -int msg_new_worker_create(struct msg **msg) +int new_worker_request_create(struct jsonrpc_request **request) { - static const char *argv[] = {CMD_NEW_WORKER, NULL}; - return msg_from_argv(msg, argv); + return jsonrpc_notification_create(request, CMD_NEW_WORKER, NULL); } -int msg_start_create(struct msg **msg, const struct run *run) +int new_worker_request_parse(UNUSED const struct jsonrpc_request *request) { - char id[16]; - snprintf(id, sizeof(id), "%d", run_get_id(run)); - - const char *argv[] = {CMD_START, id, run_get_url(run), run_get_rev(run), NULL}; - - return msg_from_argv(msg, argv); + return 0; } -int msg_start_parse(const struct msg *msg, struct run **run) +int start_request_create(struct jsonrpc_request **request, const struct run *run) { int ret = 0; - ret = check_msg_length(msg, 4); + ret = jsonrpc_notification_create(request, CMD_START, NULL); if (ret < 0) return ret; + ret = jsonrpc_request_set_param_int(*request, run_key_id, run_get_id(run)); + if (ret < 0) + goto free_request; + ret = jsonrpc_request_set_param_string(*request, run_key_url, run_get_url(run)); + if (ret < 0) + goto free_request; + ret = jsonrpc_request_set_param_string(*request, run_key_rev, run_get_rev(run)); + if (ret < 0) + goto free_request; + + return ret; - const char **argv = msg_get_strings(msg); +free_request: + jsonrpc_request_destroy(*request); + + return ret; +} - int id = 0; +int start_request_parse(const struct jsonrpc_request *request, struct run **run) +{ + int ret = 0; - ret = string_to_int(argv[1], &id); + int64_t id = 0; + ret = jsonrpc_request_get_param_int(request, run_key_id, &id); + if (ret < 0) + return ret; + const char *url = NULL; + ret = jsonrpc_request_get_param_string(request, run_key_url, &url); + if (ret < 0) + return ret; + const char *rev = NULL; + ret = jsonrpc_request_get_param_string(request, run_key_rev, &rev); if (ret < 0) return ret; - return run_create(run, id, argv[2], argv[3]); + return run_create(run, (int)id, url, rev); } -int msg_finished_create(struct msg **msg, int run_id, const struct proc_output *output) +static const char *const finished_key_run_id = "run_id"; +static const char *const finished_key_ec = "exit_code"; +static const char *const finished_key_data = "output"; + +int finished_request_create(struct jsonrpc_request **request, int run_id, + const struct proc_output *output) { int ret = 0; - char id[16]; - char ec[16]; - - snprintf(id, sizeof(id), "%d", run_id); - snprintf(ec, sizeof(ec), "%d", output->ec); + ret = jsonrpc_notification_create(request, CMD_FINISHED, NULL); + if (ret < 0) + return ret; + ret = jsonrpc_request_set_param_int(*request, finished_key_run_id, run_id); + if (ret < 0) + goto free_request; + ret = jsonrpc_request_set_param_int(*request, finished_key_ec, output->ec); + if (ret < 0) + goto free_request; char *b64data = NULL; - ret = base64_encode(output->data, output->data_size, &b64data); if (ret < 0) - return ret; - - const char *argv[] = {CMD_FINISHED, id, ec, b64data, NULL}; + goto free_request; - ret = msg_from_argv(msg, argv); + ret = jsonrpc_request_set_param_string(*request, finished_key_data, b64data); + free(b64data); if (ret < 0) - goto free_b64data; + goto free_request; -free_b64data: - free(b64data); + return ret; + +free_request: + jsonrpc_request_destroy(*request); return ret; } -int msg_finished_parse(const struct msg *msg, int *run_id, struct proc_output **_output) +int finished_request_parse(const struct jsonrpc_request *request, int *_run_id, + struct proc_output **_output) { int ret = 0; - ret = check_msg_length(msg, 4); - if (ret < 0) - return ret; - - const char **argv = msg_get_strings(msg); - struct proc_output *output = NULL; ret = proc_output_create(&output); if (ret < 0) return ret; - ret = string_to_int(argv[1], run_id); + int64_t run_id = 0; + ret = jsonrpc_request_get_param_int(request, finished_key_run_id, &run_id); if (ret < 0) goto free_output; - ret = string_to_int(argv[2], &output->ec); + + int64_t ec = -1; + ret = jsonrpc_request_get_param_int(request, finished_key_ec, &ec); if (ret < 0) goto free_output; + output->ec = (int)ec; - const char *b64data = argv[3]; + const char *b64data = NULL; + ret = jsonrpc_request_get_param_string(request, finished_key_data, &b64data); + if (ret < 0) + goto free_output; ret = base64_decode(b64data, &output->data, &output->data_size); if (ret < 0) goto free_output; + *_run_id = (int)run_id; *_output = output; return ret; diff --git a/src/protocol.h b/src/protocol.h index 99ec3ee..765e6cf 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -8,18 +8,20 @@ #ifndef __PROTOCOL_H__ #define __PROTOCOL_H__ -#include "msg.h" +#include "json_rpc.h" #include "process.h" #include "run_queue.h" -int msg_run_parse(const struct msg *, struct run **); +int run_request_create(struct jsonrpc_request **, const struct run *); +int run_request_parse(const struct jsonrpc_request *, struct run **); -int msg_new_worker_create(struct msg **); +int new_worker_request_create(struct jsonrpc_request **); +int new_worker_request_parse(const struct jsonrpc_request *); -int msg_start_create(struct msg **, const struct run *); -int msg_start_parse(const struct msg *, struct run **); +int start_request_create(struct jsonrpc_request **, const struct run *); +int start_request_parse(const struct jsonrpc_request *, struct run **); -int msg_finished_create(struct msg **, int run_id, const struct proc_output *); -int msg_finished_parse(const struct msg *, int *run_id, struct proc_output **); +int finished_request_create(struct jsonrpc_request **, int run_id, const struct proc_output *); +int finished_request_parse(const struct jsonrpc_request *, int *run_id, struct proc_output **); #endif diff --git a/src/server.c b/src/server.c index 8e2fab3..61e3136 100644 --- a/src/server.c +++ b/src/server.c @@ -11,8 +11,8 @@ #include "const.h" #include "event_loop.h" #include "file.h" +#include "json_rpc.h" #include "log.h" -#include "msg.h" #include "net.h" #include "process.h" #include "protocol.h" @@ -169,15 +169,15 @@ static void server_assign_run(struct server *server) struct worker *worker = worker_queue_remove_first(&server->worker_queue); log("Removed worker %d from the queue\n", worker_get_fd(worker)); - struct msg *start_msg = NULL; + struct jsonrpc_request *start_request = NULL; int ret = 0; - ret = msg_start_create(&start_msg, run); + ret = start_request_create(&start_request, run); if (ret < 0) goto exit; - ret = msg_talk(worker_get_fd(worker), start_msg, NULL); - msg_free(start_msg); + ret = jsonrpc_request_send(start_request, worker_get_fd(worker)); + jsonrpc_request_destroy(start_request); if (ret < 0) goto exit; @@ -222,8 +222,8 @@ exit: return NULL; } -static int server_handle_cmd_new_worker(UNUSED const struct msg *request, - UNUSED struct msg **response, void *_ctx) +static int server_handle_cmd_new_worker(UNUSED const struct jsonrpc_request *request, + UNUSED struct jsonrpc_response **response, void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct server *server = (struct server *)ctx->arg; @@ -255,7 +255,8 @@ close: return ret; } -static int server_handle_cmd_run(const struct msg *request, struct msg **response, void *_ctx) +static int server_handle_cmd_run(const struct jsonrpc_request *request, + struct jsonrpc_response **response, void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct server *server = (struct server *)ctx->arg; @@ -263,11 +264,11 @@ static int server_handle_cmd_run(const struct msg *request, struct msg **respons struct run *run = NULL; - ret = msg_run_parse(request, &run); + ret = run_request_parse(request, &run); if (ret < 0) return ret; - ret = msg_success(response); + ret = jsonrpc_response_create(response, request, NULL); if (ret < 0) goto destroy_run; @@ -278,7 +279,7 @@ static int server_handle_cmd_run(const struct msg *request, struct msg **respons return ret; free_response: - msg_free(*response); + jsonrpc_response_destroy(*response); *response = NULL; destroy_run: @@ -287,8 +288,8 @@ destroy_run: return ret; } -static int server_handle_cmd_finished(const struct msg *request, UNUSED struct msg **response, - void *_ctx) +static int server_handle_cmd_finished(const struct jsonrpc_request *request, + UNUSED struct jsonrpc_response **response, void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct server *server = (struct server *)ctx->arg; @@ -297,7 +298,7 @@ static int server_handle_cmd_finished(const struct msg *request, UNUSED struct m int run_id = 0; struct proc_output *output; - ret = msg_finished_parse(request, &run_id, &output); + ret = finished_request_parse(request, &run_id, &output); if (ret < 0) return ret; diff --git a/src/worker.c b/src/worker.c index e45bfba..a6ccc89 100644 --- a/src/worker.c +++ b/src/worker.c @@ -13,7 +13,6 @@ #include "event_loop.h" #include "git.h" #include "log.h" -#include "msg.h" #include "net.h" #include "process.h" #include "protocol.h" @@ -83,14 +82,14 @@ static int worker_set_stopping(UNUSED struct event_loop *loop, UNUSED int fd, UN return 0; } -static int worker_handle_cmd_start(const struct msg *request, UNUSED struct msg **response, - void *_ctx) +static int worker_handle_cmd_start(const struct jsonrpc_request *request, + UNUSED struct jsonrpc_response **response, void *_ctx) { struct cmd_conn_ctx *ctx = (struct cmd_conn_ctx *)_ctx; struct worker *worker = (struct worker *)ctx->arg; int ret = 0; - ret = msg_start_parse(request, &worker->run); + ret = start_request_parse(request, &worker->run); if (ret < 0) return ret; @@ -193,19 +192,26 @@ static int worker_do_run(struct worker *worker) proc_output_dump(result); - struct msg *finished_msg = NULL; + struct jsonrpc_request *finished_request = NULL; - ret = msg_finished_create(&finished_msg, run_get_id(worker->run), result); + ret = finished_request_create(&finished_request, run_get_id(worker->run), result); if (ret < 0) goto free_output; - ret = msg_connect_and_talk(worker->settings->host, worker->settings->port, finished_msg, - NULL); + ret = net_connect(worker->settings->host, worker->settings->port); + if (ret < 0) + goto free_request; + int fd = ret; + + ret = jsonrpc_request_send(finished_request, fd); if (ret < 0) - goto free_finished_msg; + goto close_conn; -free_finished_msg: - msg_free(finished_msg); +close_conn: + net_close(fd); + +free_request: + jsonrpc_request_destroy(finished_request); free_output: proc_output_destroy(result); @@ -224,14 +230,13 @@ static int worker_get_run(struct worker *worker) return ret; fd = ret; - struct msg *new_worker_msg = NULL; - - ret = msg_new_worker_create(&new_worker_msg); + struct jsonrpc_request *new_worker_request = NULL; + ret = new_worker_request_create(&new_worker_request); if (ret < 0) goto close; - ret = msg_send(fd, new_worker_msg); - msg_free(new_worker_msg); + ret = jsonrpc_request_send(new_worker_request, fd); + jsonrpc_request_destroy(new_worker_request); if (ret < 0) goto close; |