eodi-mcp / src /auth /tool_handlers.py
lovelymango's picture
Upload 25 files
978996e verified
"""
User Tool Handlers
==================
Execution logic behind the user_* MCP tools.
Called from execute_tool_async in server_streamable.py.
"""
import logging
from typing import Dict, Any
from .session_manager import get_user_session_manager
from .magic_link import get_magic_link_auth
from .config import SUPPORTED_CHAINS, TIER_ALIASES, SERVER_BASE_URL
# Module-level logger shared by the handlers in this file.
logger = logging.getLogger("eodi.auth.tool_handlers")
async def execute_user_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Route a user tool call to the handler that implements it.

    Args:
        name: Tool name (user_*).
        arguments: Tool arguments, passed through to the handler unchanged.

    Returns:
        The handler's result dict, or a ``success: False`` dict carrying an
        ``error`` message when ``name`` matches no known tool.
    """
    # Tools served by the credit-card wrapper (which validates the session).
    credit_card_tools = (
        "user_add_credit_card",
        "user_get_credit_cards",
        "user_delete_credit_card",
        "user_update_credit_usage",
        "user_get_credit_recommendations",
        "user_get_credit_usage_summary",
    )
    # Tools served by the valuation wrapper (which validates the session).
    valuation_tools = (
        "user_get_asset_valuation",
        "user_update_valuation_style",
        "user_get_valuation_styles",
        "user_parse_asset_text",
        "calculate_miles_vs_cashback",
        "generate_award_search_link",
    )

    # Authentication / membership tools, each with its own handler.
    if name == "user_get_profile":
        return await handle_get_profile(arguments)
    if name == "user_update_membership":
        return await handle_update_membership(arguments)
    if name == "user_delete_membership":
        return await handle_delete_membership(arguments)
    if name == "user_request_auth":
        return await handle_request_auth(arguments)
    if name == "user_verify_code":
        return await handle_verify_code(arguments)

    # Grouped tools are forwarded together with their name.
    if name in credit_card_tools:
        return await _handle_credit_card_tool(name, arguments)
    if name in valuation_tools:
        return await _handle_valuation_tool(name, arguments)

    return {"success": False, "error": f"Unknown user tool: {name}"}
async def _handle_credit_card_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Shared wrapper for the credit-card tools.

    Validates the session token found in ``arguments`` and then forwards
    ``arguments`` plus the session's user id to the handler for ``name``.

    Returns:
        The handler's result, or a ``success: False`` dict when the token is
        missing, the session lookup fails, or ``name`` is not a credit-card
        tool. The two authentication failures also carry an ``auth_url``.
    """
    from .credit_card_handlers import (
        handle_add_credit_card,
        handle_get_credit_cards,
        handle_delete_credit_card,
        handle_update_credit_usage,
        handle_get_credit_recommendations,
        handle_get_credit_usage_summary
    )

    token = arguments.get("session_token")
    sessions = get_user_session_manager()
    login_url = f"{SERVER_BASE_URL}/auth/login"

    # Authentication check: a token must be present and map to a session.
    if not token:
        return {
            "success": False,
            "error": "session_token์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ๋จผ์ € user_get_profile์„ ํ˜ธ์ถœํ•˜์—ฌ ์ธ์ฆ ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜์„ธ์š”.",
            "auth_url": login_url
        }
    active_session = sessions.get_session(token)
    if not active_session:
        return {
            "success": False,
            "error": "์„ธ์…˜์ด ๋งŒ๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ๋กœ๊ทธ์ธํ•ด์ฃผ์„ธ์š”.",
            "auth_url": login_url
        }
    user_id = active_session.user_id

    # Tool name -> handler; every handler takes (arguments, user_id).
    dispatch = {
        "user_add_credit_card": handle_add_credit_card,
        "user_get_credit_cards": handle_get_credit_cards,
        "user_delete_credit_card": handle_delete_credit_card,
        "user_update_credit_usage": handle_update_credit_usage,
        "user_get_credit_recommendations": handle_get_credit_recommendations,
        "user_get_credit_usage_summary": handle_get_credit_usage_summary,
    }
    handler = dispatch.get(name)
    if handler is None:
        return {"success": False, "error": f"Unknown credit card tool: {name}"}
    return await handler(arguments, user_id)
