Numidium / app /services /investigator_agent.py
Madras1's picture
Upload 79 files
d2fb3fb verified
"""
Investigator Agent - Autonomous Investigation with Tool Calling
Uses Cerebras native tool calling for multi-source investigations
"""
import json
import re
import httpx
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
from sqlalchemy.orm import Session
from app.config import settings
from app.services import lancer
from app.services.brazil_apis import consultar_cnpj
from app.models.entity import Entity, Relationship
def sanitize_text(text: str) -> str:
    """Strip model "thinking" artifacts from *text*.

    Removes ``<think>...</think>`` and ``<|think|>...<|/think|>`` spans
    (including their content), any remaining ``<|...|>`` marker, collapses
    runs of three or more newlines to two, and trims surrounding whitespace.
    Falsy input (``""`` or ``None``) is returned unchanged.
    """
    if not text:
        return text

    # (pattern, replacement, flags), applied strictly in this order: the
    # paired thinking spans must go before the generic <|...|> marker rule.
    substitutions = (
        (r'<think>.*?</think>', '', re.DOTALL),
        (r'<\|think\|>.*?<\|/think\|>', '', re.DOTALL),
        (r'<\|.*?\|>', '', 0),
        (r'\n{3,}', '\n\n', 0),
    )

    cleaned = text
    for pattern, replacement, flags in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)
    return cleaned.strip()
def _finding_timestamp() -> str:
    """Return the current local time as an ISO-8601 string."""
    return datetime.now().isoformat()


@dataclass
class Finding:
    """One discovery recorded while an investigation is running."""

    # Short headline for the discovery.
    title: str
    # Detailed description of what was found.
    content: str
    # Where the information came from.
    source: str
    # Creation time (ISO-8601, naive local time); filled in automatically.
    timestamp: str = field(default_factory=_finding_timestamp)
@dataclass
class InvestigationResult:
    """Everything an investigation run hands back to its caller."""

    # The mission text the investigation was started with.
    mission: str
    # Findings recorded through the save_finding tool.
    findings: List[Finding]
    # Entities returned by knowledge-graph searches (id / name / type dicts).
    entities_discovered: List[Dict[str, Any]]
    # Relationship records collected during the run.
    connections_mapped: List[Dict[str, Any]]
    # Final report text.
    report: str
    # Number of LLM round-trips performed by the main loop.
    iterations: int
    # Names of the tools that were invoked.
    tools_used: List[str]
    # Outcome marker; defaults to "completed".
    status: str = "completed"
# Tool definitions for Cerebras API
def _function_tool(name, description, properties, required):
    """Build one function-calling tool entry.

    Every entry has the same envelope (``type: function`` wrapping a name,
    a description and an ``object`` parameter schema); only the four
    arguments vary between tools.
    """
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


