# racing-analysis / src/tracking_processor.py
# matias-cataife's picture
# Upload src/tracking_processor.py with huggingface_hub
# 94c785f verified
"""
Tracking Data Processor
=======================
Este módulo procesa datos de tracking de jugadores para enriquecer
el análisis de balón parado con métricas físicas y posicionales.
Métricas que se pueden calcular:
--------------------------------
1. MÉTRICAS FÍSICAS (por secuencia de BP):
- Distancia total recorrida por equipo
- Velocidad máxima alcanzada
- Sprints (>25 km/h) durante la secuencia
- Aceleración/desaceleración
2. MÉTRICAS POSICIONALES (en el momento del corner):
- Formación defensiva (distribución de jugadores en área)
- Marcaje hombre a hombre vs zonal
- Jugadores en zona de remate
- Espacios libres en el área
3. MÉTRICAS DE MOVIMIENTO (durante la secuencia):
- Carreras de desmarque
- Movimientos de blocaje
- Pressing post-pérdida
4. MÉTRICAS DE RECUPERACIÓN DEFENSIVA:
- Tiempo para reorganizarse
- Jugadores en posición defensiva
- Transiciones defensivas
"""
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
import json
@dataclass
class PitchDimensions:
    """Pitch dimensions in metres (105 x 68 is the UEFA standard).

    Every field has a default, so ``PitchDimensions()`` yields a standard
    pitch. Field order is significant for positional construction.
    """

    # Overall playing surface.
    length: float = 105.0
    width: float = 68.0

    # Penalty area.
    penalty_area_length: float = 16.5
    penalty_area_width: float = 40.32

    # Goal area.
    goal_area_length: float = 5.5
    goal_area_width: float = 18.32

    # Coordinates of the centre of the pitch.
    center_x: float = 0.0
    center_y: float = 0.0
@dataclass
class TrackingFrame:
    """A single tracking frame holding every player and the ball.

    All fields are required and positional order matters.
    """

    # Frame identification.
    frame: int
    timestamp_ms: int
    period: int

    # Per-team player state: player_id -> (x, y, speed).
    home_players: Dict[str, Tuple[float, float, float]]
    away_players: Dict[str, Tuple[float, float, float]]

    # Ball state: position is (x, y, z).
    ball_position: Tuple[float, float, float]
    ball_speed: float

    # Possession information; either may be None.
    team_in_possession: Optional[str]
    player_in_possession: Optional[str]
class TrackingDataLoader:
    """Load and parse a tracking CSV file.

    The parsed table is cached on ``self.df`` once :meth:`load` has run.
    """

    def __init__(self, filepath: str):
        """Remember the CSV location; nothing is read until :meth:`load`.

        Args:
            filepath: Path to the tracking CSV file.
        """
        self.filepath = Path(filepath)
        self.df: Optional[pd.DataFrame] = None
        self.pitch = PitchDimensions()

    def load(self, sample_rate: int = 1) -> pd.DataFrame:
        """Read the tracking file into ``self.df`` and return it.

        Args:
            sample_rate: Keep every N-th distinct frame value (1 = all
                frames, 5 = every fifth frame). Values <= 1 keep everything.

        Returns:
            The loaded (and optionally down-sampled) DataFrame.
        """
        print(f"📂 Cargando tracking data: {self.filepath.name}")
        self.df = pd.read_csv(
            self.filepath,
            low_memory=False,
            dtype={
                'frame': 'int32',
                'player_id': 'str',
                'player_x': 'float32',
                'player_y': 'float32',
                'player_speed': 'float32',
                'is_player_visible': 'int8',
                'ball_x': 'float32',
                'ball_y': 'float32',
                'ball_z': 'float32',
                'ball_speed': 'float32',
                'is_ball_visible': 'int8',
                'match_period': 'int8',
                'video_time_ms': 'int32',
            },
        )

        # Down-sample by frame value (not by row) so that every row that
        # belongs to a retained frame is kept together.
        if sample_rate > 1:
            unique_frames = self.df['frame'].unique()
            sampled_frames = unique_frames[::sample_rate]
            self.df = self.df[self.df['frame'].isin(sampled_frames)]

        print(f" ✓ {len(self.df):,} filas cargadas")
        print(f" ✓ {self.df['frame'].nunique():,} frames")
        print(f" ✓ {self.df['player_id'].nunique()} jugadores únicos")
        return self.df

    def get_teams(self) -> Tuple[Optional[str], Optional[str]]:
        """Return the first two distinct team ids seen in ``team_in_poss``.

        Returns:
            A 2-tuple of team ids in order of first appearance. Missing
            entries are ``None``: ``(team, None)`` when only one team ever
            has possession, ``(None, None)`` when the column has no values.

        Raises:
            RuntimeError: If :meth:`load` has not been called yet.
        """
        if self.df is None:
            raise RuntimeError("Tracking data not loaded; call load() first.")

        # dropna() already removes missing values, so no second filter needed.
        teams = list(self.df['team_in_poss'].dropna().unique())
        first = teams[0] if teams else None
        second = teams[1] if len(teams) > 1 else None
        return (first, second)
