| |
| |
|
|
| from __future__ import annotations |
|
|
| import contextlib |
| import ctypes |
| import ctypes.util |
| import os |
| from typing import TYPE_CHECKING, cast |
|
|
| from cuda.pathfinder._dynamic_libs.load_dl_common import LoadedDL |
|
|
| if TYPE_CHECKING: |
| from cuda.pathfinder._dynamic_libs.lib_descriptor import LibDescriptor |
|
|
# Default dlopen() flags used by this module: export symbols globally
# (RTLD_GLOBAL) and resolve all of them at load time (RTLD_NOW).
CDLL_MODE = os.RTLD_GLOBAL | os.RTLD_NOW
|
|
|
|
def _load_libdl() -> ctypes.CDLL:
    """Load the dynamic-linking library that provides ``dlinfo``/``dlerror``.

    ``ctypes.util.find_library("dl")`` is consulted first; when it finds
    nothing (it returns ``None`` in that case) the conventional soname
    ``"libdl.so.2"`` is tried instead.

    Returns:
        The loaded library as a ``ctypes.CDLL``.

    Raises:
        RuntimeError: If the library cannot be loaded. The underlying
            ``OSError`` is chained as the cause.
    """
    located = ctypes.util.find_library("dl")
    name = located if located else "libdl.so.2"
    try:
        libdl = ctypes.CDLL(name)
    except OSError as exc:
        raise RuntimeError(f"Could not load {name!r} (required for dlinfo/dlerror on Linux)") from exc
    else:
        return libdl
|
|
|
|
# dlinfo() request codes used in this module (values as defined by glibc's <dlfcn.h>).
RTLD_DI_LINKMAP = 2
RTLD_DI_ORIGIN = 6

# Loaded once at import time; a failure here surfaces as RuntimeError on import.
LIBDL = _load_libdl()

# char *dlerror(void);
LIBDL.dlerror.restype = ctypes.c_char_p
LIBDL.dlerror.argtypes = []

# int dlinfo(void *handle, int request, void *info);
LIBDL.dlinfo.restype = ctypes.c_int
LIBDL.dlinfo.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p]
|
|
|
|
class _LinkMapLNameView(ctypes.Structure):
    """
    Prefix-only view of glibc's `struct link_map` used **solely** to read `l_name`.

    Background:
    - `dlinfo(handle, RTLD_DI_LINKMAP, ...)` returns a `struct link_map*`.
    - The first few members of `struct link_map` (including `l_name`) have been
      stable on glibc for decades and are documented as debugger-visible.
    - We only need the offset/layout of `l_name`, not the full struct.

    Safety constraints:
    - This is a **partial** definition (prefix). It must only be used via a pointer
      returned by `dlinfo(...)`.
    - Do **not** instantiate it or pass it **by value** to any C function.
    - Do **not** access any members beyond those declared here.
    - Do **not** rely on `ctypes.sizeof(_LinkMapLNameView)` for allocation; it
      covers only the declared prefix, not the real `struct link_map`.

    Rationale:
    - Defining only the leading fields avoids depending on internal/unstable
      tail members while keeping code more readable than raw pointer arithmetic.
    """

    # Leading members of `struct link_map`, in declaration order.
    _fields_ = (
        # Pointer-sized first member; declared only so that `l_name` lands at
        # the correct offset.
        ("l_addr", ctypes.c_void_p),
        # `char *`; ctypes exposes it as `bytes`, or `None` for a NULL pointer.
        ("l_name", ctypes.c_char_p),
    )
|
|
|
|
| |
# Import-time sanity check of the prefix layout: `l_addr` must be the first
# member and `l_name` must follow it exactly one pointer later.
assert (_LinkMapLNameView.l_addr.offset, _LinkMapLNameView.l_name.offset) == (
    0,
    ctypes.sizeof(ctypes.c_void_p),
)
|
|
|
|
def _dl_last_error() -> str | None:
    """Return the pending ``dlerror()`` message, if any.

    Returns:
        The decoded error text, or ``None`` when ``dlerror()`` returns NULL
        or an empty string.
    """
    # The cast target is quoted so it is never evaluated at runtime:
    # `from __future__ import annotations` defers annotations only, and a bare
    # `bytes | None` expression raises TypeError on Python < 3.10.
    msg_bytes = cast("bytes | None", LIBDL.dlerror())
    if not msg_bytes:
        return None
    # The message is not guaranteed to be valid UTF-8 (it can embed file
    # names); surrogateescape keeps decoding from raising on stray bytes.
    return msg_bytes.decode("utf-8", "surrogateescape")
