# Source: Numidium — app/api/routes/aethermap.py
"""
AetherMap Routes - Document Mapping & Semantic Search
Integrates with AetherMap API for document clustering, NER, and semantic search.
"""
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from sqlalchemy.orm import Session
import io
from app.api.deps import get_scoped_db
from app.services.aethermap_client import aethermap, ProcessResult, SearchResult, EntityGraphResult
router = APIRouter()
# ============================================================================
# Request/Response Models
# ============================================================================
class IndexDocumentsRequest(BaseModel):
    """Request body for POST /index: raw texts to embed, cluster, and index."""
    # Required; one string per document to be indexed.
    documents: List[str] = Field(..., description="Lista de textos para indexar")
    # True selects the fast dimensionality reduction (PCA); False the precise one (UMAP).
    fast_mode: bool = Field(True, description="Modo rápido (PCA) ou preciso (UMAP)")
class IndexEntitiesRequest(BaseModel):
    """Request body for POST /index-entities: select NUMIDIUM entities to index."""
    # None means no type filter (all entity types are included).
    entity_types: Optional[List[str]] = Field(None, description="Filtrar por tipos de entidade")
    # Upper bound on the number of entities fetched from the database.
    limit: int = Field(500, description="Limite de entidades")
    # The /index-entities handler probes this field on the request object, but it
    # was never declared here, so the toggle was dead. Declaring it (default True,
    # mirroring IndexDocumentsRequest) is backward compatible and makes it usable.
    fast_mode: bool = Field(True, description="Modo rápido (PCA) ou preciso (UMAP)")
class SemanticSearchRequest(BaseModel):
    """Request body for POST /search: a free-text query over indexed documents."""
    # Required natural-language search query.
    query: str = Field(..., description="Termo de busca")
    # True trades some quality for speed in the hybrid RAG pipeline.
    turbo_mode: bool = Field(True, description="Modo turbo (mais rápido)")
class IndexResponse(BaseModel):
    """Response returned by the indexing endpoints (/index, /index-entities, /upload)."""
    # AetherMap job identifier for the processed batch.
    job_id: str
    # Number of documents that were indexed.
    num_documents: int
    # Number of clusters HDBSCAN produced.
    num_clusters: int
    # Number of documents labeled as noise (no cluster).
    num_noise: int
    # Free-form processing metrics reported by AetherMap.
    metrics: Dict[str, Any] = {}
    # Per-cluster analysis payload reported by AetherMap.
    cluster_analysis: Dict[str, Any] = {}
class SearchResponse(BaseModel):
    """Response returned by POST /search."""
    # LLM-generated answer summarizing the retrieved passages.
    summary: str
    # Raw retrieval hits backing the summary.
    results: List[Dict[str, Any]] = []
class EntityGraphResponse(BaseModel):
    """Response returned by POST /entities (NER extraction over indexed docs)."""
    # Most-connected entities in the extracted graph.
    hubs: List[Dict[str, Any]] = []
    # Relationship insights reported by AetherMap.
    insights: Dict[str, Any] = {}
    # Total node count of the entity graph.
    node_count: int = 0
    # Total edge count of the entity graph.
    edge_count: int = 0
class StatusResponse(BaseModel):
    """Response returned by GET /status: AetherMap connection state."""
    # Whether the AetherMap client is considered reachable.
    connected: bool
    # Identifier of the currently active AetherMap job, if any.
    job_id: Optional[str] = None
    # Count of documents currently indexed (not yet tracked; always 0 for now).
    documents_indexed: int = 0
