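"""
Document ingestion for a Hindi poems/stories collection in Qdrant.

Loads documents from JSON and TXT files, tracks per-file SHA-256 hashes so
unchanged files are skipped on re-ingestion, chunks long texts on sentence
boundaries, and upserts embedded chunks in batches.
"""
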
import os
import re
import json
import time
import hashlib
import uuid
from pathlib import Path
from typing import Dict, List

from httpx import TimeoutException
from qdrant_client.http import models
from qdrant_client.http.exceptions import ResponseHandlingException

class DocumentIngestor:
    def __init__(self, qdrant_client, collection_name: str):
        """
        Initialize document ingestor with Qdrant client and collection name
        """
        self.qdrant_client = qdrant_client
        self.collection_name = collection_name
        self.hash_file_path = f"./{collection_name}_document_hashes.json"

    def _calculate_file_hash(self, file_path: str) -> str:
        """
        Calculate SHA256 hash of a file
        """
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Read the file in chunks to handle large files efficiently
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def _load_document_hashes(self) -> Dict[str, str]:
        """
        Load previously saved document hashes from file
        """
        if os.path.exists(self.hash_file_path):
            try:
                with open(self.hash_file_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (json.JSONDecodeError, FileNotFoundError):
                return {}
        return {}

    def _save_document_hashes(self, hashes: Dict[str, str]) -> None:
        """
        Save document hashes to file
        """
        with open(self.hash_file_path, 'w', encoding='utf-8') as f:
            json.dump(hashes, f, ensure_ascii=False, indent=2)

    def _get_changed_documents(self, data_dir: str) -> List[str]:
        """
        Compare current files with previously hashed files to determine which ones have changed
        Returns list of file paths that have changed or are new
        """
        current_hashes = {}
        changed_files = []

        data_path = Path(data_dir)

        # Get all JSON and TXT files in the directory
        all_files = list(data_path.glob("*.json")) + list(data_path.glob("*.txt"))

        # Load previous hashes
        previous_hashes = self._load_document_hashes()

        # Calculate hashes for current files
        for file_path in all_files:
            file_str = str(file_path)
            current_hash = self._calculate_file_hash(file_str)
            current_hashes[file_str] = current_hash

            # Check if file is new or has changed
            if file_str not in previous_hashes or previous_hashes[file_str] != current_hash:
                changed_files.append(file_str)

        # Check for deleted files (present in previous but not in current).
        # Stale points for deleted files are only reported here, not
        # removed from the Qdrant collection.
        deleted_files = [file for file in previous_hashes if file not in current_hashes]

        # Update the hash file with current hashes. Note: this runs before
        # ingestion, so files whose ingestion later fails will be treated
        # as unchanged on the next run.
        self._save_document_hashes(current_hashes)

        print(f"Detected {len(changed_files)} changed/new files, {len(deleted_files)} deleted files")
        return changed_files
    
    def load_hindi_texts(self, data_dir: str, only_changed: bool = True) -> List[Dict]:
        """
        Load Hindi poems and stories from the data directory.
        JSON files may hold a single object or a list of objects with
        'title', 'author', 'text', and 'genre' fields, e.g.:
            {"title": "...", "author": "...", "text": "...", "genre": "poem"}
        Plain .txt files are split into pieces on blank lines.
        If only_changed is True, only load documents from files that have
        changed since the last ingestion.
        """
        documents = []

        # Determine which files to process
        if only_changed:
            files_to_process = self._get_changed_documents(data_dir)
            if not files_to_process:
                print("No document changes detected. Skipping ingestion.")
                return []
        else:
            # Process all files
            data_path = Path(data_dir)
            all_files = list(data_path.glob("*.json")) + list(data_path.glob("*.txt"))
            files_to_process = [str(f) for f in all_files]

        print(f"Processing {len(files_to_process)} files")

        # Process JSON files
        json_files = [f for f in files_to_process if f.endswith('.json')]
        print(f"Found {len(json_files)} JSON files to process")

        for json_file in json_files:
            print(f"Processing file: {json_file}")
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                    # Handle both single document and list of documents
                    if isinstance(data, dict):
                        data = [data]

                    for item in data:
                        doc = {
                            'id': str(uuid.uuid4()),
                            'title': item.get('title', ''),
                            'author': item.get('author', ''),
                            'text': item.get('text', ''),
                            'genre': item.get('genre', 'story'),  # Default to story if not specified
                            'source_file': str(json_file)
                        }
                        documents.append(doc)

                    print(f"  - Loaded {len(data)} documents from {Path(json_file).name}")
            except json.JSONDecodeError as e:
                print(f"  - Error reading {json_file}: {e}")
            except Exception as e:
                print(f"  - Unexpected error reading {json_file}: {e}")

        # Process text files
        txt_files = [f for f in files_to_process if f.endswith('.txt')]
        for txt_file in txt_files:
            print(f"Processing text file: {txt_file}")
            try:
                with open(txt_file, 'r', encoding='utf-8') as f:
                    text = f.read().strip()

                    # Simple splitting for multiple poems/stories in one file
                    # Assuming each poem/story is separated by double newlines
                    texts = text.split('\n\n')

                    for i, t in enumerate(texts):
                        if t.strip():
                            doc = {
                                'id': str(uuid.uuid4()),
                                'title': f"{Path(txt_file).stem}_{i}",
                                'author': 'Unknown',
                                'text': t.strip(),
                                'genre': 'story',  # Default to story for txt files
                                'source_file': str(txt_file)
                            }
                            documents.append(doc)

                    print(f"  - Loaded {len([t for t in texts if t.strip()])} text chunks from {Path(txt_file).name}")
            except Exception as e:
                print(f"  - Error reading {txt_file}: {e}")

        print(f"Total documents loaded: {len(documents)}")
        return documents
    
    def chunk_text(self, text: str, max_chunk_size: int = 1000) -> List[str]:
        """
        Split text into chunks of at most roughly max_chunk_size characters.
        Splits on sentence boundaries to maintain coherence, recognising the
        Hindi danda ('।') as well as Latin terminators; splitting only on
        '. ' would leave Devanagari prose as a single oversized chunk.
        """
        # Keep each terminator attached to its sentence
        sentences = re.split(r'(?<=[।.!?])\s+', text)
        chunks = []
        current_chunk = ""

        for sentence in sentences:
            if len(current_chunk) + len(sentence) < max_chunk_size:
                current_chunk += sentence + " "
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence + " "

        if current_chunk:
            chunks.append(current_chunk.strip())

        return chunks
    
    def _upsert_with_retry(self, points: List[models.PointStruct], max_attempts: int = 3) -> bool:
        """
        Upsert a batch of points into the collection, retrying on transient
        transport errors. Returns True on success, False if all attempts fail.
        """
        for attempt in range(1, max_attempts + 1):
            try:
                self.qdrant_client.upsert(
                    collection_name=self.collection_name,
                    points=points
                )
                return True
            except (ResponseHandlingException, TimeoutException) as e:
                print(f"Upload attempt {attempt} failed: {e}")
                if attempt < max_attempts:
                    print("Retrying in 2 seconds...")
                    time.sleep(2)
        return False

    def ingest_documents(self, documents: List[Dict], embedding_function) -> None:
        """
        Ingest documents into the Qdrant collection with embeddings.
        Long texts are chunked, and points are uploaded in batches of 50
        to avoid request timeouts.
        """
        points = []

        for idx, doc in enumerate(documents):
            # Chunk the text if it's too long
            text_chunks = self.chunk_text(doc['text'])

            for i, chunk in enumerate(text_chunks):
                # Generate embedding for the chunk
                embedding = embedding_function(chunk)

                # UUIDs are valid Qdrant point IDs and avoid collisions
                chunk_id = str(uuid.uuid4())

                # Payload carries the chunk text plus document metadata
                payload = {
                    'title': doc['title'],
                    'author': doc['author'],
                    'genre': doc['genre'],
                    'source_file': doc['source_file'],
                    'original_id': doc['id'],
                    'chunk_index': i,
                    'full_text': chunk
                }

                points.append(models.PointStruct(
                    id=chunk_id,
                    vector=embedding,
                    payload=payload
                ))

            # Batch upload every 50 points to avoid timeout issues
            if len(points) >= 50:
                if self._upsert_with_retry(points):
                    print(f"Batch uploaded {len(points)} document chunks to Qdrant collection '{self.collection_name}'")
                else:
                    print(f"Max attempts reached. Skipping this batch of {len(points)} points.")
                points = []

            # Progress indicator
            if (idx + 1) % 100 == 0:
                print(f"Processed {idx + 1}/{len(documents)} documents...")

        # Upload any remaining points
        if points:
            if self._upsert_with_retry(points):
                print(f"Ingested {len(points)} final document chunks into Qdrant collection '{self.collection_name}'")
            else:
                print(f"Max attempts reached for final batch. {len(points)} points not ingested.")
    
    def load_and_ingest(self, data_dir: str, embedding_function, only_changed: bool = True) -> int:
        """
        Load documents from directory and ingest them into Qdrant
        If only_changed is True, only ingest documents from files that have changed since last ingestion
        """
        print(f"Loading documents from {data_dir}")
        documents = self.load_hindi_texts(data_dir, only_changed=only_changed)

        if not documents:
            print("No new or changed documents to ingest.")
            return 0

        print(f"Loaded {len(documents)} documents")

        print("Ingesting documents into Qdrant...")
        self.ingest_documents(documents, embedding_function)

        return len(documents)

# Example usage
if __name__ == "__main__":
    # In production this module is driven by the main application; the
    # sketch below is illustrative only.
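    # Minimal usage sketch. Assumptions not made by this file: a Qdrant
    # instance on localhost:6333, an existing collection whose vector size
    # matches the embedding model (384 for this MiniLM variant), and the
    # sentence-transformers package being installed.
    from qdrant_client import QdrantClient
    from sentence_transformers import SentenceTransformer

    client = QdrantClient(url="http://localhost:6333")
    model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")

    def embed(text: str) -> List[float]:
        # encode() returns a numpy array; Qdrant point vectors need a list
        return model.encode(text).tolist()

    ingestor = DocumentIngestor(client, collection_name="hindi_texts")
    ingested = ingestor.load_and_ingest("./data", embed, only_changed=True)
    print(f"Done: {ingested} documents ingested")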