Spaces:
Sleeping
Sleeping
"""
Content Moderation Inference Script
Env vars: API_BASE_URL, MODEL_NAME, HF_TOKEN (or API_KEY as fallback), SERVER_URL, TASK_NAME, USE_LOCAL_MODEL
"""
| import json | |
| import os | |
| import textwrap | |
| from typing import Dict, Any, List, Optional | |
| from dotenv import load_dotenv | |
| import requests | |
| load_dotenv() | |
# --- Model / API configuration (each value can be overridden via the environment) ---
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.1-8B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")
# HF_TOKEN wins when set; API_KEY is only consulted as a fallback.
API_KEY = HF_TOKEN or os.getenv("API_KEY")

# --- Environment server configuration ---
# Trailing slashes are stripped so endpoint paths can be appended with "/...".
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:7860").rstrip("/")
TASK_NAME = os.getenv("TASK_NAME", "text_spam")
BENCHMARK = "content_moderation_env"
MAX_STEPS = 10
# Only the (case-insensitive) string "true" enables the local model path.
USE_LOCAL = os.getenv("USE_LOCAL_MODEL", "false").lower() == "true"
SUCCESS_SCORE_THRESHOLD = 0.5

# --- Vocabulary accepted when validating model output ---
VALID_DECISIONS = {"approve", "reject", "escalate", "flag"}
VALID_LABELS = [
    "spam", "scam", "phishing", "pharmaceutical_spam",
    "hate_speech", "violence", "harassment", "misinformation",
    "adult_content", "deepfake", "political_manipulation", "fraud",
]

# The allowed label vocabulary is rendered into the prompt so the model can
# only choose labels that survive the VALID_LABELS filter applied to its reply.
# The template contains no other braces, so str.format is safe here.
SYSTEM_PROMPT = textwrap.dedent("""
    You are an expert content moderation AI. For each piece of content, analyze it carefully and respond ONLY with a valid JSON object.
    Required JSON fields:
    - "decision": one of "approve", "reject", "escalate", "flag"
    - "reason": brief explanation (max 2 sentences)
    - "confidence": float between 0.0 and 1.0
    - "labels": list of applicable labels, chosen only from: {labels} (empty list if none apply)
    Decision guidelines:
    - approve: content is safe and legitimate
    - reject: content clearly violates policy
    - flag: content is borderline or needs human review
    - escalate: uncertain, requires specialist review
    Output ONLY the JSON object, no other text.
""").strip().format(labels=", ".join(VALID_LABELS))
def log_start(task: str, env: str, model: str):
    """Print the ``[START]`` line naming the task, environment and model."""
    banner = f"[START] task={task} env={env} model={model}"
    print(banner, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]):
    """Print one ``[STEP]`` line for a completed environment step.

    The reward is rendered with two decimals, ``done`` as the lower-cased
    ``str()`` of the value, and a falsy ``error`` as the literal ``null``.
    """
    error_field = error or "null"
    done_field = str(done).lower()
    line = (
        f"[STEP] step={step} action={action} reward={reward:.2f} "
        f"done={done_field} error={error_field}"
    )
    print(line, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]):
    """Print the ``[END]`` summary line for an episode.

    Rewards are rendered with two decimals and comma-joined; the score is
    rendered with three decimals.
    """
    formatted_rewards = [format(value, ".2f") for value in rewards]
    success_field = str(success).lower()
    summary = (
        f"[END] success={success_field} steps={steps} "
        f"score={score:.3f} rewards={','.join(formatted_rewards)}"
    )
    print(summary, flush=True)
def build_prompt(obs: Dict[str, Any]) -> str:
    """Render an observation dict as the user prompt sent to the model.

    The content id and type are always included (defaulting to ``unknown``
    and ``text``). Text, image description, detector score and metadata are
    added only when present, followed by a blank line and a
    ``Step X of Y`` footer.
    """
    lines = [
        f"Content ID: {obs.get('content_id', 'unknown')}",
        f"Type: {obs.get('content_type', 'text')}",
    ]

    body_text = obs.get("text")
    if body_text:
        lines.append(f"Text: {body_text}")

    image_description = obs.get("image_description")
    if image_description:
        lines.append(f"Image analysis: {image_description}")

    # A score of 0 is meaningful, so test against None rather than truthiness.
    detector_score = obs.get("detector_score")
    if detector_score is not None:
        lines.append(
            f"Deepfake detector score (higher = more likely fake): {detector_score:.3f}"
        )

    metadata = obs.get("metadata", {})
    if metadata:
        pairs = [f"{key}={value}" for key, value in metadata.items()]
        lines.append("Metadata: " + ", ".join(pairs))

    # The leading newline leaves an empty line before the step footer.
    lines.append(f"\nStep {obs.get('step_num', '?')} of {obs.get('total_steps', '?')}")
    return "\n".join(lines)
