""" Step 2: Build corner sequences and map to zones. This script: 1. Identifies corners in the eventing data 2. Builds sequences from corners to absorption events 3. Normalizes coordinates and maps events to zones 4. Adds players_involved column Input: - eventing_consolidado.csv Output: - corner_sequences_summary.csv (one row per sequence) - corner_events_detail.csv (one row per event) """ import pandas as pd import numpy as np from pathlib import Path from typing import Dict, List, Optional, Tuple, Any from tqdm import tqdm from .utils import ( load_config, parse_qualifiers, has_qualifier, get_qualifier_value, ensure_output_dir, format_sequence_id ) from .zones import ( ZONE_BUCKETS, is_corner_from_left, mirror_coordinates, point_to_zone, is_point_in_any_zone, normalize_event_coordinates ) # ============================================================================= # EVENT MAPPINGS # ============================================================================= # Events that end a sequence (absorption) ABSORPTION_MAP: Dict[str, str] = { "Goal": "goal", "defensive_foul": "defensive_foul", "OffsideGiven": "offside_given", "OffsidePass": "offside_given", "OffsideProvoked": "offside_given", "GoalKick": "goalkeeper_control", "End": "end_period", "KeeperPickup": "goalkeeper_control", "Claim": "goalkeeper_control", "corner": "corner", } # Match state mapping (Spanish to English) MATCH_STATE_MAP: Dict[str, str] = { 'Ganando': 'winning', 'Perdiendo': 'losing', 'Empate': 'drawing', } # Events allowed within a sequence (transitions) TRANSITION_MAP: Dict[str, str] = { "pass": "pass", "cross": "cross", "offensive_foul": "offensive_foul", "defensive_possession": "defensive_possession", "keeper_action": "keeper_action", "Aerial": "pass", "Clearance": "defensive_possession", "ChanceMissed": "shot", "KeeperSweeper": "keeper_action", "Penalty": "penalty", "PenaltyFaced": "penalty", "Punch": "keeper_action", "SavedShot": "shot", "ShotOnPost": "shot", "MissedShots": "shot", 
"Smother": "keeper_action", "KeeperSaveInTheBox": "keeper_action", "Save": "defensive_possession", "shot": "shot", } # ============================================================================= # EVENT PREPROCESSING # ============================================================================= def preprocess_event(row: pd.Series, team_id_atacante: Optional[int] = None) -> Optional[str]: """ Preprocess an event and return its type. Key change: Returns 'other_events' for unmapped events instead of None. """ event_name = row.get('event_name', '') qualifiers_str = row.get('qualifiers', '[]') team_id = row.get('teamId', None) qualifiers = parse_qualifiers(qualifiers_str) # 1. GoalKick: Pass with GoalKick qualifier if event_name == 'Pass' and has_qualifier(qualifiers, 'GoalKick'): return 'goalkeeper_control' # 2. Corners: Pass with CornerTaken if event_name == 'Pass' and has_qualifier(qualifiers, 'CornerTaken'): return 'corner' # 3. Crosses: Pass with Cross (but not CornerTaken or GoalKick) if event_name == 'Pass' and has_qualifier(qualifiers, 'Cross'): if not has_qualifier(qualifiers, 'CornerTaken') and not has_qualifier(qualifiers, 'GoalKick'): return 'cross' # 4. Normal passes if event_name == 'Pass': if (not has_qualifier(qualifiers, 'Cross') and not has_qualifier(qualifiers, 'CornerTaken') and not has_qualifier(qualifiers, 'GoalKick')): return 'pass' # 5. Fouls if event_name == 'Foul': if has_qualifier(qualifiers, 'Defensive'): return 'defensive_foul' else: return 'offensive_foul' # 6. Ball recovery by defender if event_name == 'BallRecovery' and team_id_atacante is not None and pd.notna(team_id): # Compare as strings to handle alphanumeric IDs if str(team_id) != str(team_id_atacante): return 'defensive_possession' # 7. Direct mapping from TRANSITION_MAP if event_name in TRANSITION_MAP: return TRANSITION_MAP[event_name] # 8. Absorption events if event_name in ABSORPTION_MAP: return ABSORPTION_MAP[event_name] # 9. 
NEW: Return 'other_events' for unmapped events instead of None return 'other_events' def is_absorption_event(event_type: Optional[str]) -> bool: """Check if an event type is an absorption event.""" if event_type is None: return False absorption_types = set(ABSORPTION_MAP.values()) return event_type in absorption_types # ============================================================================= # SEQUENCE BUILDING # ============================================================================= def extract_corner_info(qualifiers: List[Dict]) -> Dict: """Extract corner information from qualifiers.""" corner_info = {} for q in qualifiers: q_type = q.get('type', {}) if not isinstance(q_type, dict): continue q_name = q_type.get('displayName', '') q_value = q.get('value') if q_name == 'Zone': corner_info['zone'] = q_value if q_value and q_value != 'N/A' else None elif q_name in ['Angle', 'Length', 'PassEndX', 'PassEndY']: try: corner_info[q_name.lower()] = float(q_value) if q_value and q_value != 'N/A' else None except (ValueError, TypeError): corner_info[q_name.lower()] = None elif q_name in ['LeftFoot', 'RightFoot', 'Chipped', 'Cross', 'Longball', 'KeyPass', 'IntentionalAssist', 'IntentionalGoalAssist', 'BigChanceCreated', 'FromCorner', 'ShotAssist']: corner_info[q_name.lower()] = True return corner_info def build_sequence_from_corner( corner_row: pd.Series, df: pd.DataFrame, max_events: int = 50 ) -> Optional[Dict]: """Build a sequence of events starting from a corner.""" match_id = corner_row['matchId'] period_id = corner_row['period_id'] team_id = corner_row['teamId'] event_index = corner_row.name # Get events after this corner in the same period match_events = df[ (df['matchId'] == match_id) & (df['period_id'] == period_id) & (df.index > event_index) ].copy() # Initialize sequence with corner qualifiers = parse_qualifiers(corner_row.get('qualifiers', '[]')) corner_info = extract_corner_info(qualifiers) sequence = [{ 'event_type': 'corner', 'event_name': 
corner_row.get('event_name', ''), 'eventId': corner_row.get('eventId', ''), 'id': corner_row.get('id', ''), 'matchId': match_id, # Added for xG lookup 'minute': corner_row.get('minute', 0), 'second': corner_row.get('second', 0), 'x': corner_row.get('x', 0), 'y': corner_row.get('y', 0), 'teamId': team_id, 'playerId': corner_row.get('playerId', None), 'playerName': corner_row.get('jugador', ''), 'position': corner_row.get('position', ''), 'corner_info': corner_info, }] absorption_event = None termination_reason = None absorption_event_info = None if len(match_events) == 0: absorption_event = 'end_period' termination_reason = 'no_more_events_in_period' else: for idx, row in match_events.iterrows(): if len(sequence) >= max_events: absorption_event = 'truncated' termination_reason = 'max_events_reached' break processed_event = preprocess_event(row, team_id_atacante=team_id) # Handle Goal specially if processed_event == 'goal': qualifiers_str = row.get('qualifiers', '[]') row_qualifiers = parse_qualifiers(qualifiers_str) is_own_goal = has_qualifier(row_qualifiers, 'OwnGoal') sequence.append({ 'event_type': 'shot', 'event_name': row.get('event_name', ''), 'eventId': row.get('eventId', ''), 'id': row.get('id', ''), 'matchId': match_id, # Added for xG lookup 'minute': row.get('minute', 0), 'second': row.get('second', 0), 'x': row.get('x', 0), 'y': row.get('y', 0), 'teamId': row.get('teamId', 0), 'playerId': row.get('playerId', None), 'playerName': row.get('jugador', ''), 'position': row.get('position', ''), 'is_goal': True, 'is_own_goal': is_own_goal, }) absorption_event_info = { 'x': row.get('x', 0), 'y': row.get('y', 0), } absorption_event = 'goal' termination_reason = 'explicit_absorption_event' break # Handle other absorption events if is_absorption_event(processed_event): sequence.append({ 'event_type': processed_event, 'event_name': row.get('event_name', ''), 'eventId': row.get('eventId', ''), 'id': row.get('id', ''), 'matchId': match_id, # Added for xG lookup 
'minute': row.get('minute', 0), 'second': row.get('second', 0), 'x': row.get('x', 0), 'y': row.get('y', 0), 'teamId': row.get('teamId', 0), 'playerId': row.get('playerId', None), 'playerName': row.get('jugador', ''), 'position': row.get('position', ''), }) absorption_event_info = { 'x': row.get('x', 0), 'y': row.get('y', 0), } absorption_event = processed_event termination_reason = 'explicit_absorption_event' break # Add transition event (including 'other_events') sequence.append({ 'event_type': processed_event, 'event_name': row.get('event_name', ''), 'eventId': row.get('eventId', ''), 'id': row.get('id', ''), 'matchId': match_id, # Added for xG lookup 'minute': row.get('minute', 0), 'second': row.get('second', 0), 'x': row.get('x', 0), 'y': row.get('y', 0), 'teamId': row.get('teamId', 0), 'playerId': row.get('playerId', None), 'playerName': row.get('jugador', ''), 'position': row.get('position', ''), }) if absorption_event is None: print( f" ⚠️ No absorption event found for corner in match {match_id}, period {period_id}, " f"minute {corner_row.get('minute', '?')} — skipping sequence" ) return None # Extract match state (winning/losing/drawing) raw_match_state = corner_row.get('estado_partido', '') match_state = MATCH_STATE_MAP.get(raw_match_state, 'unknown') return { 'corner_eventId': corner_row.get('eventId', ''), 'matchId': match_id, 'period_id': period_id, 'period_name': corner_row.get('period_name', ''), 'teamId': team_id, 'TeamName': corner_row.get('TeamName', ''), 'TeamRival': corner_row.get('TeamRival', ''), 'fecha': corner_row.get('fecha', ''), 'minute': corner_row.get('minute', 0), 'second': corner_row.get('second', 0), 'corner_playerId': sequence[0].get('playerId'), 'corner_playerName': sequence[0].get('playerName', ''), 'corner_info': corner_info, 'match_state': match_state, 'sequence': sequence, 'sequence_length': len(sequence), 'absorption_event': absorption_event, 'termination_reason': termination_reason, 'absorption_event_coords': 
absorption_event_info, } # ============================================================================= # ZONE MAPPING # ============================================================================= def process_sequence_zones( sequence_data: Dict, events_df: pd.DataFrame ) -> Dict: """Process a sequence: normalize coordinates and map to zones.""" corner_event = sequence_data['sequence'][0] corner_y = corner_event['y'] is_left = is_corner_from_left(corner_y) corner_side = "left" if is_left else "right" corner_team_id = sequence_data['teamId'] processed_events = [] third_exit_detected = False is_own_goal = False for i, event in enumerate(sequence_data['sequence']): event_x = event.get('x') event_y = event.get('y') event_team_id = event.get('teamId', 0) # Get end coordinates from next event is_last_event = (i == len(sequence_data['sequence']) - 1) event_endX = None event_endY = None if not is_last_event: next_event = sequence_data['sequence'][i + 1] event_endX = next_event.get('x') event_endY = next_event.get('y') else: absorption_coords = sequence_data.get('absorption_event_coords') if absorption_coords: event_endX = absorption_coords.get('x') event_endY = absorption_coords.get('y') # Get xG, xGoT, xT, isShot from original data using (matchId, id) composite key event_id = event.get('id') event_match_id = event.get('matchId') # Use matchId from event, not sequence event_xG = None event_xGoT = None event_xT = None event_isShot = None if events_df is not None and event_id is not None and event_match_id is not None: lookup_key = (event_match_id, event_id) try: if lookup_key in events_df.index: event_data = events_df.loc[lookup_key] if isinstance(event_data, pd.Series): event_xG = event_data.get('xG') if pd.notna(event_data.get('xG')) else None event_xGoT = event_data.get('xGoT') if pd.notna(event_data.get('xGoT')) else None event_xT = event_data.get('xT') if pd.notna(event_data.get('xT')) else None event_isShot = event_data.get('isShot') if 
pd.notna(event_data.get('isShot')) else None except Exception: pass # Lookup failed, leave values as None # Determine if attacking team event_is_own_goal = event.get('is_own_goal', False) if event_is_own_goal and event['event_type'] == 'shot': is_attacking_team = True is_own_goal = True else: is_attacking_team = (event_team_id == corner_team_id and event_team_id != 0) # Normalize coordinates if event_x is not None and event_y is not None: if is_left: event_x, event_y = mirror_coordinates(event_x, event_y) if not is_attacking_team: event_x = 100 - event_x event_y = 100 - event_y if event_endX is not None and event_endY is not None: if is_left: event_endX, event_endY = mirror_coordinates(event_endX, event_endY) if not is_attacking_team: event_endX = 100 - event_endX event_endY = 100 - event_endY # Check for third exit is_goal = event.get('is_goal', False) or event['event_type'] == 'goal' if event_x is not None and event_y is not None: if not is_point_in_any_zone(event_x, event_y, ZONE_BUCKETS): if is_last_event and is_goal and not event_is_own_goal: third_exit_detected = True break elif not is_goal: third_exit_detected = True break # Map to zones origin_zone = point_to_zone(event_x, event_y, ZONE_BUCKETS) if event_x is not None else None destination_zone = point_to_zone(event_endX, event_endY, ZONE_BUCKETS) if event_endX is not None else None processed_events.append({ 'event_index': i, 'event_type': event['event_type'], 'event_name': event['event_name'], 'eventId': event['eventId'], 'id': event.get('id', ''), 'x': event_x, 'y': event_y, 'endX': event_endX, 'endY': event_endY, 'origin_zone': origin_zone, 'destination_zone': destination_zone, 'teamId': event_team_id, 'playerId': event.get('playerId'), 'playerName': event.get('playerName', ''), 'position': event.get('position', ''), 'minute': event.get('minute', 0), 'second': event.get('second', 0), 'is_attacking_team': is_attacking_team, 'xG': event_xG, 'xGoT': event_xGoT, 'xT': event_xT, 'isShot': event_isShot, }) # 
Determine final absorption final_absorption = 'third_exit' if third_exit_detected else sequence_data['absorption_event'] # Build players_involved: [(playerName, TeamName, event_index), ...] players_involved = [] for event in processed_events: player_name = event.get('playerName', '') team_name = sequence_data['TeamName'] if event.get('is_attacking_team') else sequence_data['TeamRival'] event_idx = event.get('event_index', 0) if player_name: players_involved.append((player_name, team_name, event_idx)) return { 'corner_eventId': sequence_data['corner_eventId'], 'matchId': sequence_data['matchId'], 'period_id': sequence_data['period_id'], 'period_name': sequence_data['period_name'], 'teamId': sequence_data['teamId'], 'TeamName': sequence_data['TeamName'], 'TeamRival': sequence_data['TeamRival'], 'fecha': sequence_data['fecha'], 'minute': sequence_data['minute'], 'second': sequence_data['second'], 'corner_playerId': sequence_data.get('corner_playerId'), 'corner_playerName': sequence_data.get('corner_playerName', ''), 'corner_info': sequence_data.get('corner_info', {}), 'match_state': sequence_data.get('match_state', 'unknown'), 'corner_side': corner_side, 'sequence_length': len(processed_events), 'absorption_event': final_absorption, 'is_own_goal': is_own_goal, 'events': processed_events, 'players_involved': players_involved, } # ============================================================================= # CSV GENERATION # ============================================================================= def generate_summary_csv(sequences: List[Dict], output_path: Path) -> None: """Generate summary CSV (one row per sequence).""" rows = [] for seq in sequences: event_types = [e['event_type'] for e in seq['events']] event_sequence_str = ' -> '.join(event_types) corner_info = seq.get('corner_info', {}) corner_sequence_id = format_sequence_id( seq['matchId'], seq['corner_eventId'], seq['minute'], seq.get('second', 0) ) # Get initial zone (where corner lands) - using corner's 
endX, endY initial_zone = None corner_endx = corner_info.get('passendx') corner_endy = corner_info.get('passendy') if corner_endx is not None and corner_endy is not None: try: end_x = float(corner_endx) end_y = float(corner_endy) # Mirror coordinates if corner is from left side is_left = seq['corner_side'] == 'left' if is_left: end_x, end_y = mirror_coordinates(end_x, end_y) # Assign zone based on where corner lands initial_zone = point_to_zone(end_x, end_y, ZONE_BUCKETS) except (ValueError, TypeError): # Fallback to old method if conversion fails if len(seq['events']) > 1: initial_zone = seq['events'][1].get('origin_zone') row = { 'corner_sequence_id': corner_sequence_id, 'matchId': seq['matchId'], 'corner_eventId': seq['corner_eventId'], 'fecha': seq['fecha'], 'period_id': seq['period_id'], 'period_name': seq['period_name'], 'minute': seq['minute'], 'second': seq.get('second', 0), 'teamId': seq['teamId'], 'TeamName': seq['TeamName'], 'TeamRival': seq['TeamRival'], 'match_state': seq.get('match_state', 'unknown'), 'corner_side': seq['corner_side'], 'corner_playerId': seq.get('corner_playerId'), 'corner_playerName': seq.get('corner_playerName', ''), 'corner_angle': corner_info.get('angle'), 'corner_length': corner_info.get('length'), 'corner_passendx': corner_info.get('passendx'), 'corner_passendy': corner_info.get('passendy'), 'corner_zone': corner_info.get('zone'), 'corner_leftfoot': corner_info.get('leftfoot', False), 'corner_rightfoot': corner_info.get('rightfoot', False), 'corner_chipped': corner_info.get('chipped', False), 'corner_cross': corner_info.get('cross', False), 'corner_longball': corner_info.get('longball', False), 'corner_shotassist': corner_info.get('shotassist'), 'corner_keypass': corner_info.get('keypass', False), 'corner_intentionalassist': corner_info.get('intentionalassist', False), 'corner_intentionalgoalassist': corner_info.get('intentionalgoalassist', False), 'corner_bigchancecreated': corner_info.get('bigchancecreated', False), 
'sequence_length': seq['sequence_length'], 'event_sequence': event_sequence_str, 'absorption_event': seq['absorption_event'], 'is_own_goal': seq.get('is_own_goal', False), 'initial_zone': initial_zone, 'players_involved': str(seq.get('players_involved', [])), } rows.append(row) df = pd.DataFrame(rows) ensure_output_dir(output_path) df.to_csv(output_path, index=False) print(f" ✅ Summary CSV: {output_path} ({len(df):,} sequences)") def generate_detail_csv(sequences: List[Dict], output_path: Path) -> None: """Generate detail CSV (one row per event).""" rows = [] for seq in sequences: corner_sequence_id = format_sequence_id( seq['matchId'], seq['corner_eventId'], seq['minute'], seq.get('second', 0) ) for event in seq['events']: row = { 'corner_sequence_id': corner_sequence_id, 'matchId': seq['matchId'], 'corner_eventId': seq['corner_eventId'], 'fecha': seq['fecha'], 'period_id': seq['period_id'], 'period_name': seq['period_name'], 'teamId': seq['teamId'], 'TeamName': seq['TeamName'], 'TeamRival': seq['TeamRival'], 'corner_side': seq['corner_side'], 'corner_minute': seq['minute'], 'corner_second': seq.get('second', 0), 'event_index': event['event_index'], 'event_type': event['event_type'], 'event_name': event['event_name'], 'eventId': event['eventId'], 'id': event.get('id', ''), 'x': event['x'], 'y': event['y'], 'endX': event['endX'], 'endY': event['endY'], 'origin_zone': event['origin_zone'], 'destination_zone': event['destination_zone'], 'event_teamId': event['teamId'], 'event_playerId': event['playerId'], 'event_playerName': event.get('playerName', ''), 'event_position': event.get('position', ''), 'event_minute': event['minute'], 'event_second': event['second'], 'is_attacking_team': event.get('is_attacking_team', False), 'xG': event.get('xG'), 'xGoT': event.get('xGoT'), 'isShot': event.get('isShot'), } rows.append(row) df = pd.DataFrame(rows) ensure_output_dir(output_path) df.to_csv(output_path, index=False) print(f" ✅ Detail CSV: {output_path} ({len(df):,} events)") 
# =============================================================================
# MAIN FUNCTION
# =============================================================================

