sql_barcelona_agent / mcp_one_player_supabase.py
fredcaixeta
n
2043bee
import os
import sys
import asyncio
import signal
from typing import Optional
from functools import partial
from dotenv import load_dotenv
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from mcp.server.fastmcp import FastMCP
# =========================
# Environment (.env)
# =========================
load_dotenv()

# Connection URI for the Postgres database. os.getenv returns None when the
# variable is not defined.
SUPABASE_DB_URI = os.getenv("SUPABASE_DB_URI")
# Port handed to the MCP server; falls back to 8003 when DEFAULT_PORT is unset.
DEFAULT_PORT = int(os.getenv("DEFAULT_PORT", "8003"))

# Fail fast when the URI is missing (a None value would make the `in` test
# raise TypeError) or still contains the literal "YOUR_PASSWORD".
if not SUPABASE_DB_URI or "YOUR_PASSWORD" in SUPABASE_DB_URI:
    print("❌ ERRO: Defina SUPABASE_DB_URI com sua senha real.")
    sys.exit(1)
# =========================
# Supabase connection (URI + SSL)
# =========================
class SupabaseConnection:
    """Wrapper around a psycopg2 ThreadedConnectionPool with query helpers."""

    def __init__(self, dsn: str):
        """
        Create the pool (1 to 10 connections) and run a smoke-test query.

        Any failure is printed and the process exits with status 1.
        """
        try:
            self.pool = pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                dsn=dsn,
            )
            print("✓ Pool criado")
            conn = self.pool.getconn()
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT current_database(), inet_server_addr(), inet_server_port();")
                    # The row is fetched only to prove the round trip works.
                    cur.fetchone()
            finally:
                self.pool.putconn(conn)
        except Exception as e:
            print("❌ Falha na conexão:", e)
            sys.exit(1)

    async def execute_query_async(self, query: str, parameters: Optional[tuple] = None):
        """
        Run `_execute_query_sync` in the running loop's default executor.

        Returns the same list of row dicts and raises the same exception as
        the synchronous helper.
        """
        # get_running_loop() is the supported call inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(self._execute_query_sync, query, parameters)
        )

    def _execute_query_sync(self, query: str, parameters: Optional[tuple] = None):
        """
        Execute `query` on a pooled connection and return its rows.

        Returns:
            A list of plain dicts (one per row), or an empty list when the
            statement produces no result set.

        Raises:
            Exception: wrapping any error raised while running the query; the
                original error is kept as ``__cause__``.
        """
        conn = None
        try:
            conn = self.pool.getconn()
            conn.autocommit = True
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                # Without parameters, call execute(query) alone so a literal
                # `%` in the SQL text is not treated as a placeholder.
                if parameters:
                    cur.execute(query, parameters)
                else:
                    cur.execute(query)
                # description is None for statements that return no rows.
                if cur.description:
                    return [dict(r) for r in cur.fetchall()]
                return []
        except Exception as e:
            raise Exception(f"Erro ao executar query: {str(e)}") from e
        finally:
            # Always hand the connection back, even when the query failed.
            if conn is not None:
                self.pool.putconn(conn)

    def close(self):
        """Close every pooled connection; calling it again is a no-op."""
        active_pool = getattr(self, 'pool', None)
        if active_pool:
            active_pool.closeall()
            # Drop the reference so a second close() does not touch a pool
            # that is already closed.
            self.pool = None
            print("✓ Pool fechado")
# Module-level connection used by the tools below; constructing it opens the
# pool and exits the process if the connection cannot be established.
db = SupabaseConnection(SUPABASE_DB_URI)

