"""Application Layer - Query Processing Use Case.

Orchestrates the RAG pipeline for answering user queries.
"""

import hashlib
import time
from typing import List

from app.application.dto import QueryDTO, QueryResponseDTO, SourceDTO
from app.domain.entities import Query, QueryRequest, Source
from app.domain.interfaces import ILLM, ICache, IPromptBuilder, IReranker, IRetriever


class QueryProcessingUseCase:
    """Use case for processing user queries through the RAG pipeline.

    Pipeline stages: semantic-cache lookup -> hybrid retrieval -> reranking
    -> prompt construction -> LLM generation -> response assembly -> caching.
    """

    # Candidate count for the initial retrieval pass, before reranking trims it.
    _INITIAL_RETRIEVAL_TOP_K = 100
    # Dense/sparse weighting passed to hybrid search.
    _HYBRID_ALPHA = 0.5
    # Seconds a generated response stays in the semantic cache.
    _CACHE_TTL_SECONDS = 3600
    # Max characters of chunk content echoed back per source in the response.
    _SOURCE_PREVIEW_CHARS = 500

    def __init__(
        self,
        retriever: IRetriever,
        reranker: IReranker,
        llm: ILLM,
        prompt_builder: IPromptBuilder,
        cache: ICache,
    ):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm
        self.prompt_builder = prompt_builder
        self.cache = cache

    async def execute(self, query_dto: QueryDTO) -> QueryResponseDTO:
        """Execute the query-processing pipeline.

        Args:
            query_dto: Validated query parameters from the caller.

        Returns:
            A ``QueryResponseDTO`` with the generated answer, supporting
            sources, a confidence estimate, and timing/token metrics.
        """
        start_time = time.time()

        # 1. Check the semantic cache before doing any other work.
        # NOTE: the previous key used built-in hash(), which is salted per
        # process (PYTHONHASHSEED) and therefore produced different keys
        # across restarts/workers, silently defeating the cache. SHA-256
        # of the query text is stable everywhere.
        query_digest = hashlib.sha256(
            query_dto.query_text.encode("utf-8")
        ).hexdigest()
        cache_key = f"query:{query_digest}:{query_dto.department}"
        cached_response = await self.cache.get(cache_key)
        if cached_response:
            return cached_response

        # 2. Create the domain-level query request.
        query_request = QueryRequest(
            query_text=query_dto.query_text,
            department=query_dto.department,
            user_id=query_dto.user_id,
            session_id=query_dto.session_id,
            top_k=query_dto.top_k,
            temperature=query_dto.temperature,
            max_tokens=query_dto.max_tokens,
            filters=query_dto.filters,
        )

        # 3. Retrieve relevant documents, always scoped to the department;
        # caller-supplied filters are merged on top.
        filters = {"department": query_dto.department}
        if query_dto.filters:
            filters.update(query_dto.filters)

        retrieval_results = await self.retriever.hybrid_search(
            query=query_dto.query_text,
            top_k=self._INITIAL_RETRIEVAL_TOP_K,
            alpha=self._HYBRID_ALPHA,
            filters=filters,
        )

        # 4. Rerank the broad candidate set down to the requested top_k.
        reranked_results = await self.reranker.rerank(
            query=query_dto.query_text,
            results=retrieval_results,
            top_k=query_dto.top_k,
        )

        # 5. Build the context passages for the prompt.
        context = [result.content for result in reranked_results]

        # 6. Build the prompt with a department-specific system message.
        messages = self.prompt_builder.build_rag_prompt(
            query=query_dto.query_text,
            context=context,
            system_prompt=self._get_system_prompt(query_dto.department),
        )

        # 7. Generate the answer.
        llm_response = await self.llm.generate(
            messages=messages,
            temperature=query_dto.temperature,
            max_tokens=query_dto.max_tokens,
        )

        # 8. Assemble source attributions (content truncated for the response).
        sources = [
            SourceDTO(
                title=f"Document {result.document_id}",
                content=result.content[: self._SOURCE_PREVIEW_CHARS],
                relevance_score=result.score,
                document_id=result.document_id,
                chunk_index=result.chunk_index,
                metadata=result.metadata,
            )
            for result in reranked_results
        ]

        # 9. Calculate metrics.
        processing_time_ms = int((time.time() - start_time) * 1000)

        # 10. Build the response. "temp" is the fallback when QueryRequest
        # exposes no id attribute — presumably the entity assigns one;
        # TODO(review): confirm against app.domain.entities.QueryRequest.
        response = QueryResponseDTO(
            query_id=str(getattr(query_request, "id", "temp")),
            answer=llm_response.content,
            sources=sources,
            confidence=self._calculate_confidence(reranked_results),
            processing_time_ms=processing_time_ms,
            tokens_used=llm_response.tokens_used,
            model=llm_response.model,
        )

        # 11. Cache the response for subsequent identical queries.
        await self.cache.set(cache_key, response, ttl=self._CACHE_TTL_SECONDS)

        return response

    def _get_system_prompt(self, department: str) -> str:
        """Return the department-specific system prompt.

        Unknown departments fall back to the "General" prompt.
        """
        prompts = {
            "HR": "You are a helpful HR assistant for employee onboarding. Provide clear, accurate information about HR policies, benefits, and procedures.",
            "IT": "You are an IT support assistant for new employees. Help with technical setup, access, and IT policies.",
            "Legal": "You are a legal compliance assistant. Provide information about legal policies, regulations, and compliance requirements.",
            "Finance": "You are a finance assistant. Help with expense policies, financial procedures, and budget information.",
            "General": "You are a helpful corporate onboarding assistant. Provide accurate information to help new employees integrate successfully.",
        }
        return prompts.get(department, prompts["General"])

    def _calculate_confidence(self, results: List) -> float:
        """Estimate answer confidence as the mean of the top-3 result scores.

        Returns 0.0 when there are no retrieval results.
        """
        if not results:
            return 0.0
        top_scores = [r.score for r in results[:3]]
        # results is non-empty here, so top_scores has 1..3 entries.
        return sum(top_scores) / len(top_scores)