import asyncio
import json
import os
import re
import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional
from fastapi import Depends, FastAPI, HTTPException, Request, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import APIKeyHeader
from loguru import logger
try:
import httpx as _httpx
except ImportError:
_httpx = None
from .adapter_registry import AdapterRegistry
from .agent_loop import AgentLoop, AgentTrace
from .agent_manager import AgentManager
from .feature_flags import FeatureFlags
from .runtime_config import RuntimeConfig
from .schemas import (
AgentRunRequest,
AgentRunResponse,
AgentTraceSchema,
GenerateCivilResponseRequest,
GenerateCivilResponseResponse,
GenerateRequest,
GenerateResponse,
ToolResultSchema,
)
from .session_context import SessionContext, SessionStore
from .tool_router import ToolType, tool_name
# When true (CI/test environments), initialize() and graph construction are skipped.
SKIP_MODEL_LOAD = os.getenv("SKIP_MODEL_LOAD", "false").lower() in ("true", "1", "yes")
async def _noop_tool(query: str, context: dict, session: Any) -> dict:
"""build_all_tools fallback용 no-op tool."""
return {"success": False, "error": "tool이 초기화되지 않았습니다"}
# Optional dependency: slowapi provides per-route rate limiting. When it is
# not installed, the _rate_limit() decorator below degrades to a no-op.
try:
    from slowapi import Limiter
    from slowapi.middleware import SlowAPIMiddleware
    from slowapi.util import get_remote_address
    limiter = Limiter(key_func=get_remote_address)
    _RATE_LIMIT_AVAILABLE = True
except ImportError:
    limiter = None
    _RATE_LIMIT_AVAILABLE = False
# API-key auth: when the API_KEY env var is unset, verify_api_key allows all requests.
_API_KEY = os.getenv("API_KEY")
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def verify_api_key(api_key: str = Security(_api_key_header)):
    """Reject the request with 401 unless X-API-Key matches the configured key.

    Auth is disabled entirely when the API_KEY env var is not set.
    """
    if _API_KEY is None:
        return
    if api_key == _API_KEY:
        return
    raise HTTPException(status_code=401, detail="유효하지 않은 API 키입니다.")
# Runtime configuration resolved from the environment once at import time.
runtime_config = RuntimeConfig.from_env()
runtime_config.log_summary()
# Convenience module-level aliases for frequently used settings.
MODEL_PATH = runtime_config.model.model_path
DATA_PATH = runtime_config.paths.data_path
INDEX_PATH = runtime_config.paths.index_path
GPU_UTILIZATION = runtime_config.gpu_utilization
MAX_MODEL_LEN = runtime_config.max_model_len
TRUST_REMOTE_CODE = runtime_config.model.trust_remote_code
_PROJECT_ROOT = str(Path(__file__).resolve().parent.parent.parent)
AGENTS_DIR = runtime_config.paths.agents_dir
@dataclass
class SamplingParams:
    """Sampling parameters for the vLLM HTTP API; works without importing vLLM."""
    max_tokens: int = 512          # upper bound on generated tokens
    temperature: float = 0.7
    top_p: float = 1.0             # only forwarded by _run_engine when < 1.0
    stop: Optional[List[str]] = None
    repetition_penalty: float = 1.0  # only forwarded when != 1.0
@dataclass
class PreparedGeneration:
    """A fully assembled prompt plus the sampling parameters to run it with."""
    prompt: str
    sampling_params: SamplingParams
class _VLLMOutputItem:
    """Wraps a single choice of a vLLM HTTP response in the legacy output interface."""
    def __init__(self, text: str, finish_reason: str, token_ids: list):
        self.text = text
        self.finish_reason = finish_reason
        self.token_ids = token_ids  # synthetic ids; only len() is consumed downstream
class _VLLMHttpResult:
    """Adapts a vLLM HTTP response to the legacy AsyncLLM result interface.

    Existing call sites read ``output.outputs[0].text`` and
    ``output.prompt_token_ids``, so the same attributes are exposed here.
    Token-id lists are synthesized from the usage counts; only their length
    is ever consumed downstream.
    """
    def __init__(self, data: dict):
        self._data = data
        usage = data.get("usage", {})
        completion_count = usage.get("completion_tokens", 0)
        self.outputs = [
            _VLLMOutputItem(
                text=choice.get("message", {}).get("content", ""),
                finish_reason=choice.get("finish_reason", "stop"),
                token_ids=list(range(completion_count)),
            )
            for choice in data.get("choices", [])
        ]
        self.prompt_token_ids = list(range(usage.get("prompt_tokens", 0)))
def _extract_approval_request(graph_state: Any) -> Any:
"""LangGraph interrupt state에서 approval payload를 추출한다."""
if not graph_state or not getattr(graph_state, "tasks", None):
return None
task = graph_state.tasks[0]
if not getattr(task, "interrupts", None):
return None
return task.interrupts[0].value
class vLLMEngineManager:
    """Local runtime manager for the GovOn Shell MVP.

    vLLM runs as an OpenAI-compatible server in a separate process
    (entrypoint.sh); this class talks to that HTTP API via httpx.
    """
    def __init__(self):
        # Base URL of the sidecar vLLM OpenAI-compatible server.
        self._vllm_base_url = f"http://localhost:{os.getenv('VLLM_PORT', '8000')}"
        self._http_client: Optional[Any] = None  # created lazily in initialize()
        self.feature_flags = FeatureFlags.from_env()
        self.session_store = SessionStore()
        self.agent_manager = AgentManager(AGENTS_DIR)
        self.agent_loop: Optional[AgentLoop] = None
        self.graph = None  # LangGraph CompiledGraph (for the v2 endpoints)
        self._checkpointer_ctx = None  # AsyncSqliteSaver context manager (owned by lifespan)
        self._sync_checkpointer_conn = None  # sqlite3 connection for SqliteSaver (closed to avoid leaks)
        self._init_agent_loop()
        # _init_graph() is called from lifespan() -- avoids running at module import time
    async def initialize(self):
        """Connect to the sidecar vLLM server (health check with retries).

        Raises RuntimeError when httpx is missing or the server never becomes
        healthy within ~30s (10 attempts x 3s).
        """
        if SKIP_MODEL_LOAD:
            logger.info("SKIP_MODEL_LOAD=true: 모델 및 인덱스 로딩을 건너뜁니다.")
            return
        # The vLLM server is already started by entrypoint.sh -- only health-check it.
        logger.info(f"vLLM 서버 연결 확인: {self._vllm_base_url}")
        if _httpx is None:
            raise RuntimeError("httpx가 설치되어 있지 않습니다. pip install httpx")
        self._http_client = _httpx.AsyncClient(
            base_url=self._vllm_base_url,
            timeout=_httpx.Timeout(300.0, connect=30.0),
        )
        # Double-check health even though entrypoint.sh already verified it.
        for attempt in range(10):
            try:
                resp = await self._http_client.get("/health")
                if resp.status_code == 200:
                    logger.info("vLLM 서버 연결 성공")
                    return
            except Exception:
                pass  # server not up yet -- retry below
            logger.debug(f"vLLM 서버 대기 중... ({attempt + 1}/10)")
            await asyncio.sleep(3)
        raise RuntimeError(f"vLLM 서버에 연결할 수 없습니다: {self._vllm_base_url}")