# ============================================================================
# Endpoints
# ============================================================================
@router.get("/status", response_model=StatusResponse)
async def get_status():
"""
Get AetherMap connection status.
"""
return StatusResponse(
connected=True,
job_id=aethermap.current_job_id,
documents_indexed=0 # TODO: track this
)
@router.post("/index", response_model=IndexResponse)
async def index_documents(request: IndexDocumentsRequest):
"""
Index a list of documents for semantic search.
The documents will be:
- Embedded using sentence transformers
- Clustered using HDBSCAN
- Indexed in FAISS + BM25 for hybrid search
"""
try:
if not request.documents:
raise HTTPException(status_code=400, detail="Nenhum documento fornecido")
result = await aethermap.process_documents(
texts=request.documents,
fast_mode=request.fast_mode
)
return IndexResponse(
job_id=result.job_id,
num_documents=result.num_documents,
num_clusters=result.num_clusters,
num_noise=result.num_noise,
metrics=result.metrics,
cluster_analysis=result.cluster_analysis
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/index-entities", response_model=IndexResponse)
async def index_entities(
request: IndexEntitiesRequest,
db: Session = Depends(get_scoped_db)
):
"""
Index entities from NUMIDIUM database.
Collects entity names and descriptions, sends to AetherMap for processing.
"""
from app.models.entity import Entity
try:
query = db.query(Entity)
if request.entity_types:
query = query.filter(Entity.type.in_(request.entity_types))
entities = query.limit(request.limit).all()
if not entities:
raise HTTPException(status_code=404, detail="Nenhuma entidade encontrada")
# Build text representations
documents = []
for e in entities:
text = f"{e.name} ({e.type})"
if e.description:
text += f": {e.description[:1000]}"
documents.append(text)
result = await aethermap.process_documents(
texts=documents,
fast_mode=request.fast_mode if hasattr(request, 'fast_mode') else True
)
return IndexResponse(
job_id=result.job_id,
num_documents=result.num_documents,
num_clusters=result.num_clusters,
num_noise=result.num_noise,
metrics=result.metrics,
cluster_analysis=result.cluster_analysis
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/upload", response_model=IndexResponse)
async def upload_documents(
file: UploadFile = File(...),
fast_mode: bool = Form(True)
):
"""
Upload a file (TXT or CSV) for indexing.
- TXT: One document per line
- CSV: Will use first text column found
"""
try:
content = await file.read()
text = content.decode('utf-8', errors='ignore')
# Split by lines for TXT
documents = [line.strip() for line in text.splitlines() if line.strip()]
if not documents:
raise HTTPException(status_code=400, detail="Arquivo vazio ou sem texto válido")
result = await aethermap.process_documents(
texts=documents,
fast_mode=fast_mode
)
return IndexResponse(
job_id=result.job_id,
num_documents=result.num_documents,
num_clusters=result.num_clusters,
num_noise=result.num_noise,
metrics=result.metrics,
cluster_analysis=result.cluster_analysis
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/search", response_model=SearchResponse)
async def semantic_search(request: SemanticSearchRequest):
"""
Semantic search in indexed documents.
Uses hybrid RAG (FAISS + BM25 + reranking + LLM).
Returns a summary answering the query with citations.
"""
try:
if not aethermap.current_job_id:
raise HTTPException(status_code=400, detail="Nenhum documento indexado. Use /index primeiro.")
result = await aethermap.semantic_search(
query=request.query,
turbo_mode=request.turbo_mode
)
return SearchResponse(
summary=result.summary,
results=result.results
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/entities", response_model=EntityGraphResponse)
async def extract_entities():
"""
Extract named entities (NER) from indexed documents.
Returns:
- Hub entities (most connected)
- Relationship insights
- Graph metrics
"""
try:
if not aethermap.current_job_id:
raise HTTPException(status_code=400, detail="Nenhum documento indexado. Use /index primeiro.")
result = await aethermap.extract_entities()
return EntityGraphResponse(
hubs=result.hubs,
insights=result.insights,
node_count=len(result.nodes),
edge_count=len(result.edges)
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/analyze")
async def analyze_graph():
"""
Analyze entity graph using LLM.
Returns semantic insights about relationships and patterns.
"""
try:
if not aethermap.current_job_id:
raise HTTPException(status_code=400, detail="Nenhum documento indexado. Use /index primeiro.")
result = await aethermap.analyze_graph()
return {
"analysis": result.analysis,
"key_entities": result.key_entities,
"relationships": result.relationships
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/describe-clusters")
async def describe_clusters():
"""
Get LLM descriptions for each cluster found.
"""
try:
if not aethermap.current_job_id:
raise HTTPException(status_code=400, detail="Nenhum documento indexado. Use /index primeiro.")
result = await aethermap.describe_clusters()
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))