Spaces:
Sleeping
Sleeping
File size: 13,189 Bytes
7743c15 500e2b8 d23cbe2 7743c15 500e2b8 7743c15 d23cbe2 7743c15 d23cbe2 500e2b8 d23cbe2 7743c15 500e2b8 7743c15 500e2b8 7743c15 013e24a 7743c15 d23cbe2 7743c15 4535620 d23cbe2 6475c9b 7743c15 d23cbe2 7743c15 d23cbe2 7743c15 d23cbe2 7743c15 d23cbe2 7743c15 d23cbe2 7743c15 d23cbe2 7743c15 d23cbe2 7743c15 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 | """
ContentGuardEnv — Policy Enforcement Gateway (v1.0)
==================================================
This server acts as the primary interface between LLM agents and the
ContentGuard moderation environment. It exposes a standardized OpenEnv
OpenAPI/WebSocket interface to facilitate autonomous training and
benchmarking across Meta community standards.
Key Features:
- Real-time policy-trace streaming via WebSockets.
- Dynamic environment resetting for multi-task RLHF.
- Automated grading & reward calculation.
"""
import os
import json
import time
from typing import Dict, Any, Optional
from dotenv import load_dotenv
load_dotenv()
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
# Internal module imports (Post-Restructure)
from env import ContentGuardEnv
# Application object exposed to the ASGI server.
app = FastAPI(
    title="ContentGuardEnv Gateway",
    description="Operational environment for Meta-scale Trust & Safety agent benchmarking.",
    version="1.0.0",
)

# CORS is left fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Resource Path Configuration
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")

# Static assets are only mounted when the directory sits next to this file.
if os.path.exists(STATIC_DIR):
    app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# In-memory registry of environments created through POST /reset,
# keyed by the environment's episode id.
sessions: Dict[str, ContentGuardEnv] = {}

# LLM Inference Client (Defaulting to Hackathon standard endpoints)
DEFAULT_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini")

# HF_TOKEN takes precedence over OPENAI_API_KEY when both are set.
DEFAULT_API_KEY = os.getenv("HF_TOKEN") or os.getenv("OPENAI_API_KEY")

# Cache-busting tag appended to dashboard asset URLs; falls back to the
# process start time when no build identifier is present in the environment.
BUILD_TAG = (
    os.getenv("HF_SPACE_SHA")
    or os.getenv("SPACE_BUILD_TAG")
    or os.getenv("BUILD_TAG")
    or str(int(time.time()))
)
def _is_placeholder_api_key(api_key: Optional[str]) -> bool:
if not api_key:
return True
lowered = api_key.strip().lower()
return lowered in {"sk-placeholder", "your_api_key", "changeme"}
def _resolve_session_client(cfg: Optional[Dict[str, Any]]) -> tuple[Optional[AsyncOpenAI], str]:
    """Build a runtime client from UI config with provider-aware routing rules.

    Args:
        cfg: Client-supplied settings with optional ``api_key``, ``base_url``
            and ``model`` entries. The value comes from untrusted WebSocket
            JSON, so anything that is not a dict, and any entry that is not a
            string, is treated as absent instead of raising.

    Returns:
        A ``(client, model)`` pair. When no usable key is supplied this is the
        server-wide default client (which may be ``None``) together with the
        default model name.
    """
    # Malformed or missing config: keep the server defaults.
    if not cfg or not isinstance(cfg, dict):
        return aclient, MODEL_NAME

    def _text(key: str) -> str:
        """Return the stripped string stored under *key*, or "" if unusable."""
        value = cfg.get(key)
        return value.strip() if isinstance(value, str) else ""

    api_key = _text("api_key")
    requested_model = _text("model")
    base_url = _text("base_url") or DEFAULT_BASE_URL
    model = requested_model or MODEL_NAME

    if _is_placeholder_api_key(api_key):
        # No runtime key provided: use server default if configured, otherwise deterministic grading fallback.
        return aclient, MODEL_NAME

    # Re-route when the key prefix and the endpoint host disagree.
    if api_key.startswith("hf_") and "openai.com" in base_url:
        base_url = "https://api-inference.huggingface.co/v1"
    elif api_key.startswith("sk-") and "huggingface.co" in base_url:
        base_url = "https://api.openai.com/v1"

    # "hf_" keys get a different default model, unless the caller named one.
    if api_key.startswith("hf_") and not requested_model:
        model = "meta-llama/Llama-3-70b-instruct"

    return AsyncOpenAI(api_key=api_key, base_url=base_url), model
