File size: 5,232 Bytes
9d9c6fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d13e7d7
 
 
 
 
9d9c6fd
d13e7d7
 
 
 
 
 
 
 
9d9c6fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
import json

# Import your existing environment
from cloud_arena_final import CloudArenaEnv 

app = FastAPI()

# Initialize the global environment instance
env = CloudArenaEnv([0], [0])

# --- Translation Dictionaries ---
# Map text actions from the LLM back to your integer Action Space
ACTION_MAP = {
    "NOOP": 0, "ANALYZE": 1, "VERIFY_DEPS": 2, "RESIZE_DOWN": 3, 
    "RESIZE_UP": 4, "STOP": 5, "RESTART": 6, "DELETE": 7, 
    "PATCH": 8, "ENCRYPT": 9, "RESTRICT": 10, "ROTATE_CREDS": 11, 
    "ENABLE_LOG": 12, "ARCHIVE": 13, "OPT_NET": 14
}

class ActionRequest(BaseModel):
    action_text: str  # e.g., "RESIZE_DOWN res_2"

def format_semantic_obs(env_instance):
    """
    Translates the internal Python objects into a structured text prompt 
    that an LLM can read and reason over.
    """
    obs_dict = {
        "global_state": {
            "step_count": env_instance.step_count,
            "chaos_active": env_instance.chaos_active,
            "total_system_cost_score": round(sum(r.get_cost() for r in env_instance.resources), 2),
            "total_security_risk": round(env_instance._risk_aggregate(), 2)
        },
        "resources": {}
    }

    for r in env_instance.resources:
        if r.is_deleted:
            continue
            
        res_id = f"res_{r.idx}"
        
        # Base attributes that are ALWAYS visible
        res_data = {
            "health": "Operational" if r.health else "BROKEN",
            "criticality": "HIGH" if r.criticality == 1.0 else "MED" if r.criticality == 0.6 else "LOW",
            "status": "Stopped" if r.is_stopped else ("Running" if r.activity_status > 0.5 else "Idle"),
            "data_staleness": round(r.staleness, 2)
        }

        # Handle Fog of War masking
        if r.fog_active:
            res_data["fog_of_war"] = "ACTIVE - Execute ANALYZE to reveal metrics"
            res_data["cost_rate"] = "HIDDEN"
            res_data["security_risk"] = "HIDDEN"
            res_data["exposure"] = "HIDDEN"
        else:
            res_data["fog_of_war"] = "LIFTED"
            res_data["cost_rate"] = round(r.cost_rate, 2)
            res_data["security_risk"] = round(r.risk_score, 2)
            res_data["exposure"] = round(r.exposure, 2)
            
            # If the LLM has analyzed it, we can also give it helpful semantic hints
            if r.usage < r.allocated - 0.10 and not r.is_stopped:
                res_data["efficiency_hint"] = "Overprovisioned (Can be Resized Down)"

        obs_dict["resources"][res_id] = res_data

    # We return it as a JSON-formatted string so the LLM reads it as text
    return json.dumps(obs_dict, indent=2)


# Track the previous veto count to know if the CURRENT action was vetoed
server_state = {"last_veto_count": 0}



class ResetRequest(BaseModel):
    scenario_id: int = 0  # Default to 0 (normal random episode)

@app.post("/reset")
def reset(req: ResetRequest = None):
    # If no request body is sent, default to scenario 0
    scenario = req.scenario_id if req else 0
    
    # Pass the scenario option to your original environment
    obs, info = env.reset(options={"scenario": scenario})
    
    server_state["last_veto_count"] = env.veto_count
    return {"observation": format_semantic_obs(env)}

@app.post("/step")
def step(req: ActionRequest):
    try:
        # 1. Parse the text action (e.g., "PATCH res_3")
        parts = req.action_text.strip().split()
        if len(parts) == 1 and parts[0] == "NOOP":
            act_type_str, res_id_str = "NOOP", "0"
        else:
            act_type_str, res_id_str = parts[0], parts[1].replace("res_", "")
        
        # 2. Convert to your internal discrete action ID
        atype = ACTION_MAP[act_type_str]
        ridx = int(res_id_str)
        discrete_action = (atype * 10) + ridx # MAX_RESOURCES is 10
        
        # 3. Step the environment
        obs, reward, terminated, truncated, info = env.step(discrete_action)
        
        # 4. Handle Vetoes (Check if the count went up THIS step!)
        current_vetoes = env.veto_count
        if current_vetoes > server_state["last_veto_count"]: 
            server_state["last_veto_count"] = current_vetoes
            error_msg = f"Action {req.action_text} failed: Constraints not met (e.g., trying to modify an active critical resource, or resource doesn't need this action)."
            return {"observation": error_msg, "reward": -0.1, "done": False}
        
        server_state["last_veto_count"] = current_vetoes

        # 5. Return standard OpenEnv semantic response
        # FIXED: Pass the 'env' object directly
        return {
            "observation": format_semantic_obs(env),
            "reward": float(reward),
            "done": bool(terminated or truncated),
            "info": info
        }

    except KeyError:
        return {"observation": f"Syntax Error: Invalid action type. Use one of {list(ACTION_MAP.keys())}", "reward": -0.5, "done": False}
    except Exception as e:
        return {"observation": f"Execution Error: {str(e)}", "reward": -0.5, "done": False}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)