class SetPieceTrackingExtractor:
    """Extract tracking rows for set-piece sequences.

    Holds the tracking table together with the processed events table and
    slices the former by match time.
    """

    def __init__(self, tracking_df: pd.DataFrame, events_df: pd.DataFrame, fps: int = 25):
        """Store the input tables and the frame rate.

        Args:
            tracking_df: Tracking rows; must provide ``frame``,
                ``match_period`` and ``video_time_ms`` columns.
            events_df: Processed events table (stored, not read here).
            fps: Frames per second used to turn a duration into a frame
                count. Defaults to 25.
        """
        self.tracking = tracking_df
        self.events = events_df
        self.fps = fps  # Frames per second

    def get_frame_for_timestamp(self, period: int, minute: int, second: int) -> Optional[int]:
        """Find the frame closest to a given match time.

        Args:
            period: Match period.
            minute: Match minute.
            second: Second within the minute.

        Returns:
            The ``frame`` value (as a plain ``int``) of the row whose
            ``video_time_ms`` is nearest to the target time within the
            period, or ``None`` if the period has no tracking rows.
        """
        # Convert minute/second into video milliseconds.
        if period == 2:
            # The second half restarts from 0 in video_time_ms, so remove
            # the 45 minutes of the first half.
            target_ms = (minute - 45) * 60 * 1000 + second * 1000
        else:
            target_ms = minute * 60 * 1000 + second * 1000

        period_df = self.tracking[self.tracking['match_period'] == period]
        if period_df.empty:
            return None

        # Locate the nearest row by position rather than by index label: a
        # label lookup yields several rows (a Series, not a scalar) whenever
        # the index contains duplicate labels.
        nearest_pos = (period_df['video_time_ms'] - target_ms).abs().argmin()
        return int(period_df['frame'].iloc[nearest_pos])

    def extract_sequence_tracking(
        self,
        period: int,
        start_minute: int,
        start_second: int,
        duration_seconds: float = 10.0
    ) -> pd.DataFrame:
        """Extract the tracking rows of one set-piece sequence.

        Args:
            period: Match period (1 or 2).
            start_minute: Minute at which the sequence starts.
            start_second: Second at which the sequence starts.
            duration_seconds: Length of the window to extract.

        Returns:
            The rows of the period whose frame lies in
            ``[start_frame, start_frame + duration_seconds * fps]``; an
            empty DataFrame when no start frame can be found.
        """
        start_frame = self.get_frame_for_timestamp(period, start_minute, start_second)
        if start_frame is None:
            return pd.DataFrame()

        end_frame = start_frame + int(duration_seconds * self.fps)
        in_window = (
            (self.tracking['frame'] >= start_frame) &
            (self.tracking['frame'] <= end_frame) &
            (self.tracking['match_period'] == period)
        )
        return self.tracking[in_window]
