Spaces:
Sleeping
Sleeping
| """ | |
| API REST para RAG Template usando FastAPI. | |
| Endpoints principais: | |
| - POST /api/v1/ingest - Ingestao de documentos | |
| - POST /api/v1/query - Query RAG | |
| - GET /api/v1/documents - Listar documentos | |
| - DELETE /api/v1/documents/{id} - Deletar documento | |
| - GET /api/v1/health - Health check | |
| """ | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException, Depends, status, File, UploadFile, Header | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from src.database import DatabaseManager | |
| from src.embeddings import EmbeddingManager | |
| from src.generation import GenerationManager | |
| from src.document_processing import DocumentProcessor | |
| from src.chunking import ChunkingStrategy | |
| from src.metadata import MetadataManager, DocumentMetadata | |
| from src.config import DATABASE_URL | |
| from src.monitoring import get_metrics_collector, get_tracing_manager | |
# Pydantic schemas
class IngestRequest(BaseModel):
    """Payload for ingesting a raw-text document.

    ``strategy`` is expected to be one of "fixed", "sentence", "semantic" or
    "recursive"; the ingest endpoint falls back to "recursive" for any other
    value.
    """

    # min_length=1: an empty text cannot yield any chunk, so reject it at
    # validation time instead of failing later inside the ingestion pipeline.
    text: str = Field(..., min_length=1, description="Texto do documento")
    title: str = Field(..., description="Titulo do documento")
    chunk_size: int = Field(default=1000, ge=100, le=5000, description="Tamanho dos chunks")
    chunk_overlap: int = Field(default=200, ge=0, le=1000, description="Overlap entre chunks")
    strategy: str = Field(default="recursive", description="Estrategia de chunking")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Metadados do documento")
class IngestResponse(BaseModel):
    """Result returned to the client once a document has been ingested."""

    document_id: int  # id of the newly stored document
    num_chunks: int  # number of chunks produced and embedded
    message: str  # human-readable status message
    metadata: Optional[Dict[str, Any]] = Field(default=None)  # metadata sent with the request, echoed back
class QueryRequest(BaseModel):
    """Parameters of a retrieval-augmented generation query.

    ``top_k`` bounds how many contexts are retrieved; ``temperature``,
    ``max_tokens`` and ``model`` are forwarded to the generator, and
    ``filters`` (when given) switches retrieval to metadata-filtered search.
    """

    query: str = Field(..., min_length=1, description="Query do usuario")
    top_k: int = Field(5, ge=1, le=20, description="Numero de resultados")
    temperature: float = Field(0.3, ge=0.0, le=2.0, description="Temperature para geracao")
    max_tokens: int = Field(512, ge=50, le=2048, description="Tokens maximos")
    model: Optional[str] = Field(None, description="Modelo LLM a usar")
    filters: Optional[Dict[str, Any]] = Field(None, description="Filtros de metadata")
class QueryResponse(BaseModel):
    """Answer produced for a RAG query, with the contexts it was grounded on."""

    query: str = Field(...)  # the original user query
    response: str = Field(...)  # generated answer text
    contexts: List[Dict[str, Any]] = Field(...)  # retrieved chunks as returned by the search layer
    metadata: Dict[str, Any] = Field(...)  # generation details (context count, model, ...)
class DocumentResponse(BaseModel):
    """Summary of a stored document as returned by the listing endpoint."""

    id: int
    title: str
    content: Optional[str] = Field(default=None)  # full text, when the DB layer provides it
    chunk_count: int
    created_at: Optional[str] = Field(default=None)  # creation time as a string
    metadata: Optional[Dict[str, Any]] = Field(default=None)
class HealthResponse(BaseModel):
    """Body returned by the health-check endpoint."""

    status: str  # "healthy" when every dependency is healthy, otherwise "degraded"
    timestamp: str  # ISO-8601 time at which the check ran
    database: str  # "healthy" / "unhealthy"
    embeddings: str  # "healthy" / "unhealthy"
    version: str  # API version string
# API key authentication
def _parse_api_keys(raw: str) -> set:
    """Parse a comma-separated key list, dropping blanks and surrounding whitespace.

    ``"".split(",")`` yields ``[""]``, so a naive ``set(raw.split(","))`` is
    never empty (authentication could never be disabled) and would accept an
    empty header value; filtering blank entries fixes both problems.
    """
    return {key.strip() for key in raw.split(",") if key.strip()}


