""" |
|
|
Streamlined Document Processing Module |
|
|
|
|
|
This module provides a document processing pipeline with: |
|
|
- Direct LangChain loader integration with glob patterns |
|
|
- Built-in FAISS vector storage without external file tracking |
|
|
- Semantic text chunking using RecursiveCharacterTextSplitter |
|
|
- Consolidated document metadata handling |
|
|
""" |
|
|
|
|
|
import os
import time

# Configure tokenizer parallelism before the HuggingFace libraries are imported
os.environ.setdefault("TOKENIZERS_PARALLELISM", "true")

from pathlib import Path
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime

from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader, Docx2txtLoader, TextLoader
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings

from app.core.config import get_app_config
from app.core.model_cache import get_cached_embeddings
from app.core.logging import logger
from app.core.performance import get_performance_manager, monitor_performance, cached_by_content

# Optional dependency: accelerate enables device placement for embedding models
try:
    from accelerate import Accelerator
    ACCELERATE_AVAILABLE = True
except ImportError:
    ACCELERATE_AVAILABLE = False
    Accelerator = None


def safe_execute(func: Callable, default: Any = None, context: str = "", log_errors: bool = True) -> Any:
    """
    Execute a function with basic error handling and logging.

    Args:
        func: Function to execute
        default: Value to return on error
        context: Brief description for logs
        log_errors: Whether to log errors

    Returns:
        Function result, or the default value on error
    """
    try:
        return func()
    except Exception as e:
        if log_errors:
            logger.error(f"{context or func.__name__}: {e}")
        return default
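
# Illustrative usage of safe_execute (the names below are hypothetical):
#   settings = safe_execute(lambda: json.loads(raw_text), default={}, context="parsing settings")
# On success this returns the parsed dict; on failure it logs
# "parsing settings: <error>" and returns the {} default.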


def escape_markdown_math(text: str) -> str:
    """Escape dollar signs and LaTeX-style delimiters so Streamlit does not render them as math."""
    if not text:
        return text

    # Escape dollar signs first so the replacements below are unaffected
    text = text.replace('$', '\\$')

    # Double-escape LaTeX inline and display math delimiters
    text = text.replace('\\(', '\\\\(')
    text = text.replace('\\)', '\\\\)')
    text = text.replace('\\[', '\\\\[')
    text = text.replace('\\]', '\\\\]')
    return text
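
# Example (illustrative): escape_markdown_math("Fee: $100") returns 'Fee: \$100',
# which Streamlit displays as the literal text "Fee: $100" instead of math.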


class DocumentProcessor:
    """
    Streamlined document processing class with integrated FAISS vector storage.

    This class consolidates all document processing functionality, including:
    - Document loading using LangChain's DirectoryLoader with glob patterns
    - Semantic text chunking with RecursiveCharacterTextSplitter
    - FAISS vector storage for similarity search
    - Document metadata handling
    """

    def __init__(self, model_name: Optional[str] = None, store_name: Optional[str] = None):
        """
        Initialize the document processor.

        Args:
            model_name: Name of the sentence-transformer model for embeddings (optional)
            store_name: Name for the FAISS store (optional; defaults to the configured name)
        """
        config = get_app_config()
        self.model_name = model_name or config.model['sentence_transformer_model']
        self.store_name = store_name or config.processing['faiss_store_name']

        # Core state
        self.documents: List[Document] = []
        self.vector_store: Optional[FAISS] = None
        self.embeddings: Optional[HuggingFaceEmbeddings] = None
        self.text_splitter: Optional[RecursiveCharacterTextSplitter] = None
        self.performance_stats = {}

        # Plain-dict views of the split documents, for callers that need raw data
        self.chunks = []

        self._init_text_splitter()

        # Initialize embeddings (cached across instances) and optional acceleration
        if self.model_name:
            self.embeddings = get_cached_embeddings(self.model_name)
            logger.info(f"Initialized cached embeddings with model: {self.model_name}")

            if ACCELERATE_AVAILABLE:
                try:
                    self.accelerator = Accelerator()
                    logger.info(f"Accelerate initialized with device: {self.accelerator.device}")
                except Exception as e:
                    logger.warning(f"Failed to initialize accelerate: {e}")
                    self.accelerator = None
            else:
                self.accelerator = None
        else:
            logger.warning("No model name provided - embeddings not initialized")
            self.accelerator = None

        # Reuse a previously persisted FAISS index if one exists on disk
        self._load_existing_store()

    def _init_text_splitter(self):
        """Initialize the text splitter with settings tuned for semantic chunking"""
        config = get_app_config()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.processing['chunk_size'],
            chunk_overlap=config.processing['chunk_overlap'],
            # Separators are tried in order, from coarse (section breaks) to
            # fine (single characters), so chunks end on natural boundaries
            separators=[
                "\n\n\n",
                "\n\n",
                "\n",
                ". ",
                ".\n",
                "! ",
                "? ",
                "; ",
                ", ",
                " ",
                "",
            ],
            length_function=len,
            is_separator_regex=False,
            # Keep separators attached so sentences stay readable in chunks
            keep_separator=True,
        )
        logger.info(f"Initialized semantic text splitter: {config.processing['chunk_size']} chars, {config.processing['chunk_overlap']} overlap")

    def _load_existing_store(self):
        """Load an existing FAISS store from disk if one is available"""
        if not self.embeddings:
            return

        config = get_app_config()
        faiss_dir = config.paths['faiss_dir']
        faiss_index_path = faiss_dir / f"{self.store_name}.faiss"
        faiss_pkl_path = faiss_dir / f"{self.store_name}.pkl"

        try:
            if faiss_index_path.exists() and faiss_pkl_path.exists():
                self.vector_store = FAISS.load_local(
                    str(faiss_dir),
                    self.embeddings,
                    index_name=self.store_name,
                    # Required because the docstore metadata is pickled;
                    # only load indexes produced by this application
                    allow_dangerous_deserialization=True
                )
                logger.info(f"Loaded existing FAISS store: {self.store_name} with {self.vector_store.index.ntotal} vectors")
            else:
                logger.info(f"No existing FAISS store found for: {self.store_name}")
        except Exception as e:
            logger.error(f"Failed to load FAISS store: {e}")
            self.vector_store = None
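
    # The matching persist step is outside this section; with LangChain's FAISS
    # wrapper it would look roughly like (sketch, not called here):
    #   self.vector_store.save_local(str(faiss_dir), index_name=self.store_name)
    # which writes the "<store_name>.faiss" / "<store_name>.pkl" pair that
    # _load_existing_store checks for above.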

    @monitor_performance
    def load_data_room(self, data_room_path: str, progress_bar=None) -> Dict[str, Any]:
        """
        Load and process an entire data room using DirectoryLoader with glob patterns.

        Args:
            data_room_path: Path to the data room directory
            progress_bar: Optional Streamlit progress bar object

        Returns:
            Dictionary with processing results, including performance metrics
        """
        start_time = time.time()

        config = get_app_config()
        data_room_path = Path(data_room_path)

        if not data_room_path.exists():
            logger.error(f"Data room path does not exist: {data_room_path}")
            return {'documents_count': 0, 'chunks_count': 0, 'has_embeddings': False}

        logger.info(f"Starting streamlined data room processing: {data_room_path}")

        # Reset state from any previous run
        self.documents = []
        documents_loaded = 0

        supported_extensions = config.processing['supported_file_extensions']
        perf_manager = get_performance_manager()

        # Record a memory baseline before loading begins
        mem_info = perf_manager.monitor_memory_usage()
        logger.info(f"Memory usage at start: {mem_info['percent']:.1f}%")
        logger.info(f"Process RSS at start: {mem_info['rss']:.1f}MB")

        for ext in supported_extensions:
            try:
                # e.g. "**/*.pdf" matches files at any depth under the data room
                glob_pattern = f"**/*{ext}"

                # Choose the loader class for this file type
                if ext == '.pdf':
                    loader_cls = PyPDFLoader
                elif ext in ['.docx', '.doc']:
                    loader_cls = Docx2txtLoader
                elif ext in ['.txt', '.md']:
                    loader_cls = TextLoader
                else:
                    continue

                loader = DirectoryLoader(
                    str(data_room_path),
                    glob=glob_pattern,
                    loader_cls=loader_cls,
                    loader_kwargs={'encoding': 'utf-8'} if ext in ['.txt', '.md'] else {},
                    recursive=True,
                    show_progress=False,
                    use_multithreading=True
                )

                # Load defensively: one bad file type should not abort the whole run
                docs = safe_execute(
                    lambda: loader.load(),
                    default=[],
                    context=f"Loading {ext} files"
                )

                if docs:
                    # Normalize metadata: record a path relative to the data room
                    for doc in docs:
                        if 'source' in doc.metadata:
                            source_path = Path(doc.metadata['source'])
                            if source_path.exists():
                                try:
                                    rel_path = source_path.relative_to(data_room_path)
                                    doc.metadata['path'] = str(rel_path)
                                    doc.metadata['name'] = source_path.name
                                except ValueError:
                                    # Source lies outside the data room; keep the absolute path
                                    doc.metadata['path'] = doc.metadata['source']
                                    doc.metadata['name'] = source_path.name

                    self.documents.extend(docs)
                    documents_loaded += len(docs)
                    logger.info(f"Loaded {len(docs)} {ext} documents")

                # Collect garbage between file types when memory pressure is high
                mem_usage = perf_manager.monitor_memory_usage()
                if perf_manager.should_gc_collect(mem_usage):
                    import gc
                    gc.collect()
                    logger.debug(f"GC triggered - memory usage: {mem_usage['rss']:.1f}MB")
            except Exception as e:
                logger.error(f"Error loading {ext} files: {e}")

        scan_time = time.time() - start_time
        logger.info(f"Document loading completed in {scan_time:.2f} seconds")

        # Split loaded documents into semantic chunks
        chunk_start = time.time()
        if self.documents and self.text_splitter:
            self.documents = self.text_splitter.split_documents(self.documents)

            # Track the first chunk of each source document and build dict views
            seen_documents = {}
            self.chunks = []

            for i, doc in enumerate(self.documents):
                doc.metadata['chunk_id'] = f"chunk_{i}"
                doc.metadata['processed_at'] = datetime.now().isoformat()

                # Mark the first chunk seen for each source document
                doc_source = doc.metadata.get('source', '')
                if doc_source not in seen_documents:
                    doc.metadata['is_first_chunk'] = True
                    seen_documents[doc_source] = True
                    logger.debug(f"First chunk marked for: {doc_source}")
                else:
                    doc.metadata['is_first_chunk'] = False

                # Build a human-readable citation (page number when available)
                if 'page' in doc.metadata:
                    doc.metadata['citation'] = f"page {doc.metadata['page']}"
                else:
                    doc.metadata['citation'] = doc.metadata.get('name', 'document')

                # Plain-dict view of the chunk for non-LangChain consumers
                chunk_dict = {
                    'text': doc.page_content,
                    'source': doc.metadata.get('name', ''),
                    'path': doc.metadata.get('path', ''),
                    'full_path': doc.metadata.get('source', ''),
                    'metadata': doc.metadata
                }
                self.chunks.append(chunk_dict)

            first_chunks_count = len([doc for doc in self.documents if doc.metadata.get('is_first_chunk', False)])
            logger.info(f"Marked {first_chunks_count} first chunks out of {len(self.documents)} total chunks")

        chunk_time = time.time() - chunk_start
        logger.info(f"Text splitting completed in {chunk_time:.2f} seconds")

        # The vector store itself is built elsewhere; here we only check whether
        # a pre-loaded store is available and time that check
        embedding_time = 0
        if self.embeddings and self.documents:
            embedding_start = time.time()

            if self.vector_store is None:
                logger.debug("FAISS store not pre-loaded (expected during index building)")
            else:
                logger.info(f"Using pre-loaded FAISS store with {self.vector_store.index.ntotal} vectors")

            embedding_time = time.time() - embedding_start
            logger.info(f"FAISS check completed in {embedding_time:.2f} seconds")

        total_time = time.time() - start_time
        logger.info(f"Total data room processing completed in {total_time:.2f} seconds")

        self.performance_stats = {
            'total_time': total_time,
            'scan_time': scan_time,
            'chunk_time': chunk_time,
            'embedding_time': embedding_time,
            'documents_per_second': documents_loaded / scan_time if scan_time > 0 else 0
        }

        return {
            'documents_count': documents_loaded,
            'chunks_count': len(self.documents),
            'total_chunks_in_store': self.vector_store.index.ntotal if self.vector_store else 0,
            'has_embeddings': self.vector_store is not None,
            'performance': self.performance_stats
        }
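
    # Illustrative end-to-end usage (path and query are hypothetical):
    #   processor = DocumentProcessor()
    #   summary = processor.load_data_room("/path/to/data_room")
    #   print(summary['documents_count'], summary['chunks_count'])
    #   hits = processor.search("termination clause", top_k=3)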

    def search(self, query: str, top_k: int = 5, threshold: Optional[float] = None) -> List[Dict]:
        """
        Search documents using FAISS similarity search.

        Args:
            query: Search query
            top_k: Number of top results to return
            threshold: Minimum similarity threshold (defaults to the configured value)

        Returns:
            List of search results with scores and metadata
        """
        if not self.vector_store:
            logger.warning("FAISS vector store not available for search")
            return []

        config = get_app_config()
        if threshold is None:
            threshold = config.processing['similarity_threshold']

        try:
            # Over-fetch candidates so threshold filtering and reranking have room to work
            docs_and_scores = self.vector_store.similarity_search_with_score(query, k=max(20, top_k * 3))

            import numpy as np

            docs = [doc for doc, score in docs_and_scores]
            scores = np.array([score for doc, score in docs_and_scores])

            # FAISS returns raw distance scores (lower is better). Map a distance d
            # in the assumed [0, 2] range to a similarity in [0, 1] via 1 - d/2;
            # anything beyond that range clamps to a similarity of 0
            similarity_scores = np.where(scores <= 2.0, 1.0 - (scores / 2.0), 0.0)

            # Keep only candidates at or above the similarity threshold
            threshold_mask = similarity_scores >= threshold
            valid_indices = np.where(threshold_mask)[0]
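
            # Worked example of the mapping above: a distance of 0.5 becomes a
            # similarity of 1 - 0.5/2 = 0.75, while a distance of 2.4 falls outside
            # the assumed range and clamps to 0.0, so it can never pass the threshold.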
            candidates = []

            for idx in valid_indices:
                doc = docs[idx]
                similarity_score = similarity_scores[idx]

                candidates.append({
                    'text': doc.page_content,
                    'source': doc.metadata.get('name', ''),
                    'path': doc.metadata.get('path', ''),
                    'score': float(similarity_score),
                    'metadata': doc.metadata
                })

            # Rerank surviving candidates; fall back to similarity order on failure
            if candidates:
                try:
                    # Imported lazily so search still works if ranking is unavailable
                    from app.core.ranking import rerank_results

                    # Cap the rerank set to keep latency bounded
                    candidates_to_rerank = candidates[:min(15, len(candidates))]

                    reranked_results = rerank_results(query, candidates_to_rerank)
                    results = reranked_results[:top_k]
                    logger.info(f"Reranked {len(reranked_results)} search results for query: {query[:50]}...")
                except Exception as e:
                    logger.warning(f"Reranking failed for search query '{query}': {e}. Using original similarity scores.")
                    results = candidates[:top_k]
            else:
                results = []

            return results

        except Exception as e:
            logger.error(f"Failed to search FAISS store: {e}")
            raise RuntimeError(f"Document search failed for query '{query}': {e}") from e

    def get_statistics(self) -> Dict[str, Any]:
        """Get processing statistics"""
        stats = {
            # Note: after load_data_room, self.documents holds split chunks
            'total_documents': len(self.documents),
            'total_vectors_in_store': self.vector_store.index.ntotal if self.vector_store else 0,
            'has_embeddings': self.vector_store is not None,
            'store_name': self.store_name,
            'model_name': self.model_name
        }

        if self.performance_stats:
            stats['performance'] = self.performance_stats

        return stats
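
    # Illustrative shape of the returned statistics (values are examples only):
    #   {'total_documents': 118, 'total_vectors_in_store': 118,
    #    'has_embeddings': True, 'store_name': '<configured store name>',
    #    'model_name': '<configured model name>', 'performance': {...}}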