aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cgit
diff options
context:
space:
mode:
Diffstat (limited to 'cgit')
-rw-r--r--cgit/__init__.py0
-rw-r--r--cgit/repos/__init__.py0
-rw-r--r--cgit/repos/cgit.py146
-rw-r--r--cgit/repos/main.py132
-rw-r--r--cgit/repos/repo.py112
-rw-r--r--cgit/repos/utils.py43
6 files changed, 0 insertions, 433 deletions
diff --git a/cgit/__init__.py b/cgit/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/cgit/__init__.py
+++ /dev/null
diff --git a/cgit/repos/__init__.py b/cgit/repos/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/cgit/repos/__init__.py
+++ /dev/null
diff --git a/cgit/repos/cgit.py b/cgit/repos/cgit.py
deleted file mode 100644
index 0f8917a..0000000
--- a/cgit/repos/cgit.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
-# This file is part of the "cgit repos" project.
-# For details, see https://github.com/egor-tensin/cgit-repos.
-# Distributed under the MIT License.
-
-from enum import Enum
-import logging
-import os
-import os.path
-import shutil
-
-import cgit.repos.utils as utils
-
-
-GIT_ENV = os.environ.copy()
-GIT_ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null'
-
-
-def run(*args, **kwargs):
- return utils.run(*args, env=GIT_ENV, **kwargs)
-
-
-def check_output(*args, **kwargs):
- return utils.check_output(*args, env=GIT_ENV, **kwargs)
-
-
-class CGit:
- def __init__(self, clone_url):
- self.clone_url = clone_url
-
- def get_clone_url(self, repo):
- if self.clone_url is None:
- return None
- return self.clone_url.format(repo_id=repo.repo_id)
-
-
-class CGitRC:
- def __init__(self, cgit):
- self.cgit = cgit
-
- def write(self, path, repo):
- with open(path, 'w') as fd:
- self._write_field(fd, 'clone-url', self._build_clone_url(repo))
- self._write_field(fd, 'owner', repo.owner)
- self._write_field(fd, 'desc', repo.desc)
- self._write_field(fd, 'homepage', repo.homepage)
-
- @staticmethod
- def _write_field(fd, field, value):
- if value is None:
- return
- fd.write(f'{field}={value}\n')
-
- def _build_clone_url(self, repo):
- clone_urls = []
- if repo.clone_url is not None:
- clone_urls.append(repo.clone_url)
- cgit_clone_url = self.cgit.get_clone_url(repo)
- if cgit_clone_url is not None:
- clone_urls.append(cgit_clone_url)
- if not clone_urls:
- return None
- clone_urls = ' '.join(clone_urls)
- return clone_urls
-
-
-class Output:
- def __init__(self, output_dir, cgit):
- self.output_dir = self._make_dir(output_dir)
- self.cgitrc = CGitRC(cgit)
-
- @staticmethod
- def _make_dir(rel_path):
- abs_path = os.path.abspath(rel_path)
- os.makedirs(abs_path, exist_ok=True)
- return abs_path
-
- def get_repo_dir(self, repo):
- return os.path.join(self.output_dir, repo.repo_id)
-
- def get_cgitrc_path(self, repo):
- return os.path.join(self.get_repo_dir(repo), 'cgitrc')
-
- def pull(self, repo):
- success = False
- verdict = self.judge(repo)
- if verdict is RepoVerdict.SHOULD_MIRROR:
- success = self.mirror(repo)
- elif verdict is RepoVerdict.SHOULD_UPDATE:
- success = self.update(repo)
- elif verdict is RepoVerdict.CANT_DECIDE:
- success = False
- else:
- raise NotImplementedError(f'Unknown repository verdict: {verdict}')
- if success:
- self.cgitrc.write(self.get_cgitrc_path(repo), repo)
- return success
-
- def judge(self, repo):
- repo_dir = self.get_repo_dir(repo)
- if not os.path.isdir(repo_dir):
- return RepoVerdict.SHOULD_MIRROR
- with utils.chdir(repo_dir):
- if not run('git', 'rev-parse', '--is-inside-work-tree', discard_output=True):
- logging.warning('Not a repository, so going to mirror: %s', repo_dir)
- return RepoVerdict.SHOULD_MIRROR
- success, output = check_output('git', 'config', '--get', 'remote.origin.url')
- if not success:
- # Every repository managed by this script should have the
- # 'origin' remote. If it doesn't, it's trash.
- return RepoVerdict.SHOULD_MIRROR
- if f'{repo.clone_url}\n' != output:
- logging.warning("Existing repository '%s' URL doesn't match the specified clone" \
- " URL: %s", repo.repo_id, repo.clone_url)
- return RepoVerdict.CANT_DECIDE
- # Looks like a legit clone of the specified remote.
- return RepoVerdict.SHOULD_UPDATE
-
- def mirror(self, repo):
- logging.info("Mirroring repository '%s' from: %s", repo.repo_id,
- repo.clone_url)
- repo_dir = self.get_repo_dir(repo)
- if os.path.isdir(repo_dir):
- try:
- shutil.rmtree(repo_dir)
- except Exception as e:
- logging.exception(e)
- return False
- return run('git', 'clone', '--mirror', repo.clone_url, repo_dir)
-
- def update(self, repo):
- logging.info("Updating repository '%s'", repo.repo_id)
- repo_dir = self.get_repo_dir(repo)
- with utils.chdir(repo_dir):
- if not run('git', 'remote', 'update', '--prune'):
- return False
- if run('git', 'rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True):
- if not run('git', 'reset', '--soft', 'origin/master'):
- return False
- return True
-
-
-class RepoVerdict(Enum):
- SHOULD_MIRROR = 1
- SHOULD_UPDATE = 2
- CANT_DECIDE = 3
diff --git a/cgit/repos/main.py b/cgit/repos/main.py
deleted file mode 100644
index e5e390f..0000000
--- a/cgit/repos/main.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
-# This file is part of the "cgit repos" project.
-# For details, see https://github.com/egor-tensin/cgit-repos.
-# Distributed under the MIT License.
-
-from argparse import ArgumentParser
-import configparser
-from contextlib import contextmanager
-import importlib
-import logging
-import os.path
-import sys
-
-from cgit.repos.cgit import CGit, Output
-from cgit.repos.repo import BitbucketRepo, GithubRepo, Repo
-import cgit.repos.utils as utils
-
-
-DEFAULT_OUTPUT_DIR = '/var/tmp/cgit-repos/output'
-DEFAULT_CONFIG_PATH = '/etc/cgit-repos/cgit-repos.conf'
-DEFAULT_MY_REPOS_PATH = '/etc/cgit-repos/my_repos.py'
-
-
-@contextmanager
-def setup_logging():
- logging.basicConfig(
- level=logging.DEBUG,
- datefmt='%Y-%m-%d %H:%M:%S',
- format='%(asctime)s | %(levelname)s | %(message)s')
- try:
- yield
- except Exception as e:
- logging.exception(e)
- raise
-
-
-def parse_args(argv=None):
- if argv is None:
- argv = sys.argv[1:]
- parser = ArgumentParser()
- parser.add_argument('--config', metavar='PATH',
- default=DEFAULT_CONFIG_PATH,
- help='config file path')
- parser.add_argument('--repo', metavar='REPO_ID',
- nargs='*', dest='repos',
- help='repos to pull')
- return parser.parse_args(argv)
-
-
-class Config:
- @staticmethod
- def read(path):
- return Config(path)
-
- def __init__(self, path):
- self.path = os.path.abspath(path)
- self.impl = configparser.ConfigParser()
- self.impl.read(path)
-
- def _resolve_relative(self, path):
- if os.path.isabs(path):
- return path
- with utils.chdir(os.path.dirname(self.path)):
- path = os.path.abspath(path)
- return path
-
- @property
- def output(self):
- path = self.impl.get('DEFAULT', 'output', fallback=DEFAULT_OUTPUT_DIR)
- return self._resolve_relative(path)
-
- @property
- def clone_url(self):
- return self.impl.get('DEFAULT', 'clone_url', fallback=None)
-
- @property
- def default_owner(self):
- return self.impl.get('DEFAULT', 'owner', fallback=None)
-
- @property
- def via_ssh(self):
- return self.impl.getboolean('DEFAULT', 'ssh', fallback=Repo.DEFAULT_VIA_SSH)
-
- @property
- def github_username(self):
- return self.impl.get('GITHUB', 'username', fallback=None)
-
- @property
- def bitbucket_username(self):
- return self.impl.get('BITBUCKET', 'username', fallback=None)
-
- def set_defaults(self):
- Repo.DEFAULT_OWNER = self.default_owner
- Repo.DEFAULT_VIA_SSH = self.via_ssh
- GithubRepo.DEFAULT_USER = self.github_username
- BitbucketRepo.DEFAULT_USER = self.bitbucket_username
-
- @property
- def my_repos(self):
- path = self.impl.get('DEFAULT', 'my_repos', fallback=DEFAULT_MY_REPOS_PATH)
- return self._resolve_relative(path)
-
- def import_my_repos(self):
- sys.path.append(os.path.dirname(self.my_repos))
- module_name = os.path.splitext(os.path.basename(self.my_repos))[0]
- module = importlib.import_module(module_name)
- return module.MY_REPOS
-
-
-def main(args=None):
- with setup_logging():
- args = parse_args(args)
- config = Config.read(args.config)
- config.set_defaults()
- my_repos = config.import_my_repos()
- cgit = CGit(config.clone_url)
- output = Output(config.output, cgit)
- success = True
- for repo in my_repos:
- if args.repos is None or repo.repo_id in args.repos:
- if not output.pull(repo):
- success = False
- if success:
- logging.info('All repositories were updated successfully')
- return 0
- else:
- logging.warning("Some repositories couldn't be updated!")
- return 1
-
-
-if __name__ == '__main__':
- sys.exit(main())
diff --git a/cgit/repos/repo.py b/cgit/repos/repo.py
deleted file mode 100644
index 4b3072c..0000000
--- a/cgit/repos/repo.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
-# This file is part of the "cgit repos" project.
-# For details, see https://github.com/egor-tensin/cgit-repos.
-# Distributed under the MIT License.
-
-import abc
-import os.path
-
-
-class Repo:
- DEFAULT_OWNER = None
- DEFAULT_VIA_SSH = True
-
- @staticmethod
- def extract_repo_name(repo_id):
- return os.path.basename(repo_id)
-
- def __init__(self, repo_id, clone_url, owner=None, desc=None,
- homepage=None):
- self.repo_id = repo_id
- self.repo_name = self.extract_repo_name(repo_id)
- self.clone_url = clone_url
- if owner is None:
- owner = Repo.DEFAULT_OWNER
- self.owner = owner
- if desc is None:
- if homepage is not None:
- desc = homepage
- elif clone_url is not None:
- desc = clone_url
- else:
- desc = self.repo_name
- self.desc = desc
- self.homepage = homepage
-
-
-class HostedRepo(Repo, abc.ABC):
- def __init__(self, repo_id, clone_url=None, owner=None, desc=None,
- homepage=None, user=None, via_ssh=None):
- user = user or self.get_default_user()
- if user is None:
- raise RuntimeError(f'neither explicit or default {self.provider_name()} username was provided')
- name = Repo.extract_repo_name(repo_id)
- if clone_url is None:
- if via_ssh is None:
- via_ssh = Repo.DEFAULT_VIA_SSH
- if via_ssh:
- clone_url = self.build_clone_url_ssh(user, name)
- else:
- clone_url = self.build_clone_url_https(user, name)
- if homepage is None:
- homepage = self.build_homepage_url(user, name)
- super().__init__(repo_id, clone_url, owner=owner, desc=desc,
- homepage=homepage)
-
- @abc.abstractmethod
- def provider_name(self):
- pass
-
- @abc.abstractmethod
- def get_default_user(self):
- pass
-
- @abc.abstractmethod
- def build_clone_url_ssh(self, user, name):
- pass
-
- @abc.abstractmethod
- def build_clone_url_https(self, user, name):
- pass
-
- @abc.abstractmethod
- def build_homepage_url(self, user, name):
- pass
-
-
-class GithubRepo(HostedRepo):
- DEFAULT_USER = None
-
- def provider_name(self):
- return 'GitHub'
-
- def get_default_user(self):
- return GithubRepo.DEFAULT_USER
-
- def build_clone_url_ssh(self, user, name):
- return f'ssh://git@github.com/{user}/{name}.git'
-
- def build_clone_url_https(self, user, name):
- return f'https://github.com/{user}/{name}.git'
-
- def build_homepage_url(self, user, name):
- return f'https://github.com/{user}/{name}'
-
-
-class BitbucketRepo(HostedRepo):
- DEFAULT_USER = None
-
- def provider_name(self):
- return 'Bitbucket'
-
- def get_default_user(self):
- return BitbucketRepo.DEFAULT_USER
-
- def build_clone_url_ssh(self, user, name):
- return f'ssh://git@bitbucket.org/{user}/{name}.git'
-
- def build_clone_url_https(self, user, name):
- return f'https://bitbucket.org/{user}/{name}.git'
-
- def build_homepage_url(self, user, name):
- return f'https://bitbucket.org/{user}/{name.lower()}'
diff --git a/cgit/repos/utils.py b/cgit/repos/utils.py
deleted file mode 100644
index 84337e8..0000000
--- a/cgit/repos/utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
-# This file is part of the "cgit repos" project.
-# For details, see https://github.com/egor-tensin/cgit-repos.
-# Distributed under the MIT License.
-
-import contextlib
-import logging
-import os
-import subprocess
-
-
-def check_output(*args, stdout=subprocess.PIPE, **kwargs):
- try:
- result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT,
- encoding='utf-8', check=True, **kwargs)
- if stdout != subprocess.DEVNULL:
- if result.stdout is None:
- logging.debug('%s', args)
- else:
- logging.debug('%s\n%s', args, result.stdout)
- return result.returncode == 0, result.stdout
- except subprocess.CalledProcessError as e:
- if stdout != subprocess.DEVNULL:
- logging.error('%s\n%s', e, e.output)
- return e.returncode == 0, e.output
-
-
-def run(*args, discard_output=False, **kwargs):
- if discard_output:
- success, _ = check_output(*args, stdout=subprocess.DEVNULL, **kwargs)
- else:
- success, _ = check_output(*args, **kwargs)
- return success
-
-
-@contextlib.contextmanager
-def chdir(new_cwd):
- old_cwd = os.getcwd()
- os.chdir(new_cwd)
- try:
- yield
- finally:
- os.chdir(old_cwd)