#!/usr/bin/env python3
"""
HF Spaces Document Processing Pipeline
Processes synthetic_policies documents and stores embeddings in HF Dataset
"""
import hashlib
import logging
from pathlib import Path

# Import your existing services
from src.embedding.hf_embedding_service import HFEmbeddingService
from src.ingestion.document_chunker import DocumentChunker
from src.ingestion.document_parser import DocumentParser
from src.vector_store.hf_dataset_store import HFDatasetVectorStore
class HFDocumentPipeline:
    """
    Free-tier document processing pipeline for HF Spaces.

    Parses, chunks, and embeds policy documents, then persists the
    embeddings to an HF Dataset-backed vector store. Per-file and
    per-batch failures are logged and skipped so a single bad document
    cannot abort the whole run.
    """

    # Small embedding batches keep peak memory within HF free-tier limits.
    _BATCH_SIZE = 10

    def __init__(self):
        # Project services: parsing, chunking, embedding, and storage.
        self.document_parser = DocumentParser()
        self.document_chunker = DocumentChunker(chunk_size=500, overlap=50)
        self.embedding_service = HFEmbeddingService()
        self.vector_store = HFDatasetVectorStore()

    def process_synthetic_policies(self, policies_dir: str = "synthetic_policies"):
        """
        Process all policy documents under *policies_dir* and store
        their embeddings.

        Looks for ``*.txt`` files first, falling back to ``*.md`` when
        none are found. Returns None; results go to the vector store.
        """
        logging.info("π Starting synthetic policies processing...")
        # sorted() gives a deterministic processing order across runs
        # (glob order is filesystem-dependent).
        policy_files = sorted(Path(policies_dir).glob("*.txt"))
        if not policy_files:
            policy_files = sorted(Path(policies_dir).glob("*.md"))
        if not policy_files:
            logging.warning(f"β οΈ No policy files found in {policies_dir}")
            return
        logging.info(f"π Found {len(policy_files)} policy files to process")

        all_documents = []
        all_embeddings = []
        all_metadata = []
        for idx, policy_file in enumerate(policy_files, 1):
            try:
                docs, embeds, metas = self._process_file(policy_file, idx, len(policy_files))
                all_documents.extend(docs)
                all_embeddings.extend(embeds)
                all_metadata.extend(metas)
            except Exception as e:
                # Best-effort boundary: log and continue with remaining files.
                logging.error(f"β Error processing {policy_file}: {e}")

        # Save all embeddings to the HF Dataset in one shot.
        if all_embeddings:
            logging.info(f"πΎ Saving {len(all_embeddings)} total embeddings to HF Dataset...")
            try:
                self.vector_store.save_embeddings(all_documents, all_embeddings, all_metadata)
                logging.info(f"π Pipeline complete: {len(all_embeddings)} total embeddings saved successfully!")
            except Exception as e:
                logging.error(f"β Failed to save embeddings: {e}")
        else:
            logging.warning("β οΈ No embeddings generated - pipeline completed with no results!")

    def _process_file(self, policy_file, idx, total):
        """
        Chunk and embed a single policy file.

        Returns a (documents, embeddings, metadata) triple of parallel
        lists; batches whose embedding call fails are skipped, keeping
        the three lists aligned.
        """
        logging.info(f"π Processing file {idx}/{total}: {policy_file.name}")
        content = policy_file.read_text(encoding="utf-8")
        logging.info(f"π Document length: {len(content)} characters")
        # Parser validates the document and raises on errors; its parsed
        # output is not needed downstream.
        _ = self.document_parser.parse_document(str(policy_file))
        chunks = self.document_chunker.chunk_document(
            text=content,
            doc_metadata={
                "filename": policy_file.name,
                "source": str(policy_file),
            },
        )
        logging.info(f"βοΈ Created {len(chunks)} chunks from {policy_file.name}")

        documents, embeddings, metadata = [], [], []
        total_batches = (len(chunks) + self._BATCH_SIZE - 1) // self._BATCH_SIZE
        for batch_idx in range(0, len(chunks), self._BATCH_SIZE):
            batch_num = (batch_idx // self._BATCH_SIZE) + 1
            batch_chunks = chunks[batch_idx : batch_idx + self._BATCH_SIZE]
            batch_texts = [chunk["content"] for chunk in batch_chunks]
            logging.info(f"π Processing batch {batch_num}/{total_batches} ({len(batch_texts)} chunks)")
            # Generate embeddings using the HF API; one failed batch must
            # not sink the whole file.
            try:
                batch_embeddings = self.embedding_service.get_embeddings(batch_texts)
                logging.info(f"β Generated {len(batch_embeddings) if batch_embeddings else 0} embeddings")
            except Exception as e:
                logging.error(f"β Embedding generation failed for batch {batch_num}: {e}")
                continue
            if batch_embeddings:
                documents.extend(batch_texts)
                embeddings.extend(batch_embeddings)
                for chunk in batch_chunks:
                    metadata.append({
                        "source_file": policy_file.name,
                        "chunk_id": chunk["metadata"].get("chunk_id", ""),
                        "chunk_index": chunk["metadata"].get("chunk_index", 0),
                        # Content hash is a dedup/identity key, not security.
                        "content_hash": hashlib.md5(chunk["content"].encode()).hexdigest(),
                    })
        logging.info(f"β Completed {policy_file.name}: {len(chunks)} chunks processed")
        return documents, embeddings, metadata
def main():
    """Build the pipeline and process the default policies directory."""
    HFDocumentPipeline().process_synthetic_policies()
def run_hf_pipeline():
    """
    Entry point for the HF document processing pipeline, called from app
    startup.

    Returns:
        bool: True when the pipeline ran to completion, False when any
        exception was raised (the failure is logged with its traceback).
    """
    try:
        logging.info("Starting HF document processing pipeline from app startup...")
        pipeline = HFDocumentPipeline()
        pipeline.process_synthetic_policies()
    except Exception:
        # logging.exception records the full traceback; the original
        # f-string message discarded it.
        logging.exception("HF pipeline failed")
        return False
    logging.info("HF pipeline completed successfully")
    return True
if __name__ == "__main__":
    # Configure root logging only when run as a script, not on import.
    logging.basicConfig(level=logging.INFO)
    main()