theghostcmd's picture
Upload app.py
745e8b5 verified
Raw
History Blame Contribute Delete
3.46 kB
import os
import json
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, Optional
import uvicorn
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# Global state for the environment
class EnvironmentState:
def __init__(self):
self.reset()
def reset(self):
self.step_count = 0
self.current_observation = {
"anomaly_score": 0.0,
"verdict": "normal",
"alerts": []
}
self.done = False
self.info = {}
logger.info("Environment reset")
def step(self, action: Dict[str, Any]) -> Dict[str, Any]:
"""
Process one step of the environment.
action: dictionary containing e.g., {"packet_data": "..."}
Returns: observation, reward, done, info
"""
self.step_count += 1
# --- REPLACE THIS WITH YOUR ACTUAL MAYONE DETECTION LOGIC ---
# Here you would call your framework's functions.
# For now, a dummy detection:
packet_data = action.get("packet_data", "")
if "malicious" in packet_data.lower() or "attack" in packet_data.lower():
anomaly_score = 0.92
verdict = "malicious"
alerts = ["Potential C2 beacon detected"]
else:
anomaly_score = 0.12
verdict = "normal"
alerts = []
observation = {
"anomaly_score": anomaly_score,
"verdict": verdict,
"alerts": alerts,
"step": self.step_count
}
reward = 1.0 if verdict == "malicious" else 0.0
done = self.step_count >= 100 # optional max steps
info = {"model": os.getenv("MODEL_NAME", "MayOne")}
self.current_observation = observation
self.done = done
self.info = info
logger.info(f"Step {self.step_count}: verdict={verdict}")
return {
"observation": observation,
"reward": reward,
"done": done,
"info": info
}
def get_state(self) -> Dict[str, Any]:
return {
"step_count": self.step_count,
"current_observation": self.current_observation,
"done": self.done,
"info": self.info
}
# Initialize global environment
env = EnvironmentState()
# --- API Endpoints required by OpenEnv ---
class ResetRequest(BaseModel):
config: Optional[Dict[str, Any]] = None
class StepRequest(BaseModel):
action: Dict[str, Any]
@app.post("/reset")
async def reset_endpoint(request: ResetRequest = None):
"""Reset the environment to initial state."""
env.reset()
# Optionally apply config if provided
if request and request.config:
logger.info(f"Applying config: {request.config}")
return {"status": "ok", "observation": env.current_observation}
@app.post("/step")
async def step_endpoint(request: StepRequest):
"""Take one step in the environment."""
result = env.step(request.action)
return result
@app.get("/state")
async def state_endpoint():
"""Get current state of the environment."""
return env.get_state()
@app.get("/health")
async def health():
return {"status": "alive"}
if __name__ == "__main__":
port = int(os.getenv("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)