#!/usr/bin/env python3
"""
HF Spaces Document Processing Pipeline
Processes synthetic_policies documents and stores embeddings in an HF Dataset
"""
import hashlib
import logging
from pathlib import Path

# Import your existing services
from src.embedding.hf_embedding_service import HFEmbeddingService
from src.ingestion.document_chunker import DocumentChunker
from src.ingestion.document_parser import DocumentParser
from src.vector_store.hf_dataset_store import HFDatasetVectorStore
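
# NOTE: the src.* modules are project-local services; this script assumes it
# runs from the repository root (or with the repo root on PYTHONPATH) so that
# the "src" package is importable.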


class HFDocumentPipeline:
    """
    Free-tier document processing pipeline for HF Spaces
    """

    def __init__(self):
        self.document_parser = DocumentParser()
        # 500-unit chunks with 50-unit overlap; whether units are characters
        # or tokens is determined by DocumentChunker (assumed characters here)
        self.document_chunker = DocumentChunker(chunk_size=500, overlap=50)
        self.embedding_service = HFEmbeddingService()
        self.vector_store = HFDatasetVectorStore()

    def process_synthetic_policies(self, policies_dir: str = "synthetic_policies"):
        """
        Process all policy documents and store embeddings
        """
        logging.info("πŸš€ Starting synthetic policies processing...")
        policy_files = list(Path(policies_dir).glob("*.txt"))
        if not policy_files:
            # Fall back to Markdown sources if no plain-text policies exist
            policy_files = list(Path(policies_dir).glob("*.md"))
        if not policy_files:
            logging.warning(f"⚠️ No policy files found in {policies_dir}")
            return
        logging.info(f"πŸ“ Found {len(policy_files)} policy files to process")

        # Accumulate across all files so the dataset is written once at the end
        all_documents = []
        all_embeddings = []
        all_metadata = []

        for idx, policy_file in enumerate(policy_files, 1):
            try:
                logging.info(f"πŸ“„ Processing file {idx}/{len(policy_files)}: {policy_file.name}")

                # Read document
                with open(policy_file, "r", encoding="utf-8") as f:
                    content = f.read()
                logging.info(f"πŸ“ Document length: {len(content)} characters")

                # Parse to validate structure only (the parser raises on errors);
                # the parsed result itself is unused here
                _ = self.document_parser.parse_document(str(policy_file))

                # Chunk the document using the proper chunker
                chunks = self.document_chunker.chunk_document(
                    text=content,
                    doc_metadata={
                        "filename": policy_file.name,
                        "source": str(policy_file),
                    },
                )
                logging.info(f"βœ‚οΈ Created {len(chunks)} chunks from {policy_file.name}")
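
                # Each chunk is expected to be a dict with "content" and
                # "metadata" keys; that shape is assumed from how chunks are
                # consumed below, not verified against DocumentChunker itself.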
                # Process in batches to stay within memory limits
                batch_size = 10  # Small batches for free tier
                total_batches = (len(chunks) + batch_size - 1) // batch_size
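                # Ceiling division: e.g. 23 chunks with batch_size 10 gives
                # (23 + 9) // 10 == 3 batches.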
                for batch_idx in range(0, len(chunks), batch_size):
                    batch_num = (batch_idx // batch_size) + 1
                    batch_chunks = chunks[batch_idx : batch_idx + batch_size]
                    batch_texts = [chunk["content"] for chunk in batch_chunks]
                    logging.info(f"πŸ”„ Processing batch {batch_num}/{total_batches} ({len(batch_texts)} chunks)")

                    # Generate embeddings using the HF API
                    try:
                        batch_embeddings = self.embedding_service.get_embeddings(batch_texts)
                        logging.info(f"βœ… Generated {len(batch_embeddings) if batch_embeddings else 0} embeddings")
                    except Exception as e:
                        logging.error(f"❌ Embedding generation failed for batch {batch_num}: {e}")
                        continue
                    if batch_embeddings:
                        all_documents.extend(batch_texts)
                        all_embeddings.extend(batch_embeddings)
                        # Create metadata from chunk metadata
                        for chunk in batch_chunks:
                            metadata = {
                                "source_file": policy_file.name,
                                "chunk_id": chunk["metadata"].get("chunk_id", ""),
                                "chunk_index": chunk["metadata"].get("chunk_index", 0),
                                # MD5 serves as a cheap content fingerprint,
                                # not a security hash
                                "content_hash": hashlib.md5(chunk["content"].encode()).hexdigest(),
                            }
                            all_metadata.append(metadata)

                logging.info(f"βœ… Completed {policy_file.name}: {len(chunks)} chunks processed")
            except Exception as e:
                logging.error(f"❌ Error processing {policy_file}: {e}")

        # Save all embeddings to the HF Dataset in a single write
        if all_embeddings:
            logging.info(f"πŸ’Ύ Saving {len(all_embeddings)} total embeddings to HF Dataset...")
            try:
                self.vector_store.save_embeddings(all_documents, all_embeddings, all_metadata)
                logging.info(f"πŸŽ‰ Pipeline complete: {len(all_embeddings)} total embeddings saved successfully!")
            except Exception as e:
                logging.error(f"❌ Failed to save embeddings: {e}")
        else:
            logging.warning("⚠️ No embeddings were generated; pipeline completed with no results!")


def main():
    """Run the document processing pipeline"""
    pipeline = HFDocumentPipeline()
    pipeline.process_synthetic_policies()


def run_hf_pipeline():
    """Entry point for the HF document processing pipeline; called from app startup"""
    try:
        logging.info("Starting HF document processing pipeline from app startup...")
        pipeline = HFDocumentPipeline()
        pipeline.process_synthetic_policies()
        logging.info("HF pipeline completed successfully")
        return True
    except Exception as e:
        logging.error(f"HF pipeline failed: {e}")
        return False


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
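
# Usage notes (a sketch; exact env vars are assumptions, check the service modules):
#   Standalone:       python scripts/hf_process_documents.py
#   From app startup: from scripts.hf_process_documents import run_hf_pipeline
# HFEmbeddingService and HFDatasetVectorStore presumably need HF credentials
# (e.g. an HF token) available in the environment.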