File size: 5,147 Bytes
3e107da
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Package installation and checking for the configured environment."""

from __future__ import annotations

import asyncio
import logging
from pathlib import Path

from app.config import Settings, get_settings
from app.models import PackageCheckResult

logger = logging.getLogger(__name__)


class PackageManager:
    """Install packages into, and probe packages in, the configured environment.

    Commands come from ``Settings``: ``get_pip_command()`` supplies the argv
    prefix used for installs and ``get_python_executable()`` the interpreter
    used for checks. Installs are serialized per instance with an
    ``asyncio.Lock``.
    """

    # Wall-clock limits, in seconds, for the child processes. A child that
    # exceeds its limit is killed instead of being left running.
    INSTALL_TIMEOUT_SECONDS = 300
    CHECK_TIMEOUT_SECONDS = 30

    # Characters that end the project name inside a requirement string:
    # extras ("["), version operators, environment markers (";"), direct
    # references ("@") and whitespace.
    _NAME_TERMINATORS = frozenset("[<>=!~;@ \t")

    # Probe executed by the target interpreter. It expects a ``targets`` dict
    # ({requirement: (import_name, dist_name)}) to be defined just above it and
    # prints one JSON object as its last line of output. Package names are only
    # ever handled as data here, never spliced into the source text.
    _CHECK_SCRIPT_BODY = """\
import importlib, json
try:
    from importlib import metadata
except ImportError:
    metadata = None


def dist_version(name):
    if metadata is None or not name:
        return None
    try:
        return metadata.version(name)
    except (ImportError, ValueError):
        return None


results = {}
for pkg, (import_name, dist_name) in targets.items():
    try:
        mod = importlib.import_module(import_name)
    except Exception:
        mod = None
    version = getattr(mod, "__version__", None)
    if version is None:
        version = dist_version(dist_name)
    if mod is None and version is None:
        results[pkg] = {"installed": False, "version": None}
    else:
        results[pkg] = {
            "installed": True,
            "version": "unknown" if version is None else str(version),
        }
