"""PolicyPulse AI server — FastAPI front-end for SocialStreamModerationEnv.

Exposes three endpoint groups (see TAGS_METADATA): the automated
benchmarking loop (/reset, /predict_and_step, /grader), an interactive
lab (/evaluate, /step, /feedback) and system monitoring (/health,
/metadata, /schema, /state).
"""
import sys
import os

# Add parent directory to path so we can import 'envs' and 'inference'
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastapi import FastAPI, HTTPException, Body, Query
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from enum import Enum

from envs.social_stream_moderation.environment import SocialStreamModerationEnv
from envs.social_stream_moderation.models import State, ModerationAction
from envs.social_stream_moderation.graders import list_graders as _list_graders, get_grader, grade_episode
from envs.social_stream_moderation.tasks import TASKS, TASK_ALIASES, resolve_task

# Reverse mapping: canonical name -> legacy task ID (for openenv.yaml alignment)
CANONICAL_TO_LEGACY = {v: k for k, v in TASK_ALIASES.items()}


# Enums for Swagger Dropdowns
class TaskName(str, Enum):
    TASK_1 = "Task 1: Basic Safety"
    TASK_2 = "Task 2: Context & Nuance"
    TASK_3 = "Task 3: Fairness & Bias"


class PolicyModeChoice(str, Enum):
    NORMAL = "Standard Moderation"
    STRICT = "Strict Enforcement"
    LENIENT = "Lenient Privacy"


class UserHistoryChoice(str, Enum):
    CLEAN = "Clean History"
    REPEATED = "Repeat Offender"


class ContextTypeChoice(str, Enum):
    ROOT = "Main Post"
    COMMENT = "Comment"


# Mapping UI labels back to backend IDs
TASK_MAP = {
    TaskName.TASK_1: "Task 1: Basic Safety",
    TaskName.TASK_2: "Task 2: Context & Nuance",
    TaskName.TASK_3: "Task 3: Fairness & Bias",
}
POLICY_MAP = {
    PolicyModeChoice.NORMAL: "normal",
    PolicyModeChoice.STRICT: "strict",
    PolicyModeChoice.LENIENT: "lenient",
}
HISTORY_MAP = {
    UserHistoryChoice.CLEAN: "no_prior_violations",
    UserHistoryChoice.REPEATED: "prior_violations",
}
CONTEXT_MAP = {
    ContextTypeChoice.ROOT: "root_post",
    ContextTypeChoice.COMMENT: "comment",
}

# API Metadata for Swagger
TAGS_METADATA = [
    {
        "name": "๐Ÿค– Automated Benchmarking",
        "description": (
            "Autonomous evaluation loop. Sequence: **Reset** -> **Predict & Step** "
            "(Repeat). This tracks the official hackathon metrics."
        ),
    },
    {
        "name": "๐Ÿงช Interactive Lab",
        "description": (
            "Manual testing endpoints. Perfect for testing specific edge cases "
            "with custom inputs and human overrides."
        ),
    },
    {
        "name": "๐Ÿ“Š System Monitoring",
        "description": "Real-time state and status tracking for the moderation engine.",
    },
]

app = FastAPI(
    title="๐Ÿ›ก๏ธ PolicyPulse AI | Intelligence Center",
    description="""
### Evaluation Guide for Hackathon Judges:
1. **Automated Testing:** Use `[POST] /reset` then `[POST] /predict_and_step`.
2. **Fairness Testing (Task 3):** Start an episode with `task_name='policy_fairness'`.
3. **Internal Logic:** Use `[POST] /evaluate` to see the model's reasoning without advancing the environment.
""",
    version="1.2.0",
    openapi_tags=TAGS_METADATA,
)

# Single module-level environment instance shared by every endpoint.
env = SocialStreamModerationEnv()


class ResetRequest(BaseModel):
    task_name: Optional[TaskName] = Field(None, description="Select the benchmark level to initialize (Swagger UI).")
    task_id: Optional[str] = Field(None, description="Machine-readable task ID (e.g. 'clear_cut_moderation'). Used by the validator.")
    seed: Optional[int] = Field(42, description="Reproducibility seed for dataset sampling.")


class EvaluateRequest(BaseModel):
    text: str = Field("I will kill you", description="The user content string to analyze.")
    api_base_url: Optional[str] = Field(None, description="Optional override. If blank, defaults to server's API_BASE_URL config.")
    model_name: Optional[str] = Field(None, description="Optional override. If blank, defaults to server's MODEL_NAME config.")
    api_key: Optional[str] = Field(None, description="Optional override. If blank, defaults to server's HF_TOKEN config.")


