Configure your parameters and click 'RUN MODERATION' to begin ingestion.
Audit Inspector
"""
@app.post("/reset", tags=["๐ค Automated Benchmarking"], summary="1. Initialize Environment (Task Selection)")
async def reset_env(req: ResetRequest = Body(default=ResetRequest())):
    """Resets the environment with a given task and seed. This must be the first step in any benchmarking track.

    Accepts either ``task_id`` (legacy machine ID like ``clear_cut_moderation``)
    or ``task_name`` (Swagger UI enum). ``task_id`` takes precedence when both
    are supplied.
    """
    try:
        # Resolution order: explicit legacy task_id wins, then the Swagger UI
        # enum, and finally the default first task.
        if req.task_id:
            internal_task_name = resolve_task(req.task_id).name
        elif req.task_name:
            internal_task_name = TASK_MAP[req.task_name]
        else:
            internal_task_name = "Task 1: Basic Safety"
        return await env.reset(task_name=internal_task_name, seed=req.seed)
    except (ValueError, KeyError) as err:
        # Unknown task IDs / enum values surface as a client error.
        raise HTTPException(status_code=400, detail=str(err))
@app.get("/health", tags=["๐ System Monitoring"])
def health_check():
    """Liveness probe required by OpenEnv runtime validation."""
    payload = {"status": "healthy"}
    return payload
@app.get("/metadata", tags=["๐ System Monitoring"])
def metadata():
    """Returns environment metadata required by OpenEnv runtime validation."""
    description = (
        "A content-moderation RL environment where an agent must classify "
        "social-media posts as safe or harmful under varying policy regimes, "
        "with tasks spanning basic safety, contextual nuance, and fairness."
    )
    # Task names are exposed using their legacy (machine-readable) IDs.
    return {
        "name": "SocialStreamModerationEnv",
        "description": description,
        "version": "1.2.0",
        "tasks": list(CANONICAL_TO_LEGACY.values()),
    }
@app.get("/schema", tags=["๐ System Monitoring"])
def schema():
    """Returns action, observation, and state schemas for OpenEnv validation."""
    # Shared JSON-schema field descriptors.
    str_field = {"type": "string"}
    int_field = {"type": "integer"}
    # Observation fields, in their canonical order.
    observation_props = {
        "post_id": str_field,
        "text": str_field,
        "user_history_summary": str_field,
        "context_type": str_field,
        "platform_policy_mode": str_field,
        "user_group": str_field,
        "step_index": int_field,
        "total_steps": int_field,
    }
    # The state schema is the observation schema minus the history summary.
    state_props = {
        key: val
        for key, val in observation_props.items()
        if key != "user_history_summary"
    }
    return {
        "action": {
            "type": "string",
            "enum": [action.value for action in ModerationAction],
        },
        "observation": {"type": "object", "properties": observation_props},
        "state": {"type": "object", "properties": state_props},
    }
@app.get("/tasks", tags=["๐ค Automated Benchmarking"])
async def list_tasks():
    """Returns the list of tasks available in the environment for discovery.

    ``task_id`` / ``id`` use the legacy machine-readable IDs that match
    ``openenv.yaml`` (e.g. ``clear_cut_moderation``) so the external validator
    can cross-reference them.
    """
    catalogue = []
    for canonical, task_cfg in TASKS.items():
        # Fall back to the canonical name when no legacy alias exists.
        legacy = CANONICAL_TO_LEGACY.get(canonical, canonical)
        catalogue.append({
            "task_id": legacy,
            "id": legacy,
            "name": task_cfg.name,
            "difficulty": task_cfg.difficulty,
            "description": (
                f"Episode length: {task_cfg.episode_length} posts. "
                f"Policy mode: {task_cfg.policy_mode.value}."
            ),
            "grader_id": task_cfg.grader_id,
        })
    return catalogue
@app.get("/graders", tags=["๐ก๏ธ Automated Benchmarking"])
async def list_graders_endpoint():
    """Returns the list of graders available in the environment for discovery."""
    graders = _list_graders()
    return graders
# Per-task score cache so /grader?task_id=... can return past scores.
# grader_score writes each finished episode's score under both the canonical
# task name and its legacy ID, so lookups succeed with either key.
_task_scores: Dict[str, float] = {}
@app.get("/grader", tags=["๐ค Automated Benchmarking"])
def grader_score(task_id: Optional[str] = Query(None, description="Legacy task ID to retrieve a specific task's score.")):
    """Returns the grader score for the current (or most recent) episode.

    The Scaler / OpenEnv hackathon validator calls this endpoint after running
    an episode to obtain the final score. Accepts an optional ``task_id``
    query parameter to retrieve the score for a specific task.

    If no episode has been run yet a minimal default score is returned.
    """
    # Lookup mode: a specific task was requested, so answer from the cache.
    if task_id:
        cached = _task_scores.get(task_id)
        if cached is not None:
            return {"score": cached}
        # The caller may have used an alias; retry with the canonical name.
        canonical = TASK_ALIASES.get(task_id)
        if canonical:
            cached = _task_scores.get(canonical)
            if cached is not None:
                return {"score": cached}
        return {"score": 0.001}
    # Live mode: grade the environment's most recent episode, if any.
    if env.episode_history:
        task = env.current_task
        if task is None:
            score = grade_episode(env.episode_history, use_fairness=False)
        else:
            score = get_grader(task.grader_id).grade(env.episode_history)
            # Cache under both canonical name and legacy ID for later lookups.
            _task_scores[task.name] = score
            legacy_id = CANONICAL_TO_LEGACY.get(task.name)
            if legacy_id:
                _task_scores[legacy_id] = score
    else:
        score = 0.001
    return {"score": score}
@app.post("/evaluate", tags=["๐งช Interactive Lab"], summary="Test Model Logic (XAI Insight)")
async def evaluate_text(
    req: EvaluateRequest,
    policy_mode: PolicyModeChoice = Query(PolicyModeChoice.NORMAL, description="Select the active safety policy regime."),
    user_history: UserHistoryChoice = Query(UserHistoryChoice.CLEAN, description="Select the author's moderation history."),
    context_type: ContextTypeChoice = Query(ContextTypeChoice.ROOT, description="Select the content hierarchical context.")
):
    """Internal endpoint for the Interactive Sandbox UI. Returns logic reasoning and calculated rewards.

    Builds a one-post mock State from the query selections, asks the
    configured agent for a prediction, then derives a representative reward
    by matching the input text against the known harmful-keyword patterns
    (there is no ground-truth label for free-form playground text).
    """
    # Use proper Enum types (function-scope imports keep the optional
    # inference stack lazy).
    from envs.social_stream_moderation.models import PolicyMode, State, HarmLabel
    from envs.social_stream_moderation.graders import compute_per_post_reward
    from inference import get_agent, SAFETY_KEYWORDS
    # Map choice to model enum.
    # BUG FIX: a missing POLICY_MAP entry raises KeyError, not ValueError, so
    # the intended fallback to NORMAL never fired; catch both.
    try:
        p_mode = PolicyMode(POLICY_MAP[policy_mode])
    except (KeyError, ValueError):
        p_mode = PolicyMode.NORMAL
    # Predict action using rule-based or LLM logic.
    agent = get_agent(api_base_url=req.api_base_url, model_name=req.model_name, api_key=req.api_key)
    mock_state = State(
        post_id="playground_test",
        text=req.text,
        user_history_summary=HISTORY_MAP[user_history],
        context_type=CONTEXT_MAP[context_type],
        platform_policy_mode=p_mode.value,
        user_group="A",
        step_index=0,
        total_steps=1
    )
    action, reason = agent.predict(mock_state)
    # We need to simulate a 'correct' label for reward calculation in demo.
    # Check whether the input matches known harmful patterns to pick a
    # representative HarmLabel (lower-case once, not per keyword).
    lowered_text = req.text.lower()
    best_harm_guess = HarmLabel.SAFE
    for category, keywords in SAFETY_KEYWORDS.items():
        if any(kw in lowered_text for kw in keywords):
            best_harm_guess = category
            break
    reward = compute_per_post_reward(best_harm_guess, action, p_mode)
    return {
        "action": action.value,
        "reward": float(reward),
        "reason": reason
    }
@app.post("/step", tags=["๐งช Interactive Lab"])
async def step_env(req: StepRequest):
    """Advances the active episode by one step with the supplied action."""
    try:
        next_state, reward, done, info = await env.step(req.action)
    except RuntimeError as err:
        # e.g. stepping without an active episode.
        raise HTTPException(status_code=400, detail=str(err))
    # The environment uses the task-specific grader internally; the final
    # score and grader_id are surfaced via ``info`` once the episode ends.
    final_score = info.get("score", 0.0) if done else 0.0
    grader_id = info.get("grader_id") if done else None
    return {
        "next_state": next_state,
        "reward": reward,
        "done": done,
        "info": info,
        "final_score": final_score,
        "grader_id": grader_id,
    }
@app.post("/predict_and_step", tags=["๐ค Automated Benchmarking"], summary="2. Autonomous Model Execution (Autonomous)")
async def predict_and_step(req: Optional[LLMConfigRequest] = Body(None)):
    """Predicts using dynamic agent and steps the env automatically. This matches our inference.py autonomous loop."""
    from inference import get_agent
    current_state = env._get_state()
    if current_state is None:
        raise HTTPException(status_code=400, detail="No active episode. Please call /reset first.")
    # The request body is optional; fall back to default agent config.
    agent = get_agent(
        api_base_url=req.api_base_url if req else None,
        model_name=req.model_name if req else None,
        api_key=req.api_key if req else None
    )
    action, reason = agent.predict(current_state)
    # Apply the model's own prediction to the environment.
    next_state, reward, done, info = await env.step(action)
    # The environment uses the task-specific grader internally; the final
    # score / grader id only exist once the episode has finished.
    final_score = info.get("score", 0.0) if done else 0.0
    grader_id = info.get("grader_id") if done else None
    return {
        "prediction": action.value,
        "reason": reason,
        "reward": reward,
        "done": done,
        "final_score": final_score,
        "grader_id": grader_id,
        "next_state": next_state,
        "info": info
    }
@app.post("/feedback")
async def save_feedback(req: FeedbackRequest):
    """Saves human correction to local JSON memory for reinforcement learning.

    The memory file is a JSON list of ``{"text", "action", "reason"}``
    records. An existing record with the same text is updated in place;
    otherwise a new record is appended.
    """
    import json
    memory_path = os.path.join(os.path.dirname(__file__), "..", "envs", "social_stream_moderation", "human_memory.json")
    # Load existing memory.
    # BUG FIX: the previous bare ``except`` swallowed every exception
    # (including KeyboardInterrupt/SystemExit); only recover from a corrupt
    # or unreadable file.
    memory = []
    if os.path.exists(memory_path):
        try:
            with open(memory_path, "r") as f:
                memory = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, OSError):
            memory = []
    # Guard against a hand-edited file holding a non-list JSON value.
    if not isinstance(memory, list):
        memory = []
    # Update the entry for this text if present, otherwise append a new one.
    for entry in memory:
        if entry["text"] == req.text:
            entry["action"] = req.corrected_action
            entry["reason"] = req.reason
            break
    else:
        memory.append({
            "text": req.text,
            "action": req.corrected_action,
            "reason": req.reason
        })
    with open(memory_path, "w") as f:
        json.dump(memory, f, indent=2)
    return {"status": "success", "message": "Memory reinforced."}
@app.get("/state", tags=["๐ System Monitoring"])
def get_state():
    """Returns the current episode state, or a readiness hint when idle."""
    current = env._get_state()
    if current is not None:
        return current
    # No episode running yet: explain how to start one.
    return {
        "status": "Ready",
        "message": "Environment is initialized but no episode is currently active.",
        "how_to_start": "Call 'POST /reset' with a task_name (e.g., 'clear_cut_moderation') to begin benchmarking."
    }
def kill_port(port):
    """Best-effort cleanup: terminate any process already listening on ``port``."""
    import subprocess
    import os
    import sys
    own_pid = str(os.getpid())
    try:
        if sys.platform == "win32":
            # Windows logic: parse netstat output for listeners on the port.
            listing = subprocess.check_output(f'netstat -ano | findstr :{port}', shell=True).decode()
            for row in listing.strip().split('\n'):
                if 'LISTENING' not in row:
                    continue
                pid = row.strip().split()[-1]
                if pid != own_pid:
                    print(f"Cleanup: Stopping existing process {pid} on port {port}...")
                    subprocess.run(f'taskkill /F /PID {pid}', shell=True, capture_output=True)
        else:
            # Unix/Mac/Linux logic: prefer lsof, fall back to fuser.
            try:
                pids = subprocess.check_output(['lsof', '-ti', f':{port}']).decode().strip()
                if pids:
                    for pid in pids.split('\n'):
                        if pid != own_pid:
                            print(f"Cleanup: Stopping existing process {pid} on port {port}...")
                            subprocess.run(['kill', '-9', pid], capture_output=True)
            except (subprocess.CalledProcessError, FileNotFoundError):
                # lsof missing or found nothing usable; try fuser instead.
                try:
                    subprocess.run(['fuser', '-k', f'{port}/tcp'], capture_output=True)
                except Exception:
                    pass
    except Exception:
        # Cleanup is opportunistic; never let it block start-up.
        pass
def main():
    """Entry point: free the serving port, then launch the app."""
    import uvicorn
    port = 7860
    # Clearing the port first avoids [WinError 10048] on restart.
    kill_port(port)
    uvicorn.run(app, host="0.0.0.0", port=port)
# Allow running the server directly with ``python`` on this file.
if __name__ == "__main__":
    main()