/*
* Copyright (c) 2022 Egor Tensin <egor@tensin.name>
* This file is part of the "cimple" project.
* For details, see https://github.com/egor-tensin/cimple.
* Distributed under the MIT License.
*/
#include "storage_sqlite.h"
#include "log.h"
#include "process.h"
#include "run_queue.h"
#include "sql/sqlite_sql.h"
#include "sqlite.h"
#include "storage.h"
#include <sqlite3.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct storage_sqlite_settings {
char *path;
};
int storage_sqlite_settings_create(struct storage_settings *settings, const char *path)
{
struct storage_sqlite_settings *sqlite = malloc(sizeof(struct storage_sqlite_settings));
if (!sqlite) {
log_errno("malloc");
return -1;
}
sqlite->path = strdup(path);
if (!sqlite->path) {
log_errno("strdup");
goto free;
}
settings->type = STORAGE_TYPE_SQLITE;
settings->sqlite = sqlite;
return 0;
free:
free(sqlite);
return -1;
}
void storage_sqlite_settings_destroy(const struct storage_settings *settings)
{
free(settings->sqlite->path);
free(settings->sqlite);
}
struct prepared_stmt {
pthread_mutex_t mtx;
sqlite3_stmt *impl;
};
static int prepared_stmt_init(struct prepared_stmt *stmt, sqlite3 *db, const char *sql)
{
int ret = 0;
ret = pthread_mutex_init(&stmt->mtx, NULL);
if (ret) {
pthread_errno(ret, "pthread_mutex_init");
return ret;
}
ret = sqlite_prepare(db, sql, &stmt->impl);
if (ret < 0)
goto destroy_mtx;
return ret;
destroy_mtx:
pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy");
return ret;
}
static void prepared_stmt_destroy(struct prepared_stmt *stmt)
{
pthread_errno_if(pthread_mutex_destroy(&stmt->mtx), "pthread_mutex_destroy");
sqlite_finalize(stmt->impl);
}
static int prepared_stmt_lock(struct prepared_stmt *stmt)
{
int ret = pthread_mutex_lock(&stmt->mtx);
if (ret) {
pthread_errno(ret, "pthread_mutex_unlock");
return ret;
}
return ret;
}
static void prepared_stmt_unlock(struct prepared_stmt *stmt)
{
pthread_errno_if(pthread_mutex_unlock(&stmt->mtx), "pthread_mutex_unlock");
}
struct storage_sqlite {
sqlite3 *db;
struct prepared_stmt stmt_repo_find;
struct prepared_stmt stmt_repo_insert;
struct prepared_stmt stmt_run_insert;
struct prepared_stmt stmt_run_finished;
struct prepared_stmt stmt_get_runs;
struct prepared_stmt stmt_get_run_queue;
};
static int storage_sqlite_upgrade_to(struct storage_sqlite *storage, size_t version)
{
static const char *const fmt = "%s PRAGMA user_version = %zu;";
const char *script = sqlite_schemas[version];
int ret = 0;
ret = snprintf(NULL, 0, fmt, script, version + 1);
size_t nb = (size_t)ret + 1;
ret = 0;
char *full_script = malloc(nb);
if (!full_script) {
log_errno("malloc");
return -1;
}
snprintf(full_script, nb, fmt, script, version + 1);
ret = sqlite_exec_as_transaction(storage->db, full_script);
goto free;
free:
free(full_script);
return ret;
}
static int storage_sqlite_upgrade_from_to(struct storage_sqlite *storage, size_t from, size_t to)
{
int ret = 0;
for (size_t i = from; i < to; ++i) {
log("Upgrading SQLite database from version %zu to version %zu\n", i, i + 1);
ret = storage_sqlite_upgrade_to(storage, i);
if (ret < 0) {
log_err("Failed to upgrade to version %zu\n", i + 1);
return ret;
}
}
return ret;
}
static int storage_sqlite_upgrade(struct storage_sqlite *storage)
{
unsigned int current_version = 0;
int ret = 0;
ret = sqlite_get_user_version(storage->db, ¤t_version);
if (ret < 0)
return ret;
log("SQLite database version: %u\n", current_version);
size_t newest_version = sizeof(sqlite_schemas) / sizeof(sqlite_schemas[0]);
log("Newest database version: %zu\n", newest_version);
if (current_version > newest_version) {
log_err("Unknown database version: %u\n", current_version);
return -1;
}
if (current_version == newest_version) {
log("SQLite database already at the newest version\n");
return 0;
}
return storage_sqlite_upgrade_from_to(storage, current_version, newest_version);
}
static int storage_sqlite_setup(struct storage_sqlite *storage)
{
int ret = 0;
ret = sqlite_set_foreign_keys(storage->db);
if (ret < 0)
return ret;
ret = storage_sqlite_upgrade(storage);
if (ret < 0)
return ret;
return ret;
}
static int storage_sqlite_prepare_statements(struct storage_sqlite *storage)
{
static const char *const fmt_repo_find = "SELECT id FROM cimple_repos WHERE url = ?;";
static const char *const fmt_repo_insert =
"INSERT INTO cimple_repos(url) VALUES (?) ON CONFLICT(url) DO NOTHING;";
static const char *const fmt_run_insert =
"INSERT INTO cimple_runs(status, exit_code, output, repo_id, repo_rev) VALUES (?, -1, x'', ?, ?) RETURNING id;";
static const char *const fmt_run_finished =
"UPDATE cimple_runs SET status = ?, exit_code = ?, output = ? WHERE id = ?;";
static const char *const fmt_get_runs =
"SELECT id, status, exit_code, repo_url, repo_rev FROM cimple_runs_view ORDER BY id DESC";
static const char *const fmt_get_run_queue =
"SELECT id, status, exit_code, repo_url, repo_rev FROM cimple_runs_view WHERE status = ? ORDER BY id;";
int ret = 0;
ret = prepared_stmt_init(&storage->stmt_repo_find, storage->db, fmt_repo_find);
if (ret < 0)
return ret;
ret = prepared_stmt_init(&storage->stmt_repo_insert, storage->db, fmt_repo_insert);
if (ret < 0)
goto finalize_repo_find;
ret = prepared_stmt_init(&storage->stmt_run_insert, storage->db, fmt_run_insert);
if (ret < 0)
goto finalize_repo_insert;
ret = prepared_stmt_init(&storage->stmt_run_finished, storage->db, fmt_run_finished);
if (ret < 0)
goto finalize_run_insert;
ret = prepared_stmt_init(&storage->stmt_get_runs, storage->db, fmt_get_runs);
if (ret < 0)
goto finalize_run_finished;
ret = prepared_stmt_init(&storage->stmt_get_run_queue, storage->db, fmt_get_run_queue);
if (ret < 0)
goto finalize_get_runs;
return ret;
finalize_get_runs:
prepared_stmt_destroy(&storage->stmt_get_runs);
finalize_run_finished:
prepared_stmt_destroy(&storage->stmt_run_finished);
finalize_run_insert:
prepared_stmt_destroy(&storage->stmt_run_insert);
finalize_repo_insert:
prepared_stmt_destroy(&storage->stmt_repo_insert);
finalize_repo_find:
prepared_stmt_destroy(&storage->stmt_repo_find);
return ret;
}
static void storage_sqlite_finalize_statements(struct storage_sqlite *storage)
{
prepared_stmt_destroy(&storage->stmt_get_run_queue);
prepared_stmt_destroy(&storage->stmt_get_runs);
prepared_stmt_destroy(&storage->stmt_run_finished);
prepared_stmt_destroy(&storage->stmt_run_insert);
prepared_stmt_destroy(&storage->stmt_repo_insert);
prepared_stmt_destroy(&storage->stmt_repo_find);
}
int storage_sqlite_create(struct storage *storage, const struct storage_settings *settings)
{
int ret = 0;
log("Using SQLite database at %s\n", settings->sqlite->path);
struct storage_sqlite *sqlite = malloc(sizeof(struct storage_sqlite));
if (!sqlite) {
log_errno("malloc");
return -1;
}
ret = sqlite_init();
if (ret < 0)
goto free;
ret = sqlite_open_rw(settings->sqlite->path, &sqlite->db);
if (ret < 0)
goto destroy;
ret = storage_sqlite_setup(sqlite);
if (ret < 0)
goto close;
ret = storage_sqlite_prepare_statements(sqlite);
if (ret < 0)
goto close;
storage->sqlite = sqlite;
return ret;
close:
sqlite_close(storage->sqlite->db);
destroy:
sqlite_destroy();
free:
free(sqlite);
return ret;
}
void storage_sqlite_destroy(struct storage *storage)
{
storage_sqlite_finalize_statements(storage->sqlite);
sqlite_close(storage->sqlite->db);
sqlite_destroy();
free(storage->sqlite);
}
static int storage_sqlite_find_repo(struct storage_sqlite *storage, const char *url)
{
struct prepared_stmt *stmt = &storage->stmt_repo_find;
int ret = 0;
ret = prepared_stmt_lock(stmt);
if (ret < 0)
return ret;
ret = sqlite_bind_text(stmt->impl, 1, url);
if (ret < 0)
goto reset;
ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
if (!ret)
goto reset;
ret = sqlite_column_int(stmt->impl, 0);
goto reset;
reset:
sqlite_reset(stmt->impl);
prepared_stmt_unlock(stmt);
return ret;
}
static int storage_sqlite_insert_repo(struct storage_sqlite *storage, const char *url)
{
struct prepared_stmt *stmt = &storage->stmt_repo_insert;
int ret = 0;
ret = prepared_stmt_lock(stmt);
if (ret < 0)
return ret;
ret = sqlite_bind_text(stmt->impl, 1, url);
if (ret < 0)
goto reset;
ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
reset:
sqlite_reset(stmt->impl);
prepared_stmt_unlock(stmt);
if (ret < 0)
return ret;
return storage_sqlite_find_repo(storage, url);
}
static int storage_sqlite_insert_run(struct storage_sqlite *storage, int repo_id, const char *rev)
{
struct prepared_stmt *stmt = &storage->stmt_run_insert;
int ret = 0;
ret = prepared_stmt_lock(stmt);
if (ret < 0)
return ret;
ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED);
if (ret < 0)
goto reset;
ret = sqlite_bind_int(stmt->impl, 2, repo_id);
if (ret < 0)
goto reset;
ret = sqlite_bind_text(stmt->impl, 3, rev);
if (ret < 0)
goto reset;
ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
if (!ret) {
ret = -1;
log_err("Failed to insert a run\n");
goto reset;
}
ret = sqlite_column_int(stmt->impl, 0);
goto reset;
reset:
sqlite_reset(stmt->impl);
prepared_stmt_unlock(stmt);
return ret;
}
int storage_sqlite_run_create(struct storage *storage, const char *repo_url, const char *rev)
{
int ret = 0;
ret = storage_sqlite_insert_repo(storage->sqlite, repo_url);
if (ret < 0)
return ret;
ret = storage_sqlite_insert_run(storage->sqlite, ret, rev);
if (ret < 0)
return ret;
return ret;
}
int storage_sqlite_run_finished(struct storage *storage, int run_id,
const struct proc_output *output)
{
struct prepared_stmt *stmt = &storage->sqlite->stmt_run_finished;
int ret = 0;
ret = prepared_stmt_lock(stmt);
if (ret < 0)
return ret;
ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_FINISHED);
if (ret < 0)
goto reset;
ret = sqlite_bind_int(stmt->impl, 2, output->ec);
if (ret < 0)
goto reset;
ret = sqlite_bind_blob(stmt->impl, 3, output->data, output->data_size);
if (ret < 0)
goto reset;
ret = sqlite_bind_int(stmt->impl, 4, run_id);
if (ret < 0)
goto reset;
ret = sqlite_step(stmt->impl);
if (ret < 0)
goto reset;
reset:
sqlite_reset(stmt->impl);
prepared_stmt_unlock(stmt);
return ret;
}
static int storage_sqlite_row_to_run(struct sqlite3_stmt *stmt, struct run **run)
{
int ret = 0;
int id = sqlite_column_int(stmt, 0);
int status = sqlite_column_int(stmt, 1);
int exit_code = sqlite_column_int(stmt, 2);
char *url = NULL;
ret = sqlite_column_text(stmt, 3, &url);
if (ret < 0)
return ret;
char *rev = NULL;
ret = sqlite_column_text(stmt, 4, &rev);
if (ret < 0)
goto free_url;
ret = run_new(run, id, url, rev, status, exit_code);
if (ret < 0)
goto free_rev;
log("Adding a run %d for repository %s to the queue\n", id, url);
free_rev:
free(rev);
free_url:
free(url);
return ret;
}
static int storage_sqlite_rows_to_runs(struct sqlite3_stmt *stmt, struct run_queue *queue)
{
int ret = 0;
run_queue_create(queue);
while (1) {
ret = sqlite_step(stmt);
if (!ret)
break;
if (ret < 0)
goto run_queue_destroy;
struct run *run = NULL;
ret = storage_sqlite_row_to_run(stmt, &run);
if (ret < 0)
goto run_queue_destroy;
run_queue_add_last(queue, run);
}
return ret;
run_queue_destroy:
run_queue_destroy(queue);
return ret;
}
int storage_sqlite_get_runs(struct storage *storage, struct run_queue *queue)
{
struct prepared_stmt *stmt = &storage->sqlite->stmt_get_runs;
int ret = 0;
ret = prepared_stmt_lock(stmt);
if (ret < 0)
return ret;
ret = storage_sqlite_rows_to_runs(stmt->impl, queue);
if (ret < 0)
goto reset;
reset:
sqlite_reset(stmt->impl);
prepared_stmt_unlock(stmt);
return ret;
}
int storage_sqlite_get_run_queue(struct storage *storage, struct run_queue *queue)
{
struct prepared_stmt *stmt = &storage->sqlite->stmt_get_run_queue;
int ret = 0;
ret = prepared_stmt_lock(stmt);
if (ret < 0)
return ret;
ret = sqlite_bind_int(stmt->impl, 1, RUN_STATUS_CREATED);
if (ret < 0)
goto reset;
ret = storage_sqlite_rows_to_runs(stmt->impl, queue);
if (ret < 0)
goto reset;
reset:
sqlite_reset(stmt->impl);
prepared_stmt_unlock(stmt);
return ret;
}