import asyncio
import os
import re
from typing import List, Dict, Any, Optional
from pathlib import Path
from functools import partial

from dotenv import load_dotenv
from psycopg2 import pool
from psycopg2.extras import RealDictCursor

# Load .env before anything below reads environment variables.
load_dotenv()

from pydantic_ai import Agent, RunContext
from pydantic_ai.providers.groq import GroqProvider
from pydantic_ai.models.groq import GroqModel
from pydantic_ai.messages import PartDeltaEvent, TextPartDelta
from pydantic_graph import End
from pydantic import BaseModel

# PROMPTS
from prompts import matches_prompt, players_prompt, player_matchid_prompt, graph_agent_prompt

MATCHES_SYSTEM_PROMPT = matches_prompt.MATCHES_SYSTEM_PROMPT
PLAYERS_SYSTEM_PROMPT = players_prompt.PLAYERS_SYSTEM_PROMPT
PLAYER_MATCHID_SYSTEM_PROMPT = player_matchid_prompt.SYSTEM_PROMPT
GRAPH_AGENT_SYSTEM_PROMPT = graph_agent_prompt.GRAPH_AGENT_SYSTEM_PROMPT  # Adjusted

import pandas as pd
import matplotlib

# The non-interactive backend must be selected before pyplot is imported.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import io
from PIL import Image, ImageDraw, ImageFont
from dataclasses import dataclass

SUPABASE_DB_URI = os.getenv("SUPABASE_DB_URI")

# Whole-word matchers for the SQL guard in execute_sql_query. Word boundaries
# keep identifiers such as "created_at" or "rate_limit" from being mistaken
# for the CREATE / LIMIT keywords.
_FORBIDDEN_SQL_RE = re.compile(
    r"\b(?:DELETE|DROP|INSERT|UPDATE|ALTER|CREATE|TRUNCATE)\b", re.IGNORECASE
)
_LIMIT_RE = re.compile(r"\bLIMIT\b", re.IGNORECASE)