class LLMConfigRequest(BaseModel):
    api_base_url: Optional[str] = Field(None, description="Optional override. If blank, defaults to server's API_BASE_URL config.")
    model_name: Optional[str] = Field(None, description="Optional override. If blank, defaults to server's MODEL_NAME config.")
    api_key: Optional[str] = Field(None, description="Optional override. If blank, defaults to server's HF_TOKEN config.")


class StepRequest(BaseModel):
    action: ModerationAction = Field(ModerationAction.ALLOW, description="The action to apply to the current post.")


class FeedbackRequest(BaseModel):
    # Human correction payload for /feedback.
    text: str
    corrected_action: ModerationAction
    reason: str


@app.get("/", response_class=HTMLResponse)
def read_root():
    """Serve the static Intelligence Center landing page."""
    return r""" PolicyPulse AI | Intelligence Center
REVISION 1.0
Operation Center
LIVE MODE
GRADER MODE
Optional: Custom LLM Override
Model Accuracy
--
Aggregate Reward
0.000
System State
IDLE
๐Ÿ“‰
Intelligence Stream Idle

Configure your parameters and click 'RUN MODERATION' to begin ingestion.

"""


@app.post("/reset", tags=["๐Ÿค– Automated Benchmarking"], summary="1. Initialize Environment (Task Selection)")
async def reset_env(req: ResetRequest = Body(default=ResetRequest())):
    """Resets the environment with a given task and seed.

    This must be the first step in any benchmarking track. Accepts either
    ``task_id`` (legacy machine ID like ``clear_cut_moderation``) or
    ``task_name`` (Swagger UI enum). ``task_id`` takes precedence when both
    are supplied.
    """
    try:
        if req.task_id:
            # Validator sends task_id (legacy ID like "clear_cut_moderation")
            task_cfg = resolve_task(req.task_id)
            internal_task_name = task_cfg.name
        elif req.task_name:
            # Swagger UI sends the enum
            internal_task_name = TASK_MAP[req.task_name]
        else:
            # Default to Task 1
            internal_task_name = "Task 1: Basic Safety"
        state = await env.reset(task_name=internal_task_name, seed=req.seed)
        return state
    except (ValueError, KeyError) as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/health", tags=["๐Ÿ“Š System Monitoring"])
def health_check():
    """Health check endpoint required by OpenEnv runtime validation."""
    return {"status": "healthy"}


@app.get("/metadata", tags=["๐Ÿ“Š System Monitoring"])
def metadata():
    """Returns environment metadata required by OpenEnv runtime validation."""
    return {
        "name": "SocialStreamModerationEnv",
        "description": (
            "A content-moderation RL environment where an agent must classify "
            "social-media posts as safe or harmful under varying policy regimes, "
            "with tasks spanning basic safety, contextual nuance, and fairness."
        ),
        "version": "1.2.0",
        "tasks": list(CANONICAL_TO_LEGACY.values()),
    }


@app.get("/schema", tags=["๐Ÿ“Š System Monitoring"])
def schema():
    """Returns action, observation, and state schemas for OpenEnv validation."""
    return {
        "action": {
            "type": "string",
            "enum": [a.value for a in ModerationAction],
        },
        "observation": {
            "type": "object",
            "properties": {
                "post_id": {"type": "string"},
                "text": {"type": "string"},
                "user_history_summary": {"type": "string"},
                "context_type": {"type": "string"},
                "platform_policy_mode": {"type": "string"},
                "user_group": {"type": "string"},
                "step_index": {"type": "integer"},
                "total_steps": {"type": "integer"},
            },
        },
        "state": {
            "type": "object",
            "properties": {
                "post_id": {"type": "string"},
                "text": {"type": "string"},
                "context_type": {"type": "string"},
                "platform_policy_mode": {"type": "string"},
                "user_group": {"type": "string"},
                "step_index": {"type": "integer"},
                "total_steps": {"type": "integer"},
            },
        },
    }


@app.get("/tasks", tags=["๐Ÿค– Automated Benchmarking"])
async def list_tasks():
    """Returns the list of tasks available in the environment for discovery.

    ``task_id`` / ``id`` use the legacy machine-readable IDs that match
    ``openenv.yaml`` (e.g. ``clear_cut_moderation``) so the external
    validator can cross-reference them.
    """
    return [
        {
            "task_id": CANONICAL_TO_LEGACY.get(canonical, canonical),
            "id": CANONICAL_TO_LEGACY.get(canonical, canonical),
            "name": task_cfg.name,
            "difficulty": task_cfg.difficulty,
            "description": f"Episode length: {task_cfg.episode_length} posts. Policy mode: {task_cfg.policy_mode.value}.",
            "grader_id": task_cfg.grader_id,
        }
        for canonical, task_cfg in TASKS.items()
    ]