class TrackingMetricsCalculator:
    """Compute physical, positional and movement metrics from tracking data."""

    def __init__(self, pitch: Optional["PitchDimensions"] = None):
        """Create a calculator.

        Args:
            pitch: Pitch geometry to use; a default ``PitchDimensions()`` is
                built when omitted.
        """
        self.pitch = pitch or PitchDimensions()

    def calculate_physical_metrics(self, sequence_df: pd.DataFrame) -> Dict:
        """Compute physical metrics for one sequence.

        Args:
            sequence_df: Tracking rows with ``player_id``, ``frame``,
                ``player_x``, ``player_y`` and ``player_speed`` columns.

        Returns:
            An empty dict for an empty input, otherwise a dict with:

            * ``max_speed_kmh``: highest ``player_speed`` times 3.6.
            * ``num_sprints``: number of distinct players exceeding
              6.94 (25 km/h) at least once. This is a count of players,
              not of individual sprint efforts.
            * ``total_distance_m``: summed frame-to-frame displacement over
              all players present in the input (no team split is applied).
            * ``avg_distance_per_player_m``: mean of the per-player totals.
        """
        if sequence_df.empty:
            return {}

        metrics = {}

        # Top speed over all players.
        max_speeds = sequence_df.groupby('player_id')['player_speed'].max()
        metrics['max_speed_kmh'] = float(max_speeds.max() * 3.6)

        # Sprints (>25 km/h = 6.94 m/s).
        sprint_threshold = 6.94
        sprints = sequence_df[sequence_df['player_speed'] > sprint_threshold]
        metrics['num_sprints'] = len(sprints['player_id'].unique())

        # Distance (approximation): per-player displacement between
        # consecutive frames, so rows must be ordered by player then frame.
        ordered = sequence_df.sort_values(['player_id', 'frame'])
        steps = ordered.groupby('player_id')[['player_x', 'player_y']].diff()
        # Assign by position; the first row of each player is NaN and is
        # skipped by the sums below.
        ordered['distance'] = np.sqrt(
            steps['player_x']**2 + steps['player_y']**2
        ).to_numpy()

        total_distance = ordered.groupby('player_id')['distance'].sum()
        metrics['total_distance_m'] = float(total_distance.sum())
        metrics['avg_distance_per_player_m'] = float(total_distance.mean())

        return metrics

    def calculate_defensive_setup(
        self,
        frame_df: pd.DataFrame,
        defending_team_id: str,
        attacking_side: str = 'right'  # 'left' or 'right': which goal is defended
    ) -> Dict:
        """Describe the defensive layout at one moment (e.g. as a corner is taken).

        Args:
            frame_df: Tracking rows of a single frame with ``player_x``,
                ``player_y`` and ``is_player_visible`` columns.
            defending_team_id: Currently unused. The rows are not filtered
                by team, so every visible player is counted.
                NOTE(review): filter by team once a team column is available.
            attacking_side: ``'right'`` selects the goal at positive x; any
                other value selects the goal at negative x.

        Returns:
            An empty dict for an empty input, otherwise a dict with:

            * ``players_in_penalty_area``: visible players inside the
              penalty box (bounded in depth and in width).
            * ``avg_distance_to_goal``: mean distance to the goal centre.
            * ``defensive_spread_x`` / ``defensive_spread_y``: standard
              deviation of the positions; 0.0 with fewer than two players.
            * ``defensive_spread``: the two spreads combined (Euclidean norm).
        """
        if frame_df.empty:
            return {}

        visible_players = frame_df[frame_df['is_player_visible'] == 1]

        # Depth of the box, measured from the defended goal line.
        half_length = self.pitch.length / 2
        if attacking_side == 'right':
            goal_x = self.pitch.center_x + half_length
            in_depth = visible_players['player_x'] >= goal_x - self.pitch.penalty_area_length
        else:
            goal_x = self.pitch.center_x - half_length
            in_depth = visible_players['player_x'] <= goal_x + self.pitch.penalty_area_length

        # Width of the box: without this bound anybody near the goal line,
        # including a corner taker at the flag, would count as inside.
        lateral_offset = (visible_players['player_y'] - self.pitch.center_y).abs()
        in_width = lateral_offset <= self.pitch.penalty_area_width / 2

        players_in_area = visible_players[in_depth & in_width]

        # Every key is always present so consumers see a stable schema.
        metrics = {
            'players_in_penalty_area': len(players_in_area),
            'avg_distance_to_goal': 0.0,
            'defensive_spread': 0.0,
            'defensive_spread_x': 0.0,
            'defensive_spread_y': 0.0,
        }
        if players_in_area.empty:
            return metrics

        # The sample standard deviation is undefined (NaN) for one player.
        if len(players_in_area) > 1:
            spread_x = float(players_in_area['player_x'].std())
            spread_y = float(players_in_area['player_y'].std())
            metrics['defensive_spread_x'] = spread_x
            metrics['defensive_spread_y'] = spread_y
            metrics['defensive_spread'] = float(np.hypot(spread_x, spread_y))

        # Mean distance to the centre of the defended goal.
        metrics['avg_distance_to_goal'] = float(
            np.sqrt(
                (players_in_area['player_x'] - goal_x)**2 +
                (players_in_area['player_y'] - self.pitch.center_y)**2
            ).mean()
        )
        return metrics

    def detect_runs(
        self,
        sequence_df: pd.DataFrame,
        speed_threshold_kmh: float = 20.0
    ) -> List[Dict]:
        """Detect significant runs during a sequence.

        A run is a maximal stretch of consecutive rows of one player whose
        ``player_speed`` exceeds the threshold.

        Args:
            sequence_df: Tracking rows with ``player_id``, ``frame`` and
                ``player_speed`` columns.
            speed_threshold_kmh: Minimum speed in km/h; divided by 3.6
                before being compared with ``player_speed``.

        Returns:
            One dict per run with ``player_id``, ``start_frame``,
            ``end_frame`` (exclusive: the first frame at which the player is
            no longer running), ``duration_frames`` and ``max_speed_kmh``.
            A run still in progress when the sequence ends is reported too,
            with ``end_frame`` set to the last observed frame plus one.
        """
        if sequence_df.empty:
            return []

        speed_threshold = speed_threshold_kmh / 3.6  # km/h -> m/s
        runs: List[Dict] = []

        # sort=False keeps players in order of first appearance.
        for player_id, player_df in sequence_df.groupby('player_id', sort=False):
            player_df = player_df.sort_values('frame')
            frames = player_df['frame'].tolist()
            speeds = player_df['player_speed'].tolist()
            running = (player_df['player_speed'] > speed_threshold).tolist()

            run_start = None
            peak_speed = 0.0
            for frame, speed, is_running in zip(frames, speeds, running):
                if is_running:
                    if run_start is None:
                        run_start, peak_speed = frame, speed
                    elif speed > peak_speed:
                        peak_speed = speed
                elif run_start is not None:
                    runs.append(self._make_run(player_id, run_start, frame, peak_speed))
                    run_start = None

            # Close a run that is still open at the end of the sequence;
            # otherwise it would be silently dropped.
            if run_start is not None:
                runs.append(
                    self._make_run(player_id, run_start, frames[-1] + 1, peak_speed)
                )

        return runs

    @staticmethod
    def _make_run(player_id, start_frame, end_frame, peak_speed) -> Dict:
        """Build the result record for one detected run."""
        return {
            'player_id': player_id,
            'start_frame': start_frame,
            'end_frame': end_frame,
            'duration_frames': end_frame - start_frame,
            'max_speed_kmh': float(peak_speed * 3.6),
        }
