From 96ccf79d46adb2d6b49c67e3e6ad59512d67da65 Mon Sep 17 00:00:00 2001 From: Egor Tensin Date: Sat, 27 Mar 2021 22:09:05 +0300 Subject: rename the project to "cgitize" --- cgit/repos/__init__.py | 0 cgit/repos/cgit.py | 146 ------------------------------------------------- cgit/repos/main.py | 132 -------------------------------------------- cgit/repos/repo.py | 112 ------------------------------------- cgit/repos/utils.py | 43 --------------- 5 files changed, 433 deletions(-) delete mode 100644 cgit/repos/__init__.py delete mode 100644 cgit/repos/cgit.py delete mode 100644 cgit/repos/main.py delete mode 100644 cgit/repos/repo.py delete mode 100644 cgit/repos/utils.py (limited to 'cgit/repos') diff --git a/cgit/repos/__init__.py b/cgit/repos/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/cgit/repos/cgit.py b/cgit/repos/cgit.py deleted file mode 100644 index 0f8917a..0000000 --- a/cgit/repos/cgit.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2018 Egor Tensin -# This file is part of the "cgit repos" project. -# For details, see https://github.com/egor-tensin/cgit-repos. -# Distributed under the MIT License. - -from enum import Enum -import logging -import os -import os.path -import shutil - -import cgit.repos.utils as utils - - -GIT_ENV = os.environ.copy() -GIT_ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null' - - -def run(*args, **kwargs): - return utils.run(*args, env=GIT_ENV, **kwargs) - - -def check_output(*args, **kwargs): - return utils.check_output(*args, env=GIT_ENV, **kwargs) - - -class CGit: - def __init__(self, clone_url): - self.clone_url = clone_url - - def get_clone_url(self, repo): - if self.clone_url is None: - return None - return self.clone_url.format(repo_id=repo.repo_id) - - -class CGitRC: - def __init__(self, cgit): - self.cgit = cgit - - def write(self, path, repo): - with open(path, 'w') as fd: - self._write_field(fd, 'clone-url', self._build_clone_url(repo)) - self._write_field(fd, 'owner', repo.owner) - self._write_field(fd, 'desc', repo.desc) - self._write_field(fd, 'homepage', repo.homepage) - - @staticmethod - def _write_field(fd, field, value): - if value is None: - return - fd.write(f'{field}={value}\n') - - def _build_clone_url(self, repo): - clone_urls = [] - if repo.clone_url is not None: - clone_urls.append(repo.clone_url) - cgit_clone_url = self.cgit.get_clone_url(repo) - if cgit_clone_url is not None: - clone_urls.append(cgit_clone_url) - if not clone_urls: - return None - clone_urls = ' '.join(clone_urls) - return clone_urls - - -class Output: - def __init__(self, output_dir, cgit): - self.output_dir = self._make_dir(output_dir) - self.cgitrc = CGitRC(cgit) - - @staticmethod - def _make_dir(rel_path): - abs_path = os.path.abspath(rel_path) - os.makedirs(abs_path, exist_ok=True) - return abs_path - - def get_repo_dir(self, repo): - return os.path.join(self.output_dir, repo.repo_id) - - def get_cgitrc_path(self, repo): - return os.path.join(self.get_repo_dir(repo), 'cgitrc') - - def pull(self, repo): - success = False - verdict = self.judge(repo) - if verdict is RepoVerdict.SHOULD_MIRROR: - success = self.mirror(repo) - elif verdict is RepoVerdict.SHOULD_UPDATE: - success = self.update(repo) - elif verdict is RepoVerdict.CANT_DECIDE: - success = False - else: - raise NotImplementedError(f'Unknown repository verdict: {verdict}') - if success: - self.cgitrc.write(self.get_cgitrc_path(repo), repo) - return success - - def judge(self, repo): - repo_dir = self.get_repo_dir(repo) - if not os.path.isdir(repo_dir): - return RepoVerdict.SHOULD_MIRROR - with utils.chdir(repo_dir): - if not run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True): - logging.warning('Not a repository, so going to mirror: %s', repo_dir) - return RepoVerdict.SHOULD_MIRROR - success, output = check_output('git', 'config', '--get', 'remote.origin.url') - if not success: - # Every repository managed by this script should have the - # 'origin' remote. If it doesn't, it's trash. - return RepoVerdict.SHOULD_MIRROR - if f'{repo.clone_url}\n' != output: - logging.warning("Existing repository '%s' URL doesn't match the specified clone" \ - " URL: %s", repo.repo_id, repo.clone_url) - return RepoVerdict.CANT_DECIDE - # Looks like a legit clone of the specified remote. - return RepoVerdict.SHOULD_UPDATE - - def mirror(self, repo): - logging.info("Mirroring repository '%s' from: %s", repo.repo_id, - repo.clone_url) - repo_dir = self.get_repo_dir(repo) - if os.path.isdir(repo_dir): - try: - shutil.rmtree(repo_dir) - except Exception as e: - logging.exception(e) - return False - return run('git', 'clone', '--mirror', repo.clone_url, repo_dir) - - def update(self, repo): - logging.info("Updating repository '%s'", repo.repo_id) - repo_dir = self.get_repo_dir(repo) - with utils.chdir(repo_dir): - if not run('git', 'remote', 'update', '--prune'): - return False - if run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True): - if not run('git', 'reset', '--soft', 'origin/master'): - return False - return True - - -class RepoVerdict(Enum): - SHOULD_MIRROR = 1 - SHOULD_UPDATE = 2 - CANT_DECIDE = 3 diff --git a/cgit/repos/main.py b/cgit/repos/main.py deleted file mode 100644 index e5e390f..0000000 --- a/cgit/repos/main.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright (c) 2018 Egor Tensin -# This file is part of the "cgit repos" project. -# For details, see https://github.com/egor-tensin/cgit-repos. -# Distributed under the MIT License. - -from argparse import ArgumentParser -import configparser -from contextlib import contextmanager -import importlib -import logging -import os.path -import sys - -from cgit.repos.cgit import CGit, Output -from cgit.repos.repo import BitbucketRepo, GithubRepo, Repo -import cgit.repos.utils as utils - - -DEFAULT_OUTPUT_DIR = '/var/tmp/cgit-repos/output' -DEFAULT_CONFIG_PATH = '/etc/cgit-repos/cgit-repos.conf' -DEFAULT_MY_REPOS_PATH = '/etc/cgit-repos/my_repos.py' - - -@contextmanager -def setup_logging(): - logging.basicConfig( - level=logging.DEBUG, - datefmt='%Y-%m-%d %H:%M:%S', - format='%(asctime)s | %(levelname)s | %(message)s') - try: - yield - except Exception as e: - logging.exception(e) - raise - - -def parse_args(argv=None): - if argv is None: - argv = sys.argv[1:] - parser = ArgumentParser() - parser.add_argument('--config', metavar='PATH', - default=DEFAULT_CONFIG_PATH, - help='config file path') - parser.add_argument('--repo', metavar='REPO_ID', - nargs='*', dest='repos', - help='repos to pull') - return parser.parse_args(argv) - - -class Config: - @staticmethod - def read(path): - return Config(path) - - def __init__(self, path): - self.path = os.path.abspath(path) - self.impl = configparser.ConfigParser() - self.impl.read(path) - - def _resolve_relative(self, path): - if os.path.isabs(path): - return path - with utils.chdir(os.path.dirname(self.path)): - path = os.path.abspath(path) - return path - - @property - def output(self): - path = self.impl.get('DEFAULT', 'output', fallback=DEFAULT_OUTPUT_DIR) - return self._resolve_relative(path) - - @property - def clone_url(self): - return self.impl.get('DEFAULT', 'clone_url', fallback=None) - - @property - def default_owner(self): - return self.impl.get('DEFAULT', 'owner', fallback=None) - - @property - def via_ssh(self): - return self.impl.getboolean('DEFAULT', 'ssh', fallback=Repo.DEFAULT_VIA_SSH) - - @property - def github_username(self): - return self.impl.get('GITHUB', 'username', fallback=None) - - @property - def bitbucket_username(self): - return self.impl.get('BITBUCKET', 'username', fallback=None) - - def set_defaults(self): - Repo.DEFAULT_OWNER = self.default_owner - Repo.DEFAULT_VIA_SSH = self.via_ssh - GithubRepo.DEFAULT_USER = self.github_username - BitbucketRepo.DEFAULT_USER = self.bitbucket_username - - @property - def my_repos(self): - path = self.impl.get('DEFAULT', 'my_repos', fallback=DEFAULT_MY_REPOS_PATH) - return self._resolve_relative(path) - - def import_my_repos(self): - sys.path.append(os.path.dirname(self.my_repos)) - module_name = os.path.splitext(os.path.basename(self.my_repos))[0] - module = importlib.import_module(module_name) - return module.MY_REPOS - - -def main(args=None): - with setup_logging(): - args = parse_args(args) - config = Config.read(args.config) - config.set_defaults() - my_repos = config.import_my_repos() - cgit = CGit(config.clone_url) - output = Output(config.output, cgit) - success = True - for repo in my_repos: - if args.repos is None or repo.repo_id in args.repos: - if not output.pull(repo): - success = False - if success: - logging.info('All repositories were updated successfully') - return 0 - else: - logging.warning("Some repositories couldn't be updated!") - return 1 - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/cgit/repos/repo.py b/cgit/repos/repo.py deleted file mode 100644 index 4b3072c..0000000 --- a/cgit/repos/repo.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright (c) 2018 Egor Tensin -# This file is part of the "cgit repos" project. -# For details, see https://github.com/egor-tensin/cgit-repos. -# Distributed under the MIT License. - -import abc -import os.path - - -class Repo: - DEFAULT_OWNER = None - DEFAULT_VIA_SSH = True - - @staticmethod - def extract_repo_name(repo_id): - return os.path.basename(repo_id) - - def __init__(self, repo_id, clone_url, owner=None, desc=None, - homepage=None): - self.repo_id = repo_id - self.repo_name = self.extract_repo_name(repo_id) - self.clone_url = clone_url - if owner is None: - owner = Repo.DEFAULT_OWNER - self.owner = owner - if desc is None: - if homepage is not None: - desc = homepage - elif clone_url is not None: - desc = clone_url - else: - desc = self.repo_name - self.desc = desc - self.homepage = homepage - - -class HostedRepo(Repo, abc.ABC): - def __init__(self, repo_id, clone_url=None, owner=None, desc=None, - homepage=None, user=None, via_ssh=None): - user = user or self.get_default_user() - if user is None: - raise RuntimeError(f'neither explicit or default {self.provider_name()} username was provided') - name = Repo.extract_repo_name(repo_id) - if clone_url is None: - if via_ssh is None: - via_ssh = Repo.DEFAULT_VIA_SSH - if via_ssh: - clone_url = self.build_clone_url_ssh(user, name) - else: - clone_url = self.build_clone_url_https(user, name) - if homepage is None: - homepage = self.build_homepage_url(user, name) - super().__init__(repo_id, clone_url, owner=owner, desc=desc, - homepage=homepage) - - @abc.abstractmethod - def provider_name(self): - pass - - @abc.abstractmethod - def get_default_user(self): - pass - - @abc.abstractmethod - def build_clone_url_ssh(self, user, name): - pass - - @abc.abstractmethod - def build_clone_url_https(self, user, name): - pass - - @abc.abstractmethod - def build_homepage_url(self, user, name): - pass - - -class GithubRepo(HostedRepo): - DEFAULT_USER = None - - def provider_name(self): - return 'GitHub' - - def get_default_user(self): - return GithubRepo.DEFAULT_USER - - def build_clone_url_ssh(self, user, name): - return f'ssh://git@github.com/{user}/{name}.git' - - def build_clone_url_https(self, user, name): - return f'https://github.com/{user}/{name}.git' - - def build_homepage_url(self, user, name): - return f'https://github.com/{user}/{name}' - - -class BitbucketRepo(HostedRepo): - DEFAULT_USER = None - - def provider_name(self): - return 'Bitbucket' - - def get_default_user(self): - return BitbucketRepo.DEFAULT_USER - - def build_clone_url_ssh(self, user, name): - return f'ssh://git@bitbucket.org/{user}/{name}.git' - - def build_clone_url_https(self, user, name): - return f'https://bitbucket.org/{user}/{name}.git' - - def build_homepage_url(self, user, name): - return f'https://bitbucket.org/{user}/{name.lower()}' diff --git a/cgit/repos/utils.py b/cgit/repos/utils.py deleted file mode 100644 index 84337e8..0000000 --- a/cgit/repos/utils.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright (c) 2018 Egor Tensin -# This file is part of the "cgit repos" project. -# For details, see https://github.com/egor-tensin/cgit-repos. -# Distributed under the MIT License. - -import contextlib -import logging -import os -import subprocess - - -def check_output(*args, stdout=subprocess.PIPE, **kwargs): - try: - result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT, - encoding='utf-8', check=True, **kwargs) - if stdout != subprocess.DEVNULL: - if result.stdout is None: - logging.debug('%s', args) - else: - logging.debug('%s\n%s', args, result.stdout) - return result.returncode == 0, result.stdout - except subprocess.CalledProcessError as e: - if stdout != subprocess.DEVNULL: - logging.error('%s\n%s', e, e.output) - return e.returncode == 0, e.output - - -def run(*args, discard_output=False, **kwargs): - if discard_output: - success, _ = check_output(*args, stdout=subprocess.DEVNULL, **kwargs) - else: - success, _ = check_output(*args, **kwargs) - return success - - -@contextlib.contextmanager -def chdir(new_cwd): - old_cwd = os.getcwd() - os.chdir(new_cwd) - try: - yield - finally: - os.chdir(old_cwd) -- cgit v1.2.3