import os import sys import asyncio import signal from typing import Optional from functools import partial from dotenv import load_dotenv from psycopg2 import pool from psycopg2.extras import RealDictCursor from mcp.server.fastmcp import FastMCP # ========================= # .env # ========================= load_dotenv() SUPABASE_DB_URI = os.getenv("SUPABASE_DB_URI") DEFAULT_PORT = int(os.getenv("DEFAULT_PORT", "8003")) if "YOUR_PASSWORD" in SUPABASE_DB_URI: print("❌ ERRO: Defina SUPABASE_DB_URI com sua senha real.") sys.exit(1) # ========================= # Conexão Supabase (URI + SSL) # ========================= class SupabaseConnection: def __init__(self, dsn: str): try: self.pool = pool.ThreadedConnectionPool( minconn=1, maxconn=10, dsn=dsn ) print("✓ Pool criado") conn = self.pool.getconn() try: with conn.cursor() as cur: cur.execute("SELECT current_database(), inet_server_addr(), inet_server_port();") db, ip, port = cur.fetchone() #print(f"✓ Conexão OK: db={db}, ip={ip}, port={port}") finally: self.pool.putconn(conn) except Exception as e: print("❌ Falha na conexão:", e) sys.exit(1) async def execute_query_async(self, query: str, parameters: Optional[tuple] = None): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, partial(self._execute_query_sync, query, parameters)) def _execute_query_sync(self, query: str, parameters: Optional[tuple] = None): conn = None try: conn = self.pool.getconn() conn.autocommit = True with conn.cursor(cursor_factory=RealDictCursor) as cur: if parameters: cur.execute(query, parameters) else: cur.execute(query) if cur.description: return [dict(r) for r in cur.fetchall()] return [] except Exception as e: raise Exception(f"Erro ao executar query: {str(e)}") finally: if conn: self.pool.putconn(conn) def close(self): if hasattr(self, 'pool') and self.pool: self.pool.closeall() print("✓ Pool fechado") db = SupabaseConnection(SUPABASE_DB_URI) # ========================= # MCP Server # ========================= mcp = FastMCP(name="Football Player Match Analytics - Supabase", port=str(DEFAULT_PORT)) @mcp.tool() async def execute_sql_query(query: str, limit: int = 100) -> str: """ Executa query SQL READ-ONLY. """ query_upper = query.upper().strip() dangerous = ['DELETE', 'DROP', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'TRUNCATE'] if any(k in query_upper for k in dangerous): return "❌ ERRO: Apenas queries de leitura (SELECT) são permitidas." try: if 'LIMIT' not in query_upper: query += f" LIMIT {limit}" results = await db.execute_query_async(query) if not results: return "✓ Query executada, mas nenhum resultado encontrado." lines = [f"📊 Resultados ({len(results)} registros):\n"] for i, record in enumerate(results[:10], 1): items = [f"{k}={v}" for k, v in record.items()] lines.append(f"{i}. {', '.join(items)}") if len(results) > 10: lines.append(f"\n... e mais {len(results) - 10} resultados.") return "\n".join(lines) except Exception as e: return f"❌ Erro: {str(e)}" @mcp.tool() async def get_player_match_history(player_name: str, limit: int = 10) -> str: query = """ SELECT match_date, opponent, home_away, minutes_played, goals, assists, shots, xg, pass_completion_pct, player_nickname FROM player_match_stats WHERE player_nickname ILIKE %s ORDER BY match_date DESC LIMIT %s """ try: results = await db.execute_query_async(query, (f'%{player_name}%', limit)) if not results: return f"❌ Nenhum dado encontrado para '{player_name}'" out = [f"📊 HISTÓRICO DE PARTIDAS - {player_name.upper()}\n"] for r in results: xg = f"{(r.get('xg') or 0):.2f}" pcp = f"{(r.get('pass_completion_pct') or 0):.1f}" out.append( f"📅 {r['match_date']} vs {r['opponent']} ({r['home_away']}) - " f"{r['minutes_played']}min | " f"⚽ {r['goals']} gols, 🤝 {r['assists']} assists | " f"🎯 {r['shots']} chutes (xG: {xg}) | " f"📈 {pcp}% passes" ) return "\n".join(out) except Exception as e: return f"❌ Erro: {str(e)}" @mcp.tool() async def get_match_performances(match_date: Optional[str] = None, opponent: Optional[str] = None, limit: int = 15) -> str: if not match_date and not opponent: return "❌ Forneça match_date OU opponent" where_clauses, params = [], [] if match_date: where_clauses.append("match_date = %s") params.append(match_date) if opponent: where_clauses.append("opponent ILIKE %s") params.append(f'%{opponent}%') where_sql = " AND ".join(where_clauses) params.append(limit) query = f""" SELECT player_nickname, minutes_played, goals, assists, (goals + assists) as contributions, shots, xg, pass_completion_pct, touches FROM player_match_stats WHERE {where_sql} ORDER BY (goals + assists) DESC, xg DESC LIMIT %s """ try: results = await db.execute_query_async(query, tuple(params)) if not results: return "❌ Partida não encontrada" info = f"{match_date or 'Data'} vs {opponent or 'Adversário'}" out = [f"🏟️ PERFORMANCES - {info}\n"] for i, r in enumerate(results, 1): xg = f"{(r.get('xg') or 0):.2f}" pcp = f"{(r.get('pass_completion_pct') or 0):.1f}" touches = r.get('touches', 0) out.append( f"{i}. {r['player_nickname']} ({r['minutes_played']}min): " f"⚽ {r['goals']}G + 🤝 {r['assists']}A = {r['contributions']} contrib. | " f"🎯 {r['shots']} chutes (xG: {xg}) | " f"📈 {pcp}% passes | " f"👟 {touches} toques" ) return "\n".join(out) except Exception as e: return f"❌ Erro: {str(e)}" @mcp.tool() async def get_top_performances(metric: str = "goals", limit: int = 10) -> str: valid = { "goals": ("goals", "Gols"), "assists": ("assists", "Assistências"), "contributions": ("goals + assists", "Contribuições"), "xg": ("xg", "xG"), "shots": ("shots", "Finalizações") } if metric not in valid: return f"❌ Métrica inválida. Opções: {', '.join(valid.keys())}" metric_sql, metric_name = valid[metric] query = f""" SELECT player_nickname, match_date, opponent, home_away, goals, assists, xg, shots, pass_completion_pct FROM player_match_stats ORDER BY {metric_sql} DESC LIMIT %s """ try: results = await db.execute_query_async(query, (limit,)) if not results: return "❌ Nenhum dado encontrado" out = [f"🏆 TOP {len(results)} PERFORMANCES - {metric_name.upper()}\n"] for i, r in enumerate(results, 1): xg = f"{(r.get('xg') or 0):.2f}" out.append( f"{i}. {r['player_nickname']} - {r['match_date']} vs {r['opponent']} ({r['home_away']}): " f"⚽ {r['goals']}G, 🤝 {r['assists']}A, xG: {xg}" ) return "\n".join(out) except Exception as e: return f"❌ Erro: {str(e)}" # ========================= # Lifecycle robusto (evita CancelScope em task errada) # ========================= shutdown_event = asyncio.Event() def _handle_signal(): if not shutdown_event.is_set(): shutdown_event.set() async def main(): loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, _handle_signal) except NotImplementedError: pass try: # print("=" * 70) # print("🚀 SERVIDOR MCP - SUPABASE (URI)") # print("=" * 70) # print(f"🔗 DSN: {SUPABASE_DB_URI.split('@')[1] if '@' in SUPABASE_DB_URI else 'URI configurada'}") # print(f"🌐 Porta HTTP MCP: {DEFAULT_PORT}") # print("=" * 70) server_task = asyncio.create_task( mcp.run_streamable_http_async() ) await shutdown_event.wait() server_task.cancel() try: await server_task except asyncio.CancelledError: pass finally: db.close() print("Servidor MCP finalizado.") if __name__ == "__main__": asyncio.run(main())