#!/usr/bin/env python3
"""
HF Spaces Document Processing Pipeline
Processes synthetic_policies documents and stores embeddings in HF Dataset
"""

import hashlib
import logging
from pathlib import Path

# Existing project services: parsing, chunking, embedding, vector storage.
from src.embedding.hf_embedding_service import HFEmbeddingService
from src.ingestion.document_chunker import DocumentChunker
from src.ingestion.document_parser import DocumentParser
from src.vector_store.hf_dataset_store import HFDatasetVectorStore


class HFDocumentPipeline:
    """
    Free-tier document processing pipeline for HF Spaces
    """

    def __init__(self):
        self.document_parser = DocumentParser()
        self.document_chunker = DocumentChunker(chunk_size=500, overlap=50)
        self.embedding_service = HFEmbeddingService()
        self.vector_store = HFDatasetVectorStore()

    def process_synthetic_policies(self, policies_dir: str = "synthetic_policies"):
        """
        Process all policy documents and store embeddings
        """
        logging.info("πŸš€ Starting synthetic policies processing...")

        # Prefer .txt files; fall back to .md only if none are found. Sorting
        # makes the processing order deterministic across filesystems.
        policy_files = sorted(Path(policies_dir).glob("*.txt"))
        if not policy_files:
            policy_files = sorted(Path(policies_dir).glob("*.md"))

        if not policy_files:
            logging.warning(f"⚠️  No policy files found in {policies_dir}")
            return

        logging.info(f"πŸ“ Found {len(policy_files)} policy files to process")

        all_documents = []
        all_embeddings = []
        all_metadata = []

        for idx, policy_file in enumerate(policy_files, 1):
            try:
                logging.info(f"πŸ“„ Processing file {idx}/{len(policy_files)}: {policy_file.name}")

                # Read the raw document text
                content = policy_file.read_text(encoding="utf-8")

                logging.info(f"πŸ“ Document length: {len(content)} characters")

                # Parse for validation only; the parser raises on malformed
                # documents, and the structured result is not needed here.
                _ = self.document_parser.parse_document(str(policy_file))

                # Chunk the document using the proper chunker
                chunks = self.document_chunker.chunk_document(
                    text=content,
                    doc_metadata={
                        "filename": policy_file.name,
                        "source": str(policy_file),
                    },
                )

                logging.info(f"βœ‚οΈ  Created {len(chunks)} chunks from {policy_file.name}")

                # Process in batches to stay within memory limits
                batch_size = 10  # Small batches for free tier
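                # Ceiling division: how many batches are needed to cover every chunk.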
                total_batches = (len(chunks) + batch_size - 1) // batch_size

                for batch_idx in range(0, len(chunks), batch_size):
                    batch_num = (batch_idx // batch_size) + 1
                    batch_chunks = chunks[batch_idx : batch_idx + batch_size]
                    batch_texts = [chunk["content"] for chunk in batch_chunks]

                    logging.info(f"πŸ”„ Processing batch {batch_num}/{total_batches} ({len(batch_texts)} chunks)")

                    # Generate embeddings using HF API
                    try:
                        batch_embeddings = self.embedding_service.get_embeddings(batch_texts)
                        logging.info(f"βœ… Generated {len(batch_embeddings) if batch_embeddings else 0} embeddings")
                    except Exception as e:
                        logging.error(f"❌ Embedding generation failed for batch {batch_num}: {e}")
                        continue

                    if batch_embeddings:
                        all_documents.extend(batch_texts)
                        all_embeddings.extend(batch_embeddings)

                        # Build per-chunk metadata; the md5 digest is a cheap
                        # content fingerprint for deduplication, not security.
                        for chunk in batch_chunks:
                            metadata = {
                                "source_file": policy_file.name,
                                "chunk_id": chunk["metadata"].get("chunk_id", ""),
                                "chunk_index": chunk["metadata"].get("chunk_index", 0),
                                "content_hash": hashlib.md5(chunk["content"].encode()).hexdigest(),
                            }
                            all_metadata.append(metadata)

                logging.info(f"βœ… Completed {policy_file.name}: {len(chunks)} chunks processed")

            except Exception as e:
                logging.error(f"❌ Error processing {policy_file}: {e}")

        # Save all embeddings to HF Dataset
        if all_embeddings:
            logging.info(f"πŸ’Ύ Saving {len(all_embeddings)} total embeddings to HF Dataset...")
            try:
                self.vector_store.save_embeddings(all_documents, all_embeddings, all_metadata)
                logging.info(f"πŸŽ‰ Pipeline complete: {len(all_embeddings)} total embeddings saved successfully!")
            except Exception as e:
                logging.error(f"❌ Failed to save embeddings: {e}")
        else:
            logging.warning("⚠️  No embeddings generated - pipeline completed with no results!")

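
# Hedged sketch (not used by the pipeline): HFDatasetVectorStore presumably
# exposes its own query method, but this illustrates the shape of the data the
# pipeline produces (parallel lists of texts and embedding vectors) and how a
# cosine-similarity lookup over them could work.
def cosine_search_sketch(query_embedding, embeddings, documents, top_k=3):
    """Return the top_k (document, score) pairs by cosine similarity."""
    import numpy as np

    matrix = np.asarray(embeddings, dtype=np.float32)  # shape: (n_chunks, dim)
    query = np.asarray(query_embedding, dtype=np.float32)  # shape: (dim,)
    # Cosine similarity: dot products divided by the product of vector norms;
    # the epsilon guards against division by zero for all-zero vectors.
    scores = matrix @ query / (
        np.linalg.norm(matrix, axis=1) * np.linalg.norm(query) + 1e-10
    )
    best = np.argsort(scores)[::-1][:top_k]
    return [(documents[i], float(scores[i])) for i in best]
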

def main():
    """Run the document processing pipeline"""
    pipeline = HFDocumentPipeline()
    pipeline.process_synthetic_policies()


def run_hf_pipeline():
    """Entry point for HF document processing pipeline - called from app startup"""
    try:
        logging.info("Starting HF document processing pipeline from app startup...")
        pipeline = HFDocumentPipeline()
        pipeline.process_synthetic_policies()
        logging.info("HF pipeline completed successfully")
        return True
    except Exception as e:
        logging.error(f"HF pipeline failed: {e}")
        return False


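# Hedged example of invoking run_hf_pipeline() from a web app's startup hook.
# The FastAPI framework and the on_event("startup") decorator are assumptions;
# adapt to whatever actually serves the Space:
#
#   from fastapi import FastAPI
#
#   app = FastAPI()
#
#   @app.on_event("startup")
#   def build_index() -> None:
#       run_hf_pipeline()
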
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()