from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS import pandas as pd import numpy as np import json import os import sys import requests from flask import Response import time import re from functools import lru_cache # Add src directory to path for imports (needed for Azure deployment) src_dir = os.path.dirname(os.path.abspath(__file__)) if src_dir not in sys.path: sys.path.insert(0, src_dir) # Import new modules from barrel_manager import BarrelManager from autocomplete import initialize_autocomplete, SmartAutocomplete from semantic_search import initialize_semantic_search from performance_monitor import performance_monitor, track_query from dynamic_indexer import DynamicIndexer from optimized_search import OptimizedSearchEngine # Get base directory base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) project_root = os.path.dirname(base_dir) static_dir = os.path.join(project_root, 'static') # EXTREMELY IMPORTANT: Extract payload sequentially before ANYTHING is instantiated globally _zip_path = os.path.join(base_dir, 'data', 'scoutsearch_data.zip') _data_dir = os.path.join(base_dir, 'data') if os.path.exists(_zip_path) and not os.path.exists(os.path.join(_data_dir, 'raw', 'players_22.csv')): print(f"[STARTUP] Unzipping payload {_zip_path} as early step...") try: import zipfile with zipfile.ZipFile(_zip_path, 'r') as zipf: zipf.extractall(_data_dir) print("[STARTUP] Raw Dataset & Indexes extracted successfully before engine instantiation.") except Exception as e: print(f"[CRITICAL ERROR] Zip extraction failed: {e}") app = Flask(__name__, static_folder=static_dir, static_url_path='/static') CORS(app) # Enable CORS for all routes # Initialize dynamic_indexer at module level (will be set in init_advanced_components) dynamic_indexer = None def sanitize_for_json(obj): """Recursively convert numpy/pandas types to native Python types for JSON serialization.""" # Import here to avoid circular issues in some environments if isinstance(obj, dict): return {k: sanitize_for_json(v) for k, v in obj.items()} if isinstance(obj, list): return [sanitize_for_json(v) for v in obj] # numpy types try: if isinstance(obj, (np.integer,)): return int(obj) if isinstance(obj, (np.floating,)): return float(obj) if isinstance(obj, (np.ndarray,)): return [sanitize_for_json(v) for v in obj.tolist()] except Exception: pass return obj # Add CSP headers to all responses @app.after_request def set_csp_headers(response): # Allow images from SoFIFA CDN and our own server response.headers['Content-Security-Policy'] = ( "default-src 'self'; " "connect-src 'self' http://localhost:5000 http://127.0.0.1:5000; " "img-src 'self' data: blob: https://cdn.sofifa.net https://via.placeholder.com; " "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdnjs.cloudflare.com; " "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; " "font-src 'self' https://cdnjs.cloudflare.com;" ) return response class TextSearchEngine: def __init__(self, dataset_path=None, mapping_path=None, inverted_index_path=None, lexicon_path=None): # Build absolute paths base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.dataset_path = dataset_path or os.path.join(base_dir, 'data', 'raw', 'search_engine_dataset.jsonl') self.mapping_path = mapping_path or os.path.join(base_dir, 'data', 'raw', 'player_mapping_enhanced.json') self.inverted_index_path = inverted_index_path or os.path.join(base_dir, 'data', 'index', 'inverted_index.json') self.lexicon_path = lexicon_path or os.path.join(base_dir, 'data', 'index', 'lexicon_complete.json') # Initialize barrel manager for scalable index access barrel_dir = os.path.join(base_dir, 'data', 'index', 'barrels') self.barrel_manager = BarrelManager(barrel_dir, self.lexicon_path) # Initialize semantic search from semantic_search import semantic_engine self.semantic_engine = semantic_engine self.documents = {} self.player_mapping = {} self.inverted_index = None # Lazy load (fallback) self.word_doc_freq = None # Lazy load self.total_docs = 0 self.index_loaded = False self.load_data() def load_data(self): """Load Transfermarkt dataset and player mapping""" try: # Load player mapping with open(self.mapping_path, 'r') as f: self.player_mapping = json.load(f) print(f"[OK] Player mapping loaded: {len(self.player_mapping)} mappings") # Load Transfermarkt documents with open(self.dataset_path, 'r', encoding='utf-8') as f: for line in f: doc = json.loads(line) pid = str(doc.get('player_id', '')) # Store only what is needed or minimal representation self.documents[pid] = doc self.total_docs = len(self.documents) print(f"[OK] Text documents loaded: {self.total_docs} documents") except Exception as e: print(f"[ERROR] Error loading text search data: {e}") def ensure_index_loaded(self): """Lazy load inverted index on first search""" if not self.index_loaded: try: print(" Loading inverted index...") with open(self.inverted_index_path, 'r', encoding='utf-8') as f: index_data = json.load(f) self.inverted_index = index_data.get('inverted_index', {}) self.word_doc_freq = index_data.get('word_document_frequency', {}) self.index_loaded = True print(f"[OK] Inverted index loaded: {len(self.inverted_index)} terms") except Exception as e: print(f"[WARNING] Could not load inverted index: {e}") self.inverted_index = {} self.word_doc_freq = {} self.index_loaded = True def tokenize(self, text): """Tokenize and normalize text""" import re # Convert to lowercase and split on non-alphanumeric text = text.lower() # Keep hyphens in words like "left-back" tokens = re.findall(r'\b[a-z0-9]+(?:-[a-z0-9]+)*\b', text) return tokens def calculate_tf_idf(self, term, player_id): """Calculate TF-IDF score for a term in a document""" # TF: term frequency in document postings = self.inverted_index.get(term, {}) player_key = f"player_{player_id}" # Get term frequency if player_key in postings: posting_data = postings[player_key] if isinstance(posting_data, dict): tf = posting_data.get('frequency', 0) else: tf = posting_data else: tf = 0 if tf == 0: return 0 # IDF: inverse document frequency df = self.word_doc_freq.get(term, 0) if df == 0: return 0 import math idf = math.log(self.total_docs / df) return tf * idf def search_text(self, query, limit=50): """Advanced text search with TF-IDF ranking + barrel system + semantic expansion""" if not self.documents or not query.strip(): return [] try: # Expand query with semantic synonyms expanded_terms = [query.lower()] if self.semantic_engine: expanded_terms = self.semantic_engine.expand_query(query, max_expansions=2) # Tokenize all expanded queries all_query_terms = [] for term in expanded_terms: all_query_terms.extend(self.tokenize(term)) # Remove duplicates while preserving order query_terms = list(dict.fromkeys(all_query_terms)) if not query_terms: return [] # Score documents using barrel manager (memory efficient!) doc_scores = {} # Use barrel manager to get postings (only loads needed barrels) for term in query_terms: postings = self.barrel_manager.get_postings(term) for player_key, posting_data in postings.items(): try: # Extract player_id if isinstance(posting_data, dict): player_id = posting_data.get('player_id') tf = posting_data.get('frequency', 1) else: # Parse from key if isinstance(player_key, str) and player_key.startswith('player_'): player_id = int(player_key.replace('player_', '')) else: player_id = int(player_key) tf = posting_data if player_id not in doc_scores: doc_scores[player_id] = 0 # Get document frequency from barrel manager df = self.barrel_manager.get_term_df(term) if df > 0: import math idf = math.log(self.total_docs / df) doc_scores[player_id] += tf * idf else: doc_scores[player_id] += tf except (ValueError, TypeError, AttributeError) as e: continue # Phrase matching bonus original_query_terms = self.tokenize(query.lower()) if len(original_query_terms) > 1: query_lower = query.lower() for player_id in list(doc_scores.keys()): doc = self.documents.get(str(player_id)) if doc: doc_text = doc.get('text_content', '').lower() if query_lower in doc_text: doc_scores[player_id] *= 2.5 elif all(term in doc_text for term in original_query_terms): doc_scores[player_id] *= 1.5 # Sort and get results ranked_player_ids = sorted(doc_scores.keys(), key=lambda x: doc_scores[x], reverse=True) results = [] for player_id in ranked_player_ids[:limit]: doc = self.documents.get(str(player_id)) if doc: results.append(doc) return results except Exception as e: print(f"[WARNING] Error in text search: {e}") import traceback traceback.print_exc() return self.simple_search(query, limit) def simple_search(self, query, limit=50): """Fallback simple substring search""" query_lower = query.lower() results = [] for doc in self.documents.values(): text_content = doc.get('text_content', '').lower() if query_lower in text_content: results.append(doc) if len(results) >= limit: break return results class ScoutSearchEngine: def __init__(self, data_path=None, detailed_profiles_path=None): # Build absolute paths base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.data_path = data_path or os.path.join(base_dir, 'data', 'raw', 'players_22.csv') self.detailed_profiles_path = detailed_profiles_path or os.path.join(base_dir, 'data', 'processed', 'complete_player_profiles.json') self.df = None self.text_search_engine = TextSearchEngine() self.detailed_profiles = {} # Performance optimization: LRU cache for search results self._search_cache = {} self._cache_max_size = 100 # Pre-computed data for faster searches self._normalized_names = None self._position_masks = {} self.load_data() self.load_detailed_profiles() self._precompute_search_data() def load_data(self): """Load the FIFA 22 dataset""" # Only load the columns we actually need to save massive amounts of RAM on Render needed_cols = [ 'sofifa_id', 'short_name', 'long_name', 'player_positions', 'overall', 'potential', 'value_eur', 'wage_eur', 'age', 'height_cm', 'club_name', 'nationality_name', 'preferred_foot', 'pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic', 'player_face_url', 'club_logo_url', 'nation_flag_url', 'work_rate', 'skill_moves', 'weak_foot' ] try: # Check which columns actually exist to avoid KeyError import csv with open(self.data_path, 'r', encoding='utf-8', errors='ignore') as f: header = next(csv.reader(f)) actual_cols = [c for c in needed_cols if c in header] self.df = pd.read_csv(self.data_path, usecols=actual_cols, encoding='utf-8', low_memory=False) print(f"[OK] Dataset loaded: {len(self.df)} players, {len(self.df.columns)} columns") except UnicodeDecodeError: self.df = pd.read_csv(self.data_path, usecols=actual_cols, encoding='latin-1', low_memory=False) print(f"[OK] Dataset loaded with latin-1: {len(self.df)} players") except FileNotFoundError: print(f"[ERROR] File {self.data_path} not found!") return # Clean the data self.clean_data() def _precompute_search_data(self): """Pre-compute normalized names and position masks for faster searching""" if self.df is None or self.df.empty: return try: from text_processor import get_text_processor tp = get_text_processor() # Pre-compute normalized names (huge speedup for text search) self.df['_norm_long_name'] = self.df['long_name'].apply(lambda x: tp.normalize_text(str(x)) if pd.notna(x) else '') self.df['_norm_short_name'] = self.df['short_name'].apply(lambda x: tp.normalize_text(str(x)) if pd.notna(x) else '') self.df['_norm_first_name'] = self.df['long_name'].apply(lambda x: tp.normalize_text(str(x).split()[0]) if pd.notna(x) and str(x).strip() else '') self.df['_norm_last_name'] = self.df['long_name'].apply(lambda x: tp.normalize_text(str(x).split()[-1]) if pd.notna(x) and str(x).strip() else '') # Pre-compute lowercase club and nationality for faster matching self.df['_club_lower'] = self.df['club_name'].str.lower().fillna('') self.df['_nationality_lower'] = self.df['nationality_name'].str.lower().fillna('') print(f"[OK] Pre-computed search data for {len(self.df)} players") except Exception as e: print(f"[WARNING] Could not pre-compute search data: {e}") def clean_data(self): """Clean and prepare the data""" # Ensure numeric columns are properly formatted numeric_columns = ['overall', 'potential', 'age', 'pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic', 'value_eur', 'skill_moves', 'weak_foot'] for col in numeric_columns: if col in self.df.columns: self.df[col] = pd.to_numeric(self.df[col], errors='coerce').fillna(0) # Fill missing image URLs with empty string image_columns = ['player_face_url', 'nation_flag_url', 'club_logo_url'] for col in image_columns: if col in self.df.columns: self.df[col] = self.df[col].fillna('') # Fill missing text columns if 'preferred_foot' in self.df.columns: self.df['preferred_foot'] = self.df['preferred_foot'].fillna('Right') # Parse work_rate into attacking and defensive if 'work_rate' in self.df.columns and 'attacking_work_rate' not in self.df.columns: try: rates = self.df['work_rate'].str.split('/', expand=True) if len(rates.columns) == 2: self.df['attacking_work_rate'] = rates[0].str.strip() self.df['defensive_work_rate'] = rates[1].str.strip() else: self.df['attacking_work_rate'] = 'Medium' self.df['defensive_work_rate'] = 'Medium' except: self.df['attacking_work_rate'] = 'Medium' self.df['defensive_work_rate'] = 'Medium' def search_players(self, filters, sort_by='overall', ascending=False, limit=50): """Main search function for attribute-based search""" if self.df is None or self.df.empty: return pd.DataFrame() results = self.df.copy() results = self.apply_filters(results, filters) # If we have very few results, relax the filters slightly if len(results) < 5: print(f"[WARNING] Only {len(results)} players found with current filters. Consider relaxing search criteria.") # Prioritize main club matches if club filter is specified # For example: "barcelona" should prioritize "FC Barcelona" over "RCD Espanyol de Barcelona" if 'club' in filters and filters['club'] and len(results) > 0: club_filter = filters['club'].lower().strip() # Calculate priority score for each club # Highest priority: major club with common prefix (FC Barcelona, Real Madrid, etc.) # Medium priority: starts with search term (Barcelona SC) # Low priority: contains search term elsewhere (RCD Espanyol de Barcelona) def club_priority(club_name): club_lower = str(club_name).lower() # Exact match if club_lower == club_filter: return 4 # Common European club prefix + search term (FC Barcelona, Real Madrid, etc.) # This handles the major clubs correctly major_prefixes = ['fc ', 'real ', 'atletico '] for prefix in major_prefixes: if club_lower.startswith(prefix) and club_lower[len(prefix):].startswith(club_filter): return 3 # Starts with search term directly if club_lower.startswith(club_filter + ' ') or (club_lower.startswith(club_filter) and not any(club_lower.endswith(suffix) for suffix in [' de ' + club_filter])): return 2 # Less common prefix + search term (RCD, Athletic, Club, etc.) other_prefixes = ['rcd ', 'athletic ', 'club ', 'ca ', 'cd '] for prefix in other_prefixes: if club_lower.startswith(prefix) and club_lower[len(prefix):].startswith(club_filter): return 2 # Contains search term anywhere (e.g., "RCD Espanyol de Barcelona") if club_filter in club_lower: return 1 return 0 results['_club_priority'] = results['club_name'].apply(club_priority) # Sort by priority first, then by the specified sort column results = results.sort_values(by=['_club_priority', sort_by], ascending=[False, ascending]) results = results.drop('_club_priority', axis=1) else: # Sort the results normally if sort_by in results.columns: results = results.sort_values(by=sort_by, ascending=ascending) # Always try to return at least some results if the dataset has them if len(results) == 0: print("[ERROR] No players match the specified criteria") else: print(f"[OK] Found {len(results)} players matching criteria, returning top {min(limit, len(results))}") return results.head(limit) def search_players_text(self, query, limit=50): """Enhanced text-based search using FIFA dataset directly with intelligent matching""" if self.df is None or self.df.empty: return pd.DataFrame() # Check cache first for exact query match cache_key = f"{query.lower().strip()}_{limit}" if cache_key in self._search_cache: return self._search_cache[cache_key].copy() # Import text processor for normalization try: from text_processor import get_text_processor text_processor = get_text_processor() query_lower = text_processor.normalize_text(query) except: query_lower = query.lower().strip() query_words = query_lower.split() original_query_words = query_words.copy() # Keep original for parsing # ====================================================================== # PARSE COMPARISON OPERATORS FIRST (before number extraction) # ====================================================================== age_min, age_max = None, None ovr_min, ovr_max = None, None # Parse "between X and Y" patterns if 'between' in query_lower and 'and' in query_lower: between_idx = original_query_words.index('between') if 'between' in original_query_words else -1 if between_idx >= 0 and between_idx + 3 < len(original_query_words): try: val1 = int(original_query_words[between_idx + 1]) val2 = int(original_query_words[between_idx + 3]) # Skip "and" min_val, max_val = min(val1, val2), max(val1, val2) # Determine if it's age or ovr based on context context_words = original_query_words[:between_idx] if any(w in context_words for w in ['age', 'years', 'old']): # "between" is inclusive, but filter uses <, so add 1 to max age_min, age_max = min_val, max_val + 1 elif any(w in context_words for w in ['ovr', 'overall', 'rating', 'rated']): # "between" is inclusive, but filter uses <=, so keep as is ovr_min, ovr_max = min_val, max_val else: # Default to ovr if no context - "between" is inclusive ovr_min, ovr_max = min_val, max_val # Remove parsed words from working list query_words = [w for w in query_words if w not in ['between', str(val1), 'and', str(val2)]] except (ValueError, IndexError): pass # Parse "greater than", "more than", "above", "over" patterns comparison_operators = { 'greater': ('>', 1), 'more': ('>', 1), 'above': ('>', 1), 'over': ('>', 1), 'less': ('<', 0), 'fewer': ('<', 0), 'under': ('<', 0), 'below': ('<', 0) } for operator, (op_symbol, offset) in comparison_operators.items(): if operator in query_lower: op_idx = original_query_words.index(operator) if operator in original_query_words else -1 if op_idx >= 0: # Look for "than" after operator than_idx = op_idx + 1 if op_idx + 1 < len(original_query_words) and original_query_words[op_idx + 1] == 'than' else op_idx num_idx = than_idx + 1 if num_idx < len(original_query_words): try: value = int(original_query_words[num_idx]) # Determine if it's age or ovr based on context context_words = original_query_words[:op_idx] is_age = any(w in context_words or w in original_query_words for w in ['age', 'years', 'old']) is_ovr = any(w in context_words or w in original_query_words for w in ['ovr', 'overall', 'rating', 'rated']) if is_age: if op_symbol == '>': age_min = value + offset else: age_max = value + offset query_words = [w for w in query_words if w not in [operator, 'than', str(value)]] elif is_ovr: if op_symbol == '>': ovr_min = value + offset else: ovr_max = value + offset query_words = [w for w in query_words if w not in [operator, 'than', str(value)]] else: # Default: check if value looks like age (15-45) or rating (40-99) if 15 <= value <= 45: if op_symbol == '>': age_min = value + offset else: age_max = value + offset elif 40 <= value <= 99: if op_symbol == '>': ovr_min = value + offset else: ovr_max = value + offset query_words = [w for w in query_words if w not in [operator, 'than', str(value)]] break except (ValueError, IndexError): pass # ====================================================================== # NOW EXTRACT NUMBERS FOR RESULT LIMITS # ====================================================================== custom_limit = limit number_keywords = ['top', 'best', 'worst', 'first', 'last', 'lowest', 'highest', 'cheapest', 'fastest', 'slowest', 'tallest', 'shortest'] # Check for "keyword NUMBER" pattern (e.g., "top 10") for i, word in enumerate(query_words): if word in number_keywords and i + 1 < len(query_words): try: custom_limit = int(query_words[i + 1]) query_words = [w for w in query_words if w != str(custom_limit)] break except ValueError: pass # Check for "NUMBER keyword" pattern (e.g., "10 best") if custom_limit == limit: # Only if not already found for i, word in enumerate(query_words): if word.isdigit() and i + 1 < len(query_words): next_word = query_words[i + 1] if next_word in number_keywords or next_word in ['players', 'strikers', 'defenders', 'midfielders', 'goalkeepers', 'wingers', 'forwards']: try: custom_limit = int(word) query_words = [w for w in query_words if w != str(custom_limit)] break except ValueError: pass # Check for standalone numbers at start/end (e.g., "show me 15 young talents") if custom_limit == limit: # Only if not already found for word in query_words: if word.isdigit(): num = int(word) if 1 <= num <= 100: # Reasonable range for result count custom_limit = num query_words = [w for w in query_words if w != str(custom_limit)] break # Keyword categorization with synonym mapping quality_keywords = ['best', 'top', 'elite', 'world', 'class', 'great', 'good', 'worst', 'bad', 'poor', 'lowest', 'highest', 'cheap', 'expensive', 'valuable'] age_keywords = ['young', 'old', 'veteran', 'experienced', 'talent', 'promising'] attribute_keywords_list = ['fast', 'quick', 'speedy', 'pacey', 'strong', 'physical', 'shooter', 'finisher', 'clinical', 'passer', 'playmaker', 'creative', 'dribbler', 'skilled', 'technical', 'defensive', 'tackler', 'tall', 'short'] position_keywords_list = ['striker', 'forward', 'winger', 'midfielder', 'defender', 'goalkeeper', 'keeper', 'fullback', 'wingback', 'centre', 'center', 'attacking', 'defensive'] rating_patterns = ['rating', 'rated', 'overall', 'ovr'] # Synonym mapping for query normalization keyword_synonyms = { 'lowest': 'worst', 'cheapest': 'cheap', 'expensive': 'valuable', 'highest': 'best', 'fastest': 'fast', 'slowest': 'slow', 'tallest': 'tall', 'shortest': 'short' } # Normalize query with synonyms normalized_query = query_lower for synonym, target in keyword_synonyms.items(): if synonym in normalized_query: normalized_query = normalized_query.replace(synonym, target) if target not in query_lower: query_words.append(target) all_keywords = quality_keywords + age_keywords + attribute_keywords_list + position_keywords_list + number_keywords + rating_patterns non_keyword_words = [w for w in original_query_words if w not in all_keywords and not w.isdigit()] # Detect single name query is_single_name_query = len(non_keyword_words) == 1 and len(original_query_words) <= 3 # Create scoring dataframe results = self.df.copy() results['search_score'] = 0.0 # Position keywords mapping (expanded) position_keywords = { 'striker': ['ST', 'CF'], 'forward': ['ST', 'CF', 'LW', 'RW'], 'winger': ['LW', 'RW', 'LM', 'RM'], 'left winger': ['LW', 'LM'], 'right winger': ['RW', 'RM'], 'midfielder': ['CM', 'CDM', 'CAM', 'LM', 'RM'], 'central midfielder': ['CM'], 'defensive midfielder': ['CDM'], 'attacking midfielder': ['CAM'], 'defender': ['CB', 'LB', 'RB', 'LWB', 'RWB'], 'centre back': ['CB'], 'center back': ['CB'], 'fullback': ['LB', 'RB'], 'left back': ['LB'], 'right back': ['RB'], 'wingback': ['LWB', 'RWB'], 'goalkeeper': ['GK'], 'keeper': ['GK'] } # Attribute keywords mapping (expanded) attribute_keywords = { 'fast': ('pace', 85), 'quick': ('pace', 85), 'speedy': ('pace', 85), 'pacey': ('pace', 85), 'strong': ('physic', 80), 'physical': ('physic', 80), 'shooter': ('shooting', 80), 'finisher': ('shooting', 85), 'clinical': ('shooting', 85), 'passer': ('passing', 80), 'playmaker': ('passing', 85), 'creative': ('passing', 80), 'dribbler': ('dribbling', 80), 'skilled': ('dribbling', 85), 'technical': ('dribbling', 80), 'defensive': ('defending', 75), 'tackler': ('defending', 80) } # League/Competition keywords league_keywords = { 'premier league': ['England', 'English', 'Manchester', 'Liverpool', 'Chelsea', 'Arsenal', 'Tottenham'], 'la liga': ['Spain', 'Spanish', 'Real Madrid', 'Barcelona', 'Atletico'], 'serie a': ['Italy', 'Italian', 'Juventus', 'Milan', 'Inter', 'Roma', 'Napoli'], 'bundesliga': ['Germany', 'German', 'Bayern', 'Dortmund', 'Leipzig'], 'ligue 1': ['France', 'French', 'PSG', 'Paris', 'Lyon', 'Marseille'] } # Single name query - exact name matching (using pre-computed columns) if is_single_name_query: search_name = non_keyword_words[0] # Use pre-computed normalized names if available if '_norm_first_name' in results.columns: exact_match = (results['_norm_first_name'] == search_name) | (results['_norm_last_name'] == search_name) results.loc[exact_match, 'search_score'] += 200 partial_match = (results['_norm_first_name'].str.contains(search_name, na=False)) | (results['_norm_last_name'].str.contains(search_name, na=False)) results.loc[partial_match & ~exact_match, 'search_score'] += 100 short_match = results['_norm_short_name'].str.contains(search_name, na=False) results.loc[short_match, 'search_score'] += 50 else: # Fallback to runtime computation try: from text_processor import get_text_processor tp = get_text_processor() results['first_name'] = results['long_name'].apply(lambda x: tp.normalize_text(str(x).split()[0]) if pd.notna(x) else '') results['last_name'] = results['long_name'].apply(lambda x: tp.normalize_text(str(x).split()[-1]) if pd.notna(x) else '') results['norm_short_name'] = results['short_name'].apply(lambda x: tp.normalize_text(str(x)) if pd.notna(x) else '') except: results['first_name'] = results['long_name'].str.split().str[0].str.lower() results['last_name'] = results['long_name'].str.split().str[-1].str.lower() results['norm_short_name'] = results['short_name'].str.lower() exact_match = (results['first_name'] == search_name) | (results['last_name'] == search_name) results.loc[exact_match, 'search_score'] += 200 partial_match = (results['first_name'].str.contains(search_name, na=False)) | (results['last_name'].str.contains(search_name, na=False)) results.loc[partial_match & ~exact_match, 'search_score'] += 100 short_match = results['norm_short_name'].str.contains(search_name, na=False) results.loc[short_match, 'search_score'] += 50 results = results.drop(columns=['first_name', 'last_name', 'norm_short_name']) else: # Multi-word query - enhanced name matching (using pre-computed columns) if '_norm_long_name' not in results.columns: try: from text_processor import get_text_processor tp = get_text_processor() results['norm_long_name'] = results['long_name'].apply(lambda x: tp.normalize_text(str(x)) if pd.notna(x) else '') results['norm_short_name'] = results['short_name'].apply(lambda x: tp.normalize_text(str(x)) if pd.notna(x) else '') except: results['norm_long_name'] = results['long_name'].str.lower() results['norm_short_name'] = results['short_name'].str.lower() else: results['norm_long_name'] = results['_norm_long_name'] results['norm_short_name'] = results['_norm_short_name'] for word in query_words: if len(word) > 2 and word not in all_keywords and not word.isdigit(): # Check if word is in any part of normalized name name_parts = results['norm_long_name'].str.split() exact_name_part = name_parts.apply(lambda parts: word in parts if isinstance(parts, list) else False) results.loc[exact_name_part, 'search_score'] += 150 name_match = results['norm_long_name'].str.contains(word, na=False) results.loc[name_match & ~exact_name_part, 'search_score'] += 100 short_match = results['norm_short_name'].str.contains(word, na=False) results.loc[short_match, 'search_score'] += 80 results = results.drop(columns=['norm_long_name', 'norm_short_name']) # Nationality/Country matching (using pre-computed lowercase if available) nationality_col = '_nationality_lower' if '_nationality_lower' in results.columns else 'nationality_name' if nationality_col == 'nationality_name': nationality_lower = results['nationality_name'].str.lower() else: nationality_lower = results['_nationality_lower'] # Map nationality keywords to proper country names nationality_map = { 'brazilian': 'brazil', 'argentinian': 'argentina', 'french': 'france', 'spanish': 'spain', 'german': 'germany', 'english': 'england', 'italian': 'italy', 'portuguese': 'portugal', 'dutch': 'netherlands', 'belgian': 'belgium' } for word in query_words: # Check if word is a nationality adjective and map it search_word = nationality_map.get(word, word) if word not in all_keywords and not word.isdigit() and len(word) > 3: nationality_match = nationality_lower.str.contains(search_word, na=False) results.loc[nationality_match, 'search_score'] += 60 # Club matching (using pre-computed lowercase if available) club_col = '_club_lower' if '_club_lower' in results.columns else 'club_name' if club_col == 'club_name': club_lower = results['club_name'].str.lower() else: club_lower = results['_club_lower'] for word in query_words: if word not in all_keywords and not word.isdigit() and len(word) > 2: club_match = club_lower.str.contains(word, na=False) results.loc[club_match, 'search_score'] += 50 # League matching for league, keywords in league_keywords.items(): if league in query_lower: for keyword in keywords: club_match = results['club_name'].str.contains(keyword, case=False, na=False) results.loc[club_match, 'search_score'] += 40 # Position matching with strict filtering position_matched = False strict_position_filter = False for keyword, positions in position_keywords.items(): if keyword in query_lower: position_matched = True # Check if player has ANY of the positions for this keyword any_pos_match = pd.Series([False] * len(results), index=results.index) for pos in positions: pos_match = results['player_positions'].str.contains(pos, case=False, na=False) any_pos_match |= pos_match # Award points for matching the position group results.loc[any_pos_match, 'search_score'] += 80 # Apply penalty only ONCE if player doesn't match ANY position in the group if keyword in ['goalkeeper', 'keeper', 'striker', 'winger']: strict_position_filter = True results.loc[~any_pos_match, 'search_score'] -= 150 # Attribute matching with thresholds for keyword, (attr_col, threshold) in attribute_keywords.items(): if keyword in query_lower and attr_col in results.columns: excellent_attr = results[attr_col] >= threshold + 5 high_attr = (results[attr_col] >= threshold) & (results[attr_col] < threshold + 5) medium_attr = (results[attr_col] >= threshold - 10) & (results[attr_col] < threshold) results.loc[excellent_attr, 'search_score'] += 60 results.loc[high_attr, 'search_score'] += 40 results.loc[medium_attr, 'search_score'] += 20 # Quality modifiers (using normalized query) if any(word in normalized_query for word in ['best', 'top', 'elite', 'world class', 'great', 'highest']): results['search_score'] += (results['overall'] - 70) * 3.5 results.loc[results['overall'] >= 88, 'search_score'] += 80 results.loc[results['overall'] >= 85, 'search_score'] += 50 if any(word in normalized_query for word in ['worst', 'bad', 'poor']): results['search_score'] += (75 - results['overall']) * 2 results.loc[results['overall'] <= 65, 'search_score'] += 50 # Value-based modifiers if 'cheap' in normalized_query and 'value_eur' in results.columns: results.loc[results['value_eur'] <= 1000000, 'search_score'] += 60 results.loc[results['value_eur'] <= 500000, 'search_score'] += 40 if 'valuable' in normalized_query and 'value_eur' in results.columns: results.loc[results['value_eur'] >= 50000000, 'search_score'] += 60 results.loc[results['value_eur'] >= 100000000, 'search_score'] += 40 # Physical attribute modifiers if 'tall' in normalized_query and 'height_cm' in results.columns: results.loc[results['height_cm'] >= 190, 'search_score'] += 60 results.loc[results['height_cm'] >= 185, 'search_score'] += 30 if 'short' in normalized_query and 'height_cm' in results.columns: results.loc[results['height_cm'] <= 170, 'search_score'] += 60 results.loc[results['height_cm'] <= 175, 'search_score'] += 30 # Age-based filtering if 'young' in query_lower or 'talent' in query_lower or 'promising' in query_lower: young_talent = (results['age'] <= 23) & (results['potential'] >= 80) results.loc[young_talent, 'search_score'] += 70 results['pot_diff'] = results['potential'] - results['overall'] results.loc[results['pot_diff'] > 15, 'search_score'] += 50 results.loc[results['pot_diff'] > 10, 'search_score'] += 30 results = results.drop(columns=['pot_diff']) if 'old' in query_lower or 'veteran' in query_lower or 'experienced' in query_lower: veteran = results['age'] >= 32 results.loc[veteran, 'search_score'] += 50 # Apply age range filters if age_min is not None: results = results[results['age'] >= age_min] if age_max is not None: results = results[results['age'] < age_max] # Apply overall rating filters if ovr_min is not None: results = results[results['overall'] >= ovr_min] if ovr_max is not None: results = results[results['overall'] <= ovr_max] # Foot preference if 'left footed' in query_lower or 'left foot' in query_lower: results.loc[results['preferred_foot'] == 'Left', 'search_score'] += 40 if 'right footed' in query_lower or 'right foot' in query_lower: results.loc[results['preferred_foot'] == 'Right', 'search_score'] += 40 # Work rate matching if 'high attacking' in query_lower: results.loc[results['attacking_work_rate'] == 'High', 'search_score'] += 30 if 'high defensive' in query_lower: results.loc[results['defensive_work_rate'] == 'High', 'search_score'] += 30 # Filter by score threshold # Special case: if query only has filters (age/ovr) and no meaningful search terms, don't filter by score has_only_filters = (age_min is not None or age_max is not None or ovr_min is not None or ovr_max is not None) # Check if we have meaningful search terms (excluding common words like 'players', 'with', comparison operators, etc.) common_words = {'players', 'player', 'with', 'from', 'in', 'at', 'of', 'the', 'a', 'an', 'and', 'above', 'below', 'over', 'under', 'between', 'greater', 'less', 'than', 'more', 'fewer', 'higher', 'lower', 'good', 'bad', 'age', 'years', 'old', 'ovr', 'overall', 'rating', 'rated'} meaningful_search_terms = [w for w in original_query_words if w not in all_keywords and w not in common_words and not w.isdigit() and len(w) > 2] has_meaningful_search = len(meaningful_search_terms) > 0 if any(word in normalized_query for word in ['best', 'top', 'elite', 'highest']): if not position_matched and not has_meaningful_search: results = results.sort_values(by='overall', ascending=False) return results.head(custom_limit) else: if has_meaningful_search: results = results[results['search_score'] > -100] elif any(word in normalized_query for word in ['worst', 'poor', 'bad']): # For "worst/lowest" queries, sort by overall ascending if results['search_score'].max() > 0: results = results[results['search_score'] > 0] results = results.sort_values(by=['search_score', 'overall'], ascending=[False, True]) return results.head(custom_limit) else: # Only filter by score if there were meaningful search terms (names/clubs/nationalities) if has_meaningful_search: results = results[results['search_score'] > 0] elif has_only_filters: # If we have filters but no search terms, keep all results (filters already applied) pass else: # If no filters and no meaningful search terms, filter by score results = results[results['search_score'] > 0] # Fallback search if no results found if results.empty or len(results) == 0: # Try a broader text search combined_text = ( self.df['long_name'].fillna('') + ' ' + self.df['player_positions'].fillna('') + ' ' + self.df['nationality_name'].fillna('') + ' ' + self.df['club_name'].fillna('') ).str.lower() search_terms = [w for w in query_words if w not in all_keywords and not w.isdigit() and len(w) > 2] if search_terms: match_mask = combined_text.str.contains('|'.join(search_terms), na=False, regex=True) results = self.df[match_mask].copy() results['search_score'] = results['overall'] # If still no results, return top players as fallback if results.empty or len(results) == 0: results = self.df.copy() results['search_score'] = results['overall'] results = results.sort_values(by='overall', ascending=False).head(custom_limit) # Sort by score then overall rating results = results.sort_values(by=['search_score', 'overall'], ascending=[False, False]) final_results = results.head(custom_limit) # Store in cache (LRU-style: remove oldest if cache is full) if len(self._search_cache) >= self._cache_max_size: # Remove oldest entry oldest_key = next(iter(self._search_cache)) del self._search_cache[oldest_key] self._search_cache[cache_key] = final_results.copy() return final_results def apply_filters(self, df, filters): """Apply filters to dataframe""" filtered_df = df.copy() # Overall rating if 'overall_min' in filters: filtered_df = filtered_df[filtered_df['overall'] >= filters['overall_min']] if 'overall_max' in filters: filtered_df = filtered_df[filtered_df['overall'] <= filters['overall_max']] # Potential if 'potential_min' in filters: filtered_df = filtered_df[filtered_df['potential'] >= filters['potential_min']] if 'potential_max' in filters: filtered_df = filtered_df[filtered_df['potential'] <= filters['potential_max']] # Age if 'age_min' in filters: filtered_df = filtered_df[filtered_df['age'] >= filters['age_min']] if 'age_max' in filters: filtered_df = filtered_df[filtered_df['age'] <= filters['age_max']] # Position if 'position' in filters and filters['position']: filtered_df = filtered_df[filtered_df['player_positions'].str.contains(filters['position'], case=False, na=False)] # Attributes attribute_mapping = { 'pace': 'pace', 'shooting': 'shooting', 'passing': 'passing', 'dribbling': 'dribbling', 'defending': 'defending', 'physicality': 'physic' } for attr_key, data_col in attribute_mapping.items(): min_key = f"{attr_key}_min" max_key = f"{attr_key}_max" if min_key in filters and data_col in filtered_df.columns: filtered_df = filtered_df[filtered_df[data_col] >= filters[min_key]] if max_key in filters and data_col in filtered_df.columns: filtered_df = filtered_df[filtered_df[data_col] <= filters[max_key]] # Text filters with improved matching if 'nationality' in filters and filters['nationality']: nat_filter = filters['nationality'].strip() if nat_filter: filtered_df = filtered_df[filtered_df['nationality_name'].str.contains(nat_filter, case=False, na=False)] if 'club' in filters and filters['club']: club_filter = filters['club'].strip() if club_filter: # Club aliases for common abbreviations and variations club_aliases = { 'psg': 'paris saint-germain', 'barca': 'barcelona', 'real': 'real madrid', 'atletico': 'atl', # Matches "Atltico" (partial match) 'munchen': 'bayern', # Mnchen alternative 'munich': 'bayern', 'man utd': 'manchester united', 'man city': 'manchester city', 'juve': 'juventus', 'spurs': 'tottenham', 'arsenal': 'arsenal', 'inter': 'inter', 'ac milan': 'ac milan', } # Check if the filter is an alias club_search = club_aliases.get(club_filter.lower(), club_filter) # Also try to normalize accented characters for matching # Create a normalized version of club names for matching try: import unicodedata def normalize_text(text): if pd.isna(text): return '' # Normalize unicode and remove accents normalized = unicodedata.normalize('NFD', str(text)) return ''.join(c for c in normalized if unicodedata.category(c) != 'Mn').lower() # Try exact match first match = filtered_df['club_name'].str.contains(club_search, case=False, na=False) # If no matches, try normalized search if match.sum() == 0: normalized_clubs = filtered_df['club_name'].apply(normalize_text) normalized_search = normalize_text(club_search) match = normalized_clubs.str.contains(normalized_search, na=False) filtered_df = filtered_df[match] except: filtered_df = filtered_df[filtered_df['club_name'].str.contains(club_search, case=False, na=False)] return filtered_df def get_alternative_image_url(self, player_row): """Get player image from alternative source using SoFIFA ID""" sofifa_id = player_row.get('sofifa_id', '') if sofifa_id: # Format: https://cdn.sofifa.net/players/158/023/22_120.png sofifa_str = str(int(sofifa_id)) if len(sofifa_str) >= 6: part1 = sofifa_str[:-3] part2 = sofifa_str return f"https://cdn.sofifa.net/players/{part1}/{part2}/22_120.png" return "" def fix_image_url(self, url): """Fix common image URL issues""" if not url: return "" # Fix relative URLs if url.startswith('//'): return 'https:' + url elif url.startswith('/'): return 'https://cdn.sofifa.net' + url elif not url.startswith('http'): return 'https://cdn.sofifa.net' + url return url def get_player_card_data(self, player_row): """Extract player data for frontend display""" def safe_int(value, default=0): """Safely convert value to int, handling NaN and None""" try: if pd.isna(value): return default return int(value) except (ValueError, TypeError): return default def safe_str(value, default=''): """Safely convert value to string, handling NaN and None""" try: if pd.isna(value): return default return str(value) except (ValueError, TypeError): return default # Get original URL and fix it original_url = safe_str(player_row.get('player_face_url', '')) fixed_original_url = self.fix_image_url(original_url) # Get alternative URL alternative_url = self.get_alternative_image_url(player_row) # Choose the best available URL photo_url = fixed_original_url if fixed_original_url else alternative_url # Get nation flag URL and club logo URL from dataset nation_flag_url = self.fix_image_url(safe_str(player_row.get('nation_flag_url', ''))) club_logo_url = self.fix_image_url(safe_str(player_row.get('club_logo_url', ''))) return { 'id': safe_str(player_row.get('sofifa_id', player_row.get('player_id', ''))), 'name': safe_str(player_row.get('long_name', ''), 'Unknown'), 'short_name': safe_str(player_row.get('short_name', ''), 'Unknown'), 'overall': safe_int(player_row.get('overall', 0)), 'potential': safe_int(player_row.get('potential', 0)), 'position': safe_str(player_row.get('player_positions', ''), 'SUB'), 'age': safe_int(player_row.get('age', 0)), 'club': safe_str(player_row.get('club_name', ''), 'Free Agent'), 'nationality': safe_str(player_row.get('nationality_name', ''), 'Unknown'), 'photo_url': photo_url, 'nation_flag_url': nation_flag_url, 'club_logo_url': club_logo_url, 'preferred_foot': safe_str(player_row.get('preferred_foot', ''), 'Right'), 'skill_moves': safe_int(player_row.get('skill_moves', 0)), 'weak_foot': safe_int(player_row.get('weak_foot', 0)), 'attributes': { 'pace': safe_int(player_row.get('pace', 0)), 'shooting': safe_int(player_row.get('shooting', 0)), 'passing': safe_int(player_row.get('passing', 0)), 'dribbling': safe_int(player_row.get('dribbling', 0)), 'defending': safe_int(player_row.get('defending', 0)), 'physicality': safe_int(player_row.get('physic', 0)) }, 'value_eur': safe_int(player_row.get('value_eur', 0)), 'wage_eur': safe_int(player_row.get('wage_eur', 0)) } def load_detailed_profiles(self): """Load optional detailed profiles (if available) and cache them.""" try: if os.path.exists(self.detailed_profiles_path): with open(self.detailed_profiles_path, 'r', encoding='utf-8') as f: data = json.load(f) # Normalize to mapping by player_id (Transfermarkt ID) if isinstance(data, dict): # Keys are already player_id strings self.detailed_profiles = {str(k): v for k, v in data.items()} elif isinstance(data, list): # If it's a list of profiles, index by player_id mapping = {} for item in data: pid = None if isinstance(item, dict): if 'player_id' in item: pid = item.get('player_id') elif 'id' in item: pid = item.get('id') if pid is not None: mapping[str(pid)] = item self.detailed_profiles = mapping print(f"[OK] Loaded detailed profiles: {len(self.detailed_profiles)} entries") else: self.detailed_profiles = {} except Exception as e: print(f"[ERROR] Error loading detailed profiles: {e}") self.detailed_profiles = {} # Initialize the search engine search_engine = ScoutSearchEngine() # Initialize optimized search engine (with pre-built indices) optimized_search = None try: optimized_search = OptimizedSearchEngine(search_engine.df) except Exception as e: print(f"[WARNING] Optimized search not available: {e}") @app.route('/') def serve_frontend(): """Serve the main frontend page""" base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) return send_from_directory(base_dir, 'index.html') @app.route('/') def serve_static(filename): """Serve static files directly from root (like 7070065.jpg, etc.)""" base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) return send_from_directory(base_dir, filename) # NEW: Text Search Endpoint @app.route('/api/text-search', methods=['POST', 'OPTIONS']) def text_search_players(): """API endpoint for text-based player search with semantic expansion""" if request.method == 'OPTIONS': return '', 200 start_time = time.time() query = "" success = True result_count = 0 try: data = request.get_json() query = data.get('query', '') limit = data.get('limit', 50) use_semantic = data.get('semantic', True) # Enable by default if not query: return jsonify({ 'success': False, 'error': 'Query parameter is required' }), 400 print(f" Text search query: '{query}' (semantic={use_semantic})") # Apply semantic expansion if enabled expanded_query = query if use_semantic: try: from semantic_search import semantic_engine expanded_terms = semantic_engine.expand_query(query, max_expansions=2) expanded_query = ' '.join(expanded_terms) print(f" Expanded query: '{expanded_query}'") except Exception as e: print(f"[WARNING] Semantic expansion failed: {e}") # Perform text search with expanded query results_df = search_engine.search_players_text(expanded_query, limit=limit) result_count = len(results_df) print(f" Found {len(results_df)} results") # Convert to frontend format players_data = [] for _, player in results_df.iterrows(): players_data.append(search_engine.get_player_card_data(player)) # Sanitize for JSON serialization players_data = sanitize_for_json(players_data) return jsonify({ 'success': True, 'players': players_data, 'count': len(players_data), 'query': query, 'expanded_query': expanded_query if use_semantic else None }) except Exception as e: success = False print(f"[ERROR] Text search error: {e}") import traceback traceback.print_exc() return jsonify({ 'success': False, 'error': str(e) }), 500 finally: # Track performance response_time = (time.time() - start_time) * 1000 # Convert to ms word_count = len(query.split()) if query else 0 query_type = 'text_search_single' if word_count == 1 else f'text_search_{word_count}word' track_query(query, response_time, result_count, query_type, success) # Your existing endpoints @app.route('/api/search', methods=['POST', 'OPTIONS']) def search_players(): """API endpoint for attribute-based player search""" if request.method == 'OPTIONS': return '', 200 start_time = time.time() success = True result_count = 0 try: data = request.get_json() # Extract filters from request filters = { 'overall_min': data.get('overallMin', 0), 'overall_max': data.get('overallMax', 99), 'potential_min': data.get('potentialMin', 0), 'potential_max': data.get('potentialMax', 99), 'position': data.get('position', ''), 'pace_min': data.get('paceMin', 0), 'pace_max': data.get('paceMax', 99), 'shooting_min': data.get('shootingMin', 0), 'shooting_max': data.get('shootingMax', 99), 'passing_min': data.get('passingMin', 0), 'passing_max': data.get('passingMax', 99), 'dribbling_min': data.get('dribblingMin', 0), 'dribbling_max': data.get('dribblingMax', 99), 'nationality': data.get('nationality', ''), 'club': data.get('club', '') } # Debug logging for attribute search if filters.get('club') and filters.get('position'): print(f"[DEBUG ATTR] Club='{filters['club']}' Position='{filters['position']}'") # Only apply age filters if explicitly provided if 'ageMin' in data and data['ageMin'] is not None: filters['age_min'] = data['ageMin'] if 'ageMax' in data and data['ageMax'] is not None: filters['age_max'] = data['ageMax'] sort_by = data.get('sortBy', 'overall') limit = data.get('limit', 50) # Get query if provided (for name filtering) query = data.get('query', '').strip() # USE OPTIMIZED SEARCH if available and query is provided if optimized_search and query: opt_filters = { 'overallMin': filters.get('overall_min', 0), 'overallMax': filters.get('overall_max', 99), 'ageMin': filters.get('age_min', None), 'ageMax': filters.get('age_max', None), 'position': filters.get('position', ''), 'paceMin': filters.get('pace_min', 0), 'shootingMin': filters.get('shooting_min', 0) } results = optimized_search.search(query, opt_filters, limit) result_count = len(results) # Convert optimized results to frontend format (matching get_player_card_data) players_data = [] for player in results: card_data = { 'id': '', # Optimized search doesn't have sofifa_id in preprocessed data 'name': player['long_name'], 'short_name': player['short_name'], 'overall': int(player['overall']), 'potential': int(player['potential']), 'position': player['player_positions'], 'age': int(player['age']), 'club': player['club_name'], 'nationality': player['nationality_name'], 'photo_url': '', # Not in optimized index 'nation_flag_url': '', 'club_logo_url': '', 'preferred_foot': 'Right', 'skill_moves': 0, 'weak_foot': 0, 'attributes': { 'pace': int(player['pace']), 'shooting': int(player['shooting']), 'passing': int(player['passing']), 'dribbling': int(player['dribbling']), 'defending': int(player['defending']), 'physicality': int(player['physic']) }, 'value_eur': float(player['value_eur']), 'wage_eur': float(player['wage_eur']) } players_data.append(card_data) else: # Fallback to original search results_df = search_engine.search_players(filters, sort_by=sort_by, limit=limit) result_count = len(results_df) # Debug logging: show what clubs are in the results if filters.get('club') and filters.get('position') and len(results_df) > 0: clubs_in_results = results_df['club_name'].value_counts() print(f"[DEBUG ATTR] Results by club: {dict(clubs_in_results.head(5))}") barcelona_only = results_df[results_df['club_name'].str.contains('FC Barcelona', case=False, na=False)] print(f"[DEBUG ATTR] FC Barcelona players: {len(barcelona_only)}") if len(barcelona_only) > 0: print(f"[DEBUG ATTR] FC Barcelona strikers: {list(barcelona_only['short_name'].head(5))}") # Convert to frontend format players_data = [] for _, player in results_df.iterrows(): players_data.append(search_engine.get_player_card_data(player)) # Sanitize for JSON serialization players_data = sanitize_for_json(players_data) return jsonify({ 'success': True, 'players': players_data, 'count': len(players_data), 'message': f'Found {len(players_data)} players' if len(players_data) > 0 else 'No players found matching your criteria. Try adjusting the filters.' }) except Exception as e: success = False return jsonify({ 'success': False, 'error': str(e) }), 500 finally: # Track performance response_time = (time.time() - start_time) * 1000 # Convert to ms track_query('attribute_search', response_time, result_count, 'attribute_search', success) @app.route('/api/test-search', methods=['GET']) def test_search(): """Test endpoint to verify search logic works""" try: # Test with overall <= 76 test_filters = { 'overall_min': 0, 'overall_max': 76 } print(f" Testing with filters: {test_filters}") results_df = search_engine.search_players(filters=test_filters, limit=20) print(f" Got {len(results_df)} results") players_list = [] for _, player in results_df.iterrows(): players_list.append({ 'name': player.get('short_name', 'Unknown'), 'overall': int(player.get('overall', 0)), 'age': int(player.get('age', 0)), 'club': player.get('club_name', 'Unknown') }) return jsonify({ 'success': True, 'total_results': len(results_df), 'filters_used': test_filters, 'players': players_list, 'dataset_size': len(search_engine.df) }) except Exception as e: import traceback return jsonify({ 'success': False, 'error': str(e), 'traceback': traceback.format_exc() }), 500 @app.route('/api/players/top', methods=['GET', 'OPTIONS']) def get_top_players(): """Get top players for initial display""" if request.method == 'OPTIONS': return '', 200 try: filters = {'overall_min': 80} results_df = search_engine.search_players(filters, limit=20) players_data = [] for _, player in results_df.iterrows(): players_data.append(search_engine.get_player_card_data(player)) # Sanitize for JSON serialization players_data = sanitize_for_json(players_data) return jsonify({ 'success': True, 'players': players_data }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/debug/images', methods=['GET']) def debug_images(): """Debug endpoint to check image URLs""" try: # Get top 5 players with their image URLs filters = {'overall_min': 85} results_df = search_engine.search_players(filters, limit=10) debug_info = [] for _, player in results_df.iterrows(): player_data = search_engine.get_player_card_data(player) debug_info.append({ 'name': player.get('long_name', ''), 'sofifa_id': player.get('sofifa_id', ''), 'original_url': player.get('player_face_url', ''), 'final_url': player_data['photo_url'], 'has_original': bool(player.get('player_face_url', '')), 'alternative_url': search_engine.get_alternative_image_url(player) }) return jsonify({ 'success': True, 'debug_info': debug_info, 'total_players_checked': len(debug_info) }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/debug/columns', methods=['GET']) def debug_columns(): """Debug endpoint to check available columns""" try: if search_engine.df is None: return jsonify({'success': False, 'error': 'Data not loaded'}) # Get all columns all_columns = list(search_engine.df.columns) # Get image-related columns image_columns = [col for col in all_columns if any(keyword in col.lower() for keyword in ['url', 'logo', 'flag', 'badge', 'face', 'image'])] # Get first player sample to see actual data sample_player = search_engine.df.iloc[0] if len(search_engine.df) > 0 else {} sample_data = {} for col in image_columns: if col in sample_player and pd.notna(sample_player[col]): sample_data[col] = sample_player[col] return jsonify({ 'success': True, 'total_columns': len(all_columns), 'image_columns': image_columns, 'sample_image_data': sample_data, 'first_5_columns': all_columns[:5] }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/debug/test-player', methods=['GET']) def debug_test_player(): """Test endpoint to check one player's data""" try: if search_engine.df is None or search_engine.df.empty: return jsonify({'success': False, 'error': 'Data not loaded'}) # Get first player player = search_engine.df.iloc[0] player_data = search_engine.get_player_card_data(player) return jsonify({ 'success': True, 'player': player_data, 'has_nation_flag': bool(player_data['nation_flag_url']), 'has_club_logo': bool(player_data['club_logo_url']), 'has_skill_moves': 'skill_moves' in player_data, 'has_preferred_foot': 'preferred_foot' in player_data }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/player/', methods=['GET']) def get_player_details(player_id): """Get detailed information for a specific player""" try: player_df = search_engine.df[search_engine.df['sofifa_id'] == player_id] if player_df.empty: return jsonify({ 'success': False, 'error': 'Player not found' }), 404 player = player_df.iloc[0] player_data = search_engine.get_player_card_data(player) # Attempt to attach richer detailed profile if available detailed = {} try: # Map sofifa_id to transfermarkt player_id, then lookup detailed profile if hasattr(search_engine, 'detailed_profiles') and hasattr(search_engine.text_search_engine, 'player_mapping'): tm_player_id = search_engine.text_search_engine.player_mapping.get(str(player_id)) if tm_player_id: detailed = search_engine.detailed_profiles.get(str(tm_player_id), {}) except Exception as e: print(f"Warning: Could not load detailed profile for {player_id}: {e}") detailed = {} # If detailed exists, attach under `details` to avoid colliding with core fields if detailed and isinstance(detailed, dict): player_data_enriched = dict(player_data) player_data_enriched['details'] = detailed else: player_data_enriched = player_data # Sanitize numpy/pandas types for JSON player_data_enriched = sanitize_for_json(player_data_enriched) return jsonify({ 'success': True, 'player': player_data_enriched }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/filters/options', methods=['GET']) def get_filter_options(): """Get available options for filters (positions, nationalities, clubs)""" try: if search_engine.df is None: return jsonify({'success': False, 'error': 'Data not loaded'}) # Get unique positions all_positions = [] for positions in search_engine.df['player_positions'].dropna(): if isinstance(positions, str): all_positions.extend([pos.strip() for pos in positions.split(',')]) unique_positions = sorted(list(set(all_positions))) # Get unique nationalities (top 50) nationalities = search_engine.df['nationality_name'].dropna().unique() top_nationalities = sorted(nationalities)[:50] # Get unique clubs (top 50) clubs = search_engine.df['club_name'].dropna().unique() top_clubs = sorted(clubs)[:50] return jsonify({ 'success': True, 'positions': unique_positions, 'nationalities': top_nationalities.tolist(), 'clubs': top_clubs.tolist() }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/stats', methods=['GET']) def get_system_stats(): """Get system statistics""" try: if search_engine.df is None: return jsonify({'success': False, 'error': 'Data not loaded'}) total_players = len(search_engine.df) players_with_images = search_engine.df['player_face_url'].notna().sum() avg_overall = search_engine.df['overall'].mean() avg_age = search_engine.df['age'].mean() # Top 5 nationalities top_nationalities = search_engine.df['nationality_name'].value_counts().head(5).to_dict() # Top 5 clubs top_clubs = search_engine.df['club_name'].value_counts().head(5).to_dict() return jsonify({ 'success': True, 'stats': { 'total_players': total_players, 'players_with_images': int(players_with_images), 'image_coverage': f"{(players_with_images / total_players * 100):.1f}%", 'average_rating': f"{avg_overall:.1f}", 'average_age': f"{avg_age:.1f}", 'top_nationalities': top_nationalities, 'top_clubs': top_clubs } }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/image-proxy') def image_proxy(): """Proxy images to avoid CORS and CSP issues""" try: image_url = request.args.get('url') if not image_url: return jsonify({'error': 'No URL provided'}), 400 # Fix the URL if needed if image_url.startswith('//'): image_url = 'https:' + image_url elif image_url.startswith('/'): image_url = 'https://cdn.sofifa.net' + image_url # Fetch the image headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(image_url, headers=headers, timeout=10) response.raise_for_status() # Return the image with proper headers return Response( response.content, content_type=response.headers.get('Content-Type', 'image/jpeg'), headers={ 'Cache-Control': 'public, max-age=86400', 'Access-Control-Allow-Origin': '*' } ) except Exception as e: print(f"Image proxy error for {image_url}: {e}") # Return a transparent pixel as fallback from io import BytesIO try: from PIL import Image img = Image.new('RGBA', (1, 1), (0, 0, 0, 0)) img_io = BytesIO() img.save(img_io, 'PNG') img_io.seek(0) return Response(img_io.getvalue(), content_type='image/png') except ImportError: # If PIL is not available, return empty response return Response(b'', content_type='image/png') # NEW ENDPOINTS FOR REQUIREMENTS @app.route('/api/autocomplete', methods=['GET', 'OPTIONS']) def autocomplete_suggestions(): """Get autocomplete suggestions for query prefix""" if request.method == 'OPTIONS': return '', 204 try: from autocomplete import autocomplete_engine prefix = request.args.get('q', '').strip() limit = int(request.args.get('limit', 5)) if not autocomplete_engine: return jsonify({'suggestions': []}) suggestions = autocomplete_engine.get_smart_suggestions(prefix, limit=limit) return jsonify({ 'suggestions': suggestions, 'prefix': prefix }) except Exception as e: print(f"Autocomplete error: {e}") return jsonify({'suggestions': [], 'error': str(e)}), 500 @app.route('/api/performance', methods=['GET']) def get_performance_metrics(): """Get performance metrics and requirement compliance""" try: stats = performance_monitor.get_statistics() return jsonify({ 'success': True, 'metrics': stats, 'report': performance_monitor.get_performance_report().split('\n') }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/barrel/stats', methods=['GET']) def get_barrel_stats(): """Get barrel manager statistics""" try: text_engine = search_engine.text_search_engine barrel_stats = text_engine.barrel_manager.get_statistics() return jsonify({ 'success': True, 'barrel_stats': barrel_stats }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/semantic/expand', methods=['POST']) def expand_query_semantic(): """Expand query with semantic synonyms""" try: data = request.get_json() query = data.get('query', '') max_expansions = data.get('max_expansions', 3) from semantic_search import semantic_engine if not semantic_engine: return jsonify({'success': False, 'error': 'Semantic engine not initialized'}) expanded = semantic_engine.expand_query(query, max_expansions=max_expansions) return jsonify({ 'success': True, 'original_query': query, 'expanded_terms': expanded }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/player/add', methods=['POST']) def add_new_player(): """ Add a new player dynamically with full indexing (REQUIREMENT #10) Updates lexicon, forward index, inverted index, and barrels Makes player immediately searchable without blocking existing searches """ start_time = time.time() try: data = request.get_json() # Validate required fields required_fields = ['short_name', 'overall', 'age', 'nationality_name'] missing = [f for f in required_fields if f not in data] if missing: return jsonify({ 'success': False, 'error': f'Missing required fields: {", ".join(missing)}' }), 400 # Generate unique player ID new_player_id = len(search_engine.df) + 1 data['player_id'] = new_player_id # Set defaults for missing optional fields defaults = { 'long_name': data.get('short_name', 'Unknown'), 'player_positions': data.get('player_positions', 'SUB'), 'club_name': data.get('club_name', 'Free Agent'), 'league_name': data.get('league_name', 'Unknown'), 'potential': data.get('potential', data.get('overall', 70)), 'value_eur': data.get('value_eur', 100000), 'wage_eur': data.get('wage_eur', 1000), 'preferred_foot': data.get('preferred_foot', 'Right'), 'weak_foot': data.get('weak_foot', 3), 'skill_moves': data.get('skill_moves', 3), 'work_rate': data.get('work_rate', 'Medium/Medium'), 'body_type': data.get('body_type', 'Normal'), 'pace': data.get('pace', 70), 'shooting': data.get('shooting', 70), 'passing': data.get('passing', 70), 'dribbling': data.get('dribbling', 70), 'defending': data.get('defending', 70), 'physic': data.get('physic', 70), 'player_face_url': data.get('player_face_url', ''), 'club_logo_url': data.get('club_logo_url', ''), 'nation_flag_url': data.get('nation_flag_url', '') } # Apply defaults for key, value in defaults.items(): if key not in data: data[key] = value # Add player to FIFA dataset (in-memory) new_row = pd.DataFrame([data]) search_engine.df = pd.concat([search_engine.df, new_row], ignore_index=True) # Update optimized search engine if available if optimized_search is not None: try: # Add to name index name_tokens = data['short_name'].lower().split() for token in name_tokens: if token not in optimized_search.name_index: optimized_search.name_index[token] = set() optimized_search.name_index[token].add(new_player_id) # Add to club index club = data.get('club_name', 'Free Agent') if club not in optimized_search.club_index: optimized_search.club_index[club] = set() optimized_search.club_index[club].add(new_player_id) # Add to nationality index nationality = data.get('nationality_name', 'Unknown') if nationality not in optimized_search.nationality_index: optimized_search.nationality_index[nationality] = set() optimized_search.nationality_index[nationality].add(new_player_id) except Exception as idx_error: print(f"[WARNING] Could not update optimized search: {idx_error}") # Create text content for indexing text_content = f"{data['short_name']} {data.get('long_name', '')} {data.get('player_positions', '')} {data.get('nationality_name', '')} {data.get('club_name', '')} {data.get('league_name', '')}".lower() # Check if dynamic_indexer is available if dynamic_indexer is None: return jsonify({ 'success': False, 'error': 'Dynamic indexer not initialized. Server may need restart.' }), 500 # Index the document using DynamicIndexer doc_id, success, message = dynamic_indexer.add_document( doc_content=text_content, doc_metadata={ 'player_id': new_player_id, 'type': 'player', 'source': 'user_added' } ) if not success: return jsonify({ 'success': False, 'error': f'Indexing failed: {message}' }), 500 # Reload barrel manager to include new terms try: from barrel_manager import barrel_manager barrel_manager.reload_mappings() except: pass # Not critical elapsed = (time.time() - start_time) * 1000 return jsonify({ 'success': True, 'message': f'Player "{data["short_name"]}" added and indexed successfully', 'player_id': new_player_id, 'doc_id': doc_id, 'indexing_time_ms': round(elapsed, 2), 'total_players': len(search_engine.df), 'index_stats': dynamic_indexer.get_stats() }) except Exception as e: import traceback return jsonify({ 'success': False, 'error': str(e), 'traceback': traceback.format_exc() }), 500 def init_advanced_components(): base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 0. Extract compressed dataset if running on cloud zip_path = os.path.join(base_dir, 'data', 'scoutsearch_data.zip') data_dir = os.path.join(base_dir, 'data') if os.path.exists(zip_path): print(f"[STARTUP] Found dataset payload {zip_path}, unzipping...") try: import zipfile with zipfile.ZipFile(zip_path, 'r') as zipf: # We changed the zip structure to extract straight into 'data' rather than 'data/index' zipf.extractall(data_dir) print("[STARTUP] Dataset extracted successfully.") except Exception as e: print(f"[ERROR] Failed to extract dataset payload: {e}") print("\n Initializing components...") # 1. Autocomplete System try: lexicon_path = os.path.join(base_dir, 'data', 'index', 'lexicon_complete.json') if os.path.exists(lexicon_path): from autocomplete import autocomplete_engine initialize_autocomplete(lexicon_path) print("[OK] Autocomplete engine initialized") else: print("[WARNING] Lexicon not found, autocomplete disabled") except Exception as e: print(f"[WARNING] Autocomplete initialization failed: {e}") # 2. Semantic Search try: from semantic_search import semantic_engine initialize_semantic_search() # Loads Word2Vec or custom synonyms print("[OK] Semantic search initialized with Word2Vec embeddings") except Exception as e: print(f"[WARNING] Semantic search initialization failed: {e}") # 3. Dynamic Indexer try: index_dir_path = os.path.join(base_dir, 'data', 'index') # Use a local reference to avoid global keyword issue at module level _di = DynamicIndexer(data_dir=index_dir_path) # Update the module-level variable via globals() globals()['dynamic_indexer'] = _di print(f"[OK] Dynamic indexer initialized ({_di.get_stats()['total_terms']:,} terms)") except Exception as e: print(f"[WARNING] Dynamic indexer initialization failed: {e}") # Run initialization automatically for WSGI environments (like gunicorn) init_advanced_components() if __name__ == '__main__': # Check if required files exist base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) required_files = { os.path.join(base_dir, 'data', 'raw', 'players_22.csv'): 'FIFA 22 dataset', os.path.join(base_dir, 'data', 'raw', 'search_engine_dataset.jsonl'): 'Transfermarkt text dataset', os.path.join(base_dir, 'data', 'raw', 'player_mapping_enhanced.json'): 'Player mapping' } missing_files = [] for file, description in required_files.items(): if not os.path.exists(file): missing_files.append(f"{file} ({description})") if missing_files: print("[ERROR] Missing required files:") for file in missing_files: print(f" - {file}") else: print("=" * 60) print("[STARTUP] STARTING ENHANCED SCOUTSEARCH SERVER") print("=" * 60) # 4. Performance Monitor (already initialized globally) print("[OK] Performance monitor active") print("\n Components loaded:") print(" - FIFA 22 dataset") print(" - Text search engine with barrel system") print(" - Autocomplete with Trie") print(" - Semantic search") print(" - Performance monitoring") print("\n Server running at: http://localhost:5000") print("=" * 60) print("\n Available endpoints:") print(" GET / - Main frontend") print(" POST /api/search - Attribute search") print(" POST /api/text-search - Text search with barrels & semantic") print(" POST /api/player/add - Add new player (DYNAMIC INDEXING)") print(" GET /api/players/top - Get top players") print(" GET /api/autocomplete - Autocomplete suggestions") print(" POST /api/semantic/expand - Semantic query expansion") print(" GET /api/performance - System performance stats") print(" GET /api/stats - System statistics") print(" GET /api/player/ - Get player details") print(" GET /api/debug/* - Debug endpoints") print("") port = int(os.environ.get('PORT', 8000)) app.run(debug=False, host='0.0.0.0', port=port)