""" Eodi MCP Server =============== 호텔 로열티 프로그램 지식 베이스 MCP 서버. Transport: - stdio: 로컬 개발 및 Claude Desktop 연동 - HTTP: 향후 배포용 (Streamable HTTP) 실행: python -m src.mcp.server 또는 python src/mcp/server.py """ import os import sys import json import asyncio from typing import Dict, Any, Optional from pathlib import Path # 프로젝트 루트를 경로에 추가 (절대 경로 사용!) PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute() sys.path.insert(0, str(PROJECT_ROOT)) # .env 파일 로드 (절대 경로로!) from dotenv import load_dotenv env_path = PROJECT_ROOT / ".env" if env_path.exists(): load_dotenv(env_path) from src.mcp.tools import TOOLS, execute_tool def log(msg: str): """stderr로 로깅 (stdout은 MCP 프로토콜용)""" print(msg, file=sys.stderr, flush=True) class EodiMCPServer: """ Eodi MCP 서버 - stdio 트랜스포트 구현 MCP 프로토콜 메시지 처리: - initialize: 서버 정보 반환 - tools/list: 사용 가능한 툴 목록 반환 - tools/call: 툴 실행 - resources/list: 리소스 목록 (향후) - resources/read: 리소스 읽기 (향후) """ def __init__(self): self.server_info = { "name": "eodi-kb", "version": "0.1.0", "description": "호텔 로열티 프로그램 지식 베이스 MCP 서버" } self.capabilities = { "tools": {}, "resources": {} } log(f"[eodi-kb] Server initialized at {PROJECT_ROOT}") log(f"[eodi-kb] SUPABASE_URL: {'set' if os.getenv('SUPABASE_URL') else 'NOT SET'}") log(f"[eodi-kb] GEMINI_API_KEY: {'set' if os.getenv('GEMINI_API_KEY') else 'NOT SET'}") async def handle_message(self, message: Dict[str, Any]) -> Dict[str, Any]: """MCP 메시지 처리""" method = message.get("method", "") params = message.get("params", {}) msg_id = message.get("id") log(f"[eodi-kb] <- {method}") try: if method == "initialize": result = await self._handle_initialize(params) elif method == "tools/list": result = await self._handle_tools_list() elif method == "tools/call": result = await self._handle_tools_call(params) elif method == "resources/list": result = await self._handle_resources_list() elif method == "resources/read": result = await self._handle_resources_read(params) elif method == "notifications/initialized": return None elif method == "prompts/list": result = {"prompts": []} else: log(f"[eodi-kb] Unknown method: {method}") result = {"error": {"code": -32601, "message": f"Unknown method: {method}"}} if msg_id is not None: return { "jsonrpc": "2.0", "id": msg_id, "result": result } return None except Exception as e: log(f"[eodi-kb] Error: {e}") if msg_id is not None: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32603, "message": str(e) } } return None async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """initialize 핸들러""" log(f"[eodi-kb] Initialize from: {params.get('clientInfo', {}).get('name', 'unknown')}") return { "protocolVersion": "2024-11-05", "serverInfo": self.server_info, "capabilities": self.capabilities } async def _handle_tools_list(self) -> Dict[str, Any]: """tools/list 핸들러""" log(f"[eodi-kb] Returning {len(TOOLS)} tools") return {"tools": TOOLS} async def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]: """tools/call 핸들러""" tool_name = params.get("name") arguments = params.get("arguments", {}) log(f"[eodi-kb] Calling tool: {tool_name}") if not tool_name: raise ValueError("Tool name is required") # 동기 함수를 비동기로 실행 loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda: execute_tool(tool_name, arguments) ) log(f"[eodi-kb] Tool result: success={result.get('success', False)}") # MCP 응답 형식 return { "content": [ { "type": "text", "text": json.dumps(result, ensure_ascii=False, indent=2) } ] } async def _handle_resources_list(self) -> Dict[str, Any]: """resources/list 핸들러""" return { "resources": [ { "uri": "eodi://kb/stats", "name": "KB Statistics", "description": "지식 베이스 통계 정보", "mimeType": "application/json" } ] } async def _handle_resources_read(self, params: Dict[str, Any]) -> Dict[str, Any]: """resources/read 핸들러""" uri = params.get("uri", "") if uri == "eodi://kb/stats": from src.db import SupabaseAdapter adapter = SupabaseAdapter() stats = adapter.get_stats() return { "contents": [ { "uri": uri, "mimeType": "application/json", "text": json.dumps(stats, ensure_ascii=False) } ] } raise ValueError(f"Unknown resource: {uri}") async def run_stdio(self): """stdio 트랜스포트로 서버 실행""" log(f"🚀 Eodi MCP Server v{self.server_info['version']} started (stdio)") # stdin을 비동기로 읽기 위한 reader reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(reader) await asyncio.get_event_loop().connect_read_pipe( lambda: protocol, sys.stdin ) while True: try: # 한 줄 읽기 line = await reader.readline() if not line: log("[eodi-kb] EOF received, shutting down") break line_str = line.decode('utf-8').strip() if not line_str: continue # JSON-RPC 메시지 파싱 try: message = json.loads(line_str) except json.JSONDecodeError as e: log(f"[eodi-kb] JSON parse error: {e}") continue # 메시지 처리 response = await self.handle_message(message) # 응답 전송 (stdout) if response: response_line = json.dumps(response, ensure_ascii=False) sys.stdout.write(response_line + "\n") sys.stdout.flush() except asyncio.CancelledError: break except Exception as e: log(f"[eodi-kb] Error: {e}") log("[eodi-kb] Server stopped") async def main(): """비동기 메인 진입점""" server = EodiMCPServer() await server.run_stdio() def main_sync(): """동기 메인 진입점 (CLI 엔트리포인트용)""" asyncio.run(main()) if __name__ == "__main__": main_sync()