def _escape_special_tokens(self, text: str) -> str:
tokens = [
"[|user|]",
"[|assistant|]",
"[|system|]",
"[|endofturn|]",
"",
"",
]
for token in tokens:
text = text.replace(
token,
token.replace("[", "\\[")
.replace("]", "\\]")
.replace("<", "\\<")
.replace(">", "\\>"),
)
return text
@staticmethod
def _strip_thought_blocks(text: str) -> str:
# ... (구형) 및 ... (EXAONE-4.0 추론 모드) 모두 제거
text = re.sub(r".*?\s*", "", text, flags=re.DOTALL)
text = re.sub(r".*?\s*", "", text, flags=re.DOTALL)
return text.strip()
def _build_persona_prompt(self, agent_name: str, user_message: str) -> str:
if self.agent_manager and self.agent_manager.get_agent(agent_name):
return self.agent_manager.build_prompt(agent_name, user_message)
return user_message
def _extract_query(self, prompt: str) -> str:
user_match = re.search(r"\[\|user\|\](.*?)\[\|endofturn\|\]", prompt, re.DOTALL)
if user_match:
user_block = user_match.group(1)
complaint_match = re.search(r"민원\s*내용\s*:\s*(.+)", user_block, re.DOTALL)
if complaint_match:
return complaint_match.group(1).strip()
return user_block.strip()
return prompt
@staticmethod
def _is_evidence_request(query: str) -> bool:
return any(token in query for token in ("근거", "출처", "왜", "이유", "링크"))
@staticmethod
def _is_revision_request(query: str) -> bool:
return any(token in query for token in ("다시", "수정", "고쳐", "정중", "공손", "보강"))
def _latest_prior_turns(
self,
session: SessionContext,
current_query: str,
) -> tuple[Optional[str], Optional[str]]:
turns = list(session.recent_history)
if turns and turns[-1].role == "user" and turns[-1].content == current_query:
turns = turns[:-1]
previous_user = next(
(turn.content for turn in reversed(turns) if turn.role == "user"), None
)
previous_assistant = next(
(turn.content for turn in reversed(turns) if turn.role == "assistant"),
None,
)
return previous_user, previous_assistant
def _build_working_query(self, query: str, session: SessionContext) -> str:
query = query.strip()
if not query:
return query
if not (self._is_evidence_request(query) or self._is_revision_request(query)):
return query
previous_user, previous_assistant = self._latest_prior_turns(session, query)
parts: List[str] = []
if previous_user:
parts.append(f"원래 요청: {previous_user}")
if previous_assistant:
parts.append(f"이전 답변: {previous_assistant[:600]}")
if self._is_revision_request(query):
parts.append(f"수정 요청: {query}")
return "\n\n".join(parts) if parts else query
@staticmethod
def _format_evidence_items(evidence_dict: Dict[str, Any]) -> str:
"""EvidenceEnvelope dict를 소비하여 출처 목록 텍스트를 생성한다.
EvidenceItem이 있으면 source-specific branching 없이 단일 포매터로 처리한다.
"""
items = evidence_dict.get("items", [])
if not items:
return ""
lines: list[str] = []
for idx, item in enumerate(items[:10], start=1):
source_type = item.get("source_type", "")
title = item.get("title", "")
link = item.get("link_or_path", "")
if source_type == "api":
label = title or "외부 API 결과"
if link:
lines.append(f"[{idx}] {label} - {link}")
else:
lines.append(f"[{idx}] {label}")
else:
label = title or "생성 참조"
if link:
lines.append(f"[{idx}] {label} - {link}")
else:
lines.append(f"[{idx}] {label}")
return "\n".join(lines)
def _summarize_evidence(
self,
api_lookup_data: Dict[str, Any],
) -> str:
# EvidenceEnvelope가 있으면 우선 사용
evidence = api_lookup_data.get("evidence")
if isinstance(evidence, dict) and evidence.get("items"):
lines = ["근거 요약"]
api_items = [i for i in evidence["items"] if i.get("source_type") == "api"]
if api_items:
titles = ", ".join(i["title"] for i in api_items[:3] if i.get("title"))
lines.append(
f"- 외부 민원분석 API에서 유사 사례 {len(api_items)}건을 확인했습니다."
+ (f" 대표 사례: {titles}" if titles else "")
)
if len(lines) == 1:
lines.append(
"- 내부 검색 결과를 충분히 확보하지 못해 일반 행정 응대 원칙 기준으로 작성했습니다."
)
return "\n".join(lines)
# Legacy 포매터 (EvidenceItem 없을 때)
lines = ["근거 요약"]
api_results = api_lookup_data.get("results", [])
if api_results:
titles = []
for item in api_results[:3]:
title = item.get("title") or item.get("qnaTitle") or item.get("question")
if title:
titles.append(title)
lines.append(
f"- 외부 민원분석 API에서 유사 사례 {len(api_results)}건을 확인했습니다."
+ (f" 대표 사례: {', '.join(titles)}" if titles else "")
)
if len(lines) == 1:
lines.append(
"- 내부 검색 결과를 충분히 확보하지 못해 일반 행정 응대 원칙 기준으로 작성했습니다."
)
return "\n".join(lines)
@staticmethod
def _api_source_line(index: int, item: Dict[str, Any]) -> str:
title = item.get("title") or item.get("qnaTitle") or item.get("question") or "외부 API 결과"
url = item.get("url") or item.get("detailUrl") or ""
if url:
return f"[{index}] {title} - {url}"
return f"[{index}] {title}"
def _build_evidence_section(
self,
session: SessionContext,
current_query: str,
api_data: Dict[str, Any],
) -> str:
_, previous_answer = self._latest_prior_turns(session, current_query)
lines = ["근거/출처"]
cursor = 1
# EvidenceEnvelope가 있으면 단일 포매터로 우선 처리
api_evidence = api_data.get("evidence")
if api_evidence and isinstance(api_evidence, dict) and api_evidence.get("items"):
for item in api_evidence["items"][:5]:
title = item.get("title", "") or "외부 API 결과"
link = item.get("link_or_path", "")
if link:
lines.append(f"[{cursor}] {title} - {link}")
else:
lines.append(f"[{cursor}] {title}")
cursor += 1
else:
# Legacy API 포매터
api_items = api_data.get("citations") or api_data.get("results") or []
for item in api_items[:5]:
lines.append(self._api_source_line(cursor, item))
cursor += 1
if cursor == 1:
lines.append("- 검색 가능한 근거를 찾지 못했습니다.")
section = "\n".join(lines)
if previous_answer:
return f"{previous_answer}\n\n{section}"
return section
async def _prepare_civil_response_generation(
self,
request: GenerateCivilResponseRequest,
flags: Optional[FeatureFlags] = None,
external_cases: Optional[List[dict]] = None,
) -> PreparedGeneration:
gen_defaults = runtime_config.generation
safe_message = self._escape_special_tokens(self._extract_query(request.prompt))
user_content = f"다음 민원에 대한 답변을 작성해 주세요.\n\n{safe_message}"
prompt = self._build_persona_prompt("draft_response", user_content)
sampling_params = SamplingParams(
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens,
stop=request.stop or gen_defaults.stop_sequences,
repetition_penalty=gen_defaults.repetition_penalty,
)
return PreparedGeneration(
prompt=prompt,
sampling_params=sampling_params,
)
    async def _prepare_draft_only(
        self,
        request: GenerateCivilResponseRequest,
        flags: Optional[FeatureFlags] = None,
    ) -> PreparedGeneration:
        """Build the LoRA draft-generation prompt from the query alone.

        Wraps the extracted user query in the draft_response persona prompt;
        request fields override the configured generation defaults.
        """
        gen_defaults = runtime_config.generation
        safe_message = self._escape_special_tokens(self._extract_query(request.prompt))
        # Training-data format: user = instruction + "\n\n" + input
        user_content = f"다음 민원에 대한 답변을 작성해 주세요.\n\n{safe_message}"
        prompt = self._build_persona_prompt("draft_response", user_content)
        sampling_params = SamplingParams(
            temperature=(
                request.temperature if request.temperature is not None else gen_defaults.temperature
            ),
            top_p=request.top_p if request.top_p is not None else gen_defaults.top_p,
            max_tokens=request.max_tokens or gen_defaults.max_tokens,
            stop=request.stop or gen_defaults.stop_sequences,
            repetition_penalty=gen_defaults.repetition_penalty,
        )
        return PreparedGeneration(
            prompt=prompt,
            sampling_params=sampling_params,
        )
