| """Raw corpus ingestion utilities.""" |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Iterable, Iterator |
|
|
|
|
| @dataclass(frozen=True) |
| class SourceSpec: |
| """Describes one raw corpus source.""" |
|
|
| name: str |
| domain_tag: str |
| quality_tier: str |
| license_category: str |
| estimated_tokens: int |
| path: str |
| text_key: str = "text" |
|
|
|
|
| SOURCE_REGISTRY: tuple[SourceSpec, ...] = ( |
| SourceSpec("general_web", "general", "medium", "permissive", 20_000_000_000, "data/raw/general_web.jsonl"), |
| SourceSpec("code", "code", "high", "permissive", 8_000_000_000, "data/raw/code.jsonl"), |
| SourceSpec("math_science", "math", "high", "permissive", 4_000_000_000, "data/raw/math_science.jsonl"), |
| SourceSpec("books_longform", "general", "high", "restricted", 5_000_000_000, "data/raw/books.jsonl"), |
| SourceSpec("multilingual", "multilingual", "medium", "permissive", 3_000_000_000, "data/raw/multilingual.jsonl"), |
| SourceSpec("synthetic", "reasoning", "high", "permissive", 1_000_000_000, "data/raw/synthetic.jsonl"), |
| ) |
|
|
|
|
| def iter_jsonl(path: Path, text_key: str = "text") -> Iterator[dict[str, object]]: |
| """Yield JSONL records from disk.""" |
| with path.open("r", encoding="utf-8") as handle: |
| for line in handle: |
| line = line.strip() |
| if not line: |
| continue |
| payload = json.loads(line) |
| text = payload.get(text_key) |
| if not isinstance(text, str) or not text.strip(): |
| continue |
| yield payload |
|
|
|
|
| def stream_source(spec: SourceSpec) -> Iterator[dict[str, object]]: |
| """Yield normalized records for one configured source.""" |
| path = Path(spec.path) |
| if not path.exists(): |
| return iter(()) |
| return ( |
| { |
| "id": stable_record_id(spec.name, record[spec.text_key]), |
| "text": record[spec.text_key], |
| "domain_tag": spec.domain_tag, |
| "quality_tier": spec.quality_tier, |
| "license_category": spec.license_category, |
| "source_name": spec.name, |
| } |
| for record in iter_jsonl(path, spec.text_key) |
| ) |
|
|
|
|
| def stream_all_sources(sources: Iterable[SourceSpec] = SOURCE_REGISTRY) -> Iterator[dict[str, object]]: |
| """Yield records from every source in the registry.""" |
| for spec in sources: |
| yield from stream_source(spec) |
|
|
|
|
| def stable_record_id(source_name: str, text: str) -> str: |
| """Hash a source+text pair into a stable content id.""" |
| digest = hashlib.sha256() |
| digest.update(source_name.encode("utf-8")) |
| digest.update(b"\0") |
| digest.update(text.encode("utf-8")) |
| return digest.hexdigest() |
|
|