Spaces:
Sleeping
Sleeping
| """ | |
| Presentation Layer - API Endpoints | |
| """ | |
import time
from datetime import datetime, timezone
from typing import List
from uuid import uuid4

from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.application.dto import DocumentUploadDTO, QueryDTO
from app.application.services import ChunkingService
from app.application.use_cases.document_indexing import DocumentIndexingUseCase
from app.application.use_cases.query_processing import QueryProcessingUseCase
from app.core.config import get_settings
from app.core.logging import get_logger, set_correlation_id
from app.core.metrics import (
    active_requests,
    http_request_duration_seconds,
    http_requests_total,
    queries_total,
)
from app.infrastructure.cache.redis_cache import RedisCache
from app.infrastructure.external.embedder import SentenceTransformerEmbedder
from app.infrastructure.external.gemini_llm import GeminiLLM
from app.infrastructure.external.prompt_builder import DefaultPromptBuilder
from app.infrastructure.external.qdrant_retriever import QdrantRetriever
from app.infrastructure.repositories.postgres_repository import (
    PostgresChunkRepository,
    PostgresDocumentRepository,
)
from app.presentation.api.v1.schemas import (
    DocumentResponse,
    HealthResponse,
    QueryRequest,
    QueryResponse,
    SourceSchema,
)
# Router for all v1 endpoints; mounted by the application factory elsewhere.
router = APIRouter(prefix="/api/v1", tags=["api"])
# Module-level structured logger (correlation id is attached per-request).
logger = get_logger(__name__)
# Application settings are read once at import time.
settings = get_settings()
# Dependency injection (simplified - in production use proper DI container)
async def get_query_use_case() -> QueryProcessingUseCase:
    """Build (once per process) and return the query-processing use case.

    The wired use case is cached on the function object: the original
    implementation re-created the embedding model and the Qdrant, Gemini
    and Redis clients on every request, which is expensive. Callers via
    ``Depends(get_query_use_case)`` are unaffected.

    Returns:
        QueryProcessingUseCase: fully wired RAG pipeline use case.
    """
    cached = getattr(get_query_use_case, "_instance", None)
    if cached is not None:
        return cached

    # Wire infrastructure adapters from application settings.
    retriever = QdrantRetriever(
        url=settings.qdrant_url,
        collection_name=settings.qdrant_collection_name,
        vector_size=settings.qdrant_vector_size,
        api_key=settings.qdrant_api_key if settings.qdrant_api_key else None,
    )
    llm = GeminiLLM(api_key=settings.gemini_api_key, model_name=settings.gemini_model)
    prompt_builder = DefaultPromptBuilder()
    cache = RedisCache(redis_url=settings.redis_url)
    # NOTE(review): the original also constructed
    # SentenceTransformerEmbedder(settings.embedding_model) here but never
    # passed it to the use case; the unused construction was dropped —
    # confirm the retriever embeds queries itself.

    # For now, using a simple reranker (in production use cross-encoder)
    from app.infrastructure.external.simple_reranker import SimpleReranker

    reranker = SimpleReranker()

    use_case = QueryProcessingUseCase(
        retriever=retriever,
        reranker=reranker,
        llm=llm,
        prompt_builder=prompt_builder,
        cache=cache,
    )
    get_query_use_case._instance = use_case
    return use_case
async def process_query(
    request: QueryRequest,
    use_case: QueryProcessingUseCase = Depends(get_query_use_case),
) -> QueryResponse:
    """Process user query through RAG pipeline.

    Maps the API request to a ``QueryDTO``, delegates to the
    query-processing use case, and maps the result back to the
    ``QueryResponse`` schema. Emits structured logs and Prometheus
    metrics for both success and failure paths.

    Raises:
        HTTPException: 500 wrapping the underlying error message when the
            use case fails for any reason.
    """
    start_time = time.time()
    # Fresh correlation id per request so all log lines for this call
    # can be joined together downstream.
    correlation_id = str(uuid4())
    set_correlation_id(correlation_id)
    # Gauge of in-flight requests; decremented in ``finally`` so it is
    # balanced even on error.
    active_requests.inc()
    try:
        logger.info("processing_query", query=request.query_text, department=request.department)
        # Convert to DTO
        query_dto = QueryDTO(
            query_text=request.query_text,
            department=request.department,
            user_id=request.user_id,
            session_id=request.session_id,
            top_k=request.top_k,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            filters=request.filters,
        )
        # Execute use case
        response_dto = await use_case.execute(query_dto)
        # Convert to response schema
        response = QueryResponse(
            query_id=response_dto.query_id,
            answer=response_dto.answer,
            sources=[
                SourceSchema(
                    title=src.title,
                    content=src.content,
                    relevance_score=src.relevance_score,
                    document_id=src.document_id,
                    chunk_index=src.chunk_index,
                    metadata=src.metadata,
                )
                for src in response_dto.sources
            ],
            confidence=response_dto.confidence,
            processing_time_ms=response_dto.processing_time_ms,
            tokens_used=response_dto.tokens_used,
            model=response_dto.model,
        )
        # Metrics: HTTP counters/histogram plus a per-department query counter.
        duration = time.time() - start_time
        http_requests_total.labels(method="POST", endpoint="/api/v1/query", status="200").inc()
        http_request_duration_seconds.labels(method="POST", endpoint="/api/v1/query").observe(
            duration
        )
        queries_total.labels(department=request.department, status="success").inc()
        logger.info("query_processed", query_id=response.query_id, duration_ms=int(duration * 1000))
        return response
    except Exception as e:
        # NOTE(review): the raw error message is surfaced to the client in
        # the 500 detail below — confirm this is acceptable for production.
        logger.error("query_processing_error", error=str(e), exc_info=True)
        http_requests_total.labels(method="POST", endpoint="/api/v1/query", status="500").inc()
        queries_total.labels(department=request.department, status="error").inc()
        raise HTTPException(status_code=500, detail=f"Query processing failed: {str(e)}")
    finally:
        active_requests.dec()
async def health_check() -> HealthResponse:
    """Health check endpoint.

    Returns the application status and version with a timezone-aware UTC
    timestamp. Per-service statuses are placeholders until real probes
    are wired in.
    """
    return HealthResponse(
        status="healthy",
        version=settings.app_version,
        # datetime.utcnow() is naive and deprecated since Python 3.12;
        # use an aware UTC timestamp instead.
        timestamp=datetime.now(timezone.utc),
        services={
            "database": "unknown",  # TODO: Add actual health checks
            "redis": "unknown",
            "qdrant": "unknown",
        },
    )
async def metrics():
    """Prometheus metrics endpoint.

    Serves the current metric registry in the Prometheus text exposition
    format with the correct content type. The original returned the raw
    bytes from ``generate_latest()`` and never used the imported
    ``CONTENT_TYPE_LATEST``, so scrapers did not receive the expected
    ``text/plain; version=0.0.4`` content type.
    """
    from fastapi import Response
    from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)