async def synthesize_final(
self,
draft_text: str,
evidence_items: list,
query: str,
adapter_name: str = "public_admin",
) -> str:
"""초안 + 도구 결과를 베이스 모델로 통합하여 최종 답변 생성.
LoRA 어댑터는 학습 형식(질문→답변)에 특화되어 있어
초안+근거 통합 같은 범용 태스크에는 베이스 모델이 적합하다.
"""
safe_query = self._escape_special_tokens(query[:400])
safe_draft = self._escape_special_tokens(draft_text[:800])
# 근거 텍스트 조립
evidence_text = ""
for item in evidence_items[:5]:
source_type = item.get("source_type", "")
title = item.get("title", "")
excerpt = item.get("excerpt", "")[:200]
label = "[외부]" if source_type == "api" else "[생성]"
if title or excerpt:
evidence_text += f"- {label} {title}: {excerpt}\n"
if not evidence_text.strip():
evidence_text = "(검색 근거 없음)"
# 베이스 모델 범용 합성 프롬프트
synthesis_prompt = (
"[|system|]당신은 민원 답변을 보강하는 전문가입니다. "
"초안과 참고 근거를 결합하여 정확하고 공감적인 최종 답변을 작성하세요. "
"법적 근거가 있으면 인용하고, 절차와 조치사항을 명확히 포함하세요."
"[|endofturn|]\n"
"[|user|]다음 초안과 근거를 결합하여 최종 민원 답변을 작성하세요.\n\n"
f"[민원 질의]\n{safe_query}\n\n"
f"[초안]\n{safe_draft}\n\n"
f"[참고 근거]\n{evidence_text}"
"[|endofturn|]\n[|assistant|]"
)
# 베이스 모델 사용 (LoRA 없음) — 합성은 범용 태스크
sampling_params = SamplingParams(
max_tokens=768,
temperature=0.6,
top_p=0.9,
stop=["[|endofturn|]"],
)
import uuid as _uuid
request_id = str(_uuid.uuid4())
try:
output = await self._run_engine(
synthesis_prompt, sampling_params, request_id, lora_request=None
)
except Exception as exc:
logger.warning(f"[synthesize_final] 합성 실패: {exc}")
return draft_text
if output is None or not output.outputs:
return draft_text
return self._strip_thought_blocks(output.outputs[0].text)
    async def _run_engine(
        self,
        prompt: str,
        sampling_params: SamplingParams,
        request_id: str,
        lora_request=None,
    ):
        """Generate text through the vLLM OpenAI-compatible HTTP API.

        Returns a _VLLMHttpResult wrapping the response, or None when the
        client is not connected or the HTTP call fails (errors are logged,
        not raised).
        """
        if self._http_client is None:
            return None
        # Convert the EXAONE chat-template prompt into OpenAI messages.
        messages = self._prompt_to_messages(prompt)
        body: Dict[str, Any] = {
            "model": MODEL_PATH,
            "messages": messages,
            "max_tokens": sampling_params.max_tokens,
            "temperature": sampling_params.temperature,
            "stream": False,
        }
        # Only forward non-default sampling knobs.
        if sampling_params.top_p is not None and sampling_params.top_p < 1.0:
            body["top_p"] = sampling_params.top_p
        if sampling_params.stop:
            body["stop"] = list(sampling_params.stop)
        if sampling_params.repetition_penalty and sampling_params.repetition_penalty != 1.0:
            body["repetition_penalty"] = sampling_params.repetition_penalty
        # Select a LoRA adapter by passing its name as the model id.
        if lora_request is not None:
            body["model"] = lora_request.lora_name
        try:
            resp = await self._http_client.post("/v1/chat/completions", json=body)
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            logger.error(f"vLLM HTTP 호출 실패: {exc}")
            return None
        # Wrap the OpenAI response in the legacy result interface.
        return _VLLMHttpResult(data)
