{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Пайплайн классификации новостных постов (LangGraph)\n", "\n", "Этот пайплайн реализован с использованием **LangGraph** и состоит из двух узлов:\n", "1. **Узел извлечения** - вычленяет основную мысль/сообщение из новостного поста\n", "2. **Узел классификации** - определяет, является ли основная тема однозначной при поиске\n", "\n", "LangGraph позволяет строить графовые структуры агентов с явным управлением состоянием и потоком данных.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Установка зависимостей (раскомментируйте при необходимости)\n", "# !pip install langgraph langchain langchain-openai pydantic python-dotenv pandas tqdm\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from typing import Literal, Optional, TypedDict, Annotated\n", "from dotenv import load_dotenv\n", "from pydantic import BaseModel, Field\n", "from langchain_openai import ChatOpenAI\n", "from langchain_core.prompts import ChatPromptTemplate\n", "from langchain_core.output_parsers import PydanticOutputParser\n", "from langgraph.graph import StateGraph, END\n", "import pandas as pd\n", "from tqdm import tqdm\n", "import operator\n", "\n", "load_dotenv()\n", "\n", "# Проверка наличия API ключа\n", "OPENROUTER_API_KEY = os.getenv(\"OPENROUTER_API_KEY\")\n", "if not OPENROUTER_API_KEY:\n", " raise ValueError(\"Не найден OPENROUTER_API_KEY в переменных окружения\")\n", "\n", "print(\"✅ API ключ загружен\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Определение структурированных моделей вывода\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class MainMessage(BaseModel):\n", " \"\"\"Основная мысль/сообщение новостного поста\"\"\"\n", " \n", " main_topic: str = Field(\n", " description=\"Основная тема или предмет новостного поста (например: 'Выпуск iPhone 17', 'Высказывание политика А о политике Б')\"\n", " )\n", " key_entities: list[str] = Field(\n", " description=\"Ключевые сущности, упомянутые в посте (люди, организации, события, даты, места)\"\n", " )\n", " main_fact_or_statement: str = Field(\n", " description=\"Основной факт или утверждение, содержащееся в посте\"\n", " )\n", " temporal_context: Optional[str] = Field(\n", " default=None,\n", " description=\"Временной контекст события (конкретная дата, период, или 'текущий момент')\"\n", " )\n", " additional_context: str = Field(\n", " description=\"Дополнительный контекст, необходимый для понимания основной мысли\"\n", " )\n", "\n", "\n", "class ClassificationResult(BaseModel):\n", " \"\"\"Результат классификации новостного поста по однозначности поиска\"\"\"\n", " \n", " is_unambiguous: bool = Field(\n", " description=\"Является ли основная тема поста однозначной при поиске. True - однозначная (конкретный факт), False - неоднозначная (могут быть противоречивые ответы)\"\n", " )\n", " confidence: float = Field(\n", " description=\"Уверенность в классификации от 0.0 до 1.0\",\n", " ge=0.0,\n", " le=1.0\n", " )\n", " category: Literal[\"fact\", \"opinion\", \"statement\", \"event\", \"mixed\"] = Field(\n", " description=\"Категория контента: fact - чистый факт, opinion - мнение, statement - высказывание/заявление, event - событие, mixed - смешанный\"\n", " )\n", " search_difficulty: Literal[\"easy\", \"medium\", \"hard\"] = Field(\n", " description=\"Сложность поиска: easy - простой уникальный факт, medium - требует временного контекста, hard - неоднозначный, может иметь противоречивые ответы\"\n", " )\n", " ambiguity_reasons: list[str] = Field(\n", " default_factory=list,\n", " description=\"Причины неоднозначности (если есть): изменчивость позиции, множественные источники, субъективность и т.д.\"\n", " )\n", " reasoning: str = Field(\n", " description=\"Подробное обоснование классификации\"\n", " )\n", " suggested_search_query: str = Field(\n", " description=\"Предлагаемый поисковый запрос для нахождения этой информации\"\n", " )\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Определение состояния графа (LangGraph State)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Определение состояния графа\n", "# TypedDict используется для определения схемы состояния, которое передается между узлами\n", "\n", "class GraphState(TypedDict):\n", " \"\"\"Состояние, передаваемое между узлами графа\"\"\"\n", " \n", " # Входные данные\n", " original_text: str\n", " \n", " # Результат извлечения (заполняется узлом extraction)\n", " main_message: Optional[MainMessage]\n", " \n", " # Результат классификации (заполняется узлом classification)\n", " classification: Optional[ClassificationResult]\n", " \n", " # Статус обработки\n", " error: Optional[str]\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Настройка LLM через OpenRouter\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Настройка LLM через OpenRouter\n", "\n", "def create_llm(model: str = \"openai/gpt-4o-mini\", temperature: float = 0.0) -> ChatOpenAI:\n", " \"\"\"Создает экземпляр LLM через OpenRouter\"\"\"\n", " return ChatOpenAI(\n", " model=model,\n", " temperature=temperature,\n", " openai_api_key=OPENROUTER_API_KEY,\n", " openai_api_base=\"https://openrouter.ai/api/v1\",\n", " )\n", "\n", "# Модель для использования\n", "MODEL_NAME = \"openai/gpt-4o-mini\"\n", "\n", "# Создаем LLM\n", "llm = create_llm(MODEL_NAME)\n", "\n", "print(f\"✅ Используемая модель: {MODEL_NAME}\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Определение узлов графа (Nodes)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Узел 1: Извлечение основной мысли\n", "\n", "extraction_parser = PydanticOutputParser(pydantic_object=MainMessage)\n", "\n", "extraction_prompt = ChatPromptTemplate.from_messages([\n", " (\"system\", \"\"\"Ты - эксперт по анализу новостного контента. Твоя задача - извлечь основную мысль и ключевую информацию из новостного поста.\n", "\n", "Анализируй текст внимательно и выдели:\n", "1. Основную тему поста\n", "2. Все ключевые сущности (люди, организации, места, даты, события)\n", "3. Главный факт или утверждение\n", "4. Временной контекст (когда это произошло/происходит)\n", "5. Дополнительный контекст для понимания\n", "\n", "{format_instructions}\"\"\"),\n", " (\"human\", \"Проанализируй следующий новостной пост и извлеки основную мысль:\\n\\n{text}\")\n", "])\n", "\n", "extraction_chain = extraction_prompt | llm | extraction_parser\n", "\n", "\n", "def extraction_node(state: GraphState) -> dict:\n", " \"\"\"\n", " Узел извлечения основной мысли.\n", " Принимает состояние, извлекает основную мысль и возвращает обновление состояния.\n", " \"\"\"\n", " try:\n", " result = extraction_chain.invoke({\n", " \"text\": state[\"original_text\"],\n", " \"format_instructions\": extraction_parser.get_format_instructions()\n", " })\n", " return {\"main_message\": result, \"error\": None}\n", " except Exception as e:\n", " return {\"main_message\": None, \"error\": f\"Ошибка извлечения: {str(e)}\"}\n", "\n", "\n", "print(\"✅ Узел извлечения определен\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Узел 2: Классификация по однозначности\n", "\n", "classification_parser = PydanticOutputParser(pydantic_object=ClassificationResult)\n", "\n", "classification_prompt = ChatPromptTemplate.from_messages([\n", " (\"system\", \"\"\"Ты - эксперт по классификации новостного контента для поисковых систем.\n", "\n", "Твоя задача - определить, является ли новостной пост ОДНОЗНАЧНЫМ или НЕОДНОЗНАЧНЫМ для поиска.\n", "\n", "## Критерии ОДНОЗНАЧНОГО контента (is_unambiguous=True):\n", "- Конкретные факты с точными датами и цифрами (\"Apple выпустила iPhone 17 15 сентября 2025\")\n", "- Уникальные события, которые произошли один раз\n", "- Официальные решения, законы, назначения\n", "- Результаты спортивных событий, выборов\n", "- Финансовые показатели за конкретный период\n", "\n", "## Критерии НЕОДНОЗНАЧНОГО контента (is_unambiguous=False):\n", "- Высказывания и мнения, которые могут меняться со временем\n", "- Позиции политиков/персон по вопросам (\"политик А заявил о политике Б\")\n", "- Прогнозы и ожидания\n", "- Оценочные суждения\n", "- События без точной привязки ко времени\n", "- Темы, где возможны противоречивые источники\n", "\n", "## Сложность поиска:\n", "- easy: Уникальный факт, легко найти один правильный ответ\n", "- medium: Требует временного/контекстного уточнения\n", "- hard: Высокая вероятность найти противоречивые ответы\n", "\n", "{format_instructions}\"\"\"),\n", " (\"human\", \"\"\"Проклассифицируй следующий новостной контент:\n", "\n", "## Оригинальный текст:\n", "{original_text}\n", "\n", "## Извлечённая основная мысль:\n", "- Тема: {main_topic}\n", "- Ключевые сущности: {key_entities}\n", "- Основной факт/утверждение: {main_fact}\n", "- Временной контекст: {temporal_context}\n", "- Дополнительный контекст: {additional_context}\n", "\n", "Определи, является ли этот контент однозначным для поиска.\"\"\")\n", "])\n", "\n", "classification_chain = classification_prompt | llm | classification_parser\n", "\n", "\n", "def classification_node(state: GraphState) -> dict:\n", " \"\"\"\n", " Узел классификации контента.\n", " Принимает состояние с извлеченной мыслью и классифицирует её.\n", " \"\"\"\n", " # Проверяем, есть ли ошибка на предыдущем шаге\n", " if state.get(\"error\"):\n", " return {\"classification\": None}\n", " \n", " main_message = state.get(\"main_message\")\n", " if not main_message:\n", " return {\"classification\": None, \"error\": \"Отсутствует main_message для классификации\"}\n", " \n", " try:\n", " result = classification_chain.invoke({\n", " \"original_text\": state[\"original_text\"],\n", " \"main_topic\": main_message.main_topic,\n", " \"key_entities\": \", \".join(main_message.key_entities),\n", " \"main_fact\": main_message.main_fact_or_statement,\n", " \"temporal_context\": main_message.temporal_context or \"не указан\",\n", " \"additional_context\": main_message.additional_context,\n", " \"format_instructions\": classification_parser.get_format_instructions()\n", " })\n", " return {\"classification\": result}\n", " except Exception as e:\n", " return {\"classification\": None, \"error\": f\"Ошибка классификации: {str(e)}\"}\n", "\n", "\n", "print(\"✅ Узел классификации определен\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Построение графа LangGraph\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Создание графа\n", "workflow = StateGraph(GraphState)\n", "\n", "# Добавляем узлы\n", "workflow.add_node(\"extraction\", extraction_node)\n", "workflow.add_node(\"classification\", classification_node)\n", "\n", "# Определяем входную точку\n", "workflow.set_entry_point(\"extraction\")\n", "\n", "# Добавляем рёбра (переходы между узлами)\n", "workflow.add_edge(\"extraction\", \"classification\")\n", "workflow.add_edge(\"classification\", END)\n", "\n", "# Компилируем граф\n", "graph = workflow.compile()\n", "\n", "print(\"✅ Граф LangGraph скомпилирован\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Визуализация графа (опционально)\n", "try:\n", " from IPython.display import Image, display\n", " display(Image(graph.get_graph().draw_mermaid_png()))\n", "except Exception as e:\n", " print(f\"Визуализация недоступна: {e}\")\n", " print(\"\\nСтруктура графа:\")\n", " print(\" [START] → extraction → classification → [END]\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Обёртка пайплайна\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class PipelineResult(BaseModel):\n", " \"\"\"Полный результат работы пайплайна\"\"\"\n", " original_text: str\n", " main_message: Optional[MainMessage]\n", " classification: Optional[ClassificationResult]\n", " error: Optional[str] = None\n", "\n", "\n", "class NewsClassificationPipeline:\n", " \"\"\"Обёртка над графом LangGraph для удобного использования\"\"\"\n", " \n", " def __init__(self, compiled_graph=None):\n", " self.graph = compiled_graph or graph\n", " \n", " def process(self, text: str) -> PipelineResult:\n", " \"\"\"Обрабатывает один новостной пост через граф\"\"\"\n", " initial_state = {\n", " \"original_text\": text,\n", " \"main_message\": None,\n", " \"classification\": None,\n", " \"error\": None\n", " }\n", " \n", " # Запускаем граф\n", " final_state = self.graph.invoke(initial_state)\n", " \n", " return PipelineResult(\n", " original_text=text,\n", " main_message=final_state.get(\"main_message\"),\n", " classification=final_state.get(\"classification\"),\n", " error=final_state.get(\"error\")\n", " )\n", " \n", " def process_batch(self, texts: list[str], show_progress: bool = True) -> list[PipelineResult]:\n", " \"\"\"Обрабатывает список постов\"\"\"\n", " results = []\n", " iterator = tqdm(texts, desc=\"Обработка постов\") if show_progress else texts\n", " \n", " for text in iterator:\n", " result = self.process(text)\n", " results.append(result)\n", " \n", " return results\n", " \n", " def stream(self, text: str):\n", " \"\"\"Потоковая обработка с выводом промежуточных состояний\"\"\"\n", " initial_state = {\n", " \"original_text\": text,\n", " \"main_message\": None,\n", " \"classification\": None,\n", " \"error\": None\n", " }\n", " \n", " for event in self.graph.stream(initial_state):\n", " yield event\n", "\n", "\n", "# Создаем экземпляр пайплайна\n", "pipeline = NewsClassificationPipeline()\n", "print(\"✅ Пайплайн LangGraph готов к работе\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Демонстрация работы пайплайна\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Примеры для тестирования\n", "test_posts = [\n", " \"\"\"▪️Apple представила iPhone 17 на презентации 10 сентября 2025 года. \n", " Новый смартфон получил процессор A19 Bionic и камеру на 200 мегапикселей. \n", " Цена в России начинается от 129 990 рублей.\"\"\",\n", " \n", " \"\"\"▪️Путин заявил о готовности к переговорам по Украине.\n", " «Мы всегда открыты к диалогу», – подчеркнул президент на встрече с журналистами.\n", " При этом он отметил, что условия для переговоров должны учитывать интересы России.\"\"\",\n", " \n", " \"\"\"▪️Роскомнадзор сообщил об ограничении звонков через Telegram и WhatsApp.\n", " «По данным правоохранительных органов, иностранные мессенджеры стали основными \n", " голосовыми сервисами для обмана граждан», – пояснили в пресс-службе ведомства.\"\"\",\n", " \n", " \"\"\"▪️Индекс Мосбиржи упал на 3,2% по итогам торгов 13 марта 2025 года.\n", " Основными аутсайдерами стали акции Сбербанка (-4,5%) и Газпрома (-3,8%).\n", " Аналитики связывают падение с геополитической напряжённостью.\"\"\"\n", "]\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Демонстрация потоковой обработки (streaming) - уникальная возможность LangGraph\n", "print(\"🔄 Потоковая обработка первого поста:\\n\")\n", "print(f\"Текст: {test_posts[0][:100]}...\\n\")\n", "\n", "for step in pipeline.stream(test_posts[0]):\n", " node_name = list(step.keys())[0]\n", " print(f\"📍 Узел: {node_name}\")\n", " \n", " if node_name == \"extraction\" and step[node_name].get(\"main_message\"):\n", " msg = step[node_name][\"main_message\"]\n", " print(f\" Тема: {msg.main_topic}\")\n", " print(f\" Сущности: {', '.join(msg.key_entities)}\")\n", " \n", " if node_name == \"classification\" and step[node_name].get(\"classification\"):\n", " cls = step[node_name][\"classification\"]\n", " status = \"✅ ОДНОЗНАЧНЫЙ\" if cls.is_unambiguous else \"⚠️ НЕОДНОЗНАЧНЫЙ\"\n", " print(f\" Статус: {status}\")\n", " print(f\" Сложность: {cls.search_difficulty}\")\n", " print()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Обработка всех тестовых примеров\n", "results = []\n", "\n", "for i, post in enumerate(test_posts, 1):\n", " print(f\"\\n{'='*80}\")\n", " print(f\"📰 ПОСТ #{i}\")\n", " print(f\"{'='*80}\")\n", " print(post[:200] + \"...\" if len(post) > 200 else post)\n", " \n", " result = pipeline.process(post)\n", " results.append(result)\n", " \n", " if result.error:\n", " print(f\"\\n❌ Ошибка: {result.error}\")\n", " continue\n", " \n", " print(f\"\\n📋 ОСНОВНАЯ МЫСЛЬ:\")\n", " print(f\" Тема: {result.main_message.main_topic}\")\n", " print(f\" Сущности: {', '.join(result.main_message.key_entities)}\")\n", " print(f\" Факт: {result.main_message.main_fact_or_statement}\")\n", " print(f\" Время: {result.main_message.temporal_context}\")\n", " \n", " print(f\"\\n🎯 КЛАССИФИКАЦИЯ:\")\n", " status = \"✅ ОДНОЗНАЧНЫЙ\" if result.classification.is_unambiguous else \"⚠️ НЕОДНОЗНАЧНЫЙ\"\n", " print(f\" Статус: {status}\")\n", " print(f\" Уверенность: {result.classification.confidence:.0%}\")\n", " print(f\" Категория: {result.classification.category}\")\n", " print(f\" Сложность поиска: {result.classification.search_difficulty}\")\n", " if result.classification.ambiguity_reasons:\n", " print(f\" Причины неоднозначности: {', '.join(result.classification.ambiguity_reasons)}\")\n", " print(f\" Поисковый запрос: {result.classification.suggested_search_query}\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Преобразование результатов в DataFrame\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def results_to_dataframe(results: list[PipelineResult]) -> pd.DataFrame:\n", " \"\"\"Преобразует результаты в pandas DataFrame\"\"\"\n", " rows = []\n", " \n", " for r in results:\n", " if r.error or not r.main_message or not r.classification:\n", " continue\n", " \n", " rows.append({\n", " \"original_text\": r.original_text[:100] + \"...\",\n", " \"main_topic\": r.main_message.main_topic,\n", " \"key_entities\": \", \".join(r.main_message.key_entities),\n", " \"main_fact\": r.main_message.main_fact_or_statement,\n", " \"temporal_context\": r.main_message.temporal_context,\n", " \"is_unambiguous\": r.classification.is_unambiguous,\n", " \"confidence\": r.classification.confidence,\n", " \"category\": r.classification.category,\n", " \"search_difficulty\": r.classification.search_difficulty,\n", " \"ambiguity_reasons\": \", \".join(r.classification.ambiguity_reasons),\n", " \"suggested_query\": r.classification.suggested_search_query\n", " })\n", " \n", " return pd.DataFrame(rows)\n", "\n", "\n", "df_results = results_to_dataframe(results)\n", "df_results\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Применение к реальным данным\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Загрузка реальных данных\n", "data = pd.read_csv('src/dataset/rbc/channel_rbc_news_posts.csv')\n", "data[\"message_dt\"] = pd.to_datetime(data[\"message_dt\"])\n", "data = data.sort_values(\"message_dt\")\n", "\n", "print(f\"Загружено {len(data)} постов\")\n", "print(f\"Период: {data['message_dt'].min()} - {data['message_dt'].max()}\")\n", "data.head()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Обработка выборки постов\n", "SAMPLE_SIZE = 10\n", "\n", "sample_data = data.sample(n=min(SAMPLE_SIZE, len(data)), random_state=42)\n", "sample_texts = sample_data[\"content\"].dropna().tolist()\n", "\n", "print(f\"Обрабатываем {len(sample_texts)} постов...\")\n", "\n", "sample_results = pipeline.process_batch(sample_texts)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Анализ результатов\n", "df_sample_results = results_to_dataframe(sample_results)\n", "\n", "if len(df_sample_results) > 0:\n", " print(\"\\n📊 СТАТИСТИКА КЛАССИФИКАЦИИ:\")\n", " print(f\" Однозначных постов: {df_sample_results['is_unambiguous'].sum()} ({df_sample_results['is_unambiguous'].mean():.0%})\")\n", " print(f\" Неоднозначных постов: {(~df_sample_results['is_unambiguous']).sum()} ({(~df_sample_results['is_unambiguous']).mean():.0%})\")\n", "\n", " print(\"\\n📈 РАСПРЕДЕЛЕНИЕ ПО СЛОЖНОСТИ:\")\n", " print(df_sample_results['search_difficulty'].value_counts())\n", "\n", " print(\"\\n📂 РАСПРЕДЕЛЕНИЕ ПО КАТЕГОРИЯМ:\")\n", " print(df_sample_results['category'].value_counts())\n", "else:\n", " print(\"Нет успешно обработанных результатов\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Сохранение результатов\n", "if len(df_sample_results) > 0:\n", " df_sample_results.to_csv('classification_results_langgraph.csv', index=False)\n", " print(\"✅ Результаты сохранены в classification_results_langgraph.csv\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Преимущества LangGraph\n", "\n", "**LangGraph** предоставляет ряд преимуществ по сравнению с обычными цепочками LangChain:\n", "\n", "1. **Явное управление состоянием** - `GraphState` определяет схему данных, передаваемых между узлами\n", "2. **Потоковая обработка (Streaming)** - возможность отслеживать промежуточные результаты через `graph.stream()`\n", "3. **Визуализация** - граф можно визуализировать для понимания потока данных\n", "4. **Условные переходы** - можно добавить условную логику для разветвления графа\n", "5. **Циклы** - поддержка циклических графов для итеративных агентов\n", "6. **Checkpointing** - сохранение состояния для возобновления обработки\n", "\n", "### Структура графа\n", "\n", "```\n", "[START] → extraction → classification → [END]\n", "```\n", "\n", "- **extraction**: Извлекает основную мысль из текста\n", "- **classification**: Классифицирует контент по однозначности для поиска\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 2 }