Spaces:
Sleeping
Sleeping
| from typing import TypedDict, List, Annotated | |
| import json | |
| import time | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| import json, re | |
| import tempfile | |
| import zipfile, tarfile | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.llms.yandex import YandexGPT | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.graph.message import add_messages | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from pathlib import Path | |
| import os | |
| from pypdf import PdfReader | |
| # ====================== CONFIG ====================== | |
| YANDEX_API_KEY = os.getenv("YANDEX_API_KEY") | |
| YANDEX_FOLDER_ID = os.getenv("YANDEX_FOLDER_ID") | |
| # --- LLMs --- | |
| llm_reasoning = YandexGPT( | |
| api_key=YANDEX_API_KEY, | |
| folder_id=YANDEX_FOLDER_ID, | |
| temperature=0.3, | |
| max_tokens=2000 | |
| ) | |
| llm_output = YandexGPT( | |
| api_key=YANDEX_API_KEY, | |
| folder_id=YANDEX_FOLDER_ID, | |
| temperature=0.4, | |
| max_tokens=6500 | |
| ) | |
| def llm_analyze(prompt: str) -> str: | |
| return llm_reasoning.invoke(prompt) | |
| def llm_final(prompt: str) -> str: | |
| return llm_output.invoke(prompt) | |
| # --- Logging --- | |
| def log_step(name: str): | |
| print(f"\n>>> START: {name}") | |
| start = time.time() | |
| try: | |
| yield | |
| finally: | |
| print(f"<<< END: {name} ({time.time() - start:.2f}s)") | |
| # ====================== STATE ====================== | |
| class GraphState(TypedDict): | |
| query: str | |
| toc: str | |
| toc_analysis: str | |
| plan: dict | |
| retrieval_queries: List[str] | |
| contexts: List[Document] | |
| result: dict | |
| iteration: int | |
| recurse: bool | |
| # ====================== ARCHIVE PROCESSING ====================== | |
| def read_file(file_path: Path) -> str: | |
| suffix = file_path.suffix.lower() | |
| try: | |
| # --- PDF --- | |
| if suffix == ".pdf": | |
| reader = PdfReader(str(file_path)) | |
| text = [] | |
| for page in reader.pages: | |
| text.append(page.extract_text() or "") | |
| return "\n".join(text) | |
| # --- TXT / CODE --- | |
| elif suffix in [".txt", ".md", ".py", ".json"]: | |
| return file_path.read_text(encoding="utf-8", errors="ignore") | |
| # --- fallback --- | |
| else: | |
| return file_path.read_text(encoding="utf-8", errors="ignore") | |
| except Exception as e: | |
| print(f"⚠️ Ошибка чтения {file_path}: {e}") | |
| return "" | |
| def extract_archive_to_documents(archive_path: str) -> List[Document]: | |
| documents = [] | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| tmp_path = Path(tmpdir) | |
| if archive_path.endswith('.zip'): | |
| with zipfile.ZipFile(archive_path) as z: | |
| z.extractall(tmpdir) | |
| elif archive_path.endswith(('.tar', '.tar.gz', '.tgz')): | |
| with tarfile.open(archive_path) as t: | |
| t.extractall(tmpdir) | |
| else: | |
| raise ValueError(f"Unsupported archive: {archive_path}") | |
| for file_path in tmp_path.rglob("*"): | |
| if file_path.is_file() and not file_path.name.startswith('.'): | |
| try: | |
| relative_path = file_path.relative_to(tmp_path) | |
| text = read_file(file_path) | |
| if not text.strip(): | |
| continue | |
| doc = Document( | |
| page_content=text, | |
| metadata={ | |
| "source": str(relative_path), | |
| "file_name": file_path.name, | |
| "file_type": file_path.suffix.lower(), | |
| "size_bytes": file_path.stat().st_size, | |
| } | |
| ) | |
| documents.append(doc) | |
| except Exception as e: | |
| print(f"⚠️ Не удалось обработать {file_path}: {e}") | |
| print(f"Извлечено {len(documents)} документов из архива") | |
| return documents | |
| # ====================== VECTORSTORE ====================== | |
| def build_vectorstore(docs: List[Document]): | |
| with log_step("Building FAISS Vectorstore"): | |
| splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=1200, | |
| chunk_overlap=150, | |
| separators=["\n\n", "\n", ". ", " ", ""] | |
| ) | |
| splits = splitter.split_documents(docs) | |
| embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
| db = FAISS.from_documents(splits, embeddings) | |
| retriever = db.as_retriever( | |
| search_type="mmr", | |
| search_kwargs={"k": 10, "fetch_k": 25, "lambda_mult": 0.7} | |
| ) | |
| return db, retriever | |
| # ====================== NODES ====================== | |
| def analyze_toc(state: GraphState): | |
| with log_step("analyze_toc"): | |
| prompt = f"Оглавление курса:\n{state['toc']}\n\nКратко опиши структуру курса, выделив основные разделы и логику." | |
| toc_analysis = llm_analyze(prompt) | |
| return {"toc_analysis": toc_analysis} | |
| def planner(state: GraphState): | |
| with log_step("planner"): | |
| prompt = f""" | |
| Оглавление: | |
| {state['toc']} | |
| Анализ структуры: | |
| {state['toc_analysis']} | |
| Запрос пользователя: | |
| {state['query']} | |
| Составь план retrieval'а. Верни JSON: | |
| {{ | |
| "sections": ["список ключевых тем"], | |
| "queries": ["конкретные поисковые запросы для RAG"] | |
| }} | |
| """ | |
| raw = llm_analyze(prompt) | |
| try: | |
| plan = json.loads(raw) | |
| except: | |
| plan = {"sections": [], "queries": [state["query"]]} | |
| return { | |
| "plan": plan, | |
| "retrieval_queries": plan.get("queries", [state["query"]]), | |
| "iteration": 0 | |
| } | |
| def retrieve(state: GraphState): | |
| with log_step("retrieve"): | |
| all_docs = [] | |
| for q in state["retrieval_queries"]: | |
| docs = retriever.invoke(q) | |
| all_docs.extend(docs) | |
| seen = set() | |
| unique_docs = [] | |
| for doc in all_docs: | |
| content_hash = hash(doc.page_content[:200]) | |
| if content_hash not in seen: | |
| seen.add(content_hash) | |
| unique_docs.append(doc) | |
| print(f"Retrieved {len(unique_docs)} unique documents") | |
| return { | |
| "contexts": state["contexts"] + unique_docs | |
| } | |
| def check_completeness(state: GraphState): | |
| with log_step("check_completeness"): | |
| state["iteration"] += 1 | |
| total_chars = sum(len(doc.page_content) for doc in state["contexts"]) | |
| num_docs = len(state["contexts"]) | |
| print(f"Iteration {state['iteration']} | Documents: {num_docs} | Chars: ~{total_chars//1000}k") | |
| # Жёсткий лимит по размеру контекста | |
| if total_chars > 25000 or state["iteration"] >= 8: | |
| print("✅ Достаточно контекста по объёму → завершаем сбор") | |
| return {"recurse": False} | |
| context_preview = "\n\n".join([doc.page_content[:400] for doc in state["contexts"][-8:]]) | |
| prompt = f""" | |
| Текущий запрос пользователя: | |
| {state['query']} | |
| Уже собрано {num_docs} документов (~{total_chars//1000}k символов). | |
| Проанализируй, достаточно ли материала, чтобы создать **качественный интенсивный курс**. | |
| Ответь строго JSON: | |
| {{ | |
| "enough": true/false, | |
| "reason": "короткое объяснение почему enough или нет", | |
| "next_query": "если enough=false — один **конкретный** поисковый запрос для RAG. | |
| Должен быть тематическим, а не 'составь план'. | |
| Пример: 'аффинные алгебраические многообразия определение свойства примеры'" | |
| }} | |
| Будь строгим. Если контекста уже много и основные темы покрыты — ставь enough: true. | |
| """ | |
| raw = llm_analyze(prompt) | |
| try: | |
| match = re.search(r'\{[\s\S]*\}', raw) | |
| data = json.loads(match.group(0)) if match else json.loads(raw) | |
| enough = data.get("enough", False) | |
| next_query = (data.get("next_query") or "").strip() | |
| reason = data.get("reason", "") | |
| print(f"Enough: {enough} | Reason: {reason[:100]}...") | |
| if not enough and next_query and state["iteration"] < 8: | |
| print(f"→ New query: {next_query}") | |
| return { | |
| "retrieval_queries": state["retrieval_queries"] + [next_query], | |
| "recurse": True | |
| } | |
| else: | |
| print("✅ Завершаем итерации, переходим к генерации плана") | |
| return {"recurse": False} | |
| except Exception as e: | |
| print(f"Parse error: {e}") | |
| # Если модель совсем не дала JSON — выходим после 6 итераций | |
| if state["iteration"] >= 6: | |
| return {"recurse": False} | |
| return {"recurse": True} # попробуем ещё раз | |
| def generate_weekly_plan(state: GraphState): | |
| with log_step("generate_weekly_plan"): | |
| context_blocks = [] | |
| for doc in state["contexts"][:25]: | |
| meta = doc.metadata | |
| source = meta.get("source") or meta.get("file_name", "unknown") | |
| preview = doc.page_content[:700].replace("\n", " ").strip() | |
| context_blocks.append(f"Источник → {source}\n{preview}...\n") | |
| context_text = "\n\n" + "-" * 80 + "\n\n".join(context_blocks) | |
| prompt = f"""Ты — строгий профессиональный методист. | |
| Отвечай **исключительно валидным JSON**, без единого слова вне объекта. | |
| Запрос пользователя: {state['query']} | |
| Оглавление курса: | |
| {state.get('toc', 'Не предоставлено')} | |
| Доступные материалы: | |
| {context_text} | |
| Создай понедельный учебный план, соответствующий запросу пользователя. | |
| В одной неделе может быть от 2 до 5 занятий. | |
| Если требуется усиленный курс, то надо от 3х занятий в неделю. | |
| Верни **ТОЛЬКО** этот JSON (начинай сразу с '{{'): | |
| {{ | |
| "course_title": "Название курса", | |
| "duration_weeks": число, | |
| "weekly_plan": [ | |
| {{ | |
| "week": 1, | |
| "theme": "Общая тема недели", | |
| "sessions": [ | |
| {{ | |
| "session_number": 1, | |
| "title": "Название конкретного занятия", | |
| "goal": "Подробная мотивировка и цель занятия (что студент должен понять и уметь)", | |
| "main_sources": [ | |
| {{ | |
| "material": "Название учебника или лекции", | |
| "chapter": "§1 Проективное пространство", | |
| "section": "1.1 Соглашения об обозначениях, 1.2 Определение", | |
| "file_source": "имя_файла.pdf" | |
| }} | |
| ], | |
| "preparation_materials": ["список строк для подготовки"], | |
| "practice_and_homework": ["конкретные задания, задачи, домашняя работа"], | |
| "estimated_time": "3–5 часов" | |
| }} | |
| ] | |
| }} | |
| ], | |
| "additional_recommendations": "Общие рекомендации" | |
| }} | |
| Правила, которых нужно строго придерживаться: | |
| - Используй реальные названия глав, параграфов и файлов из предоставленных материалов. | |
| - Обязательно заполняй поле "goal" — подробная мотивировка, зачем это занятие нужно. | |
| - Поле "file_source" должно содержать реальное имя файла из метаданных, если возможно. | |
| - В одной неделе можно делать несколько занятий (sessions). | |
| - Не пиши ничего кроме JSON. | |
| """ | |
| raw = llm_final(prompt) | |
| try: | |
| cleaned = raw.strip() | |
| if cleaned.startswith("```"): | |
| cleaned = cleaned.split("```")[1].strip() | |
| if cleaned.lower().startswith(("json", "```json")): | |
| cleaned = cleaned.split("\n", 1)[-1].strip() | |
| plan = json.loads(cleaned) | |
| except json.JSONDecodeError: | |
| import re | |
| match = re.search(r'(\{[\s\S]*\})', raw, re.DOTALL) | |
| if match: | |
| try: | |
| plan = json.loads(match.group(1)) | |
| except: | |
| plan = {"error": "JSON parse failed", "raw_preview": raw[:1000]} | |
| else: | |
| plan = {"error": "JSON parse failed", "raw_preview": raw[:800]} | |
| return {"result": plan} | |
| def make_final_output_pretty(state: GraphState): | |
| import json | |
| raw_plan = state["result"] | |
| if isinstance(raw_plan, dict): | |
| plan_str = json.dumps(raw_plan, ensure_ascii=False, indent=2) | |
| else: | |
| plan_str = str(raw_plan) | |
| prompt = f""" | |
| Ты — отличный технический писатель и методист. | |
| У тебя есть следующий JSON с учебным планом (возможно, неидеальный): | |
| {plan_str} | |
| Твоя задача: | |
| преобразовать его в КРАСИВЫЙ и СТРУКТУРИРОВАННЫЙ Markdown-документ. | |
| Требования: | |
| - Используй заголовки (#, ##, ###) | |
| - Выделяй недели как отдельные секции | |
| - Для каждой недели: | |
| - тема | |
| - список занятий (bullets) | |
| - краткое описание (если есть) | |
| - Добавь читаемую структуру курса | |
| - В конце добавь раздел "Рекомендации" | |
| - Убери технический мусор и JSON-структуру | |
| - Сделай текст естественным, как учебный материал | |
| Формат: | |
| # Название курса | |
| ## Неделя 1 — Тема | |
| - Занятие 1 | |
| - Занятие 2 | |
| ## Неделя 2 — ... | |
| ## Рекомендации | |
| Верни ТОЛЬКО Markdown, без JSON и без пояснений. | |
| """ | |
| pretty = llm_final(prompt) | |
| return {"result": pretty} | |
| # ====================== GRAPH ====================== | |
| def should_continue(state: GraphState) -> str: | |
| if state.get("recurse") is True: | |
| return "retrieve" | |
| return "generate_weekly_plan" | |
| builder = StateGraph(GraphState) | |
| builder.add_node("analyze_toc", analyze_toc) | |
| builder.add_node("planner", planner) | |
| builder.add_node("retrieve", retrieve) | |
| builder.add_node("check_completeness", check_completeness) | |
| builder.add_node("generate_weekly_plan", generate_weekly_plan) | |
| builder.add_node("pretty_output", make_final_output_pretty) | |
| builder.set_entry_point("analyze_toc") | |
| builder.add_edge("analyze_toc", "planner") | |
| builder.add_edge("planner", "retrieve") | |
| builder.add_edge("retrieve", "check_completeness") | |
| builder.add_conditional_edges( | |
| "check_completeness", | |
| should_continue, | |
| { | |
| "retrieve": "retrieve", | |
| "generate_weekly_plan": "generate_weekly_plan" | |
| } | |
| ) | |
| builder.add_edge("generate_weekly_plan", "pretty_output") | |
| builder.add_edge("pretty_output", END) | |
| graph = builder.compile() | |
| # ====================== GLOBAL ====================== | |
| retriever = None | |
| toc_text = "" | |
| # ====================== MAIN PIPELINE ====================== | |
| def run_course_builder(query: str, toc: str, archive_path: str): | |
| global retriever, toc_text | |
| toc_text = toc | |
| documents = extract_archive_to_documents(archive_path) | |
| _, retriever = build_vectorstore(documents) | |
| initial_state = { | |
| "query": query, | |
| "toc": toc_text, | |
| "toc_analysis": "", | |
| "plan": {}, | |
| "retrieval_queries": [], | |
| "contexts": [], | |
| "result": {}, | |
| "iteration": 0, | |
| "recurse": False | |
| } | |
| final_state = graph.invoke(initial_state) | |
| return final_state["result"] | |