class TrackingProcessor:
    """Main processor tying tracking data to set-piece sequences."""

    def __init__(self, tracking_path: str, match_id: str):
        """Prepare the loader and the metrics calculator for one match.

        Args:
            tracking_path: Path to the tracking CSV file.
            match_id: Identifier echoed back in the results.
        """
        self.tracking_path = Path(tracking_path)
        self.match_id = match_id
        self.loader = TrackingDataLoader(tracking_path)
        self.metrics_calc = TrackingMetricsCalculator()

    def process_match(
        self,
        corner_sequences: Optional[pd.DataFrame] = None,
        sample_rate: int = 1,
        sequence_duration_seconds: float = 15.0
    ) -> Dict:
        """Process the full tracking of a match.

        Args:
            corner_sequences: Optional table of corner sequences; each row
                must provide ``period_id``, ``minute``, ``second`` and
                ``corner_sequence_id``.
            sample_rate: Forwarded to the loader (1 = every frame).
            sequence_duration_seconds: Length of the tracking window
                extracted for each corner.

        Returns:
            A dict with ``match_id``, ``tracking_stats`` (whole-match
            statistics) and ``sequences`` (one entry per corner for which
            tracking rows were found).
        """
        tracking_df = self.loader.load(sample_rate=sample_rate)

        results = {
            'match_id': self.match_id,
            'tracking_stats': self._calculate_match_stats(tracking_df),
            'sequences': []
        }

        if corner_sequences is None:
            return results

        extractor = SetPieceTrackingExtractor(tracking_df, corner_sequences)
        for _, seq in corner_sequences.iterrows():
            seq_tracking = extractor.extract_sequence_tracking(
                period=seq['period_id'],
                start_minute=seq['minute'],
                start_second=seq['second'],
                duration_seconds=sequence_duration_seconds
            )
            # Corners without any tracking rows are left out of the results.
            if seq_tracking.empty:
                continue
            results['sequences'].append({
                'corner_sequence_id': seq['corner_sequence_id'],
                'physical_metrics': self.metrics_calc.calculate_physical_metrics(seq_tracking),
                'runs': self.metrics_calc.detect_runs(seq_tracking)
            })

        return results

    def _calculate_match_stats(self, df: pd.DataFrame) -> Dict:
        """Compute whole-match statistics.

        Args:
            df: Tracking rows with ``frame``, ``video_time_ms``,
                ``player_id``, ``player_speed`` and ``is_player_visible``
                columns, and normally ``match_period``.

        Returns:
            A dict with ``total_frames``, ``duration_minutes``,
            ``unique_players``, ``max_speed_kmh`` and ``avg_visibility_pct``.
        """
        if 'match_period' in df.columns:
            # Sum the time span of each period. This stays correct whether
            # the video clock restarts at every period or keeps running,
            # whereas a global maximum only measures the longest period
            # when the clock restarts.
            spans = df.groupby('match_period')['video_time_ms'].agg(['min', 'max'])
            duration_ms = float((spans['max'] - spans['min']).sum())
        else:
            duration_ms = float(df['video_time_ms'].max())

        return {
            'total_frames': int(df['frame'].nunique()),
            'duration_minutes': duration_ms / 1000 / 60,
            'unique_players': int(df['player_id'].nunique()),
            'max_speed_kmh': float(df['player_speed'].max() * 3.6),
            'avg_visibility_pct': float(df['is_player_visible'].mean() * 100)
        }
