# Numidium — app/api/routes/entities.py
"""
Entity CRUD Routes
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import or_
from typing import List, Optional
from app.api.deps import get_scoped_db
from app.models import Entity, Relationship
from app.schemas import EntityCreate, EntityUpdate, EntityResponse, GraphData, GraphNode, GraphEdge
router = APIRouter(prefix="/entities", tags=["Entities"])
@router.get("", response_model=List[EntityResponse])
def list_entities(
    type: Optional[str] = None,
    search: Optional[str] = None,
    project_id: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
    db: Session = Depends(get_scoped_db)
):
    """List entities, optionally filtered by type, project and a name/description search."""
    criteria = []
    if project_id:
        criteria.append(Entity.project_id == project_id)
    if type:
        criteria.append(Entity.type == type)
    if search:
        pattern = f"%{search}%"
        criteria.append(
            or_(
                Entity.name.ilike(pattern),
                Entity.description.ilike(pattern)
            )
        )
    query = db.query(Entity)
    for criterion in criteria:
        query = query.filter(criterion)
    return (
        query.order_by(Entity.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )
@router.get("/types")
def get_entity_types(db: Session = Depends(get_scoped_db)):
    """Return every distinct entity type currently stored."""
    rows = db.query(Entity.type).distinct().all()
    # Each row is a one-column tuple; unpack to plain strings.
    return [value for (value,) in rows]
@router.get("/suggest-merge")
async def suggest_merge_candidates(
    limit: int = Query(default=10, le=50),
    db: Session = Depends(get_scoped_db)
):
    """
    Use an LLM to find potential duplicate entities that could be merged.

    Loads up to 200 entities (alphabetical), sends the first 100 of them
    (id, name, type, up to 5 aliases) to the Cerebras chat-completions API,
    and asks it — in a Portuguese prompt — for pairs of IDs that likely
    refer to the same entity. Pairs are enriched with entity names/types
    before being returned.

    Returns a dict with a "candidates" list; on any failure (non-200 from
    the API, unparsable response, network error) the list is empty and an
    "error"/"message" key explains why. This endpoint never raises.
    """
    # Local imports keep the HTTP client / config off the module import path.
    import httpx
    import json
    import re
    from app.config import settings
    # Get all entities (capped at 200; ordered by name for stable prompts)
    entities = db.query(Entity).order_by(Entity.name).limit(200).all()
    if len(entities) < 2:
        return {"candidates": [], "message": "Not enough entities to compare"}
    # Build a compact entity list for the LLM prompt
    entity_list = []
    for e in entities:
        aliases = (e.properties or {}).get("aliases", [])
        entity_list.append({
            "id": e.id,
            "name": e.name,
            "type": e.type,
            "aliases": aliases[:5] if aliases else []  # cap aliases per entity
        })
    # Ask LLM to find duplicates.
    # NOTE(review): entity_list is interpolated via Python repr (single-quoted
    # dicts), not json.dumps — the model seems to tolerate it, but confirm if
    # parsing quality degrades.
    prompt = f"""Analise esta lista de entidades e encontre possíveis DUPLICATAS (mesma pessoa/organização/local com nomes diferentes).
