| """ |
| Комплексный тест-сьют RAG-системы. |
| Запуск: python3 test_rag.py |
| |
| Тестирует: |
| 1. Загрузка PDF (обычный текст + таблицы) |
| 2. Гибридный поиск (BM25 точные коды + vector семантика) |
| 3. Per-document diversity |
| 4. Similarity threshold |
| 5. Детектор конфликтов |
| 6. app.py обработчики (без LLM) |
| 7. Краевые случаи |
| """ |
|
|
| import sys |
| import shutil |
| import tempfile |
| import unittest |
| from pathlib import Path |
| from unittest.mock import MagicMock, patch |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent)) |
|
|
| from rag_system import ( |
| RAGSystem, |
| RAGAnswer, |
| CHROMA_PERSIST_DIR, |
| SIMILARITY_THRESHOLD, |
| MAX_CHUNKS_PER_DOC, |
| ) |
| from langchain_core.documents import Document |
|
|
| |
| GREEN = "\033[92m" |
| RED = "\033[91m" |
| YELLOW = "\033[93m" |
| RESET = "\033[0m" |
| BOLD = "\033[1m" |
|
|
|
|
| |
|
|
| def _fake_rag() -> RAGSystem: |
| """ |
| Создаёт RAGSystem во временной директории без реальной модели LLM. |
| Embeddings — реальные (нужны для поиска), LLM — мок. |
| """ |
| tmp = tempfile.mkdtemp(prefix="rag_test_") |
| with patch("rag_system.CHROMA_PERSIST_DIR", tmp): |
| rag = RAGSystem.__new__(RAGSystem) |
| rag.llm_provider = "ollama" |
|
|
| from langchain_huggingface import HuggingFaceEmbeddings |
| rag.embeddings = HuggingFaceEmbeddings( |
| model_name="intfloat/multilingual-e5-small", |
| model_kwargs={"device": "cpu"}, |
| encode_kwargs={"normalize_embeddings": True}, |
| ) |
| from langchain_chroma import Chroma |
| from rag_system import COLLECTION_NAME |
| from langchain_core.prompts import PromptTemplate |
| from rag_system import STRICT_PROMPT_TEMPLATE |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from rag_system import CHUNK_SIZE, CHUNK_OVERLAP |
|
|
| Path(tmp).mkdir(parents=True, exist_ok=True) |
| rag.vectorstore = Chroma( |
| collection_name=COLLECTION_NAME, |
| embedding_function=rag.embeddings, |
| persist_directory=tmp, |
| ) |
| rag.splitter = RecursiveCharacterTextSplitter( |
| chunk_size=CHUNK_SIZE, |
| chunk_overlap=CHUNK_OVERLAP, |
| separators=["\n\n", "\n", ".", " ", ""], |
| ) |
| rag.prompt = PromptTemplate( |
| input_variables=["context", "question"], |
| template=STRICT_PROMPT_TEMPLATE, |
| ) |
| rag.llm = MagicMock() |
| rag.llm.__or__ = lambda self, other: other |
| rag._bm25_docs = [] |
| rag._tmp_dir = tmp |
| return rag |
|
|
|
|
| def _cleanup(rag: RAGSystem): |
| shutil.rmtree(getattr(rag, "_tmp_dir", ""), ignore_errors=True) |
|
|
|
|
| def _add_fake_docs(rag: RAGSystem, docs: list[Document]): |
| rag.vectorstore.add_documents(docs) |
| rag._rebuild_bm25() |
|
|
|
|
| |
|
|
| class TestPDFLoading(unittest.TestCase): |
| """Тест 1: Загрузка PDF и table-aware парсинг.""" |
|
|
| def setUp(self): |
| self.rag = _fake_rag() |
|
|
| def tearDown(self): |
| _cleanup(self.rag) |
|
|
| def test_load_real_pdf_produces_docs(self): |
| """PDF из test_pdfs должен давать >0 документов.""" |
| pdf = Path(__file__).parent / "test_pdfs" / "novobank_handbook_2024.pdf" |
| if not pdf.exists(): |
| self.skipTest("test_pdfs/novobank_handbook_2024.pdf не найден") |
| docs = RAGSystem._load_pdf_with_tables(str(pdf)) |
| self.assertGreater(len(docs), 0, "Документы не загружены") |
| for d in docs: |
| self.assertIn("source_file", d.metadata) |
| self.assertIn("page", d.metadata) |
| self.assertGreater(len(d.page_content.strip()), 0) |
|
|
| def test_table_keyword_preserved(self): |
| """После загрузки PDF с таблицей слово [ТАБЛИЦА] должно присутствовать |
| или содержимое таблицы — в тексте страницы.""" |
| pdf = Path(__file__).parent / "test_pdfs" / "novobank_handbook_2024.pdf" |
| if not pdf.exists(): |
| self.skipTest("test_pdfs/novobank_handbook_2024.pdf не найден") |
| docs = RAGSystem._load_pdf_with_tables(str(pdf)) |
| combined = "\n".join(d.page_content for d in docs) |
| |
| self.assertTrue( |
| any(kw in combined for kw in ["бесплатно", "руб", "%", "НовоБанк", "ставка"]), |
| "Ключевые банковские данные не найдены в загруженном тексте" |
| ) |
|
|
| def test_metadata_page_numbers(self): |
| """Страницы должны нумероваться с 0.""" |
| pdf = Path(__file__).parent / "test_pdfs" / "novobank_handbook_2024.pdf" |
| if not pdf.exists(): |
| self.skipTest("test_pdfs/novobank_handbook_2024.pdf не найден") |
| docs = RAGSystem._load_pdf_with_tables(str(pdf)) |
| pages = [d.metadata["page"] for d in docs] |
| self.assertEqual(min(pages), 0, "Нумерация должна начинаться с 0") |
|
|
| def test_nonexistent_path_returns_zero(self): |
| """Несуществующий путь → add_documents возвращает 0.""" |
| count = self.rag.add_documents("/tmp/nonexistent_dir_xyz") |
| self.assertEqual(count, 0) |
|
|
| def test_add_documents_updates_bm25(self): |
| """После add_documents BM25-индекс должен быть обновлён.""" |
| pdf = Path(__file__).parent / "test_pdfs" / "novobank_handbook_2024.pdf" |
| if not pdf.exists(): |
| self.skipTest("test_pdfs/novobank_handbook_2024.pdf не найден") |
| self.assertEqual(len(self.rag._bm25_docs), 0) |
| self.rag.add_documents(str(pdf)) |
| self.assertGreater(len(self.rag._bm25_docs), 0, "BM25 не обновился") |
|
|
|
|
| class TestHybridSearch(unittest.TestCase): |
| """Тест 2: Гибридный поиск BM25 + vector.""" |
|
|
| def setUp(self): |
| self.rag = _fake_rag() |
| |
| _add_fake_docs(self.rag, [ |
| Document(page_content="Регламент 405-А: комиссия за перевод 1.5%", |
| metadata={"source_file": "reglament.pdf", "page": 0}), |
| Document(page_content="Тариф Б-17: ежемесячное обслуживание 990 рублей", |
| metadata={"source_file": "tariff.pdf", "page": 0}), |
| Document(page_content="Кредитная ставка по ипотеке составляет 11.9% годовых для вторичного рынка", |
| metadata={"source_file": "mortgage.pdf", "page": 1}), |
| Document(page_content="Бесплатные услуги: открытие счёта, SMS-уведомления первые 30 дней", |
| metadata={"source_file": "free_services.pdf", "page": 0}), |
| ]) |
|
|
| def tearDown(self): |
| _cleanup(self.rag) |
|
|
| def test_exact_code_found_by_bm25(self): |
| """Точный артикул '405-А' должен быть найден.""" |
| results = self.rag.similarity_search("405-А комиссия") |
| contents = [doc.page_content for doc, _ in results] |
| self.assertTrue( |
| any("405-А" in c for c in contents), |
| f"Артикул 405-А не найден в результатах: {contents}" |
| ) |
|
|
| def test_exact_code_B17_found(self): |
| """Артикул 'Б-17' должен быть найден.""" |
| results = self.rag.similarity_search("Б-17 обслуживание") |
| contents = [doc.page_content for doc, _ in results] |
| self.assertTrue( |
| any("Б-17" in c for c in contents), |
| f"Артикул Б-17 не найден: {contents}" |
| ) |
|
|
| def test_semantic_search_works(self): |
| """Семантический запрос 'жильё кредит процент' должен найти ипотечный документ.""" |
| results = self.rag.similarity_search("жильё кредит процент") |
| contents = " ".join(doc.page_content for doc, _ in results) |
| self.assertIn("ипотеке", contents, "Семантический поиск не работает") |
|
|
| def test_results_have_scores(self): |
| """Каждый результат должен иметь числовой score.""" |
| results = self.rag.similarity_search("комиссия тариф") |
| for doc, score in results: |
| self.assertIsInstance(score, float) |
| self.assertGreaterEqual(score, 0.0) |
| self.assertLessEqual(score, 1.0) |
|
|
| def test_empty_db_returns_empty(self): |
| """Пустая база → пустой список результатов.""" |
| empty_rag = _fake_rag() |
| try: |
| results = empty_rag.similarity_search("любой запрос") |
| self.assertEqual(results, []) |
| finally: |
| _cleanup(empty_rag) |
|
|
|
|
| class TestDiversityFilter(unittest.TestCase): |
| """Тест 3: Per-document diversity — большой PDF не вытесняет маленький.""" |
|
|
| def setUp(self): |
| self.rag = _fake_rag() |
| |
| big_docs = [ |
| Document( |
| page_content=f"Сбербанк тарифный план раздел {i}: комиссия за платёж", |
| metadata={"source_file": "sber_big.pdf", "page": i}, |
| ) |
| for i in range(5) |
| ] |
| small_doc = Document( |
| page_content="Регламент 101-Б: комиссия за платёж между филиалами 0.3%", |
| metadata={"source_file": "reglament_101.pdf", "page": 0}, |
| ) |
| _add_fake_docs(self.rag, big_docs + [small_doc]) |
|
|
| def tearDown(self): |
| _cleanup(self.rag) |
|
|
| def test_small_doc_not_crowded_out(self): |
| """Маленький регламент должен присутствовать в результатах.""" |
| results = self.rag.similarity_search("комиссия за платёж", k=8) |
| sources = {doc.metadata["source_file"] for doc, _ in results} |
| self.assertIn( |
| "reglament_101.pdf", sources, |
| f"Маленький регламент вытеснен! Источники: {sources}" |
| ) |
|
|
| def test_max_chunks_per_doc_respected(self): |
| """С одного файла не должно приходить больше MAX_CHUNKS_PER_DOC чанков.""" |
| results = self.rag.similarity_search("комиссия за платёж", k=8) |
| from collections import Counter |
| per_doc = Counter(doc.metadata["source_file"] for doc, _ in results) |
| for fname, count in per_doc.items(): |
| self.assertLessEqual( |
| count, MAX_CHUNKS_PER_DOC, |
| f"{fname} превышает лимит: {count} > {MAX_CHUNKS_PER_DOC}" |
| ) |
|
|
|
|
| class TestSimilarityThreshold(unittest.TestCase): |
| """Тест 4: Порог схожести отсекает мусор.""" |
|
|
| def setUp(self): |
| self.rag = _fake_rag() |
| _add_fake_docs(self.rag, [ |
| Document( |
| page_content="Ипотечная ставка на новостройки 10.9% при взносе от 20%", |
| metadata={"source_file": "mortgage.pdf", "page": 0}, |
| ), |
| ]) |
|
|
| def tearDown(self): |
| _cleanup(self.rag) |
|
|
| def test_relevant_query_returns_result(self): |
| """Релевантный запрос должен вернуть результат.""" |
| results = self.rag.similarity_search("ипотека ставка новостройка") |
| self.assertGreater(len(results), 0) |
|
|
| def test_completely_unrelated_returns_best_anyway(self): |
| """Даже нерелевантный запрос возвращает хоть 1 чанк (fallback).""" |
| results = self.rag.similarity_search("погода в антарктиде сегодня") |
| |
| self.assertGreaterEqual(len(results), 0) |
|
|
| def test_scores_above_threshold(self): |
| """Все возвращённые чанки (кроме fallback) должны быть выше порога.""" |
| results = self.rag.similarity_search("ипотека ставка", threshold=SIMILARITY_THRESHOLD) |
| if len(results) > 1: |
| for _, score in results: |
| self.assertGreaterEqual( |
| score, SIMILARITY_THRESHOLD - 0.01, |
| f"Чанк с score={score:.2%} ниже порога {SIMILARITY_THRESHOLD:.0%}" |
| ) |
|
|
|
|
| class TestConflictDetection(unittest.TestCase): |
| """Тест 5: Детектор конфликтов между источниками.""" |
|
|
| def setUp(self): |
| self.rag = _fake_rag() |
|
|
| def tearDown(self): |
| _cleanup(self.rag) |
|
|
| def test_conflicting_rates_detected(self): |
| """Разные ставки в разных файлах → конфликт обнаружен.""" |
| docs = [ |
| (Document(page_content="комиссия за перевод составляет 1.5% от суммы", |
| metadata={"source_file": "tariff_2023.pdf", "page": 0}), 0.9), |
| (Document(page_content="комиссия за перевод составляет 2.0% от суммы", |
| metadata={"source_file": "tariff_2024.pdf", "page": 0}), 0.88), |
| ] |
| conflicts = self.rag._detect_conflicts(docs) |
| self.assertGreater(len(conflicts), 0, "Конфликт не обнаружен!") |
| combined = " ".join(conflicts) |
| self.assertTrue( |
| "1.5" in combined or "1,5" in combined or "2.0" in combined or "2,0" in combined |
| ) |
|
|
| def test_same_values_no_conflict(self): |
| """Одинаковые значения в разных файлах — конфликта нет.""" |
| docs = [ |
| (Document(page_content="ставка по вкладу 12% годовых", |
| metadata={"source_file": "a.pdf", "page": 0}), 0.9), |
| (Document(page_content="ставка по вкладу 12% годовых", |
| metadata={"source_file": "b.pdf", "page": 0}), 0.85), |
| ] |
| conflicts = self.rag._detect_conflicts(docs) |
| self.assertEqual(conflicts, [], f"Ложный конфликт: {conflicts}") |
|
|
| def test_single_source_no_conflict(self): |
| """Один источник — конфликтов нет по определению.""" |
| docs = [ |
| (Document(page_content="комиссия 3.5% годовых", |
| metadata={"source_file": "single.pdf", "page": 0}), 0.9), |
| (Document(page_content="минимальный платёж 500 рублей", |
| metadata={"source_file": "single.pdf", "page": 1}), 0.8), |
| ] |
| conflicts = self.rag._detect_conflicts(docs) |
| self.assertEqual(conflicts, []) |
|
|
| def test_conflicts_deduplicated(self): |
| """Одинаковые конфликты не дублируются.""" |
| docs = [ |
| (Document(page_content="тариф комиссия перевод 1.5% за операцию", |
| metadata={"source_file": "x.pdf", "page": 0}), 0.9), |
| (Document(page_content="тариф комиссия перевод 1.5% за операцию", |
| metadata={"source_file": "x.pdf", "page": 1}), 0.8), |
| (Document(page_content="тариф комиссия перевод 2.5% за операцию", |
| metadata={"source_file": "y.pdf", "page": 0}), 0.85), |
| ] |
| conflicts = self.rag._detect_conflicts(docs) |
| self.assertEqual(len(conflicts), len(set(conflicts)), "Конфликты дублируются") |
|
|
|
|
| class TestRAGAnswer(unittest.TestCase): |
| """Тест 6: Структура RAGAnswer и её строковое представление.""" |
|
|
| def test_str_with_sources(self): |
| ans = RAGAnswer( |
| answer="Комиссия 5 рублей.", |
| sources=[ |
| {"file": "doc.pdf", "page": 3, "score": 0.87, "snippet": "..."}, |
| {"file": "doc.pdf", "page": 3, "score": 0.87, "snippet": "..."}, |
| ], |
| conflicts=[], |
| ) |
| text = str(ans) |
| self.assertIn("Комиссия 5 рублей.", text) |
| self.assertIn("doc.pdf", text) |
| self.assertIn("87%", text) |
| |
| self.assertEqual(text.count("doc.pdf"), 1) |
|
|
| def test_str_with_conflicts(self): |
| ans = RAGAnswer( |
| answer="Ставка неоднозначна.", |
| sources=[{"file": "a.pdf", "page": 1, "score": 0.9, "snippet": ""}], |
| conflicts=["«4.5%» в a.pdf vs «4.7%» в b.pdf"], |
| ) |
| text = str(ans) |
| self.assertIn("ПРОТИВОРЕЧИЯ", text) |
| self.assertIn("4.5%", text) |
|
|
| def test_empty_sources(self): |
| ans = RAGAnswer(answer="Информация не найдена", sources=[], conflicts=[]) |
| text = str(ans) |
| self.assertIn("Информация не найдена", text) |
|
|
|
|
| class TestAppHandlers(unittest.TestCase): |
| """Тест 7: Обработчики app.py (без реального LLM и сети).""" |
|
|
| def test_index_pdf_no_files(self): |
| """Вызов index_pdf без файлов → предупреждение, не падает.""" |
| import app |
| result = app.index_pdf([], "ollama", "") |
| self.assertIn("⚠", result) |
|
|
| def test_ask_empty_question(self): |
| """Пустой вопрос → подсказка, не падает.""" |
| import app |
| answer, sources = app.ask("", "ollama", "") |
| self.assertIn("Введите", answer) |
| self.assertEqual(sources, "") |
|
|
| def test_ask_whitespace_question(self): |
| """Вопрос из пробелов → подсказка, не падает.""" |
| import app |
| answer, sources = app.ask(" \n ", "ollama", "") |
| self.assertIn("Введите", answer) |
|
|
| def test_toggle_key_visibility_openai(self): |
| """При выборе openai поле API-ключа становится visible.""" |
| import app, gradio as gr |
| result = app.toggle_key_visibility("openai") |
| self.assertEqual(result["visible"], True) |
|
|
| def test_toggle_key_visibility_ollama(self): |
| """При выборе ollama поле API-ключа скрыто.""" |
| import app |
| result = app.toggle_key_visibility("ollama") |
| self.assertEqual(result["visible"], False) |
|
|
| def test_clear_db_no_error(self): |
| """clear_db не должен падать даже если база не существует.""" |
| import app |
| result = app.clear_db("ollama", "") |
| self.assertIn("очищена", result.lower()) |
|
|
|
|
| class TestEdgeCases(unittest.TestCase): |
| """Тест 8: Краевые случаи.""" |
|
|
| def setUp(self): |
| self.rag = _fake_rag() |
|
|
| def tearDown(self): |
| _cleanup(self.rag) |
|
|
| def test_unicode_query(self): |
| """Запрос с кириллицей и спецсимволами.""" |
| _add_fake_docs(self.rag, [ |
| Document(page_content="Вклад «Классический» 12% годовых", |
| metadata={"source_file": "t.pdf", "page": 0}) |
| ]) |
| results = self.rag.similarity_search("Вклад «Классический» — ставка?") |
| self.assertGreater(len(results), 0) |
|
|
| def test_very_long_query(self): |
| """Длинный запрос не вызывает ошибку.""" |
| _add_fake_docs(self.rag, [ |
| Document(page_content="комиссия 1 рубль", |
| metadata={"source_file": "t.pdf", "page": 0}) |
| ]) |
| long_q = "комиссия " * 100 |
| try: |
| results = self.rag.similarity_search(long_q) |
| |
| except Exception as e: |
| self.fail(f"Длинный запрос вызвал ошибку: {e}") |
|
|
| def test_duplicate_pdf_no_duplicate_bm25(self): |
| """Повторная загрузка одного PDF не дублирует BM25-индекс безгранично.""" |
| pdf = Path(__file__).parent / "test_pdfs" / "novobank_handbook_2024.pdf" |
| if not pdf.exists(): |
| self.skipTest("test_pdfs/novobank_handbook_2024.pdf не найден") |
| self.rag.add_documents(str(pdf)) |
| count_1 = len(self.rag._bm25_docs) |
| self.rag._rebuild_bm25() |
| count_2 = len(self.rag._bm25_docs) |
| self.assertEqual(count_1, count_2, "BM25 дублирует документы при перестройке") |
|
|
| def test_get_stats_returns_dict(self): |
| """get_stats возвращает словарь с нужными ключами.""" |
| stats = self.rag.get_stats() |
| self.assertIsInstance(stats, dict) |
| self.assertIn("total_chunks", stats) |
| self.assertIn("collection", stats) |
| self.assertIn("persist_dir", stats) |
|
|
| def test_resolve_pdf_paths_single_file(self): |
| """Путь к файлу .pdf возвращает список из одного элемента.""" |
| pdf = Path(__file__).parent / "test_pdfs" / "novobank_handbook_2024.pdf" |
| if not pdf.exists(): |
| self.skipTest("PDF не найден") |
| result = RAGSystem._resolve_pdf_paths(str(pdf)) |
| self.assertEqual(len(result), 1) |
| self.assertEqual(result[0], str(pdf)) |
|
|
| def test_resolve_pdf_paths_directory(self): |
| """Путь к директории с PDF возвращает все .pdf файлы.""" |
| d = Path(__file__).parent / "test_pdfs" |
| if not d.exists(): |
| self.skipTest("test_pdfs не найдена") |
| result = RAGSystem._resolve_pdf_paths(str(d)) |
| self.assertGreater(len(result), 0) |
| for r in result: |
| self.assertTrue(r.endswith(".pdf")) |
|
|
| def test_resolve_pdf_paths_nonexistent(self): |
| """Несуществующий путь → пустой список.""" |
| result = RAGSystem._resolve_pdf_paths("/nonexistent/path/xyz") |
| self.assertEqual(result, []) |
|
|
|
|
| |
|
|
| def run_tests(): |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| suites = [ |
| ("📄 Загрузка PDF", TestPDFLoading), |
| ("🔍 Гибридный поиск", TestHybridSearch), |
| ("⚖️ Diversity filter", TestDiversityFilter), |
| ("📊 Similarity threshold", TestSimilarityThreshold), |
| ("⚠️ Детектор конфликтов", TestConflictDetection), |
| ("📦 RAGAnswer структура", TestRAGAnswer), |
| ("🌐 App обработчики", TestAppHandlers), |
| ("🔧 Краевые случаи", TestEdgeCases), |
| ] |
|
|
| total_passed = total_failed = total_errors = 0 |
| results_summary = [] |
|
|
| for suite_name, test_class in suites: |
| loader = unittest.TestLoader() |
| suite = loader.loadTestsFromTestCase(test_class) |
| runner = unittest.TextTestRunner(verbosity=0, stream=open("/dev/null", "w")) |
| result = runner.run(suite) |
|
|
| passed = result.testsRun - len(result.failures) - len(result.errors) - len(result.skipped) |
| failed = len(result.failures) |
| errors = len(result.errors) |
| skipped = len(result.skipped) |
|
|
| total_passed += passed |
| total_failed += failed |
| total_errors += errors |
|
|
| if failed == 0 and errors == 0: |
| status = f"{GREEN}✓ PASS{RESET}" |
| else: |
| status = f"{RED}✗ FAIL{RESET}" |
|
|
| skip_note = f" ({skipped} пропущено)" if skipped else "" |
| line = f" {status} {suite_name:<30} {passed}/{result.testsRun - skipped}{skip_note}" |
| results_summary.append((line, result.failures + result.errors)) |
|
|
| print(line) |
|
|
| for fail in result.failures + result.errors: |
| test_name = str(fail[0]).split(" ")[0] |
| |
| last_line = [l.strip() for l in fail[1].splitlines() if l.strip()][-1] |
| print(f" {YELLOW}↳ {test_name}: {last_line}{RESET}") |
|
|
| total = total_passed + total_failed + total_errors |
| print() |
| print("─" * 55) |
| if total_failed == 0 and total_errors == 0: |
| print(f"{BOLD}{GREEN}✅ Все тесты прошли: {total_passed}/{total}{RESET}") |
| else: |
| print(f"{BOLD}{RED}❌ Провалено: {total_failed + total_errors}, " |
| f"Прошло: {total_passed}/{total}{RESET}") |
| print("─" * 55) |
|
|
| return total_failed + total_errors |
|
|
|
|
| if __name__ == "__main__": |
| print(f"\n{BOLD}{'─'*55}") |
| print(" RAG System — Полный тест-сьют") |
| print(f"{'─'*55}{RESET}\n") |
|
|
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| exit_code = run_tests() |
| sys.exit(exit_code) |
|
|