def _build_demo_action(env: ContentGuardEnv) -> Dict[str, Any]:
"""Generate a task-valid deterministic action when live inference is unavailable."""
gt = env.ground_truth or {}
if env.task_id == "easy":
return {"violation": gt.get("violation", "safe")}
if env.task_id == "medium":
return {
"action": gt.get("action", "no_action"),
"severity": int(gt.get("severity", 3)),
"reasoning": "Deterministic demo fallback due unavailable inference credentials.",
}
return {
"ruling": gt.get("ruling", "upheld"),
"policy_references": gt.get("policy_references", []),
"explanation": "Deterministic fallback path used because model inference is unavailable.",
"user_guidance": "Review platform standards and avoid repeating flagged behavior.",
}
# Server-wide default inference client. It stays None when the environment
# supplies no usable API key.
aclient: Optional[AsyncOpenAI] = (
    None
    if _is_placeholder_api_key(DEFAULT_API_KEY)
    else AsyncOpenAI(api_key=DEFAULT_API_KEY.strip(), base_url=DEFAULT_BASE_URL)
)
class ResetRequest(BaseModel):
    # Request body for POST /reset. Documented with comments rather than a
    # class docstring so the generated schema description stays untouched.
    task_id: str = Field(
        default="easy",
        description="Difficulty tier: easy | medium | hard",
    )
class StepRequest(BaseModel):
    # Request body for POST /step/{episode_id}. Documented with comments
    # rather than a class docstring so the generated schema description stays
    # untouched. `action` is required (the Ellipsis default marks it so).
    action: Dict[str, Any] = Field(
        ...,
        description="Agent moderation decision package",
    )
@app.middleware("http")
async def disable_cache_for_dashboard_assets(request: Request, call_next):
    """Attach no-cache headers to the dashboard page and its static assets.

    Responses for ``/`` and for any path under ``/static/`` receive
    ``Cache-Control``, ``Pragma`` and ``Expires`` headers that forbid caching.
    All other responses pass through unchanged.
    """
    response = await call_next(request)
    route = request.url.path

    is_dashboard_asset = route == "/" or route.startswith("/static/")
    if not is_dashboard_asset:
        return response

    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"),
        ("Pragma", "no-cache"),
        ("Expires", "0"),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def serve_dashboard():
    """Return the dashboard HTML with cache-busting query strings applied.

    Reads ``index.html`` from the static directory and appends
    ``?v=<BUILD_TAG>`` to the first reference to the stylesheet and to the
    script. When the file does not exist a short notice is returned instead.
    """
    index_path = os.path.join(STATIC_DIR, "index.html")

    if os.path.exists(index_path):
        with open(index_path, "r", encoding="utf-8") as handle:
            page = handle.read()
        # Only the first occurrence of each asset URL is rewritten.
        for asset_url in ("/static/style.css", "/static/app.js"):
            page = page.replace(asset_url, f"{asset_url}?v={BUILD_TAG}", 1)
        return HTMLResponse(content=page)

    return HTMLResponse("ContentGuard Dashboard: Static assets not found. Check /server/static deployment.")
@app.post("/reset", tags=["Core API"])
async def reset_environment(req: ResetRequest = ResetRequest()):
    """Initializes a new moderation case and returns the observation state."""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description in the generated OpenAPI schema.
    #
    # Creates a fresh environment, resets it for the requested task and
    # registers it in `sessions` under its episode id so that /step and
    # /state can find it later.
    #
    # Raises HTTPException(400) when the environment rejects the request.
    env = ContentGuardEnv()
    try:
        obs = env.reset(task_id=req.task_id)
    except ValueError as exc:
        # The WebSocket handler already reports ValueError from reset() as a
        # client error (presumably an unknown task_id; confirm against
        # ContentGuardEnv.reset). Do the same here instead of a bare 500.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    # Register only after a successful reset so failed attempts leave no entry.
    sessions[env.episode_id] = env
    return obs
