diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2020-03-01 11:17:29 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2020-03-01 11:24:09 +0300 |
commit | 0352e233955bc5d04344a4ae3ef04436d8299ae1 (patch) | |
tree | 7b99b9bbf7b6390cbea22966ba257dfd09f1fea9 /cgit/repos | |
parent | Travis: lint the config (diff) | |
download | cgitize-0352e233955bc5d04344a4ae3ef04436d8299ae1.tar.gz cgitize-0352e233955bc5d04344a4ae3ef04436d8299ae1.zip |
rename the modules
Diffstat (limited to 'cgit/repos')
-rw-r--r-- | cgit/repos/__init__.py | 0 | ||||
-rw-r--r-- | cgit/repos/cgit.py | 146 | ||||
-rw-r--r-- | cgit/repos/main.py | 114 | ||||
-rw-r--r-- | cgit/repos/repo.py | 98 | ||||
-rw-r--r-- | cgit/repos/utils.py | 43 |
5 files changed, 401 insertions, 0 deletions
diff --git a/cgit/repos/__init__.py b/cgit/repos/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/cgit/repos/__init__.py diff --git a/cgit/repos/cgit.py b/cgit/repos/cgit.py new file mode 100644 index 0000000..bb010d3 --- /dev/null +++ b/cgit/repos/cgit.py @@ -0,0 +1,146 @@ +# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "cgit repos" project. +# For details, see https://github.com/egor-tensin/cgit-repos. +# Distributed under the MIT License. + +from enum import Enum +import logging +import os +import os.path +import shutil + +from cgit.repos.utils import chdir, check_output, run + + +_ENV = os.environ.copy() +_ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null' + + +def _run(*args, **kwargs): + return run(*args, env=_ENV, **kwargs) + + +def _check_output(*args, **kwargs): + return check_output(*args, env=_ENV, **kwargs) + + +class CGit: + def __init__(self, clone_url): + self.clone_url = clone_url + + def get_clone_url(self, repo): + if self.clone_url is None: + return None + return self.clone_url.format(repo_id=repo.repo_id) + + +class CGitRC: + def __init__(self, cgit): + self.cgit = cgit + + def write(self, path, repo): + with open(path, 'w') as fd: + self._write_field(fd, 'clone-url', self._build_clone_url(repo)) + self._write_field(fd, 'owner', repo.owner) + self._write_field(fd, 'desc', repo.desc) + self._write_field(fd, 'homepage', repo.homepage) + + @staticmethod + def _write_field(fd, field, value): + if value is None: + return + fd.write(f'{field}={value}\n') + + def _build_clone_url(self, repo): + clone_urls = [] + if repo.clone_url is not None: + clone_urls.append(repo.clone_url) + cgit_clone_url = self.cgit.get_clone_url(repo) + if cgit_clone_url is not None: + clone_urls.append(cgit_clone_url) + if not clone_urls: + return None + clone_urls = ' '.join(clone_urls) + return clone_urls + + +class Output: + def __init__(self, output_dir, cgit): + self.output_dir = self._make_dir(output_dir) + self.cgitrc = CGitRC(cgit) + + @staticmethod + def _make_dir(rel_path): + abs_path = os.path.abspath(rel_path) + os.makedirs(abs_path, exist_ok=True) + return abs_path + + def get_repo_dir(self, repo): + return os.path.join(self.output_dir, repo.repo_id) + + def get_cgitrc_path(self, repo): + return os.path.join(self.get_repo_dir(repo), 'cgitrc') + + def pull(self, repo): + success = False + verdict = self.judge(repo) + if verdict is RepoVerdict.SHOULD_MIRROR: + success = self.mirror(repo) + elif verdict is RepoVerdict.SHOULD_UPDATE: + success = self.update(repo) + elif verdict is RepoVerdict.CANT_DECIDE: + success = False + else: + raise NotImplementedError(f'Unknown repository verdict: {verdict}') + if success: + self.cgitrc.write(self.get_cgitrc_path(repo), repo) + return success + + def judge(self, repo): + repo_dir = self.get_repo_dir(repo) + if not os.path.isdir(repo_dir): + return RepoVerdict.SHOULD_MIRROR + with chdir(repo_dir): + if not _run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True): + logging.warning('Not a repository, so going to mirror: %s', repo_dir) + return RepoVerdict.SHOULD_MIRROR + success, output = _check_output('git', 'config', '--get', 'remote.origin.url') + if not success: + # Every repository managed by this script should have the + # 'origin' remote. If it doesn't, it's trash. + return RepoVerdict.SHOULD_MIRROR + if f'{repo.clone_url}\n' != output: + logging.warning("Existing repository '%s' URL doesn't match the specified clone" \ + " URL: %s", repo.repo_id, repo.clone_url) + return RepoVerdict.CANT_DECIDE + # Looks like a legit clone of the specified remote. + return RepoVerdict.SHOULD_UPDATE + + def mirror(self, repo): + logging.info("Mirroring repository '%s' from: %s", repo.repo_id, + repo.clone_url) + repo_dir = self.get_repo_dir(repo) + if os.path.isdir(repo_dir): + try: + shutil.rmtree(repo_dir) + except Exception as e: + logging.exception(e) + return False + return _run('git', 'clone', '--mirror', repo.clone_url, repo_dir) + + def update(self, repo): + logging.info("Updating repository '%s'", repo.repo_id) + repo_dir = self.get_repo_dir(repo) + with chdir(repo_dir): + if not _run('git', 'remote', 'update', '--prune'): + return False + if _run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True): + if not _run('git', 'reset', '--soft', 'origin/master'): + return False + return True + + +class RepoVerdict(Enum): + SHOULD_MIRROR = 1 + SHOULD_UPDATE = 2 + CANT_DECIDE = 3 diff --git a/cgit/repos/main.py b/cgit/repos/main.py new file mode 100644 index 0000000..8132208 --- /dev/null +++ b/cgit/repos/main.py @@ -0,0 +1,114 @@ +# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "cgit repos" project. +# For details, see https://github.com/egor-tensin/cgit-repos. +# Distributed under the MIT License. + +from argparse import ArgumentParser +import configparser +import importlib +import logging +import os.path +import sys + +from cgit.repos.cgit import CGit, Output +from cgit.repos.repo import BitbucketRepo, GithubRepo, Repo + + +DEFAULT_OUTPUT_DIR = '/var/tmp/cgit-repos/output' +DEFAULT_CONFIG_PATH = '/etc/cgit-repos/cgit-repos.conf' +DEFAULT_MY_REPOS_PATH = '/etc/cgit-repos/my_repos.py' + + +def set_up_logging(): + logging.basicConfig( + level=logging.DEBUG, + datefmt='%Y-%m-%d %H:%M:%S', + format='%(asctime)s | %(levelname)s | %(message)s') + + +def parse_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = ArgumentParser() + parser.add_argument('--config', metavar='PATH', + default=DEFAULT_CONFIG_PATH, + help='config file path') + parser.add_argument('--repo', metavar='REPO_ID', + nargs='*', dest='repos', + help='repos to pull') + return parser.parse_args(argv) + + +class Config: + @staticmethod + def read(path): + config = configparser.ConfigParser() + config.read(path) + return Config(config) + + def __init__(self, impl): + self.impl = impl + + @property + def output(self): + return self.impl.get('DEFAULT', 'output', fallback=DEFAULT_OUTPUT_DIR) + + @property + def clone_url(self): + return self.impl.get('DEFAULT', 'clone_url', fallback=None) + + @property + def default_owner(self): + return self.impl.get('DEFAULT', 'owner', fallback=None) + + @property + def github_username(self): + return self.impl.get('GITHUB', 'username', fallback=None) + + @property + def bitbucket_username(self): + return self.impl.get('BITBUCKET', 'username', fallback=None) + + def set_defaults(self): + Repo.DEFAULT_OWNER = self.default_owner + GithubRepo.DEFAULT_USER = self.github_username + BitbucketRepo.DEFAULT_USER = self.bitbucket_username + + @property + def my_repos(self): + return self.impl.get('DEFAULT', 'my_repos', fallback=DEFAULT_MY_REPOS_PATH) + + def import_my_repos(self): + sys.path.append(os.path.dirname(self.my_repos)) + module_name = os.path.splitext(os.path.basename(self.my_repos))[0] + module = importlib.import_module(module_name) + return module.MY_REPOS + + +def main(args=None): + set_up_logging() + try: + args = parse_args(args) + config = Config.read(args.config) + config.set_defaults() + my_repos = config.import_my_repos() + cgit = CGit(config.clone_url) + output = Output(config.output, cgit) + success = True + for repo in my_repos: + if args.repos is None or repo.repo_id in args.repos: + if not output.pull(repo): + success = False + if success: + logging.info('All repositories were updated successfully') + return 0 + else: + logging.warning("Some repositories couldn't be updated!") + return 1 + except Exception as e: + logging.exception(e) + raise + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/cgit/repos/repo.py b/cgit/repos/repo.py new file mode 100644 index 0000000..a128dbf --- /dev/null +++ b/cgit/repos/repo.py @@ -0,0 +1,98 @@ +# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "cgit repos" project. +# For details, see https://github.com/egor-tensin/cgit-repos. +# Distributed under the MIT License. + +import os.path + + +class Repo: + DEFAULT_OWNER = None + + @staticmethod + def extract_repo_name(repo_id): + return os.path.basename(repo_id) + + def __init__(self, repo_id, clone_url, owner=None, desc=None, + homepage=None): + self.repo_id = repo_id + self.repo_name = self.extract_repo_name(repo_id) + self.clone_url = clone_url + if owner is None: + owner = Repo.DEFAULT_OWNER + self.owner = owner + if desc is None: + if homepage is not None: + desc = homepage + elif clone_url is not None: + desc = clone_url + else: + desc = self.repo_name + self.desc = desc + self.homepage = homepage + + +class GithubRepo(Repo): + DEFAULT_USER = None + + def __init__(self, repo_id, clone_url=None, owner=None, desc=None, + homepage=None, user=DEFAULT_USER, via_ssh=True): + if user is None: + if GithubRepo.DEFAULT_USER is None: + raise RuntimeError('neither explicit or default GitHub username was provided') + user = GithubRepo.DEFAULT_USER + name = Repo.extract_repo_name(repo_id) + if clone_url is None: + if via_ssh: + clone_url = self.build_clone_url_ssh(user, name) + else: + clone_url = self.build_clone_url_https(user, name) + if homepage is None: + homepage = self.build_homepage_url(user, name) + super().__init__(repo_id, clone_url, owner=owner, desc=desc, + homepage=homepage) + + @staticmethod + def build_clone_url_ssh(user, name): + return f'ssh://git@github.com/{user}/{name}.git' + + @staticmethod + def build_clone_url_https(user, name): + return f'https://github.com/{user}/{name}.git' + + @staticmethod + def build_homepage_url(user, name): + return f'https://github.com/{user}/{name}' + + +class BitbucketRepo(Repo): + DEFAULT_USER = None + + def __init__(self, repo_id, clone_url=None, owner=None, desc=None, + homepage=None, user=DEFAULT_USER, via_ssh=True): + if user is None: + if BitbucketRepo.DEFAULT_USER is None: + raise RuntimeError('neither explicit or default Bitbucket username was provided') + user = BitbucketRepo.DEFAULT_USER + name = Repo.extract_repo_name(repo_id) + if clone_url is None: + if via_ssh: + clone_url = self.build_clone_url_ssh(user, name) + else: + clone_url = self.build_clone_url_https(user, name) + if homepage is None: + homepage = self.build_homepage_url(user, name) + super().__init__(repo_id, clone_url, owner=owner, desc=desc, + homepage=homepage) + + @staticmethod + def build_clone_url_ssh(user, name): + return f'ssh://git@bitbucket.org/{user}/{name}.git' + + @staticmethod + def build_clone_url_https(user, name): + return f'https://bitbucket.org/{user}/{name}.git' + + @staticmethod + def build_homepage_url(user, name): + return f'https://bitbucket.org/{user}/{name.lower()}' diff --git a/cgit/repos/utils.py b/cgit/repos/utils.py new file mode 100644 index 0000000..84337e8 --- /dev/null +++ b/cgit/repos/utils.py @@ -0,0 +1,43 @@ +# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "cgit repos" project. +# For details, see https://github.com/egor-tensin/cgit-repos. +# Distributed under the MIT License. + +import contextlib +import logging +import os +import subprocess + + +def check_output(*args, stdout=subprocess.PIPE, **kwargs): + try: + result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT, + encoding='utf-8', check=True, **kwargs) + if stdout != subprocess.DEVNULL: + if result.stdout is None: + logging.debug('%s', args) + else: + logging.debug('%s\n%s', args, result.stdout) + return result.returncode == 0, result.stdout + except subprocess.CalledProcessError as e: + if stdout != subprocess.DEVNULL: + logging.error('%s\n%s', e, e.output) + return e.returncode == 0, e.output + + +def run(*args, discard_output=False, **kwargs): + if discard_output: + success, _ = check_output(*args, stdout=subprocess.DEVNULL, **kwargs) + else: + success, _ = check_output(*args, **kwargs) + return success + + +@contextlib.contextmanager +def chdir(new_cwd): + old_cwd = os.getcwd() + os.chdir(new_cwd) + try: + yield + finally: + os.chdir(old_cwd) |