Community_Collections_App / src /column_detection.py
lynn-twinkl
expadned logs with freeform col scores
7da8ace
from __future__ import annotations # harmless on 3.11+, useful on 3.7‑3.10
import re
import string
from typing import Sequence, Dict, Tuple, Optional
import pandas as pd
# ========= HELPER FUNCTIONS ========
def _max_or_eps(values, eps: float = 1e-9) -> float:
"""Avoid divide‑by‑zero during normalisation."""
return max(values) or eps
def _normalise(value: float, max_value: float) -> float:
return value / max_value if max_value else 0.0
# =================== FREEFORM COL =====================
def detect_freeform_col(
df: pd.DataFrame,
*,
length_weight: float = 0.4,
punct_weight: float = 0.3,
unique_weight: float = 0.3,
low_uniqueness_penalty: float = 0.4,
name_boosts: dict[str, float] | None = None,
min_score: float = 0.50,
return_scores: bool = False,
) -> str | None | Tuple[str | None, Dict[str, float]]:
"""
Guess which *object* column contains free‑text answers or comments.
A good free‑text column tends to be longish, rich in punctuation,
and fairly unique row‑to‑row.
name_boosts
e.g. ``{"additional_comment": 3.1, "usage_reason": 0.5}``
Multiplicative factors applied if the token appears in the header.
"""
name_boosts = name_boosts or {}
obj_cols = df.select_dtypes(include=["object"]).columns
# quick exit
if not obj_cols.size:
return (None, {}) if return_scores else None
# pre‑compute raw metrics
raw: Dict[str, dict[str, float]] = {}
for col in obj_cols:
ser = df[col].dropna().astype(str)
if ser.empty:
continue
raw[col] = {
"avg_len": ser.str.len().mean(),
"avg_punct": ser.apply(lambda s: sum(c in string.punctuation for c in s)).mean(),
"unique_ratio": ser.nunique() / len(ser),
}
if not raw:
return (None, {}) if return_scores else None
# normalisers
max_len = _max_or_eps([m["avg_len"] for m in raw.values()])
max_punc = _max_or_eps([m["avg_punct"] for m in raw.values()])
# composite scores
scores: Dict[str, float] = {}
for col, m in raw.items():
score = (
length_weight * _normalise(m["avg_len"], max_len)
+ punct_weight * _normalise(m["avg_punct"], max_punc)
+ unique_weight * m["unique_ratio"]
)
# header boosts / penalties
for token, factor in name_boosts.items():
if token in col.lower():
score *= factor
# penalise low uniqueness
if m["unique_ratio"] < low_uniqueness_penalty:
score *= 0.5
scores[col] = score
best_col, best_score = max(scores.items(), key=lambda kv: kv[1])
passed = best_score >= min_score
if return_scores:
return (best_col if passed else None, scores)
return best_col if passed else None
# ================= ID COLUMN =================
def detect_id_col(df: pd.DataFrame) -> str | None:
n_rows = len(df)
# 1) Name‐based detection
name_pattern = re.compile(r'\b(id|identifier|key)\b', re.IGNORECASE)
for col in df.columns:
if name_pattern.search(col):
return col
# 2) Uniqueness detection: columns where every row is unique
unique_cols = [
col for col in df.columns
if df[col].nunique(dropna=False) == n_rows
]
if not unique_cols:
return None
# 3) Prioritise int cols over object cols when both are unique
non_unnamed = [c for c in unique_cols if not c.startswith("Unnamed")]
candidates = non_unnamed or unique_cols
# 4) Prefer integer dtypes among candidates
for col in candidates:
if pd.api.types.is_integer_dtype(df[col]):
return col
# Fallback: return the first candidate
return candidates[0]
# ============== SCHOOL TYPE COLUMN =============
def detect_school_type_col(
df: pd.DataFrame,
*,
uniqueness_weight: float = 0.3,
content_match_weight: float = 0.4, # <-- New weight for content
length_weight: float = 0.2,
punct_weight: float = 0.1,
name_boosts: dict[str, float] | None = None,
value_keywords: set[str] | None = None, # <-- New parameter for keywords
min_score: float = 0.40,
high_uniqueness_penalty: float = 0.95,
return_scores: bool = False,
) -> str | None | Tuple[str | None, Dict[str, float]]:
"""
Analyzes a DataFrame to find the column that most likely represents a 'school type'.
The function operates on heuristics based on common characteristics of a school-type col:
1. **Content Match**: A significant portion of values match known school types (the strongest signal).
2. **Low Uniqueness**: Values are often repeated (e.g., 'Primary', 'All-through').
3. **Short Text**: Entries are typically brief.
4. **Minimal Punctuation**: Values are clean strings, not sentences.
5. **Header Keywords**: The column name itself is a strong indicator (e.g., 'School Type').
"""
# More robust default name boosts
if name_boosts is None:
name_boosts = {'school': 3.0, 'type': 2.0}
# Default set of keywords to search for within the column's values
if value_keywords is None:
value_keywords = {
'nursery', 'primary', 'secondary', 'infant', 'junior',
'college', 'academy', 'independent', 'special', 'pru',
'all-through', 'middle', 'state', 'educator', 'home'
}
obj_cols = df.select_dtypes(include=["object"]).columns
if not obj_cols.size:
return (None, {}) if return_scores else None
# Pre-compute raw metrics for each object column
raw_metrics: Dict[str, dict[str, float]] = {}
for col in obj_cols:
ser = df[col].dropna().astype(str)
if ser.empty:
continue
# --- New Content Match Calculation ---
unique_values = ser.unique()
content_match_score = 0.0
if len(unique_values) > 0:
match_count = 0
for val in unique_values:
# Check if any keyword is a substring of the lowercase value
if any(keyword in val.lower() for keyword in value_keywords):
match_count += 1
content_match_score = match_count / len(unique_values)
# --- End of New Calculation ---
raw_metrics[col] = {
"avg_len": ser.str.len().mean(),
"avg_punct": ser.apply(lambda s: sum(c in string.punctuation for c in s)).mean(),
"unique_ratio": ser.nunique() / len(ser) if len(ser) > 0 else 0.0,
"content_match": content_match_score # Store the new score
}
if not raw_metrics:
return (None, {}) if return_scores else None
# Get max values for normalization
max_len = _max_or_eps([m["avg_len"] for m in raw_metrics.values()])
max_punc = _max_or_eps([m["avg_punct"] for m in raw_metrics.values()])
# Calculate a final score for each column
scores: Dict[str, float] = {}
for col, metrics in raw_metrics.items():
len_score = 1 - _normalise(metrics["avg_len"], max_len)
punc_score = 1 - _normalise(metrics["avg_punct"], max_punc)
uniq_score = 1 - metrics["unique_ratio"]
# --- Updated Final Scoring Formula ---
score = (
content_match_weight * metrics["content_match"] # Use the new score directly
+ uniqueness_weight * uniq_score
+ length_weight * len_score
+ punct_weight * punc_score
)
# Apply boosts for matching header keywords
for token, factor in name_boosts.items():
if token in col.lower().strip():
score *= factor
# Apply penalty for columns that are almost entirely unique
if metrics["unique_ratio"] > high_uniqueness_penalty:
score *= 0.1 # Heavy penalty
scores[col] = score
if not scores:
return (None, {}) if return_scores else None
best_col, best_score = max(scores.items(), key=lambda item: item[1])
passed = best_score >= min_score
if return_scores:
return (best_col if passed else None, scores)
return best_col if passed else None
# =========== USAGE ============
def main():
df = pd.read_csv('data/raw/new-application-format-data.csv')
df.columns = df.columns.str.strip()
print("--- Testing Column Detection Functions ---")
id_col = detect_id_col(df)
freeform_col, freeform_scores = detect_freeform_col(df, return_scores=True)
school_type_col, school_type_scores = detect_school_type_col(df, return_scores=True)
print(f"\nDetected ID Column: '{id_col}'")
print(f"Detected Free-Form Column: '{freeform_col}'")
print(f"Detected School Type Column: '{school_type_col}'")
print()
print("\n--- Free-form Column Scores (Higher is better) ---")
if freeform_scores:
sorted_scores = sorted(freeform_scores.items(), key=lambda item: item[1], reverse=True)
for col, score in sorted_scores:
print(f" - {col:<25}: {score:.4f}")
else:
print("No object columns found to score for freeform col...")
print("\n--- School Type Column Scores (Higher is better) ---")
if school_type_scores:
sorted_scores = sorted(school_type_scores.items(), key=lambda item: item[1], reverse=True)
for col, score in sorted_scores:
print(f" - {col:<25}: {score:.4f}")
else:
print("No object columns found to score for career.")
if __name__ == '__main__':
main()