Spaces:
Running
Running
tpriyadata
feat(preprocessing): ADNI loader complete β 24,916 clinical records in Pinecone
"""
preprocessing/adni_loader.py
=============================
ALZDETECT-AI -- Enterprise ADNI Clinical Data Loader.

WHAT:  Loads 9 ADNI CSV files, joins them on subject_id + visit,
       validates every field with Pydantic, converts each
       patient-visit into a text chunk, and upserts it to Pinecone.
WHY:   Adds real patient clinical data alongside the PubMed chunks,
       so Claude can cite actual patient measurements, not just
       published literature.
WHO:   Called by scripts/run_pipeline.py after the PubMed pipeline.
WHERE: Reads data/adni/*.csv -> upserts to the Pinecone alzdetect index.
WHEN:  Once per plan, after the PubMed chunks are already in Pinecone.

FILES USED:
  1. My_Table   -- master record (subject + visit + diagnosis)
  2. MMSE       -- cognitive scores (MMSCORE 0-30)
  3. ADAS       -- cognitive assessment (TOTSCORE 0-70, TOTAL13)
  4. MOCA       -- Montreal Cognitive Assessment (MOCA total 0-30)
  5. CDR        -- Clinical Dementia Rating (CDRSB)
  6. APOERES    -- APOE genotype (genetic risk)
  7. LILLY      -- pTau217 blood biomarker
  8. UPENNBIOMK -- CSF biomarkers (Abeta42, pTau, tTau)
  9. Key_MRI    -- scanner metadata only; no brain volumes, so MRI
                   fields are currently skipped

WORST-CASE DESIGN:
  - Missing values (-4, "", NA, NOT DONE) -> None, never crash
  - MMSCORE=-1 found in data -> rejected by validator
  - RID not zero-padded -> normalized automatically
  - LILLY COMMENT legal text -> stripped before chunking
  - Join produces no match -> subject still included
  - Pinecone upsert fails -> retry 3 times with backoff
  - File not found -> clear error, pipeline stops
"""
| import json | |
| import time | |
| import re | |
| from pathlib import Path | |
| from typing import Optional | |
| from enum import Enum | |
| import pandas as pd | |
| import numpy as np | |
| from pydantic import BaseModel, Field, field_validator, model_validator | |
| from loguru import logger | |
| from tqdm import tqdm | |
| from configs.settings import get_settings | |
# -- Constants -------------------------------------------------------
# ADNI missing-value sentinels. Any cell whose str().strip() form is in
# this set is treated as "no value" and becomes None.
MISSING_VALUES = set(
    [
        "-4",
        "-4.0",
        "",
        "NA",
        "N/A",
        "nan",
        "NaN",
        "NOT DONE",
        "None",
        "NONE",
    ]
)

# Export-date suffix shared by every CSV in this ADNI snapshot.
_ADNI_EXPORT_DATE = "17Apr2026"

# Logical table key -> CSV file name (resolved against adni_data_path).
FILES = {
    key: f"All_Subjects_{stem}_{_ADNI_EXPORT_DATE}.csv"
    for key, stem in [
        ("master", "My_Table"),
        ("mmse", "MMSE"),
        ("adas", "ADAS"),
        ("moca", "MOCA"),
        ("cdr", "CDR"),
        ("apoe", "APOERES"),
        ("lilly", "LILLY_PTAU217_MSD600"),
        ("csf", "UPENNBIOMK_ROCHE_ELECSYS"),
        ("mri", "Key_MRI"),
    ]
}
# -- Diagnosis enum --------------------------------------------------
class ADNIDiagnosis(str, Enum):
    """Clinical diagnosis attached to one ADNI visit.

    Members subclass ``str``, so each compares equal to its raw value.
    ``UNKNOWN`` is the fallback for absent or unmapped diagnosis codes.
    """

    CN = "CN"            # cognitively normal
    MCI = "MCI"          # mild cognitive impairment
    AD = "AD"            # Alzheimer's disease
    UNKNOWN = "Unknown"  # no usable diagnosis code
# -- APOE genotype enum ----------------------------------------------
class APOEGenotype(str, Enum):
    """APOE allele pair for a subject; the value is what chunk text shows.

    ``UNKNOWN`` is the fallback when no APOERES genotype could be mapped.
    """

    E2E3 = "APOE2/APOE3"     # protective
    E3E3 = "APOE3/APOE3"     # neutral
    E3E4 = "APOE3/APOE4"     # one risk allele
    E4E4 = "APOE4/APOE4"     # two risk alleles -- highest risk
    E2E4 = "APOE2/APOE4"     # mixed
    E2E2 = "APOE2/APOE2"     # rare protective
    UNKNOWN = "Unknown"      # genotype not available
