govon-runtime / src/inference/api_server.py
import asyncio
import json
import os
import re
import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional
from fastapi import Depends, FastAPI, HTTPException, Request, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import APIKeyHeader
from loguru import logger
try:
import httpx as _httpx
except ImportError:
_httpx = None
from .adapter_registry import AdapterRegistry
from .agent_loop import AgentLoop, AgentTrace
from .agent_manager import AgentManager
from .feature_flags import FeatureFlags
from .runtime_config import RuntimeConfig
from .schemas import (
AgentRunRequest,
AgentRunResponse,
AgentTraceSchema,
GenerateCivilResponseRequest,
GenerateCivilResponseResponse,
GenerateRequest,
GenerateResponse,
ToolResultSchema,
)
from .session_context import SessionContext, SessionStore
from .tool_router import ToolType, tool_name
SKIP_MODEL_LOAD = os.getenv("SKIP_MODEL_LOAD", "false").lower() in ("true", "1", "yes")
async def _noop_tool(query: str, context: dict, session: Any) -> dict:
"""build_all_tools fallback์šฉ no-op tool."""
return {"success": False, "error": "tool์ด ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค"}
try:
from slowapi import Limiter
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
_RATE_LIMIT_AVAILABLE = True
except ImportError:
limiter = None
_RATE_LIMIT_AVAILABLE = False
_API_KEY = os.getenv("API_KEY")
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def verify_api_key(api_key: Optional[str] = Security(_api_key_header)):
if _API_KEY is None:
return
if api_key != _API_KEY:
raise HTTPException(status_code=401, detail="์œ ํšจํ•˜์ง€ ์•Š์€ API ํ‚ค์ž…๋‹ˆ๋‹ค.")
runtime_config = RuntimeConfig.from_env()
runtime_config.log_summary()
MODEL_PATH = runtime_config.model.model_path
DATA_PATH = runtime_config.paths.data_path
INDEX_PATH = runtime_config.paths.index_path
GPU_UTILIZATION = runtime_config.gpu_utilization
MAX_MODEL_LEN = runtime_config.max_model_len
TRUST_REMOTE_CODE = runtime_config.model.trust_remote_code
_PROJECT_ROOT = str(Path(__file__).resolve().parent.parent.parent)
AGENTS_DIR = runtime_config.paths.agents_dir
@dataclass
class SamplingParams:
"""vLLM HTTP API์šฉ ์ƒ˜ํ”Œ๋ง ํŒŒ๋ผ๋ฏธํ„ฐ. vLLM ์ง์ ‘ import ์—†์ด ๋™์ž‘."""
max_tokens: int = 512
temperature: float = 0.7
top_p: float = 1.0
stop: Optional[list] = None
repetition_penalty: float = 1.0
@dataclass
class PreparedGeneration:
prompt: str
sampling_params: SamplingParams
class _VLLMOutputItem:
"""vLLM HTTP ์‘๋‹ต์˜ ๋‹จ์ผ choice๋ฅผ ๊ธฐ์กด ์ธํ„ฐํŽ˜์ด์Šค๋กœ ๋ž˜ํ•‘."""
def __init__(self, text: str, finish_reason: str, token_ids: list):
self.text = text
self.finish_reason = finish_reason
self.token_ids = token_ids
class _VLLMHttpResult:
"""vLLM HTTP ์‘๋‹ต์„ ๊ธฐ์กด AsyncLLM ๊ฒฐ๊ณผ ์ธํ„ฐํŽ˜์ด์Šค๋กœ ๋ž˜ํ•‘.
๊ธฐ์กด ์ฝ”๋“œ๊ฐ€ ``output.outputs[0].text``, ``output.prompt_token_ids`` ๋“ฑ์—
์ ‘๊ทผํ•˜๋ฏ€๋กœ ๋™์ผํ•œ ์†์„ฑ์„ ์ œ๊ณตํ•œ๋‹ค.
"""
def __init__(self, data: dict):
self._data = data
choices = data.get("choices", [])
usage = data.get("usage", {})
self.outputs = []
for choice in choices:
msg = choice.get("message", {})
text = msg.get("content", "")
finish = choice.get("finish_reason", "stop")
self.outputs.append(
_VLLMOutputItem(
text=text,
finish_reason=finish,
token_ids=list(range(usage.get("completion_tokens", 0))),
)
)
self.prompt_token_ids = list(range(usage.get("prompt_tokens", 0)))
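# Illustrative only: the response payload shape _VLLMHttpResult expects and how
# it maps onto the legacy interface (all values below are made up):
#
#   data = {
#       "choices": [{"message": {"content": "answer"}, "finish_reason": "stop"}],
#       "usage": {"prompt_tokens": 12, "completion_tokens": 34},
#   }
#   result = _VLLMHttpResult(data)
#   result.outputs[0].text            -> "answer"
#   len(result.prompt_token_ids)      -> 12
#   len(result.outputs[0].token_ids)  -> 34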
def _extract_approval_request(graph_state: Any) -> Any:
"""LangGraph interrupt state์—์„œ approval payload๋ฅผ ์ถ”์ถœํ•œ๋‹ค."""
if not graph_state or not getattr(graph_state, "tasks", None):
return None
task = graph_state.tasks[0]
if not getattr(task, "interrupts", None):
return None
return task.interrupts[0].value
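# A sketch of the state walked above, assuming LangGraph's StateSnapshot layout
# (tasks -> interrupts -> value). The payload dict is whatever the graph passed
# to interrupt(), so the keys shown here are hypothetical:
#
#   graph_state.tasks[0].interrupts[0].value
#   # e.g. {"prompt": "Execute the tool plan?", "plan_summary": "..."}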
class vLLMEngineManager:
"""GovOn Shell MVP์šฉ ๋กœ์ปฌ ๋Ÿฐํƒ€์ž„ ๋งค๋‹ˆ์ €.
vLLM์€ ๋ณ„๋„ ํ”„๋กœ์„ธ์Šค(entrypoint.sh)์—์„œ OpenAI-compatible ์„œ๋ฒ„๋กœ ์‹คํ–‰๋œ๋‹ค.
์ด ํด๋ž˜์Šค๋Š” httpx๋กœ vLLM HTTP API๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.
"""
def __init__(self):
self._vllm_base_url = f"http://localhost:{os.getenv('VLLM_PORT', '8000')}"
self._http_client: Optional[Any] = None
self.feature_flags = FeatureFlags.from_env()
self.session_store = SessionStore()
self.agent_manager = AgentManager(AGENTS_DIR)
self.agent_loop: Optional[AgentLoop] = None
        self.graph = None  # LangGraph CompiledGraph (used by the v2 endpoints)
        self._checkpointer_ctx = None  # AsyncSqliteSaver context manager (owned by lifespan)
        self._sync_checkpointer_conn = None  # sqlite3 connection for SqliteSaver (closed to avoid a leak)
        self._init_agent_loop()
        # _init_graph() is called from lifespan() so that nothing heavy runs at module load time.
async def initialize(self):
if SKIP_MODEL_LOAD:
logger.info("SKIP_MODEL_LOAD=true: ๋ชจ๋ธ ๋ฐ ์ธ๋ฑ์Šค ๋กœ๋”ฉ์„ ๊ฑด๋„ˆ๋œ๋‹ˆ๋‹ค.")
return
        # The vLLM server is already started by entrypoint.sh; only run a health check.
logger.info(f"vLLM ์„œ๋ฒ„ ์—ฐ๊ฒฐ ํ™•์ธ: {self._vllm_base_url}")
if _httpx is None:
raise RuntimeError("httpx๊ฐ€ ์„ค์น˜๋˜์–ด ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค. pip install httpx")
self._http_client = _httpx.AsyncClient(
base_url=self._vllm_base_url,
timeout=_httpx.Timeout(300.0, connect=30.0),
)
        # vLLM server health check (entrypoint.sh already verified it, but double-check).
for attempt in range(10):
try:
resp = await self._http_client.get("/health")
if resp.status_code == 200:
logger.info("vLLM ์„œ๋ฒ„ ์—ฐ๊ฒฐ ์„ฑ๊ณต")
return
except Exception:
pass
logger.debug(f"vLLM ์„œ๋ฒ„ ๋Œ€๊ธฐ ์ค‘... ({attempt + 1}/10)")
await asyncio.sleep(3)
raise RuntimeError(f"vLLM ์„œ๋ฒ„์— ์—ฐ๊ฒฐํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค: {self._vllm_base_url}")
def _escape_special_tokens(self, text: str) -> str:
tokens = [
"[|user|]",
"[|assistant|]",
"[|system|]",
"[|endofturn|]",
"<thought>",
"</thought>",
]
for token in tokens:
text = text.replace(
token,
token.replace("[", "\\[")
.replace("]", "\\]")
.replace("<", "\\<")
.replace(">", "\\>"),
)
return text
@staticmethod
def _strip_thought_blocks(text: str) -> str:
        # Remove both <thought>...</thought> (legacy) and <think>...</think> (EXAONE-4.0 reasoning mode).
text = re.sub(r"<thought>.*?</thought>\s*", "", text, flags=re.DOTALL)
text = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)
return text.strip()
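    # What _strip_thought_blocks removes (illustrative input -> output):
    #   "<think>internal reasoning</think>\nFinal answer."  ->  "Final answer."
    #   "<thought>legacy block</thought>Answer."            ->  "Answer."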
def _build_persona_prompt(self, agent_name: str, user_message: str) -> str:
if self.agent_manager and self.agent_manager.get_agent(agent_name):
return self.agent_manager.build_prompt(agent_name, user_message)
return user_message
def _extract_query(self, prompt: str) -> str:
user_match = re.search(r"\[\|user\|\](.*?)\[\|endofturn\|\]", prompt, re.DOTALL)
if user_match:
user_block = user_match.group(1)
complaint_match = re.search(r"๋ฏผ์›\s*๋‚ด์šฉ\s*:\s*(.+)", user_block, re.DOTALL)
if complaint_match:
return complaint_match.group(1).strip()
return user_block.strip()
return prompt
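    # Illustrative extraction, following the EXAONE turn markers used in this
    # file (the complaint text is made up):
    #   "[|user|]๋ฏผ์› ๋‚ด์šฉ: ๋„๋กœ ํŒŒ์†์„ ์‹ ๊ณ ํ•ฉ๋‹ˆ๋‹ค[|endofturn|]" -> "๋„๋กœ ํŒŒ์†์„ ์‹ ๊ณ ํ•ฉ๋‹ˆ๋‹ค"
    # Prompts without a [|user|] block are returned unchanged.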
@staticmethod
def _is_evidence_request(query: str) -> bool:
return any(token in query for token in ("๊ทผ๊ฑฐ", "์ถœ์ฒ˜", "์™œ", "์ด์œ ", "๋งํฌ"))
@staticmethod
def _is_revision_request(query: str) -> bool:
return any(token in query for token in ("๋‹ค์‹œ", "์ˆ˜์ •", "๊ณ ์ณ", "์ •์ค‘", "๊ณต์†", "๋ณด๊ฐ•"))
def _latest_prior_turns(
self,
session: SessionContext,
current_query: str,
) -> tuple[Optional[str], Optional[str]]:
turns = list(session.recent_history)
if turns and turns[-1].role == "user" and turns[-1].content == current_query:
turns = turns[:-1]
previous_user = next(
(turn.content for turn in reversed(turns) if turn.role == "user"), None
)
previous_assistant = next(
(turn.content for turn in reversed(turns) if turn.role == "assistant"),
None,
)
return previous_user, previous_assistant
def _build_working_query(self, query: str, session: SessionContext) -> str:
query = query.strip()
if not query:
return query
if not (self._is_evidence_request(query) or self._is_revision_request(query)):
return query
previous_user, previous_assistant = self._latest_prior_turns(session, query)
parts: List[str] = []
if previous_user:
parts.append(f"์›๋ž˜ ์š”์ฒญ: {previous_user}")
if previous_assistant:
parts.append(f"์ด์ „ ๋‹ต๋ณ€: {previous_assistant[:600]}")
if self._is_revision_request(query):
parts.append(f"์ˆ˜์ • ์š”์ฒญ: {query}")
return "\n\n".join(parts) if parts else query
@staticmethod
def _format_evidence_items(evidence_dict: Dict[str, Any]) -> str:
"""EvidenceEnvelope dict๋ฅผ ์†Œ๋น„ํ•˜์—ฌ ์ถœ์ฒ˜ ๋ชฉ๋ก ํ…์ŠคํŠธ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
EvidenceItem์ด ์žˆ์œผ๋ฉด source-specific branching ์—†์ด ๋‹จ์ผ ํฌ๋งคํ„ฐ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.
"""
items = evidence_dict.get("items", [])
if not items:
return ""
lines: list[str] = []
for idx, item in enumerate(items[:10], start=1):
source_type = item.get("source_type", "")
title = item.get("title", "")
link = item.get("link_or_path", "")
if source_type == "api":
label = title or "์™ธ๋ถ€ API ๊ฒฐ๊ณผ"
if link:
lines.append(f"[{idx}] {label} - {link}")
else:
lines.append(f"[{idx}] {label}")
else:
label = title or "์ƒ์„ฑ ์ฐธ์กฐ"
if link:
lines.append(f"[{idx}] {label} - {link}")
else:
lines.append(f"[{idx}] {label}")
return "\n".join(lines)
def _summarize_evidence(
self,
api_lookup_data: Dict[str, Any],
) -> str:
        # Prefer the EvidenceEnvelope when present.
evidence = api_lookup_data.get("evidence")
if isinstance(evidence, dict) and evidence.get("items"):
lines = ["๊ทผ๊ฑฐ ์š”์•ฝ"]
api_items = [i for i in evidence["items"] if i.get("source_type") == "api"]
if api_items:
titles = ", ".join(i["title"] for i in api_items[:3] if i.get("title"))
lines.append(
f"- ์™ธ๋ถ€ ๋ฏผ์›๋ถ„์„ API์—์„œ ์œ ์‚ฌ ์‚ฌ๋ก€ {len(api_items)}๊ฑด์„ ํ™•์ธํ–ˆ์Šต๋‹ˆ๋‹ค."
+ (f" ๋Œ€ํ‘œ ์‚ฌ๋ก€: {titles}" if titles else "")
)
if len(lines) == 1:
lines.append(
"- ๋‚ด๋ถ€ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๋ฅผ ์ถฉ๋ถ„ํžˆ ํ™•๋ณดํ•˜์ง€ ๋ชปํ•ด ์ผ๋ฐ˜ ํ–‰์ • ์‘๋Œ€ ์›์น™ ๊ธฐ์ค€์œผ๋กœ ์ž‘์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค."
)
return "\n".join(lines)
# Legacy ํฌ๋งคํ„ฐ (EvidenceItem ์—†์„ ๋•Œ)
lines = ["๊ทผ๊ฑฐ ์š”์•ฝ"]
api_results = api_lookup_data.get("results", [])
if api_results:
titles = []
for item in api_results[:3]:
title = item.get("title") or item.get("qnaTitle") or item.get("question")
if title:
titles.append(title)
lines.append(
f"- ์™ธ๋ถ€ ๋ฏผ์›๋ถ„์„ API์—์„œ ์œ ์‚ฌ ์‚ฌ๋ก€ {len(api_results)}๊ฑด์„ ํ™•์ธํ–ˆ์Šต๋‹ˆ๋‹ค."
+ (f" ๋Œ€ํ‘œ ์‚ฌ๋ก€: {', '.join(titles)}" if titles else "")
)
if len(lines) == 1:
lines.append(
"- ๋‚ด๋ถ€ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๋ฅผ ์ถฉ๋ถ„ํžˆ ํ™•๋ณดํ•˜์ง€ ๋ชปํ•ด ์ผ๋ฐ˜ ํ–‰์ • ์‘๋Œ€ ์›์น™ ๊ธฐ์ค€์œผ๋กœ ์ž‘์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค."
)
return "\n".join(lines)
@staticmethod
def _api_source_line(index: int, item: Dict[str, Any]) -> str:
title = item.get("title") or item.get("qnaTitle") or item.get("question") or "์™ธ๋ถ€ API ๊ฒฐ๊ณผ"
url = item.get("url") or item.get("detailUrl") or ""
if url:
return f"[{index}] {title} - {url}"
return f"[{index}] {title}"
def _build_evidence_section(
self,
session: SessionContext,
current_query: str,
api_data: Dict[str, Any],
) -> str:
_, previous_answer = self._latest_prior_turns(session, current_query)
lines = ["๊ทผ๊ฑฐ/์ถœ์ฒ˜"]
cursor = 1
        # When an EvidenceEnvelope is present, prefer the single formatter.
api_evidence = api_data.get("evidence")
if api_evidence and isinstance(api_evidence, dict) and api_evidence.get("items"):
for item in api_evidence["items"][:5]:
title = item.get("title", "") or "์™ธ๋ถ€ API ๊ฒฐ๊ณผ"
link = item.get("link_or_path", "")
if link:
lines.append(f"[{cursor}] {title} - {link}")
else:
lines.append(f"[{cursor}] {title}")
cursor += 1
else:
            # Legacy API formatter.
api_items = api_data.get("citations") or api_data.get("results") or []
for item in api_items[:5]:
lines.append(self._api_source_line(cursor, item))
cursor += 1
if cursor == 1:
lines.append("- ๊ฒ€์ƒ‰ ๊ฐ€๋Šฅํ•œ ๊ทผ๊ฑฐ๋ฅผ ์ฐพ์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค.")
section = "\n".join(lines)
if previous_answer:
return f"{previous_answer}\n\n{section}"
return section
async def _prepare_civil_response_generation(
self,
request: GenerateCivilResponseRequest,
flags: Optional[FeatureFlags] = None,
external_cases: Optional[List[dict]] = None,
) -> PreparedGeneration:
gen_defaults = runtime_config.generation
safe_message = self._escape_special_tokens(self._extract_query(request.prompt))
user_content = f"๋‹ค์Œ ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด ์ฃผ์„ธ์š”.\n\n{safe_message}"
prompt = self._build_persona_prompt("draft_response", user_content)
sampling_params = SamplingParams(
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens,
stop=request.stop or gen_defaults.stop_sequences,
repetition_penalty=gen_defaults.repetition_penalty,
)
return PreparedGeneration(
prompt=prompt,
sampling_params=sampling_params,
)
async def _prepare_draft_only(
self,
request: GenerateCivilResponseRequest,
flags: Optional[FeatureFlags] = None,
) -> PreparedGeneration:
"""LoRA ์ดˆ์•ˆ ์ƒ์„ฑ์šฉ: ์ฟผ๋ฆฌ๋งŒ์œผ๋กœ ํ”„๋กฌํ”„ํŠธ ์ƒ์„ฑ.
์‚ฌ์šฉ์ž ์ฟผ๋ฆฌ๋ฅผ persona ํ”„๋กฌํ”„ํŠธ๋กœ ๊ฐ์‹ธ์„œ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
"""
gen_defaults = runtime_config.generation
safe_message = self._escape_special_tokens(self._extract_query(request.prompt))
# ํ•™์Šต ๋ฐ์ดํ„ฐ ํ˜•์‹: user = instruction + "\n\n" + input
user_content = f"๋‹ค์Œ ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด ์ฃผ์„ธ์š”.\n\n{safe_message}"
prompt = self._build_persona_prompt("draft_response", user_content)
sampling_params = SamplingParams(
temperature=(
request.temperature if request.temperature is not None else gen_defaults.temperature
),
top_p=request.top_p if request.top_p is not None else gen_defaults.top_p,
max_tokens=request.max_tokens or gen_defaults.max_tokens,
stop=request.stop or gen_defaults.stop_sequences,
repetition_penalty=gen_defaults.repetition_penalty,
)
return PreparedGeneration(
prompt=prompt,
sampling_params=sampling_params,
)
async def synthesize_final(
self,
draft_text: str,
evidence_items: list,
query: str,
adapter_name: str = "public_admin",
) -> str:
"""์ดˆ์•ˆ + ๋„๊ตฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฒ ์ด์Šค ๋ชจ๋ธ๋กœ ํ†ตํ•ฉํ•˜์—ฌ ์ตœ์ข… ๋‹ต๋ณ€ ์ƒ์„ฑ.
LoRA ์–ด๋Œ‘ํ„ฐ๋Š” ํ•™์Šต ํ˜•์‹(์งˆ๋ฌธโ†’๋‹ต๋ณ€)์— ํŠนํ™”๋˜์–ด ์žˆ์–ด
์ดˆ์•ˆ+๊ทผ๊ฑฐ ํ†ตํ•ฉ ๊ฐ™์€ ๋ฒ”์šฉ ํƒœ์Šคํฌ์—๋Š” ๋ฒ ์ด์Šค ๋ชจ๋ธ์ด ์ ํ•ฉํ•˜๋‹ค.
"""
safe_query = self._escape_special_tokens(query[:400])
safe_draft = self._escape_special_tokens(draft_text[:800])
# ๊ทผ๊ฑฐ ํ…์ŠคํŠธ ์กฐ๋ฆฝ
evidence_text = ""
for item in evidence_items[:5]:
source_type = item.get("source_type", "")
title = item.get("title", "")
excerpt = item.get("excerpt", "")[:200]
label = "[์™ธ๋ถ€]" if source_type == "api" else "[์ƒ์„ฑ]"
if title or excerpt:
evidence_text += f"- {label} {title}: {excerpt}\n"
if not evidence_text.strip():
evidence_text = "(๊ฒ€์ƒ‰ ๊ทผ๊ฑฐ ์—†์Œ)"
# ๋ฒ ์ด์Šค ๋ชจ๋ธ ๋ฒ”์šฉ ํ•ฉ์„ฑ ํ”„๋กฌํ”„ํŠธ
synthesis_prompt = (
"[|system|]๋‹น์‹ ์€ ๋ฏผ์› ๋‹ต๋ณ€์„ ๋ณด๊ฐ•ํ•˜๋Š” ์ „๋ฌธ๊ฐ€์ž…๋‹ˆ๋‹ค. "
"์ดˆ์•ˆ๊ณผ ์ฐธ๊ณ  ๊ทผ๊ฑฐ๋ฅผ ๊ฒฐํ•ฉํ•˜์—ฌ ์ •ํ™•ํ•˜๊ณ  ๊ณต๊ฐ์ ์ธ ์ตœ์ข… ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•˜์„ธ์š”. "
"๋ฒ•์  ๊ทผ๊ฑฐ๊ฐ€ ์žˆ์œผ๋ฉด ์ธ์šฉํ•˜๊ณ , ์ ˆ์ฐจ์™€ ์กฐ์น˜์‚ฌํ•ญ์„ ๋ช…ํ™•ํžˆ ํฌํ•จํ•˜์„ธ์š”."
"[|endofturn|]\n"
"[|user|]๋‹ค์Œ ์ดˆ์•ˆ๊ณผ ๊ทผ๊ฑฐ๋ฅผ ๊ฒฐํ•ฉํ•˜์—ฌ ์ตœ์ข… ๋ฏผ์› ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•˜์„ธ์š”.\n\n"
f"[๋ฏผ์› ์งˆ์˜]\n{safe_query}\n\n"
f"[์ดˆ์•ˆ]\n{safe_draft}\n\n"
f"[์ฐธ๊ณ  ๊ทผ๊ฑฐ]\n{evidence_text}"
"[|endofturn|]\n[|assistant|]"
)
# ๋ฒ ์ด์Šค ๋ชจ๋ธ ์‚ฌ์šฉ (LoRA ์—†์Œ) โ€” ํ•ฉ์„ฑ์€ ๋ฒ”์šฉ ํƒœ์Šคํฌ
sampling_params = SamplingParams(
max_tokens=768,
temperature=0.6,
top_p=0.9,
stop=["[|endofturn|]"],
)
        request_id = str(uuid.uuid4())
try:
output = await self._run_engine(
synthesis_prompt, sampling_params, request_id, lora_request=None
)
except Exception as exc:
logger.warning(f"[synthesize_final] ํ•ฉ์„ฑ ์‹คํŒจ: {exc}")
return draft_text
if output is None or not output.outputs:
return draft_text
return self._strip_thought_blocks(output.outputs[0].text)
async def _run_engine(
self,
prompt: str,
sampling_params: SamplingParams,
request_id: str,
lora_request=None,
):
"""vLLM OpenAI-compatible HTTP API๋ฅผ ํ†ตํ•ด ํ…์ŠคํŠธ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค."""
if self._http_client is None:
return None
        # Convert the EXAONE chat-template prompt into messages.
messages = self._prompt_to_messages(prompt)
body: Dict[str, Any] = {
"model": MODEL_PATH,
"messages": messages,
"max_tokens": sampling_params.max_tokens,
"temperature": sampling_params.temperature,
"stream": False,
}
if sampling_params.top_p is not None and sampling_params.top_p < 1.0:
body["top_p"] = sampling_params.top_p
if sampling_params.stop:
body["stop"] = list(sampling_params.stop)
if sampling_params.repetition_penalty and sampling_params.repetition_penalty != 1.0:
body["repetition_penalty"] = sampling_params.repetition_penalty
        # Select the LoRA adapter.
if lora_request is not None:
body["model"] = lora_request.lora_name
try:
resp = await self._http_client.post("/v1/chat/completions", json=body)
resp.raise_for_status()
data = resp.json()
except Exception as exc:
logger.error(f"vLLM HTTP ํ˜ธ์ถœ ์‹คํŒจ: {exc}")
return None
# OpenAI ์‘๋‹ต์„ ๊ธฐ์กด ์ธํ„ฐํŽ˜์ด์Šค์™€ ํ˜ธํ™˜๋˜๋Š” ๊ฐ์ฒด๋กœ ๋ž˜ํ•‘
return _VLLMHttpResult(data)
@staticmethod
def _prompt_to_messages(prompt: str) -> list:
"""EXAONE chat template ํ˜•์‹ ํ”„๋กฌํ”„ํŠธ๋ฅผ OpenAI messages๋กœ ๋ณ€ํ™˜."""
messages = []
        # Parse [|system|]...[|endofturn|], [|user|]...[|endofturn|], [|assistant|]... turns.
        parts = re.split(r"\[\\?\|(\w+)\\?\|]", prompt)
role = None
for part in parts:
if part in ("system", "user", "assistant"):
role = part
elif role and part.strip():
content = part.replace("[|endofturn|]", "").strip()
if content:
messages.append({"role": role, "content": content})
role = None
if not messages:
messages = [{"role": "user", "content": prompt}]
return messages
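    # Illustrative conversion by _prompt_to_messages (content strings made up):
    #   "[|system|]You are helpful.[|endofturn|]\n[|user|]Hi[|endofturn|]\n[|assistant|]"
    #   -> [{"role": "system", "content": "You are helpful."},
    #       {"role": "user", "content": "Hi"}]
    # A prompt without turn markers falls back to a single user message.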
async def generate(
self,
request: GenerateRequest,
request_id: str,
flags: Optional[FeatureFlags] = None,
) -> Any:
return await self.generate_civil_response(request, request_id, flags)
async def generate_civil_response(
self,
request: GenerateCivilResponseRequest,
request_id: str,
flags: Optional[FeatureFlags] = None,
external_cases: Optional[List[dict]] = None,
lora_request=None,
) -> Any:
prepared = await self._prepare_civil_response_generation(request, flags, external_cases)
return await self._run_engine(
prepared.prompt, prepared.sampling_params, request_id, lora_request=lora_request
)
    async def generate_stream(
        self,
        request: GenerateRequest,
        request_id: str,
        flags: Optional[FeatureFlags] = None,
    ) -> AsyncGenerator[tuple[str, bool], None]:
        """Yield (accumulated_text, finished) pairs from the vLLM SSE stream."""
        prepared = await self._prepare_civil_response_generation(request, flags)
        if self._http_client is None:
            raise RuntimeError("vLLM ์„œ๋ฒ„์— ์—ฐ๊ฒฐ๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
        messages = self._prompt_to_messages(prepared.prompt)
        body = {
            "model": MODEL_PATH,
            "messages": messages,
            "max_tokens": prepared.sampling_params.max_tokens,
            "temperature": prepared.sampling_params.temperature,
            "stream": True,
        }
        if prepared.sampling_params.stop:
            body["stop"] = list(prepared.sampling_params.stop)
        # httpx's stream() returns an async context manager, not an iterator,
        # so the OpenAI SSE lines are parsed here before reaching callers.
        accumulated = ""
        async with self._http_client.stream("POST", "/v1/chat/completions", json=body) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):].strip()
                if payload == "[DONE]":
                    break
                chunk = json.loads(payload)
                choices = chunk.get("choices") or [{}]
                delta = choices[0].get("delta", {})
                accumulated += delta.get("content") or ""
                yield accumulated, False
        yield accumulated, True
def _init_agent_loop(self) -> None:
from src.inference.actions.data_go_kr import MinwonAnalysisAction
engine_ref = self
minwon_action = MinwonAnalysisAction()
async def _api_lookup_tool(query: str, context: dict, session: SessionContext) -> dict:
working_query = query.strip()
payload = await minwon_action.fetch_similar_cases(
working_query,
{
**context,
"session_context": session.build_context_summary(),
},
)
results = payload["results"] or []
return {
"query": payload["query"],
"count": len(results),
"results": results,
"context_text": payload["context_text"],
"citations": [citation.to_dict() for citation in payload["citations"]],
"source": "data.go.kr",
}
async def _draft_response_tool(
query: str,
context: dict,
session: SessionContext,
) -> dict:
working_query = engine_ref._build_working_query(query, session)
            # LoRA-first: generate the draft from the query alone.
adapter_name = context.get("adapter") if context else None
if not adapter_name:
adapter_name = "public_admin"
_adapter_reg = AdapterRegistry.get_instance()
lora_req = _adapter_reg.get_lora_request(adapter_name)
gen_request = GenerateCivilResponseRequest(
prompt=working_query,
max_tokens=2048,
temperature=0.7,
)
request_id = str(uuid.uuid4())
prepared = await engine_ref._prepare_draft_only(gen_request)
final_output = await engine_ref._run_engine(
prepared.prompt, prepared.sampling_params, request_id, lora_request=lora_req
)
if final_output is None or not final_output.outputs:
return {
"text": "",
"draft_text": "",
"success": False,
"error": "๋ฏผ์› ๋‹ต๋ณ€ ์ดˆ์•ˆ ์ƒ์„ฑ ์‹คํŒจ",
"results": [],
"context_text": "",
}
draft_text = engine_ref._strip_thought_blocks(final_output.outputs[0].text)
return {
"text": draft_text,
"draft_text": draft_text,
"success": True,
"results": [],
"context_text": draft_text,
"prompt_tokens": len(final_output.prompt_token_ids),
"completion_tokens": len(final_output.outputs[0].token_ids),
}
tool_registry = {
ToolType.API_LOOKUP: _api_lookup_tool,
"draft_response": _draft_response_tool,
}
self.agent_loop = AgentLoop(tool_registry=tool_registry)
def _build_langgraph_tools(self) -> list:
"""LangGraph ToolNode์šฉ ๋„๊ตฌ ๋ชฉ๋ก์„ ์ƒ์„ฑํ•œ๋‹ค.
build_all_tools()๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ StructuredTool ๋ชฉ๋ก์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
AgentLoop์˜ tool_registry์—์„œ ๊ธฐ์กด closure๋ฅผ ์ถ”์ถœํ•˜์—ฌ ์ „๋‹ฌํ•œ๋‹ค.
"""
from src.inference.graph.tools import build_all_tools
if self.agent_loop is None:
return build_all_tools(
api_lookup_action=self._get_api_lookup_action(),
)
# AgentLoop์˜ tool_registry์—์„œ ๊ธฐ์กด closure๋ฅผ ์ถ”์ถœ
raw_tools = {
str(k.value if hasattr(k, "value") else k): v for k, v in self.agent_loop._tools.items()
}
return build_all_tools(
api_lookup_action=self._get_api_lookup_action(),
draft_response_fn=raw_tools.get("draft_response"),
)
def _get_api_lookup_action(self) -> Any:
"""AgentLoop์— ๋“ฑ๋ก๋œ api_lookup์˜ MinwonAnalysisAction์„ ์ถ”์ถœํ•œ๋‹ค."""
if self.agent_loop is None:
return None
tool_fn = self.agent_loop._tools.get(ToolType.API_LOOKUP)
        # If it is an ApiLookupCapability, extract the action directly.
if hasattr(tool_fn, "_action"):
return tool_fn._action
# closure์ธ ๊ฒฝ์šฐ action์„ ์ถ”์ถœํ•  ์ˆ˜ ์—†์œผ๋ฏ€๋กœ None ๋ฐ˜ํ™˜
# (MinwonAnalysisAction์€ _init_agent_loop์—์„œ ์ƒˆ๋กœ ์ƒ์„ฑํ•œ๋‹ค)
try:
from src.inference.actions.data_go_kr import MinwonAnalysisAction
return MinwonAnalysisAction()
except Exception:
return None
def _init_graph_with_async_checkpointer(self, checkpointer: object) -> None:
"""lifespan์—์„œ AsyncSqliteSaver๊ฐ€ ์ค€๋น„๋œ ํ›„ graph๋ฅผ ์žฌ๊ตฌ์„ฑํ•œ๋‹ค."""
self._init_graph(checkpointer=checkpointer)
def _init_graph(self, checkpointer: Optional[object] = None) -> None:
"""LangGraph StateGraph๋ฅผ ์ดˆ๊ธฐํ™”ํ•œ๋‹ค.
v4 ์•„ํ‚คํ…์ฒ˜: ReAct + ToolNode ๊ธฐ๋ฐ˜.
LLM์ด ์ž์œจ์ ์œผ๋กœ ๋„๊ตฌ ํ˜ธ์ถœ์„ ๊ฒฐ์ •ํ•˜๋ฉฐ, ์ •์  planner/executor๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š”๋‹ค.
Parameters
----------
checkpointer : optional
์™ธ๋ถ€์—์„œ ์ฃผ์ž…ํ•  LangGraph checkpointer.
None์ด๋ฉด SqliteSaver(๋™๊ธฐ sqlite3)๋ฅผ ์‹œ๋„ํ•˜๊ณ ,
import ์‹คํŒจ ์‹œ MemorySaver๋กœ fallbackํ•œ๋‹ค.
SqliteSaver DB ๊ฒฝ๋กœ๋Š” SessionStore DB์™€ ๊ฐ™์€ ๋””๋ ‰ํ„ฐ๋ฆฌ์—
``langgraph_checkpoints.db``๋กœ ์ƒ์„ฑ๋œ๋‹ค (๊ด€์‹ฌ์‚ฌ ๋ถ„๋ฆฌ).
"""
try:
from src.inference.graph.builder import build_govon_graph
except ImportError as exc:
logger.warning(f"LangGraph graph ์ดˆ๊ธฐํ™” ์‹คํŒจ (import ์˜ค๋ฅ˜): {exc}")
return
tools = self._build_langgraph_tools()
# LLM ์ธ์Šคํ„ด์Šค ๊ตฌ์„ฑ
if SKIP_MODEL_LOAD:
            # CI/test environment: no LLM is available, so skip graph initialization.
logger.info("SKIP_MODEL_LOAD=true: LangGraph graph ์ดˆ๊ธฐํ™” ์Šคํ‚ต")
return
elif os.getenv("LANGGRAPH_MODEL_BASE_URL"):
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
base_url=os.environ["LANGGRAPH_MODEL_BASE_URL"],
api_key=os.getenv("LANGGRAPH_MODEL_API_KEY", "EMPTY"),
model=os.getenv("LANGGRAPH_PLANNER_MODEL", runtime_config.model.model_path),
temperature=0.0,
max_tokens=1024,
)
else:
# ์šด์˜ ํ™˜๊ฒฝ: vLLM OpenAI-compatible endpoint ์‚ฌ์šฉ
from langchain_openai import ChatOpenAI
vllm_port = os.getenv("VLLM_PORT", "8000")
llm = ChatOpenAI(
base_url=f"http://localhost:{vllm_port}/v1",
api_key="EMPTY",
model=runtime_config.model.model_path,
temperature=0.0,
max_tokens=1024,
)
        # If no checkpointer was injected, try SqliteSaver.
if checkpointer is None:
checkpointer, conn = _build_sync_sqlite_checkpointer(self.session_store.db_path)
if self._sync_checkpointer_conn is not None:
try:
self._sync_checkpointer_conn.close()
except Exception:
pass
self._sync_checkpointer_conn = conn
self.graph = build_govon_graph(
llm=llm,
tools=tools,
session_store=self.session_store,
checkpointer=checkpointer,
)
logger.info("LangGraph graph ์ดˆ๊ธฐํ™” ์™„๋ฃŒ")
def _build_sync_sqlite_checkpointer(
session_db_path: str,
) -> tuple:
"""SqliteSaver(๋™๊ธฐ) ๋˜๋Š” MemorySaver(fallback)๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
LangGraph checkpointer์šฉ SQLite DB๋Š” SessionStore์˜ sessions.sqlite3์™€
๊ฐ™์€ ๋””๋ ‰ํ„ฐ๋ฆฌ์— ๋ณ„๋„ ํŒŒ์ผ ``langgraph_checkpoints.db``๋กœ ์ƒ์„ฑํ•œ๋‹ค.
๋‘ DB๋ฅผ ๋ถ„๋ฆฌํ•จ์œผ๋กœ์จ ๊ด€์‹ฌ์‚ฌ(์„ธ์…˜ ๋ฉ”ํƒ€ vs. graph ์ฒดํฌํฌ์ธํŠธ)๋ฅผ ๋ช…ํ™•ํžˆ ๊ตฌ๋ถ„ํ•œ๋‹ค.
SqliteSaver๋Š” ํ”„๋กœ์„ธ์Šค ์žฌ์‹œ์ž‘ ํ›„์—๋„ interrupt ์ƒํƒœ๋ฅผ SQLite์—์„œ ๋ณต์›ํ•˜๋ฏ€๋กœ
MemorySaver์™€ ๋‹ฌ๋ฆฌ ์žฌ์‹œ์ž‘-์•ˆ์ „(restart-safe)ํ•˜๋‹ค.
Parameters
----------
session_db_path : str
SessionStore๊ฐ€ ์‚ฌ์šฉ ์ค‘์ธ sessions.sqlite3 ํŒŒ์ผ ๊ฒฝ๋กœ.
์ด ๊ฒฝ๋กœ์˜ ๋ถ€๋ชจ ๋””๋ ‰ํ„ฐ๋ฆฌ์— langgraph_checkpoints.db๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
Returns
-------
tuple[SqliteSaver | MemorySaver, sqlite3.Connection | None]
(checkpointer, conn) ํŠœํ”Œ.
SqliteSaver ์‚ฌ์šฉ ์‹œ conn์€ ์—ด๋ฆฐ sqlite3.Connection์ด๋ฉฐ,
ํ˜ธ์ถœ์ž๊ฐ€ ์ ์ ˆํ•œ ์‹œ์ ์— closeํ•ด์•ผ ํ•œ๋‹ค.
MemorySaver fallback ์‹œ conn์€ None์ด๋‹ค.
"""
cp_db_path = str(Path(session_db_path).parent / "langgraph_checkpoints.db")
try:
        import sqlite3

        from langgraph.checkpoint.sqlite import SqliteSaver
        conn = sqlite3.connect(cp_db_path, check_same_thread=False)
saver = SqliteSaver(conn)
logger.info(f"LangGraph checkpointer: SqliteSaver ({cp_db_path})")
return saver, conn
except ImportError:
logger.warning(
"langgraph-checkpoint-sqlite ๋ฏธ์„ค์น˜ โ€” MemorySaver๋กœ fallbackํ•ฉ๋‹ˆ๋‹ค. "
"ํ”„๋กœ์„ธ์Šค ์žฌ์‹œ์ž‘ ์‹œ interrupt ์ƒํƒœ๊ฐ€ ์†Œ๋ฉธ๋ฉ๋‹ˆ๋‹ค."
)
from langgraph.checkpoint.memory import MemorySaver
return MemorySaver(), None
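# Minimal usage sketch for the sync checkpointer helper; the path is
# hypothetical, and the caller owns the returned connection:
#
#   saver, conn = _build_sync_sqlite_checkpointer("/data/sessions.sqlite3")
#   try:
#       ...  # build a graph with checkpointer=saver
#   finally:
#       if conn is not None:
#           conn.close()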
manager = vLLMEngineManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI lifespan: ๋ชจ๋ธ/์ธ๋ฑ์Šค ์ดˆ๊ธฐํ™” ๋ฐ AsyncSqliteSaver ์—…๊ทธ๋ ˆ์ด๋“œ.
startup ๋‹จ๊ณ„์—์„œ AsyncSqliteSaver๊ฐ€ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๋ฉด graph๋ฅผ ์žฌ๊ตฌ์„ฑํ•œ๋‹ค.
AsyncSqliteSaver๋Š” async ์ปจํ…์ŠคํŠธ ๋งค๋‹ˆ์ €๋กœ ๊ด€๋ฆฌํ•˜๋ฉฐ, shutdown ์‹œ ์ •๋ฆฌํ•œ๋‹ค.
AsyncSqliteSaver import ์‹คํŒจ ์‹œ _init_graph์—์„œ ์ด๋ฏธ ์„ค์ •๋œ
SqliteSaver(๋˜๋Š” MemorySaver fallback)๋ฅผ ๊ทธ๋Œ€๋กœ ์œ ์ง€ํ•œ๋‹ค.
"""
await manager.initialize()
    # Initialize the graph after the vLLM server is reachable (in lifespan, not at module load).
manager._init_graph()
    # Try rebuilding the graph with AsyncSqliteSaver (better async performance).
async_cp_db = str(Path(manager.session_store.db_path).parent / "langgraph_checkpoints.db")
try:
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
async with AsyncSqliteSaver.from_conn_string(async_cp_db) as async_saver:
            # Close the connection held by the synchronous SqliteSaver to prevent a leak.
if manager._sync_checkpointer_conn is not None:
try:
manager._sync_checkpointer_conn.close()
except Exception:
pass
manager._sync_checkpointer_conn = None
manager._checkpointer_ctx = async_saver
manager._init_graph_with_async_checkpointer(async_saver)
logger.info(f"LangGraph checkpointer: AsyncSqliteSaver ({async_cp_db})")
yield
manager._checkpointer_ctx = None
except ImportError:
logger.info("AsyncSqliteSaver ๋ฏธ์„ค์น˜ โ€” SqliteSaver(๋™๊ธฐ) ๋˜๋Š” MemorySaver๋กœ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.")
yield
app = FastAPI(
title="GovOn Local Runtime",
description="Local FastAPI daemon for the GovOn Agentic Shell MVP.",
lifespan=lifespan,
)
ALLOWED_ORIGINS = os.getenv("CORS_ORIGINS", "").split(",")
if ALLOWED_ORIGINS and ALLOWED_ORIGINS[0]:
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if _RATE_LIMIT_AVAILABLE and limiter is not None:
app.state.limiter = limiter
app.add_middleware(SlowAPIMiddleware)
@app.get("/health")
async def health():
return {
"status": "healthy",
"profile": runtime_config.profile.value,
"model": runtime_config.model.model_path,
"agents_loaded": manager.agent_manager.list_agents() if manager.agent_manager else [],
"feature_flags": {
"model_version": manager.feature_flags.model_version,
},
"session_store": {
"driver": "sqlite",
"path": manager.session_store.db_path,
},
}
def _rate_limit(limit_string: str):
if _RATE_LIMIT_AVAILABLE and limiter is not None:
return limiter.limit(limit_string)
def _noop(func):
return func
return _noop
def get_feature_flags(request: Request) -> FeatureFlags:
header = request.headers.get("X-Feature-Flag")
return manager.feature_flags.override_from_header(header)
@app.post("/v1/generate-civil-response", response_model=GenerateCivilResponseResponse)
@_rate_limit("30/minute")
async def generate_civil_response(
request: GenerateCivilResponseRequest,
_: None = Depends(verify_api_key),
flags: FeatureFlags = Depends(get_feature_flags),
):
if request.stream:
raise HTTPException(status_code=400, detail="๋ฏผ์› ๋‹ต๋ณ€ ์ŠคํŠธ๋ฆฌ๋ฐ์€ /v1/stream์„ ์‚ฌ์šฉํ•˜์„ธ์š”.")
request_id = str(uuid.uuid4())
final_output = await manager.generate_civil_response(
request,
request_id,
flags,
)
if final_output is None:
raise HTTPException(status_code=500, detail="๋ฏผ์› ๋‹ต๋ณ€ ์ƒ์„ฑ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.")
return GenerateCivilResponseResponse(
request_id=request_id,
complaint_id=request.complaint_id,
text=manager._strip_thought_blocks(final_output.outputs[0].text),
prompt_tokens=len(final_output.prompt_token_ids),
completion_tokens=len(final_output.outputs[0].token_ids),
)
@app.post("/v1/generate", response_model=GenerateResponse)
@_rate_limit("30/minute")
async def generate(
request: GenerateRequest,
_: None = Depends(verify_api_key),
flags: FeatureFlags = Depends(get_feature_flags),
):
if request.stream:
raise HTTPException(status_code=400, detail="Use /v1/stream for streaming.")
request_id = str(uuid.uuid4())
final_output = await manager.generate(request, request_id, flags)
if final_output is None:
raise HTTPException(status_code=500, detail="Generation failed.")
return GenerateResponse(
request_id=request_id,
complaint_id=request.complaint_id,
text=manager._strip_thought_blocks(final_output.outputs[0].text),
prompt_tokens=len(final_output.prompt_token_ids),
completion_tokens=len(final_output.outputs[0].token_ids),
)
@app.post("/v1/chat/completions")
@_rate_limit("30/minute")
async def chat_completions(
request: Request,
_: None = Depends(verify_api_key),
):
"""OpenAI-compatible /v1/chat/completions.
vLLM HTTP API๋ฅผ ๊ฒฝ์œ ํ•˜์—ฌ ํ…์ŠคํŠธ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
v2 ReAct graph๋Š” ChatOpenAI๊ฐ€ vLLM OpenAI ์„œ๋ฒ„์— ์ง์ ‘ ์—ฐ๊ฒฐํ•˜๋ฏ€๋กœ
์ด ์—”๋“œํฌ์ธํŠธ๋Š” v1 ํ˜ธํ™˜ ์œ ์ง€์šฉ์ด๋‹ค.
"""
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body.")
messages: list[dict] = body.get("messages", [])
if not messages:
raise HTTPException(status_code=422, detail="messages must not be empty.")
try:
max_tokens = int(body.get("max_tokens", 512))
temperature = float(body.get("temperature", 0.7))
except (ValueError, TypeError):
raise HTTPException(status_code=400, detail="Invalid max_tokens or temperature value.")
if not (1 <= max_tokens <= runtime_config.max_model_len):
raise HTTPException(
status_code=400,
detail=f"max_tokens must be between 1 and {runtime_config.max_model_len}.",
)
if not (0.0 <= temperature <= 2.0):
raise HTTPException(status_code=400, detail="temperature must be between 0.0 and 2.0.")
model: str = body.get("model", runtime_config.model.model_path)
    # Convert messages to a prompt (EXAONE chat-template format).
prompt_parts: list[str] = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
prompt_parts.append(f"[|system|]{content}[|endofturn|]")
elif role == "user":
prompt_parts.append(f"[|user|]{content}[|endofturn|]")
elif role == "assistant":
prompt_parts.append(f"[|assistant|]{content}[|endofturn|]")
else:
logger.warning(f"chat_completions: ์ง€์›ํ•˜์ง€ ์•Š๋Š” role ๋ฌด์‹œ: {role!r}")
prompt_parts.append("[|assistant|]")
prompt = "\n".join(prompt_parts)
if manager._http_client is None:
raise HTTPException(status_code=503, detail="vLLM server not connected.")
request_id = str(uuid.uuid4())
logger.info(
f"chat_completions request_id={request_id} messages={len(messages)} max_tokens={max_tokens}"
)
sampling_params = SamplingParams(
max_tokens=max_tokens,
temperature=temperature,
stop=["[|endofturn|]"],
)
try:
final_output = await manager._run_engine(prompt, sampling_params, request_id)
except Exception as exc:
logger.error(f"chat_completions generation failed: {exc}")
raise HTTPException(status_code=500, detail="Generation failed due to internal error.")
if final_output is None or not final_output.outputs:
raise HTTPException(status_code=500, detail="Generation failed.")
output = final_output.outputs[0]
text = manager._strip_thought_blocks(output.text)
prompt_tokens = len(final_output.prompt_token_ids)
completion_tokens = len(output.token_ids)
vllm_reason = getattr(output, "finish_reason", None)
finish_reason = "length" if vllm_reason == "length" else "stop"
return {
"id": f"chatcmpl-{request_id}",
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": text},
"finish_reason": finish_reason,
}
],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
},
}
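# Example request against this endpoint, assuming a local server and the
# API_KEY env var is set (host/port and values are placeholders):
#
#   curl -s http://localhost:8080/v1/chat/completions \
#     -H "Content-Type: application/json" -H "X-API-Key: $API_KEY" \
#     -d '{"messages": [{"role": "user", "content": "์•ˆ๋…•ํ•˜์„ธ์š”"}], "max_tokens": 128}'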
@app.post("/v1/stream")
@_rate_limit("30/minute")
async def stream_generate(
request: GenerateRequest,
_: None = Depends(verify_api_key),
flags: FeatureFlags = Depends(get_feature_flags),
):
if not request.stream:
request.stream = True
request_id = str(uuid.uuid4())
    async def stream_results() -> AsyncGenerator[str, None]:
        # generate_stream is an async generator of (accumulated_text, finished) pairs.
        async for text, finished in manager.generate_stream(request, request_id, flags):
            if finished:
                text = manager._strip_thought_blocks(text)
            response_obj = {"request_id": request_id, "text": text, "finished": finished}
            yield f"data: {json.dumps(response_obj, ensure_ascii=False)}\n\n"
return StreamingResponse(stream_results(), media_type="text/event-stream")
def _trace_to_schema(trace: AgentTrace) -> AgentTraceSchema:
return AgentTraceSchema(
request_id=trace.request_id,
session_id=trace.session_id,
plan=trace.plan_tools,
plan_reason=trace.plan_reason,
tool_results=[
ToolResultSchema(
tool=tool_name(result.tool),
success=result.success,
latency_ms=round(result.latency_ms, 2),
data=result.data,
error=result.error,
)
for result in trace.tool_results
],
total_latency_ms=round(trace.total_latency_ms, 2),
error=trace.error,
)
@app.post("/v1/agent/run", response_model=AgentRunResponse)
@_rate_limit("30/minute")
async def agent_run(
request: AgentRunRequest,
_: None = Depends(verify_api_key),
):
if not manager.agent_loop:
raise HTTPException(status_code=503, detail="์—์ด์ „ํŠธ ๋ฃจํ”„๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
if request.stream:
raise HTTPException(status_code=400, detail="์ŠคํŠธ๋ฆฌ๋ฐ์€ /v1/agent/stream์„ ์‚ฌ์šฉํ•˜์„ธ์š”.")
session = manager.session_store.get_or_create(session_id=request.session_id)
request_id = str(uuid.uuid4())
trace = await manager.agent_loop.run(
query=request.query,
session=session,
request_id=request_id,
force_tools=request.force_tools,
)
return AgentRunResponse(
request_id=request_id,
session_id=session.session_id,
text=trace.final_text,
trace=_trace_to_schema(trace),
)
@app.post("/v1/agent/stream")
@_rate_limit("30/minute")
async def agent_stream(
request: AgentRunRequest,
_: None = Depends(verify_api_key),
):
if not manager.agent_loop:
raise HTTPException(status_code=503, detail="์—์ด์ „ํŠธ ๋ฃจํ”„๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
session = manager.session_store.get_or_create(session_id=request.session_id)
request_id = str(uuid.uuid4())
async def stream_events() -> AsyncGenerator[str, None]:
async for event in manager.agent_loop.run_stream(
query=request.query,
session=session,
request_id=request_id,
force_tools=request.force_tools,
):
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
return StreamingResponse(stream_events(), media_type="text/event-stream")
# ---------------------------------------------------------------------------
# v2 ์—”๋“œํฌ์ธํŠธ: LangGraph ๊ธฐ๋ฐ˜ agent ์‹คํ–‰ (interrupt/approve ํŒจํ„ด)
# ---------------------------------------------------------------------------
@app.post("/v2/agent/stream")
@_rate_limit("30/minute")
async def v2_agent_stream(
request: AgentRunRequest,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""LangGraph ๊ธฐ๋ฐ˜ agent SSE ์ŠคํŠธ๋ฆฌ๋ฐ ์‹คํ–‰.
graph.astream()์„ ์‚ฌ์šฉํ•ด ๋…ธ๋“œ๋ณ„ ์™„๋ฃŒ ์ด๋ฒคํŠธ๋ฅผ SSE๋กœ ์ „์†กํ•œ๋‹ค.
์ด๋ฒคํŠธ ํ˜•์‹ (๊ฐ ์ค„: ``data: <JSON>\\n\\n``):
- ๋…ธ๋“œ ์ง„ํ–‰: ``{"node": "<name>", "status": "completed", ...}``
- approval_wait ๋„๋‹ฌ:
``{"node": "approval_wait", "status": "awaiting_approval",
"approval_request": {...}, "thread_id": "..."}``
- ์˜ค๋ฅ˜: ``{"node": "error", "status": "error", "error": "..."}``
์Šน์ธ ํ๋ฆ„:
- ํด๋ผ์ด์–ธํŠธ๋Š” ``awaiting_approval`` ์ด๋ฒคํŠธ ์ˆ˜์‹  ํ›„ ์ŠคํŠธ๋ฆผ์ด ์ข…๋ฃŒ๋จ์„ ์ธ์ง€ํ•˜๊ณ 
``/v2/agent/approve``๋กœ ์Šน์ธ/๊ฑฐ์ ˆ์„ ์ „๋‹ฌํ•œ๋‹ค.
"""
if not manager.graph:
async def _no_graph():
yield 'data: {"node": "error", "status": "error", "error": "LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค."}\n\n'
return StreamingResponse(_no_graph(), media_type="text/event-stream")
from langchain_core.messages import HumanMessage
thread_id = request.session_id or str(uuid.uuid4())
session_id = thread_id
request_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"session_id": session_id,
"request_id": request_id,
"messages": [HumanMessage(content=request.query)],
}
    # If a stale interrupt state remains, clear it with a reject (cancel).
try:
from langgraph.types import Command
existing_state = await manager.graph.aget_state(config)
if existing_state and existing_state.next:
await manager.graph.ainvoke(
Command(resume={"approved": False, "cancel": True}),
config,
)
except Exception as clear_exc:
logger.warning(f"[v2] interrupt ์ƒํƒœ ํ™•์ธ/ํ•ด์†Œ ์‹คํŒจ (๋ฌด์‹œ): {type(clear_exc).__name__}")
async def _generate() -> AsyncGenerator[str, None]:
try:
async for chunk in manager.graph.astream(initial_state, config, stream_mode="updates"):
# chunk: {node_name: state_delta}
for node_name, state_delta in chunk.items():
event: dict = {
"node": node_name,
"status": "completed",
}
                    # Include evidence_items in the event once persist completes.
                    # Premise: with stream_mode="updates", state_delta is the node's raw return dict.
                    # The evidence_items schema follows EvidenceItem.to_dict():
                    #   source_type: "api" | "llm_generated"
                    #   title, excerpt, link_or_path, page, score, provider_meta
if node_name == "persist" and isinstance(state_delta, dict):
if state_delta.get("final_text"):
event["final_text"] = state_delta["final_text"]
if state_delta.get("evidence_items"):
event["evidence_items"] = state_delta["evidence_items"]
                    # approval_wait: handle both the explicit node name and the
                    # "__interrupt__" chunk that stream_mode="updates" emits when
                    # LangGraph's interrupt() is called.
if node_name in ("approval_wait", "__interrupt__"):
try:
graph_state = await manager.graph.aget_state(config)
if graph_state.next:
event = {
"node": "approval_wait",
"status": "awaiting_approval",
"approval_request": _extract_approval_request(graph_state),
"thread_id": thread_id,
"session_id": session_id,
}
except Exception as exc:
logger.warning(f"[v2/agent/stream] aget_state ์‹คํŒจ: {exc}")
event["node"] = "approval_wait"
event["status"] = "awaiting_approval"
event["thread_id"] = thread_id
event["session_id"] = session_id
event["approval_request"] = {
"prompt": "์Šน์ธ ์ •๋ณด๋ฅผ ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. /v2/agent/approve๋กœ ์ง„ํ–‰ํ•˜์„ธ์š”."
}
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
# Stop streaming after awaiting_approval (client must call /v2/agent/approve)
if event.get("status") == "awaiting_approval":
return
except Exception as exc:
logger.error(f"[v2/agent/stream] ์ŠคํŠธ๋ฆผ ์˜ˆ์™ธ: {exc}")
error_event = {"node": "error", "status": "error", "error": str(exc)}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return StreamingResponse(_generate(), media_type="text/event-stream")
@app.post("/v2/agent/run")
@_rate_limit("30/minute")
async def v2_agent_run(
request: AgentRunRequest,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""LangGraph ๊ธฐ๋ฐ˜ agent ์‹คํ–‰ (1๋‹จ๊ณ„: interrupt๊นŒ์ง€).
graph๋ฅผ ์‹คํ–‰ํ•˜์—ฌ `approval_wait` ๋…ธ๋“œ์—์„œ interrupt๋˜๋ฉด
`status: awaiting_approval`๊ณผ ํ•จ๊ป˜ ์Šน์ธ ์š”์ฒญ ์ •๋ณด๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
ํด๋ผ์ด์–ธํŠธ๋Š” ๋ฐ˜ํ™˜๋œ `thread_id`๋ฅผ ์ €์žฅํ•ด๋‘๊ณ 
`/v2/agent/approve`๋กœ ์Šน์ธ/๊ฑฐ์ ˆ์„ ์ „๋‹ฌํ•ด์•ผ ํ•œ๋‹ค.
Session Resume Contract
-----------------------
๋™์ผ session_id๋กœ ์žฌ์š”์ฒญํ•˜๋Š” ๊ฒฝ์šฐ ๋‹ค์Œ ๊ทœ์น™์„ ๋”ฐ๋ฅธ๋‹ค:
1. **interrupt ๋Œ€๊ธฐ ์ค‘**: graph๊ฐ€ approval_wait์—์„œ interrupt ์ƒํƒœ์ด๋ฉด
ํ˜„์žฌ checkpoint์—์„œ resumeํ•˜์ง€ ์•Š๊ณ  ์ƒˆ ๋ฉ”์‹œ์ง€๋ฅผ *์ถ”๊ฐ€ํ•˜์—ฌ* ์ด์–ด์„œ ์‹คํ–‰ํ•œ๋‹ค.
(์žฌ์š”์ฒญ์€ ์ƒˆ graph_run์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.)
์Šน์ธ/๊ฑฐ์ ˆ์€ ๋ฐ˜๋“œ์‹œ `/v2/agent/approve`๋ฅผ ํ†ตํ•ด ์ฒ˜๋ฆฌํ•ด์•ผ ํ•œ๋‹ค.
2. **์™„๋ฃŒ๋œ graph**: graph๊ฐ€ END์— ๋„๋‹ฌํ•œ ์ƒํƒœ(state.next == [])์ด๋ฉด
๋™์ผ thread_id์— ์ƒˆ graph_run์„ ์‹œ์ž‘ํ•œ๋‹ค. LangGraph checkpointer๊ฐ€
๋™์ผ thread_id์—์„œ ์ด์ „ ์ƒํƒœ๋ฅผ ๋ˆ„์ ํ•˜๋ฏ€๋กœ ๋Œ€ํ™” ํžˆ์Šคํ† ๋ฆฌ๊ฐ€ ๋ณด์กด๋œ๋‹ค.
3. **ํ”„๋กœ์„ธ์Šค ์žฌ์‹œ์ž‘ ํ›„**: SqliteSaver ์‚ฌ์šฉ ์‹œ DB์—์„œ checkpoint๊ฐ€ ๋ณต์›๋˜๋ฏ€๋กœ
interrupt ์ƒํƒœ๊ฐ€ ์œ ์ง€๋œ๋‹ค. ํด๋ผ์ด์–ธํŠธ๋Š” ๊ธฐ์กด thread_id๋กœ `/v2/agent/approve`
๋ฅผ ๋‹ค์‹œ ํ˜ธ์ถœํ•˜๋ฉด ์ค‘๋‹จ๋œ ์ง€์ ์—์„œ resumeํ•  ์ˆ˜ ์žˆ๋‹ค.
Note: session_id == thread_id. ๋‘ ๊ฐ’์€ ํ•ญ์ƒ ๋™์ผํ•˜๊ฒŒ ์œ ์ง€๋œ๋‹ค.
"""
if not manager.graph:
raise HTTPException(status_code=503, detail="LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
from langchain_core.messages import HumanMessage
thread_id = request.session_id or str(uuid.uuid4())
    session_id = thread_id  # Pin thread_id as the session_id (the session_id == thread_id invariant).
request_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"session_id": session_id,
"request_id": request_id,
"messages": [HumanMessage(content=request.query)],
}
    # If a stale interrupt state remains, clear it with a reject (cancel).
try:
existing_state = await manager.graph.aget_state(config)
if existing_state and existing_state.next:
from langgraph.types import Command
await manager.graph.ainvoke(
Command(resume={"approved": False, "cancel": True}),
config,
)
except Exception as clear_exc:
logger.warning(f"[v2] interrupt ์ƒํƒœ ํ™•์ธ/ํ•ด์†Œ ์‹คํŒจ (๋ฌด์‹œ): {type(clear_exc).__name__}")
try:
await manager.graph.ainvoke(initial_state, config)
# interrupt ์ƒํƒœ ํ™•์ธ
graph_state = await manager.graph.aget_state(config)
if graph_state.next:
            # Interrupt pending: return the approval_request details to the client.
return {
"status": "awaiting_approval",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"approval_request": _extract_approval_request(graph_state),
}
# interrupt ์—†์ด ์™„๋ฃŒ๋œ ๊ฒฝ์šฐ (rejected ๋˜๋Š” ์˜ค๋ฅ˜)
final_state = graph_state.values
return {
"status": "completed",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"text": final_state.get("final_text", ""),
"evidence_items": final_state.get("evidence_items", []),
}
except Exception as exc:
logger.error(f"[v2/agent/run] ์˜ˆ์™ธ ๋ฐœ์ƒ: {exc}")
# graph_run์„ "error" status๋กœ ๊ธฐ๋ก ์‹œ๋„
try:
if manager.session_store:
session = manager.session_store.get_or_create(session_id)
session.add_graph_run(
request_id=request_id,
plan_summary=f"[error] {exc}",
approval_status="",
executed_capabilities=[],
status="error",
total_latency_ms=0.0,
)
except Exception as persist_exc:
logger.warning(f"[v2/agent/run] error persist ์‹คํŒจ: {persist_exc}")
logger.exception(f"[v2/agent/run] ์š”์ฒญ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"error": "์š”์ฒญ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
},
)
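# Sketch of the two-step approval flow from a client's perspective (shapes
# follow the handlers above; concrete values are hypothetical):
#
#   1. POST /v2/agent/run      body: {"query": "...", "session_id": "s-1"}
#      -> {"status": "awaiting_approval", "thread_id": "s-1", ...}
#   2. POST /v2/agent/approve?thread_id=s-1&approved=true
#      -> {"status": "completed", "text": "...", "evidence_items": [...]}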
@app.post("/v2/agent/approve")
@_rate_limit("30/minute")
async def v2_agent_approve(
thread_id: str,
approved: bool,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""interrupt๋œ graph๋ฅผ resumeํ•œ๋‹ค (2๋‹จ๊ณ„: ์Šน์ธ/๊ฑฐ์ ˆ).
Parameters
----------
thread_id : str
`/v2/agent/run`์—์„œ ๋ฐ˜ํ™˜๋œ thread_id.
approved : bool
True๋ฉด tool_execute๋กœ ์ง„ํ–‰, False๋ฉด graph๊ฐ€ END๋กœ ์ข…๋ฃŒ.
"""
if not manager.graph:
raise HTTPException(status_code=503, detail="LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
from langgraph.types import Command
config = {"configurable": {"thread_id": thread_id}}
try:
result = await manager.graph.ainvoke(
Command(resume={"approved": approved}),
config,
)
# ๊ฑฐ์ ˆ์ด๋ฉด "rejected", ์Šน์ธ ์™„๋ฃŒ๋ฉด "completed"
approval_status = result.get("approval_status", "")
if not approved:
response_status = "rejected"
else:
response_status = "completed"
return {
"status": response_status,
"thread_id": thread_id,
"session_id": result.get("session_id", ""),
"graph_run_id": result.get("request_id", ""),
"text": result.get("final_text", ""),
"evidence_items": result.get("evidence_items", []),
"approval_status": approval_status,
}
except Exception as exc:
logger.error(f"[v2/agent/approve] ์˜ˆ์™ธ ๋ฐœ์ƒ: {exc}")
# graph_run์„ "error" status๋กœ ๊ธฐ๋ก ์‹œ๋„
session_id = ""
request_id = ""
try:
if manager.session_store:
graph_state = await manager.graph.aget_state(config)
state_values = graph_state.values if graph_state else {}
session_id = state_values.get("session_id", "")
request_id = state_values.get("request_id", "")
if session_id:
session = manager.session_store.get_or_create(session_id)
session.add_graph_run(
request_id=request_id,
plan_summary=f"[error] {exc}",
approval_status="",
executed_capabilities=[],
status="error",
total_latency_ms=0.0,
)
except Exception as persist_exc:
logger.warning(f"[v2/agent/approve] error persist ์‹คํŒจ: {persist_exc}")
logger.exception(f"[v2/agent/approve] ์Šน์ธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": request_id,
"error": "์Šน์ธ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
},
)
@app.post("/v2/agent/cancel")
@_rate_limit("30/minute")
async def v2_agent_cancel(
thread_id: str,
_http_request: Request,
_: None = Depends(verify_api_key),
):
"""interrupt ๋Œ€๊ธฐ ์ค‘์ธ graph๋ฅผ ๊ฐ•์ œ ์ทจ์†Œํ•œ๋‹ค.
interrupt ์ƒํƒœ์—์„œ ๊ฑฐ์ ˆ ์ฒ˜๋ฆฌ(Command(resume={"approved": False}))๋ฅผ ์ˆ˜ํ–‰ํ•˜๋˜,
state์— interrupt_reason="user_cancel"์„ ์ „๋‹ฌํ•˜์—ฌ
persist ๋…ธ๋“œ๊ฐ€ graph_run status๋ฅผ "interrupted"๋กœ ๊ธฐ๋กํ•˜๊ฒŒ ํ•œ๋‹ค.
Parameters
----------
thread_id : str
`/v2/agent/run`์—์„œ ๋ฐ˜ํ™˜๋œ thread_id.
"""
if not manager.graph:
raise HTTPException(status_code=503, detail="LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
from langgraph.types import Command
config = {"configurable": {"thread_id": thread_id}}
try:
# interrupt ์ƒํƒœ ํ™•์ธ
graph_state = await manager.graph.aget_state(config)
if not graph_state or not graph_state.next:
raise HTTPException(
status_code=409,
detail="ํ•ด๋‹น thread๋Š” ํ˜„์žฌ interrupt ๋Œ€๊ธฐ ์ƒํƒœ๊ฐ€ ์•„๋‹™๋‹ˆ๋‹ค.",
)
session_id = graph_state.values.get("session_id", "")
        # Resume with a forced rejection plus the cancel signal.
result = await manager.graph.ainvoke(
Command(resume={"approved": False, "cancel": True}),
config,
)
# persist ๋…ธ๋“œ์—์„œ "interrupted" ๊ธฐ๋ก์„ ์œ„ํ•ด state update
# (approval_wait_node๊ฐ€ cancel ์‹ ํ˜ธ๋ฅผ interrupt_reason์œผ๋กœ ๋ณ€ํ™˜)
return {
"status": "cancelled",
"thread_id": thread_id,
"session_id": session_id,
"graph_run_id": result.get("request_id", ""),
}
except HTTPException:
raise
except Exception as exc:
logger.exception(f"[v2/agent/cancel] ์ทจ์†Œ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
return JSONResponse(
status_code=500,
content={
"status": "error",
"thread_id": thread_id,
"error": "์ทจ์†Œ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
},
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, **runtime_config.to_uvicorn_kwargs())