|
|
| import os |
| import re |
| import glob |
| from pathlib import Path |
| from dataclasses import dataclass, field |
| from collections import defaultdict |
| from typing import Literal |
|
|
| import pdfplumber |
| import docx as python_docx |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_huggingface import HuggingFaceEmbeddings |
| from langchain_chroma import Chroma |
| from langchain_core.prompts import PromptTemplate |
| from langchain_core.output_parsers import StrOutputParser |
| from langchain_core.documents import Document |
| from langchain_community.retrievers import BM25Retriever |
|
|
|
|
| |
| |
| |
| CHROMA_PERSIST_DIR = "/tmp/chroma_db" if os.getenv("SPACE_ID") else str(Path(__file__).parent / "chroma_db") |
| EMBEDDING_MODEL = ( |
| "intfloat/multilingual-e5-small" |
| if os.getenv("SPACE_ID") else |
| "intfloat/multilingual-e5-large" |
| ) |
| COLLECTION_NAME = "rag_docs" |
| CHUNK_SIZE = 600 |
| CHUNK_OVERLAP = 100 |
| TOP_K = 12 |
| MAX_CHUNKS_PER_DOC = 8 |
| SIMILARITY_THRESHOLD = 0.35 |
| BM25_WEIGHT = 0.40 |
|
|
| STRICT_PROMPT_TEMPLATE = """Ты — точный аналитик документов. Работаешь с любыми типами документов: финансовыми, юридическими, техническими, медицинскими, кулинарными и другими. |
| ГЛАВНОЕ ПРАВИЛО: отвечай СТРОГО на основе текста документа — ни слова от себя. |
| Никогда не отказывайся отвечать. Если информации нет — пиши ТОЛЬКО «Информация не найдена (отсутствует в документе)». |
| |
| ШАГ 1 — ФАКТЫ ИЗ ДОКУМЕНТА: |
| • Выпиши дословно ВСЕ пункты, относящиеся к вопросу (с номером раздела/статьи). |
| • К каждому пункту выпиши ВСЕ условия, исключения и оговорки, указанные рядом с ним. |
| (льготные периоды, минимальные суммы, пороги, скидки, ограничения) |
| |
| ШАГ 2 — АНАЛИЗ И ВЫЧИСЛЕНИЯ: |
| Если вопрос требует расчётов: |
| • Запиши формулу/логику точно как в документе. |
| • Проверь ВСЕ условия из ШАГ 1: льготные периоды, пороги, минимумы. |
| • При расчёте за несколько периодов: |
| - Исключи льготные/бесплатные периоды из платных |
| - Итого = (платные периоды × ставка) + разовые платежи |
| • Проверь граничные условия (≤N vs >N — могут быть разные ставки). |
| • Сравни результат с минимальным значением, если оно указано в документе. |
| • Подставь числа и вычисли пошагово. |
| Если расчётов нет — пропусти этот шаг. |
| |
| ШАГ 3 — ПРОВЕРКА: |
| • Каждый вывод должен опираться на конкретный пункт из ШАГ 1 — без домыслов. |
| • Если в ШАГ 1 нет факта подтверждающего утверждение — это означает «Информация не найдена», а НЕ «да» или «нет». |
| • Вопросы типа «есть ли X» — отвечай «да» только если X явно присутствует в тексте. |
| • Условия одного пункта НЕ переносятся на другой пункт. |
| • При сравнении вариантов — считай полную стоимость каждого с учётом всех условий. |
| • Если вопрос о конкретном разделе — используй только его. |
| • Цифры и названия цитируй дословно, не округляй и не перефразируй. |
| |
| ШАГ 4 — ИТОГОВЫЙ ОТВЕТ: |
| Дай прямой ответ на вопрос. Не повторяй контекст. |
| Если нужной информации нет — «Информация не найдена (отсутствует в документе)». |
| |
| --- |
| Контекст: |
| {context} |
| |
| Вопрос: {question} |
| |
| Ответ (выполни шаги 1–4):""" |
|
|
|
|
| |
| |
| |
| @dataclass |
| class RAGAnswer: |
| answer: str |
| sources: list[dict] |
| conflicts: list[str] = field(default_factory=list) |
| reasoning: str = "" |
|
|
| def __str__(self) -> str: |
| lines = [f"Ответ:\n{self.answer}"] |
|
|
| if self.conflicts: |
| lines.append("") |
| lines.append("⚠️ ОБНАРУЖЕНЫ ПРОТИВОРЕЧИЯ МЕЖДУ ИСТОЧНИКАМИ:") |
| for c in self.conflicts: |
| lines.append(f" {c}") |
|
|
| lines.append("") |
| lines.append("Источники (релевантность):") |
| seen: set[tuple] = set() |
| for src in self.sources: |
| key = (src["file"], src["page"]) |
| if key not in seen: |
| seen.add(key) |
| score_str = f"{src['score']:.0%}" if src.get("score") is not None else "—" |
| lines.append(f" • {src['file']}, стр. {src['page']} [{score_str}]") |
| return "\n".join(lines) |
|
|
|
|
| |
| |
| |
| class RAGSystem: |
| """ |
| Основной класс RAG-системы. |
| |
| Параметры |
| ---------- |
| llm_provider : "openai" | "ollama" | "groq" | "gemini" |
| Выбор языковой модели. |
| openai_model : str |
| Название модели OpenAI (используется при llm_provider="openai"). |
| ollama_model : str |
| Название модели Ollama (используется при llm_provider="ollama"). |
| openai_api_key : str | None |
| API-ключ. Если не указан, берётся из переменной окружения |
| (OPENAI_API_KEY / GROQ_API_KEY / GOOGLE_API_KEY). |
| ollama_base_url : str |
| URL локального сервера Ollama. |
| """ |
|
|
| def __init__( |
| self, |
| llm_provider: Literal["openai", "ollama", "groq", "gemini"] = "openai", |
| openai_model: str = "gpt-4o-mini", |
| ollama_model: str = "llama3.1:8b", |
| openai_api_key: str | None = None, |
| ollama_base_url: str = "http://localhost:11434", |
| ) -> None: |
| self.llm_provider = llm_provider |
|
|
| |
| print(f"[RAG] Загрузка модели эмбеддингов: {EMBEDDING_MODEL} ...") |
| self.embeddings = HuggingFaceEmbeddings( |
| model_name=EMBEDDING_MODEL, |
| model_kwargs={"device": "cpu"}, |
| encode_kwargs={"normalize_embeddings": True}, |
| ) |
|
|
| |
| self.vectorstore = self._init_vectorstore() |
|
|
| |
| self.splitter = RecursiveCharacterTextSplitter( |
| chunk_size=CHUNK_SIZE, |
| chunk_overlap=CHUNK_OVERLAP, |
| separators=["\n\n", "\n", ".", " ", ""], |
| ) |
|
|
| |
| self._bm25_docs: list[Document] = [] |
| self._bm25_retriever = None |
| self._rebuild_bm25() |
|
|
| |
| self.llm = self._init_llm( |
| llm_provider, |
| openai_model, |
| openai_api_key, |
| ollama_model, |
| ollama_base_url, |
| ) |
|
|
| |
| self.prompt = PromptTemplate( |
| input_variables=["context", "question"], |
| template=STRICT_PROMPT_TEMPLATE, |
| ) |
|
|
| print("[RAG] Система готова к работе.") |
|
|
| |
|
|
| def _init_vectorstore(self) -> Chroma: |
| """Создаёт ChromaDB: in-memory на HuggingFace Spaces, persistent локально.""" |
| import chromadb |
| if os.getenv("SPACE_ID"): |
| |
| client = chromadb.EphemeralClient() |
| print("[RAG] ChromaDB режим: in-memory (HuggingFace Spaces)") |
| else: |
| Path(CHROMA_PERSIST_DIR).mkdir(parents=True, exist_ok=True) |
| client = chromadb.PersistentClient(path=CHROMA_PERSIST_DIR) |
| vectorstore = Chroma( |
| collection_name=COLLECTION_NAME, |
| embedding_function=self.embeddings, |
| client=client, |
| ) |
| count = vectorstore._collection.count() |
| print(f"[RAG] ChromaDB инициализирована: {count} чанков в базе.") |
| return vectorstore |
|
|
| @staticmethod |
| def _init_llm( |
| provider: str, |
| openai_model: str, |
| openai_api_key: str | None, |
| ollama_model: str, |
| ollama_base_url: str, |
| ): |
| """Создаёт объект LLM в зависимости от выбранного провайдера.""" |
| if provider == "openai": |
| from langchain_openai import ChatOpenAI |
|
|
| key = openai_api_key or os.getenv("OPENAI_API_KEY") |
| if not key: |
| raise ValueError( |
| "Не задан OpenAI API-ключ. " |
| "Передайте openai_api_key= или установите OPENAI_API_KEY." |
| ) |
| print(f"[RAG] LLM: ChatOpenAI ({openai_model})") |
| return ChatOpenAI(model=openai_model, api_key=key, temperature=0, request_timeout=60) |
|
|
| elif provider == "ollama": |
| from langchain_ollama import OllamaLLM |
|
|
| print(f"[RAG] LLM: Ollama ({ollama_model}) @ {ollama_base_url}") |
| return OllamaLLM(model=ollama_model, base_url=ollama_base_url, temperature=0) |
|
|
| elif provider == "groq": |
| from langchain_groq import ChatGroq |
|
|
| key = openai_api_key or os.getenv("GROQ_API_KEY") |
| if not key: |
| raise ValueError("Не задан Groq API-ключ. Передайте openai_api_key= или установите GROQ_API_KEY.") |
| print(f"[RAG] LLM: Groq (llama-3.3-70b-versatile)") |
| return ChatGroq(model="llama-3.3-70b-versatile", api_key=key, temperature=0, request_timeout=60) |
|
|
| elif provider == "gemini": |
| from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
| key = openai_api_key or os.getenv("GOOGLE_API_KEY") |
| if not key: |
| raise ValueError("Не задан Google API-ключ.") |
| print(f"[RAG] LLM: Gemini 1.5 Flash") |
| return ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", google_api_key=key, temperature=0, request_timeout=60) |
|
|
| else: |
| raise ValueError(f"Неизвестный провайдер LLM: '{provider}'. Используйте 'openai', 'ollama', 'groq' или 'gemini'.") |
|
|
| def switch_llm(self, provider: str, openai_api_key: str | None = None) -> None: |
| """Меняет только LLM-провайдер, не трогая vectorstore и BM25-индекс.""" |
| self.llm = self._init_llm( |
| provider, "gpt-4o-mini", openai_api_key, "llama3.1:8b", "http://localhost:11434" |
| ) |
| self.llm_provider = provider |
| print(f"[RAG] Провайдер переключён на: {provider}") |
|
|
| |
|
|
| def add_documents(self, path: str) -> int: |
| """ |
| Загружает PDF и DOCX, разбивает на чанки и сохраняет в ChromaDB. |
| """ |
| file_paths = self._resolve_doc_paths(path) |
| if not file_paths: |
| print(f"[RAG] Файлы не найдены по пути: {path}") |
| return 0 |
|
|
| all_chunks: list[Document] = [] |
|
|
| for file_path in file_paths: |
| fname = Path(file_path).name |
| |
| existing = self.vectorstore._collection.get( |
| where={"source_file": fname}, limit=1, include=[] |
| ) |
| if existing["ids"]: |
| print(f"[RAG] Пропуск {fname} — уже в базе.") |
| continue |
|
|
| print(f"[RAG] Загрузка файла: {file_path}") |
| try: |
| ext = Path(file_path).suffix.lower() |
| if ext == ".pdf": |
| pages = self._load_pdf_with_tables(file_path) |
| elif ext in (".docx", ".doc"): |
| pages = self._load_docx(file_path) |
| else: |
| print(f"[RAG] Неподдерживаемый формат: {ext}") |
| continue |
| chunks = self.splitter.split_documents(pages) |
| all_chunks.extend(chunks) |
| print(f" -> {len(pages)} стр., {len(chunks)} чанков") |
| except Exception as exc: |
| print(f"[RAG] Ошибка при обработке {file_path}: {exc}") |
|
|
| if not all_chunks: |
| print("[RAG] Нет чанков для добавления.") |
| return 0 |
|
|
| |
| batch_size = 100 |
| print(f"[RAG] Индексация {len(all_chunks)} чанков батчами по {batch_size} ...") |
| for i in range(0, len(all_chunks), batch_size): |
| batch = all_chunks[i:i + batch_size] |
| self.vectorstore.add_documents(batch) |
| print(f"[RAG] {min(i + batch_size, len(all_chunks))}/{len(all_chunks)} чанков") |
| total = self.vectorstore._collection.count() |
| self._rebuild_bm25() |
| print(f"[RAG] Готово. Всего в базе: {total} чанков.") |
| return len(all_chunks) |
|
|
| def similarity_search( |
| self, |
| query: str, |
| k: int = TOP_K, |
| threshold: float = SIMILARITY_THRESHOLD, |
| ) -> list[tuple[Document, float]]: |
| """ |
| Гибридный поиск: BM25 (точные коды/артикулы) + vector (семантика). |
| После объединения применяет: |
| 1. Порог схожести (threshold) по векторной оценке |
| 2. Per-document diversity (MAX_CHUNKS_PER_DOC чанков с одного файла) |
| """ |
| |
| vector_scored: list[tuple[Document, float]] = ( |
| self.vectorstore.similarity_search_with_relevance_scores(query, k=k) |
| ) |
| vector_map: dict[str, float] = { |
| doc.page_content: score for doc, score in vector_scored |
| } |
|
|
| |
| bm25_docs: list[Document] = [] |
| if self._bm25_retriever is not None: |
| bm25_docs = self._bm25_retriever.invoke(query) |
|
|
| |
| seen_content: set[str] = set() |
| merged: list[tuple[Document, float]] = [] |
|
|
| for doc, score in vector_scored: |
| if doc.page_content not in seen_content: |
| seen_content.add(doc.page_content) |
| merged.append((doc, score)) |
|
|
| |
| for doc in bm25_docs: |
| if doc.page_content not in seen_content: |
| seen_content.add(doc.page_content) |
| score = vector_map.get(doc.page_content, BM25_WEIGHT) |
| merged.append((doc, score)) |
|
|
| |
| filtered = [(d, s) for d, s in merged if s >= threshold] |
|
|
| if not filtered and merged: |
| best = max(merged, key=lambda x: x[1]) |
| print( |
| f"[RAG] ⚠ Порог {threshold:.0%} не пройден. " |
| f"Лучший score={best[1]:.0%}. Используем 1 чанк." |
| ) |
| filtered = [best] |
| else: |
| dropped = len(merged) - len(filtered) |
| if dropped: |
| print(f"[RAG] Отфильтровано {dropped} чанков (score < {threshold:.0%}).") |
|
|
| |
| per_doc: dict[str, int] = defaultdict(int) |
| diverse: list[tuple[Document, float]] = [] |
| for doc, score in sorted(filtered, key=lambda x: -x[1]): |
| fname = doc.metadata.get("source_file", "?") |
| if per_doc[fname] < MAX_CHUNKS_PER_DOC: |
| diverse.append((doc, score)) |
| per_doc[fname] += 1 |
|
|
| return diverse |
|
|
| def _detect_conflicts(self, docs_with_scores: list[tuple[Document, float]]) -> list[str]: |
| """ |
| Ищет противоречия в числовых значениях между разными PDF-источниками. |
| |
| Алгоритм: |
| 1. Группирует текст чанков по файлу-источнику. |
| 2. Извлекает все числовые значения (%, руб.) с ближайшим контекстом. |
| 3. Если одно и то же ключевое слово встречается с разными числами |
| в разных файлах — фиксирует конфликт. |
| """ |
| by_file: dict[str, str] = defaultdict(str) |
| for doc, _ in docs_with_scores: |
| fname = doc.metadata.get("source_file", "unknown") |
| by_file[fname] += "\n" + doc.page_content |
|
|
| if len(by_file) < 2: |
| return [] |
|
|
| |
| pat = re.compile( |
| r'([^\n]{0,60}?)' |
| r'(\d+[,.]?\d*)' |
| r'\s*(%|руб\.?|рублей)', |
| re.IGNORECASE, |
| ) |
|
|
| |
| STOP = {'', 'и', 'в', 'на', 'с', 'от', 'до', 'за', 'по', 'для', |
| 'не', 'или', 'а', 'но', 'при', 'если', 'то', 'как', 'что'} |
|
|
| def key_words(text: str) -> frozenset[str]: |
| return frozenset(w.lower() for w in re.split(r'\W+', text) if w and w.lower() not in STOP) |
|
|
| |
| facts: dict[str, list[tuple[frozenset, str, str]]] = {} |
| for fname, text in by_file.items(): |
| facts[fname] = [ |
| (key_words(ctx), val, unit) |
| for ctx, val, unit in pat.findall(text) |
| ] |
|
|
| conflicts: list[str] = [] |
| files = list(facts.keys()) |
| for i in range(len(files)): |
| for j in range(i + 1, len(files)): |
| f1, f2 = files[i], files[j] |
| for kw1, val1, unit1 in facts[f1]: |
| for kw2, val2, unit2 in facts[f2]: |
| |
| if unit1.rstrip('.') != unit2.rstrip('.'): |
| continue |
| if val1 == val2: |
| continue |
| |
| if len(kw1 & kw2) >= 3: |
| conflicts.append( |
| f"«{val1}{unit1}» в {f1} vs «{val2}{unit2}» в {f2}" |
| f" (общий контекст: {', '.join(sorted(kw1 & kw2)[:4])})" |
| ) |
| |
| return list(dict.fromkeys(conflicts)) |
|
|
| def ask_question(self, query: str) -> RAGAnswer: |
| """ |
| Принимает вопрос, извлекает релевантный контекст и генерирует ответ. |
| |
| Возвращает RAGAnswer с: |
| - answer — ответ с цепочкой рассуждений |
| - sources — источники с оценкой релевантности |
| - conflicts — предупреждения о противоречиях между PDF |
| """ |
| |
| docs_with_scores = self.similarity_search(query) |
|
|
| if not docs_with_scores: |
| return RAGAnswer(answer="Информация не найдена", sources=[], conflicts=[]) |
|
|
| |
| conflicts = self._detect_conflicts(docs_with_scores) |
| if conflicts: |
| print(f"[RAG] Обнаружено противоречий: {len(conflicts)}") |
|
|
| |
| context_parts = [] |
| for doc, score in docs_with_scores: |
| fname = doc.metadata.get("source_file", "?") |
| page = doc.metadata.get("page", 0) + 1 |
| context_parts.append( |
| f"[Источник: {fname}, стр. {page}, релевантность: {score:.0%}]\n" |
| + doc.page_content |
| ) |
| context_text = "\n\n---\n\n".join(context_parts) |
|
|
| |
| chain = self.prompt | self.llm | StrOutputParser() |
| raw_answer = chain.invoke({"context": context_text, "question": query}) |
| answer = self._extract_final_answer(raw_answer) |
|
|
| |
| sources = [] |
| for doc, score in docs_with_scores: |
| meta = doc.metadata |
| sources.append({ |
| "file": meta.get("source_file") or Path(meta.get("source", "unknown")).name, |
| "page": meta.get("page", 0) + 1, |
| "score": score, |
| "snippet": doc.page_content[:120].replace("\n", " "), |
| }) |
|
|
| return RAGAnswer(answer=answer.strip(), sources=sources, conflicts=conflicts, reasoning=raw_answer.strip()) |
|
|
| def get_stats(self) -> dict: |
| """Возвращает статистику по базе данных.""" |
| count = self.vectorstore._collection.count() |
| return {"collection": COLLECTION_NAME, "total_chunks": count, "persist_dir": CHROMA_PERSIST_DIR} |
|
|
| |
|
|
| def _rebuild_bm25(self) -> None: |
| """Пересобирает и кеширует BM25-индекс из всех документов в ChromaDB.""" |
| try: |
| data = self.vectorstore._collection.get(include=["documents", "metadatas"]) |
| self._bm25_docs = [ |
| Document(page_content=text, metadata=meta) |
| for text, meta in zip(data["documents"], data["metadatas"]) |
| if text and text.strip() |
| ] |
| self._bm25_retriever = ( |
| BM25Retriever.from_documents(self._bm25_docs, k=TOP_K) |
| if self._bm25_docs else None |
| ) |
| print(f"[RAG] BM25 индекс: {len(self._bm25_docs)} документов.") |
| except Exception as e: |
| print(f"[RAG] BM25 rebuild error: {e}") |
| self._bm25_docs = [] |
| self._bm25_retriever = None |
|
|
| @staticmethod |
| def _extract_final_answer(text: str) -> str: |
| """Извлекает только раздел «ИТОГОВЫЙ ОТВЕТ» из полного CoT-вывода.""" |
| m = re.search( |
| r'(?:ШАГ\s*4\s*[—–\-]\s*)?ИТОГОВЫЙ\s*ОТВЕТ\s*:?\s*\n+(.*)', |
| text, |
| re.IGNORECASE | re.DOTALL, |
| ) |
| if m: |
| return m.group(1).strip() |
| return text.strip() |
|
|
| @staticmethod |
| def _load_pdf_with_tables(pdf_path: str) -> list[Document]: |
| """ |
| Загружает PDF через pdfplumber. |
| Таблицы форматируются как «Номер Услуга → Стоимость» — связь сохраняется при чанкинге. |
| """ |
| fname = Path(pdf_path).name |
| docs: list[Document] = [] |
|
|
| with pdfplumber.open(pdf_path) as pdf: |
| for page_num, page in enumerate(pdf.pages): |
| parts: list[str] = [] |
|
|
| |
| text = page.extract_text() or "" |
| if text.strip(): |
| parts.append(text.strip()) |
|
|
| |
| for table in page.extract_tables(): |
| if not table: |
| continue |
|
|
| |
| n_cols = max(len(r) for r in table) |
| active_cols = [ |
| ci for ci in range(n_cols) |
| if any( |
| (str(row[ci] or "").strip() if ci < len(row) else "") |
| for row in table |
| ) |
| ] |
|
|
| clean = [] |
| for row in table: |
| clean.append([ |
| str(row[ci] or "").strip().replace("\n", " ") |
| if ci < len(row) else "" |
| for ci in active_cols |
| ]) |
|
|
| rows: list[str] = [] |
| for row in clean: |
| non_empty = [v for v in row if v] |
| if not non_empty: |
| continue |
|
|
| |
| |
| if len(non_empty) == 1: |
| rows.append(non_empty[0]) |
| elif len(non_empty) == 2: |
| rows.append(f"{non_empty[0]}: {non_empty[1]}") |
| else: |
| |
| num = non_empty[0] |
| desc = " ".join(non_empty[1:-1]) |
| price = non_empty[-1] |
| rows.append(f"{num} {desc}: {price}") |
|
|
| if rows: |
| parts.append("[ТАБЛИЦА]\n" + "\n".join(rows)) |
|
|
| combined = "\n\n".join(parts).strip() |
| if combined: |
| docs.append(Document( |
| page_content=combined, |
| metadata={"source_file": fname, "page": page_num}, |
| )) |
| return docs |
|
|
| @staticmethod |
| def _resolve_doc_paths(path: str) -> list[str]: |
| """Разворачивает путь к файлу или папке в список PDF/DOCX путей.""" |
| p = Path(path) |
| if p.is_file() and p.suffix.lower() in (".pdf", ".docx", ".doc"): |
| return [str(p)] |
| if p.is_dir(): |
| found = [] |
| for ext in ("*.pdf", "*.docx", "*.doc"): |
| found.extend(glob.glob(str(p / "**" / ext), recursive=True)) |
| return sorted(found) |
| return [] |
|
|
| |
| @staticmethod |
| def _resolve_pdf_paths(path: str) -> list[str]: |
| return RAGSystem._resolve_doc_paths(path) |
|
|
| @staticmethod |
| def _load_docx(file_path: str) -> list[Document]: |
| """Загружает DOCX: текст параграфов + таблицы.""" |
| fname = Path(file_path).name |
| doc = python_docx.Document(file_path) |
| docs: list[Document] = [] |
| parts: list[str] = [] |
|
|
| for para in doc.paragraphs: |
| text = para.text.strip() |
| if text: |
| parts.append(text) |
|
|
| for table in doc.tables: |
| rows: list[str] = [] |
| for row in table.rows: |
| cells = [c.text.strip().replace("\n", " ") for c in row.cells if c.text.strip()] |
| if cells: |
| rows.append(" | ".join(cells)) |
| if rows: |
| parts.append("[ТАБЛИЦА]\n" + "\n".join(rows)) |
|
|
| |
| combined = "\n\n".join(parts) |
| chunk_size = 3000 |
| for i, start in enumerate(range(0, max(len(combined), 1), chunk_size)): |
| piece = combined[start:start + chunk_size].strip() |
| if piece: |
| docs.append(Document( |
| page_content=piece, |
| metadata={"source_file": fname, "page": i}, |
| )) |
| return docs |
|
|
|
|
| |
| |
| |
| if __name__ == "__main__": |
| import argparse |
|
|
| parser = argparse.ArgumentParser(description="RAG-чатбот для документов") |
| parser.add_argument("--provider", choices=["openai", "ollama", "groq", "gemini"], default="ollama", |
| help="Провайдер LLM (openai, ollama, groq или gemini)") |
| parser.add_argument("--index", metavar="PDF_DIR", |
| help="Папка с PDF для индексации") |
| parser.add_argument("--question", "-q", metavar="ВОПРОС", |
| help="Задать вопрос системе") |
| parser.add_argument("--stats", action="store_true", |
| help="Показать статистику базы данных") |
| args = parser.parse_args() |
|
|
| |
| rag = RAGSystem(llm_provider=args.provider) |
|
|
| if args.stats: |
| print(rag.get_stats()) |
|
|
| if args.index: |
| added = rag.add_documents(args.index) |
| print(f"\nДобавлено чанков: {added}") |
|
|
| if args.question: |
| print(f"\nВопрос: {args.question}") |
| result = rag.ask_question(args.question) |
| print("\n" + str(result)) |
|
|
| |
| if not args.index and not args.question and not args.stats: |
| print("\nИнтерактивный режим. Введите 'выход' для завершения.") |
| while True: |
| query = input("\nВопрос: ").strip() |
| if query.lower() in ("выход", "exit", "quit", "q"): |
| break |
| if not query: |
| continue |
| result = rag.ask_question(query) |
| print("\n" + str(result)) |
|
|