#!/usr/bin/env python3
"""
HF Spaces Document Processing Pipeline
Processes synthetic_policies documents and stores embeddings in HF Dataset
"""
import hashlib
import logging
from pathlib import Path

# Import your existing services
from src.embedding.hf_embedding_service import HFEmbeddingService
from src.ingestion.document_chunker import DocumentChunker
from src.ingestion.document_parser import DocumentParser
from src.vector_store.hf_dataset_store import HFDatasetVectorStore
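
# The local services above are assumed to expose roughly this interface
# (inferred from how they are used below, not from their source):
#   DocumentParser.parse_document(path)        - validates a document; raises on errors
#   DocumentChunker.chunk_document(...)        - splits text into chunk dicts
#   HFEmbeddingService.get_embeddings(texts)   - one embedding vector per input text
#   HFDatasetVectorStore.save_embeddings(...)  - persists documents/vectors/metadata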


class HFDocumentPipeline:
    """
    Free-tier document processing pipeline for HF Spaces
    """

    def __init__(self):
        self.document_parser = DocumentParser()
        self.document_chunker = DocumentChunker(chunk_size=500, overlap=50)
        self.embedding_service = HFEmbeddingService()
        self.vector_store = HFDatasetVectorStore()

    def process_synthetic_policies(self, policies_dir: str = "synthetic_policies"):
        """
        Process all policy documents and store embeddings
        """
        logging.info("Starting synthetic policies processing...")
        policy_files = list(Path(policies_dir).glob("*.txt"))
        if not policy_files:
            # Fall back to Markdown files if no plain-text policies are present
            policy_files = list(Path(policies_dir).glob("*.md"))
        if not policy_files:
            logging.warning(f"No policy files found in {policies_dir}")
            return
        logging.info(f"Found {len(policy_files)} policy files to process")

        all_documents = []
        all_embeddings = []
        all_metadata = []
        for idx, policy_file in enumerate(policy_files, 1):
            try:
                logging.info(f"Processing file {idx}/{len(policy_files)}: {policy_file.name}")
                # Read document
                with open(policy_file, "r", encoding="utf-8") as f:
                    content = f.read()
                logging.info(f"Document length: {len(content)} characters")

                # Parse document to get structured data (parser validates/throws on errors)
                _ = self.document_parser.parse_document(str(policy_file))

                # Chunk the document using the proper chunker
                chunks = self.document_chunker.chunk_document(
                    text=content,
                    doc_metadata={
                        "filename": policy_file.name,
                        "source": str(policy_file),
                    },
                )
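                # Each chunk is assumed (from usage below) to be a dict like:
                #   {"content": str, "metadata": {"chunk_id": str, "chunk_index": int, ...}}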
logging.info(f"βοΈ Created {len(chunks)} chunks from {policy_file.name}")
# Process in batches to stay within memory limits
batch_size = 10 # Small batches for free tier
total_batches = (len(chunks) + batch_size - 1) // batch_size
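                # Ceiling division: e.g. 23 chunks at batch_size 10 ->
                # (23 + 9) // 10 = 3 batches (10, 10, and 3 chunks)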
                for batch_idx in range(0, len(chunks), batch_size):
                    batch_num = (batch_idx // batch_size) + 1
                    batch_chunks = chunks[batch_idx : batch_idx + batch_size]
                    batch_texts = [chunk["content"] for chunk in batch_chunks]
                    logging.info(f"Processing batch {batch_num}/{total_batches} ({len(batch_texts)} chunks)")

                    # Generate embeddings using HF API
                    try:
                        batch_embeddings = self.embedding_service.get_embeddings(batch_texts)
                        logging.info(f"Generated {len(batch_embeddings) if batch_embeddings else 0} embeddings")
                    except Exception as e:
                        logging.error(f"Embedding generation failed for batch {batch_num}: {e}")
                        # Skip this batch but keep processing the rest of the file
                        continue
                    if batch_embeddings:
                        all_documents.extend(batch_texts)
                        all_embeddings.extend(batch_embeddings)
                        # Create metadata from chunk metadata
                        for chunk in batch_chunks:
                            metadata = {
                                "source_file": policy_file.name,
                                "chunk_id": chunk["metadata"].get("chunk_id", ""),
                                "chunk_index": chunk["metadata"].get("chunk_index", 0),
                                # md5 is a content fingerprint (dedup/change detection),
                                # not a security measure
                                "content_hash": hashlib.md5(chunk["content"].encode()).hexdigest(),
                            }
                            all_metadata.append(metadata)
logging.info(f"β
Completed {policy_file.name}: {len(chunks)} chunks processed")
except Exception as e:
logging.error(f"β Error processing {policy_file}: {e}")

        # Save all embeddings to HF Dataset
        if all_embeddings:
            logging.info(f"Saving {len(all_embeddings)} total embeddings to HF Dataset...")
            try:
                self.vector_store.save_embeddings(all_documents, all_embeddings, all_metadata)
                logging.info(f"Pipeline complete: {len(all_embeddings)} total embeddings saved successfully!")
            except Exception as e:
                logging.error(f"Failed to save embeddings: {e}")
        else:
            logging.warning("No embeddings generated - pipeline completed with no results!")


def main():
    """Run the document processing pipeline"""
    pipeline = HFDocumentPipeline()
    pipeline.process_synthetic_policies()


def run_hf_pipeline():
    """Entry point for HF document processing pipeline - called from app startup"""
    try:
        logging.info("Starting HF document processing pipeline from app startup...")
        pipeline = HFDocumentPipeline()
        pipeline.process_synthetic_policies()
        logging.info("HF pipeline completed successfully")
        return True
    except Exception as e:
        logging.error(f"HF pipeline failed: {e}")
        return False
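
# run_hf_pipeline() returns a bool instead of raising, presumably so the hosting
# app can keep starting up even when ingestion fails and decide how to react.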
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
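
# A minimal, hypothetical startup hook for a FastAPI-based Space - assumes this
# module is importable as `hf_document_pipeline` and that `app` is your FastAPI app:
#
#   from hf_document_pipeline import run_hf_pipeline
#
#   @app.on_event("startup")
#   def ingest_policies() -> None:
#       if not run_hf_pipeline():
#           logging.warning("Ingestion failed; search may return no results")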