# ========================= SupabaseConnection (migrated from MCP) =========================
class SupabaseConnection:
    """Wrapper around a psycopg2 ``ThreadedConnectionPool`` with an async query helper."""

    def __init__(self, dsn: str):
        """Create the pool (1 to 10 connections) and run ``SELECT 1`` as a smoke test.

        Args:
            dsn: Connection string handed to ``ThreadedConnectionPool``.
        """
        self.pool = pool.ThreadedConnectionPool(minconn=1, maxconn=10, dsn=dsn)
        print("✓ Pool Supabase criado")
        # Connection smoke test
        conn = self.pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
        finally:
            self.pool.putconn(conn)

    async def execute_query_async(
        self, query: str, parameters: Optional[tuple] = None
    ) -> List[Dict[str, Any]]:
        """Run ``_execute_query_sync`` in the default executor and await its result.

        Args:
            query: SQL text, using ``%s`` placeholders when ``parameters`` is given.
            parameters: Values bound to the placeholders, or ``None``.

        Returns:
            The rows as a list of dicts (empty when the statement returns no rows).

        Raises:
            Exception: Propagated from ``_execute_query_sync``.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated in that context.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(self._execute_query_sync, query, parameters)
        )

    def _execute_query_sync(
        self, query: str, parameters: Optional[tuple] = None
    ) -> List[Dict[str, Any]]:
        """Execute ``query`` on a pooled connection (autocommit) and return dict rows.

        Args:
            query: SQL text.
            parameters: Values for ``%s`` placeholders; when empty or ``None`` the
                query is executed without parameter binding.

        Returns:
            A list of row dicts, or ``[]`` when the cursor has no result set.

        Raises:
            Exception: Wraps any failure with an "Erro query: ..." message; the
                original exception is kept as ``__cause__``.
        """
        conn = None
        try:
            conn = self.pool.getconn()
            conn.autocommit = True
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                if parameters:
                    cur.execute(query, parameters)
                else:
                    cur.execute(query)
                # description is None for statements that produce no rows.
                if cur.description:
                    return [dict(r) for r in cur.fetchall()]
                return []
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise Exception(f"Erro query: {str(e)}") from e
        finally:
            if conn is not None:
                self.pool.putconn(conn)

    def close(self):
        """Close every pooled connection; safe to call more than once."""
        current_pool = getattr(self, 'pool', None)
        # closeall() raises on an already-closed pool, so skip it in that case.
        if current_pool and not getattr(current_pool, 'closed', False):
            current_pool.closeall()
            print("✓ Pool fechado")


db = SupabaseConnection(SUPABASE_DB_URI)


# ========================= Agents (without MCP) =========================
class Deps(BaseModel):
    """Dependency model carrying the AI query text."""

    ai_query: str


api_key = os.getenv("GROQ_DEV_API_KEY")
groq_model = GroqModel("moonshotai/kimi-k2-instruct-0905", provider=GroqProvider(api_key=api_key))

api_key_2 = os.getenv("GROQ_DEV_API_KEY_2")
groq_model_2 = GroqModel("openai/gpt-oss-20b", provider=GroqProvider(api_key=api_key_2))

one_player_agent = Agent(
    model=groq_model,
    system_prompt=PLAYER_MATCHID_SYSTEM_PROMPT
)


# ========================= Tools registered directly on one_player_agent =========================
@one_player_agent.tool_plain()
async def execute_sql_query(query: str, limit: int = 100) -> str:
    """Run a read-only SQL query and return a text summary of the rows.

    Queries containing a write/DDL keyword (DELETE, DROP, INSERT, UPDATE, ALTER,
    CREATE, TRUNCATE) as a whole word are rejected. A LIMIT clause is appended
    when the query has none. At most the first 10 rows are printed.

    Args:
        query: SQL text to execute.
        limit: Row limit appended when the query has no LIMIT clause.

    Returns:
        A formatted listing of the rows, or a message describing the rejection,
        the empty result, or the error.
    """
    # Drop trailing semicolons/whitespace so an appended LIMIT stays valid SQL.
    statement = query.strip().rstrip("; \t\r\n")

    # NOTE(review): a keyword deny-list is best-effort only; a read-only
    # database role is the reliable way to enforce read-only access.
    if _FORBIDDEN_SQL_RE.search(statement):
        return "❌ Apenas SELECT permitidas."

    try:
        if not _LIMIT_RE.search(statement):
            statement += f" LIMIT {int(limit)}"
        results = await db.execute_query_async(statement)
        if not results:
            return "✓ Query OK, sem resultados."
        lines = [f"📊 {len(results)} registros:\n"]
        for i, record in enumerate(results[:10], 1):
            items = [f"{k}={v}" for k, v in record.items()]
            lines.append(f"{i}. {', '.join(items)}")
        if len(results) > 10:
            lines.append(f"\n... +{len(results)-10}")
        return "\n".join(lines)
    except Exception as e:
        return f"❌ Erro: {str(e)}"


@one_player_agent.tool_plain()
async def get_player_match_history(player_name: str, limit: int = 10) -> str:
    """List a player's most recent matches from ``player_match_stats``.

    Args:
        player_name: Substring matched case-insensitively against ``player_nickname``.
        limit: Maximum number of matches, newest first.

    Returns:
        One formatted line per match, or a message when nothing is found or the
        query fails.
    """
    query = """
        SELECT match_date, opponent, home_away, minutes_played, goals, assists,
               shots, xg, pass_completion_pct, player_nickname
        FROM player_match_stats
        WHERE player_nickname ILIKE %s
        ORDER BY match_date DESC
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, (f'%{player_name}%', limit))
        if not results:
            return f"❌ Nenhum dado para '{player_name}'"
        out = [f"📊 HISTÓRICO - {player_name.upper()}\n"]
        for r in results:
            # NULL metrics are rendered as zero.
            xg = f"{(r.get('xg') or 0):.2f}"
            pcp = f"{(r.get('pass_completion_pct') or 0):.1f}"
            out.append(f"📅 {r['match_date']} vs {r['opponent']} ({r['home_away']}) - {r['minutes_played']}min | ⚽{r['goals']}G 🤝{r['assists']}A | 🎯{r['shots']} (xG:{xg}) | 📈{pcp}%")
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"


