Spaces:
Build error
Build error
| import pandas as pd | |
| import numpy as np | |
| from sklearn.discriminant_analysis import LinearDiscriminantAnalysis | |
| from sklearn.preprocessing import StandardScaler, LabelEncoder | |
| from sklearn.decomposition import PCA | |
| from sklearn.feature_selection import SelectKBest, f_classif | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import logging | |
| import os | |
| from datetime import datetime | |
| from typing import Dict, Tuple, List, Optional, Any | |
| import xlsxwriter | |
| class LDAAnalyzer: | |
| """ | |
| Класс для выполнения линейного дискриминантного анализа (LDA) | |
| с расширенной функциональностью и форматированным выводом результатов | |
| """ | |
| def __init__(self, input_file: str, target_column: int): | |
| """ | |
| Инициализация анализатора LDA | |
| Args: | |
| input_file (str): Путь к входному файлу Excel | |
| target_column (int): Номер столбца для классификации | |
| """ | |
| self.input_file = input_file | |
| self.target_column = target_column | |
| self.data = None | |
| self.X = None | |
| self.y = None | |
| self.X_transformed = None | |
| self.lda = None | |
| self.scaler = StandardScaler() | |
| self.label_encoder = LabelEncoder() | |
| self.feature_names = None | |
| # Настройка логирования | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('lda_analysis.log'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| self.logger = logging.getLogger(__name__) | |
| # Цветовая схема для визуализации | |
| self.colors = ['lightblue', 'green', 'purple', 'yellow', | |
| 'red', 'orange', 'cyan', 'brown', 'pink'] | |
| self.logger.info(f"Инициализация LDA анализатора с файлом: {input_file}") | |
| def validate_data(self) -> None: | |
| """Валидация входных данных""" | |
| if self.data is None: | |
| raise ValueError("Данные не загружены") | |
| # Проверка размерности | |
| if self.data.shape[0] < 30: | |
| raise ValueError("Недостаточно наблюдений (минимум 30)") | |
| # Проверка пропущенных значений | |
| if self.data.isnull().any().any(): | |
| raise ValueError("Обнаружены пропущенные значения") | |
| # Проверка типов данных | |
| numeric_cols = self.data.select_dtypes(include=[np.number]).columns | |
| if len(numeric_cols) < self.data.shape[1] - 1: # -1 для целевой переменной | |
| raise ValueError("Обнаружены нечисловые признаки") | |
| def load_data(self) -> None: | |
| """Загрузка данных из Excel файла""" | |
| try: | |
| self.logger.info("Загрузка данных...") | |
| # Загрузка данных | |
| self.data = pd.read_excel(self.input_file) | |
| # Преобразование имен колонок | |
| self.data.columns = [str(col) for col in self.data.columns] | |
| # Попытка преобразовать все колонки (кроме целевой) в числовой формат | |
| for col in self.data.columns: | |
| if self.data.columns.get_loc(col) != self.target_column: | |
| try: | |
| self.data[col] = pd.to_numeric(self.data[col], errors='coerce') | |
| except Exception as e: | |
| self.logger.warning(f"Не удалось преобразовать колонку {col} в числовой формат: {str(e)}") | |
| self.validate_data() | |
| self.logger.info(f"Данные загружены. Размерность: {self.data.shape}") | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при загрузке данных: {str(e)}") | |
| raise | |
| def prepare_data(self) -> None: | |
| """Подготовка данных для анализа""" | |
| try: | |
| self.logger.info("Подготовка данных...") | |
| # Разделение на признаки и целевую переменную | |
| X = self.data.drop(self.data.columns[self.target_column], axis=1) | |
| y = self.data.iloc[:, self.target_column] | |
| # Преобразование имен колонок в строки | |
| X.columns = X.columns.astype(str) | |
| # Кодирование меток классов | |
| self.y = self.label_encoder.fit_transform(y) + 1 | |
| # Преобразование в числовой формат | |
| X = X.apply(pd.to_numeric, errors='coerce') | |
| # Проверка на пропущенные значения после преобразования | |
| if X.isnull().any().any(): | |
| raise ValueError("После преобразования в числовой формат появились пропущенные значения") | |
| # Стандартизация признаков | |
| self.X = self.scaler.fit_transform(X) | |
| # Проверка количества классов и наблюдений в каждом классе | |
| class_counts = pd.Series(self.y).value_counts() | |
| if (class_counts < 5).any(): | |
| self.logger.warning("Некоторые классы имеют менее 5 наблюдений") | |
| self.logger.info(f"Данные подготовлены. X: {self.X.shape}, y: {self.y.shape}") | |
| self.logger.info(f"Количество классов: {len(np.unique(self.y))}") | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при подготовке данных: {str(e)}") | |
| raise | |
| def perform_lda(self) -> None: | |
| """Выполнение LDA анализа""" | |
| try: | |
| self.logger.info("Выполнение LDA анализа...") | |
| # Инициализация и обучение LDA | |
| self.lda = LinearDiscriminantAnalysis(solver='svd') | |
| self.X_transformed = self.lda.fit_transform(self.X, self.y) | |
| # Оценка качества модели | |
| accuracy = self.lda.score(self.X, self.y) | |
| self.logger.info(f"Общая точность модели: {accuracy:.3f}") | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при выполнении LDA: {str(e)}") | |
| raise | |
| def create_confusion_matrix(self) -> Tuple[pd.DataFrame, List[List[str]], float]: | |
| """ | |
| Создание матрицы ошибок и расчет процентов классификации | |
| Returns: | |
| tuple: (матрица ошибок, проценты, общая точность) | |
| """ | |
| try: | |
| self.logger.info("Создание матрицы ошибок...") | |
| # Получение предсказаний | |
| y_pred = self.lda.predict(self.X) | |
| # Создание матрицы ошибок | |
| classes = sorted(np.unique(self.y)) | |
| n_classes = len(classes) | |
| confusion_matrix = np.zeros((n_classes, n_classes)) | |
| for i in range(len(self.y)): | |
| confusion_matrix[self.y[i]-1][y_pred[i]-1] += 1 | |
| # Создание DataFrame для матрицы ошибок | |
| columns = [f"{i+1}.00" for i in range(n_classes)] | |
| index = [f"{i+1}.00" for i in range(n_classes)] | |
| df_confusion = pd.DataFrame(confusion_matrix, columns=columns, index=index) | |
| # Добавление столбца "Всего" | |
| df_confusion['Всего'] = df_confusion.sum(axis=1) | |
| # Расчет процентов | |
| percentages = np.zeros((n_classes, n_classes + 1)) # +1 для столбца "Всего" | |
| for i in range(n_classes): | |
| row_sum = confusion_matrix[i].sum() | |
| if row_sum > 0: | |
| percentages[i, :-1] = (confusion_matrix[i] / row_sum) * 100 | |
| percentages[i, -1] = 100.0 | |
| # Форматирование процентов | |
| percentage_rows = [] | |
| for row in percentages: | |
| formatted_row = [f"{x:.1f}" for x in row] | |
| percentage_rows.append(formatted_row) | |
| # Расчет общей точности | |
| accuracy = (np.sum(np.diag(confusion_matrix)) / np.sum(confusion_matrix)) * 100 | |
| self.logger.info(f"Процент правильной классификации: {accuracy:.1f}%") | |
| return df_confusion, percentage_rows, accuracy | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при создании матрицы ошибок: {str(e)}") | |
| raise | |
| def get_coefficients(self) -> pd.DataFrame: | |
| """ | |
| Получение коэффициентов дискриминантных функций | |
| Returns: | |
| pd.DataFrame: таблица коэффициентов | |
| """ | |
| try: | |
| self.logger.info("Получение коэффициентов...") | |
| # Получение коэффициентов и размерностей | |
| n_features = self.X.shape[1] | |
| n_classes = len(np.unique(self.y)) | |
| n_components = min(n_classes - 1, n_features) | |
| # Создание списка имен переменных | |
| var_names = [f"VAR{str(i+1).zfill(5)}" for i in range(n_features)] | |
| # Создание DataFrame с коэффициентами | |
| coef_data = [] | |
| for i in range(n_components): | |
| row_data = {} | |
| for j, var_name in enumerate(var_names): | |
| row_data[var_name] = self.lda.coef_[i][j] | |
| coef_data.append(row_data) | |
| df_coef = pd.DataFrame(coef_data, index=[f"Функция {i+1}" for i in range(n_components)]) | |
| # Добавление константы (intercept) | |
| const_data = {} | |
| for j, var_name in enumerate(var_names): | |
| const_data[var_name] = self.lda.intercept_[j] if j < len(self.lda.intercept_) else 0.0 | |
| const_df = pd.DataFrame([const_data], index=['Константа']) | |
| # Объединение коэффициентов и константы | |
| df_coef = pd.concat([df_coef, const_df]) | |
| # Округление значений | |
| df_coef = df_coef.round(3) | |
| self.logger.info("Коэффициенты получены") | |
| return df_coef | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при получении коэффициентов: {str(e)}") | |
| raise | |
| def create_visualization(self) -> plt.Figure: | |
| """ | |
| Создание визуализации результатов | |
| Returns: | |
| plt.Figure: объект графика | |
| """ | |
| try: | |
| self.logger.info("Создание визуализации...") | |
| fig = plt.figure(figsize=(12, 8)) | |
| # Построение точек для каждого класса | |
| for class_num in np.unique(self.y): | |
| mask = self.y == class_num | |
| plt.scatter( | |
| self.X_transformed[mask, 0], | |
| self.X_transformed[mask, 1] if self.X_transformed.shape[1] > 1 | |
| else np.zeros_like(self.X_transformed[mask, 0]), | |
| c=[self.colors[(class_num-1) % len(self.colors)]], | |
| label=f'Группа {class_num}', | |
| alpha=0.7 | |
| ) | |
| # Добавление центроидов | |
| centroid = np.mean(self.X_transformed[mask, :2], axis=0) | |
| plt.scatter( | |
| centroid[0], | |
| centroid[1] if self.X_transformed.shape[1] > 1 else 0, | |
| c='black', | |
| marker='s', | |
| s=100 | |
| ) | |
| plt.annotate( | |
| f'{class_num}', | |
| (centroid[0], centroid[1]), | |
| xytext=(5, 5), | |
| textcoords='offset points', | |
| fontsize=10, | |
| bbox=dict(facecolor='white', edgecolor='none', alpha=0.7) | |
| ) | |
| plt.xlabel('Первая каноническая функция') | |
| plt.ylabel('Вторая каноническая функция') | |
| plt.title('Канонические дискриминантные функции') | |
| plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left') | |
| plt.grid(True, alpha=0.3) | |
| plt.tight_layout() | |
| self.logger.info("Визуализация создана") | |
| return fig | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при создании визуализации: {str(e)}") | |
| raise | |
| def save_results(self, output_dir: str) -> None: | |
| """ | |
| Сохранение всех результатов анализа | |
| Args: | |
| output_dir (str): директория для сохранения результатов | |
| """ | |
| try: | |
| self.logger.info(f"Сохранение результатов в {output_dir}...") | |
| # Создание директории если её нет | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Получение результатов | |
| confusion_matrix, percentages, accuracy = self.create_confusion_matrix() | |
| coefficients = self.get_coefficients() | |
| # Сохранение в Excel | |
| excel_path = os.path.join(output_dir, 'lda_results.xlsx') | |
| with pd.ExcelWriter(excel_path, engine='xlsxwriter') as writer: | |
| workbook = writer.book | |
| # Форматы для Excel | |
| header_format = workbook.add_format({ | |
| 'bold': True, | |
| 'align': 'center', | |
| 'valign': 'vcenter', | |
| 'bg_color': '#D9D9D9', | |
| 'border': 1 | |
| }) | |
| cell_format = workbook.add_format({ | |
| 'align': 'center', | |
| 'border': 1 | |
| }) | |
| number_format = workbook.add_format({ | |
| 'align': 'center', | |
| 'border': 1, | |
| 'num_format': '0.000' | |
| }) | |
| # 1. Матрица классификации | |
| worksheet1 = workbook.add_worksheet('Матрица классификации') | |
| # Записываем заголовки | |
| headers = ['Исходный', 'Количество'] + \ | |
| [f'{i+1}.00' for i in range(len(confusion_matrix.columns)-1)] + \ | |
| ['Всего'] | |
| for col, header in enumerate(headers): | |
| worksheet1.write(0, col, header, header_format) | |
| worksheet1.set_column(col, col, 15) | |
| # Записываем данные | |
| for i, (index, row) in enumerate(confusion_matrix.iterrows()): | |
| worksheet1.write(i+1, 0, index, cell_format) | |
| worksheet1.write(i+1, 1, row['Всего'], cell_format) | |
| for j, val in enumerate(row): | |
| worksheet1.write(i+1, j+2, val, cell_format) | |
| # 2. Проценты классификации | |
| worksheet2 = workbook.add_worksheet('Проценты') | |
| # Заголовки | |
| for col, header in enumerate(headers): | |
| worksheet2.write(0, col, header, header_format) | |
| worksheet2.set_column(col, col, 15) | |
| # Данные процентов | |
| for i, row in enumerate(percentages): | |
| worksheet2.write(i+1, 0, f"{i+1}.00", cell_format) | |
| worksheet2.write(i+1, 1, confusion_matrix.iloc[i]['Всего'], cell_format) | |
| for j, val in enumerate(row): | |
| worksheet2.write(i+1, j+2, float(val.replace(',', '.')), number_format) | |
| # Примечание | |
| note_row = len(percentages) + 2 | |
| worksheet2.write( | |
| note_row, 0, | |
| f'* Примечание: {accuracy:.1f}% исходных сгруппированных наблюдений ' | |
| f'классифицированы правильно.', | |
| workbook.add_format({'bold': True}) | |
| ) | |
| # 3. Коэффициенты функций | |
| worksheet3 = workbook.add_worksheet('Коэффициенты') | |
| # Записываем заголовки коэффициентов | |
| worksheet3.write(0, 0, 'Переменная', header_format) | |
| for i, col in enumerate(coefficients.columns): | |
| worksheet3.write(0, i+1, col, header_format) | |
| worksheet3.set_column(i+1, i+1, 15) | |
| # Записываем данные коэффициентов | |
| for i, (index, row) in enumerate(coefficients.iterrows()): | |
| worksheet3.write(i+1, 0, index, cell_format) | |
| for j, val in enumerate(row): | |
| worksheet3.write(i+1, j+1, val, number_format) | |
| # Добавляем примечание к коэффициентам | |
| worksheet3.write( | |
| len(coefficients)+1, 0, | |
| '*Нестандартизованные коэффициенты', | |
| workbook.add_format({'bold': True, 'italic': True}) | |
| ) | |
| # Сохранение визуализации | |
| fig = self.create_visualization() | |
| fig.savefig( | |
| os.path.join(output_dir, 'lda_visualization.png'), | |
| bbox_inches='tight', | |
| dpi=300 | |
| ) | |
| plt.close(fig) | |
| self.logger.info("Результаты успешно сохранены") | |
| except Exception as e: | |
| self.logger.error(f"Ошибка при сохранении результатов: {str(e)}") | |
| raise |