"""
CodeWeaver LangGraph node implementations.
Each node receives an AgentState, processes it, and returns the updated state fields.
All nodes are traced automatically through LangSmith.
"""
import asyncio
import logging
import os
from typing import List, Literal, Optional
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from src.agent.state import AgentState, SearchResult
from src.agent.state import _MULTI_ANS_RESET_TOKEN # reset token for multi_answers reducer
from src.tools.search_tools import (
search_github,
search_official_docs,
search_stackoverflow,
)
from src.utils.tracing import trace_node
from src.vector_db.qdrant_client import QdrantManager
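# The reset tokens used below ("__RESET_STEPS__" for intermediate_steps and
# _MULTI_ANS_RESET_TOKEN for multi_answers) presumably rely on custom reducers in
# src/agent/state.py shaped roughly like this sketch (names and details assumed):
#
#     def _add_with_reset(existing: list, new: list) -> list:
#         if new and new[0] == "__RESET_STEPS__":
#             return list(new[1:])           # drop what earlier runs accumulated
#         return list(existing) + list(new)  # normal append-style merge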
logger = logging.getLogger(__name__)
# Initialize the LLM (Gemini 2.5 Flash-Lite)
llm = ChatGoogleGenerativeAI(
model="gemini-2.5-flash-lite",
temperature=0.7,
)
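# langchain_google_genai typically reads the GOOGLE_API_KEY environment variable
# (an assumption about the deployment; adjust if credentials are injected differently).
# Failing loudly here is easier to debug than a mid-graph authentication error.
if not os.getenv("GOOGLE_API_KEY"):
    logger.warning("GOOGLE_API_KEY is not set; Gemini calls will fail at runtime.")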
# Initialize the Qdrant manager (vector-DB cache)
qdrant_manager = QdrantManager()
@trace_node("analyze_question")
async def analyze_question_node(state: AgentState) -> dict:
"""
์งˆ๋ฌธ์„ ๋ถ„์„ํ•˜์—ฌ ์œ ํ˜•์„ ๋ถ„๋ฅ˜ํ•˜๊ณ  ์บ์‹œ ์ ๊ฒฉ์„ฑ์„ ํŒ๋‹จํ•ฉ๋‹ˆ๋‹ค.
Phase 2: Question Analysis & Cache Eligibility Decision
๋ถ„๋ฅ˜:
- followup: ์ด์ „ ๋Œ€ํ™”์— ์˜์กดํ•˜๋Š” ํ›„์† ์งˆ๋ฌธ
- cache_candidate: ๋…๋ฆฝ์ ์ด๊ณ  ์žฌ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์งˆ๋ฌธ
- new_search: ๋…๋ฆฝ์ ์ด์ง€๋งŒ ์บ์‹œํ•˜์ง€ ์•Š์„ ์งˆ๋ฌธ (์‹œ๊ฐ„ ๋ฏผ๊ฐ ๋“ฑ)
"""
user_question = state.user_question
messages = state.messages
# ๋Œ€ํ™” ๋งฅ๋ฝ ๊ตฌ์„ฑ
has_history = messages and len(messages) > 1
context_info = ""
if has_history:
context_info = "\n์ด์ „ ๋Œ€ํ™” ๋งฅ๋ฝ:\n"
for msg in messages[-4:-1]: # ํ˜„์žฌ ์งˆ๋ฌธ ์ œ์™ธ ์ตœ๊ทผ 3๊ฐœ
if hasattr(msg, 'type') and hasattr(msg, 'content'):
role = "์‚ฌ์šฉ์ž" if msg.type == "human" else "AI"
context_info += f"{role}: {msg.content[:100]}\n"
analysis_prompt = f"""์งˆ๋ฌธ์„ ๋ถ„์„ํ•˜์—ฌ ์œ ํ˜•์„ ๋ถ„๋ฅ˜ํ•˜๊ณ , ์บ์‹œ ์ ๊ฒฉ์„ฑ์„ ํŒ๋‹จํ•˜์„ธ์š”.
{context_info}
ํ˜„์žฌ ์งˆ๋ฌธ: {user_question}
๋ถ„๋ฅ˜ ๊ธฐ์ค€:
1. **clarification** (๋ณด์ถฉ/ํ˜•์‹ ๋ณ€๊ฒฝ ์š”์ฒญ)
- ์ด์ „ ๋‹ต๋ณ€/๋Œ€ํ™” ๋‚ด์šฉ์„ ๋ฐ”ํƒ•์œผ๋กœ "์„ค๋ช… ๋ฐฉ์‹"์„ ๋ฐ”๊พธ๊ฑฐ๋‚˜ ๋ณด์ถฉ์„ ์š”์ฒญ
- ์˜ˆ: "์ข€ ๋” ์‰ฝ๊ฒŒ ์„ค๋ช…ํ•ด์ค˜", "์˜ˆ์ œ ์ฝ”๋“œ๋กœ ๋ณด์—ฌ์ค˜", "ํ•œ ์ค„๋กœ ์š”์•ฝํ•ด์ค˜", "๋‹ค์‹œ ์„ค๋ช…ํ•ด์ค˜"
- ์›์น™: ๊ฒ€์ƒ‰/์บ์‹œ๊ฐ€ ์•„๋‹ˆ๋ผ ๋Œ€ํ™” ํžˆ์Šคํ† ๋ฆฌ ๊ธฐ๋ฐ˜ ๋‹ต๋ณ€
- should_cache = false, canonical_question = null
2. **new_topic** (๋Œ€ํ™” ์ค‘ ์ƒˆ ๊ฐœ๋… ์งˆ๋ฌธ)
- ๋Œ€ํ™”๊ฐ€ ์ด์–ด์ง€๋Š” ์ค‘์ด์ง€๋งŒ, ์งˆ๋ฌธ ์ž์ฒด๊ฐ€ ๋…๋ฆฝ์ ์œผ๋กœ ์„ฑ๋ฆฝํ•˜๋Š” '์ƒˆ ๊ฐœ๋…/์ •์˜/๋น„๊ต/์‚ฌ์šฉ๋ฒ•' ์งˆ๋ฌธ
- ์˜ˆ: (React ์ด์•ผ๊ธฐ ์ค‘) "Event Listener๋Š” ๋ญ์•ผ?", "CORS๊ฐ€ ๋ญ์•ผ?"
- ์›์น™: ๊ฒ€์ƒ‰ + ์บ์‹œ ์ €์žฅ ๊ฐ€์น˜๊ฐ€ ํผ
- should_cache = true (๊ธฐ๋ณธ), canonical_question ์ƒ์„ฑ
3. **independent** (์™„์ „ ๋…๋ฆฝ ์งˆ๋ฌธ)
- ์ด์ „ ๋Œ€ํ™” ์—†์ด๋„ ์ดํ•ด ๊ฐ€๋Šฅํ•œ ์ผ๋ฐ˜ ์งˆ๋ฌธ
- ์˜ˆ: "Spring Security๊ฐ€ ๋ญ์•ผ?", "Docker Compose ์‚ฌ์šฉ๋ฒ•์€?"
- ์›์น™: ๊ฒ€์ƒ‰ + ์บ์‹œ ์ €์žฅ ๊ฐ€์น˜๊ฐ€ ํผ
- should_cache = true (๊ธฐ๋ณธ), canonical_question ์ƒ์„ฑ
๋‹ค์Œ JSON ํ˜•์‹์œผ๋กœ๋งŒ ๋‹ต๋ณ€ํ•˜์„ธ์š”:
{{
"question_type": "clarification|new_topic|independent",
"should_cache": true|false,
"reasoning": "๋ถ„๋ฅ˜ ์ด์œ  1-2๋ฌธ์žฅ",
"canonical_question": "์บ์‹œํ•  ์ •๊ทœํ™”๋œ ์งˆ๋ฌธ (should_cache๊ฐ€ true์ธ ๊ฒฝ์šฐ์—๋งŒ, ์•„๋‹ˆ๋ฉด null)"
}}
JSON ์™ธ์— ๋‹ค๋ฅธ ํ…์ŠคํŠธ๋Š” ํฌํ•จํ•˜์ง€ ๋งˆ์„ธ์š”."""
try:
messages_to_llm = [HumanMessage(content=analysis_prompt)]
response = llm.invoke(messages_to_llm)
# JSON ํŒŒ์‹ฑ
import json
response_text = response.content.strip()
# JSON ๋ธ”๋ก ์ถ”์ถœ (๋งˆํฌ๋‹ค์šด ์ฝ”๋“œ ๋ธ”๋ก ์ œ๊ฑฐ)
if "```json" in response_text:
response_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
response_text = response_text.split("```")[1].split("```")[0].strip()
analysis = json.loads(response_text)
question_type = analysis.get("question_type", "independent")
should_cache = analysis.get("should_cache", False)
reasoning = analysis.get("reasoning", "")
canonical_question = analysis.get("canonical_question", user_question)
# ์œ ํšจ์„ฑ ๊ฒ€์ฆ
if question_type not in ["clarification", "new_topic", "independent"]:
question_type = "independent"
# 1์ฐจ ์ •์ฑ… ๋ณด์ •: clarification์€ ์บ์‹œ ๊ธˆ์ง€
if question_type == "clarification":
should_cache = False
canonical_question = None
else:
# new_topic/independent๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์บ์‹œ ๊ฐ€๋Šฅ
if canonical_question is None or (isinstance(canonical_question, str) and not canonical_question.strip()):
canonical_question = user_question
        # Reset the step log at the start of every run so only this run's steps accumulate.
steps_delta = [
"__RESET_STEPS__",
f"๐Ÿ” ์งˆ๋ฌธ ๋ถ„์„: {question_type} (์บ์‹œ ์—ฌ๋ถ€: {should_cache})",
]
return {
"question_type": question_type,
"should_cache": should_cache,
"analysis_reasoning": reasoning,
"canonical_question": canonical_question if should_cache else None,
"intermediate_steps": steps_delta
}
except Exception as e:
logger.error("์งˆ๋ฌธ ๋ถ„์„ ์‹คํŒจ: %s", e, exc_info=True)
# ๊ธฐ๋ณธ๊ฐ’: ๋…๋ฆฝ ์งˆ๋ฌธ์œผ๋กœ ๊ฐ„์ฃผ
steps_delta = [
"__RESET_STEPS__",
"โš ๏ธ ์งˆ๋ฌธ ๋ถ„์„ ์‹คํŒจ, ๊ธฐ๋ณธ๊ฐ’ ์‚ฌ์šฉ: independent",
]
return {
"question_type": "independent",
"should_cache": True,
"analysis_reasoning": "๋ถ„์„ ์‹คํŒจ, ๊ธฐ๋ณธ๊ฐ’ ์‚ฌ์šฉ",
"canonical_question": user_question,
"intermediate_steps": steps_delta
}
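# NOTE: the markdown-fence stripping above is repeated in create_plan_node and
# refine_search_node. A small helper like this sketch could centralize it; it is
# shown for illustration only and is not wired into the nodes below.
def _strip_json_fences(text: str) -> str:
    """Return the JSON payload inside an optional ```json ... ``` fence."""
    text = text.strip()
    if "```json" in text:
        return text.split("```json")[1].split("```")[0].strip()
    if "```" in text:
        return text.split("```")[1].split("```")[0].strip()
    return text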
@trace_node("check_cache")
async def check_cache_node(state: AgentState) -> dict:
"""
    Looks up the vector-DB cache for a semantically similar question.
    A similarity of 0.85 or above (the threshold) is treated as a cache hit.
"""
question_for_lookup = state.canonical_question or state.user_question
logger.info("์บ์‹œ ํ™•์ธ ์ค‘: %s", question_for_lookup[:50])
    # Initialize before the try block so the except handler below can safely update them.
    updates = {}
    steps_delta: List[str] = []
    try:
        cached_result = await qdrant_manager.search_cache(
            question=question_for_lookup,
            threshold=0.85
        )
if cached_result:
updates["cached_result"] = cached_result
steps_delta.append(f"โœ… ์บ์‹œ ํžˆํŠธ (๋‹ต๋ณ€ ๊ธธ์ด: {len(cached_result)}์ž)")
logger.info("์บ์‹œ ํžˆํŠธ")
else:
updates["cached_result"] = None
steps_delta.append("โŒ ์บ์‹œ ๋ฏธ์Šค: ์ƒˆ๋กœ์šด ๊ฒ€์ƒ‰ ํ•„์š”")
logger.info("์บ์‹œ ๋ฏธ์Šค")
except Exception as e:
logger.error("์บ์‹œ ํ™•์ธ ์‹คํŒจ: %s", e, exc_info=True)
updates["cached_result"] = None
steps_delta.append(f"โš ๏ธ ์บ์‹œ ํ™•์ธ ์˜ค๋ฅ˜: {str(e)}")
updates["intermediate_steps"] = steps_delta
return updates
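# The cache round-trip used by this node and generate_answer_node, as a usage sketch
# (call signatures are taken from this module; QdrantManager's internals are assumed):
#
#     await qdrant_manager.save_to_cache(question="What is JWT?", answer="JWT is ...")
#     hit = await qdrant_manager.search_cache(question="Explain JWT", threshold=0.85)
#     # `hit` is the cached answer text when similarity >= 0.85, otherwise None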
@trace_node("create_plan")
def create_plan_node(state: AgentState) -> dict:
"""
์งˆ๋ฌธ์„ ๋ถ„์„ํ•˜์—ฌ ์œ ํ˜•๊ณผ ๊ฐœ์ˆ˜๋ฅผ ํŒ๋‹จํ•ฉ๋‹ˆ๋‹ค.
Phase 4: Dynamic Parallel Search
- single_topic: ํ•˜๋‚˜์˜ ์ฃผ์ œ (๊ธฐ์กด ๊ทธ๋ž˜ํ”„ ์‹คํ–‰)
- multiple_questions: ๋…๋ฆฝ ์งˆ๋ฌธ 2๊ฐœ (Send API๋กœ ๊ทธ๋ž˜ํ”„ 2ํšŒ ์‹คํ–‰)
- too_many: ๋…๋ฆฝ ์งˆ๋ฌธ 3๊ฐœ ์ด์ƒ (์—๋Ÿฌ ๋ฉ”์‹œ์ง€)
LangGraph ๊ณต์‹ ๊ฐ€์ด๋“œ๋ผ์ธ: ๋…ธ๋“œ๋Š” ํ•œ ๊ฐ€์ง€ ์ผ๋งŒ ์ˆ˜ํ–‰ (๊ณ„ํš ์ˆ˜๋ฆฝ)
"""
user_question = state.user_question
logger.info("์งˆ๋ฌธ ๋ถ„์„ ๋ฐ ๊ณ„ํš ์ˆ˜๋ฆฝ ์ค‘: %s", user_question[:50])
def _extract_question_candidates(text: str) -> List[str]:
"""์ž…๋ ฅ ๋ฌธ์ž์—ด์—์„œ '์งˆ๋ฌธ ํ›„๋ณด'๋ฅผ ์ตœ๋Œ€ํ•œ ๋ณด์ˆ˜์ ์œผ๋กœ ์ถ”์ถœํ•ฉ๋‹ˆ๋‹ค(3๊ฐœ ์ด์ƒ ๊ฐ์ง€์šฉ)."""
import re
if not text:
return []
t = text.strip()
# 1) ๋ฌผ์Œํ‘œ ๊ธฐ๋ฐ˜ ๋ถ„๋ฆฌ (๊ฐ€์žฅ ์‹ ๋ขฐ๋„ ๋†’์Œ)
parts = re.split(r"[?๏ผŸ]+", t)
candidates = [p.strip() for p in parts if p.strip()]
if len(candidates) >= 2 and re.search(r"[?๏ผŸ]", t):
# ๋ฌผ์Œํ‘œ๊ฐ€ ์กด์žฌํ•  ๋•Œ๋งŒ ์ด ๊ทœ์น™์„ ์‹ ๋ขฐ
return candidates
# 2) ์ค„๋ฐ”๊ฟˆ/๋ฒˆํ˜ธ ๋งค๊ธฐ๊ธฐ ๊ธฐ๋ฐ˜ (๋‹ค์ค‘ ์งˆ๋ฌธ ์ž…๋ ฅ ํŒจํ„ด)
lines = [ln.strip() for ln in re.split(r"[\r\n]+", t) if ln.strip()]
numbered = []
for ln in lines:
if re.match(r"^\s*(\d+[\.\)]|[-*])\s+", ln):
numbered.append(re.sub(r"^\s*(\d+[\.\)]|[-*])\s+", "", ln).strip())
if len(numbered) >= 2:
return numbered
# 3) ๊ตฌ๋ถ„์ž ๊ธฐ๋ฐ˜(์„ธ๋ฏธ์ฝœ๋ก ) โ€” ๋ณด์กฐ
semi = [p.strip() for p in t.split(";") if p.strip()]
if len(semi) >= 2:
return semi
return [t]
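    # Illustrative behavior of the heuristics above (assumed inputs, shown for clarity):
    #   "What is JWT? And CORS?"            -> ["What is JWT", "And CORS"]       (question marks)
    #   "1. Docker basics\n2. Redis setup"  -> ["Docker basics", "Redis setup"]  (numbered list)
    #   "one single long question"          -> ["one single long question"]      (fallback)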
def _hard_guard_too_many(text: str) -> Optional[dict]:
"""
ํ•˜๋“œ ๊ฐ€๋“œ: ์‚ฌ์šฉ์ž๊ฐ€ '์งˆ๋ฌธ 3๊ฐœ ์ด์ƒ'์„ ํ•œ ๋ฒˆ์— ๋˜์ง„ ๊ฒƒ์œผ๋กœ ํ™•์‹คํ•œ ๊ฒฝ์šฐ,
LLM ๋ถ„๋ฅ˜์™€ ๋ฌด๊ด€ํ•˜๊ฒŒ too_many๋กœ ๊ฐ•์ œํ•ฉ๋‹ˆ๋‹ค.
"""
import re
if not text:
return None
# ๊ฐ€์žฅ ํ™•์‹คํ•œ ๊ธฐ์ค€: ๋ฌผ์Œํ‘œ๊ฐ€ 3๊ฐœ ์ด์ƒ
qmarks = len(re.findall(r"[?๏ผŸ]", text))
if qmarks >= 3:
candidates = _extract_question_candidates(text)
msg = "์ฃ„์†กํ•ฉ๋‹ˆ๋‹ค. ์งˆ๋ฌธ์€ ํ•œ ๋ฒˆ์— ์ตœ๋Œ€ 2๊ฐœ๊นŒ์ง€ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค. ๊ฐ€์žฅ ์ค‘์š”ํ•œ 2๊ฐœ๋งŒ ๊ณจ๋ผ์„œ ๋‹ค์‹œ ์งˆ๋ฌธํ•ด ์ฃผ์„ธ์š”."
return {
"case": "too_many",
"sub_questions": candidates,
"reasoning": f"๋ฌผ์Œํ‘œ๊ฐ€ {qmarks}๊ฐœ๋กœ, 3๊ฐœ ์ด์ƒ์˜ ๋…๋ฆฝ ์งˆ๋ฌธ์œผ๋กœ ํŒ๋‹จํ–ˆ์Šต๋‹ˆ๋‹ค.",
"error_message": msg,
"steps_note": f"โš ๏ธ ์งˆ๋ฌธ ์ˆ˜ ์ดˆ๊ณผ ๊ฐ์ง€(๋ฌผ์Œํ‘œ {qmarks}๊ฐœ) โ†’ too_many๋กœ ๊ฐ•์ œ",
}
# ๋ฒˆํ˜ธ ๋งค๊ธฐ๊ธฐ/๋ฆฌ์ŠคํŠธ๋กœ 3๊ฐœ ์ด์ƒ
candidates = _extract_question_candidates(text)
if len(candidates) >= 3:
msg = "์ฃ„์†กํ•ฉ๋‹ˆ๋‹ค. ์งˆ๋ฌธ์€ ํ•œ ๋ฒˆ์— ์ตœ๋Œ€ 2๊ฐœ๊นŒ์ง€ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค. ๊ฐ€์žฅ ์ค‘์š”ํ•œ 2๊ฐœ๋งŒ ๊ณจ๋ผ์„œ ๋‹ค์‹œ ์งˆ๋ฌธํ•ด ์ฃผ์„ธ์š”."
return {
"case": "too_many",
"sub_questions": candidates,
"reasoning": f"์งˆ๋ฌธ ํ›„๋ณด๊ฐ€ {len(candidates)}๊ฐœ๋กœ ๊ฐ์ง€๋˜์–ด 3๊ฐœ ์ด์ƒ ์งˆ๋ฌธ์œผ๋กœ ํŒ๋‹จํ–ˆ์Šต๋‹ˆ๋‹ค.",
"error_message": msg,
"steps_note": f"โš ๏ธ ์งˆ๋ฌธ ์ˆ˜ ์ดˆ๊ณผ ๊ฐ์ง€(ํ›„๋ณด {len(candidates)}๊ฐœ) โ†’ too_many๋กœ ๊ฐ•์ œ",
}
return None
# ํ•˜๋“œ ๊ฐ€๋“œ(๊ฒฐ์ •๋ก ์ ) โ€” LLM์ด ์ž˜๋ชป ๋ถ„๋ฅ˜ํ•˜๋”๋ผ๋„ 3๊ฐœ ์ด์ƒ์ด๋ฉด ๋ฌด์กฐ๊ฑด ์ฐจ๋‹จ
hard = _hard_guard_too_many(user_question)
if hard:
steps_delta = [
f"๐Ÿ“‹ ๊ณ„ํš ํƒ€์ž…: {hard['case']}",
f" ์„œ๋ธŒ์งˆ๋ฌธ: {len(hard['sub_questions'])}๊ฐœ",
f" ์ด์œ : {hard['reasoning']}",
hard["steps_note"],
]
logger.info("๊ณ„ํš ์ˆ˜๋ฆฝ ์™„๋ฃŒ(ํ•˜๋“œ ๊ฐ€๋“œ): too_many, %d๊ฐœ ์„œ๋ธŒ์งˆ๋ฌธ", len(hard["sub_questions"]))
return {
"plan": {
"case": hard["case"],
"sub_questions": hard["sub_questions"],
"reasoning": hard["reasoning"],
"error_message": hard["error_message"],
},
"is_multi_question": False,
"sub_question_index": 0,
"sub_question_text": None,
"original_multi_question": None,
"multi_answers": [{"__token__": _MULTI_ANS_RESET_TOKEN}],
"intermediate_steps": steps_delta,
}
plan_prompt = f"""์งˆ๋ฌธ์„ ๋ถ„์„ํ•˜์—ฌ ์œ ํ˜•๊ณผ ๊ฐœ์ˆ˜๋ฅผ ํŒ๋‹จํ•˜์„ธ์š”.
์งˆ๋ฌธ: {user_question}
**์ค‘์š”**: sub_questions์˜ ์šฉ๋„๋Š” case์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค!
**Case 1: single_topic** (ํ•˜๋‚˜์˜ ์ฃผ์ œ)
- ์˜ˆ: "Spring Security JWT ์ธ์ฆ ๊ตฌํ˜„"
โ†’ sub_questions: ["๊ฐœ๋…", "๊ตฌํ˜„", "์˜ˆ์ œ"]
โ†’ ์šฉ๋„: ๋‹ต๋ณ€ ์„น์…˜ ๊ตฌ์กฐ (๊ฒ€์ƒ‰์€ ์›๋ณธ ์งˆ๋ฌธ์œผ๋กœ 1ํšŒ๋งŒ)
โ†’ ๊ฒ€์ƒ‰: "Spring Security JWT ์ธ์ฆ ๊ตฌํ˜„"
- ์˜ˆ: "React hooks ์™„๋ฒฝ ๊ฐ€์ด๋“œ"
โ†’ sub_questions: ["hooks๋ž€", "์ฃผ์š” hooks", "์‹ค๋ฌด ํŒจํ„ด"]
โ†’ ์šฉ๋„: ๋‹ต๋ณ€ ์„น์…˜ ๊ตฌ์กฐ
โ†’ ๊ฒ€์ƒ‰: "React hooks ์™„๋ฒฝ ๊ฐ€์ด๋“œ"
**Case 2: multiple_questions** (์—ฌ๋Ÿฌ ๋…๋ฆฝ ์งˆ๋ฌธ, ์ตœ๋Œ€ 2๊ฐœ)
- ์˜ˆ: "JWT๊ฐ€ ๋ญ์•ผ? CORS๋Š”?"
โ†’ sub_questions: ["JWT๊ฐ€ ๋ญ์•ผ?", "CORS๋Š”?"]
โ†’ ์šฉ๋„: ๊ฐ ์งˆ๋ฌธ๋งˆ๋‹ค ๋ณ„๋„ ๊ฒ€์ƒ‰
โ†’ ๊ฒ€์ƒ‰: "JWT๊ฐ€ ๋ญ์•ผ?" (1ํšŒ), "CORS๋Š”?" (1ํšŒ)
- ์˜ˆ: "Docker ์‚ฌ์šฉ๋ฒ•์€? Redis ์„ค์น˜๋Š”?"
โ†’ sub_questions: ["Docker ์‚ฌ์šฉ๋ฒ•์€?", "Redis ์„ค์น˜๋Š”?"]
โ†’ ์šฉ๋„: ๊ฐ ์งˆ๋ฌธ๋งˆ๋‹ค ๋ณ„๋„ ๊ฒ€์ƒ‰
**Case 3: too_many** (3๊ฐœ ์ด์ƒ ์งˆ๋ฌธ)
- ์˜ˆ: "JWT? CORS? Docker?"
โ†’ ๋„ˆ๋ฌด ๋งŽ์•„์„œ ์ฒ˜๋ฆฌ ๋ถˆ๊ฐ€
โ†’ error_message ์ œ๊ณต
๊ทœ์น™:
- single_topic: sub_questions๋Š” ์งง์€ ํ‚ค์›Œ๋“œ/๊ตฌ์ ˆ (1-5๊ฐœ)
- multiple_questions: sub_questions๋Š” ์™„์ „ํ•œ ๋ฌธ์žฅ (์ •ํ™•ํžˆ 2๊ฐœ๋งŒ)
- too_many: 3๊ฐœ ์ด์ƒ์ด๋ฉด ์ด ์ผ€์ด์Šค๋กœ ๋ถ„๋ฅ˜
๋‹ค์Œ JSON ํ˜•์‹์œผ๋กœ๋งŒ ๋‹ต๋ณ€ํ•˜์„ธ์š”:
{{
"case": "single_topic|multiple_questions|too_many",
"sub_questions": [...],
"reasoning": "์ด ์ผ€์ด์Šค๋กœ ํŒ๋‹จํ•œ ์ด์œ ",
"error_message": "..." (too_many์ธ ๊ฒฝ์šฐ๋งŒ, ๊ทธ ์™ธ๋Š” ๋นˆ ๋ฌธ์ž์—ด)
}}
JSON ์™ธ์— ๋‹ค๋ฅธ ํ…์ŠคํŠธ๋Š” ํฌํ•จํ•˜์ง€ ๋งˆ์„ธ์š”."""
try:
import json
messages_to_llm = [HumanMessage(content=plan_prompt)]
response = llm.invoke(messages_to_llm)
# JSON ํŒŒ์‹ฑ
response_text = response.content.strip()
# JSON ๋ธ”๋ก ์ถ”์ถœ (๋งˆํฌ๋‹ค์šด ์ฝ”๋“œ ๋ธ”๋ก ์ œ๊ฑฐ)
if "```json" in response_text:
response_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
response_text = response_text.split("```")[1].split("```")[0].strip()
plan_data = json.loads(response_text)
case = plan_data.get("case", "single_topic")
sub_questions = plan_data.get("sub_questions", [user_question])
reasoning = plan_data.get("reasoning", "")
error_message = plan_data.get("error_message", "")
# LLM ๊ฒฐ๊ณผ๋ฅผ ๋ฐ›์€ ๋’ค์—๋„ ํ•œ ๋ฒˆ ๋” ํ•˜๋“œ ๊ฐ€๋“œ ์ ์šฉ (์•ˆ์ „์žฅ์น˜)
hard2 = _hard_guard_too_many(user_question)
if hard2:
case = hard2["case"]
sub_questions = hard2["sub_questions"]
reasoning = hard2["reasoning"]
error_message = hard2["error_message"]
# ์œ ํšจ์„ฑ ๊ฒ€์ฆ
if not sub_questions or len(sub_questions) == 0:
sub_questions = [user_question]
case = "single_topic"
# multiple_questions์ผ ๋•Œ 2๊ฐœ ์ œํ•œ ๊ฐ•์ œ (๋‹จ, 3๊ฐœ ์ด์ƒ์€ ์œ„ ํ•˜๋“œ ๊ฐ€๋“œ์—์„œ too_many๋กœ ์ฒ˜๋ฆฌ๋จ)
if case == "multiple_questions" and len(sub_questions) > 2:
sub_questions = sub_questions[:2]
reasoning += " (์งˆ๋ฌธ ์ˆ˜ ์ œํ•œ: ์ตœ๋Œ€ 2๊ฐœ)"
steps_delta = [
f"๐Ÿ“‹ ๊ณ„ํš ํƒ€์ž…: {case}",
f" ์„œ๋ธŒ์งˆ๋ฌธ: {len(sub_questions)}๊ฐœ",
f" ์ด์œ : {reasoning}"
]
logger.info("๊ณ„ํš ์ˆ˜๋ฆฝ ์™„๋ฃŒ: %s, %d๊ฐœ ์„œ๋ธŒ์งˆ๋ฌธ", case, len(sub_questions))
# NOTE: ์ด ๊ทธ๋ž˜ํ”„๋Š” ์ฒดํฌํฌ์ธํŒ…/์Šค๋ ˆ๋“œ ์œ ์ง€๊ฐ€ ๊ฐ€๋Šฅํ•˜๋ฏ€๋กœ,
# multi_answers๋Š” ๋งค ์‹คํ–‰(run) ์‹œ์ž‘ ์‹œ ๋ฆฌ์…‹ํ•ด์•ผ ์ด์ „ ํ„ด ๋ˆ„์ ์ด ๋ฐœ์ƒํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
return {
"plan": {
"case": case,
"sub_questions": sub_questions,
"reasoning": reasoning,
"error_message": error_message
},
"is_multi_question": False,
"sub_question_index": 0,
"sub_question_text": None,
"original_multi_question": None,
"multi_answers": [{"__token__": _MULTI_ANS_RESET_TOKEN}],
"intermediate_steps": steps_delta
}
except Exception as e:
logger.error("๊ณ„ํš ์ˆ˜๋ฆฝ ์‹คํŒจ: %s", e, exc_info=True)
# ๊ธฐ๋ณธ๊ฐ’: ์›๋ณธ ์งˆ๋ฌธ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉ
steps_delta = [
"โš ๏ธ ๊ณ„ํš ์ˆ˜๋ฆฝ ์‹คํŒจ, ๊ธฐ๋ณธ๊ฐ’ ์‚ฌ์šฉ: single_topic"
]
return {
"plan": {
"case": "single_topic",
"sub_questions": [user_question],
"reasoning": "๊ณ„ํš ์ˆ˜๋ฆฝ ์‹คํŒจ, ๊ธฐ๋ณธ๊ฐ’ ์‚ฌ์šฉ",
"error_message": ""
},
"is_multi_question": False,
"sub_question_index": 0,
"sub_question_text": None,
"original_multi_question": None,
"multi_answers": [{"__token__": _MULTI_ANS_RESET_TOKEN}],
"intermediate_steps": steps_delta
}
@trace_node("classify_intent")
def classify_intent_node(state: AgentState) -> dict:
"""
LLM์„ ์‚ฌ์šฉํ•˜์—ฌ ์‚ฌ์šฉ์ž ์งˆ๋ฌธ์˜ ์˜๋„๋ฅผ ๋ถ„๋ฅ˜ํ•ฉ๋‹ˆ๋‹ค.
๋ถ„๋ฅ˜ ์นดํ…Œ๊ณ ๋ฆฌ:
- debugging: ์—๋Ÿฌ ํ•ด๊ฒฐ, ๋ฒ„๊ทธ ์ˆ˜์ •
- learning: ๊ฐœ๋… ํ•™์Šต, ์›๋ฆฌ ์ดํ•ด
- code_review: ์ฝ”๋“œ ๊ฐœ์„ , ๋ฆฌํŒฉํ† ๋ง
"""
logger.info("์˜๋„ ๋ถ„๋ฅ˜ ์ค‘: %s", state.user_question[:50])
classification_prompt = f"""์งˆ๋ฌธ์„ ๋‹ค์Œ ์„ธ ๊ฐ€์ง€ ์˜๋„ ์ค‘ ํ•˜๋‚˜๋กœ ๋ถ„๋ฅ˜ํ•˜์„ธ์š”:
1. debugging: ์—๋Ÿฌ ํ•ด๊ฒฐ, ๋ฒ„๊ทธ ์ˆ˜์ •, ๋ฌธ์ œ ํ•ด๊ฒฐ
์˜ˆ: "ImportError๊ฐ€ ๋ฐœ์ƒํ•ด์š”", "์ด ์ฝ”๋“œ๊ฐ€ ์ž‘๋™ํ•˜์ง€ ์•Š์•„์š”"
2. learning: ๊ฐœ๋… ํ•™์Šต, ์›๋ฆฌ ์ดํ•ด, ํŠœํ† ๋ฆฌ์–ผ
์˜ˆ: "async/await๊ฐ€ ๋ญ”๊ฐ€์š”?", "JPA ๋™์ž‘ ์›๋ฆฌ๋Š”?"
3. code_review: ์ฝ”๋“œ ๊ฐœ์„ , ๋ฆฌํŒฉํ† ๋ง, ๋ฒ ์ŠคํŠธ ํ”„๋ž™ํ‹ฐ์Šค
์˜ˆ: "์ด ์ฝ”๋“œ๋ฅผ ๊ฐœ์„ ํ•  ๋ฐฉ๋ฒ•์€?", "๋” ๋‚˜์€ ์„ค๊ณ„๋Š”?"
์งˆ๋ฌธ: {state.user_question}
๋ฐ˜๋“œ์‹œ debugging, learning, code_review ์ค‘ ํ•˜๋‚˜๋งŒ ๋‹ตํ•˜์„ธ์š”."""
updates = {}
steps_delta: List[str] = []
try:
messages = [
SystemMessage(content="๋‹น์‹ ์€ ๊ฐœ๋ฐœ์ž ์งˆ๋ฌธ์„ ๋ถ„๋ฅ˜ํ•˜๋Š” ์ „๋ฌธ๊ฐ€์ž…๋‹ˆ๋‹ค."),
HumanMessage(content=classification_prompt)
]
response = llm.invoke(messages)
intent_raw = response.content.strip().lower()
# ์œ ํšจํ•œ ์˜๋„๋กœ ์ •๊ทœํ™”
valid_intents = ["debugging", "learning", "code_review"]
intent = next((i for i in valid_intents if i in intent_raw), "learning")
updates["detected_intent"] = intent
steps_delta.append(f"๐ŸŽฏ ์˜๋„ ๋ถ„๋ฅ˜: {intent}")
logger.info("์˜๋„ ๋ถ„๋ฅ˜ ์™„๋ฃŒ: %s", intent)
except Exception as e:
logger.error("์˜๋„ ๋ถ„๋ฅ˜ ์‹คํŒจ: %s", e, exc_info=True)
updates["detected_intent"] = "learning"
steps_delta.append("โš ๏ธ ์˜๋„ ๋ถ„๋ฅ˜ ์‹คํŒจ, ๊ธฐ๋ณธ๊ฐ’ ์‚ฌ์šฉ: learning")
updates["intermediate_steps"] = steps_delta
return updates
@trace_node("search_stackoverflow")
def search_stackoverflow_node(state: AgentState) -> dict:
"""
    Runs the Stack Overflow search.
    Executed as one branch of the parallel search fan-out via the Send API.
    search_results and intermediate_steps are declared as Annotated[List, add],
    so the values returned here are merged into state automatically.
"""
intent = state.detected_intent or "learning"
count = 5 if intent == "debugging" else 3
logger.info("Stack Overflow ๊ฒ€์ƒ‰ ์‹œ์ž‘: %d๊ฐœ", count)
try:
results = search_stackoverflow(state.user_question, count)
logger.info("Stack Overflow์—์„œ %d๊ฐœ ๊ฒฐ๊ณผ ์ˆ˜์ง‘", len(results))
# reducer๊ฐ€ ์ž๋™์œผ๋กœ ๋จธ์ง€ํ•˜๋ฏ€๋กœ ์ƒˆ ๊ฒฐ๊ณผ๋งŒ ๋ฐ˜ํ™˜
return {
"search_results": results,
"intermediate_steps": [f"๐Ÿ” Stack Overflow: {len(results)}๊ฐœ ๊ฒฐ๊ณผ"]
}
except Exception as e:
logger.error("Stack Overflow ๊ฒ€์ƒ‰ ์‹คํŒจ: %s", e)
return {
"intermediate_steps": [f"โš ๏ธ Stack Overflow ๊ฒ€์ƒ‰ ์‹คํŒจ: {str(e)}"]
}
@trace_node("search_github")
def search_github_node(state: AgentState) -> dict:
"""
    Runs the GitHub Issues/Discussions search.
    Executed as one branch of the parallel search fan-out via the Send API.
"""
intent = state.detected_intent or "learning"
count = 5 if intent == "code_review" else 3 if intent == "learning" else 2
logger.info("GitHub ๊ฒ€์ƒ‰ ์‹œ์ž‘: %d๊ฐœ", count)
try:
results = search_github(state.user_question, count)
logger.info("GitHub์—์„œ %d๊ฐœ ๊ฒฐ๊ณผ ์ˆ˜์ง‘", len(results))
# reducer๊ฐ€ ์ž๋™์œผ๋กœ ๋จธ์ง€
return {
"search_results": results,
"intermediate_steps": [f"๐Ÿ” GitHub: {len(results)}๊ฐœ ๊ฒฐ๊ณผ"]
}
except Exception as e:
logger.error("GitHub ๊ฒ€์ƒ‰ ์‹คํŒจ: %s", e)
return {
"intermediate_steps": [f"โš ๏ธ GitHub ๊ฒ€์ƒ‰ ์‹คํŒจ: {str(e)}"]
}
@trace_node("search_official_docs")
def search_official_docs_node(state: AgentState) -> dict:
"""
    Runs the official-documentation / Tavily search.
    Executed as one branch of the parallel search fan-out via the Send API.
"""
intent = state.detected_intent or "learning"
count = 5 if intent == "learning" else 2
logger.info("๊ณต์‹ ๋ฌธ์„œ ๊ฒ€์ƒ‰ ์‹œ์ž‘: %d๊ฐœ", count)
try:
results = search_official_docs(state.user_question, count)
logger.info("๊ณต์‹ ๋ฌธ์„œ์—์„œ %d๊ฐœ ๊ฒฐ๊ณผ ์ˆ˜์ง‘", len(results))
# reducer๊ฐ€ ์ž๋™์œผ๋กœ ๋จธ์ง€
return {
"search_results": results,
"intermediate_steps": [f"๐Ÿ” ๊ณต์‹ ๋ฌธ์„œ: {len(results)}๊ฐœ ๊ฒฐ๊ณผ"]
}
except Exception as e:
logger.error("๊ณต์‹ ๋ฌธ์„œ ๊ฒ€์ƒ‰ ์‹คํŒจ: %s", e)
return {
"intermediate_steps": [f"โš ๏ธ ๊ณต์‹ ๋ฌธ์„œ ๊ฒ€์ƒ‰ ์‹คํŒจ: {str(e)}"]
}
@trace_node("collect_results")
def collect_results_node(state: AgentState) -> dict:
"""
    Collects and counts the results of the parallel searches.
    Fan-in point: runs after all three parallel search nodes have completed.
    LangGraph guideline: aggregate results at the Send API fan-in point.
"""
total_results = len(state.search_results)
logger.info("๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์ˆ˜์ง‘ ์™„๋ฃŒ: %d๊ฐœ", total_results)
steps_delta = [
f"๐Ÿ“Š ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์ˆ˜์ง‘: ์ด {total_results}๊ฐœ"
]
return {
"intermediate_steps": steps_delta
}
@trace_node("evaluate_results")
def evaluate_results_node(state: AgentState) -> dict:
"""
    Evaluates both the quantity and the quality of the search results.
    Criteria:
    1. Quantity: at least 2 results
    2. Quality: average relevance_score >= 0.5 (matching the check below)
"""
search_results = state.search_results # ์ง์ ‘ ์‚ฌ์šฉ (๋” ์•ˆ์ „)
refinement_count = state.refinement_count
result_count = len(search_results)
logger.info("๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ํ‰๊ฐ€: %d๊ฐœ (๊ฐœ์„  ํšŸ์ˆ˜: %d)", result_count, refinement_count)
# ์•ˆ์ „์žฅ์น˜: ์ด๋ฏธ 1ํšŒ ๊ฐœ์„ ํ–ˆ์œผ๋ฉด ๋” ์ด์ƒ ๊ฐœ์„ ํ•˜์ง€ ์•Š์Œ
if refinement_count >= 1:
steps_delta = [
f"โš ๏ธ ์ตœ๋Œ€ ๊ฐœ์„  ํšŸ์ˆ˜ ๋„๋‹ฌ ({refinement_count}ํšŒ), ํ˜„์žฌ ๊ฒฐ๊ณผ๋กœ ์ง„ํ–‰"
]
return {
"needs_refinement": False,
"intermediate_steps": steps_delta
}
# 1์ฐจ ํ‰๊ฐ€: ๊ฐœ์ˆ˜
if result_count < 2:
steps_delta = [
f"โš ๏ธ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ๋ถ€์กฑ ({result_count}๊ฐœ < 2๊ฐœ), ์ฟผ๋ฆฌ ๊ฐœ์„  ํ•„์š”"
]
return {
"needs_refinement": True,
"intermediate_steps": steps_delta
}
# 2์ฐจ ํ‰๊ฐ€: ํ’ˆ์งˆ (relevance_score๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ๋งŒ)
scored_results = [r for r in search_results if r.relevance_score is not None]
if scored_results:
avg_score = sum(r.relevance_score for r in scored_results) / len(scored_results)
# ํ‰๊ท  ์ ์ˆ˜๊ฐ€ 0.5 ๋ฏธ๋งŒ์ด๋ฉด ํ’ˆ์งˆ ๋ถ€์กฑ
if avg_score < 0.5:
steps_delta = [
f"โš ๏ธ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ํ’ˆ์งˆ ๋ถ€์กฑ (ํ‰๊ท  ์ ์ˆ˜: {avg_score:.2f} < 0.5), ์ฟผ๋ฆฌ ๊ฐœ์„  ํ•„์š”"
]
return {
"needs_refinement": True,
"intermediate_steps": steps_delta
}
steps_delta = [
f"โœ… ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์ถฉ๋ถ„ ({result_count}๊ฐœ, ํ‰๊ท  ์ ์ˆ˜: {avg_score:.2f}), ํ•„ํ„ฐ๋ง ๋‹จ๊ณ„๋กœ ์ง„ํ–‰"
]
else:
# relevance_score๊ฐ€ ์•„์ง ์—†์œผ๋ฉด ๊ฐœ์ˆ˜๋งŒ์œผ๋กœ ํŒ๋‹จ
steps_delta = [
f"โœ… ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์ถฉ๋ถ„ ({result_count}๊ฐœ), ํ•„ํ„ฐ๋ง ๋‹จ๊ณ„๋กœ ์ง„ํ–‰"
]
return {
"needs_refinement": False,
"intermediate_steps": steps_delta
}
@trace_node("refine_search")
def refine_search_node(state: AgentState) -> dict:
"""
    Refines the search query when results are insufficient.
    Open Deep Research pattern:
    - the LLM picks a strategy (more specific / more general / translate)
    - the original question is preserved for final answer generation
    LangGraph guidelines:
    - store raw data in state (including the chosen strategy)
    - build the prompt dynamically inside the node
"""
user_question = state.user_question
original_question = state.original_question or user_question
result_count = len(state.search_results)
logger.info("๊ฒ€์ƒ‰ ์ฟผ๋ฆฌ ๊ฐœ์„  ์ค‘: %s (%d๊ฐœ ๊ฒฐ๊ณผ)", user_question[:50], result_count)
refinement_prompt = f"""๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๊ฐ€ ๋ถ€์กฑํ•ฉ๋‹ˆ๋‹ค. ๊ฒ€์ƒ‰ ์ฟผ๋ฆฌ๋ฅผ ๊ฐœ์„ ํ•˜์„ธ์š”.
์›๋ณธ ์งˆ๋ฌธ: {user_question}
ํ˜„์žฌ ๊ฒฐ๊ณผ ์ˆ˜: {result_count}๊ฐœ (๋ชฉํ‘œ: 2๊ฐœ ์ด์ƒ)
๊ฐœ์„  ์ „๋žต (ํ•˜๋‚˜ ์„ ํƒ):
1. MORE_SPECIFIC: ๊ธฐ์ˆ ์  ์„ธ๋ถ€์‚ฌํ•ญ ์ถ”๊ฐ€
์˜ˆ: "React hooks" โ†’ "React useEffect cleanup function dependencies"
2. MORE_GENERAL: ๋” ๋„“์€ ์šฉ์–ด ์‚ฌ์šฉ
์˜ˆ: "Spring Cloud Sleuth 2.x trace" โ†’ "distributed tracing Spring Boot"
3. TRANSLATE: ์–ธ์–ด ๋ณ€ํ™˜
์˜ˆ: "JWT ์ธ์ฆ ๊ตฌํ˜„" โ†’ "JWT authentication implementation"
์˜ˆ: "WebSocket connection" โ†’ "WebSocket ์—ฐ๊ฒฐ ๋ฐฉ๋ฒ•"
๋‹ค์Œ JSON ํ˜•์‹์œผ๋กœ๋งŒ ๋‹ต๋ณ€ํ•˜์„ธ์š”:
{{
"new_query": "๊ฐœ์„ ๋œ ๊ฒ€์ƒ‰ ์ฟผ๋ฆฌ",
"strategy": "MORE_SPECIFIC|MORE_GENERAL|TRANSLATE",
"reasoning": "์ด ์ „๋žต์„ ์„ ํƒํ•œ ์ด์œ  1-2๋ฌธ์žฅ"
}}
JSON ์™ธ์— ๋‹ค๋ฅธ ํ…์ŠคํŠธ๋Š” ํฌํ•จํ•˜์ง€ ๋งˆ์„ธ์š”."""
try:
import json
messages_to_llm = [HumanMessage(content=refinement_prompt)]
response = llm.invoke(messages_to_llm)
# JSON ํŒŒ์‹ฑ
response_text = response.content.strip()
if "```json" in response_text:
response_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
response_text = response_text.split("```")[1].split("```")[0].strip()
refinement_data = json.loads(response_text)
new_query = refinement_data.get("new_query", user_question)
strategy = refinement_data.get("strategy", "MORE_GENERAL")
reasoning = refinement_data.get("reasoning", "")
steps_delta = [
f"๐Ÿ”„ ์ฟผ๋ฆฌ ๊ฐœ์„ : {strategy}",
f" ์ด์ „: {user_question[:50]}...",
f" ์ดํ›„: {new_query[:50]}...",
f" ์ด์œ : {reasoning}"
]
logger.info("์ฟผ๋ฆฌ ๊ฐœ์„  ์™„๋ฃŒ: %s โ†’ %s", user_question[:30], new_query[:30])
return {
"user_question": new_query,
"original_question": original_question,
"refinement_count": state.refinement_count + 1,
"search_results": [], # CRITICAL: ์ด์ „ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์ œ๊ฑฐ ํ›„ ์žฌ๊ฒ€์ƒ‰
"intermediate_steps": steps_delta
}
except Exception as e:
logger.error("์ฟผ๋ฆฌ ๊ฐœ์„  ์‹คํŒจ: %s", e, exc_info=True)
# ๊ธฐ๋ณธ ์ „๋žต: ์˜๋ฌธ ํ‚ค์›Œ๋“œ ์ถ”์ถœ (๊ฐ„๋‹จํ•œ fallback)
fallback_query = user_question + " tutorial example"
steps_delta = [
f"โš ๏ธ ์ฟผ๋ฆฌ ๊ฐœ์„  ์‹คํŒจ, ๊ธฐ๋ณธ ์ „๋žต ์‚ฌ์šฉ",
f" ์ดํ›„: {fallback_query}"
]
return {
"user_question": fallback_query,
"original_question": original_question,
"refinement_count": state.refinement_count + 1,
"search_results": [], # CRITICAL: ์‹คํŒจ ์‹œ์—๋„ ์ด์ „ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์ œ๊ฑฐ
"intermediate_steps": steps_delta
}
@trace_node("filter_and_score")
def filter_and_score_node(state: AgentState) -> dict:
"""
    Filters the search results and assigns relevance scores.
    - Keep only results that have a URL and at least 50 characters of content
    - Score the top 5 results for relevance with the LLM
    - Sort by relevance and keep the top 5
"""
search_results = state.search_results
logger.info("๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ํ•„ํ„ฐ๋ง ์ค‘: %d๊ฐœ", len(search_results))
# ๊ธฐ๋ณธ ํ•„ํ„ฐ๋ง
filtered = [
r for r in search_results
if r.content and len(r.content) >= 50 and r.url
]
logger.info("๊ธฐ๋ณธ ํ•„ํ„ฐ๋ง ํ›„: %d๊ฐœ ๊ฒฐ๊ณผ", len(filtered))
# ์ƒ์œ„ 5๊ฐœ ๊ฒฐ๊ณผ๋งŒ LLM์œผ๋กœ ์ ์ˆ˜ ๋งค๊ธฐ๊ธฐ (๋น„์šฉ ์ ˆ๊ฐ)
for result in filtered[:5]:
if result.relevance_score is None:
try:
scoring_prompt = f"""์งˆ๋ฌธ: {state.user_question}
๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ: {result.content[:500]}
์ด ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๊ฐ€ ์งˆ๋ฌธ์— ์–ผ๋งˆ๋‚˜ ๊ด€๋ จ์ด ์žˆ๋Š”์ง€ 0.0์—์„œ 1.0 ์‚ฌ์ด์˜ ์ ์ˆ˜๋กœ ํ‰๊ฐ€ํ•˜์„ธ์š”.
์ ์ˆ˜๋งŒ ์ˆซ์ž๋กœ ๋‹ตํ•˜์„ธ์š”. (์˜ˆ: 0.8)"""
response = llm.invoke([HumanMessage(content=scoring_prompt)])
score_str = response.content.strip()
result.relevance_score = float(score_str)
except Exception as e:
logger.warning("์ ์ˆ˜ ๋งค๊ธฐ๊ธฐ ์‹คํŒจ: %s", e)
result.relevance_score = 0.5
# ๊ด€๋ จ๋„ ์ˆœ์œผ๋กœ ์ •๋ ฌ
filtered.sort(key=lambda r: r.relevance_score or 0, reverse=True)
# ์ƒ์œ„ 5๊ฐœ๋งŒ ์œ ์ง€
top_results = filtered[:5]
subtask_results = dict(state.subtask_results)
subtask_results["filtered_results"] = [r.model_dump() for r in top_results]
steps_delta = [f"โœ‚๏ธ ํ•„ํ„ฐ๋ง ์™„๋ฃŒ: {len(top_results)}๊ฐœ ๊ฒฐ๊ณผ ์„ ํƒ"]
logger.info("ํ•„ํ„ฐ๋ง ์™„๋ฃŒ: %d๊ฐœ ๊ฒฐ๊ณผ", len(top_results))
return {
"subtask_results": subtask_results,
"intermediate_steps": steps_delta
}
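# The float(...) parse in the scoring loop above assumes the model returns a bare number.
# A more forgiving variant could extract the first numeric token and clamp it, e.g.:
#
#     import re
#     match = re.search(r"\d*\.?\d+", score_str)
#     result.relevance_score = min(max(float(match.group()), 0.0), 1.0) if match else 0.5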
@trace_node("summarize_results")
def summarize_results_node(state: AgentState) -> dict:
"""
    Summarizes each filtered search result so a beginner developer can follow it,
    keeping only the key points in 2-3 sentences per result.
"""
subtask_results = state.subtask_results
filtered_results = subtask_results.get("filtered_results", [])
logger.info("๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ์š”์•ฝ ์ค‘: %d๊ฐœ", len(filtered_results))
summaries = []
for result_dict in filtered_results:
try:
summary_prompt = f"""๋‹ค์Œ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๋ฅผ ์ดˆ๋ณด ๊ฐœ๋ฐœ์ž๊ฐ€ ์ดํ•ดํ•˜๊ธฐ ์‰ฝ๊ฒŒ 2-3๋ฌธ์žฅ์œผ๋กœ ์š”์•ฝํ•˜์„ธ์š”:
์ถœ์ฒ˜: {result_dict['source']}
๋‚ด์šฉ: {result_dict['content'][:1000]}
ํ•ต์‹ฌ ๋‚ด์šฉ๋งŒ ๊ฐ„๋‹จ๋ช…๋ฃŒํ•˜๊ฒŒ ์š”์•ฝํ•˜์„ธ์š”."""
response = llm.invoke([HumanMessage(content=summary_prompt)])
summaries.append({
"source": result_dict['source'],
"url": result_dict['url'],
"summary": response.content.strip(),
"relevance": result_dict.get('relevance_score', 0.5)
})
except Exception as e:
logger.error("์š”์•ฝ ์‹คํŒจ: %s", e)
updated_subtask_results = dict(subtask_results)
updated_subtask_results["summaries"] = summaries
steps_delta = [f"๐Ÿ“ ์š”์•ฝ ์™„๋ฃŒ: {len(summaries)}๊ฐœ ๊ฒฐ๊ณผ"]
logger.info("์š”์•ฝ ์™„๋ฃŒ: %d๊ฐœ", len(summaries))
return {
"subtask_results": updated_subtask_results,
"intermediate_steps": steps_delta
}
@trace_node("generate_answer")
async def generate_answer_node(state: AgentState) -> dict:
"""
    Generates the final answer from the summarized information.
    Uses a different answer structure per intent and caches the answer when eligible.
"""
subtask_results = state.subtask_results
summaries = subtask_results.get("summaries", [])
intent = state.detected_intent or "learning"
logger.info("์ตœ์ข… ๋‹ต๋ณ€ ์ƒ์„ฑ ์ค‘: %s", intent)
# ์˜๋„๋ณ„ ํ”„๋กฌํ”„ํŠธ ํ…œํ”Œ๋ฆฟ
templates = {
"debugging": """๋‹ค์Œ ์ •๋ณด๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ๋””๋ฒ„๊น… ์งˆ๋ฌธ์— ๋‹ต๋ณ€ํ•˜์„ธ์š”:
์งˆ๋ฌธ: {question}
์ˆ˜์ง‘๋œ ์ •๋ณด:
{summaries}
๋‹ต๋ณ€ ๊ตฌ์กฐ:
1. ๋ฌธ์ œ ์ •์˜
2. ๋ฐœ์ƒ ์›์ธ
3. ํ•ด๊ฒฐ ๋ฐฉ๋ฒ• (์ฝ”๋“œ ์˜ˆ์ œ ํฌํ•จ)
4. ์ฃผ์˜์‚ฌํ•ญ
5. ์ฐธ๊ณ  ์ž๋ฃŒ
์ดˆ๋ณด ๊ฐœ๋ฐœ์ž๋„ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๊ฒŒ Markdown ํ˜•์‹์œผ๋กœ ์ž‘์„ฑํ•˜์„ธ์š”.""",
"learning": """๋‹ค์Œ ์ •๋ณด๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ํ•™์Šต ์งˆ๋ฌธ์— ๋‹ต๋ณ€ํ•˜์„ธ์š”:
์งˆ๋ฌธ: {question}
์ˆ˜์ง‘๋œ ์ •๋ณด:
{summaries}
๋‹ต๋ณ€ ๊ตฌ์กฐ:
1. ๊ฐœ๋… ์„ค๋ช… (๊ฐ„๋‹จ๋ช…๋ฃŒ)
2. ๋™์ž‘ ์›๋ฆฌ
3. ์˜ˆ์ œ ์ฝ”๋“œ (์ฃผ์„ ํฌํ•จ)
4. ์‹ค๋ฌด ํ™œ์šฉ ํŒ
5. ์ถ”๊ฐ€ ํ•™์Šต ์ž๋ฃŒ
์ดˆ๋ณด ๊ฐœ๋ฐœ์ž๋„ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๊ฒŒ Markdown ํ˜•์‹์œผ๋กœ ์ž‘์„ฑํ•˜์„ธ์š”.""",
"code_review": """๋‹ค์Œ ์ •๋ณด๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ์ฝ”๋“œ ๋ฆฌ๋ทฐ ์งˆ๋ฌธ์— ๋‹ต๋ณ€ํ•˜์„ธ์š”:
์งˆ๋ฌธ: {question}
์ˆ˜์ง‘๋œ ์ •๋ณด:
{summaries}
๋‹ต๋ณ€ ๊ตฌ์กฐ:
1. ํ˜„์žฌ ์ ‘๊ทผ ๋ฐฉ์‹ ๋ถ„์„
2. ๊ฐœ์„  ํฌ์ธํŠธ
3. ๋ฆฌํŒฉํ† ๋ง ์˜ˆ์ œ
4. ๋ฒ ์ŠคํŠธ ํ”„๋ž™ํ‹ฐ์Šค
5. ์ฐธ๊ณ  ํŒจํ„ด
์ดˆ๋ณด ๊ฐœ๋ฐœ์ž๋„ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๊ฒŒ Markdown ํ˜•์‹์œผ๋กœ ์ž‘์„ฑํ•˜์„ธ์š”."""
}
template = templates.get(intent, templates["learning"])
# ์š”์•ฝ ํ…์ŠคํŠธ ํฌ๋งทํŒ…
summaries_text = "\n\n".join([
f"์ถœ์ฒ˜: {s['source']} ({s['url']})\n์š”์•ฝ: {s['summary']}"
for s in summaries
])
# ์ด์ „ ๋Œ€ํ™” ๋งฅ๋ฝ ์ถ”๊ฐ€ (messages ์‚ฌ์šฉ)
context_prefix = ""
messages_history = state.messages
if messages_history and len(messages_history) > 1:
context_prefix = "์ด์ „ ๋Œ€ํ™” ๋งฅ๋ฝ:\n"
# ์ตœ๊ทผ 6๊ฐœ ๋ฉ”์‹œ์ง€ (3ํ„ด) ์‚ฌ์šฉ
for msg in messages_history[-6:]:
if hasattr(msg, 'type'):
if msg.type == "human":
context_prefix += f"์‚ฌ์šฉ์ž: {msg.content}\n"
elif msg.type == "ai":
context_prefix += f"AI: {msg.content[:200]}...\n\n"
context_prefix += "---\nํ˜„์žฌ ์งˆ๋ฌธ:\n"
    # Format only the template so braces inside the conversation history cannot break str.format().
    final_prompt = context_prefix + template.format(
        question=(state.original_question or state.user_question),
        summaries=summaries_text
    )
updates = {}
steps_delta: List[str] = []
try:
response = llm.invoke([HumanMessage(content=final_prompt)])
final_answer = response.content.strip()
updates["final_answer"] = final_answer
        # Phase 3: conditional cache write
        # - clarification: never cached (routed to generate_with_history in the graph, but checked defensively here as well)
        # - new_topic/independent: cacheable when should_cache is True
should_cache = state.should_cache if state.should_cache is not None else True
canonical_question = state.canonical_question
qtype = state.question_type or "independent"
if should_cache and qtype in ["new_topic", "independent"]:
# ์บ์‹œํ•  ์งˆ๋ฌธ: canonical_question ์šฐ์„ , ์—†์œผ๋ฉด ์›๋ณธ ์งˆ๋ฌธ
question_to_cache = canonical_question or state.user_question
await qdrant_manager.save_to_cache(
question=question_to_cache,
answer=final_answer
)
steps_delta.append(f"โœ… ์ตœ์ข… ๋‹ต๋ณ€ ์ƒ์„ฑ ์™„๋ฃŒ (๊ธธ์ด: {len(final_answer)}์ž)")
steps_delta.append(f"๐Ÿ’พ ์บ์‹œ ์ €์žฅ ์™„๋ฃŒ (์งˆ๋ฌธ: {question_to_cache[:50]}...)")
logger.info("์ตœ์ข… ๋‹ต๋ณ€ ์ƒ์„ฑ ๋ฐ ์บ์‹œ ์ €์žฅ ์™„๋ฃŒ: %s", question_to_cache[:50])
else:
steps_delta.append(f"โœ… ์ตœ์ข… ๋‹ต๋ณ€ ์ƒ์„ฑ ์™„๋ฃŒ (๊ธธ์ด: {len(final_answer)}์ž)")
steps_delta.append("โš ๏ธ ์บ์‹œ ์ €์žฅ ์ƒ๋žต (๋…๋ฆฝ์ ์ด์ง€ ์•Š๊ฑฐ๋‚˜ ์ผํšŒ์„ฑ ์งˆ๋ฌธ)")
logger.info("์ตœ์ข… ๋‹ต๋ณ€ ์ƒ์„ฑ ์™„๋ฃŒ (์บ์‹œ ์ €์žฅ ์ƒ๋žต)")
except Exception as e:
logger.error("๋‹ต๋ณ€ ์ƒ์„ฑ ์‹คํŒจ: %s", e, exc_info=True)
updates["final_answer"] = "๋‹ต๋ณ€ ์ƒ์„ฑ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ์‹œ๋„ํ•ด ์ฃผ์„ธ์š”."
steps_delta.append(f"โŒ ๋‹ต๋ณ€ ์ƒ์„ฑ ์‹คํŒจ: {str(e)}")
updates["intermediate_steps"] = steps_delta
# Phase 4: Multi-question handling
    # NOTE: AgentState is a Pydantic BaseModel, so dict-style state.get(...) must not be used.
if state.is_multi_question:
answer_text = updates.get("final_answer")
if answer_text:
# Append to multi_answers (reducer will auto-merge)
updates["multi_answers"] = [{
"index": state.sub_question_index,
"question": state.sub_question_text or state.user_question,
"answer": answer_text
}]
logger.info("๋‹ค์ค‘ ์งˆ๋ฌธ ๋‹ต๋ณ€ ์ถ”๊ฐ€: Q%d", state.sub_question_index)
return updates
@trace_node("return_cached_answer")
def return_cached_answer_node(state: AgentState) -> dict:
"""
์บ์‹œ ํžˆํŠธ ์‹œ ์ €์žฅ๋œ ๋‹ต๋ณ€์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
๊ฒ€์ƒ‰ ๋ฐ ์ƒ์„ฑ ๊ณผ์ •์„ ๊ฑด๋„ˆ๋›ฐ๊ณ  ์ฆ‰์‹œ ๋‹ต๋ณ€์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.
"""
logger.info("์บ์‹œ๋œ ๋‹ต๋ณ€ ๋ฐ˜ํ™˜")
steps_delta = ["๐Ÿ’พ ์บ์‹œ๋œ ๋‹ต๋ณ€ ๋ฐ˜ํ™˜ (๊ฒ€์ƒ‰ ์ƒ๋žต)"]
return {
"final_answer": state.cached_result,
"intermediate_steps": steps_delta
}
@trace_node("handle_too_many_questions")
def handle_too_many_questions_node(state: AgentState) -> dict:
"""
    Returns a guidance message when three or more questions were asked at once,
    without ending the conversation, so the user can rephrase and ask again.
"""
plan = state.plan or {}
error_message = plan.get("error_message", "")
sub_questions = plan.get("sub_questions", [])
logger.info("์งˆ๋ฌธ ์ˆ˜ ์ดˆ๊ณผ: %d๊ฐœ", len(sub_questions))
default_message = """์ฃ„์†กํ•ฉ๋‹ˆ๋‹ค. ํ•œ ๋ฒˆ์— ์ตœ๋Œ€ 2๊ฐœ์˜ ์งˆ๋ฌธ๊นŒ์ง€๋งŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
๋‹ค์Œ ์ค‘ ํ•˜๋‚˜๋ฅผ ์„ ํƒํ•ด์„œ ๋‹ค์‹œ ์งˆ๋ฌธํ•ด ์ฃผ์„ธ์š”:
1. **ํ•˜๋‚˜์˜ ์ฃผ์ œ๋กœ ํ†ตํ•ฉํ•ด์„œ ์งˆ๋ฌธ**
์˜ˆ: "JWT ์ธ์ฆ๊ณผ CORS ์„ค์ •์„ ํ•จ๊ป˜ ๊ตฌํ˜„ํ•˜๋Š” ๋ฐฉ๋ฒ•"
2. **๊ฐ€์žฅ ์ค‘์š”ํ•œ 2๊ฐœ ์งˆ๋ฌธ๋งŒ ์„ ํƒ**
์˜ˆ: "JWT๊ฐ€ ๋ญ์•ผ? ๋‚ด ์ฝ”๋“œ์— ์–ด๋–ป๊ฒŒ ์ ์šฉํ•ด?"
3. **์งˆ๋ฌธ์„ ๋‚˜๋ˆ ์„œ ์ˆœ์ฐจ์ ์œผ๋กœ ์งˆ๋ฌธ**
์˜ˆ: ๋จผ์ € "JWT๊ฐ€ ๋ญ์•ผ?" ์งˆ๋ฌธ โ†’ ๋‹ต๋ณ€ ํ™•์ธ โ†’ ๋‹ค์Œ ์งˆ๋ฌธ
์–ด๋–ป๊ฒŒ ๋„์™€๋“œ๋ฆด๊นŒ์š”?"""
final_message = error_message if error_message else default_message
steps_delta = [
f"โš ๏ธ ์งˆ๋ฌธ ์ˆ˜ ์ดˆ๊ณผ: {len(sub_questions)}๊ฐœ",
"๐Ÿ’ฌ ์•ˆ๋‚ด ๋ฉ”์‹œ์ง€ ์ œ๊ณต (๋Œ€ํ™” ๊ณ„์† ๊ฐ€๋Šฅ)"
]
return {
"final_answer": final_message,
"intermediate_steps": steps_delta
}
@trace_node("initiate_dynamic_search")
def initiate_dynamic_search_node(state: AgentState) -> dict:
"""
    Entry node for multi-question handling.
    IMPORTANT:
    - In LangGraph, a List[Send] must not be returned from a node; it is only valid
      as the return value of an edge function passed to add_conditional_edges(...).
    - This node therefore returns only a dict update, and the actual fan-out is
      performed by the separate edge function `fanout_multi_questions`.
"""
plan = state.plan or {}
sub_questions = plan.get("sub_questions", [])
logger.info("๋™์  ๋ณต์ œ ์ค€๋น„: %d๊ฐœ ์งˆ๋ฌธ", len(sub_questions))
return {
"intermediate_steps": [f"๐Ÿ”€ ๋‹ค์ค‘ ์งˆ๋ฌธ fan-out ์ค€๋น„: {len(sub_questions)}๊ฐœ"]
}
def fanout_multi_questions(state: AgentState):
"""
    Fans the sub-questions out via the Send API.
    The List[Send] return value is only allowed from a conditional edge function.
"""
from langgraph.types import Send
plan = state.plan or {}
sub_questions = plan.get("sub_questions", [])
original_question = state.user_question
messages = state.messages
logger.info("๋™์  ๋ณต์ œ: %d๊ฐœ ์งˆ๋ฌธ์„ ๊ฐ๊ฐ ์ „์ฒด ๊ทธ๋ž˜ํ”„๋กœ ์‹คํ–‰", len(sub_questions))
sends = []
for i, sq in enumerate(sub_questions):
        # IMPORTANT: this project uses AgentState (a Pydantic BaseModel) as the node input,
        # so the Send payload must also be an AgentState instance, not a plain dict.
child = state.model_copy(deep=True)
# ์งˆ๋ฌธ ๊ต์ฒด + ๋‹ค์ค‘ ์งˆ๋ฌธ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ
child.user_question = sq
child.is_multi_question = True
child.sub_question_index = i
child.sub_question_text = sq
child.original_multi_question = original_question
# ๊ณตํ†ต ์œ ์ง€ ํ•„๋“œ
child.messages = messages
child.plan = plan
# ๊ธฐ์กด ๊ทธ๋ž˜ํ”„๊ฐ€ ๋‹ค์‹œ ์ฑ„์šธ ํ•„๋“œ๋“ค์€ ์ดˆ๊ธฐํ™”
child.question_type = None
child.should_cache = None
child.canonical_question = None
child.analysis_reasoning = None
child.cached_result = None
child.detected_intent = None
child.search_results = []
child.subtask_results = {}
child.refinement_count = 0
child.needs_refinement = False
child.original_question = None
child.final_answer = None
child.multi_answers = []
child.intermediate_steps = [f"๐Ÿ”„ ์งˆ๋ฌธ {i+1}/{len(sub_questions)}: {sq[:50]}"]
        # Running the full pipeline for each sub-question directly in the outer graph would
        # cause concurrent-update conflicts on scalar state channels (question_type, etc.).
        # So each worker runs the 'single-question graph' separately, and only the
        # multi_answers reducer channel is updated on the outer state.
sends.append(Send("run_single_question_worker", child))
return sends
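# For context, the fan-out above is presumably wired in graph.py along these lines
# (node names come from this module; the exact wiring is an assumption):
#
#     graph.add_node("initiate_dynamic_search", initiate_dynamic_search_node)
#     graph.add_node("run_single_question_worker", run_single_question_worker_node)
#     graph.add_node("combine_answers", combine_answers_node)
#     graph.add_conditional_edges(
#         "initiate_dynamic_search",
#         fanout_multi_questions,              # edge function returning List[Send]
#         ["run_single_question_worker"],
#     )
#     graph.add_edge("run_single_question_worker", "combine_answers")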
@trace_node("combine_answers")
def combine_answers_node(state: AgentState) -> dict:
"""
    Fan-in: once every Send has completed, combines the entries in multi_answers.
    The reducer (Annotated[List[dict], add]) automatically accumulates the result of
    each parallel Send into multi_answers; this node simply reads the accumulated
    entries and assembles them into Markdown.
"""
answers = state.multi_answers
original_question = state.original_multi_question or state.user_question
if not answers:
logger.error("๋‹ค์ค‘ ๋‹ต๋ณ€์ด ๋น„์–ด์žˆ์Œ")
return {
"final_answer": "๋‹ต๋ณ€ ์ƒ์„ฑ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ์‹œ๋„ํ•ด ์ฃผ์„ธ์š”.",
"intermediate_steps": ["โŒ multi_answers ๋น„์–ด์žˆ์Œ"]
}
# ์ธ๋ฑ์Šค ์ˆœ์œผ๋กœ ์ •๋ ฌ
answers.sort(key=lambda x: x["index"])
# Markdown ํ˜•์‹์œผ๋กœ ์กฐํ•ฉ
combined_parts = []
for ans in answers:
section = f"""## {ans['index']+1}. {ans['question']}
{ans['answer']}"""
combined_parts.append(section)
combined = "\n\n---\n\n".join(combined_parts)
# ํ—ค๋” ์ถ”๊ฐ€
header = f"# ๋‹ค์ค‘ ์งˆ๋ฌธ ๋‹ต๋ณ€\n\n์›๋ณธ ์งˆ๋ฌธ: {original_question}\n\n---\n\n"
final_combined = header + combined
logger.info("๋‹ค์ค‘ ๋‹ต๋ณ€ ์กฐํ•ฉ ์™„๋ฃŒ: %d๊ฐœ", len(answers))
return {
"final_answer": final_combined,
"intermediate_steps": [f"โœ… {len(answers)}๊ฐœ ๋‹ต๋ณ€ ์กฐํ•ฉ ์™„๋ฃŒ"]
}
def _build_search_subgraph_local() -> StateGraph:
"""nodes.py ๋‚ด๋ถ€์—์„œ ๋‹จ์ผ ์งˆ๋ฌธ ๊ทธ๋ž˜ํ”„์šฉ ๊ฒ€์ƒ‰ ์„œ๋ธŒ๊ทธ๋ž˜ํ”„๋ฅผ ๊ตฌ์„ฑ."""
subgraph = StateGraph(AgentState)
subgraph.add_node("filter_and_score", filter_and_score_node)
subgraph.add_node("summarize_results", summarize_results_node)
subgraph.add_edge(START, "filter_and_score")
subgraph.add_edge("filter_and_score", "summarize_results")
subgraph.add_edge("summarize_results", END)
return subgraph.compile()
def _get_single_question_agent():
"""
    Lazily compiles and caches the single-question pipeline graph used by the multi-question worker.
    (The worker runs a separate graph internally to avoid concurrent updates to the outer state.)
"""
global _SINGLE_QUESTION_AGENT # type: ignore[name-defined]
try:
return _SINGLE_QUESTION_AGENT # type: ignore[name-defined]
except Exception:
pass
    # ---- routing helpers (same single-question flow as in graph.py) ----
def _route_after_analysis(s: AgentState) -> Literal["generate_with_history", "check_cache"]:
raw_qtype = s.question_type or "independent"
legacy_map = {"followup": "clarification", "cache_candidate": "independent", "new_search": "independent"}
question_type = legacy_map.get(raw_qtype, raw_qtype)
return "generate_with_history" if question_type == "clarification" else "check_cache"
def _route_after_cache(s: AgentState) -> Literal["return_cached_answer", "classify_intent"]:
return "return_cached_answer" if s.cached_result else "classify_intent"
def _route_after_evaluation(s: AgentState) -> Literal["refine_search", "search_subgraph"]:
if s.needs_refinement and s.refinement_count < 1:
return "refine_search"
return "search_subgraph"
def _initiate_parallel_search(s: AgentState):
return [
Send("search_stackoverflow", s),
Send("search_github", s),
Send("search_official_docs", s),
]
# ---- build ----
g = StateGraph(AgentState)
g.add_node("analyze_question", analyze_question_node)
g.add_node("generate_with_history", generate_with_history_node)
g.add_node("check_cache", check_cache_node)
g.add_node("return_cached_answer", return_cached_answer_node)
g.add_node("classify_intent", classify_intent_node)
g.add_node("search_stackoverflow", search_stackoverflow_node)
g.add_node("search_github", search_github_node)
g.add_node("search_official_docs", search_official_docs_node)
g.add_node("collect_results", collect_results_node)
g.add_node("evaluate_results", evaluate_results_node)
g.add_node("refine_search", refine_search_node)
g.add_node("generate_answer", generate_answer_node)
search_subgraph = _build_search_subgraph_local()
g.add_node("search_subgraph", search_subgraph)
g.add_edge(START, "analyze_question")
g.add_conditional_edges(
"analyze_question",
_route_after_analysis,
{"generate_with_history": "generate_with_history", "check_cache": "check_cache"},
)
g.add_edge("generate_with_history", END)
g.add_conditional_edges(
"check_cache",
_route_after_cache,
{"return_cached_answer": "return_cached_answer", "classify_intent": "classify_intent"},
)
g.add_edge("return_cached_answer", END)
g.add_conditional_edges("classify_intent", _initiate_parallel_search)
g.add_edge("search_stackoverflow", "collect_results")
g.add_edge("search_github", "collect_results")
g.add_edge("search_official_docs", "collect_results")
g.add_edge("collect_results", "evaluate_results")
g.add_conditional_edges(
"evaluate_results",
_route_after_evaluation,
{"refine_search": "refine_search", "search_subgraph": "search_subgraph"},
)
g.add_edge("refine_search", "classify_intent")
g.add_edge("search_subgraph", "generate_answer")
g.add_edge("generate_answer", END)
_SINGLE_QUESTION_AGENT = g.compile()
return _SINGLE_QUESTION_AGENT
@trace_node("run_single_question_worker")
async def run_single_question_worker_node(state: AgentState) -> dict:
"""
    Runs one sub-question of a multi-question request through the single-question graph,
    then updates only the reducer channel (multi_answers) on the outer graph.
"""
agent = _get_single_question_agent()
    # Run the inner graph with the multi-question flag off (prevents an extra multi_answers append).
inner = state.model_copy(deep=True)
inner.is_multi_question = False
inner.multi_answers = []
result = await agent.ainvoke(
{
"user_question": inner.user_question,
"messages": inner.messages,
}
)
answer_text = result.get("final_answer") or ""
return {
"multi_answers": [
{
"index": state.sub_question_index,
"question": state.sub_question_text or state.user_question,
"answer": answer_text,
}
],
"intermediate_steps": [f"โœ… ์„œ๋ธŒ ์งˆ๋ฌธ {state.sub_question_index + 1} ์ฒ˜๋ฆฌ ์™„๋ฃŒ"],
}
@trace_node("generate_with_history")
async def generate_with_history_node(state: AgentState) -> dict:
"""
    Answers follow-up questions using only the conversation history.
    Phase 2: Follow-up Handler
    - no cache lookup
    - no web search
    - no cache write
    - relies solely on the messages history
"""
user_question = state.user_question
messages_history = state.messages
logger.info("๋Œ€ํ™” ํžˆ์Šคํ† ๋ฆฌ ๊ธฐ๋ฐ˜ ๋‹ต๋ณ€ ์ƒ์„ฑ: %s", user_question[:50])
# ๋Œ€ํ™” ๋งฅ๋ฝ ๊ตฌ์„ฑ
context_prompt = "์ด์ „ ๋Œ€ํ™”๋ฅผ ์ฐธ๊ณ ํ•˜์—ฌ ํ›„์† ์งˆ๋ฌธ์— ๋‹ต๋ณ€ํ•˜์„ธ์š”.\n\n"
if messages_history:
context_prompt += "๋Œ€ํ™” ๋‚ด์—ญ:\n"
for msg in messages_history[:-1]: # ํ˜„์žฌ ์งˆ๋ฌธ ์ œ์™ธ
if hasattr(msg, 'type') and hasattr(msg, 'content'):
role = "์‚ฌ์šฉ์ž" if msg.type == "human" else "AI"
context_prompt += f"{role}: {msg.content}\n\n"
context_prompt += f"ํ˜„์žฌ ์งˆ๋ฌธ: {user_question}\n\n"
context_prompt += "์ด์ „ ๋Œ€ํ™” ๋งฅ๋ฝ์„ ๊ณ ๋ คํ•˜์—ฌ ์ž์„ธํ•˜๊ณ  ์นœ์ ˆํ•˜๊ฒŒ ๋‹ต๋ณ€ํ•˜์„ธ์š”."
updates = {}
steps_delta: List[str] = []
try:
response = llm.invoke([HumanMessage(content=context_prompt)])
final_answer = response.content.strip()
updates["final_answer"] = final_answer
steps_delta.append(f"๐Ÿ’ฌ ๋Œ€ํ™” ํžˆ์Šคํ† ๋ฆฌ ๊ธฐ๋ฐ˜ ๋‹ต๋ณ€ ์ƒ์„ฑ (๊ธธ์ด: {len(final_answer)}์ž)")
steps_delta.append("โš ๏ธ ์บ์‹œ ์ €์žฅ ์ƒ๋žต (๋ณด์ถฉ ์š”์ฒญ)")
logger.info("๋Œ€ํ™” ํžˆ์Šคํ† ๋ฆฌ ๊ธฐ๋ฐ˜ ๋‹ต๋ณ€ ์ƒ์„ฑ ์™„๋ฃŒ")
except Exception as e:
logger.error("๋Œ€ํ™” ํžˆ์Šคํ† ๋ฆฌ ๊ธฐ๋ฐ˜ ๋‹ต๋ณ€ ์ƒ์„ฑ ์‹คํŒจ: %s", e, exc_info=True)
updates["final_answer"] = "๋‹ต๋ณ€ ์ƒ์„ฑ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ์‹œ๋„ํ•ด ์ฃผ์„ธ์š”."
steps_delta.append(f"โŒ ๋‹ต๋ณ€ ์ƒ์„ฑ ์‹คํŒจ: {str(e)}")
updates["intermediate_steps"] = steps_delta
return updates
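# Minimal local smoke test for the single-question pipeline (a sketch; requires
# GOOGLE_API_KEY plus the search and Qdrant backends to be configured).
if __name__ == "__main__":
    async def _demo() -> None:
        agent = _get_single_question_agent()
        result = await agent.ainvoke({"user_question": "What is a JWT?", "messages": []})
        print(result.get("final_answer", ""))
    asyncio.run(_demo())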