#!/usr/bin/env python3
"""
HF Spaces Document Processing Pipeline
Processes synthetic_policies documents and stores embeddings in HF Dataset
"""

import hashlib
import logging
from pathlib import Path

from src.embedding.hf_embedding_service import HFEmbeddingService

# Import your existing services
from src.ingestion.document_chunker import DocumentChunker
from src.ingestion.document_parser import DocumentParser
from src.vector_store.hf_dataset_store import HFDatasetVectorStore


class HFDocumentPipeline:
    """
    Free-tier document processing pipeline for HF Spaces
    """

    def __init__(self):
        self.document_parser = DocumentParser()
        self.document_chunker = DocumentChunker(chunk_size=500, overlap=50)
        self.embedding_service = HFEmbeddingService()
        self.vector_store = HFDatasetVectorStore()

    def process_synthetic_policies(self, policies_dir: str = "synthetic_policies"):
        """
        Process all policy documents and store embeddings
        """
        logging.info("🚀 Starting synthetic policies processing...")

        policy_files = list(Path(policies_dir).glob("*.txt"))
        if not policy_files:
            policy_files = list(Path(policies_dir).glob("*.md"))

        if not policy_files:
            logging.warning(f"⚠️ No policy files found in {policies_dir}")
            return

        logging.info(f"📁 Found {len(policy_files)} policy files to process")

        all_documents = []
        all_embeddings = []
        all_metadata = []

        for idx, policy_file in enumerate(policy_files, 1):
            try:
                logging.info(f"📄 Processing file {idx}/{len(policy_files)}: {policy_file.name}")

                # Read document
                with open(policy_file, "r", encoding="utf-8") as f:
                    content = f.read()

                logging.info(f"📏 Document length: {len(content)} characters")

                # Parse document to get structured data (parser validates/throws on errors)
                _ = self.document_parser.parse_document(str(policy_file))

                # Chunk the document using the proper chunker
                chunks = self.document_chunker.chunk_document(
                    text=content,
                    doc_metadata={
                        "filename": policy_file.name,
                        "source": str(policy_file),
                    },
                )

                logging.info(f"✂️ Created {len(chunks)} chunks from {policy_file.name}")

                # Process in batches to stay within memory limits
                batch_size = 10  # Small batches for free tier
                total_batches = (len(chunks) + batch_size - 1) // batch_size

                for batch_idx in range(0, len(chunks), batch_size):
                    batch_num = (batch_idx // batch_size) + 1
                    batch_chunks = chunks[batch_idx : batch_idx + batch_size]
                    batch_texts = [chunk["content"] for chunk in batch_chunks]

                    logging.info(f"🔄 Processing batch {batch_num}/{total_batches} ({len(batch_texts)} chunks)")

                    # Generate embeddings using HF API
                    try:
                        batch_embeddings = self.embedding_service.get_embeddings(batch_texts)
                        logging.info(f"✅ Generated {len(batch_embeddings) if batch_embeddings else 0} embeddings")
                    except Exception as e:
                        logging.error(f"❌ Embedding generation failed for batch {batch_num}: {e}")
                        continue

                    if batch_embeddings:
                        all_documents.extend(batch_texts)
                        all_embeddings.extend(batch_embeddings)

                        # Create metadata from chunk metadata
                        for chunk in batch_chunks:
                            metadata = {
                                "source_file": policy_file.name,
                                "chunk_id": chunk["metadata"].get("chunk_id", ""),
                                "chunk_index": chunk["metadata"].get("chunk_index", 0),
                                "content_hash": hashlib.md5(chunk["content"].encode()).hexdigest(),
                            }
                            all_metadata.append(metadata)

                logging.info(f"✅ Completed {policy_file.name}: {len(chunks)} chunks processed")

            except Exception as e:
                logging.error(f"❌ Error processing {policy_file}: {e}")

        # Save all embeddings to HF Dataset
        if all_embeddings:
            logging.info(f"💾 Saving {len(all_embeddings)} total embeddings to HF Dataset...")
            try:
                self.vector_store.save_embeddings(all_documents, all_embeddings, all_metadata)
                logging.info(f"🎉 Pipeline complete: {len(all_embeddings)} total embeddings saved successfully!")
            except Exception as e:
                logging.error(f"❌ Failed to save embeddings: {e}")
        else:
            logging.warning("⚠️ No embeddings generated - pipeline completed with no results!")


def main():
    """Run the document processing pipeline"""
    pipeline = HFDocumentPipeline()
    pipeline.process_synthetic_policies()


def run_hf_pipeline():
    """Entry point for HF document processing pipeline - called from app startup"""
    try:
        logging.info("Starting HF document processing pipeline from app startup...")
        pipeline = HFDocumentPipeline()
        pipeline.process_synthetic_policies()
        logging.info("HF pipeline completed successfully")
        return True
    except Exception as e:
        logging.error(f"HF pipeline failed: {e}")
        return False


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
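
# ---------------------------------------------------------------------------
# Interface assumptions (a sketch, not part of the pipeline). The shapes
# below are inferred from how the src.* services are called above; verify
# against the actual implementations before relying on them:
#
#   DocumentChunker(chunk_size=500, overlap=50)
#       .chunk_document(text: str, doc_metadata: dict) -> list[dict]
#       each chunk: {"content": str,
#                    "metadata": {"chunk_id": str, "chunk_index": int, ...}}
#
#   HFEmbeddingService()
#       .get_embeddings(texts: list[str]) -> list[list[float]]
#
#   HFDatasetVectorStore()
#       .save_embeddings(documents: list[str],
#                        embeddings: list[list[float]],
#                        metadata: list[dict]) -> None
#
# Example startup wiring from a Spaces app module (hypothetical; the module
# and file names here are assumptions, not part of this repo):
#
#     from hf_document_pipeline import run_hf_pipeline
#
#     if not run_hf_pipeline():
#         logging.warning("Pipeline failed; app will start without embeddings")
# ---------------------------------------------------------------------------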