async def _handle_valuation_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Shared wrapper for the valuation tools.

    Validates the session token found in ``arguments`` and then forwards
    ``arguments`` plus the session's user id to the handler for ``name``.

    Returns:
        The handler's result, or a ``success: False`` dict when the token is
        missing, the session lookup fails, or ``name`` is not a valuation
        tool. The two authentication failures also carry an ``auth_url``.
    """
    from .valuation_handlers import (
        handle_get_asset_valuation,
        handle_update_valuation_style,
        handle_get_valuation_styles,
        handle_calculate_miles_vs_cashback,
        handle_generate_award_search_link
    )
    from .asset_parser import handle_parse_asset_text

    token = arguments.get("session_token")
    sessions = get_user_session_manager()
    login_url = f"{SERVER_BASE_URL}/auth/login"

    # Authentication check: a token must be present and map to a session.
    if not token:
        return {
            "success": False,
            "error": "session_token์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ๋จผ์ € user_get_profile์„ ํ˜ธ์ถœํ•˜์—ฌ ์ธ์ฆ ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜์„ธ์š”.",
            "auth_url": login_url
        }
    active_session = sessions.get_session(token)
    if not active_session:
        return {
            "success": False,
            "error": "์„ธ์…˜์ด ๋งŒ๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ๋กœ๊ทธ์ธํ•ด์ฃผ์„ธ์š”.",
            "auth_url": login_url
        }
    user_id = active_session.user_id

    # Tool name -> handler; every handler takes (arguments, user_id).
    dispatch = {
        "user_get_asset_valuation": handle_get_asset_valuation,
        "user_update_valuation_style": handle_update_valuation_style,
        "user_get_valuation_styles": handle_get_valuation_styles,
        "user_parse_asset_text": handle_parse_asset_text,
        "calculate_miles_vs_cashback": handle_calculate_miles_vs_cashback,
        "generate_award_search_link": handle_generate_award_search_link,
    }
    handler = dispatch.get(name)
    if handler is None:
        return {"success": False, "error": f"Unknown valuation tool: {name}"}
    return await handler(arguments, user_id)
async def handle_get_profile(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handler for the user_get_profile tool.

    With a session token that resolves to a session, returns the user's
    profile and memberships. Without one (or when the lookup fails), returns
    a ``connected: False`` reply that carries the login URL. Both outcomes
    report ``success: True``.
    """
    token = arguments.get("session_token")
    sessions = get_user_session_manager()

    def _not_connected(message: str) -> Dict[str, Any]:
        # Reply used whenever no usable session exists.
        return {
            "success": True,
            "connected": False,
            "auth_url": f"{SERVER_BASE_URL}/auth/login",
            "message": message
        }

    # No token: point the caller at the login options.
    if not token:
        return _not_connected(
            "Eodi์— ์—ฐ๊ฒฐ๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. auth_url์—์„œ ๋กœ๊ทธ์ธํ•˜๊ฑฐ๋‚˜, user_request_auth๋กœ ์ด๋ฉ”์ผ ์ธ์ฆ์„ ์‹œ์ž‘ํ•˜์„ธ์š”."
        )

    # Session validation.
    session = sessions.get_session(token)
    if not session:
        return _not_connected("์„ธ์…˜์ด ๋งŒ๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ๋กœ๊ทธ์ธํ•ด์ฃผ์„ธ์š”.")

    # Membership / profile lookup. Any failure is logged and the reply falls
    # back to an empty membership list and no profile data.
    try:
        from src.db.supabase_adapter import SupabaseAdapter
        db = SupabaseAdapter()
        membership_rows = db.get_user_memberships(session.user_id)
        profile_row = db.get_user_profile(session.user_id)
    except Exception as e:
        logger.error(f"ํ”„๋กœํ•„ ์กฐํšŒ ์˜ค๋ฅ˜: {e}")
        membership_rows, profile_row = [], None

    if profile_row:
        preferred_airports = profile_row.get("preferred_airports", [])
        created_at = profile_row.get("created_at")
    else:
        preferred_airports = []
        created_at = None

    # Expose only the chain, tier and expiry of each membership.
    membership_summaries = []
    for row in membership_rows:
        membership_summaries.append({
            "chain": row["chain"],
            "tier": row["tier"],
            "expires_at": row.get("expires_at"),
        })

    return {
        "success": True,
        "connected": True,
        "user_id": session.user_id,
        "profile": {
            "email_masked": session.email_masked,
            "preferred_airports": preferred_airports,
            "created_at": created_at,
        },
        "memberships": membership_summaries,
        "memberships_count": len(membership_rows)
    }