# FIX: tag previously read "๐Ÿ›ก๏ธ Automated Benchmarking", which does not exist in
# TAGS_METADATA — the endpoint showed up in an undocumented Swagger group.
@app.get("/graders", tags=["๐Ÿค– Automated Benchmarking"])
async def list_graders_endpoint():
    """Returns the list of graders available in the environment for discovery."""
    return _list_graders()


# Per-task score cache so /grader?task_id=... can return past scores
_task_scores: Dict[str, float] = {}


@app.get("/grader", tags=["๐Ÿค– Automated Benchmarking"])
def grader_score(task_id: Optional[str] = Query(None, description="Legacy task ID to retrieve a specific task's score.")):
    """Returns the grader score for the current (or most recent) episode.

    The Scaler / OpenEnv hackathon validator calls this endpoint after
    running an episode to obtain the final score. Accepts an optional
    ``task_id`` query parameter to retrieve the score for a specific task.
    If no episode has been run yet a minimal default score is returned.
    """
    # If a specific task_id is requested, look up its cached score
    if task_id:
        if task_id in _task_scores:
            return {"score": _task_scores[task_id]}
        # Also check canonical name
        canonical = TASK_ALIASES.get(task_id)
        if canonical and canonical in _task_scores:
            return {"score": _task_scores[canonical]}
        return {"score": 0.001}

    # Use the environment's last episode info to compute the score
    if env.episode_history:
        task = env.current_task
        if task is not None:
            grader_inst = get_grader(task.grader_id)
            score = grader_inst.grade(env.episode_history)
            # Cache the score under both canonical name and legacy ID
            _task_scores[task.name] = score
            legacy_id = CANONICAL_TO_LEGACY.get(task.name)
            if legacy_id:
                _task_scores[legacy_id] = score
        else:
            score = grade_episode(env.episode_history, use_fairness=False)
    else:
        score = 0.001
    return {"score": score}


@app.post("/evaluate", tags=["๐Ÿงช Interactive Lab"], summary="Test Model Logic (XAI Insight)")
async def evaluate_text(
    req: EvaluateRequest,
    policy_mode: PolicyModeChoice = Query(PolicyModeChoice.NORMAL, description="Select the active safety policy regime."),
    user_history: UserHistoryChoice = Query(UserHistoryChoice.CLEAN, description="Select the author's moderation history."),
    context_type: ContextTypeChoice = Query(ContextTypeChoice.ROOT, description="Select the content hierarchical context."),
):
    """Internal endpoint for the Interactive Sandbox UI.

    Returns logic reasoning and calculated rewards.
    """
    # Use proper Enum types (local imports keep heavyweight deps off the
    # module import path for endpoints that never use them).
    from envs.social_stream_moderation.models import PolicyMode, State
    from envs.social_stream_moderation.graders import compute_per_post_reward
    from inference import get_agent

    # Map choice to model enum
    try:
        p_mode = PolicyMode(POLICY_MAP[policy_mode])
    except ValueError:
        p_mode = PolicyMode.NORMAL

    # Predict action using rule-based or LLM logic
    agent = get_agent(api_base_url=req.api_base_url, model_name=req.model_name, api_key=req.api_key)
    mock_state = State(
        post_id="playground_test",
        text=req.text,
        user_history_summary=HISTORY_MAP[user_history],
        context_type=CONTEXT_MAP[context_type],
        platform_policy_mode=p_mode.value,
        user_group="A",
        step_index=0,
        total_steps=1,
    )
    action, reason = agent.predict(mock_state)

    # We need to simulate a 'correct' label for reward calculation in demo
    # For simplicity in demo, if the agent bans, we assume it caught something
    # (HarmLabel mapping is internal, so we'll just derive a representative reward)
    # Check if this input matches our known harmful patterns to determine reward
    from envs.social_stream_moderation.models import HarmLabel
    from inference import SAFETY_KEYWORDS

    text_lower = req.text.lower()  # hoisted: lower() once, not once per keyword
    best_harm_guess = HarmLabel.SAFE
    for category, keywords in SAFETY_KEYWORDS.items():
        if any(kw in text_lower for kw in keywords):
            best_harm_guess = category
            break

    reward = compute_per_post_reward(best_harm_guess, action, p_mode)
    return {
        "action": action.value,
        "reward": float(reward),
        "reason": reason,
    }


@app.post("/step", tags=["๐Ÿงช Interactive Lab"])
async def step_env(req: StepRequest):
    """Apply a manually chosen action to the current post and advance the env."""
    try:
        next_state, reward, done, info = await env.step(req.action)
        final_score = 0.0
        grader_id = None
        if done:
            # The environment now uses the task-specific grader internally;
            # the final score and grader_id are returned in ``info``.
            final_score = info.get("score", 0.0)
            grader_id = info.get("grader_id")
        return {
            "next_state": next_state,
            "reward": reward,
            "done": done,
            "info": info,
            "final_score": final_score,
            "grader_id": grader_id,
        }
    except RuntimeError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.post("/predict_and_step", tags=["๐Ÿค– Automated Benchmarking"], summary="2. Autonomous Model Execution (Autonomous)")
