devjhawar's picture
Upload folder using huggingface_hub
aa76de3 verified
"""CLI entry-point for the RAG ingestion pipeline.
Usage:
python rag_engine/main.py --dry-run
python rag_engine/main.py --pdf path/to/policy.pdf --policy-id POL-001
"""
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
from __future__ import annotations
import argparse
import collections
import sys
from pathlib import Path
from typing import List, Tuple
from rag_engine.chunking.clause_chunker import ClauseChunker
from rag_engine.ingestion.cleaner import DocumentCleaner
from rag_engine.schemas.chunk_metadata import ChunkMetadata
from rag_engine.utils.logger import get_logger
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Hardcoded sample for --dry-run (no LlamaParse API call required)
# ---------------------------------------------------------------------------
_DRY_RUN_SAMPLE = """\
## SECTION 1 - DEFINITIONS
1.1 Insured means the person named in the Schedule.
1.2 Deductible means the first amount of any claim payable.
## SECTION 2 - COVERAGE
2.1 We will pay for loss caused by fire, smoke, or explosion.
2.2 We will pay for theft subject to the deductible in Schedule A.
## SECTION 3 - EXCLUSIONS
3.1 This policy does not cover flood or surface water damage.
3.2 This policy does not cover earthquake or earth movement.
## SECTION 4 - SCHEDULE OF DEDUCTIBLES
| Event | Deductible |
|--------|------------|
| Fire | $500 |
| Theft | $1,000 |
"""
# ---------------------------------------------------------------------------
# Result printer
# ---------------------------------------------------------------------------
def _print_results(
policy_id: str,
chunks: List[Tuple[str, ChunkMetadata]],
) -> None:
"""Pretty-print chunk summary to stdout."""
print(f"\n{'=' * 60}")
print(f" Policy: {policy_id} | Total chunks: {len(chunks)}")
print(f"{'=' * 60}")
# Clause-type distribution
counter = collections.Counter(meta.clause_type.value for _, meta in chunks)
print("\nClause-type distribution:")
for ctype, count in counter.most_common():
print(f" {ctype:<20s} {count}")
# First 5 chunks
print(f"\n{'—' * 60}")
for text, meta in chunks[:5]:
preview = text[:200].replace("\n", " ")
print(
f" [{meta.chunk_index}] section={meta.section_name!r}\n"
f" clause_type={meta.clause_type.value} "
f"coverage_category={meta.coverage_category}\n"
f" tokens={meta.token_count} "
f"deductible_related={meta.deductible_related} "
f"limit_related={meta.limit_related} "
f"table_chunk={meta.table_chunk}\n"
f" text={preview!r}\n"
)
if len(chunks) > 5:
print(f" ... and {len(chunks) - 5} more chunks\n")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="RAG Engine — ingestion pipeline CLI",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Run cleaner + chunker on a hardcoded sample (no LlamaParse call).",
)
parser.add_argument(
"--pdf",
type=str,
default=None,
help="Path to a PDF file to ingest.",
)
parser.add_argument(
"--policy-id",
type=str,
default=None,
help="Policy identifier (required with --pdf).",
)
return parser
def main() -> None:
parser = _build_parser()
args = parser.parse_args()
if args.dry_run:
logger.info("=== DRY-RUN MODE ===")
cleaner = DocumentCleaner()
chunker = ClauseChunker()
clean_text = cleaner.clean(_DRY_RUN_SAMPLE)
chunks = chunker.chunk(clean_text, "DRY-RUN-001", "dry_run_sample.md")
_print_results(policy_id="DRY-RUN-001", chunks=chunks)
elif args.pdf:
if not args.policy_id:
parser.error("--policy-id is required when using --pdf")
from rag_engine.ingestion.pipeline import IngestionPipeline
pipeline = IngestionPipeline()
chunks = pipeline.run(pdf_path=args.pdf, policy_id=args.policy_id)
_print_results(policy_id=args.policy_id, chunks=chunks)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()