""" CodeWeaver LangGraph 워크플로우 구성. LangGraph 6가지 핵심 기능 완벽 구현: ✅ Conditional Edges: 질문 유형, 캐시 여부에 따른 분기 ✅ Send API: 3개 검색 노드 병렬 실행 (fan-out/fan-in) ✅ Subgraph: 검색 결과 처리 파이프라인 ✅ Map-Reduce: Send API로 병렬 검색 → 결과 머지 ✅ Checkpointing: MemorySaver로 대화 상태 저장 ✅ Pydantic Typed State: 타입 안전성 보장 """ import logging from typing import Literal from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import StateGraph, START, END from langgraph.types import Send from src.agent.state import AgentState from src.agent.nodes import ( analyze_question_node, check_cache_node, create_plan_node, classify_intent_node, search_stackoverflow_node, search_github_node, search_official_docs_node, collect_results_node, evaluate_results_node, refine_search_node, filter_and_score_node, summarize_results_node, generate_answer_node, return_cached_answer_node, generate_with_history_node, handle_too_many_questions_node, initiate_dynamic_search_node, combine_answers_node, fanout_multi_questions, run_single_question_worker_node, ) logger = logging.getLogger(__name__) def build_search_subgraph() -> StateGraph: """ 검색 결과 처리 서브그래프를 구성합니다. 흐름: filter_and_score → summarize_results 이 서브그래프는 메인 그래프에서 하나의 노드처럼 동작하며, 검색 결과의 필터링과 요약을 담당합니다. Returns: 컴파일된 서브그래프 """ # 서브그래프 생성 (AgentState 사용) subgraph = StateGraph(AgentState) # 노드 추가 subgraph.add_node("filter_and_score", filter_and_score_node) subgraph.add_node("summarize_results", summarize_results_node) # 서브그래프 내부 흐름 정의 # START → filter_and_score → summarize_results → END subgraph.add_edge(START, "filter_and_score") subgraph.add_edge("filter_and_score", "summarize_results") subgraph.add_edge("summarize_results", END) return subgraph.compile() def route_after_analysis(state: AgentState) -> Literal["generate_with_history", "check_cache"]: """ 질문 분석 결과에 따라 다음 노드를 결정합니다. Phase 2: New Routing Structure Args: state: 현재 에이전트 상태 Returns: - "generate_with_history": 후속 질문 → 대화 히스토리 기반 답변 - "check_cache": 독립 질문 → 캐시 확인 """ # NOTE: 과거 체크포인트/구버전 상태값 호환을 위해 구값도 매핑 처리 raw_qtype = state.question_type or "independent" legacy_map = { "followup": "clarification", "cache_candidate": "independent", "new_search": "independent", } question_type = legacy_map.get(raw_qtype, raw_qtype) if question_type == "clarification": return "generate_with_history" # new_topic / independent 는 모두 캐시 확인(히트면 검색 생략, 미스면 검색) return "check_cache" def route_after_plan(state: AgentState) -> Literal["analyze_question", "initiate_dynamic_search", "handle_too_many_questions"]: """ create_plan 결과에 따라 다음 노드를 결정합니다. Phase 4: Dynamic Parallel Search Args: state: 현재 에이전트 상태 Returns: - "analyze_question": 단일 주제 → 기존 그래프 실행 - "initiate_dynamic_search": 다중 질문 (2개) → Send API로 그래프 2회 실행 - "handle_too_many_questions": 질문 3개 이상 → 에러 메시지 """ plan = state.plan or {} case = plan.get("case", "single_topic") if case == "too_many": return "handle_too_many_questions" elif case == "multiple_questions": return "initiate_dynamic_search" else: return "analyze_question" def route_after_cache(state: AgentState) -> Literal["return_cached_answer", "classify_intent"]: """ 캐시 히트 여부에 따라 다음 노드를 결정합니다. Phase 3 → Phase 4: create_plan 제거됨 (이미 START에서 실행) Args: state: 현재 에이전트 상태 Returns: - "return_cached_answer": 캐시 히트 시 즉시 답변 반환 - "classify_intent": 캐시 미스 시 의도 분류 """ if state.cached_result: return "return_cached_answer" else: return "classify_intent" def route_after_generate(state: AgentState) -> Literal["combine_answers", END]: """ generate_answer 후 다음 노드를 결정합니다. Phase 4: Dynamic Parallel Search Args: state: 현재 에이전트 상태 Returns: - "combine_answers": 다중 질문 → 답변 통합 - END: 단일 질문 → 종료 """ if state.is_multi_question: return "combine_answers" return END def route_after_evaluation(state: AgentState) -> Literal["refine_search", "search_subgraph"]: """ 검색 결과 평가 후 다음 노드를 결정합니다. Phase 3: Open Deep Research 패턴 - 쿼리 개선 루프 Args: state: 현재 에이전트 상태 Returns: - "refine_search": 결과 부족 & 개선 횟수 0회 → 쿼리 개선 - "search_subgraph": 결과 충분 or 개선 횟수 1회 → 필터링 진행 """ needs_refinement = state.needs_refinement refinement_count = state.refinement_count # 안전장치: 최대 1회만 개선 if needs_refinement and refinement_count < 1: return "refine_search" else: return "search_subgraph" def initiate_parallel_search(state: AgentState): """ Send API를 사용하여 3개의 검색 노드를 병렬로 실행합니다. LangGraph Send API (Map-Reduce 패턴): - 각 검색 노드에 동일한 state를 전송 - 모든 노드가 병렬로 실행됨 - 결과는 자동으로 머지됨 Args: state: 현재 에이전트 상태 Returns: Send 객체 리스트 (fan-out) """ # Send API를 사용한 fan-out # 3개의 검색 노드가 동시에 실행됨 return [ Send("search_stackoverflow", state), Send("search_github", state), Send("search_official_docs", state), ] def build_agent_graph() -> StateGraph: """ CodeWeaver 에이전트의 메인 그래프를 구성합니다. Phase 4: Dynamic Parallel Search for Multiple Questions 전체 흐름: 1. START → create_plan (질문 유형 및 개수 판단) 2. 질문 유형에 따른 분기: - single_topic: analyze_question → 기존 그래프 - multiple_questions: initiate_dynamic_search → Send API (각 질문마다 기존 그래프 독립 실행) - too_many: handle_too_many_questions → END 3. analyze_question → 질문 분석 - clarification: generate_with_history → END - new_topic/independent: check_cache 4. 캐시 확인: - 히트: return_cached_answer → END - 미스: classify_intent 5. Send API (병렬 검색 fan-out): - classify_intent → 3개 검색 노드 병렬 실행 6. collect_results (fan-in) → evaluate_results 7. 검색 결과 평가: - 부족 & refinement_count < 1: refine_search → classify_intent (루프) - 충분 or refinement_count >= 1: search_subgraph 8. search_subgraph (filter → summarize) 9. search_subgraph → generate_answer 10. generate_answer 후 분기: - is_multi_question: combine_answers → END - 단일 질문: END 핵심 개선사항 (Phase 4): - ✅ create_plan을 START로 이동 (질문 개수 먼저 감지) - ✅ Send API로 기존 그래프 재사용 (코드 중복 없음) - ✅ 질문 3개 이상 시 친절한 에러 메시지 - ✅ Reducer 패턴으로 자동 fan-in Returns: 구성된 StateGraph (컴파일 전) """ # 메인 그래프 생성 graph = StateGraph(AgentState) # Phase 4: 계획 수립 (START 직후) graph.add_node("create_plan", create_plan_node) graph.add_node("handle_too_many_questions", handle_too_many_questions_node) graph.add_node("initiate_dynamic_search", initiate_dynamic_search_node) # Phase 2: 질문 분석 & 대화 히스토리 처리 graph.add_node("analyze_question", analyze_question_node) graph.add_node("generate_with_history", generate_with_history_node) # 캐시 관련 graph.add_node("check_cache", check_cache_node) graph.add_node("return_cached_answer", return_cached_answer_node) # 의도 분류 graph.add_node("classify_intent", classify_intent_node) # Send API를 위한 병렬 검색 노드 graph.add_node("search_stackoverflow", search_stackoverflow_node) graph.add_node("search_github", search_github_node) graph.add_node("search_official_docs", search_official_docs_node) # Phase 3: 결과 수집 및 평가 graph.add_node("collect_results", collect_results_node) graph.add_node("evaluate_results", evaluate_results_node) graph.add_node("refine_search", refine_search_node) # 최종 답변 생성 graph.add_node("generate_answer", generate_answer_node) # Phase 4: 다중 질문 답변 통합 graph.add_node("combine_answers", combine_answers_node) graph.add_node("run_single_question_worker", run_single_question_worker_node) # 서브그래프 (필터링 & 요약) search_subgraph = build_search_subgraph() graph.add_node("search_subgraph", search_subgraph) # ===== 엣지 구성 ===== # 1. START → create_plan (Phase 4: 진입점 변경) graph.add_edge(START, "create_plan") # 2. create_plan → 분기 (Phase 4: 질문 유형별 분기) graph.add_conditional_edges( "create_plan", route_after_plan, { "analyze_question": "analyze_question", "initiate_dynamic_search": "initiate_dynamic_search", "handle_too_many_questions": "handle_too_many_questions", } ) # 3. handle_too_many_questions → END graph.add_edge("handle_too_many_questions", END) # 4. initiate_dynamic_search는 Send 리턴 (각 Send가 analyze_question으로) # 실제 fan-out은 conditional edge 함수에서 수행해야 함 graph.add_conditional_edges( "initiate_dynamic_search", fanout_multi_questions, ) # multi-question worker들이 끝나면 reducer(multi_answers)에 모인 결과를 합칩니다. # Fan-in: 두 worker가 모두 이 edge로 들어오면 combine_answers는 1회 실행됩니다. graph.add_edge("run_single_question_worker", "combine_answers") # 5. 질문 분석 결과에 따른 분기 graph.add_conditional_edges( "analyze_question", route_after_analysis, { "generate_with_history": "generate_with_history", "check_cache": "check_cache", } ) # 6. 대화 히스토리 기반 답변 → END graph.add_edge("generate_with_history", END) # 7. 캐시 확인 결과에 따른 분기 (Phase 4: create_plan 제거됨) graph.add_conditional_edges( "check_cache", route_after_cache, { "return_cached_answer": "return_cached_answer", "classify_intent": "classify_intent", } ) # 8. 캐시 히트 시 즉시 종료 graph.add_edge("return_cached_answer", END) # 9. Send API를 사용한 병렬 검색 (fan-out) graph.add_conditional_edges( "classify_intent", initiate_parallel_search, ) # 10. 모든 검색 노드 → collect_results (fan-in) graph.add_edge("search_stackoverflow", "collect_results") graph.add_edge("search_github", "collect_results") graph.add_edge("search_official_docs", "collect_results") # 11. collect_results → evaluate_results graph.add_edge("collect_results", "evaluate_results") # 12. 검색 결과 평가에 따른 분기 (Phase 3: refine_search 추가) graph.add_conditional_edges( "evaluate_results", route_after_evaluation, { "refine_search": "refine_search", "search_subgraph": "search_subgraph", } ) # 13. 쿼리 개선 → 의도 분류 (루프) graph.add_edge("refine_search", "classify_intent") # 14. 서브그래프 → 최종 답변 생성 graph.add_edge("search_subgraph", "generate_answer") # 15. 최종 답변 후 분기 (Phase 4: 다중 질문 처리) graph.add_conditional_edges( "generate_answer", route_after_generate, { "combine_answers": "combine_answers", END: END } ) # 16. combine_answers → 종료 graph.add_edge("combine_answers", END) return graph def create_agent(enable_checkpointing: bool = True): """ CodeWeaver 에이전트를 생성하고 컴파일합니다. Args: enable_checkpointing: 체크포인트 활성화 여부 - True: MemorySaver 사용 (개발/테스트용) - False: 체크포인트 없이 실행 (상태 저장 불가) Returns: 컴파일된 실행 가능한 그래프 Note: 프로덕션 환경에서는 MemorySaver 대신 PostgresSaver, SqliteSaver 등 영구 저장소 사용 권장 """ graph = build_agent_graph() if enable_checkpointing: # 메모리 기반 체크포인터 (프로덕션에서는 DB 사용 권장) memory = MemorySaver() return graph.compile(checkpointer=memory) else: return graph.compile() # 에이전트 인스턴스 생성 (모듈 임포트 시 자동 생성) agent = create_agent(enable_checkpointing=True)