@staticmethod
def _prompt_to_messages(prompt: str) -> list:
"""EXAONE chat template 형식 프롬프트를 OpenAI messages로 변환."""
messages = []
# [|system|]...[|endofturn|], [|user|]...[|endofturn|], [|assistant|]... 파싱
import re as _re
parts = _re.split(r"\[\\?\|(\w+)\\?\|]", prompt)
role = None
for part in parts:
if part in ("system", "user", "assistant"):
role = part
elif role and part.strip():
content = part.replace("[|endofturn|]", "").strip()
if content:
messages.append({"role": role, "content": content})
role = None
if not messages:
messages = [{"role": "user", "content": prompt}]
return messages
    async def generate(
        self,
        request: GenerateRequest,
        request_id: str,
        flags: Optional[FeatureFlags] = None,
    ) -> Any:
        """Back-compat alias: delegates to generate_civil_response()."""
        return await self.generate_civil_response(request, request_id, flags)
    async def generate_civil_response(
        self,
        request: GenerateCivilResponseRequest,
        request_id: str,
        flags: Optional[FeatureFlags] = None,
        external_cases: Optional[List[dict]] = None,
        lora_request=None,
    ) -> Any:
        """Prepare the civil-response prompt and run it through the engine.

        Returns the _VLLMHttpResult, or None on engine failure.
        """
        prepared = await self._prepare_civil_response_generation(request, flags, external_cases)
        return await self._run_engine(
            prepared.prompt, prepared.sampling_params, request_id, lora_request=lora_request
        )
    async def generate_stream(
        self,
        request: GenerateRequest,
        request_id: str,
        flags: Optional[FeatureFlags] = None,
    ) -> Any:
        """Build a streaming chat-completions request against vLLM.

        Returns the httpx ``stream(...)`` object -- an async context manager;
        the caller must enter it (``async with``) and read the SSE lines.
        Raises RuntimeError when no HTTP client is connected.
        NOTE(review): top_p/repetition_penalty are not forwarded here, unlike
        _run_engine -- confirm whether that is intentional.
        """
        prepared = await self._prepare_civil_response_generation(request, flags)
        if self._http_client is None:
            raise RuntimeError("vLLM 서버에 연결되지 않았습니다.")
        messages = self._prompt_to_messages(prepared.prompt)
        body = {
            "model": MODEL_PATH,
            "messages": messages,
            "max_tokens": prepared.sampling_params.max_tokens,
            "temperature": prepared.sampling_params.temperature,
            "stream": True,
        }
        if prepared.sampling_params.stop:
            body["stop"] = list(prepared.sampling_params.stop)
        return self._http_client.stream("POST", "/v1/chat/completions", json=body)
    def _init_agent_loop(self) -> None:
        """Build the AgentLoop with its tool registry (api_lookup + draft_response)."""
        from src.inference.actions.data_go_kr import MinwonAnalysisAction
        engine_ref = self
        minwon_action = MinwonAnalysisAction()
        async def _api_lookup_tool(query: str, context: dict, session: SessionContext) -> dict:
            # Fetch similar civil-complaint cases from the data.go.kr API.
            working_query = query.strip()
            payload = await minwon_action.fetch_similar_cases(
                working_query,
                {
                    **context,
                    "session_context": session.build_context_summary(),
                },
            )
            results = payload["results"] or []
            return {
                "query": payload["query"],
                "count": len(results),
                "results": results,
                "context_text": payload["context_text"],
                "citations": [citation.to_dict() for citation in payload["citations"]],
                "source": "data.go.kr",
            }
        async def _draft_response_tool(
            query: str,
            context: dict,
            session: SessionContext,
        ) -> dict:
            working_query = engine_ref._build_working_query(query, session)
            # LoRA-first: generate a draft answer from the query alone.
            adapter_name = context.get("adapter") if context else None
            if not adapter_name:
                adapter_name = "public_admin"
            _adapter_reg = AdapterRegistry.get_instance()
            lora_req = _adapter_reg.get_lora_request(adapter_name)
            gen_request = GenerateCivilResponseRequest(
                prompt=working_query,
                max_tokens=2048,
                temperature=0.7,
            )
            request_id = str(uuid.uuid4())
            prepared = await engine_ref._prepare_draft_only(gen_request)
            final_output = await engine_ref._run_engine(
                prepared.prompt, prepared.sampling_params, request_id, lora_request=lora_req
            )
            if final_output is None or not final_output.outputs:
                return {
                    "text": "",
                    "draft_text": "",
                    "success": False,
                    "error": "민원 답변 초안 생성 실패",
                    "results": [],
                    "context_text": "",
                }
            draft_text = engine_ref._strip_thought_blocks(final_output.outputs[0].text)
            return {
                "text": draft_text,
                "draft_text": draft_text,
                "success": True,
                "results": [],
                "context_text": draft_text,
                "prompt_tokens": len(final_output.prompt_token_ids),
                "completion_tokens": len(final_output.outputs[0].token_ids),
            }
        tool_registry = {
            ToolType.API_LOOKUP: _api_lookup_tool,
            "draft_response": _draft_response_tool,
        }
        self.agent_loop = AgentLoop(tool_registry=tool_registry)
    def _build_langgraph_tools(self) -> list:
        """Build the StructuredTool list for the LangGraph ToolNode.

        Delegates to build_all_tools(); when the AgentLoop exists, its
        registered draft_response closure is reused so both code paths share
        one implementation.
        """
        from src.inference.graph.tools import build_all_tools
        if self.agent_loop is None:
            return build_all_tools(
                api_lookup_action=self._get_api_lookup_action(),
            )
        # Extract the existing closures from the AgentLoop tool registry.
        raw_tools = {
            str(k.value if hasattr(k, "value") else k): v for k, v in self.agent_loop._tools.items()
        }
        return build_all_tools(
            api_lookup_action=self._get_api_lookup_action(),
            draft_response_fn=raw_tools.get("draft_response"),
        )
    def _get_api_lookup_action(self) -> Any:
        """Return the MinwonAnalysisAction behind the registered api_lookup tool.

        Falls back to a freshly constructed action (or None) when the
        registered tool is a plain closure without an ``_action`` attribute.
        """
        if self.agent_loop is None:
            return None
        tool_fn = self.agent_loop._tools.get(ToolType.API_LOOKUP)
        # An ApiLookupCapability exposes its action directly.
        if hasattr(tool_fn, "_action"):
            return tool_fn._action
        # For a closure the action cannot be recovered, so build a new one
        # (matches what _init_agent_loop does).
        try:
            from src.inference.actions.data_go_kr import MinwonAnalysisAction
            return MinwonAnalysisAction()
        except Exception:
            return None
    def _init_graph_with_async_checkpointer(self, checkpointer: object) -> None:
        """Rebuild the graph once lifespan has an AsyncSqliteSaver ready."""
        self._init_graph(checkpointer=checkpointer)
    def _init_graph(self, checkpointer: Optional[object] = None) -> None:
        """Initialize the LangGraph StateGraph.

        v4 architecture: ReAct + ToolNode. The LLM decides tool calls
        autonomously; no static planner/executor is used.

        Parameters
        ----------
        checkpointer : optional
            Externally injected LangGraph checkpointer. When None, a
            SqliteSaver (sync sqlite3) is attempted, falling back to
            MemorySaver on import failure. The SqliteSaver DB is created next
            to the SessionStore DB as ``langgraph_checkpoints.db``
            (separation of concerns).
        """
        try:
            from src.inference.graph.builder import build_govon_graph
        except ImportError as exc:
            logger.warning(f"LangGraph graph 초기화 실패 (import 오류): {exc}")
            return
        tools = self._build_langgraph_tools()
        # Pick the LLM endpoint.
        if SKIP_MODEL_LOAD:
            # CI/test environment: no LLM available, skip graph construction.
            logger.info("SKIP_MODEL_LOAD=true: LangGraph graph 초기화 스킵")
            return
        elif os.getenv("LANGGRAPH_MODEL_BASE_URL"):
            from langchain_openai import ChatOpenAI
            llm = ChatOpenAI(
                base_url=os.environ["LANGGRAPH_MODEL_BASE_URL"],
                api_key=os.getenv("LANGGRAPH_MODEL_API_KEY", "EMPTY"),
                model=os.getenv("LANGGRAPH_PLANNER_MODEL", runtime_config.model.model_path),
                temperature=0.0,
                max_tokens=1024,
            )
        else:
            # Production: use the local vLLM OpenAI-compatible endpoint.
            from langchain_openai import ChatOpenAI
            vllm_port = os.getenv("VLLM_PORT", "8000")
            llm = ChatOpenAI(
                base_url=f"http://localhost:{vllm_port}/v1",
                api_key="EMPTY",
                model=runtime_config.model.model_path,
                temperature=0.0,
                max_tokens=1024,
            )
        # Without an injected checkpointer, fall back to a sync SqliteSaver.
        if checkpointer is None:
            checkpointer, conn = _build_sync_sqlite_checkpointer(self.session_store.db_path)
            # Close any previously held connection to avoid leaking it.
            if self._sync_checkpointer_conn is not None:
                try:
                    self._sync_checkpointer_conn.close()
                except Exception:
                    pass
            self._sync_checkpointer_conn = conn
        self.graph = build_govon_graph(
            llm=llm,
            tools=tools,
            session_store=self.session_store,
            checkpointer=checkpointer,
        )
        logger.info("LangGraph graph 초기화 완료")
