Numidium / app /services /investigation.py
Madras1's picture
Upload 58 files
5ddc5dd verified
"""
Investigation Service - Builds comprehensive dossiers
Combines CNPJ data, transparency/sanctions, Lancer web search, and NER
"""
import httpx
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field, asdict
import asyncio
from app.services.brazil_apis import consultar_cnpj, CompanyData
from app.services.transparencia_api import verificar_sancoes
# from app.services.tse_api import buscar_politico # TSE API needs fixing
from app.services import lancer
from app.services.nlp import entity_extractor
from app.core.database import get_db
from app.models.entity import Entity, Relationship
LANCER_URL = "https://madras1-lancer.hf.space/api/v1"
@dataclass
class DossierSection:
"""A section of the dossier"""
titulo: str
conteudo: Any
status: str = "ok" # ok, warning, danger, info
icone: str = "📋"
@dataclass
class Dossier:
"""Complete investigation dossier"""
tipo: str # "organization" or "person"
alvo: str # Target name
cnpj_cpf: Optional[str] = None
# Sections
dados_cadastrais: Optional[DossierSection] = None
socios: Optional[DossierSection] = None
sancoes: Optional[DossierSection] = None
dados_politicos: Optional[DossierSection] = None # TSE data
noticias: Optional[DossierSection] = None
entidades_relacionadas: Optional[DossierSection] = None
# Metadata
red_flags: List[str] = field(default_factory=list)
score_risco: int = 0 # 0-100
data_geracao: str = ""
fonte_dados: List[str] = field(default_factory=list)
async def investigar_empresa(nome_ou_cnpj: str) -> Dossier:
"""
Investigate a company and build a comprehensive dossier.
"""
import re
from datetime import datetime
dossier = Dossier(
tipo="organization",
alvo=nome_ou_cnpj,
data_geracao=datetime.now().isoformat()
)
# Check if input is CNPJ
cnpj_clean = re.sub(r'[^0-9]', '', nome_ou_cnpj)
is_cnpj = len(cnpj_clean) == 14
company_data = None
# 1. Get company data from CNPJ
if is_cnpj:
dossier.cnpj_cpf = cnpj_clean
company_data = await consultar_cnpj(cnpj_clean)
if company_data:
dossier.alvo = company_data.razao_social or company_data.nome_fantasia or nome_ou_cnpj
dossier.fonte_dados.append(company_data.fonte)
# Build cadastral section
dossier.dados_cadastrais = DossierSection(
titulo="Dados Cadastrais",
icone="🏢",
conteudo={
"cnpj": company_data.cnpj,
"razao_social": company_data.razao_social,
"nome_fantasia": company_data.nome_fantasia,
"situacao": company_data.situacao,
"data_abertura": company_data.data_abertura,
"natureza_juridica": company_data.natureza_juridica,
"capital_social": company_data.capital_social,
"porte": company_data.porte,
"endereco": f"{company_data.logradouro}, {company_data.numero} - {company_data.bairro}, {company_data.cidade}/{company_data.uf}",
"cep": company_data.cep,
"telefone": company_data.telefone,
"email": company_data.email,
"atividade_principal": f"{company_data.cnae_principal} - {company_data.cnae_descricao}"
}
)
# Check situação for red flags
if company_data.situacao and "ATIVA" not in company_data.situacao.upper():
dossier.red_flags.append(f"⚠️ Situação cadastral: {company_data.situacao}")
dossier.dados_cadastrais.status = "warning"
# Build partners section
if company_data.socios:
dossier.socios = DossierSection(
titulo=f"Sócios ({len(company_data.socios)})",
icone="👥",
conteudo=company_data.socios
)
# 2. Check sanctions/transparency
if dossier.cnpj_cpf:
sancoes = await verificar_sancoes(dossier.cnpj_cpf)
dossier.fonte_dados.append("Portal da Transparência")
if sancoes["tem_sancoes"]:
dossier.red_flags.append(f"🚨 Encontrado em {sancoes['total_sancoes']} lista(s) de sanções")
dossier.score_risco += 40
dossier.sancoes = DossierSection(
titulo=f"Sanções ({sancoes['total_sancoes']})",
icone="⚠️",
status="danger",
conteudo=sancoes
)
else:
dossier.sancoes = DossierSection(
titulo="Sanções",
icone="✅",
status="ok",
conteudo={"mensagem": "Nenhuma sanção encontrada nos cadastros públicos"}
)
# 3. Web search for news and context
search_query = dossier.alvo
if company_data and company_data.nome_fantasia:
search_query = company_data.nome_fantasia
try:
web_result = await lancer.search(f"{search_query} notícias escândalos processos", max_results=8)
if web_result.answer or web_result.results:
dossier.fonte_dados.append("Lancer Web Search")
news_content = {
"resumo": web_result.answer or "Sem resumo disponível",
"fontes": [
{"titulo": r.title, "url": r.url, "snippet": r.content[:200]}
for r in web_result.results[:5]
]
}
dossier.noticias = DossierSection(
titulo="Notícias e Mídia",
icone="📰",
conteudo=news_content
)
# Check for negative keywords in news
negative_keywords = ["escândalo", "fraude", "corrupção", "prisão", "investigado", "denúncia", "irregularidade"]
raw_text = (web_result.answer or "").lower()
for kw in negative_keywords:
if kw in raw_text:
dossier.red_flags.append(f"📰 Menção a '{kw}' encontrada nas notícias")
dossier.noticias.status = "warning"
dossier.score_risco += 10
break
except Exception as e:
print(f"Web search error: {e}")
# 4. Extract related entities using NER
if dossier.noticias and dossier.noticias.conteudo.get("resumo"):
try:
text_to_analyze = dossier.noticias.conteudo.get("resumo", "")[:3000]
ner_result = await entity_extractor.extract(text_to_analyze)
if ner_result.entities:
entities = [
{"nome": e.name, "tipo": e.type, "descricao": e.description or e.role}
for e in ner_result.entities[:10]
]
dossier.entidades_relacionadas = DossierSection(
titulo=f"Entidades Relacionadas ({len(entities)})",
icone="🔗",
conteudo=entities
)
except Exception as e:
print(f"NER error: {e}")
# Calculate final risk score
dossier.score_risco = min(100, dossier.score_risco + len(dossier.red_flags) * 5)
return dossier
async def investigar_pessoa(nome: str, cpf: Optional[str] = None) -> Dossier:
"""
Investigate a person and build a dossier.
Note: CPF data is heavily protected by LGPD, so mainly uses web search.
"""
from datetime import datetime
dossier = Dossier(
tipo="person",
alvo=nome,
cnpj_cpf=cpf,
data_geracao=datetime.now().isoformat()
)
# 1. Check sanctions if CPF provided
if cpf:
sancoes = await verificar_sancoes(cpf)
dossier.fonte_dados.append("Portal da Transparência")
if sancoes["tem_sancoes"]:
dossier.red_flags.append(f"🚨 Encontrado em {sancoes['total_sancoes']} lista(s) de sanções")
dossier.score_risco += 50
dossier.sancoes = DossierSection(
titulo=f"Sanções ({sancoes['total_sancoes']})",
icone="⚠️",
status="danger",
conteudo=sancoes
)
# 2. Check TSE for political data (DISABLED - API needs fixing)
# try:
# tse_data = await buscar_politico(nome)
# if tse_data.get("encontrado"):
# dossier.fonte_dados.append("TSE (DivulgaCand)")
# candidaturas = tse_data.get("candidaturas", [])
# patrimonio = tse_data.get("total_patrimonio", 0)
# partidos = tse_data.get("partidos", [])
# dossier.dados_politicos = DossierSection(...)
# except Exception as e:
# print(f"TSE search error: {e}")
# 3. Web search for information
try:
web_result = await lancer.search(f'"{nome}" biografia cargo empresa', max_results=10)
if web_result.answer or web_result.results:
dossier.fonte_dados.append("Lancer Web Search")
dossier.noticias = DossierSection(
titulo="Informações Públicas",
icone="🌐",
conteudo={
"resumo": web_result.answer or "Informações limitadas",
"fontes": [
{"titulo": r.title, "url": r.url, "snippet": r.content[:200]}
for r in web_result.results[:5]
]
}
)
# Check for negative keywords
negative_keywords = ["preso", "condenado", "investigado", "acusado", "escândalo", "fraude"]
raw_text = (web_result.answer or "").lower()
for kw in negative_keywords:
if kw in raw_text:
dossier.red_flags.append(f"📰 Menção a '{kw}' encontrada")
dossier.noticias.status = "warning"
dossier.score_risco += 15
break
except Exception as e:
print(f"Web search error: {e}")
# 3. Extract related entities
if dossier.noticias and dossier.noticias.conteudo.get("resumo"):
try:
ner_result = await entity_extractor.extract(dossier.noticias.conteudo["resumo"][:2000])
if ner_result.entities:
entities = [
{"nome": e.name, "tipo": e.type, "descricao": e.description or e.role}
for e in ner_result.entities[:10]
if e.name.lower() != nome.lower() # Exclude the target
]
if entities:
dossier.entidades_relacionadas = DossierSection(
titulo=f"Conexões ({len(entities)})",
icone="🔗",
conteudo=entities
)
except Exception as e:
print(f"NER error: {e}")
dossier.score_risco = min(100, dossier.score_risco + len(dossier.red_flags) * 5)
return dossier
def dossier_to_dict(dossier: Dossier) -> Dict[str, Any]:
"""Convert dossier to dictionary for JSON response"""
result = {
"tipo": dossier.tipo,
"alvo": dossier.alvo,
"cnpj_cpf": dossier.cnpj_cpf,
"red_flags": dossier.red_flags,
"score_risco": dossier.score_risco,
"data_geracao": dossier.data_geracao,
"fonte_dados": dossier.fonte_dados,
"secoes": {}
}
for field_name in ["dados_cadastrais", "socios", "sancoes", "dados_politicos", "noticias", "entidades_relacionadas"]:
section = getattr(dossier, field_name)
if section:
result["secoes"][field_name] = {
"titulo": section.titulo,
"icone": section.icone,
"status": section.status,
"conteudo": section.conteudo
}
return result