Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import logging | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| from typing import Dict, Any, Optional | |
| import uvicorn | |
# Send INFO-and-above records through the root logger's default handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application object; it is handed to uvicorn when run as a script.
app = FastAPI()
| # Global state for the environment | |
class EnvironmentState:
    """In-memory state of one anomaly-detection episode.

    Attributes (all (re)initialised by :meth:`reset`):
        step_count: number of :meth:`step` calls since the last reset.
        current_observation: the most recent observation dictionary.
        done: ``True`` once ``step_count`` has reached ``MAX_STEPS``.
        info: auxiliary metadata produced by the most recent step.
    """

    # Number of steps after which ``step`` starts reporting ``done``.
    MAX_STEPS = 100
    # Case-insensitive substrings that make the placeholder detector flag a
    # packet as malicious.
    SUSPICIOUS_MARKERS = ("malicious", "attack")

    def __init__(self):
        self.reset()

    def reset(self):
        """Restore the initial state: zero steps, a neutral observation."""
        self.step_count = 0
        self.current_observation = {
            "anomaly_score": 0.0,
            "verdict": "normal",
            "alerts": [],
        }
        self.done = False
        self.info = {}
        logger.info("Environment reset")

    def step(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Process one step of the environment.

        Args:
            action: dictionary containing e.g. ``{"packet_data": "..."}``.
                A missing or ``None`` ``packet_data`` is treated as empty
                text; any other non-string value is converted with ``str()``
                before being inspected.

        Returns:
            A dictionary with the keys ``observation``, ``reward``, ``done``
            and ``info``.
        """
        self.step_count += 1

        # --- REPLACE THIS WITH YOUR ACTUAL MAYONE DETECTION LOGIC ---
        # Here you would call your framework's functions.
        # For now, a dummy detection based on substring matching.
        packet_data = action.get("packet_data", "")
        # The action comes from arbitrary JSON, so the value may be null, a
        # number, a list, ...; normalise it to text instead of crashing on
        # ``.lower()``.
        if packet_data is None:
            packet_data = ""
        elif not isinstance(packet_data, str):
            packet_data = str(packet_data)
        lowered = packet_data.lower()

        if any(marker in lowered for marker in self.SUSPICIOUS_MARKERS):
            anomaly_score = 0.92
            verdict = "malicious"
            alerts = ["Potential C2 beacon detected"]
        else:
            anomaly_score = 0.12
            verdict = "normal"
            alerts = []

        observation = {
            "anomaly_score": anomaly_score,
            "verdict": verdict,
            "alerts": alerts,
            "step": self.step_count,
        }
        reward = 1.0 if verdict == "malicious" else 0.0
        done = self.step_count >= self.MAX_STEPS  # optional max steps
        info = {"model": os.getenv("MODEL_NAME", "MayOne")}

        # Remember the latest results so get_state() can report them.
        self.current_observation = observation
        self.done = done
        self.info = info
        logger.info("Step %s: verdict=%s", self.step_count, verdict)

        return {
            "observation": observation,
            "reward": reward,
            "done": done,
            "info": info,
        }

    def get_state(self) -> Dict[str, Any]:
        """Return a snapshot dictionary of the current episode state."""
        return {
            "step_count": self.step_count,
            "current_observation": self.current_observation,
            "done": self.done,
            "info": self.info,
        }
# Single module-level environment instance used by the reset, step and state
# handlers.
env = EnvironmentState()
| # --- API Endpoints required by OpenEnv --- | |
class ResetRequest(BaseModel):
    # Request body for the reset handler.
    # `config` is optional; when omitted it is None.
    config: Optional[Dict[str, Any]] = None
class StepRequest(BaseModel):
    # Request body for the step handler.
    # `action` is required and is passed unchanged to EnvironmentState.step().
    action: Dict[str, Any]
@app.post("/reset")
async def reset_endpoint(request: Optional[ResetRequest] = None):
    """Reset the environment to its initial state.

    Registered as ``POST /reset`` so the handler is actually reachable.
    The request body is optional. When a non-empty ``config`` is supplied it
    is only logged; nothing in this handler applies it to the environment.

    Returns:
        ``{"status": "ok", "observation": <observation after the reset>}``.
    """
    env.reset()
    # Optionally apply config if provided (currently it is just logged).
    if request is not None and request.config:
        logger.info("Applying config: %s", request.config)
    return {"status": "ok", "observation": env.current_observation}
@app.post("/step")
async def step_endpoint(request: StepRequest):
    """Take one step in the environment.

    Registered as ``POST /step`` so the handler is actually reachable.

    Args:
        request: body whose ``action`` dictionary is forwarded to
            ``env.step``.

    Returns:
        The dictionary produced by ``env.step`` (``observation``, ``reward``,
        ``done`` and ``info``).
    """
    return env.step(request.action)
@app.get("/state")
async def state_endpoint():
    """Get the current state of the environment.

    Registered as ``GET /state`` so the handler is actually reachable.

    Returns:
        The dictionary produced by ``env.get_state()`` (``step_count``,
        ``current_observation``, ``done`` and ``info``).
    """
    return env.get_state()
@app.get("/health")
async def health():
    """Liveness probe: always answers with a constant payload.

    Registered as ``GET /health`` so the handler is actually reachable.
    """
    return {"status": "alive"}
if __name__ == "__main__":
    # Serve on all interfaces; the port is read from $PORT and falls back to
    # 7860 when the variable is unset.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)