async def predict_and_step(req: Optional[LLMConfigRequest] = Body(None)):
    """Predicts using dynamic agent and steps the env automatically.

    This matches our inference.py autonomous loop.
    """
    from inference import get_agent

    state = env._get_state()
    if state is None:
        raise HTTPException(status_code=400, detail="No active episode. Please call /reset first.")

    agent = get_agent(
        api_base_url=req.api_base_url if req else None,
        model_name=req.model_name if req else None,
        api_key=req.api_key if req else None,
    )
    action, reason = agent.predict(state)

    # Execute the step with the model's prediction
    next_state, reward, done, info = await env.step(action)
    final_score = 0.0
    grader_id = None
    if done:
        # The environment now uses the task-specific grader internally
        final_score = info.get("score", 0.0)
        grader_id = info.get("grader_id")
    return {
        "prediction": action.value,
        "reason": reason,
        "reward": reward,
        "done": done,
        "final_score": final_score,
        "grader_id": grader_id,
        "next_state": next_state,
        "info": info,
    }


@app.post("/feedback")
async def save_feedback(req: FeedbackRequest):
    """Saves human correction to local JSON memory for reinforcement learning."""
    import json

    memory_path = os.path.join(
        os.path.dirname(__file__), "..", "envs", "social_stream_moderation", "human_memory.json"
    )

    # Load existing memory
    memory = []
    if os.path.exists(memory_path):
        with open(memory_path, "r") as f:
            try:
                memory = json.load(f)
            except json.JSONDecodeError:
                # FIX: was a bare ``except:``. Corrupt/empty memory file ->
                # start from a clean slate; anything else should surface.
                memory = []

    # Check for duplicates or update
    found = False
    for entry in memory:
        if entry["text"] == req.text:
            entry["action"] = req.corrected_action
            entry["reason"] = req.reason
            found = True
            break
    if not found:
        memory.append({
            "text": req.text,
            "action": req.corrected_action,
            "reason": req.reason,
        })

    with open(memory_path, "w") as f:
        json.dump(memory, f, indent=2)
    return {"status": "success", "message": "Memory reinforced."}


@app.get("/state", tags=["๐Ÿ“Š System Monitoring"])
def get_state():
    """Return the current episode state, or a ready-message when idle."""
    state = env._get_state()
    if state is None:
        return {
            "status": "Ready",
            "message": "Environment is initialized but no episode is currently active.",
            "how_to_start": "Call 'POST /reset' with a task_name (e.g., 'clear_cut_moderation') to begin benchmarking.",
        }
    return state


def kill_port(port):
    """Best-effort: terminate any process already listening on ``port``.

    Uses netstat/taskkill on Windows and lsof/kill (fuser fallback) on
    Unix. All failures are swallowed deliberately — this is pre-startup
    cleanup, never a reason to abort the server.
    """
    import subprocess
    import os
    import sys
    try:
        if sys.platform == "win32":
            # Windows logic. NOTE: shell=True with a fixed integer port is
            # safe here; never pass untrusted input into this helper.
            output = subprocess.check_output(f'netstat -ano | findstr :{port}', shell=True).decode()
            for line in output.strip().split('\n'):
                if 'LISTENING' in line:
                    pid = line.strip().split()[-1]
                    if pid != str(os.getpid()):
                        print(f"Cleanup: Stopping existing process {pid} on port {port}...")
                        subprocess.run(f'taskkill /F /PID {pid}', shell=True, capture_output=True)
        else:
            # Unix/Mac/Linux logic
            try:
                # Use lsof to find the PID
                output = subprocess.check_output(['lsof', '-ti', f':{port}']).decode().strip()
                if output:
                    for pid in output.split('\n'):
                        if pid != str(os.getpid()):
                            print(f"Cleanup: Stopping existing process {pid} on port {port}...")
                            subprocess.run(['kill', '-9', pid], capture_output=True)
            except (subprocess.CalledProcessError, FileNotFoundError):
                # Fallback to fuser if lsof is missing
                try:
                    subprocess.run(['fuser', '-k', f'{port}/tcp'], capture_output=True)
                except Exception:
                    pass
    except Exception:
        # Deliberate best-effort: port cleanup must never block startup.
        pass


def main():
    """Entry point: free port 7860, then serve the app with uvicorn."""
    import uvicorn

    # Automatically clear the port before starting to avoid [WinError 10048]
    kill_port(7860)
    uvicorn.run(app, host="0.0.0.0", port=7860)


if __name__ == "__main__":
    main()