aboutsummaryrefslogtreecommitdiff
path: root/python/pip_install/tools/wheel_installer/wheel.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pip_install/tools/wheel_installer/wheel.py')
-rw-r--r--python/pip_install/tools/wheel_installer/wheel.py383
1 files changed, 369 insertions, 14 deletions
diff --git a/python/pip_install/tools/wheel_installer/wheel.py b/python/pip_install/tools/wheel_installer/wheel.py
index 84af04c..efd916d 100644
--- a/python/pip_install/tools/wheel_installer/wheel.py
+++ b/python/pip_install/tools/wheel_installer/wheel.py
@@ -13,18 +13,372 @@
# limitations under the License.
"""Utility class to inspect an extracted wheel directory"""
+
import email
-from typing import Dict, Optional, Set, Tuple
+import platform
+import re
+import sys
+from collections import defaultdict
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
import installer
-import pkg_resources
+from packaging.requirements import Requirement
from pip._vendor.packaging.utils import canonicalize_name
+class OS(Enum):
+ linux = 1
+ osx = 2
+ windows = 3
+ darwin = osx
+ win32 = windows
+
+
+class Arch(Enum):
+ x86_64 = 1
+ x86_32 = 2
+ aarch64 = 3
+ ppc = 4
+ s390x = 5
+ amd64 = x86_64
+ arm64 = aarch64
+ i386 = x86_32
+ i686 = x86_32
+ x86 = x86_32
+ ppc64le = ppc
+
+
+@dataclass(frozen=True)
+class Platform:
+ os: OS
+ arch: Optional[Arch] = None
+
+ @classmethod
+ def all(cls, want_os: Optional[OS] = None) -> List["Platform"]:
+ return sorted(
+ [
+ cls(os=os, arch=arch)
+ for os in OS
+ for arch in Arch
+ if not want_os or want_os == os
+ ]
+ )
+
+ @classmethod
+ def host(cls) -> List["Platform"]:
+ """Use the Python interpreter to detect the platform.
+
+ We extract `os` from sys.platform and `arch` from platform.machine
+
+ Returns:
+ A list of parsed values which makes the signature the same as
+ `Platform.all` and `Platform.from_string`.
+ """
+ return [
+ cls(
+ os=OS[sys.platform.lower()],
+ # FIXME @aignas 2023-12-13: Hermetic toolchain on Windows 3.11.6
+ # is returning an empty string here, so lets default to x86_64
+ arch=Arch[platform.machine().lower() or "x86_64"],
+ )
+ ]
+
+ def __lt__(self, other: Any) -> bool:
+ """Add a comparison method, so that `sorted` returns the most specialized platforms first."""
+ if not isinstance(other, Platform) or other is None:
+ raise ValueError(f"cannot compare {other} with Platform")
+
+ if self.arch is None and other.arch is not None:
+ return True
+
+ if self.arch is not None and other.arch is None:
+ return True
+
+ # Here we ensure that we sort by OS before sorting by arch
+
+ if self.arch is None and other.arch is None:
+ return self.os.value < other.os.value
+
+ if self.os.value < other.os.value:
+ return True
+
+ if self.os.value == other.os.value:
+ return self.arch.value < other.arch.value
+
+ return False
+
+ def __str__(self) -> str:
+ if self.arch is None:
+ return f"@platforms//os:{self.os.name.lower()}"
+
+ return self.os.name.lower() + "_" + self.arch.name.lower()
+
+ @classmethod
+ def from_string(cls, platform: Union[str, List[str]]) -> List["Platform"]:
+ """Parse a string and return a list of platforms"""
+ platform = [platform] if isinstance(platform, str) else list(platform)
+ ret = set()
+ for p in platform:
+ if p == "host":
+ ret.update(cls.host())
+ elif p == "all":
+ ret.update(cls.all())
+ elif p.endswith("*"):
+ os, _, _ = p.partition("_")
+ ret.update(cls.all(OS[os]))
+ else:
+ os, _, arch = p.partition("_")
+ ret.add(cls(os=OS[os], arch=Arch[arch]))
+
+ return sorted(ret)
+
+ # NOTE @aignas 2023-12-05: below is the minimum number of accessors that are defined in
+ # https://peps.python.org/pep-0496/ to make rules_python generate dependencies.
+ #
+ # WARNING: It may not work in cases where the python implementation is different between
+ # different platforms.
+
+ # derived from OS
+ @property
+ def os_name(self) -> str:
+ if self.os == OS.linux or self.os == OS.osx:
+ return "posix"
+ elif self.os == OS.windows:
+ return "nt"
+ else:
+ return ""
+
+ @property
+ def sys_platform(self) -> str:
+ if self.os == OS.linux:
+ return "linux"
+ elif self.os == OS.osx:
+ return "darwin"
+ elif self.os == OS.windows:
+ return "win32"
+ else:
+ return ""
+
+ @property
+ def platform_system(self) -> str:
+ if self.os == OS.linux:
+ return "Linux"
+ elif self.os == OS.osx:
+ return "Darwin"
+ elif self.os == OS.windows:
+ return "Windows"
+
+ # derived from OS and Arch
+ @property
+ def platform_machine(self) -> str:
+ """Guess the target 'platform_machine' marker.
+
+ NOTE @aignas 2023-12-05: this may not work on really new systems, like
+ Windows if they define the platform markers in a different way.
+ """
+ if self.arch == Arch.x86_64:
+ return "x86_64"
+ elif self.arch == Arch.x86_32 and self.os != OS.osx:
+ return "i386"
+ elif self.arch == Arch.x86_32:
+ return ""
+ elif self.arch == Arch.aarch64 and self.os == OS.linux:
+ return "aarch64"
+ elif self.arch == Arch.aarch64:
+ # Assuming that OSX and Windows use this one since the precedent is set here:
+ # https://github.com/cgohlke/win_arm64-wheels
+ return "arm64"
+ elif self.os != OS.linux:
+ return ""
+ elif self.arch == Arch.ppc64le:
+ return "ppc64le"
+ elif self.arch == Arch.s390x:
+ return "s390x"
+ else:
+ return ""
+
+ def env_markers(self, extra: str) -> Dict[str, str]:
+ return {
+ "extra": extra,
+ "os_name": self.os_name,
+ "sys_platform": self.sys_platform,
+ "platform_machine": self.platform_machine,
+ "platform_system": self.platform_system,
+ "platform_release": "", # unset
+ "platform_version": "", # unset
+ # we assume that the following are the same as the interpreter used to setup the deps:
+ # "implementation_version": "X.Y.Z",
+ # "implementation_name": "cpython"
+ # "python_version": "X.Y",
+ # "python_full_version": "X.Y.Z",
+ # "platform_python_implementation: "CPython",
+ }
+
+
+@dataclass(frozen=True)
+class FrozenDeps:
+ deps: List[str]
+ deps_select: Dict[str, List[str]]
+
+
+class Deps:
+ def __init__(
+ self,
+ name: str,
+ extras: Optional[Set[str]] = None,
+ platforms: Optional[Set[Platform]] = None,
+ ):
+ self.name: str = Deps._normalize(name)
+ self._deps: Set[str] = set()
+ self._select: Dict[Platform, Set[str]] = defaultdict(set)
+ self._want_extras: Set[str] = extras or {""} # empty strings means no extras
+ self._platforms: Set[Platform] = platforms or set()
+
+ def _add(self, dep: str, platform: Optional[Platform]):
+ dep = Deps._normalize(dep)
+
+ # Packages may create dependency cycles when specifying optional-dependencies / 'extras'.
+ # Example: github.com/google/etils/blob/a0b71032095db14acf6b33516bca6d885fe09e35/pyproject.toml#L32.
+ if dep == self.name:
+ return
+
+ if platform:
+ self._select[platform].add(dep)
+ else:
+ self._deps.add(dep)
+
+ @staticmethod
+ def _normalize(name: str) -> str:
+ return re.sub(r"[-_.]+", "_", name).lower()
+
+ def add(self, *wheel_reqs: str) -> None:
+ reqs = [Requirement(wheel_req) for wheel_req in wheel_reqs]
+
+ # Resolve any extra extras due to self-edges
+ self._want_extras = self._resolve_extras(reqs)
+
+ # process self-edges first to resolve the extras used
+ for req in reqs:
+ self._add_req(req)
+
+ def _resolve_extras(self, reqs: List[Requirement]) -> Set[str]:
+ """Resolve extras which are due to depending on self[some_other_extra].
+
+ Some packages may have cyclic dependencies resulting from extras being used, one example is
+ `elint`, where we have one set of extras as aliases for other extras
+ and we have an extra called 'all' that includes all other extras.
+
+ When the `requirements.txt` is generated by `pip-tools`, then it is likely that
+ this step is not needed, but for other `requirements.txt` files this may be useful.
+
+ NOTE @aignas 2023-12-08: the extra resolution is not platform dependent, but
+ in order for it to become platform dependent we would have to have separate targets for each extra in
+ self._want_extras.
+ """
+ extras = self._want_extras
+
+ self_reqs = []
+ for req in reqs:
+ if Deps._normalize(req.name) != self.name:
+ continue
+
+ if req.marker is None:
+ # I am pretty sure we cannot reach this code as it does not
+ # make sense to specify packages in this way, but since it is
+ # easy to handle, lets do it.
+ #
+ # TODO @aignas 2023-12-08: add a test
+ extras = extras | req.extras
+ else:
+ # process these in a separate loop
+ self_reqs.append(req)
+
+ # A double loop is not strictly optimal, but always correct without recursion
+ for req in self_reqs:
+ if any(req.marker.evaluate({"extra": extra}) for extra in extras):
+ extras = extras | req.extras
+ else:
+ continue
+
+ # Iterate through all packages to ensure that we include all of the extras from previously
+ # visited packages.
+ for req_ in self_reqs:
+ if any(req_.marker.evaluate({"extra": extra}) for extra in extras):
+ extras = extras | req_.extras
+
+ return extras
+
+ def _add_req(self, req: Requirement) -> None:
+ extras = self._want_extras
+
+ if req.marker is None:
+ self._add(req.name, None)
+ return
+
+ marker_str = str(req.marker)
+
+ # NOTE @aignas 2023-12-08: in order to have reasonable select statements
+ # we do have to have some parsing of the markers, so it begs the question
+ # if packaging should be reimplemented in Starlark to have the best solution
+ # for now we will implement it in Python and see what the best parsing result
+ # can be before making this decision.
+ if not self._platforms or not any(
+ tag in marker_str
+ for tag in [
+ "os_name",
+ "sys_platform",
+ "platform_machine",
+ "platform_system",
+ ]
+ ):
+ if any(req.marker.evaluate({"extra": extra}) for extra in extras):
+ self._add(req.name, None)
+ return
+
+ for plat in self._platforms:
+ if not any(
+ req.marker.evaluate(plat.env_markers(extra)) for extra in extras
+ ):
+ continue
+
+ if "platform_machine" in marker_str:
+ self._add(req.name, plat)
+ else:
+ self._add(req.name, Platform(plat.os))
+
+ def build(self) -> FrozenDeps:
+ if not self._select:
+ return FrozenDeps(
+ deps=sorted(self._deps),
+ deps_select={},
+ )
+
+ # Get all of the OS-specific dependencies applicable to all architectures
+ select = {
+ p: deps for p, deps in self._select.items() if deps and p.arch is None
+ }
+ # Now add them to all arch specific dependencies
+ select.update(
+ {
+ p: deps | select.get(Platform(p.os), set())
+ for p, deps in self._select.items()
+ if deps and p.arch is not None
+ }
+ )
+
+ return FrozenDeps(
+ deps=sorted(self._deps),
+ deps_select={str(p): sorted(deps) for p, deps in sorted(select.items())},
+ )
+
+
class Wheel:
"""Representation of the compressed .whl file"""
- def __init__(self, path: str):
+ def __init__(self, path: Path):
self._path = path
@property
@@ -70,19 +424,20 @@ class Wheel:
return entry_points_mapping
- def dependencies(self, extras_requested: Optional[Set[str]] = None) -> Set[str]:
- dependency_set = set()
-
+ def dependencies(
+ self,
+ extras_requested: Set[str] = None,
+ platforms: Optional[Set[Platform]] = None,
+ ) -> FrozenDeps:
+ dependency_set = Deps(
+ self.name,
+ extras=extras_requested,
+ platforms=platforms,
+ )
for wheel_req in self.metadata.get_all("Requires-Dist", []):
- req = pkg_resources.Requirement(wheel_req) # type: ignore
-
- if req.marker is None or any(
- req.marker.evaluate({"extra": extra})
- for extra in extras_requested or [""]
- ):
- dependency_set.add(req.name) # type: ignore
+ dependency_set.add(wheel_req)
- return dependency_set
+ return dependency_set.build()
def unzip(self, directory: str) -> None:
installation_schemes = {