#!/usr/bin/env python3
"""
HuggingFace Warbler Pack Ingestion Pipeline.

Imports HF datasets and transforms them into Warbler-compatible packs
for NPC intelligence training via the magma layer self-training system.
"""

import logging
import tempfile
from pathlib import Path
from typing import List, Optional

import click

try:
    from huggingface_hub import HfApi
except ImportError:  # pragma: no cover - optional at import time for tests/environments
    HfApi = None

from .transformers import (
    NPCDialogueTransformer,
    MultiCharacterTransformer,
    SystemChatTransformer,
    ArxivTransformer,
    PromptReportTransformer,
    NovelsTransformer,
    ManualsTransformer,
    EnterpriseTransformer,
    PortugueseEducationTransformer,
    EdustoriesTransformer,
    WarblerPackBuilder,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

BASE_DIR = Path(__file__).resolve().parent
PACKS_DIR = BASE_DIR.parents[1] / "packs"


def _require_hf_api():
    """Return HfApi or raise a clear CLI error if unavailable."""
    if HfApi is None:
        raise click.ClickException(
            "huggingface_hub is required for dataset repo operations. "
            "Install dependencies and try again."
        )
    return HfApi


def _dataset_card(repo_id: str, pack_names: List[str], source_dir: Path) -> str:
    """Create a basic dataset card for a Warbler dataset repo."""
    pack_list = "\n".join(f"- {pack_name}" for pack_name in pack_names) or "- none specified"
    return f"""---
language:
- en
license: mit
task_categories:
- text-retrieval
pretty_name: {repo_id.split('/')[-1]}
size_categories:
- unknown
---

# {repo_id.split('/')[-1]}

Warbler CDA dataset repository containing pack artifacts for hosted retrieval.

## Included packs

{pack_list}

## Source

Generated from Warbler CDA using the local source directory:
`{source_dir}`

## Notes

- This repo is intended to store retrieval corpora separately from the Space repo.
- Pack files are uploaded as Warbler-compatible directories with `package.json` metadata and JSONL content shards.
"""


class HFDatasetRepoManager:
    """Create and publish Warbler dataset repositories on Hugging Face."""

    def __init__(self, token: Optional[str] = None):
        api_class = _require_hf_api()
        self.api = api_class(token=token)

    def create_dataset_repo(
        self,
        repo_id: str,
        private: bool = False,
        exist_ok: bool = True,
        write_dataset_card: bool = True,
    ) -> str:
        """Create a dataset repository and optionally seed a dataset card."""
        repo_url = self.api.create_repo(
            repo_id=repo_id,
            repo_type="dataset",
            private=private,
            exist_ok=exist_ok,
        )
        if write_dataset_card:
            with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding="utf-8") as handle:
                handle.write(_dataset_card(repo_id, [], PACKS_DIR))
                temp_path = Path(handle.name)
            try:
                self.api.upload_file(
                    path_or_fileobj=str(temp_path),
                    path_in_repo="README.md",
                    repo_id=repo_id,
                    repo_type="dataset",
                    commit_message="Initialize Warbler dataset card",
                )
            finally:
                temp_path.unlink(missing_ok=True)
        return repo_url.repo_id if hasattr(repo_url, "repo_id") else str(repo_url)

    def publish_packs(
        self,
        repo_id: str,
        pack_names: List[str],
        source_dir: Path,
        target_prefix: str = "packs",
        update_dataset_card: bool = True,
    ) -> None:
        """Upload one or more pack directories into a dataset repository."""
        missing_packs = [pack_name for pack_name in pack_names if not (source_dir / pack_name).exists()]
        if missing_packs:
            raise click.ClickException(f"Pack directories not found: {', '.join(missing_packs)}")
        for pack_name in pack_names:
            pack_dir = source_dir / pack_name
            path_in_repo = f"{target_prefix}/{pack_name}" if target_prefix else pack_name
            self.api.upload_folder(
                folder_path=str(pack_dir),
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Upload {pack_name}",
            )
        if update_dataset_card:
            with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding="utf-8") as handle:
                handle.write(_dataset_card(repo_id, pack_names, source_dir))
                temp_path = Path(handle.name)
            try:
                self.api.upload_file(
                    path_or_fileobj=str(temp_path),
                    path_in_repo="README.md",
                    repo_id=repo_id,
                    repo_type="dataset",
                    commit_message="Update dataset card",
                )
            finally:
                temp_path.unlink(missing_ok=True)
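

# Example (sketch): programmatic use of HFDatasetRepoManager. Assumes a token
# with write access and pack directories already built under PACKS_DIR; the
# repo and pack names below are illustrative only.
#
#   manager = HFDatasetRepoManager(token="hf_xxx")
#   manager.create_dataset_repo("your-org/warbler-packs", private=True)
#   manager.publish_packs(
#       repo_id="your-org/warbler-packs",
#       pack_names=["warbler-pack-hf-arxiv"],
#       source_dir=PACKS_DIR,
#   )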


@click.group()
def cli():
    """HuggingFace Warbler Pack Ingestion Tool."""
    pass


# NOTE: the option names and defaults on the commands below are assumed;
# adjust them to match your deployment.
@cli.command()
@click.option("--datasets", multiple=True, default=("all",), help="Datasets to ingest, or 'all'.")
@click.option("--pack-prefix", default="warbler-pack-hf", help="Prefix for generated pack names.")
@click.option("--arxiv-limit", type=int, default=None, help="Maximum number of arXiv papers to ingest.")
@click.option("--max-docs-per-chunk", type=int, default=50000, help="Documents per JSONL shard (0 disables chunking).")
@click.option("--max-pdf-pages", type=int, default=None, help="Page limit for PDF extraction (unset = unlimited).")
def ingest(datasets, pack_prefix, arxiv_limit, max_docs_per_chunk, max_pdf_pages):
    """Ingest HF datasets into Warbler packs."""
    PACKS_DIR.mkdir(exist_ok=True, parents=True)
    builder = WarblerPackBuilder(PACKS_DIR)
    results = {}

    if "all" in datasets:
        datasets = [
            "arxiv",
            "prompt-report",
            "novels",
            "manuals",
            "enterprise",
            "portuguese-edu",
            "edustories",
        ]

    if max_docs_per_chunk > 0:
        click.echo(f"[PACK] Chunking enabled: {max_docs_per_chunk} documents per chunk")
    else:
        click.echo("[PACK] Chunking disabled: single file per pack")
    if max_pdf_pages is not None:
        click.echo(f"[PDF] PDF extraction limit: {max_pdf_pages} pages")
    else:
        click.echo("[PDF] PDF extraction: unlimited pages")
    click.echo()

    for dataset in datasets:
        click.echo(f"\n[*] Processing {dataset}...")
        try:
            docs = None
            pack_name = None
            if dataset == "npc-dialogue":
                transformer = NPCDialogueTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-npc-dialogue"
            elif dataset == "multi-character":
                transformer = MultiCharacterTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-multi-character"
            elif dataset == "system-chat":
                transformer = SystemChatTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-system-chat"
            elif dataset == "arxiv":
                transformer = ArxivTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform(limit=arxiv_limit)
                pack_name = f"{pack_prefix}-arxiv"
            elif dataset == "prompt-report":
                transformer = PromptReportTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-prompt-report"
            elif dataset == "novels":
                transformer = NovelsTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-novels"
            elif dataset == "manuals":
                transformer = ManualsTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-manuals"
            elif dataset == "enterprise":
                transformer = EnterpriseTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-enterprise"
            elif dataset == "portuguese-edu":
                transformer = PortugueseEducationTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-portuguese-edu"
            elif dataset == "edustories":
                transformer = EdustoriesTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-edustories"
            else:
                click.echo(f"[ERROR] Unknown dataset: {dataset}")
                continue

            if docs:
                chunk_size = max_docs_per_chunk if max_docs_per_chunk > 0 else float("inf")
                pack_path = builder.create_pack(docs, pack_name, max_docs_per_chunk=chunk_size)
                results[dataset] = {"documents": len(docs), "pack_path": pack_path}
                click.echo(f"[OK] {dataset}: {len(docs)} documents -> {pack_path}")
        except Exception as e:
            click.echo(f"[ERROR] Error processing {dataset}: {str(e)}")

    if results:
        report_file = builder.save_report(results)
        click.echo("\n[SUCCESS] Ingestion Complete!")
        click.echo(f"[STATS] Total Documents: {sum(r['documents'] for r in results.values())}")
        click.echo(f"[STATS] Packs Created: {len(results)}")
        click.echo(f"[STATS] Report saved to: {report_file}")
    else:
        click.echo("\n[WARNING] No datasets processed successfully")


@cli.command("create-dataset-repo")
@click.option("--repo-id", required=True, help="Target dataset repo, e.g. org/name.")
@click.option("--private", is_flag=True, default=False, help="Create the repo as private.")
@click.option("--token", default=None, help="Hugging Face token with write access.")
@click.option("--write-dataset-card/--no-write-dataset-card", default=True)
def create_dataset_repo(repo_id, private, token, write_dataset_card):
    """Create a Hugging Face dataset repo for Warbler corpora."""
    manager = HFDatasetRepoManager(token=token)
    created_repo = manager.create_dataset_repo(
        repo_id=repo_id,
        private=private,
        write_dataset_card=write_dataset_card,
    )
    click.echo(f"[OK] Dataset repo ready: {created_repo}")


@cli.command("publish-packs")
@click.option("--repo-id", required=True, help="Target dataset repo, e.g. org/name.")
@click.option("--pack-names", multiple=True, required=True, help="Pack directory names to upload.")
@click.option("--source-dir", default=str(PACKS_DIR), help="Directory containing the pack directories.")
@click.option("--target-prefix", default="packs", help="Path prefix inside the dataset repo.")
@click.option("--token", default=None, help="Hugging Face token with write access.")
@click.option("--update-dataset-card/--no-update-dataset-card", default=True)
def publish_packs(repo_id, pack_names, source_dir, target_prefix, token, update_dataset_card):
    """Upload Warbler pack directories into a Hugging Face dataset repo."""
    manager = HFDatasetRepoManager(token=token)
    manager.publish_packs(
        repo_id=repo_id,
        pack_names=list(pack_names),
        source_dir=Path(source_dir),
        target_prefix=target_prefix,
        update_dataset_card=update_dataset_card,
    )
    click.echo(f"[OK] Uploaded {len(pack_names)} pack(s) to {repo_id}")


class HFWarblerIngestor:
    """Runtime ingestion class for downloading datasets at app launch."""

    def __init__(self, packs_dir: Optional[Path] = None, verbose: bool = True):
        """Initialize the ingestor."""
        if packs_dir is None:
            packs_dir = PACKS_DIR
        self.packs_dir = Path(packs_dir)
        self.verbose = verbose
        self.builder = WarblerPackBuilder(self.packs_dir)

    def ingest_dataset(self, dataset_name: str, pack_prefix: str = "warbler-pack-hf",
                       arxiv_limit: Optional[int] = None, max_docs_per_chunk: int = 50000,
                       max_pdf_pages: Optional[int] = None) -> bool:
        """Ingest a specific dataset.

        Args:
            dataset_name: Name of dataset to ingest
            pack_prefix: Prefix for pack names
            arxiv_limit: Limit for arXiv papers
            max_docs_per_chunk: Chunking configuration
            max_pdf_pages: PDF extraction limit

        Returns:
            True if ingestion successful, False otherwise
        """
        try:
            if self.verbose:
                print(f"📦 Processing {dataset_name}...")

            docs = None
            pack_name = None
            if dataset_name == "npc-dialogue":
                transformer = NPCDialogueTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-npc-dialogue"
            elif dataset_name == "multi-character":
                transformer = MultiCharacterTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-multi-character"
            elif dataset_name == "system-chat":
                transformer = SystemChatTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-system-chat"
            elif dataset_name == "arxiv":
                transformer = ArxivTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform(limit=arxiv_limit)
                pack_name = f"{pack_prefix}-arxiv"
            elif dataset_name == "prompt-report":
                transformer = PromptReportTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-prompt-report"
            elif dataset_name == "novels":
                transformer = NovelsTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-novels"
            elif dataset_name == "manuals":
                transformer = ManualsTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-manuals"
            elif dataset_name == "enterprise":
                transformer = EnterpriseTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-enterprise"
            elif dataset_name == "portuguese-edu":
                transformer = PortugueseEducationTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-portuguese-edu"
            elif dataset_name == "edustories":
                transformer = EdustoriesTransformer(max_pdf_pages=max_pdf_pages)
                docs = transformer.transform()
                pack_name = f"{pack_prefix}-edustories"
            else:
                if self.verbose:
                    print(f"❌ Unknown dataset: {dataset_name}")
                return False

            if docs:
                chunk_size = max_docs_per_chunk if max_docs_per_chunk > 0 else float("inf")
                pack_path = self.builder.create_pack(docs, pack_name, max_docs_per_chunk=chunk_size)
                if self.verbose:
                    print(f"✅ {dataset_name}: {len(docs)} documents -> {pack_path}")
                return True
            else:
                if self.verbose:
                    print(f"⚠️ No documents found for {dataset_name}")
                return False
        except Exception as e:
            if self.verbose:
                print(f"❌ Error processing {dataset_name}: {e}")
            return False
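

# Example (sketch): runtime ingestion at app startup. Dataset names must match
# the branches handled in ingest_dataset(); the argument values below are
# illustrative only.
#
#   ingestor = HFWarblerIngestor(verbose=True)
#   ingestor.ingest_dataset("arxiv", arxiv_limit=100, max_pdf_pages=20)
#   ingestor.ingest_dataset("prompt-report")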


if __name__ == "__main__":
    cli()