#!/usr/bin/env python3
"""
HuggingFace Warbler Pack Ingestion Pipeline.
Imports HF datasets and transforms them into Warbler-compatible packs
for NPC intelligence training via the magma layer self-training system.
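
Typical CLI usage (invocations below are illustrative; run the file as a
module so its relative imports resolve):

    python -m warbler_cda.utils.hf_warbler_ingest ingest -d arxiv --arxiv-limit 1000
    python -m warbler_cda.utils.hf_warbler_ingest create-dataset-repo --repo-id <user>/warbler-cda-corpus
    python -m warbler_cda.utils.hf_warbler_ingest publish-packs --repo-id <user>/warbler-cda-corpus --pack warbler-pack-hf-arxiv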
"""
import logging
import tempfile
from pathlib import Path
from typing import List, Optional
import click
try:
from huggingface_hub import HfApi
except ImportError: # pragma: no cover - optional at import time for tests/environments
HfApi = None
from .transformers import (
NPCDialogueTransformer,
MultiCharacterTransformer,
SystemChatTransformer,
ArxivTransformer,
PromptReportTransformer,
NovelsTransformer,
ManualsTransformer,
EnterpriseTransformer,
PortugueseEducationTransformer,
EdustoriesTransformer,
WarblerPackBuilder,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).resolve().parent
PACKS_DIR = BASE_DIR.parents[1] / "packs"
def _require_hf_api():
"""Return HfApi or raise a clear CLI error if unavailable."""
if HfApi is None:
raise click.ClickException(
"huggingface_hub is required for dataset repo operations. "
"Install dependencies and try again."
)
return HfApi
def _dataset_card(repo_id: str, pack_names: List[str], source_dir: Path) -> str:
"""Create a basic dataset card for a Warbler dataset repo."""
pack_list = "\n".join(f"- {pack_name}" for pack_name in pack_names) or "- none specified"
return f"""---
language:
- en
license: mit
task_categories:
- text-retrieval
pretty_name: {repo_id.split('/')[-1]}
size_categories:
- unknown
---
# {repo_id.split('/')[-1]}
Warbler CDA dataset repository containing pack artifacts for hosted retrieval.
## Included packs
{pack_list}
## Source
Generated from Warbler CDA using the local source directory:
`{source_dir}`
## Notes
- This repo is intended to store retrieval corpora separately from the Space repo.
- Pack files are uploaded as Warbler-compatible directories with `package.json` metadata and JSONL content shards.
"""
class HFDatasetRepoManager:
"""Create and publish Warbler dataset repositories on Hugging Face."""
def __init__(self, token: Optional[str] = None):
api_class = _require_hf_api()
self.api = api_class(token=token)
def create_dataset_repo(
self,
repo_id: str,
private: bool = False,
exist_ok: bool = True,
write_dataset_card: bool = True,
) -> str:
"""Create a dataset repository and optionally seed a dataset card."""
repo_url = self.api.create_repo(
repo_id=repo_id,
repo_type="dataset",
private=private,
exist_ok=exist_ok,
)
if write_dataset_card:
with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding="utf-8") as handle:
handle.write(_dataset_card(repo_id, [], PACKS_DIR))
temp_path = Path(handle.name)
try:
self.api.upload_file(
path_or_fileobj=str(temp_path),
path_in_repo="README.md",
repo_id=repo_id,
repo_type="dataset",
commit_message="Initialize Warbler dataset card",
)
finally:
temp_path.unlink(missing_ok=True)
return repo_url.repo_id if hasattr(repo_url, "repo_id") else str(repo_url)
def publish_packs(
self,
repo_id: str,
pack_names: List[str],
source_dir: Path,
target_prefix: str = "packs",
update_dataset_card: bool = True,
) -> None:
"""Upload one or more pack directories into a dataset repository."""
missing_packs = [pack_name for pack_name in pack_names if not (source_dir / pack_name).exists()]
if missing_packs:
raise click.ClickException(f"Pack directories not found: {', '.join(missing_packs)}")
for pack_name in pack_names:
pack_dir = source_dir / pack_name
path_in_repo = f"{target_prefix}/{pack_name}" if target_prefix else pack_name
self.api.upload_folder(
folder_path=str(pack_dir),
path_in_repo=path_in_repo,
repo_id=repo_id,
repo_type="dataset",
commit_message=f"Upload {pack_name}",
)
if update_dataset_card:
with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding="utf-8") as handle:
handle.write(_dataset_card(repo_id, pack_names, source_dir))
temp_path = Path(handle.name)
try:
self.api.upload_file(
path_or_fileobj=str(temp_path),
path_in_repo="README.md",
repo_id=repo_id,
repo_type="dataset",
commit_message="Update dataset card",
)
finally:
temp_path.unlink(missing_ok=True)
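
# A minimal usage sketch for HFDatasetRepoManager (repo id and token handling
# are illustrative; assumes huggingface_hub is installed and the token has
# write access to the target namespace):
#
#     manager = HFDatasetRepoManager(token=os.environ.get("HF_TOKEN"))  # `os` import assumed
#     manager.create_dataset_repo("your-org/warbler-cda-corpus", private=True)
#     manager.publish_packs(
#         repo_id="your-org/warbler-cda-corpus",
#         pack_names=["warbler-pack-hf-arxiv"],
#         source_dir=PACKS_DIR,
#     )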
@click.group()
def cli():
"""HuggingFace Warbler Pack Ingestion Tool."""
pass
@cli.command()
@click.option(
"--datasets",
"-d",
multiple=True,
type=click.Choice(
[
"npc-dialogue",
"multi-character",
"system-chat",
"arxiv",
"prompt-report",
"novels",
"manuals",
"enterprise",
"portuguese-edu",
"edustories",
"all",
]
),
default=["arxiv"],
help="Datasets to ingest",
)
@click.option("--pack-prefix", "-p", default="warbler-pack-hf", help="Prefix for pack names")
@click.option(
    "--arxiv-limit",
    type=int,
    default=None,
    help="Limit number of arXiv papers to ingest (HARD LIMIT: 250,000 for 1GB storage compliance)",
)
@click.option(
"--max-docs-per-chunk",
type=int,
default=50000,
help="Maximum documents per chunk file (default: 50000). Use 0 for no chunking.",
)
@click.option(
"--max-pdf-pages",
type=int,
default=None,
help="Maximum PDF pages to extract (default: None for unlimited)",
)
def ingest(datasets, pack_prefix, arxiv_limit, max_docs_per_chunk, max_pdf_pages):
"""Ingest HF datasets into Warbler packs."""
PACKS_DIR.mkdir(exist_ok=True, parents=True)
builder = WarblerPackBuilder(PACKS_DIR)
results = {}
if "all" in datasets:
datasets = [
"arxiv",
"prompt-report",
"novels",
"manuals",
"enterprise",
"portuguese-edu",
"edustories",
]
if max_docs_per_chunk > 0:
click.echo(f"[PACK] Chunking enabled: {max_docs_per_chunk} documents per chunk")
else:
click.echo("[PACK] Chunking disabled: single file per pack")
if max_pdf_pages is not None:
click.echo(f"[PDF] PDF extraction limit: {max_pdf_pages} pages")
else:
click.echo("[PDF] PDF extraction: unlimited pages")
click.echo()
for dataset in datasets:
click.echo(f"\n[*] Processing {dataset}...")
try:
docs = None
pack_name = None
if dataset == "npc-dialogue":
transformer = NPCDialogueTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-npc-dialogue"
elif dataset == "multi-character":
transformer = MultiCharacterTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-multi-character"
elif dataset == "system-chat":
transformer = SystemChatTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-system-chat"
elif dataset == "arxiv":
transformer = ArxivTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform(limit=arxiv_limit)
pack_name = f"{pack_prefix}-arxiv"
elif dataset == "prompt-report":
transformer = PromptReportTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-prompt-report"
elif dataset == "novels":
transformer = NovelsTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-novels"
elif dataset == "manuals":
transformer = ManualsTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-manuals"
elif dataset == "enterprise":
transformer = EnterpriseTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-enterprise"
elif dataset == "portuguese-edu":
transformer = PortugueseEducationTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-portuguese-edu"
elif dataset == "edustories":
transformer = EdustoriesTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-edustories"
else:
click.echo(f"[ERROR] Unknown dataset: {dataset}")
continue
if docs:
chunk_size = max_docs_per_chunk if max_docs_per_chunk > 0 else float("inf")
pack_path = builder.create_pack(docs, pack_name, max_docs_per_chunk=chunk_size)
results[dataset] = {"documents": len(docs), "pack_path": pack_path}
click.echo(f"[OK] {dataset}: {len(docs)} documents -> {pack_path}")
        except Exception as e:
            click.echo(f"[ERROR] Failed to process {dataset}: {e}")
if results:
report_file = builder.save_report(results)
click.echo("\n[SUCCESS] Ingestion Complete!")
click.echo(f"[STATS] Total Documents: {sum(r['documents'] for r in results.values())}")
click.echo(f"[STATS] Packs Created: {len(results)}")
click.echo(f"[STATS] Report saved to: {report_file}")
else:
click.echo("\n[WARNING] No datasets processed successfully")
@cli.command("create-dataset-repo")
@click.option("--repo-id", required=True, help="Hugging Face dataset repo id, e.g. Bellok/warbler-cda-corpus")
@click.option("--private/--public", default=False, help="Create the dataset repo as private or public")
@click.option("--token", envvar="HF_TOKEN", default=None, help="Hugging Face token or use HF_TOKEN env var")
@click.option("--write-dataset-card/--no-write-dataset-card", default=True, help="Seed the repo with a README dataset card")
def create_dataset_repo(repo_id, private, token, write_dataset_card):
"""Create a Hugging Face dataset repo for Warbler corpora."""
manager = HFDatasetRepoManager(token=token)
created_repo = manager.create_dataset_repo(
repo_id=repo_id,
private=private,
write_dataset_card=write_dataset_card,
)
click.echo(f"[OK] Dataset repo ready: {created_repo}")
@cli.command("publish-packs")
@click.option("--repo-id", required=True, help="Hugging Face dataset repo id, e.g. Bellok/warbler-cda-corpus")
@click.option("--pack", "pack_names", multiple=True, required=True, help="Pack directory names to upload")
@click.option("--source-dir", type=click.Path(path_type=Path, file_okay=False, dir_okay=True), default=PACKS_DIR, show_default=True, help="Local directory containing Warbler pack directories")
@click.option("--target-prefix", default="packs", show_default=True, help="Folder prefix inside the dataset repo")
@click.option("--token", envvar="HF_TOKEN", default=None, help="Hugging Face token or use HF_TOKEN env var")
@click.option("--update-dataset-card/--no-update-dataset-card", default=True, help="Refresh README.md with uploaded pack names")
def publish_packs(repo_id, pack_names, source_dir, target_prefix, token, update_dataset_card):
"""Upload Warbler pack directories into a Hugging Face dataset repo."""
manager = HFDatasetRepoManager(token=token)
manager.publish_packs(
repo_id=repo_id,
pack_names=list(pack_names),
source_dir=Path(source_dir),
target_prefix=target_prefix,
update_dataset_card=update_dataset_card,
)
click.echo(f"[OK] Uploaded {len(pack_names)} pack(s) to {repo_id}")
class HFWarblerIngestor:
"""Runtime ingestion class for downloading datasets at app launch."""
    def __init__(self, packs_dir: Optional[Path] = None, verbose: bool = True):
"""Initialize the ingestor."""
if packs_dir is None:
packs_dir = PACKS_DIR
self.packs_dir = Path(packs_dir)
self.verbose = verbose
self.builder = WarblerPackBuilder(self.packs_dir)
def ingest_dataset(self, dataset_name: str, pack_prefix: str = "warbler-pack-hf",
arxiv_limit: Optional[int] = None, max_docs_per_chunk: int = 50000,
max_pdf_pages: Optional[int] = None) -> bool:
"""Ingest a specific dataset.
Args:
dataset_name: Name of dataset to ingest
pack_prefix: Prefix for pack names
arxiv_limit: Limit for arXiv papers
max_docs_per_chunk: Chunking configuration
max_pdf_pages: PDF extraction limit
Returns:
True if ingestion successful, False otherwise
"""
try:
if self.verbose:
print(f"📦 Processing {dataset_name}...")
docs = None
pack_name = None
if dataset_name == "npc-dialogue":
transformer = NPCDialogueTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-npc-dialogue"
elif dataset_name == "multi-character":
transformer = MultiCharacterTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-multi-character"
elif dataset_name == "system-chat":
transformer = SystemChatTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-system-chat"
elif dataset_name == "arxiv":
transformer = ArxivTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform(limit=arxiv_limit)
pack_name = f"{pack_prefix}-arxiv"
elif dataset_name == "prompt-report":
transformer = PromptReportTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-prompt-report"
elif dataset_name == "novels":
transformer = NovelsTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-novels"
elif dataset_name == "manuals":
transformer = ManualsTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-manuals"
elif dataset_name == "enterprise":
transformer = EnterpriseTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-enterprise"
elif dataset_name == "portuguese-edu":
transformer = PortugueseEducationTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-portuguese-edu"
elif dataset_name == "edustories":
transformer = EdustoriesTransformer(max_pdf_pages=max_pdf_pages)
docs = transformer.transform()
pack_name = f"{pack_prefix}-edustories"
else:
if self.verbose:
print(f"❌ Unknown dataset: {dataset_name}")
return False
if docs:
chunk_size = max_docs_per_chunk if max_docs_per_chunk > 0 else float("inf")
pack_path = self.builder.create_pack(docs, pack_name, max_docs_per_chunk=chunk_size)
if self.verbose:
print(f"✅ {dataset_name}: {len(docs)} documents -> {pack_path}")
return True
else:
if self.verbose:
print(f"⚠️ No documents found for {dataset_name}")
return False
except Exception as e:
if self.verbose:
print(f"❌ Error processing {dataset_name}: {e}")
return False
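
# Runtime usage sketch (dataset name matches the CLI choices above; network
# access to Hugging Face is required, and results depend on the environment):
#
#     ingestor = HFWarblerIngestor(verbose=True)
#     ok = ingestor.ingest_dataset("arxiv", arxiv_limit=1000)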
if __name__ == "__main__":
cli()