Buckets:
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| import re | |
| from collections import Counter | |
| from pathlib import Path | |
# Paths are resolved relative to the project root (one directory above this
# script), so the script works regardless of the current working directory.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
RAW_DATA_PATH = PROJECT_ROOT / "data" / "raw_medquad.jsonl"
OUTPUT_PATH = PROJECT_ROOT / "data" / "medical_kb.jsonl"
# Fixed provenance date stamped into every output record's metadata.
SCRAPED_AT = "2026-03-25"
# Keyword buckets for coarse disease tagging. Buckets are checked in
# insertion order, so earlier entries win when keywords from several
# buckets appear in the same text.
disease_keywords = {
    "heart": [
        "heart",
        "cardiac",
        "chest pain",
        "angina",
        "coronary",
        "arrhythmia",
        "hypertension",
        "blood pressure",
    ],
    "diabetes": ["diabetes", "blood sugar", "insulin", "glucose", "diabetic", "hyperglycemia"],
    "asthma": ["asthma", "breathing", "inhaler", "bronchial", "wheeze", "respiratory"],
    "liver": ["liver", "hepatitis", "cirrhosis", "jaundice", "bile"],
    "kidney": ["kidney", "renal", "dialysis", "nephritis", "urinary"],
    "mental_health": ["depression", "anxiety", "mental", "psychiatric", "bipolar", "stress"],
    "cancer": ["cancer", "tumor", "oncology", "chemotherapy", "malignant"],
}


def detect_disease(text: str) -> str:
    """Return the first disease bucket whose keywords occur in *text*.

    Matching is case-insensitive substring search, scanned in
    ``disease_keywords`` insertion order (first bucket wins). Falls back
    to ``"general"`` when nothing matches, including for empty/None input.
    """
    haystack = (text or "").lower()
    hits = (
        bucket
        for bucket, terms in disease_keywords.items()
        if any(term in haystack for term in terms)
    )
    return next(hits, "general")
def chunk_text(text: str, chunk_size: int = 700, overlap: int = 150) -> list[str]:
    """Split *text* into sentence-aligned chunks of roughly *chunk_size* chars.

    Consecutive chunks share a trailing run of sentences totalling at least
    *overlap* characters, so context spanning a chunk boundary is preserved.
    Whitespace is collapsed first; empty/None input yields [].
    """
    normalized_text = re.sub(r"\s+", " ", (text or "")).strip()
    if not normalized_text:
        return []
    # Sentence boundaries: whitespace immediately preceded by '.', '!', or '?'.
    sentences = [sentence.strip() for sentence in re.split(r"(?<=[.!?])\s+", normalized_text) if sentence.strip()]
    if not sentences:
        # No sentence terminators at all — return the whole text as one chunk.
        return [normalized_text]
    chunks: list[str] = []
    start = 0
    while start < len(sentences):
        end = start
        current_sentences: list[str] = []
        current_length = 0
        # Greedily take sentences until adding the next would exceed
        # chunk_size. A single over-long sentence is still taken whole
        # (the size check only fires once the chunk is non-empty).
        while end < len(sentences):
            sentence = sentences[end]
            # The "+ 1" accounts for the joining space between sentences.
            proposed_length = len(sentence) if not current_sentences else current_length + 1 + len(sentence)
            if current_sentences and proposed_length > chunk_size:
                break
            current_sentences.append(sentence)
            current_length = proposed_length
            end += 1
        chunk = " ".join(current_sentences).strip()
        if not chunk:
            break
        chunks.append(chunk)
        if end >= len(sentences):
            break
        # Walk backwards from the end of this chunk to find the fewest
        # trailing sentences covering at least *overlap* characters; the next
        # chunk restarts there so neighbouring chunks share that context.
        overlap_length = 0
        next_start = end
        for index in range(end - 1, start - 1, -1):
            overlap_length += len(sentences[index]) + (1 if overlap_length else 0)
            if overlap_length >= overlap:
                next_start = index
                break
        # Guarantee forward progress even when the whole chunk is shorter
        # than the requested overlap (otherwise the loop would never end).
        if next_start <= start:
            next_start = start + 1
        start = next_start
    # The overlap restart can occasionally reproduce an identical chunk
    # back-to-back; drop such adjacent duplicates.
    deduped_chunks: list[str] = []
    for chunk in chunks:
        if not deduped_chunks or deduped_chunks[-1] != chunk:
            deduped_chunks.append(chunk)
    return deduped_chunks
def main() -> None:
    """Build the chunked medical knowledge base from raw MedQuAD JSONL.

    Reads RAW_DATA_PATH line by line; for each Q&A row, tags the question
    with a disease bucket, prefixes the answer with its question type, and
    splits it into overlapping chunks. One JSON record per chunk is written
    to OUTPUT_PATH, followed by summary statistics on stdout.

    Raises:
        FileNotFoundError: if RAW_DATA_PATH does not exist (the raw dataset
            has not been downloaded yet).
    """
    if not RAW_DATA_PATH.exists():
        raise FileNotFoundError(
            f"Input file not found: {RAW_DATA_PATH}. Run scripts/download_dataset.py first."
        )
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    records: list[dict] = []
    disease_counts: Counter[str] = Counter()
    with RAW_DATA_PATH.open("r", encoding="utf-8") as file_obj:
        for line in file_obj:
            row = json.loads(line)
            question = row.get("question", "")
            answer = row.get("answer", "")
            qtype = row.get("qtype", "")
            detected_disease = detect_disease(question)
            # Prefix the answer with its question type so the chunk carries
            # that context, e.g. "Treatment: ...".
            content = f"{qtype.title()}: {answer}"
            chunks = chunk_text(content)
            # md5 is used as a short, stable content fingerprint here, not
            # for security. Note: identical questions share a document_id.
            document_id = hashlib.md5(question.encode()).hexdigest()[:12]
            for chunk_idx, chunk in enumerate(chunks):
                # uid already varies with chunk_idx, so the "_chunk_NNN"
                # suffix below is human-readable rather than required for
                # uniqueness.
                uid = hashlib.md5(f"{question}_{chunk_idx}".encode()).hexdigest()[:12]
                chunk_id = f"{uid}_chunk_{chunk_idx:03d}"
                record = {
                    "id": chunk_id,
                    "content": chunk,
                    "metadata": {
                        "disease_id": detected_disease,
                        "topic": qtype,
                        "source": "MedQuAD",
                        "document_id": document_id,
                        "chunk_index": chunk_idx,
                        "scraped_at": SCRAPED_AT,
                    },
                }
                records.append(record)
                disease_counts[detected_disease] += 1
    with OUTPUT_PATH.open("w", encoding="utf-8") as file_obj:
        for record in records:
            file_obj.write(json.dumps(record, ensure_ascii=False) + "\n")
    print(f"Total chunks created: {len(records)}")
    print("Chunks per disease_id:")
    for disease_id, count in sorted(disease_counts.items()):
        print(f"  {disease_id}: {count}")
    print("Sample records:")
    for record in records[:2]:
        print(json.dumps(record, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
Xet Storage Details
- Size:
- 5.06 kB
- Xet hash:
- 70013006f84c34f8304308e77a2e69e36241bbae0d9bdb97f7cacba7dea08e36
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.