aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cgitize/cgit.py
blob: fe715d5058d5ef0ae0dba187ae1546947fa1e30d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Copyright (c) 2018 Egor Tensin <Egor.Tensin@gmail.com>
# This file is part of the "cgitize" project.
# For details, see https://github.com/egor-tensin/cgitize.
# Distributed under the MIT License.

import logging
import os
import shutil

from cgitize.git import Git
from cgitize.utils import chdir


class CGitServer:
    def __init__(self, clone_url):
        self.clone_url = clone_url

    def get_clone_url(self, repo):
        if self.clone_url is None:
            return None
        return self.clone_url.format(repo_id=repo.repo_id)


class CGitRCWriter:
    def __init__(self, cgit_server):
        self.cgit_server = cgit_server

    @staticmethod
    def get_path(repo_dir):
        return os.path.join(repo_dir, 'cgitrc')

    def write(self, repo_dir, repo):
        with open(self.get_path(repo_dir), 'w') as fd:
            self._write_field(fd, 'clone-url', self._build_clone_url(repo))
            self._write_field(fd, 'owner', repo.owner)
            self._write_field(fd, 'desc', repo.desc)
            self._write_field(fd, 'homepage', repo.homepage)

    @staticmethod
    def _write_field(fd, field, value):
        if value is None:
            return
        fd.write(f'{field}={value}\n')

    def _build_clone_url(self, repo):
        clone_urls = []
        if repo.clone_url is not None:
            clone_urls.append(repo.clone_url)
        cgit_clone_url = self.cgit_server.get_clone_url(repo)
        if cgit_clone_url is not None:
            clone_urls.append(cgit_clone_url)
        if not clone_urls:
            return None
        clone_urls = ' '.join(clone_urls)
        return clone_urls


class AgeFile:
    @staticmethod
    def write(repo_dir):
        timestamp = AgeFile.get_age(repo_dir)
        if timestamp:
            os.makedirs(AgeFile.get_dir(repo_dir), exist_ok=True)
            with open(AgeFile.get_path(repo_dir), mode='w') as fd:
                fd.write(f'{timestamp}')

    @staticmethod
    def get_age(repo_dir):
        # https://git.zx2c4.com/cgit/tree/contrib/hooks/post-receive.agefile
        with chdir(repo_dir):
            success, output = Git.capture('for-each-ref', '--sort=-authordate', '--count=1', '--format=%(authordate:iso8601)')
            if not success:
                logging.error("Couldn't get the timestamp of the newest commit in repository: %s", repo_dir)
                return None
            return output

    @staticmethod
    def get_dir(repo_dir):
        return os.path.join(repo_dir, 'info', 'web')

    @staticmethod
    def get_path(repo_dir):
        return os.path.join(AgeFile.get_dir(repo_dir), 'last-modified')


class CGitRepositories:
    def __init__(self, dir, cgit_server, force=False):
        self.dir = self._make_dir(dir)
        self.cgitrc = CGitRCWriter(cgit_server)
        self.force = force

    @staticmethod
    def _make_dir(rel_path):
        abs_path = os.path.abspath(rel_path)
        os.makedirs(abs_path, exist_ok=True)
        return abs_path

    def get_repo_dir(self, repo):
        return os.path.join(self.dir, repo.repo_id)

    def update(self, repo):
        success = self._mirror_or_update(repo)
        if success:
            self.cgitrc.write(self.get_repo_dir(repo), repo)
            AgeFile.write(self.get_repo_dir(repo))
        return success

    def _mirror_or_update(self, repo):
        repo_dir = self.get_repo_dir(repo)

        if not os.path.isdir(repo_dir):
            # The local directory doesn't exist, mirror the new repository.
            return self._mirror(repo)

        with chdir(repo_dir):
            if not Git.check('rev-parse', '--is-inside-work-tree'):
                # Overwrite the existing directory if it's not a Git repository.
                logging.warning('Local directory is not a repository, going to overwrite it: %s', repo_dir)
                return self._mirror(repo)

            success, output = Git.capture('config', '--get', 'remote.origin.url')
            if not success:
                # Every repository managed by this script should have the
                # 'origin' remote. If it doesn't, it's trash. Overwrite the
                # existing directory, mirroring the repository in it.
                logging.warning("Local repository doesn't have remote 'origin', going to overwrite it: %s", repo_dir)
                return self._mirror(repo)

            if f'{repo.clone_url}\n' != output:
                # Jeez, there's a proper local repository in the target
                # directory already with a different upstream; something's
                # wrong, fix it manually.
                logging.warning("Existing repository '%s' doesn't match the specified clone URL: %s", repo.repo_id, repo.clone_url)
                if self.force:
                    # Unless --force was specified, in which case we overwrite
                    # the repository.
                    return self._fix_upstream_url(repo) and self._update_existing(repo)
                return False

            # The local directory contains the local version of the upstream,
            # update it.
            return self._update_existing(repo)

    def _mirror(self, repo):
        logging.info("Mirroring repository '%s' from: %s", repo.repo_id, repo.clone_url)
        repo_dir = self.get_repo_dir(repo)
        if os.path.isdir(repo_dir):
            try:
                shutil.rmtree(repo_dir)
            except Exception as e:
                logging.exception(e)
                return False
        with Git.setup_auth(repo):
            return Git.check('clone', '--mirror', '--quiet', repo.clone_url, repo_dir)

    def _fix_upstream_url(self, repo):
        repo_dir = self.get_repo_dir(repo)
        with chdir(repo_dir):
            return Git.check('remote', 'set-url', 'origin', repo.clone_url)

    def _update_existing(self, repo):
        logging.info("Updating repository '%s'", repo.repo_id)
        repo_dir = self.get_repo_dir(repo)
        with chdir(repo_dir):
            with Git.setup_auth(repo):
                if not Git.check('remote', 'update', '--prune'):
                    return False
            # In case the local repository is not a bare repository, but a
            # full-fledged working copy:
            if Git.check('rev-parse', '--verify', '--quiet', 'origin/master'):
                return Git.check('reset', '--soft', 'origin/master')
            return True