def _build_sync_sqlite_checkpointer(
    session_db_path: str,
) -> tuple:
    """Return a SqliteSaver (sync) or MemorySaver (fallback) checkpointer.

    The LangGraph checkpoint DB is created next to SessionStore's
    sessions.sqlite3 as a separate ``langgraph_checkpoints.db`` file, keeping
    session metadata and graph checkpoints apart. Unlike MemorySaver,
    SqliteSaver survives process restarts (interrupt state is restored from
    SQLite).

    Fix: use a plain local ``import sqlite3`` instead of the obscure
    ``__import__("sqlite3")`` expression.

    Parameters
    ----------
    session_db_path : str
        Path of the sessions.sqlite3 file currently used by SessionStore;
        the checkpoint DB is created in its parent directory.

    Returns
    -------
    tuple[SqliteSaver | MemorySaver, sqlite3.Connection | None]
        (checkpointer, conn). With SqliteSaver, conn is an open
        sqlite3.Connection the caller must eventually close; with the
        MemorySaver fallback, conn is None.
    """
    cp_db_path = str(Path(session_db_path).parent / "langgraph_checkpoints.db")
    try:
        from langgraph.checkpoint.sqlite import SqliteSaver
        import sqlite3
        # check_same_thread=False: the saver may be used from worker threads.
        conn = sqlite3.connect(cp_db_path, check_same_thread=False)
        saver = SqliteSaver(conn)
        logger.info(f"LangGraph checkpointer: SqliteSaver ({cp_db_path})")
        return saver, conn
    except ImportError:
        logger.warning(
            "langgraph-checkpoint-sqlite 미설치 — MemorySaver로 fallback합니다. "
            "프로세스 재시작 시 interrupt 상태가 소멸됩니다."
        )
        from langgraph.checkpoint.memory import MemorySaver
        return MemorySaver(), None
