"""REST API for the RAG Template using FastAPI.

Main endpoints:
- POST /api/v1/ingest            - Document ingestion
- POST /api/v1/query             - RAG query
- GET /api/v1/documents          - List documents
- DELETE /api/v1/documents/{id}  - Delete a document
- GET /api/v1/health             - Health check
"""

import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi import (
    Depends,
    FastAPI,
    File,
    Header,
    HTTPException,
    UploadFile,
    status,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from src.chunking import ChunkingStrategy
from src.config import DATABASE_URL
from src.database import DatabaseManager
from src.document_processing import DocumentProcessor
from src.embeddings import EmbeddingManager
from src.generation import GenerationManager
from src.metadata import DocumentMetadata, MetadataManager
from src.monitoring import get_metrics_collector, get_tracing_manager


# --------------------------------------------------------------------------- #
# Pydantic schemas
# --------------------------------------------------------------------------- #


class IngestRequest(BaseModel):
    """Request payload for document ingestion."""

    text: str = Field(..., description="Texto do documento")
    title: str = Field(..., description="Titulo do documento")
    chunk_size: int = Field(default=1000, ge=100, le=5000, description="Tamanho dos chunks")
    chunk_overlap: int = Field(default=200, ge=0, le=1000, description="Overlap entre chunks")
    strategy: str = Field(default="recursive", description="Estrategia de chunking")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Metadados do documento")


class IngestResponse(BaseModel):
    """Response payload for document ingestion."""

    document_id: int
    num_chunks: int
    message: str
    metadata: Optional[Dict[str, Any]] = None


class QueryRequest(BaseModel):
    """Request payload for a RAG query."""

    query: str = Field(..., min_length=1, description="Query do usuario")
    top_k: int = Field(default=5, ge=1, le=20, description="Numero de resultados")
    temperature: float = Field(default=0.3, ge=0.0, le=2.0, description="Temperature para geracao")
    max_tokens: int = Field(default=512, ge=50, le=2048, description="Tokens maximos")
    model: Optional[str] = Field(default=None, description="Modelo LLM a usar")
    filters: Optional[Dict[str, Any]] = Field(default=None, description="Filtros de metadata")


class QueryResponse(BaseModel):
    """Response payload for a RAG query."""

    query: str
    response: str
    contexts: List[Dict[str, Any]]
    metadata: Dict[str, Any]


class DocumentResponse(BaseModel):
    """Response payload describing a stored document."""

    id: int
    title: str
    content: Optional[str] = None
    chunk_count: int
    created_at: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


class HealthResponse(BaseModel):
    """Response payload for the health check."""

    status: str
    timestamp: str
    database: str
    embeddings: str
    version: str


# --------------------------------------------------------------------------- #
# API key authentication
# --------------------------------------------------------------------------- #

# BUG FIX: `"".split(",")` yields `[""]`, so an unset API_KEYS env var used to
# produce the truthy set {""} — the "no keys configured -> open access" branch
# in verify_api_key could never fire (every request was rejected, except one
# sending an empty X-API-Key header, which wrongly passed). Filtering empty
# entries makes an unset/empty variable genuinely mean "no auth required".
API_KEYS = {key for key in os.getenv("API_KEYS", "").split(",") if key}


async def verify_api_key(x_api_key: str = Header(...)):
    """Validate the ``X-API-Key`` header against the configured key set.

    When no keys are configured the API is open and any header value passes.

    Raises:
        HTTPException: 401 when keys are configured and the header is unknown.
    """
    if not API_KEYS or x_api_key in API_KEYS:
        return x_api_key
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key"
    )


# --------------------------------------------------------------------------- #
# FastAPI app
# --------------------------------------------------------------------------- #

app = FastAPI(
    title="RAG Template API",
    description="API REST para sistema RAG com PostgreSQL + pgvector",
    version="1.6.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json"
)

# CORS
# NOTE(review): wildcard origins together with allow_credentials=True is
# rejected by browsers — the CORS spec forbids "*" when credentials are
# allowed. Restrict allow_origins to an explicit list before relying on
# credentialed cross-origin requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Monitoring
metrics_collector = get_metrics_collector()
tracing_manager = get_tracing_manager(
    service_name="rag-template-api",
    otlp_endpoint=os.getenv("OTLP_ENDPOINT")
)

# Lazily-created process-wide singletons; heavy resources (DB pool, embedding
# model, LLM client) are only built on first use.
_db_manager = None
_embedding_manager = None
_generation_manager = None
_metadata_manager = None


def get_db_manager():
    """Return the shared DatabaseManager, creating it on first call."""
    global _db_manager
    if _db_manager is None:
        _db_manager = DatabaseManager(DATABASE_URL)
    return _db_manager