# -- Helper functions ------------------------------------------------
def is_missing(v) -> bool:
    """Return True when *v* is one of ADNI's missing-value sentinels.

    The value is stringified and whitespace-trimmed before the lookup in
    MISSING_VALUES, so -4, " NA " and float("nan") all count as missing.
    """
    normalized = str(v).strip()
    return normalized in MISSING_VALUES
def safe_float(v) -> Optional[float]:
    """
    Convert an ADNI cell value to float, or None if missing/invalid.

    Returns None for:
      - None, and pandas/numpy NaN (a left join with no match yields NaN);
      - ADNI missing sentinels (see is_missing), including -4 written in
        any numeric form such as "-4.00";
      - text that does not parse as a number;
      - non-finite results (inf, -inf, nan). These are not meaningful
        measurements, and letting inf through would make int() raise
        OverflowError in safe_int.
    """
    import math

    if v is None:
        return None
    text = str(v).strip()
    # str(float("nan")) == "nan", so NaN floats are caught here as well.
    if is_missing(text):
        return None
    try:
        f = float(text)
    except (ValueError, TypeError):
        return None
    if f == -4.0 or not math.isfinite(f):
        return None
    return f
def safe_int(v) -> Optional[int]:
    """
    Convert to int (truncating toward zero), or None if missing/invalid.

    Non-finite floats are rejected explicitly: int(inf) raises
    OverflowError and int(nan) raises ValueError, either of which would
    abort a whole DataFrame.apply() call.
    """
    import math

    f = safe_float(v)
    if f is None or not math.isfinite(f):
        return None
    return int(f)
def normalize_rid(v) -> str:
    """
    Render an ADNI RID as a zero-padded 4-digit string.

    Examples: 2 -> '0002', '1412' -> '1412'. Anything that does not parse
    as an integer (after stripping whitespace) becomes '0000'.
    """
    try:
        as_int = int(str(v).strip())
    except (ValueError, TypeError):
        return "0000"
    return f"{as_int:04d}"
def normalize_viscode(v) -> str:
    """Canonicalize a visit code: stringify, trim, lower-case (' M06 ' -> 'm06')."""
    trimmed = str(v).strip()
    return trimmed.lower()
# -- Pydantic model -- one ADNI patient visit ------------------------
def _score_in_range(
    v: Optional[float], low: float, high: float, label: str
) -> Optional[float]:
    """
    Return *v* unchanged if ``low <= v <= high``; otherwise log at DEBUG
    and return None. None passes straight through.

    Shared by the ADNIRecord score validators so every range check
    behaves and logs the same way.
    """
    if v is None:
        return None
    if not (low <= v <= high):
        logger.debug(
            f"[ADNI] {label} {v} out of range [{low},{high}] β setting None"
        )
        return None
    return v


