asd-screening-tool / src /data_loader.py
Paoo
deploy: publish sanitized v0.17 streamlit app
8514865
"""
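Data loader for the ASD-project corpora (Eigsti, Nadig, Rollins, NYU-Emerson,
QuigleyMcNally, Flusberg).
Reads CHAT (.cha) transcripts with pylangacq and extracts child-level
linguistic features for downstream ML / progress tracking.
Outputs:
data/combined_features.csv -> Eigsti + Nadig + NYU-Emerson, plus the first session per child from QuigleyMcNally and Flusberg (for classification)
data/longitudinal_features.csv -> all sessions from Rollins, QuigleyMcNally and Flusberg (for longitudinal tracking)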
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Optional
import pandas as pd
import pylangacq as pla
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# Project root is two directory levels above this file.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Every corpus lives in its own sub-directory of <root>/data.
DATA_DIR = PROJECT_ROOT.joinpath("data")
EIGSTI_DIR = DATA_DIR.joinpath("Eigsti")
NADIG_DIR = DATA_DIR.joinpath("Nadig")
ROLLINS_DIR = DATA_DIR.joinpath("Rollins")
NYU_EMR_DIR = DATA_DIR.joinpath("NYU-Emerson")
QUIGLEY_DIR = DATA_DIR.joinpath("QuigleyMcNally")
FLUSBERG_DIR = DATA_DIR.joinpath("Flusberg")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# years ; optional months . optional days  (e.g. "5;03.10", "2;08.", "3;")
_AGE_RE = re.compile(r"^(\d+);(\d*)\.?(\d*)$")


def _age_to_months(age_str: Optional[str]) -> Optional[float]:
    """Translate a CHAT ``years;months.days`` age into fractional months.

    Accepts forms such as ``'5;03.10'`` or ``'2;08.'``.  The month and day
    parts may be empty, in which case they count as zero.  Days are turned
    into months using a nominal 30-day month.

    Returns ``None`` for empty/falsy input and for text that does not match
    the expected pattern.
    """
    if not age_str:
        return None
    match = _AGE_RE.match(str(age_str).strip())
    if match is None:
        return None
    # Empty capture groups ("") fall back to 0.
    years, months, days = (int(part or 0) for part in match.groups())
    return years * 12 + months + days / 30.0
def _normalize_group(raw: Optional[str]) -> Optional[str]:
    """Map a CHAT group code onto one of ``ASD`` / ``DD`` / ``TD``.

    The code is stripped and upper-cased before the lookup.  Codes without
    a known alias are returned in that stripped, upper-cased form; empty or
    missing input yields ``None``.
    """
    if not raw:
        return None
    aliases = {
        "TYP": "TD",
        "TD": "TD",
        "NT": "TD",
        "CONTROL": "TD",
        "ASD": "ASD",
        "AUTISM": "ASD",
        "DD": "DD",
        "DELAY": "DD",
    }
    code = str(raw).strip().upper()
    # Unknown codes pass through unchanged (already normalised in case/spacing).
    return aliases.get(code, code)
def _safe_first(values):
    """Give back ``values[0]``, or ``None`` when there is no such element.

    ``None`` input, an empty sequence (``IndexError``) and objects that do
    not support indexing (``TypeError``) all produce ``None``.
    """
    if values is not None:
        try:
            return values[0]
        except (IndexError, TypeError):
            pass
    return None
def _extract_child_participant(reader) -> Optional[object]:
    """Look up the participant coded ``CHI`` in the reader's first header.

    Only ``reader.headers()[0]`` is inspected.  Returns ``None`` when the
    reader has no headers or the first header lists no ``CHI`` participant.
    """
    headers = reader.headers()
    if not headers:
        return None
    candidates = (p for p in headers[0].participants if p.code == "CHI")
    return next(candidates, None)
# Punctuation / terminator tokens that are not counted as words.
_PUNCT_TOKENS = frozenset(
    {".", "?", "!", ",", ";", ":", "+...", "+..", "+/.", "+//.", "+/?"}
)


def _content_tokens(utt) -> list[str]:
    """Return the utterance's word tokens, lower-cased, without punctuation.

    Tokens whose ``word`` is missing or blank are skipped, and a missing
    ``utt.tokens`` is treated as an empty utterance.
    """
    words = ((tok.word or "").lower().strip() for tok in (utt.tokens or []))
    return [w for w in words if w and w not in _PUNCT_TOKENS]


def _count_echolalia(all_utts, window: int = 5, min_tokens: int = 2) -> int:
    """
    Count CHI utterances that *repeat* a recent utterance verbatim.

    A CHI utterance counts as echolalia when its sequence of content tokens
    matches the sequence of any utterance (by any speaker, including CHI
    itself for self-repetition) in the previous `window` utterances.
    Utterances with fewer than `min_tokens` content tokens are never counted;
    with the default of 2 this excludes single-word utterances, because
    routine "yes"/"no"/"mama" repeats are not clinically meaningful echolalia.

    Parameters
    ----------
    all_utts : iterable of utterances (all speakers, in transcript order);
        each needs a ``participant`` attribute and ``tokens`` with ``word``.
    window : how many preceding utterances to compare against.  Zero or a
        negative value disables the look-back, so nothing is counted.
    min_tokens : minimum number of content tokens for a CHI utterance to be
        eligible.

    Returns
    -------
    int : number of CHI utterances that matched an utterance in the window.

    References
    ----------
    Prizant, B. M. (1983). Echolalia in autism: Assessment, intervention, and
    theoretical considerations. *Journal of Child Psychology and Psychiatry,
    24*(3), 399-418.
    """
    lookback = max(window, 0)
    recent: list[tuple[str, ...]] = []  # token sequences of the last `lookback` utterances
    count = 0
    for utt in all_utts:
        toks = tuple(_content_tokens(utt))
        if utt.participant == "CHI" and len(toks) >= min_tokens and toks in recent:
            count += 1
        recent.append(toks)
        # Keep the history bounded explicitly.  Slicing with `[-window:]`
        # would return the *whole* history for window == 0.
        if len(recent) > lookback:
            del recent[0]
    return count
# xxx / yyy markers (unintelligible / phonological coding).
_UNINTELLIGIBLE_RE = re.compile(r"\bxxx\b|\byyy\b")
# Non-verbal vocalizations such as &=laugh, &=gasp, &=cough.
_VOCALIZATION_RE = re.compile(r"&=[A-Za-z]+")
# A "?"-final terminator ("?", "+/?", ...) that is followed only by CHAT
# postcodes ("[+ ...]"), media bullets (\x15...\x15) and whitespace.
_QUESTION_END_RE = re.compile(r"\?(?:\s*(?:\[\+[^\]]*\]|\x15[^\x15]*\x15))*\s*$")


def _extract_features(cha_path: Path) -> Optional[dict]:
    """Extract child-level features from one .cha file.

    The file is parsed with pylangacq (retrying with ``strict=False`` if the
    first attempt raises).  Only utterances whose participant is ``CHI``
    contribute to the counts; echolalia is computed over all speakers.

    Returns a dict with the keys ``participant_id``, ``group_header``,
    ``sex``, ``age_months``, ``total_utterances``, ``mlu``, ``mluw``,
    ``ttr``, ``total_words``, ``unintelligible_count``,
    ``unintelligible_ratio``, ``zero_vocalization_count``,
    ``nonverbal_vocalization_count``, ``question_ratio``,
    ``echolalia_count`` and ``echolalia_ratio``.

    Returns ``None`` (after printing a ``[skip]`` message) when the file
    cannot be read, has no CHI participant, or has no CHI utterances.
    """
    try:
        reader = pla.read_chat(str(cha_path))
    except Exception:  # noqa: BLE001
        # Some files use non-standard terminators (e.g. "+!?", "+...").
        # Fall back to non-strict parsing before giving up.
        try:
            reader = pla.read_chat(str(cha_path), strict=False)
        except Exception as e:  # noqa: BLE001
            print(f" [skip] cannot read {cha_path.name}: {e}")
            return None

    chi = _extract_child_participant(reader)
    if chi is None:
        print(f" [skip] no CHI participant in {cha_path.name}")
        return None

    # All utterances (across participants) -> filter CHI
    all_utts = reader.utterances()
    chi_utts = [u for u in all_utts if u.participant == "CHI"]
    total_utt = len(chi_utts)
    if total_utt == 0:
        # Also guards the ratio divisions below against division by zero.
        print(f" [skip] no CHI utterances in {cha_path.name}")
        return None

    # MLU / TTR via pylangacq; the first value is taken (one file was read).
    mlu_morph = _safe_first(reader.mlu(participant="CHI"))
    mlu_words = _safe_first(reader.mluw(participant="CHI"))
    ttr = _safe_first(reader.ttr(participant="CHI"))

    total_words = 0
    question_utts = 0
    unintelligible = 0
    zero_vocal = 0
    vocalization = 0
    # Single pass over the child's utterances for every per-utterance count.
    for u in chi_utts:
        # Word count shares the punctuation filter used for echolalia.
        total_words += len(_content_tokens(u))

        # Raw CHI tier text; tolerate a missing or None tier.
        raw = (u.tiers.get("CHI") or "").strip()

        # Question: the terminator ends in "?".  Postcodes / media bullets
        # may trail the terminator in CHAT, so a plain endswith("?") would
        # miss those lines.
        if _QUESTION_END_RE.search(raw):
            question_utts += 1
        # Zero vocalization: line is just "0 ." or "0."
        if raw.rstrip(" .?!").strip() == "0":
            zero_vocal += 1
        if _UNINTELLIGIBLE_RE.search(raw):
            unintelligible += 1
        if _VOCALIZATION_RE.search(raw):
            vocalization += 1

    age_months = _age_to_months(chi.age)
    # Echolalia: CHI utterance verbatim-matches a recent utterance
    echolalia_count = _count_echolalia(all_utts)

    return {
        "participant_id": cha_path.stem,
        "group_header": _normalize_group(chi.group),
        "sex": chi.sex or None,
        "age_months": round(age_months, 2) if age_months is not None else None,
        "total_utterances": total_utt,
        "mlu": round(mlu_morph, 3) if mlu_morph is not None else None,
        "mluw": round(mlu_words, 3) if mlu_words is not None else None,
        "ttr": round(ttr, 4) if ttr is not None else None,
        "total_words": total_words,
        "unintelligible_count": unintelligible,
        "unintelligible_ratio": round(unintelligible / total_utt, 4),
        "zero_vocalization_count": zero_vocal,
        "nonverbal_vocalization_count": vocalization,
        "question_ratio": round(question_utts / total_utt, 4),
        "echolalia_count": echolalia_count,
        "echolalia_ratio": round(echolalia_count / total_utt, 4),
    }
# ---------------------------------------------------------------------------
# Corpus loaders
# ---------------------------------------------------------------------------
def load_eigsti() -> pd.DataFrame:
    """Load the Eigsti corpus, one row per transcript.

    Each sub-directory of ``EIGSTI_DIR`` (ASD / DD / TD) supplies the label:
    the normalised folder name is stored in ``group``.  The group read from
    the transcript header stays available in ``group_header`` for
    cross-checking.  Unreadable transcripts are skipped.
    """
    print("\n[Eigsti] loading...")
    records = []
    group_dirs = sorted(entry for entry in EIGSTI_DIR.iterdir() if entry.is_dir())
    for group_dir in group_dirs:
        # folder label is authoritative for Eigsti
        label = _normalize_group(group_dir.name) or group_dir.name
        for transcript in sorted(group_dir.glob("*.cha")):
            feats = _extract_features(transcript)
            if feats is not None:
                feats.update(group=label, corpus="eigsti")
                records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[Eigsti] {len(frame)} files loaded.")
    return frame
def load_nadig() -> pd.DataFrame:
    """Nadig: labels from @ID header (mixed ASD + TYP despite 0types.txt).

    One row per readable ``*.cha`` file directly inside ``NADIG_DIR``.  The
    header group is authoritative.  A file whose header carries no group
    still falls back to ``"ASD"``, but because the corpus is mixed that
    guess can mislabel a control child, so every defaulted file is reported
    on stdout instead of being relabelled silently.
    """
    print("\n[Nadig] loading...")
    rows = []
    defaulted = []  # file names whose label had to be guessed
    for cha in sorted(NADIG_DIR.glob("*.cha")):
        feats = _extract_features(cha)
        if feats is None:
            continue
        # header group is authoritative for Nadig
        group = feats["group_header"]
        if not group:
            group = "ASD"
            defaulted.append(cha.name)
        feats["group"] = group
        feats["corpus"] = "nadig"
        rows.append(feats)
    if defaulted:
        print(
            f"[Nadig] warning: {len(defaulted)} file(s) had no group in the "
            f"header and were labelled ASD by default: {', '.join(defaulted)}"
        )
    df = pd.DataFrame(rows)
    print(f"[Nadig] {len(df)} files loaded.")
    return df
def load_rollins() -> pd.DataFrame:
    """Load every Rollins session, one row per transcript.

    ``ROLLINS_DIR`` holds one sub-directory per child.  Transcripts are
    ordered by file stem and numbered from 1 in ``session_order``; the
    number reflects the file's position, so a skipped (unreadable) file
    leaves a gap.  ``group`` is the header group, defaulting to ``"ASD"``.
    """
    print("\n[Rollins] loading...")
    records = []
    for child_dir in sorted(d for d in ROLLINS_DIR.iterdir() if d.is_dir()):
        sessions = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem)
        for position, transcript in enumerate(sessions, 1):
            feats = _extract_features(transcript)
            if feats is None:
                continue
            feats.update(
                child=child_dir.name,
                session_id=transcript.stem,  # e.g. "020800"
                session_order=position,  # 1, 2, 3, ...
                # Corpus ships as all ASD
                group=feats["group_header"] or "ASD",
                corpus="rollins",
            )
            records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[Rollins] {len(frame)} sessions loaded.")
    return frame
def load_nyu_emerson() -> pd.DataFrame:
    """Load NYU-Emerson, one row per transcript found directly in the folder.

    Every row is labelled ``"ASD"`` regardless of the transcript header.
    Unreadable transcripts are skipped.
    """
    print("\n[NYU-Emerson] loading...")
    records = []
    for transcript in sorted(NYU_EMR_DIR.glob("*.cha")):
        feats = _extract_features(transcript)
        if feats is not None:
            # All NYU-Emerson are ASD
            feats.update(group="ASD", corpus="nyu_emerson")
            records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[NYU-Emerson] {len(frame)} files loaded.")
    return frame
def load_quigley_classification() -> pd.DataFrame:
    """Load one QuigleyMcNally transcript per child for classification.

    Children under ``HR`` are labelled ``"ASD"`` and children under ``LR``
    are labelled ``"TD"``.  Only each child's first transcript (by file
    stem) is used, to avoid repeated measures; a child is dropped if that
    transcript is missing or unreadable.
    NOTE(review): HR/LR are risk-group folders; treating HR as ASD is this
    loader's labelling choice - confirm it matches the study design.
    """
    print("\n[QuigleyMcNally - Classification] loading...")
    records = []
    # HR rows come first, then LR.
    for risk_folder, label in (("HR", "ASD"), ("LR", "TD")):
        risk_dir = QUIGLEY_DIR / risk_folder
        for child_dir in sorted(d for d in risk_dir.iterdir() if d.is_dir()):
            sessions = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem)
            if not sessions:
                continue
            feats = _extract_features(sessions[0])
            if feats is None:
                continue
            feats.update(child=child_dir.name, group=label, corpus="quigley")
            records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[QuigleyMcNally] {len(frame)} children loaded (session 1 only).")
    return frame
def load_quigley_progress() -> pd.DataFrame:
    """Load every QuigleyMcNally session for longitudinal analysis.

    Children under ``HR`` are labelled ``"ASD"``, children under ``LR``
    ``"TD"``.  Within a child, transcripts are ordered by file stem and
    numbered from 1 in ``session_order`` (position-based, so a skipped file
    leaves a gap).
    """
    print("\n[QuigleyMcNally - Progress] loading...")
    records = []
    # HR rows come first, then LR.
    for risk_folder, label in (("HR", "ASD"), ("LR", "TD")):
        risk_dir = QUIGLEY_DIR / risk_folder
        for child_dir in sorted(d for d in risk_dir.iterdir() if d.is_dir()):
            sessions = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem)
            for position, transcript in enumerate(sessions, 1):
                feats = _extract_features(transcript)
                if feats is None:
                    continue
                feats.update(
                    child=child_dir.name,
                    session_id=transcript.stem,
                    session_order=position,
                    group=label,
                    corpus="quigley",
                )
                records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[QuigleyMcNally] {len(frame)} sessions loaded (longitudinal).")
    return frame
def load_flusberg_classification() -> pd.DataFrame:
    """Load one Flusberg transcript per child for classification.

    Each sub-directory of ``FLUSBERG_DIR`` is one child; only the first
    transcript by file stem is used and every row is labelled ``"ASD"``.
    A child is dropped if that transcript is missing or unreadable.
    """
    print("\n[Flusberg - Classification] loading...")
    records = []
    for child_dir in sorted(d for d in FLUSBERG_DIR.iterdir() if d.is_dir()):
        sessions = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem)
        if not sessions:
            continue
        feats = _extract_features(sessions[0])
        if feats is None:
            continue
        feats.update(child=child_dir.name, group="ASD", corpus="flusberg")
        records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[Flusberg] {len(frame)} children loaded (session 1 only).")
    return frame
def load_flusberg_progress() -> pd.DataFrame:
    """Load every Flusberg session for longitudinal analysis.

    Each sub-directory of ``FLUSBERG_DIR`` is one child.  Transcripts are
    ordered by file stem and numbered from 1 in ``session_order``
    (position-based, so a skipped file leaves a gap).  Every row is
    labelled ``"ASD"``.
    """
    print("\n[Flusberg - Progress] loading...")
    records = []
    for child_dir in sorted(d for d in FLUSBERG_DIR.iterdir() if d.is_dir()):
        sessions = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem)
        for position, transcript in enumerate(sessions, 1):
            feats = _extract_features(transcript)
            if feats is None:
                continue
            feats.update(
                child=child_dir.name,
                session_id=transcript.stem,
                session_order=position,
                group="ASD",
                corpus="flusberg",
            )
            records.append(feats)
    frame = pd.DataFrame(records)
    print(f"[Flusberg] {len(frame)} sessions loaded (longitudinal).")
    return frame
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Build both feature tables, write them under ``data/`` and print a summary.

    * ``combined_features.csv`` - Eigsti, Nadig, NYU-Emerson plus the
      first-session rows of QuigleyMcNally and Flusberg (classification).
    * ``longitudinal_features.csv`` - all sessions of Rollins,
      QuigleyMcNally and Flusberg (progress tracking).

    Columns are selected with ``reindex`` so that a missing column (for
    example when no transcript could be loaded for a table) becomes an
    empty column instead of raising ``KeyError``.
    """
    # Load all corpora
    eigsti_df = load_eigsti()
    nadig_df = load_nadig()
    rollins_df = load_rollins()
    nyu_df = load_nyu_emerson()
    quigley_cls_df = load_quigley_classification()
    quigley_prog_df = load_quigley_progress()
    flusberg_cls_df = load_flusberg_classification()
    flusberg_prog_df = load_flusberg_progress()

    # Combine classification datasets
    combined_df = pd.concat(
        [eigsti_df, nadig_df, nyu_df, quigley_cls_df, flusberg_cls_df],
        ignore_index=True,
    )
    # Combine longitudinal datasets
    longitudinal_df = pd.concat(
        [rollins_df, quigley_prog_df, flusberg_prog_df],
        ignore_index=True,
    )

    # Column ordering for the classification CSV
    combined_cols = [
        "participant_id", "corpus", "group", "group_header",
        "sex", "age_months",
        "total_utterances", "mlu", "mluw", "ttr", "total_words",
        "unintelligible_count", "unintelligible_ratio",
        "zero_vocalization_count", "nonverbal_vocalization_count",
        "question_ratio",
        "echolalia_count", "echolalia_ratio",
    ]
    # The longitudinal CSV has the same features, prefixed by session keys.
    longitudinal_cols = ["child", "session_id", "session_order", *combined_cols]

    # reindex tolerates absent columns, unlike df[cols].
    combined_df = combined_df.reindex(columns=combined_cols)
    longitudinal_df = longitudinal_df.reindex(columns=longitudinal_cols)

    # Save outputs
    combined_path = DATA_DIR / "combined_features.csv"
    longitudinal_path = DATA_DIR / "longitudinal_features.csv"
    combined_df.to_csv(combined_path, index=False)
    longitudinal_df.to_csv(longitudinal_path, index=False)

    print("\n" + "=" * 72)
    print(f"Saved: {combined_path.relative_to(PROJECT_ROOT)} ({len(combined_df)} rows)")
    print(f"Saved: {longitudinal_path.relative_to(PROJECT_ROOT)} ({len(longitudinal_df)} rows)")
    print("=" * 72)
    print("\n--- combined_features.csv (head) ---")
    print(combined_df.head(10).to_string(index=False))
    print("\nGroup distribution in combined:")
    print(combined_df.groupby(["corpus", "group"]).size())
    print("\n--- longitudinal_features.csv (head) ---")
    print(longitudinal_df.head(10).to_string(index=False))
    print("\nSessions per child (longitudinal):")
    print(longitudinal_df.groupby(["corpus", "child"]).size().sort_index())


if __name__ == "__main__":
    main()