"""Service for retrieving relevant documents from vector store.""" import hashlib import json from src.db.postgres.vector_store import get_vector_store from src.db.redis.connection import get_redis from sqlalchemy.ext.asyncio import AsyncSession from src.middlewares.logging import get_logger from typing import List, Dict, Any logger = get_logger("retriever") _RETRIEVAL_CACHE_TTL = 3600 # 1 hour class RetrieverService: """Service for retrieving relevant documents.""" def __init__(self): self.vector_store = get_vector_store() async def retrieve( self, query: str, user_id: str, db: AsyncSession, k: int = 5 ) -> List[Dict[str, Any]]: """Retrieve relevant chunks for a query, scoped to the user's documents. Returns: List of dicts with keys: content, metadata metadata includes: document_id, user_id, filename, chunk_index, page_label (if PDF) """ try: redis = await get_redis() query_hash = hashlib.md5(query.encode()).hexdigest() cache_key = f"retrieval:{user_id}:{query_hash}:{k}" cached = await redis.get(cache_key) if cached: logger.info("Returning cached retrieval results") return json.loads(cached) logger.info(f"Retrieving for user {user_id}, query: {query[:50]}...") docs = await self.vector_store.asimilarity_search( query=query, k=k, filter={"user_id": user_id} ) results = [ { "content": doc.page_content, "metadata": doc.metadata, } for doc in docs ] logger.info(f"Retrieved {len(results)} chunks") await redis.setex(cache_key, _RETRIEVAL_CACHE_TTL, json.dumps(results)) return results except Exception as e: logger.error("Retrieval failed", error=str(e)) return [] retriever = RetrieverService()