import os
import io
import csv
import json
import base64
import tempfile
import traceback
import math
import statistics
from collections import Counter
from flask import Blueprint, request, jsonify, send_file, current_app
import chardet

preprocessing_bp = Blueprint('preprocessing', __name__)

# Temporary directory used to store uploaded and processed files
TEMP_DIR = tempfile.gettempdir()
CURRENT_FILE = None
PROCESSED_FILE = None
DATASET_INFO = None


def detect_encoding(file_path):
    """Detects the file encoding in a robust way."""
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read(100000)
        result = chardet.detect(raw_data)
        encoding = result['encoding']
        confidence = result['confidence']

        encodings_to_try = []
        if confidence and confidence > 0.7:
            encodings_to_try.append(encoding)
        encodings_to_try.extend(['utf-8', 'latin-1', 'iso-8859-1', 'cp1252', 'utf-16'])

        seen = set()
        unique_encodings = []
        for enc in encodings_to_try:
            if enc and enc not in seen:
                seen.add(enc)
                unique_encodings.append(enc)

        for enc in unique_encodings:
            try:
                with open(file_path, 'r', encoding=enc, errors='replace') as test_f:
                    sample = test_f.read(5000)
                replacement_ratio = sample.count('�') / len(sample) if sample else 1
                if replacement_ratio < 0.1:
                    return enc
            except:
                continue
        return 'utf-8'
    except:
        return 'utf-8'


def detect_delimiter(file_path, encoding):
    """Detects the CSV delimiter by scoring candidate separators."""
    try:
        with open(file_path, 'r', encoding=encoding, errors='replace') as f:
            lines = []
            for i, line in enumerate(f):
                if i >= 10:
                    break
                lines.append(line.strip())

        if not lines:
            return ','

        delimiters = [';', ',', '\t', '|', ':', ' ']
        delimiter_scores = {}
        for delimiter in delimiters:
            scores = []
            for line in lines:
                if line:
                    parts = line.split(delimiter)
                    scores.append(len(parts))
            if scores:
                avg_parts = statistics.mean(scores)
                if len(scores) > 1:
                    std_dev = statistics.stdev(scores)
                    consistency = 1 / (1 + std_dev)
                else:
                    consistency = 1
                final_score = avg_parts * consistency
                delimiter_scores[delimiter] = final_score

        if delimiter_scores:
            best_delimiter = max(delimiter_scores, key=delimiter_scores.get)
            if delimiter_scores[best_delimiter] >= 2:
                return best_delimiter
        return ','
    except:
        return ','


def safe_convert_numeric(value):
    """Safely converts a value to a float, returning None when it cannot be parsed."""
    if not value or not str(value).strip():
        return None
    try:
        clean_value = str(value).strip().replace(',', '.')
        import re
        clean_value = re.sub(r'[^\d\.\-]', '', clean_value)
        if not clean_value or clean_value == '.' or clean_value == '-':
            return None
        return float(clean_value)
    except:
        return None


def analyze_column_type_advanced(values):
    """Infers a column's type (boolean, datetime, int64, float64, category or object)."""
    non_empty_values = [v for v in values if v and str(v).strip()]
    if not non_empty_values:
        return 'object'

    numeric_count = 0
    integer_count = 0
    date_like_count = 0
    boolean_count = 0

    for value in non_empty_values:
        str_value = str(value).strip().lower()

        if str_value in ['true', 'false', 'yes', 'no', 'sim', 'não', '1', '0', 'y', 'n', 's']:
            boolean_count += 1
            continue

        numeric_val = safe_convert_numeric(value)
        if numeric_val is not None:
            numeric_count += 1
            if numeric_val.is_integer():
                integer_count += 1
            continue

        import re
        date_patterns = [
            r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}',
            r'\d{4}[/-]\d{1,2}[/-]\d{1,2}',
            r'\d{1,2}[/-]\d{1,2}[/-]\d{2}',
        ]
        for pattern in date_patterns:
            if re.match(pattern, str_value):
                date_like_count += 1
                break

    total_values = len(non_empty_values)
    numeric_ratio = numeric_count / total_values
    boolean_ratio = boolean_count / total_values
    date_ratio = date_like_count / total_values

    if boolean_ratio > 0.8:
        return 'boolean'
    if date_ratio > 0.8:
        return 'datetime'
    if numeric_ratio > 0.8:
        if integer_count == numeric_count:
            return 'int64'
        else:
            return 'float64'

    unique_ratio = len(set(non_empty_values)) / total_values
    if unique_ratio < 0.5 and len(set(non_empty_values)) < 20:
        return 'category'
    return 'object'
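
# Illustrative behaviour of the helpers above (the sample values are assumptions,
# not data from this project). The type heuristics use 80% majority thresholds, so
# a mixed column falls back to 'object':
#
#     safe_convert_numeric('R$ 10,50')                      -> 10.5
#     analyze_column_type_advanced(['10', '20', '', '30'])  -> 'int64'
#     analyze_column_type_advanced(['sim', 'não', 'sim'])   -> 'boolean'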

def read_dataset_file_enhanced(file_path):
    """Reads a dataset file with very robust automatic detection."""
    try:
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext in ['.xlsx', '.xls']:
            return {'success': False, 'error': 'Suporte a Excel não disponível. Por favor, converta para CSV.'}

        encoding = detect_encoding(file_path)
        delimiter = detect_delimiter(file_path, encoding)

        print(f"Arquivo: {os.path.basename(file_path)}")
        print(f"Encoding detectado: {encoding}")
        print(f"Delimitador detectado: '{delimiter}'")

        with open(file_path, 'r', encoding=encoding, errors='replace') as f:
            first_line = f.readline().strip()
            if not first_line:
                return {'success': False, 'error': 'Arquivo vazio ou sem conteúdo válido'}

            headers = [h.strip().strip('"').strip("'") for h in first_line.split(delimiter)]
            if headers and headers[0].startswith('\ufeff'):
                headers[0] = headers[0][1:]

            cleaned_headers = []
            for i, header in enumerate(headers):
                if not header or header.isspace():
                    cleaned_headers.append(f'Column_{i+1}')
                else:
                    import re
                    clean_header = re.sub(r'[^\w\s\-_]', '', header)
                    clean_header = clean_header.strip()
                    if not clean_header:
                        clean_header = f'Column_{i+1}'
                    cleaned_headers.append(clean_header)
            headers = cleaned_headers

            f.seek(0)
            reader = csv.reader(f, delimiter=delimiter)
            next(reader)

            rows = []
            max_rows = 50000
            for row_num, row in enumerate(reader):
                if row_num >= max_rows:
                    break
                if not any(cell.strip() for cell in row):
                    continue
                while len(row) < len(headers):
                    row.append('')
                if len(row) > len(headers):
                    row = row[:len(headers)]
                rows.append(row)

        if not rows:
            return {'success': False, 'error': 'Nenhuma linha de dados válida encontrada'}

        print(f"Linhas lidas: {len(rows)}")
        print(f"Colunas: {len(headers)}")

        dtypes = {}
        missing_values = {}
        unique_values = {}
        sample_values = {}

        for i, col in enumerate(headers):
            col_values = [row[i] if i < len(row) else '' for row in rows]
            missing_count = sum(1 for v in col_values if not v or not str(v).strip())
            missing_values[col] = missing_count
            non_empty_values = [v for v in col_values if v and str(v).strip()]
            unique_count = len(set(non_empty_values))
            unique_values[col] = unique_count
            sample_values[col] = non_empty_values[:5] if non_empty_values else []
            dtypes[col] = analyze_column_type_advanced(col_values)

        seen = set()
        duplicates = 0
        for row in rows:
            row_tuple = tuple(row)
            if row_tuple in seen:
                duplicates += 1
            else:
                seen.add(row_tuple)

        numeric_columns = [col for col, dtype in dtypes.items() if dtype in ('int64', 'float64')]
        categorical_columns = [col for col, dtype in dtypes.items() if dtype in ('object', 'category')]
        boolean_columns = [col for col, dtype in dtypes.items() if dtype == 'boolean']
        datetime_columns = [col for col, dtype in dtypes.items() if dtype == 'datetime']

        total_cells = len(rows) * len(headers)
        total_missing = sum(missing_values.values())
        data_quality = {
            'completeness': ((total_cells - total_missing) / total_cells * 100) if total_cells > 0 else 0,
            'uniqueness': (len(rows) - duplicates) / len(rows) * 100 if len(rows) > 0 else 0,
            'consistency': 100
        }

        return {
            'success': True,
            'info': {
                'filename': os.path.basename(file_path),
                'shape': (len(rows), len(headers)),
                'columns': headers,
                'dtypes': dtypes,
                'missing_values': missing_values,
                'unique_values': unique_values,
                'sample_values': sample_values,
                'duplicates': duplicates,
                'numeric_columns': numeric_columns,
                'categorical_columns': categorical_columns,
                'boolean_columns': boolean_columns,
                'datetime_columns': datetime_columns,
                'data_quality': data_quality,
                'encoding_used': encoding,
                'delimiter_used': delimiter
            }
        }
    except Exception as e:
        print(f"Erro ao ler arquivo: {str(e)}")
        traceback.print_exc()
        return {'success': False, 'error': f"Erro ao ler arquivo: {str(e)}"}


@preprocessing_bp.route('/upload', methods=['POST'])
def upload_file():
    """Improved file upload endpoint."""
    global CURRENT_FILE, DATASET_INFO
    try:
        if 'file' not in request.files:
            return jsonify({'success': False, 'error': 'Nenhum arquivo enviado'})

        file = request.files['file']
        if file.filename == '':
            return jsonify({'success': False, 'error': 'Nome de arquivo vazio'})

        allowed_extensions = ['.csv', '.txt', '.tsv']
        file_ext = os.path.splitext(file.filename)[1].lower()
        if file_ext not in allowed_extensions:
            return jsonify({
                'success': False,
                'error': f'Tipo de arquivo não suportado. Use: {", ".join(allowed_extensions)}'
            })

        file.seek(0, 2)
        file_size = file.tell()
        file.seek(0)
        max_size = 50 * 1024 * 1024
        if file_size > max_size:
            return jsonify({
                'success': False,
                'error': 'Arquivo muito grande. Tamanho máximo: 50MB'
            })

        safe_filename = "".join(c for c in file.filename if c.isalnum() or c in (' ', '.', '_', '-')).rstrip()
        file_path = os.path.join(TEMP_DIR, f"dataset_{safe_filename}")
        file.save(file_path)
        CURRENT_FILE = file_path

        result = read_dataset_file_enhanced(file_path)
        if result['success']:
            DATASET_INFO = result['info']
            print(f"Dataset carregado com sucesso: {DATASET_INFO['shape']}")
        return jsonify(result)
    except Exception as e:
        print(f"Erro no upload: {str(e)}")
        traceback.print_exc()
        return jsonify({'success': False, 'error': f"Erro no upload do arquivo: {str(e)}"})
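
# Rough usage sketch for the routes in this blueprint (assumption: the application
# registers it under '/api/preprocessing'; adjust the prefix to match the real app):
#
#     app.register_blueprint(preprocessing_bp, url_prefix='/api/preprocessing')
#
#     curl -F "file=@dataset.csv" http://localhost:5000/api/preprocessing/upload
#     curl -X POST -H "Content-Type: application/json" \
#          -d '{"target_column": "label"}' \
#          http://localhost:5000/api/preprocessing/analyze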

@preprocessing_bp.route('/analyze', methods=['POST'])
def analyze_dataset():
    """Endpoint for the improved initial analysis of the dataset."""
    global CURRENT_FILE, DATASET_INFO
    try:
        if not CURRENT_FILE or not os.path.exists(CURRENT_FILE):
            return jsonify({'success': False, 'error': 'Nenhum arquivo carregado'})

        data = request.json
        target_column = data.get('target_column')
        if not target_column:
            return jsonify({'success': False, 'error': 'Coluna target não especificada'})
        if not DATASET_INFO:
            return jsonify({'success': False, 'error': 'Informações do dataset não disponíveis'})
        if target_column not in DATASET_INFO['columns']:
            return jsonify({'success': False, 'error': f'Coluna {target_column} não encontrada'})

        encoding = DATASET_INFO.get('encoding_used', 'utf-8')
        delimiter = DATASET_INFO.get('delimiter_used', ',')

        with open(CURRENT_FILE, 'r', encoding=encoding, errors='replace') as f:
            reader = csv.reader(f, delimiter=delimiter)
            headers = next(reader)
            headers = [h.strip().strip('"').strip("'") for h in headers]
            if headers and headers[0].startswith('\ufeff'):
                headers[0] = headers[0][1:]

            try:
                target_idx = headers.index(target_column)
            except ValueError:
                return jsonify({'success': False, 'error': f'Coluna {target_column} não encontrada'})

            rows = list(reader)

        target_values = []
        for row in rows:
            if target_idx < len(row) and row[target_idx].strip():
                target_values.append(row[target_idx].strip())

        target_classes = Counter(target_values)
        total_target_values = len(target_values)
        missing_target = len(rows) - total_target_values

        if target_classes:
            class_counts = list(target_classes.values())
            min_class = min(class_counts)
            max_class = max(class_counts)
            balance_ratio = min_class / max_class if max_class > 0 else 1
            needs_balancing = balance_ratio < 0.8
        else:
            balance_ratio = 1
            needs_balancing = False

        ml_readiness = {
            'target_quality': 'good' if missing_target == 0 else 'needs_attention',
            'class_balance': 'good' if balance_ratio > 0.8 else 'imbalanced',
            'data_completeness': DATASET_INFO['data_quality']['completeness'],
            'recommendation': []
        }
        if missing_target > 0:
            ml_readiness['recommendation'].append(f'Remover {missing_target} linhas com target ausente')
        if needs_balancing:
            ml_readiness['recommendation'].append('Considerar balanceamento de classes')
        if DATASET_INFO['data_quality']['completeness'] < 90:
            ml_readiness['recommendation'].append('Tratar valores ausentes nas features')

        total_missing = sum(DATASET_INFO['missing_values'].values())
        total_cells = DATASET_INFO['shape'][0] * DATASET_INFO['shape'][1]
        missing_percentage = (total_missing / total_cells) * 100 if total_cells > 0 else 0

        return jsonify({
            'success': True,
            'analysis': {
                'target_column': target_column,
                'target_type': DATASET_INFO['dtypes'].get(target_column, 'object'),
                'target_classes': dict(target_classes),
                'target_balance_ratio': balance_ratio,
                'missing_target_values': missing_target,
                'total_missing_percentage': missing_percentage,
                'duplicates_count': DATASET_INFO['duplicates'],
                'numeric_columns_count': len(DATASET_INFO['numeric_columns']),
                'categorical_columns_count': len(DATASET_INFO['categorical_columns']),
                'boolean_columns_count': len(DATASET_INFO['boolean_columns']),
                'datetime_columns_count': len(DATASET_INFO['datetime_columns']),
                'dataset_shape': DATASET_INFO['shape'],
                'data_quality': DATASET_INFO['data_quality'],
                'ml_readiness': ml_readiness,
                'needs_balancing': needs_balancing
            }
        })
    except Exception as e:
        print(f"Erro na análise: {str(e)}")
        traceback.print_exc()
        return jsonify({'success': False, 'error': f"Erro na análise: {str(e)}"})


@preprocessing_bp.route('/process', methods=['POST'])
def process_dataset():
    """Endpoint for complete dataset processing driven by the received configuration."""
    global CURRENT_FILE, PROCESSED_FILE, DATASET_INFO
    try:
        if not CURRENT_FILE or not os.path.exists(CURRENT_FILE):
            return jsonify({'success': False, 'error': 'Nenhum arquivo carregado'})

        data = request.json
        target_column = data.get('target_column')
        config = data.get('config', {})
        if not target_column:
            return jsonify({'success': False, 'error': 'Coluna target não especificada'})

        print(f"Configurações recebidas: {config}")

        encoding = DATASET_INFO.get('encoding_used', 'utf-8')
        delimiter = DATASET_INFO.get('delimiter_used', ',')

        with open(CURRENT_FILE, 'r', encoding=encoding, errors='replace') as f:
            reader = csv.reader(f, delimiter=delimiter)
            headers = next(reader)
            headers = [h.strip().strip('"').strip("'") for h in headers]
            if headers and headers[0].startswith('\ufeff'):
                headers[0] = headers[0][1:]
            rows = list(reader)

        try:
            target_idx = headers.index(target_column)
        except ValueError:
            return jsonify({'success': False, 'error': f'Coluna {target_column} não encontrada'})

        processing_stats = {
            'original_rows': len(rows),
            'original_columns': len(headers),
            'missing_values_treated': 0,
            'duplicates_removed': 0,
            'outliers_removed': 0,
            'categorical_encoded': 0,
            'boolean_encoded': 0,
            'normalized_columns': 0,
            'balanced_samples': 0
        }

        print(f"Iniciando processamento: {len(rows)} linhas, {len(headers)} colunas")

        # 1. Drop rows with a missing target value
        valid_rows = []
        for row in rows:
            if target_idx < len(row) and row[target_idx].strip():
                valid_rows.append(row)
        processing_stats['missing_target_removed'] = len(rows) - len(valid_rows)
        print(f"Após remoção de target ausente: {len(valid_rows)} linhas")

        # 2. Remove duplicates (if configured)
        if config.get('remove_duplicates', True):
            seen = set()
            unique_rows = []
            for row in valid_rows:
                row_tuple = tuple(row)
                if row_tuple not in seen:
                    seen.add(row_tuple)
                    unique_rows.append(row)
                else:
                    processing_stats['duplicates_removed'] += 1
            valid_rows = unique_rows
            print(f"Após remoção de duplicatas: {len(valid_rows)} linhas")

        # 3. Treat missing values (if configured)
        if config.get('treat_missing', True):
            for i, header in enumerate(headers):
                if i == target_idx:
                    continue
                col_type = DATASET_INFO['dtypes'].get(header, 'object')
                col_values = [row[i] if i < len(row) else '' for row in valid_rows]
                non_empty_values = [v for v in col_values if v and str(v).strip()]
                if not non_empty_values:
                    continue
                if col_type in ('int64', 'float64'):
                    numeric_values = [safe_convert_numeric(v) for v in non_empty_values]
                    numeric_values = [v for v in numeric_values if v is not None]
                    if numeric_values:
                        fill_value = statistics.median(numeric_values)
                        for row in valid_rows:
                            if i >= len(row) or not row[i] or not str(row[i]).strip():
                                if i >= len(row):
                                    row.extend([''] * (i - len(row) + 1))
                                row[i] = str(fill_value)
                                processing_stats['missing_values_treated'] += 1
                else:
                    value_counts = Counter(non_empty_values)
                    fill_value = value_counts.most_common(1)[0][0]
                    for row in valid_rows:
                        if i >= len(row) or not row[i] or not str(row[i]).strip():
                            if i >= len(row):
                                row.extend([''] * (i - len(row) + 1))
                            row[i] = fill_value
                            processing_stats['missing_values_treated'] += 1
            print(f"Valores ausentes tratados: {processing_stats['missing_values_treated']}")

        # 4. Encode categorical variables (if configured)
        categorical_mappings = {}
        if config.get('encode_categories', True):
            for i, header in enumerate(headers):
                if i == target_idx:
                    continue
                col_type = DATASET_INFO['dtypes'].get(header, 'object')
                if col_type == 'boolean':
                    col_values = [row[i] if i < len(row) else '' for row in valid_rows]
                    unique_vals = list(set(v for v in col_values if v and str(v).strip()))
                    if len(unique_vals) <= 10:
                        mapping = {}
                        for val in unique_vals:
                            val_lower = str(val).lower()
                            if val_lower in ['true', 'yes', 'sim', '1', 'y', 's']:
                                mapping[val] = '1'
                            else:
                                mapping[val] = '0'
                        categorical_mappings[header] = mapping
                        for row in valid_rows:
                            if i < len(row) and row[i] in mapping:
                                row[i] = mapping[row[i]]
                        processing_stats['boolean_encoded'] += 1
                elif col_type in ('object', 'category'):
                    col_values = [row[i] if i < len(row) else '' for row in valid_rows]
                    unique_vals = list(set(v for v in col_values if v and str(v).strip()))
                    if len(unique_vals) <= 100:
                        mapping = {val: str(idx) for idx, val in enumerate(sorted(unique_vals))}
                        categorical_mappings[header] = mapping
                        for row in valid_rows:
                            if i < len(row) and row[i] in mapping:
                                row[i] = mapping[row[i]]
                        processing_stats['categorical_encoded'] += 1
            print(f"Variáveis codificadas: {processing_stats['categorical_encoded'] + processing_stats['boolean_encoded']}")

        # 5. Normalization (if configured)
        if config.get('normalize_data', False):
            for i, header in enumerate(headers):
                if i == target_idx:
                    continue
                col_type = DATASET_INFO['dtypes'].get(header, 'object')
                if col_type in ('int64', 'float64'):
                    col_values = [safe_convert_numeric(row[i]) for row in valid_rows]
                    col_values = [v for v in col_values if v is not None]
                    if len(col_values) > 1:
                        mean_val = statistics.mean(col_values)
                        std_val = statistics.stdev(col_values)
                        if std_val > 0:
                            for row in valid_rows:
                                if i < len(row):
                                    val = safe_convert_numeric(row[i])
                                    if val is not None:
                                        normalized = (val - mean_val) / std_val
                                        row[i] = str(normalized)
                            processing_stats['normalized_columns'] += 1

        # 6. Encode the target column
        target_values = [row[target_idx] for row in valid_rows]
        unique_targets = list(set(target_values))
        target_mapping = {val: str(idx) for idx, val in enumerate(sorted(unique_targets))}
        for row in valid_rows:
            if row[target_idx] in target_mapping:
                row[target_idx] = target_mapping[row[target_idx]]
        # 7. Feature selection (if configured)
        if config.get('select_features', True):
            feature_indices = [i for i in range(len(headers)) if i != target_idx]
            if len(feature_indices) > 2:
                num_features_percent = config.get('num_features_percent', 50)
                num_features_to_keep = max(1, int(len(feature_indices) * num_features_percent / 100))
                # Simulates importance-based selection using variance and cardinality
                feature_scores = []
                for idx in feature_indices:
                    col_values = [safe_convert_numeric(row[idx]) for row in valid_rows]
                    col_values = [v for v in col_values if v is not None]
                    if col_values and len(set(col_values)) > 1:
                        variance = statistics.variance(col_values) if len(col_values) > 1 else 0
                        score = variance + len(set(col_values)) / len(col_values)
                    else:
                        score = 0
                    feature_scores.append((idx, score))
                feature_scores.sort(key=lambda x: x[1], reverse=True)
                selected_features = [idx for idx, score in feature_scores[:num_features_to_keep]]
            else:
                selected_features = feature_indices
        else:
            selected_features = [i for i in range(len(headers)) if i != target_idx]

        # 8. Class balancing (if configured)
        if config.get('balance_classes', False):
            target_values = [row[target_idx] for row in valid_rows]
            target_counts = Counter(target_values)
            if len(target_counts) > 1:
                max_count = max(target_counts.values())
                balanced_rows = []
                for target_class in target_counts:
                    class_rows = [row for row in valid_rows if row[target_idx] == target_class]
                    current_count = len(class_rows)
                    balanced_rows.extend(class_rows)
                    if current_count < max_count:
                        needed = max_count - current_count
                        for _ in range(needed):
                            import random
                            random_row = random.choice(class_rows).copy()
                            balanced_rows.append(random_row)
                processing_stats['balanced_samples'] = len(balanced_rows) - len(valid_rows)
                valid_rows = balanced_rows

        # Build the final headers
        final_headers = [headers[i] for i in selected_features] + [headers[target_idx]]
        final_indices = selected_features + [target_idx]

        # Build the final dataset
        final_rows = []
        for row in valid_rows:
            final_row = [row[i] if i < len(row) else '' for i in final_indices]
            final_rows.append(final_row)

        print(f"Dataset final: {len(final_rows)} linhas, {len(final_headers)} colunas")

        # Save the processed file
        processed_filename = f"processed_{os.path.basename(CURRENT_FILE)}"
        processed_path = os.path.join(TEMP_DIR, processed_filename)
        with open(processed_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(final_headers)
            writer.writerows(final_rows)
        PROCESSED_FILE = processed_path

        # Final statistics
        processing_stats.update({
            'final_rows': len(final_rows),
            'final_columns': len(final_headers),
            'features_selected': len(selected_features),
            'target_mapping': target_mapping,
            'categorical_mappings': categorical_mappings,
            'improvement_ratio': len(final_rows) / len(rows) if len(rows) > 0 else 0
        })

        return jsonify({
            'success': True,
            'processing_stats': processing_stats,
            'final_shape': (len(final_rows), len(final_headers)),
            'final_columns': final_headers,
            'data_quality_improvement': {
                'completeness_before': DATASET_INFO['data_quality']['completeness'],
                'completeness_after': 100,
                'duplicates_removed': processing_stats['duplicates_removed'],
                'features_optimized': len([i for i in range(len(headers)) if i != target_idx]) - len(selected_features)
            }
        })
    except Exception as e:
        print(f"Erro no processamento: {str(e)}")
        traceback.print_exc()
        return jsonify({'success': False, 'error': f"Erro no processamento: {str(e)}"})


@preprocessing_bp.route('/download', methods=['GET'])
def download_processed():
    """Endpoint for downloading the processed file."""
    global PROCESSED_FILE
    try:
        if not PROCESSED_FILE or not os.path.exists(PROCESSED_FILE):
            return jsonify({'success': False, 'error': 'Nenhum arquivo processado disponível'})
        return send_file(
            PROCESSED_FILE,
            as_attachment=True,
            download_name="processed_dataset.csv",
            mimetype='text/csv'
        )
    except Exception as e:
        print(f"Erro no download: {str(e)}")
        return jsonify({'success': False, 'error': f"Erro no download: {str(e)}"})


@preprocessing_bp.route('/clear', methods=['POST'])
def clear_session():
    """Endpoint to clear the current session."""
    global CURRENT_FILE, PROCESSED_FILE, DATASET_INFO
    try:
        if CURRENT_FILE and os.path.exists(CURRENT_FILE):
            os.remove(CURRENT_FILE)
        if PROCESSED_FILE and os.path.exists(PROCESSED_FILE):
            os.remove(PROCESSED_FILE)
        CURRENT_FILE = None
        PROCESSED_FILE = None
        DATASET_INFO = None
        return jsonify({'success': True, 'message': 'Sessão limpa com sucesso'})
    except Exception as e:
        print(f"Erro ao limpar sessão: {str(e)}")
        return jsonify({'success': False, 'error': f"Erro ao limpar sessão: {str(e)}"})


def create_simple_plot(plot_type, title):
    """Creates a simple simulated plot and returns it as a base64 data URI."""
    import matplotlib
    matplotlib.use('Agg')  # Non-interactive backend
    import matplotlib.pyplot as plt
    import numpy as np
    try:
        fig, ax = plt.subplots(figsize=(10, 6))

        if plot_type == 'correlation':
            # Simulates a correlation matrix
            data = np.random.rand(5, 5)
            im = ax.imshow(data, cmap='coolwarm', aspect='auto')
            ax.set_title(title)
            plt.colorbar(im)
        elif plot_type == 'pca':
            # Simulates a 2D PCA scatter
            np.random.seed(42)
            x = np.random.randn(100)
            y = np.random.randn(100)
            colors = np.random.choice(['red', 'blue', 'green'], 100)
            ax.scatter(x, y, c=colors, alpha=0.6)
            ax.set_xlabel('PC1 (52.3%)')
            ax.set_ylabel('PC2 (28.7%)')
            ax.set_title(title)
        elif plot_type == 'outliers':
            # Simulates outlier detection
            np.random.seed(42)
            normal_data = np.random.randn(200, 2)
            outliers = np.random.randn(20, 2) * 3 + 5
            ax.scatter(normal_data[:, 0], normal_data[:, 1], c='blue', alpha=0.6, label='Normal')
            ax.scatter(outliers[:, 0], outliers[:, 1], c='red', alpha=0.8, label='Outliers')
            ax.set_title(title)
            ax.legend()
        else:
            # Generic plot
            x = np.linspace(0, 10, 100)
            y = np.sin(x) + np.random.randn(100) * 0.1
            ax.plot(x, y)
            ax.set_title(title)

        # Save to an in-memory buffer
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
        buffer.seek(0)

        # Convert to base64
        import base64
        plot_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
        plt.close(fig)
        return f"data:image/png;base64,{plot_data}"
    except Exception as e:
        print(f"Erro ao criar gráfico: {str(e)}")
        # Return a placeholder if plotting fails
        return f"data:image/png;base64,{base64.b64encode(b'Plot Error').decode('utf-8')}"
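
# The PCA helpers below import pandas, scikit-learn and plotly lazily, so the
# blueprint can still be imported when those optional dependencies are missing;
# they also assume CURRENT_FILE and DATASET_INFO were populated by the /upload route.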

def perform_pca(target_column: str, n_components: int):
    """Runs a real PCA on the current dataset, using the given target column for labels."""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA

    encoding = DATASET_INFO.get('encoding_used', 'utf-8')
    delimiter = DATASET_INFO.get('delimiter_used', ',')
    df = pd.read_csv(CURRENT_FILE, delimiter=delimiter, encoding=encoding)

    if target_column not in df.columns:
        raise ValueError(f'Coluna {target_column} não encontrada')

    df = df.dropna(subset=[target_column])
    y = df[target_column].astype(str)
    X = df.drop(columns=[target_column])
    X = pd.get_dummies(X, drop_first=True)
    X = X.apply(pd.to_numeric, errors='coerce').fillna(0)

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    pca = PCA(n_components=n_components)
    comps = pca.fit_transform(X_scaled)
    explained = [float(round(v, 3)) for v in pca.explained_variance_ratio_]
    cumulative = float(round(sum(pca.explained_variance_ratio_), 3))
    return comps, y, explained, cumulative


def create_pca_plot(target_column: str, title: str):
    """Generates a 2D PCA figure using matplotlib."""
    import matplotlib
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import pandas as pd
    try:
        comps, labels, explained, cumulative = perform_pca(target_column, 2)
        unique_labels = sorted(set(labels))
        cmap = plt.cm.get_cmap('tab10', len(unique_labels))

        fig, ax = plt.subplots(figsize=(10, 6))
        for idx, lbl in enumerate(unique_labels):
            mask = [l == lbl for l in labels]
            ax.scatter(comps[mask, 0], comps[mask, 1], color=cmap(idx), label=str(lbl), alpha=0.7)
        ax.set_xlabel(f'PC1 ({explained[0]*100:.1f}%)')
        ax.set_ylabel(f'PC2 ({explained[1]*100:.1f}%)')
        ax.set_title(title)
        ax.legend()

        buffer = io.BytesIO()
        plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
        buffer.seek(0)
        plot_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
        plt.close(fig)
        return f"data:image/png;base64,{plot_data}", explained, cumulative
    except Exception as e:
        print(f"Erro ao criar PCA 2D: {str(e)}")
        return f"data:image/png;base64,{base64.b64encode(b'Plot Error').decode('utf-8')}", [0, 0], 0


def create_pca_3d_plot(target_column: str, title: str):
    """Creates a real 3D PCA plot using Plotly."""
    import plotly.express as px
    import pandas as pd
    try:
        comps, labels, explained, cumulative = perform_pca(target_column, 3)
        df = pd.DataFrame({
            'PC1': comps[:, 0],
            'PC2': comps[:, 1],
            'PC3': comps[:, 2],
            'label': labels
        })
        fig = px.scatter_3d(df, x='PC1', y='PC2', z='PC3', color='label')
        fig.update_layout(title=title)
        html = fig.to_html(full_html=False, include_plotlyjs=True)
        return html, explained, cumulative
    except Exception as e:
        print(f"Erro ao criar PCA 3D: {str(e)}")
        return "