aboutsummaryrefslogblamecommitdiffstatshomepage
path: root/pull/pull.py
blob: 115be5b98551d7518b31b7b908f204c228b454a5 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                   
                                  






















                                                                         

                                                   
                                                     

                                                          
                                                   

                                                        
                                               




                                                                             




































































































































































                                                                                                    


                                                                












                                                                      
from argparse import ArgumentParser
import contextlib
from enum import Enum
import logging
import os
import os.path
import shutil
import socket
import sys
import subprocess

from pull.my_repos import MY_REPOS


env = os.environ.copy()
env['GIT_SSH_COMMAND'] = 'ssh -oStrictHostKeyChecking=no -oBatchMode=yes'

DEFAULT_OUTPUT_DIR = 'output'

DEFAULT_CGIT_CLONE_USER = 'egor'
DEFAULT_CGIT_CLONE_HOST = 'tensin-ext1.home'
DEFAULT_CGIT_CLONE_PORT = 8080


def set_up_logging():
    logging.basicConfig(
        level=logging.DEBUG,
        datefmt='%Y-%m-%d %H:%M:%S',
        format='%(asctime)s | %(levelname)s | %(message)s')


def parse_args(argv=None):
    if argv is None:
        argv = sys.argv[1:]
    parser = ArgumentParser()
    parser.add_argument('--output', metavar='PATH',
                        default=DEFAULT_OUTPUT_DIR,
                        help='output directory path')
    parser.add_argument('--cgit-user', metavar='USERNAME',
                        default=DEFAULT_CGIT_CLONE_USER,
                        help='cgit clone username')
    parser.add_argument('--cgit-host', metavar='HOST',
                        default=DEFAULT_CGIT_CLONE_HOST,
                        help='cgit clone host')
    parser.add_argument('--cgit-port', metavar='PORT', type=int,
                        default=DEFAULT_CGIT_CLONE_PORT,
                        help='cgit clone port number')
    parser.add_argument('--repo', metavar='REPO_ID', nargs='*', dest='repos',
                        help='repos to pull')
    return parser.parse_args(argv)


def check_output(*args, stdout=subprocess.PIPE):
    result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT,
                            env=env, encoding='utf-8')
    try:
        result.check_returncode()
        if stdout != subprocess.DEVNULL:
            if result.stdout is None:
                logging.debug('%s', args)
            else:
                logging.debug('%s\n%s', args, result.stdout)
        return result.returncode == 0, result.stdout
    except subprocess.CalledProcessError as e:
        if stdout != subprocess.DEVNULL:
            logging.error('%s\n%s', e, e.output)
        return e.returncode == 0, e.output


def run(*args, discard_output=False):
    if discard_output:
        success, _ = check_output(*args, stdout=subprocess.DEVNULL)
    else:
        success, _ = check_output(*args)
    return success


@contextlib.contextmanager
def chdir(new_cwd):
    old_cwd = os.getcwd()
    os.chdir(new_cwd)
    try:
        yield
    finally:
        os.chdir(old_cwd)


class CGit:
    def __init__(self, user, host, port):
        self.user = user
        self.host = host
        self.ip = socket.gethostbyname(self.host)
        self.port = port

    def get_clone_url(self, repo):
        return f'http://{self.user}@{self.ip}:{self.port}/git/{repo.repo_id}'


class CGitRC:
    def __init__(self, cgit):
        self.cgit = cgit

    def write(self, path, repo):
        with open(path, 'w') as fd:
            self._write_field(fd, 'clone-url', self._build_clone_url(repo))
            self._write_field(fd, 'owner', repo.owner)
            self._write_field(fd, 'desc', repo.desc)
            self._write_field(fd, 'homepage', repo.homepage)

    @staticmethod
    def _write_field(fd, field, value):
        if value is None:
            return
        fd.write(f'{field}={value}\n')

    def _build_clone_url(self, repo):
        clone_urls = []
        if repo.clone_url is not None:
            clone_urls.append(repo.clone_url)
        clone_urls.append(self.cgit.get_clone_url(repo))
        clone_urls = ' '.join(clone_urls)
        return clone_urls


class Output:
    def __init__(self, output_dir, cgit):
        self.output_dir = self._make_dir(output_dir)
        self.cgitrc = CGitRC(cgit)

    @staticmethod
    def _make_dir(rel_path):
        abs_path = os.path.abspath(rel_path)
        os.makedirs(abs_path, exist_ok=True)
        return abs_path

    def get_repo_dir(self, repo):
        return os.path.join(self.output_dir, repo.repo_id)

    def get_cgitrc_path(self, repo):
        return os.path.join(self.get_repo_dir(repo), 'cgitrc')

    def pull(self, repo):
        success = False
        verdict = self.judge(repo)
        if verdict is RepoVerdict.SHOULD_MIRROR:
            success = self.mirror(repo)
        elif verdict is RepoVerdict.SHOULD_UPDATE:
            success = self.update(repo)
        elif verdict is RepoVerdict.CANT_DECIDE:
            success = False
        else:
            raise NotImplementedError(f'Unknown repository verdict: {verdict}')
        if success:
            self.cgitrc.write(self.get_cgitrc_path(repo), repo)
        return success

    def judge(self, repo):
        repo_dir = self.get_repo_dir(repo)
        if not os.path.isdir(repo_dir):
            return RepoVerdict.SHOULD_MIRROR
        with chdir(repo_dir):
            if not run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True):
                logging.warning('Not a repository, so going to mirror: %s', repo_dir)
                return RepoVerdict.SHOULD_MIRROR
            success, output = check_output('git', 'config', '--get', 'remote.origin.url')
            if not success:
                # Every repository managed by this script should have the
                # 'origin' remote. If it doesn't, it's trash.
                return RepoVerdict.SHOULD_MIRROR
            if f'{repo.clone_url}\n' != output:
                logging.warning("Existing repository '%s' URL doesn't match the specified clone" \
                                " URL: %s", repo.repo_id, repo.clone_url)
                return RepoVerdict.CANT_DECIDE
            # Looks like a legit clone of the specified remote.
            return RepoVerdict.SHOULD_UPDATE

    def mirror(self, repo):
        logging.info("Mirroring repository '%s' from: %s", repo.repo_id,
                     repo.clone_url)
        repo_dir = self.get_repo_dir(repo)
        if os.path.isdir(repo_dir):
            try:
                shutil.rmtree(repo_dir)
            except Exception as e:
                logging.exception(e)
                return False
        return run('git', 'clone', '--mirror', repo.clone_url, repo_dir)

    def update(self, repo):
        logging.info("Updating repository '%s'", repo.repo_id)
        repo_dir = self.get_repo_dir(repo)
        with chdir(repo_dir):
            if not run('git', 'remote', 'update', '--prune'):
                return False
            if run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True):
                if not run('git', 'reset', '--soft', 'origin/master'):
                    return False
            return True


class RepoVerdict(Enum):
    SHOULD_MIRROR = 1
    SHOULD_UPDATE = 2
    CANT_DECIDE = 3


def main(args=None):
    set_up_logging()
    try:
        args = parse_args(args)
        cgit = CGit(args.cgit_user, args.cgit_host, args.cgit_port)
        output = Output(args.output, cgit)
        success = True
        for repo in MY_REPOS:
            if args.repos is None or repo.repo_id in args.repos:
                if not output.pull(repo):
                    success = False
        if success:
            logging.info('All repositories were updated successfully')
            return 0
        else:
            logging.warning("Some repositories couldn't be updated!")
            return 1
    except Exception as e:
        logging.exception(e)
        raise


if __name__ == '__main__':
    sys.exit(main())