# NOTE: removed Hugging Face Spaces page residue ("Spaces: / Sleeping / Sleeping")
# that was accidentally captured with the source — it is not valid Python.
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import json | |
| # Import your existing environment | |
| from cloud_arena_final import CloudArenaEnv | |
app = FastAPI()

# Global, single-instance environment shared by every request (the server is
# stateful: one episode at a time).
# NOTE(review): the `[0], [0]` constructor args presumably select the initial
# resource/scenario configuration — confirm against CloudArenaEnv.__init__.
env = CloudArenaEnv([0], [0])
# --- Translation Dictionaries ---
# Map text actions from the LLM back to the integer Action Space.
# The position of each verb in this tuple IS its discrete action id.
_ACTION_NAMES = (
    "NOOP", "ANALYZE", "VERIFY_DEPS", "RESIZE_DOWN", "RESIZE_UP",
    "STOP", "RESTART", "DELETE", "PATCH", "ENCRYPT", "RESTRICT",
    "ROTATE_CREDS", "ENABLE_LOG", "ARCHIVE", "OPT_NET",
)
ACTION_MAP = {name: idx for idx, name in enumerate(_ACTION_NAMES)}
class ActionRequest(BaseModel):
    """Request body for the /step endpoint."""

    # Free-text action from the LLM, e.g. "RESIZE_DOWN res_2" or bare "NOOP".
    action_text: str
def format_semantic_obs(env_instance):
    """
    Translate the environment's internal Python objects into a structured
    JSON text blob that an LLM can read and reason over.

    Args:
        env_instance: The CloudArenaEnv instance. Only read here (never
            mutated): ``step_count``, ``chaos_active``, ``resources`` and
            ``_risk_aggregate()``.

    Returns:
        str: A pretty-printed JSON document with a ``global_state`` summary
        and a per-resource ``resources`` mapping keyed ``res_<idx>``.
    """
    obs_dict = {
        "global_state": {
            "step_count": env_instance.step_count,
            "chaos_active": env_instance.chaos_active,
            # Sum of every resource's cost, rounded for prompt readability.
            "total_system_cost_score": round(sum(r.get_cost() for r in env_instance.resources), 2),
            "total_security_risk": round(env_instance._risk_aggregate(), 2)
        },
        "resources": {}
    }
    for r in env_instance.resources:
        # Deleted resources vanish from the observation entirely.
        if r.is_deleted:
            continue
        res_id = f"res_{r.idx}"
        # Base attributes that are ALWAYS visible
        res_data = {
            "health": "Operational" if r.health else "BROKEN",
            # NOTE(review): exact float equality — assumes criticality only
            # ever takes values {1.0, 0.6, <other>}; confirm against the env.
            "criticality": "HIGH" if r.criticality == 1.0 else "MED" if r.criticality == 0.6 else "LOW",
            "status": "Stopped" if r.is_stopped else ("Running" if r.activity_status > 0.5 else "Idle"),
            "data_staleness": round(r.staleness, 2)
        }
        # Handle Fog of War masking: cost/risk metrics stay hidden until the
        # agent runs ANALYZE on this resource.
        if r.fog_active:
            res_data["fog_of_war"] = "ACTIVE - Execute ANALYZE to reveal metrics"
            res_data["cost_rate"] = "HIDDEN"
            res_data["security_risk"] = "HIDDEN"
            res_data["exposure"] = "HIDDEN"
        else:
            res_data["fog_of_war"] = "LIFTED"
            res_data["cost_rate"] = round(r.cost_rate, 2)
            res_data["security_risk"] = round(r.risk_score, 2)
            res_data["exposure"] = round(r.exposure, 2)
            # If the LLM has analyzed it, we can also give it helpful
            # semantic hints (0.10 slack = over-provisioning threshold).
            if r.usage < r.allocated - 0.10 and not r.is_stopped:
                res_data["efficiency_hint"] = "Overprovisioned (Can be Resized Down)"
        obs_dict["resources"][res_id] = res_data
    # Return a JSON-formatted string so the LLM reads it as plain text.
    return json.dumps(obs_dict, indent=2)
