| """
|
| Investigator Agent - Autonomous Investigation with Tool Calling
|
| Uses Cerebras native tool calling for multi-source investigations
|
| """
|
| import json
|
| import re
|
| import httpx
|
| from typing import Optional, List, Dict, Any
|
| from dataclasses import dataclass, field
|
| from datetime import datetime
|
| from sqlalchemy.orm import Session
|
|
|
| from app.config import settings
|
| from app.services import lancer
|
| from app.services.brazil_apis import consultar_cnpj
|
| from app.models.entity import Entity, Relationship
|
|
|
|
|
| def sanitize_text(text: str) -> str:
|
| """
|
| Clean up text from model that may contain thinking artifacts.
|
| Only removes thinking tags, does NOT remove valid characters.
|
| """
|
| if not text:
|
| return text
|
|
|
|
|
| text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
|
| text = re.sub(r'<\|think\|>.*?<\|/think\|>', '', text, flags=re.DOTALL)
|
|
|
|
|
| text = re.sub(r'<\|.*?\|>', '', text)
|
|
|
|
|
| text = re.sub(r'\n{3,}', '\n\n', text)
|
|
|
| return text.strip()
|
|
|
|
|
| @dataclass
|
| class Finding:
|
| """A discovery made during investigation"""
|
| title: str
|
| content: str
|
| source: str
|
| timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
|
|
|
|
|
| @dataclass
|
| class InvestigationResult:
|
| """Complete investigation result"""
|
| mission: str
|
| findings: List[Finding]
|
| entities_discovered: List[Dict[str, Any]]
|
| connections_mapped: List[Dict[str, Any]]
|
| report: str
|
| iterations: int
|
| tools_used: List[str]
|
| status: str = "completed"
|
|
|
|
|
|
|
| TOOLS = [
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "search_entity",
|
| "description": "Buscar entidade no NUMIDIUM (grafo de conhecimento) por nome. Use para encontrar pessoas, empresas ou locais já conhecidos.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "query": {
|
| "type": "string",
|
| "description": "Nome ou termo para buscar"
|
| },
|
| "entity_type": {
|
| "type": "string",
|
| "enum": ["person", "organization", "location", "any"],
|
| "description": "Tipo de entidade (opcional)"
|
| }
|
| },
|
| "required": ["query"]
|
| }
|
| }
|
| },
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "get_connections",
|
| "description": "Obter a rede de conexões de uma entidade específica. Retorna entidades relacionadas.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "entity_id": {
|
| "type": "string",
|
| "description": "ID da entidade no NUMIDIUM"
|
| }
|
| },
|
| "required": ["entity_id"]
|
| }
|
| }
|
| },
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "lookup_cnpj",
|
| "description": "Consultar dados de uma empresa brasileira pelo CNPJ. Retorna razão social, sócios, endereço, CNAEs, etc.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "cnpj": {
|
| "type": "string",
|
| "description": "CNPJ da empresa (com ou sem formatação)"
|
| }
|
| },
|
| "required": ["cnpj"]
|
| }
|
| }
|
| },
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "web_search",
|
| "description": "Pesquisar informações na web. Use para buscar notícias, artigos e informações públicas.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "query": {
|
| "type": "string",
|
| "description": "Termo de busca"
|
| },
|
| "freshness": {
|
| "type": "string",
|
| "enum": ["day", "week", "month", "any"],
|
| "description": "Frescor dos resultados",
|
| "default": "any"
|
| }
|
| },
|
| "required": ["query"]
|
| }
|
| }
|
| },
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "deep_research",
|
| "description": "Pesquisa profunda e multi-dimensional sobre um tema. Use para tópicos complexos.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "topic": {
|
| "type": "string",
|
| "description": "Tópico para pesquisa profunda"
|
| }
|
| },
|
| "required": ["topic"]
|
| }
|
| }
|
| },
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "save_finding",
|
| "description": "Salvar uma descoberta importante da investigação.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "title": {
|
| "type": "string",
|
| "description": "Título curto da descoberta"
|
| },
|
| "content": {
|
| "type": "string",
|
| "description": "Conteúdo detalhado"
|
| },
|
| "source": {
|
| "type": "string",
|
| "description": "Fonte da informação"
|
| }
|
| },
|
| "required": ["title", "content", "source"]
|
| }
|
| }
|
| },
|
| {
|
| "type": "function",
|
| "function": {
|
| "name": "finish_investigation",
|
| "description": "Finalizar a investigação e gerar o relatório final.",
|
| "parameters": {
|
| "type": "object",
|
| "properties": {
|
| "summary": {
|
| "type": "string",
|
| "description": "Resumo das descobertas principais"
|
| }
|
| },
|
| "required": ["summary"]
|
| }
|
| }
|
| }
|
| ]
|
|
|
|
|
| SYSTEM_PROMPT = """Você é um agente investigador autônomo do sistema NUMIDIUM/AVANGARD. /no_think
|
|
|
| Sua missão é investigar temas usando múltiplas fontes de dados:
|
| - NUMIDIUM: Grafo de conhecimento com entidades e relacionamentos
|
| - Consulta CNPJ: Dados oficiais de empresas brasileiras (BrasilAPI)
|
| - Web Search: Pesquisa na internet via Lancer
|
|
|
| ## Estratégia de Investigação:
|
|
|
| 1. Comece buscando no NUMIDIUM se já temos informações sobre o alvo
|
| 2. Para empresas brasileiras, consulte o CNPJ para obter sócios e dados
|
| 3. Use web_search para buscar notícias e informações públicas
|
| 4. Para cada sócio/conexão descoberta, considere investigar mais a fundo
|
| 5. Use save_finding para registrar descobertas importantes
|
| 6. Quando tiver informações suficientes, use finish_investigation
|
|
|
| ## Regras:
|
| - Seja metódico e siga pistas
|
| - Não invente informações - use apenas dados das ferramentas
|
| - Priorize qualidade sobre quantidade
|
| - Cite sempre as fontes
|
| - NÃO use pensamento interno ou tags <think>. Responda diretamente."""
|
|
|
|
|
| class InvestigatorAgent:
|
| """Autonomous investigation agent with tool calling"""
|
|
|
| def __init__(self):
|
| self.api_url = "https://api.cerebras.ai/v1/chat/completions"
|
| self.api_key = settings.cerebras_api_key
|
| self.model = "zai-glm-4.7"
|
|
|
|
|
| self.findings: List[Finding] = []
|
| self.entities_discovered: List[Dict[str, Any]] = []
|
| self.connections_mapped: List[Dict[str, Any]] = []
|
| self.tools_used: List[str] = []
|
| self.messages: List[Dict[str, Any]] = []
|
| self.db: Optional[Session] = None
|
|
|
| def _reset_state(self):
|
| """Reset investigation state"""
|
| self.findings = []
|
| self.entities_discovered = []
|
| self.connections_mapped = []
|
| self.tools_used = []
|
| self.messages = []
|
|
|
| async def _call_llm(
|
| self,
|
| messages: List[Dict[str, Any]],
|
| tools: List[Dict] = None
|
| ) -> Dict[str, Any]:
|
| """Call Cerebras API with tool calling support"""
|
| try:
|
| payload = {
|
| "model": self.model,
|
| "messages": messages,
|
| "temperature": 0.3,
|
| "max_tokens": 2048,
|
| }
|
|
|
| if tools:
|
| payload["tools"] = tools
|
| payload["tool_choice"] = "auto"
|
| payload["parallel_tool_calls"] = True
|
|
|
| async with httpx.AsyncClient(timeout=60.0) as client:
|
| response = await client.post(
|
| self.api_url,
|
| headers={
|
| "Authorization": f"Bearer {self.api_key}",
|
| "Content-Type": "application/json"
|
| },
|
| json=payload
|
| )
|
|
|
| if response.status_code != 200:
|
| raise Exception(f"API error: {response.status_code} - {response.text}")
|
|
|
| return response.json()
|
|
|
| except Exception as e:
|
| raise Exception(f"LLM call failed: {str(e)}")
|
|
|
| async def _execute_tool(self, tool_name: str, arguments: Dict) -> str:
|
| """Execute a tool and return the result"""
|
| self.tools_used.append(tool_name)
|
|
|
| try:
|
| if tool_name == "search_entity":
|
| return await self._search_entity(
|
| arguments.get("query", ""),
|
| arguments.get("entity_type")
|
| )
|
|
|
| elif tool_name == "get_connections":
|
| return await self._get_connections(arguments.get("entity_id"))
|
|
|
| elif tool_name == "lookup_cnpj":
|
| return await self._lookup_cnpj(arguments.get("cnpj", ""))
|
|
|
|
|
| elif tool_name == "web_search":
|
| return await self._web_search(
|
| arguments.get("query", ""),
|
| arguments.get("freshness", "any")
|
| )
|
|
|
| elif tool_name == "deep_research":
|
| return await self._deep_research(arguments.get("topic", ""))
|
|
|
| elif tool_name == "aether_search":
|
| return await self._aether_search(arguments.get("query", ""))
|
|
|
| elif tool_name == "aether_entities":
|
| return await self._aether_entities()
|
|
|
| elif tool_name == "save_finding":
|
| finding = Finding(
|
| title=arguments.get("title", ""),
|
| content=arguments.get("content", ""),
|
| source=arguments.get("source", "")
|
| )
|
| self.findings.append(finding)
|
| return f"Descoberta salva: {finding.title}"
|
|
|
| elif tool_name == "finish_investigation":
|
| return f"INVESTIGATION_COMPLETE: {arguments.get('summary', '')}"
|
|
|
| else:
|
| return f"Ferramenta desconhecida: {tool_name}"
|
|
|
| except Exception as e:
|
| return f"Erro ao executar {tool_name}: {str(e)}"
|
|
|
| async def _search_entity(self, query: str, entity_type: Optional[str]) -> str:
|
| """Search entities in database"""
|
| if not self.db:
|
| return "Erro: Banco de dados não disponível"
|
|
|
| q = self.db.query(Entity).filter(Entity.name.ilike(f"%{query}%"))
|
| if entity_type and entity_type != "any":
|
| q = q.filter(Entity.type == entity_type)
|
|
|
| entities = q.limit(10).all()
|
|
|
| if entities:
|
| result = []
|
| for e in entities:
|
| self.entities_discovered.append({
|
| "id": str(e.id),
|
| "name": e.name,
|
| "type": e.type
|
| })
|
| result.append({
|
| "id": str(e.id),
|
| "name": e.name,
|
| "type": e.type,
|
| "description": e.description[:200] if e.description else None
|
| })
|
| return json.dumps(result, ensure_ascii=False, indent=2)
|
|
|
| return "Nenhuma entidade encontrada no NUMIDIUM."
|
|
|
| async def _get_connections(self, entity_id: str) -> str:
|
| """Get entity connections"""
|
| if not self.db:
|
| return "Erro: Banco de dados não disponível"
|
|
|
| relationships = self.db.query(Relationship).filter(
|
| (Relationship.source_id == entity_id) | (Relationship.target_id == entity_id)
|
| ).limit(20).all()
|
|
|
| if relationships:
|
| connections = []
|
| for rel in relationships:
|
| source = self.db.query(Entity).filter(Entity.id == rel.source_id).first()
|
| target = self.db.query(Entity).filter(Entity.id == rel.target_id).first()
|
| if source and target:
|
| connections.append({
|
| "source": source.name,
|
| "target": target.name,
|
| "type": rel.type
|
| })
|
| return json.dumps(connections, ensure_ascii=False, indent=2)
|
|
|
| return "Nenhuma conexão encontrada."
|
|
|
| async def _lookup_cnpj(self, cnpj: str) -> str:
|
| """Lookup CNPJ via BrasilAPI"""
|
| cnpj_clean = cnpj.replace(".", "").replace("/", "").replace("-", "")
|
| result = await consultar_cnpj(cnpj_clean)
|
|
|
| if result:
|
| data = {
|
| "razao_social": result.razao_social,
|
| "nome_fantasia": result.nome_fantasia,
|
| "situacao": result.situacao,
|
| "data_abertura": result.data_abertura,
|
| "capital_social": result.capital_social,
|
| "endereco": f"{result.logradouro}, {result.numero} - {result.cidade}/{result.uf}",
|
| "cnae": f"{result.cnae_principal} - {result.cnae_descricao}",
|
| "socios": result.socios
|
| }
|
| return json.dumps(data, ensure_ascii=False, indent=2)
|
|
|
| return "CNPJ não encontrado."
|
|
|
| async def _lookup_phone(self, phone: str) -> str:
|
| """Lookup phone number via NumVerify API"""
|
|
|
| phone_clean = "".join(c for c in phone if c.isdigit())
|
|
|
|
|
| numverify_key = getattr(settings, 'numverify_api_key', None)
|
|
|
| if not numverify_key:
|
|
|
| return await self._web_search(f'"{phone_clean}" telefone', "any")
|
|
|
| try:
|
| async with httpx.AsyncClient(timeout=10.0) as client:
|
| response = await client.get(
|
| "http://apilayer.net/api/validate",
|
| params={
|
| "access_key": numverify_key,
|
| "number": phone_clean,
|
| "country_code": "",
|
| "format": 1
|
| }
|
| )
|
|
|
| if response.status_code == 200:
|
| data = response.json()
|
|
|
| if data.get("valid"):
|
| result = {
|
| "numero": data.get("international_format"),
|
| "valido": True,
|
| "pais": data.get("country_name"),
|
| "codigo_pais": data.get("country_code"),
|
| "operadora": data.get("carrier"),
|
| "tipo_linha": data.get("line_type"),
|
| "localizacao": data.get("location")
|
| }
|
| return json.dumps(result, ensure_ascii=False, indent=2)
|
| else:
|
| return f"Número {phone_clean} não é válido ou não foi encontrado."
|
|
|
| return "Erro ao consultar número."
|
|
|
| except Exception as e:
|
|
|
| return await self._web_search(f'"{phone_clean}" telefone', "any")
|
|
|
| async def _web_search(self, query: str, freshness: str) -> str:
|
| """Web search via Lancer"""
|
| try:
|
| result = await lancer.search(query, max_results=5, freshness=freshness)
|
| if result.answer:
|
| return f"Resumo: {result.answer}\n\nFontes: {len(result.results)} resultados"
|
| return "Nenhum resultado encontrado."
|
| except Exception as e:
|
| return f"Erro na busca web: {str(e)}"
|
|
|
| async def _deep_research(self, topic: str) -> str:
|
| """Deep research via Lancer"""
|
| try:
|
| result = await lancer.deep_research(topic, max_dimensions=3)
|
| if result.answer:
|
| return result.answer
|
| return "Pesquisa profunda não retornou resultados."
|
| except Exception as e:
|
| return f"Erro na pesquisa: {str(e)}"
|
|
|
| async def _aether_search(self, query: str) -> str:
|
| """Semantic search via AetherMap"""
|
| try:
|
|
|
| if not aethermap.current_job_id:
|
|
|
| if self.db:
|
| entities = self.db.query(Entity).limit(500).all()
|
| if entities:
|
| texts = []
|
| for e in entities:
|
| text = f"{e.name} ({e.type})"
|
| if e.description:
|
| text += f": {e.description[:500]}"
|
| texts.append(text)
|
|
|
| if texts:
|
| result = await aethermap.process_documents(texts, fast_mode=True)
|
|
|
|
|
| if aethermap.current_job_id:
|
| result = await aethermap.semantic_search(query, turbo_mode=True)
|
| return f"RAG Response:\n{result.summary}"
|
| else:
|
| return "Nenhum documento indexado no AetherMap."
|
|
|
| except Exception as e:
|
| return f"Erro no AetherMap search: {str(e)}"
|
|
|
| async def _aether_entities(self) -> str:
|
| """Extract NER entities via AetherMap"""
|
| try:
|
| if not aethermap.current_job_id:
|
| return "Nenhum documento indexado. Use aether_search primeiro."
|
|
|
| result = await aethermap.extract_entities()
|
|
|
|
|
| output = []
|
|
|
| if result.hubs:
|
| output.append("**Entidades Centrais (Hubs):**")
|
| for hub in result.hubs[:5]:
|
| output.append(f"- {hub.get('entity')} ({hub.get('type')}): {hub.get('degree')} conexões")
|
|
|
| if result.insights:
|
| output.append(f"\n**Insights:**")
|
| output.append(f"- Total de conexões: {result.insights.get('total_connections', 0)}")
|
| output.append(f"- Grau médio: {result.insights.get('avg_degree', 0)}")
|
|
|
| if result.edges:
|
| output.append(f"\n**Top 5 Relacionamentos:**")
|
| for edge in result.edges[:5]:
|
| output.append(f"- {edge.source_entity} <-> {edge.target_entity}: {edge.reason}")
|
|
|
| return "\n".join(output) if output else "Nenhuma entidade significativa encontrada."
|
|
|
| except Exception as e:
|
| return f"Erro na extração de entidades: {str(e)}"
|
|
|
| async def investigate(
|
| self,
|
| mission: str,
|
| db: Session,
|
| max_iterations: int = 10
|
| ) -> InvestigationResult:
|
| """Main investigation loop"""
|
| self._reset_state()
|
| self.db = db
|
|
|
| self.messages = [
|
| {"role": "system", "content": SYSTEM_PROMPT},
|
| {"role": "user", "content": f"Missão de investigação: {mission}\n\nComece a investigação."}
|
| ]
|
|
|
| iteration = 0
|
| final_summary = ""
|
|
|
| while iteration < max_iterations:
|
| iteration += 1
|
|
|
| response = await self._call_llm(self.messages, TOOLS)
|
|
|
| choice = response["choices"][0]
|
| message = choice["message"]
|
| self.messages.append(message)
|
|
|
| tool_calls = message.get("tool_calls", [])
|
|
|
| if not tool_calls:
|
| if message.get("content"):
|
| final_summary = message["content"]
|
| break
|
|
|
| for tool_call in tool_calls:
|
| func = tool_call["function"]
|
| tool_name = func["name"]
|
|
|
| try:
|
| arguments = json.loads(func["arguments"])
|
| except:
|
| arguments = {}
|
|
|
| result = await self._execute_tool(tool_name, arguments)
|
|
|
| if result.startswith("INVESTIGATION_COMPLETE:"):
|
| final_summary = result.replace("INVESTIGATION_COMPLETE:", "").strip()
|
| break
|
|
|
| self.messages.append({
|
| "role": "tool",
|
| "tool_call_id": tool_call["id"],
|
| "content": result
|
| })
|
|
|
| if final_summary:
|
| break
|
|
|
| if not final_summary:
|
| final_summary = await self._generate_report(mission)
|
|
|
|
|
| final_summary = sanitize_text(final_summary)
|
|
|
|
|
| sanitized_findings = []
|
| for f in self.findings:
|
| sanitized_findings.append(Finding(
|
| title=sanitize_text(f.title),
|
| content=sanitize_text(f.content),
|
| source=f.source,
|
| timestamp=f.timestamp
|
| ))
|
|
|
| return InvestigationResult(
|
| mission=mission,
|
| findings=sanitized_findings,
|
| entities_discovered=self.entities_discovered,
|
| connections_mapped=self.connections_mapped,
|
| report=final_summary,
|
| iterations=iteration,
|
| tools_used=list(set(self.tools_used)),
|
| status="completed"
|
| )
|
|
|
| async def _generate_report(self, mission: str) -> str:
|
| """Generate final report"""
|
| findings_text = "\n".join([
|
| f"- {f.title}: {f.content} (Fonte: {f.source})"
|
| for f in self.findings
|
| ]) or "Nenhuma descoberta registrada."
|
|
|
| entities_text = ", ".join([
|
| e.get("name", "Unknown") for e in self.entities_discovered[:10]
|
| ]) or "Nenhuma entidade."
|
|
|
| prompt = f"""Gere um relatório de investigação:
|
|
|
| Missão: {mission}
|
|
|
| Descobertas:
|
| {findings_text}
|
|
|
| Entidades: {entities_text}
|
|
|
| Ferramentas usadas: {', '.join(set(self.tools_used))}
|
|
|
| Gere relatório estruturado com: Resumo Executivo, Descobertas, Entidades, Recomendações."""
|
|
|
| response = await self._call_llm([
|
| {"role": "system", "content": "Gere relatórios concisos."},
|
| {"role": "user", "content": prompt}
|
| ])
|
|
|
| return sanitize_text(response["choices"][0]["message"]["content"])
|
|
|
|
|
|
|
| investigator_agent = InvestigatorAgent()
|
|
|