@one_player_agent.tool_plain()
async def get_match_performances(match_date: Optional[str] = None, opponent: Optional[str] = None, limit: int = 15) -> str:
    """Rank player performances for a match, filtered by date and/or opponent.

    Args:
        match_date: Exact value compared against ``match_date``.
        opponent: Substring matched case-insensitively against ``opponent``.
        limit: Maximum number of players returned.

    Returns:
        Players ordered by goals + assists, then xG; or a message when neither
        filter is given, nothing is found, or the query fails.
    """
    if not match_date and not opponent:
        return "❌ Forneça match_date OU opponent"

    where_clauses, params = [], []
    if match_date:
        where_clauses.append("match_date = %s")
        params.append(match_date)
    if opponent:
        where_clauses.append("opponent ILIKE %s")
        params.append(f'%{opponent}%')
    where_sql = " AND ".join(where_clauses)
    params.append(limit)

    # Only fixed clause text is interpolated; every value is a bound parameter.
    query = f"""
        SELECT player_nickname, minutes_played, goals, assists,
               (goals + assists) as contributions, shots, xg,
               pass_completion_pct, touches
        FROM player_match_stats
        WHERE {where_sql}
        ORDER BY (goals + assists) DESC, xg DESC
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, tuple(params))
        if not results:
            return "❌ Partida não encontrada"
        info = f"{match_date or ''} vs {opponent or ''}".strip()
        out = [f"🏟️ PERFORMANCES - {info}\n"]
        for i, r in enumerate(results, 1):
            xg = f"{(r.get('xg') or 0):.2f}"
            pcp = f"{(r.get('pass_completion_pct') or 0):.1f}"
            # The column is always selected, so a .get() default never applies;
            # "or 0" is what turns a NULL into 0, matching xg and pass%.
            touches = r.get('touches') or 0
            out.append(f"{i}. {r['player_nickname']} ({r['minutes_played']}min): ⚽{r['goals']}G + 🤝{r['assists']}A = {r['contributions']} | 🎯{r['shots']} (xG:{xg}) | 📈{pcp}% | 👟{touches}")
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"


@one_player_agent.tool_plain()
async def show_available_tables() -> str:
    """List the tables in the ``public`` schema.

    Returns:
        A bulleted list of table names, or a message when none exist or the
        query fails.
    """
    # List tables via information_schema.
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public'
        ORDER BY table_name
    """
    try:
        results = await db.execute_query_async(query)
        if not results:
            return "❌ Nenhuma tabela encontrada no schema public."
        return "📚 TABELAS:\n" + "\n".join([f"- {r['table_name']}" for r in results])
    except Exception as e:
        return f"❌ Erro: {str(e)}"


