ishaq101's picture
[NOTICKET] Demo agentic agent
bef5e76
"""Service for retrieving relevant documents from vector store."""
import hashlib
import json
from src.db.postgres.vector_store import get_vector_store
from src.db.redis.connection import get_redis
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from typing import List, Dict, Any
# Logger shared by everything in this module, obtained under the name "retriever".
logger = get_logger("retriever")
# Expiry, in seconds, given to Redis SETEX when retrieval results are cached.
_RETRIEVAL_CACHE_TTL = 3600 # 1 hour
class RetrieverService:
    """Service for retrieving relevant documents.

    Wraps the vector store returned by ``get_vector_store()`` and caches
    retrieval results in Redis for ``_RETRIEVAL_CACHE_TTL`` seconds.

    The cache is best-effort: a failure to reach Redis, to decode a cached
    payload, or to serialize fresh results is logged and otherwise ignored,
    so it never turns a successful vector search into an empty result.
    """

    def __init__(self):
        self.vector_store = get_vector_store()

    @staticmethod
    def _cache_key(query: str, user_id: str, k: int) -> str:
        """Build the Redis key identifying a (user, query, k) retrieval."""
        # MD5 only fingerprints the query text to keep the key short; it is
        # not used for anything security-sensitive.
        query_hash = hashlib.md5(query.encode()).hexdigest()
        return f"retrieval:{user_id}:{query_hash}:{k}"

    async def _cache_client(self):
        """Return the Redis client, or None when it cannot be obtained."""
        try:
            return await get_redis()
        except Exception as e:
            logger.error("Retrieval cache unavailable", error=str(e))
            return None

    async def _cache_get(self, redis, cache_key: str):
        """Return the decoded cached results, or None on a miss or any cache error."""
        if redis is None:
            return None
        try:
            cached = await redis.get(cache_key)
            if not cached:
                return None
            # A corrupt payload raises here and is treated as a miss, so the
            # caller falls through to a fresh search.
            return json.loads(cached)
        except Exception as e:
            logger.error("Retrieval cache read failed", error=str(e))
            return None

    async def _cache_set(
        self,
        redis,
        cache_key: str,
        results: List[Dict[str, Any]],
    ) -> None:
        """Store results under cache_key; failures are logged, never raised."""
        if redis is None:
            return
        try:
            # json.dumps raises when metadata holds values that are not JSON
            # serializable; in that case the results are simply not cached.
            payload = json.dumps(results)
            await redis.setex(cache_key, _RETRIEVAL_CACHE_TTL, payload)
        except Exception as e:
            logger.error("Retrieval cache write failed", error=str(e))

    async def retrieve(
        self,
        query: str,
        user_id: str,
        db: AsyncSession,
        k: int = 5
    ) -> List[Dict[str, Any]]:
        """Retrieve relevant chunks for a query, scoped to the user's documents.

        Results are looked up in the Redis cache first (keyed by user, query
        hash and k). On a miss the vector store is searched with a
        ``user_id`` filter and the results are cached.

        Args:
            query: Text to search for.
            user_id: Owner whose documents the search is restricted to.
            db: Not used by this method; kept in the signature for callers.
            k: Maximum number of chunks requested from the vector store.

        Returns:
            List of dicts with keys: content, metadata
            metadata includes: document_id, user_id, filename, chunk_index, page_label (if PDF)
            An empty list is returned when the search itself fails; the
            error is logged rather than raised.
        """
        try:
            cache_key = self._cache_key(query, user_id, k)
            redis = await self._cache_client()

            cached = await self._cache_get(redis, cache_key)
            if cached is not None:
                logger.info("Returning cached retrieval results")
                return cached

            logger.info(f"Retrieving for user {user_id}, query: {query[:50]}...")
            docs = await self.vector_store.asimilarity_search(
                query=query,
                k=k,
                filter={"user_id": user_id}
            )
            results = [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                }
                for doc in docs
            ]
            logger.info(f"Retrieved {len(results)} chunks")

            await self._cache_set(redis, cache_key, results)
            return results
        except Exception as e:
            logger.error("Retrieval failed", error=str(e))
            return []
# Module-level shared instance. Constructing it calls get_vector_store(),
# so that call happens when this module is first imported.
retriever = RetrieverService()