Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Скрипт для анализа ненайденных пунктов по лучшему подходу чанкинга (200 слов, 75 перекрытие, baai/bge-m3, top-100). | |
| Формирует отчет в формате Markdown с топ-5 наиболее похожими чанками для каждого ненайденного пункта. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from fuzzywuzzy import fuzz | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from tqdm import tqdm | |
| # Константы | |
| DATA_FOLDER = "data/docs" # Путь к папке с документами | |
| MODEL_NAME = "BAAI/bge-m3" # Название лучшей модели | |
| DATASET_PATH = "data/dataset.xlsx" # Путь к Excel-датасету с вопросами | |
| OUTPUT_DIR = "data" # Директория для сохранения результатов | |
| MARKDOWN_FILE = "missing_puncts_analysis.md" # Имя выходного MD-файла | |
| SIMILARITY_THRESHOLD = 0.7 # Порог для нечеткого сравнения | |
| WORDS_PER_CHUNK = 200 # Размер чанка в словах | |
| OVERLAP_WORDS = 75 # Перекрытие в словах | |
| TOP_N = 100 # Количество чанков в топе | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| def parse_args(): | |
| """ | |
| Парсит аргументы командной строки. | |
| Returns: | |
| Аргументы командной строки | |
| """ | |
| parser = argparse.ArgumentParser(description="Анализ ненайденных пунктов для лучшего подхода чанкинга") | |
| parser.add_argument("--data-folder", type=str, default=DATA_FOLDER, | |
| help=f"Путь к папке с документами (по умолчанию: {DATA_FOLDER})") | |
| parser.add_argument("--model-name", type=str, default=MODEL_NAME, | |
| help=f"Название модели (по умолчанию: {MODEL_NAME})") | |
| parser.add_argument("--dataset-path", type=str, default=DATASET_PATH, | |
| help=f"Путь к Excel-датасету с вопросами (по умолчанию: {DATASET_PATH})") | |
| parser.add_argument("--output-dir", type=str, default=OUTPUT_DIR, | |
| help=f"Директория для сохранения результатов (по умолчанию: {OUTPUT_DIR})") | |
| parser.add_argument("--markdown-file", type=str, default=MARKDOWN_FILE, | |
| help=f"Имя выходного MD-файла (по умолчанию: {MARKDOWN_FILE})") | |
| parser.add_argument("--similarity-threshold", type=float, default=SIMILARITY_THRESHOLD, | |
| help=f"Порог для нечеткого сравнения (по умолчанию: {SIMILARITY_THRESHOLD})") | |
| parser.add_argument("--words-per-chunk", type=int, default=WORDS_PER_CHUNK, | |
| help=f"Размер чанка в словах (по умолчанию: {WORDS_PER_CHUNK})") | |
| parser.add_argument("--overlap-words", type=int, default=OVERLAP_WORDS, | |
| help=f"Перекрытие в словах (по умолчанию: {OVERLAP_WORDS})") | |
| parser.add_argument("--top-n", type=int, default=TOP_N, | |
| help=f"Количество чанков в топе (по умолчанию: {TOP_N})") | |
| return parser.parse_args() | |
| def load_questions_dataset(file_path: str) -> pd.DataFrame: | |
| """ | |
| Загружает датасет с вопросами из Excel-файла. | |
| Args: | |
| file_path: Путь к Excel-файлу | |
| Returns: | |
| DataFrame с вопросами и пунктами | |
| """ | |
| print(f"Загрузка датасета из {file_path}...") | |
| df = pd.read_excel(file_path) | |
| print(f"Загружен датасет со столбцами: {df.columns.tolist()}") | |
| # Преобразуем NaN в пустые строки для текстовых полей | |
| text_columns = ['question', 'text', 'item_type'] | |
| for col in text_columns: | |
| if col in df.columns: | |
| df[col] = df[col].fillna('') | |
| return df | |
| def load_chunks_and_embeddings(output_dir: str, words_per_chunk: int, overlap_words: int, model_name: str) -> tuple: | |
| """ | |
| Загружает чанки и эмбеддинги из файлов. | |
| Args: | |
| output_dir: Директория с файлами | |
| words_per_chunk: Размер чанка в словах | |
| overlap_words: Перекрытие в словах | |
| model_name: Название модели | |
| Returns: | |
| Кортеж (чанки, эмбеддинги чанков, эмбеддинги вопросов, данные вопросов) | |
| """ | |
| # Формируем уникальное имя для файлов на основе параметров | |
| model_name_safe = model_name.replace('/', '_') | |
| strategy_config_str = f"fixed_size_w{words_per_chunk}_o{overlap_words}" | |
| chunks_filename = f"chunks_{strategy_config_str}_{model_name_safe}" | |
| questions_filename = f"questions_{model_name_safe}" | |
| # Пути к файлам | |
| chunks_embeddings_path = os.path.join(output_dir, f"{chunks_filename}_embeddings.npy") | |
| chunks_data_path = os.path.join(output_dir, f"{chunks_filename}_data.csv") | |
| questions_embeddings_path = os.path.join(output_dir, f"{questions_filename}_embeddings.npy") | |
| questions_data_path = os.path.join(output_dir, f"{questions_filename}_data.csv") | |
| # Проверяем наличие всех файлов | |
| for path in [chunks_embeddings_path, chunks_data_path, questions_embeddings_path, questions_data_path]: | |
| if not os.path.exists(path): | |
| raise FileNotFoundError(f"Файл {path} не найден") | |
| # Загружаем данные | |
| print(f"Загрузка данных из {output_dir}...") | |
| chunks_embeddings = np.load(chunks_embeddings_path) | |
| chunks_df = pd.read_csv(chunks_data_path) | |
| questions_embeddings = np.load(questions_embeddings_path) | |
| questions_df = pd.read_csv(questions_data_path) | |
| print(f"Загружено {len(chunks_df)} чанков и {len(questions_df)} вопросов") | |
| return chunks_df, chunks_embeddings, questions_embeddings, questions_df | |
| def load_top_chunks(top_chunks_dir: str) -> dict: | |
| """ | |
| Загружает JSON-файлы с топ-чанками для вопросов. | |
| Args: | |
| top_chunks_dir: Директория с JSON-файлами | |
| Returns: | |
| Словарь {question_id: данные из JSON} | |
| """ | |
| print(f"Загрузка топ-чанков из {top_chunks_dir}...") | |
| top_chunks_data = {} | |
| json_files = list(Path(top_chunks_dir).glob("question_*_top_chunks.json")) | |
| for json_file in tqdm(json_files, desc="Загрузка JSON-файлов"): | |
| try: | |
| with open(json_file, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| question_id = data.get('question_id') | |
| if question_id is not None: | |
| top_chunks_data[question_id] = data | |
| except Exception as e: | |
| print(f"Ошибка при загрузке файла {json_file}: {e}") | |
| print(f"Загружены данные для {len(top_chunks_data)} вопросов") | |
| return top_chunks_data | |
| def calculate_chunk_overlap(chunk_text: str, punct_text: str) -> float: | |
| """ | |
| Рассчитывает степень перекрытия между чанком и пунктом с использованием partial_ratio. | |
| Args: | |
| chunk_text: Текст чанка | |
| punct_text: Текст пункта | |
| Returns: | |
| Коэффициент перекрытия от 0 до 1 | |
| """ | |
| # Если чанк входит в пункт, возвращаем 1.0 (полное вхождение) | |
| if chunk_text in punct_text: | |
| return 1.0 | |
| # Если пункт входит в чанк, возвращаем соотношение длин | |
| if punct_text in chunk_text: | |
| return len(punct_text) / len(chunk_text) | |
| # Используем partial_ratio из fuzzywuzzy | |
| partial_ratio_score = fuzz.partial_ratio(chunk_text, punct_text) / 100.0 | |
| return partial_ratio_score | |
| def find_most_similar_chunks(punct_text: str, chunks_df: pd.DataFrame, chunks_embeddings: np.ndarray, punct_embedding: np.ndarray, top_n: int = 5) -> list: | |
| """ | |
| Находит топ-N наиболее похожих чанков для заданного пункта. | |
| Args: | |
| punct_text: Текст пункта | |
| chunks_df: DataFrame с чанками | |
| chunks_embeddings: Эмбеддинги чанков | |
| punct_embedding: Эмбеддинг пункта | |
| top_n: Количество похожих чанков (по умолчанию 5) | |
| Returns: | |
| Список словарей с информацией о похожих чанках | |
| """ | |
| # Вычисляем косинусную близость между пунктом и всеми чанками | |
| similarities = cosine_similarity([punct_embedding], chunks_embeddings)[0] | |
| # Получаем индексы топ-N чанков по косинусной близости | |
| top_indices = np.argsort(similarities)[-top_n:][::-1] | |
| similar_chunks = [] | |
| for idx in top_indices: | |
| chunk = chunks_df.iloc[idx] | |
| overlap = calculate_chunk_overlap(chunk['text'], punct_text) | |
| similar_chunks.append({ | |
| 'chunk_id': chunk['id'], | |
| 'doc_name': chunk['doc_name'], | |
| 'text': chunk['text'], | |
| 'similarity': float(similarities[idx]), | |
| 'overlap': overlap | |
| }) | |
| return similar_chunks | |
| def analyze_missing_puncts(questions_df: pd.DataFrame, chunks_df: pd.DataFrame, | |
| questions_embeddings: np.ndarray, chunks_embeddings: np.ndarray, | |
| similarity_threshold: float, top_n: int = 100) -> dict: | |
| """ | |
| Анализирует ненайденные пункты и находит для них наиболее похожие чанки. | |
| Args: | |
| questions_df: DataFrame с вопросами и пунктами | |
| chunks_df: DataFrame с чанками | |
| questions_embeddings: Эмбеддинги вопросов | |
| chunks_embeddings: Эмбеддинги чанков | |
| similarity_threshold: Порог для определения найденных пунктов | |
| top_n: Количество чанков для проверки (по умолчанию 100) | |
| Returns: | |
| Словарь с результатами анализа | |
| """ | |
| print("Анализ ненайденных пунктов...") | |
| # Проверяем соответствие количества вопросов и эмбеддингов | |
| unique_question_ids = questions_df['id'].unique() | |
| if len(unique_question_ids) != questions_embeddings.shape[0]: | |
| print(f"ВНИМАНИЕ: Количество уникальных ID вопросов ({len(unique_question_ids)}) не соответствует размеру массива эмбеддингов ({questions_embeddings.shape[0]}).") | |
| print("Будут анализироваться только вопросы, имеющие соответствующие эмбеддинги.") | |
| # Создаем маппинг id вопроса -> индекс в DataFrame с метаданными | |
| # Используем порядковый номер в списке уникальных ID, а не порядок строк в DataFrame | |
| question_id_to_idx = {qid: idx for idx, qid in enumerate(unique_question_ids)} | |
| # Вычисляем косинусную близость между вопросами и чанками | |
| similarity_matrix = cosine_similarity(questions_embeddings, chunks_embeddings) | |
| # Результаты анализа | |
| analysis_results = {} | |
| # Обрабатываем только те вопросы, для которых у нас есть эмбеддинги | |
| valid_question_ids = [qid for qid in unique_question_ids if qid in question_id_to_idx and question_id_to_idx[qid] < len(questions_embeddings)] | |
| # Группируем датасет по id вопроса | |
| for question_id in tqdm(valid_question_ids, desc="Анализ вопросов"): | |
| # Получаем строки для текущего вопроса | |
| question_rows = questions_df[questions_df['id'] == question_id] | |
| # Если нет строк с таким id, пропускаем | |
| if len(question_rows) == 0: | |
| continue | |
| # Получаем индекс вопроса в массиве эмбеддингов | |
| question_idx = question_id_to_idx[question_id] | |
| # Если индекс выходит за границы массива эмбеддингов, пропускаем | |
| if question_idx >= questions_embeddings.shape[0]: | |
| print(f"ВНИМАНИЕ: Индекс {question_idx} для вопроса {question_id} выходит за границы массива эмбеддингов размера {questions_embeddings.shape[0]}. Пропускаем.") | |
| continue | |
| # Получаем текст вопроса и пункты | |
| question_text = question_rows['question'].iloc[0] | |
| # Собираем пункты с информацией о документе | |
| puncts = [] | |
| for _, row in question_rows.iterrows(): | |
| punct_doc = row.get('filename', '') if 'filename' in row else '' | |
| if pd.isna(punct_doc): | |
| punct_doc = '' | |
| puncts.append({ | |
| 'text': row['text'], | |
| 'doc_name': punct_doc | |
| }) | |
| # Получаем связанные документы | |
| relevant_docs = [] | |
| if 'filename' in question_rows.columns: | |
| relevant_docs = [f for f in question_rows['filename'].unique() if f and not pd.isna(f)] | |
| else: | |
| relevant_docs = chunks_df['doc_name'].unique().tolist() | |
| # Если для вопроса нет релевантных документов, пропускаем | |
| if not relevant_docs: | |
| continue | |
| # Для отслеживания найденных и ненайденных пунктов | |
| found_puncts = [] | |
| missing_puncts = [] | |
| # Собираем все чанки для документов вопроса | |
| all_question_chunks = [] | |
| all_question_similarities = [] | |
| for filename in relevant_docs: | |
| if not filename or pd.isna(filename): | |
| continue | |
| # Фильтруем чанки по имени файла | |
| doc_chunks = chunks_df[chunks_df['doc_name'] == filename] | |
| if doc_chunks.empty: | |
| continue | |
| # Индексы чанков для текущего файла | |
| doc_chunk_indices = doc_chunks.index.tolist() | |
| # Проверяем, что индексы чанков существуют в chunks_df | |
| valid_indices = [idx for idx in doc_chunk_indices if idx in chunks_df.index] | |
| # Получаем значения близости для чанков текущего файла | |
| doc_similarities = [] | |
| for idx in valid_indices: | |
| try: | |
| chunk_loc = chunks_df.index.get_loc(idx) | |
| doc_similarities.append(similarity_matrix[question_idx, chunk_loc]) | |
| except (KeyError, IndexError) as e: | |
| print(f"Ошибка при получении индекса для чанка {idx}: {e}") | |
| continue | |
| # Добавляем чанки и их схожести к общему списку для вопроса | |
| for i, idx in enumerate(valid_indices): | |
| if i < len(doc_similarities): # проверяем, что у нас есть соответствующее значение similarity | |
| try: | |
| chunk_row = doc_chunks.loc[idx] | |
| all_question_chunks.append((idx, chunk_row)) | |
| all_question_similarities.append(doc_similarities[i]) | |
| except KeyError as e: | |
| print(f"Ошибка при доступе к строке с индексом {idx}: {e}") | |
| # Если нет чанков для вопроса, пропускаем | |
| if not all_question_chunks: | |
| continue | |
| # Сортируем все чанки по убыванию схожести и берем top_n | |
| sorted_indices = np.argsort(all_question_similarities)[-min(top_n, len(all_question_similarities)):][::-1] | |
| top_chunks = [] | |
| top_similarities = [] | |
| # Собираем топ-N чанков и их схожести | |
| for i in sorted_indices: | |
| idx, chunk = all_question_chunks[i] | |
| top_chunks.append({ | |
| 'id': chunk['id'], | |
| 'doc_name': chunk['doc_name'], | |
| 'text': chunk['text'] | |
| }) | |
| top_similarities.append(all_question_similarities[i]) | |
| # Проверяем каждый пункт на наличие в топ-чанках | |
| for i, punct in enumerate(puncts): | |
| is_found = False | |
| punct_text = punct['text'] | |
| punct_doc = punct['doc_name'] | |
| # Для каждого чанка из топ-N рассчитываем partial_ratio с пунктом | |
| chunk_overlaps = [] | |
| for j, chunk in enumerate(top_chunks): | |
| overlap = calculate_chunk_overlap(chunk['text'], punct_text) | |
| # Если перекрытие больше порога, пункт найден | |
| if overlap >= similarity_threshold: | |
| is_found = True | |
| # Сохраняем информацию о перекрытии для каждого чанка | |
| chunk_overlaps.append({ | |
| 'chunk_id': chunk['id'], | |
| 'doc_name': chunk['doc_name'], | |
| 'text': chunk['text'], | |
| 'overlap': overlap, | |
| 'similarity': float(top_similarities[j]) | |
| }) | |
| # Если пункт найден, добавляем в список найденных | |
| if is_found: | |
| found_puncts.append({ | |
| 'index': i, | |
| 'text': punct_text, | |
| 'doc_name': punct_doc | |
| }) | |
| else: | |
| # Сортируем чанки по убыванию перекрытия с пунктом и берем топ-5 | |
| chunk_overlaps.sort(key=lambda x: x['overlap'], reverse=True) | |
| top_overlaps = chunk_overlaps[:5] | |
| missing_puncts.append({ | |
| 'index': i, | |
| 'text': punct_text, | |
| 'doc_name': punct_doc, | |
| 'similar_chunks': top_overlaps | |
| }) | |
| # Добавляем результаты для текущего вопроса | |
| analysis_results[question_id] = { | |
| 'question_id': question_id, | |
| 'question_text': question_text, | |
| 'found_puncts_count': len(found_puncts), | |
| 'missing_puncts_count': len(missing_puncts), | |
| 'total_puncts_count': len(puncts), | |
| 'found_puncts': found_puncts, | |
| 'missing_puncts': missing_puncts | |
| } | |
| return analysis_results | |
| def generate_markdown_report(analysis_results: dict, output_file: str, | |
| words_per_chunk: int, overlap_words: int, model_name: str, top_n: int): | |
| """ | |
| Генерирует отчет в формате Markdown. | |
| Args: | |
| analysis_results: Результаты анализа | |
| output_file: Путь к выходному файлу | |
| words_per_chunk: Размер чанка в словах | |
| overlap_words: Перекрытие в словах | |
| model_name: Название модели | |
| top_n: Количество чанков в топе | |
| """ | |
| print(f"Генерация отчета в формате Markdown в {output_file}...") | |
| with open(output_file, 'w', encoding='utf-8') as f: | |
| # Заголовок отчета | |
| f.write(f"# Анализ ненайденных пунктов для оптимальной конфигурации чанкинга\n\n") | |
| # Параметры анализа | |
| f.write("## Параметры анализа\n\n") | |
| f.write(f"- **Модель**: {model_name}\n") | |
| f.write(f"- **Размер чанка**: {words_per_chunk} слов\n") | |
| f.write(f"- **Перекрытие**: {overlap_words} слов ({round(overlap_words/words_per_chunk*100, 1)}%)\n") | |
| f.write(f"- **Количество чанков в топе**: {top_n}\n\n") | |
| # Сводная статистика | |
| total_questions = len(analysis_results) | |
| total_puncts = sum(q['total_puncts_count'] for q in analysis_results.values()) | |
| total_found = sum(q['found_puncts_count'] for q in analysis_results.values()) | |
| total_missing = sum(q['missing_puncts_count'] for q in analysis_results.values()) | |
| f.write("## Сводная статистика\n\n") | |
| f.write(f"- **Всего вопросов**: {total_questions}\n") | |
| f.write(f"- **Всего пунктов**: {total_puncts}\n") | |
| f.write(f"- **Найдено пунктов**: {total_found} ({round(total_found/total_puncts*100, 1)}%)\n") | |
| f.write(f"- **Ненайдено пунктов**: {total_missing} ({round(total_missing/total_puncts*100, 1)}%)\n\n") | |
| # Детали по каждому вопросу | |
| f.write("## Детальный анализ по вопросам\n\n") | |
| # Сортируем вопросы по количеству ненайденных пунктов (по убыванию) | |
| sorted_questions = sorted( | |
| analysis_results.values(), | |
| key=lambda x: x['missing_puncts_count'], | |
| reverse=True | |
| ) | |
| for question_data in sorted_questions: | |
| question_id = question_data['question_id'] | |
| question_text = question_data['question_text'] | |
| missing_count = question_data['missing_puncts_count'] | |
| total_count = question_data['total_puncts_count'] | |
| # Если нет ненайденных пунктов, пропускаем | |
| if missing_count == 0: | |
| continue | |
| f.write(f"### Вопрос {question_id}\n\n") | |
| f.write(f"**Текст вопроса**: {question_text}\n\n") | |
| f.write(f"**Статистика**: найдено {question_data['found_puncts_count']} из {total_count} пунктов ") | |
| f.write(f"({round(question_data['found_puncts_count']/total_count*100, 1)}%)\n\n") | |
| # Детали по ненайденным пунктам | |
| f.write("#### Ненайденные пункты\n\n") | |
| for i, punct in enumerate(question_data['missing_puncts']): | |
| punct_text = punct['text'] | |
| punct_doc = punct.get('doc_name', '') | |
| similar_chunks = punct['similar_chunks'] | |
| f.write(f"##### Пункт {i+1}\n\n") | |
| f.write(f"**Текст пункта**: {punct_text}\n\n") | |
| if punct_doc: | |
| f.write(f"**Документ пункта**: {punct_doc}\n\n") | |
| f.write("**Топ-5 наиболее похожих чанков**:\n\n") | |
| # Таблица с похожими чанками | |
| f.write("| № | Документ | Схожесть (с вопросом) | Перекрытие (с пунктом) | Текст чанка |\n") | |
| f.write("|---|----------|----------|------------|------------|\n") | |
| for j, chunk in enumerate(similar_chunks): | |
| # Используем полный текст чанка без обрезки | |
| chunk_text = chunk['text'] | |
| f.write(f"| {j+1} | {chunk['doc_name']} | {chunk['similarity']:.4f} | ") | |
| f.write(f"{chunk['overlap']:.4f} | {chunk_text} |\n") | |
| f.write("\n") | |
| f.write("\n") | |
| print(f"Отчет успешно сгенерирован: {output_file}") | |
| def main(): | |
| """ | |
| Основная функция скрипта. | |
| """ | |
| args = parse_args() | |
| # Загружаем датасет с вопросами | |
| questions_df = load_questions_dataset(args.dataset_path) | |
| # Загружаем чанки и эмбеддинги | |
| chunks_df, chunks_embeddings, questions_embeddings, questions_meta = load_chunks_and_embeddings( | |
| args.output_dir, args.words_per_chunk, args.overlap_words, args.model_name | |
| ) | |
| # Анализируем ненайденные пункты | |
| analysis_results = analyze_missing_puncts( | |
| questions_df, chunks_df, questions_embeddings, chunks_embeddings, | |
| args.similarity_threshold, args.top_n | |
| ) | |
| # Генерируем отчет в формате Markdown | |
| output_file = os.path.join(args.output_dir, args.markdown_file) | |
| generate_markdown_report( | |
| analysis_results, output_file, | |
| args.words_per_chunk, args.overlap_words, args.model_name, args.top_n | |
| ) | |
| print(f"Анализ ненайденных пунктов завершен. Результаты сохранены в {output_file}") | |
| if __name__ == "__main__": | |
| main() |