Spaces:
Running
Running
| """Motor de risco do Analytical-Force. | |
| Recebe as métricas já calculadas em Python e gera uma lista de alertas | |
| classificados em ``low``, ``medium`` e ``high``. NÃO recalcula indicadores — | |
| apenas aplica regras de negócio sobre os números prontos. | |
| Cada alerta segue o contrato: | |
| { | |
| "severity": "low|medium|high", | |
| "category": "Leads|Oportunidades|Tarefas|Satisfação|Cancelamentos", | |
| "title": str, | |
| "description": str, | |
| "recommended_action": str, | |
| "source_object": str | None, | |
| "source_record_id": str | None, | |
| } | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| from ..config.settings import RiskSettings | |
| from ..utils.logger import get_logger | |
| from ..utils.validators import normalizar_severidade | |
| logger = get_logger("analytics.risk_engine") | |
| # Ordem de severidade para ordenação final (maior primeiro). | |
| _ORDEM_SEVERIDADE = {"high": 0, "medium": 1, "low": 2} | |
| def _alerta( | |
| severity: str, | |
| category: str, | |
| title: str, | |
| description: str, | |
| recommended_action: str, | |
| source_object: str | None = None, | |
| source_record_id: str | None = None, | |
| affected_records: list[dict[str, Any]] | None = None, | |
| ) -> dict[str, Any]: | |
| """Cria um dicionário de alerta padronizado. | |
| ``affected_records`` carrega os registros concretos relacionados ao alerta | |
| (nome, valor, dono, etc.), usados para montar tarefas ricas e acionáveis. | |
| """ | |
| return { | |
| "severity": normalizar_severidade(severity), | |
| "category": category, | |
| "title": title, | |
| "description": description, | |
| "recommended_action": recommended_action, | |
| "source_object": source_object, | |
| "source_record_id": source_record_id, | |
| "affected_records": affected_records or [], | |
| } | |
| def _var_percent_anterior(metricas: dict[str, Any], chave: str) -> float | None: | |
| """Lê a variação percentual vs dia anterior de uma métrica (ou None).""" | |
| comparacoes = metricas.get("comparisons", {}) | |
| item = comparacoes.get(chave, {}) | |
| return item.get("variation_percent_vs_previous") | |
| def _var_percent_7dias(metricas: dict[str, Any], chave: str) -> float | None: | |
| """Lê a variação percentual vs média de 7 dias de uma métrica (ou None).""" | |
| comparacoes = metricas.get("comparisons", {}) | |
| item = comparacoes.get(chave, {}) | |
| return item.get("variation_percent_vs_7day_avg") | |
| def _alertas_leads(m: dict[str, Any], risk: RiskSettings) -> list[dict[str, Any]]: | |
| """Regras de risco para Leads.""" | |
| alertas: list[dict[str, Any]] = [] | |
| sem_tarefa = int(m.get("leads_without_first_task", 0) or 0) | |
| if sem_tarefa > 0: | |
| severidade = "high" if sem_tarefa >= 5 else "medium" | |
| alertas.append( | |
| _alerta( | |
| severidade, | |
| "Leads", | |
| f"{sem_tarefa} lead(s) sem primeira tarefa", | |
| f"Há {sem_tarefa} lead(s) criados sem follow-up registrado, " | |
| f"acima do limite de {risk.lead_max_hours_without_task}h sem ação.", | |
| "Distribuir e agendar a primeira tarefa de contato para esses leads.", | |
| source_object="Lead", | |
| ) | |
| ) | |
| avg_first = m.get("avg_time_to_first_task_hours") | |
| if isinstance(avg_first, (int, float)) and avg_first > risk.lead_first_task_target_hours: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Leads", | |
| "Tempo até primeira tarefa acima da meta", | |
| f"Tempo médio até a primeira tarefa é {avg_first}h, " | |
| f"acima da meta de {risk.lead_first_task_target_hours}h.", | |
| "Revisar a cadência de prospecção e a distribuição de leads.", | |
| source_object="Lead", | |
| ) | |
| ) | |
| var_conv = _var_percent_anterior(m, "conversion_rate") | |
| if var_conv is not None and var_conv <= -risk.conversion_drop_threshold_percent: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Leads", | |
| "Queda na taxa de conversão", | |
| f"Conversão caiu {abs(var_conv):.1f}% em relação ao dia anterior " | |
| f"(limite de alerta: {risk.conversion_drop_threshold_percent:.0f}%).", | |
| "Investigar qualidade dos leads e abordagem comercial recente.", | |
| source_object="Lead", | |
| ) | |
| ) | |
| var_conv_7d = _var_percent_7dias(m, "conversion_rate") | |
| if var_conv_7d is not None and var_conv_7d < 0: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Leads", | |
| "Conversão abaixo da média de 7 dias", | |
| f"Conversão está {abs(var_conv_7d):.1f}% abaixo da média dos últimos 7 dias.", | |
| "Comparar com origens de melhor desempenho e ajustar o foco.", | |
| source_object="Lead", | |
| ) | |
| ) | |
| return alertas | |
| def _alertas_oportunidades(m: dict[str, Any], risk: RiskSettings) -> list[dict[str, Any]]: | |
| """Regras de risco para Oportunidades.""" | |
| alertas: list[dict[str, Any]] = [] | |
| alto_valor_paradas = int(m.get("high_value_stalled_opportunities", 0) or 0) | |
| if alto_valor_paradas > 0: | |
| # Usa os IDs/detalhes do SUBCONJUNTO de alto valor (não da lista geral). | |
| ids = m.get("high_value_stalled_opportunity_ids") or [] | |
| detalhes = m.get("high_value_stalled_details") or [] | |
| alertas.append( | |
| _alerta( | |
| "high", | |
| "Oportunidades", | |
| f"{alto_valor_paradas} oportunidade(s) de alto valor parada(s)", | |
| f"{alto_valor_paradas} oportunidade(s) acima de " | |
| f"R$ {risk.high_value_opportunity_amount:,.0f} sem atividade recente.", | |
| "Priorizar contato imediato e definir próximo passo nessas oportunidades.", | |
| source_object="Opportunity", | |
| source_record_id=str(ids[0]) if ids else None, | |
| affected_records=detalhes, | |
| ) | |
| ) | |
| paradas = int(m.get("stalled_opportunities", 0) or 0) | |
| if paradas > 0: | |
| ids = m.get("stalled_opportunity_ids") or [] | |
| detalhes = m.get("stalled_opportunity_details") or [] | |
| alertas.append( | |
| _alerta( | |
| "high", | |
| "Oportunidades", | |
| f"{paradas} oportunidade(s) parada(s)", | |
| f"{paradas} oportunidade(s) aberta(s) sem atividade há mais de " | |
| f"{risk.opportunity_max_days_without_activity} dias.", | |
| "Reengajar essas oportunidades ou reavaliar a previsão de fechamento.", | |
| source_object="Opportunity", | |
| source_record_id=str(ids[0]) if ids else None, | |
| affected_records=detalhes, | |
| ) | |
| ) | |
| sem_tarefa = int(m.get("opportunities_without_next_task", 0) or 0) | |
| if sem_tarefa > 0: | |
| ids = m.get("opportunities_without_task_ids") or [] | |
| detalhes = m.get("opportunities_without_task_details") or [] | |
| alertas.append( | |
| _alerta( | |
| "high", | |
| "Oportunidades", | |
| f"{sem_tarefa} oportunidade(s) sem próxima tarefa", | |
| f"{sem_tarefa} oportunidade(s) aberta(s) sem nenhuma atividade futura agendada.", | |
| "Agendar a próxima ação comercial para cada oportunidade aberta.", | |
| source_object="Opportunity", | |
| source_record_id=str(ids[0]) if ids else None, | |
| affected_records=detalhes, | |
| ) | |
| ) | |
| var_pipeline = _var_percent_anterior(m, "open_pipeline_amount") | |
| if var_pipeline is not None and var_pipeline <= -risk.pipeline_drop_threshold_percent: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Oportunidades", | |
| "Queda no pipeline aberto", | |
| f"Pipeline aberto caiu {abs(var_pipeline):.1f}% vs dia anterior " | |
| f"(limite: {risk.pipeline_drop_threshold_percent:.0f}%).", | |
| "Verificar perdas recentes e ritmo de geração de novas oportunidades.", | |
| source_object="Opportunity", | |
| ) | |
| ) | |
| var_ganho = _var_percent_anterior(m, "won_amount") | |
| if var_ganho is not None and var_ganho < 0: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Oportunidades", | |
| "Queda no valor ganho", | |
| f"Valor ganho caiu {abs(var_ganho):.1f}% em relação ao dia anterior.", | |
| "Analisar negócios fechados e foco da equipe comercial.", | |
| source_object="Opportunity", | |
| ) | |
| ) | |
| var_perdidas = _var_percent_anterior(m, "lost_opportunities") | |
| if var_perdidas is not None and var_perdidas > 0: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Oportunidades", | |
| "Aumento de oportunidades perdidas", | |
| f"Oportunidades perdidas subiram {var_perdidas:.1f}% vs dia anterior.", | |
| "Revisar motivos de perda e objeções recorrentes.", | |
| source_object="Opportunity", | |
| ) | |
| ) | |
| return alertas | |
| def _alertas_tarefas(m: dict[str, Any], risk: RiskSettings) -> list[dict[str, Any]]: | |
| """Regras de risco para Tarefas. | |
| Observação: por decisão do projeto, NÃO geramos alertas de "tarefas vencidas | |
| ligadas a oportunidades" nem de "responsável com mais tarefas vencidas" — | |
| o volume de tarefas vencidas é muito alto e gerava ruído. Esses números | |
| continuam disponíveis nas métricas, mas não viram alerta/tarefa. | |
| """ | |
| alertas: list[dict[str, Any]] = [] | |
| var_venc = _var_percent_anterior(m, "tasks_overdue") | |
| if var_venc is not None and var_venc > 0: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Tarefas", | |
| "Aumento de tarefas vencidas", | |
| f"Tarefas vencidas aumentaram {var_venc:.1f}% em relação ao dia anterior.", | |
| "Mobilizar a equipe para zerar o backlog de tarefas vencidas.", | |
| source_object="Task", | |
| ) | |
| ) | |
| taxa = m.get("completion_rate") | |
| criadas = int(m.get("tasks_created", 0) or 0) | |
| if criadas > 0 and isinstance(taxa, (int, float)) and taxa < 50.0: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Tarefas", | |
| "Baixa taxa de conclusão de tarefas", | |
| f"Apenas {taxa:.1f}% das tarefas criadas no dia foram concluídas.", | |
| "Verificar gargalos de execução e prioridades da equipe.", | |
| source_object="Task", | |
| ) | |
| ) | |
| return alertas | |
| def _alertas_satisfacao(m: dict[str, Any], risk: RiskSettings) -> list[dict[str, Any]]: | |
| """Regra de risco ÚNICA para Satisfação (nota, negativas, comentários e queda). | |
| Tudo que diz respeito à nota/satisfação é consolidado em um único alerta, | |
| para não fragmentar (ex.: evitar "nota baixa" + "comentário crítico" separados). | |
| """ | |
| if not m.get("configured"): | |
| return [] | |
| avg = m.get("avg_score") | |
| negativos = int(m.get("negative_count") or 0) | |
| comentarios = m.get("critical_comments") or [] | |
| var_7d = _var_percent_7dias(m, "avg_score") | |
| nota_baixa = isinstance(avg, (int, float)) and avg < risk.satisfaction_min_score | |
| caindo = var_7d is not None and var_7d < 0 | |
| if not (nota_baixa or negativos or comentarios or caindo): | |
| return [] | |
| partes: list[str] = [] | |
| if nota_baixa: | |
| partes.append(f"nota média {avg} abaixo da meta ({risk.satisfaction_min_score})") | |
| if negativos: | |
| partes.append(f"{negativos} avaliação(ões) negativa(s)") | |
| if comentarios: | |
| partes.append(f"{len(comentarios)} comentário(s) crítico(s)") | |
| if caindo: | |
| partes.append(f"queda de {abs(var_7d):.1f}% vs média de 7 dias") | |
| severidade = "high" if (nota_baixa or comentarios) else "medium" | |
| return [ | |
| _alerta( | |
| severidade, | |
| "Satisfação", | |
| "Satisfação em risco", | |
| "Pontos de atenção em satisfação: " + "; ".join(partes) + ".", | |
| "Acionar o CS: contatar os clientes negativos e revisar os " | |
| "comentários críticos do dia.", | |
| ) | |
| ] | |
| def _alertas_cancelamento(m: dict[str, Any], risk: RiskSettings) -> list[dict[str, Any]]: | |
| """Regras de risco para Cancelamentos (somente se configurado).""" | |
| if not m.get("configured"): | |
| return [] | |
| alertas: list[dict[str, Any]] = [] | |
| qtd = int(m.get("cancellations_count", 0) or 0) | |
| mrr = float(m.get("mrr_impact", 0.0) or 0.0) | |
| if qtd > 0 and mrr >= risk.high_value_opportunity_amount: | |
| alertas.append( | |
| _alerta( | |
| "high", | |
| "Cancelamentos", | |
| "Cancelamento com alto impacto financeiro", | |
| f"{qtd} cancelamento(s) somando impacto de R$ {mrr:,.0f} em MRR.", | |
| "Acionar retenção e revisar contratos de maior valor.", | |
| ) | |
| ) | |
| elif qtd > 0: | |
| alertas.append( | |
| _alerta( | |
| "medium", | |
| "Cancelamentos", | |
| f"{qtd} cancelamento(s) no período", | |
| f"Foram registrados {qtd} cancelamento(s). Motivo principal: " | |
| f"{m.get('top_reason') or 'não informado'}.", | |
| "Analisar motivos e iniciar ações de retenção.", | |
| ) | |
| ) | |
| # Mais de um cancelamento no mesmo produto. | |
| por_produto = m.get("cancellations_by_product") or {} | |
| for produto, total in por_produto.items(): | |
| if isinstance(total, int) and total > 1: | |
| alertas.append( | |
| _alerta( | |
| "high", | |
| "Cancelamentos", | |
| f"Cancelamentos recorrentes no produto {produto}", | |
| f"{total} cancelamentos no produto {produto} no período.", | |
| "Investigar causa específica do produto e priorizar correção.", | |
| ) | |
| ) | |
| return alertas | |
| def generate_alerts( | |
| metrics: dict[str, Any], | |
| risk: RiskSettings, | |
| data_quality: dict[str, Any] | None = None, | |
| ) -> list[dict[str, Any]]: | |
| """Gera todos os alertas a partir das métricas calculadas. | |
| Args: | |
| metrics: Métricas aninhadas ``{leads, opportunities, tasks, | |
| satisfaction, cancellations}``. | |
| risk: Limiares configuráveis do motor de risco. | |
| data_quality: Sinais de qualidade de dados (gera alertas informativos). | |
| Returns: | |
| Lista de alertas ordenada por severidade (high → low). | |
| """ | |
| alertas: list[dict[str, Any]] = [] | |
| alertas += _alertas_leads(metrics.get("leads", {}) or {}, risk) | |
| alertas += _alertas_oportunidades(metrics.get("opportunities", {}) or {}, risk) | |
| alertas += _alertas_tarefas(metrics.get("tasks", {}) or {}, risk) | |
| alertas += _alertas_satisfacao(metrics.get("satisfaction", {}) or {}, risk) | |
| alertas += _alertas_cancelamento(metrics.get("cancellations", {}) or {}, risk) | |
| # Alertas informativos de qualidade de dados (severidade baixa). | |
| if data_quality: | |
| if data_quality.get("salesforce_connection") not in (None, "ok"): | |
| alertas.append( | |
| _alerta( | |
| "high", | |
| "Dados", | |
| "Falha de conexão com o Salesforce", | |
| "A extração do Salesforce não foi concluída com sucesso.", | |
| "Verificar credenciais e disponibilidade da API do Salesforce.", | |
| ) | |
| ) | |
| alertas.sort(key=lambda a: _ORDEM_SEVERIDADE.get(a["severity"], 99)) | |
| logger.info("Motor de risco gerou %d alerta(s).", len(alertas)) | |
| return alertas | |