Spaces:
Running
Running
| """CLI: Full ingestion pipeline. | |
| Loads papers from a data source, stores in SQLite, chunks them, | |
| generates embeddings, and stores in ChromaDB. | |
| Usage: | |
| python scripts/ingest.py --source hf --parquet-path data/raw/acl-publication-info.74k.parquet | |
| python scripts/ingest.py --source hf --parquet-path data/raw/acl-publication-info.74k.parquet --year-from 2018 --year-to 2022 --max-papers 5000 | |
| python scripts/ingest.py --source acl --year-from 2023 --year-to 2025 | |
| """ | |
| import argparse | |
| import logging | |
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from src.config import get_config | |
| from src.ingestion.base_loader import DataLoader | |
| from src.ingestion.chunking import chunk_papers | |
| from src.ingestion.embeddings import EmbeddingGenerator | |
| from src.ingestion.load_acl_anthology import ACLAnthologyLoader | |
| from src.ingestion.load_hf_data import HFDataLoader | |
| from src.storage.chroma_store import ChromaStore | |
| from src.storage.sqlite_db import SQLiteDB | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def get_loader(args, config) -> DataLoader: | |
| """Create the appropriate data loader from CLI args.""" | |
| if args.source == "hf": | |
| if not args.parquet_path: | |
| raise ValueError("--parquet-path is required for HuggingFace source") | |
| return HFDataLoader(args.parquet_path) | |
| elif args.source == "acl": | |
| return ACLAnthologyLoader() | |
| else: | |
| raise ValueError(f"Unknown source: {args.source}") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="ResearchRadar ingestion pipeline") | |
| parser.add_argument( | |
| "--source", required=True, choices=["hf", "acl"], | |
| help="Data source: 'hf' (HuggingFace parquet) or 'acl' (ACL Anthology package)", | |
| ) | |
| parser.add_argument("--parquet-path", type=str, help="Path to HF parquet file") | |
| parser.add_argument("--year-from", type=int, default=None) | |
| parser.add_argument("--year-to", type=int, default=None) | |
| parser.add_argument( | |
| "--venues", type=str, nargs="+", default=None, | |
| help="Venue filter (e.g., acl emnlp naacl)", | |
| ) | |
| parser.add_argument("--max-papers", type=int, default=None) | |
| parser.add_argument( | |
| "--chunk-strategy", choices=["abstract", "fixed", "section"], default="abstract", | |
| ) | |
| parser.add_argument("--skip-embeddings", action="store_true", help="Skip embedding generation") | |
| args = parser.parse_args() | |
| config = get_config() | |
| # 1. Load papers | |
| logger.info("=== Step 1: Loading papers ===") | |
| loader = get_loader(args, config) | |
| papers = loader.load( | |
| year_from=args.year_from, | |
| year_to=args.year_to, | |
| venues=args.venues, | |
| max_papers=args.max_papers, | |
| ) | |
| logger.info("Loaded %d papers from %s", len(papers), loader.source_name) | |
| if not papers: | |
| logger.warning("No papers loaded. Exiting.") | |
| return | |
| # 2. Store in SQLite | |
| logger.info("=== Step 2: Storing in SQLite ===") | |
| db = SQLiteDB(config.sqlite_db_path) | |
| db.create_schema() | |
| db.insert_papers(papers) | |
| logger.info("SQLite: %d papers stored", db.get_paper_count()) | |
| # 3. Chunk papers | |
| logger.info("=== Step 3: Chunking papers (strategy=%s) ===", args.chunk_strategy) | |
| chunks = chunk_papers(papers, strategy=args.chunk_strategy) | |
| db.insert_chunks(chunks) | |
| logger.info("SQLite: %d chunks stored", db.get_chunk_count()) | |
| # 4. Generate embeddings and store in ChromaDB | |
| if not args.skip_embeddings: | |
| logger.info("=== Step 4: Generating embeddings ===") | |
| chroma = ChromaStore(config.chroma_db_path) | |
| embedder = EmbeddingGenerator(config.embedding_model) | |
| # Retrieve chunks with metadata for ChromaDB | |
| chunks_with_meta = db.get_all_chunks() | |
| embedder.embed_and_store(chunks_with_meta, chroma) | |
| logger.info("ChromaDB: %d embeddings stored", chroma.count()) | |
| else: | |
| logger.info("=== Step 4: Skipped (--skip-embeddings) ===") | |
| logger.info("=== Ingestion complete ===") | |
| logger.info("Papers: %d | Chunks: %d", db.get_paper_count(), db.get_chunk_count()) | |
| if __name__ == "__main__": | |
| main() | |