import gradio as gr import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from datetime import datetime import warnings warnings.filterwarnings('ignore') # Machine Learning from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler, LabelEncoder from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier from sklearn.linear_model import LogisticRegression from sklearn.tree import DecisionTreeClassifier from imblearn.over_sampling import SMOTE import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots # Configurações de visualização plt.style.use('seaborn-v0_8-darkgrid') sns.set_palette('husl') class ConsumerComplaintAnalyzer: def __init__(self): self.df = None self.df_processed = None self.models = {} self.results_df = None self.feature_importance = None self.is_trained = False self.X_test_scaled = None self.y_test = None def load_data_preview(self, file_obj): """Carrega pré-visualização dos dados""" if file_obj is None: return None, {} try: df_preview = pd.read_csv(file_obj.name, sep='\t').head(5) df_full = pd.read_csv(file_obj.name, sep='\t') stats = { "Total de Registros": len(df_full), "Total de Colunas": len(df_full.columns), "Taxa de Queixas": f"{df_full['Complain'].mean() * 100:.2f}%" if 'Complain' in df_full.columns else "N/A" } return df_preview, stats except Exception as e: return None, {"Erro": str(e)} def load_and_preprocess_data(self, file_obj): """Carrega e pré-processa os dados""" try: self.df = pd.read_csv(file_obj.name, sep='\t') # Engenharia de features self.df_processed = self.df.copy() # Criar variável de idade current_year = 2025 self.df_processed["Age"] = current_year - self.df_processed["Year_Birth"] # Criar variável de tempo como cliente self.df_processed["Dt_Customer"] = pd.to_datetime(self.df_processed["Dt_Customer"], format='%d-%m-%Y', errors='coerce') self.df_processed["Customer_Days"] = (pd.Timestamp('2025-11-02') - self.df_processed["Dt_Customer"]).dt.days self.df_processed["Customer_Years"] = self.df_processed["Customer_Days"] / 365.25 # Total de gastos spend_cols = ['MntWines', 'MntFruits', 'MntMeatProducts', 'MntFishProducts', 'MntSweetProducts', 'MntGoldProds'] self.df_processed["Total_Spent"] = self.df_processed[spend_cols].sum(axis=1) # Total de compras purchase_cols = ['NumDealsPurchases', 'NumCatalogPurchases', 'NumStorePurchases', 'NumWebPurchases'] self.df_processed["Total_Purchases"] = self.df_processed[purchase_cols].sum(axis=1) # Total de crianças/adolescentes self.df_processed["Total_Children"] = self.df_processed["Kidhome"] + self.df_processed["Teenhome"] # Total de campanhas aceitas campaign_cols = ['AcceptedCmp1', 'AcceptedCmp2', 'AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5'] self.df_processed["Total_Campaigns_Accepted"] = self.df_processed[campaign_cols].sum(axis=1) # Ticket médio self.df_processed["Average_Purchase_Value"] = self.df_processed["Total_Spent"] / (self.df_processed["Total_Purchases"] + 1e-8) # Tratamento de valores ausentes if 'Income' in self.df_processed.columns: self.df_processed['Income'] = self.df_processed.groupby('Education')['Income'].transform( lambda x: x.fillna(x.median()) ) # Remover outliers extremos self.df_processed = self.df_processed[(self.df_processed['Age'] >= 18) & (self.df_processed['Age'] <= 100)] # Capear valores extremos de Income if 'Income' in self.df_processed.columns: income_cap = self.df_processed['Income'].quantile(0.99) self.df_processed['Income'] = self.df_processed['Income'].clip(upper=income_cap) return "✅ Dados carregados e pré-processados com sucesso!" except Exception as e: return f"❌ Erro ao carregar dados: {str(e)}" def create_complaint_distribution(self): """Cria gráfico de distribuição de queixas""" if self.df is None: return self._create_placeholder_plot("Carregue os dados primeiro") complain_counts = self.df['Complain'].value_counts() fig = make_subplots(rows=1, cols=2, subplot_titles=['Distribuição de Queixas', 'Proporção de Queixas'], specs=[[{"type": "bar"}, {"type": "pie"}]]) # Gráfico de barras fig.add_trace( go.Bar(x=['Sem Queixa', 'Com Queixa'], y=complain_counts.values, marker_color=['#2ecc71', '#e74c3c'], text=complain_counts.values, textposition='auto'), row=1, col=1 ) # Gráfico de pizza fig.add_trace( go.Pie(labels=['Sem Queixa', 'Com Queixa'], values=complain_counts.values, marker_colors=['#2ecc71', '#e74c3c']), row=1, col=2 ) fig.update_layout( height=400, showlegend=True, title_text="Distribuição de Queixas de Consumidores", font=dict(size=12) ) return fig def create_correlation_heatmap(self, selected_variables): """Cria heatmap de correlação""" if self.df_processed is None: return self._create_placeholder_plot("Carregue os dados primeiro") if not selected_variables: return self._create_placeholder_plot("Selecione variáveis para análise") # Verificar quais variáveis existem no dataframe available_vars = [var for var in selected_variables if var in self.df_processed.columns] if not available_vars: return self._create_placeholder_plot("Nenhuma variável válida selecionada") corr_matrix = self.df_processed[available_vars].corr() fig = px.imshow(corr_matrix, text_auto=True, aspect="auto", color_continuous_scale='RdBu_r', title="Matriz de Correlação") fig.update_layout(height=500) return fig def create_bivariate_analysis(self, variable): """Análise bivariada entre uma variável e queixas""" if self.df_processed is None: return self._create_placeholder_plot("Carregue os dados primeiro") if variable not in self.df_processed.columns: return self._create_placeholder_plot(f"Variável '{variable}' não encontrada") fig = px.box(self.df_processed, x='Complain', y=variable, color='Complain', title=f'Distribuição de {variable} por Status de Queixa', labels={'Complain': 'Queixa (0=Não, 1=Sim)'}) fig.update_layout(height=400) return fig def _create_placeholder_plot(self, message): """Cria um gráfico placeholder com mensagem""" fig = go.Figure() fig.add_annotation(text=message, xref="paper", yref="paper", x=0.5, y=0.5, xanchor='center', yanchor='middle', showarrow=False, font=dict(size=16)) fig.update_layout(height=400) return fig def train_models(self, selected_features, test_size, use_smote): """Treina os modelos de machine learning""" try: if self.df_processed is None: return "❌ Carregue os dados primeiro!" # Encoding de variáveis categóricas df_temp = self.df_processed.copy() if 'Education' in df_temp.columns: le_education = LabelEncoder() df_temp["Education_Encoded"] = le_education.fit_transform(df_temp["Education"]) marital_dummies = pd.get_dummies(df_temp["Marital_Status"], prefix='Marital') if 'Marital_Status' in df_temp.columns else pd.DataFrame() if not marital_dummies.empty: df_temp = pd.concat([df_temp, marital_dummies], axis=1) # Features base disponíveis available_features = [] possible_features = [ 'Age', 'Income', 'Education_Encoded', 'Total_Children', 'Recency', 'Total_Spent', 'Total_Purchases', 'NumWebVisitsMonth', 'Customer_Years', 'Total_Campaigns_Accepted', 'Average_Purchase_Value' ] + (list(marital_dummies.columns) if not marital_dummies.empty else []) # Filtrar features disponíveis available_features = [f for f in possible_features if f in df_temp.columns] # Adicionar features selecionadas que estão disponíveis features_to_use = [f for f in selected_features if f in df_temp.columns] if not features_to_use: return "❌ Nenhuma feature válida selecionada!" X = df_temp[features_to_use] y = df_temp['Complain'] # Divisão treino-teste X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=test_size/100, random_state=42, stratify=y ) # Normalização scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # Armazenar para uso posterior self.X_test_scaled = X_test_scaled self.y_test = y_test self.scaler = scaler self.features_to_use = features_to_use # Balanceamento com SMOTE if use_smote: smote = SMOTE(random_state=42) X_train_balanced, y_train_balanced = smote.fit_resample(X_train_scaled, y_train) else: X_train_balanced, y_train_balanced = X_train_scaled, y_train # Modelos models = { 'Regressão Logística': LogisticRegression(random_state=42, max_iter=1000), 'Árvore de Decisão': DecisionTreeClassifier(random_state=42, max_depth=10), 'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, max_depth=15), 'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42, max_depth=5) } results = [] self.predictions = {} for name, model in models.items(): model.fit(X_train_balanced, y_train_balanced) y_pred = model.predict(X_test_scaled) y_proba = model.predict_proba(X_test_scaled)[:, 1] if hasattr(model, 'predict_proba') else None accuracy = (y_pred == y_test).mean() # Calcular precisão if sum(y_pred) > 0: precision = (y_test[y_pred == 1] == 1).mean() else: precision = 0 # Calcular recall if sum(y_test) > 0: recall = (y_pred[y_test == 1] == 1).mean() else: recall = 0 # Calcular F1-Score if (precision + recall) > 0: f1 = 2 * (precision * recall) / (precision + recall) else: f1 = 0 # Calcular ROC-AUC if y_proba is not None: roc_auc = roc_auc_score(y_test, y_proba) else: roc_auc = 0 results.append({ 'Modelo': name, 'Acurácia': round(accuracy, 4), 'Precisão': round(precision, 4), 'Recall': round(recall, 4), 'F1-Score': round(f1, 4), 'ROC-AUC': round(roc_auc, 4) }) self.predictions[name] = (y_pred, y_proba) self.models[name] = model # Importância das features (para modelos que suportam) if hasattr(model, 'feature_importances_'): self.feature_importance = pd.DataFrame({ 'Feature': features_to_use, 'Importance': model.feature_importances_ }).sort_values('Importance', ascending=False) self.results_df = pd.DataFrame(results) self.is_trained = True best_model = self.results_df.loc[self.results_df['ROC-AUC'].idxmax()] return f"✅ Modelos treinados com sucesso! Melhor modelo: {best_model['Modelo']} (AUC: {best_model['ROC-AUC']:.3f})" except Exception as e: return f"❌ Erro no treinamento: {str(e)}" def create_model_comparison(self): """Cria gráfico de comparação de modelos""" if self.results_df is None: return self._create_placeholder_plot("Treine os modelos primeiro") fig = go.Figure() metrics = ['Acurácia', 'Precisão', 'Recall', 'F1-Score', 'ROC-AUC'] colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'] for i, metric in enumerate(metrics): fig.add_trace(go.Bar( name=metric, x=self.results_df['Modelo'], y=self.results_df[metric], marker_color=colors[i], text=self.results_df[metric].round(3), textposition='auto' )) fig.update_layout( title="Comparação de Desempenho dos Modelos", xaxis_title="Modelos", yaxis_title="Score", barmode='group', height=500, font=dict(size=12) ) return fig def create_feature_importance(self): """Cria gráfico de importância das features""" if self.feature_importance is None: return self._create_placeholder_plot("Treine os modelos primeiro para ver a importância das features") top_features = self.feature_importance.head(15) fig = px.bar(top_features, x='Importance', y='Feature', orientation='h', title='Top 15 Features Mais Importantes', color='Importance', color_continuous_scale='Blues') fig.update_layout( height=500, yaxis={'categoryorder':'total ascending'}, font=dict(size=12) ) return fig def create_roc_curves(self): """Cria curvas ROC para todos os modelos""" if not self.is_trained or not hasattr(self, 'predictions'): return self._create_placeholder_plot("Treine os modelos primeiro para ver as curvas ROC") fig = go.Figure() # Adicionar linha de referência fig.add_trace(go.Scatter( x=[0, 1], y=[0, 1], mode='lines', line=dict(dash='dash', color='gray'), name='Classificador Aleatório' )) # Para cada modelo, adicionar curva ROC for name, (_, y_proba) in self.predictions.items(): if y_proba is not None: fpr, tpr, _ = roc_curve(self.y_test, y_proba) roc_auc = roc_auc_score(self.y_test, y_proba) fig.add_trace(go.Scatter( x=fpr, y=tpr, mode='lines', name=f'{name} (AUC = {roc_auc:.3f})', line=dict(width=2) )) fig.update_layout( title='Curvas ROC - Comparação de Modelos', xaxis_title='Taxa de Falsos Positivos', yaxis_title='Taxa de Verdadeiros Positivos', height=500, font=dict(size=12) ) return fig # Instanciar o analisador analyzer = ConsumerComplaintAnalyzer() # Interface Gradio def create_analysis_interface(): with gr.Blocks(theme=gr.themes.Soft(), title="Análise de Queixas de Consumidores") as demo: gr.Markdown("# 🎯 Análise Preditiva de Queixas de Consumidores") gr.Markdown("**PPCA/UnB - AEDI - Tarefa 6**") with gr.Tabs(): with gr.TabItem("📊 Carregamento de Dados"): with gr.Row(): with gr.Column(): file_input = gr.File(label="Upload do Dataset (CSV)", file_types=[".csv"]) load_btn = gr.Button("🚀 Carregar e Pré-processar Dados") load_status = gr.Textbox(label="Status", interactive=False) with gr.Column(): data_preview = gr.Dataframe(label="Pré-visualização dos Dados (Primeiras 5 linhas)") stats_display = gr.JSON(label="Estatísticas do Dataset") # Conectar eventos file_input.change( fn=analyzer.load_data_preview, inputs=[file_input], outputs=[data_preview, stats_display] ) load_btn.click( fn=analyzer.load_and_preprocess_data, inputs=[file_input], outputs=[load_status] ) with gr.TabItem("📈 Análise Exploratória"): with gr.Row(): with gr.Column(): gr.Markdown("### Distribuição de Queixas") dist_plot = gr.Plot() update_dist_btn = gr.Button("🔄 Atualizar Distribuição") with gr.Column(): gr.Markdown("### Análise de Correlação") corr_vars = gr.CheckboxGroup( choices=['Age', 'Income', 'Total_Spent', 'Total_Purchases', 'Customer_Years', 'Total_Children', 'Average_Purchase_Value', 'Recency', 'NumWebVisitsMonth', 'Total_Campaigns_Accepted'], label="Selecione variáveis para análise de correlação:", value=['Age', 'Income', 'Total_Spent', 'Total_Purchases', 'Customer_Years', 'Total_Children', 'Average_Purchase_Value', 'Complain'] ) corr_plot = gr.Plot() update_corr_btn = gr.Button("🔄 Atualizar Correlação") with gr.Row(): with gr.Column(): gr.Markdown("### Análise Bivariada") bivariate_var = gr.Dropdown( choices=['Age', 'Income', 'Total_Spent', 'Total_Purchases', 'Customer_Years', 'Total_Children', 'Average_Purchase_Value', 'Recency', 'NumWebVisitsMonth'], label="Selecione uma variável para análise:", value='Income' ) bivariate_plot = gr.Plot() update_bivariate_btn = gr.Button("🔄 Atualizar Análise") # Conectar botões update_dist_btn.click( fn=analyzer.create_complaint_distribution, outputs=[dist_plot] ) update_corr_btn.click( fn=analyzer.create_correlation_heatmap, inputs=[corr_vars], outputs=[corr_plot] ) update_bivariate_btn.click( fn=analyzer.create_bivariate_analysis, inputs=[bivariate_var], outputs=[bivariate_plot] ) with gr.TabItem("🤖 Modelagem Preditiva"): with gr.Row(): with gr.Column(): gr.Markdown("### Configuração do Modelo") feature_selection = gr.CheckboxGroup( choices=['Age', 'Income', 'Total_Spent', 'Total_Purchases', 'Customer_Years', 'Total_Children', 'Average_Purchase_Value', 'Recency', 'NumWebVisitsMonth', 'Total_Campaigns_Accepted'], label="Selecione features para o modelo:", value=['Age', 'Income', 'Total_Spent', 'Total_Purchases', 'Customer_Years', 'Total_Children', 'Average_Purchase_Value', 'Recency', 'NumWebVisitsMonth', 'Total_Campaigns_Accepted'] ) test_size = gr.Slider(10, 40, value=20, label="Tamanho do conjunto de teste (%)") use_smote = gr.Checkbox(value=True, label="Usar SMOTE para balanceamento de dados") train_btn = gr.Button("🎯 Treinar Modelos", variant="primary") train_status = gr.Textbox(label="Status do Treinamento") with gr.Column(): gr.Markdown("### Resultados dos Modelos") model_results = gr.Dataframe(label="Métricas de Desempenho") model_comparison = gr.Plot(label="Comparação Visual dos Modelos") with gr.Row(): with gr.Column(): gr.Markdown("### Importância das Features") feature_importance_plot = gr.Plot() with gr.Column(): gr.Markdown("### Curvas ROC") roc_plot = gr.Plot() # Conectar botão de treinamento train_btn.click( fn=analyzer.train_models, inputs=[feature_selection, test_size, use_smote], outputs=[train_status] ) train_btn.click( fn=lambda: analyzer.results_df if analyzer.is_trained else pd.DataFrame(), outputs=[model_results] ) train_btn.click( fn=analyzer.create_model_comparison, outputs=[model_comparison] ) train_btn.click( fn=analyzer.create_feature_importance, outputs=[feature_importance_plot] ) train_btn.click( fn=analyzer.create_roc_curves, outputs=[roc_plot] ) with gr.TabItem("📋 Relatório e Insights"): with gr.Row(): with gr.Column(): gr.Markdown("### Insights da Análise") insights_text = gr.Textbox( label="Principais Descobertas", lines=12, interactive=False ) generate_insights_btn = gr.Button("💡 Gerar Insights", variant="secondary") with gr.Column(): gr.Markdown("### Recomendações Estratégicas") recommendations = gr.Textbox( label="Ações Recomendadas", lines=12, interactive=False ) generate_recommendations_btn = gr.Button("🎯 Gerar Recomendações", variant="secondary") def generate_insights(): if analyzer.df is None: return "📊 Carregue os dados primeiro para gerar insights." total_customers = len(analyzer.df) complaint_rate = analyzer.df['Complain'].mean() * 100 insights = f"""📊 **INSIGHTS DA ANÁLISE:** • **Total de Clientes Analisados:** {total_customers:,} • **Taxa de Queixas:** {complaint_rate:.2f}% • **Problema de Classificação:** {'DESBALANCEADO' if complaint_rate < 5 else 'Balanceado'} • **Complexidade:** {'Alta (múltiplos fatores)' if complaint_rate > 1 else 'Média'} 🔍 **PADRÕES IDENTIFICADOS:**""" if analyzer.feature_importance is not None: top_features = analyzer.feature_importance.head(3)['Feature'].tolist() insights += f"\n• **Variáveis Mais Importantes:** {', '.join(top_features)}" if analyzer.results_df is not None: best_model = analyzer.results_df.loc[analyzer.results_df['ROC-AUC'].idxmax()] insights += f"\n• **Melhor Modelo:** {best_model['Modelo']}" insights += f"\n• **Desempenho (AUC):** {best_model['ROC-AUC']:.3f}" insights += "\n\n📈 **INTERPRETAÇÃO TÉCNICA:**" insights += "\n• AUC > 0.7: Bom poder preditivo" insights += "\n• AUC > 0.8: Excelente poder preditivo" insights += "\n• Features importantes indicam padrões comportamentais relevantes" insights += "\n• Modelos ensemble geralmente performam melhor em dados complexos" return insights def generate_recommendations(): return """🎯 **RECOMENDAÇÕES ESTRATÉGICAS:** 🚨 **AÇÕES IMEDIATAS (0-30 dias):** • Implementar sistema de alerta precoce para clientes de alto risco • Criar segmentação baseada nas variáveis mais importantes identificadas • Desenvolver campanhas proativas direcionadas a grupos específicos • Estabelecer protocolo de contato proativo para clientes com alta probabilidade de queixa 📈 **OTIMIZAÇÕES DE LONGO PRAZO (30-90 dias):** • Integrar modelo preditivo ao sistema de CRM existente • Implementar dashboard de monitoramento em tempo real • Desenvolver programa de treinamento para equipes de atendimento • Criar fluxo de trabalho automatizado para casos de alto risco 💡 **SUGESTÕES OPERACIONAIS ESPECÍFICAS:** • Monitorar continuamente clientes com alta recência de compra • Acompanhar mudanças abruptas no padrão de gastos dos clientes • Implementar programas de fidelidade segmentados por perfil de risco • Estabelecer métricas de satisfação pós-atendimento 📊 **MÉTRICAS DE SUCESSO:** • Redução de 20% na taxa de queixas em 6 meses • Aumento de 15% na satisfação do cliente (NPS) • Melhoria de 30% no tempo de resposta a queixas • Redução de 25% nos custos com resolução reativa de problemas 🔧 **IMPLEMENTAÇÃO TÉCNICA:** • Revisar e atualizar modelos trimestralmente • Validar performance com novos dados • Expandir análise para outros indicadores (churn, lifetime value) • Incorporar feedback dos clientes no refinamento do modelo""" generate_insights_btn.click( fn=generate_insights, outputs=[insights_text] ) generate_recommendations_btn.click( fn=generate_recommendations, outputs=[recommendations] ) gr.Markdown("---") gr.Markdown("**Desenvolvido para PPCA/UnB - AEDI - Tarefa 6** | *Análise Preditiva de Queixas de Consumidores*") return demo # Criar e lançar a interface if __name__ == "__main__": demo = create_analysis_interface() demo.launch(share=True)