aboutsummaryrefslogtreecommitdiff
blob: 587e4a57a3ebfe5284db84ce7bf49846ef57acee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# Copyright 1999-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

from _emerge.AsynchronousLock import AsynchronousLock
from _emerge.CompositeTask import CompositeTask
from _emerge.SpawnProcess import SpawnProcess
from urllib.parse import urlparse as urllib_parse_urlparse
import stat
import sys
import portage
from portage import os
from portage.binpkg import get_binpkg_format
from portage.exception import FileNotFound
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.FileCopier import FileCopier
from portage.util._pty import _create_pty_or_pipe


class BinpkgFetcher(CompositeTask):
    """
    Composite task which saves the binary package for self.pkg to a
    ".partial" file located next to the path allocated for it in the
    bintree, either by spawning a fetcher process or by copying a
    local file.
    """

    __slots__ = ("pkg", "pretend", "logfile", "pkg_path", "pkg_allocated_path")

    def __init__(self, **kwargs):
        """
        Allocate the destination path for the package and derive the
        temporary ".partial" path that the download is written to.

        @raise FileNotFound: if the remote index entry of the package
            has no PATH value.
        """
        CompositeTask.__init__(self, **kwargs)

        cpv = self.pkg.cpv
        bintree = self.pkg.root_config.trees["bintree"]
        index_entry = bintree._remotepkgs[bintree.dbapi._instance_key(cpv)]

        remote_path = index_entry.get("PATH")
        if not remote_path:
            raise FileNotFound(
                "PATH not found in the binpkg index, the binhost's portage is probably out of date."
            )

        # The binpkg format is derived from the remote file name.
        allocated_path = bintree.getname(
            cpv,
            allocate_new=True,
            remote_binpkg_format=get_binpkg_format(remote_path),
        )
        self.pkg_allocated_path = allocated_path
        self.pkg_path = allocated_path + ".partial"

    def _start(self):
        # Wrap the main coroutine in a task so that CompositeTask can
        # track it and call _main_exit when it completes.
        main_task = AsyncTaskFuture(future=self._main(), scheduler=self.scheduler)
        self._start_task(main_task, self._main_exit)

    async def _main(self) -> int:
        """
        Coroutine which stores the binary package at self.pkg_path.

        A URI with an empty or "file" scheme is copied with FileCopier
        (unless pretend is enabled), and anything else is handed to a
        _BinpkgFetcherProcess. When "distlocks" is in FEATURES, a lock
        is held for the duration of the transfer.

        @rtype: int
        @return: Exit status of fetcher or copier.
        """
        pkg = self.pkg
        bintree = pkg.root_config.trees["bintree"]

        fetcher = _BinpkgFetcherProcess(
            background=self.background,
            logfile=self.logfile,
            pkg=pkg,
            pkg_path=self.pkg_path,
            pretend=self.pretend,
            scheduler=self.scheduler,
        )

        if not self.pretend:
            portage.util.ensure_dirs(os.path.dirname(self.pkg_path))
            if "distlocks" in pkg.root_config.settings.features:
                await fetcher.async_lock()

        copier = None
        try:
            if not bintree._remote_has_index:
                raise FileNotFound("Binary packages index not found")

            index_entry = bintree._remotepkgs[bintree.dbapi._instance_key(pkg.cpv)]
            # A missing PATH should never happen with new portage
            # versions, so assume that the remote index is out of date.
            rel_uri = index_entry.get("PATH") or pkg.cpv + ".tbz2"
            base_uri = index_entry["BASE_URI"]
            uri = "/".join((base_uri.rstrip("/"), rel_uri.lstrip("/")))

            uri_parsed = urllib_parse_urlparse(uri)

            if self.pretend or uri_parsed.scheme not in ("", "file"):
                fetcher.start()
                try:
                    await fetcher.async_wait()
                finally:
                    if fetcher.isAlive():
                        fetcher.cancel()

                if not self.pretend and fetcher.returncode == os.EX_OK:
                    fetcher.sync_timestamp()
            else:
                # Local source, so copy it instead of spawning a fetcher.
                copier = FileCopier(
                    src_path=uri_parsed.path,
                    dest_path=self.pkg_path,
                    scheduler=self.scheduler,
                )
                copier.start()
                try:
                    await copier.async_wait()
                    # Re-raise any exception raised by the copy operation.
                    copier.future.result()
                except FileNotFoundError:
                    await self.scheduler.async_output(
                        f"!!! File not found: {uri_parsed.path}\n",
                        log_file=self.logfile,
                        background=self.background,
                    )
                finally:
                    if copier.isAlive():
                        copier.cancel()
                if copier.returncode == os.EX_OK:
                    fetcher.sync_timestamp()
        finally:
            if fetcher.locked:
                await fetcher.async_unlock()

        if copier is not None:
            return copier.returncode
        return fetcher.returncode

    def _main_exit(self, main_task):
        """
        Exit listener for the task started by _start. Unless the task
        was cancelled, the value returned by _main becomes its
        returncode.
        """
        if not main_task.cancelled:
            # The result of _main is the fetcher or copier returncode.
            main_task.returncode = main_task.future.result()
        self._default_final_exit(main_task)


