""" Eodi MCP Server - Streamable HTTP Transport (MCP 2025-03-26) ============================================================ PlayMCP 등록을 위한 Streamable HTTP Transport 지원 서버입니다. MCP Protocol Revision: 2025-03-26 - 단일 MCP endpoint에서 POST/GET 모두 처리 - POST: JSON-RPC 메시지 수신, JSON 또는 SSE 스트림으로 응답 - GET: SSE 스트림으로 서버 → 클라이언트 메시지 수신 - Mcp-Session-Id 헤더로 세션 관리 실행: uvicorn server_streamable:app --host 0.0.0.0 --port 7860 """ import os import sys import json import uuid import asyncio import logging from pathlib import Path from typing import Any, Dict, Optional from datetime import datetime # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("mcp_server") # 프로젝트 루트 경로 추가 (src/mcp에서 2단계 위로) PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute() sys.path.insert(0, str(PROJECT_ROOT)) # .env 로드 (로컬 개발용 - HF Space에서는 Secrets가 환경변수로 자동 주입됨) from dotenv import load_dotenv env_file = PROJECT_ROOT / ".env" if env_file.exists(): load_dotenv(env_file) logger.info(f"Loaded .env from {env_file}") else: logger.info("No .env file found, using system environment variables (HF Space mode)") from starlette.applications import Starlette from starlette.responses import JSONResponse, StreamingResponse, Response from starlette.routing import Route from starlette.requests import Request from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.middleware.base import BaseHTTPMiddleware # ============================================================ # 0. Rate Limiting (스크래핑 방어) # ============================================================ from collections import defaultdict import time as time_module class RateLimiter: """ 간단한 IP 기반 Rate Limiter. 환경변수로 설정 가능: RATE_LIMIT_MAX_REQUESTS: 윈도우당 최대 요청 수 (기본: 60) RATE_LIMIT_WINDOW_SECONDS: 윈도우 시간(초) (기본: 60) """ def __init__( self, max_requests: int = None, window_seconds: int = None ): self.max_requests = max_requests or int(os.getenv("RATE_LIMIT_MAX_REQUESTS", "60")) self.window = window_seconds or int(os.getenv("RATE_LIMIT_WINDOW_SECONDS", "60")) self.requests: Dict[str, list] = defaultdict(list) self._last_cleanup = time_module.time() def _cleanup_old_requests(self): """오래된 요청 기록 정리 (메모리 관리)""" now = time_module.time() # 5분마다 정리 if now - self._last_cleanup < 300: return self._last_cleanup = now for ip in list(self.requests.keys()): self.requests[ip] = [t for t in self.requests[ip] if now - t < self.window] if not self.requests[ip]: del self.requests[ip] def is_allowed(self, client_ip: str) -> bool: """요청 허용 여부 확인""" now = time_module.time() # 주기적 메모리 정리 self._cleanup_old_requests() # 윈도우 내 요청만 유지 self.requests[client_ip] = [ t for t in self.requests[client_ip] if now - t < self.window ] # 제한 확인 if len(self.requests[client_ip]) >= self.max_requests: logger.warning(f"Rate limit exceeded for IP: {client_ip[:20]}...") return False self.requests[client_ip].append(now) return True def get_remaining(self, client_ip: str) -> int: """남은 요청 수 반환""" now = time_module.time() current_requests = len([ t for t in self.requests.get(client_ip, []) if now - t < self.window ]) return max(0, self.max_requests - current_requests) # 전역 Rate Limiter 인스턴스 rate_limiter = RateLimiter() class RateLimitMiddleware(BaseHTTPMiddleware): """Rate Limiting 미들웨어""" # Rate Limit 제외 경로 EXEMPT_PATHS = {"/health", "/"} async def dispatch(self, request: Request, call_next): # 헬스체크 등은 제외 if request.url.path in self.EXEMPT_PATHS: return await call_next(request) # 클라이언트 IP 추출 (프록시 고려) client_ip = request.headers.get( "X-Forwarded-For", request.client.host if request.client else "unknown" ) # X-Forwarded-For는 쉼표로 구분된 IP 목록일 수 있음 if "," in client_ip: client_ip = client_ip.split(",")[0].strip() # Rate Limit 확인 if not rate_limiter.is_allowed(client_ip): return JSONResponse( { "error": "Rate limit exceeded", "message": f"Too many requests. Please wait and try again.", "retry_after_seconds": rate_limiter.window }, status_code=429, headers={ "Retry-After": str(rate_limiter.window), "X-RateLimit-Limit": str(rate_limiter.max_requests), "X-RateLimit-Remaining": "0" } ) # 정상 처리 response = await call_next(request) # Rate Limit 헤더 추가 response.headers["X-RateLimit-Limit"] = str(rate_limiter.max_requests) response.headers["X-RateLimit-Remaining"] = str(rate_limiter.get_remaining(client_ip)) return response # 1. MCP Protocol Constants & Tools Definition # ============================================================ PROTOCOL_VERSION = "2025-03-26" SERVER_NAME = "eodi-kb" SERVER_VERSION = "1.0.0" # Tool 정의 (MCP 표준 형식) - Phase 4 확장: 여행 플랫폼 (호텔/항공/카드/뉴스) # ⚠️ 중요: 이 정의는 src/mcp/tools.py의 TOOLS와 동기화되어야 합니다! TOOLS = [ { "name": "kb_search", "description": ( "Search the travel benefits knowledge base using vector search. " "Covers hotel loyalty programs, airline mileage programs, credit card benefits, and travel deals/news. " "Supports both Korean and English queries." ), "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "Search query (Korean or English)" }, "domain": { "type": "string", "enum": ["hotel", "airline", "card", "news", "all"], "description": "Filter by travel domain: hotel, airline, card, news (deals/promotions), or all (default)" }, "chain": { "type": "string", "description": "Filter by chain/airline/card issuer. Comma-separated for multiple (e.g., 'MARRIOTT,HILTON'). " "Hotels: IHG, MARRIOTT, ACCOR, HILTON, HYATT. " "Airlines: KOREAN_AIR, ASIANA, DELTA, UNITED. " "Cards: AMEX, SHINHAN, HYUNDAI, KB, LOTTE, HANA, SAMSUNG, WOORI" }, "type": { "type": "string", "enum": [ "loyalty_program", "membership_tier", "credit_card", "subscription_program", "points_system", "milestone_program", "airline_program", "airline_tier", "award_chart", "hotel_property", "tier_implementation", "best_rate_guarantee", "deal_alert", "news_update", "channel_benefit", "member_rate" ], "description": "Filter by document type (optional)" }, "threshold": { "type": "number", "default": 0.3, "minimum": 0.1, "maximum": 0.9, "description": "Similarity threshold (0.1-0.9). Lower = more results, higher = more precise." }, "limit": { "type": "integer", "default": 5, "minimum": 1, "maximum": 20, "description": "Maximum number of results to return" } }, "required": ["query"] } }, { "name": "kb_get_document", "description": ( "Retrieve the full content of a specific document. " "Includes original metadata and extracted knowledge." ), "inputSchema": { "type": "object", "properties": { "doc_id": { "type": "string", "description": "Unique document ID (obtained from search results)" } }, "required": ["doc_id"] } }, { "name": "kb_get_context", "description": ( "Search the travel benefits knowledge base and return relevant raw text context. " "Covers hotel loyalty, airline mileage, credit card benefits, and travel deals/news. " "The client AI generates the answer based on this context. " "This is a Thin Server pattern - no AI API cost on server side." ), "inputSchema": { "type": "object", "properties": { "question": { "type": "string", "description": "User question (Korean or English)" }, "domain": { "type": "string", "enum": ["hotel", "airline", "card", "news", "all"], "description": "Filter by travel domain: hotel, airline, card, news (deals/promotions), or all (default)" }, "chain": { "type": "string", "description": "Filter by chain/airline/card issuer. Comma-separated for multiple (e.g., 'MARRIOTT,HILTON')" }, "threshold": { "type": "number", "default": 0.3, "minimum": 0.1, "maximum": 0.9, "description": "Similarity threshold. Lower = more results." }, "limit": { "type": "integer", "default": 5, "minimum": 1, "maximum": 10, "description": "Number of context chunks to return" } }, "required": ["question"] } }, { "name": "kb_get_metadata", "description": ( "Get available filter values (chains, card issuers, data types) in the knowledge base. " "Use this to understand the KB structure before complex searches." ), "inputSchema": { "type": "object", "properties": {}, "required": [] } }, # ============================================================ # Travel Utility Tools (환율, 시간) # ============================================================ { "name": "get_current_time", "description": ( "Get current time for a specific timezone. " "Useful for travel planning and scheduling." ), "inputSchema": { "type": "object", "properties": { "timezone": { "type": "string", "default": "Asia/Seoul", "description": "IANA timezone (e.g., Asia/Seoul, Asia/Tokyo, Europe/Paris)" } }, "required": [] } }, { "name": "get_exchange_rates", "description": ( "Get current exchange rates. Uses Frankfurter API (ECB data) as primary, " "Exchange Rate API as fallback for VND, TWD, etc. " "Always shows the source and timestamp of rates." ), "inputSchema": { "type": "object", "properties": { "base_currency": { "type": "string", "default": "USD", "description": "Base currency code (e.g., USD, EUR, KRW)" }, "target_currencies": { "type": "string", "description": "Comma-separated target currencies (e.g., 'KRW,JPY,VND'). If omitted, returns major travel currencies." } }, "required": [] } }, { "name": "convert_currency", "description": ( "Convert an amount from one currency to another. " "Shows the exchange rate used in the calculation. " "Users can provide a custom_rate to recalculate with a different rate." ), "inputSchema": { "type": "object", "properties": { "amount": { "type": "number", "description": "Amount to convert" }, "from_currency": { "type": "string", "description": "Source currency code (e.g., USD)" }, "to_currency": { "type": "string", "description": "Target currency code (e.g., KRW)" }, "custom_rate": { "type": "number", "description": "Optional: Use this exchange rate instead of API rate" } }, "required": ["amount", "from_currency", "to_currency"] } }, { "name": "get_travel_info", "description": ( "Get comprehensive travel info including current time and exchange rates. " "Useful for quick travel overview." ), "inputSchema": { "type": "object", "properties": { "timezone": { "type": "string", "default": "Asia/Seoul", "description": "Timezone for current time" }, "base_currency": { "type": "string", "default": "USD", "description": "Base currency for exchange rates" } }, "required": [] } }, # ============================================================ # User Tools (User Gate - 사용자 인증/멤버십) # ============================================================ { "name": "user_get_profile", "description": ( "현재 사용자의 프로필과 멤버십 정보를 조회합니다. " "연결되지 않은 경우 auth_url을 반환합니다." ), "inputSchema": { "type": "object", "properties": { "session_token": { "type": "string", "description": "인증 세션 토큰 (이전 인증에서 받은 값)" } }, "required": [] } }, { "name": "user_update_membership", "description": "호텔 체인 멤버십을 등록하거나 수정합니다.", "inputSchema": { "type": "object", "properties": { "chain": { "type": "string", "enum": ["HILTON", "MARRIOTT", "IHG", "ACCOR", "HYATT"], "description": "호텔 체인 코드" }, "tier": { "type": "string", "description": "멤버십 등급 (예: Gold, Platinum, Diamond)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["chain", "tier", "session_token"] } }, { "name": "user_delete_membership", "description": "호텔 체인 멤버십을 삭제합니다.", "inputSchema": { "type": "object", "properties": { "chain": { "type": "string", "enum": ["HILTON", "MARRIOTT", "IHG", "ACCOR", "HYATT"], "description": "삭제할 호텔 체인 코드" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["chain", "session_token"] } }, { "name": "user_request_auth", "description": ( "Magic Link 인증을 요청합니다. " "이메일로 로그인 링크가 발송됩니다." ), "inputSchema": { "type": "object", "properties": { "email": { "type": "string", "format": "email", "description": "인증에 사용할 이메일 주소" } }, "required": ["email"] } }, { "name": "user_verify_code", "description": "Magic Link 인증 후 받은 6자리 코드를 검증합니다.", "inputSchema": { "type": "object", "properties": { "code": { "type": "string", "description": "6자리 인증 코드" } }, "required": ["code"] } }, # ============================================================ # Credit Card Tools (신용카드 관리/혜택 추천) # ============================================================ { "name": "user_add_credit_card", "description": ( "보유 신용카드를 등록합니다. " "등록된 카드를 바탕으로 사용 가능한 크레딧/혜택을 추적하고 추천받을 수 있습니다." ), "inputSchema": { "type": "object", "properties": { "card_id": { "type": "string", "description": ( "카드 고유 ID. 예: AMEX_PLATINUM_US, CHASE_SAPPHIRE_RESERVE. " "지원 발급사: AMEX, CHASE, CITI, CAPITAL_ONE" ) }, "card_name": { "type": "string", "description": "카드 이름 (선택, card_id로 자동 추론 가능)" }, "issuer_code": { "type": "string", "description": "발급사 코드 (선택, card_id로 자동 추론 가능)" }, "region": { "type": "string", "default": "USA", "description": "발급 국가 (기본: USA)" }, "card_open_date": { "type": "string", "description": "카드 개설일 (선택, YYYY-MM-DD)" }, "anniversary_month": { "type": "integer", "minimum": 1, "maximum": 12, "description": "연회비 갱신/청구 월 (선택, 1-12)" }, "annual_fee": { "type": "number", "description": "연회비 금액 (선택)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["card_id", "session_token"] } }, { "name": "user_get_credit_cards", "description": "등록된 보유 신용카드 목록을 조회합니다.", "inputSchema": { "type": "object", "properties": { "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["session_token"] } }, { "name": "user_delete_credit_card", "description": "등록된 신용카드를 삭제합니다.", "inputSchema": { "type": "object", "properties": { "card_id": { "type": "string", "description": "삭제할 카드 ID" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["card_id", "session_token"] } }, { "name": "user_update_credit_usage", "description": ( "신용카드 크레딧/혜택 사용 기록을 업데이트합니다. " "예: AMEX Platinum FHR 크레딧 $300 사용 완료" ), "inputSchema": { "type": "object", "properties": { "card_id": { "type": "string", "description": "카드 ID (예: AMEX_PLATINUM_US)" }, "benefit_id": { "type": "string", "description": ( "혜택 ID. 예: amex_plat_hotel_credit (FHR), " "amex_plat_saks_credit, amex_plat_uber_cash, " "amex_plat_digital_entertainment, amex_plat_resy_credit" ) }, "amount_used": { "type": "number", "description": "사용 금액 (선택, 미입력 시 전체 한도로 기록)" }, "usage_date": { "type": "string", "description": "사용일 (선택, YYYY-MM-DD, 미입력 시 오늘)" }, "description": { "type": "string", "description": "사용 내역 메모 (선택)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["card_id", "benefit_id", "session_token"] } }, { "name": "user_get_credit_recommendations", "description": ( "현재 시점에서 사용해야 할 크레딧/혜택을 추천합니다. " "기간별 마감일이 임박한 혜택을 우선순위로 보여줍니다. " "예: 'AMEX Platinum FHR 상반기 크레딧 $300 - 6월 30일 마감'" ), "inputSchema": { "type": "object", "properties": { "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["session_token"] } }, { "name": "user_get_credit_usage_summary", "description": ( "특정 카드 또는 전체 카드의 크레딧 사용 현황 요약을 조회합니다. " "각 혜택별 사용/잔여 금액과 비율을 보여줍니다." ), "inputSchema": { "type": "object", "properties": { "card_id": { "type": "string", "description": "특정 카드 ID로 필터링 (선택, 미입력 시 전체 카드)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["session_token"] } }, # ============================================================ # Shadow Valuation Tools (포인트/마일리지 가치 평가) # ============================================================ { "name": "user_get_asset_valuation", "description": ( "보유 포인트/마일리지의 현재 가치를 계산합니다. " "사용자의 여행 스타일(PREMIUM/VALUE/CASHBACK)에 따라 동일한 포인트도 다른 가치로 평가됩니다. " "예: AMEX MR 50,000점은 PREMIUM 스타일에서 $1,100, CASHBACK 스타일에서 $300로 평가됩니다." ), "inputSchema": { "type": "object", "properties": { "assets": { "type": "array", "items": { "type": "object", "properties": { "program": { "type": "string", "description": "프로그램 ID (예: AMEX_MR, CHASE_UR, KOREAN_AIR, MARRIOTT_BONVOY)" }, "amount": { "type": "number", "description": "포인트/마일 수량" } }, "required": ["program", "amount"] }, "description": "평가할 자산 목록" }, "style": { "type": "string", "enum": ["PREMIUM", "VALUE", "CASHBACK"], "description": "이번 계산에 사용할 스타일 (선택, 미입력 시 저장된 사용자 스타일 사용)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["assets", "session_token"] } }, { "name": "user_update_valuation_style", "description": ( "포인트 가치 평가에 사용할 여행 스타일을 설정합니다. " "PREMIUM: 비즈니스/퍼스트 발권 목표 (높은 가치), " "VALUE: 일반 사용 (표준 가치), " "CASHBACK: 현금성 사용 선호 (낮은 가치)" ), "inputSchema": { "type": "object", "properties": { "style": { "type": "string", "enum": ["PREMIUM", "VALUE", "CASHBACK"], "description": "설정할 여행 스타일" }, "custom_valuations": { "type": "object", "description": ( "개별 프로그램 가치 오버라이드 (선택). " "예: {\"AMEX_MR\": 2.5, \"KOREAN_AIR\": 20}" ) }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["session_token"] } }, { "name": "user_get_valuation_styles", "description": ( "지원하는 여행 스타일과 포인트 프로그램 목록을 조회합니다. " "각 스타일별 예시 가치와 현재 사용자 설정을 확인할 수 있습니다." ), "inputSchema": { "type": "object", "properties": { "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["session_token"] } }, # ============================================================ # Asset Parser / Calculator / Deeplink Tools # ============================================================ { "name": "user_parse_asset_text", "description": ( "사용자가 입력한 텍스트에서 포인트/마일리지 정보를 자동으로 추출합니다. " "예: '대한항공 45000 마일, AMEX MR 50000점' → 구조화된 자산 목록으로 변환" ), "inputSchema": { "type": "object", "properties": { "text": { "type": "string", "description": ( "파싱할 텍스트. 프로그램명과 숫자를 포함해주세요. " "예: '대한항공 45000\\n아시아나 12000\\nAMEX MR 50000'" ) }, "save_to_profile": { "type": "boolean", "default": False, "description": "파싱된 자산을 프로필에 저장할지 여부 (향후 기능)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["text", "session_token"] } }, { "name": "calculate_miles_vs_cashback", "description": ( "마일리지 적립 카드 vs 현금 할인 카드 중 어느 것이 더 유리한지 계산합니다. " "예: 10만원 결제 시 '1000원당 1마일' vs '1.5% 할인' 비교" ), "inputSchema": { "type": "object", "properties": { "amount": { "type": "number", "description": "결제 금액 (원)" }, "mile_rate": { "type": "number", "default": 1, "description": "적립 마일 수 (기본: 1)" }, "mile_per": { "type": "number", "default": 1000, "description": "마일 적립 기준 금액 (원, 기본: 1000)" }, "mile_program": { "type": "string", "default": "KOREAN_AIR", "description": "마일리지 프로그램 (기본: KOREAN_AIR)" }, "discount_percent": { "type": "number", "default": 1.5, "description": "할인율 (%, 기본: 1.5)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["amount", "session_token"] } }, { "name": "generate_award_search_link", "description": ( "특가석 검색을 위한 Seats.aero/Point.me URL을 생성합니다. " "도시명(한글/영문)을 공항 코드로 자동 변환합니다. " "예: '서울 → 파리 비즈니스' → 검색 URL 생성" ), "inputSchema": { "type": "object", "properties": { "origin": { "type": "string", "default": "ICN", "description": "출발지 (도시명 또는 공항 코드, 기본: ICN)" }, "destination": { "type": "string", "description": "목적지 (도시명 또는 공항 코드)" }, "date": { "type": "string", "description": "날짜 (YYYY-MM 또는 YYYY-MM-DD 형식, 미입력 시 유연 검색)" }, "cabin_class": { "type": "string", "default": "J", "description": "좌석 클래스 (Y/W/J/F 또는 이코노미/비즈니스/퍼스트)" }, "session_token": { "type": "string", "description": "인증 세션 토큰" } }, "required": ["destination", "session_token"] } } ] # ============================================================ # 1.5 MCP Prompts Definition (클라이언트 AI 추론 가이드) # ============================================================ PROMPTS = [ { "name": "hotel_dining_expert", "description": ( "호텔 다이닝 전문가 모드. 특정 호텔에서 식사할 때 " "최적의 혜택을 받는 방법을 단계별로 안내합니다. " "체인 확인 → 멤버십 혜택 → 등급 획득 전략까지 종합 분석." ), "arguments": [ { "name": "hotel_name", "description": "조회할 호텔명 (예: 콘래드 서울, JW 메리어트 서울)", "required": True }, { "name": "dining_intent", "description": "식사 목적 (예: 조식, 런치, 디너, 라운지)", "required": False } ] }, { "name": "status_match_advisor", "description": ( "Status Match 전략 고문. 원하는 호텔 체인의 엘리트 등급을 " "가장 빠르고 저렴하게 획득하는 방법을 안내합니다. " "신용카드, 유료 멤버십, Status Match 경로를 종합 분석." ), "arguments": [ { "name": "target_chain", "description": "목표 호텔 체인 (예: HILTON, MARRIOTT, IHG, ACCOR)", "required": True }, { "name": "current_status", "description": "현재 보유한 호텔 멤버십/등급 (없으면 생략)", "required": False } ] }, { "name": "hotel_benefit_analyzer", "description": ( "호텔 혜택 분석기. 특정 호텔에서 투숙할 때 " "멤버십 등급별로 받을 수 있는 혜택을 상세히 분석합니다. " "조식, 라운지, 업그레이드, 레이트 체크아웃 등." ), "arguments": [ { "name": "hotel_name", "description": "조회할 호텔명", "required": True }, { "name": "membership_tier", "description": "확인할 멤버십 등급 (예: Gold, Platinum, Diamond)", "required": False } ] } ] def get_prompt_messages(prompt_name: str, arguments: Dict[str, Any]) -> list: """프롬프트별 시스템 메시지 생성""" if prompt_name == "hotel_dining_expert": hotel_name = arguments.get("hotel_name", "호텔") dining_intent = arguments.get("dining_intent", "식사") return [ { "role": "user", "content": { "type": "text", "text": f"""당신은 호텔 다이닝 전문가입니다. 사용자가 {hotel_name}에서 {dining_intent}을 하려고 합니다. 다음 단계에 따라 분석하세요. 각 단계에서 kb_search 또는 kb_get_context 도구를 호출하세요: ## 1단계: 호텔 체인 확인 - "{hotel_name}" 검색으로 어느 체인(힐튼, 메리어트, IHG, 아코르 등) 소속인지 확인하세요. ## 2단계: 다이닝 혜택 검색 - 해당 체인의 멤버십 다이닝 혜택을 검색하세요. - 예: "힐튼 다이닝 할인", "메리어트 레스토랑 혜택" 등 - 할인율, 적립 포인트, 적용 조건 등을 파악하세요. ## 3단계: 등급 획득 전략 (중요!) - 사용자가 해당 체인의 등급이 없을 수 있습니다. - 다음을 검색하세요: a) 신용카드로 자동 부여되는 등급 (예: "아멕스 플래티넘 힐튼 골드") b) Status Match 가능 여부 (예: "IHG ambassador status match 힐튼") c) 유료 멤버십 (예: "IHG 앰버서더", "아코르 플러스") ## 4단계: 종합 답변 - 위 정보를 종합하여 최적의 다이닝 전략을 제안하세요. - "등급이 없다면 ~하는 것을 추천합니다" 형식으로 안내하세요. 지금 분석을 시작하세요.""" } } ] elif prompt_name == "status_match_advisor": target_chain = arguments.get("target_chain", "호텔 체인") current_status = arguments.get("current_status", "없음") return [ { "role": "user", "content": { "type": "text", "text": f"""당신은 호텔 Status Match 전문가입니다. 사용자가 {target_chain}의 엘리트 등급을 획득하려고 합니다. 현재 상태: {current_status} 다음 경로들을 조사하고 최적의 방법을 추천하세요: ## 경로 1: 신용카드 (가장 빠름) - "{target_chain} 신용카드" 검색 - 자동 부여 등급, 연회비, 추가 혜택 파악 ## 경로 2: Status Match - "status match {target_chain}" 검색 - 어떤 체인에서 매칭 가능한지, 조건은 무엇인지 파악 ## 경로 3: 유료 멤버십 → Status Match - 저렴한 유료 멤버십으로 기본 등급 획득 후 매칭하는 방법 - 예: IHG Ambassador($225) → 타 체인 매칭 ## 경로 4: 챌린지/프로모션 - 신규 가입자 프로모션이나 스테이터스 챌린지 검색 각 경로의 비용, 소요 시간, 난이도를 비교하여 최적의 방법을 추천하세요.""" } } ] elif prompt_name == "hotel_benefit_analyzer": hotel_name = arguments.get("hotel_name", "호텔") membership_tier = arguments.get("membership_tier") tier_query = f" {membership_tier}" if membership_tier else "" return [ { "role": "user", "content": { "type": "text", "text": f"""당신은 호텔 혜택 분석 전문가입니다. {hotel_name}의{tier_query} 멤버십 혜택을 분석해주세요. ## 1단계: 호텔 정보 확인 - "{hotel_name}" 검색으로 호텔 상세 정보 확인 - 소속 체인, 브랜드, 레스토랑, 라운지 정보 파악 ## 2단계: 등급별 혜택 검색 - 해당 호텔/체인의 멤버십 등급별 혜택 검색 - 조식, 라운지 접근권, 객실 업그레이드, 레이트 체크아웃 등 ## 3단계: 실제 적용 상황 - 해당 호텔에서 혜택이 실제로 어떻게 적용되는지 확인 - 예: "조식은 ~레스토랑에서 제공", "라운지 이용시간은 ~" 표 형식으로 정리하여 제공하세요.""" } } ] else: return [ { "role": "user", "content": { "type": "text", "text": f"프롬프트 '{prompt_name}'에 대한 정의가 없습니다." } } ] class SessionManager: """MCP 세션 관리자""" def __init__(self): self.sessions: Dict[str, Dict[str, Any]] = {} def create_session(self) -> str: """새 세션 생성""" session_id = str(uuid.uuid4()).replace("-", "") self.sessions[session_id] = { "created_at": datetime.utcnow().isoformat(), "initialized": False, "client_info": None } logger.info(f"Session created: {session_id[:8]}...") return session_id def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """세션 조회""" return self.sessions.get(session_id) def update_session(self, session_id: str, data: Dict[str, Any]): """세션 업데이트""" if session_id in self.sessions: self.sessions[session_id].update(data) def delete_session(self, session_id: str) -> bool: """세션 삭제""" if session_id in self.sessions: del self.sessions[session_id] logger.info(f"Session deleted: {session_id[:8]}...") return True return False def is_valid(self, session_id: str) -> bool: """세션 유효성 검사""" return session_id in self.sessions session_manager = SessionManager() # ============================================================ # 3. Tool Handler (Lazy Loading) # ============================================================ _handler = None def get_handler(): """Lazy 로딩된 KB Tool Handler""" global _handler if _handler is None: try: from src.mcp.tools import get_handler as _get_handler _handler = _get_handler() logger.info("KB Handler initialized successfully") except Exception as e: logger.error(f"Failed to initialize handler: {e}") raise return _handler def execute_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Tool 실행""" handler = get_handler() if name == "kb_search": return handler.handle_search(**arguments) elif name == "kb_get_document": return handler.handle_get_document(**arguments) elif name == "kb_get_context": return handler.handle_get_context(**arguments) elif name == "kb_get_metadata": return handler.handle_get_metadata() else: return {"success": False, "error": f"Unknown tool: {name}"} async def execute_tool_async(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """비동기 Tool 실행 (travel utilities는 async)""" # KB 도구는 동기 실행 if name.startswith("kb_"): loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: execute_tool(name, arguments) ) # Travel utility 도구는 비동기 실행 from src.utils.travel_utils import ( get_current_time, get_exchange_rates, convert_currency, get_travel_info ) if name == "get_current_time": return await get_current_time(**arguments) elif name == "get_exchange_rates": return await get_exchange_rates(**arguments) elif name == "convert_currency": return await convert_currency(**arguments) elif name == "get_travel_info": return await get_travel_info(**arguments) # User 도구 (User Gate) if name.startswith("user_"): from src.auth.tool_handlers import execute_user_tool return await execute_user_tool(name, arguments) return {"success": False, "error": f"Unknown tool: {name}"} # ============================================================ # 4. JSON-RPC Message Handlers # ============================================================ async def handle_initialize(params: Dict[str, Any], session_id: str) -> Dict[str, Any]: """initialize 요청 처리""" client_info = params.get("clientInfo", {}) protocol_version = params.get("protocolVersion", "unknown") logger.info(f"Initialize from {client_info.get('name', 'unknown')} " f"(protocol: {protocol_version})") session_manager.update_session(session_id, { "initialized": True, "client_info": client_info, "protocol_version": protocol_version }) return { "protocolVersion": PROTOCOL_VERSION, "serverInfo": { "name": SERVER_NAME, "version": SERVER_VERSION }, "capabilities": { "tools": {"listChanged": False}, "resources": {"listChanged": False}, "prompts": {"listChanged": False} } } async def handle_tools_list() -> Dict[str, Any]: """tools/list 요청 처리""" logger.info(f"Returning {len(TOOLS)} tools") return {"tools": TOOLS} async def handle_tools_call(params: Dict[str, Any]) -> Dict[str, Any]: """tools/call 요청 처리""" tool_name = params.get("name") arguments = params.get("arguments", {}) logger.info(f"Tool call: {tool_name}") if not tool_name: raise ValueError("Tool name is required") # 비동기 도구 실행 (travel utils는 async, kb는 executor) result = await execute_tool_async(tool_name, arguments) # 응답 크기 체크 (PlayMCP 24k 제한) result_text = json.dumps(result, ensure_ascii=False, indent=2) if len(result_text) > 23000: # 약간의 여유 logger.warning(f"Response size {len(result_text)} may exceed PlayMCP limit") return { "content": [ { "type": "text", "text": result_text } ] } async def handle_resources_list() -> Dict[str, Any]: """resources/list 요청 처리""" return { "resources": [ { "uri": "eodi://kb/stats", "name": "KB Statistics", "description": "Knowledge base statistics", "mimeType": "application/json" } ] } async def handle_prompts_list() -> Dict[str, Any]: """prompts/list 요청 처리""" logger.info(f"Returning {len(PROMPTS)} prompts") return {"prompts": PROMPTS} async def handle_prompts_get(params: Dict[str, Any]) -> Dict[str, Any]: """prompts/get 요청 처리 - 프롬프트 메시지 반환""" prompt_name = params.get("name") arguments = params.get("arguments", {}) logger.info(f"Getting prompt: {prompt_name}") if not prompt_name: raise ValueError("Prompt name is required") # 프롬프트가 존재하는지 확인 prompt_exists = any(p["name"] == prompt_name for p in PROMPTS) if not prompt_exists: raise ValueError(f"Unknown prompt: {prompt_name}") # 프롬프트 메시지 생성 messages = get_prompt_messages(prompt_name, arguments) return { "description": next( (p["description"] for p in PROMPTS if p["name"] == prompt_name), "" ), "messages": messages } async def handle_message(message: Dict[str, Any], session_id: str) -> Optional[Dict[str, Any]]: """JSON-RPC 메시지 처리""" method = message.get("method", "") params = message.get("params", {}) msg_id = message.get("id") logger.info(f"<- {method}") try: if method == "initialize": result = await handle_initialize(params, session_id) elif method == "tools/list": result = await handle_tools_list() elif method == "tools/call": result = await handle_tools_call(params) elif method == "resources/list": result = await handle_resources_list() elif method == "prompts/list": result = await handle_prompts_list() elif method == "prompts/get": result = await handle_prompts_get(params) elif method == "notifications/initialized": # 알림은 응답 없음 return None elif method == "ping": result = {} else: logger.warning(f"Unknown method: {method}") if msg_id is not None: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Method not found: {method}" } } return None if msg_id is not None: return { "jsonrpc": "2.0", "id": msg_id, "result": result } return None except Exception as e: logger.error(f"Error handling {method}: {e}") if msg_id is not None: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32603, "message": str(e) } } return None # ============================================================ # 5. HTTP Endpoints (Streamable HTTP Transport) # ============================================================ async def mcp_endpoint(request: Request) -> Response: """ MCP Endpoint - Streamable HTTP Transport POST: JSON-RPC 메시지 수신 GET: SSE 스트림 (서버 → 클라이언트) DELETE: 세션 종료 """ # 세션 ID 확인 session_id = request.headers.get("Mcp-Session-Id") if request.method == "POST": return await handle_post(request, session_id) elif request.method == "GET": return await handle_get(request, session_id) elif request.method == "DELETE": return await handle_delete(request, session_id) elif request.method == "OPTIONS": return Response( status_code=200, headers={ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS", "Access-Control-Allow-Headers": "*", } ) else: return JSONResponse( {"error": "Method not allowed"}, status_code=405 ) async def handle_post(request: Request, session_id: Optional[str]) -> Response: """POST 요청 처리 - JSON-RPC 메시지""" # Content-Type 확인 content_type = request.headers.get("Content-Type", "") if "application/json" not in content_type: return JSONResponse( {"jsonrpc": "2.0", "error": {"code": -32700, "message": "Invalid content type"}}, status_code=400 ) # Accept 헤더 확인 accept = request.headers.get("Accept", "") wants_sse = "text/event-stream" in accept try: body = await request.json() except json.JSONDecodeError: return JSONResponse( {"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}}, status_code=400 ) # 배치 요청 처리 is_batch = isinstance(body, list) messages = body if is_batch else [body] # Initialize 요청인지 확인 (세션 생성) has_initialize = any(m.get("method") == "initialize" for m in messages) if has_initialize: # 새 세션 생성 session_id = session_manager.create_session() elif session_id and not session_manager.is_valid(session_id): # 유효하지 않은 세션 return JSONResponse( {"jsonrpc": "2.0", "error": {"code": -32000, "message": "Invalid session"}}, status_code=404 ) elif not session_id and not has_initialize: # 세션 없이 일반 요청 (허용 - 세션 없이도 동작) session_id = session_manager.create_session() # 모든 메시지가 notification/response인지 확인 all_notifications = all( m.get("id") is None or "result" in m or "error" in m for m in messages ) if all_notifications: # notification만 있으면 202 Accepted for msg in messages: await handle_message(msg, session_id) return Response(status_code=202) # 요청 처리 responses = [] for msg in messages: response = await handle_message(msg, session_id) if response: responses.append(response) # 응답 헤더 headers = {"Access-Control-Allow-Origin": "*"} # Initialize 응답에는 세션 ID 포함 if has_initialize and session_id: headers["Mcp-Session-Id"] = session_id # SSE 또는 JSON 응답 if wants_sse and len(responses) > 0: # SSE 스트림으로 응답 async def generate_sse(): for resp in responses: data = json.dumps(resp, ensure_ascii=False) yield f"data: {data}\n\n" return StreamingResponse( generate_sse(), media_type="text/event-stream", headers=headers ) else: # JSON 응답 if is_batch: return JSONResponse(responses, headers=headers) elif responses: return JSONResponse(responses[0], headers=headers) else: return Response(status_code=202, headers=headers) async def handle_get(request: Request, session_id: Optional[str]) -> Response: """GET 요청 처리 - SSE 스트림 (서버 → 클라이언트)""" # Accept 헤더 확인 accept = request.headers.get("Accept", "") if "text/event-stream" not in accept: return JSONResponse( {"error": "Accept header must include text/event-stream"}, status_code=400 ) # 세션 없으면 SSE 미지원 응답 if not session_id or not session_manager.is_valid(session_id): return Response(status_code=405) # Method Not Allowed # SSE 스트림 (현재는 keep-alive만) async def generate_sse(): try: while True: # 서버에서 클라이언트로 보낼 메시지가 있으면 전송 # 현재는 단순 keep-alive yield ": keep-alive\n\n" await asyncio.sleep(30) except asyncio.CancelledError: logger.info(f"SSE stream closed for session {session_id[:8]}...") return StreamingResponse( generate_sse(), media_type="text/event-stream", headers={ "Access-Control-Allow-Origin": "*", "Cache-Control": "no-cache", "Connection": "keep-alive" } ) async def handle_delete(request: Request, session_id: Optional[str]) -> Response: """DELETE 요청 처리 - 세션 종료""" if not session_id: return JSONResponse( {"error": "Session ID required"}, status_code=400 ) if session_manager.delete_session(session_id): return Response(status_code=204) # No Content else: return Response(status_code=404) # Not Found # ============================================================ # 6. Additional Endpoints # ============================================================ async def health_check(request: Request) -> JSONResponse: """Health Check""" return JSONResponse({ "status": "healthy", "service": SERVER_NAME, "version": SERVER_VERSION, "protocol": PROTOCOL_VERSION, "transport": "streamable-http" }) async def root(request: Request) -> JSONResponse: """Root Endpoint""" return JSONResponse({ "name": "Eodi MCP Server", "description": "Travel Benefits Knowledge Base (Hotel Loyalty, Airline Miles, Credit Cards)", "version": SERVER_VERSION, "protocol": PROTOCOL_VERSION, "documentation": "https://github.com/lovelymango/eodi-mcp", "endpoints": { "mcp": "/mcp (Streamable HTTP Transport)", "sse": "/sse (Legacy SSE Transport)", "sse_capabilities": "/sse?mode=capabilities (Standard MCP)", "health": "/health", "openapi": "/openapi.json (ChatGPT GPTs Actions)", "api_search": "/api/search (REST API)", "api_context": "/api/context (REST API)" }, "clients": { "Claude Desktop": "Use /mcp or /sse endpoint", "PlayMCP": "Use /sse endpoint", "ChatGPT GPTs": "Use /openapi.json for Actions setup", "Standard MCP": "Use /sse?mode=capabilities" } }) # ============================================================ # 7. Legacy SSE Endpoint (Backwards Compatibility) # ============================================================ async def legacy_sse_endpoint(request: Request) -> Response: """ 구버전 HTTP+SSE Transport 호환 (2024-11-05) GET /sse → endpoint 이벤트 반환 (PlayMCP/Claude) GET /sse?mode=capabilities → capabilities 이벤트 반환 (범용 MCP 클라이언트) 이후 클라이언트는 /messages/{session_id}로 POST """ if request.method == "GET": session_id = session_manager.create_session() # 클라이언트 모드 감지 mode = request.query_params.get("mode", "").lower() user_agent = request.headers.get("User-Agent", "").lower() # capabilities 모드: 범용 MCP 클라이언트용 (ChatGPT 커넥터 등) use_capabilities = ( mode == "capabilities" or "chatgpt" in user_agent or "openai" in user_agent or mode == "standard" ) async def generate_sse(): if use_capabilities: # MCP 표준 capabilities 이벤트 (범용 호환) capabilities_data = { "protocol": "mcp", "version": "2024-11-05", "serverInfo": { "name": SERVER_NAME, "version": SERVER_VERSION }, "capabilities": { "tools": True }, "tools": [ { "name": tool["name"], "description": tool["description"], "inputSchema": tool["inputSchema"] } for tool in TOOLS ], "session_id": session_id, "messages_endpoint": f"/messages/?session_id={session_id}" } yield f"event: capabilities\ndata: {json.dumps(capabilities_data)}\n\n" else: # PlayMCP/Claude 호환 endpoint 이벤트 yield f"event: endpoint\ndata: /messages/?session_id={session_id}\n\n" # 연결 유지 try: while True: yield ": keep-alive\n\n" await asyncio.sleep(30) except asyncio.CancelledError: pass return StreamingResponse( generate_sse(), media_type="text/event-stream", headers={ "Access-Control-Allow-Origin": "*", "Cache-Control": "no-cache" } ) elif request.method == "POST": # Streamable HTTP로 리다이렉트 return await handle_post(request, request.headers.get("Mcp-Session-Id")) elif request.method == "OPTIONS": return Response( status_code=200, headers={ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET, POST, OPTIONS", "Access-Control-Allow-Headers": "*", } ) else: return Response(status_code=405) async def legacy_messages_endpoint(request: Request) -> Response: """구버전 /messages 엔드포인트""" session_id = request.query_params.get("session_id") if request.method == "POST": return await handle_post(request, session_id) elif request.method == "OPTIONS": return Response( status_code=200, headers={ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST, OPTIONS", "Access-Control-Allow-Headers": "*", } ) else: return Response(status_code=405) # ============================================================ # 8. ChatGPT GPTs Actions Support (OpenAPI) # ============================================================ def generate_openapi_schema() -> dict: """Generate OpenAPI 3.1.0 schema for ChatGPT GPTs Actions""" return { "openapi": "3.1.0", "info": { "title": "Eodi Travel Benefits API", "description": "Search hotel loyalty programs, airline miles, credit card benefits, and travel deals. Supports Korean and English queries.", "version": SERVER_VERSION }, "servers": [ {"url": "https://lovelymango-eodi-mcp.hf.space"} ], "paths": { "/api/search": { "post": { "operationId": "searchKnowledgeBase", "summary": "Search travel benefits knowledge base", "description": "Vector search for hotel loyalty, airline miles, credit cards, and travel deals.", "x-openai-isConsequential": False, "requestBody": { "required": True, "content": { "application/json": { "schema": { "type": "object", "required": ["query"], "properties": { "query": { "type": "string", "description": "Search query in Korean or English" }, "domain": { "type": "string", "enum": ["hotel", "airline", "card", "news", "all"], "description": "Filter by domain: hotel, airline, card, news, or all" }, "chain": { "type": "string", "description": "Filter by chain (MARRIOTT, HILTON, IHG, HYATT, ACCOR)" }, "limit": { "type": "integer", "default": 5, "maximum": 10, "description": "Max results to return (1-10)" } } } } } }, "responses": { "200": { "description": "Search results with matching chunks", "content": { "application/json": { "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "results": {"type": "array"}, "total": {"type": "integer"} } } } } } } } }, "/api/context": { "post": { "operationId": "getContext", "summary": "Get context for a travel benefits question", "description": "Retrieve relevant context chunks to answer a question about travel benefits.", "x-openai-isConsequential": False, "requestBody": { "required": True, "content": { "application/json": { "schema": { "type": "object", "required": ["question"], "properties": { "question": { "type": "string", "description": "User question about travel benefits in Korean or English" }, "domain": { "type": "string", "enum": ["hotel", "airline", "card", "news", "all"], "description": "Filter by domain" }, "chain": { "type": "string", "description": "Filter by chain/brand" }, "limit": { "type": "integer", "default": 5, "maximum": 10, "description": "Number of context chunks (1-10)" } } } } } }, "responses": { "200": { "description": "Context chunks for answering the question", "content": { "application/json": { "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "context_chunks": {"type": "array"}, "question": {"type": "string"} } } } } } } } } } } async def openapi_schema(request: Request) -> JSONResponse: """OpenAPI schema for ChatGPT GPTs Actions""" return JSONResponse(generate_openapi_schema()) async def api_search(request: Request) -> JSONResponse: """REST API for kb_search (ChatGPT Actions compatible)""" try: body = await request.json() handler = get_handler() result = handler.handle_search( query=body.get("query", ""), domain=body.get("domain"), chain=body.get("chain"), limit=min(body.get("limit", 5), 10) # Max 10 for Actions ) return JSONResponse(result) except Exception as e: return JSONResponse({"success": False, "error": str(e)}, status_code=500) async def api_context(request: Request) -> JSONResponse: """REST API for kb_get_context (ChatGPT Actions compatible)""" try: body = await request.json() handler = get_handler() result = handler.handle_get_context( question=body.get("question", ""), domain=body.get("domain"), chain=body.get("chain"), limit=min(body.get("limit", 5), 10) ) return JSONResponse(result) except Exception as e: return JSONResponse({"success": False, "error": str(e)}, status_code=500) # ============================================================ # 9. Application Setup # ============================================================ routes = [ # 메인 엔드포인트 Route("/", root), Route("/health", health_check), # OpenAPI schema for ChatGPT GPTs Actions Route("/openapi.json", openapi_schema), Route("/.well-known/openapi.json", openapi_schema), # REST API for ChatGPT GPTs Actions Route("/api/search", api_search, methods=["POST", "OPTIONS"]), Route("/api/context", api_context, methods=["POST", "OPTIONS"]), # Streamable HTTP Transport (2025-03-26) Route("/mcp", mcp_endpoint, methods=["GET", "POST", "DELETE", "OPTIONS"]), # Legacy HTTP+SSE Transport (2024-11-05) - 하위 호환 Route("/sse", legacy_sse_endpoint, methods=["GET", "POST", "OPTIONS"]), Route("/messages", legacy_messages_endpoint, methods=["POST", "OPTIONS"]), Route("/messages/", legacy_messages_endpoint, methods=["POST", "OPTIONS"]), ] # Auth routes 확장 (User Gate) try: from src.auth.routes import auth_routes routes.extend(auth_routes) logger.info(f"Auth routes loaded: {len(auth_routes)} endpoints") except ImportError as e: logger.warning(f"Auth routes not loaded: {e}") middleware = [ Middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ), # Rate Limiting (스크래핑 방어) Middleware(RateLimitMiddleware), ] app = Starlette( routes=routes, middleware=middleware, debug=False ) # ============================================================ # 9. Startup # ============================================================ @app.on_event("startup") async def startup(): logger.info(f"🚀 Eodi MCP Server v{SERVER_VERSION} starting...") logger.info(f" Protocol: {PROTOCOL_VERSION}") logger.info(f" Transport: Streamable HTTP + Legacy SSE") logger.info(f" SUPABASE_URL: {'set' if os.getenv('SUPABASE_URL') else 'NOT SET'}") if __name__ == "__main__": import uvicorn uvicorn.run( "server_streamable:app", host="0.0.0.0", port=7860, proxy_headers=True, forwarded_allow_ips="*" )