| def _default_action() -> Dict: | |
| return {"decision": "escalate", "reason": "Unable to analyze content.", "confidence": 0.3, "labels": []} | |
# Cache of already-built local pipelines, keyed by model name, so the model
# is loaded once per process instead of once per moderation step.
_LOCAL_PIPELINES: Dict[str, Any] = {}


def _get_local_pipeline(model_name: str):
    """Return the text-generation pipeline for *model_name*, building it on first use."""
    pipe = _LOCAL_PIPELINES.get(model_name)
    if pipe is None:
        # Imported lazily: transformers is only needed on the local-model path.
        from transformers import pipeline

        pipe = pipeline("text-generation", model=model_name)
        _LOCAL_PIPELINES[model_name] = pipe
    return pipe


def call_local_model(prompt: str) -> Dict:
    """Run *prompt* through a local transformers pipeline and parse the reply.

    The pipeline is built for ``MODEL_NAME`` (the same model reported in the
    ``[START]`` log line) and reused across calls. The system prompt and the
    user prompt are sent as chat messages with sampling at temperature 0.2
    and at most 256 new tokens.

    Args:
        prompt: User prompt describing the content to moderate.

    Returns:
        The action dict produced by ``parse_llm_response``.

    Raises:
        Any exception raised while importing transformers, loading the model
        or generating; the caller is expected to handle it.
    """
    pipe = _get_local_pipeline(MODEL_NAME)
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    output = pipe(messages, max_new_tokens=256, temperature=0.2, do_sample=True)
    text = output[0]["generated_text"]
    if isinstance(text, list):
        # A list here is treated as the chat transcript whose last entry is the
        # model's reply; an empty list or missing content yields "".
        text = (text[-1].get("content") or "") if text else ""
    return parse_llm_response(text)
def call_api_model(prompt: str) -> Dict:
    """Send *prompt* to the OpenAI-compatible chat endpoint and parse the reply.

    A client is created for ``API_BASE_URL`` using ``API_KEY`` (or the
    placeholder ``"hf_default"`` when no key is configured). The request uses
    ``MODEL_NAME`` with temperature 0.2 and a 256-token cap. The first
    choice's message content is stripped and handed to ``parse_llm_response``.
    """
    from openai import OpenAI

    chat_messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY or "hf_default")
    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=chat_messages,
        temperature=0.2,
        max_tokens=256,
    )
    raw_reply = response.choices[0].message.content
    # Missing or empty content is treated as an empty reply.
    reply = raw_reply.strip() if raw_reply else ""
    return parse_llm_response(reply)
