Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import json | |
| from typing import List, Optional | |
| from openai import AsyncOpenAI | |
| from server.env.environment import ContentGuardEnv | |
| # --- MANDATORY CONFIGURATION --- | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o-mini") | |
| HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("API_KEY") | |
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "content-guard-env") | |
| # ------------------------------- | |
| BENCHMARK = "ContentGuardEnv" | |
| MAX_STEPS = 1 | |
| TEMPERATURE = 0.0 | |
| MAX_TOKENS = 500 | |
| SUCCESS_THRESHOLD = 0.7 | |
| def log_start(task: str, env: str, model: str) -> None: | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: | |
| error_val = error if error else "null" | |
| done_val = str(done).lower() | |
| action_clean = str(action).replace("\n", " ").replace("\r", "") | |
| print(f"[STEP] step={step} action={action_clean} reward={reward:.4f} done={done_val} error={error_val}", flush=True) | |
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: | |
| rewards_str = ",".join(f"{r:.4f}" for r in rewards) | |
| print(f"[END] success={str(success).lower()} steps={steps} score={score:.4f} rewards={rewards_str}", flush=True) | |
| async def evaluate_task(env: ContentGuardEnv, client: AsyncOpenAI, task_id: str) -> float: | |
| """Evaluates a single task and returns the reward.""" | |
| log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME) | |
| rewards: List[float] = [] | |
| steps_total = 0 | |
| try: | |
| obs = env.reset(task_id=task_id) | |
| for step_idx in range(1, MAX_STEPS + 1): | |
| sys_prompt = "Expert Trust & Safety Auditor. Respond with JSON only. Strictly align with Meta Policies." | |
| user_prompt = f"Task: {obs.task_description}\n\nCase:\n{obs.content_case.model_dump_json()}\n\nPolicy Context: {obs.policy_briefing.model_dump_json()}" | |
| response = await client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[{"role": "system", "content": sys_prompt}, {"role": "user", "content": user_prompt}], | |
| temperature=TEMPERATURE, | |
| max_tokens=MAX_TOKENS, | |
| response_format={"type": "json_object"} | |
| ) | |
| action_pkg = json.loads(response.choices[0].message.content) | |
| result = await env.step(action_pkg, client=client, model=MODEL_NAME) | |
| reward = result["reward"] | |
| done = result["done"] | |
| rewards.append(reward) | |
| steps_total = step_idx | |
| log_step(step=step_idx, action=json.dumps(action_pkg), reward=reward, done=done, error=None) | |
| if done: break | |
| score = sum(rewards) / len(rewards) if rewards else 0.0 | |
| success = score >= SUCCESS_THRESHOLD | |
| log_end(success=success, steps=steps_total, score=score, rewards=rewards) | |
| return score | |
| except Exception as e: | |
| print(f"[ERROR] Task {task_id} failed: {e}") | |
| log_end(success=False, steps=steps_total, score=0.05, rewards=[0.05]) | |
| return 0.05 | |
| async def main() -> None: | |
| # 1. Initialize Standard Client | |
| if not HF_TOKEN: | |
| print("[ERROR] HF_TOKEN is missing. Evaluation cannot proceed.") | |
| return | |
| client = AsyncOpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) | |
| env = await ContentGuardEnv.from_docker_image(LOCAL_IMAGE_NAME) | |
| # 2. Portfolio Evaluation (Standard Multitask Loop) | |
| # If CG_TASK is set, run only that task. Otherwise, run the full suite. | |
| target_task = os.getenv("CG_TASK") | |
| tasks_to_run = [target_task] if target_task else ["easy", "medium", "hard"] | |
| for tid in tasks_to_run: | |
| await evaluate_task(env, client, tid) | |
| await env.close() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |