aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/pull/pull.py
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2019-08-12 01:17:07 +0300
committerEgor Tensin <Egor.Tensin@gmail.com>2019-08-12 01:17:07 +0300
commite113fb7431ebd18c1bc6f7641ceb376df9699dea (patch)
tree4f8ebdbe8dc328b3bb09981065ad59b11890232a /pull/pull.py
parentrename source files (diff)
downloadcgitize-e113fb7431ebd18c1bc6f7641ceb376df9699dea.tar.gz
cgitize-e113fb7431ebd18c1bc6f7641ceb376df9699dea.zip
split pull/pull.py
Diffstat (limited to 'pull/pull.py')
-rw-r--r--pull/pull.py230
1 files changed, 0 insertions, 230 deletions
diff --git a/pull/pull.py b/pull/pull.py
deleted file mode 100644
index 115be5b..0000000
--- a/pull/pull.py
+++ /dev/null
@@ -1,230 +0,0 @@
-from argparse import ArgumentParser
-import contextlib
-from enum import Enum
-import logging
-import os
-import os.path
-import shutil
-import socket
-import sys
-import subprocess
-
-from pull.my_repos import MY_REPOS
-
-
-env = os.environ.copy()
-env['GIT_SSH_COMMAND'] = 'ssh -oStrictHostKeyChecking=no -oBatchMode=yes'
-
-DEFAULT_OUTPUT_DIR = 'output'
-
-DEFAULT_CGIT_CLONE_USER = 'egor'
-DEFAULT_CGIT_CLONE_HOST = 'tensin-ext1.home'
-DEFAULT_CGIT_CLONE_PORT = 8080
-
-
-def set_up_logging():
- logging.basicConfig(
- level=logging.DEBUG,
- datefmt='%Y-%m-%d %H:%M:%S',
- format='%(asctime)s | %(levelname)s | %(message)s')
-
-
-def parse_args(argv=None):
- if argv is None:
- argv = sys.argv[1:]
- parser = ArgumentParser()
- parser.add_argument('--output', metavar='PATH',
- default=DEFAULT_OUTPUT_DIR,
- help='output directory path')
- parser.add_argument('--cgit-user', metavar='USERNAME',
- default=DEFAULT_CGIT_CLONE_USER,
- help='cgit clone username')
- parser.add_argument('--cgit-host', metavar='HOST',
- default=DEFAULT_CGIT_CLONE_HOST,
- help='cgit clone host')
- parser.add_argument('--cgit-port', metavar='PORT', type=int,
- default=DEFAULT_CGIT_CLONE_PORT,
- help='cgit clone port number')
- parser.add_argument('--repo', metavar='REPO_ID', nargs='*', dest='repos',
- help='repos to pull')
- return parser.parse_args(argv)
-
-
-def check_output(*args, stdout=subprocess.PIPE):
- result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT,
- env=env, encoding='utf-8')
- try:
- result.check_returncode()
- if stdout != subprocess.DEVNULL:
- if result.stdout is None:
- logging.debug('%s', args)
- else:
- logging.debug('%s\n%s', args, result.stdout)
- return result.returncode == 0, result.stdout
- except subprocess.CalledProcessError as e:
- if stdout != subprocess.DEVNULL:
- logging.error('%s\n%s', e, e.output)
- return e.returncode == 0, e.output
-
-
-def run(*args, discard_output=False):
- if discard_output:
- success, _ = check_output(*args, stdout=subprocess.DEVNULL)
- else:
- success, _ = check_output(*args)
- return success
-
-
-@contextlib.contextmanager
-def chdir(new_cwd):
- old_cwd = os.getcwd()
- os.chdir(new_cwd)
- try:
- yield
- finally:
- os.chdir(old_cwd)
-
-
-class CGit:
- def __init__(self, user, host, port):
- self.user = user
- self.host = host
- self.ip = socket.gethostbyname(self.host)
- self.port = port
-
- def get_clone_url(self, repo):
- return f'http://{self.user}@{self.ip}:{self.port}/git/{repo.repo_id}'
-
-
-class CGitRC:
- def __init__(self, cgit):
- self.cgit = cgit
-
- def write(self, path, repo):
- with open(path, 'w') as fd:
- self._write_field(fd, 'clone-url', self._build_clone_url(repo))
- self._write_field(fd, 'owner', repo.owner)
- self._write_field(fd, 'desc', repo.desc)
- self._write_field(fd, 'homepage', repo.homepage)
-
- @staticmethod
- def _write_field(fd, field, value):
- if value is None:
- return
- fd.write(f'{field}={value}\n')
-
- def _build_clone_url(self, repo):
- clone_urls = []
- if repo.clone_url is not None:
- clone_urls.append(repo.clone_url)
- clone_urls.append(self.cgit.get_clone_url(repo))
- clone_urls = ' '.join(clone_urls)
- return clone_urls
-
-
-class Output:
- def __init__(self, output_dir, cgit):
- self.output_dir = self._make_dir(output_dir)
- self.cgitrc = CGitRC(cgit)
-
- @staticmethod
- def _make_dir(rel_path):
- abs_path = os.path.abspath(rel_path)
- os.makedirs(abs_path, exist_ok=True)
- return abs_path
-
- def get_repo_dir(self, repo):
- return os.path.join(self.output_dir, repo.repo_id)
-
- def get_cgitrc_path(self, repo):
- return os.path.join(self.get_repo_dir(repo), 'cgitrc')
-
- def pull(self, repo):
- success = False
- verdict = self.judge(repo)
- if verdict is RepoVerdict.SHOULD_MIRROR:
- success = self.mirror(repo)
- elif verdict is RepoVerdict.SHOULD_UPDATE:
- success = self.update(repo)
- elif verdict is RepoVerdict.CANT_DECIDE:
- success = False
- else:
- raise NotImplementedError(f'Unknown repository verdict: {verdict}')
- if success:
- self.cgitrc.write(self.get_cgitrc_path(repo), repo)
- return success
-
- def judge(self, repo):
- repo_dir = self.get_repo_dir(repo)
- if not os.path.isdir(repo_dir):
- return RepoVerdict.SHOULD_MIRROR
- with chdir(repo_dir):
- if not run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True):
- logging.warning('Not a repository, so going to mirror: %s', repo_dir)
- return RepoVerdict.SHOULD_MIRROR
- success, output = check_output('git', 'config', '--get', 'remote.origin.url')
- if not success:
- # Every repository managed by this script should have the
- # 'origin' remote. If it doesn't, it's trash.
- return RepoVerdict.SHOULD_MIRROR
- if f'{repo.clone_url}\n' != output:
- logging.warning("Existing repository '%s' URL doesn't match the specified clone" \
- " URL: %s", repo.repo_id, repo.clone_url)
- return RepoVerdict.CANT_DECIDE
- # Looks like a legit clone of the specified remote.
- return RepoVerdict.SHOULD_UPDATE
-
- def mirror(self, repo):
- logging.info("Mirroring repository '%s' from: %s", repo.repo_id,
- repo.clone_url)
- repo_dir = self.get_repo_dir(repo)
- if os.path.isdir(repo_dir):
- try:
- shutil.rmtree(repo_dir)
- except Exception as e:
- logging.exception(e)
- return False
- return run('git', 'clone', '--mirror', repo.clone_url, repo_dir)
-
- def update(self, repo):
- logging.info("Updating repository '%s'", repo.repo_id)
- repo_dir = self.get_repo_dir(repo)
- with chdir(repo_dir):
- if not run('git', 'remote', 'update', '--prune'):
- return False
- if run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True):
- if not run('git', 'reset', '--soft', 'origin/master'):
- return False
- return True
-
-
-class RepoVerdict(Enum):
- SHOULD_MIRROR = 1
- SHOULD_UPDATE = 2
- CANT_DECIDE = 3
-
-
-def main(args=None):
- set_up_logging()
- try:
- args = parse_args(args)
- cgit = CGit(args.cgit_user, args.cgit_host, args.cgit_port)
- output = Output(args.output, cgit)
- success = True
- for repo in MY_REPOS:
- if args.repos is None or repo.repo_id in args.repos:
- if not output.pull(repo):
- success = False
- if success:
- logging.info('All repositories were updated successfully')
- return 0
- else:
- logging.warning("Some repositories couldn't be updated!")
- return 1
- except Exception as e:
- logging.exception(e)
- raise
-
-
-if __name__ == '__main__':
- sys.exit(main())