class ADNIRecord(BaseModel):
    """
    One validated ADNI patient-visit record.

    Analogy: one complete patient file after all lab results are attached
    to the admission form. Every field is validated before the file goes
    to the library (Pinecone).

    WORST-CASE FIELDS:
        All numeric fields -> Optional; ADNI uses -4 for missing, and the
            loader passes None for such sentinels.
        mmse / moca / adas_cog11 / adas_cog13 -> out-of-range values are
            replaced with None by the validators below.
        rid / visit -> normalized (zero-padded RID, lower-case visit code).
        diagnosis -> defaults to Unknown if missing.
        apoe_genotype -> defaults to Unknown if not in APOERES.
    """

    # Identity
    subject_id: str = Field(..., description="PTID e.g. 002_S_0295")
    rid: str = Field(..., description="Zero-padded RID e.g. 0295")
    visit: str = Field(..., description="Visit code e.g. bl m06")
    year: Optional[int] = Field(default=None)

    # Diagnosis
    diagnosis: ADNIDiagnosis = Field(default=ADNIDiagnosis.UNKNOWN)

    # Cognitive scores
    mmse: Optional[int] = Field(default=None, description="MMSE 0-30")
    adas_cog11: Optional[float] = Field(default=None, description="ADAS-Cog11 0-70")
    adas_cog13: Optional[float] = Field(default=None, description="ADAS-Cog13 0-85")
    moca: Optional[int] = Field(default=None, description="MoCA 0-30")
    cdr_sb: Optional[float] = Field(default=None, description="CDR Sum of Boxes")

    # Genetics
    apoe_genotype: APOEGenotype = Field(default=APOEGenotype.UNKNOWN)

    # Blood biomarker
    ptau217: Optional[float] = Field(default=None, description="pTau217 pg/mL")

    # CSF biomarkers
    csf_abeta42: Optional[float] = Field(default=None, description="CSF Abeta42 pg/mL")
    csf_ptau: Optional[float] = Field(default=None, description="CSF pTau pg/mL")
    csf_ttau: Optional[float] = Field(default=None, description="CSF tTau pg/mL")

    # MRI
    hippocampus: Optional[float] = Field(default=None, description="Hippocampal volume mm3")
    entorhinal: Optional[float] = Field(default=None, description="Entorhinal cortex mm")

    # Metadata
    source: str = Field(default="adni")

    # -- Validators ---------------------------------------------------
    # Each must be decorated with @field_validator (+ @classmethod) to be
    # registered; an undecorated method on a Pydantic model never runs
    # during validation.

    @field_validator("rid")
    @classmethod
    def pad_rid(cls, v: str) -> str:
        """Zero-pad the RID to 4 digits (e.g. '295' -> '0295')."""
        return normalize_rid(v)

    @field_validator("visit")
    @classmethod
    def clean_visit(cls, v: str) -> str:
        """Strip and lower-case the visit code (e.g. ' M06 ' -> 'm06')."""
        return normalize_viscode(v)

    @field_validator("mmse")
    @classmethod
    def validate_mmse(cls, v: Optional[int]) -> Optional[int]:
        """
        MMSE must be 0-30.
        We found min=-1 in the data, so anything below 0 is rejected.
        """
        return _score_in_range(v, 0, 30, "MMSE")

    @field_validator("adas_cog11")
    @classmethod
    def validate_adas11(cls, v: Optional[float]) -> Optional[float]:
        """ADAS-Cog11 must be 0-70."""
        return _score_in_range(v, 0, 70, "ADAS-Cog11")

    @field_validator("adas_cog13")
    @classmethod
    def validate_adas13(cls, v: Optional[float]) -> Optional[float]:
        """ADAS-Cog13 must be 0-85 (range from the field description)."""
        return _score_in_range(v, 0, 85, "ADAS-Cog13")

    @field_validator("moca")
    @classmethod
    def validate_moca(cls, v: Optional[int]) -> Optional[int]:
        """MoCA must be 0-30."""
        return _score_in_range(v, 0, 30, "MoCA")

    # -- Rendering ----------------------------------------------------

    def to_chunk_text(self) -> str:
        """
        Convert this record into a readable text chunk for embedding and
        RAG retrieval. This is what Claude reads when answering questions,
        so every field that is present is rendered; absent fields are
        omitted rather than printed as empty.
        """
        lines = [
            "ADNI Clinical Record",
            f"Subject: {self.subject_id} | Visit: {self.visit} | "
            f"Year: {self.year or 'unknown'}",
            f"Diagnosis: {self.diagnosis.value}",
        ]

        # Cognitive scores
        cog_parts = []
        if self.mmse is not None:
            cog_parts.append(f"MMSE={self.mmse}/30")
        if self.moca is not None:
            cog_parts.append(f"MoCA={self.moca}/30")
        if self.adas_cog11 is not None:
            cog_parts.append(f"ADAS-Cog11={self.adas_cog11:.1f}")
        if self.adas_cog13 is not None:
            cog_parts.append(f"ADAS-Cog13={self.adas_cog13:.1f}")
        if self.cdr_sb is not None:
            cog_parts.append(f"CDR-SB={self.cdr_sb:.1f}")
        if cog_parts:
            lines.append(f"Cognitive scores: {' | '.join(cog_parts)}")

        # Genetics
        if self.apoe_genotype != APOEGenotype.UNKNOWN:
            risk = ""
            if self.apoe_genotype == APOEGenotype.E4E4:
                risk = " β homozygous APOE4, highest AD risk"
            elif self.apoe_genotype == APOEGenotype.E3E4:
                risk = " β one APOE4 allele, elevated AD risk"
            elif self.apoe_genotype == APOEGenotype.E2E3:
                risk = " β APOE2 carrier, reduced AD risk"
            lines.append(f"Genetics: {self.apoe_genotype.value}{risk}")

        # Blood biomarker
        if self.ptau217 is not None:
            lines.append(f"Blood biomarker: pTau217={self.ptau217:.3f} pg/mL "
                         f"(Lilly MSD600 assay)")

        # CSF biomarkers
        csf_parts = []
        if self.csf_abeta42 is not None:
            csf_parts.append(f"Abeta42={self.csf_abeta42:.1f} pg/mL")
        if self.csf_ptau is not None:
            csf_parts.append(f"pTau={self.csf_ptau:.1f} pg/mL")
        if self.csf_ttau is not None:
            csf_parts.append(f"tTau={self.csf_ttau:.1f} pg/mL")
        if csf_parts:
            lines.append(f"CSF biomarkers: {' | '.join(csf_parts)}")

        # MRI
        mri_parts = []
        if self.hippocampus is not None:
            mri_parts.append(f"Hippocampus={self.hippocampus:.0f} mm3")
        if self.entorhinal is not None:
            mri_parts.append(f"Entorhinal={self.entorhinal:.2f} mm")
        if mri_parts:
            lines.append(f"MRI volumes: {' | '.join(mri_parts)}")

        lines.append("Source: ADNI clinical trial data")
        return "\n".join(lines)

    def to_chunk_id(self) -> str:
        """Pinecone vector ID: one vector per (RID, visit) pair."""
        return f"adni_{self.rid}_{self.visit}"

    def word_count(self) -> int:
        """Whitespace-delimited word count of the rendered chunk text."""
        return len(self.to_chunk_text().split())
