govon-runtime / src /cli /daemon.py
umyunsang's picture
sync: main branch src/ with PR#561+#563 (tool calling + E2E observability)
0b04246 verified
"""GovOn daemon lifecycle ๊ด€๋ฆฌ.
Issue #144: CLI-daemon/LangGraph runtime integration and session resume.

Starts the GovOn API server in the background with uvicorn and tracks the
process state through a PID file.

.. note::
    This module is for the **local daemon only**.
    To connect to a remote server, set the ``GOVON_RUNTIME_URL`` environment
    variable; ``main()`` in ``shell.py`` then skips this module entirely and
    connects directly to the given URL. This approach is recommended for
    Docker, cloud deployments, and CI environments.
"""
from __future__ import annotations
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional
import httpx
from loguru import logger
class DaemonManager:
    """Lifecycle manager for the local GovOn API server daemon.

    Combines a PID file with the ``/health`` endpoint to decide whether the
    daemon is up, and launches it in the background with uvicorn when needed.

    The port can be overridden with the ``GOVON_PORT`` environment variable
    (default: 8000).

    NOTE(review): the log and error message literals in this class look
    mis-encoded (apparently UTF-8 Korean decoded with the wrong code page).
    They are runtime strings and are deliberately left untouched here --
    confirm against the upstream file before repairing them.
    """

    GOVON_HOME = Path.home() / ".govon"
    _HEALTH_CHECK_TIMEOUT = 120  # maximum seconds to wait for /health
    _HEALTH_CHECK_INTERVAL = 1  # seconds between health check attempts

    def __init__(self) -> None:
        """Create ``~/.govon`` if needed and resolve the port and file paths."""
        self.GOVON_HOME.mkdir(parents=True, exist_ok=True)
        self.port: int = int(os.environ.get("GOVON_PORT", "8000"))
        self.pid_path: Path = self.GOVON_HOME / "daemon.pid"
        self.log_path: Path = self.GOVON_HOME / "daemon.log"

    def get_base_url(self) -> str:
        """Return the daemon base URL (loopback address plus configured port)."""
        return f"http://127.0.0.1:{self.port}"

    def is_running(self) -> bool:
        """Report whether the daemon is running.

        Returns True only when the PID file exists, the recorded process is
        alive, and the ``/health`` endpoint answers with HTTP 200. A PID file
        that points at a dead process is removed as a side effect.
        """
        pid = self._read_pid()
        if pid is None:
            return False

        # Liveness of the recorded process.
        if not self._pid_alive(pid):
            logger.debug(f"[daemon] PID {pid} ํ”„๋กœ์„ธ์Šค๊ฐ€ ์—†์Œ. PID ํŒŒ์ผ ์ œ๊ฑฐ.")
            self._remove_pid()
            return False

        # The process exists; confirm it actually serves /health.
        return self._probe_health(timeout=5.0)

    def start(self) -> bool:
        """Launch uvicorn in the background and record its PID.

        Returns
        -------
        bool
            Whether startup succeeded (True once the health check passes, or
            when the daemon was already running).
        """
        # Guard against races: check health once more right before spawning.
        if self.is_running():
            logger.info("[daemon] ์ด๋ฏธ ์‹คํ–‰ ์ค‘์ž…๋‹ˆ๋‹ค.")
            return True

        cmd = [
            sys.executable,
            "-m",
            "uvicorn",
            "src.inference.api_server:app",
            "--host",
            "127.0.0.1",
            "--port",
            str(self.port),
        ]

        if self._port_in_use():
            logger.error(f"[daemon] ํฌํŠธ {self.port}์ด ์ด๋ฏธ ์‚ฌ์šฉ ์ค‘์ž…๋‹ˆ๋‹ค.")
            return False

        logger.info(f"[daemon] ๊ธฐ๋™ ๋ช…๋ น: {' '.join(cmd)}")

        # The child inherits the log file descriptor, so the parent's handle
        # can be closed as soon as Popen returns.
        with open(self.log_path, "a") as log_file:
            proc = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=log_file,
                start_new_session=True,
            )

        self._write_pid(proc.pid)
        logger.info(f"[daemon] ํ”„๋กœ์„ธ์Šค ๊ธฐ๋™ ์™„๋ฃŒ. PID={proc.pid}")

        # Wait for the health check; passing ``proc`` lets the wait abort as
        # soon as the child exits instead of polling until the timeout.
        if not self._wait_until_healthy(proc):
            logger.error("[daemon] health check ์‹คํŒจ. ํ”„๋กœ์„ธ์Šค๋ฅผ ์ •๋ฆฌํ•ฉ๋‹ˆ๋‹ค.")
            self.stop()
            return False

        return True

    def stop(self) -> None:
        """Stop the daemon gracefully (SIGTERM, then SIGKILL after a timeout)."""
        pid = self._read_pid()
        if pid is None:
            logger.info("[daemon] PID ํŒŒ์ผ์ด ์—†์Šต๋‹ˆ๋‹ค. ์‹คํ–‰ ์ค‘์ด ์•„๋‹Œ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผํ•ฉ๋‹ˆ๋‹ค.")
            return

        if not self._pid_alive(pid):
            logger.info(f"[daemon] PID {pid} ํ”„๋กœ์„ธ์Šค๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
            self._remove_pid()
            return

        logger.info(f"[daemon] SIGTERM ์ „์†ก: PID={pid}")
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # The process exited between the liveness check and the signal.
            self._remove_pid()
            return

        # Wait up to 10 seconds for a clean exit.
        for _ in range(10):
            time.sleep(1)
            if not self._pid_alive(pid):
                logger.info(f"[daemon] PID {pid} ์ •์ƒ ์ข…๋ฃŒ๋จ.")
                self._remove_pid()
                return

        logger.warning(f"[daemon] SIGKILL ์ „์†ก: PID={pid}")
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        self._remove_pid()

    def ensure_running(self) -> str:
        """Guarantee that the daemon is running and return its base URL.

        Calls :meth:`start` when the daemon is not running.

        Returns
        -------
        str
            Daemon base URL (e.g. "http://127.0.0.1:8000").

        Raises
        ------
        RuntimeError
            If the daemon could not be started.
        """
        if not self.is_running():
            success = self.start()
            if not success:
                raise RuntimeError(
                    "GovOn daemon ๊ธฐ๋™์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค. " f"๋กœ๊ทธ๋ฅผ ํ™•์ธํ•˜์„ธ์š”: {self.log_path}"
                )
        return self.get_base_url()

    def _port_in_use(self) -> bool:
        """Return True when something accepts TCP connections on the port."""
        import socket

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # connect_ex returns 0 when the connection succeeds.
            return s.connect_ex(("127.0.0.1", self.port)) == 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _probe_health(self, timeout: float) -> bool:
        """Return True when ``GET /health`` answers with HTTP 200.

        This is a best-effort probe: any failure (connection refused,
        timeout, or anything else raised by the client) counts as
        "not healthy" rather than propagating to the caller.
        """
        try:
            with httpx.Client(timeout=timeout) as client:
                resp = client.get(f"{self.get_base_url()}/health")
            return resp.status_code == 200
        except Exception:
            return False

    def _read_pid(self) -> Optional[int]:
        """Read the PID from the PID file.

        Returns None when the file is missing, unreadable, empty, malformed,
        or holds a non-positive number. Non-positive values are rejected
        because ``os.kill`` treats them as process-group / broadcast targets,
        so a corrupted file must never reach ``stop()``.
        """
        try:
            # File format is "<pid> <epoch>"; only the first token is used.
            pid = int(self.pid_path.read_text().split()[0])
        except (ValueError, OSError, IndexError):
            return None
        return pid if pid > 0 else None

    def _write_pid(self, pid: int) -> None:
        """Record the PID and the start time (epoch timestamp) in the PID file."""
        self.pid_path.write_text(f"{pid} {int(time.time())}")

    def _remove_pid(self) -> None:
        """Remove the PID file; a missing file is not an error."""
        try:
            self.pid_path.unlink()
        except FileNotFoundError:
            pass

    @staticmethod
    def _pid_alive(pid: int) -> bool:
        """Return True when the process with the given PID is alive.

        Non-positive PIDs are never considered alive. If the PID belongs to
        a child of the current process that has already exited, the child is
        reaped here and reported as dead; otherwise an unreaped (zombie)
        child would keep answering signal 0 and look alive.
        """
        if pid <= 0:
            return False

        if hasattr(os, "WNOHANG"):
            try:
                reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
            except ChildProcessError:
                # Not a child of this process (or already reaped).
                pass
            else:
                if reaped_pid == pid:
                    return False

        try:
            os.kill(pid, 0)
            return True
        except ProcessLookupError:
            return False
        except PermissionError:
            # The process exists but we may not signal it: treat as alive.
            return True

    def _wait_until_healthy(self, proc: Optional[subprocess.Popen] = None) -> bool:
        """Wait until the health check passes or the timeout expires.

        Parameters
        ----------
        proc
            Optional handle of the process that was just spawned. When given,
            the wait stops as soon as that process has exited, instead of
            polling for the full ``_HEALTH_CHECK_TIMEOUT`` seconds.

        Returns
        -------
        bool
            True once ``/health`` answers with HTTP 200, False on early exit
            of ``proc`` or on timeout.
        """
        deadline = time.monotonic() + self._HEALTH_CHECK_TIMEOUT
        while time.monotonic() < deadline:
            # poll() also reaps the child, so a later stop() sees it as gone.
            if proc is not None and proc.poll() is not None:
                logger.error(
                    f"[daemon] process exited before becoming healthy "
                    f"(exit code {proc.returncode})."
                )
                return False
            if self._probe_health(timeout=3.0):
                logger.info("[daemon] health check ํ†ต๊ณผ.")
                return True
            time.sleep(self._HEALTH_CHECK_INTERVAL)
        logger.error("[daemon] health check timeout (120์ดˆ).")
        return False