Spaces:
Running
Running
| """Unified DDI dataset builder for multi-source integration. | |
| Sources supported via adapters: | |
| - DDInter | |
| - DrugBank | |
| - TWOSIDES | |
| - SIDER | |
| - FAERS | |
| - ChEMBL | |
| - PubChem | |
| - KEGG | |
| The output schema is immutable and reproducible. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import hashlib | |
| import json | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, Iterable, List | |
| import pandas as pd | |
| from preprocessing.artifact_manager import manager | |
# Canonical severity labels: 'unknown' first, then minor -> major. A label's
# position in this list is its numeric rank (see SEVERITY_RANK).
SEVERITY_LEVELS = ['unknown', 'minor', 'moderate', 'major']
SEVERITY_RANK = dict(zip(SEVERITY_LEVELS, range(len(SEVERITY_LEVELS))))
# Per-source weight applied to each observation when duplicate pairs are
# merged; higher values mean the source is trusted more.
SOURCE_RELIABILITY = dict(
    drugbank=1.0,
    ddinter=0.95,
    kegg=0.9,
    chembl=0.85,
    pubchem=0.8,
    twosides=0.75,
    sider=0.7,
    faers=0.65,
)
@dataclass(frozen=True)
class UnifiedSchema:
    """Versioned list of the columns every unified DDI table must contain.

    Declared as a frozen dataclass so the schema cannot be reassigned at
    runtime, in line with the module's promise of an immutable output schema.
    Constructing it with no arguments yields the default version and columns.
    """

    # Version tag written into the stats JSON alongside the dataset.
    version: str = 'ddi_unified_v1'
    # Columns that must be present in the unified output table.
    columns: tuple[str, ...] = (
        'drug_a',
        'drug_b',
        'severity',
        'source',
        'support',
        'evidence',
    )
def normalize_drug_name(v: str) -> str:
    """Lower-case *v* and collapse each run of whitespace into one space.

    Non-string inputs are converted with ``str`` first. Leading and trailing
    whitespace is removed.
    """
    lowered = str(v).strip().lower()
    return ' '.join(lowered.split())
def canonical_pair(a: str, b: str) -> tuple[str, str]:
    """Return the normalized names of *a* and *b* in ascending order.

    Ordering the two names makes the key direction-insensitive, so (A, B)
    and (B, A) produce the same tuple.
    """
    first = normalize_drug_name(a)
    second = normalize_drug_name(b)
    if second < first:
        return (second, first)
    return (first, second)
def normalize_severity(v: str) -> str:
    """Map a raw severity label onto one of the canonical SEVERITY_LEVELS.

    The input is stringified, stripped and lower-cased. Labels that are
    already canonical pass through. Known synonyms are translated, and
    anything else becomes 'unknown'.
    """
    label = str(v).strip().lower()
    if label in SEVERITY_RANK:
        return label
    synonyms = {
        'severe': 'major',
        'contraindicated': 'major',
        'high': 'major',
        'medium': 'moderate',
        'moderate risk': 'moderate',
        'low': 'minor',
        'mild': 'minor',
    }
    return synonyms.get(label, 'unknown')
def ingest_ddinter(path: Path) -> pd.DataFrame:
    """Load the DDInter interaction table and map it onto the unified columns.

    Args:
        path: Accepted for interface compatibility but not read here; the rows
            come from the ``'ddinter_combined'`` artifact loaded via ``manager``.
            NOTE(review): ``--ddinter`` is required on the CLI yet unused here;
            confirm whether the artifact should be loaded from this path.

    Returns:
        DataFrame with ``drug_a``, ``drug_b``, ``severity``, ``source``,
        ``support`` and ``evidence`` columns, one row per artifact row.
        ``source`` is always ``'ddinter'`` and ``support`` is always 1.
    """
    df = manager.load_artifact('ddinter_combined')
    if 'Description' in df.columns:
        # astype(str) would turn missing descriptions (NaN) into the literal
        # text 'nan', which is truthy and would leak into merged evidence.
        evidence = df['Description'].fillna('').astype(str)
    else:
        evidence = ''
    return pd.DataFrame(
        {
            'drug_a': df['Drug_A'].astype(str),
            'drug_b': df['Drug_B'].astype(str),
            'severity': df['Level'].astype(str).map(normalize_severity),
            'source': 'ddinter',
            'support': 1,
            'evidence': evidence,
        }
    )
def ingest_generic(path: Path, source: str, mapping: Dict[str, str]) -> pd.DataFrame:
    """Load one extra CSV source and map its columns onto the unified layout.

    Args:
        path: CSV file holding this source's interaction rows.
        source: Name written to the ``source`` column of every output row.
        mapping: Unified column name -> column name in the CSV. ``drug_a``,
            ``drug_b`` and ``severity`` are required; ``evidence`` is optional.

    Returns:
        DataFrame with ``drug_a``, ``drug_b``, ``severity``, ``source``,
        ``support`` and ``evidence`` columns, one row per CSV row.

    Raises:
        ValueError: if a required key is missing from ``mapping``.
        KeyError: if a mapped column does not exist in the CSV.
    """

    def col(name: str) -> str:
        if name not in mapping:
            raise ValueError(f'Missing mapping for {name} in source {source}')
        return mapping[name]

    # Resolve required mappings before touching the file so config errors
    # surface without any I/O.
    drug_a_col = col('drug_a')
    drug_b_col = col('drug_b')
    severity_col = col('severity')

    # Read this source's own CSV (not the shared DDInter artifact). dtype=str
    # keeps identifiers verbatim: an integer ID column with gaps would
    # otherwise be parsed as float and stringified as e.g. '2244.0'.
    df = pd.read_csv(path, dtype=str)

    if 'evidence' in mapping:
        # Missing cells are NaN; fillna('') stops astype(str) from producing
        # the literal text 'nan'.
        evidence = df[mapping['evidence']].fillna('').astype(str)
    else:
        evidence = ''

    return pd.DataFrame(
        {
            'drug_a': df[drug_a_col].astype(str),
            'drug_b': df[drug_b_col].astype(str),
            'severity': df[severity_col].astype(str).map(normalize_severity),
            'source': source,
            'support': 1,
            'evidence': evidence,
        }
    )
def _resolve_pair(a: str, b: str, rows: List[dict]) -> dict:
    """Collapse every observation of the canonical pair (a, b) into one row.

    Each entry of ``rows`` holds an already-normalized ``severity`` plus
    ``source`` (str), ``support`` (int) and ``evidence`` (str).
    """
    # Reliability-aware conservative merge: each observation adds
    # reliability * max(1, support) to its severity level. Sources missing
    # from SOURCE_RELIABILITY fall back to a weight of 0.6. Rows are visited
    # in input order, so the float sums are deterministic.
    severity_support = {level: 0.0 for level in SEVERITY_LEVELS}
    for r in rows:
        reliability = SOURCE_RELIABILITY.get(r['source'].strip().lower(), 0.6)
        severity_support[r['severity']] += reliability * max(1, r['support'])

    # Highest weighted support wins; on an exact tie the more severe level wins.
    chosen_severity = max(
        severity_support,
        key=lambda level: (severity_support[level], SEVERITY_RANK.get(level, 0)),
    )
    observed = [r['severity'] for r in rows]
    disagreement = len(set(observed)) > 1
    # Safety-first tie break: if signals conflict and strong major evidence
    # exists (weight >= 0.9, which a single DrugBank, DDInter or KEGG row
    # reaches on its own), keep major.
    if disagreement and severity_support.get('major', 0.0) >= 0.9:
        chosen_severity = 'major'

    return {
        'drug_a': a,
        'drug_b': b,
        'severity': chosen_severity,
        'source': '|'.join(sorted({r['source'] for r in rows})),
        'support': int(sum(r['support'] for r in rows)),
        # Keep at most the first five non-empty evidence strings.
        'evidence': ' || '.join([r['evidence'] for r in rows if r['evidence']][:5]),
        'conflict': int(disagreement),
        'max_observed_severity': max(observed, key=lambda s: SEVERITY_RANK.get(s, 0)),
    }


def dedupe_and_resolve(df: pd.DataFrame) -> pd.DataFrame:
    """Merge rows describing the same unordered drug pair into one row each.

    Pairs are keyed with ``canonical_pair``, so (A, B) and (B, A) are merged.
    Severity is resolved by reliability-weighted support with a safety-first
    override toward 'major' (see ``_resolve_pair``).

    Args:
        df: Rows with ``drug_a``, ``drug_b``, ``severity`` and ``source``
            columns; ``support`` (default 1) and ``evidence`` (default '')
            are optional.

    Returns:
        DataFrame sorted by (drug_a, drug_b) with columns drug_a, drug_b,
        severity, source, support, evidence, conflict and
        max_observed_severity. An empty input yields an empty frame that
        still has those columns.
    """
    columns = [
        'drug_a',
        'drug_b',
        'severity',
        'source',
        'support',
        'evidence',
        'conflict',
        'max_observed_severity',
    ]
    buckets: Dict[tuple[str, str], List[dict]] = {}
    # to_dict('records') yields plain dicts and avoids building one Series
    # per row as iterrows does.
    for record in df.to_dict('records'):
        key = canonical_pair(record['drug_a'], record['drug_b'])
        buckets.setdefault(key, []).append(
            {
                'severity': normalize_severity(record['severity']),
                'source': str(record['source']),
                'support': int(record.get('support', 1)),
                'evidence': str(record.get('evidence', '')),
            }
        )

    merged = [_resolve_pair(a, b, rows) for (a, b), rows in buckets.items()]
    # Passing columns explicitly keeps the schema when ``merged`` is empty;
    # a column-less frame would make sort_values raise KeyError.
    out = pd.DataFrame(merged, columns=columns)
    return out.sort_values(['drug_a', 'drug_b']).reset_index(drop=True)
def dataset_stats(df: pd.DataFrame) -> dict:
    """Summarise a unified DDI table for the JSON stats file.

    Args:
        df: Table with ``drug_a``, ``drug_b``, ``severity`` and ``source``
            columns; ``conflict`` is optional and counted when present.

    Returns:
        Dict with row count, distinct drug count, per-severity counts (keys in
        sorted order, plain ``int`` values), number of conflict rows, sorted
        list of individual source names, and a SHA-256 checksum of the table
        serialized as CSV without the index.
    """
    # Source cells may hold several names joined by '|'. Splitting per cell
    # and dropping empty tokens keeps an empty table from reporting [''].
    sources = {name for cell in df['source'] for name in str(cell).split('|') if name}
    severity_counts = df['severity'].value_counts().sort_index()
    return {
        'rows': int(len(df)),
        'unique_drugs': int(len(set(df['drug_a']) | set(df['drug_b']))),
        'severity_distribution': {level: int(count) for level, count in severity_counts.items()},
        'conflict_rows': int(df['conflict'].sum()) if 'conflict' in df.columns else 0,
        'sources': sorted(sources),
        'checksum': hashlib.sha256(df.to_csv(index=False).encode('utf-8')).hexdigest(),
    }
def main(argv: List[str] | None = None) -> None:
    """Build the unified DDI CSV and its stats JSON from command-line arguments.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv`` as before, so the script entry point is unchanged.

    Raises:
        ValueError: if the unified table lacks a column listed in
            ``UnifiedSchema.columns``, or an extra source's mapping is
            incomplete.
    """
    parser = argparse.ArgumentParser(description='Build unified DDI dataset from multi-source inputs')
    parser.add_argument('--ddinter', type=str, required=True)
    parser.add_argument('--extra-config', type=str, default=None, help='JSON config listing extra CSV sources and column mappings')
    parser.add_argument('--out-csv', type=str, required=True)
    parser.add_argument('--out-stats', type=str, required=True)
    args = parser.parse_args(argv)

    frames: List[pd.DataFrame] = [ingest_ddinter(Path(args.ddinter))]
    if args.extra_config:
        # Each entry under 'sources' supplies 'name', 'path' and 'mapping'.
        cfg = json.loads(Path(args.extra_config).read_text(encoding='utf-8'))
        for source in cfg.get('sources', []):
            frames.append(
                ingest_generic(
                    path=Path(source['path']),
                    source=str(source['name']).lower(),
                    mapping=source['mapping'],
                )
            )

    all_df = pd.concat(frames, ignore_index=True)
    unified = dedupe_and_resolve(all_df)

    schema = UnifiedSchema()
    missing = [c for c in schema.columns if c not in unified.columns]
    if missing:
        raise ValueError(f'Unified schema mismatch, missing: {missing}')

    out_csv = Path(args.out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    unified.to_csv(out_csv, index=False)

    stats = dataset_stats(unified)
    stats['schema_version'] = schema.version
    out_stats = Path(args.out_stats)
    # The stats file may live in a different directory than the CSV; create
    # it too so write_text does not fail with FileNotFoundError.
    out_stats.parent.mkdir(parents=True, exist_ok=True)
    out_stats.write_text(json.dumps(stats, indent=2), encoding='utf-8')


if __name__ == '__main__':
    main()