# Track the previous veto count so /step can tell whether the CURRENT action
# was vetoed: env.veto_count is a running counter, so a veto this step shows
# up as an increase over the value recorded here.
server_state = {"last_veto_count": 0}
class ResetRequest(BaseModel):
    """Request body for the /reset endpoint."""

    # Scenario selector forwarded to env.reset(); 0 = normal random episode.
    scenario_id: int = 0
@app.post("/reset")
def reset(req: ResetRequest = None):
    """Reset the environment and return the initial semantic observation.

    FIX: registered the handler with ``@app.post("/reset")`` — the function
    was defined but never attached to the FastAPI app, so the endpoint was
    unreachable.

    Args:
        req: Optional body selecting a scenario; when no body is sent the
            default scenario 0 (normal random episode) is used.

    Returns:
        dict: ``{"observation": <semantic JSON text>}``.
    """
    # If no request body is sent, default to scenario 0.
    scenario = req.scenario_id if req else 0
    # Pass the scenario option through to the underlying environment.
    obs, info = env.reset(options={"scenario": scenario})
    # Re-sync the veto tracker so a stale counter from a previous episode is
    # not mistaken for a fresh veto on the first /step call.
    server_state["last_veto_count"] = env.veto_count
    return {"observation": format_semantic_obs(env)}
@app.post("/step")
def step(req: ActionRequest):
    """Execute one text action against the environment.

    FIX: registered the handler with ``@app.post("/step")`` — the function
    was defined but never attached to the FastAPI app, so the endpoint was
    unreachable.

    Args:
        req: Body carrying ``action_text`` such as ``"PATCH res_3"``.

    Returns:
        dict: ``observation`` (semantic JSON text or an error string),
        ``reward``, ``done`` and, on success, the env ``info`` dict.
        Vetoed actions return -0.1; malformed input returns -0.5.
    """
    try:
        # 1. Parse the text action (e.g., "PATCH res_3"). A bare "NOOP"
        # needs no target, so it defaults to resource 0.
        parts = req.action_text.strip().split()
        if len(parts) == 1 and parts[0] == "NOOP":
            act_type_str, res_id_str = "NOOP", "0"
        else:
            act_type_str, res_id_str = parts[0], parts[1].replace("res_", "")
        # 2. Convert to the internal discrete action ID.
        atype = ACTION_MAP[act_type_str]
        ridx = int(res_id_str)
        discrete_action = (atype * 10) + ridx  # MAX_RESOURCES is 10
        # 3. Step the environment.
        obs, reward, terminated, truncated, info = env.step(discrete_action)
        # 4. Handle vetoes: env.veto_count is cumulative, so the action was
        # vetoed THIS step only if the counter increased.
        current_vetoes = env.veto_count
        if current_vetoes > server_state["last_veto_count"]:
            server_state["last_veto_count"] = current_vetoes
            error_msg = f"Action {req.action_text} failed: Constraints not met (e.g., trying to modify an active critical resource, or resource doesn't need this action)."
            return {"observation": error_msg, "reward": -0.1, "done": False}
        server_state["last_veto_count"] = current_vetoes
        # 5. Return standard OpenEnv semantic response built from the live
        # env object (not the raw obs array).
        return {
            "observation": format_semantic_obs(env),
            "reward": float(reward),
            "done": bool(terminated or truncated),
            "info": info
        }
    except KeyError:
        # Unknown action verb.
        return {"observation": f"Syntax Error: Invalid action type. Use one of {list(ACTION_MAP.keys())}", "reward": -0.5, "done": False}
    except Exception as e:
        # Catch-all boundary: malformed input (missing resource token, bad
        # int) or an env failure becomes a penalized textual observation
        # instead of an HTTP 500.
        return {"observation": f"Execution Error: {str(e)}", "reward": -0.5, "done": False}
if __name__ == "__main__":
    # Bind to all interfaces so the container/Space can expose port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)