# -- ADNI diagnostic model -------------------------------------------
class ADNIDiagnostic(BaseModel):
    """Run summary ("RE inspector") for the ADNI loading stage."""

    total_records: int
    valid_records: int
    invalid_records: int
    diagnosis_counts: dict
    missing_mmse: int
    missing_ptau: int
    missing_mri: int
    upserted: int
    duration_secs: float

    def log_summary(self) -> None:
        """Write the run statistics as a framed block of INFO log lines."""
        rule = "=" * 60
        rows = (
            ("Total records", f"{self.total_records:,}"),
            ("Valid records", f"{self.valid_records:,}"),
            ("Invalid records", f"{self.invalid_records:,}"),
            ("Diagnosis dist", f"{self.diagnosis_counts}"),
            ("Missing MMSE", f"{self.missing_mmse:,}"),
            ("Missing pTau217", f"{self.missing_ptau:,}"),
            ("Missing MRI", f"{self.missing_mri:,}"),
            ("Upserted", f"{self.upserted:,}"),
            ("Duration", f"{self.duration_secs:.1f}s"),
        )
        logger.info(rule)
        logger.info("[ADNI-DIAGNOSTIC] Run complete")
        for label, shown in rows:
            logger.info(f" {label} : {shown}")
        logger.info(rule)