# Module-level singleton used by all endpoints and by lifespan().
manager = vLLMEngineManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: model/index init plus AsyncSqliteSaver upgrade.

    On startup the graph is (re)built with an AsyncSqliteSaver when that
    package is importable; the saver is held as an async context manager and
    torn down on shutdown. If the import fails, the SqliteSaver (or
    MemorySaver fallback) already configured by _init_graph stays in place.
    """
    await manager.initialize()
    # Build the graph after the vLLM connection is up (not at import time).
    manager._init_graph()
    # Try to rebuild with AsyncSqliteSaver for better async performance.
    async_cp_db = str(Path(manager.session_store.db_path).parent / "langgraph_checkpoints.db")
    try:
        from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
        async with AsyncSqliteSaver.from_conn_string(async_cp_db) as async_saver:
            # Close the connection held by the sync SqliteSaver to avoid a leak.
            if manager._sync_checkpointer_conn is not None:
                try:
                    manager._sync_checkpointer_conn.close()
                except Exception:
                    pass
                manager._sync_checkpointer_conn = None
            manager._checkpointer_ctx = async_saver
            manager._init_graph_with_async_checkpointer(async_saver)
            logger.info(f"LangGraph checkpointer: AsyncSqliteSaver ({async_cp_db})")
            yield
            manager._checkpointer_ctx = None
    except ImportError:
        logger.info("AsyncSqliteSaver 미설치 — SqliteSaver(동기) 또는 MemorySaver로 실행합니다.")
        yield
# FastAPI application; lifespan handles the model connection and graph setup.
app = FastAPI(
    title="GovOn Local Runtime",
    description="Local FastAPI daemon for the GovOn Agentic Shell MVP.",
    lifespan=lifespan,
)
# CORS: comma-separated origin allowlist from the CORS_ORIGINS env var.
# Fix: entries are stripped so values like "a, b" match exactly (CORS origin
# comparison is exact-match), and empty fragments are dropped -- previously
# only the first element was checked, so ",https://x" disabled CORS entirely.
ALLOWED_ORIGINS = [
    origin.strip() for origin in os.getenv("CORS_ORIGINS", "").split(",") if origin.strip()
]
if ALLOWED_ORIGINS:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=ALLOWED_ORIGINS,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
# Install slowapi middleware only when the optional dependency is present.
if _RATE_LIMIT_AVAILABLE and limiter is not None:
    app.state.limiter = limiter
    app.add_middleware(SlowAPIMiddleware)
@app.get("/health")
async def health():
    """Liveness/diagnostics endpoint: profile, model, agents, flags, session store."""
    return {
        "status": "healthy",
        "profile": runtime_config.profile.value,
        "model": runtime_config.model.model_path,
        "agents_loaded": manager.agent_manager.list_agents() if manager.agent_manager else [],
        "feature_flags": {
            "model_version": manager.feature_flags.model_version,
        },
        "session_store": {
            "driver": "sqlite",
            "path": manager.session_store.db_path,
        },
    }
def _rate_limit(limit_string: str):
    """Return limiter.limit(...) when slowapi is installed, else a no-op decorator."""
    if not (_RATE_LIMIT_AVAILABLE and limiter is not None):
        def _noop(func):
            return func
        return _noop
    return limiter.limit(limit_string)
def get_feature_flags(request: Request) -> FeatureFlags:
    """Resolve per-request feature flags, honoring the X-Feature-Flag header."""
    header = request.headers.get("X-Feature-Flag")
    return manager.feature_flags.override_from_header(header)
@app.post("/v1/generate-civil-response", response_model=GenerateCivilResponseResponse)
@_rate_limit("30/minute")
async def generate_civil_response(
    request: GenerateCivilResponseRequest,
    _: None = Depends(verify_api_key),
    flags: FeatureFlags = Depends(get_feature_flags),
):
    """Non-streaming civil-response generation endpoint (400 when stream=True)."""
    if request.stream:
        raise HTTPException(status_code=400, detail="민원 답변 스트리밍은 /v1/stream을 사용하세요.")
    request_id = str(uuid.uuid4())
    final_output = await manager.generate_civil_response(
        request,
        request_id,
        flags,
    )
    if final_output is None:
        raise HTTPException(status_code=500, detail="민원 답변 생성에 실패했습니다.")
    return GenerateCivilResponseResponse(
        request_id=request_id,
        complaint_id=request.complaint_id,
        text=manager._strip_thought_blocks(final_output.outputs[0].text),
        prompt_tokens=len(final_output.prompt_token_ids),
        completion_tokens=len(final_output.outputs[0].token_ids),
    )
@app.post("/v1/generate", response_model=GenerateResponse)
@_rate_limit("30/minute")
async def generate(
    request: GenerateRequest,
    _: None = Depends(verify_api_key),
    flags: FeatureFlags = Depends(get_feature_flags),
):
    """Generic non-streaming generation endpoint (delegates to civil response)."""
    if request.stream:
        raise HTTPException(status_code=400, detail="Use /v1/stream for streaming.")
    request_id = str(uuid.uuid4())
    final_output = await manager.generate(request, request_id, flags)
    if final_output is None:
        raise HTTPException(status_code=500, detail="Generation failed.")
    return GenerateResponse(
        request_id=request_id,
        complaint_id=request.complaint_id,
        text=manager._strip_thought_blocks(final_output.outputs[0].text),
        prompt_tokens=len(final_output.prompt_token_ids),
        completion_tokens=len(final_output.outputs[0].token_ids),
    )
@app.post("/v1/chat/completions")
@_rate_limit("30/minute")
async def chat_completions(
    request: Request,
    _: None = Depends(verify_api_key),
):
    """OpenAI-compatible /v1/chat/completions.

    Proxies through the vLLM HTTP API. The v2 ReAct graph connects its
    ChatOpenAI client to the vLLM OpenAI server directly, so this endpoint
    exists for v1 compatibility only.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body.")
    messages: list[dict] = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=422, detail="messages must not be empty.")
    # Validate and clamp the numeric sampling parameters.
    try:
        max_tokens = int(body.get("max_tokens", 512))
        temperature = float(body.get("temperature", 0.7))
    except (ValueError, TypeError):
        raise HTTPException(status_code=400, detail="Invalid max_tokens or temperature value.")
    if not (1 <= max_tokens <= runtime_config.max_model_len):
        raise HTTPException(
            status_code=400,
            detail=f"max_tokens must be between 1 and {runtime_config.max_model_len}.",
        )
    if not (0.0 <= temperature <= 2.0):
        raise HTTPException(status_code=400, detail="temperature must be between 0.0 and 2.0.")
    model: str = body.get("model", runtime_config.model.model_path)
    # Convert messages to the EXAONE chat-template prompt format.
    prompt_parts: list[str] = []
    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        if role == "system":
            prompt_parts.append(f"[|system|]{content}[|endofturn|]")
        elif role == "user":
            prompt_parts.append(f"[|user|]{content}[|endofturn|]")
        elif role == "assistant":
            prompt_parts.append(f"[|assistant|]{content}[|endofturn|]")
        else:
            logger.warning(f"chat_completions: 지원하지 않는 role 무시: {role!r}")
    prompt_parts.append("[|assistant|]")
    prompt = "\n".join(prompt_parts)
    if manager._http_client is None:
        raise HTTPException(status_code=503, detail="vLLM server not connected.")
    request_id = str(uuid.uuid4())
    logger.info(
        f"chat_completions request_id={request_id} messages={len(messages)} max_tokens={max_tokens}"
    )
    sampling_params = SamplingParams(
        max_tokens=max_tokens,
        temperature=temperature,
        stop=["[|endofturn|]"],
    )
    try:
        final_output = await manager._run_engine(prompt, sampling_params, request_id)
    except Exception as exc:
        logger.error(f"chat_completions generation failed: {exc}")
        raise HTTPException(status_code=500, detail="Generation failed due to internal error.")
    if final_output is None or not final_output.outputs:
        raise HTTPException(status_code=500, detail="Generation failed.")
    output = final_output.outputs[0]
    text = manager._strip_thought_blocks(output.text)
    prompt_tokens = len(final_output.prompt_token_ids)
    completion_tokens = len(output.token_ids)
    # Map the backend finish reason onto the OpenAI vocabulary.
    vllm_reason = getattr(output, "finish_reason", None)
    finish_reason = "length" if vllm_reason == "length" else "stop"
    return {
        "id": f"chatcmpl-{request_id}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": text},
                "finish_reason": finish_reason,
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }
@app.post("/v1/stream")
@_rate_limit("30/minute")
async def stream_generate(
    request: GenerateRequest,
    _: None = Depends(verify_api_key),
    flags: FeatureFlags = Depends(get_feature_flags),
):
    """SSE streaming generation endpoint.

    Fix: manager.generate_stream() returns an httpx ``stream(...)`` async
    context manager over the vLLM OpenAI SSE stream -- it does not yield
    vLLM RequestOutput objects, so the old ``async for`` over it (reading
    ``.outputs[0].text`` / ``.finished``) failed at runtime. Enter the
    context manager, parse the OpenAI delta chunks, and re-emit the legacy
    event shape ``{"request_id", "text", "finished"}`` with cumulative text;
    the final event carries the thought-stripped full text.
    """
    if not request.stream:
        request.stream = True
    request_id = str(uuid.uuid4())
    results_stream = await manager.generate_stream(
        request,
        request_id,
        flags,
    )
    async def stream_results() -> AsyncGenerator[str, None]:
        accumulated = ""
        async with results_stream as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):].strip()
                if payload == "[DONE]":
                    break
                try:
                    chunk = json.loads(payload)
                except json.JSONDecodeError:
                    continue  # skip malformed keep-alive/partial lines
                choices = chunk.get("choices") or []
                delta = choices[0].get("delta", {}).get("content") if choices else None
                if not delta:
                    continue
                accumulated += delta
                event = {"request_id": request_id, "text": accumulated, "finished": False}
                yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
        # Final event mirrors the legacy contract: thought blocks stripped.
        final_event = {
            "request_id": request_id,
            "text": manager._strip_thought_blocks(accumulated),
            "finished": True,
        }
        yield f"data: {json.dumps(final_event, ensure_ascii=False)}\n\n"
    return StreamingResponse(stream_results(), media_type="text/event-stream")
