import os import json import hashlib from pathlib import Path from typing import List, Dict, Union import uuid from qdrant_client.http import models class DocumentIngestor: def __init__(self, qdrant_client, collection_name: str): """ Initialize document ingestor with Qdrant client and collection name """ self.qdrant_client = qdrant_client self.collection_name = collection_name self.hash_file_path = f"./{collection_name}_document_hashes.json" def _calculate_file_hash(self, file_path: str) -> str: """ Calculate SHA256 hash of a file """ hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: # Read the file in chunks to handle large files efficiently for chunk in iter(lambda: f.read(4096), b""): hash_sha256.update(chunk) return hash_sha256.hexdigest() def _load_document_hashes(self) -> Dict[str, str]: """ Load previously saved document hashes from file """ if os.path.exists(self.hash_file_path): try: with open(self.hash_file_path, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, FileNotFoundError): return {} return {} def _save_document_hashes(self, hashes: Dict[str, str]) -> None: """ Save document hashes to file """ with open(self.hash_file_path, 'w', encoding='utf-8') as f: json.dump(hashes, f, ensure_ascii=False, indent=2) def _get_changed_documents(self, data_dir: str) -> List[str]: """ Compare current files with previously hashed files to determine which ones have changed Returns list of file paths that have changed or are new """ current_hashes = {} changed_files = [] data_path = Path(data_dir) # Get all JSON and TXT files in the directory all_files = list(data_path.glob("*.json")) + list(data_path.glob("*.txt")) # Load previous hashes previous_hashes = self._load_document_hashes() # Calculate hashes for current files for file_path in all_files: file_str = str(file_path) current_hash = self._calculate_file_hash(file_str) current_hashes[file_str] = current_hash # Check if file is new or has changed if file_str not in previous_hashes or previous_hashes[file_str] != current_hash: changed_files.append(file_str) # Also check for deleted files (present in previous but not in current) deleted_files = [file for file in previous_hashes if file not in current_hashes] # Update the hash file with current hashes self._save_document_hashes(current_hashes) print(f"Detected {len(changed_files)} changed/new files, {len(deleted_files)} deleted files") return changed_files def load_hindi_texts(self, data_dir: str, only_changed: bool = True) -> List[Dict]: """ Load Hindi poems and stories from data directory Expected format: JSON files with 'title', 'author', 'text', 'genre' fields If only_changed is True, only load documents from files that have changed since last ingestion """ documents = [] # Determine which files to process if only_changed: files_to_process = self._get_changed_documents(data_dir) if not files_to_process: print("No document changes detected. Skipping ingestion.") return [] else: # Process all files data_path = Path(data_dir) all_files = list(data_path.glob("*.json")) + list(data_path.glob("*.txt")) files_to_process = [str(f) for f in all_files] print(f"Processing {len(files_to_process)} files") # Process JSON files json_files = [f for f in files_to_process if f.endswith('.json')] print(f"Found {len(json_files)} JSON files to process") for json_file in json_files: print(f"Processing file: {json_file}") try: with open(json_file, 'r', encoding='utf-8') as f: data = json.load(f) # Handle both single document and list of documents if isinstance(data, dict): data = [data] for item in data: doc = { 'id': str(uuid.uuid4()), 'title': item.get('title', ''), 'author': item.get('author', ''), 'text': item.get('text', ''), 'genre': item.get('genre', 'story'), # Default to story if not specified 'source_file': str(json_file) } documents.append(doc) print(f" - Loaded {len(data)} documents from {Path(json_file).name}") except json.JSONDecodeError as e: print(f" - Error reading {json_file}: {e}") except Exception as e: print(f" - Unexpected error reading {json_file}: {e}") # Process text files txt_files = [f for f in files_to_process if f.endswith('.txt')] for txt_file in txt_files: print(f"Processing text file: {txt_file}") try: with open(txt_file, 'r', encoding='utf-8') as f: text = f.read().strip() # Simple splitting for multiple poems/stories in one file # Assuming each poem/story is separated by double newlines texts = text.split('\n\n') for i, t in enumerate(texts): if t.strip(): doc = { 'id': str(uuid.uuid4()), 'title': f"{Path(txt_file).stem}_{i}", 'author': 'Unknown', 'text': t.strip(), 'genre': 'story', # Default to story for txt files 'source_file': str(txt_file) } documents.append(doc) print(f" - Loaded {len([t for t in texts if t.strip()])} text chunks from {Path(txt_file).name}") except Exception as e: print(f" - Error reading {txt_file}: {e}") print(f"Total documents loaded: {len(documents)}") return documents def chunk_text(self, text: str, max_chunk_size: int = 1000) -> List[str]: """ Split text into chunks of specified size """ # Split by sentences to maintain coherence sentences = text.split('. ') chunks = [] current_chunk = "" for sentence in sentences: if len(current_chunk) + len(sentence) < max_chunk_size: current_chunk += sentence + ". " else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sentence + ". " if current_chunk: chunks.append(current_chunk.strip()) return chunks def ingest_documents(self, documents: List[Dict], embedding_function) -> None: """ Ingest documents into Qdrant collection with embeddings """ import time from httpx import TimeoutException from qdrant_client.http.exceptions import ResponseHandlingException points = [] for idx, doc in enumerate(documents): # Chunk the text if it's too long text_chunks = self.chunk_text(doc['text']) for i, chunk in enumerate(text_chunks): # Generate embedding for the chunk embedding = embedding_function(chunk) # Create a unique ID for this chunk - using UUID for compatibility chunk_id = str(uuid.uuid4()) # Prepare payload with metadata payload = { 'title': doc['title'], 'author': doc['author'], 'genre': doc['genre'], 'source_file': doc['source_file'], 'original_id': doc['id'], 'chunk_index': i, 'full_text': chunk } # Add point to the list points.append(models.PointStruct( id=chunk_id, vector=embedding, payload=payload )) # Batch upload every 50 points to avoid timeout issues (reduced from 100) if len(points) >= 50: if points: success = False attempts = 0 max_attempts = 3 while not success and attempts < max_attempts: try: self.qdrant_client.upsert( collection_name=self.collection_name, points=points ) print(f"Batch uploaded {len(points)} document chunks to Qdrant collection '{self.collection_name}'") points = [] # Reset points list after uploading success = True except (ResponseHandlingException, TimeoutException) as e: attempts += 1 print(f"Upload attempt {attempts} failed: {e}") if attempts < max_attempts: print(f"Retrying in 2 seconds... (attempt {attempts + 1})") time.sleep(2) else: print(f"Max attempts reached. Skipping this batch of {len(points)} points.") points = [] # Clear the problematic points to continue # Progress indicator if (idx + 1) % 100 == 0: print(f"Processed {idx + 1}/{len(documents)} documents...") # Upload remaining points if points: success = False attempts = 0 max_attempts = 3 while not success and attempts < max_attempts: try: self.qdrant_client.upsert( collection_name=self.collection_name, points=points ) print(f"Ingested {len(points)} final document chunks into Qdrant collection '{self.collection_name}'") success = True except (ResponseHandlingException, TimeoutException) as e: attempts += 1 print(f"Final upload attempt {attempts} failed: {e}") if attempts < max_attempts: print(f"Retrying in 2 seconds... (attempt {attempts + 1})") time.sleep(2) else: print(f"Max attempts reached for final batch. {len(points)} points not ingested.") def load_and_ingest(self, data_dir: str, embedding_function, only_changed: bool = True) -> int: """ Load documents from directory and ingest them into Qdrant If only_changed is True, only ingest documents from files that have changed since last ingestion """ print(f"Loading documents from {data_dir}") documents = self.load_hindi_texts(data_dir, only_changed=only_changed) if not documents: print("No new or changed documents to ingest.") return 0 print(f"Loaded {len(documents)} documents") print("Ingesting documents into Qdrant...") self.ingest_documents(documents, embedding_function) return len(documents) # Example usage if __name__ == "__main__": # This would be called from the main application pass