""" inference.py ============================================ Baseline Inference Script Dataset Quality Inspector Agent Meta x Hugging Face x OpenEnv Hackathon ============================================ STDOUT FORMAT (strictly followed): [START] task= env= model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score= rewards= MANDATORY Environment Variables (must be set before running): API_BASE_URL The API endpoint for the LLM. Default: https://router.huggingface.co/v1 MODEL_NAME The model identifier to use for inference. Default: Qwen/Qwen2.5-72B-Instruct HF_TOKEN Your Hugging Face / API key. SPACE_URL Your deployed HF Space URL. Example: https://your-username-dataset-quality-env.hf.space The script will wait up to 90s for the Space to wake if needed. Example: export API_BASE_URL=https://router.huggingface.co/v1 export MODEL_NAME=Qwen/Qwen2.5-72B-Instruct export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx export SPACE_URL=https://your-username-dataset-quality-env.hf.space python inference.py """ import os import sys import json import time import requests import textwrap from typing import List, Optional from openai import OpenAI # ───────────────────────────────────────────── # ENVIRONMENT VARIABLES # ───────────────────────────────────────────── API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY", "hf_placeholder") SPACE_URL = os.getenv("SPACE_URL", "http://localhost:7860").rstrip("/") BENCHMARK = os.getenv("BENCHMARK", "dataset-quality-inspector") # Test datasets for each task TEST_DATASETS = { "task1_easy": "https://huggingface.co/datasets/imdb", "task2_medium": "https://huggingface.co/datasets/ag_news", "task3_hard": "dair-ai/emotion", } MAX_STEPS = 8 SUCCESS_SCORE_THRESHOLD = 0.5 # ───────────────────────────────────────────── # STDOUT LOGGING — exact required format # 
# ─────────────────────────────────────────────
# STDOUT LOGGING — exact required format
# ─────────────────────────────────────────────

def _single_line(value: object) -> str:
    """Return ``str(value)`` with CR/LF replaced by spaces so a record stays on one line."""
    return str(value).replace("\n", " ").replace("\r", " ")


def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` record that opens an episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` record.

    ``action`` and ``error`` are both flattened to a single line because the
    record format is line-oriented; ``action`` is additionally truncated to
    200 characters. A falsy ``error`` is rendered as ``null`` and ``done`` is
    rendered lowercase (``true``/``false``).
    """
    action_clean = _single_line(action)[:200]
    error_val = _single_line(error) if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_clean} reward={reward:.2f} "
        f"done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` record: score with 3 decimals, rewards as comma-separated 2-decimal values."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


# ─────────────────────────────────────────────
# ENVIRONMENT API CALLS
# Calls your running HF Space, with retry/wake-up logic
# ─────────────────────────────────────────────

def wait_for_space(max_wait: int = 90) -> bool:
    """Poll ``GET SPACE_URL`` until it answers 200 or ``max_wait`` seconds pass.

    HF Spaces can be cold-started; waiting here prevents an instant 0-score
    while the Space wakes up.

    Returns:
        True if the Space responded with 200, False if the deadline passed.
    """
    deadline = time.time() + max_wait
    attempt = 0
    while time.time() < deadline:
        try:
            r = requests.get(SPACE_URL, timeout=15)
            if r.status_code == 200:
                if attempt > 0:
                    print(f"[DEBUG] Space woke up after {attempt} attempts.", flush=True)
                return True
        except requests.exceptions.RequestException:
            pass  # Space still booting or unreachable — keep polling.
        attempt += 1
        # Poll every 5s, but never sleep past the deadline.
        time.sleep(max(0.0, min(5.0, deadline - time.time())))
    print(f"[DEBUG] Space did not respond after {max_wait}s.", flush=True)
    return False


def _post_with_retry(url: str, payload: dict, retries: int = 3, timeout: int = 60) -> dict:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.

    Network errors, non-2xx responses and undecodable bodies are retried with
    exponential back-off (2s, 4s, ...).

    Raises:
        RuntimeError: if every attempt fails; the last error is chained as the cause.
    """
    last_err: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        try:
            resp = requests.post(url, json=payload, timeout=timeout)
            resp.raise_for_status()
            return resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            last_err = e
            if attempt < retries:
                wait = 2 ** attempt
                print(
                    f"[DEBUG] POST {url} failed (attempt {attempt}): {e}. Retrying in {wait}s...",
                    flush=True,
                )
                time.sleep(wait)
    raise RuntimeError(
        f"All {retries} attempts to POST {url} failed. Last error: {last_err}"
    ) from last_err


