scoutsearch / Backend /src /search_engine.py
Ali00922's picture
Upload 37 files
da6a0a4 verified
# search_engine_barrels.py
# BARREL-OPTIMIZED SEARCH ENGINE - Loads only required barrels per query
import csv
import json
import math
import os
import re
import time
from collections import defaultdict
# ---------- CONFIG & PATHS ----------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
INDEX_DIR = os.path.join(PROJECT_ROOT, "data", "index")
BARREL_DIR = os.path.join(INDEX_DIR, "barrels")
LEXICON_PATH = os.path.join(INDEX_DIR, "lexicon_complete.json")
FORWARD_INDEX_PATH = os.path.join(INDEX_DIR, "forward_index_termid.json")
TERM_TO_BARREL_MAP_PATH = os.path.join(BARREL_DIR, "term_to_barrel_map.json")
MARKET_VALUE_PATH = os.path.join(os.path.dirname(__file__), "..", "data", "raw", "player_latest_market_value", "player_latest_market_value.csv")
PROFILE_DATA_PATH = os.path.join(os.path.dirname(__file__), "..", "data", "processed", "complete_player_profiles.json")
# BM25 parameters
K1 = 1.2
B = 0.75
# Scoring boosts
NAME_TOKEN_WEIGHT = 0.75
NAME_PREFIX_BONUS = 1.25
EXACT_NAME_BONUS = 3.0
RAW_SUBSTRING_BONUS = 0.25
MARKET_VALUE_WEIGHT = 12.0
PROFILE_LENGTH_WEIGHT = 4.0
NON_NAME_MATCH_PENALTY = 1.5
# ---------- TEXT NORMALIZATION ----------
COMPREHENSIVE_STOP_WORDS = {
"the", "and", "in", "for", "with", "on", "at", "from", "by", "as", "is", "was",
"are", "were", "be", "been", "have", "has", "had", "to", "of", "a", "an", "that",
"this", "these", "those", "it", "its", "or", "but", "not", "what", "which", "who",
"when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most",
"other", "some", "such", "no", "nor", "only", "own", "same", "so", "than", "too",
"very", "can", "will", "just", "should", "now", "player", "club", "team", "football",
"soccer", "match", "game", "season", "league", "cup", "champions", "premier", "la",
"bundesliga", "serie", "current", "main", "position", "nationality", "birth", "place",
# Universal terms that appear in ALL documents (filtering for memory/performance)
"comprehensive", "international", "performance", "transfermarkt", "injury",
"summary", "market", "history", "database", "value",
# Stemmed versions and other universal terms
"data", "teammat", "sourc", "career", "assist", "app", "minut",
"available", "national", "significant", "teammate", "transfer", "goal"
}
def simple_stemmer(word: str) -> str:
if word.endswith("ing") and len(word) > 5:
return word[:-3]
elif word.endswith("ed") and len(word) > 4:
return word[:-2]
elif word.endswith("es") and len(word) > 4:
return word[:-2]
elif word.endswith("s") and len(word) > 3:
return word[:-1]
return word
def normalize_and_tokenize(text: str):
text = text.lower()
tokens = re.findall(r"\b[a-z]+\b", text)
result = []
for w in tokens:
if w in COMPREHENSIVE_STOP_WORDS or len(w) <= 2:
continue
result.append(simple_stemmer(w))
return result
def normalize_name_tokens(value: str):
if not isinstance(value, str):
return []
tokens = re.findall(r"[a-z]+", value.lower())
return [simple_stemmer(tok) for tok in tokens if tok]
def build_name_metadata(name: str):
tokens = normalize_name_tokens(name)
token_set = set(tokens)
normalized = " ".join(tokens)
return {
"tokens": tokens,
"token_set": token_set,
"normalized": normalized,
"raw_lower": name.lower() if isinstance(name, str) else "",
}
def load_market_values(path: str):
values = {}
try:
with open(path, "r", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
for row in reader:
try:
player_id = int(row.get("player_id", ""))
except (TypeError, ValueError):
continue
raw_value = row.get("value")
try:
value = float(raw_value)
except (TypeError, ValueError):
continue
date_key = row.get("date_unix", "") or ""
current = values.get(player_id)
if current is None or date_key > current[0]:
values[player_id] = (date_key, value)
except FileNotFoundError:
print(f"[warn] Market value file not found at {path}")
return {}
return {pid: info[1] for pid, info in values.items()}
def load_profile_lengths(path: str):
try:
with open(path, "r", encoding="utf-8") as handle:
data = json.load(handle)
except FileNotFoundError:
print(f"[warn] Profile data file not found at {path}")
return {}
lengths = {}
for entry in data:
player_id = entry.get("player_id")
if not isinstance(player_id, int):
continue
detailed = entry.get("detailed_content")
if isinstance(detailed, str) and detailed:
lengths[player_id] = len(detailed)
return lengths
# ---------- LOAD STATIC INDEXES (NOT INVERTED INDEX) ----------
print("[init] Loading lexicon...")
with open(LEXICON_PATH, "r", encoding="utf-8") as f:
lexicon_entries = json.load(f)
token_to_id = {entry["token"]: entry["term_id"] for entry in lexicon_entries}
termid_to_token = {entry["term_id"]: entry["token"] for entry in lexicon_entries}
term_document_frequency = {entry["term_id"]: entry["df"] for entry in lexicon_entries}
print(f"[done] Lexicon loaded: {len(token_to_id):,} tokens")
print("[init] Loading forward index...")
with open(FORWARD_INDEX_PATH, "r", encoding="utf-8") as f:
forward_index = json.load(f)
doc_by_id = {doc["player_id"]: doc for doc in forward_index}
N = len(doc_by_id)
avg_doc_len = sum(d["total_terms"] for d in forward_index) / N if N > 0 else 0.0
name_metadata = {doc_id: build_name_metadata(doc.get("player_name"))
for doc_id, doc in doc_by_id.items()}
print(f"[done] Forward index: {N:,} documents (avg_len={avg_doc_len:.2f})")
print("[init] Loading term-to-barrel mapping...")
with open(TERM_TO_BARREL_MAP_PATH, "r", encoding="utf-8") as f:
term_to_barrel = json.load(f)
print(f"[done] Term-to-barrel map loaded: {len(term_to_barrel):,} mappings")
print("[init] Loading market values...")
player_market_value = load_market_values(MARKET_VALUE_PATH)
max_market_value = max(player_market_value.values(), default=0.0)
market_value_log_max = math.log1p(max_market_value) if max_market_value > 0 else 1.0
print(f"[done] Market values loaded for {len(player_market_value):,} players")
print("[init] Loading profile metadata...")
profile_length_by_id = load_profile_lengths(PROFILE_DATA_PATH)
max_profile_length = max(profile_length_by_id.values(), default=0)
profile_length_log_max = math.log1p(max_profile_length) if max_profile_length > 0 else 1.0
print(f"[done] Profile metadata loaded for {len(profile_length_by_id):,} players")
# ---------- BARREL CACHE (LRU-like) ----------
barrel_cache = {}
MAX_CACHED_BARRELS = 10 # Keep only 10 barrels in memory at once
def load_barrel(barrel_name: str):
"""Load a barrel file and cache it. Implements simple LRU eviction."""
if barrel_name in barrel_cache:
return barrel_cache[barrel_name]
barrel_path = os.path.join(BARREL_DIR, f"{barrel_name}.json")
try:
with open(barrel_path, "r", encoding="utf-8") as f:
barrel_data = json.load(f)
# Cache management
if len(barrel_cache) >= MAX_CACHED_BARRELS:
# Remove oldest (first) entry
oldest_key = next(iter(barrel_cache))
del barrel_cache[oldest_key]
barrel_cache[barrel_name] = barrel_data
return barrel_data
except FileNotFoundError:
print(f"[error] Barrel file not found: {barrel_path}")
return None
# ---------- QUERY TO TERM IDs ----------
def tokens_to_term_ids(tokens):
seen = set()
unique_term_ids = []
for tok in tokens:
tid = token_to_id.get(tok)
if tid is None or tid in seen:
continue
seen.add(tid)
unique_term_ids.append(tid)
return unique_term_ids
# ---------- BM25 SCORING ----------
def bm25_score(tf, df, doc_len, N, avg_doc_len, k1=K1, b=B):
idf = math.log((N - df + 0.5) / (df + 0.5) + 1.0)
denom = tf + k1 * (1 - b + b * (doc_len / avg_doc_len))
return idf * (tf * (k1 + 1) / denom)
# ---------- BARREL-BASED SEARCH ----------
def search(query: str, top_k: int = 10, verbose: bool = True):
start_time = time.perf_counter()
log = print if verbose else (lambda *args, **kwargs: None)
log(f"\n[query] {query}")
query_tokens = normalize_and_tokenize(query)
term_ids = tokens_to_term_ids(query_tokens)
if not term_ids:
elapsed = (time.perf_counter() - start_time) * 1000
log(f"No query terms found in lexicon. (took {elapsed:.2f} ms)")
return []
log("Query tokens -> term_ids:",
[(termid_to_token.get(tid, "?"), tid) for tid in term_ids])
# **KEY OPTIMIZATION: Determine which barrels to load**
required_barrels = set()
for tid in term_ids:
barrel_name = term_to_barrel.get(str(tid))
if barrel_name:
required_barrels.add(barrel_name)
log(f"[barrels] Loading {len(required_barrels)} barrel(s): {sorted(required_barrels)}")
# Load only required barrels
barrel_load_start = time.perf_counter()
loaded_barrels = {}
for barrel_name in required_barrels:
barrel_data = load_barrel(barrel_name)
if barrel_data:
loaded_barrels[barrel_name] = barrel_data
barrel_load_time = (time.perf_counter() - barrel_load_start) * 1000
log(f"[barrels] Loaded in {barrel_load_time:.2f} ms")
# BM25 scoring using barrel data
scores = defaultdict(float)
for tid in term_ids:
df = term_document_frequency.get(tid, 0)
if df == 0:
continue
# Get barrel for this term
barrel_name = term_to_barrel.get(str(tid))
if not barrel_name or barrel_name not in loaded_barrels:
continue
barrel_data = loaded_barrels[barrel_name]
inverted_index_part = barrel_data.get("inverted_index", {})
# Get postings for this term
term_data = inverted_index_part.get(str(tid))
if not term_data:
continue
postings = term_data.get("postings", {})
for doc_id_str, info in postings.items():
doc_id = int(doc_id_str)
tf = info["tf"]
doc_len = doc_by_id[doc_id]["total_terms"]
scores[doc_id] += bm25_score(tf, df, doc_len, N, avg_doc_len)
# Metadata boosting (same as before)
if scores:
query_name_tokens = normalize_name_tokens(query)
query_name = " ".join(query_name_tokens)
raw_query_lower = query.lower().strip()
for doc_id in scores:
boost = 0.0
meta = name_metadata.get(doc_id)
has_name_match = False
match_count = 0
if meta:
if query_tokens:
match_count = sum(1 for tok in query_tokens if tok in meta["token_set"])
if match_count:
boost += NAME_TOKEN_WEIGHT * match_count
has_name_match = True
if query_name:
if meta["normalized"] == query_name:
boost += EXACT_NAME_BONUS
has_name_match = True
elif meta["normalized"].startswith(query_name):
boost += NAME_PREFIX_BONUS
has_name_match = True
if raw_query_lower and raw_query_lower in meta["raw_lower"]:
boost += RAW_SUBSTRING_BONUS
has_name_match = True
if not has_name_match and query_tokens:
boost -= NON_NAME_MATCH_PENALTY
if has_name_match:
value = player_market_value.get(doc_id)
if value and market_value_log_max > 0.0:
boost += MARKET_VALUE_WEIGHT * (math.log1p(value) / market_value_log_max)
length = profile_length_by_id.get(doc_id)
if length and profile_length_log_max > 0.0:
boost += PROFILE_LENGTH_WEIGHT * (math.log1p(length) / profile_length_log_max)
scores[doc_id] += boost
if not scores:
elapsed = (time.perf_counter() - start_time) * 1000
log(f"No documents matched these terms. (took {elapsed:.2f} ms)")
return []
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
results = []
for rank, (doc_id, score) in enumerate(ranked, start=1):
doc = doc_by_id[doc_id]
results.append({
"rank": rank,
"doc_id": doc_id,
"player_id": doc["player_id"],
"player_name": doc["player_name"],
"score": score,
"market_value": player_market_value.get(doc_id),
})
elapsed = (time.perf_counter() - start_time) * 1000
log("\n[top] Results:")
for r in results:
value = r["market_value"]
length = profile_length_by_id.get(r["doc_id"])
extras = []
if value:
extras.append(f"market_value~{value:,.0f} EUR")
if length:
extras.append(f"profile_chars={length}")
extra_text = f" [{', '.join(extras)}]" if extras else ""
log(f"{r['rank']:2d}. [{r['score']:.3f}] {r['player_name']} (player_id={r['player_id']}){extra_text}")
log(f"\n[time] {elapsed:.2f} ms (barrel_load={barrel_load_time:.2f} ms)")
log(f"[memory] {len(barrel_cache)} barrels cached, {len(required_barrels)} loaded for this query")
if elapsed < 500:
log("[perf]Under 500 ms goal")
else:
log("[perf]Above 500 ms goal")
return results
# ---------- CLI ----------
if __name__ == "__main__":
print("\n[ready] BARREL-OPTIMIZED search engine ready.")
print(f"[info] System loads only required barrels per query (max {MAX_CACHED_BARRELS} cached)")
print("[info] Type a query or press Enter to exit.\n")
while True:
q = input("Query> ").strip()
if not q:
break
search(q, top_k=10)
print("\n[exit] Exiting search engine.")