Spaces:
Running
Running
| """ | |
| User Tool Handlers | |
| ================== | |
| user_* MCP Tool์ ์ค์ ์คํ ๋ก์ง. | |
| server_streamable.py์ execute_tool_async์์ ํธ์ถ๋ฉ๋๋ค. | |
| """ | |
| import logging | |
| from typing import Dict, Any | |
| from .session_manager import get_user_session_manager | |
| from .magic_link import get_magic_link_auth | |
| from .config import SUPPORTED_CHAINS, TIER_ALIASES, SERVER_BASE_URL | |
| logger = logging.getLogger("eodi.auth.tool_handlers") | |
| async def execute_user_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| User Tool ์คํ ๋ผ์ฐํฐ. | |
| Args: | |
| name: Tool ์ด๋ฆ (user_*) | |
| arguments: Tool ์ธ์ | |
| Returns: | |
| Tool ์คํ ๊ฒฐ๊ณผ | |
| """ | |
| # ๊ธฐ์กด ์ธ์ฆ/๋ฉค๋ฒ์ญ ๊ด๋ จ Tool | |
| if name == "user_get_profile": | |
| return await handle_get_profile(arguments) | |
| elif name == "user_update_membership": | |
| return await handle_update_membership(arguments) | |
| elif name == "user_delete_membership": | |
| return await handle_delete_membership(arguments) | |
| elif name == "user_request_auth": | |
| return await handle_request_auth(arguments) | |
| elif name == "user_verify_code": | |
| return await handle_verify_code(arguments) | |
| # ์ ์ฉ์นด๋ ๊ด๋ จ Tool (์ธ์ฆ ํ์) | |
| elif name in [ | |
| "user_add_credit_card", | |
| "user_get_credit_cards", | |
| "user_delete_credit_card", | |
| "user_update_credit_usage", | |
| "user_get_credit_recommendations", | |
| "user_get_credit_usage_summary" | |
| ]: | |
| return await _handle_credit_card_tool(name, arguments) | |
| # ๊ฐ์น ํ๊ฐ ๊ด๋ จ Tool (์ธ์ฆ ํ์) | |
| elif name in [ | |
| "user_get_asset_valuation", | |
| "user_update_valuation_style", | |
| "user_get_valuation_styles", | |
| "user_parse_asset_text", | |
| "calculate_miles_vs_cashback", | |
| "generate_award_search_link" | |
| ]: | |
| return await _handle_valuation_tool(name, arguments) | |
| else: | |
| return {"success": False, "error": f"Unknown user tool: {name}"} | |
| async def _handle_credit_card_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| ์ ์ฉ์นด๋ ๊ด๋ จ Tool์ ์ํ ๊ณตํต ๋ํผ. | |
| ์ธ์ ๊ฒ์ฆ ํ ํด๋น ํธ๋ค๋ฌ ํธ์ถ. | |
| """ | |
| from .credit_card_handlers import ( | |
| handle_add_credit_card, | |
| handle_get_credit_cards, | |
| handle_delete_credit_card, | |
| handle_update_credit_usage, | |
| handle_get_credit_recommendations, | |
| handle_get_credit_usage_summary | |
| ) | |
| session_token = arguments.get("session_token") | |
| user_session_manager = get_user_session_manager() | |
| # ์ธ์ฆ ํ์ธ | |
| if not session_token: | |
| return { | |
| "success": False, | |
| "error": "session_token์ด ํ์ํฉ๋๋ค. ๋จผ์ user_get_profile์ ํธ์ถํ์ฌ ์ธ์ฆ ์ํ๋ฅผ ํ์ธํ์ธ์.", | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login" | |
| } | |
| session = user_session_manager.get_session(session_token) | |
| if not session: | |
| return { | |
| "success": False, | |
| "error": "์ธ์ ์ด ๋ง๋ฃ๋์์ต๋๋ค. ๋ค์ ๋ก๊ทธ์ธํด์ฃผ์ธ์.", | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login" | |
| } | |
| user_id = session.user_id | |
| # ๊ฐ ํธ๋ค๋ฌ ํธ์ถ | |
| if name == "user_add_credit_card": | |
| return await handle_add_credit_card(arguments, user_id) | |
| elif name == "user_get_credit_cards": | |
| return await handle_get_credit_cards(arguments, user_id) | |
| elif name == "user_delete_credit_card": | |
| return await handle_delete_credit_card(arguments, user_id) | |
| elif name == "user_update_credit_usage": | |
| return await handle_update_credit_usage(arguments, user_id) | |
| elif name == "user_get_credit_recommendations": | |
| return await handle_get_credit_recommendations(arguments, user_id) | |
| elif name == "user_get_credit_usage_summary": | |
| return await handle_get_credit_usage_summary(arguments, user_id) | |
| else: | |
| return {"success": False, "error": f"Unknown credit card tool: {name}"} | |
| async def _handle_valuation_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| ๊ฐ์น ํ๊ฐ ๊ด๋ จ Tool์ ์ํ ๊ณตํต ๋ํผ. | |
| ์ธ์ ๊ฒ์ฆ ํ ํด๋น ํธ๋ค๋ฌ ํธ์ถ. | |
| """ | |
| from .valuation_handlers import ( | |
| handle_get_asset_valuation, | |
| handle_update_valuation_style, | |
| handle_get_valuation_styles, | |
| handle_calculate_miles_vs_cashback, | |
| handle_generate_award_search_link | |
| ) | |
| from .asset_parser import handle_parse_asset_text | |
| session_token = arguments.get("session_token") | |
| user_session_manager = get_user_session_manager() | |
| # ์ธ์ฆ ํ์ธ | |
| if not session_token: | |
| return { | |
| "success": False, | |
| "error": "session_token์ด ํ์ํฉ๋๋ค. ๋จผ์ user_get_profile์ ํธ์ถํ์ฌ ์ธ์ฆ ์ํ๋ฅผ ํ์ธํ์ธ์.", | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login" | |
| } | |
| session = user_session_manager.get_session(session_token) | |
| if not session: | |
| return { | |
| "success": False, | |
| "error": "์ธ์ ์ด ๋ง๋ฃ๋์์ต๋๋ค. ๋ค์ ๋ก๊ทธ์ธํด์ฃผ์ธ์.", | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login" | |
| } | |
| user_id = session.user_id | |
| # ๊ฐ ํธ๋ค๋ฌ ํธ์ถ | |
| if name == "user_get_asset_valuation": | |
| return await handle_get_asset_valuation(arguments, user_id) | |
| elif name == "user_update_valuation_style": | |
| return await handle_update_valuation_style(arguments, user_id) | |
| elif name == "user_get_valuation_styles": | |
| return await handle_get_valuation_styles(arguments, user_id) | |
| elif name == "user_parse_asset_text": | |
| return await handle_parse_asset_text(arguments, user_id) | |
| elif name == "calculate_miles_vs_cashback": | |
| return await handle_calculate_miles_vs_cashback(arguments, user_id) | |
| elif name == "generate_award_search_link": | |
| return await handle_generate_award_search_link(arguments, user_id) | |
| else: | |
| return {"success": False, "error": f"Unknown valuation tool: {name}"} | |
| async def handle_get_profile(arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| user_get_profile Tool ํธ๋ค๋ฌ. | |
| ์ธ์ ํ ํฐ์ด ์์ผ๋ฉด ํ๋กํ ๋ฐํ, ์์ผ๋ฉด ์ธ์ฆ URL ๋ฐํ. | |
| """ | |
| session_token = arguments.get("session_token") | |
| user_session_manager = get_user_session_manager() | |
| if not session_token: | |
| # ์ธ์ฆ๋์ง ์์ โ ์ธ์ฆ URL ์๋ด | |
| return { | |
| "success": True, | |
| "connected": False, | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login", | |
| "message": "Eodi์ ์ฐ๊ฒฐ๋์ง ์์์ต๋๋ค. auth_url์์ ๋ก๊ทธ์ธํ๊ฑฐ๋, user_request_auth๋ก ์ด๋ฉ์ผ ์ธ์ฆ์ ์์ํ์ธ์." | |
| } | |
| # ์ธ์ ๊ฒ์ฆ | |
| session = user_session_manager.get_session(session_token) | |
| if not session: | |
| return { | |
| "success": True, | |
| "connected": False, | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login", | |
| "message": "์ธ์ ์ด ๋ง๋ฃ๋์์ต๋๋ค. ๋ค์ ๋ก๊ทธ์ธํด์ฃผ์ธ์." | |
| } | |
| # ๋ฉค๋ฒ์ญ ์กฐํ | |
| try: | |
| from src.db.supabase_adapter import SupabaseAdapter | |
| adapter = SupabaseAdapter() | |
| memberships = adapter.get_user_memberships(session.user_id) | |
| profile = adapter.get_user_profile(session.user_id) | |
| except Exception as e: | |
| logger.error(f"ํ๋กํ ์กฐํ ์ค๋ฅ: {e}") | |
| memberships = [] | |
| profile = None | |
| return { | |
| "success": True, | |
| "connected": True, | |
| "user_id": session.user_id, | |
| "profile": { | |
| "email_masked": session.email_masked, | |
| "preferred_airports": profile.get("preferred_airports", []) if profile else [], | |
| "created_at": profile.get("created_at") if profile else None, | |
| }, | |
| "memberships": [ | |
| { | |
| "chain": m["chain"], | |
| "tier": m["tier"], | |
| "expires_at": m.get("expires_at"), | |
| } | |
| for m in memberships | |
| ], | |
| "memberships_count": len(memberships) | |
| } | |
| async def handle_update_membership(arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| user_update_membership Tool ํธ๋ค๋ฌ. | |
| """ | |
| session_token = arguments.get("session_token") | |
| chain = arguments.get("chain", "").upper() | |
| tier = arguments.get("tier", "") | |
| user_session_manager = get_user_session_manager() | |
| # ์ธ์ฆ ํ์ธ | |
| if not session_token: | |
| return { | |
| "success": False, | |
| "error": "session_token์ด ํ์ํฉ๋๋ค. ๋จผ์ user_get_profile์ ํธ์ถํ์ฌ ์ธ์ฆ ์ํ๋ฅผ ํ์ธํ์ธ์." | |
| } | |
| session = user_session_manager.get_session(session_token) | |
| if not session: | |
| return { | |
| "success": False, | |
| "error": "์ธ์ ์ด ๋ง๋ฃ๋์์ต๋๋ค. ๋ค์ ๋ก๊ทธ์ธํด์ฃผ์ธ์.", | |
| "auth_url": f"{SERVER_BASE_URL}/auth/login" | |
| } | |
| # ์ฒด์ธ ์ ํจ์ฑ | |
| if chain not in SUPPORTED_CHAINS: | |
| return { | |
| "success": False, | |
| "error": f"์ง์ํ์ง ์๋ ์ฒด์ธ์ ๋๋ค: {chain}", | |
| "supported_chains": list(SUPPORTED_CHAINS.keys()) | |
| } | |
| # ๋ฑ๊ธ ์ ๊ทํ | |
| tier_normalized = TIER_ALIASES.get(tier, tier) | |
| # ๋ฑ๊ธ ์ ํจ์ฑ | |
| valid_tiers = SUPPORTED_CHAINS[chain] | |
| tier_lower_map = {t.lower(): t for t in valid_tiers} | |
| if tier_normalized.lower() not in tier_lower_map: | |
| return { | |
| "success": False, | |
| "error": f"{chain}์ ์ ํจํ์ง ์์ ๋ฑ๊ธ์ ๋๋ค: {tier}", | |
| "valid_tiers": valid_tiers | |
| } | |
| tier_final = tier_lower_map[tier_normalized.lower()] | |
| # ๋ฉค๋ฒ์ญ ์ ์ฅ | |
| try: | |
| from src.db.supabase_adapter import SupabaseAdapter | |
| adapter = SupabaseAdapter() | |
| # ๊ธฐ์กด ๋ฉค๋ฒ์ญ ํ์ธ | |
| existing = adapter.get_user_memberships(session.user_id) | |
| is_update = any(m["chain"] == chain for m in existing) | |
| result = adapter.upsert_membership(session.user_id, chain, tier_final) | |
| if result: | |
| action = "updated" if is_update else "created" | |
| chain_names = { | |
| "HILTON": "ํํผ", | |
| "MARRIOTT": "๋ฉ๋ฆฌ์ดํธ", | |
| "IHG": "IHG", | |
| "ACCOR": "์์ฝ๋ฅด", | |
| "HYATT": "ํ์ํธ", | |
| } | |
| chain_name = chain_names.get(chain, chain) | |
| return { | |
| "success": True, | |
| "action": action, | |
| "membership": { | |
| "chain": chain, | |
| "tier": tier_final, | |
| }, | |
| "message": f"{chain_name} {tier_final} ๋ฑ๊ธ์ด {'์์ ' if is_update else '๋ฑ๋ก'}๋์์ต๋๋ค." | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": "๋ฉค๋ฒ์ญ ์ ์ฅ์ ์คํจํ์ต๋๋ค. ์ ์ ํ ๋ค์ ์๋ํด์ฃผ์ธ์." | |
| } | |
| except Exception as e: | |
| logger.error(f"๋ฉค๋ฒ์ญ ์ ์ฅ ์ค๋ฅ: {e}") | |
| return { | |
| "success": False, | |
| "error": f"๋ฉค๋ฒ์ญ ์ ์ฅ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}" | |
| } | |
| async def handle_delete_membership(arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| user_delete_membership Tool ํธ๋ค๋ฌ. | |
| """ | |
| session_token = arguments.get("session_token") | |
| chain = arguments.get("chain", "").upper() | |
| user_session_manager = get_user_session_manager() | |
| # ์ธ์ฆ ํ์ธ | |
| if not session_token: | |
| return { | |
| "success": False, | |
| "error": "session_token์ด ํ์ํฉ๋๋ค." | |
| } | |
| session = user_session_manager.get_session(session_token) | |
| if not session: | |
| return { | |
| "success": False, | |
| "error": "์ธ์ ์ด ๋ง๋ฃ๋์์ต๋๋ค. ๋ค์ ๋ก๊ทธ์ธํด์ฃผ์ธ์." | |
| } | |
| # ์ฒด์ธ ์ ํจ์ฑ | |
| if chain not in SUPPORTED_CHAINS: | |
| return { | |
| "success": False, | |
| "error": f"์ง์ํ์ง ์๋ ์ฒด์ธ์ ๋๋ค: {chain}" | |
| } | |
| # ๋ฉค๋ฒ์ญ ์ญ์ | |
| try: | |
| from src.db.supabase_adapter import SupabaseAdapter | |
| adapter = SupabaseAdapter() | |
| if adapter.delete_membership(session.user_id, chain): | |
| chain_names = { | |
| "HILTON": "ํํผ", | |
| "MARRIOTT": "๋ฉ๋ฆฌ์ดํธ", | |
| "IHG": "IHG", | |
| "ACCOR": "์์ฝ๋ฅด", | |
| "HYATT": "ํ์ํธ", | |
| } | |
| chain_name = chain_names.get(chain, chain) | |
| return { | |
| "success": True, | |
| "message": f"{chain_name} ๋ฉค๋ฒ์ญ์ด ์ญ์ ๋์์ต๋๋ค." | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": "๋ฉค๋ฒ์ญ ์ญ์ ์ ์คํจํ์ต๋๋ค." | |
| } | |
| except Exception as e: | |
| logger.error(f"๋ฉค๋ฒ์ญ ์ญ์ ์ค๋ฅ: {e}") | |
| return { | |
| "success": False, | |
| "error": f"๋ฉค๋ฒ์ญ ์ญ์ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}" | |
| } | |
| async def handle_request_auth(arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| user_request_auth Tool ํธ๋ค๋ฌ. | |
| ์ด๋ฉ์ผ๋ก Magic Link ๋ฐ์ก. | |
| """ | |
| email = arguments.get("email", "").strip().lower() | |
| magic_link_auth = get_magic_link_auth() | |
| if not email: | |
| return { | |
| "success": False, | |
| "error": "์ด๋ฉ์ผ ์ฃผ์๋ฅผ ์ ๋ ฅํด์ฃผ์ธ์." | |
| } | |
| # ๊ฐ๋จํ ์ด๋ฉ์ผ ํ์ ๊ฒ์ฆ | |
| if "@" not in email or "." not in email: | |
| return { | |
| "success": False, | |
| "error": "์ฌ๋ฐ๋ฅธ ์ด๋ฉ์ผ ํ์์ด ์๋๋๋ค." | |
| } | |
| success, error = await magic_link_auth.send_magic_link(email) | |
| if success: | |
| # ์ด๋ฉ์ผ ๋ง์คํน | |
| local, domain = email.split("@", 1) | |
| if len(local) <= 2: | |
| masked = local[0] + "*" | |
| else: | |
| masked = local[0] + "*" * (len(local) - 2) + local[-1] | |
| email_masked = f"{masked}@{domain}" | |
| return { | |
| "success": True, | |
| "message": f"{email_masked}์ผ๋ก ๋ก๊ทธ์ธ ๋งํฌ๋ฅผ ๋ณด๋์ต๋๋ค.\n\n๐ง ์ด๋ฉ์ผ์์ ๋งํฌ๋ฅผ ํด๋ฆญํ ํ,\nํ๋ฉด์ ํ์๋๋ 6์๋ฆฌ ์ฝ๋๋ฅผ user_verify_code๋ก ์ ๋ ฅํด์ฃผ์ธ์.", | |
| "next_step": "user_verify_code" | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": error or "์ด๋ฉ์ผ ๋ฐ์ก์ ์คํจํ์ต๋๋ค." | |
| } | |
| async def handle_verify_code(arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| user_verify_code Tool ํธ๋ค๋ฌ. | |
| 6์๋ฆฌ ์ธ์ฆ ์ฝ๋ ๊ฒ์ฆ. | |
| """ | |
| code = arguments.get("code", "").strip() | |
| magic_link_auth = get_magic_link_auth() | |
| if not code: | |
| return { | |
| "success": False, | |
| "error": "์ธ์ฆ ์ฝ๋๋ฅผ ์ ๋ ฅํด์ฃผ์ธ์." | |
| } | |
| # ์ฝ๋ ๊ธธ์ด ๊ฒ์ฆ | |
| code_clean = code.replace(" ", "").replace("-", "") | |
| if len(code_clean) != 6 or not code_clean.isdigit(): | |
| return { | |
| "success": False, | |
| "error": "6์๋ฆฌ ์ซ์ ์ฝ๋๋ฅผ ์ ๋ ฅํด์ฃผ์ธ์." | |
| } | |
| # client_ip๋ MCP Tool์์ ์ง์ ์ ๊ทผ ๋ถ๊ฐ โ ๊ธฐ๋ณธ๊ฐ ์ฌ์ฉ | |
| success, session_token, email_masked, error = magic_link_auth.verify_auth_code( | |
| code_clean, "mcp_client" | |
| ) | |
| if success: | |
| return { | |
| "success": True, | |
| "connected": True, | |
| "session_token": session_token, | |
| "email_masked": email_masked, | |
| "message": f"โ ์ธ์ฆ ์๋ฃ! {email_masked}\n\n์ด์ ๋ฉค๋ฒ์ญ์ ๋ฑ๋กํด๋ณผ๊น์?\n์ด๋ค ํธํ ์ฒด์ธ ๋ฉค๋ฒ์ญ์ ๊ฐ์ง๊ณ ๊ณ์ธ์?" | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": error | |
| } | |