# =========================
# MCP Server
# =========================
# DEFAULT_PORT is already an int, so it is passed as-is rather than being
# converted to a string first.
mcp = FastMCP(name="Football Player Match Analytics - Supabase", port=DEFAULT_PORT)
@mcp.tool()
async def execute_sql_query(query: str, limit: int = 100) -> str:
    """
    Execute a READ-ONLY SQL query and return a text summary of its rows.

    Args:
        query: SQL text to run. It is rejected when it contains one of the
            blocked write/DDL keywords as a standalone word.
        limit: Row cap appended as ``LIMIT`` when the query has none.

    Returns:
        A header with the row count followed by up to the first 10 rows as
        ``key=value`` pairs, a "no results" notice, or an error message
        prefixed with ❌.
    """
    import re  # local import: `re` is not among the module-level imports

    dangerous = {'DELETE', 'DROP', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'TRUNCATE'}

    # Drop trailing semicolons/whitespace so an appended LIMIT stays inside
    # the statement instead of following the terminator.
    query = query.strip().rstrip("; \t\r\n")

    # Compare whole identifier-like words, not substrings, so columns such as
    # `created_at` or `updated_at` are not mistaken for CREATE / UPDATE.
    # NOTE(review): this blacklist is best-effort only; the database role
    # should itself be restricted to read access.
    words = set(re.findall(r"[A-Z_][A-Z0-9_]*", query.upper()))
    if words & dangerous:
        return "❌ ERRO: Apenas queries de leitura (SELECT) são permitidas."

    try:
        if 'LIMIT' not in words:
            # The value is inlined rather than bound as a parameter, because
            # passing parameters would make psycopg2 interpret every `%` in
            # the caller's SQL; int() guarantees only a number is inserted.
            query = f"{query} LIMIT {int(limit)}"

        results = await db.execute_query_async(query)
        if not results:
            return "✓ Query executada, mas nenhum resultado encontrado."

        lines = [f"📊 Resultados ({len(results)} registros):\n"]
        # Only the first 10 rows are printed; the rest are summarised below.
        for i, record in enumerate(results[:10], 1):
            items = [f"{k}={v}" for k, v in record.items()]
            lines.append(f"{i}. {', '.join(items)}")
        if len(results) > 10:
            lines.append(f"\n... e mais {len(results) - 10} resultados.")
        return "\n".join(lines)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@mcp.tool()