def _trace_to_schema(trace: AgentTrace) -> AgentTraceSchema:
    """Convert an AgentTrace into its API schema (latencies rounded to 2 dp)."""
    return AgentTraceSchema(
        request_id=trace.request_id,
        session_id=trace.session_id,
        plan=trace.plan_tools,
        plan_reason=trace.plan_reason,
        tool_results=[
            ToolResultSchema(
                tool=tool_name(result.tool),
                success=result.success,
                latency_ms=round(result.latency_ms, 2),
                data=result.data,
                error=result.error,
            )
            for result in trace.tool_results
        ],
        total_latency_ms=round(trace.total_latency_ms, 2),
        error=trace.error,
    )
@app.post("/v1/agent/run", response_model=AgentRunResponse)
@_rate_limit("30/minute")
async def agent_run(
    request: AgentRunRequest,
    _: None = Depends(verify_api_key),
):
    """Run the (v1) agent loop synchronously and return the full trace."""
    if not manager.agent_loop:
        raise HTTPException(status_code=503, detail="에이전트 루프가 초기화되지 않았습니다.")
    if request.stream:
        raise HTTPException(status_code=400, detail="스트리밍은 /v1/agent/stream을 사용하세요.")
    session = manager.session_store.get_or_create(session_id=request.session_id)
    request_id = str(uuid.uuid4())
    trace = await manager.agent_loop.run(
        query=request.query,
        session=session,
        request_id=request_id,
        force_tools=request.force_tools,
    )
    return AgentRunResponse(
        request_id=request_id,
        session_id=session.session_id,
        text=trace.final_text,
        trace=_trace_to_schema(trace),
    )
