Spaces:
Sleeping
Sleeping
| """Package installation and checking for the configured environment.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| from pathlib import Path | |
| from app.config import Settings, get_settings | |
| from app.models import PackageCheckResult | |
| logger = logging.getLogger(__name__) | |
class PackageManager:
    """Install packages into, and probe packages in, the configured environment.

    Both operations run a child process: installs use the pip command
    supplied by the settings object, and checks run a small probe script
    with the Python executable supplied by the settings object.
    """

    # Seconds to wait for pip before an install is abandoned.
    INSTALL_TIMEOUT_SECONDS = 300
    # Seconds to wait for the probe script before a check is abandoned.
    CHECK_TIMEOUT_SECONDS = 30
    # Seconds to wait for a killed child process to be reaped.
    _REAP_TIMEOUT_SECONDS = 5

    # Characters that terminate the project name inside a requirement string
    # (extras, version operators, environment markers, direct references).
    _SPEC_DELIMITERS = frozenset("[(=<>!~;@")

    # Probe executed inside the target environment. It is a constant: the
    # package names arrive as JSON in argv[1] ({spec: [dist_name, import_name]})
    # and are never spliced into the source, so a name cannot inject code.
    _CHECK_SCRIPT = "\n".join(
        (
            "import importlib, json, sys",
            "try:",
            "    from importlib import metadata",
            "except ImportError:",
            "    metadata = None",
            "",
            "def dist_version(name):",
            "    if metadata is None or not name:",
            "        return None",
            "    try:",
            "        return metadata.version(name)",
            "    except Exception:",
            "        return None",
            "",
            "results = {}",
            "for spec, (dist_name, import_name) in json.loads(sys.argv[1]).items():",
            "    try:",
            "        module = importlib.import_module(import_name)",
            "    except Exception:",
            "        version = dist_version(dist_name)",
            "        results[spec] = {'installed': version is not None, 'version': version}",
            "    else:",
            "        version = getattr(module, '__version__', None)",
            "        if version is None:",
            "            version = dist_version(dist_name) or 'unknown'",
            "        results[spec] = {'installed': True, 'version': str(version)}",
            "print(json.dumps(results))",
            "",
        )
    )

    def __init__(self, settings: Settings | None = None):
        """Store the settings to use, falling back to ``get_settings()``.

        Args:
            settings: Settings object to use; when falsy, the result of
                ``get_settings()`` is used instead.
        """
        self._settings = settings or get_settings()
        # Serializes installs issued through this manager instance.
        self._install_lock = asyncio.Lock()

    @classmethod
    def _requirement_name(cls, requirement: str) -> str:
        """Return the project name part of a requirement string.

        Everything from the first extras bracket, version operator, marker
        separator, ``@`` or whitespace onward is dropped, so
        ``"uvicorn[standard]>=0.20"`` yields ``"uvicorn"``.
        """
        name = requirement.strip()
        for index, char in enumerate(name):
            if char in cls._SPEC_DELIMITERS or char.isspace():
                return name[:index]
        return name

    @classmethod
    def _import_name(cls, requirement: str) -> str:
        """Guess the importable module name for a requirement string.

        This is a heuristic: the project name is lower-cased and ``-`` is
        replaced by ``_``.
        """
        return cls._requirement_name(requirement).replace("-", "_").lower()

    @staticmethod
    def _install_failure(packages: list[str], message: str) -> dict:
        """Build the result dict returned when an install did not complete."""
        return {
            "success": False,
            "stderr": message,
            "stdout": "",
            "return_code": -1,
            "packages": packages,
        }

    async def _terminate(self, process: asyncio.subprocess.Process | None) -> None:
        """Kill *process* if it is still running and wait for it to exit.

        Does nothing when the process was never started or already finished.
        """
        if process is None or process.returncode is not None:
            return
        try:
            process.kill()
        except ProcessLookupError:
            # The child exited between the returncode check and the kill.
            pass
        try:
            # Bounded wait so a child that cannot be reaped does not block
            # the caller indefinitely.
            await asyncio.wait_for(
                process.wait(), timeout=self._REAP_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            logger.warning(
                "Timed out waiting for child process %s to exit", process.pid
            )

    async def install_packages(self, packages: list[str]) -> dict:
        """Install packages into the configured environment.

        Runs the settings' pip command with *packages* appended, holding the
        install lock for the duration.

        Args:
            packages: Arguments appended to the pip command.

        Returns:
            A dict with the keys ``success``, ``stdout``, ``stderr``,
            ``return_code`` and ``packages``. When the command times out or
            cannot be run, ``success`` is False, ``return_code`` is -1 and
            ``stderr`` carries the reason.
        """
        async with self._install_lock:
            cmd = [*self._settings.get_pip_command(), *packages]
            logger.info("Installing packages: %s", " ".join(packages))

            process = None
            try:
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                stdout_bytes, stderr_bytes = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.INSTALL_TIMEOUT_SECONDS,
                )
            except asyncio.TimeoutError:
                logger.error(
                    "Package installation timed out after %ss",
                    self.INSTALL_TIMEOUT_SECONDS,
                )
                return self._install_failure(
                    packages,
                    "Package installation timed out after "
                    f"{self.INSTALL_TIMEOUT_SECONDS}s",
                )
            except Exception as exc:
                logger.exception("Error installing packages")
                return self._install_failure(packages, str(exc))
            finally:
                # No-op after a normal exit; on timeout, error or
                # cancellation it stops pip instead of leaving it running.
                await self._terminate(process)

            stdout = stdout_bytes.decode("utf-8", errors="replace")
            stderr = stderr_bytes.decode("utf-8", errors="replace")
            success = process.returncode == 0

            if success:
                logger.info("Successfully installed: %s", ", ".join(packages))
            else:
                logger.error("Failed to install packages: %s", stderr)

            return {
                "success": success,
                "stdout": stdout,
                "stderr": stderr,
                "return_code": process.returncode,
                "packages": packages,
            }

    async def check_packages(self, packages: list[str]) -> list[PackageCheckResult]:
        """Check if packages are installed.

        A single probe process handles all packages. A package counts as
        installed when its guessed module name imports, or, failing that,
        when distribution metadata for its project name is found.

        Args:
            packages: Requirement strings (a name with optional extras,
                version constraint or marker).

        Returns:
            One ``PackageCheckResult`` per distinct entry of *packages*. If
            the probe cannot be run or its output cannot be parsed, every
            package is reported as not installed.
        """
        # Function-scope imports: these names are only needed here.
        import json
        import shlex

        if not packages:
            return []

        python_exec = self._settings.get_python_executable()
        payload = json.dumps(
            {
                pkg: [self._requirement_name(pkg), self._import_name(pkg)]
                for pkg in packages
            }
        )

        process = None
        try:
            if self._settings.env_type.value == "conda":
                # NOTE(review): python_exec is presumably a multi-word
                # launcher here (hence the shell) and is interpolated as-is;
                # confirm against Settings. The arguments are shell-quoted so
                # the script and package names reach Python intact.
                command = (
                    f"{python_exec} -c {shlex.quote(self._CHECK_SCRIPT)} "
                    f"{shlex.quote(payload)}"
                )
                process = await asyncio.create_subprocess_shell(
                    command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    python_exec,
                    "-c",
                    self._CHECK_SCRIPT,
                    payload,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )

            stdout_bytes, stderr_bytes = await asyncio.wait_for(
                process.communicate(), timeout=self.CHECK_TIMEOUT_SECONDS
            )

            output_lines = (
                stdout_bytes.decode("utf-8", errors="replace").strip().splitlines()
            )
            if not output_lines:
                logger.error(
                    "Package check produced no output (exit code %s): %s",
                    process.returncode,
                    stderr_bytes.decode("utf-8", errors="replace").strip(),
                )
                return [
                    PackageCheckResult(package=pkg, installed=False)
                    for pkg in packages
                ]

            # The probe prints its JSON last; anything an imported package
            # wrote to stdout earlier is ignored.
            results_data = json.loads(output_lines[-1])
            return [
                PackageCheckResult(
                    package=pkg,
                    installed=info["installed"],
                    version=info.get("version"),
                )
                for pkg, info in results_data.items()
            ]
        except Exception:
            logger.exception("Error checking packages")
            return [
                PackageCheckResult(package=pkg, installed=False) for pkg in packages
            ]
        finally:
            # No-op after a normal exit; stops the probe on timeout or error.
            await self._terminate(process)
# Module-wide PackageManager instance; stays None until first requested.
_manager: PackageManager | None = None


def get_package_manager() -> PackageManager:
    """Return the module-wide PackageManager, creating it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = PackageManager()
    return _manager
def reset_package_manager(settings: Settings | None = None) -> PackageManager:
    """Replace the module-wide PackageManager with a new one and return it.

    Args:
        settings: Passed through to the ``PackageManager`` constructor.

    Returns:
        The newly created manager, which is also stored in ``_manager``.
    """
    global _manager
    replacement = PackageManager(settings)
    _manager = replacement
    return replacement