CourseBuilder / pipeline.py
Dropdead072's picture
Update pipeline.py
fd8cb04 verified
from typing import TypedDict, List, Annotated
import json
import time
from contextlib import contextmanager
from pathlib import Path
import json, re
import tempfile
import zipfile, tarfile
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.llms.yandex import YandexGPT
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from dotenv import load_dotenv
load_dotenv()
from pathlib import Path
import os
from pypdf import PdfReader
# ====================== CONFIG ======================
YANDEX_API_KEY = os.getenv("YANDEX_API_KEY")
YANDEX_FOLDER_ID = os.getenv("YANDEX_FOLDER_ID")
# --- LLMs ---
llm_reasoning = YandexGPT(
api_key=YANDEX_API_KEY,
folder_id=YANDEX_FOLDER_ID,
temperature=0.3,
max_tokens=2000
)
llm_output = YandexGPT(
api_key=YANDEX_API_KEY,
folder_id=YANDEX_FOLDER_ID,
temperature=0.4,
max_tokens=6500
)
def llm_analyze(prompt: str) -> str:
return llm_reasoning.invoke(prompt)
def llm_final(prompt: str) -> str:
return llm_output.invoke(prompt)
# --- Logging ---
@contextmanager
def log_step(name: str):
print(f"\n>>> START: {name}")
start = time.time()
try:
yield
finally:
print(f"<<< END: {name} ({time.time() - start:.2f}s)")
# ====================== STATE ======================
class GraphState(TypedDict):
query: str
toc: str
toc_analysis: str
plan: dict
retrieval_queries: List[str]
contexts: List[Document]
result: dict
iteration: int
recurse: bool
# ====================== ARCHIVE PROCESSING ======================
def read_file(file_path: Path) -> str:
suffix = file_path.suffix.lower()
try:
# --- PDF ---
if suffix == ".pdf":
reader = PdfReader(str(file_path))
text = []
for page in reader.pages:
text.append(page.extract_text() or "")
return "\n".join(text)
# --- TXT / CODE ---
elif suffix in [".txt", ".md", ".py", ".json"]:
return file_path.read_text(encoding="utf-8", errors="ignore")
# --- fallback ---
else:
return file_path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
print(f"⚠️ Ошибка чтения {file_path}: {e}")
return ""
def extract_archive_to_documents(archive_path: str) -> List[Document]:
documents = []
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
if archive_path.endswith('.zip'):
with zipfile.ZipFile(archive_path) as z:
z.extractall(tmpdir)
elif archive_path.endswith(('.tar', '.tar.gz', '.tgz')):
with tarfile.open(archive_path) as t:
t.extractall(tmpdir)
else:
raise ValueError(f"Unsupported archive: {archive_path}")
for file_path in tmp_path.rglob("*"):
if file_path.is_file() and not file_path.name.startswith('.'):
try:
relative_path = file_path.relative_to(tmp_path)
text = read_file(file_path)
if not text.strip():
continue
doc = Document(
page_content=text,
metadata={
"source": str(relative_path),
"file_name": file_path.name,
"file_type": file_path.suffix.lower(),
"size_bytes": file_path.stat().st_size,
}
)
documents.append(doc)
except Exception as e:
print(f"⚠️ Не удалось обработать {file_path}: {e}")
print(f"Извлечено {len(documents)} документов из архива")
return documents
# ====================== VECTORSTORE ======================
def build_vectorstore(docs: List[Document]):
with log_step("Building FAISS Vectorstore"):
splitter = RecursiveCharacterTextSplitter(
chunk_size=1200,
chunk_overlap=150,
separators=["\n\n", "\n", ". ", " ", ""]
)
splits = splitter.split_documents(docs)
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
db = FAISS.from_documents(splits, embeddings)
retriever = db.as_retriever(
search_type="mmr",
search_kwargs={"k": 10, "fetch_k": 25, "lambda_mult": 0.7}
)
return db, retriever
# ====================== NODES ======================
def analyze_toc(state: GraphState):
with log_step("analyze_toc"):
prompt = f"Оглавление курса:\n{state['toc']}\n\nКратко опиши структуру курса, выделив основные разделы и логику."
toc_analysis = llm_analyze(prompt)
return {"toc_analysis": toc_analysis}
def planner(state: GraphState):
with log_step("planner"):
prompt = f"""
Оглавление:
{state['toc']}
Анализ структуры:
{state['toc_analysis']}
Запрос пользователя:
{state['query']}
Составь план retrieval'а. Верни JSON:
{{
"sections": ["список ключевых тем"],
"queries": ["конкретные поисковые запросы для RAG"]
}}
"""
raw = llm_analyze(prompt)
try:
plan = json.loads(raw)
except:
plan = {"sections": [], "queries": [state["query"]]}
return {
"plan": plan,
"retrieval_queries": plan.get("queries", [state["query"]]),
"iteration": 0
}
def retrieve(state: GraphState):
with log_step("retrieve"):
all_docs = []
for q in state["retrieval_queries"]:
docs = retriever.invoke(q)
all_docs.extend(docs)
seen = set()
unique_docs = []
for doc in all_docs:
content_hash = hash(doc.page_content[:200])
if content_hash not in seen:
seen.add(content_hash)
unique_docs.append(doc)
print(f"Retrieved {len(unique_docs)} unique documents")
return {
"contexts": state["contexts"] + unique_docs
}
def check_completeness(state: GraphState):
with log_step("check_completeness"):
state["iteration"] += 1
total_chars = sum(len(doc.page_content) for doc in state["contexts"])
num_docs = len(state["contexts"])
print(f"Iteration {state['iteration']} | Documents: {num_docs} | Chars: ~{total_chars//1000}k")
# Жёсткий лимит по размеру контекста
if total_chars > 25000 or state["iteration"] >= 8:
print("✅ Достаточно контекста по объёму → завершаем сбор")
return {"recurse": False}
context_preview = "\n\n".join([doc.page_content[:400] for doc in state["contexts"][-8:]])
prompt = f"""
Текущий запрос пользователя:
{state['query']}
Уже собрано {num_docs} документов (~{total_chars//1000}k символов).
Проанализируй, достаточно ли материала, чтобы создать **качественный интенсивный курс**.
Ответь строго JSON:
{{
"enough": true/false,
"reason": "короткое объяснение почему enough или нет",
"next_query": "если enough=false — один **конкретный** поисковый запрос для RAG.
Должен быть тематическим, а не 'составь план'.
Пример: 'аффинные алгебраические многообразия определение свойства примеры'"
}}
Будь строгим. Если контекста уже много и основные темы покрыты — ставь enough: true.
"""
raw = llm_analyze(prompt)
try:
match = re.search(r'\{[\s\S]*\}', raw)
data = json.loads(match.group(0)) if match else json.loads(raw)
enough = data.get("enough", False)
next_query = (data.get("next_query") or "").strip()
reason = data.get("reason", "")
print(f"Enough: {enough} | Reason: {reason[:100]}...")
if not enough and next_query and state["iteration"] < 8:
print(f"→ New query: {next_query}")
return {
"retrieval_queries": state["retrieval_queries"] + [next_query],
"recurse": True
}
else:
print("✅ Завершаем итерации, переходим к генерации плана")
return {"recurse": False}
except Exception as e:
print(f"Parse error: {e}")
# Если модель совсем не дала JSON — выходим после 6 итераций
if state["iteration"] >= 6:
return {"recurse": False}
return {"recurse": True} # попробуем ещё раз
def generate_weekly_plan(state: GraphState):
with log_step("generate_weekly_plan"):
context_blocks = []
for doc in state["contexts"][:25]:
meta = doc.metadata
source = meta.get("source") or meta.get("file_name", "unknown")
preview = doc.page_content[:700].replace("\n", " ").strip()
context_blocks.append(f"Источник → {source}\n{preview}...\n")
context_text = "\n\n" + "-" * 80 + "\n\n".join(context_blocks)
prompt = f"""Ты — строгий профессиональный методист.
Отвечай **исключительно валидным JSON**, без единого слова вне объекта.
Запрос пользователя: {state['query']}
Оглавление курса:
{state.get('toc', 'Не предоставлено')}
Доступные материалы:
{context_text}
Создай понедельный учебный план, соответствующий запросу пользователя.
В одной неделе может быть от 2 до 5 занятий.
Если требуется усиленный курс, то надо от 3х занятий в неделю.
Верни **ТОЛЬКО** этот JSON (начинай сразу с '{{'):
{{
"course_title": "Название курса",
"duration_weeks": число,
"weekly_plan": [
{{
"week": 1,
"theme": "Общая тема недели",
"sessions": [
{{
"session_number": 1,
"title": "Название конкретного занятия",
"goal": "Подробная мотивировка и цель занятия (что студент должен понять и уметь)",
"main_sources": [
{{
"material": "Название учебника или лекции",
"chapter": "§1 Проективное пространство",
"section": "1.1 Соглашения об обозначениях, 1.2 Определение",
"file_source": "имя_файла.pdf"
}}
],
"preparation_materials": ["список строк для подготовки"],
"practice_and_homework": ["конкретные задания, задачи, домашняя работа"],
"estimated_time": "3–5 часов"
}}
]
}}
],
"additional_recommendations": "Общие рекомендации"
}}
Правила, которых нужно строго придерживаться:
- Используй реальные названия глав, параграфов и файлов из предоставленных материалов.
- Обязательно заполняй поле "goal" — подробная мотивировка, зачем это занятие нужно.
- Поле "file_source" должно содержать реальное имя файла из метаданных, если возможно.
- В одной неделе можно делать несколько занятий (sessions).
- Не пиши ничего кроме JSON.
"""
raw = llm_final(prompt)
try:
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("```")[1].strip()
if cleaned.lower().startswith(("json", "```json")):
cleaned = cleaned.split("\n", 1)[-1].strip()
plan = json.loads(cleaned)
except json.JSONDecodeError:
import re
match = re.search(r'(\{[\s\S]*\})', raw, re.DOTALL)
if match:
try:
plan = json.loads(match.group(1))
except:
plan = {"error": "JSON parse failed", "raw_preview": raw[:1000]}
else:
plan = {"error": "JSON parse failed", "raw_preview": raw[:800]}
return {"result": plan}
def make_final_output_pretty(state: GraphState):
import json
raw_plan = state["result"]
if isinstance(raw_plan, dict):
plan_str = json.dumps(raw_plan, ensure_ascii=False, indent=2)
else:
plan_str = str(raw_plan)
prompt = f"""
Ты — отличный технический писатель и методист.
У тебя есть следующий JSON с учебным планом (возможно, неидеальный):
{plan_str}
Твоя задача:
преобразовать его в КРАСИВЫЙ и СТРУКТУРИРОВАННЫЙ Markdown-документ.
Требования:
- Используй заголовки (#, ##, ###)
- Выделяй недели как отдельные секции
- Для каждой недели:
- тема
- список занятий (bullets)
- краткое описание (если есть)
- Добавь читаемую структуру курса
- В конце добавь раздел "Рекомендации"
- Убери технический мусор и JSON-структуру
- Сделай текст естественным, как учебный материал
Формат:
# Название курса
## Неделя 1 — Тема
- Занятие 1
- Занятие 2
## Неделя 2 — ...
## Рекомендации
Верни ТОЛЬКО Markdown, без JSON и без пояснений.
"""
pretty = llm_final(prompt)
return {"result": pretty}
# ====================== GRAPH ======================
def should_continue(state: GraphState) -> str:
if state.get("recurse") is True:
return "retrieve"
return "generate_weekly_plan"
builder = StateGraph(GraphState)
builder.add_node("analyze_toc", analyze_toc)
builder.add_node("planner", planner)
builder.add_node("retrieve", retrieve)
builder.add_node("check_completeness", check_completeness)
builder.add_node("generate_weekly_plan", generate_weekly_plan)
builder.add_node("pretty_output", make_final_output_pretty)
builder.set_entry_point("analyze_toc")
builder.add_edge("analyze_toc", "planner")
builder.add_edge("planner", "retrieve")
builder.add_edge("retrieve", "check_completeness")
builder.add_conditional_edges(
"check_completeness",
should_continue,
{
"retrieve": "retrieve",
"generate_weekly_plan": "generate_weekly_plan"
}
)
builder.add_edge("generate_weekly_plan", "pretty_output")
builder.add_edge("pretty_output", END)
graph = builder.compile()
# ====================== GLOBAL ======================
retriever = None
toc_text = ""
# ====================== MAIN PIPELINE ======================
def run_course_builder(query: str, toc: str, archive_path: str):
global retriever, toc_text
toc_text = toc
documents = extract_archive_to_documents(archive_path)
_, retriever = build_vectorstore(documents)
initial_state = {
"query": query,
"toc": toc_text,
"toc_analysis": "",
"plan": {},
"retrieval_queries": [],
"contexts": [],
"result": {},
"iteration": 0,
"recurse": False
}
final_state = graph.invoke(initial_state)
return final_state["result"]