def get_embedding_manager():
    """Return the shared EmbeddingManager, creating it on first call."""
    global _embedding_manager
    if _embedding_manager is None:
        _embedding_manager = EmbeddingManager()
    return _embedding_manager


def get_generation_manager():
    """Return the shared GenerationManager, creating it on first call."""
    global _generation_manager
    if _generation_manager is None:
        _generation_manager = GenerationManager()
    return _generation_manager


def get_metadata_manager():
    """Return the shared MetadataManager, creating it on first call."""
    global _metadata_manager
    if _metadata_manager is None:
        _metadata_manager = MetadataManager(get_db_manager())
    return _metadata_manager


# --------------------------------------------------------------------------- #
# Endpoints
# --------------------------------------------------------------------------- #


@app.get("/api/v1/health", response_model=HealthResponse)
async def health_check():
    """System health check: probes the database and the embedding model.

    Reports "degraded" overall status when either dependency is unhealthy;
    never raises, so load balancers always get a well-formed response.
    """
    db_status = "healthy"
    embeddings_status = "healthy"

    try:
        db = get_db_manager()
        db.get_database_stats()
    except Exception:
        db_status = "unhealthy"

    try:
        emb = get_embedding_manager()
        emb.encode("test")
    except Exception:
        embeddings_status = "unhealthy"

    status_overall = (
        "healthy"
        if db_status == "healthy" and embeddings_status == "healthy"
        else "degraded"
    )

    return HealthResponse(
        status=status_overall,
        timestamp=datetime.now().isoformat(),
        database=db_status,
        embeddings=embeddings_status,
        version="1.6.0"
    )


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint (Prometheus exposition format)."""
    from fastapi.responses import Response
    from prometheus_client import CONTENT_TYPE_LATEST

    metrics_data = metrics_collector.get_metrics()
    return Response(content=metrics_data, media_type=CONTENT_TYPE_LATEST)


@app.post("/api/v1/ingest", response_model=IngestResponse,
          dependencies=[Depends(verify_api_key)])
async def ingest_document(request: IngestRequest):
    """Ingest a document into the system.

    Splits the text into chunks, generates embeddings and stores everything
    in the database, optionally validating and attaching document metadata.

    Raises:
        HTTPException: 400 when chunking produces no chunks; 500 on any
            other processing failure.
    """
    try:
        db = get_db_manager()
        emb = get_embedding_manager()
        metadata_manager = get_metadata_manager()

        # Resolve the chunking strategy; unknown names fall back to RECURSIVE.
        strategy_map = {
            "fixed": ChunkingStrategy.FIXED_SIZE,
            "sentence": ChunkingStrategy.SENTENCE,
            "semantic": ChunkingStrategy.SEMANTIC,
            "recursive": ChunkingStrategy.RECURSIVE
        }
        strategy = strategy_map.get(request.strategy, ChunkingStrategy.RECURSIVE)

        from src.chunking import chunk_text
        chunks = chunk_text(
            request.text,
            strategy=strategy,
            chunk_size=request.chunk_size,
            overlap=request.chunk_overlap
        )

        if not chunks:
            raise HTTPException(status_code=400, detail="No chunks generated from text")

        # Embed every chunk in one batch call.
        chunk_texts = [c.text for c in chunks]
        embeddings = emb.encode_batch(chunk_texts)

        # Validate metadata up front so bad metadata fails before insertion.
        doc_metadata = None
        if request.metadata:
            doc_metadata = DocumentMetadata.from_dict(request.metadata)
            metadata_manager.validate_metadata(doc_metadata)

        session_id = f"api_{datetime.now().timestamp()}"
        document_id = db.insert_document(
            title=request.title,
            content=request.text,
            chunks=chunk_texts,
            embeddings=embeddings,
            session_id=session_id
        )

        if doc_metadata:
            metadata_manager.update_document_metadata(document_id, doc_metadata)

        return IngestResponse(
            document_id=document_id,
            num_chunks=len(chunks),
            message="Document ingested successfully",
            metadata=request.metadata
        )

    except HTTPException:
        # BUG FIX: the 400 raised above used to be swallowed by the generic
        # handler below and re-surfaced as a 500; re-raise it untouched
        # (same pattern delete_document already uses).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/v1/query", response_model=QueryResponse,
          dependencies=[Depends(verify_api_key)])
async def query_rag(request: QueryRequest):
    """Execute a RAG query.

    Retrieves relevant contexts (optionally filtered by metadata) and
    generates an answer with the configured LLM.

    Raises:
        HTTPException: 500 on any processing failure.
    """
    try:
        db = get_db_manager()
        emb = get_embedding_manager()
        gen = get_generation_manager()
        metadata_manager = get_metadata_manager()

        query_embedding = emb.encode(request.query)

        # Metadata filters route through the metadata manager; otherwise a
        # plain similarity search against the whole corpus.
        if request.filters:
            contexts = metadata_manager.search_with_filters(
                query_embedding=query_embedding,
                filters=request.filters,
                top_k=request.top_k
            )
        else:
            contexts = db.search_similar(
                query_embedding=query_embedding,
                top_k=request.top_k
            )

        # No retrieval hits: answer with a fixed apology instead of calling
        # the LLM with an empty context.
        if not contexts:
            return QueryResponse(
                query=request.query,
                response="Desculpe, nao encontrei informacoes relevantes.",
                contexts=[],
                metadata={"num_contexts": 0, "model": request.model or "default"}
            )

        context_texts = [c['content'] for c in contexts]
        response = gen.generate_response(
            query=request.query,
            contexts=context_texts,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            model=request.model
        )

        return QueryResponse(
            query=request.query,
            response=response,
            contexts=contexts,
            metadata={
                "num_contexts": len(contexts),
                "model": request.model or "default",
                "temperature": request.temperature,
                "max_tokens": request.max_tokens
            }
        )

    except HTTPException:
        # Keep deliberate HTTP errors intact (consistency with other endpoints).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/v1/documents", response_model=List[DocumentResponse],
         dependencies=[Depends(verify_api_key)])
async def list_documents(
    limit: int = 100,
    offset: int = 0,
    session_id: Optional[str] = None
):
    """List documents in the system, optionally filtered by session."""
    try:
        db = get_db_manager()
        docs = db.list_documents(session_id=session_id, limit=limit, offset=offset)

        return [
            DocumentResponse(
                id=doc['id'],
                title=doc['title'],
                content=doc.get('content'),
                chunk_count=doc.get('chunk_count', 0),
                created_at=doc.get('created_at'),
                metadata=doc.get('metadata')
            )
            for doc in docs
        ]

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.delete("/api/v1/documents/{document_id}",
            dependencies=[Depends(verify_api_key)])
async def delete_document(document_id: int):
    """Delete a document from the system.

    Raises:
        HTTPException: 404 when the document does not exist; 500 otherwise.
    """
    try:
        db = get_db_manager()
        success = db.delete_document(document_id)

        if not success:
            raise HTTPException(status_code=404, detail="Document not found")

        return {"message": "Document deleted successfully", "document_id": document_id}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/v1/stats", dependencies=[Depends(verify_api_key)])
async def get_stats():
    """Return database and metadata statistics for the system."""
    try:
        db = get_db_manager()
        metadata_manager = get_metadata_manager()

        db_stats = db.get_database_stats()
        metadata_stats = metadata_manager.get_documents_count_by_metadata()

        return {
            "database": db_stats,
            "metadata": metadata_stats,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/v1/upload", response_model=IngestResponse,
          dependencies=[Depends(verify_api_key)])
async def upload_file(
    file: UploadFile = File(...),
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    strategy: str = "recursive"
):
    """Upload and ingest a file (PDF and TXT are supported).

    The upload is spooled to a temporary file, extracted with
    DocumentProcessor and then ingested via the /ingest pipeline.

    Raises:
        HTTPException: propagated from ingestion (e.g. 400 on empty text);
            500 on any other failure.
    """
    tmp_path = None
    try:
        # Spool the upload to disk so DocumentProcessor can read a real file.
        # Guard against a missing filename when deriving the suffix.
        suffix = Path(file.filename or "upload").suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(await file.read())
            tmp_path = tmp.name

        processor = DocumentProcessor()
        result = processor.process_file(tmp_path)

        ingest_request = IngestRequest(
            text=result['text'],
            title=file.filename,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            strategy=strategy,
            metadata={
                "document_type": result['file_type'],
                "upload_date": datetime.now().isoformat()
            }
        )

        return await ingest_document(ingest_request)

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # BUG FIX: the temp file used to be removed only on the success path
        # and leaked whenever processing or ingestion raised.
        if tmp_path:
            Path(tmp_path).unlink(missing_ok=True)


# --------------------------------------------------------------------------- #
# Error handlers
# --------------------------------------------------------------------------- #


@app.exception_handler(404)
async def not_found_handler(request, exc):
    """Uniform JSON body for 404 responses."""
    return JSONResponse(
        status_code=404,
        content={"detail": "Not found"}
    )


@app.exception_handler(500)
async def server_error_handler(request, exc):
    """Uniform JSON body for unhandled server errors."""
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"}
    )