Spaces:
Sleeping
Sleeping
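"""
AuditEnv — Hackathon inference entrypoint.

Mandatory environment variables:
- API_BASE_URL : The API endpoint for the LLM.
- MODEL_NAME   : The model identifier to use for inference.
- HF_TOKEN     : Your Hugging Face / API key (API_KEY is used as a fallback).

Optional environment variables:
- AUDITENV_BASE_URL : Base URL of the running AuditEnv server.

For each task this script emits three structured stdout line types, in order:
    [START], [STEP], [END]
Diagnostic lines prefixed with [DEBUG] and a final score summary are also
written to stdout.
"""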
from __future__ import annotations

import json
import math
import os
import textwrap
from typing import Any, Dict, List, Optional

import httpx
from openai import OpenAI
# ---------------------------------------------------------------------------
# Environment variables (hackathon mandatory)
# ---------------------------------------------------------------------------
# LLM endpoint, model and credential; each can be overridden via the environment.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
# HF_TOKEN wins when set and non-empty; otherwise API_KEY, else the empty string.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY", "")

# Environment server URL (where the AuditEnv FastAPI app is running).
ENV_BASE_URL = os.environ.get("AUDITENV_BASE_URL", "http://127.0.0.1:8000")

# Inference config.
BENCHMARK = "auditenv"
SEED = 42
TEMPERATURE = 0.3
MAX_TOKENS = 400
# Step budget per task; tasks are run in the order listed in TASK_IDS.
MAX_STEPS_MAP = {"easy": 12, "medium": 20, "hard": 28}
TASK_IDS = ["easy", "medium", "hard"]
# ---------------------------------------------------------------------------
# Logging helpers — strict [START], [STEP], [END] format
# ---------------------------------------------------------------------------
def _bool_str(value: bool) -> str:
    """Render a truth value as the lowercase literal ``true`` or ``false``."""
    if value:
        return "true"
    return "false"
def _single_line(value: str) -> str:
    """Collapse *value* onto one line.

    The value is converted with ``str()``, split on line boundaries, re-joined
    with single spaces, and stripped of leading/trailing whitespace.
    """
    pieces = str(value).splitlines()
    flattened = " ".join(pieces)
    return flattened.strip()
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line for a task run and flush stdout."""
    line = f"[START] task={task} env={env} model={model}"
    print(line, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line and flush stdout.

    Args:
        step: Step number within the episode.
        action: Action summary; flattened to a single line before printing.
        reward: Step reward, printed with two decimals.
        done: Whether the episode ended; printed as ``true``/``false``.
        error: Optional message; a falsy value is printed as ``null``.
    """
    if error:
        error_val = _single_line(error)
    else:
        error_val = "null"
    action_val = _single_line(action)
    line = (
        f"[STEP] step={step} action={action_val} reward={reward:.2f} "
        f"done={_bool_str(done)} error={error_val}"
    )
    print(line, flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` summary line for a task run and flush stdout.

    Rewards are printed as a comma-separated list with two decimals each; an
    empty list yields an empty ``rewards=`` field.
    """
    formatted = [f"{r:.2f}" for r in rewards]
    rewards_str = ",".join(formatted)
    line = f"[END] success={_bool_str(success)} steps={steps} score={score:.2f} rewards={rewards_str}"
    print(line, flush=True)
# ---------------------------------------------------------------------------
# System prompt for the audit agent
# ---------------------------------------------------------------------------
# Instruction text sent as the system message on every model call.  The text
# is kept flush-left so the literal is exactly what the model receives.
SYSTEM_PROMPT = """\
You are an expert compliance auditor AI agent. You are reviewing documents in a
simulated audit environment. Your job is to identify compliance violations,
fraud, and policy breaches.
For each step, you will receive a list of documents. Analyze them and decide on
one of three actions:
1. **submit_finding** — You found a violation. Return JSON with:
- "action_type": "submit_finding"
- "document_id": the ID of the suspicious document
- "violation_type": the type of violation you detected
- "evidence": list of document IDs as supporting evidence
- "confidence": float between 0.0 and 1.0
- "note": brief explanation
2. **flag_human_review** — You want a human to look at something. Return JSON with:
- "action_type": "flag_human_review"
- "note": explanation of concern
3. **noop** — Nothing suspicious in the current batch. Return JSON with:
- "action_type": "noop"
- "note": reason for no finding
VIOLATION TYPES by task difficulty:
- Easy: "duplicate_receipt", "alcohol_over_limit", "late_submission"
- Medium: "sod_conflict", "dormant_account_reactivation", "temporal_anomaly"
- Hard: "shell_company", "invoice_splitting", "round_tripping"
IMPORTANT: Respond with ONLY a valid JSON object. No markdown, no explanation.
"""
# ---------------------------------------------------------------------------
# LLM interaction
# ---------------------------------------------------------------------------
def build_user_prompt(task_id: str, step: int, observation: Dict[str, Any], history: List[str]) -> str:
    """Build the per-step user message sent to the model.

    The prompt is assembled from a list of lines instead of dedenting an
    interpolated template: dedenting after interpolation lets multi-line
    document or history text change the common margin, which leaves the
    template lines inconsistently indented.

    Args:
        task_id: Task identifier shown in the header line.
        step: Current step number shown in the header line.
        observation: Environment observation.  The keys read are
            ``documents``, ``findings_submitted``, ``steps_remaining`` and
            ``current_partial_score``; each may be absent.
        history: Previous step summaries; only the last five are included.

    Returns:
        The prompt text, terminated by a newline.
    """

    def _format_doc(doc: Any) -> str:
        # Tolerate malformed entries rather than failing the whole step.
        if not isinstance(doc, dict):
            return f" - ID: N/A, Type: N/A, Text: {str(doc)[:200]}"
        # ``text`` may be present but null; treat that as empty text.
        snippet = str(doc.get("text") or "")[:200]
        return f" - ID: {doc.get('id', 'N/A')}, Type: {doc.get('type', 'N/A')}, Text: {snippet}"

    documents = observation.get("documents") or []
    # Only the first 10 documents are listed, to bound the prompt size.
    doc_lines = [_format_doc(doc) for doc in documents[:10]]

    try:
        current_score = float(observation.get("current_partial_score") or 0.0)
    except (TypeError, ValueError):
        current_score = 0.0

    history_block = "\n".join(history[-5:]) if history else "None"

    lines = [
        f"Task: {task_id} (Step {step})",
        f"Findings submitted so far: {observation.get('findings_submitted', 0)}",
        f"Steps remaining: {observation.get('steps_remaining', 0)}",
        f"Current partial score: {current_score:.2f}",
        "Documents to review:",
        *doc_lines,
        "",
        "Recent history:",
        history_block,
        "Analyze the documents and return a JSON action. "
        "Look for violations relevant to this task difficulty.",
    ]
    return "\n".join(lines) + "\n"
def _parse_model_json(text: str) -> Dict[str, Any]:
    """Decode a model reply into a JSON object.

    Markdown code-fence lines are dropped when the reply starts with a fence.
    If the cleaned reply is not valid JSON as a whole, the span from the first
    ``{`` to the last ``}`` is tried, so a reply with prose around the object
    is still usable.

    Raises:
        json.JSONDecodeError: If no JSON value can be decoded.
        ValueError: If the decoded value is not a JSON object.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        kept = [line for line in cleaned.split("\n") if not line.strip().startswith("```")]
        cleaned = "\n".join(kept).strip()
    try:
        payload = json.loads(cleaned)
    except json.JSONDecodeError:
        start, end = cleaned.find("{"), cleaned.rfind("}")
        if start == -1 or end <= start:
            raise
        payload = json.loads(cleaned[start : end + 1])
    if not isinstance(payload, dict):
        raise ValueError(f"expected a JSON object, got {type(payload).__name__}")
    return payload


def get_model_action(
    client: OpenAI,
    task_id: str,
    step: int,
    observation: Dict[str, Any],
    history: List[str],
) -> Dict[str, Any]:
    """Ask the model for the next action, falling back to the heuristic policy.

    Args:
        client: OpenAI-compatible client used for the chat completion.
        task_id: Task identifier, passed through to the prompt and the action.
        step: Current step number, shown in the prompt.
        observation: Current environment observation.
        history: Previous step summaries, shown in the prompt.

    Returns:
        The action built from the model's JSON reply.  If building the prompt,
        the request, or decoding the reply fails, a ``[DEBUG]`` line is
        printed and the heuristic fallback action is returned instead.
    """
    try:
        user_prompt = build_user_prompt(task_id, step, observation, history)
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = completion.choices[0].message.content or ""
        payload = _parse_model_json(reply)
        return _build_action_from_llm(task_id, payload, observation)
    except Exception as exc:
        # Deliberate best-effort boundary: any failure on the model path
        # degrades to the heuristic policy instead of aborting the episode.
        print(f"[DEBUG] Model parse/request failed: {exc}", flush=True)
        return _build_heuristic_action(task_id, observation)
def _build_action_from_llm(task_id: str, payload: Dict[str, Any], observation: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the model's decoded JSON reply into an environment action.

    Args:
        task_id: Task identifier copied into the action.
        payload: Decoded JSON object returned by the model.
        observation: Current observation; the ``id`` of its first document is
            the fallback ``document_id`` when the model does not name one.

    Returns:
        A dict with ``action_type``, ``task_id`` and ``note`` (at most 200
        characters).  Unknown action types become ``noop``.  For
        ``submit_finding`` the dict also carries a ``finding`` with
        ``document_id``, ``violation_type``, ``evidence`` (always a list) and
        ``confidence`` (clamped to [0.0, 1.0]; 0.5 when missing or unusable).

    Raises:
        TypeError: If ``payload`` is not a dict.
    """
    if not isinstance(payload, dict):
        raise TypeError(f"model payload must be a JSON object, got {type(payload).__name__}")

    action_type = payload.get("action_type", "noop")
    # The isinstance guard keeps an unhashable value (list/dict) from raising
    # inside the set membership test.
    if not isinstance(action_type, str) or action_type not in {"submit_finding", "flag_human_review", "noop"}:
        action_type = "noop"

    note = payload.get("note")
    action: Dict[str, Any] = {
        "action_type": action_type,
        "task_id": task_id,
        "note": ("" if note is None else str(note))[:200],
    }
    if action_type != "submit_finding":
        return action

    doc_id = payload.get("document_id")
    if doc_id is None or doc_id == "":
        # Resolve the fallback only when needed, so a first document without
        # an ``id`` cannot break a reply that names its own document.
        documents = observation.get("documents") or []
        first = documents[0] if documents else None
        fallback = first.get("id") if isinstance(first, dict) else None
        doc_id = "UNKNOWN" if fallback is None else fallback

    evidence = payload.get("evidence")
    if evidence is None:
        evidence = [doc_id]
    elif not isinstance(evidence, list):
        evidence = [evidence]

    try:
        confidence = float(payload.get("confidence", 0.5))
    except (TypeError, ValueError):
        confidence = 0.5
    if math.isnan(confidence):
        # NaN would otherwise pass through min/max and come out as 1.0.
        confidence = 0.5
    confidence = max(0.0, min(1.0, confidence))

    action["finding"] = {
        "document_id": doc_id,
        "violation_type": payload.get("violation_type") or "duplicate_receipt",
        "evidence": evidence,
        "confidence": confidence,
    }
    return action
def _build_heuristic_action(task_id: str, observation: Dict[str, Any]) -> Dict[str, Any]:
    """Fallback heuristic policy when the LLM call fails.

    Submits a finding against the first document in the observation using a
    fixed violation type per task.  This function is the last resort on the
    failure path, so it never raises on a missing or malformed document list:
    ``UNKNOWN`` is used when no document id is available.

    Args:
        task_id: Task identifier; selects the violation type and is copied
            into the action.  Unrecognised ids use ``duplicate_receipt``.
        observation: Current environment observation.

    Returns:
        A ``submit_finding`` action with confidence 0.5 and the note
        ``heuristic_fallback``.
    """
    documents = observation.get("documents") or []
    first = documents[0] if documents else None
    doc_id = first.get("id") if isinstance(first, dict) else None
    if doc_id is None:
        doc_id = "UNKNOWN"

    violation_map = {
        "easy": "duplicate_receipt",
        "medium": "sod_conflict",
        "hard": "shell_company",
    }
    return {
        "action_type": "submit_finding",
        "task_id": task_id,
        "finding": {
            "document_id": doc_id,
            "violation_type": violation_map.get(task_id, "duplicate_receipt"),
            "evidence": [doc_id],
            "confidence": 0.5,
        },
        "note": "heuristic_fallback",
    }
# ---------------------------------------------------------------------------
# Main inference loop
# ---------------------------------------------------------------------------
def _step_reward(result: Dict[str, Any]) -> float:
    """Return ``reward.normalized`` from a step response, or 0.0 if unusable."""
    reward_obj = result.get("reward", {})
    if not isinstance(reward_obj, dict):
        return 0.0
    try:
        return float(reward_obj.get("normalized", 0.0))
    except (TypeError, ValueError):
        # A null or non-numeric value counts as no reward instead of
        # aborting the whole episode.
        return 0.0


def _step_reason(result: Dict[str, Any]) -> Optional[str]:
    """Return ``info.reason`` from a step response, or None when absent."""
    info = result.get("info")
    return info.get("reason") if isinstance(info, dict) else None


def run_task(task_id: str, client: OpenAI, http: httpx.Client) -> tuple[float, bool]:
    """Run a single task and return (score, success).

    Resets the environment, then alternates between asking the model for an
    action and posting it to the environment's ``/step`` endpoint until the
    environment reports ``done`` or the task's step budget is used up.

    Args:
        task_id: Task to run; also selects the step budget (default 12).
        client: OpenAI-compatible client used to obtain actions.
        http: HTTP client used to talk to the environment server.

    Returns:
        ``(score, success)``.  ``score`` is the mean step reward clamped to
        [0.0, 1.0]; ``success`` is true when the score is at least 0.3.  If
        the run fails part-way, a ``[DEBUG]`` line is printed and the result
        is ``(0.0, False)``.
    """
    max_steps = MAX_STEPS_MAP.get(task_id, 12)
    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
    try:
        # Reset environment
        reset_resp = http.post(f"{ENV_BASE_URL}/reset", json={"task_id": task_id, "seed": SEED})
        reset_resp.raise_for_status()
        observation = reset_resp.json()

        for step in range(1, max_steps + 1):
            # Get action from LLM
            action = get_model_action(client, task_id, step, observation, history)
            action_summary = f"{action['action_type']}"
            if action.get("finding"):
                action_summary += f"({action['finding']['document_id']}:{action['finding']['violation_type']})"

            # Step the environment
            step_resp = http.post(f"{ENV_BASE_URL}/step", json=action)
            step_resp.raise_for_status()
            result = step_resp.json()

            # Extract results
            reward = _step_reward(result)
            done = bool(result.get("done", False))
            # A null observation is treated as empty so the next prompt can
            # still be built.
            observation = result.get("observation") or {}
            error = _step_reason(result)

            rewards.append(reward)
            steps_taken = step
            log_step(step=step, action=action_summary, reward=reward, done=done, error=error)
            history.append(f"Step {step}: {action_summary} -> reward {reward:.2f} reason={error}")
            if done:
                break

        # Compute final score
        score = sum(rewards) / len(rewards) if rewards else 0.0
        score = min(max(score, 0.0), 1.0)
        success = score >= 0.3
    except Exception as exc:
        print(f"[DEBUG] Task {task_id} failed: {exc}", flush=True)
        success = False
    finally:
        # Always close the [START]/[END] pair, even when the run is
        # interrupted by something other than an Exception.
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
    return score, success
def main() -> None:
    """Check the environment server, run every task, and print the scores.

    If the health check fails, two ``[DEBUG]`` lines are printed and the
    function returns without running any task.
    """
    llm_client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    with httpx.Client(timeout=30.0) as env_http:
        # Verify environment is running
        try:
            health_resp = env_http.get(f"{ENV_BASE_URL}/health")
            health_resp.raise_for_status()
            print(f"[DEBUG] Environment healthy: {health_resp.json()}", flush=True)
        except Exception as exc:
            print(f"[DEBUG] Environment health check failed: {exc}", flush=True)
            print("[DEBUG] Make sure the AuditEnv server is running.", flush=True)
            return

        scores_by_task = {}
        for task_id in TASK_IDS:
            task_score, task_ok = run_task(task_id, llm_client, env_http)
            scores_by_task[task_id] = task_score
            print(f"[DEBUG] Task {task_id}: score={task_score:.4f} success={task_ok}", flush=True)

        print("\n--- Final Scores ---", flush=True)
        for name in scores_by_task:
            print(f" {name}: {scores_by_task[name]:.4f}", flush=True)


if __name__ == "__main__":
    main()