async def get_player_match_history(player_name: str, limit: int = 10) -> str:
    """
    List the latest matches of players whose nickname contains `player_name`.

    The nickname is matched with a case-insensitive substring search (ILIKE);
    rows are ordered by match date, newest first, and capped at `limit`.

    Returns:
        A header plus one formatted line per match, a "not found" message,
        or an error message prefixed with ❌.
    """
    sql = """
        SELECT
            match_date, opponent, home_away, minutes_played,
            goals, assists, shots, xg, pass_completion_pct, player_nickname
        FROM player_match_stats
        WHERE player_nickname ILIKE %s
        ORDER BY match_date DESC
        LIMIT %s
    """

    def describe(row) -> str:
        # Missing xg / pass-completion values are rendered as zero.
        expected_goals = f"{(row.get('xg') or 0):.2f}"
        pass_pct = f"{(row.get('pass_completion_pct') or 0):.1f}"
        return (
            f"📅 {row['match_date']} vs {row['opponent']} ({row['home_away']}) - "
            f"{row['minutes_played']}min | "
            f"⚽ {row['goals']} gols, 🤝 {row['assists']} assists | "
            f"🎯 {row['shots']} chutes (xG: {expected_goals}) | "
            f"📈 {pass_pct}% passes"
        )

    try:
        rows = await db.execute_query_async(sql, (f'%{player_name}%', limit))
        if not rows:
            return f"❌ Nenhum dado encontrado para '{player_name}'"
        header = f"📊 HISTÓRICO DE PARTIDAS - {player_name.upper()}\n"
        return "\n".join([header] + [describe(row) for row in rows])
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@mcp.tool()
async def get_match_performances(match_date: Optional[str] = None, opponent: Optional[str] = None, limit: int = 15) -> str:
    """
    Rank player performances for a match selected by date and/or opponent.

    Args:
        match_date: Exact match date to filter on (optional).
        opponent: Case-insensitive substring of the opponent name (optional).
        limit: Maximum number of players returned.

    At least one of `match_date` / `opponent` must be given. Players are
    ordered by goals + assists, then by xG, both descending.

    Returns:
        A header plus one formatted line per player, a "not found" message,
        or an error message prefixed with ❌.
    """
    if not match_date and not opponent:
        return "❌ Forneça match_date OU opponent"

    # Only fixed SQL fragments are interpolated into the statement; every
    # caller-supplied value is sent as a bound parameter.
    where_clauses, params = [], []
    if match_date:
        where_clauses.append("match_date = %s")
        params.append(match_date)
    if opponent:
        where_clauses.append("opponent ILIKE %s")
        params.append(f'%{opponent}%')
    where_sql = " AND ".join(where_clauses)
    params.append(limit)

    # PostgreSQL sorts NULLs first under DESC; NULLS LAST keeps rows with
    # missing values from outranking real performances.
    query = f"""
        SELECT
            player_nickname, minutes_played, goals, assists,
            (goals + assists) as contributions, shots, xg,
            pass_completion_pct, touches
        FROM player_match_stats
        WHERE {where_sql}
        ORDER BY (goals + assists) DESC NULLS LAST, xg DESC NULLS LAST
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, tuple(params))
        if not results:
            return "❌ Partida não encontrada"

        info = f"{match_date or 'Data'} vs {opponent or 'Adversário'}"
        out = [f"🏟️ PERFORMANCES - {info}\n"]
        for i, r in enumerate(results, 1):
            xg = f"{(r.get('xg') or 0):.2f}"
            pcp = f"{(r.get('pass_completion_pct') or 0):.1f}"
            # The column is always present in the row, so a .get() default
            # would never apply; `or 0` also covers a NULL value.
            touches = r.get('touches') or 0
            out.append(
                f"{i}. {r['player_nickname']} ({r['minutes_played']}min): "
                f"⚽ {r['goals']}G + 🤝 {r['assists']}A = {r['contributions']} contrib. | "
                f"🎯 {r['shots']} chutes (xG: {xg}) | "
                f"📈 {pcp}% passes | "
                f"👟 {touches} toques"
            )
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
@mcp.tool()
async def get_top_performances(metric: str = "goals", limit: int = 10) -> str:
    """
    List the best single-match performances ranked by `metric`.

    Args:
        metric: One of "goals", "assists", "contributions", "xg", "shots".
        limit: Maximum number of performances returned.

    Returns:
        A header plus one formatted line per performance, an "invalid metric"
        or "no data" message, or an error message prefixed with ❌.
    """
    # Whitelist mapping each accepted metric to (SQL expression, display name);
    # only these fixed expressions are ever interpolated into the query.
    valid = {
        "goals": ("goals", "Gols"),
        "assists": ("assists", "Assistências"),
        "contributions": ("goals + assists", "Contribuições"),
        "xg": ("xg", "xG"),
        "shots": ("shots", "Finalizações"),
    }
    if metric not in valid:
        return f"❌ Métrica inválida. Opções: {', '.join(valid)}"

    metric_sql, metric_name = valid[metric]
    # PostgreSQL puts NULLs first under DESC; NULLS LAST prevents rows with
    # no value for the metric from leading the ranking.
    query = f"""
        SELECT
            player_nickname, match_date, opponent, home_away,
            goals, assists, xg, shots, pass_completion_pct
        FROM player_match_stats
        ORDER BY {metric_sql} DESC NULLS LAST
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, (limit,))
        if not results:
            return "❌ Nenhum dado encontrado"

        out = [f"🏆 TOP {len(results)} PERFORMANCES - {metric_name.upper()}\n"]
        for i, r in enumerate(results, 1):
            xg = f"{(r.get('xg') or 0):.2f}"
            out.append(
                f"{i}. {r['player_nickname']} - {r['match_date']} vs {r['opponent']} ({r['home_away']}): "
                f"⚽ {r['goals']}G, 🤝 {r['assists']}A, xG: {xg}"
            )
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"
# =========================
# Lifecycle robusto (evita CancelScope em task errada)
# =========================
shutdown_event = asyncio.Event()
def _handle_signal():
if not shutdown_event.is_set():
shutdown_event.set()
async def main():
    """
    Run the MCP HTTP server until a shutdown signal arrives or it stops.

    SIGINT/SIGTERM set `shutdown_event`, after which the server task is
    cancelled. If the server task finishes by itself first, its exception
    (if any) is re-raised instead of waiting forever for a signal. The
    database pool is closed in every case.
    """
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _handle_signal)
        except NotImplementedError:
            # This event loop does not support signal handlers; carry on
            # without them.
            pass

    try:
        # The server runs in its own task so that it is that task which gets
        # cancelled on shutdown.
        server_task = asyncio.create_task(
            mcp.run_streamable_http_async()
        )
        shutdown_task = asyncio.create_task(shutdown_event.wait())

        # Wake up on whichever happens first: a shutdown request or the
        # server ending (for example because it failed to start).
        await asyncio.wait(
            {server_task, shutdown_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        shutdown_task.cancel()

        if server_task.done():
            # Surface a server failure rather than swallowing it.
            server_task.result()
        else:
            server_task.cancel()
            try:
                await server_task
            except asyncio.CancelledError:
                pass
    finally:
        db.close()
        print("Servidor MCP finalizado.")
# Script entry point: run main() in a new event loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())