dataset-quality-env-fixed / tasks /task2_medium.py
Adithya765's picture
fix task3 score bounds
7910de6
"""
task2_medium.py
===============
Task 2 β€” Data Quality Analyzer (Medium)
OpenEnv Project | Meta Γ— Hugging Face Hackathon
What it does:
Fetches real rows from a HuggingFace dataset and runs
8 data quality checks on the actual data content.
Checks (8):
1. Exact duplicates 5. Class imbalance
2. Missing values 6. Wrong data types
3. Outliers (IQR method) 7. Invalid value ranges
4. Inconsistencies 8. Empty/constant columns
Usage:
python task2_medium.py
β†’ Enter HuggingFace dataset URL when prompted
β†’ Copy the JSON output into grader2.py
Requirements:
pip install requests
"""
# ============================================
# task2_medium.py
# Task 2: Data Quality Analysis
# Difficulty: Medium
# 8 Quality Checks on Real Dataset Rows
# Output: JSON format
# Compatible with Google Colab
# ============================================
import requests
import json
from collections import Counter
# ─────────────────────────────────────────────
# HELPER: Extract dataset name from URL
# ─────────────────────────────────────────────
def extract_dataset_name_t2(url):
if "huggingface.co" in url:
if "datasets/" in url:
name = url.split("datasets/")[-1]
else:
name = url.split("huggingface.co/")[-1]
return name.strip("/").strip()
return url.strip()
# ─────────────────────────────────────────────
# FETCH REAL ROWS FROM HUGGING FACE
# ─────────────────────────────────────────────
def fetch_dataset_rows_t2(dataset_name):
"""
Fetches real data rows from Hugging Face datasets-server API.
Tries multiple configs to maximize success rate.
"""
configs_to_try = [
f"https://datasets-server.huggingface.co/rows?dataset={dataset_name}&config=default&split=train&offset=0&limit=50",
f"https://datasets-server.huggingface.co/rows?dataset={dataset_name}&split=train&offset=0&limit=50",
f"https://datasets-server.huggingface.co/rows?dataset={dataset_name}&config=plain_text&split=train&offset=0&limit=50",
]
for url in configs_to_try:
try:
print(f" Trying: {url[:80]}...")
response = requests.get(url, timeout=15)
if response.status_code == 200:
raw = response.json()
rows_raw = raw.get("rows", [])
if rows_raw:
rows = [item.get("row", {}) for item in rows_raw]
columns = []
if raw.get("features"):
columns = [f["name"] for f in raw["features"]]
elif rows:
columns = list(rows[0].keys())
print(f" SUCCESS: Fetched {len(rows)} rows, {len(columns)} columns")
return {
"dataset_name": dataset_name,
"columns": columns,
"rows": rows,
"total_fetched": len(rows)
}
except Exception as e:
print(f" Failed: {str(e)}")
continue
print(" All fetch attempts failed.")
return None
# ─────────────────────────────────────────────
# THE 8 QUALITY CHECKS
# ─────────────────────────────────────────────
def check_duplicates(rows):
"""Check 1: Find exact duplicate rows."""
issues = []
seen = {}
dup_count = 0
for i, row in enumerate(rows):
key = json.dumps(row, sort_keys=True)
if key in seen:
dup_count += 1
issues.append({
"check": "duplicate",
"severity": "high",
"row_index": i,
"duplicate_of": seen[key],
"description": f"Row {i} is exact duplicate of Row {seen[key]}"
})
else:
seen[key] = i
return issues, dup_count
def check_missing_values(rows, columns):
"""Check 2: Find null, empty, or None values."""
issues = []
affected_rows = 0
for i, row in enumerate(rows):
missing_cols = []
for col in columns:
val = row.get(col)
if val is None or val == "" or str(val).lower() == "null" or str(val).lower() == "nan":
missing_cols.append(col)
if missing_cols:
affected_rows += 1
issues.append({
"check": "missing_value",
"severity": "medium",
"row_index": i,
"missing_cols": missing_cols,
"description": f"Row {i} has missing values in: {missing_cols}"
})
return issues, affected_rows
def check_outliers(rows, columns):
"""Check 3: Find outliers using IQR method on numeric columns."""
issues = []
outlier_count = 0
for col in columns:
values = []
for row in rows:
val = row.get(col)
if val is not None and val != "":
try:
values.append((rows.index(row), float(val)))
except (ValueError, TypeError):
pass
if len(values) < 5:
continue
nums = [v[1] for v in values]
sorted_ = sorted(nums)
q1 = sorted_[len(sorted_) // 4]
q3 = sorted_[(len(sorted_) * 3) // 4]
iqr = q3 - q1
if iqr == 0:
continue
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
for row_idx, num_val in values:
if num_val < lower or num_val > upper:
outlier_count += 1
issues.append({
"check": "outlier",
"severity": "high",
"row_index": row_idx,
"column": col,
"value": num_val,
"expected_range": f"{round(lower,2)} to {round(upper,2)}",
"description": f"Row {row_idx}: '{col}'={num_val} is an outlier"
})
return issues, outlier_count
def check_inconsistencies(rows, columns):
"""Check 4: Find same values written differently (e.g. USA vs U.S.A)."""
issues = []
incon_count = 0
for col in columns:
normalized_map = {}
for i, row in enumerate(rows):
val = row.get(col)
if val is not None and isinstance(val, str) and val.strip() != "":
norm = val.lower().strip().replace(".", "").replace("-", "").replace("_", "")
if norm not in normalized_map:
normalized_map[norm] = set()
normalized_map[norm].add(val)
for norm, variants in normalized_map.items():
if len(variants) > 1:
incon_count += 1
issues.append({
"check": "inconsistency",
"severity": "medium",
"column": col,
"variants": list(variants),
"description": f"Column '{col}' has inconsistent values: {list(variants)}"
})
return issues, incon_count
def check_class_imbalance(rows, columns):
"""Check 5: Find heavily imbalanced label/target columns."""
issues = []
imbal_count = 0
# Look for columns likely to be labels
label_keywords = ["label", "target", "class", "category", "sentiment",
"output", "y", "tag", "type", "split"]
for col in columns:
if not any(kw in col.lower() for kw in label_keywords):
continue
values = []
for row in rows:
val = row.get(col)
if val is not None and val != "":
values.append(str(val))
if len(values) < 5:
continue
counts = Counter(values)
total = sum(counts.values())
max_count = max(counts.values())
min_count = min(counts.values())
ratio = max_count / total
# Flag if one class dominates more than 80%
if ratio > 0.80:
imbal_count += 1
issues.append({
"check": "class_imbalance",
"severity": "high",
"column": col,
"distribution": dict(counts),
"dominant_ratio": round(ratio, 2),
"description": f"Column '{col}' is imbalanced: {dict(counts)}. Dominant class = {round(ratio*100)}%"
})
return issues, imbal_count
def check_wrong_data_types(rows, columns):
"""Check 6: Find columns where values have mixed/wrong data types."""
issues = []
type_count = 0
for col in columns:
type_counts = {"int": 0, "float": 0, "str": 0, "bool": 0, "none": 0}
for row in rows:
val = row.get(col)
if val is None:
type_counts["none"] += 1
elif isinstance(val, bool):
type_counts["bool"] += 1
elif isinstance(val, int):
type_counts["int"] += 1
elif isinstance(val, float):
type_counts["float"] += 1
elif isinstance(val, str):
type_counts["str"] += 1
# Find active types (ignore none)
active_types = {k: v for k, v in type_counts.items() if v > 0 and k != "none"}
# Flag if more than one type exists in the column
# (excluding int+float combo which is fine)
meaningful_types = set(active_types.keys()) - {"none"}
if "int" in meaningful_types and "float" in meaningful_types:
meaningful_types.discard("int") # int+float is acceptable
if len(meaningful_types) > 1:
type_count += 1
issues.append({
"check": "wrong_data_type",
"severity": "medium",
"column": col,
"types_found": active_types,
"description": f"Column '{col}' has mixed data types: {active_types}"
})
return issues, type_count
def check_invalid_ranges(rows, columns):
"""Check 7: Find values outside valid/logical ranges."""
issues = []
range_count = 0
# Common column name patterns and their valid ranges
range_rules = {
"age": (0, 120),
"year": (1900, 2100),
"rating": (0, 10),
"score": (0, 100),
"percentage": (0, 100),
"percent": (0, 100),
"price": (0, 1e9),
"salary": (0, 1e9),
"count": (0, 1e9),
"rank": (1, 1e6),
}
for col in columns:
col_lower = col.lower()
rule = None
for keyword, (min_val, max_val) in range_rules.items():
if keyword in col_lower:
rule = (min_val, max_val)
break
if not rule:
continue
min_val, max_val = rule
for i, row in enumerate(rows):
val = row.get(col)
if val is None or val == "":
continue
try:
num = float(val)
if num < min_val or num > max_val:
range_count += 1
issues.append({
"check": "invalid_range",
"severity": "high",
"row_index": i,
"column": col,
"value": num,
"valid_range": f"{min_val} to {max_val}",
"description": f"Row {i}: '{col}'={num} is outside valid range ({min_val}-{max_val})"
})
except (ValueError, TypeError):
pass
return issues, range_count
def check_empty_constant_columns(rows, columns):
"""Check 8: Find columns that are empty or have only one unique value."""
issues = []
col_count = 0
for col in columns:
values = []
for row in rows:
val = row.get(col)
if val is not None and val != "":
values.append(str(val))
total_rows = len(rows)
# Empty column: all values are missing
if len(values) == 0:
col_count += 1
issues.append({
"check": "empty_column",
"severity": "high",
"column": col,
"description": f"Column '{col}' is completely empty across all rows"
})
# Constant column: only one unique value
elif len(set(values)) == 1 and len(values) == total_rows:
col_count += 1
issues.append({
"check": "constant_column",
"severity": "medium",
"column": col,
"unique_value": values[0],
"description": f"Column '{col}' has only one unique value: '{values[0]}' β€” useless for ML"
})
return issues, col_count
# ─────────────────────────────────────────────
# MAIN ANALYSIS FUNCTION
# Runs all 8 checks and builds JSON output
# ─────────────────────────────────────────────
def analyze_data_quality(data):
"""
Runs all 8 quality checks on the dataset rows.
Returns complete JSON output.
"""
rows = data["rows"]
columns = data["columns"]
print("\n Running 8 quality checks...")
# Run all 8 checks
dup_issues, dup_count = check_duplicates(rows)
miss_issues, miss_count = check_missing_values(rows, columns)
out_issues, out_count = check_outliers(rows, columns)
incon_issues, incon_count = check_inconsistencies(rows, columns)
imbal_issues, imbal_count = check_class_imbalance(rows, columns)
type_issues, type_count = check_wrong_data_types(rows, columns)
range_issues, range_count = check_invalid_ranges(rows, columns)
col_issues, col_count = check_empty_constant_columns(rows, columns)
print(f" Check 1 - Duplicates: {dup_count} found")
print(f" Check 2 - Missing Values: {miss_count} rows affected")
print(f" Check 3 - Outliers: {out_count} found")
print(f" Check 4 - Inconsistencies: {incon_count} found")
print(f" Check 5 - Class Imbalance: {imbal_count} columns affected")
print(f" Check 6 - Wrong Data Types: {type_count} columns affected")
print(f" Check 7 - Invalid Ranges: {range_count} found")
print(f" Check 8 - Empty/Constant Cols: {col_count} found")
# Combine all issues
all_issues = (
dup_issues +
miss_issues +
out_issues +
incon_issues +
imbal_issues +
type_issues +
range_issues +
col_issues
)
total_issues = len(all_issues)
# ── Calculate Quality Score ──
# Each issue type has a penalty weight
penalty = (
dup_count * 0.05 +
miss_count * 0.02 +
out_count * 0.03 +
incon_count * 0.02 +
imbal_count * 0.08 +
type_count * 0.04 +
range_count * 0.04 +
col_count * 0.05
)
quality_score = round(max(0.01, min(0.99, 1.0 - penalty)), 2)
# ── Verdict ──
high_count = sum(1 for i in all_issues if i["severity"] == "high")
if quality_score < 0.30 or high_count >= 5:
verdict = "rejected"
elif quality_score < 0.55 or high_count >= 3:
verdict = "needs_major_fixes"
elif quality_score < 0.80 or total_issues > 2:
verdict = "needs_minor_fixes"
else:
verdict = "good_quality"
# ── Recommendations ──
recommendations = []
if dup_count > 0: recommendations.append(f"Remove {dup_count} duplicate rows to prevent model overfitting")
if miss_count > 0: recommendations.append(f"Handle missing values in {miss_count} rows β€” impute or remove")
if out_count > 0: recommendations.append(f"Investigate {out_count} outlier values β€” verify or cap/remove")
if incon_count > 0: recommendations.append(f"Standardize {incon_count} inconsistent values (e.g. USA vs U.S.A)")
if imbal_count > 0: recommendations.append(f"Fix class imbalance in {imbal_count} columns β€” use oversampling or SMOTE")
if type_count > 0: recommendations.append(f"Fix mixed data types in {type_count} columns β€” enforce consistent types")
if range_count > 0: recommendations.append(f"Remove or correct {range_count} values outside valid ranges")
if col_count > 0: recommendations.append(f"Drop {col_count} empty or constant columns β€” they add no ML value")
if not recommendations:
recommendations.append("Data quality looks good! No major issues detected.")
# ── Build Final JSON Output ──
final_output = {
"dataset_info": {
"dataset_name": data["dataset_name"],
"total_rows": len(rows),
"total_columns": len(columns),
"columns": columns
},
"quality_report": {
"total_issues_found": total_issues,
"issue_summary": {
"duplicates": dup_count,
"missing_values": miss_count,
"outliers": out_count,
"inconsistencies": incon_count,
"class_imbalance": imbal_count,
"wrong_data_types": type_count,
"invalid_ranges": range_count,
"empty_constant_cols": col_count
},
"issues_found": all_issues,
"quality_score": quality_score,
"recommendations": recommendations
},
"verdict": verdict,
# This is what goes into grader2
"agent_action": {
"task_id": "task2_medium",
"total_issues": total_issues,
"issue_summary": {
"duplicates": dup_count,
"missing_values": miss_count,
"outliers": out_count,
"inconsistencies": incon_count,
"class_imbalance": imbal_count,
"wrong_data_types": type_count,
"invalid_ranges": range_count,
"empty_constant_cols": col_count
},
"issues_found": all_issues,
"quality_score": quality_score,
"recommendations": recommendations,
"verdict": verdict
}
}
return final_output
# ─────────────────────────────────────────────
# ─────────────────────────────────────────────
# USER INPUT + MAIN RUNNER (only when executed directly)
# ─────────────────────────────────────────────
if __name__ == "__main__":
print("=" * 60)
print("TASK 2 - Data Quality Analyzer (8 Checks)")
print("=" * 60)
print("Example URLs:")
print(" https://huggingface.co/datasets/imdb")
print(" https://huggingface.co/datasets/ag_news")
print("=" * 60)
user_url = input("\nPaste your Hugging Face dataset URL: ").strip()
dataset_name = extract_dataset_name_t2(user_url)
print(f"\nFetching rows from '{dataset_name}'...")
data = fetch_dataset_rows_t2(dataset_name)
if data is None:
print("\nCould not fetch rows from this dataset.")
else:
result = analyze_data_quality(data)
print("\n" + "=" * 60)
print("RESULTS IN JSON FORMAT")
print("=" * 60)
import json as _json
print(_json.dumps(result, indent=2))
TASK2 = {
"task_id": "task2_medium",
"name": "Data Quality Analysis",
"difficulty": "medium",
"max_turns": 1,
"description": (
"Analyze 50 real dataset rows from HuggingFace for 8 quality issues: "
"duplicates, missing values, outliers, inconsistencies, class imbalance, "
"wrong data types, invalid ranges, empty/constant columns."
),
"expected_score_range": [0.50, 0.75],
}