# -- Core ADNI loader class ------------------------------------------
class ADNILoader:
    """
    Enterprise ADNI data loader.

    Analogy: the hospital records clerk. Takes 9 separate department
    files (lab, radiology, genetics), staples them together per
    patient-visit, validates each complete record, and files it in the
    main library (Pinecone).

    Usage:
        loader = ADNILoader()
        diagnostic = loader.run()
    """

    _MAX_RETRIES: int = 3
    _RETRY_BACKOFF: float = 2.0
    _UPSERT_BATCH: int = 100

    # Optional numeric ADNIRecord fields copied into Pinecone metadata.
    # Pinecone rejects None values, so a field is only sent when present.
    _OPTIONAL_METADATA: tuple[str, ...] = (
        "mmse",
        "adas_cog11",
        "adas_cog13",
        "moca",
        "cdr_sb",
        "ptau217",
        "csf_abeta42",
        "csf_ptau",
        "csf_ttau",
        "hippocampus",
        "entorhinal",
    )

    # Master-table join keys; clinical tables are renamed onto these.
    _JOIN_KEYS: tuple[str, str] = ("subject_id_clean", "visit_clean")

    def __init__(self) -> None:
        """
        Load settings, verify the input files, and connect Pinecone and
        the embedding model.

        Raises:
            FileNotFoundError: if any CSV listed in FILES is missing.
        """
        self.settings = get_settings()
        # Path() so the "/" joins below also work if the setting is a str.
        self.adni_path = Path(self.settings.adni_data_path)
        self._verify_files()
        self._setup_pinecone()
        self._setup_embedder()

    # -- Setup -------------------------------------------------------

    def _verify_files(self) -> None:
        """
        Verify all required files exist -- fail fast if any is missing.

        Raises:
            FileNotFoundError: listing every missing file name.
        """
        missing = [
            filename
            for filename in FILES.values()
            if not (self.adni_path / filename).exists()
        ]
        if missing:
            logger.error(f"[ADNI] Missing files: {missing}")
            raise FileNotFoundError(
                f"Missing ADNI files in {self.adni_path}: {missing}"
            )
        logger.info(f"[ADNI] All {len(FILES)} files verified")

    def _setup_pinecone(self) -> None:
        """Connect to Pinecone -- same index as the PubMed chunks."""
        from pinecone import Pinecone

        pc = Pinecone(api_key=self.settings.pinecone_api_key)
        self.index = pc.Index(self.settings.pinecone_index_name)
        stats = self.index.describe_index_stats()
        logger.info(
            f"[ADNI] Pinecone connected | "
            f"existing vectors: {stats.total_vector_count:,}"
        )

    def _setup_embedder(self) -> None:
        """Load the embedding model -- same as the PubMed pipeline."""
        from sentence_transformers import SentenceTransformer

        logger.info(f"[ADNI] Loading model: {self.settings.embedding_model}")
        self.model = SentenceTransformer(self.settings.embedding_model)
        logger.info("[ADNI] Embedding model loaded")

    # -- Per-file loaders --------------------------------------------

    def _load_csv(self, key: str) -> pd.DataFrame:
        """Load one ADNI CSV file with every column read as a string."""
        path = self.adni_path / FILES[key]
        # keep_default_na=False: sentinels like "NA" stay as text and are
        # handled by safe_float/safe_int instead of becoming NaN here.
        df = pd.read_csv(path, dtype=str, keep_default_na=False)
        logger.info(f"[ADNI] Loaded {key}: {df.shape[0]:,} rows")
        return df

    @staticmethod
    def _visit_frame(df: pd.DataFrame, value_cols: list[str]) -> pd.DataFrame:
        """
        Select PTID + visit code + *value_cols*, with the visit column
        renamed to VISCODE.

        VISCODE2 is preferred when the file has it, falling back to
        VISCODE, so every clinical table is keyed the same way.
        """
        vis_col = "VISCODE2" if "VISCODE2" in df.columns else "VISCODE"
        return (
            df[["PTID", vis_col, *value_cols]]
            .rename(columns={vis_col: "VISCODE"})
            .copy()
        )

    def _load_master(self) -> pd.DataFrame:
        """
        Load My_Table -- the master record with subject + visit + diagnosis.

        Adds ``diagnosis_mapped`` (DIAGNOSIS code 1/2/3 -> CN/MCI/AD,
        anything else -> Unknown), ``subject_id_clean`` and ``visit_clean``.
        """
        df = self._load_csv("master")
        codes = {1: ADNIDiagnosis.CN, 2: ADNIDiagnosis.MCI, 3: ADNIDiagnosis.AD}

        def map_diagnosis(v: str) -> ADNIDiagnosis:
            # Parse numerically so "1" and "1.0" both map; "1.5" does not.
            f = safe_float(v)
            if f is None or not f.is_integer():
                return ADNIDiagnosis.UNKNOWN
            return codes.get(int(f), ADNIDiagnosis.UNKNOWN)

        df["diagnosis_mapped"] = df["DIAGNOSIS"].apply(map_diagnosis)
        df["subject_id_clean"] = df["subject_id"].str.strip()
        df["visit_clean"] = df["visit"].str.strip().str.lower()
        return df

    def _load_mmse(self) -> pd.DataFrame:
        """Load MMSE -- MMSCORE per PTID + visit."""
        df = self._load_csv("mmse")
        df["mmse_val"] = df["MMSCORE"].apply(safe_int)
        return self._visit_frame(df, ["mmse_val"])

    def _load_adas(self) -> pd.DataFrame:
        """Load ADAS -- TOTSCORE (ADAS-Cog11) and TOTAL13 (ADAS-Cog13)."""
        df = self._load_csv("adas")
        df["adas11_val"] = df["TOTSCORE"].apply(safe_float)
        df["adas13_val"] = df["TOTAL13"].apply(safe_float)
        return self._visit_frame(df, ["adas11_val", "adas13_val"])

    def _load_moca(self) -> pd.DataFrame:
        """Load MoCA -- the total score column is 'MOCA'."""
        df = self._load_csv("moca")
        df["moca_val"] = df["MOCA"].apply(safe_int)
        return self._visit_frame(df, ["moca_val"])

    def _load_cdr(self) -> pd.DataFrame:
        """Load CDR -- CDRSB (sum of boxes), located by column-name search."""
        df = self._load_csv("cdr")
        cdr_col = next(
            (
                c for c in df.columns
                if "CDRSB" in c.upper() or ("CDR" in c.upper() and "SB" in c.upper())
            ),
            None,
        )
        if cdr_col:
            df["cdrsb_val"] = df[cdr_col].apply(safe_float)
        else:
            logger.warning("[ADNI] CDRSB column not found")
            df["cdrsb_val"] = None
        return self._visit_frame(df, ["cdrsb_val"])

    def _load_apoe(self) -> pd.DataFrame:
        """
        Load APOERES -- map GENOTYPE to APOEGenotype, one row per PTID.

        Allele order is ignored ("3/2" == "2/3"). Rows that do not map are
        dropped before de-duplication, so a known genotype is never hidden
        behind an unmapped one; subjects with no usable row fall back to
        Unknown at join time.
        """
        df = self._load_csv("apoe")
        genotypes = {
            ("2", "2"): APOEGenotype.E2E2,
            ("2", "3"): APOEGenotype.E2E3,
            ("2", "4"): APOEGenotype.E2E4,
            ("3", "3"): APOEGenotype.E3E3,
            ("3", "4"): APOEGenotype.E3E4,
            ("4", "4"): APOEGenotype.E4E4,
        }

        def map_genotype(v: str) -> APOEGenotype:
            # Exactly two allele digits, sorted, e.g. "4/3" -> ("3", "4").
            alleles = tuple(sorted(re.findall(r"\d", str(v))))
            return genotypes.get(alleles, APOEGenotype.UNKNOWN)

        df["PTID"] = df["PTID"].str.strip()
        df["apoe_val"] = df["GENOTYPE"].apply(map_genotype)
        known = df[df["apoe_val"] != APOEGenotype.UNKNOWN]
        return known[["PTID", "apoe_val"]].drop_duplicates("PTID").copy()

    def _load_lilly(self) -> pd.DataFrame:
        """
        Load Lilly pTau217 -- keep only ORRES, so the legal COMMENT text
        never reaches a chunk. Rows whose STAT is "NOT DONE" (any case or
        padding) are skipped.
        """
        df = self._load_csv("lilly")
        if "STAT" in df.columns:
            not_done = df["STAT"].str.strip().str.upper() == "NOT DONE"
            df = df[~not_done].copy()
        df["ptau217_val"] = df["ORRES"].apply(safe_float)
        return self._visit_frame(df, ["ptau217_val"])

    def _load_csf(self) -> pd.DataFrame:
        """Load UPenn/Roche CSF biomarkers -- Abeta42, pTau, tTau."""
        df = self._load_csv("csf")

        def find(pred) -> Optional[str]:
            # Column names vary by file version, so match on name fragments.
            return next((c for c in df.columns if pred(c.upper())), None)

        targets = (
            ("Abeta42", "abeta42_val", find(lambda u: "ABETA" in u and "42" in u)),
            ("pTau", "ptau_val", find(lambda u: "PTAU" in u and "ABETA" not in u)),
            ("tTau", "ttau_val",
             find(lambda u: "TTAU" in u or ("TAU" in u and "P" not in u))),
        )
        for label, target, col in targets:
            if col:
                df[target] = df[col].apply(safe_float)
            else:
                logger.warning(f"[ADNI] CSF {label} column not found")
                df[target] = None
        return self._visit_frame(df, ["abeta42_val", "ptau_val", "ttau_val"])

    def _load_mri(self) -> pd.DataFrame:
        """
        Key_MRI contains scanner metadata -- not brain volume measurements.
        Brain volumes require processed FreeSurfer output files.
        Return an empty dataframe -- MRI volumes are skipped for Plan 3.
        """
        logger.warning(
            "[ADNI] Key_MRI contains scanner metadata only β "
            "no hippocampal volumes available. Skipping MRI for Plan 3."
        )
        return pd.DataFrame(columns=["PTID", "VISCODE", "hipp_val", "ent_val"])

    # -- Joining & validation ----------------------------------------

    @staticmethod
    def _collapse_visits(df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize PTID/VISCODE and keep exactly one row per subject-visit.

        If a table holds several rows for the same PTID + VISCODE (e.g.
        repeat assessments), a plain left join would duplicate master rows
        and emit repeated Pinecone IDs. Each value column therefore takes
        its first non-null value within the group.
        """
        out = df.copy()
        out["PTID"] = out["PTID"].astype(str).str.strip()
        out["VISCODE"] = out["VISCODE"].astype(str).str.strip().str.lower()
        if out.empty:
            return out
        return out.groupby(["PTID", "VISCODE"], as_index=False, sort=False).first()

    @staticmethod
    def _clean(v):
        """Convert NaN/NA and ADNI missing sentinels to None before Pydantic sees them."""
        if v is None:
            return None
        if pd.api.types.is_scalar(v) and pd.isna(v):
            return None
        if str(v).strip() in MISSING_VALUES:
            return None
        return v

    def _build_records(self) -> tuple[list[ADNIRecord], int]:
        """
        Join all 9 files and build validated ADNIRecord objects.

        Strategy:
            1. Start with My_Table (master).
            2. Left join each clinical table on PTID + visit, after
               collapsing it to one row per key (many-to-one join).
            3. Left join APOE on PTID only (one row per subject).
            4. Validate each merged row through ADNIRecord; rows that fail
               are counted as invalid, and repeated chunk IDs are dropped.

        Returns:
            (valid records, number of rows that failed validation)
        """
        logger.info("[ADNI] Loading all files...")
        master = self._load_master()
        mmse = self._load_mmse()
        adas = self._load_adas()
        moca = self._load_moca()
        cdr = self._load_cdr()
        apoe = self._load_apoe()
        lilly = self._load_lilly()
        csf = self._load_csf()
        mri = self._load_mri()

        logger.info("[ADNI] Joining files...")
        keys = list(self._JOIN_KEYS)
        merged = master.copy()
        visit_tables = (
            ("mmse", mmse), ("adas", adas), ("moca", moca), ("cdr", cdr),
            ("lilly", lilly), ("csf", csf), ("mri", mri),
        )
        for name, table in visit_tables:
            right = self._collapse_visits(table).rename(
                columns={"PTID": keys[0], "VISCODE": keys[1]}
            )
            merged = merged.merge(
                right, on=keys, how="left",
                suffixes=("", f"_{name}"), validate="many_to_one",
            )
        # APOE -- join on subject only
        merged = merged.merge(
            apoe.rename(columns={"PTID": keys[0]}),
            on=keys[0], how="left",
            suffixes=("", "_apoe"), validate="many_to_one",
        )
        logger.info(f"[ADNI] Merged dataset: {merged.shape[0]:,} rows")

        records: list[ADNIRecord] = []
        seen_ids: set[str] = set()
        invalid = 0
        duplicates = 0
        clean = self._clean

        for _, row in tqdm(merged.iterrows(),
                           total=len(merged),
                           desc="Validating",
                           unit="record"):
            try:
                # Extract RID from subject_id (002_S_0295 -> 0295)
                subject_id = str(row.get("subject_id_clean", "")).strip()
                rid_raw = subject_id.split("_")[-1] if "_" in subject_id else "0000"
                # APOE: NaN (no match) or anything unexpected -> Unknown
                apoe_raw = row.get("apoe_val")
                apoe_val = (apoe_raw if isinstance(apoe_raw, APOEGenotype)
                            else APOEGenotype.UNKNOWN)
                record = ADNIRecord(
                    subject_id=subject_id,
                    rid=rid_raw,
                    visit=str(row.get("visit_clean", "")).strip(),
                    diagnosis=row.get("diagnosis_mapped", ADNIDiagnosis.UNKNOWN),
                    mmse=clean(row.get("mmse_val")),
                    adas_cog11=clean(row.get("adas11_val")),
                    adas_cog13=clean(row.get("adas13_val")),
                    moca=clean(row.get("moca_val")),
                    cdr_sb=clean(row.get("cdrsb_val")),
                    apoe_genotype=apoe_val,
                    ptau217=clean(row.get("ptau217_val")),
                    csf_abeta42=clean(row.get("abeta42_val")),
                    csf_ptau=clean(row.get("ptau_val")),
                    csf_ttau=clean(row.get("ttau_val")),
                    hippocampus=clean(row.get("hipp_val")),
                    entorhinal=clean(row.get("ent_val")),
                )
            except Exception as e:
                # Validation boundary: one bad row must not stop the run.
                invalid += 1
                logger.debug(f"[ADNI] Record invalid: {e}")
                continue

            # Same ID would silently overwrite in Pinecone -- keep the first.
            chunk_id = record.to_chunk_id()
            if chunk_id in seen_ids:
                duplicates += 1
                continue
            seen_ids.add(chunk_id)
            records.append(record)

        if duplicates:
            logger.warning(
                f"[ADNI] Dropped {duplicates:,} duplicate subject-visit rows "
                f"(same Pinecone ID)"
            )
        logger.info(
            f"[ADNI] Validated: {len(records):,} valid | {invalid:,} invalid"
        )
        return records, invalid

    # -- Embedding & upsert ------------------------------------------

    @classmethod
    def _build_metadata(cls, record: ADNIRecord, text: str) -> dict:
        """Pinecone metadata for one record; None-valued fields are omitted."""
        metadata = {
            "subject_id": record.subject_id,
            "rid": record.rid,
            "visit": record.visit,
            "diagnosis": record.diagnosis.value,
            "apoe": record.apoe_genotype.value,
            "source": "adni",
            "text": text[:1000],
        }
        for name in cls._OPTIONAL_METADATA:
            value = getattr(record, name)
            if value is not None:
                metadata[name] = value
        return metadata

    def _upsert_with_retry(self, vectors: list[dict], batch_start: int) -> bool:
        """
        Upsert one batch, retrying with linear backoff.

        Returns:
            True on success; False (after logging an error) once
            _MAX_RETRIES attempts have all failed.
        """
        for attempt in range(1, self._MAX_RETRIES + 1):
            try:
                self.index.upsert(vectors=vectors)
                return True
            except Exception as e:
                logger.warning(
                    f"[ADNI] Upsert attempt {attempt}/{self._MAX_RETRIES}: {e}"
                )
                if attempt < self._MAX_RETRIES:
                    time.sleep(self._RETRY_BACKOFF * attempt)
        logger.error(
            f"[ADNI] Upsert failed for batch {batch_start} after "
            f"{self._MAX_RETRIES} attempts -- {len(vectors)} vectors skipped"
        )
        return False

    def _upsert_records(self, records: list[ADNIRecord]) -> int:
        """
        Embed chunk texts and upsert them to Pinecone in batches of
        _UPSERT_BATCH (same pattern as the PubMed embedder).

        A batch whose embedding fails, or whose upsert still fails after
        all retries, is logged and skipped.

        Returns:
            Number of vectors successfully upserted.
        """
        total_upserted = 0
        for start in tqdm(
            range(0, len(records), self._UPSERT_BATCH),
            desc="Upserting",
            unit="batch",
        ):
            batch = records[start : start + self._UPSERT_BATCH]
            texts = [r.to_chunk_text() for r in batch]

            try:
                embeddings = self.model.encode(
                    texts,
                    batch_size=32,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                )
            except Exception as e:
                logger.error(f"[ADNI] Embedding failed batch {start}: {e}")
                continue

            vectors = [
                {
                    "id": record.to_chunk_id(),
                    "values": embedding.tolist(),
                    "metadata": self._build_metadata(record, text),
                }
                for record, text, embedding in zip(batch, texts, embeddings)
            ]
            if self._upsert_with_retry(vectors, start):
                total_upserted += len(vectors)
        return total_upserted

    # -- Entry point -------------------------------------------------

    def run(self) -> ADNIDiagnostic:
        """
        Main entry point -- load, validate, embed, and upsert all ADNI data.

        Returns:
            An ADNIDiagnostic summary, which is also logged.
        """
        from collections import Counter

        start_time = time.perf_counter()
        logger.info("[ADNI] Starting enterprise ADNI loader")

        records, invalid_count = self._build_records()
        dx_counts = dict(Counter(r.diagnosis.value for r in records))
        upserted = self._upsert_records(records)
        duration = round(time.perf_counter() - start_time, 1)

        diagnostic = ADNIDiagnostic(
            total_records=len(records) + invalid_count,
            valid_records=len(records),
            invalid_records=invalid_count,
            diagnosis_counts=dx_counts,
            missing_mmse=sum(1 for r in records if r.mmse is None),
            missing_ptau=sum(1 for r in records if r.ptau217 is None),
            missing_mri=sum(1 for r in records if r.hippocampus is None),
            upserted=upserted,
            duration_secs=duration,
        )
        diagnostic.log_summary()
        return diagnostic
# -- RE probe --------------------------------------------------------
def diagnose_adni() -> None:
    """
    RE probe -- sanity-check the ADNI vectors stored in Pinecone.

    Logs the index's total vector count and namespaces. It then embeds a
    fixed ADNI-flavoured query and logs the top-3 matches restricted to
    ``source == "adni"`` (id, score and diagnosis metadata).

    Usage:
        python -c "from preprocessing.adni_loader import diagnose_adni; diagnose_adni()"
    """
    from pinecone import Pinecone

    cfg = get_settings()
    client = Pinecone(api_key=cfg.pinecone_api_key)
    index = client.Index(cfg.pinecone_index_name)
    stats = index.describe_index_stats()
    divider = "=" * 60

    logger.info(divider)
    logger.info("[RE-ADNI] Pinecone index health check")
    logger.info(f" Total vectors : {stats.total_vector_count:,}")
    logger.info(f" Namespaces : {dict(stats.namespaces)}")

    # Probe query, filtered so only ADNI chunks can match.
    from sentence_transformers import SentenceTransformer

    encoder = SentenceTransformer(cfg.embedding_model)
    query_vector = encoder.encode(
        "ADNI patient MMSE score MCI diagnosis", convert_to_numpy=True
    ).tolist()
    adni_only = {"source": {"$eq": "adni"}}
    results = index.query(
        vector=query_vector,
        top_k=3,
        include_metadata=True,
        filter=adni_only,
    )

    matches = results.matches
    logger.info(f" ADNI chunks found: {len(matches)}")
    for match in matches:
        logger.info(
            f" {match.id} | score={match.score:.3f} | "
            f"dx={match.metadata.get('diagnosis')}"
        )
    logger.info(divider)