@one_player_agent.tool_plain()
async def get_goal_events(player_scorer: Optional[str] = None, assist_from: Optional[str] = None, opponent: Optional[str] = None, only_crosses: bool = False, only_high_crosses: bool = False, limit: int = 50) -> str:
    """List goal events for team 'Barcelona', newest first, with optional filters.

    Args:
        player_scorer: Substring matched case-insensitively against ``player_scorer_nick``.
        assist_from: Substring matched case-insensitively against ``pass_from_nick``.
        opponent: Substring matched case-insensitively against ``opponent``.
        only_crosses: Keep only rows where ``is_cross_pass`` is true.
        only_high_crosses: Keep only rows where ``is_cross_pass_high`` is true.
        limit: Maximum number of events returned.

    Returns:
        One formatted line per goal, or a message when nothing matches or the
        query fails.
    """
    # IMPORTANT: replace player_goals_stats with the real name of your table.
    table = "player_goals_stats"
    where = ["team = 'Barcelona'"]
    params = []
    if player_scorer:
        where.append("player_scorer_nick ILIKE %s")
        params.append(f"%{player_scorer}%")
    if assist_from:
        where.append("pass_from_nick ILIKE %s")
        params.append(f"%{assist_from}%")
    if opponent:
        where.append("opponent ILIKE %s")
        params.append(f"%{opponent}%")
    if only_crosses:
        where.append("is_cross_pass = TRUE")
    if only_high_crosses:
        where.append("is_cross_pass_high = TRUE")
    params.append(limit)
    where_sql = " AND ".join(where)

    # Only fixed clause text and the constant table name are interpolated.
    query = f"""
        SELECT match_date, opponent, home_away, minute, second,
               player_scorer_nick, pass_from_nick, is_cross_pass,
               is_cross_pass_high, play_pattern, shot_type, shot_body_part, xg
        FROM {table}
        WHERE {where_sql}
        ORDER BY match_date DESC, minute DESC, second DESC
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, tuple(params))
        if not results:
            return "❌ Sem gols encontrados para esse filtro."
        out = ["⚽ EVENTOS DE GOL\n"]
        for i, r in enumerate(results, 1):
            out.append(
                f"{i}. {r['match_date']} vs {r['opponent']} ({r['home_away']}) "
                f"{r.get('minute')}:{r.get('second')} - "
                f"{r.get('player_scorer_nick')} (assist: {r.get('pass_from_nick')}) | "
                f"cross={r.get('is_cross_pass')} high_cross={r.get('is_cross_pass_high')} | "
                f"{r.get('shot_body_part')} | xG={r.get('xg')}"
            )
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"


@one_player_agent.tool_plain()
async def get_top_performances(metric: str = "goals", limit: int = 10) -> str:
    """Rank single-match performances from ``player_match_stats`` by a metric.

    Args:
        metric: One of "goals", "assists", "contributions", "xg", "shots".
        limit: Maximum number of rows returned.

    Returns:
        One formatted line per performance, or a message when the metric is
        unknown, there is no data, or the query fails.
    """
    # Whitelist: metric key -> (SQL ORDER BY expression, display label).
    valid = {
        "goals": ("goals", "Gols"),
        "assists": ("assists", "Assistências"),
        "contributions": ("goals + assists", "Contribuições"),
        "xg": ("xg", "xG"),
        "shots": ("shots", "Finalizações"),
    }
    if metric not in valid:
        return f"❌ Métricas: {', '.join(valid)}"
    metric_sql, metric_name = valid[metric]

    # metric_sql comes only from the whitelist above, so interpolation is safe.
    query = f"""
        SELECT player_nickname, match_date, opponent, home_away, goals, assists,
               xg, shots, pass_completion_pct
        FROM player_match_stats
        ORDER BY {metric_sql} DESC
        LIMIT %s
    """
    try:
        results = await db.execute_query_async(query, (limit,))
        if not results:
            return "❌ Sem dados"
        out = [f"🏆 TOP {metric_name.upper()}\n"]
        for i, r in enumerate(results, 1):
            xg = f"{(r.get('xg') or 0):.2f}"
            out.append(f"{i}. {r['player_nickname']} - {r['match_date']} vs {r['opponent']} ({r['home_away']}): ⚽{r['goals']}G 🤝{r['assists']}A xG:{xg}")
        return "\n".join(out)
    except Exception as e:
        return f"❌ Erro: {str(e)}"


# Keep create_chart (it was already a local tool)
last_chart_image = None
# ... (create_chart code unchanged; uses the global last_chart_image)


# Response functions (no longer using MCP)
async def agent_conventional_response(user_query: str) -> str:
    """Run ``one_player_agent`` once on ``user_query`` and return its output."""
    res = await one_player_agent.run(user_prompt=user_query)
    return res.output


async def stream_agent_response_safe(user_query: str) -> str:
    """Iterate the agent run node by node and return the final output as text.

    Args:
        user_query: Prompt forwarded to ``one_player_agent.iter``.

    Returns:
        The final output as a string; "Erro: ..." when an exception is raised
        (the traceback is printed); "Nenhuma resposta." when the run finishes
        without a result.
    """
    try:
        async with one_player_agent.iter(user_query) as agent_run:
            async for node in agent_run:
                if isinstance(node, End) and agent_run.result:
                    print(str(agent_run.result.output))
                    return str(agent_run.result.output)
    except Exception as e:
        import traceback
        traceback.print_exc()
        return f"Erro: {str(e)}"
    return "Nenhuma resposta."


# Global cleanup
async def shutdown():
    """Close the module-level database pool."""
    db.close()


if __name__ == "__main__":
    try:
        answer = asyncio.run(agent_conventional_response("quantos sao os gols baseados em cruzamentos?"))
        print(answer)
    finally:
        # Release pooled connections even when the agent run fails.
        db.close()