File size: 3,785 Bytes
c745a99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""MiniStack-backed simulator strategy."""

from __future__ import annotations

import logging
import os
import shlex
import subprocess

import httpx

from server.services.environment_strategy import EnvironmentStrategy

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default MiniStack base URL: the AWS_INFRA_URL environment variable when it
# is set, otherwise the local endpoint on port 4566. Read once at import time.
_DEFAULT_URL = os.environ.get("AWS_INFRA_URL", "http://localhost:4566")


class SimulatorStrategy(EnvironmentStrategy):
    """Environment strategy that talks to a MiniStack simulator.

    State reset and inspection go through the simulator's ``/_ministack/*``
    HTTP endpoints. Commands are run as local subprocesses with the simulator
    URL and fixed ``test`` credentials injected through environment variables.
    """

    # Seconds to wait for any single MiniStack HTTP call.
    _HTTP_TIMEOUT_S = 10
    # Seconds a subprocess command may run before it is abandoned.
    _COMMAND_TIMEOUT_S = 30
    # Maximum number of resource names listed per resource type in help text.
    _MAX_LISTED_NAMES = 20

    def __init__(self, aws_infra_url: str = _DEFAULT_URL) -> None:
        """Remember the base URL of the MiniStack instance.

        Args:
            aws_infra_url: Base URL used for the ``/_ministack/*`` endpoints
                and exported to commands as ``AWS_ENDPOINT_URL``.
        """
        self._aws_infra_url = aws_infra_url

    def reset_environment(self) -> None:
        """POST to ``/_ministack/reset`` and fail loudly if it does not work.

        Raises:
            httpx.HTTPError: If the request fails or returns an error status.
                The failure is logged before being re-raised.
        """
        try:
            resp = httpx.post(
                f"{self._aws_infra_url}/_ministack/reset",
                timeout=self._HTTP_TIMEOUT_S,
            )
            resp.raise_for_status()
            logger.info("MiniStack state reset successfully")
        except httpx.HTTPError as e:
            logger.warning("Failed to reset MiniStack state: %s", e)
            raise

    def get_infra_state(self) -> dict:
        """Fetch the decoded JSON body of ``/_ministack/state``.

        This is best-effort: any transport error, error status, or body that
        cannot be decoded as JSON is logged and reported as an empty dict.

        Returns:
            The decoded JSON payload, or ``{}`` on failure.
        """
        try:
            resp = httpx.get(
                f"{self._aws_infra_url}/_ministack/state",
                timeout=self._HTTP_TIMEOUT_S,
            )
            resp.raise_for_status()
            return resp.json()
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers a body that is not valid JSON; it is not an
            # httpx.HTTPError and would otherwise escape this handler.
            logger.warning("Failed to fetch MiniStack state: %s", e)
            return {}

    def get_service_help(self, service_name: str) -> tuple[bool, str]:
        """Build a human-readable help page for one simulated service.

        Args:
            service_name: Name appended to ``/_ministack/handlers/``.

        Returns:
            ``(True, help_text)`` on success, otherwise ``(False, message)``
            where the message distinguishes an unknown service (HTTP 404)
            from any other failure.
        """
        try:
            resp = httpx.get(
                f"{self._aws_infra_url}/_ministack/handlers/{service_name}",
                timeout=self._HTTP_TIMEOUT_S,
            )
            resp.raise_for_status()
            data = resp.json()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return False, f"Unknown service: {service_name}"
            return False, f"Failed to fetch service help: {e}"
        except (httpx.HTTPError, ValueError) as e:
            # ValueError: the body was not valid JSON.
            return False, f"Failed to fetch service help: {e}"

        if not isinstance(data, dict):
            return False, "Failed to fetch service help: unexpected response format"
        return True, self._format_service_help(data, service_name)

    def _format_service_help(self, data: dict, service_name: str) -> str:
        """Render a handler description payload as plain text.

        Missing ``service`` / ``action_count`` keys fall back to the requested
        service name and the number of listed actions instead of raising.
        """
        actions = data.get("supported_actions") or []
        lines = [
            f"SERVICE: {data.get('service', service_name)}",
            "",
            "DESCRIPTION",
            data.get("description", "No description available."),
            "",
            f"AVAILABLE ACTIONS ({data.get('action_count', len(actions))}):",
            "",
        ]
        lines.extend(f"  - {action}" for action in actions)

        state = data.get("state", {})
        if state:
            lines.append("")
            lines.append("CURRENT STATE:")
            limit = self._MAX_LISTED_NAMES
            for resource, info in state.items():
                count = info.get("count", 0)
                # Prefer names, then ids, then arns, based on key presence.
                names = info.get("names", info.get("ids", info.get("arns", [])))
                lines.append(f"  {resource}: {count}")
                if names:
                    lines.extend(f"    - {n}" for n in names[:limit])
                    if len(names) > limit:
                        lines.append(f"    ... and {len(names) - limit} more")
        return "\n".join(lines)

    def execute_command(self, command: str) -> tuple[bool, str, str]:
        """Run a command line against the simulator without using a shell.

        The command is tokenized with ``shlex.split`` and executed with the
        current environment plus ``AWS_ENDPOINT_URL``, ``test`` credentials
        and the ``us-east-1`` default region.

        Args:
            command: Command line to run.

        Returns:
            ``(success, stdout, stderr)``. ``success`` is true only for exit
            code 0. Failures to parse, start, or finish the command in time
            are reported through ``stderr`` rather than raised.
        """
        env = {
            **os.environ,
            "AWS_ENDPOINT_URL": self._aws_infra_url,
            "AWS_ACCESS_KEY_ID": "test",
            "AWS_SECRET_ACCESS_KEY": "test",
            "AWS_DEFAULT_REGION": "us-east-1",
        }
        logger.info(
            "Executing command: %s with env AWS_ENDPOINT_URL=%s",
            command,
            self._aws_infra_url,
        )
        try:
            argv = shlex.split(command)
        except ValueError as e:
            # Raised for malformed quoting, e.g. an unterminated quote.
            return False, "", str(e)
        if not argv:
            # subprocess.run([]) would fail with an opaque IndexError message.
            return False, "", "Empty command"

        try:
            result = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=self._COMMAND_TIMEOUT_S,
                env=env,
            )
        except subprocess.TimeoutExpired:
            return False, "", f"Command timed out after {self._COMMAND_TIMEOUT_S}s"
        except Exception as e:
            # Deliberately broad: callers get a result tuple for any failure
            # to launch the process (missing executable, permissions, ...).
            return False, "", str(e)
        return result.returncode == 0, result.stdout, result.stderr