def build_sequences(
    eventing_path: Path,
    output_folder: Path
) -> Tuple[Path, Path]:
    """
    Main function to build corner sequences.

    Loads the eventing CSV, finds every corner, builds one sequence per
    corner, normalizes/zone-maps the events and writes the two output CSVs.

    Args:
        eventing_path: Path to eventing CSV
        output_folder: Output directory

    Returns:
        Tuple of (summary_path, detail_path)

    Raises:
        FileNotFoundError: If eventing CSV doesn't exist.
        ValueError: If required columns are missing or no corners found.
    """
    # Accept plain strings as well as Path objects.
    eventing_path = Path(eventing_path)
    output_folder = Path(output_folder)

    print(f"\n{'='*80}")
    print("STEP 2: BUILDING CORNER SEQUENCES")
    print(f"{'='*80}")

    # Validate input file exists
    if not eventing_path.exists():
        raise FileNotFoundError(f"Eventing CSV not found: {eventing_path}")

    config = load_config()
    max_events = config.get('max_sequence_length', 50)

    # Load data
    print(f"\n📂 Loading eventing data from {eventing_path}...")
    df = pd.read_csv(eventing_path, low_memory=False)
    print(f" ✅ Loaded {len(df):,} events")

    # Validate required columns
    required_cols = ['matchId', 'period_id', 'time_seconds', 'eventId', 'event_name',
                     'qualifiers', 'x', 'y', 'teamId', 'TeamName', 'TeamRival']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(
            f"Eventing CSV is missing required columns: {missing_cols}. "
            "Ensure Step 1 preprocessing completed successfully."
        )

    # A file with a header but no rows cannot contain corners; fail with the
    # documented error instead of running a row-wise apply on an empty frame.
    if df.empty:
        raise ValueError("No corners found in the data")

    # Sort by match, period, time. The index is reset so that "events after
    # the corner" can be selected by index label in build_sequence_from_corner.
    df = df.sort_values(['matchId', 'period_id', 'time_seconds', 'eventId']).reset_index(drop=True)

    # Identify corners
    print("\n🎯 Identifying corners...")
    df['processed_event'] = df.apply(lambda row: preprocess_event(row, None), axis=1)
    corners = df[df['processed_event'] == 'corner'].copy()
    print(f" ✅ Found {len(corners):,} corners in {corners['matchId'].nunique()} matches")

    if len(corners) == 0:
        raise ValueError("No corners found in the data")

    # Build sequences
    print(f"\n🔨 Building sequences (max {max_events} events each)...")
    raw_sequences = []
    for _, corner_row in tqdm(corners.iterrows(), total=len(corners), desc=" Building"):
        seq = build_sequence_from_corner(corner_row, df, max_events)
        if seq:
            raw_sequences.append(seq)

    print(f" ✅ Built {len(raw_sequences):,} sequences")

    # Create events index for xG/xGoT lookup using (matchId, id) as composite key.
    # The metric columns are optional (they are not in required_cols), so only
    # those actually present are selected; selecting a missing one would raise
    # KeyError after all sequences had already been built.
    print("\n📊 Processing zones and normalizing coordinates...")
    metric_cols = [col for col in ('xG', 'xGoT', 'xT', 'isShot') if col in df.columns]
    if 'id' in df.columns and 'matchId' in df.columns and metric_cols:
        events_df = df.set_index(['matchId', 'id'])[metric_cols]
    else:
        events_df = None

    processed_sequences = []
    for seq in tqdm(raw_sequences, desc=" Processing"):
        processed_sequences.append(process_sequence_zones(seq, events_df))

    # Statistics
    absorption_counts = pd.Series(
        [s['absorption_event'] for s in processed_sequences]
    ).value_counts()
    print("\n📈 Absorption event distribution:")
    for event_type, count in absorption_counts.items():
        pct = count / len(processed_sequences) * 100
        print(f" {event_type}: {count:,} ({pct:.1f}%)")

    # Generate CSVs
    print("\n💾 Generating output files...")
    summary_path = output_folder / "corner_sequences_summary.csv"
    detail_path = output_folder / "corner_events_detail.csv"

    generate_summary_csv(processed_sequences, summary_path)
    generate_detail_csv(processed_sequences, detail_path)

    print(f"\n{'='*80}")
    print("✅ STEP 2 COMPLETE")
    print(f" Sequences: {len(processed_sequences):,}")
    print(f" Summary: {summary_path}")
    print(f" Detail: {detail_path}")
    print(f"{'='*80}")

    return summary_path, detail_path


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Build corner sequences")
    parser.add_argument("--eventing-path", required=True, help="Path to eventing CSV")
    parser.add_argument("--league", required=True,
                        help="League name (used for output folder)")
    # The default is resolved relative to this file, so the help text does not
    # hard-code a package directory name.
    parser.add_argument("--output-folder", type=Path, default=None,
                        help="Output directory "
                             "(default: <package root>/datasets/processed/LEAGUE_NAME)")

    args = parser.parse_args()

    # Set default output folder: <parent of this file's directory>/datasets/processed/LEAGUE_NAME
    if args.output_folder is None:
        league_folder = args.league.replace(" ", "_").replace("/", "-")
        output_folder = Path(__file__).parent.parent / "datasets" / "processed" / league_folder
    else:
        output_folder = args.output_folder

    build_sequences(
        eventing_path=Path(args.eventing_path),
        output_folder=output_folder
    )