aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cgit
diff options
context:
space:
mode:
authorEgor Tensin <Egor.Tensin@gmail.com>2020-03-01 11:17:29 +0300
committerEgor Tensin <Egor.Tensin@gmail.com>2020-03-01 11:24:09 +0300
commit0352e233955bc5d04344a4ae3ef04436d8299ae1 (patch)
tree7b99b9bbf7b6390cbea22966ba257dfd09f1fea9 /cgit
parentTravis: lint the config (diff)
downloadcgitize-0352e233955bc5d04344a4ae3ef04436d8299ae1.tar.gz
cgitize-0352e233955bc5d04344a4ae3ef04436d8299ae1.zip
rename the modules
Diffstat (limited to 'cgit')
-rw-r--r--cgit/__init__.py0
-rw-r--r--cgit/repos/__init__.py0
-rw-r--r--cgit/repos/cgit.py146
-rw-r--r--cgit/repos/main.py114
-rw-r--r--cgit/repos/repo.py98
-rw-r--r--cgit/repos/utils.py43
6 files changed, 401 insertions, 0 deletions
diff --git a/cgit/__init__.py b/cgit/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cgit/__init__.py
diff --git a/cgit/repos/__init__.py b/cgit/repos/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cgit/repos/__init__.py
diff --git a/cgit/repos/cgit.py b/cgit/repos/cgit.py
new file mode 100644
index 0000000..bb010d3
--- /dev/null
+++ b/cgit/repos/cgit.py
@@ -0,0 +1,146 @@
+# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "cgit repos" project.
+# For details, see https://github.com/egor-tensin/cgit-repos.
+# Distributed under the MIT License.
+
+from enum import Enum
+import logging
+import os
+import os.path
+import shutil
+
+from cgit.repos.utils import chdir, check_output, run
+
+
+_ENV = os.environ.copy()
+_ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null'
+
+
+def _run(*args, **kwargs):
+ return run(*args, env=_ENV, **kwargs)
+
+
+def _check_output(*args, **kwargs):
+ return check_output(*args, env=_ENV, **kwargs)
+
+
+class CGit:
+ def __init__(self, clone_url):
+ self.clone_url = clone_url
+
+ def get_clone_url(self, repo):
+ if self.clone_url is None:
+ return None
+ return self.clone_url.format(repo_id=repo.repo_id)
+
+
+class CGitRC:
+ def __init__(self, cgit):
+ self.cgit = cgit
+
+ def write(self, path, repo):
+ with open(path, 'w') as fd:
+ self._write_field(fd, 'clone-url', self._build_clone_url(repo))
+ self._write_field(fd, 'owner', repo.owner)
+ self._write_field(fd, 'desc', repo.desc)
+ self._write_field(fd, 'homepage', repo.homepage)
+
+ @staticmethod
+ def _write_field(fd, field, value):
+ if value is None:
+ return
+ fd.write(f'{field}={value}\n')
+
+ def _build_clone_url(self, repo):
+ clone_urls = []
+ if repo.clone_url is not None:
+ clone_urls.append(repo.clone_url)
+ cgit_clone_url = self.cgit.get_clone_url(repo)
+ if cgit_clone_url is not None:
+ clone_urls.append(cgit_clone_url)
+ if not clone_urls:
+ return None
+ clone_urls = ' '.join(clone_urls)
+ return clone_urls
+
+
+class Output:
+ def __init__(self, output_dir, cgit):
+ self.output_dir = self._make_dir(output_dir)
+ self.cgitrc = CGitRC(cgit)
+
+ @staticmethod
+ def _make_dir(rel_path):
+ abs_path = os.path.abspath(rel_path)
+ os.makedirs(abs_path, exist_ok=True)
+ return abs_path
+
+ def get_repo_dir(self, repo):
+ return os.path.join(self.output_dir, repo.repo_id)
+
+ def get_cgitrc_path(self, repo):
+ return os.path.join(self.get_repo_dir(repo), 'cgitrc')
+
+ def pull(self, repo):
+ success = False
+ verdict = self.judge(repo)
+ if verdict is RepoVerdict.SHOULD_MIRROR:
+ success = self.mirror(repo)
+ elif verdict is RepoVerdict.SHOULD_UPDATE:
+ success = self.update(repo)
+ elif verdict is RepoVerdict.CANT_DECIDE:
+ success = False
+ else:
+ raise NotImplementedError(f'Unknown repository verdict: {verdict}')
+ if success:
+ self.cgitrc.write(self.get_cgitrc_path(repo), repo)
+ return success
+
+ def judge(self, repo):
+ repo_dir = self.get_repo_dir(repo)
+ if not os.path.isdir(repo_dir):
+ return RepoVerdict.SHOULD_MIRROR
+ with chdir(repo_dir):
+ if not _run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True):
+ logging.warning('Not a repository, so going to mirror: %s', repo_dir)
+ return RepoVerdict.SHOULD_MIRROR
+ success, output = _check_output('git', 'config', '--get', 'remote.origin.url')
+ if not success:
+ # Every repository managed by this script should have the
+ # 'origin' remote. If it doesn't, it's trash.
+ return RepoVerdict.SHOULD_MIRROR
+ if f'{repo.clone_url}\n' != output:
+ logging.warning("Existing repository '%s' URL doesn't match the specified clone" \
+ " URL: %s", repo.repo_id, repo.clone_url)
+ return RepoVerdict.CANT_DECIDE
+ # Looks like a legit clone of the specified remote.
+ return RepoVerdict.SHOULD_UPDATE
+
+ def mirror(self, repo):
+ logging.info("Mirroring repository '%s' from: %s", repo.repo_id,
+ repo.clone_url)
+ repo_dir = self.get_repo_dir(repo)
+ if os.path.isdir(repo_dir):
+ try:
+ shutil.rmtree(repo_dir)
+ except Exception as e:
+ logging.exception(e)
+ return False
+ return _run('git', 'clone', '--mirror', repo.clone_url, repo_dir)
+
+ def update(self, repo):
+ logging.info("Updating repository '%s'", repo.repo_id)
+ repo_dir = self.get_repo_dir(repo)
+ with chdir(repo_dir):
+ if not _run('git', 'remote', 'update', '--prune'):
+ return False
+ if _run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True):
+ if not _run('git', 'reset', '--soft', 'origin/master'):
+ return False
+ return True
+
+
+class RepoVerdict(Enum):
+ SHOULD_MIRROR = 1
+ SHOULD_UPDATE = 2
+ CANT_DECIDE = 3
diff --git a/cgit/repos/main.py b/cgit/repos/main.py
new file mode 100644
index 0000000..8132208
--- /dev/null
+++ b/cgit/repos/main.py
@@ -0,0 +1,114 @@
+# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "cgit repos" project.
+# For details, see https://github.com/egor-tensin/cgit-repos.
+# Distributed under the MIT License.
+
+from argparse import ArgumentParser
+import configparser
+import importlib
+import logging
+import os.path
+import sys
+
+from cgit.repos.cgit import CGit, Output
+from cgit.repos.repo import BitbucketRepo, GithubRepo, Repo
+
+
+DEFAULT_OUTPUT_DIR = '/var/tmp/cgit-repos/output'
+DEFAULT_CONFIG_PATH = '/etc/cgit-repos/cgit-repos.conf'
+DEFAULT_MY_REPOS_PATH = '/etc/cgit-repos/my_repos.py'
+
+
+def set_up_logging():
+ logging.basicConfig(
+ level=logging.DEBUG,
+ datefmt='%Y-%m-%d %H:%M:%S',
+ format='%(asctime)s | %(levelname)s | %(message)s')
+
+
+def parse_args(argv=None):
+ if argv is None:
+ argv = sys.argv[1:]
+ parser = ArgumentParser()
+ parser.add_argument('--config', metavar='PATH',
+ default=DEFAULT_CONFIG_PATH,
+ help='config file path')
+ parser.add_argument('--repo', metavar='REPO_ID',
+ nargs='*', dest='repos',
+ help='repos to pull')
+ return parser.parse_args(argv)
+
+
+class Config:
+ @staticmethod
+ def read(path):
+ config = configparser.ConfigParser()
+ config.read(path)
+ return Config(config)
+
+ def __init__(self, impl):
+ self.impl = impl
+
+ @property
+ def output(self):
+ return self.impl.get('DEFAULT', 'output', fallback=DEFAULT_OUTPUT_DIR)
+
+ @property
+ def clone_url(self):
+ return self.impl.get('DEFAULT', 'clone_url', fallback=None)
+
+ @property
+ def default_owner(self):
+ return self.impl.get('DEFAULT', 'owner', fallback=None)
+
+ @property
+ def github_username(self):
+ return self.impl.get('GITHUB', 'username', fallback=None)
+
+ @property
+ def bitbucket_username(self):
+ return self.impl.get('BITBUCKET', 'username', fallback=None)
+
+ def set_defaults(self):
+ Repo.DEFAULT_OWNER = self.default_owner
+ GithubRepo.DEFAULT_USER = self.github_username
+ BitbucketRepo.DEFAULT_USER = self.bitbucket_username
+
+ @property
+ def my_repos(self):
+ return self.impl.get('DEFAULT', 'my_repos', fallback=DEFAULT_MY_REPOS_PATH)
+
+ def import_my_repos(self):
+ sys.path.append(os.path.dirname(self.my_repos))
+ module_name = os.path.splitext(os.path.basename(self.my_repos))[0]
+ module = importlib.import_module(module_name)
+ return module.MY_REPOS
+
+
+def main(args=None):
+ set_up_logging()
+ try:
+ args = parse_args(args)
+ config = Config.read(args.config)
+ config.set_defaults()
+ my_repos = config.import_my_repos()
+ cgit = CGit(config.clone_url)
+ output = Output(config.output, cgit)
+ success = True
+ for repo in my_repos:
+ if args.repos is None or repo.repo_id in args.repos:
+ if not output.pull(repo):
+ success = False
+ if success:
+ logging.info('All repositories were updated successfully')
+ return 0
+ else:
+ logging.warning("Some repositories couldn't be updated!")
+ return 1
+ except Exception as e:
+ logging.exception(e)
+ raise
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/cgit/repos/repo.py b/cgit/repos/repo.py
new file mode 100644
index 0000000..a128dbf
--- /dev/null
+++ b/cgit/repos/repo.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "cgit repos" project.
+# For details, see https://github.com/egor-tensin/cgit-repos.
+# Distributed under the MIT License.
+
+import os.path
+
+
+class Repo:
+ DEFAULT_OWNER = None
+
+ @staticmethod
+ def extract_repo_name(repo_id):
+ return os.path.basename(repo_id)
+
+ def __init__(self, repo_id, clone_url, owner=None, desc=None,
+ homepage=None):
+ self.repo_id = repo_id
+ self.repo_name = self.extract_repo_name(repo_id)
+ self.clone_url = clone_url
+ if owner is None:
+ owner = Repo.DEFAULT_OWNER
+ self.owner = owner
+ if desc is None:
+ if homepage is not None:
+ desc = homepage
+ elif clone_url is not None:
+ desc = clone_url
+ else:
+ desc = self.repo_name
+ self.desc = desc
+ self.homepage = homepage
+
+
+class GithubRepo(Repo):
+ DEFAULT_USER = None
+
+ def __init__(self, repo_id, clone_url=None, owner=None, desc=None,
+ homepage=None, user=DEFAULT_USER, via_ssh=True):
+ if user is None:
+ if GithubRepo.DEFAULT_USER is None:
+ raise RuntimeError('neither explicit or default GitHub username was provided')
+ user = GithubRepo.DEFAULT_USER
+ name = Repo.extract_repo_name(repo_id)
+ if clone_url is None:
+ if via_ssh:
+ clone_url = self.build_clone_url_ssh(user, name)
+ else:
+ clone_url = self.build_clone_url_https(user, name)
+ if homepage is None:
+ homepage = self.build_homepage_url(user, name)
+ super().__init__(repo_id, clone_url, owner=owner, desc=desc,
+ homepage=homepage)
+
+ @staticmethod
+ def build_clone_url_ssh(user, name):
+ return f'ssh://git@github.com/{user}/{name}.git'
+
+ @staticmethod
+ def build_clone_url_https(user, name):
+ return f'https://github.com/{user}/{name}.git'
+
+ @staticmethod
+ def build_homepage_url(user, name):
+ return f'https://github.com/{user}/{name}'
+
+
+class BitbucketRepo(Repo):
+ DEFAULT_USER = None
+
+ def __init__(self, repo_id, clone_url=None, owner=None, desc=None,
+ homepage=None, user=DEFAULT_USER, via_ssh=True):
+ if user is None:
+ if BitbucketRepo.DEFAULT_USER is None:
+ raise RuntimeError('neither explicit or default Bitbucket username was provided')
+ user = BitbucketRepo.DEFAULT_USER
+ name = Repo.extract_repo_name(repo_id)
+ if clone_url is None:
+ if via_ssh:
+ clone_url = self.build_clone_url_ssh(user, name)
+ else:
+ clone_url = self.build_clone_url_https(user, name)
+ if homepage is None:
+ homepage = self.build_homepage_url(user, name)
+ super().__init__(repo_id, clone_url, owner=owner, desc=desc,
+ homepage=homepage)
+
+ @staticmethod
+ def build_clone_url_ssh(user, name):
+ return f'ssh://git@bitbucket.org/{user}/{name}.git'
+
+ @staticmethod
+ def build_clone_url_https(user, name):
+ return f'https://bitbucket.org/{user}/{name}.git'
+
+ @staticmethod
+ def build_homepage_url(user, name):
+ return f'https://bitbucket.org/{user}/{name.lower()}'
diff --git a/cgit/repos/utils.py b/cgit/repos/utils.py
new file mode 100644
index 0000000..84337e8
--- /dev/null
+++ b/cgit/repos/utils.py
@@ -0,0 +1,43 @@
+# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
+# This file is part of the "cgit repos" project.
+# For details, see https://github.com/egor-tensin/cgit-repos.
+# Distributed under the MIT License.
+
+import contextlib
+import logging
+import os
+import subprocess
+
+
+def check_output(*args, stdout=subprocess.PIPE, **kwargs):
+ try:
+ result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT,
+ encoding='utf-8', check=True, **kwargs)
+ if stdout != subprocess.DEVNULL:
+ if result.stdout is None:
+ logging.debug('%s', args)
+ else:
+ logging.debug('%s\n%s', args, result.stdout)
+ return result.returncode == 0, result.stdout
+ except subprocess.CalledProcessError as e:
+ if stdout != subprocess.DEVNULL:
+ logging.error('%s\n%s', e, e.output)
+ return e.returncode == 0, e.output
+
+
+def run(*args, discard_output=False, **kwargs):
+ if discard_output:
+ success, _ = check_output(*args, stdout=subprocess.DEVNULL, **kwargs)
+ else:
+ success, _ = check_output(*args, **kwargs)
+ return success
+
+
+@contextlib.contextmanager
+def chdir(new_cwd):
+ old_cwd = os.getcwd()
+ os.chdir(new_cwd)
+ try:
+ yield
+ finally:
+ os.chdir(old_cwd)