""" CodeWeaver LangGraph 노드 구현. 각 노드는 AgentState를 받아 처리하고 업데이트된 상태를 반환합니다. 모든 노드는 LangSmith를 통해 자동으로 추적됩니다. """ import asyncio import logging import os from typing import List, Literal, Optional from langchain_core.messages import HumanMessage, SystemMessage from langchain_google_genai import ChatGoogleGenerativeAI from langgraph.graph import StateGraph, START, END from langgraph.types import Send from src.agent.state import AgentState, SearchResult from src.agent.state import _MULTI_ANS_RESET_TOKEN # reset token for multi_answers reducer from src.tools.search_tools import ( search_github, search_official_docs, search_stackoverflow, ) from src.utils.tracing import trace_node from src.vector_db.qdrant_client import QdrantManager logger = logging.getLogger(__name__) # LLM 초기화 (Gemini 2.5 Flash) llm = ChatGoogleGenerativeAI( model="gemini-2.5-flash-lite", temperature=0.7, ) # Qdrant 매니저 초기화 qdrant_manager = QdrantManager() @trace_node("analyze_question") async def analyze_question_node(state: AgentState) -> dict: """ 질문을 분석하여 유형을 분류하고 캐시 적격성을 판단합니다. Phase 2: Question Analysis & Cache Eligibility Decision 분류: - followup: 이전 대화에 의존하는 후속 질문 - cache_candidate: 독립적이고 재사용 가능한 질문 - new_search: 독립적이지만 캐시하지 않을 질문 (시간 민감 등) """ user_question = state.user_question messages = state.messages # 대화 맥락 구성 has_history = messages and len(messages) > 1 context_info = "" if has_history: context_info = "\n이전 대화 맥락:\n" for msg in messages[-4:-1]: # 현재 질문 제외 최근 3개 if hasattr(msg, 'type') and hasattr(msg, 'content'): role = "사용자" if msg.type == "human" else "AI" context_info += f"{role}: {msg.content[:100]}\n" analysis_prompt = f"""질문을 분석하여 유형을 분류하고, 캐시 적격성을 판단하세요. {context_info} 현재 질문: {user_question} 분류 기준: 1. **clarification** (보충/형식 변경 요청) - 이전 답변/대화 내용을 바탕으로 "설명 방식"을 바꾸거나 보충을 요청 - 예: "좀 더 쉽게 설명해줘", "예제 코드로 보여줘", "한 줄로 요약해줘", "다시 설명해줘" - 원칙: 검색/캐시가 아니라 대화 히스토리 기반 답변 - should_cache = false, canonical_question = null 2. **new_topic** (대화 중 새 개념 질문) - 대화가 이어지는 중이지만, 질문 자체가 독립적으로 성립하는 '새 개념/정의/비교/사용법' 질문 - 예: (React 이야기 중) "Event Listener는 뭐야?", "CORS가 뭐야?" - 원칙: 검색 + 캐시 저장 가치가 큼 - should_cache = true (기본), canonical_question 생성 3. **independent** (완전 독립 질문) - 이전 대화 없이도 이해 가능한 일반 질문 - 예: "Spring Security가 뭐야?", "Docker Compose 사용법은?" - 원칙: 검색 + 캐시 저장 가치가 큼 - should_cache = true (기본), canonical_question 생성 다음 JSON 형식으로만 답변하세요: {{ "question_type": "clarification|new_topic|independent", "should_cache": true|false, "reasoning": "분류 이유 1-2문장", "canonical_question": "캐시할 정규화된 질문 (should_cache가 true인 경우에만, 아니면 null)" }} JSON 외에 다른 텍스트는 포함하지 마세요.""" try: messages_to_llm = [HumanMessage(content=analysis_prompt)] response = llm.invoke(messages_to_llm) # JSON 파싱 import json response_text = response.content.strip() # JSON 블록 추출 (마크다운 코드 블록 제거) if "```json" in response_text: response_text = response_text.split("```json")[1].split("```")[0].strip() elif "```" in response_text: response_text = response_text.split("```")[1].split("```")[0].strip() analysis = json.loads(response_text) question_type = analysis.get("question_type", "independent") should_cache = analysis.get("should_cache", False) reasoning = analysis.get("reasoning", "") canonical_question = analysis.get("canonical_question", user_question) # 유효성 검증 if question_type not in ["clarification", "new_topic", "independent"]: question_type = "independent" # 1차 정책 보정: clarification은 캐시 금지 if question_type == "clarification": should_cache = False canonical_question = None else: # new_topic/independent는 기본적으로 캐시 가능 if canonical_question is None or (isinstance(canonical_question, str) and not canonical_question.strip()): canonical_question = user_question # 실행(run) 시작마다 step 로그를 리셋하고, 이번 실행의 step만 누적되게 함 steps_delta = [ "__RESET_STEPS__", f"🔍 질문 분석: {question_type} (캐시 여부: {should_cache})", ] return { "question_type": question_type, "should_cache": should_cache, "analysis_reasoning": reasoning, "canonical_question": canonical_question if should_cache else None, "intermediate_steps": steps_delta } except Exception as e: logger.error("질문 분석 실패: %s", e, exc_info=True) # 기본값: 독립 질문으로 간주 steps_delta = [ "__RESET_STEPS__", "⚠️ 질문 분석 실패, 기본값 사용: independent", ] return { "question_type": "independent", "should_cache": True, "analysis_reasoning": "분석 실패, 기본값 사용", "canonical_question": user_question, "intermediate_steps": steps_delta } @trace_node("check_cache") async def check_cache_node(state: AgentState) -> dict: """ 벡터 DB 캐시에서 유사한 질문을 검색합니다. threshold 0.85 이상인 경우 캐시 히트로 판단합니다. """ question_for_lookup = state.canonical_question or state.user_question logger.info("캐시 확인 중: %s", question_for_lookup[:50]) try: cached_result = await qdrant_manager.search_cache( question=question_for_lookup, threshold=0.85 ) updates = {} steps_delta: List[str] = [] if cached_result: updates["cached_result"] = cached_result steps_delta.append(f"✅ 캐시 히트 (답변 길이: {len(cached_result)}자)") logger.info("캐시 히트") else: updates["cached_result"] = None steps_delta.append("❌ 캐시 미스: 새로운 검색 필요") logger.info("캐시 미스") except Exception as e: logger.error("캐시 확인 실패: %s", e, exc_info=True) updates["cached_result"] = None steps_delta.append(f"⚠️ 캐시 확인 오류: {str(e)}") updates["intermediate_steps"] = steps_delta return updates @trace_node("create_plan") def create_plan_node(state: AgentState) -> dict: """ 질문을 분석하여 유형과 개수를 판단합니다. Phase 4: Dynamic Parallel Search - single_topic: 하나의 주제 (기존 그래프 실행) - multiple_questions: 독립 질문 2개 (Send API로 그래프 2회 실행) - too_many: 독립 질문 3개 이상 (에러 메시지) LangGraph 공식 가이드라인: 노드는 한 가지 일만 수행 (계획 수립) """ user_question = state.user_question logger.info("질문 분석 및 계획 수립 중: %s", user_question[:50]) def _extract_question_candidates(text: str) -> List[str]: """입력 문자열에서 '질문 후보'를 최대한 보수적으로 추출합니다(3개 이상 감지용).""" import re if not text: return [] t = text.strip() # 1) 물음표 기반 분리 (가장 신뢰도 높음) parts = re.split(r"[??]+", t) candidates = [p.strip() for p in parts if p.strip()] if len(candidates) >= 2 and re.search(r"[??]", t): # 물음표가 존재할 때만 이 규칙을 신뢰 return candidates # 2) 줄바꿈/번호 매기기 기반 (다중 질문 입력 패턴) lines = [ln.strip() for ln in re.split(r"[\r\n]+", t) if ln.strip()] numbered = [] for ln in lines: if re.match(r"^\s*(\d+[\.\)]|[-*])\s+", ln): numbered.append(re.sub(r"^\s*(\d+[\.\)]|[-*])\s+", "", ln).strip()) if len(numbered) >= 2: return numbered # 3) 구분자 기반(세미콜론) — 보조 semi = [p.strip() for p in t.split(";") if p.strip()] if len(semi) >= 2: return semi return [t] def _hard_guard_too_many(text: str) -> Optional[dict]: """ 하드 가드: 사용자가 '질문 3개 이상'을 한 번에 던진 것으로 확실한 경우, LLM 분류와 무관하게 too_many로 강제합니다. """ import re if not text: return None # 가장 확실한 기준: 물음표가 3개 이상 qmarks = len(re.findall(r"[??]", text)) if qmarks >= 3: candidates = _extract_question_candidates(text) msg = "죄송합니다. 질문은 한 번에 최대 2개까지 가능합니다. 가장 중요한 2개만 골라서 다시 질문해 주세요." return { "case": "too_many", "sub_questions": candidates, "reasoning": f"물음표가 {qmarks}개로, 3개 이상의 독립 질문으로 판단했습니다.", "error_message": msg, "steps_note": f"⚠️ 질문 수 초과 감지(물음표 {qmarks}개) → too_many로 강제", } # 번호 매기기/리스트로 3개 이상 candidates = _extract_question_candidates(text) if len(candidates) >= 3: msg = "죄송합니다. 질문은 한 번에 최대 2개까지 가능합니다. 가장 중요한 2개만 골라서 다시 질문해 주세요." return { "case": "too_many", "sub_questions": candidates, "reasoning": f"질문 후보가 {len(candidates)}개로 감지되어 3개 이상 질문으로 판단했습니다.", "error_message": msg, "steps_note": f"⚠️ 질문 수 초과 감지(후보 {len(candidates)}개) → too_many로 강제", } return None # 하드 가드(결정론적) — LLM이 잘못 분류하더라도 3개 이상이면 무조건 차단 hard = _hard_guard_too_many(user_question) if hard: steps_delta = [ f"📋 계획 타입: {hard['case']}", f" 서브질문: {len(hard['sub_questions'])}개", f" 이유: {hard['reasoning']}", hard["steps_note"], ] logger.info("계획 수립 완료(하드 가드): too_many, %d개 서브질문", len(hard["sub_questions"])) return { "plan": { "case": hard["case"], "sub_questions": hard["sub_questions"], "reasoning": hard["reasoning"], "error_message": hard["error_message"], }, "is_multi_question": False, "sub_question_index": 0, "sub_question_text": None, "original_multi_question": None, "multi_answers": [{"__token__": _MULTI_ANS_RESET_TOKEN}], "intermediate_steps": steps_delta, } plan_prompt = f"""질문을 분석하여 유형과 개수를 판단하세요. 질문: {user_question} **중요**: sub_questions의 용도는 case에 따라 다릅니다! **Case 1: single_topic** (하나의 주제) - 예: "Spring Security JWT 인증 구현" → sub_questions: ["개념", "구현", "예제"] → 용도: 답변 섹션 구조 (검색은 원본 질문으로 1회만) → 검색: "Spring Security JWT 인증 구현" - 예: "React hooks 완벽 가이드" → sub_questions: ["hooks란", "주요 hooks", "실무 패턴"] → 용도: 답변 섹션 구조 → 검색: "React hooks 완벽 가이드" **Case 2: multiple_questions** (여러 독립 질문, 최대 2개) - 예: "JWT가 뭐야? CORS는?" → sub_questions: ["JWT가 뭐야?", "CORS는?"] → 용도: 각 질문마다 별도 검색 → 검색: "JWT가 뭐야?" (1회), "CORS는?" (1회) - 예: "Docker 사용법은? Redis 설치는?" → sub_questions: ["Docker 사용법은?", "Redis 설치는?"] → 용도: 각 질문마다 별도 검색 **Case 3: too_many** (3개 이상 질문) - 예: "JWT? CORS? Docker?" → 너무 많아서 처리 불가 → error_message 제공 규칙: - single_topic: sub_questions는 짧은 키워드/구절 (1-5개) - multiple_questions: sub_questions는 완전한 문장 (정확히 2개만) - too_many: 3개 이상이면 이 케이스로 분류 다음 JSON 형식으로만 답변하세요: {{ "case": "single_topic|multiple_questions|too_many", "sub_questions": [...], "reasoning": "이 케이스로 판단한 이유", "error_message": "..." (too_many인 경우만, 그 외는 빈 문자열) }} JSON 외에 다른 텍스트는 포함하지 마세요.""" try: import json messages_to_llm = [HumanMessage(content=plan_prompt)] response = llm.invoke(messages_to_llm) # JSON 파싱 response_text = response.content.strip() # JSON 블록 추출 (마크다운 코드 블록 제거) if "```json" in response_text: response_text = response_text.split("```json")[1].split("```")[0].strip() elif "```" in response_text: response_text = response_text.split("```")[1].split("```")[0].strip() plan_data = json.loads(response_text) case = plan_data.get("case", "single_topic") sub_questions = plan_data.get("sub_questions", [user_question]) reasoning = plan_data.get("reasoning", "") error_message = plan_data.get("error_message", "") # LLM 결과를 받은 뒤에도 한 번 더 하드 가드 적용 (안전장치) hard2 = _hard_guard_too_many(user_question) if hard2: case = hard2["case"] sub_questions = hard2["sub_questions"] reasoning = hard2["reasoning"] error_message = hard2["error_message"] # 유효성 검증 if not sub_questions or len(sub_questions) == 0: sub_questions = [user_question] case = "single_topic" # multiple_questions일 때 2개 제한 강제 (단, 3개 이상은 위 하드 가드에서 too_many로 처리됨) if case == "multiple_questions" and len(sub_questions) > 2: sub_questions = sub_questions[:2] reasoning += " (질문 수 제한: 최대 2개)" steps_delta = [ f"📋 계획 타입: {case}", f" 서브질문: {len(sub_questions)}개", f" 이유: {reasoning}" ] logger.info("계획 수립 완료: %s, %d개 서브질문", case, len(sub_questions)) # NOTE: 이 그래프는 체크포인팅/스레드 유지가 가능하므로, # multi_answers는 매 실행(run) 시작 시 리셋해야 이전 턴 누적이 발생하지 않습니다. return { "plan": { "case": case, "sub_questions": sub_questions, "reasoning": reasoning, "error_message": error_message }, "is_multi_question": False, "sub_question_index": 0, "sub_question_text": None, "original_multi_question": None, "multi_answers": [{"__token__": _MULTI_ANS_RESET_TOKEN}], "intermediate_steps": steps_delta } except Exception as e: logger.error("계획 수립 실패: %s", e, exc_info=True) # 기본값: 원본 질문 그대로 사용 steps_delta = [ "⚠️ 계획 수립 실패, 기본값 사용: single_topic" ] return { "plan": { "case": "single_topic", "sub_questions": [user_question], "reasoning": "계획 수립 실패, 기본값 사용", "error_message": "" }, "is_multi_question": False, "sub_question_index": 0, "sub_question_text": None, "original_multi_question": None, "multi_answers": [{"__token__": _MULTI_ANS_RESET_TOKEN}], "intermediate_steps": steps_delta } @trace_node("classify_intent") def classify_intent_node(state: AgentState) -> dict: """ LLM을 사용하여 사용자 질문의 의도를 분류합니다. 분류 카테고리: - debugging: 에러 해결, 버그 수정 - learning: 개념 학습, 원리 이해 - code_review: 코드 개선, 리팩토링 """ logger.info("의도 분류 중: %s", state.user_question[:50]) classification_prompt = f"""질문을 다음 세 가지 의도 중 하나로 분류하세요: 1. debugging: 에러 해결, 버그 수정, 문제 해결 예: "ImportError가 발생해요", "이 코드가 작동하지 않아요" 2. learning: 개념 학습, 원리 이해, 튜토리얼 예: "async/await가 뭔가요?", "JPA 동작 원리는?" 3. code_review: 코드 개선, 리팩토링, 베스트 프랙티스 예: "이 코드를 개선할 방법은?", "더 나은 설계는?" 질문: {state.user_question} 반드시 debugging, learning, code_review 중 하나만 답하세요.""" updates = {} steps_delta: List[str] = [] try: messages = [ SystemMessage(content="당신은 개발자 질문을 분류하는 전문가입니다."), HumanMessage(content=classification_prompt) ] response = llm.invoke(messages) intent_raw = response.content.strip().lower() # 유효한 의도로 정규화 valid_intents = ["debugging", "learning", "code_review"] intent = next((i for i in valid_intents if i in intent_raw), "learning") updates["detected_intent"] = intent steps_delta.append(f"🎯 의도 분류: {intent}") logger.info("의도 분류 완료: %s", intent) except Exception as e: logger.error("의도 분류 실패: %s", e, exc_info=True) updates["detected_intent"] = "learning" steps_delta.append("⚠️ 의도 분류 실패, 기본값 사용: learning") updates["intermediate_steps"] = steps_delta return updates @trace_node("search_stackoverflow") def search_stackoverflow_node(state: AgentState) -> dict: """ Stack Overflow에서 검색을 수행합니다. Send API를 통한 병렬 검색의 일부로 실행됩니다. search_results와 intermediate_steps는 Annotated[List, add]로 정의되어 있어 자동으로 머지됩니다. """ intent = state.detected_intent or "learning" count = 5 if intent == "debugging" else 3 logger.info("Stack Overflow 검색 시작: %d개", count) try: results = search_stackoverflow(state.user_question, count) logger.info("Stack Overflow에서 %d개 결과 수집", len(results)) # reducer가 자동으로 머지하므로 새 결과만 반환 return { "search_results": results, "intermediate_steps": [f"🔍 Stack Overflow: {len(results)}개 결과"] } except Exception as e: logger.error("Stack Overflow 검색 실패: %s", e) return { "intermediate_steps": [f"⚠️ Stack Overflow 검색 실패: {str(e)}"] } @trace_node("search_github") def search_github_node(state: AgentState) -> dict: """ GitHub Issues/Discussions에서 검색을 수행합니다. Send API를 통한 병렬 검색의 일부로 실행됩니다. """ intent = state.detected_intent or "learning" count = 5 if intent == "code_review" else 3 if intent == "learning" else 2 logger.info("GitHub 검색 시작: %d개", count) try: results = search_github(state.user_question, count) logger.info("GitHub에서 %d개 결과 수집", len(results)) # reducer가 자동으로 머지 return { "search_results": results, "intermediate_steps": [f"🔍 GitHub: {len(results)}개 결과"] } except Exception as e: logger.error("GitHub 검색 실패: %s", e) return { "intermediate_steps": [f"⚠️ GitHub 검색 실패: {str(e)}"] } @trace_node("search_official_docs") def search_official_docs_node(state: AgentState) -> dict: """ 공식 문서/Tavily에서 검색을 수행합니다. Send API를 통한 병렬 검색의 일부로 실행됩니다. """ intent = state.detected_intent or "learning" count = 5 if intent == "learning" else 2 logger.info("공식 문서 검색 시작: %d개", count) try: results = search_official_docs(state.user_question, count) logger.info("공식 문서에서 %d개 결과 수집", len(results)) # reducer가 자동으로 머지 return { "search_results": results, "intermediate_steps": [f"🔍 공식 문서: {len(results)}개 결과"] } except Exception as e: logger.error("공식 문서 검색 실패: %s", e) return { "intermediate_steps": [f"⚠️ 공식 문서 검색 실패: {str(e)}"] } @trace_node("collect_results") def collect_results_node(state: AgentState) -> dict: """ 병렬 검색 결과를 수집하고 카운트합니다. Fan-in 포인트: 3개의 병렬 검색 노드가 모두 완료된 후 실행됩니다. LangGraph 공식 가이드라인: Send API의 fan-in 지점에서 결과 집계 """ total_results = len(state.search_results) logger.info("검색 결과 수집 완료: %d개", total_results) steps_delta = [ f"📊 검색 결과 수집: 총 {total_results}개" ] return { "intermediate_steps": steps_delta } @trace_node("evaluate_results") def evaluate_results_node(state: AgentState) -> dict: """ 검색 결과의 개수와 품질을 모두 평가합니다. 평가 기준: 1. 개수: 최소 2개 이상 2. 품질: 평균 relevance_score >= 0.6 """ search_results = state.search_results # 직접 사용 (더 안전) refinement_count = state.refinement_count result_count = len(search_results) logger.info("검색 결과 평가: %d개 (개선 횟수: %d)", result_count, refinement_count) # 안전장치: 이미 1회 개선했으면 더 이상 개선하지 않음 if refinement_count >= 1: steps_delta = [ f"⚠️ 최대 개선 횟수 도달 ({refinement_count}회), 현재 결과로 진행" ] return { "needs_refinement": False, "intermediate_steps": steps_delta } # 1차 평가: 개수 if result_count < 2: steps_delta = [ f"⚠️ 검색 결과 부족 ({result_count}개 < 2개), 쿼리 개선 필요" ] return { "needs_refinement": True, "intermediate_steps": steps_delta } # 2차 평가: 품질 (relevance_score가 있는 경우만) scored_results = [r for r in search_results if r.relevance_score is not None] if scored_results: avg_score = sum(r.relevance_score for r in scored_results) / len(scored_results) # 평균 점수가 0.5 미만이면 품질 부족 if avg_score < 0.5: steps_delta = [ f"⚠️ 검색 결과 품질 부족 (평균 점수: {avg_score:.2f} < 0.5), 쿼리 개선 필요" ] return { "needs_refinement": True, "intermediate_steps": steps_delta } steps_delta = [ f"✅ 검색 결과 충분 ({result_count}개, 평균 점수: {avg_score:.2f}), 필터링 단계로 진행" ] else: # relevance_score가 아직 없으면 개수만으로 판단 steps_delta = [ f"✅ 검색 결과 충분 ({result_count}개), 필터링 단계로 진행" ] return { "needs_refinement": False, "intermediate_steps": steps_delta } @trace_node("refine_search") def refine_search_node(state: AgentState) -> dict: """ 검색 쿼리를 개선합니다. Open Deep Research 패턴: - LLM이 전략을 선택 (구체화/일반화/번역) - 원본 질문 보존 (최종 답변 생성 시 사용) LangGraph 공식 가이드라인: - 상태에 원시 데이터 저장 (전략 정보 포함) - 프롬프트는 노드 내에서 동적 생성 """ user_question = state.user_question original_question = state.original_question or user_question result_count = len(state.search_results) logger.info("검색 쿼리 개선 중: %s (%d개 결과)", user_question[:50], result_count) refinement_prompt = f"""검색 결과가 부족합니다. 검색 쿼리를 개선하세요. 원본 질문: {user_question} 현재 결과 수: {result_count}개 (목표: 2개 이상) 개선 전략 (하나 선택): 1. MORE_SPECIFIC: 기술적 세부사항 추가 예: "React hooks" → "React useEffect cleanup function dependencies" 2. MORE_GENERAL: 더 넓은 용어 사용 예: "Spring Cloud Sleuth 2.x trace" → "distributed tracing Spring Boot" 3. TRANSLATE: 언어 변환 예: "JWT 인증 구현" → "JWT authentication implementation" 예: "WebSocket connection" → "WebSocket 연결 방법" 다음 JSON 형식으로만 답변하세요: {{ "new_query": "개선된 검색 쿼리", "strategy": "MORE_SPECIFIC|MORE_GENERAL|TRANSLATE", "reasoning": "이 전략을 선택한 이유 1-2문장" }} JSON 외에 다른 텍스트는 포함하지 마세요.""" try: import json messages_to_llm = [HumanMessage(content=refinement_prompt)] response = llm.invoke(messages_to_llm) # JSON 파싱 response_text = response.content.strip() if "```json" in response_text: response_text = response_text.split("```json")[1].split("```")[0].strip() elif "```" in response_text: response_text = response_text.split("```")[1].split("```")[0].strip() refinement_data = json.loads(response_text) new_query = refinement_data.get("new_query", user_question) strategy = refinement_data.get("strategy", "MORE_GENERAL") reasoning = refinement_data.get("reasoning", "") steps_delta = [ f"🔄 쿼리 개선: {strategy}", f" 이전: {user_question[:50]}...", f" 이후: {new_query[:50]}...", f" 이유: {reasoning}" ] logger.info("쿼리 개선 완료: %s → %s", user_question[:30], new_query[:30]) return { "user_question": new_query, "original_question": original_question, "refinement_count": state.refinement_count + 1, "search_results": [], # CRITICAL: 이전 검색 결과 제거 후 재검색 "intermediate_steps": steps_delta } except Exception as e: logger.error("쿼리 개선 실패: %s", e, exc_info=True) # 기본 전략: 영문 키워드 추출 (간단한 fallback) fallback_query = user_question + " tutorial example" steps_delta = [ f"⚠️ 쿼리 개선 실패, 기본 전략 사용", f" 이후: {fallback_query}" ] return { "user_question": fallback_query, "original_question": original_question, "refinement_count": state.refinement_count + 1, "search_results": [], # CRITICAL: 실패 시에도 이전 검색 결과 제거 "intermediate_steps": steps_delta } @trace_node("filter_and_score") def filter_and_score_node(state: AgentState) -> dict: """ 검색 결과를 필터링하고 관련도 점수를 매깁니다. - 최소 길이 50자 이상, URL 존재하는 결과만 유지 - 상위 5개 결과에 대해 LLM으로 관련도 평가 - 관련도 순으로 정렬하여 상위 10개 선택 """ search_results = state.search_results logger.info("검색 결과 필터링 중: %d개", len(search_results)) # 기본 필터링 filtered = [ r for r in search_results if r.content and len(r.content) >= 50 and r.url ] logger.info("기본 필터링 후: %d개 결과", len(filtered)) # 상위 5개 결과만 LLM으로 점수 매기기 (비용 절감) for result in filtered[:5]: if result.relevance_score is None: try: scoring_prompt = f"""질문: {state.user_question} 검색 결과: {result.content[:500]} 이 검색 결과가 질문에 얼마나 관련이 있는지 0.0에서 1.0 사이의 점수로 평가하세요. 점수만 숫자로 답하세요. (예: 0.8)""" response = llm.invoke([HumanMessage(content=scoring_prompt)]) score_str = response.content.strip() result.relevance_score = float(score_str) except Exception as e: logger.warning("점수 매기기 실패: %s", e) result.relevance_score = 0.5 # 관련도 순으로 정렬 filtered.sort(key=lambda r: r.relevance_score or 0, reverse=True) # 상위 5개만 유지 top_results = filtered[:5] subtask_results = dict(state.subtask_results) subtask_results["filtered_results"] = [r.model_dump() for r in top_results] steps_delta = [f"✂️ 필터링 완료: {len(top_results)}개 결과 선택"] logger.info("필터링 완료: %d개 결과", len(top_results)) return { "subtask_results": subtask_results, "intermediate_steps": steps_delta } @trace_node("summarize_results") def summarize_results_node(state: AgentState) -> dict: """ 필터링된 각 검색 결과를 초보 개발자가 이해하기 쉽게 요약합니다. 각 결과를 2-3문장으로 핵심 내용만 추출합니다. """ subtask_results = state.subtask_results filtered_results = subtask_results.get("filtered_results", []) logger.info("검색 결과 요약 중: %d개", len(filtered_results)) summaries = [] for result_dict in filtered_results: try: summary_prompt = f"""다음 검색 결과를 초보 개발자가 이해하기 쉽게 2-3문장으로 요약하세요: 출처: {result_dict['source']} 내용: {result_dict['content'][:1000]} 핵심 내용만 간단명료하게 요약하세요.""" response = llm.invoke([HumanMessage(content=summary_prompt)]) summaries.append({ "source": result_dict['source'], "url": result_dict['url'], "summary": response.content.strip(), "relevance": result_dict.get('relevance_score', 0.5) }) except Exception as e: logger.error("요약 실패: %s", e) updated_subtask_results = dict(subtask_results) updated_subtask_results["summaries"] = summaries steps_delta = [f"📝 요약 완료: {len(summaries)}개 결과"] logger.info("요약 완료: %d개", len(summaries)) return { "subtask_results": updated_subtask_results, "intermediate_steps": steps_delta } @trace_node("generate_answer") async def generate_answer_node(state: AgentState) -> dict: """ 요약된 정보를 바탕으로 최종 답변을 생성합니다. 의도별로 다른 답변 구조를 사용하며, 생성된 답변은 캐시에 저장됩니다. """ subtask_results = state.subtask_results summaries = subtask_results.get("summaries", []) intent = state.detected_intent or "learning" logger.info("최종 답변 생성 중: %s", intent) # 의도별 프롬프트 템플릿 templates = { "debugging": """다음 정보를 바탕으로 디버깅 질문에 답변하세요: 질문: {question} 수집된 정보: {summaries} 답변 구조: 1. 문제 정의 2. 발생 원인 3. 해결 방법 (코드 예제 포함) 4. 주의사항 5. 참고 자료 초보 개발자도 이해할 수 있게 Markdown 형식으로 작성하세요.""", "learning": """다음 정보를 바탕으로 학습 질문에 답변하세요: 질문: {question} 수집된 정보: {summaries} 답변 구조: 1. 개념 설명 (간단명료) 2. 동작 원리 3. 예제 코드 (주석 포함) 4. 실무 활용 팁 5. 추가 학습 자료 초보 개발자도 이해할 수 있게 Markdown 형식으로 작성하세요.""", "code_review": """다음 정보를 바탕으로 코드 리뷰 질문에 답변하세요: 질문: {question} 수집된 정보: {summaries} 답변 구조: 1. 현재 접근 방식 분석 2. 개선 포인트 3. 리팩토링 예제 4. 베스트 프랙티스 5. 참고 패턴 초보 개발자도 이해할 수 있게 Markdown 형식으로 작성하세요.""" } template = templates.get(intent, templates["learning"]) # 요약 텍스트 포맷팅 summaries_text = "\n\n".join([ f"출처: {s['source']} ({s['url']})\n요약: {s['summary']}" for s in summaries ]) # 이전 대화 맥락 추가 (messages 사용) context_prefix = "" messages_history = state.messages if messages_history and len(messages_history) > 1: context_prefix = "이전 대화 맥락:\n" # 최근 6개 메시지 (3턴) 사용 for msg in messages_history[-6:]: if hasattr(msg, 'type'): if msg.type == "human": context_prefix += f"사용자: {msg.content}\n" elif msg.type == "ai": context_prefix += f"AI: {msg.content[:200]}...\n\n" context_prefix += "---\n현재 질문:\n" final_prompt = (context_prefix + template).format( question=(state.original_question or state.user_question), summaries=summaries_text ) updates = {} steps_delta: List[str] = [] try: response = llm.invoke([HumanMessage(content=final_prompt)]) final_answer = response.content.strip() updates["final_answer"] = final_answer # Phase 3: 조건부 캐시 저장 # - clarification: 캐시 금지 (그래프 상 generate_with_history로 빠지지만, 방어적으로 한 번 더 체크) # - new_topic/independent: 캐시 가능(should_cache가 True일 때) should_cache = state.should_cache if state.should_cache is not None else True canonical_question = state.canonical_question qtype = state.question_type or "independent" if should_cache and qtype in ["new_topic", "independent"]: # 캐시할 질문: canonical_question 우선, 없으면 원본 질문 question_to_cache = canonical_question or state.user_question await qdrant_manager.save_to_cache( question=question_to_cache, answer=final_answer ) steps_delta.append(f"✅ 최종 답변 생성 완료 (길이: {len(final_answer)}자)") steps_delta.append(f"💾 캐시 저장 완료 (질문: {question_to_cache[:50]}...)") logger.info("최종 답변 생성 및 캐시 저장 완료: %s", question_to_cache[:50]) else: steps_delta.append(f"✅ 최종 답변 생성 완료 (길이: {len(final_answer)}자)") steps_delta.append("⚠️ 캐시 저장 생략 (독립적이지 않거나 일회성 질문)") logger.info("최종 답변 생성 완료 (캐시 저장 생략)") except Exception as e: logger.error("답변 생성 실패: %s", e, exc_info=True) updates["final_answer"] = "답변 생성에 실패했습니다. 다시 시도해 주세요." steps_delta.append(f"❌ 답변 생성 실패: {str(e)}") updates["intermediate_steps"] = steps_delta # Phase 4: Multi-question handling # NOTE: AgentState는 Pydantic(BaseModel)이므로 dict-style state.get(...) 사용 금지 if state.is_multi_question: answer_text = updates.get("final_answer") if answer_text: # Append to multi_answers (reducer will auto-merge) updates["multi_answers"] = [{ "index": state.sub_question_index, "question": state.sub_question_text or state.user_question, "answer": answer_text }] logger.info("다중 질문 답변 추가: Q%d", state.sub_question_index) return updates @trace_node("return_cached_answer") def return_cached_answer_node(state: AgentState) -> dict: """ 캐시 히트 시 저장된 답변을 반환합니다. 검색 및 생성 과정을 건너뛰고 즉시 답변을 제공합니다. """ logger.info("캐시된 답변 반환") steps_delta = ["💾 캐시된 답변 반환 (검색 생략)"] return { "final_answer": state.cached_result, "intermediate_steps": steps_delta } @trace_node("handle_too_many_questions") def handle_too_many_questions_node(state: AgentState) -> dict: """ 3개 이상 질문 시 안내 메시지를 반환합니다. 대화를 종료하지 않고, 사용자가 다시 질문할 수 있도록 합니다. """ plan = state.plan or {} error_message = plan.get("error_message", "") sub_questions = plan.get("sub_questions", []) logger.info("질문 수 초과: %d개", len(sub_questions)) default_message = """죄송합니다. 한 번에 최대 2개의 질문까지만 처리할 수 있습니다. 다음 중 하나를 선택해서 다시 질문해 주세요: 1. **하나의 주제로 통합해서 질문** 예: "JWT 인증과 CORS 설정을 함께 구현하는 방법" 2. **가장 중요한 2개 질문만 선택** 예: "JWT가 뭐야? 내 코드에 어떻게 적용해?" 3. **질문을 나눠서 순차적으로 질문** 예: 먼저 "JWT가 뭐야?" 질문 → 답변 확인 → 다음 질문 어떻게 도와드릴까요?""" final_message = error_message if error_message else default_message steps_delta = [ f"⚠️ 질문 수 초과: {len(sub_questions)}개", "💬 안내 메시지 제공 (대화 계속 가능)" ] return { "final_answer": final_message, "intermediate_steps": steps_delta } @trace_node("initiate_dynamic_search") def initiate_dynamic_search_node(state: AgentState) -> dict: """ 다중 질문 처리의 진입 노드. IMPORTANT: - LangGraph에서 `List[Send]`는 **노드 반환값**이 아니라, `add_conditional_edges(...)`에 전달하는 **edge 함수 반환값**으로만 사용해야 합니다. - 따라서 이 노드는 dict 업데이트만 반환하고, 실제 fan-out은 별도 edge 함수(`fanout_multi_questions`)가 담당합니다. """ plan = state.plan or {} sub_questions = plan.get("sub_questions", []) logger.info("동적 복제 준비: %d개 질문", len(sub_questions)) return { "intermediate_steps": [f"🔀 다중 질문 fan-out 준비: {len(sub_questions)}개"] } def fanout_multi_questions(state: AgentState): """ 다중 질문을 Send API로 fan-out 합니다. 반환값(List[Send])은 conditional edge 함수에서만 허용됩니다. """ from langgraph.types import Send plan = state.plan or {} sub_questions = plan.get("sub_questions", []) original_question = state.user_question messages = state.messages logger.info("동적 복제: %d개 질문을 각각 전체 그래프로 실행", len(sub_questions)) sends = [] for i, sq in enumerate(sub_questions): # IMPORTANT: 이 프로젝트는 AgentState(BaseModel)를 노드 입력으로 사용하므로, # Send arg도 dict가 아니라 AgentState 인스턴스로 보내야 합니다. child = state.model_copy(deep=True) # 질문 교체 + 다중 질문 메타데이터 child.user_question = sq child.is_multi_question = True child.sub_question_index = i child.sub_question_text = sq child.original_multi_question = original_question # 공통 유지 필드 child.messages = messages child.plan = plan # 기존 그래프가 다시 채울 필드들은 초기화 child.question_type = None child.should_cache = None child.canonical_question = None child.analysis_reasoning = None child.cached_result = None child.detected_intent = None child.search_results = [] child.subtask_results = {} child.refinement_count = 0 child.needs_refinement = False child.original_question = None child.final_answer = None child.multi_answers = [] child.intermediate_steps = [f"🔄 질문 {i+1}/{len(sub_questions)}: {sq[:50]}"] # 다중 질문은 outer graph에서 기존 파이프라인 전체를 병렬로 돌리면 # scalar state 채널(question_type 등)에서 concurrent update 충돌이 납니다. # 따라서 worker 노드 안에서 '단일 질문 그래프'를 별도로 실행한 뒤, # outer state에는 multi_answers(reducer)만 업데이트합니다. sends.append(Send("run_single_question_worker", child)) return sends @trace_node("combine_answers") def combine_answers_node(state: AgentState) -> dict: """ Fan-in: 모든 Send가 완료되면 multi_answers를 조합합니다. Reducer (Annotated[List[dict], add])가 자동으로 모든 parallel Send의 결과를 multi_answers에 모아둡니다. 이 노드는 단순히 모아진 결과를 읽어서 Markdown으로 조합합니다. """ answers = state.multi_answers original_question = state.original_multi_question or state.user_question if not answers: logger.error("다중 답변이 비어있음") return { "final_answer": "답변 생성에 실패했습니다. 다시 시도해 주세요.", "intermediate_steps": ["❌ multi_answers 비어있음"] } # 인덱스 순으로 정렬 answers.sort(key=lambda x: x["index"]) # Markdown 형식으로 조합 combined_parts = [] for ans in answers: section = f"""## {ans['index']+1}. {ans['question']} {ans['answer']}""" combined_parts.append(section) combined = "\n\n---\n\n".join(combined_parts) # 헤더 추가 header = f"# 다중 질문 답변\n\n원본 질문: {original_question}\n\n---\n\n" final_combined = header + combined logger.info("다중 답변 조합 완료: %d개", len(answers)) return { "final_answer": final_combined, "intermediate_steps": [f"✅ {len(answers)}개 답변 조합 완료"] } def _build_search_subgraph_local() -> StateGraph: """nodes.py 내부에서 단일 질문 그래프용 검색 서브그래프를 구성.""" subgraph = StateGraph(AgentState) subgraph.add_node("filter_and_score", filter_and_score_node) subgraph.add_node("summarize_results", summarize_results_node) subgraph.add_edge(START, "filter_and_score") subgraph.add_edge("filter_and_score", "summarize_results") subgraph.add_edge("summarize_results", END) return subgraph.compile() def _get_single_question_agent(): """ 다중 질문 worker에서 사용할 '단일 질문 파이프라인' 그래프를 lazy-compile 해서 캐싱합니다. (outer state 충돌을 피하기 위해, worker 내부에서 별도 그래프를 실행) """ global _SINGLE_QUESTION_AGENT # type: ignore[name-defined] try: return _SINGLE_QUESTION_AGENT # type: ignore[name-defined] except Exception: pass # ---- routing helpers (graph.py 의 단일 질문 흐름과 동일) ---- def _route_after_analysis(s: AgentState) -> Literal["generate_with_history", "check_cache"]: raw_qtype = s.question_type or "independent" legacy_map = {"followup": "clarification", "cache_candidate": "independent", "new_search": "independent"} question_type = legacy_map.get(raw_qtype, raw_qtype) return "generate_with_history" if question_type == "clarification" else "check_cache" def _route_after_cache(s: AgentState) -> Literal["return_cached_answer", "classify_intent"]: return "return_cached_answer" if s.cached_result else "classify_intent" def _route_after_evaluation(s: AgentState) -> Literal["refine_search", "search_subgraph"]: if s.needs_refinement and s.refinement_count < 1: return "refine_search" return "search_subgraph" def _initiate_parallel_search(s: AgentState): return [ Send("search_stackoverflow", s), Send("search_github", s), Send("search_official_docs", s), ] # ---- build ---- g = StateGraph(AgentState) g.add_node("analyze_question", analyze_question_node) g.add_node("generate_with_history", generate_with_history_node) g.add_node("check_cache", check_cache_node) g.add_node("return_cached_answer", return_cached_answer_node) g.add_node("classify_intent", classify_intent_node) g.add_node("search_stackoverflow", search_stackoverflow_node) g.add_node("search_github", search_github_node) g.add_node("search_official_docs", search_official_docs_node) g.add_node("collect_results", collect_results_node) g.add_node("evaluate_results", evaluate_results_node) g.add_node("refine_search", refine_search_node) g.add_node("generate_answer", generate_answer_node) search_subgraph = _build_search_subgraph_local() g.add_node("search_subgraph", search_subgraph) g.add_edge(START, "analyze_question") g.add_conditional_edges( "analyze_question", _route_after_analysis, {"generate_with_history": "generate_with_history", "check_cache": "check_cache"}, ) g.add_edge("generate_with_history", END) g.add_conditional_edges( "check_cache", _route_after_cache, {"return_cached_answer": "return_cached_answer", "classify_intent": "classify_intent"}, ) g.add_edge("return_cached_answer", END) g.add_conditional_edges("classify_intent", _initiate_parallel_search) g.add_edge("search_stackoverflow", "collect_results") g.add_edge("search_github", "collect_results") g.add_edge("search_official_docs", "collect_results") g.add_edge("collect_results", "evaluate_results") g.add_conditional_edges( "evaluate_results", _route_after_evaluation, {"refine_search": "refine_search", "search_subgraph": "search_subgraph"}, ) g.add_edge("refine_search", "classify_intent") g.add_edge("search_subgraph", "generate_answer") g.add_edge("generate_answer", END) _SINGLE_QUESTION_AGENT = g.compile() return _SINGLE_QUESTION_AGENT @trace_node("run_single_question_worker") async def run_single_question_worker_node(state: AgentState) -> dict: """ 다중 질문의 각 서브 질문을 '단일 질문 그래프'로 실행한 뒤, outer graph에는 reducer 채널(multi_answers)만 업데이트합니다. """ agent = _get_single_question_agent() # inner 실행은 multi-question 플래그를 꺼서(=multi_answers append 방지) inner = state.model_copy(deep=True) inner.is_multi_question = False inner.multi_answers = [] result = await agent.ainvoke( { "user_question": inner.user_question, "messages": inner.messages, } ) answer_text = result.get("final_answer") or "" return { "multi_answers": [ { "index": state.sub_question_index, "question": state.sub_question_text or state.user_question, "answer": answer_text, } ], "intermediate_steps": [f"✅ 서브 질문 {state.sub_question_index + 1} 처리 완료"], } @trace_node("generate_with_history") async def generate_with_history_node(state: AgentState) -> dict: """ 대화 히스토리만 사용하여 후속 질문에 답변합니다. Phase 2: Follow-up Handler - 캐시 검색 안 함 - 웹 검색 안 함 - 캐시에 저장 안 함 - messages 히스토리만 활용 """ user_question = state.user_question messages_history = state.messages logger.info("대화 히스토리 기반 답변 생성: %s", user_question[:50]) # 대화 맥락 구성 context_prompt = "이전 대화를 참고하여 후속 질문에 답변하세요.\n\n" if messages_history: context_prompt += "대화 내역:\n" for msg in messages_history[:-1]: # 현재 질문 제외 if hasattr(msg, 'type') and hasattr(msg, 'content'): role = "사용자" if msg.type == "human" else "AI" context_prompt += f"{role}: {msg.content}\n\n" context_prompt += f"현재 질문: {user_question}\n\n" context_prompt += "이전 대화 맥락을 고려하여 자세하고 친절하게 답변하세요." updates = {} steps_delta: List[str] = [] try: response = llm.invoke([HumanMessage(content=context_prompt)]) final_answer = response.content.strip() updates["final_answer"] = final_answer steps_delta.append(f"💬 대화 히스토리 기반 답변 생성 (길이: {len(final_answer)}자)") steps_delta.append("⚠️ 캐시 저장 생략 (보충 요청)") logger.info("대화 히스토리 기반 답변 생성 완료") except Exception as e: logger.error("대화 히스토리 기반 답변 생성 실패: %s", e, exc_info=True) updates["final_answer"] = "답변 생성에 실패했습니다. 다시 시도해 주세요." steps_delta.append(f"❌ 답변 생성 실패: {str(e)}") updates["intermediate_steps"] = steps_delta return updates