|
|
|
|
def l_name_for_dynamic_library(libname: str, handle: ctypes.CDLL) -> str:
    """Return ``link_map->l_name`` of a loaded library, as reported by ``dlinfo``.

    Args:
        libname: Library name; used only to label error messages.
        handle: The loaded library whose ``_handle`` is queried.

    Returns:
        The ``l_name`` string, decoded with ``os.fsdecode``.

    Raises:
        OSError: If ``dlinfo`` reports failure, yields a NULL ``link_map``
            pointer, or the resulting name is empty.
    """
    # dlinfo(RTLD_DI_LINKMAP) stores a `struct link_map *` into this pointer.
    link_map_ptr = ctypes.POINTER(_LinkMapLNameView)()
    rc = LIBDL.dlinfo(
        ctypes.c_void_p(handle._handle),
        RTLD_DI_LINKMAP,
        ctypes.byref(link_map_ptr),
    )
    if rc != 0:
        err = _dl_last_error()
        message = f"dlinfo failed for {libname=!r} (rc={rc})"
        if err:
            message += f": {err}"
        raise OSError(message)
    if not link_map_ptr:
        raise OSError(f"dlinfo returned NULL link_map pointer for {libname=!r}")

    # c_char_p fields read back as bytes, or None when the C pointer is NULL.
    raw_name = link_map_ptr.contents.l_name
    if not raw_name:
        raise OSError(f"dlinfo returned empty link_map->l_name for {libname=!r}")

    decoded = os.fsdecode(raw_name)
    if not decoded:
        raise OSError(f"dlinfo returned empty l_name string for {libname=!r}")

    return decoded
|
|
|
|
def l_origin_for_dynamic_library(libname: str, handle: ctypes.CDLL) -> str:
    """Return the origin path of a loaded library via ``dlinfo(RTLD_DI_ORIGIN)``.

    Args:
        libname: Library name; used only to label error messages.
        handle: The loaded library whose ``_handle`` is queried.

    Returns:
        The origin path written by ``dlinfo``, decoded with ``os.fsdecode``.

    Raises:
        OSError: If ``dlinfo`` reports failure or the origin string is empty.
    """
    # dlinfo copies the origin path into a caller-provided buffer; 4096 bytes
    # is the size this module reserves for it.
    origin_buffer = ctypes.create_string_buffer(4096)
    rc = LIBDL.dlinfo(ctypes.c_void_p(handle._handle), RTLD_DI_ORIGIN, origin_buffer)
    if rc != 0:
        err = _dl_last_error()
        message = f"dlinfo failed for {libname=!r} (rc={rc})"
        if err:
            message += f": {err}"
        raise OSError(message)

    # `.value` is the buffer content up to the first NUL byte.
    origin = os.fsdecode(origin_buffer.value)
    if not origin:
        raise OSError(f"dlinfo returned empty l_origin string for {libname=!r}")

    return origin
|
|
|
|
def abs_path_for_dynamic_library(libname: str, handle: ctypes.CDLL) -> str:
    """Build the path of a loaded library from its ``dlinfo`` origin and name.

    The result is the origin path reported by ``dlinfo`` joined with the
    basename of ``link_map->l_name``.

    Args:
        libname: Library name; used only to label error messages.
        handle: The loaded library to inspect.

    Returns:
        ``os.path.join(<origin>, <basename of l_name>)``.

    Raises:
        OSError: Propagated from the ``l_name`` / origin lookups.
    """
    # Only the file name part of l_name is used; the directory part always
    # comes from the origin lookup.
    file_name = os.path.basename(l_name_for_dynamic_library(libname, handle))
    directory = l_origin_for_dynamic_library(libname, handle)
    return os.path.join(directory, file_name)
|
|
|
|
def _candidate_sonames(desc: LibDescriptor) -> list[str]:
    """List the sonames to try for ``desc``, in the order they should be tried.

    Args:
        desc: Library descriptor; ``linux_sonames`` and ``name`` are read.

    Returns:
        A new list holding ``desc.linux_sonames`` in reverse order, followed
        by the unversioned ``lib<name>.so`` as the last candidate.
    """
    return [*reversed(desc.linux_sonames), f"lib{desc.name}.so"]
