"""
Presentation Layer - API Endpoints

Versioned (``/api/v1``) HTTP endpoints: RAG query processing, a health
check, and a Prometheus metrics scrape endpoint.
"""

import time
from datetime import datetime, timezone
from functools import lru_cache
from typing import List
from uuid import uuid4

from fastapi import (
    APIRouter,
    Depends,
    File,
    Form,
    HTTPException,
    Response,
    UploadFile,
    status,
)
from sqlalchemy.ext.asyncio import AsyncSession

from app.application.dto import DocumentUploadDTO, QueryDTO
from app.application.services import ChunkingService
from app.application.use_cases.document_indexing import DocumentIndexingUseCase
from app.application.use_cases.query_processing import QueryProcessingUseCase
from app.core.config import get_settings
from app.core.logging import get_logger, set_correlation_id
from app.core.metrics import (
    active_requests,
    http_request_duration_seconds,
    http_requests_total,
    queries_total,
)
from app.infrastructure.cache.redis_cache import RedisCache
from app.infrastructure.external.embedder import SentenceTransformerEmbedder
from app.infrastructure.external.gemini_llm import GeminiLLM
from app.infrastructure.external.prompt_builder import DefaultPromptBuilder
from app.infrastructure.external.qdrant_retriever import QdrantRetriever
from app.infrastructure.repositories.postgres_repository import (
    PostgresChunkRepository,
    PostgresDocumentRepository,
)
from app.presentation.api.v1.schemas import (
    DocumentResponse,
    HealthResponse,
    QueryRequest,
    QueryResponse,
    SourceSchema,
)

router = APIRouter(prefix="/api/v1", tags=["api"])
logger = get_logger(__name__)
settings = get_settings()

# Label value used for the /query endpoint in the HTTP metrics.
_QUERY_ENDPOINT = "/api/v1/query"


# Dependency injection (simplified - in production use proper DI container)
@lru_cache(maxsize=1)
def _build_query_use_case() -> QueryProcessingUseCase:
    """Build the query-processing pipeline once and reuse it across requests.

    The original wiring rebuilt every client per request. That opened a new
    Redis connection each time and never closed it. Caching the fully wired
    use case means all requests share one set of clients. ``lru_cache`` does
    not cache exceptions, so a failed build is retried on the next request.

    Returns:
        A ``QueryProcessingUseCase`` wired to the Qdrant retriever, the Gemini
        LLM, the default prompt builder, the Redis cache and a simple reranker.
    """
    # For now, using a simple reranker (in production use cross-encoder)
    from app.infrastructure.external.simple_reranker import SimpleReranker

    # NOTE(review): the previous version also constructed a
    # SentenceTransformerEmbedder(settings.embedding_model) here but never
    # passed it to anything. It was dropped to avoid building an unused
    # object. If query embedding is meant to happen in this layer, wire an
    # embedder into the retriever/use case explicitly.
    retriever = QdrantRetriever(
        url=settings.qdrant_url,
        collection_name=settings.qdrant_collection_name,
        vector_size=settings.qdrant_vector_size,
        # Treat an empty API key as "no key" rather than sending "".
        api_key=settings.qdrant_api_key if settings.qdrant_api_key else None,
    )
    llm = GeminiLLM(api_key=settings.gemini_api_key, model_name=settings.gemini_model)

    return QueryProcessingUseCase(
        retriever=retriever,
        reranker=SimpleReranker(),
        llm=llm,
        prompt_builder=DefaultPromptBuilder(),
        cache=RedisCache(redis_url=settings.redis_url),
    )


async def get_query_use_case() -> QueryProcessingUseCase:
    """FastAPI dependency returning the shared query processing use case."""
    return _build_query_use_case()


@router.post("/query", response_model=QueryResponse, status_code=status.HTTP_200_OK)
async def process_query(
    request: QueryRequest,
    use_case: QueryProcessingUseCase = Depends(get_query_use_case),
) -> QueryResponse:
    """Process a user query through the RAG pipeline.

    Args:
        request: Validated query payload.
        use_case: Query processing use case injected by FastAPI.

    Returns:
        The generated answer, its supporting sources and usage statistics.

    Raises:
        HTTPException: 500 when any step of the pipeline fails. The full
            error is logged under this request's correlation id, but only a
            generic message plus that id is returned to the client.
    """
    # perf_counter is monotonic, so wall-clock adjustments cannot skew latency.
    start_time = time.perf_counter()
    correlation_id = str(uuid4())
    set_correlation_id(correlation_id)
    active_requests.inc()
    try:
        logger.info(
            "processing_query",
            query=request.query_text,
            department=request.department,
        )

        # Convert the HTTP schema into the application-layer DTO.
        query_dto = QueryDTO(
            query_text=request.query_text,
            department=request.department,
            user_id=request.user_id,
            session_id=request.session_id,
            top_k=request.top_k,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            filters=request.filters,
        )

        response_dto = await use_case.execute(query_dto)

        # Map the application-layer result back onto the HTTP schema.
        response = QueryResponse(
            query_id=response_dto.query_id,
            answer=response_dto.answer,
            sources=[
                SourceSchema(
                    title=src.title,
                    content=src.content,
                    relevance_score=src.relevance_score,
                    document_id=src.document_id,
                    chunk_index=src.chunk_index,
                    metadata=src.metadata,
                )
                for src in response_dto.sources
            ],
            confidence=response_dto.confidence,
            processing_time_ms=response_dto.processing_time_ms,
            tokens_used=response_dto.tokens_used,
            model=response_dto.model,
        )

        duration = time.perf_counter() - start_time
        http_requests_total.labels(method="POST", endpoint=_QUERY_ENDPOINT, status="200").inc()
        http_request_duration_seconds.labels(method="POST", endpoint=_QUERY_ENDPOINT).observe(
            duration
        )
        queries_total.labels(department=request.department, status="success").inc()
        logger.info(
            "query_processed",
            query_id=response.query_id,
            duration_ms=int(duration * 1000),
        )
        return response

    except Exception as e:
        # Top-level request boundary: log everything, then map to a 500.
        logger.error("query_processing_error", error=str(e), exc_info=True)
        http_requests_total.labels(method="POST", endpoint=_QUERY_ENDPOINT, status="500").inc()
        queries_total.labels(department=request.department, status="error").inc()
        # Do not echo str(e) to the client: internal error text can expose
        # backend details. The correlation id ties the response to the log.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Query processing failed (correlation_id={correlation_id})",
        ) from e
    finally:
        active_requests.dec()


@router.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """Report liveness, application version and the current UTC time.

    The per-dependency statuses are placeholders ("unknown"); no backing
    service is actually probed yet.
    """
    return HealthResponse(
        status="healthy",
        version=settings.app_version,
        # Timezone-aware UTC; datetime.utcnow() is deprecated and naive.
        timestamp=datetime.now(timezone.utc),
        services={
            "database": "unknown",  # TODO: Add actual health checks
            "redis": "unknown",
            "qdrant": "unknown",
        },
    )


@router.get("/metrics")
async def metrics() -> Response:
    """Prometheus metrics endpoint in the text exposition format."""
    from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

    # Return the raw payload with Prometheus' content type. Returning the
    # bytes directly would let FastAPI JSON-encode them, which scrapers reject.
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)