async def handle_update_membership(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handler for the user_update_membership tool.

    Validates the session, the hotel chain and the tier, then upserts the
    membership for the session's user.

    Args:
        arguments: Tool arguments. Reads ``session_token``, ``chain``
            (matched case-insensitively) and ``tier`` (resolved through
            ``TIER_ALIASES`` and then matched case-insensitively against the
            chain's tiers).

    Returns:
        On success, a dict with ``success: True``, ``action`` ("created" or
        "updated"), the stored ``membership`` and a ``message``. Otherwise a
        dict with ``success: False`` and an ``error``; authentication
        failures also carry ``auth_url`` and validation failures list the
        accepted values.
    """
    session_token = arguments.get("session_token")
    # ``dict.get`` only applies its default when the key is absent, so an
    # explicit null (or a non-string value) must be normalised here before
    # any string method is called on it.
    chain = str(arguments.get("chain") or "").strip().upper()
    tier = str(arguments.get("tier") or "").strip()
    user_session_manager = get_user_session_manager()
    auth_url = f"{SERVER_BASE_URL}/auth/login"

    # Authentication check. Both failures return the login URL, matching the
    # other session-protected handlers in this module.
    if not session_token:
        return {
            "success": False,
            "error": "session_token์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ๋จผ์ € user_get_profile์„ ํ˜ธ์ถœํ•˜์—ฌ ์ธ์ฆ ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜์„ธ์š”.",
            "auth_url": auth_url
        }
    session = user_session_manager.get_session(session_token)
    if not session:
        return {
            "success": False,
            "error": "์„ธ์…˜์ด ๋งŒ๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ๋กœ๊ทธ์ธํ•ด์ฃผ์„ธ์š”.",
            "auth_url": auth_url
        }

    # Chain validation.
    if chain not in SUPPORTED_CHAINS:
        return {
            "success": False,
            "error": f"์ง€์›ํ•˜์ง€ ์•Š๋Š” ์ฒด์ธ์ž…๋‹ˆ๋‹ค: {chain}",
            "supported_chains": list(SUPPORTED_CHAINS.keys())
        }

    # Tier normalisation: resolve aliases first, then match the chain's
    # tiers case-insensitively and keep the canonical spelling.
    tier_normalized = TIER_ALIASES.get(tier, tier)
    valid_tiers = SUPPORTED_CHAINS[chain]
    tier_lower_map = {t.lower(): t for t in valid_tiers}
    tier_key = tier_normalized.lower()
    if tier_key not in tier_lower_map:
        return {
            "success": False,
            "error": f"{chain}์˜ ์œ ํšจํ•˜์ง€ ์•Š์€ ๋“ฑ๊ธ‰์ž…๋‹ˆ๋‹ค: {tier}",
            "valid_tiers": valid_tiers
        }
    tier_final = tier_lower_map[tier_key]

    # Persist the membership.
    try:
        from src.db.supabase_adapter import SupabaseAdapter
        adapter = SupabaseAdapter()
        # Look at the existing rows first so the reply can say whether this
        # call created the membership or updated it.
        existing = adapter.get_user_memberships(session.user_id)
        is_update = any(m["chain"] == chain for m in existing)
        result = adapter.upsert_membership(session.user_id, chain, tier_final)
        if not result:
            return {
                "success": False,
                "error": "๋ฉค๋ฒ„์‹ญ ์ €์žฅ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค. ์ž ์‹œ ํ›„ ๋‹ค์‹œ ์‹œ๋„ํ•ด์ฃผ์„ธ์š”."
            }

        action = "updated" if is_update else "created"
        # Display names used in the confirmation message; unknown chains
        # fall back to the chain code.
        chain_names = {
            "HILTON": "ํžํŠผ",
            "MARRIOTT": "๋ฉ”๋ฆฌ์–ดํŠธ",
            "IHG": "IHG",
            "ACCOR": "์•„์ฝ”๋ฅด",
            "HYATT": "ํ•˜์–ํŠธ",
        }
        chain_name = chain_names.get(chain, chain)
        return {
            "success": True,
            "action": action,
            "membership": {
                "chain": chain,
                "tier": tier_final,
            },
            "message": f"{chain_name} {tier_final} ๋“ฑ๊ธ‰์ด {'์ˆ˜์ •' if is_update else '๋“ฑ๋ก'}๋˜์—ˆ์Šต๋‹ˆ๋‹ค."
        }
    except Exception as e:
        # Boundary handler: log with traceback and report the failure to the
        # caller instead of letting the tool call crash.
        logger.exception("๋ฉค๋ฒ„์‹ญ ์ €์žฅ ์˜ค๋ฅ˜: %s", e)
        return {
            "success": False,
            "error": f"๋ฉค๋ฒ„์‹ญ ์ €์žฅ ์ค‘ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค: {e}"
        }
async def handle_delete_membership(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handler for the user_delete_membership tool.

    Validates the session and the hotel chain, then deletes that chain's
    membership for the session's user.

    Args:
        arguments: Tool arguments. Reads ``session_token`` and ``chain``
            (matched case-insensitively).

    Returns:
        A dict with ``success: True`` and a ``message`` when the adapter
        reports the deletion, otherwise ``success: False`` and an ``error``.
        Authentication failures also carry ``auth_url``.
    """
    session_token = arguments.get("session_token")
    # ``dict.get`` only applies its default when the key is absent, so an
    # explicit null (or a non-string value) must be normalised here before
    # ``upper()`` is called on it.
    chain = str(arguments.get("chain") or "").strip().upper()
    user_session_manager = get_user_session_manager()
    auth_url = f"{SERVER_BASE_URL}/auth/login"

    # Authentication check. Both failures return the login URL, matching the
    # other session-protected handlers in this module.
    if not session_token:
        return {
            "success": False,
            "error": "session_token์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.",
            "auth_url": auth_url
        }
    session = user_session_manager.get_session(session_token)
    if not session:
        return {
            "success": False,
            "error": "์„ธ์…˜์ด ๋งŒ๋ฃŒ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์‹œ ๋กœ๊ทธ์ธํ•ด์ฃผ์„ธ์š”.",
            "auth_url": auth_url
        }

    # Chain validation.
    if chain not in SUPPORTED_CHAINS:
        return {
            "success": False,
            "error": f"์ง€์›ํ•˜์ง€ ์•Š๋Š” ์ฒด์ธ์ž…๋‹ˆ๋‹ค: {chain}"
        }

    # Delete the membership.
    try:
        from src.db.supabase_adapter import SupabaseAdapter
        adapter = SupabaseAdapter()
        if not adapter.delete_membership(session.user_id, chain):
            return {
                "success": False,
                "error": "๋ฉค๋ฒ„์‹ญ ์‚ญ์ œ์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค."
            }

        # Display names used in the confirmation message; unknown chains
        # fall back to the chain code.
        chain_names = {
            "HILTON": "ํžํŠผ",
            "MARRIOTT": "๋ฉ”๋ฆฌ์–ดํŠธ",
            "IHG": "IHG",
            "ACCOR": "์•„์ฝ”๋ฅด",
            "HYATT": "ํ•˜์–ํŠธ",
        }
        chain_name = chain_names.get(chain, chain)
        return {
            "success": True,
            "message": f"{chain_name} ๋ฉค๋ฒ„์‹ญ์ด ์‚ญ์ œ๋˜์—ˆ์Šต๋‹ˆ๋‹ค."
        }
    except Exception as e:
        # Boundary handler: log with traceback and report the failure to the
        # caller instead of letting the tool call crash.
        logger.exception("๋ฉค๋ฒ„์‹ญ ์‚ญ์ œ ์˜ค๋ฅ˜: %s", e)
        return {
            "success": False,
            "error": f"๋ฉค๋ฒ„์‹ญ ์‚ญ์ œ ์ค‘ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค: {e}"
        }
async def handle_request_auth(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handler for the user_request_auth tool: send a magic link by e-mail.

    Args:
        arguments: Tool arguments. Reads ``email``; surrounding whitespace
            is stripped and the address is lower-cased.

    Returns:
        On success, a dict with ``success: True``, a ``message`` that names
        the masked address, and ``next_step: "user_verify_code"``. Otherwise
        a dict with ``success: False`` and an ``error`` message.
    """
    # ``dict.get`` only applies its default when the key is absent, so an
    # explicit null (or a non-string value) is normalised before any string
    # method is called on it.
    email = str(arguments.get("email") or "").strip().lower()
    if not email:
        return {
            "success": False,
            "error": "์ด๋ฉ”์ผ ์ฃผ์†Œ๋ฅผ ์ž…๋ ฅํ•ด์ฃผ์„ธ์š”."
        }

    # Simple format check. Besides requiring "@" and ".", both sides of the
    # first "@" must be non-empty: the masking step below indexes the local
    # part, so an address such as "@example.com" must be rejected here.
    local, _, domain = email.partition("@")
    if "." not in email or not local or not domain:
        return {
            "success": False,
            "error": "์˜ฌ๋ฐ”๋ฅธ ์ด๋ฉ”์ผ ํ˜•์‹์ด ์•„๋‹™๋‹ˆ๋‹ค."
        }

    # Only reach for the auth backend once the input is known to be usable.
    magic_link_auth = get_magic_link_auth()
    success, error = await magic_link_auth.send_magic_link(email)
    if not success:
        return {
            "success": False,
            "error": error or "์ด๋ฉ”์ผ ๋ฐœ์†ก์— ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค."
        }

    # Mask the local part: keep the first character (and the last one when
    # there are more than two) and replace the rest with "*".
    if len(local) <= 2:
        masked = local[0] + "*"
    else:
        masked = local[0] + "*" * (len(local) - 2) + local[-1]
    email_masked = f"{masked}@{domain}"
    return {
        "success": True,
        "message": f"{email_masked}์œผ๋กœ ๋กœ๊ทธ์ธ ๋งํฌ๋ฅผ ๋ณด๋ƒˆ์Šต๋‹ˆ๋‹ค.\n\n๐Ÿ“ง ์ด๋ฉ”์ผ์—์„œ ๋งํฌ๋ฅผ ํด๋ฆญํ•œ ํ›„,\nํ™”๋ฉด์— ํ‘œ์‹œ๋˜๋Š” 6์ž๋ฆฌ ์ฝ”๋“œ๋ฅผ user_verify_code๋กœ ์ž…๋ ฅํ•ด์ฃผ์„ธ์š”.",
        "next_step": "user_verify_code"
    }
async def handle_verify_code(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handler for the user_verify_code tool: check a 6-digit auth code.

    Args:
        arguments: Tool arguments. Reads ``code``; spaces and hyphens are
            ignored, and a numeric value is accepted as well as a string.

    Returns:
        On success, a dict with ``success: True``, ``connected: True``, the
        new ``session_token``, ``email_masked`` and a ``message``. Otherwise
        a dict with ``success: False`` and an ``error``.
    """
    # The code may arrive as null or as a JSON number rather than a string;
    # coerce it so the string handling below cannot raise.
    raw_code = arguments.get("code")
    code = "" if raw_code is None else str(raw_code).strip()
    if not code:
        return {
            "success": False,
            "error": "์ธ์ฆ ์ฝ”๋“œ๋ฅผ ์ž…๋ ฅํ•ด์ฃผ์„ธ์š”."
        }

    # Code shape check: exactly six digits once separators are removed.
    code_clean = code.replace(" ", "").replace("-", "")
    if len(code_clean) != 6 or not code_clean.isdigit():
        return {
            "success": False,
            "error": "6์ž๋ฆฌ ์ˆซ์ž ์ฝ”๋“œ๋ฅผ ์ž…๋ ฅํ•ด์ฃผ์„ธ์š”."
        }

    # Only reach for the auth backend once the input is known to be usable.
    magic_link_auth = get_magic_link_auth()
    # The client IP is not available inside an MCP tool, so a fixed
    # placeholder is passed instead.
    success, session_token, email_masked, error = magic_link_auth.verify_auth_code(
        code_clean, "mcp_client"
    )
    if not success:
        return {
            "success": False,
            "error": error
        }
    return {
        "success": True,
        "connected": True,
        "session_token": session_token,
        "email_masked": email_masked,
        "message": f"โœ… ์ธ์ฆ ์™„๋ฃŒ! {email_masked}\n\n์ด์ œ ๋ฉค๋ฒ„์‹ญ์„ ๋“ฑ๋กํ•ด๋ณผ๊นŒ์š”?\n์–ด๋–ค ํ˜ธํ…” ์ฒด์ธ ๋ฉค๋ฒ„์‹ญ์„ ๊ฐ€์ง€๊ณ  ๊ณ„์„ธ์š”?"
    }