from enum import Enum import logging import os import os.path import shutil import socket from pull.utils import chdir, check_output, run _ENV = os.environ.copy() _ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null' def _run(*args, **kwargs): return run(*args, env=_ENV, **kwargs) def _check_output(*args, **kwargs): return check_output(*args, env=_ENV, **kwargs) class CGit: def __init__(self, clone_url): self.clone_url = clone_url def get_clone_url(self, repo): if self.clone_url is None: return None return self.clone_url.format(repo_id=repo.repo_id) class CGitRC: def __init__(self, cgit): self.cgit = cgit def write(self, path, repo): with open(path, 'w') as fd: self._write_field(fd, 'clone-url', self._build_clone_url(repo)) self._write_field(fd, 'owner', repo.owner) self._write_field(fd, 'desc', repo.desc) self._write_field(fd, 'homepage', repo.homepage) @staticmethod def _write_field(fd, field, value): if value is None: return fd.write(f'{field}={value}\n') def _build_clone_url(self, repo): clone_urls = [] if repo.clone_url is not None: clone_urls.append(repo.clone_url) cgit_clone_url = self.cgit.get_clone_url(repo) if cgit_clone_url is not None: clone_urls.append(cgit_clone_url) if not clone_urls: return None clone_urls = ' '.join(clone_urls) return clone_urls class Output: def __init__(self, output_dir, cgit): self.output_dir = self._make_dir(output_dir) self.cgitrc = CGitRC(cgit) @staticmethod def _make_dir(rel_path): abs_path = os.path.abspath(rel_path) os.makedirs(abs_path, exist_ok=True) return abs_path def get_repo_dir(self, repo): return os.path.join(self.output_dir, repo.repo_id) def get_cgitrc_path(self, repo): return os.path.join(self.get_repo_dir(repo), 'cgitrc') def pull(self, repo): success = False verdict = self.judge(repo) if verdict is RepoVerdict.SHOULD_MIRROR: success = self.mirror(repo) elif verdict is RepoVerdict.SHOULD_UPDATE: success = self.update(repo) elif verdict is RepoVerdict.CANT_DECIDE: success = False else: raise NotImplementedError(f'Unknown repository verdict: {verdict}') if success: self.cgitrc.write(self.get_cgitrc_path(repo), repo) return success def judge(self, repo): repo_dir = self.get_repo_dir(repo) if not os.path.isdir(repo_dir): return RepoVerdict.SHOULD_MIRROR with chdir(repo_dir): if not _run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True): logging.warning('Not a repository, so going to mirror: %s', repo_dir) return RepoVerdict.SHOULD_MIRROR success, output = _check_output('git', 'config', '--get', 'remote.origin.url') if not success: # Every repository managed by this script should have the # 'origin' remote. If it doesn't, it's trash. return RepoVerdict.SHOULD_MIRROR if f'{repo.clone_url}\n' != output: logging.warning("Existing repository '%s' URL doesn't match the specified clone" \ " URL: %s", repo.repo_id, repo.clone_url) return RepoVerdict.CANT_DECIDE # Looks like a legit clone of the specified remote. return RepoVerdict.SHOULD_UPDATE def mirror(self, repo): logging.info("Mirroring repository '%s' from: %s", repo.repo_id, repo.clone_url) repo_dir = self.get_repo_dir(repo) if os.path.isdir(repo_dir): try: shutil.rmtree(repo_dir) except Exception as e: logging.exception(e) return False return _run('git', 'clone', '--mirror', repo.clone_url, repo_dir) def update(self, repo): logging.info("Updating repository '%s'", repo.repo_id) repo_dir = self.get_repo_dir(repo) with chdir(repo_dir): if not _run('git', 'remote', 'update', '--prune'): return False if _run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True): if not _run('git', 'reset', '--soft', 'origin/master'): return False return True class RepoVerdict(Enum): SHOULD_MIRROR = 1 SHOULD_UPDATE = 2 CANT_DECIDE = 3