print(json.dumps(results))
"""

    def __init__(self, settings: Settings | None = None):
        """Bind to ``settings``, falling back to ``get_settings()`` when omitted."""
        self._settings = settings or get_settings()
        self._install_lock = asyncio.Lock()

    async def install_packages(self, packages: list[str]) -> dict:
        """Install ``packages`` with the configured pip command.

        Only one install runs at a time per manager; concurrent callers wait
        on the internal lock.

        Args:
            packages: Requirement strings appended to the pip command as-is.

        Returns:
            A dict with the keys ``success``, ``stdout``, ``stderr``,
            ``return_code`` and ``packages``. Launch failures, timeouts and
            unexpected errors are reported as ``success=False`` with
            ``return_code=-1`` rather than raised.
        """
        async with self._install_lock:
            cmd = [*self._settings.get_pip_command(), *packages]
            logger.info("Installing packages: %s", " ".join(packages))

            try:
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                stdout_bytes, stderr_bytes = await self._communicate(
                    process, self.INSTALL_TIMEOUT_SECONDS
                )
            except asyncio.TimeoutError:
                # _communicate has already killed and reaped the child.
                message = (
                    "Package installation timed out after "
                    f"{self.INSTALL_TIMEOUT_SECONDS}s"
                )
                logger.error("%s: %s", message, ", ".join(packages))
                return self._install_result(packages, stderr=message)
            except Exception as e:
                logger.exception("Error installing packages")
                return self._install_result(packages, stderr=str(e))

            stdout = stdout_bytes.decode("utf-8", errors="replace")
            stderr = stderr_bytes.decode("utf-8", errors="replace")
            success = process.returncode == 0

            if success:
                logger.info("Successfully installed: %s", ", ".join(packages))
            else:
                logger.error("Failed to install packages: %s", stderr)

            return self._install_result(
                packages,
                success=success,
                stdout=stdout,
                stderr=stderr,
                return_code=process.returncode,
            )

    async def check_packages(self, packages: list[str]) -> list[PackageCheckResult]:
        """Report which of ``packages`` are available in the target environment.

        A single probe script is run with the configured interpreter. For each
        requirement it first tries to import the derived module name and then
        falls back to looking up the distribution's metadata, so projects whose
        import name differs from their distribution name are still detected.

        Args:
            packages: Requirement strings; version specifiers, extras and
                markers are ignored when deriving the name to probe.

        Returns:
            One ``PackageCheckResult`` per distinct requirement. If the probe
            cannot be run or its output cannot be parsed, every requirement is
            reported with ``installed=False``.
        """
        # Local imports: the module's import block does not provide these.
        import json
        import shlex

        if not packages:
            return []

        python_exec = self._settings.get_python_executable()
        check_script = self._build_check_script(packages)

        try:
            if self._settings.env_type.value == "conda":
                # The interpreter string is handed to a shell unquoted
                # (presumably it can be a multi-word command -- confirm against
                # Settings). The script itself must be shell-quoted; repr()
                # would hand the interpreter literal "\n" sequences.
                # NOTE: shlex.quote targets POSIX shells.
                process = await asyncio.create_subprocess_shell(
                    f"{python_exec} -c {shlex.quote(check_script)}",
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    python_exec,
                    "-c",
                    check_script,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )

            stdout_bytes, stderr_bytes = await self._communicate(
                process, self.CHECK_TIMEOUT_SECONDS
            )

            if process.returncode != 0:
                logger.warning(
                    "Package check exited with code %s: %s",
                    process.returncode,
                    stderr_bytes.decode("utf-8", errors="replace").strip(),
                )

            # The probe prints its JSON last; anything an imported module
            # wrote to stdout while loading precedes it.
            stdout = stdout_bytes.decode("utf-8", errors="replace")
            results_data = json.loads(stdout.strip().rsplit("\n", 1)[-1])

            return [
                PackageCheckResult(
                    package=pkg,
                    installed=info["installed"],
                    version=info.get("version"),
                )
                for pkg, info in results_data.items()
            ]
        except Exception:
            logger.exception("Error checking packages")
            return [
                PackageCheckResult(package=pkg, installed=False) for pkg in packages
            ]

    @classmethod
    def _split_requirement(cls, requirement: str) -> tuple[str, str]:
        """Return ``(import_name, dist_name)`` derived from a requirement string.

        ``dist_name`` is the text before the first extras/specifier/marker
        character; ``import_name`` is that name lower-cased with ``-`` turned
        into ``_``.
        """
        text = requirement.strip()
        end = next(
            (i for i, ch in enumerate(text) if ch in cls._NAME_TERMINATORS),
            len(text),
        )
        dist_name = text[:end]
        return dist_name.replace("-", "_").lower(), dist_name

    @classmethod
    def _build_check_script(cls, packages: list[str]) -> str:
        """Return the probe source with ``packages`` embedded as a dict literal."""
        targets = {pkg: cls._split_requirement(pkg) for pkg in packages}
        # repr() of a dict of plain strings is a valid Python literal, so odd
        # characters in a requirement cannot alter the script's code.
        return f"targets = {targets!r}\n{cls._CHECK_SCRIPT_BODY}"

    @staticmethod
    def _install_result(
        packages: list[str],
        *,
        success: bool = False,
        stdout: str = "",
        stderr: str = "",
        return_code: int | None = -1,
    ) -> dict:
        """Build the result dict returned by ``install_packages``."""
        return {
            "success": success,
            "stdout": stdout,
            "stderr": stderr,
            "return_code": return_code,
            "packages": packages,
        }

    @staticmethod
    async def _communicate(process, timeout: float) -> tuple[bytes, bytes]:
        """Collect the child's output, killing and reaping it on timeout.

        Raises:
            asyncio.TimeoutError: If the child did not finish within
                ``timeout`` seconds; the child has been killed by then.
        """
        try:
            return await asyncio.wait_for(process.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The child exited between the check and the kill.
                    pass
            await process.wait()
            raise


_manager: PackageManager | None = None


def get_package_manager() -> PackageManager:
    """Return the module-wide ``PackageManager``, creating it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    # First call: build the shared instance with default settings.
    _manager = PackageManager()
    return _manager


def reset_package_manager(settings: Settings | None = None) -> PackageManager:
    """Replace the module-wide ``PackageManager`` with a fresh one and return it.

    ``settings`` is passed straight to the ``PackageManager`` constructor.
    """
    global _manager
    replacement = PackageManager(settings)
    _manager = replacement
    return replacement