# =============================================================================
# FUNCIONES DE UTILIDAD PARA INTEGRACIÓN CON PIPELINE EXISTENTE
# =============================================================================
def enrich_corner_sequence_with_tracking(
    sequence_id: str,
    tracking_df: pd.DataFrame,
    period: int,
    minute: int,
    second: int
) -> Dict:
    """Enrich one corner sequence with tracking-derived metrics.

    Args:
        sequence_id: Identifier of the corner sequence.
        tracking_df: Tracking rows of the match.
        period: Match period of the corner.
        minute: Minute of the corner.
        second: Second of the corner.

    Returns:
        ``{'sequence_id', 'has_tracking': False}`` when no tracking rows are
        found for the 15-second window; otherwise a dict that also carries
        ``physical_metrics``, ``runs`` and ``initial_setup`` (visibility
        counts for the first frame of the window).
    """
    extractor = SetPieceTrackingExtractor(tracking_df, pd.DataFrame())
    calculator = TrackingMetricsCalculator()

    # Window covering the 15 seconds that follow the corner.
    window = extractor.extract_sequence_tracking(
        period=period,
        start_minute=minute,
        start_second=second,
        duration_seconds=15.0
    )

    if not window.empty:
        # Rows of the earliest frame, i.e. the moment of the corner.
        first_frame_rows = window[window['frame'] == window['frame'].min()]
        players_visible = int(first_frame_rows['is_player_visible'].sum())
        ball_visible = bool(first_frame_rows['is_ball_visible'].any())

        physical_metrics = calculator.calculate_physical_metrics(window)
        detected_runs = calculator.detect_runs(window)
        return {
            'sequence_id': sequence_id,
            'has_tracking': True,
            'physical_metrics': physical_metrics,
            'runs': detected_runs,
            'initial_setup': {
                'players_visible': players_visible,
                'ball_visible': ball_visible
            }
        }

    return {'sequence_id': sequence_id, 'has_tracking': False}
