| """ |
| inference.py |
| ============================================ |
| Baseline Inference Script |
| Dataset Quality Inspector Agent |
| Meta x Hugging Face x OpenEnv Hackathon |
| ============================================ |
| |
| STDOUT FORMAT (strictly followed): |
| [START] task=<task_name> env=<benchmark> model=<model_name> |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> |
| [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn> |
| |
| Environment Variables (HF_TOKEN is required; the others have defaults): |
| API_BASE_URL The API endpoint for the LLM. |
| Default: https://router.huggingface.co/v1 |
| MODEL_NAME The model identifier to use for inference. |
| Default: Qwen/Qwen2.5-72B-Instruct |
| HF_TOKEN Your Hugging Face / API key (API_KEY is accepted as a fallback). |
| SPACE_URL Your deployed HF Space URL. |
| Default: http://localhost:7860 |
| Example: https://your-username-dataset-quality-env.hf.space |
| The script will wait up to 90s for the Space to wake if needed. |
| BENCHMARK Name reported as env=<benchmark> in [START] lines. |
| Default: dataset-quality-inspector |
| |
| Example: |
| export API_BASE_URL=https://router.huggingface.co/v1 |
| export MODEL_NAME=Qwen/Qwen2.5-72B-Instruct |
| export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx |
| export SPACE_URL=https://your-username-dataset-quality-env.hf.space |
| python inference.py |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import requests |
| import textwrap |
| from typing import List, Optional |
| from openai import OpenAI |
|
|
| |
| |
| |
|
|
# --- Runtime configuration (each value can be overridden from the environment) ---

# LLM endpoint and model identifier used for every chat-completion request.
API_BASE_URL = os.environ.get("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.environ.get("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")

# HF_TOKEN takes precedence; API_KEY is the fallback, then a placeholder value.
API_KEY = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY", "hf_placeholder")

# Base URL of the environment Space; trailing "/" removed so "/reset" etc. can be appended.
SPACE_URL = os.environ.get("SPACE_URL", "http://localhost:7860").rstrip("/")
BENCHMARK = os.environ.get("BENCHMARK", "dataset-quality-inspector")

# Dataset reference sent as `user_url` to POST /reset, keyed by task id.
TEST_DATASETS = dict(
    task1_easy="https://huggingface.co/datasets/imdb",
    task2_medium="https://huggingface.co/datasets/ag_news",
    task3_hard="dair-ai/emotion",
)

MAX_STEPS = 8
# A task is reported as a success when its averaged reward reaches this value.
SUCCESS_SCORE_THRESHOLD = 0.5
|
|
|
|
| |
| |
| |
|
|
def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode on stdout."""
    fields = (f"task={task}", f"env={env}", f"model={model}")
    print("[START] " + " ".join(fields), flush=True)
|
|
|
|
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line of the stdout protocol.

    The protocol is line-oriented, so CR/LF characters in both ``action`` and
    ``error`` are replaced with spaces; an unflattened exception message would
    otherwise split one record across several lines.

    Args:
        step: 1-based step number.
        action: Action text; flattened and truncated to 200 characters.
        reward: Step reward, printed with two decimals.
        done: Whether the episode finished on this step (printed lower-case).
        error: Error message, or a falsy value to print ``null``.
    """
    action_clean = str(action).replace("\n", " ").replace("\r", " ")[:200]
    if error:
        error_val = str(error).replace("\n", " ").replace("\r", " ")
    else:
        error_val = "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action_clean} reward={reward:.2f} "
        f"done={done_val} error={error_val}",
        flush=True,
    )
|
|
|
|
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the closing ``[END]`` line, including the comma-separated reward trail."""
    trail = ",".join(map("{:.2f}".format, rewards))
    status = str(success).lower()
    line = f"[END] success={status} steps={steps} score={score:.3f} rewards={trail}"
    print(line, flush=True)
