# racing-analysis / src/plot_analysis.py
# matias-cataife: "Upload src/plot_analysis.py with huggingface_hub"
# commit 5ace802 (verified)
"""
AI-powered plot analysis using Gemini.
Extracts statistics from dataframes and generates insights.
"""
import os
import json
import hashlib
import time
import re
from pathlib import Path
import pandas as pd
def strip_markdown(text):
    """Remove markdown formatting from text for plain text output.

    Line-level markers (headers, bullets) are stripped before inline
    emphasis. Doing it the other way round lets two consecutive ``* item``
    bullets be read as a single ``*...*`` italic span, which swallows the
    markers and leaves stray indentation behind.

    Underscore emphasis is only removed when the underscores are not inside
    a word, so identifiers such as ``goal_rate_by`` survive untouched.

    Args:
        text: Markdown-flavoured string.

    Returns:
        The text without headers, bullets, or bold/italic markers, with runs
        of three or more newlines collapsed to two and outer whitespace
        stripped.
    """
    # Remove headers
    text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
    # Remove bullet points (must run before the single-asterisk italic rule)
    text = re.sub(r'^\s*[-*]\s+', '', text, flags=re.MULTILINE)
    # Remove bold/italic markers
    text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
    text = re.sub(r'\*([^*]+)\*', r'\1', text)
    # Underscore emphasis: require non-word characters on both sides so that
    # snake_case identifiers are not mangled.
    text = re.sub(r'(?<!\w)__([^_]+)__(?!\w)', r'\1', text)
    text = re.sub(r'(?<!\w)_([^_]+)_(?!\w)', r'\1', text)
    # Clean up multiple newlines
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
# Gemini imports (optional - only if API key is set)
# The SDK import is wrapped so the module still loads when the package is
# not installed; GEMINI_AVAILABLE records the outcome for get_gemini_client().
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
# Cache directory for analysis results
# Resolved relative to this file: <parent of this file's directory>/outputs/analysis_cache
CACHE_DIR = Path(__file__).parent.parent / "outputs" / "analysis_cache"
# Models to try in order (fallback chain) - lighter models first for better rate limits
GEMINI_MODELS = ["gemini-2.0-flash-lite", "gemini-2.5-flash-lite", "gemini-2.0-flash"]
def get_gemini_client(model_name=None):
    """Build a Gemini ``GenerativeModel``, or return None when unavailable.

    Args:
        model_name: Model identifier to use. When falsy, the first entry of
            ``GEMINI_MODELS`` is used.

    Returns:
        A ``genai.GenerativeModel`` instance, or ``None`` when the
        ``GEMINI_API_KEY`` environment variable is unset/empty or the
        ``google.generativeai`` package could not be imported.
    """
    key = os.environ.get("GEMINI_API_KEY")
    # Short-circuit keeps the original order: missing key first, then missing SDK.
    if not key or not GEMINI_AVAILABLE:
        return None

    genai.configure(api_key=key)
    chosen_model = model_name if model_name else GEMINI_MODELS[0]
    return genai.GenerativeModel(chosen_model)
def get_cache_key(plot_type, stats_dict, context):
    """Derive a deterministic cache key for one analysis request.

    The three inputs are serialized to JSON with sorted keys (objects JSON
    cannot encode natively are passed through ``str``) and the MD5 hex
    digest of that text is returned.

    Args:
        plot_type: Identifier of the plot being analysed.
        stats_dict: Statistics that will be sent for analysis.
        context: Additional context included in the request.

    Returns:
        A 32-character hexadecimal string.
    """
    payload = {
        "plot_type": plot_type,
        "stats": stats_dict,
        "context": context,
    }
    serialized = json.dumps(payload, sort_keys=True, default=str)
    digest = hashlib.md5(serialized.encode())
    return digest.hexdigest()
