content-moderation-env / inference.py
ANI00's picture
Refactor: Reduce comments in inference.py
059feb0 verified
"""
Content Moderation Inference Script
Env vars: API_BASE_URL, MODEL_NAME, HF_TOKEN, SERVER_URL, TASK_NAME
"""
import json
import os
import textwrap
from typing import Dict, Any, List, Optional
from dotenv import load_dotenv
import requests
load_dotenv()
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.1-8B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")
API_KEY = HF_TOKEN or os.getenv("API_KEY")
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:7860").rstrip("/")
TASK_NAME = os.getenv("TASK_NAME", "text_spam")
BENCHMARK = "content_moderation_env"
MAX_STEPS = 10
USE_LOCAL = os.getenv("USE_LOCAL_MODEL", "false").lower() == "true"
SUCCESS_SCORE_THRESHOLD = 0.5
VALID_DECISIONS = {"approve", "reject", "escalate", "flag"}
VALID_LABELS = [
"spam", "scam", "phishing", "pharmaceutical_spam",
"hate_speech", "violence", "harassment", "misinformation",
"adult_content", "deepfake", "political_manipulation", "fraud",
]
SYSTEM_PROMPT = textwrap.dedent("""
You are an expert content moderation AI. For each piece of content, analyze it carefully and respond ONLY with a valid JSON object.
Required JSON fields:
- "decision": one of "approve", "reject", "escalate", "flag"
- "reason": brief explanation (max 2 sentences)
- "confidence": float between 0.0 and 1.0
- "labels": list of applicable labels
Decision guidelines:
- approve: content is safe and legitimate
- reject: content clearly violates policy
- flag: content is borderline or needs human review
- escalate: uncertain, requires specialist review
Output ONLY the JSON object, no other text.
""").strip()
def log_start(task: str, env: str, model: str):
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]):
err = error if error else "null"
print(f"[STEP] step={step} action={action} reward={reward:.2f} done={str(done).lower()} error={err}", flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]):
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)
def build_prompt(obs: Dict[str, Any]) -> str:
parts = [f"Content ID: {obs.get('content_id', 'unknown')}"]
parts.append(f"Type: {obs.get('content_type', 'text')}")
if obs.get("text"):
parts.append(f"Text: {obs['text']}")
if obs.get("image_description"):
parts.append(f"Image analysis: {obs['image_description']}")
if obs.get("detector_score") is not None:
score = obs["detector_score"]
parts.append(f"Deepfake detector score (higher = more likely fake): {score:.3f}")
meta = obs.get("metadata", {})
if meta:
meta_str = ", ".join(f"{k}={v}" for k, v in meta.items())
parts.append(f"Metadata: {meta_str}")
parts.append(f"\nStep {obs.get('step_num', '?')} of {obs.get('total_steps', '?')}")
return "\n".join(parts)
def _default_action() -> Dict:
return {"decision": "escalate", "reason": "Unable to analyze content.", "confidence": 0.3, "labels": []}
def call_local_model(prompt: str) -> Dict:
from transformers import pipeline
pipe = pipeline("text-generation", model="meta-llama/Llama-3.1-8B-Instruct")
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
]
output = pipe(messages, max_new_tokens=256, temperature=0.2, do_sample=True)
text = output[0]["generated_text"]
if isinstance(text, list):
text = text[-1].get("content", "")
return parse_llm_response(text)
def call_api_model(prompt: str) -> Dict:
from openai import OpenAI
client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY or "hf_default")
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.2,
max_tokens=256,
)
text = (completion.choices[0].message.content or "").strip()
return parse_llm_response(text)
def parse_llm_response(text: str) -> Dict:
try:
start = text.find("{")
end = text.rfind("}") + 1
if start >= 0 and end > start:
parsed = json.loads(text[start:end])
decision = parsed.get("decision", "escalate")
if decision not in VALID_DECISIONS:
decision = "escalate"
return {
"decision": decision,
"reason": str(parsed.get("reason", ""))[:200],
"confidence": float(max(0.0, min(1.0, parsed.get("confidence", 0.5)))),
"labels": [l for l in parsed.get("labels", []) if l in VALID_LABELS],
}
except Exception:
pass
return _default_action()
def get_decision(prompt: str) -> Dict:
try:
if USE_LOCAL:
return call_local_model(prompt)
return call_api_model(prompt)
except Exception as e:
print(f"[DEBUG] Model error: {e}", flush=True)
return _default_action()
def server_reset(task: str) -> Optional[Dict]:
try:
r = requests.post(f"{SERVER_URL}/reset", json={"task": task}, timeout=30)
r.raise_for_status()
return r.json()
except Exception as e:
print(f"[DEBUG] reset error: {e}", flush=True)
return None
def server_step(action: Dict) -> Optional[Dict]:
try:
r = requests.post(f"{SERVER_URL}/step", json=action, timeout=30)
r.raise_for_status()
return r.json()
except Exception as e:
print(f"[DEBUG] step error: {e}", flush=True)
return None
def server_close():
try:
requests.post(f"{SERVER_URL}/close", timeout=10)
except Exception:
pass
def run_episode(task: str):
rewards: List[float] = []
steps_taken = 0
score = 0.0
success = False
obs = None
log_start(task=task, env=BENCHMARK, model=MODEL_NAME)
try:
reset_result = server_reset(task)
if reset_result is None:
log_end(success=False, steps=0, score=0.0, rewards=[])
return
obs = reset_result.get("observation", {})
done = False
for step in range(1, MAX_STEPS + 1):
if done or obs is None:
break
prompt = build_prompt(obs)
action = get_decision(prompt)
action_str = json.dumps({k: v for k, v in action.items() if k != "reason"})
result = server_step(action)
if result is None:
log_step(step, action_str, 0.0, True, "server_error")
break
reward = float(result.get("reward", 0.0))
done = bool(result.get("done", False))
error = result.get("info", {}).get("error")
rewards.append(reward)
steps_taken = step
log_step(step, action_str, reward, done, error)
obs = result.get("observation")
total_steps_in_task = obs.get("total_steps", len(rewards)) if obs else len(rewards)
max_possible = float(total_steps_in_task)
score = sum(rewards) / max_possible if max_possible > 0 else 0.0
score = min(max(score, 0.0), 1.0)
success = score >= SUCCESS_SCORE_THRESHOLD
finally:
server_close()
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
if __name__ == "__main__":
run_episode(TASK_NAME)