|
|
|
|
| |
| |
| |
| |
|
|
def wait_for_space(max_wait: int = 90, poll_interval: float = 5.0) -> bool:
    """
    Poll GET on SPACE_URL until it answers HTTP 200 or ``max_wait`` seconds pass.

    HF Spaces can be cold-started — waiting here prevents an instant 0 score
    while the Space wakes up.

    Args:
        max_wait: Total time budget in seconds. At least one request is always
            made, even when the budget is 0.
        poll_interval: Seconds to sleep between attempts; the sleep is clamped
            so the loop never overshoots the deadline.

    Returns:
        True if the Space responded with status 200, False if the budget ran out.
    """
    # monotonic() is unaffected by wall-clock adjustments during the wait.
    deadline = time.monotonic() + max_wait
    failures = 0
    while True:
        remaining = deadline - time.monotonic()
        try:
            # Keep a single request from running far past the overall budget.
            r = requests.get(SPACE_URL, timeout=max(1.0, min(15.0, remaining)))
            if r.status_code == 200:
                if failures > 0:
                    print(f"[DEBUG] Space woke up after {failures} attempts.", flush=True)
                return True
        except requests.exceptions.RequestException:
            pass  # Expected while the Space is still booting; retry below.
        failures += 1
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(max(0.0, min(poll_interval, remaining)))
    print(f"[DEBUG] Space did not respond after {max_wait}s.", flush=True)
    return False
|
|
|
|
def _post_with_retry(url: str, payload: dict, retries: int = 3, timeout: int = 60) -> dict:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.

    Transient failures are retried with exponential back-off (2 s, 4 s, ...).
    These are connection errors, timeouts, 5xx and 408/429 responses, and bodies
    that are not valid JSON. Other 4xx responses would fail the same way on
    retry, so they abort immediately.

    Args:
        url: Full endpoint URL.
        payload: JSON-serializable request body.
        retries: Maximum number of attempts (values below 1 are treated as 1).
        timeout: Per-request timeout in seconds.

    Raises:
        RuntimeError: when no attempt succeeded; the last underlying error is
            chained as ``__cause__``. Errors outside the requests/JSON-decoding
            family (e.g. a non-serializable payload) propagate unchanged.
    """
    attempts = max(1, int(retries))
    last_err: Optional[Exception] = None
    attempt = 0
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.post(url, json=payload, timeout=timeout)
            resp.raise_for_status()
            return resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            last_err = e
            status = getattr(getattr(e, "response", None), "status_code", None)
            # Client errors (except timeout / rate limit) will not fix themselves.
            if status is not None and 400 <= status < 500 and status not in (408, 429):
                break
            if attempt < attempts:
                wait = 2 ** attempt
                print(f"[DEBUG] POST {url} failed (attempt {attempt}): {e}. Retrying in {wait}s...", flush=True)
                time.sleep(wait)
    raise RuntimeError(
        f"POST {url} failed after {attempt} attempt(s). Last error: {last_err}"
    ) from last_err
|
|
|
|
def env_reset(task_id: str, user_url: str) -> dict:
    """Start a new episode via POST /reset on the Space (retried by _post_with_retry)."""
    endpoint = SPACE_URL + "/reset"
    body = {"task_id": task_id, "user_url": user_url}
    return _post_with_retry(endpoint, body)
|
|
|
|
def env_step(action: dict) -> dict:
    """Submit one action via POST /step on the Space (retried by _post_with_retry)."""
    endpoint = SPACE_URL + "/step"
    body = {"action": action}
    return _post_with_retry(endpoint, body)
|
|
|
|
def env_state() -> dict:
    """Fetch the current environment state via GET /state (single attempt, 30 s timeout)."""
    response = requests.get(SPACE_URL + "/state", timeout=30)
    response.raise_for_status()
    state = response.json()
    return state
|
|
|
|
| |
| |
| |
|
|
# System prompt for task1_easy (metadata completeness check). The model must
# answer with a single JSON action shaped like the example object below; the
# example values are illustrative only. "Fields to check" names the same 11
# fields that fallback_action_t1() inspects when the LLM reply is unusable.
SYSTEM_PROMPT_T1 = textwrap.dedent("""
You are a professional dataset quality inspector.
You will receive HuggingFace dataset metadata.
Your job is to identify ALL missing or incomplete fields and return a JSON action.

You MUST return a valid JSON object with these exact fields:
{
"task_id": "task1_easy",
"missing_fields": ["list", "of", "missing", "field", "names"],
"issues_found": [
{"field": "license", "issue": "License is missing", "severity": "critical", "reason": "Cannot use without license"}
],
"quality_score": 0.45,
"severity_summary": {"critical": 1, "high": 2, "medium": 1, "low": 2},
"recommendation": "Add license urgently. Specify task_type and column_descriptions.",
"verdict": "incomplete"
}

Fields to check: source, author, description, license, num_rows, column_descriptions,
task_type, language, date_created, tags, citation.

