# Provenance (repository viewer metadata): akryldigital — Pilot (#2), commit 92633a7 (verified), 37.5 kB
"""Context retrieval with reranking capabilities."""
import os
from typing import List, Optional, Tuple, Dict, Any
from langchain.schema import Document
from langchain_community.vectorstores import Qdrant
from langchain_community.embeddings import HuggingFaceEmbeddings
from sentence_transformers import CrossEncoder
import numpy as np
import torch
from qdrant_client.http import models as rest
import traceback
from .filter import create_filter
class ContextRetriever:
    """
    Context retriever for vector search over a Qdrant collection, with
    optional metadata filtering and reranking.

    Two reranker families are supported, selected from the model name:
      * cross-encoder models (e.g. BGE), scored via sentence-transformers
      * ColBERT models, scored via token-level late interaction (MaxSim)

    Reranker initialization is best-effort: if the model cannot be loaded,
    the retriever degrades gracefully to plain vector search.
    """

    def __init__(self, vectorstore: Qdrant, config: dict = None):
        """
        Initialize the context retriever.

        Args:
            vectorstore: Qdrant vector store instance
            config: Optional configuration dictionary; may be None
        """
        self.vectorstore = vectorstore
        self.config = config or {}
        self.reranker = None
        # BM25 attributes (populated externally when hybrid search is used)
        self.bm25_vectorizer = None
        self.bm25_matrix = None
        self.bm25_documents = None
        # Resolve the reranker model name, checking the config layouts used
        # across the project before falling back to a default.
        # BUGFIX: read from self.config (never the raw `config` argument) so
        # that config=None does not raise AttributeError.
        self.reranker_model_name = (
            self.config.get('retrieval', {}).get('reranker_model') or
            self.config.get('ranker', {}).get('model') or
            self.config.get('reranker_model') or
            'BAAI/bge-reranker-v2-m3'
        )
        self.reranker_type = self._detect_reranker_type(self.reranker_model_name)
        try:
            if self.reranker_type == 'colbert':
                from colbert.infra import Run, ColBERTConfig
                from colbert.modeling.checkpoint import Checkpoint
                # ColBERT uses late interaction - different implementation needed
                print(f"✅ RERANKER: ColBERT model detected ({self.reranker_model_name})")
                print(f"🔍 INTERACTION TYPE: Late interaction (token-level embeddings)")
                # Create ColBERT config for CPU mode
                colbert_config = ColBERTConfig(
                    doc_maxlen=300,
                    query_maxlen=32,
                    nbits=2,
                    kmeans_niters=4,
                    root="./colbert_data"
                )
                # Load checkpoint (e.g. "colbert-ir/colbertv2.0")
                self.colbert_checkpoint = Checkpoint(self.reranker_model_name, colbert_config=colbert_config)
                self.colbert_model = self.colbert_checkpoint.model
                self.colbert_tokenizer = self.colbert_checkpoint.raw_tokenizer
                self.reranker = self._colbert_rerank  # attach wrapper function
                print(f"✅ COLBERT: Model and tokenizer loaded successfully")
            else:
                # Standard CrossEncoder for BGE and other models
                from sentence_transformers import CrossEncoder
                self.reranker = CrossEncoder(self.reranker_model_name)
                print(f"✅ RERANKER: Initialized {self.reranker_model_name}")
                print(f"🔍 INTERACTION TYPE: Cross-encoder (single relevance score)")
        except Exception as e:
            # Reranking is optional: fall back to plain vector search.
            print(f"⚠️ Reranker initialization failed: {e}")
            self.reranker = None

    def _detect_reranker_type(self, model_name: str) -> str:
        """
        Detect the reranker family from the model name.

        Args:
            model_name: Name of the reranker model

        Returns:
            'colbert' for ColBERT models, 'crossencoder' for everything else
        """
        model_name_lower = model_name.lower()
        colbert_patterns = ('colbert', 'colbert-ir', 'colbertv2', 'colbert-v2')
        if any(pattern in model_name_lower for pattern in colbert_patterns):
            return 'colbert'
        # Default to cross-encoder for BGE and other models
        return 'crossencoder'

    @staticmethod
    def _unpack_search_result(result):
        """
        Normalize a similarity-search result into parallel (documents, scores)
        lists.

        Accepts a (documents, scores) tuple, a list of (doc, score) tuples, a
        list of bare documents (each scored 0.0), or anything else (treated as
        empty). Centralizes the unpacking logic that was previously duplicated
        in three places.
        """
        if isinstance(result, tuple) and len(result) == 2:
            return list(result[0]), list(result[1])
        documents = []
        scores = []
        if isinstance(result, list):
            for item in result:
                if isinstance(item, tuple) and len(item) == 2:
                    doc, score = item
                    documents.append(doc)
                    scores.append(score)
                else:
                    documents.append(item)
                    scores.append(0.0)
        return documents, scores

    @staticmethod
    def _document_id(doc: Document) -> str:
        """
        Best-effort stable ID for a document: the explicit metadata ID if
        present, else an MD5 hash of the page content.
        """
        doc_id = doc.metadata.get('id') or doc.metadata.get('_id')
        if not doc_id:
            import hashlib
            doc_id = hashlib.md5(doc.page_content.encode()).hexdigest()
        return doc_id

    def _similarity_search_with_colbert_embeddings(self, query: str, k: int = 5, **kwargs) -> List[Tuple[Document, float]]:
        """
        Perform similarity search and fetch pre-calculated ColBERT embeddings
        for the retrieved documents from the Qdrant payload.

        Args:
            query: Search query
            k: Number of documents to retrieve
            **kwargs: Additional search parameters (filter, etc.)

        Returns:
            List of (Document, score) tuples; each document carries
            'colbert_embedding' (and related fields) in its metadata when the
            payload provides them. On any failure, falls back to a plain
            similarity search.
        """
        try:
            print(f"🔍 COLBERT RETRIEVAL: Fetching documents with ColBERT embeddings")
            # Route through the vectorstore wrapper so filters are translated
            # correctly instead of hitting the Qdrant client directly.
            if kwargs.get('filter'):
                result = self.vectorstore.similarity_search_with_score(
                    query,
                    k=k,
                    filter=kwargs['filter']
                )
            else:
                result = self.vectorstore.similarity_search_with_score(query, k=k)
            documents, scores = self._unpack_search_result(result)
            # Fetch the ColBERT payload fields directly from Qdrant; the
            # langchain wrapper does not expose custom payload entries.
            collection_name = self.vectorstore.collection_name
            doc_ids = [self._document_id(doc) for doc in documents]
            search_result = self.vectorstore.client.retrieve(
                collection_name=collection_name,
                ids=doc_ids,
                with_payload=True,
                with_vectors=False
            )
            # Map each document ID back to its original similarity score.
            doc_id_to_score = {
                self._document_id(doc): score
                for doc, score in zip(documents, scores)
            }
            enhanced_documents = []
            enhanced_scores = []
            for point in search_result:
                payload = point.payload
                # NOTE(review): str(point.id) only matches the MD5 fallback ID
                # when documents were ingested with content-hash IDs; unmatched
                # points get a 0.0 score — confirm against the ingestion code.
                original_score = doc_id_to_score.get(str(point.id), 0.0)
                doc = Document(
                    page_content=payload.get('page_content', ''),
                    metadata={
                        **payload.get('metadata', {}),
                        'colbert_embedding': payload.get('colbert_embedding'),
                        'colbert_model': payload.get('colbert_model'),
                        'colbert_calculated_at': payload.get('colbert_calculated_at')
                    }
                )
                enhanced_documents.append(doc)
                enhanced_scores.append(original_score)
            print(f"✅ COLBERT RETRIEVAL: Retrieved {len(enhanced_documents)} documents with ColBERT embeddings")
            return list(zip(enhanced_documents, enhanced_scores))
        except Exception as e:
            print(f"❌ COLBERT RETRIEVAL ERROR: {e}")
            print(f"❌ Falling back to regular similarity search")
            # Fallback to regular search - handle filter parameter correctly
            if kwargs.get('filter'):
                return self.vectorstore.similarity_search_with_score(query, k=k, filter=kwargs['filter'])
            return self.vectorstore.similarity_search_with_score(query, k=k)

    def retrieve_context(
        self,
        query: str,
        k: int = 5,
        reports: Optional[List[str]] = None,
        sources: Optional[List[str]] = None,
        subtype: Optional[str] = None,
        year: Optional[str] = None,
        district: Optional[List[str]] = None,
        filenames: Optional[List[str]] = None,
        use_reranking: bool = False,
        qdrant_filter: Optional[rest.Filter] = None
    ) -> List[Document]:
        """
        Retrieve context documents using vector search with optional filtering
        and reranking.

        Args:
            query: User query
            k: Number of documents to retrieve
            reports: List of report names to filter by
            sources: List of sources to filter by
            subtype: Document subtype to filter by
            year: Year to filter by
            district: List of districts to filter by
            filenames: List of filenames to filter by
            use_reranking: Whether to apply reranking
            qdrant_filter: Pre-built Qdrant filter (takes precedence over the
                individual filter parameters)

        Returns:
            List of retrieved documents (empty list on any failure)
        """
        try:
            # Retrieve exactly k candidates (widen here if reranking should
            # ever see a larger candidate pool).
            retrieve_k = k
            # Build search kwargs; an explicit qdrant_filter takes precedence.
            if qdrant_filter:
                search_kwargs = {"filter": qdrant_filter}
                print(f"✅ FILTERS APPLIED: Using inferred Qdrant filter")
            else:
                filter_obj = create_filter(
                    reports=reports,
                    sources=sources,
                    subtype=subtype,
                    year=year,
                    district=district,
                    filenames=filenames
                )
                if filter_obj:
                    search_kwargs = {"filter": filter_obj}
                    print(f"✅ FILTERS APPLIED: Using built filter")
                else:
                    search_kwargs = {}
                    print(f"⚠️ NO FILTERS APPLIED: All documents will be searched")
            # Perform the vector search. ColBERT reranking needs the
            # pre-calculated embeddings fetched alongside the documents.
            try:
                if use_reranking and self.reranker_type == 'colbert':
                    result = self._similarity_search_with_colbert_embeddings(
                        query,
                        k=retrieve_k,
                        **search_kwargs
                    )
                else:
                    result = self.vectorstore.similarity_search_with_score(
                        query,
                        k=retrieve_k,
                        **search_kwargs
                    )
                documents, scores = self._unpack_search_result(result)
                print(f"✅ RETRIEVAL SUCCESS: Retrieved {len(documents)} documents (requested: {retrieve_k})")
                # If the filters were too restrictive, retry without them and
                # keep whichever result set is larger.
                if len(documents) < retrieve_k and search_kwargs.get('filter'):
                    print(f"⚠️ RETRIEVAL: Got {len(documents)} docs with filters, trying without filters...")
                    try:
                        result_no_filter = self.vectorstore.similarity_search_with_score(
                            query,
                            k=retrieve_k
                        )
                        documents_no_filter, scores_no_filter = self._unpack_search_result(result_no_filter)
                        if len(documents_no_filter) > len(documents):
                            print(f"✅ RETRIEVAL: Got {len(documents_no_filter)} docs without filters")
                            documents = documents_no_filter
                            scores = scores_no_filter
                    except Exception as e:
                        print(f"⚠️ RETRIEVAL: Fallback search failed: {e}")
            except Exception as e:
                print(f"❌ RETRIEVAL ERROR: {str(e)}")
                return []
            # Apply reranking if requested and there is more than one candidate.
            reranking_applied = False
            if use_reranking and len(documents) > 1:
                print(f"🔄 RERANKING: Applying {self.reranker_model_name} to {len(documents)} documents...")
                try:
                    original_docs = documents.copy()
                    reranked_docs = self._apply_reranking(query, documents, scores)
                    if reranked_docs:
                        # Reranked documents already carry ranking metadata.
                        print(f"✅ RERANKING APPLIED: {self.reranker_model_name}")
                        return reranked_docs
                    print(f"⚠️ RERANKING FAILED: Using original order")
                    return original_docs
                except Exception as e:
                    # Fall through to the unreranked annotation path below.
                    print(f"❌ RERANKING ERROR: {str(e)}")
                    print(f"⚠️ RERANKING FAILED: Using original order")
            elif use_reranking:
                print(f"ℹ️ RERANKING: Skipped (only {len(documents)} document(s) retrieved)")
            if use_reranking:
                # Reranking was requested but not applied: annotate documents
                # with their original scores and return them unchanged.
                print(f"ℹ️ RERANKING: Skipped (disabled or insufficient documents)")
                for doc, score in zip(documents, scores):
                    doc.metadata['original_score'] = float(score)
                    doc.metadata['reranking_applied'] = False
                return documents
            # Plain retrieval path: trim to k and attach ranking metadata.
            print(f"ℹ️ RERANKING: Skipped (disabled or insufficient documents)")
            documents = documents[:k]
            scores = scores[:k] if scores else [0.0] * len(documents)
            for i, (doc, score) in enumerate(zip(documents, scores)):
                if hasattr(doc, 'metadata'):
                    doc.metadata.update({
                        'reranking_applied': reranking_applied,
                        # BUGFIX: report the configured model, not a hard-coded name.
                        'reranker_model': self.reranker_model_name if reranking_applied else None,
                        'original_rank': i + 1,
                        'final_rank': i + 1,
                        'original_score': float(score) if score is not None else 0.0
                    })
            return documents
        except Exception as e:
            print(f"❌ CONTEXT RETRIEVAL ERROR: {str(e)}")
            return []

    def _apply_reranking(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        Dispatch reranking to the implementation matching self.reranker_type.

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents (original list on failure or when no
            reranker is available)
        """
        if not self.reranker or len(documents) == 0:
            return documents
        try:
            print(f"🔍 RERANKING METHOD: Starting reranking with {len(documents)} documents")
            print(f"🔍 RERANKING TYPE: {self.reranker_type.upper()}")
            if self.reranker_type == 'colbert':
                return self._apply_colbert_reranking(query, documents, scores)
            return self._apply_crossencoder_reranking(query, documents, scores)
        except Exception as e:
            print(f"❌ RERANKING ERROR: {str(e)}")
            return documents

    def _apply_crossencoder_reranking(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        Apply reranking using a CrossEncoder (BGE and other models).

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents, each rebuilt with ranking metadata
            (original/final rank, original/reranked score)
        """
        pairs = [[query, doc.page_content] for doc in documents]
        print(f"🔍 CROSS-ENCODER: Prepared {len(pairs)} pairs for reranking")
        # CrossEncoder.predict returns one relevance score per (query, doc) pair.
        rerank_scores = self.reranker.predict(pairs)
        if not isinstance(rerank_scores, (list, np.ndarray)):
            rerank_scores = [rerank_scores]
        if len(rerank_scores) != len(documents):
            print(f"⚠️ RERANKING WARNING: Expected {len(documents)} scores, got {len(rerank_scores)}")
            return documents
        print(f"🔍 CROSS-ENCODER: Got {len(rerank_scores)} rerank scores")
        print(f"🔍 CROSS-ENCODER SCORES: {rerank_scores[:5]}...")  # Show first 5 scores
        # BUGFIX: carry original indices through the sort instead of using
        # documents.index(doc), which mis-ranks duplicate documents (it always
        # returns the first match).
        ranked = sorted(
            zip(range(len(documents)), documents, rerank_scores),
            key=lambda item: item[2],
            reverse=True
        )
        reranked_docs = []
        for new_rank, (original_idx, doc, rerank_score) in enumerate(ranked):
            original_score = scores[original_idx] if original_idx < len(scores) else 0.0
            new_doc = Document(
                page_content=doc.page_content,
                metadata={
                    **doc.metadata,
                    'reranking_applied': True,
                    'reranker_model': self.reranker_model_name,
                    'reranker_type': self.reranker_type,
                    'original_rank': original_idx + 1,
                    'final_rank': new_rank + 1,
                    'original_score': float(original_score),
                    'reranked_score': float(rerank_score)
                }
            )
            reranked_docs.append(new_doc)
        print(f"✅ CROSS-ENCODER: Reranked {len(reranked_docs)} documents")
        return reranked_docs

    def _apply_colbert_reranking(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        Apply reranking using ColBERT late interaction.

        Thin wrapper kept for interface symmetry with
        _apply_crossencoder_reranking; delegates to _colbert_rerank.
        """
        return self._colbert_rerank(query, documents, scores)

    def _colbert_rerank(self, query: str, documents: List[Document], scores: List[float]) -> List[Document]:
        """
        ColBERT late-interaction reranking, reusing pre-calculated document
        embeddings from metadata['colbert_embedding'] when available.

        Args:
            query: User query
            documents: List of documents to rerank
            scores: Original scores

        Returns:
            Reranked list of documents (original order on any failure)
        """
        try:
            print(f"🔍 COLBERT: Starting late interaction reranking with {len(documents)} documents")
            # Collect pre-calculated embeddings; note which documents still
            # need their embeddings computed.
            doc_embeddings_by_position = []
            documents_without_embeddings = []
            documents_without_indices = []
            for i, doc in enumerate(documents):
                embedding = None
                if hasattr(doc, 'metadata'):
                    embedding = doc.metadata.get('colbert_embedding')
                if embedding is not None:
                    # Stored embeddings come back from Qdrant as plain lists.
                    if isinstance(embedding, list):
                        embedding = torch.tensor(embedding)
                    doc_embeddings_by_position.append(embedding)
                else:
                    documents_without_embeddings.append(doc)
                    documents_without_indices.append(i)
            # Token-level query embeddings.
            query_embeddings = self.colbert_checkpoint.queryFromText([query])
            if documents_without_embeddings:
                print(f"🔄 COLBERT: Calculating embeddings for {len(documents_without_embeddings)} documents without pre-calculated embeddings")
                doc_texts = [doc.page_content for doc in documents_without_embeddings]
                computed = self.colbert_checkpoint.docFromText(doc_texts)
                # Inserting in increasing original-index order restores each
                # computed embedding to its document's position.
                for offset, embedding in enumerate(computed):
                    doc_embeddings_by_position.insert(documents_without_indices[offset], embedding)
            else:
                print(f"✅ COLBERT: Using pre-calculated embeddings for all {len(documents)} documents")
            # MaxSim late interaction: for each query token take the max
            # similarity over document tokens, then sum over query tokens.
            colbert_scores = []
            for doc_embedding in doc_embeddings_by_position:
                sim_matrix = torch.matmul(query_embeddings[0], doc_embedding.transpose(-1, -2))
                max_sim_per_query_token = torch.max(sim_matrix, dim=-1)[0]
                colbert_scores.append(torch.sum(max_sim_per_query_token).item())
            # BUGFIX: carry original indices through the sort instead of
            # documents.index(doc), which mis-ranks duplicate documents.
            ranked = sorted(
                zip(range(len(documents)), documents, colbert_scores),
                key=lambda item: item[2],
                reverse=True
            )
            reranked_docs = []
            for new_rank, (original_idx, doc, colbert_score) in enumerate(ranked):
                original_score = scores[original_idx] if original_idx < len(scores) else 0.0
                new_doc = Document(
                    page_content=doc.page_content,
                    metadata={
                        **doc.metadata,
                        'reranking_applied': True,
                        'reranker_model': self.reranker_model_name,
                        'reranker_type': self.reranker_type,
                        'original_rank': original_idx + 1,
                        'final_rank': new_rank + 1,
                        'original_score': float(original_score),
                        'reranked_score': float(colbert_score),
                        'colbert_score': float(colbert_score),
                        'colbert_embedding_pre_calculated': 'colbert_embedding' in doc.metadata
                    }
                )
                reranked_docs.append(new_doc)
            print(f"✅ COLBERT: Reranked {len(reranked_docs)} documents using late interaction")
            print(f"🔍 COLBERT SCORES: {[f'{score:.4f}' for score in colbert_scores[:5]]}...")
            return reranked_docs
        except Exception as e:
            print(f"❌ COLBERT RERANKING ERROR: {str(e)}")
            print(f"❌ COLBERT TRACEBACK: {traceback.format_exc()}")
            # Fallback to original order - return documents as-is
            return documents

    def retrieve_with_scores(self, query: str, vectorstore=None, k: int = 5, reports: List[str] = None,
                             sources: List[str] = None, subtype: List[str] = None,
                             year: List[str] = None, use_reranking: bool = False,
                             qdrant_filter: Optional[rest.Filter] = None) -> Tuple[List[Document], List[float]]:
        """
        Retrieve context documents together with their similarity scores.

        Args:
            query: User query
            vectorstore: Optional vectorstore instance (replaces the instance
                one when provided, for compatibility)
            k: Number of documents to retrieve
            reports: List of report names to filter by
            sources: List of sources to filter by
            subtype: Document subtypes to filter by
            year: List of years to filter by
            use_reranking: Whether to apply reranking
            qdrant_filter: Pre-built Qdrant filter

        Returns:
            Tuple of (documents, scores). After reranking, scores are rebuilt
            from document metadata so they stay aligned with the reordered
            documents.
        """
        try:
            if vectorstore:
                self.vectorstore = vectorstore
            # Only the vector-only strategy is implemented.
            search_strategy = self.config.get('retrieval', {}).get('search_strategy', 'vector_only')
            if search_strategy != 'vector_only':
                print(f"❌ UNSUPPORTED STRATEGY: {search_strategy}")
                return [], []
            print(f"🔄 VECTOR SEARCH: Retrieving {k} documents...")
            if qdrant_filter:
                print(f"✅ QDRANT FILTER APPLIED: Using inferred Qdrant filter")
                results = self.vectorstore.similarity_search_with_score(
                    query,
                    k=k,
                    filter=qdrant_filter
                )
            else:
                # Build filter from individual parameters
                filter_conditions = self._build_filter_conditions(reports, sources, subtype, year)
                if filter_conditions:
                    print(f"✅ FILTER APPLIED: {filter_conditions}")
                    results = self.vectorstore.similarity_search_with_score(
                        query,
                        k=k,
                        filter=filter_conditions
                    )
                else:
                    print(f"ℹ️ NO FILTERS APPLIED: All documents will be searched")
                    results = self.vectorstore.similarity_search_with_score(query, k=k)
            print(f"🔍 SEARCH DEBUG: Raw result type: {type(results)}")
            print(f"🔍 SEARCH DEBUG: Raw result length: {len(results)}")
            if results and isinstance(results[0], tuple):
                documents = [doc for doc, score in results]
                scores = [score for doc, score in results]
                print(f"🔍 SEARCH DEBUG: After unpacking - documents: {len(documents)}, scores: {len(scores)}")
            else:
                documents = results
                scores = [0.0] * len(documents)
                print(f"🔍 SEARCH DEBUG: No scores available, using default")
            print(f"🔧 CONVERTING: Converting {len(documents)} documents")
            # Rebuild documents so callers get copies with the original score
            # recorded in metadata.
            final_documents = []
            for i, (doc, score) in enumerate(zip(documents, scores)):
                if hasattr(doc, 'page_content'):
                    new_doc = Document(
                        page_content=doc.page_content,
                        metadata=doc.metadata.copy()
                    )
                    new_doc.metadata['original_score'] = float(score) if score is not None else 0.0
                    final_documents.append(new_doc)
                else:
                    print(f"⚠️ WARNING: Document {i} has no page_content")
            print(f"✅ RETRIEVAL SUCCESS: Retrieved {len(final_documents)} documents")
            if use_reranking and len(final_documents) > 1:
                # BUGFIX: was self.reranker_model, which does not exist
                # (AttributeError); the attribute is reranker_model_name.
                print(f"🔄 RERANKING: Applying {self.reranker_model_name} to {len(final_documents)} documents...")
                final_documents = self._apply_reranking(query, final_documents, scores)
                print(f"✅ RERANKING APPLIED: {self.reranker_model_name}")
                # BUGFIX: keep the returned scores aligned with the reranked
                # document order instead of returning the pre-rerank list.
                scores = [
                    doc.metadata.get('reranked_score', doc.metadata.get('original_score', 0.0))
                    for doc in final_documents
                ]
            else:
                print(f"ℹ️ RERANKING: Skipped (disabled or no documents)")
            return final_documents, scores
        except Exception as e:
            print(f"❌ RETRIEVAL ERROR: {e}")
            print(f"❌ RETRIEVAL TRACEBACK: {traceback.format_exc()}")
            return [], []

    def _build_filter_conditions(self, reports: List[str] = None, sources: List[str] = None,
                                 subtype: List[str] = None, year: List[str] = None) -> Optional[rest.Filter]:
        """
        Build Qdrant filter conditions from individual parameter lists.

        Args:
            reports: List of report names (matched on metadata.filename)
            sources: List of sources (matched on metadata.source)
            subtype: Document subtypes (matched on metadata.subtype)
            year: List of years (matched on metadata.year)

        Returns:
            A Qdrant Filter AND-ing one MatchAny condition per non-empty
            parameter, or None when no parameter is set.
        """
        field_values = (
            ("metadata.filename", reports),
            ("metadata.source", sources),
            ("metadata.subtype", subtype),
            ("metadata.year", year),
        )
        conditions = [
            rest.FieldCondition(key=key, match=rest.MatchAny(any=values))
            for key, values in field_values
            if values
        ]
        return rest.Filter(must=conditions) if conditions else None
def get_context(
    query: str,
    vectorstore: Qdrant,
    k: int = 5,
    reports: Optional[List[str]] = None,
    sources: Optional[List[str]] = None,
    subtype: Optional[str] = None,
    year: Optional[str] = None,
    use_reranking: bool = False,
    qdrant_filter: Optional[rest.Filter] = None,
    district: Optional[List[str]] = None,
    filenames: Optional[List[str]] = None
) -> List[Document]:
    """
    Convenience function to get context documents.

    Builds a throwaway ContextRetriever and delegates to retrieve_context.

    Args:
        query: User query
        vectorstore: Qdrant vector store instance
        k: Number of documents to retrieve
        reports: Optional list of report names to filter by
        sources: Optional list of source categories to filter by
        subtype: Optional subtype to filter by
        year: Optional year to filter by
        use_reranking: Whether to apply reranking
        qdrant_filter: Optional pre-built Qdrant filter
        district: Optional list of districts to filter by
        filenames: Optional list of filenames to filter by

    Returns:
        List of retrieved documents
    """
    retriever = ContextRetriever(vectorstore)
    # Pass the full filter surface through so this wrapper supports the same
    # filters as retrieve_context (district/filenames were previously dropped).
    return retriever.retrieve_context(
        query=query,
        k=k,
        reports=reports,
        sources=sources,
        subtype=subtype,
        year=year,
        district=district,
        filenames=filenames,
        use_reranking=use_reranking,
        qdrant_filter=qdrant_filter
    )
def format_context_for_llm(documents: List[Document]) -> str:
    """
    Render retrieved documents as a single text block for LLM input.

    Each document becomes a numbered section attributed to its source
    filename ('Unknown' when missing), with surrounding whitespace stripped
    from the content. Sections are separated by blank lines.

    Args:
        documents: List of Document objects

    Returns:
        Formatted string, or "" when there are no documents
    """
    if not documents:
        return ""
    sections = [
        f"Document {idx} (Source: {doc.metadata.get('filename', 'Unknown')}):\n{doc.page_content.strip()}"
        for idx, doc in enumerate(documents, start=1)
    ]
    return "\n\n".join(sections)
def get_context_metadata(documents: List[Document]) -> Dict[str, Any]:
    """
    Summarize the metadata of a set of retrieved documents.

    Collects the distinct 'filename', 'year', and 'source' metadata values
    across all documents.

    Args:
        documents: List of Document objects

    Returns:
        Dict with 'num_documents', 'sources' (filenames), 'years', and
        'document_types' (source categories); {} when documents is empty.
    """
    if not documents:
        return {}
    filenames = set()
    year_values = set()
    source_categories = set()
    for doc in documents:
        md = doc.metadata
        if 'filename' in md:
            filenames.add(md['filename'])
        if 'year' in md:
            year_values.add(md['year'])
        if 'source' in md:
            source_categories.add(md['source'])
    return {
        "num_documents": len(documents),
        "sources": list(filenames),
        "years": list(year_values),
        "document_types": list(source_categories)
    }