Entidades:
{entity_list[:100]}
Retorne APENAS um JSON válido com pares de IDs que são provavelmente a mesma entidade:
```json
{{
"duplicates": [
{{
"id1": "uuid1",
"id2": "uuid2",
"confidence": 0.95,
"reason": "Mesmo nome com variação"
}}
]
}}
```
Se não houver duplicatas, retorne: {{"duplicates": []}}
"""
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                "https://api.cerebras.ai/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {settings.cerebras_api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    # NOTE(review): confirm "zai-glm-4.7" is a valid model id
                    # in the Cerebras catalog.
                    "model": "zai-glm-4.7",
                    "messages": [
                        {"role": "system", "content": "Você é um especialista em detecção de entidades duplicadas. Responda apenas em JSON válido."},
                        {"role": "user", "content": prompt}
                    ],
                    "temperature": 0.1,  # low temperature: near-deterministic JSON
                    "max_tokens": 1024
                }
            )
            if response.status_code != 200:
                return {"candidates": [], "error": "LLM API error"}
            data = response.json()
            content = data["choices"][0]["message"]["content"]
            # Extract the first {...} span — the model may wrap the JSON
            # in prose or markdown fences.
            json_match = re.search(r'\{.*\}', content, re.DOTALL)
            if json_match:
                result = json.loads(json_match.group(0))
                # Enrich with entity names; silently drop pairs whose IDs do
                # not match a loaded entity (guards against hallucinated IDs).
                candidates = []
                for dup in result.get("duplicates", [])[:limit]:
                    e1 = next((e for e in entities if e.id == dup.get("id1")), None)
                    e2 = next((e for e in entities if e.id == dup.get("id2")), None)
                    if e1 and e2:
                        candidates.append({
                            "entity1": {"id": e1.id, "name": e1.name, "type": e1.type},
                            "entity2": {"id": e2.id, "name": e2.name, "type": e2.type},
                            "confidence": dup.get("confidence", 0.5),
                            "reason": dup.get("reason", "Possível duplicata")
                        })
                return {"candidates": candidates}
            return {"candidates": [], "message": "No duplicates found"}
    except Exception as e:
        # Best-effort endpoint: degrade to an empty result instead of a 500.
        return {"candidates": [], "error": str(e)}
@router.get("/{entity_id}", response_model=EntityResponse)
def get_entity(entity_id: str, db: Session = Depends(get_scoped_db)):
    """Fetch a single entity by ID, or respond 404 if it does not exist."""
    found = db.query(Entity).filter(Entity.id == entity_id).first()
    if found is None:
        raise HTTPException(status_code=404, detail="Entity not found")
    return found
@router.post("", response_model=EntityResponse, status_code=201)
def create_entity(entity: EntityCreate, db: Session = Depends(get_scoped_db)):
    """Create a new entity from the request payload and return the stored row."""
    record = Entity(**entity.model_dump())
    db.add(record)
    db.commit()
    # Reload so server-generated fields (id, timestamps) appear in the response.
    db.refresh(record)
    return record
@router.put("/{entity_id}", response_model=EntityResponse)
def update_entity(entity_id: str, entity: EntityUpdate, db: Session = Depends(get_scoped_db)):
    """Partially update an existing entity; respond 404 if it does not exist."""
    record = db.query(Entity).filter(Entity.id == entity_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="Entity not found")
    # exclude_unset: only fields the client actually sent are applied.
    for field, value in entity.model_dump(exclude_unset=True).items():
        setattr(record, field, value)
    db.commit()
    db.refresh(record)
    return record
@router.delete("/{entity_id}")
def delete_entity(entity_id: str, db: Session = Depends(get_scoped_db)):
    """
    Delete an entity and every relationship that references it.

    Raises:
        HTTPException 404: if no entity with `entity_id` exists.
    """
    db_entity = db.query(Entity).filter(Entity.id == entity_id).first()
    if not db_entity:
        raise HTTPException(status_code=404, detail="Entity not found")
    # Remove relationships on either side so no dangling edges survive.
    # synchronize_session=False: the default 'evaluate' strategy can fail
    # on compound (or_) criteria; we commit immediately afterwards, so the
    # in-session state never needs synchronizing.
    db.query(Relationship).filter(
        or_(
            Relationship.source_id == entity_id,
            Relationship.target_id == entity_id
        )
    ).delete(synchronize_session=False)
    db.delete(db_entity)
    db.commit()
    return {"message": "Entity deleted"}
@router.get("/{entity_id}/connections", response_model=GraphData)
def get_entity_connections(
    entity_id: str,
    depth: int = Query(default=1, le=3),
    db: Session = Depends(get_scoped_db)
):
    """
    Return the connection graph around an entity, up to `depth` hops.

    Used by the frontend network visualization. Nodes are deduplicated by
    entity id and edges by relationship id — the previous implementation
    emitted a relationship twice whenever both of its endpoints fell inside
    the explored neighborhood (once from the source's outgoing pass, once
    from the target's incoming pass).

    Raises:
        HTTPException 404: if the root entity does not exist.
    """
    entity = db.query(Entity).filter(Entity.id == entity_id).first()
    if not entity:
        raise HTTPException(status_code=404, detail="Entity not found")
    nodes = {}
    edges = []
    visited = set()
    # Relationship ids already emitted; assumes Relationship has a primary
    # key `id` column like Entity does — TODO confirm against app.models.
    seen_edges = set()

    def explore(eid: str, current_depth: int):
        if current_depth > depth or eid in visited:
            return
        visited.add(eid)
        e = db.query(Entity).filter(Entity.id == eid).first()
        if not e:
            return
        nodes[e.id] = GraphNode(
            id=e.id,
            type=e.type,
            name=e.name,
            properties=e.properties or {}
        )
        # Fetch both directions in a single query instead of two.
        rels = db.query(Relationship).filter(
            or_(
                Relationship.source_id == eid,
                Relationship.target_id == eid
            )
        ).all()
        for rel in rels:
            if rel.id not in seen_edges:
                seen_edges.add(rel.id)
                edges.append(GraphEdge(
                    source=rel.source_id,
                    target=rel.target_id,
                    type=rel.type,
                    confidence=rel.confidence
                ))
            # Continue outward from the endpoint opposite to `eid`.
            neighbor = rel.target_id if rel.source_id == eid else rel.source_id
            explore(neighbor, current_depth + 1)

    explore(entity_id, 0)
    return GraphData(
        nodes=list(nodes.values()),
        edges=edges
    )
@router.post("/merge")
def merge_entities(
    primary_id: str,
    secondary_id: str,
    db: Session = Depends(get_scoped_db)
):
    """
    Merge two entities into one.

    The primary entity is kept, the secondary is deleted. All relationships
    from the secondary are re-pointed at the primary, the secondary's name
    and aliases are appended to the primary's alias list, and a merge-history
    entry is recorded in the primary's properties.

    Raises:
        HTTPException 400: if the two ids are identical.
        HTTPException 404: if either entity does not exist.
    """
    if primary_id == secondary_id:
        raise HTTPException(status_code=400, detail="Cannot merge entity with itself")
    primary = db.query(Entity).filter(Entity.id == primary_id).first()
    secondary = db.query(Entity).filter(Entity.id == secondary_id).first()
    if not primary:
        raise HTTPException(status_code=404, detail="Primary entity not found")
    if not secondary:
        raise HTTPException(status_code=404, detail="Secondary entity not found")
    # Snapshot the secondary's fields NOW: after db.delete() + commit the
    # ORM instance is deleted/expired and attribute access can raise
    # (the previous version read secondary.name after the commit).
    secondary_name = secondary.name
    secondary_snapshot = {
        "id": secondary.id,
        "name": secondary_name,
        "source": secondary.source
    }
    secondary_props = secondary.properties or {}
    # Work on a COPY of the properties dict: re-assigning the same object
    # after in-place mutation is not detected as a change by a plain JSON
    # column (no MutableDict), so the merge metadata could silently be lost.
    primary_props = dict(primary.properties or {})
    # Fold the secondary's name and aliases into the primary's alias list.
    aliases = list(primary_props.get("aliases") or [])
    if secondary_name not in aliases:
        aliases.append(secondary_name)
    for alias in (secondary_props.get("aliases") or []):
        if alias not in aliases:
            aliases.append(alias)
    primary_props["aliases"] = aliases
    # Record merge provenance.
    merge_history = list(primary_props.get("merged_from") or [])
    merge_history.append(secondary_snapshot)
    primary_props["merged_from"] = merge_history
    # Adopt the secondary's description only when the primary has none.
    if not primary.description and secondary.description:
        primary.description = secondary.description
    primary.properties = primary_props
    # Transfer relationships from secondary to primary (both directions).
    db.query(Relationship).filter(
        Relationship.source_id == secondary_id
    ).update({"source_id": primary_id}, synchronize_session=False)
    db.query(Relationship).filter(
        Relationship.target_id == secondary_id
    ).update({"target_id": primary_id}, synchronize_session=False)
    # TODO: deduplicate relationships sharing (source, target, type) —
    # the transfer above can create exact duplicates of existing edges.
    db.delete(secondary)
    db.commit()
    db.refresh(primary)
    return {
        "message": f"Merged '{secondary_name}' into '{primary.name}'",
        "primary": {
            "id": primary.id,
            "name": primary.name,
            "aliases": aliases
        }
    }