def get_player_heatmap_data(
    tracking_df: pd.DataFrame,
    player_id: str,
    period: Optional[int] = None
) -> Dict:
    """Collect the positions of one player for a heatmap.

    Args:
        tracking_df: Tracking rows with ``player_id``, ``match_period``,
            ``is_player_visible``, ``player_x`` and ``player_y`` columns.
        player_id: Player whose positions are collected.
        period: Restrict to this match period; ``None`` keeps all periods.

    Returns:
        A dict with ``player_id``, the ``x`` and ``y`` position lists of the
        rows in which the player is visible, and ``n_samples`` (their count).
    """
    # Build a single row mask instead of filtering step by step.
    keep = (tracking_df['player_id'] == player_id) & (tracking_df['is_player_visible'] == 1)
    if period is not None:
        keep &= tracking_df['match_period'] == period

    positions = tracking_df.loc[keep, ['player_x', 'player_y']]
    xs = positions['player_x'].tolist()
    ys = positions['player_y'].tolist()
    return {
        'player_id': player_id,
        'x': xs,
        'y': ys,
        'n_samples': len(positions)
    }
# =============================================================================
# EJEMPLO DE USO
# =============================================================================
if __name__ == "__main__":
    # Demo: load one match, then print physical metrics and detected runs
    # for a short sample of the first half.
    TRACKING_FILE = "datasets/2025-08-16 - Santander vs Castellón - tracking.csv"

    banner = "=" * 70
    print(banner)
    print("🔬 TRACKING DATA PROCESSOR - Demo")
    print(banner)

    # Load the data, keeping one frame in five so the demo stays quick.
    demo_loader = TrackingDataLoader(TRACKING_FILE)
    tracking = demo_loader.load(sample_rate=5)

    calculator = TrackingMetricsCalculator()

    # First 250 distinct frame values of period 1.
    first_half_frames = tracking.loc[tracking['match_period'] == 1, 'frame'].unique()[:250]
    sample = tracking[tracking['frame'].isin(first_half_frames)]

    print("\n📊 Métricas físicas (muestra de 10 segundos):")
    for key, value in calculator.calculate_physical_metrics(sample).items():
        print(f" {key}: {value:.2f}")

    print("\n🏃 Carreras detectadas:")
    detected = calculator.detect_runs(sample, speed_threshold_kmh=20.0)
    print(f" Total: {len(detected)} carreras")
    if detected:
        fastest_kmh = max(run['max_speed_kmh'] for run in detected)
        print(f" Carrera más rápida: {fastest_kmh:.1f} km/h")

    print("\n✅ Procesamiento completado")