Adithya765's picture
Dataset Quality Inspector - OpenEnv Hackathon
4e4ba7a
"""
inference.py
============================================
Baseline Inference Script
Dataset Quality Inspector Agent
Meta x Hugging Face x OpenEnv Hackathon
============================================
STDOUT FORMAT (strictly followed):
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
MANDATORY Environment Variables (must be set before running):
API_BASE_URL The API endpoint for the LLM.
Default: https://router.huggingface.co/v1
MODEL_NAME The model identifier to use for inference.
Default: Qwen/Qwen2.5-72B-Instruct
HF_TOKEN Your Hugging Face / API key.
SPACE_URL Your deployed HF Space URL.
Example: https://your-username-dataset-quality-env.hf.space
The script will wait up to 90s for the Space to wake if needed.
Example:
export API_BASE_URL=https://router.huggingface.co/v1
export MODEL_NAME=Qwen/Qwen2.5-72B-Instruct
export HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx
export SPACE_URL=https://your-username-dataset-quality-env.hf.space
python inference.py
"""
import os
import sys
import json
import time
import requests
import textwrap
from typing import List, Optional
from openai import OpenAI
# ─────────────────────────────────────────────
# ENVIRONMENT VARIABLES
# ─────────────────────────────────────────────
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY", "hf_placeholder")
SPACE_URL = os.getenv("SPACE_URL", "http://localhost:7860").rstrip("/")
BENCHMARK = os.getenv("BENCHMARK", "dataset-quality-inspector")
# Test datasets for each task
TEST_DATASETS = {
"task1_easy": "https://huggingface.co/datasets/imdb",
"task2_medium": "https://huggingface.co/datasets/ag_news",
"task3_hard": "dair-ai/emotion",
}
MAX_STEPS = 8
SUCCESS_SCORE_THRESHOLD = 0.5
# ─────────────────────────────────────────────
# STDOUT LOGGING β€” exact required format
# ─────────────────────────────────────────────
def log_start(task: str, env: str, model: str) -> None:
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
# Sanitize action β€” no newlines allowed on a single line
action_clean = str(action).replace("\n", " ").replace("\r", " ")[:200]
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action_clean} reward={reward:.2f} "
f"done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(
f"[END] success={str(success).lower()} steps={steps} "
f"score={score:.3f} rewards={rewards_str}",
flush=True,
)
# ─────────────────────────────────────────────
# ENVIRONMENT API CALLS
# Calls your running HF Space, with retry/wake-up logic
# ─────────────────────────────────────────────
def wait_for_space(max_wait: int = 90) -> bool:
"""
Ping GET / on the Space until it responds 200 or max_wait seconds pass.
HF Spaces can be cold-started β€” this prevents instant 0-score on wakeup.
Returns True if Space is up, False if timed out.
"""
deadline = time.time() + max_wait
attempt = 0
while time.time() < deadline:
try:
r = requests.get(SPACE_URL, timeout=15)
if r.status_code == 200:
if attempt > 0:
print(f"[DEBUG] Space woke up after {attempt} attempts.", flush=True)
return True
except requests.exceptions.RequestException:
pass
attempt += 1
time.sleep(5)
print(f"[DEBUG] Space did not respond after {max_wait}s.", flush=True)
return False
def _post_with_retry(url: str, payload: dict, retries: int = 3, timeout: int = 60) -> dict:
"""POST with exponential back-off retry."""
last_err = None
for attempt in range(1, retries + 1):
try:
resp = requests.post(url, json=payload, timeout=timeout)
resp.raise_for_status()
return resp.json()
except Exception as e:
last_err = e
if attempt < retries:
wait = 2 ** attempt
print(f"[DEBUG] POST {url} failed (attempt {attempt}): {e}. Retrying in {wait}s...", flush=True)
time.sleep(wait)
raise RuntimeError(f"All {retries} attempts to POST {url} failed. Last error: {last_err}")
def env_reset(task_id: str, user_url: str) -> dict:
"""Call POST /reset on HF Space with retry."""
return _post_with_retry(
f"{SPACE_URL}/reset",
{"task_id": task_id, "user_url": user_url}
)
def env_step(action: dict) -> dict:
"""Call POST /step on HF Space with retry."""
return _post_with_retry(
f"{SPACE_URL}/step",
{"action": action}
)
def env_state() -> dict:
"""Call GET /state on HF Space."""
resp = requests.get(f"{SPACE_URL}/state", timeout=30)
resp.raise_for_status()
return resp.json()
# ─────────────────────────────────────────────
# SYSTEM PROMPTS β€” per task
# ─────────────────────────────────────────────
SYSTEM_PROMPT_T1 = textwrap.dedent("""
You are a professional dataset quality inspector.
You will receive HuggingFace dataset metadata.
Your job is to identify ALL missing or incomplete fields and return a JSON action.
You MUST return a valid JSON object with these exact fields:
{
"task_id": "task1_easy",
"missing_fields": ["list", "of", "missing", "field", "names"],
"issues_found": [
{"field": "license", "issue": "License is missing", "severity": "critical", "reason": "Cannot use without license"}
],
"quality_score": 0.45,
"severity_summary": {"critical": 1, "high": 2, "medium": 1, "low": 2},
"recommendation": "Add license urgently. Specify task_type and column_descriptions.",
"verdict": "incomplete"
}
Fields to check: source, author, description, license, num_rows, column_descriptions,
task_type, language, date_created, tags, citation.
Severity levels: critical (blocks usage), high (very important), medium (should fix), low (nice to have)
Verdicts: complete, needs_minor_fixes, incomplete, rejected
Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()
SYSTEM_PROMPT_T2 = textwrap.dedent("""
You are a data quality analyst.
You will receive real dataset rows from HuggingFace.
Your job is to count ALL data quality issues and return a JSON action.
You MUST return a valid JSON object with these exact fields:
{
"task_id": "task2_medium",
"issue_summary": {
"duplicates": 0,
"missing_values": 0,
"outliers": 0,
"inconsistencies": 0,
"class_imbalance": 0,
"wrong_data_types": 0,
"invalid_ranges": 0,
"empty_constant_cols": 0
},
"total_issues": 0,
"quality_score": 0.85,
"verdict": "good_quality",
"recommendations": ["list of recommendations"]
}
Check for:
1. duplicates - exact same rows
2. missing_values - null or empty cells
3. outliers - impossible values (age=-5, age=999)
4. inconsistencies - same value written differently (USA vs U.S.A)
5. class_imbalance - one label dominates >80%
6. wrong_data_types - mixed types in same column
7. invalid_ranges - values outside logical bounds
8. empty_constant_cols - columns with only one value or all empty
Verdicts: good_quality, needs_minor_fixes, needs_major_fixes, rejected
Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()
SYSTEM_PROMPT_T3 = textwrap.dedent("""
You are a senior data auditor conducting a 5-turn deep audit.
Each turn focuses on a different aspect of dataset quality.
Return a JSON object for each turn.
Turn 1 - Dataset Overview: Analyze columns, types, missing rates, detect text/label columns.
Turn 2 - Near Duplicates: Find near-duplicate rows and annotation inconsistencies.
Turn 3 - Label Noise & Toxicity: Find noisy labels and toxic/hate content.
Turn 4 - Bias & Language: Detect gender/racial/cultural bias, check language diversity.
Turn 5 - Final Report: Data leakage, domain drift, complete audit report.
For turns 1-4, return:
{"task_id": "task3_hard", "turn_response": "Your detailed analysis here"}
For turn 5 ONLY, return:
{
"task_id": "task3_hard",
"turn_response": "Final summary",
"audit_report": {
"audit_score": 0.75,
"verdict": "GOOD",
"scores": {
"near_duplicate_score": 0.95,
"annotation_consistency_score": 0.90,
"label_noise_score": 0.85,
"toxicity_score": 1.0,
"bias_score": 0.80,
"language_consistency_score": 0.98,
"data_leakage_score": 1.0,
"domain_drift_score": 0.9
},
"recommendations": ["List of specific fixes needed"]
}
}
Return ONLY the JSON object. No explanation, no markdown, no backticks.
""").strip()
# ─────────────────────────────────────────────
# LLM CALL β€” uses OpenAI client
# ─────────────────────────────────────────────
def call_llm(client: OpenAI, system_prompt: str, user_content: str) -> str:
"""Call LLM via OpenAI client. Returns raw text response."""
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
],
temperature=0.1, # Low temperature for deterministic JSON output
max_tokens=1000,
stream=False,
)
text = (completion.choices[0].message.content or "").strip()
return text
except Exception as e:
print(f"[DEBUG] LLM call failed: {e}", flush=True)
return ""
def parse_json_response(text: str) -> dict:
"""Parse JSON from LLM response. Handles markdown code blocks."""
text = text.strip()
# Remove markdown code blocks if present
if text.startswith("```"):
lines = text.split("\n")
text = "\n".join(lines[1:-1]) if len(lines) > 2 else text
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
# Try to find JSON object in response
start = text.find("{")
end = text.rfind("}") + 1
if start >= 0 and end > start:
try:
return json.loads(text[start:end])
except json.JSONDecodeError:
pass
return {}
# ─────────────────────────────────────────────
# FALLBACK ACTIONS
# Used if LLM fails β€” ensures valid submission
# ─────────────────────────────────────────────
def fallback_action_t1(observation: dict) -> dict:
"""Fallback action for Task 1 if LLM fails."""
dataset = observation.get("dataset", {})
missing = []
issues = []
sev_map = {
"license": "critical", "description": "critical", "source": "critical",
"task_type": "high", "column_descriptions": "high", "author": "high",
"language": "medium", "date_created": "medium", "num_rows": "medium",
"tags": "low", "citation": "low"
}
for field, sev in sev_map.items():
val = dataset.get(field)
if not val or val == "" or val == {} or val == [] or val == 0:
missing.append(field)
issues.append({
"field": field,
"issue": f"{field} is missing or empty",
"severity": sev,
"reason": f"Important field {field} needs to be filled"
})
crit = [f for f in missing if sev_map.get(f) == "critical"]
high = [f for f in missing if sev_map.get(f) == "high"]
med = [f for f in missing if sev_map.get(f) == "medium"]
low = [f for f in missing if sev_map.get(f) == "low"]
if len(crit) >= 2: verdict = "rejected"
elif len(crit) == 1 or len(high) >= 2: verdict = "incomplete"
elif missing: verdict = "needs_minor_fixes"
else: verdict = "complete"
filled = 12 - len(missing)
score = round(max(0.0, (filled / 12) - len(crit) * 0.15), 2)
parts = []
if crit: parts.append(f"URGENT: Add {', '.join(crit)}")
if high: parts.append(f"HIGH: Add {', '.join(high)}")
if med: parts.append(f"MEDIUM: Add {', '.join(med)}")
return {
"task_id": "task1_easy",
"missing_fields": missing,
"issues_found": issues,
"quality_score": score,
"severity_summary": {"critical": len(crit), "high": len(high),
"medium": len(med), "low": len(low)},
"recommendation": ". ".join(parts) or "Dataset looks complete.",
"verdict": verdict
}
def fallback_action_t2() -> dict:
"""Fallback action for Task 2 if LLM fails."""
return {
"task_id": "task2_medium",
"issue_summary": {
"duplicates": 0, "missing_values": 0, "outliers": 0,
"inconsistencies": 0, "class_imbalance": 0,
"wrong_data_types": 0, "invalid_ranges": 0, "empty_constant_cols": 0
},
"total_issues": 0,
"quality_score": 0.90,
"verdict": "good_quality",
"recommendations": ["Dataset appears to be in good quality."]
}
def fallback_action_t3(turn: int) -> dict:
"""Fallback action for Task 3 if LLM fails."""
responses = {
1: "Dataset loaded. Analyzing columns and structure.",
2: "Checking for near-duplicates and annotation inconsistencies.",
3: "Checking for label noise and toxic content.",
4: "Analyzing bias and linguistic diversity.",
}
if turn < 5:
return {
"task_id": "task3_hard",
"turn_response": responses.get(turn, f"Completing turn {turn} analysis.")
}
return {
"task_id": "task3_hard",
"turn_response": "Audit complete.",
"audit_report": {
"audit_score": 0.70,
"verdict": "GOOD",
"scores": {
"near_duplicate_score": 0.90,
"annotation_consistency_score": 0.85,
"label_noise_score": 0.90,
"toxicity_score": 0.95,
"bias_score": 0.75,
"language_consistency_score": 0.98,
"data_leakage_score": 1.00,
"domain_drift_score": 0.90
},
"recommendations": [
"Review near-duplicate pairs",
"Check annotation consistency",
"Audit for bias in sensitive columns"
]
}
}
# ─────────────────────────────────────────────
# RUN ONE TASK EPISODE
# ─────────────────────────────────────────────
def run_task(client: OpenAI, task_id: str, user_url: str) -> dict:
"""
Runs one full episode for a given task.
Returns: {score, steps, rewards, success}
"""
rewards = []
steps_taken = 0
score = 0.0
success = False
error_msg = None
log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
try:
# ── Reset environment ──
observation = env_reset(task_id, user_url)
if observation.get("error"):
raise Exception(observation.get("message", "Reset failed"))
max_turns = observation.get("max_turns", 1)
done = False
for step in range(1, max_turns + 1):
if done:
break
steps_taken = step
action = {}
error_msg = None
try:
# ── Build user prompt from observation ──
obs_text = json.dumps(observation, indent=2)[:3000] # limit size
# ── Get LLM response ──
if task_id == "task1_easy":
system_prompt = SYSTEM_PROMPT_T1
user_prompt = (
f"Inspect this dataset metadata and return your JSON action:\n\n"
f"{obs_text}"
)
elif task_id == "task2_medium":
system_prompt = SYSTEM_PROMPT_T2
user_prompt = (
f"Analyze these dataset rows for quality issues and return your JSON action:\n\n"
f"{obs_text}"
)
else: # task3_hard
system_prompt = SYSTEM_PROMPT_T3
turn_num = observation.get("turn", step)
user_prompt = (
f"You are on Turn {turn_num} of 5.\n"
f"Instructions: {observation.get('instructions', '')}\n\n"
f"Dataset info:\n{obs_text}\n\n"
f"Return your JSON action for this turn."
)
llm_response = call_llm(client, system_prompt, user_prompt)
action = parse_json_response(llm_response)
# ── Use fallback if LLM returned empty/invalid JSON ──
if not action or "task_id" not in action:
print(f"[DEBUG] LLM returned invalid JSON, using fallback", flush=True)
if task_id == "task1_easy":
action = fallback_action_t1(observation)
elif task_id == "task2_medium":
action = fallback_action_t2()
else:
turn_num = observation.get("turn", step)
action = fallback_action_t3(turn_num)
# ── Ensure task_id is set ──
action["task_id"] = task_id
# ── Send action to environment ──
result = env_step(action)
reward = float(result.get("score", result.get("cumulative_score", 0.0)))
# For multi-turn, use turn score not cumulative
if task_id == "task3_hard" and not result.get("done", False):
reward = float(result.get("score", 0.10))
done = result.get("done", True)
rewards.append(reward)
# ── Log step ──
action_str = json.dumps(action).replace("\n", " ")[:200]
log_step(
step=step,
action=action_str,
reward=reward,
done=done,
error=error_msg
)
# ── Update observation for next turn (task3) ──
if not done and task_id == "task3_hard":
next_obs = result.get("next_observation", {})
if next_obs:
observation = next_obs
except Exception as step_err:
error_msg = str(step_err)[:100]
rewards.append(0.0)
log_step(
step=step,
action="error",
reward=0.0,
done=True,
error=error_msg
)
done = True
break
# ── Calculate final score ──
if rewards:
score = round(sum(rewards) / len(rewards), 4)
score = min(max(score, 0.0), 1.0)
success = score >= SUCCESS_SCORE_THRESHOLD
except Exception as e:
error_msg = str(e)[:100]
print(f"[DEBUG] Task {task_id} failed: {error_msg}", flush=True)
if not rewards:
rewards.append(0.0)
score = 0.0
success = False
log_end(
success=success,
steps=steps_taken,
score=score,
rewards=rewards
)
return {
"task_id": task_id,
"score": score,
"steps": steps_taken,
"rewards": rewards,
"success": success
}
# ─────────────────────────────────────────────
# MAIN β€” runs all 3 tasks
# ─────────────────────────────────────────────
def main() -> None:
"""
Runs the baseline inference agent through all 3 tasks.
Prints [START], [STEP], [END] lines for each task.
"""
# ── Validate required env vars ──
missing_vars = []
if not os.getenv("HF_TOKEN") and not os.getenv("API_KEY"):
missing_vars.append("HF_TOKEN")
if not os.getenv("SPACE_URL"):
print("[DEBUG] WARNING: SPACE_URL not set β€” defaulting to http://localhost:7860", flush=True)
if missing_vars:
print(f"[DEBUG] WARNING: Missing env vars: {missing_vars}. Scores may be 0.", flush=True)
# ── Wait for HF Space to wake up ──
print(f"[DEBUG] Space URL: {SPACE_URL}", flush=True)
print(f"[DEBUG] Model: {MODEL_NAME}", flush=True)
print(f"[DEBUG] API Base: {API_BASE_URL}", flush=True)
print(f"[DEBUG] Waiting for Space to be ready...", flush=True)
if not wait_for_space(max_wait=90):
print("[DEBUG] FATAL: HF Space unreachable. Aborting.", flush=True)
sys.exit(1)
print(f"[DEBUG] Space is ready. Running 3 tasks...", flush=True)
print(flush=True)
# Initialize OpenAI client
client = OpenAI(
base_url=API_BASE_URL,
api_key=API_KEY
)
all_results = []
# ── Run Task 1 ──
result1 = run_task(
client = client,
task_id = "task1_easy",
user_url = TEST_DATASETS["task1_easy"]
)
all_results.append(result1)
print(flush=True)
# ── Run Task 2 ──
result2 = run_task(
client = client,
task_id = "task2_medium",
user_url = TEST_DATASETS["task2_medium"]
)
all_results.append(result2)
print(flush=True)
# ── Run Task 3 ──
result3 = run_task(
client = client,
task_id = "task3_hard",
user_url = TEST_DATASETS["task3_hard"]
)
all_results.append(result3)
print(flush=True)
# ── Final Summary ──
print("=" * 60, flush=True)
print("BASELINE INFERENCE SUMMARY", flush=True)
print("=" * 60, flush=True)
for r in all_results:
status = "PASS" if r["success"] else "FAIL"
print(
f" {r['task_id']:<20} score={r['score']:.3f} "
f"steps={r['steps']} [{status}]",
flush=True
)
valid_scores = [r["score"] for r in all_results]
overall_score = round(sum(valid_scores) / len(valid_scores), 4)
print(f"\n Overall average score: {overall_score:.4f}", flush=True)
print("=" * 60, flush=True)
if __name__ == "__main__":
main()