@app.post("/step/{episode_id}", tags=["Core API"])
async def process_step(episode_id: str, req: StepRequest):
    """Submits an agent decision and returns a policy-aligned reward signal."""
    # Looks up the episode registered by /reset and forwards the action to it,
    # grading with the server-wide default client and model.
    env = sessions.get(episode_id)
    if not env:
        raise HTTPException(
            status_code=404,
            detail=f"Session '{episode_id}' not active or expired.",
        )

    try:
        outcome = await env.step(req.action, client=aclient, model=MODEL_NAME)
    except RuntimeError as exc:
        # A RuntimeError from the environment is reported as a client error.
        raise HTTPException(
            status_code=400,
            detail=f"Policy Engine Conflict: {exc!s}",
        )
    return outcome
@app.get("/state/{episode_id}", tags=["Advanced Utility"])
async def get_env_state(episode_id: str):
    """Retrieves the full internal state of an active moderation episode."""
    # Answers with 404 when no environment is registered under this id.
    env = sessions.get(episode_id)
    if env:
        return env.state()
    raise HTTPException(
        status_code=404,
        detail=f"Session '{episode_id}' not found.",
    )
@app.get("/health", tags=["System"])
async def check_health():
    # Health probe: reports a fixed status string, the number of environments
    # currently held in `sessions`, and the engine name.
    live_sessions = len(sessions)
    return {
        "status": "operational",
        "active_sessions": live_sessions,
        "engine": "ContentGuardEnv",
    }