Severity levels: critical (blocks usage), high (very important), medium (should fix), low (nice to have)
Verdicts: complete, needs_minor_fixes, incomplete, rejected

Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()
|
|
|
|
# System prompt for task2_medium (row-level data-quality counting). The eight
# issue_summary keys below are the same keys fallback_action_t2() emits; the
# numbers in the example object are placeholders, not expected answers.
SYSTEM_PROMPT_T2 = textwrap.dedent("""
You are a data quality analyst.
You will receive real dataset rows from HuggingFace.
Your job is to count ALL data quality issues and return a JSON action.

You MUST return a valid JSON object with these exact fields:
{
"task_id": "task2_medium",
"issue_summary": {
"duplicates": 0,
"missing_values": 0,
"outliers": 0,
"inconsistencies": 0,
"class_imbalance": 0,
"wrong_data_types": 0,
"invalid_ranges": 0,
"empty_constant_cols": 0
},
"total_issues": 0,
"quality_score": 0.85,
"verdict": "good_quality",
"recommendations": ["list of recommendations"]
}

Check for:
1. duplicates - exact same rows
2. missing_values - null or empty cells
3. outliers - impossible values (age=-5, age=999)
4. inconsistencies - same value written differently (USA vs U.S.A)
5. class_imbalance - one label dominates >80%
6. wrong_data_types - mixed types in same column
7. invalid_ranges - values outside logical bounds
8. empty_constant_cols - columns with only one value or all empty

Verdicts: good_quality, needs_minor_fixes, needs_major_fixes, rejected

Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()
|
|
|
|
# System prompt for task3_hard (5-turn audit). Turns 1-4 reply with only
# "turn_response"; turn 5 must also include "audit_report", whose "scores" keys
# match the report built by fallback_action_t3() for the final turn.
SYSTEM_PROMPT_T3 = textwrap.dedent("""
You are a senior data auditor conducting a 5-turn deep audit.
Each turn focuses on a different aspect of dataset quality.
Return a JSON object for each turn.

Turn 1 - Dataset Overview: Analyze columns, types, missing rates, detect text/label columns.
Turn 2 - Near Duplicates: Find near-duplicate rows and annotation inconsistencies.
Turn 3 - Label Noise & Toxicity: Find noisy labels and toxic/hate content.
Turn 4 - Bias & Language: Detect gender/racial/cultural bias, check language diversity.
Turn 5 - Final Report: Data leakage, domain drift, complete audit report.

For turns 1-4, return:
{"task_id": "task3_hard", "turn_response": "Your detailed analysis here"}

For turn 5 ONLY, return:
{
"task_id": "task3_hard",
"turn_response": "Final summary",
"audit_report": {
"audit_score": 0.75,
"verdict": "GOOD",
"scores": {
"near_duplicate_score": 0.95,
"annotation_consistency_score": 0.90,
"label_noise_score": 0.85,
"toxicity_score": 1.0,
"bias_score": 0.80,
"language_consistency_score": 0.98,
"data_leakage_score": 1.0,
"domain_drift_score": 0.9
},
"recommendations": ["List of specific fixes needed"]
}
}

Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()
|
|
|
|
| |
| |
| |
|
|
def call_llm(client: OpenAI, system_prompt: str, user_content: str) -> str:
    """Send one system+user chat turn to MODEL_NAME and return the stripped reply.

    Uses near-deterministic sampling (temperature 0.1) capped at 1000 tokens.
    Any failure — transport error, empty ``choices``, a missing message — is
    logged as a [DEBUG] line and reported as an empty string so the caller can
    fall back to a rule-based action.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=0.1,
            max_tokens=1000,
            stream=False,
        )
        reply = completion.choices[0].message.content
        return (reply or "").strip()
    except Exception as e:
        print(f"[DEBUG] LLM call failed: {e}", flush=True)
        return ""
|
|
|
|
def parse_json_response(text: str) -> dict:
    """Extract a JSON object from a raw LLM reply.

    Handles replies wrapped in Markdown code fences (```json ... ```), including
    a closing fence glued to the last line. It also handles prose before or after
    the object. Only JSON *objects* are accepted; arrays, scalars, empty
    or unparseable text yield ``{}`` so the caller can use its fallback action.

    Args:
        text: Raw model output (``None`` is treated as empty).

    Returns:
        The parsed dict, or ``{}`` when no JSON object could be recovered.
    """
    text = (text or "").strip()
    if text.startswith("```"):
        newline = text.find("\n")
        # Drop the opening fence line (e.g. "```json"); a one-line reply only loses its backticks.
        text = text[newline + 1:] if newline != -1 else text.lstrip("`")
        text = text.strip()
        if text.endswith("```"):
            text = text[:-3].rstrip()

    candidates = [text]
    # Fallback: the widest {...} span, which skips prose around the object.
    start = text.find("{")
    end = text.rfind("}") + 1
    if start >= 0 and end > start and text[start:end] != text:
        candidates.append(text[start:end])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return {}
|
|
|
|
| |
| |
| |
| |
|
|
def fallback_action_t1(observation: dict) -> dict:
    """Build a rule-based Task 1 action for when the LLM reply is unusable.

    Each metadata field in the severity map is looked up in
    ``observation["dataset"]``. A field counts as missing when it is falsy
    (None, "", 0, [], {}) or a whitespace-only string.

    Scoring: fraction of filled fields minus 0.15 per missing critical field,
    clamped at 0 and rounded to 2 decimals.

    Args:
        observation: The /reset observation; only its "dataset" mapping is read
            (a missing or non-dict value is treated as an empty dataset).

    Returns:
        A task1_easy action dict with missing fields, per-field issues, the
        quality score, a severity summary, a recommendation and a verdict.
    """
    dataset = observation.get("dataset") or {}
    if not isinstance(dataset, dict):
        dataset = {}

    sev_map = {
        "license": "critical", "description": "critical", "source": "critical",
        "task_type": "high", "column_descriptions": "high", "author": "high",
        "language": "medium", "date_created": "medium", "num_rows": "medium",
        "tags": "low", "citation": "low",
    }

    missing = []
    issues = []
    for field, sev in sev_map.items():
        val = dataset.get(field)
        if not val or (isinstance(val, str) and not val.strip()):
            missing.append(field)
            issues.append({
                "field": field,
                "issue": f"{field} is missing or empty",
                "severity": sev,
                "reason": f"Important field {field} needs to be filled",
            })

    crit = [f for f in missing if sev_map[f] == "critical"]
    high = [f for f in missing if sev_map[f] == "high"]
    med = [f for f in missing if sev_map[f] == "medium"]
    low = [f for f in missing if sev_map[f] == "low"]

    if len(crit) >= 2:
        verdict = "rejected"
    elif len(crit) == 1 or len(high) >= 2:
        verdict = "incomplete"
    elif missing:
        verdict = "needs_minor_fixes"
    else:
        verdict = "complete"

    # Normalize by the number of fields actually checked (previously a hard-coded 12).
    total = len(sev_map)
    filled = total - len(missing)
    score = round(max(0.0, (filled / total) - len(crit) * 0.15), 2)

    parts = []
    if crit:
        parts.append(f"URGENT: Add {', '.join(crit)}")
    if high:
        parts.append(f"HIGH: Add {', '.join(high)}")
    if med:
        parts.append(f"MEDIUM: Add {', '.join(med)}")
    # Include low-severity gaps so the text never says "complete" while the
    # verdict is needs_minor_fixes.
    if low:
        parts.append(f"LOW: Add {', '.join(low)}")

    return {
        "task_id": "task1_easy",
        "missing_fields": missing,
        "issues_found": issues,
        "quality_score": score,
        "severity_summary": {"critical": len(crit), "high": len(high),
                             "medium": len(med), "low": len(low)},
        "recommendation": ". ".join(parts) or "Dataset looks complete.",
        "verdict": verdict,
    }
|
|
|
|
def fallback_action_t2() -> dict:
    """Return the canned Task 2 action used when the LLM reply is unusable.

    Every issue category is reported as 0, with a fixed 0.90 quality score and
    a "good_quality" verdict; a fresh dict is built on each call.
    """
    categories = (
        "duplicates", "missing_values", "outliers",
        "inconsistencies", "class_imbalance",
        "wrong_data_types", "invalid_ranges", "empty_constant_cols",
    )
    issue_summary = dict.fromkeys(categories, 0)
    action = {
        "task_id": "task2_medium",
        "issue_summary": issue_summary,
        "total_issues": sum(issue_summary.values()),
        "quality_score": 0.90,
        "verdict": "good_quality",
        "recommendations": ["Dataset appears to be in good quality."],
    }
    return action
|
|
|
|
def fallback_action_t3(turn: int) -> dict:
    """Return the canned Task 3 action for ``turn`` when the LLM reply is unusable.

    Turns below 5 get a short progress message (a generic one for turns not in
    the table). Turn 5 and later get the final report with fixed placeholder
    audit scores.
    """
    canned_messages = {
        1: "Dataset loaded. Analyzing columns and structure.",
        2: "Checking for near-duplicates and annotation inconsistencies.",
        3: "Checking for label noise and toxic content.",
        4: "Analyzing bias and linguistic diversity.",
    }
    if turn < 5:
        message = canned_messages.get(turn, f"Completing turn {turn} analysis.")
        return {"task_id": "task3_hard", "turn_response": message}

    scores = {
        "near_duplicate_score": 0.90,
        "annotation_consistency_score": 0.85,
        "label_noise_score": 0.90,
        "toxicity_score": 0.95,
        "bias_score": 0.75,
        "language_consistency_score": 0.98,
        "data_leakage_score": 1.00,
        "domain_drift_score": 0.90,
    }
    report = {
        "audit_score": 0.70,
        "verdict": "GOOD",
        "scores": scores,
        "recommendations": [
            "Review near-duplicate pairs",
            "Check annotation consistency",
            "Audit for bias in sensitive columns",
        ],
    }
    return {"task_id": "task3_hard", "turn_response": "Audit complete.", "audit_report": report}
|
|
|
|
| |
| |
| |
|
|
def _build_prompts(task_id: str, observation: dict, turn_num) -> tuple:
    """Return the (system_prompt, user_prompt) pair for one step of ``task_id``."""
    # Truncate the serialized observation to bound the prompt size.
    obs_text = json.dumps(observation, indent=2)[:3000]
    if task_id == "task1_easy":
        return SYSTEM_PROMPT_T1, (
            "Inspect this dataset metadata and return your JSON action:\n\n"
            f"{obs_text}"
        )
    if task_id == "task2_medium":
        return SYSTEM_PROMPT_T2, (
            "Analyze these dataset rows for quality issues and return your JSON action:\n\n"
            f"{obs_text}"
        )
    return SYSTEM_PROMPT_T3, (
        f"You are on Turn {turn_num} of 5.\n"
        f"Instructions: {observation.get('instructions', '')}\n\n"
        f"Dataset info:\n{obs_text}\n\n"
        "Return your JSON action for this turn."
    )


def _fallback_action(task_id: str, observation: dict, turn_num) -> dict:
    """Return the rule-based action used when the LLM reply is missing or malformed."""
    if task_id == "task1_easy":
        return fallback_action_t1(observation)
    if task_id == "task2_medium":
        return fallback_action_t2()
    return fallback_action_t3(turn_num)


def _step_reward(task_id: str, result: dict, done: bool) -> float:
    """Read one step's reward from a /step response.

    Prefers "score", then "cumulative_score", then 0.0. For task3_hard turns
    that are not yet done, a missing "score" is reported as a 0.10 placeholder
    instead. JSON ``null`` values count as missing.
    """
    score = result.get("score")
    if task_id == "task3_hard" and not done:
        return float(score if score is not None else 0.10)
    if score is None:
        score = result.get("cumulative_score")
    return float(score if score is not None else 0.0)


def run_task(client: OpenAI, task_id: str, user_url: str) -> dict:
    """
    Run one full episode for ``task_id`` against the environment Space.

    Prints the [START] line, one [STEP] line per step and the [END] line. Each
    step asks the LLM for a JSON action. If the reply is unusable, a rule-based
    fallback is substituted. The action is then submitted via POST /step.
    The episode stops when any of these happens:
      * the server reports ``done``;
      * a step raises;
      * ``min(max_turns, MAX_STEPS)`` steps have run.

    Args:
        client: OpenAI-compatible client used for the LLM calls.
        task_id: Task identifier; anything other than "task1_easy" or
            "task2_medium" is handled with the task3_hard prompts/fallbacks.
        user_url: Dataset URL or repo id forwarded to POST /reset.

    Returns:
        {"task_id", "score", "steps", "rewards", "success"}. Here "score" is the
        mean step reward clamped to [0, 1]. "success" means the score reached
        SUCCESS_SCORE_THRESHOLD.
    """
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

    try:
        observation = env_reset(task_id, user_url)
        if not isinstance(observation, dict):
            raise RuntimeError("Reset returned a non-object observation")
        if observation.get("error"):
            raise RuntimeError(observation.get("message", "Reset failed"))

        # Follow the server's turn budget, but never exceed our own step cap.
        max_turns = min(int(observation.get("max_turns", 1)), MAX_STEPS)
        done = False

        for step in range(1, max_turns + 1):
            if done:
                break
            steps_taken = step
            action: dict = {}

            try:
                turn_num = observation.get("turn", step)
                system_prompt, user_prompt = _build_prompts(task_id, observation, turn_num)
                action = parse_json_response(call_llm(client, system_prompt, user_prompt))

                if not isinstance(action, dict) or "task_id" not in action:
                    print("[DEBUG] LLM returned invalid JSON, using fallback", flush=True)
                    action = _fallback_action(task_id, observation, turn_num)

                # Never trust the model to echo the task id correctly.
                action["task_id"] = task_id

                result = env_step(action)
                # Decide `done` first so the reward rule uses the same value.
                done = bool(result.get("done", True))
                reward = _step_reward(task_id, result, done)
                rewards.append(reward)

                log_step(step=step, action=json.dumps(action)[:200],
                         reward=reward, done=done, error=None)

                next_obs = result.get("next_observation")
                if not done and isinstance(next_obs, dict) and next_obs:
                    observation = next_obs

            except Exception as step_err:
                # Single-line, non-empty message for the [STEP] record.
                error_msg = " ".join(str(step_err).split())[:100] or type(step_err).__name__
                rewards.append(0.0)
                log_step(step=step, action="error", reward=0.0, done=True, error=error_msg)
                done = True
                break

        if rewards:
            score = round(sum(rewards) / len(rewards), 4)
            score = min(max(score, 0.0), 1.0)
            success = score >= SUCCESS_SCORE_THRESHOLD

    except Exception as e:
        error_msg = " ".join(str(e).split())[:100] or type(e).__name__
        print(f"[DEBUG] Task {task_id} failed: {error_msg}", flush=True)
        if not rewards:
            rewards.append(0.0)
        score = 0.0
        success = False

    log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {
        "task_id": task_id,
        "score": score,
        "steps": steps_taken,
        "rewards": rewards,
        "success": success,
    }
|
|
|
|
| |
| |
| |
|
|
def main() -> None:
    """
    Run the baseline inference agent over every task in TEST_DATASETS.

    Each task prints its own [START]/[STEP]/[END] lines via run_task(). A
    human-readable summary follows. The process exits with status 1 if the
    Space does not answer within 90 seconds.
    """
    # Configuration sanity checks: warn, but continue with the defaults.
    if not os.getenv("SPACE_URL"):
        print(f"[DEBUG] WARNING: SPACE_URL not set — defaulting to {SPACE_URL}", flush=True)
    missing_vars = []
    if not os.getenv("HF_TOKEN") and not os.getenv("API_KEY"):
        missing_vars.append("HF_TOKEN")
    if missing_vars:
        print(f"[DEBUG] WARNING: Missing env vars: {missing_vars}. Scores may be 0.", flush=True)

    print(f"[DEBUG] Space URL: {SPACE_URL}", flush=True)
    print(f"[DEBUG] Model: {MODEL_NAME}", flush=True)
    print(f"[DEBUG] API Base: {API_BASE_URL}", flush=True)
    print("[DEBUG] Waiting for Space to be ready...", flush=True)

    if not wait_for_space(max_wait=90):
        print("[DEBUG] FATAL: HF Space unreachable. Aborting.", flush=True)
        sys.exit(1)

    print(f"[DEBUG] Space is ready. Running {len(TEST_DATASETS)} tasks...", flush=True)
    print(flush=True)

    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

    # Tasks run sequentially in TEST_DATASETS order.
    all_results = []
    for task_id, user_url in TEST_DATASETS.items():
        all_results.append(run_task(client=client, task_id=task_id, user_url=user_url))
        print(flush=True)

    print("=" * 60, flush=True)
    print("BASELINE INFERENCE SUMMARY", flush=True)
    print("=" * 60, flush=True)
    for r in all_results:
        status = "PASS" if r["success"] else "FAIL"
        print(
            f" {r['task_id']:<20} score={r['score']:.3f} "
            f"steps={r['steps']} [{status}]",
            flush=True,
        )

    # Guard against an empty task table instead of dividing by zero.
    if all_results:
        overall_score = round(sum(r["score"] for r in all_results) / len(all_results), 4)
        print(f"\n Overall average score: {overall_score:.4f}", flush=True)
    print("=" * 60, flush=True)
|
|
|
|
# Script entry point: run main() only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
|