from typing import TypedDict, List, Annotated import json import time from contextlib import contextmanager from pathlib import Path import json, re import tempfile import zipfile, tarfile from langchain_core.documents import Document from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.llms.yandex import YandexGPT from langgraph.graph import StateGraph, END from langgraph.graph.message import add_messages from dotenv import load_dotenv load_dotenv() from pathlib import Path import os from pypdf import PdfReader # ====================== CONFIG ====================== YANDEX_API_KEY = os.getenv("YANDEX_API_KEY") YANDEX_FOLDER_ID = os.getenv("YANDEX_FOLDER_ID") # --- LLMs --- llm_reasoning = YandexGPT( api_key=YANDEX_API_KEY, folder_id=YANDEX_FOLDER_ID, temperature=0.3, max_tokens=2000 ) llm_output = YandexGPT( api_key=YANDEX_API_KEY, folder_id=YANDEX_FOLDER_ID, temperature=0.4, max_tokens=6500 ) def llm_analyze(prompt: str) -> str: return llm_reasoning.invoke(prompt) def llm_final(prompt: str) -> str: return llm_output.invoke(prompt) # --- Logging --- @contextmanager def log_step(name: str): print(f"\n>>> START: {name}") start = time.time() try: yield finally: print(f"<<< END: {name} ({time.time() - start:.2f}s)") # ====================== STATE ====================== class GraphState(TypedDict): query: str toc: str toc_analysis: str plan: dict retrieval_queries: List[str] contexts: List[Document] result: dict iteration: int recurse: bool # ====================== ARCHIVE PROCESSING ====================== def read_file(file_path: Path) -> str: suffix = file_path.suffix.lower() try: # --- PDF --- if suffix == ".pdf": reader = PdfReader(str(file_path)) text = [] for page in reader.pages: text.append(page.extract_text() or "") return "\n".join(text) # --- TXT / CODE --- elif suffix in [".txt", ".md", ".py", ".json"]: return file_path.read_text(encoding="utf-8", errors="ignore") # --- fallback --- else: return file_path.read_text(encoding="utf-8", errors="ignore") except Exception as e: print(f"⚠️ Ошибка чтения {file_path}: {e}") return "" def extract_archive_to_documents(archive_path: str) -> List[Document]: documents = [] with tempfile.TemporaryDirectory() as tmpdir: tmp_path = Path(tmpdir) if archive_path.endswith('.zip'): with zipfile.ZipFile(archive_path) as z: z.extractall(tmpdir) elif archive_path.endswith(('.tar', '.tar.gz', '.tgz')): with tarfile.open(archive_path) as t: t.extractall(tmpdir) else: raise ValueError(f"Unsupported archive: {archive_path}") for file_path in tmp_path.rglob("*"): if file_path.is_file() and not file_path.name.startswith('.'): try: relative_path = file_path.relative_to(tmp_path) text = read_file(file_path) if not text.strip(): continue doc = Document( page_content=text, metadata={ "source": str(relative_path), "file_name": file_path.name, "file_type": file_path.suffix.lower(), "size_bytes": file_path.stat().st_size, } ) documents.append(doc) except Exception as e: print(f"⚠️ Не удалось обработать {file_path}: {e}") print(f"Извлечено {len(documents)} документов из архива") return documents # ====================== VECTORSTORE ====================== def build_vectorstore(docs: List[Document]): with log_step("Building FAISS Vectorstore"): splitter = RecursiveCharacterTextSplitter( chunk_size=1200, chunk_overlap=150, separators=["\n\n", "\n", ". ", " ", ""] ) splits = splitter.split_documents(docs) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") db = FAISS.from_documents(splits, embeddings) retriever = db.as_retriever( search_type="mmr", search_kwargs={"k": 10, "fetch_k": 25, "lambda_mult": 0.7} ) return db, retriever # ====================== NODES ====================== def analyze_toc(state: GraphState): with log_step("analyze_toc"): prompt = f"Оглавление курса:\n{state['toc']}\n\nКратко опиши структуру курса, выделив основные разделы и логику." toc_analysis = llm_analyze(prompt) return {"toc_analysis": toc_analysis} def planner(state: GraphState): with log_step("planner"): prompt = f""" Оглавление: {state['toc']} Анализ структуры: {state['toc_analysis']} Запрос пользователя: {state['query']} Составь план retrieval'а. Верни JSON: {{ "sections": ["список ключевых тем"], "queries": ["конкретные поисковые запросы для RAG"] }} """ raw = llm_analyze(prompt) try: plan = json.loads(raw) except: plan = {"sections": [], "queries": [state["query"]]} return { "plan": plan, "retrieval_queries": plan.get("queries", [state["query"]]), "iteration": 0 } def retrieve(state: GraphState): with log_step("retrieve"): all_docs = [] for q in state["retrieval_queries"]: docs = retriever.invoke(q) all_docs.extend(docs) seen = set() unique_docs = [] for doc in all_docs: content_hash = hash(doc.page_content[:200]) if content_hash not in seen: seen.add(content_hash) unique_docs.append(doc) print(f"Retrieved {len(unique_docs)} unique documents") return { "contexts": state["contexts"] + unique_docs } def check_completeness(state: GraphState): with log_step("check_completeness"): state["iteration"] += 1 total_chars = sum(len(doc.page_content) for doc in state["contexts"]) num_docs = len(state["contexts"]) print(f"Iteration {state['iteration']} | Documents: {num_docs} | Chars: ~{total_chars//1000}k") # Жёсткий лимит по размеру контекста if total_chars > 25000 or state["iteration"] >= 8: print("✅ Достаточно контекста по объёму → завершаем сбор") return {"recurse": False} context_preview = "\n\n".join([doc.page_content[:400] for doc in state["contexts"][-8:]]) prompt = f""" Текущий запрос пользователя: {state['query']} Уже собрано {num_docs} документов (~{total_chars//1000}k символов). Проанализируй, достаточно ли материала, чтобы создать **качественный интенсивный курс**. Ответь строго JSON: {{ "enough": true/false, "reason": "короткое объяснение почему enough или нет", "next_query": "если enough=false — один **конкретный** поисковый запрос для RAG. Должен быть тематическим, а не 'составь план'. Пример: 'аффинные алгебраические многообразия определение свойства примеры'" }} Будь строгим. Если контекста уже много и основные темы покрыты — ставь enough: true. """ raw = llm_analyze(prompt) try: match = re.search(r'\{[\s\S]*\}', raw) data = json.loads(match.group(0)) if match else json.loads(raw) enough = data.get("enough", False) next_query = (data.get("next_query") or "").strip() reason = data.get("reason", "") print(f"Enough: {enough} | Reason: {reason[:100]}...") if not enough and next_query and state["iteration"] < 8: print(f"→ New query: {next_query}") return { "retrieval_queries": state["retrieval_queries"] + [next_query], "recurse": True } else: print("✅ Завершаем итерации, переходим к генерации плана") return {"recurse": False} except Exception as e: print(f"Parse error: {e}") # Если модель совсем не дала JSON — выходим после 6 итераций if state["iteration"] >= 6: return {"recurse": False} return {"recurse": True} # попробуем ещё раз def generate_weekly_plan(state: GraphState): with log_step("generate_weekly_plan"): context_blocks = [] for doc in state["contexts"][:25]: meta = doc.metadata source = meta.get("source") or meta.get("file_name", "unknown") preview = doc.page_content[:700].replace("\n", " ").strip() context_blocks.append(f"Источник → {source}\n{preview}...\n") context_text = "\n\n" + "-" * 80 + "\n\n".join(context_blocks) prompt = f"""Ты — строгий профессиональный методист. Отвечай **исключительно валидным JSON**, без единого слова вне объекта. Запрос пользователя: {state['query']} Оглавление курса: {state.get('toc', 'Не предоставлено')} Доступные материалы: {context_text} Создай понедельный учебный план, соответствующий запросу пользователя. В одной неделе может быть от 2 до 5 занятий. Если требуется усиленный курс, то надо от 3х занятий в неделю. Верни **ТОЛЬКО** этот JSON (начинай сразу с '{{'): {{ "course_title": "Название курса", "duration_weeks": число, "weekly_plan": [ {{ "week": 1, "theme": "Общая тема недели", "sessions": [ {{ "session_number": 1, "title": "Название конкретного занятия", "goal": "Подробная мотивировка и цель занятия (что студент должен понять и уметь)", "main_sources": [ {{ "material": "Название учебника или лекции", "chapter": "§1 Проективное пространство", "section": "1.1 Соглашения об обозначениях, 1.2 Определение", "file_source": "имя_файла.pdf" }} ], "preparation_materials": ["список строк для подготовки"], "practice_and_homework": ["конкретные задания, задачи, домашняя работа"], "estimated_time": "3–5 часов" }} ] }} ], "additional_recommendations": "Общие рекомендации" }} Правила, которых нужно строго придерживаться: - Используй реальные названия глав, параграфов и файлов из предоставленных материалов. - Обязательно заполняй поле "goal" — подробная мотивировка, зачем это занятие нужно. - Поле "file_source" должно содержать реальное имя файла из метаданных, если возможно. - В одной неделе можно делать несколько занятий (sessions). - Не пиши ничего кроме JSON. """ raw = llm_final(prompt) try: cleaned = raw.strip() if cleaned.startswith("```"): cleaned = cleaned.split("```")[1].strip() if cleaned.lower().startswith(("json", "```json")): cleaned = cleaned.split("\n", 1)[-1].strip() plan = json.loads(cleaned) except json.JSONDecodeError: import re match = re.search(r'(\{[\s\S]*\})', raw, re.DOTALL) if match: try: plan = json.loads(match.group(1)) except: plan = {"error": "JSON parse failed", "raw_preview": raw[:1000]} else: plan = {"error": "JSON parse failed", "raw_preview": raw[:800]} return {"result": plan} def make_final_output_pretty(state: GraphState): import json raw_plan = state["result"] if isinstance(raw_plan, dict): plan_str = json.dumps(raw_plan, ensure_ascii=False, indent=2) else: plan_str = str(raw_plan) prompt = f""" Ты — отличный технический писатель и методист. У тебя есть следующий JSON с учебным планом (возможно, неидеальный): {plan_str} Твоя задача: преобразовать его в КРАСИВЫЙ и СТРУКТУРИРОВАННЫЙ Markdown-документ. Требования: - Используй заголовки (#, ##, ###) - Выделяй недели как отдельные секции - Для каждой недели: - тема - список занятий (bullets) - краткое описание (если есть) - Добавь читаемую структуру курса - В конце добавь раздел "Рекомендации" - Убери технический мусор и JSON-структуру - Сделай текст естественным, как учебный материал Формат: # Название курса ## Неделя 1 — Тема - Занятие 1 - Занятие 2 ## Неделя 2 — ... ## Рекомендации Верни ТОЛЬКО Markdown, без JSON и без пояснений. """ pretty = llm_final(prompt) return {"result": pretty} # ====================== GRAPH ====================== def should_continue(state: GraphState) -> str: if state.get("recurse") is True: return "retrieve" return "generate_weekly_plan" builder = StateGraph(GraphState) builder.add_node("analyze_toc", analyze_toc) builder.add_node("planner", planner) builder.add_node("retrieve", retrieve) builder.add_node("check_completeness", check_completeness) builder.add_node("generate_weekly_plan", generate_weekly_plan) builder.add_node("pretty_output", make_final_output_pretty) builder.set_entry_point("analyze_toc") builder.add_edge("analyze_toc", "planner") builder.add_edge("planner", "retrieve") builder.add_edge("retrieve", "check_completeness") builder.add_conditional_edges( "check_completeness", should_continue, { "retrieve": "retrieve", "generate_weekly_plan": "generate_weekly_plan" } ) builder.add_edge("generate_weekly_plan", "pretty_output") builder.add_edge("pretty_output", END) graph = builder.compile() # ====================== GLOBAL ====================== retriever = None toc_text = "" # ====================== MAIN PIPELINE ====================== def run_course_builder(query: str, toc: str, archive_path: str): global retriever, toc_text toc_text = toc documents = extract_archive_to_documents(archive_path) _, retriever = build_vectorstore(documents) initial_state = { "query": query, "toc": toc_text, "toc_analysis": "", "plan": {}, "retrieval_queries": [], "contexts": [], "result": {}, "iteration": 0, "recurse": False } final_state = graph.invoke(initial_state) return final_state["result"]