diff options
author | Egor Tensin <Egor.Tensin@gmail.com> | 2021-03-29 19:26:29 +0300 |
---|---|---|
committer | Egor Tensin <Egor.Tensin@gmail.com> | 2021-03-29 20:05:05 +0300 |
commit | d720a04082760bc8ce075fd4387d6c708b5794c9 (patch) | |
tree | 7f9fa93e0d94b922cb13750e1673cc4e539485af | |
parent | don't leak access tokens on the command line (diff) | |
download | cgitize-d720a04082760bc8ce075fd4387d6c708b5794c9.tar.gz cgitize-d720a04082760bc8ce075fd4387d6c708b5794c9.zip |
add cgitize.git, refactor subprocess utilities
-rw-r--r-- | cgitize/cgit.py | 26 | ||||
-rw-r--r-- | cgitize/git.py | 24 | ||||
-rw-r--r-- | cgitize/utils.py | 47 |
3 files changed, 60 insertions, 37 deletions
diff --git a/cgitize/cgit.py b/cgitize/cgit.py index 8db5ef9..712bc55 100644 --- a/cgitize/cgit.py +++ b/cgitize/cgit.py @@ -11,21 +11,10 @@ import os.path import shutil import stat +from cgitize.git import Git import cgitize.utils as utils -GIT_ENV = os.environ.copy() -GIT_ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null' - - -def git(*args, **kwargs): - return utils.run('git', *args, env=GIT_ENV, **kwargs) - - -def git_stdout(*args, **kwargs): - return utils.check_output('git', *args, env=GIT_ENV, **kwargs) - - @contextmanager def setup_git_auth(repo): if not repo.url_auth: @@ -135,10 +124,10 @@ class Output: if not os.path.isdir(repo_dir): return RepoVerdict.SHOULD_MIRROR with utils.chdir(repo_dir): - if not git('rev-parse', '--is-inside-work-tree', discard_output=True): + if not Git.check('rev-parse', '--is-inside-work-tree'): logging.warning('Not a repository, so going to mirror: %s', repo_dir) return RepoVerdict.SHOULD_MIRROR - success, output = git_stdout('config', '--get', 'remote.origin.url') + success, output = Git.capture('config', '--get', 'remote.origin.url') if not success: # Every repository managed by this script should have the # 'origin' remote. If it doesn't, it's trash. @@ -161,18 +150,17 @@ class Output: logging.exception(e) return False with setup_git_auth(repo): - return git('clone', '--mirror', repo.clone_url, repo_dir) + return Git.check('clone', '--mirror', '--quiet', repo.clone_url, repo_dir) def update(self, repo): logging.info("Updating repository '%s'", repo.repo_id) repo_dir = self.get_repo_dir(repo) with utils.chdir(repo_dir): with setup_git_auth(repo): - if not git('remote', 'update', '--prune'): - return False - if git('rev-parse', '--verify', '--quiet', 'origin/master', discard_output=True): - if not git('reset', '--soft', 'origin/master'): + if not Git.check('remote', 'update', '--prune'): return False + if Git.check('rev-parse', '--verify', '--quiet', 'origin/master'): + return Git.check('reset', '--soft', 'origin/master') return True diff --git a/cgitize/git.py b/cgitize/git.py new file mode 100644 index 0000000..87554dd --- /dev/null +++ b/cgitize/git.py @@ -0,0 +1,24 @@ +# Copyright (c) 2021 Egor Tensin <Egor.Tensin@gmail.com> +# This file is part of the "cgitize" project. +# For details, see https://github.com/egor-tensin/cgitize. +# Distributed under the MIT License. + +import os + +import cgitize.utils as utils + + +GIT_ENV = os.environ.copy() +GIT_ENV['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes -oLogLevel=QUIET -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null' + + +class Git: + EXE = 'git' + + @staticmethod + def check(*args, **kwargs): + return utils.try_run(Git.EXE, *args, env=GIT_ENV, **kwargs) + + @staticmethod + def capture(*args, **kwargs): + return utils.try_run_capture(Git.EXE, *args, env=GIT_ENV, **kwargs) diff --git a/cgitize/utils.py b/cgitize/utils.py index 5ff8475..478a35d 100644 --- a/cgitize/utils.py +++ b/cgitize/utils.py @@ -9,28 +9,39 @@ import os import subprocess -def check_output(*args, stdout=subprocess.PIPE, **kwargs): +def run(*args, capture_output=False, **kwargs): + stdout = None + stderr = None + if capture_output: + stdout = subprocess.PIPE + stderr = subprocess.STDOUT + + logging.debug('%s', args) + result = subprocess.run(args, check=True, stdout=stdout, stderr=stderr, + encoding='utf-8', **kwargs) + + if result.stdout is not None: + logging.debug('\n%s', result.stdout) + return result.stdout + + +def try_run(*args, **kwargs): try: - result = subprocess.run(args, stdout=stdout, stderr=subprocess.STDOUT, - encoding='utf-8', check=True, **kwargs) - if stdout != subprocess.DEVNULL: - if result.stdout is None: - logging.debug('%s', args) - else: - logging.debug('%s\n%s', args, result.stdout) - return result.returncode == 0, result.stdout + run(*args, **kwargs) + return True except subprocess.CalledProcessError as e: - if stdout != subprocess.DEVNULL: - logging.error('%s\n%s', e, e.output) - return e.returncode == 0, e.output + return e.returncode == 0 -def run(*args, discard_output=False, **kwargs): - if discard_output: - success, _ = check_output(*args, stdout=subprocess.DEVNULL, **kwargs) - else: - success, _ = check_output(*args, **kwargs) - return success +def run_capture(*args, **kwargs): + return run(*args, capture_output=True, **kwargs) + + +def try_run_capture(*args, **kwargs): + try: + return True, run(*args, capture_output=True, **kwargs) + except subprocess.CalledProcessError as e: + return e.returncode == 0, e.output @contextlib.contextmanager |