API_KEYS = _parse_api_keys(os.getenv("API_KEYS", ""))


async def verify_api_key(x_api_key: Optional[str] = Header(default=None)):
    """Validate the ``X-API-Key`` request header.

    Authentication is disabled when no key is configured (``API_KEYS`` unset
    or blank): the header is then optional and its value (possibly ``None``)
    is returned unchanged.

    Returns:
        The API key supplied by the client.

    Raises:
        HTTPException: 401 when keys are configured and the header is missing
            or matches none of them.
    """
    import secrets

    if not API_KEYS:
        return x_api_key
    if x_api_key is not None:
        supplied = x_api_key.encode()
        # compare_digest keeps the comparison time independent of how many
        # leading characters match, so keys cannot be guessed via timing.
        if any(secrets.compare_digest(supplied, key.encode()) for key in API_KEYS):
            return x_api_key
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key"
    )
# FastAPI application; interactive docs and the OpenAPI schema live under /api
app = FastAPI(
    title="RAG Template API",
    version="1.6.0",
    description="API REST para sistema RAG com PostgreSQL + pgvector",
    openapi_url="/api/openapi.json",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
)
# CORS
# Allowed origins come from the comma-separated CORS_ORIGINS env var and
# default to "*". A wildcard origin must not be combined with credentials:
# browsers reject "Access-Control-Allow-Origin: *" on credentialed requests,
# and Starlette works around it by echoing the caller's Origin for
# cookie-bearing requests, which would let any site make credentialed calls.
# Credentials are therefore only allowed for an explicit origin list (the API
# itself authenticates through the X-API-Key header, not cookies).
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Monitoring: a metrics collector (served by the metrics endpoint) and a
# tracing manager whose export endpoint is read from OTLP_ENDPOINT (None if unset).
metrics_collector = get_metrics_collector()
_otlp_endpoint = os.getenv("OTLP_ENDPOINT")
tracing_manager = get_tracing_manager(
    service_name="rag-template-api",
    otlp_endpoint=_otlp_endpoint,
)
# Lazily-initialised manager singletons: each stays None until its
# get_*() accessor runs for the first time.
_db_manager = _embedding_manager = _generation_manager = _metadata_manager = None
def get_db_manager():
    """Return the shared DatabaseManager, building it from DATABASE_URL on first use."""
    global _db_manager
    if _db_manager is not None:
        return _db_manager
    _db_manager = DatabaseManager(DATABASE_URL)
    return _db_manager
def get_embedding_manager():
    """Return the shared EmbeddingManager, constructing it on the first call."""
    global _embedding_manager
    if _embedding_manager is None:
        created = EmbeddingManager()
        _embedding_manager = created
    return _embedding_manager
def get_generation_manager():
    """Return the shared GenerationManager, constructing it on the first call."""
    global _generation_manager
    if _generation_manager is not None:
        return _generation_manager
    _generation_manager = GenerationManager()
    return _generation_manager
def get_metadata_manager():
    """Return the shared MetadataManager, wiring it to the shared DatabaseManager on first use."""
    global _metadata_manager
    if _metadata_manager is None:
        database = get_db_manager()
        _metadata_manager = MetadataManager(database)
    return _metadata_manager
# Endpoints
async def health_check():
    """Report the health of the database and of the embedding model.

    Each dependency is probed with a real call (a stats query and a one-word
    embedding). Any exception marks that dependency "unhealthy"; errors are
    deliberately swallowed so the endpoint itself always answers. The overall
    status is "healthy" only when both probes succeed, otherwise "degraded".
    """
    try:
        get_db_manager().get_database_stats()
    except Exception:
        database = "unhealthy"
    else:
        database = "healthy"

    try:
        get_embedding_manager().encode("test")
    except Exception:
        embeddings = "unhealthy"
    else:
        embeddings = "healthy"

    all_ok = database == embeddings == "healthy"
    return HealthResponse(
        status="healthy" if all_ok else "degraded",
        timestamp=datetime.now().isoformat(),
        database=database,
        embeddings=embeddings,
        version="1.6.0",
    )
async def metrics():
    """Expose the collected metrics in the Prometheus exposition format.

    The payload comes from the module-level metrics collector and is served
    with prometheus_client's ``CONTENT_TYPE_LATEST`` media type.
    """
    from prometheus_client import CONTENT_TYPE_LATEST
    from fastapi.responses import Response

    payload = metrics_collector.get_metrics()
    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
