# HindiRAG / src / document_ingestor.py
# Author: hardkpentium101
# Commit: initial and final commit (2d51ea8)
import hashlib
import json
import os
import re
import uuid
from pathlib import Path
from typing import Dict, List, Union

from qdrant_client.http import models
class DocumentIngestor:
    """Ingest Hindi text documents into a Qdrant collection.

    Tracks per-file SHA256 hashes in a sidecar JSON file so that
    unchanged source files can be skipped on subsequent ingestions.
    """

    def __init__(self, qdrant_client, collection_name: str):
        """
        Remember the Qdrant client/collection and derive the path of the
        JSON file used to persist per-file content hashes.
        """
        self.qdrant_client = qdrant_client
        self.collection_name = collection_name
        # One hash-tracking file per collection, kept in the working directory.
        self.hash_file_path = f"./{collection_name}_document_hashes.json"
def _calculate_file_hash(self, file_path: str) -> str:
"""
Calculate SHA256 hash of a file
"""
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
# Read the file in chunks to handle large files efficiently
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
def _load_document_hashes(self) -> Dict[str, str]:
"""
Load previously saved document hashes from file
"""
if os.path.exists(self.hash_file_path):
try:
with open(self.hash_file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return {}
return {}
def _save_document_hashes(self, hashes: Dict[str, str]) -> None:
"""
Save document hashes to file
"""
with open(self.hash_file_path, 'w', encoding='utf-8') as f:
json.dump(hashes, f, ensure_ascii=False, indent=2)
def _get_changed_documents(self, data_dir: str) -> List[str]:
"""
Compare current files with previously hashed files to determine which ones have changed
Returns list of file paths that have changed or are new
"""
current_hashes = {}
changed_files = []
data_path = Path(data_dir)
# Get all JSON and TXT files in the directory
all_files = list(data_path.glob("*.json")) + list(data_path.glob("*.txt"))
# Load previous hashes
previous_hashes = self._load_document_hashes()
# Calculate hashes for current files
for file_path in all_files:
file_str = str(file_path)
current_hash = self._calculate_file_hash(file_str)
current_hashes[file_str] = current_hash
# Check if file is new or has changed
if file_str not in previous_hashes or previous_hashes[file_str] != current_hash:
changed_files.append(file_str)
# Also check for deleted files (present in previous but not in current)
deleted_files = [file for file in previous_hashes if file not in current_hashes]
# Update the hash file with current hashes
self._save_document_hashes(current_hashes)
print(f"Detected {len(changed_files)} changed/new files, {len(deleted_files)} deleted files")
return changed_files
def load_hindi_texts(self, data_dir: str, only_changed: bool = True) -> List[Dict]:
"""
Load Hindi poems and stories from data directory
Expected format: JSON files with 'title', 'author', 'text', 'genre' fields
If only_changed is True, only load documents from files that have changed since last ingestion
"""
documents = []
# Determine which files to process
if only_changed:
files_to_process = self._get_changed_documents(data_dir)
if not files_to_process:
print("No document changes detected. Skipping ingestion.")
return []
else:
# Process all files
data_path = Path(data_dir)
all_files = list(data_path.glob("*.json")) + list(data_path.glob("*.txt"))
files_to_process = [str(f) for f in all_files]
print(f"Processing {len(files_to_process)} files")
# Process JSON files
json_files = [f for f in files_to_process if f.endswith('.json')]
print(f"Found {len(json_files)} JSON files to process")
for json_file in json_files:
print(f"Processing file: {json_file}")
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Handle both single document and list of documents
if isinstance(data, dict):
data = [data]
for item in data:
doc = {
'id': str(uuid.uuid4()),
'title': item.get('title', ''),
'author': item.get('author', ''),
'text': item.get('text', ''),
'genre': item.get('genre', 'story'), # Default to story if not specified
'source_file': str(json_file)
}
documents.append(doc)
print(f" - Loaded {len(data)} documents from {Path(json_file).name}")
except json.JSONDecodeError as e:
print(f" - Error reading {json_file}: {e}")
except Exception as e:
print(f" - Unexpected error reading {json_file}: {e}")
# Process text files
txt_files = [f for f in files_to_process if f.endswith('.txt')]
for txt_file in txt_files:
print(f"Processing text file: {txt_file}")
try:
with open(txt_file, 'r', encoding='utf-8') as f:
text = f.read().strip()
# Simple splitting for multiple poems/stories in one file
# Assuming each poem/story is separated by double newlines
texts = text.split('\n\n')
for i, t in enumerate(texts):
if t.strip():
doc = {
'id': str(uuid.uuid4()),
'title': f"{Path(txt_file).stem}_{i}",
'author': 'Unknown',
'text': t.strip(),
'genre': 'story', # Default to story for txt files
'source_file': str(txt_file)
}
documents.append(doc)
print(f" - Loaded {len([t for t in texts if t.strip()])} text chunks from {Path(txt_file).name}")
except Exception as e:
print(f" - Error reading {txt_file}: {e}")
print(f"Total documents loaded: {len(documents)}")
return documents
def chunk_text(self, text: str, max_chunk_size: int = 1000) -> List[str]:
"""
Split text into chunks of specified size
"""
# Split by sentences to maintain coherence
sentences = text.split('. ')
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < max_chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
def ingest_documents(self, documents: List[Dict], embedding_function) -> None:
"""
Ingest documents into Qdrant collection with embeddings
"""
import time
from httpx import TimeoutException
from qdrant_client.http.exceptions import ResponseHandlingException
points = []
for idx, doc in enumerate(documents):
# Chunk the text if it's too long
text_chunks = self.chunk_text(doc['text'])
for i, chunk in enumerate(text_chunks):
# Generate embedding for the chunk
embedding = embedding_function(chunk)
# Create a unique ID for this chunk - using UUID for compatibility
chunk_id = str(uuid.uuid4())
# Prepare payload with metadata
payload = {
'title': doc['title'],
'author': doc['author'],
'genre': doc['genre'],
'source_file': doc['source_file'],
'original_id': doc['id'],
'chunk_index': i,
'full_text': chunk
}
# Add point to the list
points.append(models.PointStruct(
id=chunk_id,
vector=embedding,
payload=payload
))
# Batch upload every 50 points to avoid timeout issues (reduced from 100)
if len(points) >= 50:
if points:
success = False
attempts = 0
max_attempts = 3
while not success and attempts < max_attempts:
try:
self.qdrant_client.upsert(
collection_name=self.collection_name,
points=points
)
print(f"Batch uploaded {len(points)} document chunks to Qdrant collection '{self.collection_name}'")
points = [] # Reset points list after uploading
success = True
except (ResponseHandlingException, TimeoutException) as e:
attempts += 1
print(f"Upload attempt {attempts} failed: {e}")
if attempts < max_attempts:
print(f"Retrying in 2 seconds... (attempt {attempts + 1})")
time.sleep(2)
else:
print(f"Max attempts reached. Skipping this batch of {len(points)} points.")
points = [] # Clear the problematic points to continue
# Progress indicator
if (idx + 1) % 100 == 0:
print(f"Processed {idx + 1}/{len(documents)} documents...")
# Upload remaining points
if points:
success = False
attempts = 0
max_attempts = 3
while not success and attempts < max_attempts:
try:
self.qdrant_client.upsert(
collection_name=self.collection_name,
points=points
)
print(f"Ingested {len(points)} final document chunks into Qdrant collection '{self.collection_name}'")
success = True
except (ResponseHandlingException, TimeoutException) as e:
attempts += 1
print(f"Final upload attempt {attempts} failed: {e}")
if attempts < max_attempts:
print(f"Retrying in 2 seconds... (attempt {attempts + 1})")
time.sleep(2)
else:
print(f"Max attempts reached for final batch. {len(points)} points not ingested.")
def load_and_ingest(self, data_dir: str, embedding_function, only_changed: bool = True) -> int:
"""
Load documents from directory and ingest them into Qdrant
If only_changed is True, only ingest documents from files that have changed since last ingestion
"""
print(f"Loading documents from {data_dir}")
documents = self.load_hindi_texts(data_dir, only_changed=only_changed)
if not documents:
print("No new or changed documents to ingest.")
return 0
print(f"Loaded {len(documents)} documents")
print("Ingesting documents into Qdrant...")
self.ingest_documents(documents, embedding_function)
return len(documents)
# Example usage
if __name__ == "__main__":
    # This module is intended to be imported and driven by the main
    # application; there is no standalone CLI behavior.
    pass