def _extract_json_object(text: Any) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in *text*, or ``None`` if there is none."""
    if not isinstance(text, str):
        return None
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode stops at the end of the first JSON document, so any
            # prose (or further braces) after the object is ignored.
            candidate, _ = decoder.raw_decode(text[start:])
        except ValueError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)
    return None


def parse_llm_response(text: str) -> Dict:
    """Turn raw model output into a validated moderation action.

    The first JSON object found in *text* is used. Each field is validated
    independently, so one malformed optional field does not discard an
    otherwise usable decision:

    - ``decision``: matched case-insensitively against ``VALID_DECISIONS``;
      anything else becomes ``"escalate"``.
    - ``reason``: stringified and truncated to 200 characters; missing or
      null becomes ``""``.
    - ``confidence``: coerced to float and clamped to ``[0.0, 1.0]``;
      missing or non-numeric values become ``0.5``.
    - ``labels``: a list (or a single string) filtered to ``VALID_LABELS``
      after trimming and lower-casing; anything else becomes ``[]``.

    Args:
        text: Raw text returned by the model.

    Returns:
        A dict with keys ``decision``, ``reason``, ``confidence`` and
        ``labels``; the result of ``_default_action()`` when no JSON object
        can be extracted.
    """
    parsed = _extract_json_object(text)
    if parsed is None:
        return _default_action()

    raw_decision = parsed.get("decision")
    decision = raw_decision.strip().lower() if isinstance(raw_decision, str) else "escalate"
    if decision not in VALID_DECISIONS:
        decision = "escalate"

    raw_reason = parsed.get("reason")
    reason = "" if raw_reason is None else str(raw_reason)

    try:
        confidence = float(parsed.get("confidence", 0.5))
    except (TypeError, ValueError, OverflowError):
        confidence = 0.5
    confidence = float(max(0.0, min(1.0, confidence)))

    raw_labels = parsed.get("labels")
    if isinstance(raw_labels, str):
        # A bare string is treated as a single label rather than iterated
        # character by character.
        raw_labels = [raw_labels]
    elif not isinstance(raw_labels, (list, tuple)):
        raw_labels = []
    normalized = (label.strip().lower() for label in raw_labels if isinstance(label, str))
    labels = [label for label in normalized if label in VALID_LABELS]

    return {
        "decision": decision,
        "reason": reason[:200],
        "confidence": confidence,
        "labels": labels,
    }
def get_decision(prompt: str) -> Dict:
    """Ask the configured model backend for a moderation action.

    Uses the local model when ``USE_LOCAL`` is set and the API model
    otherwise. Any exception from the backend is printed as a ``[DEBUG]``
    line and replaced by the default action.
    """
    try:
        backend = call_local_model if USE_LOCAL else call_api_model
        action = backend(prompt)
    except Exception as exc:
        print(f"[DEBUG] Model error: {exc}", flush=True)
        action = _default_action()
    return action
def server_reset(task: str) -> Optional[Dict]:
    """POST ``{"task": task}`` to ``/reset`` and return the decoded JSON body.

    Returns ``None`` after printing a ``[DEBUG]`` line if the request, the
    HTTP status check or the JSON decoding fails. The request times out
    after 30 seconds.
    """
    endpoint = f"{SERVER_URL}/reset"
    try:
        response = requests.post(endpoint, json={"task": task}, timeout=30)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        print(f"[DEBUG] reset error: {exc}", flush=True)
        payload = None
    return payload
def server_step(action: Dict) -> Optional[Dict]:
    """POST *action* as JSON to ``/step`` and return the decoded JSON body.

    Returns ``None`` after printing a ``[DEBUG]`` line if the request, the
    HTTP status check or the JSON decoding fails. The request times out
    after 30 seconds.
    """
    endpoint = f"{SERVER_URL}/step"
    try:
        response = requests.post(endpoint, json=action, timeout=30)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        print(f"[DEBUG] step error: {exc}", flush=True)
        payload = None
    return payload
def server_close():
    """POST to ``/close`` on the environment server (best effort).

    Failures never propagate: cleanup must not mask the episode's own result.
    They are reported as a ``[DEBUG]`` line, matching ``server_reset`` and
    ``server_step``, instead of being dropped silently. The HTTP status of
    the response is not checked. The request times out after 10 seconds.
    """
    try:
        requests.post(f"{SERVER_URL}/close", timeout=10)
    except Exception as e:
        print(f"[DEBUG] close error: {e}", flush=True)
def run_episode(task: str):
    """Run one moderation episode against the environment server and log it.

    Emits a ``[START]`` line, then for up to ``MAX_STEPS`` steps builds a
    prompt from the current observation, obtains an action from the model,
    submits it with ``server_step`` and logs a ``[STEP]`` line. The loop ends
    when the server reports ``done``, returns no observation, or a step
    request fails.

    The score is the sum of rewards divided by the last observation's
    ``total_steps`` (or by the number of rewards collected when that is
    unavailable), clamped to ``[0.0, 1.0]``. The episode succeeds when the
    score reaches ``SUCCESS_SCORE_THRESHOLD``.

    Whatever happens (reset failure, normal completion or an unexpected
    exception), the server is asked to close and exactly one ``[END]`` line
    is printed, both from the ``finally`` clause.

    Args:
        task: Task name sent to the server's reset endpoint.
    """
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task, env=BENCHMARK, model=MODEL_NAME)
    try:
        reset_result = server_reset(task)
        if reset_result is None:
            # No explicit log_end here: the finally clause below prints the
            # single [END] line with the zeroed defaults set above.
            return

        obs = reset_result.get("observation", {})
        done = False
        for step in range(1, MAX_STEPS + 1):
            if done or obs is None:
                break

            prompt = build_prompt(obs)
            action = get_decision(prompt)
            # The free-text reason is left out of the logged action string.
            action_str = json.dumps({k: v for k, v in action.items() if k != "reason"})

            result = server_step(action)
            if result is None:
                log_step(step, action_str, 0.0, True, "server_error")
                break

            # "or" fallbacks tolerate explicit nulls in the server response.
            reward = float(result.get("reward") or 0.0)
            done = bool(result.get("done", False))
            error = (result.get("info") or {}).get("error")

            rewards.append(reward)
            steps_taken = step
            log_step(step, action_str, reward, done, error)
            obs = result.get("observation")

        total_steps_in_task = obs.get("total_steps") if obs else None
        if total_steps_in_task is None:
            total_steps_in_task = len(rewards)
        max_possible = float(total_steps_in_task)
        score = sum(rewards) / max_possible if max_possible > 0 else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    finally:
        server_close()
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
if __name__ == "__main__":
    # Script entry point: run a single episode for the configured task.
    run_episode(task=TASK_NAME)