|
|
|
|
def check_if_already_loaded_from_elsewhere(desc: LibDescriptor, _have_abs_path: bool) -> LoadedDL | None:
    """Report whether the library is already resident in this process.

    Each candidate soname is opened with ``RTLD_NOLOAD``, which makes
    ``dlopen`` succeed only for objects that are already loaded.

    Args:
        desc: Descriptor of the library to look for.
        _have_abs_path: Not used by this implementation.

    Returns:
        A LoadedDL tagged ``"was-already-loaded-from-elsewhere"`` for the first
        candidate that is already loaded, or None if none of them is.

    Raises:
        OSError: If a resident library is found but its path cannot be
            determined via ``dlinfo``.
    """
    for candidate in _candidate_sonames(desc):
        try:
            resident = ctypes.CDLL(candidate, mode=os.RTLD_NOLOAD)
        except OSError:
            # Not resident under this name; try the next candidate.
            continue
        resolved_path = abs_path_for_dynamic_library(desc.name, resident)
        return LoadedDL(
            resolved_path,
            True,
            resident._handle,
            "was-already-loaded-from-elsewhere",
        )
    return None
|
|
|
|
def _load_lib(desc: LibDescriptor, filename: str) -> ctypes.CDLL:
    """dlopen ``filename`` with this module's default flags.

    Args:
        desc: Library descriptor; ``requires_rtld_deepbind`` selects whether
            ``RTLD_DEEPBIND`` is added to the flags.
        filename: Soname or path handed to ``ctypes.CDLL``.

    Returns:
        The loaded library.

    Raises:
        OSError: If ``ctypes.CDLL`` cannot load ``filename``.
    """
    # os.RTLD_DEEPBIND is looked up only when actually requested.
    flags = (CDLL_MODE | os.RTLD_DEEPBIND) if desc.requires_rtld_deepbind else CDLL_MODE
    return ctypes.CDLL(filename, flags)
|
|
|
|
def load_with_system_search(desc: LibDescriptor) -> LoadedDL | None:
    """Try to load a library using system search paths.

    The candidate sonames for ``desc`` are tried in order, and the first one
    that can be dlopen'ed wins.

    Args:
        desc: Descriptor for the library to load.

    Returns:
        A LoadedDL object (tagged ``"system-search"``) if successful, None if
        none of the candidate sonames can be loaded.

    Raises:
        OSError: If a candidate was loaded but its path could not be
            determined via ``dlinfo``.
    """
    for soname in _candidate_sonames(desc):
        try:
            handle = _load_lib(desc, soname)
        except OSError:
            # Not loadable under this soname; fall through to the next one.
            continue
        # Deliberately outside the try block: a dlinfo failure on a library
        # that did load is a real error and must not be mistaken for
        # "candidate not found". abs_path_for_dynamic_library() either returns
        # a str or raises OSError, so no None check is needed.
        abs_path = abs_path_for_dynamic_library(desc.name, handle)
        return LoadedDL(abs_path, False, handle._handle, "system-search")
    return None
|
|
|
|
def _work_around_known_bugs(libname: str, found_path: str) -> None:
    """Apply library-specific workarounds before ``found_path`` is loaded.

    Currently this only acts when ``libname`` is ``"nvrtc"`` and the file
    name is exactly ``libnvrtc.so.13``: if ``libnvrtc-builtins.so.13.0``
    exists in the same directory, it is loaded first on a best-effort basis.
    NOTE(review): presumably this lets libnvrtc resolve its builtins library
    when the loader would not find it by itself; confirm against the original
    bug report.

    Args:
        libname: Short library name (e.g. ``"nvrtc"``).
        found_path: Path of the library that is about to be loaded.
    """
    if libname != "nvrtc":
        return

    directory, filename = os.path.split(found_path)
    # Intentionally narrow: only this exact file name triggers the workaround.
    if filename != "libnvrtc.so.13":
        return

    builtins_path = os.path.join(directory, "libnvrtc-builtins.so.13.0")
    if not os.path.isfile(builtins_path):
        return

    # Best effort: a failed preload is ignored here, and the caller's
    # subsequent load of `found_path` proceeds regardless.
    with contextlib.suppress(OSError):
        ctypes.CDLL(builtins_path, CDLL_MODE)
|
|
|
|
def load_with_abs_path(desc: LibDescriptor, found_path: str, found_via: str | None = None) -> LoadedDL:
    """Load a dynamic library from an explicit path.

    Known-bug workarounds for the library are applied first, then the file
    is dlopen'ed.

    Args:
        desc: Descriptor for the library to load.
        found_path: The absolute path to the library file.
        found_via: Label indicating how the path was discovered; passed
            through unchanged to the returned LoadedDL.

    Returns:
        A LoadedDL object representing the loaded library.

    Raises:
        RuntimeError: If the library cannot be loaded. The underlying
            ``OSError`` is chained as the cause.
    """
    _work_around_known_bugs(desc.name, found_path)
    try:
        loaded = _load_lib(desc, found_path)
    except OSError as e:
        raise RuntimeError(f"Failed to dlopen {found_path}: {e}") from e
    else:
        return LoadedDL(found_path, False, loaded._handle, found_via)
|
|