import os import re import glob from pathlib import Path from dataclasses import dataclass, field from collections import defaultdict from typing import Literal import pdfplumber import docx as python_docx from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_huggingface import HuggingFaceEmbeddings from langchain_chroma import Chroma from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.documents import Document from langchain_community.retrievers import BM25Retriever # ───────────────────────────────────────────── # Конфигурация # ───────────────────────────────────────────── CHROMA_PERSIST_DIR = "/tmp/chroma_db" if os.getenv("SPACE_ID") else str(Path(__file__).parent / "chroma_db") EMBEDDING_MODEL = ( "intfloat/multilingual-e5-small" # HF Spaces: быстро (~1 мин на 500 стр) if os.getenv("SPACE_ID") else "intfloat/multilingual-e5-large" # Локально: высокая точность ) COLLECTION_NAME = "rag_docs" CHUNK_SIZE = 600 # Уменьшен: таблицы влезают целиком, коды не теряются CHUNK_OVERLAP = 100 TOP_K = 12 # Берём больше кандидатов до фильтрации MAX_CHUNKS_PER_DOC = 8 # Лимит чанков с одного файла — защита от «больших» PDF SIMILARITY_THRESHOLD = 0.35 # Порог для векторного поиска BM25_WEIGHT = 0.40 # Вес BM25 в ансамбле (0.40 BM25 + 0.60 vector) STRICT_PROMPT_TEMPLATE = """Ты — точный аналитик документов. Работаешь с любыми типами документов: финансовыми, юридическими, техническими, медицинскими, кулинарными и другими. ГЛАВНОЕ ПРАВИЛО: отвечай СТРОГО на основе текста документа — ни слова от себя. Никогда не отказывайся отвечать. Если информации нет — пиши ТОЛЬКО «Информация не найдена (отсутствует в документе)». ШАГ 1 — ФАКТЫ ИЗ ДОКУМЕНТА: • Выпиши дословно ВСЕ пункты, относящиеся к вопросу (с номером раздела/статьи). • К каждому пункту выпиши ВСЕ условия, исключения и оговорки, указанные рядом с ним. (льготные периоды, минимальные суммы, пороги, скидки, ограничения) ШАГ 2 — АНАЛИЗ И ВЫЧИСЛЕНИЯ: Если вопрос требует расчётов: • Запиши формулу/логику точно как в документе. • Проверь ВСЕ условия из ШАГ 1: льготные периоды, пороги, минимумы. • При расчёте за несколько периодов: - Исключи льготные/бесплатные периоды из платных - Итого = (платные периоды × ставка) + разовые платежи • Проверь граничные условия (≤N vs >N — могут быть разные ставки). • Сравни результат с минимальным значением, если оно указано в документе. • Подставь числа и вычисли пошагово. Если расчётов нет — пропусти этот шаг. ШАГ 3 — ПРОВЕРКА: • Каждый вывод должен опираться на конкретный пункт из ШАГ 1 — без домыслов. • Если в ШАГ 1 нет факта подтверждающего утверждение — это означает «Информация не найдена», а НЕ «да» или «нет». • Вопросы типа «есть ли X» — отвечай «да» только если X явно присутствует в тексте. • Условия одного пункта НЕ переносятся на другой пункт. • При сравнении вариантов — считай полную стоимость каждого с учётом всех условий. • Если вопрос о конкретном разделе — используй только его. • Цифры и названия цитируй дословно, не округляй и не перефразируй. ШАГ 4 — ИТОГОВЫЙ ОТВЕТ: Дай прямой ответ на вопрос. Не повторяй контекст. Если нужной информации нет — «Информация не найдена (отсутствует в документе)». --- Контекст: {context} Вопрос: {question} Ответ (выполни шаги 1–4):""" # ───────────────────────────────────────────── # Структура ответа # ───────────────────────────────────────────── @dataclass class RAGAnswer: answer: str sources: list[dict] # [{"file": ..., "page": ..., "score": ...}, ...] conflicts: list[str] = field(default_factory=list) # Предупреждения о противоречиях reasoning: str = "" # Полный CoT-вывод (для отладки и UI) def __str__(self) -> str: lines = [f"Ответ:\n{self.answer}"] if self.conflicts: lines.append("") lines.append("⚠️ ОБНАРУЖЕНЫ ПРОТИВОРЕЧИЯ МЕЖДУ ИСТОЧНИКАМИ:") for c in self.conflicts: lines.append(f" {c}") lines.append("") lines.append("Источники (релевантность):") seen: set[tuple] = set() for src in self.sources: key = (src["file"], src["page"]) if key not in seen: seen.add(key) score_str = f"{src['score']:.0%}" if src.get("score") is not None else "—" lines.append(f" • {src['file']}, стр. {src['page']} [{score_str}]") return "\n".join(lines) # ───────────────────────────────────────────── # Инициализация базы данных # ───────────────────────────────────────────── class RAGSystem: """ Основной класс RAG-системы. Параметры ---------- llm_provider : "openai" | "ollama" | "groq" | "gemini" Выбор языковой модели. openai_model : str Название модели OpenAI (используется при llm_provider="openai"). ollama_model : str Название модели Ollama (используется при llm_provider="ollama"). openai_api_key : str | None API-ключ. Если не указан, берётся из переменной окружения (OPENAI_API_KEY / GROQ_API_KEY / GOOGLE_API_KEY). ollama_base_url : str URL локального сервера Ollama. """ def __init__( self, llm_provider: Literal["openai", "ollama", "groq", "gemini"] = "openai", openai_model: str = "gpt-4o-mini", ollama_model: str = "llama3.1:8b", openai_api_key: str | None = None, ollama_base_url: str = "http://localhost:11434", ) -> None: self.llm_provider = llm_provider # Embeddings (локально, данные не покидают машину) print(f"[RAG] Загрузка модели эмбеддингов: {EMBEDDING_MODEL} ...") self.embeddings = HuggingFaceEmbeddings( model_name=EMBEDDING_MODEL, model_kwargs={"device": "cpu"}, encode_kwargs={"normalize_embeddings": True}, ) # Векторная БД (Persistent ChromaDB) self.vectorstore = self._init_vectorstore() # Text Splitter self.splitter = RecursiveCharacterTextSplitter( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separators=["\n\n", "\n", ".", " ", ""], ) # BM25-индекс (кешируется, пересобирается только после add_documents) self._bm25_docs: list[Document] = [] self._bm25_retriever = None self._rebuild_bm25() # LLM self.llm = self._init_llm( llm_provider, openai_model, openai_api_key, ollama_model, ollama_base_url, ) # Prompt self.prompt = PromptTemplate( input_variables=["context", "question"], template=STRICT_PROMPT_TEMPLATE, ) print("[RAG] Система готова к работе.") # ── Вспомогательные методы инициализации ────────────────────────────── def _init_vectorstore(self) -> Chroma: """Создаёт ChromaDB: in-memory на HuggingFace Spaces, persistent локально.""" import chromadb if os.getenv("SPACE_ID"): # HuggingFace Spaces: файловая система read-only — используем RAM client = chromadb.EphemeralClient() print("[RAG] ChromaDB режим: in-memory (HuggingFace Spaces)") else: Path(CHROMA_PERSIST_DIR).mkdir(parents=True, exist_ok=True) client = chromadb.PersistentClient(path=CHROMA_PERSIST_DIR) vectorstore = Chroma( collection_name=COLLECTION_NAME, embedding_function=self.embeddings, client=client, ) count = vectorstore._collection.count() print(f"[RAG] ChromaDB инициализирована: {count} чанков в базе.") return vectorstore @staticmethod def _init_llm( provider: str, openai_model: str, openai_api_key: str | None, ollama_model: str, ollama_base_url: str, ): """Создаёт объект LLM в зависимости от выбранного провайдера.""" if provider == "openai": from langchain_openai import ChatOpenAI key = openai_api_key or os.getenv("OPENAI_API_KEY") if not key: raise ValueError( "Не задан OpenAI API-ключ. " "Передайте openai_api_key= или установите OPENAI_API_KEY." ) print(f"[RAG] LLM: ChatOpenAI ({openai_model})") return ChatOpenAI(model=openai_model, api_key=key, temperature=0, request_timeout=60) elif provider == "ollama": from langchain_ollama import OllamaLLM print(f"[RAG] LLM: Ollama ({ollama_model}) @ {ollama_base_url}") return OllamaLLM(model=ollama_model, base_url=ollama_base_url, temperature=0) elif provider == "groq": from langchain_groq import ChatGroq key = openai_api_key or os.getenv("GROQ_API_KEY") if not key: raise ValueError("Не задан Groq API-ключ. Передайте openai_api_key= или установите GROQ_API_KEY.") print(f"[RAG] LLM: Groq (llama-3.3-70b-versatile)") return ChatGroq(model="llama-3.3-70b-versatile", api_key=key, temperature=0, request_timeout=60) elif provider == "gemini": from langchain_google_genai import ChatGoogleGenerativeAI key = openai_api_key or os.getenv("GOOGLE_API_KEY") if not key: raise ValueError("Не задан Google API-ключ.") print(f"[RAG] LLM: Gemini 1.5 Flash") return ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", google_api_key=key, temperature=0, request_timeout=60) else: raise ValueError(f"Неизвестный провайдер LLM: '{provider}'. Используйте 'openai', 'ollama', 'groq' или 'gemini'.") def switch_llm(self, provider: str, openai_api_key: str | None = None) -> None: """Меняет только LLM-провайдер, не трогая vectorstore и BM25-индекс.""" self.llm = self._init_llm( provider, "gpt-4o-mini", openai_api_key, "llama3.1:8b", "http://localhost:11434" ) self.llm_provider = provider print(f"[RAG] Провайдер переключён на: {provider}") # ── Публичный API ────────────────────────────────────────────────────── def add_documents(self, path: str) -> int: """ Загружает PDF и DOCX, разбивает на чанки и сохраняет в ChromaDB. """ file_paths = self._resolve_doc_paths(path) if not file_paths: print(f"[RAG] Файлы не найдены по пути: {path}") return 0 all_chunks: list[Document] = [] for file_path in file_paths: fname = Path(file_path).name # Проверка на дубликат: пропускаем, если файл уже в базе existing = self.vectorstore._collection.get( where={"source_file": fname}, limit=1, include=[] ) if existing["ids"]: print(f"[RAG] Пропуск {fname} — уже в базе.") continue print(f"[RAG] Загрузка файла: {file_path}") try: ext = Path(file_path).suffix.lower() if ext == ".pdf": pages = self._load_pdf_with_tables(file_path) elif ext in (".docx", ".doc"): pages = self._load_docx(file_path) else: print(f"[RAG] Неподдерживаемый формат: {ext}") continue chunks = self.splitter.split_documents(pages) all_chunks.extend(chunks) print(f" -> {len(pages)} стр., {len(chunks)} чанков") except Exception as exc: print(f"[RAG] Ошибка при обработке {file_path}: {exc}") if not all_chunks: print("[RAG] Нет чанков для добавления.") return 0 # Батчинг — по 100 чанков за раз (стабильно для любого размера документа) batch_size = 100 print(f"[RAG] Индексация {len(all_chunks)} чанков батчами по {batch_size} ...") for i in range(0, len(all_chunks), batch_size): batch = all_chunks[i:i + batch_size] self.vectorstore.add_documents(batch) print(f"[RAG] {min(i + batch_size, len(all_chunks))}/{len(all_chunks)} чанков") total = self.vectorstore._collection.count() self._rebuild_bm25() print(f"[RAG] Готово. Всего в базе: {total} чанков.") return len(all_chunks) def similarity_search( self, query: str, k: int = TOP_K, threshold: float = SIMILARITY_THRESHOLD, ) -> list[tuple[Document, float]]: """ Гибридный поиск: BM25 (точные коды/артикулы) + vector (семантика). После объединения применяет: 1. Порог схожести (threshold) по векторной оценке 2. Per-document diversity (MAX_CHUNKS_PER_DOC чанков с одного файла) """ # ── Векторный поиск со скорами ──────────────────────────────────── vector_scored: list[tuple[Document, float]] = ( self.vectorstore.similarity_search_with_relevance_scores(query, k=k) ) vector_map: dict[str, float] = { doc.page_content: score for doc, score in vector_scored } # ── BM25 поиск (используем кешированный retriever) ─────────────── bm25_docs: list[Document] = [] if self._bm25_retriever is not None: bm25_docs = self._bm25_retriever.invoke(query) # ── Объединение: union с дедупликацией ──────────────────────────── seen_content: set[str] = set() merged: list[tuple[Document, float]] = [] for doc, score in vector_scored: if doc.page_content not in seen_content: seen_content.add(doc.page_content) merged.append((doc, score)) # BM25-кандидаты: score из вектора или BM25_WEIGHT (чтобы пройти threshold) for doc in bm25_docs: if doc.page_content not in seen_content: seen_content.add(doc.page_content) score = vector_map.get(doc.page_content, BM25_WEIGHT) merged.append((doc, score)) # ── Порог схожести ──────────────────────────────────────────────── filtered = [(d, s) for d, s in merged if s >= threshold] if not filtered and merged: best = max(merged, key=lambda x: x[1]) print( f"[RAG] ⚠ Порог {threshold:.0%} не пройден. " f"Лучший score={best[1]:.0%}. Используем 1 чанк." ) filtered = [best] else: dropped = len(merged) - len(filtered) if dropped: print(f"[RAG] Отфильтровано {dropped} чанков (score < {threshold:.0%}).") # ── Per-document diversity ───────────────────────────────────────── per_doc: dict[str, int] = defaultdict(int) diverse: list[tuple[Document, float]] = [] for doc, score in sorted(filtered, key=lambda x: -x[1]): fname = doc.metadata.get("source_file", "?") if per_doc[fname] < MAX_CHUNKS_PER_DOC: diverse.append((doc, score)) per_doc[fname] += 1 return diverse def _detect_conflicts(self, docs_with_scores: list[tuple[Document, float]]) -> list[str]: """ Ищет противоречия в числовых значениях между разными PDF-источниками. Алгоритм: 1. Группирует текст чанков по файлу-источнику. 2. Извлекает все числовые значения (%, руб.) с ближайшим контекстом. 3. Если одно и то же ключевое слово встречается с разными числами в разных файлах — фиксирует конфликт. """ by_file: dict[str, str] = defaultdict(str) for doc, _ in docs_with_scores: fname = doc.metadata.get("source_file", "unknown") by_file[fname] += "\n" + doc.page_content if len(by_file) < 2: return [] # Паттерн: (контекст до 50 символов)(число)(единица измерения) pat = re.compile( r'([^\n]{0,60}?)' # контекст слева r'(\d+[,.]?\d*)' # числовое значение r'\s*(%|руб\.?|рублей)', # единица re.IGNORECASE, ) # Стоп-слова, не несущие смысловой нагрузки STOP = {'', 'и', 'в', 'на', 'с', 'от', 'до', 'за', 'по', 'для', 'не', 'или', 'а', 'но', 'при', 'если', 'то', 'как', 'что'} def key_words(text: str) -> frozenset[str]: return frozenset(w.lower() for w in re.split(r'\W+', text) if w and w.lower() not in STOP) # Извлекаем факты по файлам: {file: [(keywords, value, unit), ...]} facts: dict[str, list[tuple[frozenset, str, str]]] = {} for fname, text in by_file.items(): facts[fname] = [ (key_words(ctx), val, unit) for ctx, val, unit in pat.findall(text) ] conflicts: list[str] = [] files = list(facts.keys()) for i in range(len(files)): for j in range(i + 1, len(files)): f1, f2 = files[i], files[j] for kw1, val1, unit1 in facts[f1]: for kw2, val2, unit2 in facts[f2]: # Единицы совпадают, значения различаются if unit1.rstrip('.') != unit2.rstrip('.'): continue if val1 == val2: continue # Контексты достаточно похожи (≥ 3 общих ключевых слова) if len(kw1 & kw2) >= 3: conflicts.append( f"«{val1}{unit1}» в {f1} vs «{val2}{unit2}» в {f2}" f" (общий контекст: {', '.join(sorted(kw1 & kw2)[:4])})" ) # Дедупликация return list(dict.fromkeys(conflicts)) def ask_question(self, query: str) -> RAGAnswer: """ Принимает вопрос, извлекает релевантный контекст и генерирует ответ. Возвращает RAGAnswer с: - answer — ответ с цепочкой рассуждений - sources — источники с оценкой релевантности - conflicts — предупреждения о противоречиях между PDF """ # 1. Поиск с фильтрацией по порогу docs_with_scores = self.similarity_search(query) if not docs_with_scores: return RAGAnswer(answer="Информация не найдена", sources=[], conflicts=[]) # 2. Детектирование конфликтов между источниками conflicts = self._detect_conflicts(docs_with_scores) if conflicts: print(f"[RAG] Обнаружено противоречий: {len(conflicts)}") # 3. Формирование контекста (с разделителями по источникам) context_parts = [] for doc, score in docs_with_scores: fname = doc.metadata.get("source_file", "?") page = doc.metadata.get("page", 0) + 1 context_parts.append( f"[Источник: {fname}, стр. {page}, релевантность: {score:.0%}]\n" + doc.page_content ) context_text = "\n\n---\n\n".join(context_parts) # 4. Цепочка: prompt → LLM → парсер chain = self.prompt | self.llm | StrOutputParser() raw_answer = chain.invoke({"context": context_text, "question": query}) answer = self._extract_final_answer(raw_answer) # 5. Формирование источников sources = [] for doc, score in docs_with_scores: meta = doc.metadata sources.append({ "file": meta.get("source_file") or Path(meta.get("source", "unknown")).name, "page": meta.get("page", 0) + 1, "score": score, "snippet": doc.page_content[:120].replace("\n", " "), }) return RAGAnswer(answer=answer.strip(), sources=sources, conflicts=conflicts, reasoning=raw_answer.strip()) def get_stats(self) -> dict: """Возвращает статистику по базе данных.""" count = self.vectorstore._collection.count() return {"collection": COLLECTION_NAME, "total_chunks": count, "persist_dir": CHROMA_PERSIST_DIR} # ── Приватные утилиты ───────────────────────────────────────────────── def _rebuild_bm25(self) -> None: """Пересобирает и кеширует BM25-индекс из всех документов в ChromaDB.""" try: data = self.vectorstore._collection.get(include=["documents", "metadatas"]) self._bm25_docs = [ Document(page_content=text, metadata=meta) for text, meta in zip(data["documents"], data["metadatas"]) if text and text.strip() ] self._bm25_retriever = ( BM25Retriever.from_documents(self._bm25_docs, k=TOP_K) if self._bm25_docs else None ) print(f"[RAG] BM25 индекс: {len(self._bm25_docs)} документов.") except Exception as e: print(f"[RAG] BM25 rebuild error: {e}") self._bm25_docs = [] self._bm25_retriever = None @staticmethod def _extract_final_answer(text: str) -> str: """Извлекает только раздел «ИТОГОВЫЙ ОТВЕТ» из полного CoT-вывода.""" m = re.search( r'(?:ШАГ\s*4\s*[—–\-]\s*)?ИТОГОВЫЙ\s*ОТВЕТ\s*:?\s*\n+(.*)', text, re.IGNORECASE | re.DOTALL, ) if m: return m.group(1).strip() return text.strip() @staticmethod def _load_pdf_with_tables(pdf_path: str) -> list[Document]: """ Загружает PDF через pdfplumber. Таблицы форматируются как «Номер Услуга → Стоимость» — связь сохраняется при чанкинге. """ fname = Path(pdf_path).name docs: list[Document] = [] with pdfplumber.open(pdf_path) as pdf: for page_num, page in enumerate(pdf.pages): parts: list[str] = [] # 1. Обычный текст страницы text = page.extract_text() or "" if text.strip(): parts.append(text.strip()) # 2. Таблицы for table in page.extract_tables(): if not table: continue # Убираем полностью пустые столбцы n_cols = max(len(r) for r in table) active_cols = [ ci for ci in range(n_cols) if any( (str(row[ci] or "").strip() if ci < len(row) else "") for row in table ) ] clean = [] for row in table: clean.append([ str(row[ci] or "").strip().replace("\n", " ") if ci < len(row) else "" for ci in active_cols ]) rows: list[str] = [] for row in clean: non_empty = [v for v in row if v] if not non_empty: continue # Формат: все непустые ячейки через " → " # Последняя ячейка = цена/стоимость if len(non_empty) == 1: rows.append(non_empty[0]) elif len(non_empty) == 2: rows.append(f"{non_empty[0]}: {non_empty[1]}") else: # Первая ячейка = номер пункта, средние = описание, последняя = цена num = non_empty[0] desc = " ".join(non_empty[1:-1]) price = non_empty[-1] rows.append(f"{num} {desc}: {price}") if rows: parts.append("[ТАБЛИЦА]\n" + "\n".join(rows)) combined = "\n\n".join(parts).strip() if combined: docs.append(Document( page_content=combined, metadata={"source_file": fname, "page": page_num}, )) return docs @staticmethod def _resolve_doc_paths(path: str) -> list[str]: """Разворачивает путь к файлу или папке в список PDF/DOCX путей.""" p = Path(path) if p.is_file() and p.suffix.lower() in (".pdf", ".docx", ".doc"): return [str(p)] if p.is_dir(): found = [] for ext in ("*.pdf", "*.docx", "*.doc"): found.extend(glob.glob(str(p / "**" / ext), recursive=True)) return sorted(found) return [] # Обратная совместимость @staticmethod def _resolve_pdf_paths(path: str) -> list[str]: return RAGSystem._resolve_doc_paths(path) @staticmethod def _load_docx(file_path: str) -> list[Document]: """Загружает DOCX: текст параграфов + таблицы.""" fname = Path(file_path).name doc = python_docx.Document(file_path) docs: list[Document] = [] parts: list[str] = [] for para in doc.paragraphs: text = para.text.strip() if text: parts.append(text) for table in doc.tables: rows: list[str] = [] for row in table.rows: cells = [c.text.strip().replace("\n", " ") for c in row.cells if c.text.strip()] if cells: rows.append(" | ".join(cells)) if rows: parts.append("[ТАБЛИЦА]\n" + "\n".join(rows)) # Разбиваем на страницы по ~3000 символов combined = "\n\n".join(parts) chunk_size = 3000 for i, start in enumerate(range(0, max(len(combined), 1), chunk_size)): piece = combined[start:start + chunk_size].strip() if piece: docs.append(Document( page_content=piece, metadata={"source_file": fname, "page": i}, )) return docs # ───────────────────────────────────────────── # Точка входа / демонстрация использования # ───────────────────────────────────────────── if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="RAG-чатбот для документов") parser.add_argument("--provider", choices=["openai", "ollama", "groq", "gemini"], default="ollama", help="Провайдер LLM (openai, ollama, groq или gemini)") parser.add_argument("--index", metavar="PDF_DIR", help="Папка с PDF для индексации") parser.add_argument("--question", "-q", metavar="ВОПРОС", help="Задать вопрос системе") parser.add_argument("--stats", action="store_true", help="Показать статистику базы данных") args = parser.parse_args() # Инициализация системы rag = RAGSystem(llm_provider=args.provider) if args.stats: print(rag.get_stats()) if args.index: added = rag.add_documents(args.index) print(f"\nДобавлено чанков: {added}") if args.question: print(f"\nВопрос: {args.question}") result = rag.ask_question(args.question) print("\n" + str(result)) # Интерактивный режим, если не переданы аргументы if not args.index and not args.question and not args.stats: print("\nИнтерактивный режим. Введите 'выход' для завершения.") while True: query = input("\nВопрос: ").strip() if query.lower() in ("выход", "exit", "quit", "q"): break if not query: continue result = rag.ask_question(query) print("\n" + str(result))