aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cgitize/utils.py
blob: 64b419ae539348300163c7a6d6899eaa9d6c9ed9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
# This file is part of the "cgitize" project.
# For details, see https://github.com/egor-tensin/cgitize.
# Distributed under the MIT License.

from contextlib import contextmanager
import logging
import os
import stat
import subprocess
import sys
from urllib.parse import urlsplit, urlunsplit


@contextmanager
def setup_logging(verbose=False):
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        datefmt='%Y-%m-%d %H:%M:%S',
        format='%(asctime)s | %(levelname)s | %(message)s',
        # Log to stdout, because that's where subprocess's output goes (so that
        # the don't get interleaved).
        stream=sys.stdout)
    try:
        yield
    except Exception as e:
        logging.exception(e)
        raise


def run(*args, capture_output=False, **kwargs):
    stdout = None
    stderr = None
    if capture_output:
        stdout = subprocess.PIPE
        stderr = subprocess.STDOUT

    logging.debug('%s', args)
    result = subprocess.run(args, check=True, stdout=stdout, stderr=stderr,
                            encoding='utf-8', **kwargs)

    if result.stdout is not None:
        logging.debug('\n%s', result.stdout)
    return result.stdout


def try_run(*args, **kwargs):
    try:
        run(*args, **kwargs)
        return True
    except subprocess.CalledProcessError as e:
        return e.returncode == 0


def run_capture(*args, **kwargs):
    return run(*args, capture_output=True, **kwargs)


def try_run_capture(*args, **kwargs):
    try:
        return True, run(*args, capture_output=True, **kwargs)
    except subprocess.CalledProcessError as e:
        return e.returncode == 0, e.output


@contextmanager
def chdir(new_cwd):
    old_cwd = os.getcwd()
    os.chdir(new_cwd)
    try:
        yield
    finally:
        os.chdir(old_cwd)


@contextmanager
def protected_file(path):
    # 0600:
    new_permissions = stat.S_IRUSR | stat.S_IWUSR
    if os.path.exists(path):
        old_permissions = stat.S_IMODE(os.stat(path).st_mode)
        os.chmod(path, new_permissions)
        try:
            yield
        finally:
            os.chmod(path, old_permissions)
    else:
        with open(path, mode='w'):
            pass
        os.chmod(path, new_permissions)
        try:
            yield
        finally:
            os.unlink(path)


def url_replace_auth(url, username, password=None):
    parts = urlsplit(url)
    netloc = username
    if password is not None:
        netloc += f':{password}'
    netloc += f'@{parts.hostname}'
    if parts.port is not None:
        netloc += f':{parts.port}'
    parts = parts._replace(netloc=netloc)
    return urlunsplit(parts)


def url_remove_auth(url):
    parts = urlsplit(url)
    netloc = parts.hostname
    parts = parts._replace(netloc=netloc)
    return urlunsplit(parts)