# Dynamic Parallel Search for Multiple Independent Questions
## 개요
CodeWeaver Phase 4는 **다중 독립 질문**을 Send API로 동적 병렬 처리하여, 각 질문마다 독립적인 검색 파이프라인을 실행합니다.
### 핵심 철학
> "기존 그래프를 100% 재사용하되, 질문 개수만큼 복제해서 병렬 실행한다"
- **기존 코드 재사용률**: ~95%
- **새로운 노드**: 5개 추가
- **새로운 edge 함수**: 1개 추가 (fanout_multi_questions)
- **수정된 노드**: 2개 수정 (create_plan, generate_answer)
## 주요 기능
### 1. 자동 질문 유형 감지
**create_plan_node**가 질문을 분석하여 3가지 케이스로 분류:
#### Case 1: single_topic
- **정의**: 하나의 주제를 다각도로 묻는 경우
- **예시**: "Spring Security JWT 인증 구현 방법"
- **서브질문**: ["개념", "구현", "예제"] (답변 섹션 구조용)
- **실행**: 기존 그래프 1회 (검색은 원본 질문으로)
#### Case 2: multiple_questions
- **정의**: 서로 무관한 독립 질문 (최대 2개)
- **예시**: "JWT가 뭐야? CORS는?"
- **서브질문**: ["JWT가 뭐야?", "CORS는?"] (각각 별도 검색)
- **실행**: Send API로 기존 그래프 2회 병렬 실행
#### Case 3: too_many
- **정의**: 질문 3개 이상
- **예시**: "JWT? CORS? Docker?"
- **실행**: 친절한 에러 메시지 표시, 대화 계속 가능
- **하드 가드**: LLM 분류와 무관하게 물음표 개수(3개 이상) 또는 질문 후보 개수(3개 이상)로 결정론적 차단
### 2. 질문 개수 제한
비용 및 품질 관리를 위해 **최대 2개 질문**으로 제한:
```
입력: "JWT? CORS? Docker? Redis?"
처리: too_many 케이스 → 에러 메시지
안내: "하나의 주제로 통합" 또는 "2개만 선택" 권장
```
### 3. Send API 동적 복제
**중요**: LangGraph에서 `List[Send]`는 노드 반환값이 아니라 **conditional edge 함수 반환값**으로만 사용됩니다.
```python
# initiate_dynamic_search_node: state 준비만 (dict 반환)
def initiate_dynamic_search_node(state: AgentState) -> dict:
return {"intermediate_steps": [...]} # Send 반환 안 함!
# fanout_multi_questions: conditional edge 함수 (List[Send] 반환)
def fanout_multi_questions(state: AgentState) -> List[Send]:
sends = []
for i, question in enumerate(["JWT가 뭐야?", "CORS는?"]):
child_state = state.model_copy(deep=True)
child_state.user_question = question
child_state.is_multi_question = True
# ... 메타데이터 설정 ...
sends.append(Send("run_single_question_worker", child_state))
return sends
# run_single_question_worker: 내부 서브그래프 실행
# 각 Send는 독립적으로 내부 그래프를 실행:
# analyze → cache → classify → search(×3) → collect → eval → subgraph → generate
# → multi_answers에 결과 추가
```
### 4. Reducer 자동 Fan-in (Reset 기능 포함)
```python
# State 정의 (커스텀 reducer 사용)
multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = []
# merge_multi_answers reducer:
# - 기본 동작: old + new (병렬 worker에서 답변을 동시에 append)
# - 리셋 동작: new의 첫 원소가 {"__token__": "__RESET_MULTI_ANS__"}이면
# old를 버리고 new[1:]로 교체 (이전 턴 누적 방지)
# run_single_question_worker 1이 리턴:
{"multi_answers": [{"index": 0, "question": "JWT가 뭐야?", "answer": "..."}]}
# run_single_question_worker 2가 리턴:
{"multi_answers": [{"index": 1, "question": "CORS는?", "answer": "..."}]}
# LangGraph Reducer가 자동 병합:
state.multi_answers = [
{"index": 0, ...},
{"index": 1, ...}
]
# combine_answers_node가 이를 통합 Markdown으로 변환
```
## 그래프 흐름
```mermaid
graph TD
START[START] --> plan[create_plan]
plan -->|single_topic| analyze[analyze_question]
plan -->|multiple_questions 2개| dynamic[initiate_dynamic_search]
plan -->|too_many 3+| tooMany[handle_too_many_questions]
tooMany --> END
analyze --> cache[check_cache]
cache -->|hit| returnCache[return_cached_answer]
cache -->|miss| classify[classify_intent]
returnCache --> END
classify --> searchSO[search_stackoverflow]
classify --> searchGH[search_github]
classify --> searchDocs[search_official_docs]
searchSO --> collect[collect_results]
searchGH --> collect
searchDocs --> collect
collect --> eval[evaluate_results]
eval -->|needs_refinement| refine[refine_search]
eval -->|sufficient| filterNode[filter_and_score]
refine --> classify
filterNode --> summarize[summarize_results]
summarize --> generate[generate_answer]
generate -->|is_multi_question| combine[combine_answers]
generate -->|single_topic| END
combine --> END
dynamic --> fanout[fanout_multi_questions
conditional edge]
fanout -.Send Q1.-> worker1[run_single_question_worker
내부 서브그래프]
fanout -.Send Q2.-> worker2[run_single_question_worker
내부 서브그래프]
worker1 --> combine
worker2 --> combine
```
### 흐름 설명
#### Single Topic (기존 동작 유지)
```
START → create_plan (case: single_topic)
→ analyze → cache → classify → search(×3) → collect → eval → subgraph → generate → END
```
#### Multiple Questions (신규)
```
START → create_plan (case: multiple_questions)
→ initiate_dynamic_search (state 준비)
→ fanout_multi_questions (conditional edge)
├─ Send("run_single_question_worker", Q1) → [내부 서브그래프 전체 파이프라인] → multi_answers[0]
└─ Send("run_single_question_worker", Q2) → [내부 서브그래프 전체 파이프라인] → multi_answers[1]
→ combine_answers (자동 fan-in) → END
```
#### Too Many (신규)
```
START → create_plan (case: too_many)
→ handle_too_many_questions → END
(사용자는 즉시 다시 질문 가능)
```
## 구현 상세
### State 확장
```python
# src/agent/state.py
class AgentState(BaseModel):
# ... 기존 필드 ...
# Phase 4: Dynamic Parallel Search
is_multi_question: bool = False
sub_question_index: int = 0
sub_question_text: Optional[str] = None
original_multi_question: Optional[str] = None
multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = []
```
### 새로운 노드 (5개)
#### 1. create_plan_node (수정)
- **위치**: `src/agent/nodes.py` 라인 206
- **역할**: 질문 유형 및 개수 판단
- **변경**:
- `case` 필드 추가 (single_topic/multiple_questions/too_many)
- **하드 가드 추가**: `_hard_guard_too_many` 함수로 3개 이상 질문 결정론적 차단
- 물음표 개수(3개 이상) 또는 질문 후보 개수(3개 이상) 감지
- LLM 분류와 무관하게 `too_many`로 강제
#### 2. handle_too_many_questions_node (신규)
- **위치**: `src/agent/nodes.py` 라인 1068
- **역할**: 3개 이상 질문 시 안내 메시지
- **특징**: 대화 종료하지 않음 (즉시 재질문 가능)
#### 3. initiate_dynamic_search_node (신규)
- **위치**: `src/agent/nodes.py` 라인 1092
- **역할**: 다중 질문 처리 진입점, state 준비
- **핵심**: dict만 반환 (Send는 반환하지 않음)
#### 4. fanout_multi_questions (신규 - Edge 함수)
- **위치**: `src/agent/nodes.py` 라인 1110
- **역할**: conditional edge 함수로 `List[Send]` 반환
- **핵심**: 각 서브 질문을 `run_single_question_worker`로 Send
#### 5. run_single_question_worker_node (신규)
- **위치**: `src/agent/nodes.py` 라인 1306
- **역할**: 내부 서브그래프를 실행하여 state 충돌 방지
- **핵심**:
- 독립된 단일 질문 그래프를 내부에서 실행
- outer graph의 scalar state 채널 충돌 방지
- 결과를 `multi_answers` reducer에만 추가
#### 6. combine_answers_node (신규)
- **위치**: `src/agent/nodes.py` 라인 1168
- **역할**: multi_answers를 통합 Markdown 포맷으로 변환
- **특징**: 자동 fan-in (모든 Send 완료 대기)
### 수정된 노드 (1개)
#### generate_answer_node (5줄 추가)
- **위치**: `src/agent/nodes.py` 라인 726
- **추가 내용**:
```python
# 기존 로직 마지막에 추가
if state.is_multi_question:
updates["multi_answers"] = [{
"index": state.sub_question_index,
"question": state.sub_question_text,
"answer": final_answer
}]
```
### 그래프 재구성
```python
# src/agent/graph.py
# 1. START 진입점 변경
graph.add_edge(START, "create_plan") # 기존: analyze_question
# 2. create_plan 후 분기 추가
graph.add_conditional_edges(
"create_plan",
route_after_plan,
{
"analyze_question": "analyze_question",
"initiate_dynamic_search": "initiate_dynamic_search",
"handle_too_many_questions": "handle_too_many_questions"
}
)
# 3. initiate_dynamic_search 후 fan-out
graph.add_conditional_edges(
"initiate_dynamic_search",
fanout_multi_questions, # List[Send] 반환
)
# 4. run_single_question_worker 후 fan-in
graph.add_edge("run_single_question_worker", "combine_answers")
# 5. generate_answer 후 분기 추가
graph.add_conditional_edges(
"generate_answer",
route_after_generate,
{
"combine_answers": "combine_answers",
END: END
}
)
```
## 사용 예시
### 예시 1: 단일 주제 (기존 동작)
```python
from CodeWeaver.src.agent.graph import create_agent
from langchain_core.messages import HumanMessage
agent = create_agent()
result = await agent.ainvoke({
"user_question": "React hooks 완벽 가이드",
"messages": [HumanMessage(content="React hooks 완벽 가이드")]
})
# 결과
# plan.case: "single_topic"
# plan.sub_questions: ["hooks란", "주요 hooks", "실무 패턴"]
# 흐름: 기존 그래프 1회 실행
# 출력: 일반 답변 형식
```
### 예시 2: 다중 독립 질문 (신규)
```python
result = await agent.ainvoke({
"user_question": "JWT가 뭐야? CORS 에러는 어떻게 해결해?",
"messages": [HumanMessage(content="JWT가 뭐야? CORS 에러는 어떻게 해결해?")]
})
# 결과
# plan.case: "multiple_questions"
# plan.sub_questions: ["JWT가 뭐야?", "CORS 에러는 어떻게 해결해?"]
# 흐름: Send API로 그래프 2회 병렬 실행
# 출력:
```
**출력 예시**:
```markdown
# 다중 질문 답변
원본 질문: JWT가 뭐야? CORS 에러는 어떻게 해결해?
---
## 1. JWT가 뭐야?
JWT(JSON Web Token)는 인증 정보를 안전하게 전송하기 위한...
[상세 답변...]
---
## 2. CORS 에러는 어떻게 해결해?
CORS(Cross-Origin Resource Sharing) 에러는...
[상세 답변...]
```
### 예시 3: 질문 3개 이상
```python
result = await agent.ainvoke({
"user_question": "JWT? CORS? Docker?",
"messages": [HumanMessage(content="JWT? CORS? Docker?")]
})
# 결과
# plan.case: "too_many"
# 출력:
```
**출력 예시**:
```
죄송합니다. 한 번에 최대 2개의 질문까지만 처리할 수 있습니다.
다음 중 하나를 선택해서 다시 질문해 주세요:
1. **하나의 주제로 통합해서 질문**
예: "JWT 인증과 CORS 설정을 함께 구현하는 방법"
2. **가장 중요한 2개 질문만 선택**
예: "JWT가 뭐야? 내 코드에 어떻게 적용해?"
3. **질문을 나눠서 순차적으로 질문**
예: 먼저 "JWT가 뭐야?" 질문 → 답변 확인 → 다음 질문
어떻게 도와드릴까요?
```
## 테스트
테스트 파일은 프로젝트 루트에 있습니다. (삭제됨 - 필요시 재생성)
### 테스트 시나리오
1. ✅ **단일 주제**: "Spring Security JWT 인증 구현 방법"
- 기존 그래프 1회 실행
- multi_answers 비어있음
- 일반 답변 형식
2. ✅ **다중 질문 2개**: "JWT가 뭐야? CORS는?"
- Send API로 그래프 2회 병렬 실행
- multi_answers에 2개 항목
- 섹션 구분된 통합 답변
3. ✅ **질문 3개 이상**: "JWT? CORS? Docker?"
- handle_too_many_questions로 분기
- 친절한 에러 메시지
- 대화 계속 가능
4. ✅ **엣지 케이스**: "JWT? CORS? Docker? Redis?"
- **하드 가드로 무조건 too_many 차단** (물음표 4개 감지)
- LLM 분류와 무관하게 차단 보장
## 성능 고려사항
### 병렬 실행
- **단일 주제**: 3개 검색 노드 병렬 (기존)
- **다중 질문 (2개)**: 2×3=6개 검색 노드 병렬
- LangGraph Send API가 자동 병렬화 관리
### 비용 관리
- 질문 개수 제한: 최대 2개
- 검색 결과 개수: 소스당 3-5개
- 다중 질문 시 의도 분류 생략 (기본값 "learning" 사용)
### 캐싱
- **단일 주제**: 전체 답변 캐시 ✅
- **다중 질문**: 각 서브 질문 답변 개별 캐시 ✅
- Q1 답변 → Q1 질문으로 캐시
- Q2 답변 → Q2 질문으로 캐시
- 다음번 동일 질문 시 개별 캐시 히트 가능
## 기술적 핵심
### 1. Send API 패턴 (Conditional Edge 함수 사용)
```python
# ❌ 잘못된 방법: 노드에서 Send 반환
def initiate_dynamic_search_node(state):
return [Send(...), Send(...)] # 에러 발생!
# ✅ 올바른 방법: conditional edge 함수에서 Send 반환
def fanout_multi_questions(state: AgentState) -> List[Send]:
sends = []
for i, question in enumerate(sub_questions):
child_state = state.model_copy(deep=True)
child_state.user_question = question
sends.append(Send("run_single_question_worker", child_state))
return sends
# 그래프 설정
graph.add_conditional_edges(
"initiate_dynamic_search",
fanout_multi_questions, # List[Send] 반환
)
# LangGraph가 자동으로:
# 1. 두 Send를 병렬 실행
# 2. 각 Send의 모든 노드 실행 대기
# 3. 다음 공통 노드로 이동 (combine_answers)
```
### 2. Reducer 자동 병합 (Reset 기능 포함)
```python
# State 정의 (커스텀 reducer)
multi_answers: Annotated[List[Dict[str, Any]], merge_multi_answers] = []
# merge_multi_answers reducer:
def merge_multi_answers(old: List[Dict], new: List[Dict]) -> List[Dict]:
if not new:
return old
# Reset 토큰 체크
if new[0].get("__token__") == "__RESET_MULTI_ANS__":
return new[1:] # 이전 턴 누적 방지
return old + new # 기본 병합
# create_plan_node에서 매 실행 시작 시 리셋:
updates["multi_answers"] = [{"__token__": "__RESET_MULTI_ANS__"}]
# 병렬 실행 시:
# [Q1_answer] + [Q2_answer] = [Q1_answer, Q2_answer]
```
### 3. Fan-in 보장
```python
# 모든 검색 노드가 collect_results로 연결
graph.add_edge("search_stackoverflow", "collect_results")
graph.add_edge("search_github", "collect_results")
graph.add_edge("search_official_docs", "collect_results")
# LangGraph가 자동으로:
# 1. 3개 검색 모두 완료 대기
# 2. collect_results 1회만 실행
```
## 코드 변경 요약
### 파일별 변경사항
| 파일 | 추가 | 수정 | 삭제 |
|------|------|------|------|
| `state.py` | 5 필드, 1 reducer 함수 | - | - |
| `nodes.py` | 5 노드 + 1 edge 함수 (~300줄) | 2 노드 (create_plan 하드 가드 추가, generate_answer 5줄) | - |
| `graph.py` | 3 routing 함수, 엣지 재구성 | build_agent_graph | - |
**총 변경량**: ~350줄 추가, ~100줄 수정
### 재사용률
- **기존 노드 재사용**: 12/16 (75%)
- **기존 로직 재사용**: ~95% (검색, 평가, 필터링, 요약 등)
- **새로운 개념**: Send API + Reducer만
## LangGraph 공식 가이드라인 준수
### ✅ Graph API
- StateGraph 사용
- Pydantic BaseModel state
- START/END 명시
### ✅ Workflows + Agents
- Send API로 동적 병렬화
- Conditional edges로 라우팅
- Fan-out/Fan-in 패턴
### ✅ Thinking in LangGraph
- 노드는 순수 함수 (한 가지 일만)
- State는 불변 업데이트
- Reducer로 병합 자동화
## 한계 및 향후 개선
### 현재 한계
1. **질문 개수 제한**: 최대 2개
- 비용 vs 품질 트레이드오프
- 향후 3-4개로 확장 가능
2. **캐싱 전략**: 통합 답변은 캐시 안 됨
- 각 서브 질문은 개별 캐시됨
- 동일한 다중 질문 재입력 시 개별 캐시 히트
3. **Refinement 루프**: 다중 질문에서도 각각 독립적으로 작동
- 한 질문 refine 시 다른 질문에 영향 없음
### 향후 개선 방향
1. **더 많은 질문 지원**: 3-4개까지 확장
2. **혼합 질문 감지**: "JWT가 뭐야? 그걸 Spring에 적용하려면?" (순차 의존)
3. **스트리밍 답변**: 각 서브 질문 완료 즉시 스트리밍
4. **우선순위**: 중요도에 따라 질문 순서 조정
## 참고 자료
- [LangGraph Graph API](https://docs.langchain.com/oss/python/langgraph/graph-api)
- [LangGraph Workflows + Agents](https://docs.langchain.com/oss/python/langgraph/workflows-agents)
- [LangGraph Thinking Guide](https://docs.langchain.com/oss/python/langgraph/thinking-in-langgraph)
- CodeWeaver Phase 3: Open Deep Research
## 문의
구현 관련 질문이나 버그 리포트는 이슈로 등록해주세요.