"""Hypothesis engine: batch-generate, persist, and rank testable hypotheses. Phase 3 of the NeuroClaw discovery loop: 1. batch_generate() — traverse the graph to produce hypotheses at scale 2. save / load — persist hypotheses to JSON 3. rank_hypotheses() — sort by novelty, evidence, testability, confidence 4. (Phase 5-6) hypotheses become executable NeuroClaw analysis tasks Usage: from core.knowledge_graph import load_graph, HypothesisEngine kg = load_graph() engine = HypothesisEngine(kg) # batch generate across all domain pairs hypotheses = engine.batch_generate() engine.save_hypotheses(hypotheses, "data/hypotheses.json") # or load and re-rank hypotheses = engine.load_hypotheses("data/hypotheses.json") ranked = engine.rank_hypotheses(hypotheses) """ from __future__ import annotations import json import logging import math import re from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Optional import networkx as nx from .graph_manager import KnowledgeGraph from .schema import ConceptNode logger = logging.getLogger(__name__) # ── data structures ──────────────────────────────────────────────────── @dataclass class HypothesisLink: """A single step in a hypothesis chain.""" from_id: str from_name: str to_id: str to_name: str relation_type: str confidence: float claim_id: str = "" raw_text: str = "" evidence: dict = field(default_factory=dict) source_paper: dict = field(default_factory=dict) @dataclass class Hypothesis: """A generated hypothesis with full evidence chain.""" id: str = "" hypothesis_type: str = "" # "path", "bridge", "gap", "contradiction" source_id: str = "" source_name: str = "" target_id: str = "" target_name: str = "" path: list[HypothesisLink] = field(default_factory=list) confidence_score: float = 0.0 novelty_score: float = 0.0 evidence_score: float = 0.0 testability_score: float = 0.0 composite_score: float = 0.0 supporting_claims: list[str] = field(default_factory=list) explanation: str = "" testability_reason: str = "" metadata: dict = field(default_factory=dict) critic_score: float = 0.0 critic_feedback: list[dict] = field(default_factory=list) critic_rounds: int = 0 evolve_score: float = 0.0 def to_dict(self) -> dict: d = asdict(self) return d @classmethod def from_dict(cls, d: dict) -> Hypothesis: d = d.copy() if "path" in d and isinstance(d["path"], list): d["path"] = [HypothesisLink(**p) if isinstance(p, dict) else p for p in d["path"]] return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) @dataclass class Contradiction: """A pair of conflicting claims.""" concept_a_id: str = "" concept_a_name: str = "" concept_b_id: str = "" concept_b_name: str = "" claim_for_id: str = "" claim_for_predicate: str = "" claim_for_text: str = "" claim_against_id: str = "" claim_against_predicate: str = "" claim_against_text: str = "" severity: float = 0.0 @dataclass class Gap: """An unexplored relationship between two concepts.""" concept_a_id: str = "" concept_a_name: str = "" concept_b_id: str = "" concept_b_name: str = "" distance: int = 0 connecting_concepts: list[str] = field(default_factory=list) domain_a: str = "" domain_b: str = "" potential_relation: str = "" # ── constants ────────────────────────────────────────────────────────── OPPOSING_PREDICATES = { ("increases", "reduces"), ("reduces", "increases"), ("causes", "inhibits"), ("inhibits", "causes"), ("treats", "contraindicated_for"), ("contraindicated_for", "treats"), ("activates", "inhibits"), ("inhibits", "activates"), } # Review-only study types (no independent empirical evidence). # Used by compute_frequency_boost and compute_temporal_decay. Edge-level # weighting by study_type lives in phase4_optimize.apply_evidence_weighting. _REVIEW_TYPES = {"review", "narrative_review", "systematic_review"} COMMON_RELATIONS = {"is_a", "part_of", "associated_with", "about", "is_associated_with"} # Noisy entity name patterns — hypotheses involving these are low quality. # Two categories: # (a) process-word ≠ entity: nominalized verbs/states ("loss", "progression") # that pop up as bridge nodes but carry no biological content. # (b) generic containers: vague collective terms ("tissue volumes", "Family") # that don't refer to a specific measurable thing. _NOISE_WORDS = frozenset({ # original set "unseen", "risk", "effect", "level", "status", "change", "type", "group", "factor", "model", "method", "unknown", "other", "none", "miscellaneous", "various", "difference", "increase", "decrease", # nominalized processes/states (category a) "loss", "progression", "reduction", "elevation", "alteration", "disruption", "dysfunction", "impairment", "deterioration", "improvement", "recovery", "response", "onset", "activation", "inhibition", "regulation", "modulation", "stimulation", "expression", "function", "functions", # generic containers (category b) "family", "members", "phenomenon", "phenomena", "processes", "mechanisms", "pathways", "symptoms", "manifestations", "volumes", "volume", # life events / demographics that are not biological entities "stress", "life", "events", "exposure", "outcome", "outcomes", "quality", }) NOISE_PATTERNS = [ re.compile(r"^[A-Z][a-z]?$"), # 1-2 letter: "Id", "Ca", "Mg" re.compile(r"^[A-Z][a-z]{2,4}$"), # Short mixed-case: "Tics", "Risk" re.compile(r"^\d+$"), # Pure numbers ] # (C-1) Generic-phrase patterns for INTERMEDIATE nodes. The token-based # `_NOISE_WORDS` filter misses phrases like "functional connectivity" or # "neural activity" because no individual word is in the noise list, but # the WHOLE phrase carries no measurable content. We only block these when # they appear as INTERMEDIATE nodes (paths can legitimately end in # "functional connectivity" as an outcome metric). _GENERIC_INTERMEDIATE_PATTERNS = [ re.compile(r"^(abnormal|altered|impaired|reduced|increased|disrupted|aberrant)?\s*" r"(brain|neural|neuronal|cortical|cerebral)\s+" r"(activity|activation|function|functioning|connectivity|" r"network|networks|signaling|metabolism|response|responses)$", re.I), re.compile(r"^(functional|structural|anatomical|effective)\s+" r"(connectivity|network|networks|integrity|abnormalit(y|ies))$", re.I), re.compile(r"^(disease|symptom|clinical|treatment|therapeutic)\s+" r"(progression|outcome|outcomes|response|severity|burden|stage|staging)$", re.I), re.compile(r"^(common|typical|specific|various|different)\s+" r"(features|patterns|mechanisms|processes)$", re.I), re.compile(r"^(neuro)?(degeneration|inflammation|protection|plasticity|genesis|imaging)$", re.I), re.compile(r"^(grey|gray|white)\s+matter$", re.I), re.compile(r"^(cognitive|behavioral|emotional|motor|sensory)\s+" r"(deficit|deficits|dysfunction|impairment|abnormalit(y|ies))$", re.I), ] # (C-3) Target-name patterns that LOOK like outcomes (so they pass # _is_dataset_outcome's keyword fallback) but are actually too broad to # drive a DL experiment. We block these even if their domain says # disease/cognitive_function. _TARGET_TOO_BROAD_PATTERNS = [ # bare umbrella nouns (single token) re.compile(r"^(skill|skills|ability|abilities|outcome|outcomes|" r"symptom|symptoms|manifestation|manifestations|" r"phenomenon|phenomena|finding|findings|" r"deficit|deficits|impairment|impairments|" r"function|functions|functioning|behavior|behaviors|" r"capability|capabilities|condition|conditions|" r"disease|diseases|disorder|disorders|syndrome|syndromes|" r"focus|integration|balance|knowledge|autonomy|" r"performance|adaptation|resilience|vulnerability|" r"recovery|progression|mechanism|process)$", re.I), # broad-category disease umbrellas (when these are the literal target, # they're too generic — but specific subtypes like "Alzheimer Disease" # don't match these patterns) re.compile(r"^(neurological|psychiatric|mental|cognitive|behavioral|" r"neurodegenerative|cardiovascular)\s+" r"(disease|diseases|disorder|disorders|condition|conditions)$", re.I), re.compile(r"^(human\s+)?(disease|diseases|disorder|disorders)$", re.I), re.compile(r"^(brain|mental|psychiatric|psychological)\s+health$", re.I), re.compile(r"^clinical\s+(features|outcome|outcomes|presentation|status)$", re.I), # "X deficits/impairments" patterns (too vague as targets) re.compile(r"^(motor|cognitive|neurocognitive|functional|social|" r"verbal|visual|sensory|emotional|behavioral)\s+" r"(deficit|deficits|impairment|impairments|dysfunction|" r"disability|decline|deterioration)$", re.I), ] # Vague relation types that add little signal VAGUE_RELATIONS = {"is_associated_with", "associated_with", "about"} # CognitiveAtlas / MeSH concept ids that are top-degree generic hubs # in the KG. The audit found these at degrees 700-9000+, with names that # are real English words (not caught by _NOISE_WORDS) but referring to # extremely abstract umbrella concepts: # # COGAT trm_4a3fd79d0a891 "memory" degree 2248 # COGAT trm_4a3fd79d0a80f "logic" degree 2052 # COGAT trm_5159c80c1dd24 "loss" degree 1034 # COGAT trm_4a3fd79d09741 "activation" degree 840 # COGAT trm_4a3fd79d0afcf "risk" degree 722 # COGAT trm_4a3fd79d0b2a8 "stress" degree 139 # MSH:D001921 "Brain" degree 9157 # MSH:D009474 "Neurons" degree 1354 # # Hypotheses with these as intermediate nodes or endpoints are too vague # to drive a downstream DL experiment ("FPN -> memory" is not testable # because we don't know which memory subsystem). Filtered in post_process. PATH_IGNORE_NODE_IDS = frozenset({ "COGAT_CONCEPT:trm_4a3fd79d0a891", # memory "COGAT_CONCEPT:trm_4a3fd79d0a80f", # logic "COGAT_CONCEPT:trm_5159c80c1dd24", # loss "COGAT_CONCEPT:trm_4a3fd79d09741", # activation "COGAT_CONCEPT:trm_4a3fd79d0afcf", # risk "COGAT_CONCEPT:trm_4a3fd79d0b2a8", # stress "MSH:D001921", # Brain (umbrella) "MSH:D009474", # Neurons (umbrella) }) # Disease/category mega-hubs that are valid as hypothesis endpoints # ("predict Alzheimer" is fine) but NOT as intermediate transit nodes # ("A → Alzheimer → B" is just "A relates to AD, AD relates to B" — no # discovery value). Audit found 37.8% of hypotheses transit through these. INTERMEDIATE_ONLY_IGNORE_IDS = frozenset({ "COGAT_DISORDER:dso_5419", # schizophrenia (degree 1005) "MSH:D009103", # Multiple Sclerosis (816) "COGAT_DISORDER:dso_3312", # bipolar disorder (703) "MSH:D000544", # Alzheimer Disease (746) "MSH:D004827", # Epilepsy (750) "MSH:D010300", # Parkinson Disease (709) "COGAT_DISORDER:dso_0060041", # autism spectrum disorder (613) "MSH:D001289", # ADHD (601) "MSH:D003863", # Depression (577) "MSH:D001523", # Mental Disorders (489) }) DIRECTIONAL_RELATIONS = { "causes", "treats", "increases", "reduces", "modulates", "activates", "inhibits", "is_biomarker_of", "is_risk_factor_for", "predicts", "distinguishes", "mediates", # Brain decoding directional predicates "evokes", "decoded_from", "elicits", } # domain pairs worth exploring — aligned with NeuroClaw imaging experiments # target datasets: UKB (T1w/dMRI/rfMRI/SWI), ADNI (T1w/PET/fMRI/DTI), HCP-YA (T1w/T2w/fMRI/dMRI/MEG) # experiment models: BrainGNN, NeuroStorm, SVM, XGBoost on raw images + handcrafted features # # Design principle: target should be a dataset OUTCOME (what we want to predict), # source should be a MEASURABLE feature (what the dataset provides as input). # - UKB outcomes: fluid intelligence, neuroticism, dementia diagnosis, motor tests # - ADNI outcomes: MCI→AD conversion, CDR-SB, cognitive composite # - HCP outcomes: fluid/crystallized IQ, emotion recognition, personality traits # # Allowed sources (what we can measure): neuroanatomy (MRI regions), connectivity # networks, gene, biomarker (CSF/PET), drug (for intervention studies). # Allowed targets (what we predict): disease (diagnostic labels), cognitive_function # (the OUTCOMES — includes behavior, personality, affect). DEFAULT_DOMAIN_PAIRS = [ # core: measurable features → clinical/behavioral OUTCOMES ("neuroanatomy", "disease"), # MRI → diagnosis ("neuroanatomy", "cognitive_function"), # MRI → cognition/behavior ("connectivity", "disease"), # dMRI/fMRI connectivity → diagnosis ("connectivity", "cognitive_function"), # connectivity → cognition # genetics → outcomes (UKB 500k WGS) ("gene", "disease"), ("gene", "cognitive_function"), # GWAS → behavior/IQ # fluid biomarkers → outcomes (ADNI CSF, blood) ("biomarker", "disease"), ("biomarker", "cognitive_function"), # drug → outcomes (ADNI pharmaceutical arms) ("drug", "disease"), ("drug", "cognitive_function"), # cross-outcome (comorbidity, transdiagnostic) ("disease", "disease"), ("cognitive_function", "disease"), # e.g. anxiety → MS diagnosis risk ("disease", "cognitive_function"), # e.g. AD → processing speed decline ] # Domains that are NOT directly measurable from brain imaging # These hypotheses will be filtered out in post_process NON_MEASURABLE_BIOMARKER_TYPES = { "neurotransmitter", # needs specialized PET tracers (e.g., 11C-raclopride for DA) "protein", # needs tissue biopsy or CSF "enzyme", # needs molecular assays "receptor", # needs specialized PET (e.g., 11C-PIB for Aβ, but that's biomarker domain) # fluid biomarkers — not available in UKB/HCP-YA, only ADNI CSF subset "csf_biomarker", "blood_biomarker", "saliva_biomarker", "tear_biomarker", } # Specific entity name patterns that are NOT directly measurable from imaging _NON_MEASURABLE_PATTERNS = [ re.compile(r"(neurotransmitter|dopamine|serotonin|norepinephrine|gaba|glutamate|acetylcholine)\s+(level|concentration|release|synthesis)", re.I), re.compile(r"(alpha|beta|gamma|delta|kappa)\s*synuclein\s*(pathology|aggregation|expression)", re.I), re.compile(r"(amyloid|tau|phosphorylated)\s*(beta|protein|peptide)\s*(aggregation|production|clearance)", re.I), re.compile(r"(enzyme|kinase|phosphatase)\s*(activity|expression)", re.I), re.compile(r"(receptor|transporter)\s*(density|binding|expression)", re.I), re.compile(r"(TNF|interleukin|IL-\d|cytokine|chemokine)\s*(alpha|beta|level|concentration|production)", re.I), re.compile(r"CSF\s+(Aβ|amyloid|tau|p-tau|NFL|neurofilament)", re.I), re.compile(r"(blood|plasma|serum)\s+(biomarker|marker|level|concentration)", re.I), re.compile(r"(CSF|cerebrospinal fluid)\s+", re.I), re.compile(r"(saliva|tear|urine)\s+(biomarker|marker|level)", re.I), re.compile(r"(biopsy|tissue sample)", re.I), ] # Non-neurological target domains — brain regions should not directly predict these _NON_NEUROLOGICAL_TARGETS = re.compile( r"(urinary|incontinence|frequency|enuresis|bladder|renal|kidney|liver|" r"gastrointestinal|cardiac|pulmonary|dermatol|orthopedic|musculoskeletal|" r"fracture|sprain|tumor|cancer|carcinoma|leukemia|lymphoma)", re.I ) # DATASET-OUTCOME whitelist — covers actual predicted variables in UKB/ADNI/HCP-YA # papers (see README "Dataset Outcomes" for references to typical prediction tasks). # Target must match one of these patterns to pass the post_process filter. # We also auto-accept any concept in the `disease` domain (clinical diagnosis # IS the most common outcome) and any MSH/CogAtlas concept in the # `cognitive_function` domain (behavior/cognition). # # Categories cover: # - Clinical diagnostic labels (Alzheimer, schizophrenia, MCI, etc.) — all 3 datasets # - AD staging / conversion (CN→MCI→AD, ATN) — ADNI # - Clinical scales (CDR, MMSE, ADAS-Cog, PHQ-9, MoCA, NPI) — ADNI + UKB # - Cognitive abilities (IQ, memory, attention, processing speed) — all 3 # - Specific cognitive tests (PMAT, flanker, N-back, delay discounting) — HCP # - Personality (Big Five) — HCP + UKB # - Behavior/affect (anxiety, depression, aggression, risk-taking) — all 3 # - Motor/sensory (grip strength, gait, reaction time, dexterity) — UKB + HCP # - Brain age / neurodegeneration markers — UKB + ADNI # - NeuroSTORM-evaluated phenotypes: MND, early psychosis (HCP-EP), ADHD200, # COBRE, UCLA L5c, TCP psychiatric scales, fMRI task state classification # - Subject fingerprinting / re-identification _OUTCOME_KEYWORDS = re.compile( r"(" # cognitive abilities — general r"intelligence|cognition|cognitive\s+(function|ability|performance|deterioration|impairment|dysfunction|decline|test|assessment|composite|score)|" r"memory|attention|executive|processing\s+speed|reasoning|language|" r"fluency|perception|reaction\s+time|fluid\s+intelligence|" r"crystallized\s+intelligence|working\s+memory|episodic\s+memory|" r"semantic\s+memory|verbal\s+(memory|fluency|learning)|visuospatial|" # specific HCP NIH Toolbox / cognitive tasks r"pmat|flanker|card\s+sort|n-?back|list\s+sort|picture\s+sequence|" r"pattern\s+comparison|picture\s+vocabulary|oral\s+reading|" r"delay\s+discounting|risk[- ]taking|go[- ]no[- ]go|" # HCP Penn CNB cognitive battery r"penn\s+(word|matrix|line\s+orientation|continuous\s+performance|progressive\s+matrices|fear|emotion|cnb)|" r"matrix\s+pattern|numeric\s+memory|prospective\s+memory|pairs\s+matching|" r"trail\s+making|symbol\s+digit|boston\s+naming|animal\s+fluency|" r"category\s+fluency|logical\s+memory|clock\s+drawing|ravlt|" # HCP 7 task states (NeuroSTORM state classification) r"emotion\s+task|gambling\s+task|language\s+task|motor\s+task|" r"relational\s+task|social\s+task|working\s+memory\s+task|" # clinical scales (ADNI/UKB/TCP/HCP) r"\b(cdr|cdr-sb|mmse|moca|adas|adas-cog|npi|faq|gds|phq-?9|gad-?7|bai|hdrs|hrsd|hamd|ham-d|" r"bdi|ymrs|panss|sans|saps|audit|asrs|pro|adi|srs|tci|neo-?ffi|asr|abcl|" r"cidi|cidi-sf|eysenck|swemwbs|psqi|ftnd|ssaga|masq|promis|upsit)\b|" r"adult\s+self\s+report|adult\s+behavior\s+checklist|" # personality / affect r"neuroticism|extraversion|agreeableness|conscientiousness|openness|" r"personality|temperament|affect|mood|emotion|anxiety|depression|" r"well-?being|satisfaction|life\s+satisfaction|psychological|stress\s+response|" r"anxiety\s+sensitivity|cautiousness|" r"affect\s+(positive|negative)|emotion\s+recognition|emotional\s+regulation|" r"perceived\s+(stress|rejection|hostility)|anger|fear|sadness|" # social functioning (HCP + UKB) r"loneliness|social\s+(isolation|support|relationship|cognition)|" r"meaning\s+and\s+purpose|instrumental\s+support|emotional\s+support|" r"friendship|" # behavior r"behavior|aggression|impulsivity|addiction|substance|alcohol|smoking|" r"tobacco|cannabis|cocaine|opiate|opioid|hallucinogen|" r"drug\s+use|substance\s+use|sleep\s+quality|insomnia|" # diagnoses / clinical outcomes — added NeuroSTORM-evaluated cohorts and ADNI stages r"alzheimer|parkinson|schizophrenia|autism|adhd|bipolar|epilepsy|" r"mci|mild\s+cognitive|dementia|psychosis|early\s+psychosis|stroke|post[- ]stroke|" r"multiple\s+sclerosis|huntington|frontotemporal|lewy\s+body|" r"motor\s+neuron\s+disease|mnd|als|" r"transdiagnostic|psychiatric\s+disorder|mental\s+health\s+disorder|" r"ocd|ptsd|phobia|panic|agoraphobia|somatoform|eating\s+disorder|" # ADNI-specific diagnostic stages r"\b(cn|smc|emci|lmci|ad\b|preclinical|at\b|atn|alzheimer\s+continuum)\b|" r"significant\s+memory\s+concern|subjective\s+(memory|cognitive)\s+(concern|complaint|decline)|" r"cognitively\s+(normal|unimpaired)|" r"disorder|syndrome|diagnosis|onset|conversion|progression|severity|" r"symptom|manifestation|prognosis|outcome|treatment\s+response|" r"disease\s+(stage|staging|duration|burden)|" # cardiovascular / metabolic diseases (UKB ICD-10) r"myocardial\s+infarction|heart\s+failure|hypertension|atrial\s+fibrillation|" r"coronary|cardiovascular\s+disease|diabetes|type\s*[12]\s+diabetes|" r"chronic\s+kidney|fatty\s+liver|nafld|metabolic\s+syndrome|obesity|" # AD-specific biomarker status r"amyloid\s+(status|positivity|positive|negative|load|burden|suvr)|" r"tau\s+(status|positivity|positive|tangle|pathology|burden|suvr)|" r"atn\s+(profile|stage|classification)|" r"neurodegeneration\s+(stage|status)|" # brain age / aging r"brain\s+age|brain-?age(-?gap)?|aging|age[- ]related|age\s+acceleration|" # motor / sensory r"grip\s+strength|gait|motor\s+coordination|motor\s+function|" r"balance|tremor|dexterity|walking\s+speed|two[- ]minute\s+walk|endurance|" r"visual\s+(acuity|field)|audition|hearing|olfaction|taste|pain|" r"chronic\s+pain|musculoskeletal\s+pain|" # mortality / longevity r"mortality|all-?cause\s+death|survival|life\s+expectancy" r")", re.I ) # Target domains considered as valid dataset outcomes _OUTCOME_DOMAINS = {"disease", "cognitive_function"} # NeuroClaw testable modalities and their keywords # Aligned with UKB/ADNI/HCP-YA available data + deep learning models TESTABLE_MODALITIES = { "sMRI": ["cortical thickness", "volume", "atrophy", "gray matter", "white matter", "brain structure", "morphometry", "VBM", "FreeSurfer", "recon-all", "brain region", "hippocampus", "amygdala", "thalamus", "caudate", "putamen", "cerebellum", "insula", "cortex", "ventricle"], "fMRI": ["functional connectivity", "BOLD", "activation", "resting-state", "task-based", "network", "default mode", "fMRI", "brain response", "neural activity", "brain activation"], "dMRI": ["DTI", "diffusion", "fractional anisotropy", "tractography", "white matter integrity", "structural connectivity", "FA", "MD", "connectivity matrix", "fiber bundle", "white matter tract"], "PET": ["PET", "tracer", "amyloid", "tau", "FDG", "SUVr", "binding potential", "glucose metabolism", "florbetapir", "flortaucipir"], "EEG": ["EEG", "ERP", "oscillation", "power spectrum", "alpha", "beta", "theta", "delta", "gamma", "microstate", "coherence", "event-related"], "organ_volume": ["organ volume", "liver volume", "kidney volume", "spleen volume", "MedSAM", "segmentation", "organ size"], } # Deep learning model keywords for testability scoring DL_MODEL_KEYWORDS = [ "BrainGNN", "NeuroStorm", "GNN", "graph neural", "region of interest", "ROI", "connectivity matrix", "adjacency", "node feature", "graph convolution", "deep learning", "CNN", "ResNet", "attention", "transformer", "voxel", "patch", "whole-brain", ] # ── Dataset-Available Variables ────────────────────────────────────── # Defines what can be measured in each dataset. Hypotheses must start # from these features and end at dataset-available outcomes. DATASET_FEATURES = { "UKB": { # sMRI (T1w): FreeSurfer-derived ROI measures "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_subcortical_volume": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_cortical_area": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_cortical_volume": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_voxel": {"modality": "sMRI", "tool": "voxel", "level": "voxel"}, # dMRI: diffusion metrics per tract "dmri_fa": {"modality": "dMRI", "tool": "TBSS", "level": "tract"}, "dmri_md": {"modality": "dMRI", "tool": "TBSS", "level": "tract"}, "dmri_sc": {"modality": "dMRI", "tool": "tractography", "level": "connectivity"}, # rfMRI: functional connectivity "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, # lesion segmentation "lesion_volume": {"modality": "sMRI", "tool": "MedSAM", "level": "ROI"}, # non-imaging "genetics": {"modality": "genetics", "tool": "WGS/GSA", "level": "SNP"}, "environment": {"modality": "environment", "tool": "questionnaire","level": "variable"}, "physical": {"modality": "physical", "tool": "measurement", "level": "variable"}, "hospitalization":{"modality": "clinical", "tool": "ICD10", "level": "outcome"}, }, "ADNI": { "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_subcortical_volume": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_voxel": {"modality": "sMRI", "tool": "voxel", "level": "voxel"}, "pet_amyloid": {"modality": "PET", "tool": "florbetapir", "level": "ROI"}, "pet_tau": {"modality": "PET", "tool": "flortaucipir", "level": "ROI"}, "pet_fdg": {"modality": "PET", "tool": "FDG", "level": "ROI"}, "fmri_fc": {"modality": "fMRI", "tool": "task/resting", "level": "connectivity"}, "dti_fa": {"modality": "dMRI", "tool": "DTI", "level": "tract"}, "lesion_volume": {"modality": "sMRI", "tool": "MedSAM", "level": "ROI"}, "genetics": {"modality": "genetics", "tool": "APOE/GWAS", "level": "SNP"}, "medication": {"modality": "clinical", "tool": "medication_log", "level": "variable"}, }, "HCP_YA": { "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_myelin": {"modality": "sMRI", "tool": "T1w/T2w", "level": "ROI"}, "smri_voxel": {"modality": "sMRI", "tool": "voxel", "level": "voxel"}, "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "tfmri_task":{"modality": "fMRI", "tool": "task fMRI","level": "activation"}, "dmri_sc": {"modality": "dMRI", "tool": "HARDI", "level": "connectivity"}, "meg": {"modality": "MEG", "tool": "MEG", "level": "connectivity"}, }, # NAS-available patient cohorts with preprocessed ROI time series. # Phenotype CSVs live under Z:\Dataset\fMRI\phenotype and the dataset- # specific rest csvs. All supply rfMRI volumes or ROI series; structural # T1 is available for HCP-EP and HCP-Aging (the other four are rfMRI-only # public releases). "ABIDE": { "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "rfmri_roi_ts": {"modality": "fMRI", "tool": "rfMRI", "level": "ROI"}, }, "ADHD200": { "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "rfmri_roi_ts": {"modality": "fMRI", "tool": "rfMRI", "level": "ROI"}, }, "COBRE": { "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "rfmri_roi_ts": {"modality": "fMRI", "tool": "rfMRI", "level": "ROI"}, }, "UCLA": { # UCLA CNP — rest + 6 task contrasts, cross-diagnosis cohort. "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "rfmri_roi_ts": {"modality": "fMRI", "tool": "rfMRI", "level": "ROI"}, "tfmri_task": {"modality": "fMRI", "tool": "task fMRI", "level": "activation"}, }, "HCP_EP": { # HCP Early Psychosis — patient cohort, T1w + rfMRI cleaned. "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_subcortical_volume": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "rfmri_roi_ts": {"modality": "fMRI", "tool": "rfMRI", "level": "ROI"}, }, "HCP_AGING": { # HCP-Aging — T1w + rfMRI REST1/REST2 + 3 task contrasts. "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_subcortical_volume": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "smri_myelin": {"modality": "sMRI", "tool": "T1w/T2w", "level": "ROI"}, "rfmri_fc": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity"}, "rfmri_roi_ts": {"modality": "fMRI", "tool": "rfMRI", "level": "ROI"}, "tfmri_task": {"modality": "fMRI", "tool": "task fMRI", "level": "activation"}, }, # ── Visual decoding (fMRI) ────────────────────────────────────────── # NSD & BOLD5000: image-stimulus visual task fMRI, no rest. "NSD": { "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "tfmri_visual_voxel": {"modality": "fMRI", "tool": "task fMRI", "level": "voxel", "stimulus": "natural_image"}, "tfmri_visual_roi": {"modality": "fMRI", "tool": "task fMRI", "level": "ROI", "stimulus": "natural_image"}, }, "BOLD5000": { "smri_cortical_thickness": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI"}, "tfmri_visual_voxel": {"modality": "fMRI", "tool": "task fMRI", "level": "voxel", "stimulus": "ImageNet_COCO_Scene"}, "tfmri_visual_roi": {"modality": "fMRI", "tool": "task fMRI", "level": "ROI", "stimulus": "ImageNet_COCO_Scene"}, }, # ── Visual decoding (EEG) ─────────────────────────────────────────── "SEED_DV": { "eeg_psd": {"modality": "EEG", "tool": "PSD", "level": "channel"}, "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, }, # ── Emotion decoding (EEG + eye tracking) ─────────────────────────── "SEED": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eeg_psd": {"modality": "EEG", "tool": "PSD", "level": "channel"}, }, "SEED_IV": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eye_movement": {"modality": "eye_tracking", "tool": "saccade/fixation", "level": "variable"}, }, "SEED_V": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eye_movement": {"modality": "eye_tracking", "tool": "saccade/fixation", "level": "variable"}, }, "SEED_VII": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eye_movement": {"modality": "eye_tracking", "tool": "saccade/fixation", "level": "variable"}, }, "SEED_GER": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eye_movement": {"modality": "eye_tracking", "tool": "saccade/fixation", "level": "variable"}, }, "SEED_FRA": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eye_movement": {"modality": "eye_tracking", "tool": "saccade/fixation", "level": "variable"}, }, # ── Vigilance decoding (EEG) ──────────────────────────────────────── "SEED_VIG": { "eeg_de": {"modality": "EEG", "tool": "DE", "level": "channel"}, "eog": {"modality": "EOG", "tool": "EOG", "level": "channel"}, "eye_movement": {"modality": "eye_tracking", "tool": "gaze/blink", "level": "variable"}, }, } DATASET_OUTCOMES = { "UKB": [ "disease_diagnosis", # ICD10 codes "mortality", # death registry "cognitive_score", # touchscreen cognitive tests "imaging_phenotype", # derived imaging phenotypes ], "ADNI": [ "diagnosis", # CN / MCI / AD "conversion", # MCI → AD conversion "cognitive_decline", # ADAS-Cog, MMSE decline "biomarker_status", # amyloid+/tau+ status ], "HCP_YA": [ "behavioral_score", # NIH Toolbox "cognitive_task", # task fMRI performance "personality", # NEO-FFI ], # ABIDE — ASD vs controls, rest only. "ABIDE": [ "diagnosis", # ASD vs TD "symptom_severity", # ADOS, ADI-R, SRS "cognitive_score", # FIQ/VIQ/PIQ ], # ADHD200 — ADHD subtype vs TDC. "ADHD200": [ "diagnosis", # ADHD (combined/inattentive/hyperactive) vs TDC "symptom_severity", # ADHD-RS, Conners "cognitive_score", # WASI/WISC ], # COBRE — schizophrenia vs controls. "COBRE": [ "diagnosis", # schizophrenia vs HC "symptom_severity", # PANSS positive/negative/general "cognitive_score", # WAIS ], # UCLA CNP — schizophrenia/bipolar/ADHD vs controls. "UCLA": [ "diagnosis", # SCZ / BP / ADHD / HC "symptom_severity", # HAM-D, YMRS, ADHD-RS "cognitive_task", # 6 task contrasts ], # HCP-EP — early psychosis (FES + AR) vs HC. "HCP_EP": [ "diagnosis", # affective/non-affective psychosis vs HC "symptom_severity", # PANSS, SANS, YMRS "cognitive_score", # MATRICS Consensus Cognitive Battery ], # HCP-Aging — lifespan 36-100 yrs, healthy aging. "HCP_AGING": [ "cognitive_decline", # NIH Toolbox across age "behavioral_score", # same battery as HCP-YA "cognitive_task", # CARIT/FACENAME/VISMOTOR ], # ── Visual decoding outcomes ──────────────────────────────────────── "NSD": [ "image_category", # COCO 80-class "image_semantic", # CLIP / language-model embedding "stimulus_reconstruction",# pixel / latent reconstruction ], "BOLD5000": [ "image_category", # ImageNet 1000-class / COCO / Scene "scene_type", # Scene 365-class "image_semantic", ], "SEED_DV": [ "video_class", # discrete video categories "video_semantic", "video_reconstruction", ], # ── Emotion decoding outcomes ─────────────────────────────────────── "SEED": ["emotion_3class"], # positive/neutral/negative "SEED_IV": ["emotion_4class"], # happy/sad/fear/neutral "SEED_V": ["emotion_5class"], # +disgust "SEED_VII": ["emotion_7class", "emotion_continuous"], "SEED_GER": ["emotion_3class"], "SEED_FRA": ["emotion_3class"], # ── Vigilance decoding outcomes ───────────────────────────────────── "SEED_VIG": ["vigilance_continuous", "perclos"], } # Imaging feature templates — dynamically combined with AAL atlas regions # {region} is replaced with actual neuroanatomy node names at generation time IMAGING_FEATURE_TEMPLATES = { # sMRI FreeSurfer ROI features "cortical thickness of {region}": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI", "datasets": ["UKB", "ADNI", "HCP_YA", "HCP_EP", "HCP_AGING"]}, "gray matter volume of {region}": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI", "datasets": ["UKB", "ADNI", "HCP_YA", "HCP_EP", "HCP_AGING"]}, "subcortical volume of {region}": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI", "datasets": ["UKB", "ADNI", "HCP_YA", "HCP_EP", "HCP_AGING"]}, "cortical area of {region}": {"modality": "sMRI", "tool": "FreeSurfer", "level": "ROI", "datasets": ["UKB", "HCP_YA", "HCP_AGING"]}, # dMRI tract features "fractional anisotropy of {region}": {"modality": "dMRI", "tool": "TBSS", "level": "tract", "datasets": ["UKB", "HCP_YA"]}, "mean diffusivity of {region}": {"modality": "dMRI", "tool": "TBSS", "level": "tract", "datasets": ["UKB", "HCP_YA"]}, # PET ROI features (ADNI) "amyloid SUVR of {region}": {"modality": "PET", "tool": "florbetapir", "level": "ROI", "datasets": ["ADNI"]}, "tau SUVR of {region}": {"modality": "PET", "tool": "flortaucipir", "level": "ROI", "datasets": ["ADNI"]}, "FDG uptake of {region}": {"modality": "PET", "tool": "FDG", "level": "ROI", "datasets": ["ADNI"]}, # lesion segmentation "lesion volume of {region}": {"modality": "sMRI", "tool": "MedSAM", "level": "ROI", "datasets": ["UKB", "ADNI"]}, } # Connectivity feature templates — {a} and {b} are AAL regions CONNECTIVITY_FEATURE_TEMPLATES = { "functional connectivity between {a} and {b}": {"modality": "fMRI", "tool": "rfMRI", "level": "connectivity", "datasets": ["UKB", "ADNI", "HCP_YA", "ABIDE", "ADHD200", "COBRE", "UCLA", "HCP_EP", "HCP_AGING"]}, "effective connectivity from {a} to {b}": {"modality": "fMRI", "tool": "DCM/GC", "level": "connectivity", "datasets": ["ADNI", "HCP_YA", "UCLA", "HCP_EP", "HCP_AGING"]}, "structural connectivity between {a} and {b}": {"modality": "dMRI", "tool": "tractography", "level": "connectivity", "datasets": ["UKB", "HCP_YA"]}, } # Domain pairs for imaging-driven hypothesis generation # source domain → target domain, aligned with dataset modalities IMAGING_DOMAIN_PAIRS = [ # sMRI features → disease ("neuroanatomy", "disease"), # connectivity → disease ("connectivity", "disease"), # sMRI features → cognitive function ("neuroanatomy", "cognitive_function"), # gene → brain structure (UKB genetics + imaging) ("gene", "neuroanatomy"), # disease → drug (ADNI) ("disease", "drug"), ] # Brain decoding domain pairs (NSD / BOLD5000 / SEED family). # These are SEPARATE from IMAGING_DOMAIN_PAIRS because decoding hypotheses # reverse the usual direction: instead of "brain feature → clinical outcome", # they go "stimulus ↔ brain" or "brain → psychological-state label". DECODING_DOMAIN_PAIRS = [ # Encoding: stimulus drives brain response ("visual_stimulus", "neuroanatomy"), ("visual_stimulus", "imaging_feature"), ("visual_stimulus", "connectivity"), # Decoding: brain predicts stimulus identity ("neuroanatomy", "visual_stimulus"), ("imaging_feature", "visual_stimulus"), # EEG → emotion (SEED/SEED-IV/SEED-V/SEED-VII/SEED-GER/SEED-FRA) ("imaging_feature", "emotion"), ("neuroanatomy", "emotion"), # EEG → vigilance (SEED-VIG) ("imaging_feature", "vigilance"), ("neuroanatomy", "vigilance"), ] # AAL atlas regions used for imaging feature generation # Subset of neuroanatomy nodes from NN_AAL source _AAL_REGION_KEYWORDS = [ "Precentral", "Frontal_Sup", "Frontal_Mid", "Frontal_Inf", "Rolandic_Oper", "Supp_Motor", "Olfactory", "Frontal_Sup_Med", "Frontal_Med_Orb", "Rectus", "Insula", "Cingulate", "Hippocampus", "Parahippocampal", "Amygdala", "Calcarine", "Cuneus", "Lingual", "Occipital", "Fusiform", "Postcentral", "Parietal", "SupraMarginal", "Angular", "Precuneus", "Paracentral", "Caudate", "Putamen", "Pallidum", "Thalamus", "Heschl", "Temporal", "Temporal_Pole", ] # ── engine ───────────────────────────────────────────────────────────── class HypothesisEngine: """Batch-generate, persist, and rank testable hypotheses from a knowledge graph.""" def __init__(self, kg: KnowledgeGraph): self.kg = kg self.G = kg.G self._index = kg._index # Build claims index for frequency_boost: (subj, pred, obj) → [claim_meta, ...] self._claims_by_triple: dict[tuple[str, str, str], list[dict]] = {} for nid, node in self._index.items(): if "claim" not in node.domain_tags: continue meta = node.metadata key = (meta.get("subject_id", ""), meta.get("predicate", ""), meta.get("object_id", "")) if key[0] and key[2]: self._claims_by_triple.setdefault(key, []).append(meta) # ── batch generation ─────────────────────────────────────────────── def batch_generate( self, domain_pairs: Optional[list[tuple[str, str]]] = None, max_hops: int = 3, max_paths_per_pair: int = 5, max_seeds_per_domain: int = 50, ) -> list[Hypothesis]: """Batch-generate hypotheses across the entire graph. Strategy: for each domain pair, sample seed concepts from domain_a, find paths to concepts in domain_b within max_hops hops. """ if domain_pairs is None: domain_pairs = DEFAULT_DOMAIN_PAIRS all_hypotheses: list[Hypothesis] = [] seen_pairs: set[tuple[str, str]] = set() _hyp_counter = 0 for dom_a, dom_b in domain_pairs: logger.info(f"generating hypotheses: {dom_a} -> {dom_b}") seeds_a = self._sample_domain_nodes(dom_a, max_seeds_per_domain) targets_b = { nid for nid, data in self.G.nodes(data=True) if dom_b in data.get("domain_tags", []) and "claim" not in data.get("domain_tags", []) and nid not in PATH_IGNORE_NODE_IDS } for seed_id in seeds_a: if seed_id not in self.G: continue # BFS from seed try: reachable = nx.single_source_shortest_path( self.G, seed_id, cutoff=max_hops ) except nx.NetworkXError: continue # find targets in domain_b candidates = [ nid for nid in reachable if nid in targets_b and nid != seed_id ] pair_count = 0 for target_id in candidates: pair_key = tuple(sorted([seed_id, target_id])) if pair_key in seen_pairs: continue seen_pairs.add(pair_key) raw_path = reachable[target_id] links = self._enrich_path(raw_path) if not links: continue conf = self._compute_confidence_score(links) nov = self._compute_novelty_score(links) evi = self._compute_evidence_score(links) test, test_reason = self._compute_testability_score(links) claim_ids = [l.claim_id for l in links if l.claim_id] _hyp_counter += 1 h = Hypothesis( id=f"HYP:{_hyp_counter:06d}", hypothesis_type="bridge", source_id=seed_id, source_name=self._index[seed_id].preferred_name, target_id=target_id, target_name=self._index[target_id].preferred_name, path=links, confidence_score=conf, novelty_score=nov, evidence_score=evi, testability_score=test, composite_score=0.0, # set below supporting_claims=claim_ids, testability_reason=test_reason, metadata={"domain_a": dom_a, "domain_b": dom_b}, ) h.explanation = self._generate_explanation(h) h.composite_score = self._composite_score(h) all_hypotheses.append(h) pair_count += 1 if pair_count >= max_paths_per_pair: break logger.info(f"batch generation complete: {len(all_hypotheses)} hypotheses from {len(domain_pairs)} domain pairs") all_hypotheses = self.post_process(all_hypotheses) return all_hypotheses def post_process( self, hypotheses: list[Hypothesis], min_hops: int = 2, filter_vague_relations: bool = True, filter_non_measurable: bool = True, max_hops_filter: int = 5, ) -> list[Hypothesis]: """Filter low-quality hypotheses after generation. Filters: 1. Noisy entities — source/target name matches NOISE_PATTERNS 2. 1-hop hypotheses — too simple, just restates existing edges 3. Vague relations — all links are is_associated_with / associated_with / about 4. Non-measurable biomarkers — entities not directly measurable from brain imaging 5. Pure association chains — no directional predicates (causes/treats/increases/etc.) 6. Overly long paths — exceeds max_hops_filter (default 5) to reduce noise accumulation """ before = len(hypotheses) filtered = [] for h in hypotheses: # filter noisy entities (source, target, and all intermediate nodes) all_names = {h.source_name, h.target_name} for link in h.path: all_names.add(link.from_name) all_names.add(link.to_name) if any(self._is_noisy_entity(name) for name in all_names): continue # filter 1-hop (single direct edge = no discovery value) if len(h.path) < min_hops: continue # filter all-vague-relations if filter_vague_relations: relation_types = {l.relation_type for l in h.path} if relation_types and relation_types <= VAGUE_RELATIONS: continue # filter single-PMID bridges (all hops cite the same paper = not a real bridge) if len(h.path) >= 2: pmids = set() for link in h.path: pmid = link.source_paper.get("pmid", "") if isinstance(link.source_paper, dict) else "" if pmid: pmids.add(pmid) if len(pmids) == 1: continue # filter non-measurable biomarkers (not testable from imaging) if filter_non_measurable: if self._has_non_measurable_entity(h): continue # filter biologically implausible paths (brain region → non-neurological target) if self._has_implausible_path(h): continue # filter paths with weak evidence (target not mentioned in raw_text) if self._has_weak_evidence(h): continue # filter paths where both ends of any edge are broad hubs # ("Brain Diseases --causes--> Cognitive Dysfunction" is uninformative) if self._has_hub_to_hub_edge(h): continue # filter paths touching any vague COGAT/MeSH umbrella hub # (memory/logic/loss/activation/risk/stress/Brain/Neurons). # These nodes are too abstract to drive a DL experiment whether # they appear as source, target, or intermediate. if self._touches_path_ignore_node(h): continue # filter paths that transit through disease mega-hubs as # intermediate nodes (A → Disease → B is uninformative). # These nodes are still valid as source/target endpoints. if self._transits_intermediate_only_hub(h): continue # (C-1) filter paths whose INTERMEDIATE node is a generic # phrase ("neural activity", "disease progression", "grey # matter", ...). Endpoints are not checked here. if self._has_intermediate_generic_phrase(h): continue # (C-2) filter paths whose directional density is too thin # (3+ hops with < 50% directional relations = too vague to # be a mechanism hypothesis). if self._has_thin_directional_density(h): continue # filter: target must be a dataset outcome (diagnosis/cognition/behavior/ # personality/motor). Predicting "White Matter" or "Neurons" is not a # hypothesis UKB/ADNI/HCP can directly test — those are imaging features # used as INPUTS, not outcomes. if not self._is_dataset_outcome(h): continue # (C-3) filter: target name is an umbrella concept ("skill", # "disease", "neurological disorder", "clinical features") # even though it passes the outcome keyword check. These # can't anchor a concrete DL label. if self._is_too_broad_target(h.target_name): continue # filter paths with no directional predicates (pure association chains) if len(h.path) >= 2: relation_types = {l.relation_type for l in h.path} if not (relation_types & DIRECTIONAL_RELATIONS): continue # filter paths that exceed max hop length (noise accumulation) if len(h.path) > max_hops_filter: continue filtered.append(h) # Deduplicate: for each (source, target) pair, keep top 2 by composite score from collections import defaultdict pair_groups = defaultdict(list) for h in filtered: key = (h.source_id, h.target_id) pair_groups[key].append(h) deduplicated = [] for key, group in pair_groups.items(): # Sort by composite score descending group.sort(key=lambda x: x.composite_score, reverse=True) # Keep top 2 (or 1 if only one exists) deduplicated.extend(group[:2]) logger.info(f"post_process: {before} -> {len(filtered)} filtered -> {len(deduplicated)} deduplicated " f"(removed {before - len(deduplicated)} total)") return deduplicated def _has_non_measurable_entity(self, h: Hypothesis) -> bool: """Check if hypothesis involves entities not measurable from brain imaging. Filters out hypotheses where source or target is: - A non-measurable domain (neurotransmitter levels, protein expression, etc.) - Matches non-measurable entity name patterns (CSF markers, blood markers, etc.) """ for node_name, node_id in [(h.source_name, h.source_id), (h.target_name, h.target_id)]: # check domain tags node = self._index.get(node_id) if node: domains = set(node.domain_tags) - {"claim"} # allow neurotransmitter/protein as intermediate hops only if source or target is neuroanatomy if domains & NON_MEASURABLE_BIOMARKER_TYPES: # check if the OTHER end is a brain region (then it's a valid "X affects brain" hypothesis) other_name = h.target_name if node_name == h.source_name else h.source_name other_id = h.target_id if node_name == h.source_name else h.source_id other_node = self._index.get(other_id) if other_node and "neuroanatomy" not in other_node.domain_tags: return True # check name patterns for pattern in _NON_MEASURABLE_PATTERNS: if pattern.search(node_name): return True return False @staticmethod def _is_noisy_entity(name: str) -> bool: """Check if an entity name matches known noise patterns.""" if not name or len(name.strip()) == 0: return True name_clean = name.strip() for pattern in NOISE_PATTERNS: if pattern.match(name_clean): return True # check if name contains any noise word words = set(re.split(r"[\s\-_,/]+", name_clean.lower())) if words & _NOISE_WORDS: return True return False @staticmethod def _is_generic_intermediate(name: str) -> bool: """(C-1) Phrase-level filter for intermediate node names that pass token-level `_NOISE_WORDS` but are still too vague. Examples that get blocked: - "neural activity" (no individual noise token) - "functional connectivity" (legit metric but not a mechanism) - "disease progression" - "grey matter" (umbrella) - "cognitive deficit" Only call on intermediate nodes — these phrases can be valid as endpoints (e.g. "functional connectivity" as a target metric). """ if not name: return True s = name.strip() for pattern in _GENERIC_INTERMEDIATE_PATTERNS: if pattern.match(s): return True return False @staticmethod def _is_too_broad_target(name: str) -> bool: """(C-3) Block target names that pass the outcome keyword regex but are umbrella concepts ("disease", "skill", "neurological disorder", "clinical features"). A DL experiment can't be designed against these — you don't know which subtype to label. """ if not name: return True s = name.strip() for pattern in _TARGET_TOO_BROAD_PATTERNS: if pattern.match(s): return True return False def _has_intermediate_generic_phrase(self, h: Hypothesis) -> bool: """(C-1) Reject paths whose intermediate node is a generic phrase like "neural activity" or "disease progression". Endpoints are excluded from this check because some metrics (e.g. "functional connectivity") legitimately appear as outcomes. """ if len(h.path) < 2: return False intermediate_names: list[str] = [] for i, link in enumerate(h.path): # link.from_name is intermediate when i >= 1 # link.to_name is intermediate when i < len(path) - 1 if i >= 1: intermediate_names.append(link.from_name or "") if i < len(h.path) - 1: intermediate_names.append(link.to_name or "") for name in intermediate_names: if self._is_generic_intermediate(name): return True return False def _has_thin_directional_density(self, h: Hypothesis) -> bool: """(C-2) Reject paths where directional relations are too sparse. Current rule (older): >= 1 directional anywhere = pass. Problem: a 4-hop path with 1 directional + 3 vague edges still looks like a real chain to scoring but is essentially a vague association narrative. New rule: - 1-2 hop path: at least 1 directional (unchanged) - 3+ hop path: at least half of the edges must be directional """ n = len(h.path) if n < 3: return False directional = sum(1 for l in h.path if l.relation_type in DIRECTIONAL_RELATIONS) return directional * 2 < n # < 50% directional def _has_implausible_path(self, h: Hypothesis) -> bool: """Check if hypothesis path has biologically implausible connections. Filters paths where a brain region directly predicts a non-neurological condition (e.g., amygdala → urinary incontinence) without a plausible intermediate neurological mechanism. """ # Check if source is a brain region and target is non-neurological source_node = self._index.get(h.source_id) target_node = self._index.get(h.target_id) if not source_node or not target_node: return False source_is_brain = "neuroanatomy" in source_node.domain_tags target_is_neuro = any(d in target_node.domain_tags for d in ["neuroanatomy", "disease", "cognitive_function", "biomarker", "gene", "drug", "neurotransmitter"]) # If source is brain region and target is non-neurological, check target name if source_is_brain and not target_is_neuro: if _NON_NEUROLOGICAL_TARGETS.search(h.target_name): return True # Also check intermediate nodes in the path for link in h.path: if _NON_NEUROLOGICAL_TARGETS.search(link.to_name): # Check if the previous node is a brain region prev_node = self._index.get(link.from_id) if prev_node and "neuroanatomy" in prev_node.domain_tags: # Only filter if there's no disease intermediate has_disease_intermediate = any( "disease" in self._index.get(l.from_id, ConceptNode(id="", preferred_name="")).domain_tags for l in h.path[:h.path.index(link)] ) if not has_disease_intermediate: return True return False def _has_hub_to_hub_edge(self, h: Hypothesis) -> bool: """Reject paths containing any edge whose endpoints are both broad hubs. Example: "Brain Diseases --causes--> Cognitive Dysfunction" — both ends are top-level categories; the edge is too generic to be a mechanistic step in a hypothesis. Hub set is the top-N nodes by non-'about' degree, computed once and cached. Uses a low bar (N=50) because hubs are self-evidently generic. """ if not hasattr(self, "_hub_id_set"): # Build once per engine instance from collections import Counter degree = Counter() for u, v, data in self.G.edges(data=True): if data.get("relation_type") != "about": degree[u] += 1 degree[v] += 1 top = degree.most_common(50) self._hub_id_set = {cid for cid, _ in top} for link in h.path: if link.from_id in self._hub_id_set and link.to_id in self._hub_id_set: return True return False def _touches_path_ignore_node(self, h: Hypothesis) -> bool: """Reject paths whose source, target, or any intermediate node is in PATH_IGNORE_NODE_IDS (vague COGAT/MeSH umbrella hubs). Catches concepts the token-based _is_noisy_entity misses because the names ("memory", "logic", "Brain", "Neurons") are legitimate English words but the KG concept id refers to an over-general umbrella that's not testable. """ if h.source_id in PATH_IGNORE_NODE_IDS: return True if h.target_id in PATH_IGNORE_NODE_IDS: return True for link in h.path: if link.from_id in PATH_IGNORE_NODE_IDS: return True if link.to_id in PATH_IGNORE_NODE_IDS: return True return False @staticmethod def _transits_intermediate_only_hub(h: Hypothesis) -> bool: """Reject paths that use disease mega-hubs as intermediate transit. INTERMEDIATE_ONLY_IGNORE_IDS nodes are valid as source/target (predicting Alzheimer is a real hypothesis) but not as middle hops (A → Alzheimer → B is just "both relate to AD"). """ if len(h.path) < 2: return False for i, link in enumerate(h.path): if i >= 1 and link.from_id in INTERMEDIATE_ONLY_IGNORE_IDS: return True if i < len(h.path) - 1 and link.to_id in INTERMEDIATE_ONLY_IGNORE_IDS: return True return False def _is_dataset_outcome(self, h: Hypothesis) -> bool: """Check if target is a UKB/ADNI/HCP-testable outcome. The goal of our hypotheses is to predict SOMETHING from brain imaging. Valid targets: - Clinical diagnoses (disease domain) — Alzheimer, MCI, schizophrenia, etc. - Cognitive/behavioral/personality measures (cognitive_function domain) - Brain decoding targets: * neuroanatomy (for encoding: stimulus → brain activation) * visual_stimulus (for decoding: brain → stimulus category) * emotion (SEED family: EEG → affect label) * vigilance (SEED-VIG: EEG → alertness) Invalid targets: - Molecular entities (gene, biomarker, drug, neurotransmitter) — these may be predictors, not predicted quantities - Overly generic disease categories (Brain Diseases, Mental Disorders) — already filtered by hub-to-hub, but double-check by keyword. Accepts target if EITHER: a) target's domain is in _OUTCOME_DOMAINS ∪ decoding domains, OR b) target name matches _OUTCOME_KEYWORDS regex (as fallback for claim_extraction concepts whose domain may be uncertain) """ target = self._index.get(h.target_id) if target is None: return False domains = set(target.domain_tags) # Accept: disease, cognitive_function, or decoding-target domains outcome_domains = _OUTCOME_DOMAINS | {"visual_stimulus", "emotion", "vigilance"} if domains & outcome_domains: return True # Accept: neuroanatomy targets when the hypothesis is a brain-decoding # encoding path (stimulus → brain region). Excludes the clinical- # prediction case where a target of 'White Matter' would be an input. if "neuroanatomy" in domains: source = self._index.get(h.source_id) if source: source_domains = set(source.domain_tags) if source_domains & {"visual_stimulus", "emotion", "vigilance"}: return True # Fallback: outcome keyword match (catches claim_extraction concepts # that describe outcomes but have wrong domain tags) if _OUTCOME_KEYWORDS.search(h.target_name): return True return False def _has_weak_evidence(self, h: Hypothesis) -> bool: """Check if hypothesis path has weak evidence (target not mentioned in raw_text). For hypotheses where the target is a specific brain region, check if any hop's raw_text actually mentions that region. If not, the path is likely spurious (e.g., IL-1β → Internal Capsula where the evidence text talks about "grey matter" but never mentions internal capsule). Exception: paths anchored by curated functional facts (e.g. `evokes` from visual_stimulus to a functional ROI) carry programmatic confidence, not paper evidence — skip the raw_text requirement for them. """ target_node = self._index.get(h.target_id) if not target_node or "neuroanatomy" not in target_node.domain_tags: return False # Skip paths whose source is a visual_stimulus / emotion / vigilance node, or # which contain at least one curated functional edge (evokes / decoded_from / # elicits). These are seeded from neuroscience textbooks, not paper claims. source_node = self._index.get(h.source_id) if source_node: decoding_domains = {"visual_stimulus", "emotion", "vigilance"} if any(t in decoding_domains for t in source_node.domain_tags): return False if any(l.relation_type in {"evokes", "decoded_from", "elicits"} for l in h.path): return False # Extract key terms from target name (e.g., "Internal Capsula" → ["internal", "capsula"]) target_terms = set(re.findall(r'\b\w{4,}\b', h.target_name.lower())) if not target_terms: return False # Check if any hop mentions the target region for link in h.path: raw = link.raw_text or link.evidence.get("raw_text", "") if isinstance(link.evidence, dict) else "" if raw: raw_lower = raw.lower() # If any target term appears in raw_text, evidence is OK if any(term in raw_lower for term in target_terms): return False # No hop mentions the target region → weak evidence logger.debug(f"weak evidence: {h.id} target '{h.target_name}' not mentioned in any raw_text") return True # ── imaging-driven batch generation ────────────────────────────── def batch_generate_imaging( self, dataset: str = "UKB", max_paths_per_pair: int = 5, max_seeds: int = 50, max_hops: int = 3, include_connectivity: bool = True, ) -> list[Hypothesis]: """Generate hypotheses driven by imaging features available in a dataset. Strategy: 1. Find AAL atlas neuroanatomy nodes in the graph as ROI seeds 2. For each ROI × imaging feature template, construct a feature name (e.g., "cortical thickness of Hippocampus_L") 3. Find graph paths from each ROI to disease/cognitive_function nodes 4. Filter using expanded exclusion rules 5. Annotate each hypothesis with dataset metadata """ dataset_key = dataset.upper().replace("-", "_") if dataset_key not in DATASET_FEATURES: raise ValueError(f"Unknown dataset: {dataset}. Available: {list(DATASET_FEATURES.keys())}") ds_features = DATASET_FEATURES[dataset_key] ds_outcomes = DATASET_OUTCOMES.get(dataset_key, []) # 1. Find AAL atlas ROI nodes aal_nodes = self._find_aal_regions(max_seeds) if not aal_nodes: logger.warning("No AAL atlas regions found in graph") return [] logger.info(f"Found {len(aal_nodes)} AAL regions for imaging hypothesis generation") # 2. Collect outcome nodes (disease, cognitive_function) outcome_nodes = self._collect_outcome_nodes() if not outcome_nodes: logger.warning("No outcome nodes (disease/cognitive_function) found") return [] # 3. Determine which imaging templates apply to this dataset applicable_templates = { name: meta for name, meta in IMAGING_FEATURE_TEMPLATES.items() if dataset_key in meta["datasets"] } all_hypotheses: list[Hypothesis] = [] _hyp_counter = 0 seen_pairs: set[tuple[str, str]] = set() # 4. Generate ROI-level imaging hypotheses for region_id, region_name in aal_nodes.items(): for feat_template, feat_meta in applicable_templates.items(): feature_name = feat_template.replace("{region}", region_name) # Find paths from this ROI to outcomes try: reachable = nx.single_source_shortest_path( self.G, region_id, cutoff=max_hops ) except nx.NetworkXError: continue candidates = [ nid for nid in reachable if nid in outcome_nodes and nid != region_id ] pair_count = 0 for target_id in candidates: pair_key = (region_id, target_id, feat_template) if pair_key in seen_pairs: continue seen_pairs.add(pair_key) raw_path = reachable[target_id] links = self._enrich_path(raw_path) if not links: continue # Skip if path contains non-measurable entities if self._path_has_non_measurable(links): continue conf = self._compute_confidence_score(links) nov = self._compute_novelty_score(links) evi = self._compute_evidence_score(links) test, test_reason = self._compute_testability_score(links) # Boost testability for imaging-driven hypotheses test = min(test + 0.15, 1.0) claim_ids = [l.claim_id for l in links if l.claim_id] _hyp_counter += 1 target_node = self._index.get(target_id) h = Hypothesis( id=f"HYP:IMG:{_hyp_counter:06d}", hypothesis_type="imaging", source_id=region_id, source_name=feature_name, target_id=target_id, target_name=target_node.preferred_name if target_node else target_id, path=links, confidence_score=conf, novelty_score=nov, evidence_score=evi, testability_score=test, composite_score=0.0, supporting_claims=claim_ids, testability_reason=test_reason, metadata={ "dataset": dataset_key, "input_modality": feat_meta["modality"], "input_feature": feature_name, "input_level": feat_meta["level"], "input_tool": feat_meta["tool"], "input_region": region_name, "outcome_type": self._classify_outcome(target_node), }, ) h.explanation = self._generate_explanation(h) h.composite_score = self._composite_score(h) all_hypotheses.append(h) pair_count += 1 if pair_count >= max_paths_per_pair: break # 5. Generate connectivity-level hypotheses if include_connectivity: conn_templates = { name: meta for name, meta in CONNECTIVITY_FEATURE_TEMPLATES.items() if dataset_key in meta["datasets"] } if conn_templates: hyps = self._generate_connectivity_hypotheses( aal_nodes, outcome_nodes, conn_templates, dataset_key, max_paths_per_pair, max_hops, _hyp_counter, seen_pairs, ) _hyp_counter += len(hyps) all_hypotheses.extend(hyps) logger.info( f"imaging batch generation ({dataset_key}): " f"{len(all_hypotheses)} hypotheses from {len(aal_nodes)} regions" ) all_hypotheses = self.post_process(all_hypotheses) return all_hypotheses def _find_aal_regions(self, max_n: int) -> dict[str, str]: """Find AAL atlas neuroanatomy nodes. Returns {node_id: region_name}.""" candidates = {} for nid, data in self.G.nodes(data=True): if "neuroanatomy" not in data.get("domain_tags", []): continue name = data.get("preferred_name", "") # Match against AAL region keywords name_lower = name.lower() for kw in _AAL_REGION_KEYWORDS: if kw.lower() in name_lower: candidates[nid] = name break # Sort by degree (more connected = richer paths) sorted_items = sorted( candidates.items(), key=lambda item: self.G.degree(item[0]), reverse=True, ) return dict(sorted_items[:max_n]) def _collect_outcome_nodes(self) -> set[str]: """Collect all disease + cognitive_function nodes as potential outcomes.""" outcome_ids = set() for nid, data in self.G.nodes(data=True): domains = set(data.get("domain_tags", [])) if "claim" in domains: continue if nid in PATH_IGNORE_NODE_IDS: continue if domains & {"disease", "cognitive_function"}: outcome_ids.add(nid) return outcome_ids def _classify_outcome(self, node: Optional[ConceptNode]) -> str: """Classify outcome node type for metadata.""" if not node: return "unknown" domains = set(node.domain_tags) if "disease" in domains: return "disease" if "cognitive_function" in domains: return "cognitive_function" if "biomarker" in domains: return "biomarker" return "other" def _path_has_non_measurable(self, links: list[HypothesisLink]) -> bool: """Check if any intermediate node in the path is non-measurable.""" for link in links: for name, nid in [(link.from_name, link.from_id), (link.to_name, link.to_id)]: node = self._index.get(nid) if node: domains = set(node.domain_tags) - {"claim"} if domains & NON_MEASURABLE_BIOMARKER_TYPES: return True for pattern in _NON_MEASURABLE_PATTERNS: if pattern.search(name): return True return False def _generate_connectivity_hypotheses( self, aal_nodes: dict[str, str], outcome_nodes: set[str], conn_templates: dict, dataset_key: str, max_paths_per_pair: int, max_hops: int, hyp_counter_start: int, seen_pairs: set, ) -> list[Hypothesis]: """Generate hypotheses for connectivity features (FC/EC/SC between region pairs).""" hypotheses = [] counter = hyp_counter_start region_ids = list(aal_nodes.keys()) # Sample region pairs (limit to avoid O(n^2) explosion) max_pairs = min(len(region_ids) * 3, 200) import random if len(region_ids) > 20: sampled_pairs = [] for _ in range(max_pairs): a, b = random.sample(region_ids, 2) sampled_pairs.append((a, b)) else: sampled_pairs = [(a, b) for i, a in enumerate(region_ids) for b in region_ids[i+1:]] sampled_pairs = sampled_pairs[:max_pairs] for region_a_id, region_b_id in sampled_pairs: name_a = aal_nodes[region_a_id] name_b = aal_nodes[region_b_id] for feat_template, feat_meta in conn_templates.items(): feature_name = feat_template.replace("{a}", name_a).replace("{b}", name_b) # Find paths from region_a to outcomes (potentially through region_b) try: reachable = nx.single_source_shortest_path( self.G, region_a_id, cutoff=max_hops ) except nx.NetworkXError: continue candidates = [ nid for nid in reachable if nid in outcome_nodes and nid != region_a_id ] pair_count = 0 for target_id in candidates: pair_key = (region_a_id, target_id, feat_template) if pair_key in seen_pairs: continue seen_pairs.add(pair_key) raw_path = reachable[target_id] links = self._enrich_path(raw_path) if not links: continue if self._path_has_non_measurable(links): continue conf = self._compute_confidence_score(links) nov = self._compute_novelty_score(links) evi = self._compute_evidence_score(links) test, test_reason = self._compute_testability_score(links) test = min(test + 0.15, 1.0) claim_ids = [l.claim_id for l in links if l.claim_id] counter += 1 target_node = self._index.get(target_id) h = Hypothesis( id=f"HYP:IMG:{counter:06d}", hypothesis_type="imaging_connectivity", source_id=region_a_id, source_name=feature_name, target_id=target_id, target_name=target_node.preferred_name if target_node else target_id, path=links, confidence_score=conf, novelty_score=nov, evidence_score=evi, testability_score=test, composite_score=0.0, supporting_claims=claim_ids, testability_reason=test_reason, metadata={ "dataset": dataset_key, "input_modality": feat_meta["modality"], "input_feature": feature_name, "input_level": feat_meta["level"], "input_tool": feat_meta["tool"], "input_region_a": name_a, "input_region_b": name_b, "input_region": f"{name_a} - {name_b}", "outcome_type": self._classify_outcome(target_node), }, ) h.explanation = self._generate_explanation(h) h.composite_score = self._composite_score(h) hypotheses.append(h) pair_count += 1 if pair_count >= max_paths_per_pair: break return hypotheses # ── persistence ──────────────────────────────────────────────────── def save_hypotheses(self, hypotheses: list[Hypothesis], path: str | Path) -> None: """Save hypotheses to JSON.""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) data = { "n_hypotheses": len(hypotheses), "hypotheses": [h.to_dict() for h in hypotheses], } path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") logger.info(f"saved {len(hypotheses)} hypotheses to {path}") def load_hypotheses(self, path: str | Path) -> list[Hypothesis]: """Load hypotheses from JSON.""" path = Path(path) data = json.loads(path.read_text(encoding="utf-8")) hypotheses = [Hypothesis.from_dict(h) for h in data["hypotheses"]] logger.info(f"loaded {len(hypotheses)} hypotheses from {path}") return hypotheses # ── ranking ──────────────────────────────────────────────────────── def rank_hypotheses( self, hypotheses: list[Hypothesis], weights: Optional[dict[str, float]] = None, top_n: int = 100, skip_post_process: bool = False, ) -> list[Hypothesis]: """Rank hypotheses by composite score (novelty, evidence, testability, confidence). Args: hypotheses: list of hypotheses to rank weights: custom weights dict, keys: confidence, evidence, novelty, testability top_n: return top N results skip_post_process: if True, skip the post-processing filter """ if not skip_post_process: hypotheses = self.post_process(hypotheses) if weights is None: # testability weighted highest — must be verifiable with imaging experiments weights = { "confidence": 0.20, "evidence": 0.20, "novelty": 0.25, "testability": 0.35, } for h in hypotheses: h.composite_score = ( (h.confidence_score ** weights["confidence"]) * (h.evidence_score ** weights["evidence"]) * (h.novelty_score ** weights["novelty"]) * (max(h.testability_score, 0.01) ** weights["testability"]) ) hypotheses.sort(key=lambda h: h.composite_score, reverse=True) return hypotheses[:top_n] # ── query-based (kept for interactive use) ───────────────────────── def find_paths( self, source_id: str, target_id: str, max_hops: int = 3, max_paths: int = 20, ) -> list[Hypothesis]: """Find hypothesis paths between two concepts with evidence enrichment.""" if source_id not in self.G or target_id not in self.G: return [] claim_nodes = {nid for nid, n in self._index.items() if "claim" in n.domain_tags} intermediate_exclude = claim_nodes - {source_id, target_id} # Also strip vague umbrella hubs from the search subgraph so paths # never include them as intermediates. Endpoints are excluded from # the strip so a caller can still query them directly. intermediate_exclude |= (PATH_IGNORE_NODE_IDS - {source_id, target_id}) subgraph = self.G.copy() subgraph.remove_nodes_from(intermediate_exclude) if source_id not in subgraph or target_id not in subgraph: return [] try: raw_paths = list(nx.all_simple_paths( subgraph, source_id, target_id, cutoff=max_hops )) except nx.NetworkXError: return [] raw_paths = raw_paths[:max_paths] return self._build_hypotheses_from_paths(raw_paths, "path") def bridge_discovery( self, concept_id: str, target_domain: str, max_hops: int = 3, max_results: int = 20, ) -> list[Hypothesis]: """Find cross-domain connections through intermediate claims.""" if concept_id not in self.G: return [] target_nodes = { nid for nid, data in self.G.nodes(data=True) if target_domain in data.get("domain_tags", []) } if not target_nodes: return [] try: reachable = nx.single_source_shortest_path( self.G, concept_id, cutoff=max_hops ) except nx.NetworkXError: return [] candidates = { nid for nid in reachable if nid in target_nodes and nid != concept_id and "claim" not in self._index.get(nid, ConceptNode(id="", preferred_name="")).domain_tags } hypotheses = [] for target_id in candidates: raw_path = reachable[target_id] links = self._enrich_path(raw_path) if not links: continue conf = self._compute_confidence_score(links) nov = self._compute_novelty_score(links) evi = self._compute_evidence_score(links) test, test_reason = self._compute_testability_score(links) claim_ids = [l.claim_id for l in links if l.claim_id] h = Hypothesis( hypothesis_type="bridge", source_id=concept_id, source_name=self._index[concept_id].preferred_name, target_id=target_id, target_name=self._index[target_id].preferred_name, path=links, confidence_score=conf, novelty_score=nov, evidence_score=evi, testability_score=test, supporting_claims=claim_ids, testability_reason=test_reason, ) h.explanation = self._generate_explanation(h) h.composite_score = self._composite_score(h) hypotheses.append(h) hypotheses.sort(key=lambda h: h.composite_score, reverse=True) return hypotheses[:max_results] def discover_hypotheses( self, concept_id: str, max_hops: int = 3, max_results: int = 30, exclude_domains: Optional[set[str]] = None, ) -> list[Hypothesis]: """Find hypotheses radiating from a single concept to all reachable domains.""" if concept_id not in self.G: return [] exclude = exclude_domains or {"claim"} source_node = self._index.get(concept_id) source_domains = set(source_node.domain_tags) - exclude if source_node else set() try: reachable = nx.single_source_shortest_path(self.G, concept_id, cutoff=max_hops) except nx.NetworkXError: return [] candidates = [] for target_id, raw_path in reachable.items(): if target_id == concept_id: continue target_node = self._index.get(target_id) if not target_node: continue target_domains = set(target_node.domain_tags) - exclude if not target_domains or target_domains <= source_domains: continue candidates.append((target_id, raw_path)) hypotheses = [] for target_id, raw_path in candidates: links = self._enrich_path(raw_path) if not links: continue conf = self._compute_confidence_score(links) nov = self._compute_novelty_score(links) evi = self._compute_evidence_score(links) test, test_reason = self._compute_testability_score(links) claim_ids = [l.claim_id for l in links if l.claim_id] h = Hypothesis( hypothesis_type="discover", source_id=concept_id, source_name=self._index[concept_id].preferred_name, target_id=target_id, target_name=self._index[target_id].preferred_name, path=links, confidence_score=conf, novelty_score=nov, evidence_score=evi, testability_score=test, supporting_claims=claim_ids, testability_reason=test_reason, ) h.explanation = self._generate_explanation(h) h.composite_score = self._composite_score(h) hypotheses.append(h) hypotheses = self.post_process(hypotheses) hypotheses.sort(key=lambda h: h.composite_score, reverse=True) return hypotheses[:max_results] def find_trending( self, since_year: int = 2020, min_claims: int = 3, direction: str = "strengthening", max_results: int = 30, ) -> list[dict]: """Find concept pairs with strengthening/weakening evidence over time. Returns list of dicts with: concept_a, concept_b, years, slope, direction, claims. """ from collections import Counter # Group claims by (subject, object) claim_groups: dict[tuple[str, str], list[dict]] = {} for nid, node in self._index.items(): if "claim" not in node.domain_tags: continue meta = node.metadata sid = meta.get("subject_id", "") oid = meta.get("object_id", "") if not sid or not oid: continue key = (sid, oid) claim_groups.setdefault(key, []).append(meta) results = [] for (sid, oid), claims in claim_groups.items(): years = [] for c in claims: sp = c.get("source_paper", {}) y = sp.get("year") if y and y >= since_year: years.append(y) if len(years) < min_claims: continue year_counts = Counter(years) ys = sorted(year_counts.keys()) cs = [year_counts[y] for y in ys] slope = _simple_slope(ys, cs) if direction == "strengthening" and slope <= 0.3: continue if direction == "weakening" and slope >= -0.3: continue if direction == "emerging" and max(ys) < 2025: continue src_node = self._index.get(sid) tgt_node = self._index.get(oid) results.append({ "concept_a": src_node.preferred_name if src_node else sid, "concept_b": tgt_node.preferred_name if tgt_node else oid, "concept_a_id": sid, "concept_b_id": oid, "year_counts": {str(y): year_counts[y] for y in ys}, "slope": round(slope, 3), "direction": direction, "n_claims": len(claims), }) results.sort(key=lambda r: abs(r["slope"]), reverse=True) return results[:max_results] def contradiction_detection( self, domain_filter: Optional[str] = None, max_results: int = 50, ) -> list[Contradiction]: """Find pairs of claims that assert opposite things about the same concept pair.""" claim_lookup: dict[tuple[str, str], list[ConceptNode]] = {} for nid, node in self._index.items(): if "claim" not in node.domain_tags: continue meta = node.metadata sid = meta.get("subject_id", "") oid = meta.get("object_id", "") if not sid or not oid: continue if domain_filter: src_node = self._index.get(sid) tgt_node = self._index.get(oid) domains = set() if src_node: domains.update(src_node.domain_tags) if tgt_node: domains.update(tgt_node.domain_tags) if domain_filter not in domains: continue key = (sid, oid) claim_lookup.setdefault(key, []).append(node) contradictions = [] for (sid, oid), claims in claim_lookup.items(): if len(claims) < 2: continue for i in range(len(claims)): for j in range(i + 1, len(claims)): c1, c2 = claims[i], claims[j] m1, m2 = c1.metadata, c2.metadata severity = self._check_contradiction(m1, m2) if severity > 0: contradictions.append(Contradiction( concept_a_id=sid, concept_a_name=m1.get("subject_name", sid), concept_b_id=oid, concept_b_name=m1.get("object_name", oid), claim_for_id=c1.id, claim_for_predicate=m1.get("predicate", ""), claim_for_text=m1.get("raw_text", ""), claim_against_id=c2.id, claim_against_predicate=m2.get("predicate", ""), claim_against_text=m2.get("raw_text", ""), severity=severity, )) contradictions.sort(key=lambda c: c.severity, reverse=True) return contradictions[:max_results] def gap_detection( self, domain_a: str, domain_b: Optional[str] = None, max_results: int = 50, ) -> list[Gap]: """Find concept pairs 2 hops apart with no direct edge.""" if domain_b is None: domain_b = domain_a nodes_a = { nid for nid, data in self.G.nodes(data=True) if domain_a in data.get("domain_tags", []) and "claim" not in data.get("domain_tags", []) } nodes_b = { nid for nid, data in self.G.nodes(data=True) if domain_b in data.get("domain_tags", []) and "claim" not in data.get("domain_tags", []) } gaps = [] seen = set() for a_id in nodes_a: if a_id not in self.G: continue hop1 = set(self.G.successors(a_id)) | set(self.G.predecessors(a_id)) hop2 = set() for n1 in hop1: if "claim" in self._index.get(n1, ConceptNode(id="", preferred_name="")).domain_tags: continue hop2.update(self.G.successors(n1)) hop2.update(self.G.predecessors(n1)) hop2 -= {a_id} hop2 -= hop1 for b_id in hop2 & nodes_b: pair = tuple(sorted([a_id, b_id])) if pair in seen: continue seen.add(pair) if self.G.has_edge(a_id, b_id) or self.G.has_edge(b_id, a_id): continue try: path = nx.shortest_path(self.G, a_id, b_id) except (nx.NetworkXNoPath, nx.NetworkXError): continue if len(path) > 3: continue connecting = [n for n in path[1:-1] if "claim" not in self._index.get(n, ConceptNode(id="", preferred_name="")).domain_tags] a_node = self._index.get(a_id) b_node = self._index.get(b_id) gaps.append(Gap( concept_a_id=a_id, concept_a_name=a_node.preferred_name if a_node else a_id, concept_b_id=b_id, concept_b_name=b_node.preferred_name if b_node else b_id, distance=len(path) - 1, connecting_concepts=connecting, domain_a=domain_a, domain_b=domain_b, potential_relation=self._infer_relation(path), )) gaps.sort(key=lambda g: (0 if g.domain_a != g.domain_b else 1, g.distance)) return gaps[:max_results] # ── name resolution ──────────────────────────────────────────────── def resolve_name(self, query: str) -> Optional[str]: """Resolve a name to a concept ID. Returns None if not found.""" if not query: return None for node in self._index.values(): if node.preferred_name == query: return node.id query_lower = query.lower() for node in self._index.values(): if node.preferred_name.lower() == query_lower: return node.id for node in self._index.values(): for alias in node.aliases: if alias.lower() == query_lower: return node.id candidates = [] for node in self._index.values(): name_lower = node.preferred_name.lower() if query_lower in name_lower or name_lower in query_lower: candidates.append(node) continue for alias in node.aliases: if query_lower in alias.lower() or alias.lower() in query_lower: candidates.append(node) break if len(candidates) == 1: return candidates[0].id elif len(candidates) > 1: candidates.sort(key=lambda n: len(n.preferred_name)) return candidates[0].id return None # ── internal helpers ─────────────────────────────────────────────── def _sample_domain_nodes(self, domain: str, max_n: int) -> list[str]: """Sample up to max_n non-claim nodes from a domain, preferring nodes with edges.""" nodes = [ nid for nid, data in self.G.nodes(data=True) if domain in data.get("domain_tags", []) and "claim" not in data.get("domain_tags", []) and nid not in PATH_IGNORE_NODE_IDS ] # sort by degree (more connected = more useful as seed) nodes.sort(key=lambda n: self.G.degree(n), reverse=True) return nodes[:max_n] def _build_hypotheses_from_paths( self, raw_paths: list[list[str]], hyp_type: str ) -> list[Hypothesis]: """Build Hypothesis objects from raw node-ID paths.""" hypotheses = [] for raw_path in raw_paths: links = self._enrich_path(raw_path) if not links: continue conf = self._compute_confidence_score(links) nov = self._compute_novelty_score(links) evi = self._compute_evidence_score(links) test, test_reason = self._compute_testability_score(links) claim_ids = [l.claim_id for l in links if l.claim_id] h = Hypothesis( hypothesis_type=hyp_type, source_id=raw_path[0], source_name=self._index[raw_path[0]].preferred_name, target_id=raw_path[-1], target_name=self._index[raw_path[-1]].preferred_name, path=links, confidence_score=conf, novelty_score=nov, evidence_score=evi, testability_score=test, supporting_claims=claim_ids, testability_reason=test_reason, ) h.explanation = self._generate_explanation(h) h.composite_score = self._composite_score(h) hypotheses.append(h) hypotheses.sort(key=lambda h: h.composite_score, reverse=True) return hypotheses def _enrich_path(self, raw_path: list[str]) -> list[HypothesisLink]: """Convert a raw node-ID path into rich HypothesisLink objects.""" links = [] for i in range(len(raw_path) - 1): src_id, tgt_id = raw_path[i], raw_path[i + 1] if not self.G.has_edge(src_id, tgt_id): continue edge_data = self.G.edges[src_id, tgt_id] src_node = self._index.get(src_id) tgt_node = self._index.get(tgt_id) claim_id = edge_data.get("metadata", {}).get("claim_id", "") claim_node = self._index.get(claim_id) if claim_id else None evidence = {} paper = {} raw_text = "" if claim_node and claim_node.metadata: meta = claim_node.metadata evidence = meta.get("evidence", {}) paper = meta.get("source_paper", {}) raw_text = meta.get("raw_text", "") links.append(HypothesisLink( from_id=src_id, from_name=src_node.preferred_name if src_node else src_id, to_id=tgt_id, to_name=tgt_node.preferred_name if tgt_node else tgt_id, relation_type=edge_data.get("relation_type", "unknown"), confidence=edge_data.get("confidence", 0.5), claim_id=claim_id, raw_text=raw_text, evidence=evidence, source_paper=paper, )) return links # ── scoring ──────────────────────────────────────────────────────── def compute_frequency_boost(self, claim_meta: dict) -> float: """Frequency boost based on independent PRIMARY study replication. Prefers the merged `primary_supporting_papers` list set by `phase4_optimize.merge_duplicate_claims` (already filtered for non-review study types). Falls back to rebuilding from the pre-merge index, matching the same filter logic. """ # Fast path: canonical claim carries primary-PMID list primary = claim_meta.get("primary_supporting_papers") if primary is not None and isinstance(primary, list): n = len(primary) if n >= 3: return 1.2 elif n >= 1: return 1.0 else: return 0.5 # Fallback: scan all claims with the same SPO, filter reviews key = ( claim_meta.get("subject_id", ""), claim_meta.get("predicate", ""), claim_meta.get("object_id", ""), ) all_claims = self._claims_by_triple.get(key, []) primary_pmids = set() for c in all_claims: st = c.get("evidence", {}).get("study_type", "") if st not in _REVIEW_TYPES: pmid = c.get("source_paper", {}).get("pmid", "") if pmid: primary_pmids.add(pmid) if len(primary_pmids) >= 3: return 1.2 elif len(primary_pmids) >= 1: return 1.0 else: return 0.5 @staticmethod def compute_temporal_decay(claim_meta: dict, reference_year: int = 2026) -> float: """Temporal decay: newer primary studies get higher weight. Reviews get no time bonus (1.0). Primary studies decay 3% per year, floor 0.7. """ st = claim_meta.get("evidence", {}).get("study_type", "") if st in _REVIEW_TYPES: return 1.0 year = claim_meta.get("source_paper", {}).get("year", 0) if not year: return 0.85 # unknown year, neutral age = reference_year - year return max(0.7, 1.0 - 0.03 * age) def _compute_confidence_score(self, path: list[HypothesisLink]) -> float: """Confidence = geometric mean of per-link scores, with weak-link penalty. Per-link score = edge.confidence × freq_boost × temporal_decay (edge.confidence already includes study_type weighting from phase4_optimize.apply_evidence_weighting and the claim-level statistical quality signals from claim_extractor._estimate_confidence) Aggregate: geometric mean (one weak link crushes the path) + weakest-link penalty (×0.7 when min_edge < 0.1) Single source of truth for each multiplier: - study_type → phase4_optimize.WEIGHT_MAP (canonical, idempotent) - p_value/sample_size/replicability → claim_extractor._estimate_confidence - freq across primary PMIDs → compute_frequency_boost - publication recency → compute_temporal_decay """ if not path: return 0.0 import math scores = [] min_conf = float("inf") for link in path: raw = max(link.confidence, 1e-3) # tiny floor for log() min_conf = min(min_conf, raw) full_meta = { "evidence": link.evidence, "source_paper": link.source_paper, "subject_id": link.from_id, "predicate": link.relation_type, "object_id": link.to_id, } freq_boost = self.compute_frequency_boost(full_meta) temp_decay = self.compute_temporal_decay(full_meta) s = raw * freq_boost * temp_decay scores.append(min(s, 1.0)) log_sum = sum(math.log(max(s, 1e-6)) for s in scores) gm = math.exp(log_sum / len(scores)) if min_conf < 0.1: gm *= 0.7 return max(min(gm, 1.0), 0.0) def _compute_novelty_score(self, path: list[HypothesisLink]) -> float: """Score how novel/surprising a hypothesis is. Lower = more expected (direct known relationship), Higher = more surprising. """ score = 0.3 # base # hop bonus: longer paths = more novel connections score += 0.1 * min(len(path) - 1, 3) # cross-domain bonus: connecting different domains is more novel domains_seen = set() for link in path: src = self._index.get(link.from_id) tgt = self._index.get(link.to_id) if src: domains_seen.update(src.domain_tags) if tgt: domains_seen.update(tgt.domain_tags) domains_seen.discard("claim") n_domains = len(domains_seen) if n_domains >= 3: score += 0.15 elif n_domains >= 2: score += 0.10 # rare relation bonus: non-generic relations are more novel rare_count = sum(1 for l in path if l.relation_type not in COMMON_RELATIONS) score += 0.05 * min(rare_count, 3) # evidence diversity: more papers = better supported, less novel # fewer papers = more speculative, more novel pmids = {l.source_paper.get("pmid", "") for l in path if l.source_paper.get("pmid")} if len(pmids) == 0: score += 0.10 # no paper support = speculative but novel elif len(pmids) == 1: score += 0.05 # single source = weak replication return min(score, 1.0) def _compute_evidence_score(self, path: list[HypothesisLink]) -> float: """Score evidence quality: traceability and text availability. DOES NOT use p_value/sample_size/effect_size — those signals already flow into edge.confidence via claim_extractor._estimate_confidence and are aggregated by _compute_confidence_score. Counting them again here was double-dipping. This score asks a different question: "How well-anchored is the evidence in source documents?" — which complements confidence's "How statistically strong is the evidence?". Path-level: most well-extracted edges score 0.6-0.8; we reserve >0.9 for paths whose every step has rich provenance. """ _REVIEW_TYPES = {"narrative_review", "review"} scores = [] for link in path: study_type = (link.evidence.get("study_type") or "").lower() s = 0.2 if study_type in _REVIEW_TYPES else 0.3 if link.raw_text and len(link.raw_text) > 20: s += 0.20 if link.claim_id: s += 0.15 if link.source_paper.get("pmid"): s += 0.15 if link.evidence.get("study_type"): s += 0.10 scores.append(min(s, 1.0)) return self._geometric_mean(scores) def _compute_testability_score(self, path: list[HypothesisLink]) -> tuple[float, str]: """Score how testable a hypothesis is with NeuroClaw imaging experiments. Boosts for: - Brain region features directly measurable from sMRI (volume, thickness) - Connectivity features (functional/structural) for GNN models - Modalities available in UKB/ADNI/HCP-YA - Deep learning model compatibility (BrainGNN, NeuroStorm) - Target diseases present in datasets (AD, PD, depression, etc.) Returns (score, reason_string). """ all_text = " ".join( l.raw_text + " " + l.from_name + " " + l.to_name + " " + l.relation_type for l in path ).lower() # check which modalities are mentioned matched_modalities = [] for modality, keywords in TESTABLE_MODALITIES.items(): for kw in keywords: if kw.lower() in all_text: matched_modalities.append(modality) break if not matched_modalities: return 0.15, "no imaging modality detected" score = 0.25 # base for having a modality # modality bonus (more = more testable angles) score += 0.10 * min(len(matched_modalities), 3) # heavy bonus for sMRI features (volume/thickness — directly measurable in all 3 datasets) if "sMRI" in matched_modalities: score += 0.15 # heavy bonus for connectivity features (input to BrainGNN/GNN models) if "dMRI" in matched_modalities or "fMRI" in matched_modalities: score += 0.15 # bonus for PET (available in ADNI, key for AD research) if "PET" in matched_modalities: score += 0.10 # bonus for brain region specificity (testable with atlas parcellation) brain_region_keywords = ["cortex", "hippocampus", "amygdala", "thalamus", "cerebellum", "striatum", "insula", "gyrus", "caudate", "putamen", "pallidum", "accumbens", "precuneus", "cuneus", "lingual", "fusiform", "parahippocampal", "entorhinal", "parietal", "frontal", "temporal", "occipital"] regions_found = [kw for kw in brain_region_keywords if kw in all_text] if regions_found: score += 0.10 # atlas-based ROI analysis if len(regions_found) >= 2: score += 0.05 # pair of regions = connectivity hypothesis # bonus for diseases present in target datasets dataset_diseases = [ "alzheimer", "parkinson", "depression", "schizophrenia", "adhd", "autism", "epilepsy", "multiple sclerosis", "anxiety", "bipolar", "dementia", "mci", "mild cognitive", ] if any(d in all_text for d in dataset_diseases): score += 0.05 # bonus for DL-model-compatible features (graph structure, ROI, connectivity matrix) if any(kw.lower() in all_text for kw in DL_MODEL_KEYWORDS): score += 0.05 # build reason string modalities_str = ", ".join(matched_modalities) reason = f"modalities: {modalities_str}" if regions_found: reason += f" | brain regions: {', '.join(regions_found[:4])}" if any(d in all_text for d in dataset_diseases): matched_diseases = [d for d in dataset_diseases if d in all_text] reason += f" | diseases: {', '.join(matched_diseases[:3])}" return min(score, 1.0), reason def _composite_score(self, h: Hypothesis) -> float: """Weighted geometric mean of the 4 score components. Geometric: a hypothesis is only as good as its weakest dimension. A path with great evidence but 0 testability is worthless to us. Matches the linear fitness in evolution_engine._score_fitness (same weights, different aggregation — fitness adds convergence / diversity / length modifiers not relevant here). """ c = max(h.confidence_score, 0.01) e = max(h.evidence_score, 0.01) n = max(h.novelty_score, 0.01) t = max(h.testability_score, 0.01) score = (c ** 0.20) * (e ** 0.20) * (n ** 0.25) * (t ** 0.35) if self._has_only_review_evidence(h): score *= 0.7 return score @staticmethod def _has_only_review_evidence(h: Hypothesis) -> bool: """True if every link in the path comes from a review/narrative_review.""" _REVIEW_TYPES = {"narrative_review", "review"} if not h.path: return False for link in h.path: study_type = (link.evidence.get("study_type") or "").lower() if study_type and study_type not in _REVIEW_TYPES: return False return True def _check_contradiction(self, m1: dict, m2: dict) -> float: """Check if two claims contradict each other. Returns severity 0-1.""" p1 = m1.get("predicate", "") p2 = m2.get("predicate", "") n1 = m1.get("negated", False) n2 = m2.get("negated", False) if p1 == p2 and n1 != n2: return 1.0 if (p1, p2) in OPPOSING_PREDICATES: return 0.8 if p1 == p2 and not n1 and not n2: d1 = m1.get("evidence", {}).get("direction", "") d2 = m2.get("evidence", {}).get("direction", "") if d1 and d2 and d1 != d2: return 0.6 return 0.0 def _infer_relation(self, path: list[str]) -> str: """Infer a potential relation from a path's edge types.""" relations = [] for i in range(len(path) - 1): if self.G.has_edge(path[i], path[i + 1]): rt = self.G.edges[path[i], path[i + 1]].get("relation_type", "") if rt and rt not in ("about", "is_a", "part_of"): relations.append(rt) if relations: for r in relations: if r not in COMMON_RELATIONS: return r return relations[0] return "associated_with" def _generate_explanation(self, h: Hypothesis) -> str: """Generate a human-readable explanation for a hypothesis.""" path_str = " --> ".join( f"{l.from_name} --[{l.relation_type}]--> {l.to_name}" for l in h.path ) if not path_str: return "" pmids = {l.source_paper.get("pmid", "") for l in h.path if l.source_paper.get("pmid")} key_finding = "" for l in h.path: if l.raw_text: key_finding = l.raw_text[:150] if len(l.raw_text) > 150: key_finding += "..." break lines = [ f"Hypothesis: {h.source_name} may relate to {h.target_name} via {len(h.path)}-hop path.", f"Path: {path_str}", f"Evidence: {len(h.supporting_claims)} claims from {len(pmids)} papers", ] if key_finding: lines.append(f"Key finding: '{key_finding}'") if h.testability_reason: lines.append(f"Testability: {h.testability_reason}") lines.append( f"Confidence: {h.confidence_score:.2f} | " f"Novelty: {h.novelty_score:.2f} | " f"Evidence: {h.evidence_score:.2f} | " f"Testability: {h.testability_score:.2f}" ) return "\n".join(lines) @staticmethod def _geometric_mean(values: list[float]) -> float: if not values: return 0.0 product = math.prod(values) return product ** (1.0 / len(values)) def _simple_slope(xs: list[int], ys: list[int]) -> float: """Simple linear regression slope without numpy.""" n = len(xs) if n < 2: return 0.0 mean_x = sum(xs) / n mean_y = sum(ys) / n num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) den = sum((x - mean_x) ** 2 for x in xs) if den == 0: return 0.0 return num / den