TOOLS = [
    # Knowledge-graph lookup by name.
    _function_tool(
        "search_entity",
        "Buscar entidade no NUMIDIUM (grafo de conhecimento) por nome. Use para encontrar pessoas, empresas ou locais já conhecidos.",
        {
            "query": {
                "type": "string",
                "description": "Nome ou termo para buscar",
            },
            "entity_type": {
                "type": "string",
                "enum": ["person", "organization", "location", "any"],
                "description": "Tipo de entidade (opcional)",
            },
        },
        ["query"],
    ),
    # Relationship network of a single entity.
    _function_tool(
        "get_connections",
        "Obter a rede de conexões de uma entidade específica. Retorna entidades relacionadas.",
        {
            "entity_id": {
                "type": "string",
                "description": "ID da entidade no NUMIDIUM",
            },
        },
        ["entity_id"],
    ),
    # Brazilian company registry lookup.
    _function_tool(
        "lookup_cnpj",
        "Consultar dados de uma empresa brasileira pelo CNPJ. Retorna razão social, sócios, endereço, CNAEs, etc.",
        {
            "cnpj": {
                "type": "string",
                "description": "CNPJ da empresa (com ou sem formatação)",
            },
        },
        ["cnpj"],
    ),
    # General web search.
    _function_tool(
        "web_search",
        "Pesquisar informações na web. Use para buscar notícias, artigos e informações públicas.",
        {
            "query": {
                "type": "string",
                "description": "Termo de busca",
            },
            "freshness": {
                "type": "string",
                "enum": ["day", "week", "month", "any"],
                "description": "Frescor dos resultados",
                "default": "any",
            },
        },
        ["query"],
    ),
    # Multi-dimensional research on a topic.
    _function_tool(
        "deep_research",
        "Pesquisa profunda e multi-dimensional sobre um tema. Use para tópicos complexos.",
        {
            "topic": {
                "type": "string",
                "description": "Tópico para pesquisa profunda",
            },
        },
        ["topic"],
    ),
    # Persist a finding in the agent's state.
    _function_tool(
        "save_finding",
        "Salvar uma descoberta importante da investigação.",
        {
            "title": {
                "type": "string",
                "description": "Título curto da descoberta",
            },
            "content": {
                "type": "string",
                "description": "Conteúdo detalhado",
            },
            "source": {
                "type": "string",
                "description": "Fonte da informação",
            },
        },
        ["title", "content", "source"],
    ),
    # Terminate the investigation with a summary.
    _function_tool(
        "finish_investigation",
        "Finalizar a investigação e gerar o relatório final.",
        {
            "summary": {
                "type": "string",
                "description": "Resumo das descobertas principais",
            },
        },
        ["summary"],
    ),
]
# System message placed first in the conversation by
# InvestigatorAgent.investigate. It describes the data sources, the suggested
# order of tool use, and the ground rules. The "/no_think" marker and the last
# rule ask the model not to emit <think> blocks; sanitize_text strips any that
# appear anyway. The text is sent to the model verbatim, so edits here change
# agent behavior.
SYSTEM_PROMPT = """Você é um agente investigador autônomo do sistema NUMIDIUM/AVANGARD. /no_think
Sua missão é investigar temas usando múltiplas fontes de dados:
- NUMIDIUM: Grafo de conhecimento com entidades e relacionamentos
- Consulta CNPJ: Dados oficiais de empresas brasileiras (BrasilAPI)
- Web Search: Pesquisa na internet via Lancer
## Estratégia de Investigação:
1. Comece buscando no NUMIDIUM se já temos informações sobre o alvo
2. Para empresas brasileiras, consulte o CNPJ para obter sócios e dados
3. Use web_search para buscar notícias e informações públicas
4. Para cada sócio/conexão descoberta, considere investigar mais a fundo
5. Use save_finding para registrar descobertas importantes
6. Quando tiver informações suficientes, use finish_investigation
## Regras:
- Seja metódico e siga pistas
- Não invente informações - use apenas dados das ferramentas
- Priorize qualidade sobre quantidade
- Cite sempre as fontes
- NÃO use pensamento interno ou tags <think>. Responda diretamente."""
class InvestigatorAgent:
    """Autonomous investigation agent driven by LLM tool calling.

    The agent sends the mission to the chat-completions endpoint together
    with the ``TOOLS`` schema, executes whatever tools the model requests,
    feeds the results back, and repeats until the model finishes (or the
    iteration budget is exhausted), then returns an ``InvestigationResult``.

    All per-run state (findings, discovered entities, message history, DB
    session) lives on the instance and is reset at the start of
    ``investigate``. Concurrent ``investigate`` calls on the same instance
    would therefore overwrite each other's state.
    """

    # Prefix `_execute_tool` puts in front of the finish_investigation summary.
    _COMPLETE_PREFIX = "INVESTIGATION_COMPLETE:"

    def __init__(self):
        self.api_url = "https://api.cerebras.ai/v1/chat/completions"
        self.api_key = settings.cerebras_api_key
        self.model = "zai-glm-4.7"
        # Investigation state
        self.findings: List[Finding] = []
        self.entities_discovered: List[Dict[str, Any]] = []
        self.connections_mapped: List[Dict[str, Any]] = []
        self.tools_used: List[str] = []
        self.messages: List[Dict[str, Any]] = []
        self.db: Optional[Session] = None

    def _reset_state(self):
        """Clear everything accumulated by a previous investigation."""
        self.findings = []
        self.entities_discovered = []
        self.connections_mapped = []
        self.tools_used = []
        self.messages = []

    async def _call_llm(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """POST a chat-completion request and return the decoded JSON body.

        Args:
            messages: Conversation so far, in chat-completions format.
            tools: Optional tool schema; when given, automatic tool choice
                and parallel tool calls are enabled.

        Raises:
            Exception: On a non-200 response, transport failure or invalid
                JSON. The original error is chained as ``__cause__``.
        """
        payload: Dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.3,
            "max_tokens": 2048,
        }
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"
            payload["parallel_tool_calls"] = True

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(
                    self.api_url,
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
            if response.status_code != 200:
                raise Exception(f"API error: {response.status_code} - {response.text}")
            return response.json()
        except Exception as e:
            # Keep the historical message, but chain the cause so the
            # underlying traceback is not lost.
            raise Exception(f"LLM call failed: {str(e)}") from e

    @staticmethod
    def _parse_tool_arguments(raw: Any) -> Dict[str, Any]:
        """Decode a tool call's ``arguments`` into a dict, or ``{}`` if unusable."""
        if isinstance(raw, dict):
            return raw
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            # Missing or malformed JSON: run the tool with its defaults.
            return {}
        # Valid JSON that is not an object (null, list, ...) has no named
        # arguments to offer.
        return parsed if isinstance(parsed, dict) else {}

    async def _execute_tool(self, tool_name: str, arguments: Dict) -> str:
        """Run the tool called *tool_name* and return its textual result.

        Every call is recorded in ``self.tools_used``, including unknown
        names. Exceptions raised by a tool are converted into an error
        string so the model can react to them instead of aborting the run.
        """
        self.tools_used.append(tool_name)
        try:
            if tool_name == "search_entity":
                return await self._search_entity(
                    arguments.get("query", ""),
                    arguments.get("entity_type"),
                )
            elif tool_name == "get_connections":
                return await self._get_connections(arguments.get("entity_id"))
            elif tool_name == "lookup_cnpj":
                return await self._lookup_cnpj(arguments.get("cnpj", ""))
            elif tool_name == "web_search":
                return await self._web_search(
                    arguments.get("query", ""),
                    arguments.get("freshness", "any"),
                )
            elif tool_name == "deep_research":
                return await self._deep_research(arguments.get("topic", ""))
            # NOTE(review): the two aether_* tools are dispatched here but are
            # not advertised in TOOLS, so the model is never offered them.
            elif tool_name == "aether_search":
                return await self._aether_search(arguments.get("query", ""))
            elif tool_name == "aether_entities":
                return await self._aether_entities()
            elif tool_name == "save_finding":
                finding = Finding(
                    title=arguments.get("title", ""),
                    content=arguments.get("content", ""),
                    source=arguments.get("source", ""),
                )
                self.findings.append(finding)
                return f"Descoberta salva: {finding.title}"
            elif tool_name == "finish_investigation":
                return f"INVESTIGATION_COMPLETE: {arguments.get('summary', '')}"
            else:
                return f"Ferramenta desconhecida: {tool_name}"
        except Exception as e:
            return f"Erro ao executar {tool_name}: {str(e)}"

    async def _search_entity(self, query: str, entity_type: Optional[str]) -> str:
        """Search entities by case-insensitive name substring (max 10 rows).

        Matches are returned as a JSON list and also remembered in
        ``self.entities_discovered``, once per entity id.
        """
        if not self.db:
            return "Erro: Banco de dados não disponível"

        q = self.db.query(Entity).filter(Entity.name.ilike(f"%{query}%"))
        if entity_type and entity_type != "any":
            q = q.filter(Entity.type == entity_type)
        entities = q.limit(10).all()
        if not entities:
            return "Nenhuma entidade encontrada no NUMIDIUM."

        # Repeated searches often return the same rows; record each entity
        # only once so the final result has no duplicates.
        known_ids = {item.get("id") for item in self.entities_discovered}
        result = []
        for e in entities:
            entity_id = str(e.id)
            if entity_id not in known_ids:
                known_ids.add(entity_id)
                self.entities_discovered.append({
                    "id": entity_id,
                    "name": e.name,
                    "type": e.type,
                })
            result.append({
                "id": entity_id,
                "name": e.name,
                "type": e.type,
                "description": e.description[:200] if e.description else None,
            })
        return json.dumps(result, ensure_ascii=False, indent=2)

    async def _get_connections(self, entity_id: str) -> str:
        """Return up to 20 relationships touching *entity_id* as JSON.

        Each resolved relationship is also stored in
        ``self.connections_mapped`` (without duplicates) so it appears in
        the final ``InvestigationResult``.
        """
        if not self.db:
            return "Erro: Banco de dados não disponível"
        if not entity_id:
            # Nothing to look up; skip the database round-trip.
            return "Nenhuma conexão encontrada."

        relationships = self.db.query(Relationship).filter(
            (Relationship.source_id == entity_id) | (Relationship.target_id == entity_id)
        ).limit(20).all()
        if not relationships:
            return "Nenhuma conexão encontrada."

        # The queried entity is an endpoint of every row, so memoize the
        # lookups instead of re-querying it for each relationship.
        entity_cache: Dict[Any, Any] = {}

        def load_entity(key):
            if key not in entity_cache:
                entity_cache[key] = (
                    self.db.query(Entity).filter(Entity.id == key).first()
                )
            return entity_cache[key]

        connections = []
        for rel in relationships:
            source = load_entity(rel.source_id)
            target = load_entity(rel.target_id)
            if source and target:
                connection = {
                    "source": source.name,
                    "target": target.name,
                    "type": rel.type,
                }
                connections.append(connection)
                if connection not in self.connections_mapped:
                    self.connections_mapped.append(connection)
        return json.dumps(connections, ensure_ascii=False, indent=2)

    async def _lookup_cnpj(self, cnpj: str) -> str:
        """Look up a company by CNPJ through ``consultar_cnpj`` and return JSON.

        The input is reduced to its digits first, so any formatting
        (dots, slash, dash, spaces) is accepted.
        """
        cnpj_clean = "".join(c for c in (cnpj or "") if c.isdigit())
        if not cnpj_clean:
            # No digits at all: there is nothing to query.
            return "CNPJ não encontrado."

        result = await consultar_cnpj(cnpj_clean)
        if not result:
            return "CNPJ não encontrado."

        data = {
            "razao_social": result.razao_social,
            "nome_fantasia": result.nome_fantasia,
            "situacao": result.situacao,
            "data_abertura": result.data_abertura,
            "capital_social": result.capital_social,
            "endereco": f"{result.logradouro}, {result.numero} - {result.cidade}/{result.uf}",
            "cnae": f"{result.cnae_principal} - {result.cnae_descricao}",
            "socios": result.socios,
        }
        return json.dumps(data, ensure_ascii=False, indent=2)

    async def _lookup_phone(self, phone: str) -> str:
        """Validate a phone number via the NumVerify API.

        Falls back to a web search for the digits when no API key is
        configured or the request raises.

        NOTE(review): this method is not referenced by `_execute_tool` or
        `TOOLS` in this file, so the agent cannot currently call it.
        """
        # Clean phone number - keep only digits
        phone_clean = "".join(c for c in phone if c.isdigit())
        # NumVerify API key (free tier: 100 req/month)
        numverify_key = getattr(settings, 'numverify_api_key', None)
        if not numverify_key:
            # Fallback: just do a web search for the number
            return await self._web_search(f'"{phone_clean}" telefone', "any")
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                # NOTE(review): plain HTTP sends the access key unencrypted.
                # Left unchanged because HTTPS availability depends on the
                # NumVerify plan - confirm before switching the scheme.
                response = await client.get(
                    "http://apilayer.net/api/validate",
                    params={
                        "access_key": numverify_key,
                        "number": phone_clean,
                        "country_code": "",  # Auto-detect
                        "format": 1,
                    },
                )
            if response.status_code == 200:
                data = response.json()
                if data.get("valid"):
                    result = {
                        "numero": data.get("international_format"),
                        "valido": True,
                        "pais": data.get("country_name"),
                        "codigo_pais": data.get("country_code"),
                        "operadora": data.get("carrier"),
                        "tipo_linha": data.get("line_type"),  # mobile, landline, etc
                        "localizacao": data.get("location"),
                    }
                    return json.dumps(result, ensure_ascii=False, indent=2)
                else:
                    return f"Número {phone_clean} não é válido ou não foi encontrado."
            return "Erro ao consultar número."
        except Exception:
            # Best effort: fall back to web search on any failure.
            return await self._web_search(f'"{phone_clean}" telefone', "any")

    async def _web_search(self, query: str, freshness: str) -> str:
        """Run a web search via ``lancer.search`` and return its answer text."""
        try:
            result = await lancer.search(query, max_results=5, freshness=freshness)
            if result.answer:
                return f"Resumo: {result.answer}\n\nFontes: {len(result.results)} resultados"
            return "Nenhum resultado encontrado."
        except Exception as e:
            return f"Erro na busca web: {str(e)}"

    async def _deep_research(self, topic: str) -> str:
        """Run ``lancer.deep_research`` on *topic* and return its answer text."""
        try:
            result = await lancer.deep_research(topic, max_dimensions=3)
            if result.answer:
                return result.answer
            return "Pesquisa profunda não retornou resultados."
        except Exception as e:
            return f"Erro na pesquisa: {str(e)}"

    async def _aether_search(self, query: str) -> str:
        """Semantic search via AetherMap, indexing DB entities on first use.

        NOTE(review): `aethermap` is not imported in the visible part of this
        file. Unless it is provided elsewhere, the NameError is caught below
        and reported as an error string.
        """
        try:
            # Check if we have a job_id cached
            if not aethermap.current_job_id:
                # Index entities from database first
                if self.db:
                    entities = self.db.query(Entity).limit(500).all()
                    texts = []
                    for e in entities:
                        text = f"{e.name} ({e.type})"
                        if e.description:
                            text += f": {e.description[:500]}"
                        texts.append(text)
                    if texts:
                        await aethermap.process_documents(texts, fast_mode=True)
            # Continue with search
            if aethermap.current_job_id:
                result = await aethermap.semantic_search(query, turbo_mode=True)
                return f"RAG Response:\n{result.summary}"
            else:
                return "Nenhum documento indexado no AetherMap."
        except Exception as e:
            return f"Erro no AetherMap search: {str(e)}"

    async def _aether_entities(self) -> str:
        """Format hubs, insights and top edges extracted by AetherMap.

        NOTE(review): depends on the same `aethermap` name as
        `_aether_search`, which is not imported in the visible file.
        """
        try:
            if not aethermap.current_job_id:
                return "Nenhum documento indexado. Use aether_search primeiro."
            result = await aethermap.extract_entities()
            # Format response
            output = []
            if result.hubs:
                output.append("**Entidades Centrais (Hubs):**")
                for hub in result.hubs[:5]:
                    output.append(f"- {hub.get('entity')} ({hub.get('type')}): {hub.get('degree')} conexões")
            if result.insights:
                output.append("\n**Insights:**")
                output.append(f"- Total de conexões: {result.insights.get('total_connections', 0)}")
                output.append(f"- Grau médio: {result.insights.get('avg_degree', 0)}")
            if result.edges:
                output.append("\n**Top 5 Relacionamentos:**")
                for edge in result.edges[:5]:
                    output.append(f"- {edge.source_entity} <-> {edge.target_entity}: {edge.reason}")
            return "\n".join(output) if output else "Nenhuma entidade significativa encontrada."
        except Exception as e:
            return f"Erro na extração de entidades: {str(e)}"

    async def investigate(
        self,
        mission: str,
        db: Session,
        max_iterations: int = 10
    ) -> InvestigationResult:
        """Run the tool-calling loop for *mission* and return the result.

        Args:
            mission: Free-text description of what to investigate.
            db: Database session used by the knowledge-graph tools.
            max_iterations: Upper bound on LLM round-trips.

        Returns:
            An ``InvestigationResult`` with sanitized report and findings.

        Raises:
            Exception: Propagated from ``_call_llm`` when the API call fails.
        """
        self._reset_state()
        self.db = db
        self.messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Missão de investigação: {mission}\n\nComece a investigação."}
        ]

        iteration = 0
        final_summary = ""
        finished = False
        while iteration < max_iterations and not finished:
            iteration += 1
            response = await self._call_llm(self.messages, TOOLS)
            message = response["choices"][0]["message"]
            self.messages.append(message)

            # The API may send "tool_calls": null instead of omitting the key.
            tool_calls = message.get("tool_calls") or []
            if not tool_calls:
                # A plain-text answer ends the investigation.
                final_summary = message.get("content") or ""
                break

            for tool_call in tool_calls:
                func = tool_call["function"]
                tool_name = func["name"]
                arguments = self._parse_tool_arguments(func.get("arguments"))
                result = await self._execute_tool(tool_name, arguments)

                if tool_name == "finish_investigation":
                    # Stop even when the summary is empty; looping again would
                    # resend an assistant turn whose tool calls were never
                    # answered. Strip only the leading marker, not every
                    # occurrence inside the summary text.
                    if result.startswith(self._COMPLETE_PREFIX):
                        final_summary = result[len(self._COMPLETE_PREFIX):].strip()
                    finished = True
                    break

                self.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call["id"],
                    "content": result
                })

        # Sanitize before the emptiness check so a summary made only of
        # thinking artifacts still falls back to a generated report.
        final_summary = sanitize_text(final_summary)
        if not final_summary:
            final_summary = await self._generate_report(mission)

        # Sanitize findings content
        sanitized_findings = [
            Finding(
                title=sanitize_text(f.title),
                content=sanitize_text(f.content),
                source=f.source,
                timestamp=f.timestamp,
            )
            for f in self.findings
        ]

        return InvestigationResult(
            mission=mission,
            findings=sanitized_findings,
            entities_discovered=self.entities_discovered,
            connections_mapped=self.connections_mapped,
            report=final_summary,
            iterations=iteration,
            # Unique tool names in first-use order (deterministic, unlike set()).
            tools_used=list(dict.fromkeys(self.tools_used)),
            status="completed"
        )

    async def _generate_report(self, mission: str) -> str:
        """Ask the LLM for a structured report built from the collected state.

        Used when the loop ended without a usable summary. Always returns a
        string (empty if the model returned no content).
        """
        findings_text = "\n".join(
            f"- {f.title}: {f.content} (Fonte: {f.source})"
            for f in self.findings
        ) or "Nenhuma descoberta registrada."
        entities_text = ", ".join(
            e.get("name", "Unknown") for e in self.entities_discovered[:10]
        ) or "Nenhuma entidade."
        tools_text = ", ".join(dict.fromkeys(self.tools_used))

        prompt = f"""Gere um relatório de investigação:
Missão: {mission}
Descobertas:
{findings_text}
Entidades: {entities_text}
Ferramentas usadas: {tools_text}
Gere relatório estruturado com: Resumo Executivo, Descobertas, Entidades, Recomendações."""

        response = await self._call_llm([
            {"role": "system", "content": "Gere relatórios concisos."},
            {"role": "user", "content": prompt}
        ])
        # The content may be null; normalise so callers always get a str.
        return sanitize_text(response["choices"][0]["message"]["content"] or "")
# Singleton: module-level shared instance. It is constructed at import time,
# so settings.cerebras_api_key is read when this module is first imported.
investigator_agent = InvestigatorAgent()