@app.websocket("/ws")
async def policy_trace_socket(websocket: WebSocket):
    """Streams real-time reasoning traces and environment telemetry.

    Every inbound text frame must be a JSON object. Its ``action`` field
    selects the command:

    - ``reset``: create a new environment for ``task_id`` (default ``"easy"``)
      and reply with its observation.
    - ``step``: apply the payload under ``data`` to the current environment.
    - ``run_agent``: ask the session's model for a ruling, relay its tokens as
      ``stream`` frames, then submit the parsed JSON as a step. If anything in
      that path fails, a deterministic demo action is submitted instead.
    - ``state``: reply with the environment state (no reply when no
      environment exists yet).

    Frames with any other ``action`` are ignored. An optional ``config``
    object on any frame replaces the client and model used for the rest of the
    connection. The handler returns when the client disconnects.
    """
    await websocket.accept()
    env: ContentGuardEnv | None = None
    session_client: AsyncOpenAI | None = aclient
    session_model: str = MODEL_NAME
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                await websocket.send_json({"type": "error", "message": "Malformed WebSocket packet."})
                continue
            # Valid JSON that is not an object (list, number, string, null)
            # has no .get(); reject it instead of crashing the connection.
            if not isinstance(msg, dict):
                await websocket.send_json({"type": "error", "message": "Malformed WebSocket packet."})
                continue

            cmd = msg.get("action")

            # Universal Credential Injector (Session-based)
            if "config" in msg:
                cfg = msg.get("config")
                if cfg is not None and not isinstance(cfg, dict):
                    await websocket.send_json({"type": "error", "message": "Invalid config: expected a JSON object."})
                    continue
                session_client, session_model = _resolve_session_client(cfg)

            if cmd == "reset":
                env = ContentGuardEnv()
                try:
                    obs = env.reset(task_id=msg.get("task_id", "easy"))
                    # Explicitly dump Pydantic model for WebSocket JSON serialization
                    await websocket.send_json({"type": "reset", "observation": obs.model_dump()})
                except ValueError as e:
                    await websocket.send_json({"type": "error", "message": str(e)})

            elif cmd == "step":
                if not env:
                    await websocket.send_json({"type": "error", "message": "State conflict: Submit reset before step."})
                    continue
                try:
                    result = await env.step(msg.get("data", {}), client=session_client, model=session_model)
                    await websocket.send_json({"type": "step", "result": result})
                except RuntimeError as e:
                    await websocket.send_json({"type": "error", "message": f"Execution halted: {str(e)}"})

            elif cmd == "run_agent":
                if not env:
                    await websocket.send_json({"type": "error", "message": "Session inactive."})
                    continue
                if env.done:
                    await websocket.send_json({"type": "error", "message": "Episode finished. Call reset() for a new case."})
                    continue
                try:
                    await websocket.send_json({"type": "stream", "content": f"[START] ep={env.episode_id} task={env.task_id}\n"})
                    if session_client is None:
                        raise RuntimeError("No API credentials configured.")
                    sys_prompt = "Expert Safety Moderator. Respond with JSON only. Strictly align with platform policies."
                    user_prompt = f"Policy Task: {env._task_config['description']}\n\nEvidence:\n{json.dumps(env.case)}\n\nSubmit ruling in JSON."
                    stream = await session_client.chat.completions.create(
                        model=session_model,
                        messages=[{"role": "system", "content": sys_prompt}, {"role": "user", "content": user_prompt}],
                        temperature=0.0,
                        stream=True
                    )
                    pieces = []
                    async for chunk in stream:
                        # Some providers emit chunks with an empty `choices`
                        # list; skip them rather than fail with IndexError.
                        if not chunk.choices:
                            continue
                        content = chunk.choices[0].delta.content
                        if content:
                            pieces.append(content)
                            await websocket.send_json({"type": "stream", "content": content})
                    full_response = "".join(pieces)
                    if not full_response.strip():
                        raise ValueError("Model returned an empty response.")

                    # Clean/Parse Output: unwrap a leading ``` fence and an
                    # optional "json" language tag before decoding.
                    js_str = full_response.strip()
                    if js_str.startswith("```"):
                        js_str = js_str.split("```")[1]
                        if js_str.startswith("json"):
                            js_str = js_str[4:]
                    action = json.loads(js_str.strip())

                    await websocket.send_json({"type": "stream", "content": f"\n\n[STEP] Policy Ingested: {json.dumps(action)}\n"})
                    result = await env.step(action, client=session_client, model=session_model)
                    await websocket.send_json({"type": "step", "result": result})
                    await websocket.send_json({"type": "stream", "content": f"[END] Result: Success. Reward: {result['reward']:.4f}\n"})
                except WebSocketDisconnect:
                    # The client went away mid-run: this is not an inference
                    # failure, so let the outer handler end the session.
                    raise
                except Exception as e:
                    # Deliberate best-effort boundary: any failure above
                    # switches to the deterministic demo path.
                    err_text = str(e)
                    lowered_err = err_text.lower()
                    auth_markers = ("invalid_api_key", "incorrect api key", "api key", "401")
                    if any(marker in lowered_err for marker in auth_markers):
                        # Replace provider error text with a generic message.
                        err_text = "Authentication failed for the configured provider."
                    await websocket.send_json({"type": "stream", "content": f"\n\n[NOTICE] Inference Unavailable: {err_text}\nInitiating Passive Grader demo...\n"})
                    # The failure may have happened after the step was already
                    # accepted; do not step a finished episode again.
                    if env.done:
                        await websocket.send_json({"type": "error", "message": "Episode finished. Call reset() for a new case."})
                        continue
                    sim_action = _build_demo_action(env)
                    try:
                        result = await env.step(sim_action, client=None, model=session_model)
                        await websocket.send_json({"type": "step", "result": result})
                        await websocket.send_json({"type": "stream", "content": f"\n[DEMO] Passive Ruling Emitted. Final Reward: {result['reward']:.4f}\n"})
                    except RuntimeError as step_error:
                        await websocket.send_json({"type": "error", "message": str(step_error)})

            elif cmd == "state":
                if env:
                    await websocket.send_json({"type": "state", "state": env.state()})
    except WebSocketDisconnect:
        # Normal termination: the client closed the connection.
        pass
def main():
    """Start the gateway with uvicorn on all interfaces, port 7860."""
    print("🚀 Initializing ContentGuard Policy Gateway...")
    server_options = {"host": "0.0.0.0", "port": 7860, "reload": False}
    # The app is passed as an import string ("module:attribute").
    uvicorn.run("app:app", **server_options)


# Run the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
|