# Dynamic Parallel Search for Multiple Independent Questions ## 개요 CodeWeaver Phase 4는 **다중 독립 질문**을 Send API로 동적 병렬 처리하여, 각 질문마다 독립적인 검색 파이프라인을 실행합니다. ### 핵심 철학 > "기존 그래프를 100% 재사용하되, 질문 개수만큼 복제해서 병렬 실행한다" - **기존 코드 재사용률**: ~95% - **새로운 노드**: 5개 추가 - **새로운 edge 함수**: 1개 추가 (fanout_multi_questions) - **수정된 노드**: 2개 수정 (create_plan, generate_answer) ## 주요 기능 ### 1. 자동 질문 유형 감지 **create_plan_node**가 질문을 분석하여 3가지 케이스로 분류: #### Case 1: single_topic - **정의**: 하나의 주제를 다각도로 묻는 경우 - **예시**: "Spring Security JWT 인증 구현 방법" - **서브질문**: ["개념", "구현", "예제"] (답변 섹션 구조용) - **실행**: 기존 그래프 1회 (검색은 원본 질문으로) #### Case 2: multiple_questions - **정의**: 서로 무관한 독립 질문 (최대 2개) - **예시**: "JWT가 뭐야? CORS는?" - **서브질문**: ["JWT가 뭐야?", "CORS는?"] (각각 별도 검색) - **실행**: Send API로 기존 그래프 2회 병렬 실행 #### Case 3: too_many - **정의**: 질문 3개 이상 - **예시**: "JWT? CORS? Docker?" - **실행**: 친절한 에러 메시지 표시, 대화 계속 가능 - **하드 가드**: LLM 분류와 무관하게 물음표 개수(3개 이상) 또는 질문 후보 개수(3개 이상)로 결정론적 차단 ### 2. 질문 개수 제한 비용 및 품질 관리를 위해 **최대 2개 질문**으로 제한: ``` 입력: "JWT? CORS? Docker? Redis?" 처리: too_many 케이스 → 에러 메시지 안내: "하나의 주제로 통합" 또는 "2개만 선택" 권장 ``` ### 3. Send API 동적 복제 **중요**: LangGraph에서 `List[Send]`는 노드 반환값이 아니라 **conditional edge 함수 반환값**으로만 사용됩니다. ```python # initiate_dynamic_search_node: state 준비만 (dict 반환) def initiate_dynamic_search_node(state: AgentState) -> dict: return {"intermediate_steps": [...]} # Send 반환 안 함! # fanout_multi_questions: conditional edge 함수 (List[Send] 반환) def fanout_multi_questions(state: AgentState) -> List[Send]: sends = [] for i, question in enumerate(["JWT가 뭐야?", "CORS는?"]): child_state = state.model_copy(deep=True) child_state.user_question = question child_state.is_multi_question = True # ... 메타데이터 설정 ... sends.append(Send("run_single_question_worker", child_state)) return sends # run_single_question_worker: 내부 서브그래프 실행 # 각 Send는 독립적으로 내부 그래프를 실행: # analyze → cache → classify → search(×3) → collect → eval → subgraph → generate # → multi_answers에 결과 추가 ``` ### 4. Reducer 자동 Fan-in (Reset 기능 포함) ```python # State 정의 (커스텀 reducer 사용) multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = [] # merge_multi_answers reducer: # - 기본 동작: old + new (병렬 worker에서 답변을 동시에 append) # - 리셋 동작: new의 첫 원소가 {"__token__": "__RESET_MULTI_ANS__"}이면 # old를 버리고 new[1:]로 교체 (이전 턴 누적 방지) # run_single_question_worker 1이 리턴: {"multi_answers": [{"index": 0, "question": "JWT가 뭐야?", "answer": "..."}]} # run_single_question_worker 2가 리턴: {"multi_answers": [{"index": 1, "question": "CORS는?", "answer": "..."}]} # LangGraph Reducer가 자동 병합: state.multi_answers = [ {"index": 0, ...}, {"index": 1, ...} ] # combine_answers_node가 이를 통합 Markdown으로 변환 ``` ## 그래프 흐름 ```mermaid graph TD START[START] --> plan[create_plan] plan -->|single_topic| analyze[analyze_question] plan -->|multiple_questions 2개| dynamic[initiate_dynamic_search] plan -->|too_many 3+| tooMany[handle_too_many_questions] tooMany --> END analyze --> cache[check_cache] cache -->|hit| returnCache[return_cached_answer] cache -->|miss| classify[classify_intent] returnCache --> END classify --> searchSO[search_stackoverflow] classify --> searchGH[search_github] classify --> searchDocs[search_official_docs] searchSO --> collect[collect_results] searchGH --> collect searchDocs --> collect collect --> eval[evaluate_results] eval -->|needs_refinement| refine[refine_search] eval -->|sufficient| filterNode[filter_and_score] refine --> classify filterNode --> summarize[summarize_results] summarize --> generate[generate_answer] generate -->|is_multi_question| combine[combine_answers] generate -->|single_topic| END combine --> END dynamic --> fanout[fanout_multi_questions
conditional edge] fanout -.Send Q1.-> worker1[run_single_question_worker
내부 서브그래프] fanout -.Send Q2.-> worker2[run_single_question_worker
내부 서브그래프] worker1 --> combine worker2 --> combine ``` ### 흐름 설명 #### Single Topic (기존 동작 유지) ``` START → create_plan (case: single_topic) → analyze → cache → classify → search(×3) → collect → eval → subgraph → generate → END ``` #### Multiple Questions (신규) ``` START → create_plan (case: multiple_questions) → initiate_dynamic_search (state 준비) → fanout_multi_questions (conditional edge) ├─ Send("run_single_question_worker", Q1) → [내부 서브그래프 전체 파이프라인] → multi_answers[0] └─ Send("run_single_question_worker", Q2) → [내부 서브그래프 전체 파이프라인] → multi_answers[1] → combine_answers (자동 fan-in) → END ``` #### Too Many (신규) ``` START → create_plan (case: too_many) → handle_too_many_questions → END (사용자는 즉시 다시 질문 가능) ``` ## 구현 상세 ### State 확장 ```python # src/agent/state.py class AgentState(BaseModel): # ... 기존 필드 ... # Phase 4: Dynamic Parallel Search is_multi_question: bool = False sub_question_index: int = 0 sub_question_text: Optional[str] = None original_multi_question: Optional[str] = None multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = [] ``` ### 새로운 노드 (5개) #### 1. create_plan_node (수정) - **위치**: `src/agent/nodes.py` 라인 206 - **역할**: 질문 유형 및 개수 판단 - **변경**: - `case` 필드 추가 (single_topic/multiple_questions/too_many) - **하드 가드 추가**: `_hard_guard_too_many` 함수로 3개 이상 질문 결정론적 차단 - 물음표 개수(3개 이상) 또는 질문 후보 개수(3개 이상) 감지 - LLM 분류와 무관하게 `too_many`로 강제 #### 2. handle_too_many_questions_node (신규) - **위치**: `src/agent/nodes.py` 라인 1068 - **역할**: 3개 이상 질문 시 안내 메시지 - **특징**: 대화 종료하지 않음 (즉시 재질문 가능) #### 3. initiate_dynamic_search_node (신규) - **위치**: `src/agent/nodes.py` 라인 1092 - **역할**: 다중 질문 처리 진입점, state 준비 - **핵심**: dict만 반환 (Send는 반환하지 않음) #### 4. fanout_multi_questions (신규 - Edge 함수) - **위치**: `src/agent/nodes.py` 라인 1110 - **역할**: conditional edge 함수로 `List[Send]` 반환 - **핵심**: 각 서브 질문을 `run_single_question_worker`로 Send #### 5. run_single_question_worker_node (신규) - **위치**: `src/agent/nodes.py` 라인 1306 - **역할**: 내부 서브그래프를 실행하여 state 충돌 방지 - **핵심**: - 독립된 단일 질문 그래프를 내부에서 실행 - outer graph의 scalar state 채널 충돌 방지 - 결과를 `multi_answers` reducer에만 추가 #### 6. combine_answers_node (신규) - **위치**: `src/agent/nodes.py` 라인 1168 - **역할**: multi_answers를 통합 Markdown 포맷으로 변환 - **특징**: 자동 fan-in (모든 Send 완료 대기) ### 수정된 노드 (1개) #### generate_answer_node (5줄 추가) - **위치**: `src/agent/nodes.py` 라인 726 - **추가 내용**: ```python # 기존 로직 마지막에 추가 if state.is_multi_question: updates["multi_answers"] = [{ "index": state.sub_question_index, "question": state.sub_question_text, "answer": final_answer }] ``` ### 그래프 재구성 ```python # src/agent/graph.py # 1. START 진입점 변경 graph.add_edge(START, "create_plan") # 기존: analyze_question # 2. create_plan 후 분기 추가 graph.add_conditional_edges( "create_plan", route_after_plan, { "analyze_question": "analyze_question", "initiate_dynamic_search": "initiate_dynamic_search", "handle_too_many_questions": "handle_too_many_questions" } ) # 3. initiate_dynamic_search 후 fan-out graph.add_conditional_edges( "initiate_dynamic_search", fanout_multi_questions, # List[Send] 반환 ) # 4. run_single_question_worker 후 fan-in graph.add_edge("run_single_question_worker", "combine_answers") # 5. generate_answer 후 분기 추가 graph.add_conditional_edges( "generate_answer", route_after_generate, { "combine_answers": "combine_answers", END: END } ) ``` ## 사용 예시 ### 예시 1: 단일 주제 (기존 동작) ```python from CodeWeaver.src.agent.graph import create_agent from langchain_core.messages import HumanMessage agent = create_agent() result = await agent.ainvoke({ "user_question": "React hooks 완벽 가이드", "messages": [HumanMessage(content="React hooks 완벽 가이드")] }) # 결과 # plan.case: "single_topic" # plan.sub_questions: ["hooks란", "주요 hooks", "실무 패턴"] # 흐름: 기존 그래프 1회 실행 # 출력: 일반 답변 형식 ``` ### 예시 2: 다중 독립 질문 (신규) ```python result = await agent.ainvoke({ "user_question": "JWT가 뭐야? CORS 에러는 어떻게 해결해?", "messages": [HumanMessage(content="JWT가 뭐야? CORS 에러는 어떻게 해결해?")] }) # 결과 # plan.case: "multiple_questions" # plan.sub_questions: ["JWT가 뭐야?", "CORS 에러는 어떻게 해결해?"] # 흐름: Send API로 그래프 2회 병렬 실행 # 출력: ``` **출력 예시**: ```markdown # 다중 질문 답변 원본 질문: JWT가 뭐야? CORS 에러는 어떻게 해결해? --- ## 1. JWT가 뭐야? JWT(JSON Web Token)는 인증 정보를 안전하게 전송하기 위한... [상세 답변...] --- ## 2. CORS 에러는 어떻게 해결해? CORS(Cross-Origin Resource Sharing) 에러는... [상세 답변...] ``` ### 예시 3: 질문 3개 이상 ```python result = await agent.ainvoke({ "user_question": "JWT? CORS? Docker?", "messages": [HumanMessage(content="JWT? CORS? Docker?")] }) # 결과 # plan.case: "too_many" # 출력: ``` **출력 예시**: ``` 죄송합니다. 한 번에 최대 2개의 질문까지만 처리할 수 있습니다. 다음 중 하나를 선택해서 다시 질문해 주세요: 1. **하나의 주제로 통합해서 질문** 예: "JWT 인증과 CORS 설정을 함께 구현하는 방법" 2. **가장 중요한 2개 질문만 선택** 예: "JWT가 뭐야? 내 코드에 어떻게 적용해?" 3. **질문을 나눠서 순차적으로 질문** 예: 먼저 "JWT가 뭐야?" 질문 → 답변 확인 → 다음 질문 어떻게 도와드릴까요? ``` ## 테스트 테스트 파일은 프로젝트 루트에 있습니다. (삭제됨 - 필요시 재생성) ### 테스트 시나리오 1. ✅ **단일 주제**: "Spring Security JWT 인증 구현 방법" - 기존 그래프 1회 실행 - multi_answers 비어있음 - 일반 답변 형식 2. ✅ **다중 질문 2개**: "JWT가 뭐야? CORS는?" - Send API로 그래프 2회 병렬 실행 - multi_answers에 2개 항목 - 섹션 구분된 통합 답변 3. ✅ **질문 3개 이상**: "JWT? CORS? Docker?" - handle_too_many_questions로 분기 - 친절한 에러 메시지 - 대화 계속 가능 4. ✅ **엣지 케이스**: "JWT? CORS? Docker? Redis?" - **하드 가드로 무조건 too_many 차단** (물음표 4개 감지) - LLM 분류와 무관하게 차단 보장 ## 성능 고려사항 ### 병렬 실행 - **단일 주제**: 3개 검색 노드 병렬 (기존) - **다중 질문 (2개)**: 2×3=6개 검색 노드 병렬 - LangGraph Send API가 자동 병렬화 관리 ### 비용 관리 - 질문 개수 제한: 최대 2개 - 검색 결과 개수: 소스당 3-5개 - 다중 질문 시 의도 분류 생략 (기본값 "learning" 사용) ### 캐싱 - **단일 주제**: 전체 답변 캐시 ✅ - **다중 질문**: 각 서브 질문 답변 개별 캐시 ✅ - Q1 답변 → Q1 질문으로 캐시 - Q2 답변 → Q2 질문으로 캐시 - 다음번 동일 질문 시 개별 캐시 히트 가능 ## 기술적 핵심 ### 1. Send API 패턴 (Conditional Edge 함수 사용) ```python # ❌ 잘못된 방법: 노드에서 Send 반환 def initiate_dynamic_search_node(state): return [Send(...), Send(...)] # 에러 발생! # ✅ 올바른 방법: conditional edge 함수에서 Send 반환 def fanout_multi_questions(state: AgentState) -> List[Send]: sends = [] for i, question in enumerate(sub_questions): child_state = state.model_copy(deep=True) child_state.user_question = question sends.append(Send("run_single_question_worker", child_state)) return sends # 그래프 설정 graph.add_conditional_edges( "initiate_dynamic_search", fanout_multi_questions, # List[Send] 반환 ) # LangGraph가 자동으로: # 1. 두 Send를 병렬 실행 # 2. 각 Send의 모든 노드 실행 대기 # 3. 다음 공통 노드로 이동 (combine_answers) ``` ### 2. Reducer 자동 병합 (Reset 기능 포함) ```python # State 정의 (커스텀 reducer) multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = [] # merge_multi_answers reducer: def merge_multi_answers(old: List[Dict], new: List[Dict]) -> List[Dict]: if not new: return old # Reset 토큰 체크 if new[0].get("__token__") == "__RESET_MULTI_ANS__": return new[1:] # 이전 턴 누적 방지 return old + new # 기본 병합 # create_plan_node에서 매 실행 시작 시 리셋: updates["multi_answers"] = [{"__token__": "__RESET_MULTI_ANS__"}] # 병렬 실행 시: # [Q1_answer] + [Q2_answer] = [Q1_answer, Q2_answer] ``` ### 3. Fan-in 보장 ```python # 모든 검색 노드가 collect_results로 연결 graph.add_edge("search_stackoverflow", "collect_results") graph.add_edge("search_github", "collect_results") graph.add_edge("search_official_docs", "collect_results") # LangGraph가 자동으로: # 1. 3개 검색 모두 완료 대기 # 2. collect_results 1회만 실행 ``` ## 코드 변경 요약 ### 파일별 변경사항 | 파일 | 추가 | 수정 | 삭제 | |------|------|------|------| | `state.py` | 5 필드, 1 reducer 함수 | - | - | | `nodes.py` | 5 노드 + 1 edge 함수 (~300줄) | 2 노드 (create_plan 하드 가드 추가, generate_answer 5줄) | - | | `graph.py` | 3 routing 함수, 엣지 재구성 | build_agent_graph | - | **총 변경량**: ~350줄 추가, ~100줄 수정 ### 재사용률 - **기존 노드 재사용**: 12/16 (75%) - **기존 로직 재사용**: ~95% (검색, 평가, 필터링, 요약 등) - **새로운 개념**: Send API + Reducer만 ## LangGraph 공식 가이드라인 준수 ### ✅ Graph API - StateGraph 사용 - Pydantic BaseModel state - START/END 명시 ### ✅ Workflows + Agents - Send API로 동적 병렬화 - Conditional edges로 라우팅 - Fan-out/Fan-in 패턴 ### ✅ Thinking in LangGraph - 노드는 순수 함수 (한 가지 일만) - State는 불변 업데이트 - Reducer로 병합 자동화 ## 한계 및 향후 개선 ### 현재 한계 1. **질문 개수 제한**: 최대 2개 - 비용 vs 품질 트레이드오프 - 향후 3-4개로 확장 가능 2. **캐싱 전략**: 통합 답변은 캐시 안 됨 - 각 서브 질문은 개별 캐시됨 - 동일한 다중 질문 재입력 시 개별 캐시 히트 3. **Refinement 루프**: 다중 질문에서도 각각 독립적으로 작동 - 한 질문 refine 시 다른 질문에 영향 없음 ### 향후 개선 방향 1. **더 많은 질문 지원**: 3-4개까지 확장 2. **혼합 질문 감지**: "JWT가 뭐야? 그걸 Spring에 적용하려면?" (순차 의존) 3. **스트리밍 답변**: 각 서브 질문 완료 즉시 스트리밍 4. **우선순위**: 중요도에 따라 질문 순서 조정 ## 참고 자료 - [LangGraph Graph API](https://docs.langchain.com/oss/python/langgraph/graph-api) - [LangGraph Workflows + Agents](https://docs.langchain.com/oss/python/langgraph/workflows-agents) - [LangGraph Thinking Guide](https://docs.langchain.com/oss/python/langgraph/thinking-in-langgraph) - CodeWeaver Phase 3: Open Deep Research ## 문의 구현 관련 질문이나 버그 리포트는 이슈로 등록해주세요.