"""HTTP wrapper exposing a step/reset/state environment over FastAPI.

The detection logic in :meth:`EnvironmentState.step` is a placeholder that
flags packets by keyword; replace it with the real detector.
"""

import json
import logging
import os
from typing import Any, Dict, List, Optional, Tuple

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Episode length used when no explicit limit is given.
DEFAULT_MAX_STEPS = 100


# Global state for the environment
class EnvironmentState:
    """Mutable, in-process state of a single environment episode.

    Args:
        max_steps: Number of steps after which ``done`` is reported as True.
    """

    def __init__(self, max_steps: int = DEFAULT_MAX_STEPS) -> None:
        # Set before reset() so the limit exists for the first episode.
        self.max_steps = max_steps
        self.reset()

    def reset(self) -> None:
        """Return the environment to its initial, pre-episode state."""
        self.step_count = 0
        self.current_observation: Dict[str, Any] = {
            "anomaly_score": 0.0,
            "verdict": "normal",
            "alerts": [],
            # Same key set as the observations produced by step().
            "step": 0,
        }
        self.done = False
        self.info: Dict[str, Any] = {}
        logger.info("Environment reset")

    @staticmethod
    def _as_text(packet_data: Any) -> str:
        """Coerce an arbitrary JSON value to text for keyword matching."""
        if isinstance(packet_data, str):
            return packet_data
        if packet_data is None:
            return ""
        return str(packet_data)

    @staticmethod
    def _detect(packet_text: str) -> Tuple[float, str, List[str]]:
        """Return ``(anomaly_score, verdict, alerts)`` for *packet_text*.

        --- REPLACE THIS WITH YOUR ACTUAL MAYONE DETECTION LOGIC ---
        Here you would call your framework's functions. For now this is a
        dummy detection based on two keywords.
        """
        lowered = packet_text.lower()
        if "malicious" in lowered or "attack" in lowered:
            return 0.92, "malicious", ["Potential C2 beacon detected"]
        return 0.12, "normal", []

    def step(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Process one step of the environment.

        Args:
            action: Dictionary containing e.g. ``{"packet_data": "..."}``.
                A missing or null ``packet_data`` is treated as empty text;
                non-string values are converted with ``str()``.

        Returns:
            A dict with the keys ``observation``, ``reward``, ``done`` and
            ``info``.
        """
        self.step_count += 1

        packet_text = self._as_text(action.get("packet_data", ""))
        anomaly_score, verdict, alerts = self._detect(packet_text)

        observation = {
            "anomaly_score": anomaly_score,
            "verdict": verdict,
            "alerts": alerts,
            "step": self.step_count,
        }
        reward = 1.0 if verdict == "malicious" else 0.0
        done = self.step_count >= self.max_steps  # optional max steps
        info = {"model": os.getenv("MODEL_NAME", "MayOne")}

        self.current_observation = observation
        self.done = done
        self.info = info

        logger.info("Step %s: verdict=%s", self.step_count, verdict)
        return {
            "observation": observation,
            "reward": reward,
            "done": done,
            "info": info,
        }

    def get_state(self) -> Dict[str, Any]:
        """Return the step counter, last observation, done flag and info."""
        return {
            "step_count": self.step_count,
            "current_observation": self.current_observation,
            "done": self.done,
            "info": self.info,
        }


# Initialize global environment
env = EnvironmentState()


# --- API Endpoints required by OpenEnv ---
class ResetRequest(BaseModel):
    """Optional body of ``POST /reset``."""

    config: Optional[Dict[str, Any]] = None


class StepRequest(BaseModel):
    """Body of ``POST /step``."""

    action: Dict[str, Any]


@app.post("/reset")
async def reset_endpoint(request: Optional[ResetRequest] = None):
    """Reset the environment to initial state."""
    env.reset()
    # Optionally apply config if provided; currently it is only logged.
    if request and request.config:
        logger.info("Applying config: %s", request.config)
    return {"status": "ok", "observation": env.current_observation}


@app.post("/step")
async def step_endpoint(request: StepRequest):
    """Take one step in the environment."""
    return env.step(request.action)


@app.get("/state")
async def state_endpoint():
    """Get current state of the environment."""
    return env.get_state()


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "alive"}


if __name__ == "__main__":
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=port)