from argparse import ArgumentParser import contextlib from enum import Enum import logging import os import os.path import shutil import socket import sys import subprocess from pull.registry import MY_REPOS env = os.environ.copy() env['GIT_SSH_COMMAND'] = 'ssh -oStrictHostKeyChecking=no -oBatchMode=yes' DEFAULT_OUTPUT_DIR = 'output' DEFAULT_CGIT_CLONE_USER = 'egor' DEFAULT_CGIT_CLONE_HOST = 'tensin-ext1.home' DEFAULT_CGIT_CLONE_PORT = 8080 def set_up_logging(): logging.basicConfig( level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S', format='%(asctime)s | %(levelname)s | %(message)s') def parse_args(argv=None): if argv is None: argv = sys.argv[1:] parser = ArgumentParser() parser.add_argument('--output', metavar='PATH', default=DEFAULT_OUTPUT_DIR, help='output directory path') parser.add_argument('--cgit-user', metavar='USERNAME', default=DEFAULT_CGIT_CLONE_USER, help='cgit clone username') parser.add_argument('--cgit-host', metavar='HOST', default=DEFAULT_CGIT_CLONE_HOST, help='cgit clone host') parser.add_argument('--cgit-port', metavar='PORT', type=int, default=DEFAULT_CGIT_CLONE_PORT, help='cgit clone port number') parser.add_argument('--repo', metavar='REPO_ID', nargs='*', dest='repos', help='repos to pull') return parser.parse_args(argv) def check_output(*args, stdout=subprocess.PIPE): result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT, env=env, encoding='utf-8') try: result.check_returncode() if stdout != subprocess.DEVNULL: if result.stdout is None: logging.debug('%s', args) else: logging.debug('%s\n%s', args, result.stdout) return result.returncode == 0, result.stdout except subprocess.CalledProcessError as e: if stdout != subprocess.DEVNULL: logging.error('%s\n%s', e, e.output) return e.returncode == 0, e.output def run(*args, discard_output=False): if discard_output: success, _ = check_output(*args, stdout=subprocess.DEVNULL) else: success, _ = check_output(*args) return success @contextlib.contextmanager def chdir(new_cwd): old_cwd = os.getcwd() os.chdir(new_cwd) try: yield finally: os.chdir(old_cwd) class CGit: def __init__(self, user, host, port): self.user = user self.host = host self.ip = socket.gethostbyname(self.host) self.port = port def get_clone_url(self, repo): return f'http://{self.user}@{self.ip}:{self.port}/git/{repo.repo_id}' class CGitRC: def __init__(self, cgit): self.cgit = cgit def write(self, path, repo): with open(path, 'w') as fd: self._write_field(fd, 'clone-url', self._build_clone_url(repo)) self._write_field(fd, 'owner', repo.owner) self._write_field(fd, 'desc', repo.desc) self._write_field(fd, 'homepage', repo.homepage) @staticmethod def _write_field(fd, field, value): if value is None: return fd.write(f'{field}={value}\n') def _build_clone_url(self, repo): clone_urls = [] if repo.clone_url is not None: clone_urls.append(repo.clone_url) clone_urls.append(self.cgit.get_clone_url(repo)) clone_urls = ' '.join(clone_urls) return clone_urls class Output: def __init__(self, output_dir, cgit): self.output_dir = self._make_dir(output_dir) self.cgitrc = CGitRC(cgit) @staticmethod def _make_dir(rel_path): abs_path = os.path.abspath(rel_path) os.makedirs(abs_path, exist_ok=True) return abs_path def get_repo_dir(self, repo): return os.path.join(self.output_dir, repo.repo_id) def get_cgitrc_path(self, repo): return os.path.join(self.get_repo_dir(repo), 'cgitrc') def pull(self, repo): success = False verdict = self.judge(repo) if verdict is RepoVerdict.SHOULD_MIRROR: success = self.mirror(repo) elif verdict is RepoVerdict.SHOULD_UPDATE: success = self.update(repo) elif verdict is RepoVerdict.CANT_DECIDE: success = False else: raise NotImplementedError(f'Unknown repository verdict: {verdict}') if success: self.cgitrc.write(self.get_cgitrc_path(repo), repo) return success def judge(self, repo): repo_dir = self.get_repo_dir(repo) if not os.path.isdir(repo_dir): return RepoVerdict.SHOULD_MIRROR with chdir(repo_dir): if not run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True): logging.warning('Not a repository, so going to mirror: %s', repo_dir) return RepoVerdict.SHOULD_MIRROR success, output = check_output('git', 'config', '--get', 'remote.origin.url') if not success: # Every repository managed by this script should have the # 'origin' remote. If it doesn't, it's trash. return RepoVerdict.SHOULD_MIRROR if f'{repo.clone_url}\n' != output: logging.warning("Existing repository '%s' URL doesn't match the specified clone" \ " URL: %s", repo.repo_id, repo.clone_url) return RepoVerdict.CANT_DECIDE # Looks like a legit clone of the specified remote. return RepoVerdict.SHOULD_UPDATE def mirror(self, repo): logging.info("Mirroring repository '%s' from: %s", repo.repo_id, repo.clone_url) repo_dir = self.get_repo_dir(repo) if os.path.isdir(repo_dir): try: shutil.rmtree(repo_dir) except Exception as e: logging.exception(e) return False return run('git', 'clone', '--mirror', repo.clone_url, repo_dir) def update(self, repo): logging.info("Updating repository '%s'", repo.repo_id) repo_dir = self.get_repo_dir(repo) with chdir(repo_dir): if not run('git', 'remote', 'update', '--prune'): return False if run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True): if not run('git', 'reset', '--soft', 'origin/master'): return False return True class RepoVerdict(Enum): SHOULD_MIRROR = 1 SHOULD_UPDATE = 2 CANT_DECIDE = 3 def main(args=None): set_up_logging() try: args = parse_args(args) cgit = CGit(args.cgit_user, args.cgit_host, args.cgit_port) output = Output(args.output, cgit) success = True for repo in MY_REPOS: if args.repos is None or repo.repo_id in args.repos: if not output.pull(repo): success = False if success: logging.info('All repositories were updated successfully') return 0 else: logging.warning("Some repositories couldn't be updated!") return 1 except Exception as e: logging.exception(e) raise if __name__ == '__main__': sys.exit(main())