def get_cached_analysis(cache_key):
    """Retrieve cached analysis if available.

    The cache is best-effort: a missing, unreadable, truncated or otherwise
    malformed cache file is treated as a cache miss rather than an error, so
    a damaged entry can never block a fresh analysis.

    Args:
        cache_key: Key as produced by ``get_cache_key``.

    Returns:
        The value stored under ``"analysis"`` in the cache file, or ``None``
        on a cache miss.
    """
    cache_file = CACHE_DIR / f"{cache_key}.json"
    # EAFP instead of exists()+open(): avoids the race between the check and
    # the read, and lets one handler cover every failure mode.
    try:
        with open(cache_file, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: invalid JSON
        # (json.JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
        return None
    if not isinstance(payload, dict):
        return None
    return payload.get("analysis")
def save_to_cache(cache_key, analysis):
    """Persist an analysis result under ``CACHE_DIR``.

    The cache directory is created on demand. The result is written as a
    JSON object ``{"analysis": ...}`` in UTF-8 without ASCII escaping.

    Args:
        cache_key: Key as produced by ``get_cache_key``; used as file stem.
        analysis: JSON-serializable analysis payload.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    target = CACHE_DIR / f"{cache_key}.json"
    record = {"analysis": analysis}
    with target.open("w", encoding="utf-8") as handle:
        json.dump(record, handle, ensure_ascii=False)
# =============================================================================
# STATS EXTRACTORS - Enhanced with benchmarks, players, trends
# =============================================================================
def _get_shots_from_events(events_df, sequence_ids=None):
"""Extract shot stats from events_detail DataFrame."""
if events_df is None or events_df.empty:
return {"shots": 0, "xG": 0, "avg_xG": 0}
# Filter by sequence IDs if provided
if sequence_ids is not None:
events_df = events_df[events_df["corner_sequence_id"].isin(sequence_ids)]
# Get shots using isShot column
if "isShot" in events_df.columns:
shots = events_df[events_df["isShot"] == True]
elif "event_type" in events_df.columns:
shots = events_df[events_df["event_type"] == "shot"]
else:
return {"shots": 0, "xG": 0, "avg_xG": 0}
shot_count = len(shots)
total_xg = shots["xG"].sum() if "xG" in shots.columns else 0
avg_xg = shots["xG"].mean() if "xG" in shots.columns and shot_count > 0 else 0
return {
"shots": int(shot_count),
"xG": round(total_xg, 2),
"avg_xG": round(avg_xg, 3) if shot_count > 0 else 0
}
def _calculate_league_benchmarks(full_summary_df, events_df=None):
"""Calculate league-wide averages for comparison."""
if full_summary_df is None or full_summary_df.empty:
return {}
total = len(full_summary_df)
goals = len(full_summary_df[full_summary_df["absorption_event"] == "goal"])
benchmarks = {
"league_goal_rate": round(goals / total * 100, 2) if total > 0 else 0,
"league_total_sequences": total,
"league_total_goals": int(goals)
}
# Get shot data from events
if events_df is not None and not events_df.empty:
shot_stats = _get_shots_from_events(events_df)
benchmarks["league_shots"] = shot_stats["shots"]
benchmarks["league_xG"] = shot_stats["xG"]
benchmarks["league_shot_rate"] = round(shot_stats["shots"] / total * 100, 2) if total > 0 else 0
return benchmarks
def _extract_player_stats(summary_df, events_df=None):
"""Extract top corner takers and scorers."""
if summary_df.empty or "corner_playerName" not in summary_df.columns:
return {}
# Top takers
taker_counts = summary_df["corner_playerName"].value_counts()
top_takers = []
for player, count in taker_counts.head(3).items():
player_df = summary_df[summary_df["corner_playerName"] == player]
goals = len(player_df[player_df["absorption_event"] == "goal"])
top_takers.append({
"name": player,
"corners": int(count),
"goals_generated": int(goals),
"goal_rate": round(goals / count * 100, 1) if count > 0 else 0
})
# Top scorers (from events if available)
top_scorers = []
if events_df is not None and not events_df.empty:
# Get goal events
if "event_type" in events_df.columns:
goal_events = events_df[
(events_df["event_type"] == "shot") &
(events_df["corner_sequence_id"].isin(
summary_df[summary_df["absorption_event"] == "goal"]["corner_sequence_id"]
))
]
if "event_playerName" in goal_events.columns and len(goal_events) > 0:
scorer_counts = goal_events["event_playerName"].value_counts()
for player, count in scorer_counts.head(3).items():
top_scorers.append({"name": player, "goals": int(count)})
return {
"top_takers": top_takers,
"top_scorers": top_scorers
}
def _extract_opponent_breakdown(summary_df, events_df=None):
"""Extract performance breakdown by opponent."""
if summary_df.empty or "TeamRival" not in summary_df.columns:
return {}
# Group by opponent
opponent_stats = []
opponent_groups = summary_df.groupby("TeamRival")
for opponent, opp_df in opponent_groups:
goals = len(opp_df[opp_df["absorption_event"] == "goal"])
seq_ids = opp_df["corner_sequence_id"].unique()
shot_data = _get_shots_from_events(events_df, seq_ids) if events_df is not None else {"shots": 0, "xG": 0}
opponent_stats.append({
"opponent": opponent,
"corners": len(opp_df),
"goals": int(goals),
"shots": shot_data["shots"],
"xG": shot_data["xG"]
})
# Sort by goals descending
opponent_stats.sort(key=lambda x: x["goals"], reverse=True)
# Top 3 best and worst opponents
return {
"best_opponents": opponent_stats[:3],
"worst_opponents": opponent_stats[-3:] if len(opponent_stats) > 3 else []
}
def _extract_monthly_trend(summary_df, events_df=None):
"""Extract monthly performance trend."""
if summary_df.empty or "fecha" not in summary_df.columns:
return {}
summary_df = summary_df.copy()
# Parse fecha (handle format like "2025-08-16Z")
try:
summary_df["month"] = pd.to_datetime(summary_df["fecha"].str.replace("Z", "")).dt.to_period("M")
except Exception:
return {}
monthly_stats = []
for month, month_df in summary_df.groupby("month"):
goals = len(month_df[month_df["absorption_event"] == "goal"])
seq_ids = month_df["corner_sequence_id"].unique()
shot_data = _get_shots_from_events(events_df, seq_ids) if events_df is not None else {"shots": 0, "xG": 0}
monthly_stats.append({
"month": str(month),
"corners": len(month_df),
"goals": int(goals),
"goal_rate": round(goals / len(month_df) * 100, 1) if len(month_df) > 0 else 0,
"shots": shot_data["shots"],
"xG": shot_data["xG"]
})
# Sort by month
monthly_stats.sort(key=lambda x: x["month"])
return {"monthly_trend": monthly_stats}
def _extract_zone_effectiveness(summary_df, events_df=None):
"""Extract zone effectiveness - which zones lead to goals."""
if summary_df.empty:
return {}
zone_col = "initial_zone" if "initial_zone" in summary_df.columns else "corner_zone"
if zone_col not in summary_df.columns:
return {}
zone_stats = []
for zone, zone_df in summary_df.groupby(zone_col):
total = len(zone_df)
goals = len(zone_df[zone_df["absorption_event"] == "goal"])
seq_ids = zone_df["corner_sequence_id"].unique()
shot_data = _get_shots_from_events(events_df, seq_ids) if events_df is not None else {"shots": 0, "xG": 0}
zone_stats.append({
"zone": zone,
"corners": total,
"goals": int(goals),
"goal_rate": round(goals / total * 100, 1) if total > 0 else 0,
"shots": shot_data["shots"],
"shot_rate": round(shot_data["shots"] / total * 100, 1) if total > 0 else 0,
"xG": shot_data["xG"]
})
# Sort by goal_rate descending
zone_stats.sort(key=lambda x: x["goal_rate"], reverse=True)
return {
"most_effective_zones": zone_stats[:5],
"least_effective_zones": zone_stats[-3:] if len(zone_stats) > 5 else []
}
def extract_zone_heatmap_stats(summary_df, events_df=None, full_league_df=None):
    """Extract stats for zone destination heatmap.

    The zone column is ``initial_zone`` when present, else ``corner_zone``.

    Args:
        summary_df: Sequence-level DataFrame; needs a zone column and
            ``absorption_event``.
        events_df: Optional event-level DataFrame, forwarded to the zone
            effectiveness breakdown.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or has no zone column. Otherwise a
        dict with ``total_sequences``, ``unique_zones`` (number of distinct
        zones), ``top_5_zones``, ``concentration`` (percent of sequences in
        the three most frequent zones) plus the keys produced by
        ``_extract_zone_effectiveness``.
    """
    if summary_df.empty:
        return {}

    zone_col = "initial_zone" if "initial_zone" in summary_df.columns else "corner_zone"
    if zone_col not in summary_df.columns:
        return {}

    zone_counts = summary_df[zone_col].value_counts()
    total = len(summary_df)

    top_zones = []
    for zone, count in zone_counts.head(5).items():
        zone_df = summary_df[summary_df[zone_col] == zone]
        goals = int((zone_df["absorption_event"] == "goal").sum())
        top_zones.append({
            "zone": zone,
            "count": int(count),
            "percentage": round(count / total * 100, 1),
            "goals": goals,
            "goal_rate": round(goals / count * 100, 1) if count > 0 else 0,
        })

    stats = {
        "total_sequences": total,
        # value_counts() has one entry per distinct zone, so its length is
        # the zone count. (nunique() on the counts would instead count
        # distinct *frequencies*, under-reporting whenever two zones tie.)
        "unique_zones": int(len(zone_counts)),
        "top_5_zones": top_zones,
        "concentration": round(zone_counts.head(3).sum() / total * 100, 1) if total > 0 else 0,
    }

    # Add zone effectiveness
    stats.update(_extract_zone_effectiveness(summary_df, events_df))

    return stats
def extract_absorption_stats(summary_df, events_df=None, full_league_df=None):
    """Describe how sequences end (distribution of ``absorption_event``).

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``
            and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Optional league-wide summary; when given, league
            benchmarks are attached under ``benchmarks``.

    Returns:
        ``{}`` when the summary is empty or lacks ``absorption_event``.
        Otherwise a dict with totals, the per-event distribution, goal and
        shot rates (percent) and xG figures.
    """
    if summary_df.empty or "absorption_event" not in summary_df.columns:
        return {}

    n_sequences = len(summary_df)
    event_counts = summary_df["absorption_event"].value_counts()

    breakdown = [
        {
            "event": name,
            "count": int(n),
            "percentage": round(n / n_sequences * 100, 1),
        }
        for name, n in event_counts.items()
    ]

    n_goals = event_counts.get("goal", 0)
    sequence_ids = summary_df["corner_sequence_id"].unique()
    shot_info = _get_shots_from_events(events_df, sequence_ids)
    n_shots = shot_info["shots"]

    stats = {
        "total_sequences": n_sequences,
        "distribution": breakdown,
        "goals": int(n_goals),
        "goal_rate": round(n_goals / n_sequences * 100, 2) if n_sequences > 0 else 0,
        "shots": n_shots,
        "shot_rate": round(n_shots / n_sequences * 100, 2) if n_sequences > 0 else 0,
        "xG": shot_info["xG"],
        "avg_xG_per_shot": shot_info["avg_xG"],
    }

    # League reference values, when a league frame was supplied
    if full_league_df is not None:
        stats["benchmarks"] = _calculate_league_benchmarks(full_league_df, events_df)

    return stats
def extract_esv_by_zone_stats(summary_df, events_df=None, full_league_df=None):
    """Rank zones by goal rate (function name kept from the former ESV metric).

    The zone column is ``initial_zone`` when present, else ``corner_zone``.

    Args:
        summary_df: Sequence-level DataFrame; needs a zone column,
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or has no zone column. Otherwise
        ``total_sequences`` and ``zones_by_effectiveness`` (up to seven
        zones, highest goal rate first).
    """
    if summary_df.empty:
        return {}

    if "initial_zone" in summary_df.columns:
        zone_col = "initial_zone"
    elif "corner_zone" in summary_df.columns:
        zone_col = "corner_zone"
    else:
        return {}

    per_zone = []
    for zone, rows in summary_df.groupby(zone_col):
        n_corners = len(rows)
        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        per_zone.append({
            "zone": zone,
            "corners": n_corners,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_corners * 100, 1) if n_corners > 0 else 0,
            "shots": shot_info["shots"],
            "xG": shot_info["xG"],
        })

    ranked = sorted(per_zone, key=lambda row: row["goal_rate"], reverse=True)

    return {
        "total_sequences": len(summary_df),
        "zones_by_effectiveness": ranked[:7],
    }
def extract_side_analysis_stats(summary_df, events_df=None, full_league_df=None):
    """Compare corners from the left (``IZQ``) and right (``DER``) side.

    Args:
        summary_df: Sequence-level DataFrame; needs ``corner_side``,
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or lacks ``corner_side``. Otherwise
        ``total_sequences`` and ``by_side``, keyed by side label; a side
        with no sequences is omitted.
    """
    if summary_df.empty or "corner_side" not in summary_df.columns:
        return {}

    by_side = {}
    for label in ("IZQ", "DER"):
        rows = summary_df[summary_df["corner_side"] == label]
        n_rows = len(rows)
        if n_rows == 0:
            continue

        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        by_side[label] = {
            "count": n_rows,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_rows * 100, 2),
            "shots": shot_info["shots"],
            "shot_rate": round(shot_info["shots"] / n_rows * 100, 2),
            "xG": shot_info["xG"],
        }

    return {"total_sequences": len(summary_df), "by_side": by_side}
def extract_time_period_stats(summary_df, events_df=None, full_league_df=None):
    """Aggregate results per 15-minute match period.

    Args:
        summary_df: Sequence-level DataFrame; needs ``minute``,
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or lacks ``minute``. Otherwise
        ``total_sequences`` and ``by_period``, a chronologically ordered
        list that skips periods without sequences.
    """
    if summary_df.empty or "minute" not in summary_df.columns:
        return {}

    # (inclusive upper bound, label), checked in ascending order
    bounds = (
        (15, "0-15"),
        (30, "16-30"),
        (45, "31-45"),
        (60, "46-60"),
        (75, "61-75"),
        (90, "76-90"),
    )
    overflow_label = "90+"

    def label_for(minute):
        for upper, label in bounds:
            if minute <= upper:
                return label
        # Anything that matched no bound (including values that fail every
        # comparison) lands in the last bucket.
        return overflow_label

    frame = summary_df.copy()  # keep the helper column off the caller's frame
    frame["period"] = frame["minute"].apply(label_for)

    ordered_labels = [label for _, label in bounds] + [overflow_label]

    by_period = []
    for label in ordered_labels:
        rows = frame[frame["period"] == label]
        n_rows = len(rows)
        if n_rows == 0:
            continue

        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        by_period.append({
            "period": label,
            "count": n_rows,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_rows * 100, 2),
            "shots": shot_info["shots"],
            "xG": shot_info["xG"],
        })

    return {"total_sequences": len(frame), "by_period": by_period}
def extract_shots_stats(summary_df, events_df=None, full_league_df=None):
    """Summarize shot production, optionally with league benchmarks.

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``
            and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Optional league-wide summary; when given, league
            benchmarks are attached under ``benchmarks``.

    Returns:
        ``{}`` for an empty summary. Otherwise a dict with sequence, goal
        and shot counts, goal/shot rates (percent), xG totals and averages,
        and ``conversion_rate`` (goals per shot, percent).
    """
    if summary_df.empty:
        return {}

    n_sequences = len(summary_df)
    n_goals = int((summary_df["absorption_event"] == "goal").sum())

    shot_info = _get_shots_from_events(events_df, summary_df["corner_sequence_id"].unique())
    n_shots = shot_info["shots"]
    xg_total = shot_info["xG"]
    has_sequences = n_sequences > 0

    stats = {
        "total_sequences": n_sequences,
        "goals": n_goals,
        "goal_rate": round(n_goals / n_sequences * 100, 2) if has_sequences else 0,
        "shots": n_shots,
        "shot_rate": round(n_shots / n_sequences * 100, 2) if has_sequences else 0,
        "xG": xg_total,
        "xG_per_sequence": round(xg_total / n_sequences, 3) if has_sequences else 0,
        "avg_xG_per_shot": shot_info["avg_xG"],
        "conversion_rate": round(n_goals / n_shots * 100, 1) if n_shots > 0 else 0,
    }

    # League reference values, when a league frame was supplied
    if full_league_df is not None:
        stats["benchmarks"] = _calculate_league_benchmarks(full_league_df, events_df)

    return stats
def extract_shots_quality_stats(summary_df, events_df=None, full_league_df=None):
    """Extract shot quality statistics (xG distribution, high-value chances).

    Shots are rows where ``isShot`` is True; when that column is absent,
    rows where ``event_type == "shot"`` are used instead. Quality buckets:
    high ``xG >= 0.15``, medium ``0.05 <= xG < 0.15``, low ``xG < 0.05``.

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``
            and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` for an empty summary. Otherwise ``total_sequences``,
        ``goals``, ``goal_rate`` and ``shots``; when xG values are available
        also ``xG_total``/``xG_avg``/``xG_max``/``xG_min`` and the quality
        bucket counts. The real shot count is reported even when no xG data
        exists (``xG_total`` is then 0).
    """
    if summary_df.empty:
        return {}

    total = len(summary_df)
    goals = int((summary_df["absorption_event"] == "goal").sum())
    seq_ids = summary_df["corner_sequence_id"].unique()

    stats = {
        "total_sequences": total,
        "goals": goals,
        "goal_rate": round(goals / total * 100, 2) if total > 0 else 0,
    }

    # Without events, fall back to the shared helper's zeroed result
    if events_df is None or events_df.empty:
        stats.update(_get_shots_from_events(events_df, seq_ids))
        return stats

    filtered = events_df[events_df["corner_sequence_id"].isin(seq_ids)]

    if "isShot" in filtered.columns:
        shots = filtered[filtered["isShot"] == True]  # noqa: E712 (element-wise compare)
    elif "event_type" in filtered.columns:
        shots = filtered[filtered["event_type"] == "shot"]
    else:
        shots = filtered.iloc[0:0]

    shot_count = int(len(shots))
    # Report the shot count regardless of xG availability; it used to be
    # forced to 0 whenever the xG column was missing.
    stats["shots"] = shot_count

    xg = shots["xG"].dropna() if "xG" in shots.columns else None
    if shot_count == 0 or xg is None or xg.empty:
        stats["xG_total"] = 0
        return stats

    stats["xG_total"] = round(float(xg.sum()), 2)
    stats["xG_avg"] = round(float(xg.mean()), 3)
    stats["xG_max"] = round(float(xg.max()), 3)
    stats["xG_min"] = round(float(xg.min()), 3)

    # Classify shots by quality
    high_quality = int((xg >= 0.15).sum())
    medium_quality = int(((xg >= 0.05) & (xg < 0.15)).sum())
    low_quality = int((xg < 0.05).sum())

    stats["high_quality_shots"] = high_quality
    stats["medium_quality_shots"] = medium_quality
    stats["low_quality_shots"] = low_quality
    # Share is relative to all shots, including any without an xG value
    stats["high_quality_pct"] = round(high_quality / shot_count * 100, 1)

    return stats
def extract_shots_location_stats(summary_df, events_df=None, full_league_df=None):
    """Describe where shots are taken from, based on event coordinates.

    Shots are rows where ``isShot`` is True; when that column is absent,
    rows where ``event_type == "shot"`` are used instead. A shot counts as
    inside the box when ``x > 83`` and ``21 < y < 79``, and as central when
    ``36 <= y <= 64``. Distance is measured as ``100 - x``.

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``
            and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` for an empty summary. Otherwise ``total_sequences``,
        ``goals``, ``shots`` and, depending on the available columns, box /
        central splits, ``avg_shot_distance`` and ``xG_total``.
    """
    if summary_df.empty:
        return {}

    n_sequences = len(summary_df)
    n_goals = int((summary_df["absorption_event"] == "goal").sum())
    sequence_ids = summary_df["corner_sequence_id"].unique()

    stats = {"total_sequences": n_sequences, "goals": n_goals}

    # Without events, fall back to the shared helper's result
    if events_df is None or events_df.empty:
        stats.update(_get_shots_from_events(events_df, sequence_ids))
        return stats

    in_scope = events_df[events_df["corner_sequence_id"].isin(sequence_ids)]
    if "isShot" in in_scope.columns:
        shots = in_scope[in_scope["isShot"] == True].copy()
    elif "event_type" in in_scope.columns:
        shots = in_scope[in_scope["event_type"] == "shot"].copy()
    else:
        shots = pd.DataFrame()

    n_shots = len(shots)
    if n_shots == 0:
        stats["shots"] = 0
        return stats

    stats["shots"] = n_shots
    has_x = "x" in shots.columns

    if has_x and "y" in shots.columns:
        x, y = shots["x"], shots["y"]

        # Box split: everything not inside counts as outside
        in_box = (x > 83) & (y > 21) & (y < 79)
        n_inside = int(in_box.sum())
        stats["shots_inside_box"] = n_inside
        stats["shots_outside_box"] = int((~in_box).sum())
        stats["inside_box_pct"] = round(n_inside / n_shots * 100, 1)

        # Central channel versus the two lateral channels
        n_central = int(((y >= 36) & (y <= 64)).sum())
        n_lateral = int(((y < 36) | (y > 64)).sum())
        stats["shots_central"] = n_central
        stats["shots_lateral"] = n_lateral
        stats["central_pct"] = round(n_central / n_shots * 100, 1)

    if has_x:
        stats["avg_shot_distance"] = round((100 - shots["x"]).mean(), 1)

    if "xG" in shots.columns:
        stats["xG_total"] = round(shots["xG"].sum(), 2)

    return stats
def extract_xg_by_zone_stats(summary_df, events_df=None, full_league_df=None):
    """Extract xG accumulated by zone statistics.

    The zone column is ``initial_zone`` when present, else ``corner_zone``;
    without either, the general shot statistics are returned instead.

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``
            and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame. Shots are rows where
            ``isShot`` is True, or ``event_type == "shot"`` as a fallback.
        full_league_df: Only forwarded to ``extract_shots_stats`` in the
            no-zone-column fallback.

    Returns:
        ``{}`` for an empty summary. Otherwise ``total_sequences``,
        ``total_shots``, ``total_xG``, ``top_zones_by_xG`` (top five by
        total xG) and ``low_zones_by_xG`` (last three zones with positive
        xG, only when more than five such zones exist).
    """
    if summary_df.empty:
        return {}

    zone_col = "initial_zone" if "initial_zone" in summary_df.columns else "corner_zone"
    if zone_col not in summary_df.columns:
        return extract_shots_stats(summary_df, events_df, full_league_df)

    total = len(summary_df)
    seq_ids = summary_df["corner_sequence_id"].unique()

    # The shot filter does not depend on the zone, so apply it once up front
    all_shots = None
    if events_df is not None and not events_df.empty:
        if "isShot" in events_df.columns:
            all_shots = events_df[events_df["isShot"] == True]  # noqa: E712
        elif "event_type" in events_df.columns:
            all_shots = events_df[events_df["event_type"] == "shot"]

    # Calculate xG by zone
    zone_xg = []
    for zone, zone_df in summary_df.groupby(zone_col):
        zone_count = len(zone_df)
        zone_goals = int((zone_df["absorption_event"] == "goal").sum())

        xg = 0
        shots = 0
        if all_shots is not None:
            zone_seq_ids = zone_df["corner_sequence_id"].unique()
            zone_shots = all_shots[all_shots["corner_sequence_id"].isin(zone_seq_ids)]
            # Count shots even when no xG column exists; previously the
            # count stayed 0 in that case, contradicting ``total_shots``.
            shots = len(zone_shots)
            if shots > 0 and "xG" in zone_shots.columns:
                xg = float(zone_shots["xG"].sum())

        zone_xg.append({
            "zone": zone,
            "corners": zone_count,
            "shots": int(shots),
            "goals": zone_goals,
            "xG": round(xg, 2),
            "xG_per_corner": round(xg / zone_count, 3) if zone_count > 0 else 0,
        })

    # Sort by xG descending
    zone_xg.sort(key=lambda x: x["xG"], reverse=True)

    # Total stats
    total_shot_data = _get_shots_from_events(events_df, seq_ids)

    positive_zones = [z for z in zone_xg if z["xG"] > 0]

    return {
        "total_sequences": total,
        "total_shots": total_shot_data["shots"],
        "total_xG": total_shot_data["xG"],
        "top_zones_by_xG": zone_xg[:5],
        "low_zones_by_xG": positive_zones[-3:] if len(positive_zones) > 5 else [],
    }
def extract_xg_avg_by_zone_stats(summary_df, events_df=None, full_league_df=None):
    """Add an efficiency ranking (xG per corner) to the xG-by-zone stats.

    Args:
        summary_df: Sequence-level DataFrame.
        events_df: Optional event-level DataFrame.
        full_league_df: Optional league-wide summary, forwarded unchanged.

    Returns:
        The result of ``extract_xg_by_zone_stats`` (or ``{}`` when that is
        empty). When it contains ``top_zones_by_xG``, those same zones are
        also listed under ``zones_by_efficiency``, ordered by
        ``xG_per_corner`` descending.
    """
    stats = extract_xg_by_zone_stats(summary_df, events_df, full_league_df)
    if not stats:
        return {}

    if "top_zones_by_xG" not in stats:
        return stats

    # Re-rank a copy so the total-xG ordering stays intact
    ranked = list(stats["top_zones_by_xG"])
    ranked.sort(key=lambda zone: zone.get("xG_per_corner", 0), reverse=True)
    stats["zones_by_efficiency"] = ranked

    return stats
def extract_event_density_stats(summary_df, events_df=None, full_league_df=None):
    """Extend the zone heatmap stats with event-volume figures.

    Args:
        summary_df: Sequence-level DataFrame.
        events_df: Optional event-level DataFrame.
        full_league_df: Optional league-wide summary, forwarded unchanged.

    Returns:
        The result of ``extract_zone_heatmap_stats`` (or ``{}`` when that is
        empty); with non-empty events it also holds ``total_events`` (row
        count of ``events_df`` as passed in) and ``events_per_sequence``.
    """
    stats = extract_zone_heatmap_stats(summary_df, events_df, full_league_df)
    if not stats:
        return {}

    if events_df is None or events_df.empty:
        return stats

    n_events = len(events_df)
    stats["total_events"] = n_events
    stats["events_per_sequence"] = round(n_events / stats.get("total_sequences", 1), 1)

    return stats
def extract_finalization_type_stats(summary_df, events_df=None, full_league_df=None):
    """Extract how sequences end by type (goal, shot, clearance, etc.).

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``.
        events_df: Accepted for a uniform extractor signature; unused.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or lacks ``absorption_event``.
        Otherwise a dict with ``goals``, ``shots_generated`` (endings of
        type ``shot`` or ``goal``), ``cleared`` (endings of type
        ``clearance``, ``interception``, ``out_of_play`` or ``goal_kick``),
        the matching percentages, and ``full_distribution`` (count per
        ending). All numbers are native Python values.
    """
    if summary_df.empty or "absorption_event" not in summary_df.columns:
        return {}

    total = len(summary_df)
    abs_counts = summary_df["absorption_event"].value_counts()

    # Endings in which the attacking move was stopped
    negative = ("clearance", "interception", "out_of_play", "goal_kick")

    goal_count = int(abs_counts.get("goal", 0))
    # A goal is also a shot, so both endings count as shots generated
    shot_count = goal_count + int(abs_counts.get("shot", 0))
    negative_count = sum(int(abs_counts.get(event, 0)) for event in negative)

    def pct(value):
        return round(value / total * 100, 1) if total > 0 else 0

    return {
        "total_sequences": total,
        "goals": goal_count,
        "shots_generated": shot_count,
        "cleared": negative_count,
        "goal_pct": pct(goal_count),
        "shot_pct": pct(shot_count),
        "cleared_pct": pct(negative_count),
        "full_distribution": {str(k): int(v) for k, v in abs_counts.items()},
    }
def extract_absorption_by_zone_stats(summary_df, events_df=None, full_league_df=None):
    """Break sequence endings down per zone.

    The zone column is ``initial_zone`` when present, else ``corner_zone``.
    When the zone column or ``absorption_event`` is missing, the overall
    absorption statistics are returned instead.

    Args:
        summary_df: Sequence-level DataFrame.
        events_df: Only forwarded to the fallback.
        full_league_df: Only forwarded to the fallback.

    Returns:
        ``{}`` for an empty summary. Otherwise ``total_sequences`` and
        ``zones`` (up to seven zones, highest goal rate first), each with
        counts of goals, shot-type endings (``shot``/``goal``) and cleared
        endings (``clearance``/``interception``).
    """
    if summary_df.empty:
        return {}

    zone_col = "initial_zone" if "initial_zone" in summary_df.columns else "corner_zone"
    columns = summary_df.columns
    if zone_col not in columns or "absorption_event" not in columns:
        return extract_absorption_stats(summary_df, events_df, full_league_df)

    shot_like = ["shot", "goal"]
    cleared_like = ["clearance", "interception"]

    per_zone = []
    for zone, rows in summary_df.groupby(zone_col):
        outcome = rows["absorption_event"]
        n_corners = len(rows)
        n_goals = int((outcome == "goal").sum())
        per_zone.append({
            "zone": zone,
            "corners": n_corners,
            "goals": n_goals,
            "shots": int(outcome.isin(shot_like).sum()),
            "cleared": int(outcome.isin(cleared_like).sum()),
            "goal_rate": round(n_goals / n_corners * 100, 1) if n_corners > 0 else 0,
        })

    ranked = sorted(per_zone, key=lambda row: row["goal_rate"], reverse=True)

    return {"total_sequences": len(summary_df), "zones": ranked[:7]}
def extract_league_comparison_stats(summary_df, events_df=None, full_league_df=None):
    """Compare the team's goal and shot rates with the league's.

    Args:
        summary_df: Team sequence-level DataFrame; needs
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Optional league-wide summary. When non-empty, the
            benchmarks and the team-minus-league differences are added.

    Returns:
        ``{}`` for an empty summary. Otherwise the team figures, plus
        ``league_benchmarks``, ``goal_rate_vs_league`` and
        ``shot_rate_vs_league`` when the league data provides them.
    """
    if summary_df.empty:
        return {}

    n_sequences = len(summary_df)
    n_goals = int((summary_df["absorption_event"] == "goal").sum())
    shot_info = _get_shots_from_events(events_df, summary_df["corner_sequence_id"].unique())
    has_sequences = n_sequences > 0

    team_stats = {
        "total_sequences": n_sequences,
        "goals": n_goals,
        "goal_rate": round(n_goals / n_sequences * 100, 2) if has_sequences else 0,
        "shots": shot_info["shots"],
        "shot_rate": round(shot_info["shots"] / n_sequences * 100, 2) if has_sequences else 0,
        "xG": shot_info["xG"],
    }

    if full_league_df is None or full_league_df.empty:
        return team_stats

    league = _calculate_league_benchmarks(full_league_df, events_df)
    team_stats["league_benchmarks"] = league

    # (team key, league key, difference key); a difference is only reported
    # when the league value is present
    comparisons = (
        ("goal_rate", "league_goal_rate", "goal_rate_vs_league"),
        ("shot_rate", "league_shot_rate", "shot_rate_vs_league"),
    )
    for team_key, league_key, diff_key in comparisons:
        if league_key in league:
            team_stats[diff_key] = round(team_stats[team_key] - league[league_key], 2)

    return team_stats
def extract_season_metrics_stats(summary_df, events_df=None, full_league_df=None):
    """Aggregate results per season, or per month when no season is known.

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``
            and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        The monthly trend when the summary is empty or lacks ``season``.
        Otherwise ``total_sequences`` and ``by_season``, ordered by the
        season's string form.
    """
    if summary_df.empty or "season" not in summary_df.columns:
        # No season information: the monthly trend is the closest substitute
        return _extract_monthly_trend(summary_df, events_df)

    by_season = []
    for season, rows in summary_df.groupby("season"):
        n_corners = len(rows)
        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        by_season.append({
            "season": str(season),
            "corners": n_corners,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_corners * 100, 1) if n_corners > 0 else 0,
            "shots": shot_info["shots"],
            "xG": shot_info["xG"],
        })

    by_season.sort(key=lambda row: row["season"])

    return {"total_sequences": len(summary_df), "by_season": by_season}
def extract_shot_rate_stats(summary_df, events_df=None, full_league_df=None):
    """Shot statistics plus the number of sequences needed per shot.

    Args:
        summary_df: Sequence-level DataFrame.
        events_df: Optional event-level DataFrame.
        full_league_df: Optional league-wide summary, forwarded unchanged.

    Returns:
        The result of ``extract_shots_stats``; when it is non-empty,
        ``sequences_per_shot`` is added (0 when there are no shots).
    """
    stats = extract_shots_stats(summary_df, events_df, full_league_df)
    if not stats:
        return stats

    if stats.get("shots", 0) > 0:
        per_shot = round(stats["total_sequences"] / stats["shots"], 1)
    else:
        per_shot = 0
    stats["sequences_per_shot"] = per_shot

    return stats
def extract_goal_rate_by_zone_stats(summary_df, events_df=None, full_league_df=None):
    """Extract goal rate by destination zone.

    Thin wrapper that gives ``_extract_zone_effectiveness`` the same
    three-argument signature as the other public extractors.
    ``full_league_df`` is accepted but not used.
    """
    return _extract_zone_effectiveness(summary_df, events_df)
def extract_taker_analysis_stats(summary_df, events_df=None, full_league_df=None):
    """Aggregate results per corner taker.

    Args:
        summary_df: Sequence-level DataFrame; needs ``corner_playerName``,
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Only forwarded to the fallback.

    Returns:
        The general summary stats when the summary is empty or lacks
        ``corner_playerName``. Otherwise ``total_sequences``,
        ``total_takers``, ``top_takers`` (five with most corners) and
        ``most_effective`` (three with the highest goal rate).
    """
    if summary_df.empty or "corner_playerName" not in summary_df.columns:
        return extract_summary_stats(summary_df, events_df, full_league_df)

    per_taker = []
    for player, rows in summary_df.groupby("corner_playerName"):
        n_corners = len(rows)
        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        has_corners = n_corners > 0
        per_taker.append({
            "player": player,
            "corners": n_corners,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_corners * 100, 1) if has_corners else 0,
            "shots": shot_info["shots"],
            "shot_rate": round(shot_info["shots"] / n_corners * 100, 1) if has_corners else 0,
            "xG": shot_info["xG"],
        })

    # Volume ranking first; the effectiveness ranking is derived from it so
    # that equal goal rates are ordered by number of corners.
    by_volume = sorted(per_taker, key=lambda row: row["corners"], reverse=True)
    by_rate = sorted(by_volume, key=lambda row: row["goal_rate"], reverse=True)

    return {
        "total_sequences": len(summary_df),
        "total_takers": len(by_volume),
        "top_takers": by_volume[:5],
        "most_effective": by_rate[:3],
    }
def extract_player_analysis_stats(summary_df, events_df=None, full_league_df=None):
    """Extract general player involvement statistics.

    Builds on the corner-taker statistics and, when events are available,
    adds ``top_scorers``: the three players with the most shot events inside
    sequences that ended in a goal.

    NOTE(review): every shot in a goal-ending sequence is counted, not only
    the shot that scored, so "goals" may overstate a player's tally when a
    sequence contains several shots.

    Args:
        summary_df: Sequence-level DataFrame.
        events_df: Optional event-level DataFrame. Shots are rows where
            ``isShot`` is True, or ``event_type == "shot"`` as a fallback
            (the same rule the other extractors in this module apply).
        full_league_df: Optional league-wide summary, forwarded unchanged.

    Returns:
        The taker statistics, plus ``top_scorers`` when it can be derived.
    """
    # Similar to taker but can include scorers
    stats = extract_taker_analysis_stats(summary_df, events_df, full_league_df)

    if events_df is None or events_df.empty:
        return stats
    # An empty summary may come without any columns at all; there is
    # nothing to attribute goals to in that case.
    if summary_df.empty or "absorption_event" not in summary_df.columns:
        return stats

    goal_seqs = summary_df.loc[
        summary_df["absorption_event"] == "goal", "corner_sequence_id"
    ]
    if len(goal_seqs) == 0:
        return stats

    goal_events = events_df[events_df["corner_sequence_id"].isin(goal_seqs)]
    if "isShot" in goal_events.columns:
        goal_shots = goal_events[goal_events["isShot"] == True]  # noqa: E712
    elif "event_type" in goal_events.columns:
        goal_shots = goal_events[goal_events["event_type"] == "shot"]
    else:
        return stats

    if "event_playerName" in goal_shots.columns:
        scorer_counts = goal_shots["event_playerName"].value_counts()
        stats["top_scorers"] = [
            {"player": p, "goals": int(c)}
            for p, c in scorer_counts.head(3).items()
        ]

    return stats
def extract_team_comparison_stats(summary_df, events_df=None, full_league_df=None):
    """Aggregate results per team (``TeamName``).

    Args:
        summary_df: Sequence-level DataFrame; needs ``TeamName``,
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Only forwarded to the fallback.

    Returns:
        The general summary stats when the summary is empty or lacks
        ``TeamName``. Otherwise ``total_sequences`` and ``teams`` (up to ten
        teams, highest goal rate first).
    """
    if summary_df.empty or "TeamName" not in summary_df.columns:
        return extract_summary_stats(summary_df, events_df, full_league_df)

    per_team = []
    for team, rows in summary_df.groupby("TeamName"):
        n_corners = len(rows)
        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        per_team.append({
            "team": team,
            "corners": n_corners,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_corners * 100, 1) if n_corners > 0 else 0,
            "shots": shot_info["shots"],
            "xG": shot_info["xG"],
        })

    ranked = sorted(per_team, key=lambda row: row["goal_rate"], reverse=True)

    return {"total_sequences": len(summary_df), "teams": ranked[:10]}
def extract_goal_sequences_stats(summary_df, events_df=None, full_league_df=None):
    """Profile the sequences that ended in a goal.

    Args:
        summary_df: Sequence-level DataFrame; needs ``absorption_event``.
            ``sequence_length``, a zone column (``initial_zone`` or
            ``corner_zone``) and ``corner_side`` are used when present.
        events_df: Accepted for a uniform extractor signature; unused.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` for an empty summary. Otherwise ``total_sequences``,
        ``total_goals`` and ``goal_rate``; when there is at least one goal,
        also the optional breakdowns for which columns exist.
    """
    if summary_df.empty:
        return {}

    n_sequences = len(summary_df)
    scored = summary_df[summary_df["absorption_event"] == "goal"]
    n_goals = len(scored)

    if n_goals == 0:
        return {"total_sequences": n_sequences, "total_goals": 0, "goal_rate": 0}

    stats = {
        "total_sequences": n_sequences,
        "total_goals": n_goals,
        "goal_rate": round(n_goals / n_sequences * 100, 2),
    }
    available = scored.columns

    # Average length of the goal sequences
    if "sequence_length" in available:
        stats["avg_goal_sequence_length"] = round(scored["sequence_length"].mean(), 1)

    # Five zones with the most goals
    zone_col = "initial_zone" if "initial_zone" in available else "corner_zone"
    if zone_col in available:
        goals_per_zone = scored[zone_col].value_counts().head(5)
        stats["goals_by_zone"] = [
            {"zone": zone, "goals": int(n)} for zone, n in goals_per_zone.items()
        ]

    # Goals per corner side
    if "corner_side" in available:
        goals_per_side = scored["corner_side"].value_counts()
        stats["goals_by_side"] = {
            str(side): int(n) for side, n in goals_per_side.items()
        }

    return stats
def extract_sequence_length_stats(summary_df, events_df=None, full_league_df=None):
    """Extract sequence length distribution statistics.

    Args:
        summary_df: Sequence-level DataFrame; needs ``sequence_length`` and
            ``absorption_event``.
        events_df: Accepted for a uniform extractor signature; unused.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or lacks ``sequence_length``.
        Otherwise ``total_sequences``, ``avg_length``, ``min_length``,
        ``max_length``, ``median_length`` (truncated to an integer) and
        ``length_distribution``: count, goals and goal rate for the ten
        shortest distinct lengths.
    """
    if summary_df.empty or "sequence_length" not in summary_df.columns:
        return {}

    lengths = summary_df["sequence_length"]
    total = len(summary_df)

    # One grouped pass (ascending by length) instead of re-filtering the
    # whole frame for every distinct length.
    length_goal_rate = []
    for length, length_df in summary_df.groupby("sequence_length"):
        count = len(length_df)
        length_goals = int((length_df["absorption_event"] == "goal").sum())
        length_goal_rate.append({
            "length": int(length),
            "count": int(count),
            "goals": length_goals,
            "goal_rate": round(length_goals / count * 100, 1) if count > 0 else 0,
        })

    return {
        "total_sequences": total,
        "avg_length": round(lengths.mean(), 1),
        "min_length": int(lengths.min()),
        "max_length": int(lengths.max()),
        "median_length": int(lengths.median()),
        "length_distribution": length_goal_rate[:10],
    }
def extract_half_comparison_stats(summary_df, events_df=None, full_league_df=None):
    """Compare results between ``period_id`` 1 and ``period_id`` 2.

    Args:
        summary_df: Sequence-level DataFrame; needs ``period_id``,
            ``absorption_event`` and ``corner_sequence_id``.
        events_df: Optional event-level DataFrame for shot/xG figures.
        full_league_df: Accepted for a uniform extractor signature; unused.

    Returns:
        ``{}`` when the summary is empty or lacks ``period_id``. Otherwise
        ``total_sequences`` and ``by_half`` with keys ``half_1`` /
        ``half_2``; a half without sequences is omitted.
    """
    if summary_df.empty or "period_id" not in summary_df.columns:
        return {}

    by_half = {}
    for half in (1, 2):
        rows = summary_df[summary_df["period_id"] == half]
        n_rows = len(rows)
        if n_rows == 0:
            continue

        n_goals = int((rows["absorption_event"] == "goal").sum())
        shot_info = _get_shots_from_events(events_df, rows["corner_sequence_id"].unique())
        by_half[f"half_{half}"] = {
            "count": n_rows,
            "goals": n_goals,
            "goal_rate": round(n_goals / n_rows * 100, 2),
            "shots": shot_info["shots"],
            "xG": shot_info["xG"],
        }

    return {"total_sequences": len(summary_df), "by_half": by_half}
def extract_summary_stats(summary_df, events_df=None, full_league_df=None):
    """Build the all-in-one statistics dictionary for a set of sequences.

    Combines goal and shot/xG rates with, when available, league benchmarks,
    player stats, an opponent breakdown and a monthly trend. The extra
    sections are merged into the top level of the result; benchmarks are
    stored under the ``"benchmarks"`` key.

    Returns:
        dict of statistics, or ``{}`` when ``summary_df`` is empty.
    """
    if summary_df.empty:
        return {}

    # The frame is non-empty from here on, so dividing by n_sequences is safe.
    n_sequences = len(summary_df)
    n_goals = int((summary_df["absorption_event"] == "goal").sum())

    # Shot figures are looked up from the events of these sequences.
    shot_data = _get_shots_from_events(
        events_df, summary_df["corner_sequence_id"].unique()
    )
    n_shots = shot_data["shots"]
    total_xg = shot_data["xG"]

    if n_shots > 0:
        conversion = round(n_goals / n_shots * 100, 1)
    else:
        conversion = 0

    if "sequence_length" in summary_df.columns:
        mean_length = round(summary_df["sequence_length"].mean(), 1)
    else:
        mean_length = 0

    stats = {
        "total_sequences": n_sequences,
        "goals": n_goals,
        "goal_rate": round(n_goals / n_sequences * 100, 2),
        "shots": n_shots,
        "shot_rate": round(n_shots / n_sequences * 100, 2),
        "xG": total_xg,
        "xG_per_sequence": round(total_xg / n_sequences, 3),
        "avg_xG_per_shot": shot_data["avg_xG"],
        "conversion_rate": conversion,
        "avg_sequence_length": mean_length,
    }

    # League benchmarks
    if full_league_df is not None:
        stats["benchmarks"] = _calculate_league_benchmarks(full_league_df, events_df)

    # Player stats, opponent breakdown and monthly trend, in that order;
    # each contributes only when it returns something non-empty.
    for section in (
        _extract_player_stats,
        _extract_opponent_breakdown,
        _extract_monthly_trend,
    ):
        extra = section(summary_df, events_df)
        if extra:
            stats.update(extra)

    return stats
# =============================================================================
# PROMPT BUILDERS
# =============================================================================
# Shared instruction preamble (written in Spanish) that is placed in front of
# every per-plot prompt before the text is sent to the model. It asks for
# plain text without markdown, formal Spanish with football vocabulary, a
# descriptive 3-5 sentence answer, and forbids mentioning ESV or inventing
# league comparisons when no "league_comparison" data is supplied.
SYSTEM_PROMPT = """Analista táctico de balón parado. Informe oficial para cuerpo técnico.
FORMATO:
- Texto plano sin markdown (no usar **, ##, ni bullets)
- Español formal con vocabulario futbolístico (primer palo, segundo palo, área chica, punto penal, frontal del área)
- Ir directo al análisis, sin títulos
- 3-5 oraciones describiendo qué muestran los datos
- Enfoque descriptivo, no prescriptivo
MÉTRICAS A USAR: xG, xT, porcentajes, "1 de cada X corners"
EVITAR:
- Mencionar ESV o "valor de secuencia"
- Inventar comparativas con la liga si no hay datos de "league_comparison"
SI HAY DATOS DE LIGA (league_comparison):
- Valores positivos → "X% por encima de la media de la liga"
- Valores negativos → "X% por debajo de la media de la liga"
"""
# Prompt templates keyed by analysis/plot type. Every template carries two
# placeholders, {stats} (the JSON-serialised statistics) and {context} (the
# configuration string), which build_prompt fills in with str.format; a plot
# type that has no entry here falls back to the "summary" template.
# Because the templates go through str.format, any literal brace added to a
# template must be doubled ({{ or }}).
PLOT_PROMPTS = {
"zone_heatmap": """DATOS - Distribución de zonas de destino:
{stats}
CONFIGURACIÓN: {context}
Describir: Zonas predominantes, nivel de concentración o dispersión, patrones de envío.""",
"absorption": """DATOS - Finalización de secuencias:
{stats}
CONFIGURACIÓN: {context}
Describir: Cómo terminan las jugadas, efectividad comparada con la liga si hay benchmarks, balance entre goles/remates/pérdidas.""",
"esv_by_zone": """DATOS - Efectividad por zona:
{stats}
CONFIGURACIÓN: {context}
Describir: Qué zonas producen más goles y remates, diferencias de rendimiento entre zonas.""",
"side_analysis": """DATOS - Comparativa izquierda vs derecha:
{stats}
CONFIGURACIÓN: {context}
Describir: Diferencias de rendimiento entre lados, volumen de corners por lado, efectividad comparada.""",
"time_periods": """DATOS - Rendimiento por tramo del partido:
{stats}
CONFIGURACIÓN: {context}
Describir: En qué momentos son más/menos peligrosos, tendencias a lo largo del partido.""",
"shots": """DATOS - Generación de remates:
{stats}
CONFIGURACIÓN: {context}
Describir: Volumen de remates generados, calidad de las ocasiones (xG), tasa de conversión.""",
"shots_quality": """DATOS - Calidad de los remates (xG individual):
{stats}
CONFIGURACIÓN: {context}
Describir: Distribución de calidad de las ocasiones generadas. Analizar la proporción de ocasiones de alta calidad (xG alto) vs ocasiones de baja calidad. Comentar el xG promedio y máximo de los remates, indicando si se generan oportunidades claras de gol o remates de bajo peligro.""",
"shots_location": """DATOS - Ubicación de los remates en el campo:
{stats}
CONFIGURACIÓN: {context}
Describir: De dónde provienen los remates generados. Analizar la proporción dentro vs fuera del área, remates desde zona central vs lateral, y la distancia media al arco. Indicar si los remates se producen desde posiciones favorables o difíciles.""",
"xg_by_zone": """DATOS - xG acumulado por zona de destino:
{stats}
CONFIGURACIÓN: {context}
Describir: Qué zonas de destino del corner generan más xG acumulado. Identificar las zonas más productivas y las menos efectivas en términos de generación de peligro. Comparar el xG por corner entre las distintas zonas.""",
"xg_avg_by_zone": """DATOS - xG promedio por zona (eficiencia):
{stats}
CONFIGURACIÓN: {context}
Describir: Eficiencia de cada zona en términos de xG generado por corner. Identificar qué zonas maximizan el peligro por envío, distinguiendo entre volumen (muchos corners) y eficiencia (alto xG por corner).""",
"event_density": """DATOS - Densidad de eventos en el campo:
{stats}
CONFIGURACIÓN: {context}
Describir: Distribución espacial de los eventos durante las secuencias. Indicar las zonas de mayor actividad y concentración de acciones.""",
"finalization_type": """DATOS - Tipo de finalización de secuencias:
{stats}
CONFIGURACIÓN: {context}
Describir: Cómo terminan las jugadas de balón parado clasificadas por tipo (gol, remate, despeje, etc.). Analizar el balance entre finalizaciones positivas y negativas.""",
"absorption_by_zone": """DATOS - Finalización por zona de destino:
{stats}
CONFIGURACIÓN: {context}
Describir: Cómo terminan las jugadas según la zona a la que se envía el balón. Identificar qué zonas tienen mayor tasa de finalización positiva (gol/remate) vs negativa (despeje).""",
"league_comparison": """DATOS - Comparativa equipo vs liga:
{stats}
CONFIGURACIÓN: {context}
Describir: Rendimiento del equipo comparado con la media de la liga. USAR ÚNICAMENTE los porcentajes de "COMPARATIVA LIGA" si aparecen en la configuración (ej: "+71.5% vs liga" significa 71.5% por encima de la media). Si no hay datos de comparativa, solo describir las métricas del equipo sin mencionar la liga.""",
"season_metrics": """DATOS - Métricas por temporada:
{stats}
CONFIGURACIÓN: {context}
Describir: Evolución del rendimiento a lo largo de las temporadas. Identificar tendencias de mejora o empeoramiento, y comparar las distintas temporadas entre sí.""",
"shot_rate": """DATOS - Tasa de generación de remates:
{stats}
CONFIGURACIÓN: {context}
Describir: Frecuencia con la que los corners generan remates. Analizar cuántas secuencias se necesitan en promedio para generar un tiro y la eficiencia en la creación de ocasiones.""",
"goal_rate_by_zone": """DATOS - Tasa de gol por zona:
{stats}
CONFIGURACIÓN: {context}
Describir: Qué zonas de destino tienen mayor probabilidad de terminar en gol. Identificar las zonas más letales y las que tienen menor efectividad goleadora.""",
"taker_analysis": """DATOS - Análisis de ejecutores de corners:
{stats}
CONFIGURACIÓN: {context}
Describir: Rendimiento de los ejecutores de corners. Identificar al principal ejecutor por volumen y al más efectivo por tasa de gol/remate. Comparar productividad entre los distintos ejecutores.""",
"player_analysis": """DATOS - Análisis de jugadores involucrados:
{stats}
CONFIGURACIÓN: {context}
Describir: Participación de jugadores en las secuencias de balón parado. Identificar ejecutores principales y goleadores si los hay. Analizar la distribución de responsabilidades.""",
"team_comparison": """DATOS - Comparativa entre equipos:
{stats}
CONFIGURACIÓN: {context}
Describir: Rendimiento comparativo entre equipos en balón parado. Identificar los equipos más efectivos y los patrones que los diferencian.""",
"goal_sequences": """DATOS - Secuencias que terminaron en gol:
{stats}
CONFIGURACIÓN: {context}
Describir: Características de las jugadas que terminaron en gol. Analizar desde qué zonas llegaron los goles, qué lado fue más productivo, y patrones comunes en las secuencias exitosas.""",
"sequence_length": """DATOS - Longitud de las secuencias:
{stats}
CONFIGURACIÓN: {context}
Describir: Distribución de la duración de las jugadas (número de eventos). Analizar si las secuencias cortas o largas tienen mayor efectividad, y cuál es la longitud típica.""",
"half_comparison": """DATOS - Primer tiempo vs segundo tiempo:
{stats}
CONFIGURACIÓN: {context}
Describir: Diferencias de rendimiento entre tiempos, consistencia, cambios de patrón.""",
"summary": """DATOS - Resumen completo de rendimiento:
{stats}
CONFIGURACIÓN: {context}
Describir: Rendimiento general (goles, remates, xG), principales ejecutores, rendimiento contra distintos rivales, evolución mensual si hay datos de tendencia. SOLO mencionar comparativa con liga si aparece "COMPARATIVA LIGA" en la configuración con porcentajes explícitos.""",
"conclusion": """DATOS COMPLETOS DEL INFORME:
{stats}
CONFIGURACIÓN: {context}
Redactar una CONCLUSIÓN EJECUTIVA del informe de balón parado. Incluir:
1. Resumen del rendimiento global (eficiencia goleadora, generación de ocasiones)
2. Puntos fuertes identificados (zonas, ejecutores, momentos del partido)
3. Áreas de oportunidad (sin dar recomendaciones, solo describir dónde hay margen)
4. SOLO si hay "COMPARATIVA LIGA" en la configuración: usar esos porcentajes exactos (ej: "+71.5% vs liga" = 71.5% por encima)
5. Tendencia general si hay datos mensuales (mejorando, estable, empeorando)
IMPORTANTE: Si no hay "COMPARATIVA LIGA" explícita, NO mencionar la liga ni hacer comparativas.
Extensión: 6-8 oraciones. Tono formal y objetivo."""
}
def build_context_string(context_dict):
    """Build a concise context string from user configuration.

    Args:
        context_dict: Either a ready-made string (returned unchanged), ``None``
            (treated as "no context", yields ``""``), or a dict. Recognised
            dict keys: ``team``, ``league``, ``analysis_type``,
            ``set_piece_type``, ``seasons``, ``half``, ``side``,
            ``corner_type``, ``match_state``, ``period``, ``total_sequences``,
            ``league_comparison`` (dict with ``goal_rate``, ``shot_rate``,
            ``xg_total``) and ``basic_stats`` (dict with ``goals``,
            ``goal_rate``, ``shots``). Missing keys are skipped.

    Returns:
        The individual parts joined with ``" | "``. For a dict the analysis
        type and set piece type are always present, because both have
        defaults.
    """
    if context_dict is None:
        return ""
    if isinstance(context_dict, str):
        return context_dict

    parts = []

    # Core info
    if context_dict.get("team"):
        parts.append(f"Equipo: {context_dict['team']}")
    if context_dict.get("league"):
        parts.append(f"Liga: {str(context_dict['league']).replace('_', ' ')}")

    # Analysis type
    analysis_type = context_dict.get("analysis_type", "ofensivo")
    parts.append(f"Análisis: {'corners a favor' if analysis_type == 'ofensivo' else 'corners en contra'}")

    # Set piece type (unknown values are shown as given)
    sp_type = context_dict.get("set_piece_type", "corners")
    sp_names = {"corners": "Corners", "free_kicks": "Tiros libres", "throw_ins": "Saques de banda", "goal_kicks": "Saques de arco"}
    parts.append(f"Tipo: {sp_names.get(sp_type, sp_type)}")

    # Seasons
    seasons = context_dict.get("seasons")
    if seasons:
        if isinstance(seasons, (list, tuple)):
            # str() each item: seasons given as ints (e.g. 2024) would make a
            # plain ', '.join raise TypeError.
            parts.append(f"Temporadas: {', '.join(str(s) for s in seasons)}")
        else:
            parts.append(f"Temporada: {seasons}")

    # Filters applied
    filters = []
    if context_dict.get("half"):
        filters.append(f"Tiempo: {context_dict['half']}º")
    if context_dict.get("side"):
        side_name = "izquierda" if context_dict["side"] == "IZQ" else "derecha"
        filters.append(f"Lado: {side_name}")
    if context_dict.get("corner_type"):
        filters.append(f"Tipo corner: {context_dict['corner_type']}")
    if context_dict.get("match_state"):
        states = {"winning": "ganando", "drawing": "empatando", "losing": "perdiendo"}
        filters.append(f"Estado: {states.get(context_dict['match_state'], context_dict['match_state'])}")
    if context_dict.get("period"):
        filters.append(f"Período: min {context_dict['period']}")
    if filters:
        parts.append(f"Filtros: {', '.join(filters)}")

    # Sample size indicator
    if context_dict.get("total_sequences"):
        parts.append(f"Muestra: {context_dict['total_sequences']} secuencias")

    # League comparison data. The prompts tell the model to quote league
    # figures only when this "COMPARATIVA LIGA" section is present.
    lc = context_dict.get("league_comparison") or {}
    comparison_labels = (
        ("goal_rate", "tasa_gol"),
        ("shot_rate", "tasa_remate"),
        ("xg_total", "xG"),
    )
    comparison_parts = []
    for key, label in comparison_labels:
        val = lc.get(key)
        if val is None:
            continue
        # Negative values already carry their sign; only positives get a "+".
        comparison_parts.append(f"{label}: {'+' if val >= 0 else ''}{val}% vs liga")
    if comparison_parts:
        parts.append(f"COMPARATIVA LIGA: {', '.join(comparison_parts)}")

    # Basic stats for reference
    bs = context_dict.get("basic_stats") or {}
    if bs.get("goal_rate") is not None:
        parts.append(f"Stats: {bs.get('goals', 0)} goles, {bs.get('goal_rate', 0)}% tasa gol, {bs.get('shots', 0)} remates")

    return " | ".join(parts)
def build_prompt(plot_type, stats_dict, context):
    """Build the prompt for Gemini.

    Args:
        plot_type: Key into PLOT_PROMPTS; unknown keys use the "summary"
            template.
        stats_dict: Statistics to embed, serialised as indented JSON with
            non-ASCII characters kept readable.
        context: Configuration dict or string, rendered by
            build_context_string.

    Returns:
        The template with its {stats} and {context} placeholders filled in.
    """
    template = PLOT_PROMPTS.get(plot_type, PLOT_PROMPTS["summary"])
    # default=str matches get_cache_key: values json cannot encode natively
    # (e.g. numpy/pandas scalars or timestamps) are rendered as text instead
    # of raising TypeError before the API call is even attempted.
    stats_str = json.dumps(stats_dict, indent=2, ensure_ascii=False, default=str)
    context_str = build_context_string(context)
    return template.format(stats=stats_str, context=context_str)
# =============================================================================
# MAIN ANALYSIS FUNCTION
# =============================================================================
def generate_plot_analysis(plot_type, summary_df, events_df=None, context=""):
    """
    Generate AI analysis for a plot.

    Args:
        plot_type: Type of plot (zone_heatmap, absorption, esv_by_zone, etc.).
            Unknown types use the "summary" extractor.
        summary_df: Summary DataFrame
        events_df: Events DataFrame (optional)
        context: Additional context (team, league, filters, etc.)

    Returns:
        dict with 'analysis' text and 'stats' used. A cache hit adds
        'cached': True; a fresh generation adds 'cached': False and the
        'model' that answered; failures add an 'error' entry and put an
        explanatory message in 'analysis'.
    """
    # Extract stats based on plot type (all extractors now receive events_df for shot data).
    # Lambdas keep the lookup lazy: only the selected extractor is evaluated.
    extractors = {
        # Zone/heatmap
        "zone_heatmap": lambda: extract_zone_heatmap_stats(summary_df, events_df),
        "event_density": lambda: extract_event_density_stats(summary_df, events_df),
        # Absorption/finalization
        "absorption": lambda: extract_absorption_stats(summary_df, events_df),
        "finalization_type": lambda: extract_finalization_type_stats(summary_df, events_df),
        "absorption_by_zone": lambda: extract_absorption_by_zone_stats(summary_df, events_df),
        # Zone effectiveness
        "esv_by_zone": lambda: extract_esv_by_zone_stats(summary_df, events_df),
        "goal_rate_by_zone": lambda: extract_goal_rate_by_zone_stats(summary_df, events_df),
        # Side analysis
        "side_analysis": lambda: extract_side_analysis_stats(summary_df, events_df),
        # Time/comparison
        "time_periods": lambda: extract_time_period_stats(summary_df, events_df),
        "half_comparison": lambda: extract_half_comparison_stats(summary_df, events_df),
        "league_comparison": lambda: extract_league_comparison_stats(summary_df, events_df),
        "season_metrics": lambda: extract_season_metrics_stats(summary_df, events_df),
        # Shots/xG
        "shots": lambda: extract_shots_stats(summary_df, events_df),
        "shots_quality": lambda: extract_shots_quality_stats(summary_df, events_df),
        "shots_location": lambda: extract_shots_location_stats(summary_df, events_df),
        "shot_rate": lambda: extract_shot_rate_stats(summary_df, events_df),
        "xg_by_zone": lambda: extract_xg_by_zone_stats(summary_df, events_df),
        "xg_avg_by_zone": lambda: extract_xg_avg_by_zone_stats(summary_df, events_df),
        # Player/taker
        "taker_analysis": lambda: extract_taker_analysis_stats(summary_df, events_df),
        "player_analysis": lambda: extract_player_analysis_stats(summary_df, events_df),
        "team_comparison": lambda: extract_team_comparison_stats(summary_df, events_df),
        # Sequences
        "goal_sequences": lambda: extract_goal_sequences_stats(summary_df, events_df),
        "sequence_length": lambda: extract_sequence_length_stats(summary_df, events_df),
        # Summary (fallback)
        "summary": lambda: extract_summary_stats(summary_df, events_df),
    }
    extractor = extractors.get(plot_type, extractors["summary"])
    stats = extractor()
    if not stats:
        return {"analysis": "No hay suficientes datos para generar análisis.", "stats": {}}

    # Remove ESV-related keys from stats before sending to AI (internal metric)
    def remove_esv_keys(obj):
        if isinstance(obj, dict):
            # str(k) so non-string keys (e.g. ints) cannot break the filter.
            return {k: remove_esv_keys(v) for k, v in obj.items() if 'esv' not in str(k).lower()}
        elif isinstance(obj, list):
            return [remove_esv_keys(item) for item in obj]
        return obj

    def is_rate_limit_error(message):
        # "rate" is matched as a whole word: a bare substring test also fires
        # on "generate"/"generateContent", which shows up in unrelated API
        # errors and would make them wait through the whole back-off schedule.
        lowered = message.lower()
        return (
            "429" in message
            or "quota" in lowered
            or re.search(r"\brate\b", lowered) is not None
        )

    stats_for_ai = remove_esv_keys(stats)

    # Check cache (keyed on the unfiltered stats)
    cache_key = get_cache_key(plot_type, stats, context)
    cached = get_cached_analysis(cache_key)
    if cached:
        return {"analysis": cached, "stats": stats, "cached": True}

    # Check API key availability
    if not os.environ.get("GEMINI_API_KEY") or not GEMINI_AVAILABLE:
        return {
            "analysis": "API de Gemini no configurada. Configura GEMINI_API_KEY en las variables de entorno.",
            "stats": stats,
            "error": "no_api_key"
        }

    # Generate analysis with retry logic and model fallback
    prompt = build_prompt(plot_type, stats_for_ai, context)
    last_error = None
    max_attempts = 3
    for model_name in GEMINI_MODELS:
        model = get_gemini_client(model_name)
        if not model:
            continue
        # Retry with exponential backoff
        for attempt in range(max_attempts):
            try:
                response = model.generate_content(
                    f"{SYSTEM_PROMPT}\n\n{prompt}",
                    generation_config=genai.types.GenerationConfig(
                        max_output_tokens=500,
                        temperature=0.7,
                    )
                )
                analysis = response.text.strip()
                # Strip markdown formatting for plain text output
                analysis = strip_markdown(analysis)
                # Cache result
                save_to_cache(cache_key, analysis)
                return {"analysis": analysis, "stats": stats, "cached": False, "model": model_name}
            except Exception as e:
                last_error = str(e)
                if not is_rate_limit_error(last_error):
                    # For other errors, try next model
                    break
                # Rate limited: back off before retrying (2s, then 4s). After
                # the final attempt there is nothing left to wait for, so move
                # straight on to the next model.
                if attempt < max_attempts - 1:
                    time.sleep((2 ** attempt) * 2)

    return {
        "analysis": f"Error al generar análisis después de varios intentos: {last_error}",
        "stats": stats,
        "error": last_error
    }
def generate_report_conclusion(summary_df, events_df=None, context=""):
    """
    Generate a comprehensive conclusion for the PDF report.

    Args:
        summary_df: Summary DataFrame with all sequences
        events_df: Events DataFrame (optional)
        context: Context dict with team, league, filters

    Returns:
        dict with 'conclusion' text and the 'stats' it was built from. A cache
        hit adds 'cached': True; a fresh generation adds 'cached': False and
        the 'model' that answered; failures add an 'error' entry (with an
        empty 'conclusion' when the API is not configured).
    """
    # Extract comprehensive stats for conclusion
    stats = extract_summary_stats(summary_df, events_df)
    if not stats:
        return {"conclusion": "No hay suficientes datos para generar conclusión.", "stats": {}}

    # Remove ESV-related keys
    def remove_esv_keys(obj):
        if isinstance(obj, dict):
            # str(k) so non-string keys (e.g. ints) cannot break the filter.
            return {k: remove_esv_keys(v) for k, v in obj.items() if 'esv' not in str(k).lower()}
        elif isinstance(obj, list):
            return [remove_esv_keys(item) for item in obj]
        return obj

    def is_rate_limit_error(message):
        # "rate" is matched as a whole word: a bare substring test also fires
        # on "generate"/"generateContent", which shows up in unrelated API
        # errors and would make them wait through the whole back-off schedule.
        lowered = message.lower()
        return (
            "429" in message
            or "quota" in lowered
            or re.search(r"\brate\b", lowered) is not None
        )

    stats_for_ai = remove_esv_keys(stats)

    # Check cache (keyed on the unfiltered stats)
    cache_key = get_cache_key("conclusion", stats, context)
    cached = get_cached_analysis(cache_key)
    if cached:
        return {"conclusion": cached, "stats": stats, "cached": True}

    # Check API key
    if not os.environ.get("GEMINI_API_KEY") or not GEMINI_AVAILABLE:
        return {"conclusion": "", "stats": stats, "error": "no_api_key"}

    # Build prompt
    prompt = build_prompt("conclusion", stats_for_ai, context)
    last_error = None
    max_attempts = 3
    for model_name in GEMINI_MODELS:
        model = get_gemini_client(model_name)
        if not model:
            continue
        for attempt in range(max_attempts):
            try:
                response = model.generate_content(
                    f"{SYSTEM_PROMPT}\n\n{prompt}",
                    generation_config=genai.types.GenerationConfig(
                        max_output_tokens=800,  # Longer for conclusion
                        temperature=0.7,
                    )
                )
                conclusion = response.text.strip()
                conclusion = strip_markdown(conclusion)
                save_to_cache(cache_key, conclusion)
                return {"conclusion": conclusion, "stats": stats, "cached": False, "model": model_name}
            except Exception as e:
                last_error = str(e)
                if not is_rate_limit_error(last_error):
                    # Not a rate limit: give up on this model, try the next.
                    break
                # Rate limited: back off before retrying (2s, then 4s). After
                # the final attempt there is nothing left to wait for, so move
                # straight on to the next model.
                if attempt < max_attempts - 1:
                    time.sleep((2 ** attempt) * 2)

    return {"conclusion": f"Error: {last_error}", "stats": stats, "error": last_error}
# Mapping from plot filename patterns to analysis types.
# Each entry is (substring to look for, analysis type). infer_plot_type walks
# the list top to bottom and returns the type of the FIRST pattern contained
# in the normalized plot name, so:
# Order matters - more specific patterns should come first
# (e.g. "tasa_tiro" has to sit above the generic "tiro", otherwise the
# generic entry would claim the name first).
PLOT_TYPE_MAPPING = [
# Zone/heatmap plots
("distribucion_destinos", "zone_heatmap"),
("densidad_eventos", "event_density"),
("heatmap", "zone_heatmap"),
("destino", "zone_heatmap"),
# Absorption/finalization plots - differentiated
("distribucion_absorcion", "absorption"),
("finalizacion_por_tipo", "finalization_type"),
("absorcion_por_zona", "absorption_by_zone"),
("absorption", "absorption"),
# Side analysis
("analisis_lado", "side_analysis"),
("lado_corner", "side_analysis"),
("izq_der", "side_analysis"),
("side", "side_analysis"),
# Time/period/comparison analysis - differentiated
("comparacion_fases", "half_comparison"),
("comparacion_vs_liga", "league_comparison"),
("metricas_por_temporada", "season_metrics"),
("periodo", "time_periods"),
("temporal", "time_periods"),
# Shot/xG plots - specific types for different visualizations
("tiros_con_xg", "shots_quality"),
("ubicacion_tiros", "shots_location"),
("xg_acumulado", "xg_by_zone"),
("xg_promedio", "xg_avg_by_zone"),
("tasa_tiro", "shot_rate"),
("tasa_gol_por_zona", "goal_rate_by_zone"),
("shot", "shots"),
("tiro", "shots"),
# Zone effectiveness
("esv_por_zona", "esv_by_zone"),
("esv_medio", "esv_by_zone"),
# Player/taker analysis - differentiated
("analisis_ejecutores", "taker_analysis"),
("analisis_jugadores", "player_analysis"),
("esv_por_ejecutor", "taker_analysis"),
("esv_por_equipo", "team_comparison"),
# Sequences - specific type
("secuencias_gol", "goal_sequences"),
# Length/distribution
("distribucion_longitud", "sequence_length"),
# General summary (fallbacks)
("resumen", "summary"),
("summary", "summary"),
("tabla", "summary"),
]
def infer_plot_type(plot_name):
    """Map a plot filename or display name to its analysis type.

    The name is lowercased and its spaces turned into underscores, so raw
    filenames (e.g., 'distribucion_destinos') and display names
    (e.g., 'Distribucion Destinos') are matched alike. The first pattern of
    PLOT_TYPE_MAPPING found inside the normalized name decides the type;
    "summary" is returned when none matches.
    """
    normalized = "_".join(plot_name.lower().split(" "))
    matches = (
        analysis_type
        for needle, analysis_type in PLOT_TYPE_MAPPING
        if needle in normalized
    )
    return next(matches, "summary")