File size: 6,118 Bytes
"""
Convert a Kaggle drug CSV into features.yaml entries and append them to the
knowledge base (the pharma domain by default).

Supported CSV formats:
  - Drug Labels & Side Effects (drug_name, medical_condition, side_effects)
  - Drug Dataset: Uses, Side Effects & User Reviews (same columns)
  - Medicine Side-Effects Analysis (drug, use, side_effects / adverse_effects)

Usage:
  python scripts/kaggle_to_yaml.py path/to/drugs.csv [--max 15] [--dry-run] [--domain pharma]

Options:
  --max N      Max number of drugs to import (default: 15)
  --dry-run    Print YAML to stdout instead of appending to features.yaml
  --domain D   Target domain (default: pharma)
"""
import argparse
import csv
import re
import sys
from pathlib import Path
KNOWLEDGE_ROOT = Path(__file__).parent.parent / "knowledge"
_DRUG_COLS = ["drug_name", "Drug Name", "name", "drug", "drugName"]
_USE_COLS = ["indications", "medical_condition", "condition", "use", "uses",
"indication", "Medical Condition"]
_EFFECT_COLS = ["side_effects", "Side_Effects", "Side Effects", "sideEffects",
"adverse_effects", "adverse_events"]
_CONTRA_COLS = ["contraindications", "contraindication", "Contraindications"]
_WARN_COLS = ["warnings", "warning", "Warnings", "precautions"]
def _find_col(headers: list[str], candidates: list[str]) -> str | None:
h_lower = {h.lower().strip(): h for h in headers}
for c in candidates:
if c in headers:
return c
if c.lower().strip() in h_lower:
return h_lower[c.lower().strip()]
return None
def _slugify(s: str) -> str:
return re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")
def _truncate(text: str, max_items: int = 8) -> str:
sep = "," if text.count(",") >= text.count(";") else ";"
items = [i.strip() for i in text.split(sep) if i.strip()]
if len(items) > max_items:
return ", ".join(items[:max_items]) + ", and others"
return ", ".join(items)
def _to_yaml_block(doc_id: str, title: str, content: str, tags: list[str]) -> str:
safe_content = content.replace('"', '\\"')
tags_yaml = ", ".join(f'"{t}"' for t in tags)
return (
f"\n - id: {doc_id}\n"
f" title: \"{title}\"\n"
f" content: >\n"
+ "".join(f" {line}\n" for line in content.splitlines())
+ f" tags: [{tags_yaml}]\n"
)
def _cell(row: dict, col: str | None) -> str:
    """Return the stripped text stored under *col* in *row*, or ``""``."""
    if not col:
        return ""
    # csv.DictReader fills columns missing from a short row with None, so a
    # plain ``row.get(col, "")`` can still hand back None.
    return (row.get(col) or "").strip()


