|
|
from __future__ import annotations |
|
|
import re |
|
|
import string |
|
|
from typing import Sequence, Dict, Tuple, Optional |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _max_or_eps(values, eps: float = 1e-9) -> float: |
|
|
"""Avoid divide‑by‑zero during normalisation.""" |
|
|
return max(values) or eps |
|
|
|
|
|
|
|
|
def _normalise(value: float, max_value: float) -> float: |
|
|
return value / max_value if max_value else 0.0 |
|
|
|
|
|
|
|
|
|
|
|
def detect_freeform_col( |
|
|
df: pd.DataFrame, |
|
|
*, |
|
|
length_weight: float = 0.4, |
|
|
punct_weight: float = 0.3, |
|
|
unique_weight: float = 0.3, |
|
|
low_uniqueness_penalty: float = 0.4, |
|
|
name_boosts: dict[str, float] | None = None, |
|
|
min_score: float = 0.50, |
|
|
return_scores: bool = False, |
|
|
) -> str | None | Tuple[str | None, Dict[str, float]]: |
|
|
""" |
|
|
Guess which *object* column contains free‑text answers or comments. |
|
|
|
|
|
A good free‑text column tends to be longish, rich in punctuation, |
|
|
and fairly unique row‑to‑row. |
|
|
|
|
|
name_boosts |
|
|
e.g. ``{"additional_comment": 3.1, "usage_reason": 0.5}`` |
|
|
Multiplicative factors applied if the token appears in the header. |
|
|
""" |
|
|
name_boosts = name_boosts or {} |
|
|
obj_cols = df.select_dtypes(include=["object"]).columns |
|
|
|
|
|
|
|
|
if not obj_cols.size: |
|
|
return (None, {}) if return_scores else None |
|
|
|
|
|
|
|
|
raw: Dict[str, dict[str, float]] = {} |
|
|
for col in obj_cols: |
|
|
ser = df[col].dropna().astype(str) |
|
|
if ser.empty: |
|
|
continue |
|
|
raw[col] = { |
|
|
"avg_len": ser.str.len().mean(), |
|
|
"avg_punct": ser.apply(lambda s: sum(c in string.punctuation for c in s)).mean(), |
|
|
"unique_ratio": ser.nunique() / len(ser), |
|
|
} |
|
|
|
|
|
if not raw: |
|
|
return (None, {}) if return_scores else None |
|
|
|
|
|
|
|
|
max_len = _max_or_eps([m["avg_len"] for m in raw.values()]) |
|
|
max_punc = _max_or_eps([m["avg_punct"] for m in raw.values()]) |
|
|
|
|
|
|
|
|
scores: Dict[str, float] = {} |
|
|
for col, m in raw.items(): |
|
|
score = ( |
|
|
length_weight * _normalise(m["avg_len"], max_len) |
|
|
+ punct_weight * _normalise(m["avg_punct"], max_punc) |
|
|
+ unique_weight * m["unique_ratio"] |
|
|
) |
|
|
|
|
|
|
|
|
for token, factor in name_boosts.items(): |
|
|
if token in col.lower(): |
|
|
score *= factor |
|
|
|
|
|
|
|
|
if m["unique_ratio"] < low_uniqueness_penalty: |
|
|
score *= 0.5 |
|
|
|
|
|
scores[col] = score |
|
|
|
|
|
best_col, best_score = max(scores.items(), key=lambda kv: kv[1]) |
|
|
passed = best_score >= min_score |
|
|
|
|
|
if return_scores: |
|
|
return (best_col if passed else None, scores) |
|
|
return best_col if passed else None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_id_col(df: pd.DataFrame) -> str | None: |
|
|
n_rows = len(df) |
|
|
|
|
|
|
|
|
name_pattern = re.compile(r'\b(id|identifier|key)\b', re.IGNORECASE) |
|
|
for col in df.columns: |
|
|
if name_pattern.search(col): |
|
|
return col |
|
|
|
|
|
|
|
|
unique_cols = [ |
|
|
col for col in df.columns |
|
|
if df[col].nunique(dropna=False) == n_rows |
|
|
] |
|
|
if not unique_cols: |
|
|
return None |
|
|
|
|
|
|
|
|
non_unnamed = [c for c in unique_cols if not c.startswith("Unnamed")] |
|
|
candidates = non_unnamed or unique_cols |
|
|
|
|
|
|
|
|
for col in candidates: |
|
|
if pd.api.types.is_integer_dtype(df[col]): |
|
|
return col |
|
|
|
|
|
|
|
|
return candidates[0] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_school_type_col( |
|
|
df: pd.DataFrame, |
|
|
*, |
|
|
uniqueness_weight: float = 0.3, |
|
|
content_match_weight: float = 0.4, |
|
|
length_weight: float = 0.2, |
|
|
punct_weight: float = 0.1, |
|
|
name_boosts: dict[str, float] | None = None, |
|
|
value_keywords: set[str] | None = None, |
|
|
min_score: float = 0.40, |
|
|
high_uniqueness_penalty: float = 0.95, |
|
|
return_scores: bool = False, |
|
|
) -> str | None | Tuple[str | None, Dict[str, float]]: |
|
|
""" |
|
|
Analyzes a DataFrame to find the column that most likely represents a 'school type'. |
|
|
|
|
|
The function operates on heuristics based on common characteristics of a school-type col: |
|
|
1. **Content Match**: A significant portion of values match known school types (the strongest signal). |
|
|
2. **Low Uniqueness**: Values are often repeated (e.g., 'Primary', 'All-through'). |
|
|
3. **Short Text**: Entries are typically brief. |
|
|
4. **Minimal Punctuation**: Values are clean strings, not sentences. |
|
|
5. **Header Keywords**: The column name itself is a strong indicator (e.g., 'School Type'). |
|
|
""" |
|
|
|
|
|
if name_boosts is None: |
|
|
name_boosts = {'school': 3.0, 'type': 2.0} |
|
|
|
|
|
|
|
|
if value_keywords is None: |
|
|
value_keywords = { |
|
|
'nursery', 'primary', 'secondary', 'infant', 'junior', |
|
|
'college', 'academy', 'independent', 'special', 'pru', |
|
|
'all-through', 'middle', 'state', 'educator', 'home' |
|
|
} |
|
|
|
|
|
obj_cols = df.select_dtypes(include=["object"]).columns |
|
|
if not obj_cols.size: |
|
|
return (None, {}) if return_scores else None |
|
|
|
|
|
|
|
|
raw_metrics: Dict[str, dict[str, float]] = {} |
|
|
for col in obj_cols: |
|
|
ser = df[col].dropna().astype(str) |
|
|
if ser.empty: |
|
|
continue |
|
|
|
|
|
|
|
|
unique_values = ser.unique() |
|
|
content_match_score = 0.0 |
|
|
if len(unique_values) > 0: |
|
|
match_count = 0 |
|
|
for val in unique_values: |
|
|
|
|
|
if any(keyword in val.lower() for keyword in value_keywords): |
|
|
match_count += 1 |
|
|
content_match_score = match_count / len(unique_values) |
|
|
|
|
|
|
|
|
raw_metrics[col] = { |
|
|
"avg_len": ser.str.len().mean(), |
|
|
"avg_punct": ser.apply(lambda s: sum(c in string.punctuation for c in s)).mean(), |
|
|
"unique_ratio": ser.nunique() / len(ser) if len(ser) > 0 else 0.0, |
|
|
"content_match": content_match_score |
|
|
} |
|
|
|
|
|
if not raw_metrics: |
|
|
return (None, {}) if return_scores else None |
|
|
|
|
|
|
|
|
max_len = _max_or_eps([m["avg_len"] for m in raw_metrics.values()]) |
|
|
max_punc = _max_or_eps([m["avg_punct"] for m in raw_metrics.values()]) |
|
|
|
|
|
|
|
|
scores: Dict[str, float] = {} |
|
|
for col, metrics in raw_metrics.items(): |
|
|
len_score = 1 - _normalise(metrics["avg_len"], max_len) |
|
|
punc_score = 1 - _normalise(metrics["avg_punct"], max_punc) |
|
|
uniq_score = 1 - metrics["unique_ratio"] |
|
|
|
|
|
|
|
|
score = ( |
|
|
content_match_weight * metrics["content_match"] |
|
|
+ uniqueness_weight * uniq_score |
|
|
+ length_weight * len_score |
|
|
+ punct_weight * punc_score |
|
|
) |
|
|
|
|
|
|
|
|
for token, factor in name_boosts.items(): |
|
|
if token in col.lower().strip(): |
|
|
score *= factor |
|
|
|
|
|
|
|
|
if metrics["unique_ratio"] > high_uniqueness_penalty: |
|
|
score *= 0.1 |
|
|
|
|
|
scores[col] = score |
|
|
|
|
|
if not scores: |
|
|
return (None, {}) if return_scores else None |
|
|
|
|
|
best_col, best_score = max(scores.items(), key=lambda item: item[1]) |
|
|
passed = best_score >= min_score |
|
|
|
|
|
if return_scores: |
|
|
return (best_col if passed else None, scores) |
|
|
return best_col if passed else None |
|
|
|
|
|
|
|
|
def main():
    """Smoke-test the column detectors against the raw application CSV.

    Loads the survey export, runs all three detectors, and prints the
    winning columns plus the full per-column score tables.
    """
    df = pd.read_csv('data/raw/new-application-format-data.csv')
    # Headers in the export carry stray whitespace; strip before detecting.
    df.columns = df.columns.str.strip()

    print("--- Testing Column Detection Functions ---")

    id_col = detect_id_col(df)
    freeform_col, freeform_scores = detect_freeform_col(df, return_scores=True)
    school_type_col, school_type_scores = detect_school_type_col(df, return_scores=True)

    print(f"\nDetected ID Column: '{id_col}'")
    print(f"Detected Free-Form Column: '{freeform_col}'")
    print(f"Detected School Type Column: '{school_type_col}'")

    print("\n--- Free-form Column Scores (Higher is better) ---")
    if freeform_scores:
        sorted_scores = sorted(freeform_scores.items(), key=lambda item: item[1], reverse=True)
        for col, score in sorted_scores:
            print(f" - {col:<25}: {score:.4f}")
    else:
        print("No object columns found to score for freeform col...")

    print("\n--- School Type Column Scores (Higher is better) ---")
    if school_type_scores:
        sorted_scores = sorted(school_type_scores.items(), key=lambda item: item[1], reverse=True)
        for col, score in sorted_scores:
            print(f" - {col:<25}: {score:.4f}")
    else:
        # Fixed copy-paste leftover: this branch previously said "career".
        print("No object columns found to score for school type.")


if __name__ == '__main__':
    main()
|
|
|