""" Data loader for ASD-project corpora (Eigsti, Nadig, Rollins). Reads CHAT (.cha) transcripts with pylangacq and extracts child-level linguistic features for downstream ML / progress tracking. Outputs: data/combined_features.csv -> Eigsti + Nadig (for classification) data/rollins_features.csv -> Rollins (for longitudinal tracking) """ from __future__ import annotations import re from pathlib import Path from typing import Optional import pandas as pd import pylangacq as pla # --------------------------------------------------------------------------- # Paths # --------------------------------------------------------------------------- PROJECT_ROOT = Path(__file__).resolve().parent.parent DATA_DIR = PROJECT_ROOT / "data" EIGSTI_DIR = DATA_DIR / "Eigsti" NADIG_DIR = DATA_DIR / "Nadig" ROLLINS_DIR = DATA_DIR / "Rollins" NYU_EMR_DIR = DATA_DIR / "NYU-Emerson" QUIGLEY_DIR = DATA_DIR / "QuigleyMcNally" FLUSBERG_DIR = DATA_DIR / "Flusberg" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _AGE_RE = re.compile(r"^(\d+);(\d*)\.?(\d*)$") def _age_to_months(age_str: Optional[str]) -> Optional[float]: """Convert CHAT age string (e.g. '5;03.10' or '2;08.') to months (float).""" if not age_str: return None age_str = str(age_str).strip() m = _AGE_RE.match(age_str) if not m: return None years = int(m.group(1) or 0) months = int(m.group(2) or 0) days = int(m.group(3) or 0) return years * 12 + months + days / 30.0 def _normalize_group(raw: Optional[str]) -> Optional[str]: """Normalize CHAT group codes to {ASD, DD, TD}.""" if not raw: return None g = str(raw).strip().upper() if g in ("TYP", "TD", "NT", "CONTROL"): return "TD" if g in ("ASD", "AUTISM"): return "ASD" if g in ("DD", "DELAY"): return "DD" return g # leave as-is for anything else def _safe_first(values): """Return first element of a list-like, or None.""" if values is None: return None try: return values[0] except (IndexError, TypeError): return None def _extract_child_participant(reader) -> Optional[object]: """Return the CHI Participant object from the first header, or None.""" headers = reader.headers() if not headers: return None for p in headers[0].participants: if p.code == "CHI": return p return None def _content_tokens(utt) -> list[str]: """Lower-cased word tokens with punctuation removed.""" PUNCT = {".", "?", "!", ",", ";", ":", "+...", "+..", "+/.", "+//.", "+/?"} out = [] for t in utt.tokens or []: w = (t.word or "").lower().strip() if not w or w in PUNCT: continue out.append(w) return out def _count_echolalia(all_utts, window: int = 5, min_tokens: int = 2) -> int: """ Count CHI utterances that *repeat* a recent utterance verbatim. A CHI utterance counts as echolalia when its sequence of content tokens matches the sequence of any utterance (by any speaker, including CHI itself for self-repetition) in the previous `window` utterances. Single-word utterances are excluded because routine "yes"/"no"/"mama" repeats are not clinically meaningful echolalia. References ---------- Prizant, B. M. (1983). Echolalia in autism: Assessment, intervention, and theoretical considerations. *Journal of Child Psychology and Psychiatry, 24*(3), 399-418. """ seqs: list[tuple[str, ...]] = [] # parallel history of token sequences count = 0 for u in all_utts: toks = tuple(_content_tokens(u)) if u.participant == "CHI" and len(toks) >= min_tokens: recent = seqs[-window:] if toks in recent: count += 1 seqs.append(toks) return count def _extract_features(cha_path: Path) -> Optional[dict]: """Extract features from one .cha file. Returns a dict or None if unreadable.""" try: reader = pla.read_chat(str(cha_path)) except Exception: # noqa: BLE001 # Some files use non-standard terminators (e.g. "+!?", "+..."). # Fall back to non-strict parsing before giving up. try: reader = pla.read_chat(str(cha_path), strict=False) except Exception as e: # noqa: BLE001 print(f" [skip] cannot read {cha_path.name}: {e}") return None chi = _extract_child_participant(reader) if chi is None: print(f" [skip] no CHI participant in {cha_path.name}") return None # All utterances (across participants) -> filter CHI all_utts = reader.utterances() chi_utts = [u for u in all_utts if u.participant == "CHI"] total_utt = len(chi_utts) if total_utt == 0: print(f" [skip] no CHI utterances in {cha_path.name}") return None # MLU / TTR via pylangacq (one value per file) mlu_morph = _safe_first(reader.mlu(participant="CHI")) mlu_words = _safe_first(reader.mluw(participant="CHI")) ttr = _safe_first(reader.ttr(participant="CHI")) # Counts from tokens (exclude punctuation tokens) PUNCT = {".", "?", "!", ",", ";", ":", "+...", "+..", "+/.", "+//.", "+/?"} total_words = 0 question_utts = 0 for u in chi_utts: # raw CHI tier text raw = u.tiers.get("CHI", "") if raw.rstrip().endswith("?"): question_utts += 1 for t in u.tokens: w = t.word if not w: continue if w in PUNCT: continue total_words += 1 # Unintelligible + zero vocalizations from raw tier text unintelligible = 0 zero_vocal = 0 vocalization = 0 # &=laugh, &=gasp, &=cough... for u in chi_utts: raw = u.tiers.get("CHI", "").strip() # zero vocalization: line is just "0 ." or "0." stripped = raw.rstrip(" .?!").strip() if stripped == "0": zero_vocal += 1 # xxx / yyy markers (unintelligible / phonological coding) if re.search(r"\bxxx\b|\byyy\b", raw): unintelligible += 1 # non-verbal vocalizations &=gasp etc. if re.search(r"&=[A-Za-z]+", raw): vocalization += 1 age_months = _age_to_months(chi.age) # Echolalia: CHI utterance verbatim-matches a recent utterance echolalia_count = _count_echolalia(all_utts) return { "participant_id": cha_path.stem, "group_header": _normalize_group(chi.group), "sex": chi.sex or None, "age_months": round(age_months, 2) if age_months is not None else None, "total_utterances": total_utt, "mlu": round(mlu_morph, 3) if mlu_morph is not None else None, "mluw": round(mlu_words, 3) if mlu_words is not None else None, "ttr": round(ttr, 4) if ttr is not None else None, "total_words": total_words, "unintelligible_count": unintelligible, "unintelligible_ratio": round(unintelligible / total_utt, 4), "zero_vocalization_count": zero_vocal, "nonverbal_vocalization_count": vocalization, "question_ratio": round(question_utts / total_utt, 4), "echolalia_count": echolalia_count, "echolalia_ratio": round(echolalia_count / total_utt, 4), } # --------------------------------------------------------------------------- # Corpus loaders # --------------------------------------------------------------------------- def load_eigsti() -> pd.DataFrame: """Eigsti: labels come from subfolder (ASD / DD / TD), verified with @ID header.""" print("\n[Eigsti] loading...") rows = [] for subgroup_dir in sorted(p for p in EIGSTI_DIR.iterdir() if p.is_dir()): folder_label = subgroup_dir.name # ASD / DD / TD for cha in sorted(subgroup_dir.glob("*.cha")): feats = _extract_features(cha) if feats is None: continue # folder label is authoritative for Eigsti feats["group"] = _normalize_group(folder_label) or folder_label feats["corpus"] = "eigsti" rows.append(feats) df = pd.DataFrame(rows) print(f"[Eigsti] {len(df)} files loaded.") return df def load_nadig() -> pd.DataFrame: """Nadig: labels from @ID header (mixed ASD + TYP despite 0types.txt).""" print("\n[Nadig] loading...") rows = [] for cha in sorted(NADIG_DIR.glob("*.cha")): feats = _extract_features(cha) if feats is None: continue # header group is authoritative for Nadig feats["group"] = feats["group_header"] or "ASD" feats["corpus"] = "nadig" rows.append(feats) df = pd.DataFrame(rows) print(f"[Nadig] {len(df)} files loaded.") return df def load_rollins() -> pd.DataFrame: """Rollins: longitudinal ASD, one subfolder per child. session_order from filename.""" print("\n[Rollins] loading...") rows = [] for child_dir in sorted(p for p in ROLLINS_DIR.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) for order, cha in enumerate(cha_files, start=1): feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["session_id"] = cha.stem # e.g. "020800" feats["session_order"] = order # 1, 2, 3, ... # Corpus ships as all ASD feats["group"] = feats["group_header"] or "ASD" feats["corpus"] = "rollins" rows.append(feats) df = pd.DataFrame(rows) print(f"[Rollins] {len(df)} sessions loaded.") return df def load_nyu_emerson() -> pd.DataFrame: """NYU-Emerson: 30 ASD children with audio/video. Flat structure.""" print("\n[NYU-Emerson] loading...") rows = [] for cha in sorted(NYU_EMR_DIR.glob("*.cha")): feats = _extract_features(cha) if feats is None: continue # All NYU-Emerson are ASD feats["group"] = "ASD" feats["corpus"] = "nyu_emerson" rows.append(feats) df = pd.DataFrame(rows) print(f"[NYU-Emerson] {len(df)} files loaded.") return df def load_quigley_classification() -> pd.DataFrame: """QuigleyMcNally: HR=ASD (10 children), LR=TD (9 children). Use session 1 only.""" print("\n[QuigleyMcNally - Classification] loading...") rows = [] # HR folder = High Risk = ASD hr_dir = QUIGLEY_DIR / "HR" for child_dir in sorted(p for p in hr_dir.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) if not cha_files: continue # Use first session only to avoid repeated measures cha = cha_files[0] feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["group"] = "ASD" feats["corpus"] = "quigley" rows.append(feats) # LR folder = Low Risk = TD lr_dir = QUIGLEY_DIR / "LR" for child_dir in sorted(p for p in lr_dir.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) if not cha_files: continue cha = cha_files[0] feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["group"] = "TD" feats["corpus"] = "quigley" rows.append(feats) df = pd.DataFrame(rows) print(f"[QuigleyMcNally] {len(df)} children loaded (session 1 only).") return df def load_quigley_progress() -> pd.DataFrame: """QuigleyMcNally: All sessions for longitudinal analysis.""" print("\n[QuigleyMcNally - Progress] loading...") rows = [] # HR folder = ASD hr_dir = QUIGLEY_DIR / "HR" for child_dir in sorted(p for p in hr_dir.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) for order, cha in enumerate(cha_files, start=1): feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["session_id"] = cha.stem feats["session_order"] = order feats["group"] = "ASD" feats["corpus"] = "quigley" rows.append(feats) # LR folder = TD lr_dir = QUIGLEY_DIR / "LR" for child_dir in sorted(p for p in lr_dir.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) for order, cha in enumerate(cha_files, start=1): feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["session_id"] = cha.stem feats["session_order"] = order feats["group"] = "TD" feats["corpus"] = "quigley" rows.append(feats) df = pd.DataFrame(rows) print(f"[QuigleyMcNally] {len(df)} sessions loaded (longitudinal).") return df def load_flusberg_classification() -> pd.DataFrame: """Flusberg: 6 ASD children, use session 1 only for classification.""" print("\n[Flusberg - Classification] loading...") rows = [] for child_dir in sorted(p for p in FLUSBERG_DIR.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) if not cha_files: continue cha = cha_files[0] feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["group"] = "ASD" feats["corpus"] = "flusberg" rows.append(feats) df = pd.DataFrame(rows) print(f"[Flusberg] {len(df)} children loaded (session 1 only).") return df def load_flusberg_progress() -> pd.DataFrame: """Flusberg: All sessions for longitudinal analysis.""" print("\n[Flusberg - Progress] loading...") rows = [] for child_dir in sorted(p for p in FLUSBERG_DIR.iterdir() if p.is_dir()): child_name = child_dir.name cha_files = sorted(child_dir.glob("*.cha"), key=lambda p: p.stem) for order, cha in enumerate(cha_files, start=1): feats = _extract_features(cha) if feats is None: continue feats["child"] = child_name feats["session_id"] = cha.stem feats["session_order"] = order feats["group"] = "ASD" feats["corpus"] = "flusberg" rows.append(feats) df = pd.DataFrame(rows) print(f"[Flusberg] {len(df)} sessions loaded (longitudinal).") return df # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: # Load all corpora eigsti_df = load_eigsti() nadig_df = load_nadig() rollins_df = load_rollins() nyu_df = load_nyu_emerson() quigley_cls_df = load_quigley_classification() quigley_prog_df = load_quigley_progress() flusberg_cls_df = load_flusberg_classification() flusberg_prog_df = load_flusberg_progress() # Combine classification datasets combined_df = pd.concat([ eigsti_df, nadig_df, nyu_df, quigley_cls_df, flusberg_cls_df ], ignore_index=True) # Combine longitudinal datasets longitudinal_df = pd.concat([ rollins_df, quigley_prog_df, flusberg_prog_df ], ignore_index=True) # Column ordering for the classification CSV combined_cols = [ "participant_id", "corpus", "group", "group_header", "sex", "age_months", "total_utterances", "mlu", "mluw", "ttr", "total_words", "unintelligible_count", "unintelligible_ratio", "zero_vocalization_count", "nonverbal_vocalization_count", "question_ratio", "echolalia_count", "echolalia_ratio", ] combined_df = combined_df[combined_cols] # Column ordering for longitudinal CSV longitudinal_cols = [ "child", "session_id", "session_order", "participant_id", "corpus", "group", "group_header", "sex", "age_months", "total_utterances", "mlu", "mluw", "ttr", "total_words", "unintelligible_count", "unintelligible_ratio", "zero_vocalization_count", "nonverbal_vocalization_count", "question_ratio", "echolalia_count", "echolalia_ratio", ] longitudinal_df = longitudinal_df[longitudinal_cols] # Save outputs combined_path = DATA_DIR / "combined_features.csv" longitudinal_path = DATA_DIR / "longitudinal_features.csv" combined_df.to_csv(combined_path, index=False) longitudinal_df.to_csv(longitudinal_path, index=False) print("\n" + "=" * 72) print(f"Saved: {combined_path.relative_to(PROJECT_ROOT)} ({len(combined_df)} rows)") print(f"Saved: {longitudinal_path.relative_to(PROJECT_ROOT)} ({len(longitudinal_df)} rows)") print("=" * 72) print("\n--- combined_features.csv (head) ---") print(combined_df.head(10).to_string(index=False)) print("\nGroup distribution in combined:") print(combined_df.groupby(["corpus", "group"]).size()) print("\n--- longitudinal_features.csv (head) ---") print(longitudinal_df.head(10).to_string(index=False)) print("\nSessions per child (longitudinal):") print(longitudinal_df.groupby(["corpus", "child"]).size().sort_index()) if __name__ == "__main__": main()