def convert(csv_path: Path, max_drugs: int, dry_run: bool, domain: str) -> None:
    """Build drug-profile YAML entries from a CSV and emit or append them.

    Args:
        csv_path: CSV file to read (decoded as UTF-8, BOM tolerated).
        max_drugs: Maximum number of entries to generate.
        dry_run: When true, print the YAML to stdout and leave
            ``features.yaml`` untouched.
        domain: Sub-directory of ``KNOWLEDGE_ROOT`` whose ``features.yaml``
            receives the entries.

    Raises:
        SystemExit: With status 1 when no drug-name column is detected, when
            no entry could be generated, or when the target
            ``features.yaml`` does not exist.
    """
    with csv_path.open(newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        headers = list(reader.fieldnames or [])
        drug_col = _find_col(headers, _DRUG_COLS)
        use_col = _find_col(headers, _USE_COLS)
        effect_col = _find_col(headers, _EFFECT_COLS)
        contra_col = _find_col(headers, _CONTRA_COLS)
        warn_col = _find_col(headers, _WARN_COLS)

        if not drug_col:
            print(f"ERROR: no drug name column found. Headers: {headers}", file=sys.stderr)
            sys.exit(1)

        # In dry-run mode stdout carries the YAML itself, so the diagnostic
        # goes to stderr to keep the output pipeable.
        print(
            f"Detected columns — drug: {drug_col!r}, use: {use_col!r}, "
            f"effects: {effect_col!r}, contra: {contra_col!r}, warnings: {warn_col!r}",
            file=sys.stderr if dry_run else sys.stdout,
        )

        blocks: list[str] = []
        seen: set[str] = set()
        for i, row in enumerate(reader):
            if len(blocks) >= max_drugs:
                break

            drug = _cell(row, drug_col).title()
            # Skip blank names and keep only the first row for each drug.
            if not drug or drug.lower() in seen:
                continue
            seen.add(drug.lower())

            condition = _cell(row, use_col)
            effects = _cell(row, effect_col)
            contra = _cell(row, contra_col)
            warnings = _cell(row, warn_col)

            condition_str = condition or "the indicated condition"
            effects_str = _truncate(effects) if effects else "not listed"

            parts = [
                f"{drug} is indicated for the treatment or management of {condition_str}.",
                f"Known adverse events include: {effects_str}.",
            ]
            if contra:
                parts.append(f"Contraindicated in patients with: {contra}.")
            if warnings:
                parts.append(f"Prescriber warning: {warnings}.")
            parts.append(
                "Serious unexpected adverse events must be reported to the regulatory "
                "authority within 15 days."
            )
            content = " ".join(parts)

            # filter(None, ...) drops the condition tag when it is absent or
            # slugifies to an empty string.
            tags = list(filter(None, [
                _slugify(drug),
                _slugify(condition) if condition else None,
                "adverse-event",
                "drug-profile",
            ]))

            # The id is derived from the CSV row index, so skipped rows leave
            # gaps in the numbering.
            doc_id = f"pharma_drug_{i + 1:03d}"
            blocks.append(_to_yaml_block(doc_id, f"{drug} — Drug Profile", content, tags))

    if not blocks:
        print("No documents generated. Check the CSV format.", file=sys.stderr)
        sys.exit(1)

    yaml_str = "".join(blocks)
    if dry_run:
        print(yaml_str)
        print(f"\n[dry-run] {len(blocks)} drug entries would be appended.", file=sys.stderr)
        return

    features_path = KNOWLEDGE_ROOT / domain / "features.yaml"
    if not features_path.is_file():
        print(f"ERROR: features file not found: {features_path}", file=sys.stderr)
        sys.exit(1)

    # Explicit UTF-8: the generated text contains non-ASCII characters and
    # the platform default encoding is not guaranteed to represent them.
    original = features_path.read_text(encoding="utf-8")
    # Drop trailing whitespace so exactly one newline precedes the new entries.
    updated = original.rstrip() + "\n" + yaml_str
    features_path.write_text(updated, encoding="utf-8")
    print(f"Appended {len(blocks)} drug entries to {features_path}")
def main() -> None:
    """Parse the command line and run the conversion.

    Exits with status 1 (after printing an error to stderr) when the CSV
    argument is not an existing regular file.
    """
    parser = argparse.ArgumentParser(description="Convert Kaggle drug CSV to features.yaml entries")
    parser.add_argument("csv", type=Path, help="Path to Kaggle drug CSV")
    parser.add_argument("--max", type=int, default=15, dest="max_drugs",
                        help="Max number of drugs to import (default: 15)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print YAML to stdout instead of appending to features.yaml")
    parser.add_argument("--domain", default="pharma",
                        help="Target domain (default: pharma)")
    args = parser.parse_args()

    # is_file() rather than exists(): a directory exists but cannot be opened
    # as a CSV, and would otherwise fail later with a traceback.
    if not args.csv.is_file():
        print(f"ERROR: file not found: {args.csv}", file=sys.stderr)
        sys.exit(1)

    convert(args.csv, args.max_drugs, args.dry_run, args.domain)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|