@app.post("/v1/agent/stream")
@_rate_limit("30/minute")
async def agent_stream(
request: AgentRunRequest,
_: None = Depends(verify_api_key),
):
if not manager.agent_loop:
raise HTTPException(status_code=503, detail="에이전트 루프가 초기화되지 않았습니다.")
session = manager.session_store.get_or_create(session_id=request.session_id)
request_id = str(uuid.uuid4())
async def stream_events() -> AsyncGenerator[str, None]:
async for event in manager.agent_loop.run_stream(
query=request.query,
session=session,
request_id=request_id,
force_tools=request.force_tools,
):
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
return StreamingResponse(stream_events(), media_type="text/event-stream")
# ---------------------------------------------------------------------------
# v2 endpoints: LangGraph-based agent execution (interrupt/approve pattern)
# ---------------------------------------------------------------------------
@app.post("/v2/agent/stream")
@_rate_limit("30/minute")
async def v2_agent_stream(
request: AgentRunRequest,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""LangGraph 기반 agent SSE 스트리밍 실행.
graph.astream()을 사용해 노드별 완료 이벤트를 SSE로 전송한다.
이벤트 형식 (각 줄: ``data: \\n\\n``):
- 노드 진행: ``{"node": "", "status": "completed", ...}``
- approval_wait 도달:
``{"node": "approval_wait", "status": "awaiting_approval",
"approval_request": {...}, "thread_id": "..."}``
- 오류: ``{"node": "error", "status": "error", "error": "..."}``
승인 흐름:
- 클라이언트는 ``awaiting_approval`` 이벤트 수신 후 스트림이 종료됨을 인지하고
``/v2/agent/approve``로 승인/거절을 전달한다.
"""
if not manager.graph:
async def _no_graph():
yield 'data: {"node": "error", "status": "error", "error": "LangGraph graph가 초기화되지 않았습니다."}\n\n'
return StreamingResponse(_no_graph(), media_type="text/event-stream")
from langchain_core.messages import HumanMessage
thread_id = request.session_id or str(uuid.uuid4())
session_id = thread_id
request_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"session_id": session_id,
"request_id": request_id,
"messages": [HumanMessage(content=request.query)],
}
# 기존 interrupt 상태가 남아있으면 거절(cancel)로 해소
try:
from langgraph.types import Command
existing_state = await manager.graph.aget_state(config)
if existing_state and existing_state.next:
await manager.graph.ainvoke(
Command(resume={"approved": False, "cancel": True}),
config,
)
except Exception as clear_exc:
logger.warning(f"[v2] interrupt 상태 확인/해소 실패 (무시): {type(clear_exc).__name__}")
async def _generate() -> AsyncGenerator[str, None]:
try:
async for chunk in manager.graph.astream(initial_state, config, stream_mode="updates"):
# chunk: {node_name: state_delta}
for node_name, state_delta in chunk.items():
event: dict = {
"node": node_name,
"status": "completed",
}
# persist 완료 시 evidence_items를 이벤트에 포함.
# 전제: stream_mode="updates"에서 state_delta는 노드의 raw return dict다.
# evidence_items 스키마: EvidenceItem.to_dict() 필드를 따른다.
# source_type: "api" | "llm_generated"
# title, excerpt, link_or_path, page, score, provider_meta
if node_name == "persist" and isinstance(state_delta, dict):
if state_delta.get("final_text"):
event["final_text"] = state_delta["final_text"]
if state_delta.get("evidence_items"):
event["evidence_items"] = state_delta["evidence_items"]
# approval_wait: 명시적 노드명 또는 LangGraph interrupt() 호출 시
# stream_mode="updates"에서 emit되는 "__interrupt__" 청크 모두 처리
if node_name in ("approval_wait", "__interrupt__"):
try:
graph_state = await manager.graph.aget_state(config)
if graph_state.next:
event = {
"node": "approval_wait",
"status": "awaiting_approval",
"approval_request": _extract_approval_request(graph_state),
"thread_id": thread_id,
"session_id": session_id,
}
except Exception as exc:
logger.warning(f"[v2/agent/stream] aget_state 실패: {exc}")
event["node"] = "approval_wait"
event["status"] = "awaiting_approval"
event["thread_id"] = thread_id
event["session_id"] = session_id
event["approval_request"] = {
"prompt": "승인 정보를 불러올 수 없습니다. /v2/agent/approve로 진행하세요."
}
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
# Stop streaming after awaiting_approval (client must call /v2/agent/approve)
if event.get("status") == "awaiting_approval":
return
except Exception as exc:
logger.error(f"[v2/agent/stream] 스트림 예외: {exc}")
error_event = {"node": "error", "status": "error", "error": str(exc)}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return StreamingResponse(_generate(), media_type="text/event-stream")
@app.post("/v2/agent/run")
@_rate_limit("30/minute")
async def v2_agent_run(
request: AgentRunRequest,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""LangGraph 기반 agent 실행 (1단계: interrupt까지).
graph를 실행하여 `approval_wait` 노드에서 interrupt되면
`status: awaiting_approval`과 함께 승인 요청 정보를 반환한다.
클라이언트는 반환된 `thread_id`를 저장해두고
`/v2/agent/approve`로 승인/거절을 전달해야 한다.
Session Resume Contract
-----------------------
동일 session_id로 재요청하는 경우 다음 규칙을 따른다:
1. **interrupt 대기 중**: graph가 approval_wait에서 interrupt 상태이면
현재 checkpoint에서 resume하지 않고 새 메시지를 *추가하여* 이어서 실행한다.
(재요청은 새 graph_run으로 처리한다.)
승인/거절은 반드시 `/v2/agent/approve`를 통해 처리해야 한다.
2. **완료된 graph**: graph가 END에 도달한 상태(state.next == [])이면
동일 thread_id에 새 graph_run을 시작한다. LangGraph checkpointer가
동일 thread_id에서 이전 상태를 누적하므로 대화 히스토리가 보존된다.
3. **프로세스 재시작 후**: SqliteSaver 사용 시 DB에서 checkpoint가 복원되므로
interrupt 상태가 유지된다. 클라이언트는 기존 thread_id로 `/v2/agent/approve`
를 다시 호출하면 중단된 지점에서 resume할 수 있다.
Note: session_id == thread_id. 두 값은 항상 동일하게 유지된다.
"""
if not manager.graph:
raise HTTPException(status_code=503, detail="LangGraph graph가 초기화되지 않았습니다.")
from langchain_core.messages import HumanMessage
thread_id = request.session_id or str(uuid.uuid4())
session_id = thread_id # thread_id를 session_id로 확정 (session_id == thread_id 불변)
request_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"session_id": session_id,
"request_id": request_id,
"messages": [HumanMessage(content=request.query)],
}
# 기존 interrupt 상태가 남아있으면 거절(cancel)로 해소
try:
existing_state = await manager.graph.aget_state(config)
if existing_state and existing_state.next:
from langgraph.types import Command
await manager.graph.ainvoke(
Command(resume={"approved": False, "cancel": True}),
config,
)
except Exception as clear_exc:
logger.warning(f"[v2] interrupt 상태 확인/해소 실패 (무시): {type(clear_exc).__name__}")
try:
await manager.graph.ainvoke(initial_state, config)
# interrupt 상태 확인
graph_state = await manager.graph.aget_state(config)
if graph_state.next:
# interrupt 대기 중: approval_request 정보를 클라이언트에 반환
return {
"status": "awaiting_approval",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"approval_request": _extract_approval_request(graph_state),
}
# interrupt 없이 완료된 경우 (rejected 또는 오류)
final_state = graph_state.values
return {
"status": "completed",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"text": final_state.get("final_text", ""),
"evidence_items": final_state.get("evidence_items", []),
}
except Exception as exc:
logger.error(f"[v2/agent/run] 예외 발생: {exc}")
# graph_run을 "error" status로 기록 시도
try:
if manager.session_store:
session = manager.session_store.get_or_create(session_id)
session.add_graph_run(
request_id=request_id,
plan_summary=f"[error] {exc}",
approval_status="",
executed_capabilities=[],
status="error",
total_latency_ms=0.0,
)
except Exception as persist_exc:
logger.warning(f"[v2/agent/run] error persist 실패: {persist_exc}")
logger.exception(f"[v2/agent/run] 요청 처리 실패: {exc}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"error": "요청 처리 중 내부 오류가 발생했습니다.",
},
)
@app.post("/v2/agent/approve")
@_rate_limit("30/minute")
async def v2_agent_approve(
thread_id: str,
approved: bool,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""interrupt된 graph를 resume한다 (2단계: 승인/거절).
Parameters
----------
thread_id : str
`/v2/agent/run`에서 반환된 thread_id.
approved : bool
True면 tool_execute로 진행, False면 graph가 END로 종료.
"""
if not manager.graph:
raise HTTPException(status_code=503, detail="LangGraph graph가 초기화되지 않았습니다.")
from langgraph.types import Command
config = {"configurable": {"thread_id": thread_id}}
try:
result = await manager.graph.ainvoke(
Command(resume={"approved": approved}),
config,
)
# 거절이면 "rejected", 승인 완료면 "completed"
approval_status = result.get("approval_status", "")
if not approved:
response_status = "rejected"
else:
response_status = "completed"
return {
"status": response_status,
"thread_id": thread_id,
"session_id": result.get("session_id", ""),
"graph_run_id": result.get("request_id", ""),
"text": result.get("final_text", ""),
"evidence_items": result.get("evidence_items", []),
"approval_status": approval_status,
}
except Exception as exc:
logger.error(f"[v2/agent/approve] 예외 발생: {exc}")
# graph_run을 "error" status로 기록 시도
session_id = ""
request_id = ""
try:
if manager.session_store:
graph_state = await manager.graph.aget_state(config)
state_values = graph_state.values if graph_state else {}
session_id = state_values.get("session_id", "")
request_id = state_values.get("request_id", "")
if session_id:
session = manager.session_store.get_or_create(session_id)
session.add_graph_run(
request_id=request_id,
plan_summary=f"[error] {exc}",
approval_status="",
executed_capabilities=[],
status="error",
total_latency_ms=0.0,
)
except Exception as persist_exc:
logger.warning(f"[v2/agent/approve] error persist 실패: {persist_exc}")
logger.exception(f"[v2/agent/approve] 승인 처리 실패: {exc}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"error": "승인 처리 중 내부 오류가 발생했습니다.",
},
)
@app.post("/v2/agent/cancel")
@_rate_limit("30/minute")
async def v2_agent_cancel(
thread_id: str,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""interrupt 대기 중인 graph를 강제 취소한다.
interrupt 상태에서 거절 처리(Command(resume={"approved": False}))를 수행하되,
state에 interrupt_reason="user_cancel"을 전달하여
persist 노드가 graph_run status를 "interrupted"로 기록하게 한다.
Parameters
----------
thread_id : str
`/v2/agent/run`에서 반환된 thread_id.
"""
if not manager.graph:
raise HTTPException(status_code=503, detail="LangGraph graph가 초기화되지 않았습니다.")
from langgraph.types import Command
config = {"configurable": {"thread_id": thread_id}}
try:
# interrupt 상태 확인
graph_state = await manager.graph.aget_state(config)
if not graph_state or not graph_state.next:
raise HTTPException(
status_code=409,
detail="해당 thread는 현재 interrupt 대기 상태가 아닙니다.",
)
session_id = graph_state.values.get("session_id", "")
# 강제 거절 + interrupt_reason 전달로 resume
result = await manager.graph.ainvoke(
Command(resume={"approved": False, "cancel": True}),
config,
)
# persist 노드에서 "interrupted" 기록을 위해 state update
# (approval_wait_node가 cancel 신호를 interrupt_reason으로 변환)
return {
"status": "cancelled",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": result.get("request_id", ""),
}
except HTTPException:
raise
except Exception as exc:
logger.exception(f"[v2/agent/cancel] 취소 처리 실패: {exc}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"thread_id": thread_id,
"error": "취소 처리 중 내부 오류가 발생했습니다.",
},
)
# Direct-execution entry point: serve the app with uvicorn using the
# kwargs (host/port/etc.) derived from the runtime configuration.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, **runtime_config.to_uvicorn_kwargs())