class _BinpkgFetcherProcess(SpawnProcess):
    """
    SpawnProcess which runs the configured fetch (or resume) command in
    order to download the binary package for self.pkg to self.pkg_path.
    It can also hold an AsynchronousLock on self.pkg_path, see
    async_lock and async_unlock.
    """

    __slots__ = ("pkg", "pretend", "locked", "pkg_path", "_lock_obj")

    def _start(self):
        """
        Build the fetch command line for self.pkg and spawn it.

        In pretend mode the URI is only written to stdout and the task
        completes with os.EX_OK without spawning anything. If no fetch
        command is configured, an error message is written and the task
        completes with a returncode of 1.

        @raise FileNotFound: if the remote has no binary packages index.
        """
        pkg = self.pkg
        pretend = self.pretend
        bintree = pkg.root_config.trees["bintree"]
        settings = bintree.settings
        pkg_path = self.pkg_path

        exists = os.path.exists(pkg_path)
        resume = exists and os.path.basename(pkg_path) in bintree.invalids
        if not (pretend or resume):
            # Remove existing file or broken symlink.
            try:
                os.unlink(pkg_path)
            except OSError:
                pass

        if not bintree._remote_has_index:
            raise FileNotFound("Binary packages index not found")

        remote_metadata = bintree._remotepkgs[bintree.dbapi._instance_key(pkg.cpv)]
        # A missing PATH should never happen with new portage versions,
        # so assume that the remote index is out of date.
        rel_uri = remote_metadata.get("PATH") or pkg.cpv + ".tbz2"
        remote_base_uri = remote_metadata["BASE_URI"]
        # urljoin doesn't work correctly with
        # unrecognized protocols like sftp
        uri = remote_base_uri.rstrip("/") + "/" + rel_uri.lstrip("/")

        if pretend:
            portage.writemsg_stdout(f"\n{uri}\n", noiselevel=-1)
            self.returncode = os.EX_OK
            self._async_wait()
            return

        # A command from the remote index entry takes precedence, then
        # the protocol specific setting, then the generic setting.
        fcmd_prefix = "RESUMECOMMAND" if resume else "FETCHCOMMAND"
        fcmd = remote_metadata.get(fcmd_prefix)
        if fcmd is None:
            protocol = urllib_parse_urlparse(uri)[0]
            fcmd = settings.get(fcmd_prefix + "_" + protocol.upper())
            if not fcmd:
                fcmd = settings.get(fcmd_prefix)

        if not fcmd:
            # Fail cleanly instead of handing None (or an empty command)
            # to shlex_split and the spawn machinery.
            portage.writemsg(
                f"!!! {fcmd_prefix} is not set, unable to fetch {uri}\n",
                noiselevel=-1,
            )
            self.returncode = 1
            self._async_wait()
            return

        # Variables which are expanded inside of the fetch command.
        fcmd_vars = {
            "DISTDIR": os.path.dirname(pkg_path),
            "URI": uri,
            "FILE": os.path.basename(pkg_path),
        }

        for k in ("PORTAGE_SSH_OPTS",):
            v = settings.get(k)
            if v is not None:
                fcmd_vars[k] = v

        fetch_env = dict(settings.items())
        fetch_args = [
            portage.util.varexpand(x, mydict=fcmd_vars)
            for x in portage.util.shlex_split(fcmd)
        ]

        if self.fd_pipes is None:
            self.fd_pipes = {}
        fd_pipes = self.fd_pipes

        # Redirect all output to stdout since some fetchers like
        # wget pollute stderr (if portage detects a problem then it
        # can send its own message to stderr).
        fd_pipes.setdefault(0, portage._get_stdin().fileno())
        fd_pipes.setdefault(1, sys.__stdout__.fileno())
        fd_pipes.setdefault(2, sys.__stdout__.fileno())

        self.args = fetch_args
        self.env = fetch_env
        if settings.selinux_enabled():
            self._selinux_type = settings["PORTAGE_FETCH_T"]
        self.log_filter_file = settings.get("PORTAGE_LOG_FILTER_FILE_CMD")
        SpawnProcess._start(self)

    def _pipe(self, fd_pipes):
        """When appropriate, use a pty so that fetcher progress bars,
        like wget has, will work properly."""
        if self.background or not sys.__stdout__.isatty():
            # When the output only goes to a log file,
            # there's no point in creating a pty.
            return os.pipe()
        # Background mode has been ruled out above, so the stdout
        # descriptor is always passed as copy_term_size here.
        self._pty_ready, master_fd, slave_fd = _create_pty_or_pipe(
            copy_term_size=fd_pipes.get(1)
        )
        return (master_fd, slave_fd)

    def sync_timestamp(self):
        """
        If possible, update the mtime of self.pkg_path to match the
        remote package if the fetcher didn't already do it
        automatically. Nothing happens if there is no remote index, if
        the remote _mtime_ is missing or is not a valid integer, or if
        the local file can not be accessed.
        """
        bintree = self.pkg.root_config.trees["bintree"]
        if not bintree._remote_has_index:
            return

        remote_mtime = bintree._remotepkgs[
            bintree.dbapi._instance_key(self.pkg.cpv)
        ].get("_mtime_")
        if remote_mtime is None:
            return

        try:
            remote_mtime = int(remote_mtime)
        except ValueError:
            return

        try:
            local_mtime = os.stat(self.pkg_path)[stat.ST_MTIME]
        except OSError:
            return

        if remote_mtime != local_mtime:
            # Best effort, a failure to set the mtime is not an error.
            try:
                os.utime(self.pkg_path, (remote_mtime, remote_mtime))
            except OSError:
                pass

    def async_lock(self):
        """
        Start acquiring a lock on self.pkg_path.

        This raises an AlreadyLocked exception if async_lock() is called
        while a lock object already exists. In order to avoid this, call
        async_unlock() or check whether the "locked" attribute is True
        before calling async_lock().

        @return: A future which is done with a None result after the
            lock has been acquired, or with an AssertionError if the
            AsynchronousLock exits with a returncode other than os.EX_OK.
        @raise AlreadyLocked: if a lock object already exists.
        """
        if self._lock_obj is not None:
            raise self.AlreadyLocked((self._lock_obj,))

        result = self.scheduler.create_future()

        def acquired_lock(async_lock):
            # Exit listener of the AsynchronousLock task.
            if async_lock.wait() == os.EX_OK:
                self.locked = True
                result.set_result(None)
            else:
                result.set_exception(
                    AssertionError(
                        f"AsynchronousLock failed with returncode {async_lock.returncode}"
                    )
                )

        self._lock_obj = AsynchronousLock(path=self.pkg_path, scheduler=self.scheduler)
        self._lock_obj.addExitListener(acquired_lock)
        self._lock_obj.start()
        return result

    class AlreadyLocked(portage.exception.PortageException):
        """Raised by async_lock when a lock object already exists."""

    def async_unlock(self):
        """
        Release the lock which was created by async_lock.

        @return: The value returned by the lock object's async_unlock
            method.
        @raise AssertionError: if there is no lock object.
        """
        if self._lock_obj is None:
            raise AssertionError("already unlocked")
        result = self._lock_obj.async_unlock()
        # Clear the state right away so that async_lock can be called again.
        self._lock_obj = None
        self.locked = False
        return result