| """Service for retrieving relevant documents from vector store.""" |
|
|
| import hashlib |
| import json |
| from src.db.postgres.vector_store import get_vector_store |
| from src.db.redis.connection import get_redis |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from src.middlewares.logging import get_logger |
| from typing import List, Dict, Any |
|
|
| logger = get_logger("retriever") |
|
|
| _RETRIEVAL_CACHE_TTL = 3600 |
|
|
|
|
class RetrieverService:
    """Service for retrieving relevant documents from the vector store.

    Results are cached in Redis per (user, query, k) for
    ``_RETRIEVAL_CACHE_TTL`` seconds. Cache access is best-effort: a Redis
    failure falls through to a live vector search instead of failing the
    whole retrieval.
    """

    def __init__(self):
        # Vector store handle resolved once per service instance.
        self.vector_store = get_vector_store()

    async def retrieve(
        self,
        query: str,
        user_id: str,
        db: AsyncSession,
        k: int = 5
    ) -> List[Dict[str, Any]]:
        """Retrieve relevant chunks for a query, scoped to the user's documents.

        Args:
            query: Natural-language search query.
            user_id: Owner whose documents the search is restricted to
                (applied as a metadata filter on the vector search).
            db: Async database session (not used here; kept for interface
                compatibility with callers).
            k: Maximum number of chunks to return.

        Returns:
            List of dicts with keys: content, metadata
            metadata includes: document_id, user_id, filename, chunk_index,
            page_label (if PDF). Returns [] if the vector search fails.
        """
        # md5 is used only to build a compact cache-key component,
        # not as a security control.
        query_hash = hashlib.md5(query.encode()).hexdigest()
        cache_key = f"retrieval:{user_id}:{query_hash}:{k}"

        redis = None
        try:
            # Cache read is best-effort: a Redis outage must not break
            # retrieval, so any failure here falls through to a live search.
            redis = await get_redis()
            cached = await redis.get(cache_key)
            if cached:
                logger.info("Returning cached retrieval results")
                return json.loads(cached)
        except Exception as e:
            logger.error("Retrieval cache read failed", error=str(e))
            redis = None  # skip the cache write below as well

        try:
            logger.info(f"Retrieving for user {user_id}, query: {query[:50]}...")

            docs = await self.vector_store.asimilarity_search(
                query=query,
                k=k,
                filter={"user_id": user_id}
            )

            results = [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                }
                for doc in docs
            ]

            logger.info(f"Retrieved {len(results)} chunks")
        except Exception as e:
            # Preserve the original best-effort contract: log and return [].
            logger.error("Retrieval failed", error=str(e))
            return []

        if redis is not None:
            try:
                await redis.setex(cache_key, _RETRIEVAL_CACHE_TTL, json.dumps(results))
            except Exception as e:
                # Failing to populate the cache is not a retrieval failure.
                logger.error("Retrieval cache write failed", error=str(e))

        return results
|
|
|
|
# Shared module-level instance; import `retriever` instead of constructing
# a new RetrieverService per call site.
retriever = RetrieverService()
|
|