def env_reset(task_id: str, user_url: str) -> dict:
    """Call ``POST /reset`` on the HF Space (with retry) and return the initial observation."""
    return _post_with_retry(
        f"{SPACE_URL}/reset",
        {"task_id": task_id, "user_url": user_url},
    )


def env_step(action: dict) -> dict:
    """Call ``POST /step`` on the HF Space (with retry) and return the step result."""
    return _post_with_retry(f"{SPACE_URL}/step", {"action": action})


def env_state() -> dict:
    """Call ``GET /state`` on the HF Space (single attempt, 30s timeout)."""
    resp = requests.get(f"{SPACE_URL}/state", timeout=30)
    resp.raise_for_status()
    return resp.json()


# ─────────────────────────────────────────────
# SYSTEM PROMPTS — per task
# ─────────────────────────────────────────────

SYSTEM_PROMPT_T1 = textwrap.dedent("""
    You are a professional dataset quality inspector.
    You will receive HuggingFace dataset metadata.
    Your job is to identify ALL missing or incomplete fields and return a JSON action.

    You MUST return a valid JSON object with these exact fields:
    {
      "task_id": "task1_easy",
      "missing_fields": ["list", "of", "missing", "field", "names"],
      "issues_found": [
        {"field": "license", "issue": "License is missing", "severity": "critical", "reason": "Cannot use without license"}
      ],
      "quality_score": 0.45,
      "severity_summary": {"critical": 1, "high": 2, "medium": 1, "low": 2},
      "recommendation": "Add license urgently. Specify task_type and column_descriptions.",
      "verdict": "incomplete"
    }

    Fields to check: source, author, description, license, num_rows, column_descriptions, task_type, language, date_created, tags, citation.

    Severity levels: critical (blocks usage), high (very important), medium (should fix), low (nice to have)
    Verdicts: complete, needs_minor_fixes, incomplete, rejected

    Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()

SYSTEM_PROMPT_T2 = textwrap.dedent("""
    You are a data quality analyst.
    You will receive real dataset rows from HuggingFace.
    Your job is to count ALL data quality issues and return a JSON action.

    You MUST return a valid JSON object with these exact fields:
    {
      "task_id": "task2_medium",
      "issue_summary": {
        "duplicates": 0,
        "missing_values": 0,
        "outliers": 0,
        "inconsistencies": 0,
        "class_imbalance": 0,
        "wrong_data_types": 0,
        "invalid_ranges": 0,
        "empty_constant_cols": 0
      },
      "total_issues": 0,
      "quality_score": 0.85,
      "verdict": "good_quality",
      "recommendations": ["list of recommendations"]
    }

    Check for:
    1. duplicates - exact same rows
    2. missing_values - null or empty cells
    3. outliers - impossible values (age=-5, age=999)
    4. inconsistencies - same value written differently (USA vs U.S.A)
    5. class_imbalance - one label dominates >80%
    6. wrong_data_types - mixed types in same column
    7. invalid_ranges - values outside logical bounds
    8. empty_constant_cols - columns with only one value or all empty

    Verdicts: good_quality, needs_minor_fixes, needs_major_fixes, rejected

    Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()

SYSTEM_PROMPT_T3 = textwrap.dedent("""
    You are a senior data auditor conducting a 5-turn deep audit.
    Each turn focuses on a different aspect of dataset quality.
    Return a JSON object for each turn.

    Turn 1 - Dataset Overview: Analyze columns, types, missing rates, detect text/label columns.
    Turn 2 - Near Duplicates: Find near-duplicate rows and annotation inconsistencies.
    Turn 3 - Label Noise & Toxicity: Find noisy labels and toxic/hate content.
    Turn 4 - Bias & Language: Detect gender/racial/cultural bias, check language diversity.
    Turn 5 - Final Report: Data leakage, domain drift, complete audit report.

    For turns 1-4, return:
    {"task_id": "task3_hard", "turn_response": "Your detailed analysis here"}

    For turn 5 ONLY, return:
    {
      "task_id": "task3_hard",
      "turn_response": "Final summary",
      "audit_report": {
        "audit_score": 0.75,
        "verdict": "GOOD",
        "scores": {
          "near_duplicate_score": 0.95,
          "annotation_consistency_score": 0.90,
          "label_noise_score": 0.85,
          "toxicity_score": 1.0,
          "bias_score": 0.80,
          "language_consistency_score": 0.98,
          "data_leakage_score": 1.0,
          "domain_drift_score": 0.9
        },
        "recommendations": ["List of specific fixes needed"]
      }
    }

    Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()


# ─────────────────────────────────────────────
# LLM CALL — uses OpenAI client
# ─────────────────────────────────────────────

def call_llm(client: OpenAI, system_prompt: str, user_content: str) -> str:
    """Send one system+user chat turn to the model and return the stripped reply text.

    Best-effort by design: any client error is logged and an empty string is
    returned so the caller can fall back to a deterministic action.
    """
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            temperature=0.1,  # Low temperature for near-deterministic JSON output
            max_tokens=1000,
            stream=False,
        )
        return (completion.choices[0].message.content or "").strip()
    except Exception as e:
        print(f"[DEBUG] LLM call failed: {e}", flush=True)
        return ""


def parse_json_response(text: str) -> dict:
    """Extract a JSON object from an LLM reply.

    Handles replies wrapped in a markdown code fence (with or without a
    closing fence) and replies with prose around the object. Returns ``{}``
    when no JSON *object* can be decoded; top-level arrays/scalars are
    rejected because callers expect a dict.
    """
    text = (text or "").strip()

    if text.startswith("```"):
        # Drop the opening fence line (it may carry a language tag, e.g. ```json).
        lines = text.split("\n")[1:]
        # Drop the closing fence only if one is actually present, so an
        # unterminated fence does not cost us the object's last line.
        if lines and lines[-1].strip().startswith("```"):
            lines = lines[:-1]
        text = "\n".join(lines).strip()

    candidates = [text]
    start = text.find("{")
    end = text.rfind("}") + 1
    if start >= 0 and end > start:
        # Outermost {...} span — recovers objects surrounded by prose.
        candidates.append(text[start:end])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return {}


# ─────────────────────────────────────────────
# FALLBACK ACTIONS
# Used if LLM fails — ensures valid submission
# ─────────────────────────────────────────────

def fallback_action_t1(observation: dict) -> dict:
    """Build a rule-based Task 1 action from the observation's ``dataset`` metadata.

    A field counts as missing when its value is falsy (absent, None, "", {}, [], 0).
    The quality score is the fraction of checked fields that are present,
    minus 0.15 per missing critical field, clamped at 0 and rounded to 2 dp.
    """
    dataset = observation.get("dataset") or {}
    sev_map = {
        "license": "critical", "description": "critical", "source": "critical",
        "task_type": "high", "column_descriptions": "high", "author": "high",
        "language": "medium", "date_created": "medium", "num_rows": "medium",
        "tags": "low", "citation": "low",
    }

    missing = []
    issues = []
    for field, sev in sev_map.items():
        if not dataset.get(field):
            missing.append(field)
            issues.append({
                "field": field,
                "issue": f"{field} is missing or empty",
                "severity": sev,
                "reason": f"Important field {field} needs to be filled",
            })

    crit = [f for f in missing if sev_map[f] == "critical"]
    high = [f for f in missing if sev_map[f] == "high"]
    med = [f for f in missing if sev_map[f] == "medium"]
    low = [f for f in missing if sev_map[f] == "low"]

    if len(crit) >= 2:
        verdict = "rejected"
    elif len(crit) == 1 or len(high) >= 2:
        verdict = "incomplete"
    elif missing:
        verdict = "needs_minor_fixes"
    else:
        verdict = "complete"

    # Denominator is the number of fields actually checked (previously a
    # hard-coded 12 for 11 fields, which skewed every fallback score).
    total = len(sev_map)
    filled = total - len(missing)
    score = round(max(0.0, (filled / total) - len(crit) * 0.15), 2)

    parts = []
    if crit:
        parts.append(f"URGENT: Add {', '.join(crit)}")
    if high:
        parts.append(f"HIGH: Add {', '.join(high)}")
    if med:
        parts.append(f"MEDIUM: Add {', '.join(med)}")

    return {
        "task_id": "task1_easy",
        "missing_fields": missing,
        "issues_found": issues,
        "quality_score": score,
        "severity_summary": {
            "critical": len(crit), "high": len(high), "medium": len(med), "low": len(low),
        },
        "recommendation": ". ".join(parts) or "Dataset looks complete.",
        "verdict": verdict,
    }


def fallback_action_t2() -> dict:
    """Return a fixed 'no issues found' Task 2 action used when the LLM fails."""
    return {
        "task_id": "task2_medium",
        "issue_summary": {
            "duplicates": 0, "missing_values": 0, "outliers": 0,
            "inconsistencies": 0, "class_imbalance": 0, "wrong_data_types": 0,
            "invalid_ranges": 0, "empty_constant_cols": 0,
        },
        "total_issues": 0,
        "quality_score": 0.90,
        "verdict": "good_quality",
        "recommendations": ["Dataset appears to be in good quality."],
    }


def fallback_action_t3(turn: int) -> dict:
    """Return a canned Task 3 action for ``turn``; turns >= 5 get a full audit report."""
    responses = {
        1: "Dataset loaded. Analyzing columns and structure.",
        2: "Checking for near-duplicates and annotation inconsistencies.",
        3: "Checking for label noise and toxic content.",
        4: "Analyzing bias and linguistic diversity.",
    }
    if turn < 5:
        return {
            "task_id": "task3_hard",
            "turn_response": responses.get(turn, f"Completing turn {turn} analysis."),
        }
    return {
        "task_id": "task3_hard",
        "turn_response": "Audit complete.",
        "audit_report": {
            "audit_score": 0.70,
            "verdict": "GOOD",
            "scores": {
                "near_duplicate_score": 0.90,
                "annotation_consistency_score": 0.85,
                "label_noise_score": 0.90,
                "toxicity_score": 0.95,
                "bias_score": 0.75,
                "language_consistency_score": 0.98,
                "data_leakage_score": 1.00,
                "domain_drift_score": 0.90,
            },
            "recommendations": [
                "Review near-duplicate pairs",
                "Check annotation consistency",
                "Audit for bias in sensitive columns",
            ],
        },
    }


# ─────────────────────────────────────────────
# RUN ONE TASK EPISODE
# ─────────────────────────────────────────────

def _build_prompts(task_id: str, observation: dict, step: int) -> tuple:
    """Return ``(system_prompt, user_prompt)`` for the current observation."""
    obs_text = json.dumps(observation, indent=2)[:3000]  # limit prompt size
    if task_id == "task1_easy":
        return SYSTEM_PROMPT_T1, (
            "Inspect this dataset metadata and return your JSON action:\n\n"
            f"{obs_text}"
        )
    if task_id == "task2_medium":
        return SYSTEM_PROMPT_T2, (
            "Analyze these dataset rows for quality issues and return your JSON action:\n\n"
            f"{obs_text}"
        )
    # task3_hard
    turn_num = observation.get("turn", step)
    return SYSTEM_PROMPT_T3, (
        f"You are on Turn {turn_num} of 5.\n"
        f"Instructions: {observation.get('instructions', '')}\n\n"
        f"Dataset info:\n{obs_text}\n\n"
        "Return your JSON action for this turn."
    )


def _fallback_action(task_id: str, observation: dict, step: int) -> dict:
    """Return the deterministic fallback action for ``task_id``."""
    if task_id == "task1_easy":
        return fallback_action_t1(observation)
    if task_id == "task2_medium":
        return fallback_action_t2()
    return fallback_action_t3(observation.get("turn", step))


def _extract_reward(task_id: str, result: dict) -> float:
    """Pick the reward to record for one ``/step`` result.

    Intermediate task3 turns use the per-turn ``score`` (0.10 when the server
    omits it); everything else prefers ``score`` then ``cumulative_score``.
    """
    if task_id == "task3_hard" and not result.get("done", False):
        return float(result.get("score", 0.10))
    return float(result.get("score", result.get("cumulative_score", 0.0)))


def run_task(client: OpenAI, task_id: str, user_url: str) -> dict:
    """Run one full episode for ``task_id`` against the Space, logging START/STEP/END.

    Returns:
        ``{"task_id", "score", "steps", "rewards", "success"}`` where ``score``
        is the mean step reward clamped to [0, 1].
    """
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

    try:
        observation = env_reset(task_id, user_url)
        if observation.get("error"):
            raise RuntimeError(observation.get("message", "Reset failed"))

        # The server announces the episode length; MAX_STEPS caps it as a safety net.
        max_turns = min(int(observation.get("max_turns", 1)), MAX_STEPS)

        for step in range(1, max_turns + 1):
            steps_taken = step
            try:
                system_prompt, user_prompt = _build_prompts(task_id, observation, step)
                action = parse_json_response(call_llm(client, system_prompt, user_prompt))

                # Use fallback if LLM returned empty/invalid JSON.
                if not action or "task_id" not in action:
                    print("[DEBUG] LLM returned invalid JSON, using fallback", flush=True)
                    action = _fallback_action(task_id, observation, step)
                action["task_id"] = task_id

                result = env_step(action)
                reward = _extract_reward(task_id, result)
                done = result.get("done", True)
                rewards.append(reward)

                log_step(
                    step=step,
                    action=json.dumps(action)[:200],
                    reward=reward,
                    done=done,
                    error=None,
                )

                if done:
                    break
                # Multi-turn task: the next prompt is built from the server's next observation.
                if task_id == "task3_hard":
                    next_obs = result.get("next_observation", {})
                    if next_obs:
                        observation = next_obs
            except Exception as step_err:
                # Episode boundary: record a zero-reward terminal step and stop.
                error_msg = str(step_err)[:100]
                rewards.append(0.0)
                log_step(step=step, action="error", reward=0.0, done=True, error=error_msg)
                break

        if rewards:
            score = min(max(round(sum(rewards) / len(rewards), 4), 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD

    except Exception as e:
        error_msg = str(e)[:100]
        print(f"[DEBUG] Task {task_id} failed: {error_msg}", flush=True)
        if not rewards:
            rewards.append(0.0)
        score = 0.0
        success = False

    log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {
        "task_id": task_id,
        "score": score,
        "steps": steps_taken,
        "rewards": rewards,
        "success": success,
    }
# ─────────────────────────────────────────────
# MAIN — runs every task in TEST_DATASETS
# ─────────────────────────────────────────────

def main() -> None:
    """Run the baseline inference agent through every task in ``TEST_DATASETS``.

    Prints [START]/[STEP]/[END] records for each task, then a summary table.
    Exits with status 1 if the HF Space never becomes reachable.
    """
    # ── Validate required env vars ──
    missing_vars = []
    if not os.getenv("HF_TOKEN") and not os.getenv("API_KEY"):
        missing_vars.append("HF_TOKEN")
    if not os.getenv("SPACE_URL"):
        print("[DEBUG] WARNING: SPACE_URL not set — defaulting to http://localhost:7860", flush=True)
    if missing_vars:
        print(f"[DEBUG] WARNING: Missing env vars: {missing_vars}. Scores may be 0.", flush=True)

    # ── Wait for HF Space to wake up ──
    print(f"[DEBUG] Space URL: {SPACE_URL}", flush=True)
    print(f"[DEBUG] Model: {MODEL_NAME}", flush=True)
    print(f"[DEBUG] API Base: {API_BASE_URL}", flush=True)
    print("[DEBUG] Waiting for Space to be ready...", flush=True)

    if not wait_for_space(max_wait=90):
        print("[DEBUG] FATAL: HF Space unreachable. Aborting.", flush=True)
        sys.exit(1)

    print(f"[DEBUG] Space is ready. Running {len(TEST_DATASETS)} tasks...", flush=True)
    print(flush=True)

    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

    # ── Run each task in declaration order (dicts preserve insertion order) ──
    all_results = []
    for task_id, user_url in TEST_DATASETS.items():
        all_results.append(run_task(client=client, task_id=task_id, user_url=user_url))
        print(flush=True)

    # ── Final Summary ──
    print("=" * 60, flush=True)
    print("BASELINE INFERENCE SUMMARY", flush=True)
    print("=" * 60, flush=True)
    for r in all_results:
        status = "PASS" if r["success"] else "FAIL"
        print(
            f"  {r['task_id']:<20} score={r['score']:.3f}  "
            f"steps={r['steps']}  [{status}]",
            flush=True,
        )

    valid_scores = [r["score"] for r in all_results]
    # Guard against an empty task table instead of dividing by zero.
    overall_score = round(sum(valid_scores) / len(valid_scores), 4) if valid_scores else 0.0
    print(f"\n  Overall average score: {overall_score:.4f}", flush=True)
    print("=" * 60, flush=True)


if __name__ == "__main__":
    main()