async def ingest_document(request: IngestRequest):
    """Ingest a raw-text document.

    Splits the text into chunks with the requested strategy, embeds every
    chunk, stores document, chunks and embeddings in the database and, when
    metadata is supplied, validates it and attaches it to the new document.

    Args:
        request: Text, title, chunking parameters and optional metadata.
            Unknown ``strategy`` values fall back to the recursive strategy.

    Returns:
        IngestResponse with the new document id and the number of chunks.

    Raises:
        HTTPException: 400 when ``chunk_overlap`` is not smaller than
            ``chunk_size`` or when chunking yields no chunks; 500 (with the
            error message as detail) for any other failure.
    """
    # With overlap >= chunk size, consecutive chunks would advance by zero or
    # fewer characters (for a sliding-window chunker); reject it up front,
    # before any model or database work is done.
    if request.chunk_overlap >= request.chunk_size:
        raise HTTPException(
            status_code=400,
            detail="chunk_overlap must be smaller than chunk_size"
        )

    try:
        db = get_db_manager()
        emb = get_embedding_manager()
        metadata_manager = get_metadata_manager()

        # Map the public strategy names onto the chunking enum
        strategy_map = {
            "fixed": ChunkingStrategy.FIXED_SIZE,
            "sentence": ChunkingStrategy.SENTENCE,
            "semantic": ChunkingStrategy.SEMANTIC,
            "recursive": ChunkingStrategy.RECURSIVE
        }
        strategy = strategy_map.get(request.strategy, ChunkingStrategy.RECURSIVE)

        # Split the text into chunks
        from src.chunking import chunk_text
        chunks = chunk_text(
            request.text,
            strategy=strategy,
            chunk_size=request.chunk_size,
            overlap=request.chunk_overlap
        )
        if not chunks:
            raise HTTPException(status_code=400, detail="No chunks generated from text")

        # Embed all chunk texts in one batch
        chunk_texts = [c.text for c in chunks]
        embeddings = emb.encode_batch(chunk_texts)

        # Validate metadata before anything is written to the database
        doc_metadata = None
        if request.metadata:
            doc_metadata = DocumentMetadata.from_dict(request.metadata)
            metadata_manager.validate_metadata(doc_metadata)

        # Store the document with its chunks and embeddings
        session_id = f"api_{datetime.now().timestamp()}"
        document_id = db.insert_document(
            title=request.title,
            content=request.text,
            chunks=chunk_texts,
            embeddings=embeddings,
            session_id=session_id
        )

        # Attach metadata to the new document, if any was provided
        if doc_metadata:
            metadata_manager.update_document_metadata(document_id, doc_metadata)

        return IngestResponse(
            document_id=document_id,
            num_chunks=len(chunks),
            message="Document ingested successfully",
            metadata=request.metadata
        )
    except HTTPException:
        # Deliberate client errors (e.g. the 400 above) must not be masked as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def query_rag(request: QueryRequest):
    """Answer a query with retrieval-augmented generation.

    Embeds the query and retrieves the ``top_k`` most similar chunks, through
    the metadata manager when ``filters`` are given and directly from the
    database otherwise. Then it asks the generation manager for an answer
    grounded in those chunks.

    Returns:
        QueryResponse with the generated answer, the retrieved contexts and
        generation metadata. When nothing is retrieved, a fixed apology is
        returned and the LLM is not called.

    Raises:
        HTTPException: 500 wrapping any error raised while retrieving or generating.
    """
    try:
        db = get_db_manager()
        emb = get_embedding_manager()
        gen = get_generation_manager()
        metadata_manager = get_metadata_manager()

        vector = emb.encode(request.query)

        if request.filters:
            retrieved = metadata_manager.search_with_filters(
                query_embedding=vector,
                filters=request.filters,
                top_k=request.top_k
            )
        else:
            retrieved = db.search_similar(
                query_embedding=vector,
                top_k=request.top_k
            )

        model_name = request.model or "default"

        # Nothing relevant found: answer without calling the LLM
        if not retrieved:
            return QueryResponse(
                query=request.query,
                response="Desculpe, nao encontrei informacoes relevantes.",
                contexts=[],
                metadata={"num_contexts": 0, "model": model_name}
            )

        answer = gen.generate_response(
            query=request.query,
            contexts=[ctx['content'] for ctx in retrieved],
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            model=request.model
        )

        generation_info = {
            "num_contexts": len(retrieved),
            "model": model_name,
            "temperature": request.temperature,
            "max_tokens": request.max_tokens
        }
        return QueryResponse(
            query=request.query,
            response=answer,
            contexts=retrieved,
            metadata=generation_info
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def _as_iso_string(value):
    """Return ``value.isoformat()`` for date/time objects, else ``value`` unchanged."""
    return value.isoformat() if hasattr(value, "isoformat") else value


async def list_documents(
    limit: int = 100,
    offset: int = 0,
    session_id: Optional[str] = None
):
    """List stored documents, optionally restricted to one ingestion session.

    Args:
        limit: Maximum number of documents to return; must be non-negative.
        offset: Number of documents to skip; must be non-negative.
        session_id: When given, only documents from this session are listed.

    Returns:
        A list of DocumentResponse objects.

    Raises:
        HTTPException: 400 for a negative ``limit`` or ``offset``; 500 for
            any error raised by the database layer.
    """
    # Reject invalid paging values here instead of letting the database fail
    # with an opaque 500.
    if limit < 0 or offset < 0:
        raise HTTPException(
            status_code=400,
            detail="limit and offset must be non-negative"
        )

    try:
        db = get_db_manager()
        docs = db.list_documents(session_id=session_id, limit=limit, offset=offset)
        return [
            DocumentResponse(
                id=doc['id'],
                title=doc['title'],
                content=doc.get('content'),
                chunk_count=doc.get('chunk_count', 0),
                # DocumentResponse.created_at is a string; if the DB layer hands
                # back datetime objects (not visible here), serialize them first.
                created_at=_as_iso_string(doc.get('created_at')),
                metadata=doc.get('metadata')
            )
            for doc in docs
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def delete_document(document_id: int):
    """Delete one document by id.

    Returns:
        A confirmation dict containing the deleted ``document_id``.

    Raises:
        HTTPException: 404 when the database reports that nothing was
            deleted; 500 for any error raised while deleting.
    """
    try:
        deleted = get_db_manager().delete_document(document_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    if deleted:
        return {"message": "Document deleted successfully", "document_id": document_id}
    raise HTTPException(status_code=404, detail="Document not found")
async def get_stats():
    """Collect system statistics.

    Returns:
        dict with ``database`` (result of ``get_database_stats``),
        ``metadata`` (result of ``get_documents_count_by_metadata``) and an
        ISO-8601 local ``timestamp``.

    Raises:
        HTTPException: 500 wrapping any error raised while collecting them.
    """
    try:
        database = get_db_manager()
        metadata = get_metadata_manager()
        return {
            "database": database.get_database_stats(),
            "metadata": metadata.get_documents_count_by_metadata(),
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
async def upload_file(
    file: UploadFile = File(...),
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    strategy: str = "recursive"
):
    """Upload a file, extract its text and ingest it.

    The original documentation states that PDF and TXT files are supported.
    The upload is written to a temporary file that keeps the original
    extension (presumably what DocumentProcessor uses to pick a parser) and
    processed. The extracted text is then handed to ``ingest_document``, with
    the file name as title and the detected file type plus the upload time
    as metadata. The temporary file is always removed, including on error.

    Raises:
        HTTPException: errors raised by ``ingest_document`` (e.g. 400) are
            propagated unchanged; any other failure becomes a 500.
    """
    # UploadFile.filename may be None; fall back to a neutral title.
    filename = file.filename or "upload"
    tmp_path = None
    try:
        # Persist the upload to disk so DocumentProcessor can read it by path
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(filename).suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(await file.read())

        processor = DocumentProcessor()
        result = processor.process_file(tmp_path)

        ingest_request = IngestRequest(
            text=result['text'],
            title=filename,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            strategy=strategy,
            metadata={
                "document_type": result['file_type'],
                "upload_date": datetime.now().isoformat()
            }
        )
        return await ingest_document(ingest_request)
    except HTTPException:
        # Keep the status chosen by ingest_document instead of re-wrapping it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # delete=False means the file outlives the `with`; remove it on every path.
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
# Error handlers
async def not_found_handler(request, exc):
    """Return a generic JSON 404 body; ``request`` and ``exc`` are not inspected."""
    body = {"detail": "Not found"}
    return JSONResponse(status_code=404, content=body)
async def server_error_handler(request, exc):
    """Return a generic JSON 500 body without exposing details of ``exc``."""
    body = {"detail": "Internal server error"}
    return JSONResponse(status_code=500, content=body)