"""ContentGuardEnv / inference.py

Author: Mridul Jain
Initial release: high-fidelity policy moderation research framework.
"""
import asyncio
import os
import json
from typing import List, Optional
from openai import AsyncOpenAI
from server.env.environment import ContentGuardEnv
# --- MANDATORY CONFIGURATION ---
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini")
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "content-guard-env")
# -------------------------------
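# Example invocation (illustrative; adjust the base URL, key, and image to your setup):
#   API_BASE_URL=https://api.openai.com/v1 API_KEY=sk-... \
#   LOCAL_IMAGE_NAME=content-guard-env CG_TASK=easy python inference.py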
BENCHMARK = "ContentGuardEnv"
MAX_STEPS = 1
TEMPERATURE = 0.0
MAX_TOKENS = 500
SUCCESS_THRESHOLD = 0.7
def log_start(task: str, env: str, model: str) -> None:
    print(f"[START] task={task} env={env} model={model}", flush=True)

def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    error_val = error if error else "null"
    done_val = str(done).lower()
    # Flatten the action to a single line so each log record stays machine-parseable.
    action_clean = str(action).replace("\n", " ").replace("\r", "")
    print(f"[STEP] step={step} action={action_clean} reward={reward:.4f} done={done_val} error={error_val}", flush=True)

def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    rewards_str = ",".join(f"{r:.4f}" for r in rewards)
    print(f"[END] success={str(success).lower()} steps={steps} score={score:.4f} rewards={rewards_str}", flush=True)

async def evaluate_task(env: ContentGuardEnv, client: AsyncOpenAI, task_id: str) -> float:
    """Evaluates a single task and returns its mean per-step reward."""
    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    rewards: List[float] = []
    steps_total = 0
    try:
        obs = env.reset(task_id=task_id)
        for step_idx in range(1, MAX_STEPS + 1):
            sys_prompt = "Expert Trust & Safety Auditor. Respond with JSON only. Strictly align with Meta Policies."
            user_prompt = (
                f"Task: {obs.task_description}\n\n"
                f"Case:\n{obs.content_case.model_dump_json()}\n\n"
                f"Policy Context: {obs.policy_briefing.model_dump_json()}"
            )
            response = await client.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": sys_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=TEMPERATURE,
                max_tokens=MAX_TOKENS,
                response_format={"type": "json_object"},
            )
            # The model is instructed to reply with a JSON action package.
            action_pkg = json.loads(response.choices[0].message.content)
            result = await env.step(action_pkg, client=client, model=MODEL_NAME)
            reward = result["reward"]
            done = result["done"]
            rewards.append(reward)
            steps_total = step_idx
            log_step(step=step_idx, action=json.dumps(action_pkg), reward=reward, done=done, error=None)
            if done:
                break
        score = sum(rewards) / len(rewards) if rewards else 0.0
        success = score >= SUCCESS_THRESHOLD
        log_end(success=success, steps=steps_total, score=score, rewards=rewards)
        return score
    except Exception as e:
        # Any failure (API error, malformed JSON, env fault) yields the floor score.
        print(f"[ERROR] Task {task_id} failed: {e}", flush=True)
        log_end(success=False, steps=steps_total, score=0.05, rewards=[0.05])
        return 0.05

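# Minimal programmatic usage (a sketch; assumes the same setup main() performs below):
#   env = await ContentGuardEnv.from_docker_image(LOCAL_IMAGE_NAME)
#   client = AsyncOpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
#   score = await evaluate_task(env, client, task_id="easy")
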
async def main() -> None:
    # 1. Initialize the standard OpenAI-compatible client.
    if not HF_TOKEN:
        print("[ERROR] HF_TOKEN (or API_KEY) is missing. Evaluation cannot proceed.", flush=True)
        return
    client = AsyncOpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
    env = await ContentGuardEnv.from_docker_image(LOCAL_IMAGE_NAME)
    try:
        # 2. Portfolio evaluation (standard multitask loop).
        # If CG_TASK is set, run only that task; otherwise run the full suite.
        target_task = os.getenv("CG_TASK")
        tasks_to_run = [target_task] if target_task else ["easy", "medium", "hard"]
        for tid in tasks_to_run:
            await evaluate_task(env, client, tid)
    finally:
        # Tear the environment down even if a task evaluation is interrupted.
        await env.close()

if __name__ == "__main__":
    asyncio.run(main())