#!/usr/bin/env python3 """ ArcaThread Processor v1.0 - Generates pre-computed champion stats from matchup-matrix data - Runs hourly to update stats for new patches - Creates champ-stats/{patch}/{champion}.json files """ import os import json import time import re import threading import traceback from datetime import datetime from typing import Dict, List, Optional, Any from collections import defaultdict from fastapi import FastAPI import uvicorn import pandas as pd import numpy as np from huggingface_hub import hf_hub_download, CommitOperationAdd, list_repo_files from hf_client import get_hf_api, get_hf_config HF_CFG = get_hf_config() HF_TOKEN = HF_CFG.token DATASET_REPO = HF_CFG.dataset_repo PROCESS_INTERVAL_SECONDS = max(60, int(os.environ.get("PROCESS_INTERVAL_SECONDS", "3600"))) MIN_SAMPLE_SIZE = int(os.environ.get("MIN_SAMPLE_SIZE", "100")) DATASET_FILE_CACHE_SECONDS = max(30, int(os.environ.get("DATASET_FILE_CACHE_SECONDS", "300"))) TIER_MIN_GAMES = max(1, int(os.environ.get("TIER_MIN_GAMES", "500"))) TIER_CALIBRATION_MODE = str(os.environ.get("TIER_CALIBRATION_MODE", "quantile")).strip().lower() TIER_STATIC_THRESHOLDS = ( float(os.environ.get("TIER_STATIC_S_MIN_WR", "0.54")), float(os.environ.get("TIER_STATIC_A_MIN_WR", "0.52")), float(os.environ.get("TIER_STATIC_B_MIN_WR", "0.50")), float(os.environ.get("TIER_STATIC_C_MIN_WR", "0.48")), ) RANKS = [ "IRON", "BRONZE", "SILVER", "GOLD", "PLATINUM", "EMERALD", "DIAMOND", "MASTER", "GRANDMASTER", "CHALLENGER" ] # Global state is_running = True last_processing = None commit_cooldown_until = 0.0 stats = { "processings": 0, "champions_processed": 0, "patches_processed": [], "last_processing_per_patch": {}, "processing_history": [] } state_lock = threading.Lock() dataset_file_cache_lock = threading.Lock() dataset_file_cache = { "timestamp": 0.0, "files": [], } app = FastAPI(title="ArcaThread Processor v1.0") MAX_HISTORY = 20 def log(msg: str): """Thread-safe logging""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"[{timestamp}] {msg}", flush=True) def _normalize_patch_token(value: str) -> Optional[str]: """Extract major.minor from patch string""" text = str(value or "").strip() match = re.match(r"^(\d+)\.(\d+)", text) if not match: return None return f"{match.group(1)}.{match.group(2)}" def list_dataset_files(force_refresh: bool = False) -> List[str]: """List dataset files with a short-lived cache.""" now = time.time() with dataset_file_cache_lock: cached_files = dataset_file_cache.get("files", []) cached_at = float(dataset_file_cache.get("timestamp", 0.0) or 0.0) if ( not force_refresh and cached_files and (now - cached_at) < DATASET_FILE_CACHE_SECONDS ): return list(cached_files) files = list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN) with dataset_file_cache_lock: dataset_file_cache["files"] = list(files) dataset_file_cache["timestamp"] = now return files def load_existing_patch_meta(patch: str) -> Optional[Dict[str, Any]]: """Load existing meta for a patch if present.""" meta_path = f"champ-stats/{patch}/meta.json" try: local_path = hf_hub_download( repo_id=DATASET_REPO, filename=meta_path, repo_type="dataset", token=HF_TOKEN, local_dir="/tmp", ) with open(local_path, "r", encoding="utf-8") as handle: payload = json.load(handle) if isinstance(payload, dict): return payload except Exception: return None return None def load_matchup_data_for_patch(patch: str) -> pd.DataFrame: """Load all matchup data for a specific patch across all ranks""" log(f"Loading matchup data for patch {patch}...") try: all_files = list_dataset_files() # Filter for this patch's matchup files patch_files = [ f for f in all_files if f.startswith(f"matchup-matrix/") and f"/{patch}/" in f and f.endswith('.parquet') ] log(f"Found {len(patch_files)} matchup files for patch {patch}") if not patch_files: return pd.DataFrame() # Download and combine all files all_data = [] for file_path in patch_files: try: local_path = hf_hub_download( repo_id=DATASET_REPO, filename=file_path, repo_type="dataset", token=HF_TOKEN, local_dir="/tmp", ) df = pd.read_parquet(local_path) all_data.append(df) log(f" Loaded {file_path}: {len(df)} rows") except Exception as e: log(f" Failed to load {file_path}: {e}") continue if not all_data: return pd.DataFrame() combined = pd.concat(all_data, ignore_index=True) log(f"Combined patch {patch} data: {len(combined)} total rows") return combined except Exception as e: log(f"Error loading data for patch {patch}: {e}") log(traceback.format_exc()) return pd.DataFrame() def get_latest_patches(n: int = 3) -> List[str]: """Get the n latest patches from the dataset""" try: all_files = list_dataset_files() patches = set() for f in all_files: if not f.startswith("matchup-matrix/"): continue parts = f.split("/") if len(parts) >= 3: patch = _normalize_patch_token(parts[2]) if patch: patches.add(patch) # Sort by version number (newest first) sorted_patches = sorted(patches, key=lambda p: [int(x) for x in p.split(".")], reverse=True) return sorted_patches[:n] except Exception as e: log(f"Error getting latest patches: {e}") return [] def compute_champion_stats(df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: """Compute aggregated stats per champion from matchup data""" if df.empty: return {} champion_stats = defaultdict(lambda: { "champion_id": 0, "total_games": 0, "wins": 0, "by_role": defaultdict(lambda: {"games": 0, "wins": 0}), "by_rank": defaultdict(lambda: {"games": 0, "wins": 0}), "matchups": defaultdict(lambda: {"games": 0, "wins": 0}), }) for _, row in df.iterrows(): champ_id = int(row.get('champion_id', 0)) enemy_id = int(row.get('enemy_champion_id', 0)) wins = float(row.get('wins', 0)) sample_size = int(row.get('sample_size', 0)) role = str(row.get('role', 'UNKNOWN')).upper() rank = str(row.get('rank', 'UNKNOWN')).upper() if champ_id <= 0 or sample_size <= 0: continue stats_entry = champion_stats[champ_id] stats_entry["champion_id"] = champ_id stats_entry["total_games"] += sample_size stats_entry["wins"] += wins # By role stats_entry["by_role"][role]["games"] += sample_size stats_entry["by_role"][role]["wins"] += wins # By rank stats_entry["by_rank"][rank]["games"] += sample_size stats_entry["by_rank"][rank]["wins"] += wins # Matchups if enemy_id > 0: matchup_key = str(enemy_id) stats_entry["matchups"][matchup_key]["games"] += sample_size stats_entry["matchups"][matchup_key]["wins"] += wins # Convert to final format with win rates result = {} for champ_id, data in champion_stats.items(): total_games = data["total_games"] if total_games < MIN_SAMPLE_SIZE: continue win_rate = data["wins"] / total_games if total_games > 0 else 0.5 # Process by_role by_role = {} for role, role_data in data["by_role"].items(): if role_data["games"] >= MIN_SAMPLE_SIZE // 2: by_role[role] = { "games": role_data["games"], "win_rate": round(role_data["wins"] / role_data["games"], 4) } # Process by_rank by_rank = {} for rank, rank_data in data["by_rank"].items(): if rank_data["games"] >= MIN_SAMPLE_SIZE // 2: by_rank[rank] = { "games": rank_data["games"], "win_rate": round(rank_data["wins"] / rank_data["games"], 4) } # Process matchups (top 10 most played) matchups = [] for enemy_id, matchup_data in data["matchups"].items(): if matchup_data["games"] >= MIN_SAMPLE_SIZE // 5: matchups.append({ "enemy_champion_id": int(enemy_id), "games": matchup_data["games"], "win_rate": round(matchup_data["wins"] / matchup_data["games"], 4) }) # Sort matchups by games played and take top 20 matchups.sort(key=lambda x: x["games"], reverse=True) matchups = matchups[:20] result[str(champ_id)] = { "champion_id": champ_id, "total_games": total_games, "win_rate": round(win_rate, 4), "by_role": by_role, "by_rank": by_rank, "matchups": matchups, } return result def _resolve_tier_thresholds(win_rates: List[float]) -> tuple: """ Resolve tier thresholds. - quantile mode: patch-adaptive cutoffs from current win-rate distribution. - static mode: fixed win-rate cutoffs. """ if TIER_CALIBRATION_MODE == "quantile" and len(win_rates) >= 10: quantiles = np.quantile(np.asarray(win_rates, dtype=np.float32), [0.8, 0.6, 0.4, 0.2]) s_min, a_min, b_min, c_min = [float(v) for v in quantiles] return s_min, a_min, b_min, c_min, "quantile" s_min, a_min, b_min, c_min = TIER_STATIC_THRESHOLDS return float(s_min), float(a_min), float(b_min), float(c_min), "static" def _assign_tier(win_rate: float, thresholds: tuple) -> str: s_min, a_min, b_min, c_min = thresholds if win_rate >= s_min: return "S" if win_rate >= a_min: return "A" if win_rate >= b_min: return "B" if win_rate >= c_min: return "C" return "D" def generate_tier_list( stats_by_champion: Dict[str, Dict], min_games: Optional[int] = None ) -> tuple[List[Dict], Dict[str, Any]]: """Generate tier list from champion stats with explicit calibration metadata.""" minimum_games = max(1, int(min_games if min_games is not None else TIER_MIN_GAMES)) candidates = [ data for data in stats_by_champion.values() if int(data.get("total_games", 0) or 0) >= minimum_games ] if not candidates: calibration = { "mode": "none", "min_games": minimum_games, "thresholds": {"S": None, "A": None, "B": None, "C": None}, "eligible_champions": 0, } return [], calibration win_rates = [float(data.get("win_rate", 0.5) or 0.5) for data in candidates] s_min, a_min, b_min, c_min, used_mode = _resolve_tier_thresholds(win_rates) thresholds = (s_min, a_min, b_min, c_min) tiers = [] for data in candidates: win_rate = float(data.get("win_rate", 0.5) or 0.5) tier = _assign_tier(win_rate, thresholds) tiers.append({ "champion_id": int(data.get("champion_id", 0) or 0), "tier": tier, "win_rate": win_rate, "games": int(data.get("total_games", 0) or 0), }) tiers.sort(key=lambda x: x["win_rate"], reverse=True) calibration = { "mode": used_mode, "min_games": minimum_games, "thresholds": { "S": round(s_min, 4), "A": round(a_min, 4), "B": round(b_min, 4), "C": round(c_min, 4), }, "eligible_champions": len(candidates), } return tiers, calibration def build_upload_operation(local_path: str, repo_path: str) -> Optional[CommitOperationAdd]: """Validate and build a single upload operation""" if not os.path.exists(local_path): log(f"File not found: {local_path}") return None size = os.path.getsize(local_path) if size == 0: log(f"File is empty: {local_path}") return None return CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=local_path) def upload_operations(operations: List[CommitOperationAdd], commit_message: str) -> bool: """Upload files to HF dataset""" global commit_cooldown_until if not operations: return True now = time.time() if now < commit_cooldown_until: remaining = int(commit_cooldown_until - now) log(f"Skipping upload (commit cooldown active for {remaining}s)") return False try: api = get_hf_api() api.create_commit( repo_id=DATASET_REPO, repo_type="dataset", operations=operations, commit_message=commit_message, ) log(f"Uploaded {len(operations)} files") return True except Exception as e: err_text = str(e) if "429" in err_text or "Too Many Requests" in err_text: commit_cooldown_until = time.time() + 3600 log(f"Upload rate-limited. Pausing for 1 hour") log(f"Upload failed: {e}") return False def process_patch(patch: str) -> int: """Process a single patch and generate champion stats""" log(f"=" * 60) log(f"Processing patch: {patch}") log(f"=" * 60) # Load matchup data df = load_matchup_data_for_patch(patch) if df.empty: log(f"No data found for patch {patch}") return 0 log(f"Computing champion stats from {len(df)} rows...") champion_stats = compute_champion_stats(df) log(f"Generated stats for {len(champion_stats)} champions") if not champion_stats: log("No champions met the minimum sample size requirement") return 0 # Generate tier list tier_list, tier_calibration = generate_tier_list(champion_stats) log(f"Generated tier list with {len(tier_list)} champions") total_games = int(df['sample_size'].sum()) if 'sample_size' in df.columns else 0 meta_core = { "patch": patch, "champions_count": len(champion_stats), "total_games": total_games, "min_sample_size": MIN_SAMPLE_SIZE, } existing_meta = load_existing_patch_meta(patch) if existing_meta: existing_core = { "patch": str(existing_meta.get("patch", "")), "champions_count": int(existing_meta.get("champions_count", -1) or -1), "total_games": int(existing_meta.get("total_games", -1) or -1), "min_sample_size": int(existing_meta.get("min_sample_size", -1) or -1), } if existing_core == meta_core: log(f"No material changes for patch {patch}; skipping upload") return len(champion_stats) # Save files locally temp_dir = f"/tmp/champ-stats/{patch}" os.makedirs(temp_dir, exist_ok=True) # Save individual champion files operations = [] for champ_id, data in champion_stats.items(): file_path = f"{temp_dir}/{champ_id}.json" with open(file_path, 'w') as f: json.dump(data, f, indent=2) repo_path = f"champ-stats/{patch}/{champ_id}.json" op = build_upload_operation(file_path, repo_path) if op: operations.append(op) # Save tier list tier_list_path = f"{temp_dir}/tier-list.json" with open(tier_list_path, 'w') as f: json.dump({ "patch": patch, "generated_at": datetime.now().isoformat(), "total_champions": len(tier_list), "calibration": tier_calibration, "tiers": tier_list, }, f, indent=2) tier_op = build_upload_operation(tier_list_path, f"champ-stats/{patch}/tier-list.json") if tier_op: operations.append(tier_op) # Save patch metadata meta_path = f"{temp_dir}/meta.json" with open(meta_path, 'w') as f: json.dump({ "patch": patch, "generated_at": datetime.now().isoformat(), "champions_count": len(champion_stats), "total_games": total_games, "min_sample_size": MIN_SAMPLE_SIZE, }, f, indent=2) meta_op = build_upload_operation(meta_path, f"champ-stats/{patch}/meta.json") if meta_op: operations.append(meta_op) # Upload to HF if operations: commit_msg = f"Update champ-stats for patch {patch} - {datetime.now().isoformat()}" success = upload_operations(operations, commit_msg) if success: log(f"Successfully uploaded {len(operations)} files for patch {patch}") return len(champion_stats) return 0 def run_processing_cycle(): """Run a complete processing cycle for latest patches""" global stats, last_processing log("=" * 60) log("STARTING PROCESSING CYCLE") log("=" * 60) list_dataset_files(force_refresh=True) # Get latest patches patches = get_latest_patches(n=3) log(f"Found patches to process: {patches}") total_champions = 0 processed_patches = [] for patch in patches: if not is_running: break try: count = process_patch(patch) if count > 0: total_champions += count processed_patches.append(patch) with state_lock: stats["last_processing_per_patch"][patch] = datetime.now().isoformat() # Small delay between patches time.sleep(2) except Exception as e: log(f"Error processing patch {patch}: {e}") log(traceback.format_exc()) continue cycle_finished_at = datetime.now().isoformat() with state_lock: stats["processings"] += 1 stats["champions_processed"] = total_champions stats["patches_processed"] = processed_patches cycle_history = { "timestamp": cycle_finished_at, "patches": processed_patches, "champions": total_champions, } stats["processing_history"].append(cycle_history) if len(stats["processing_history"]) > MAX_HISTORY: stats["processing_history"] = stats["processing_history"][-MAX_HISTORY:] last_processing = cycle_finished_at log("=" * 60) log(f"PROCESSING CYCLE COMPLETE - {total_champions} champions across {len(processed_patches)} patches") log("=" * 60) def processing_loop(): """Main processing loop - runs every PROCESS_INTERVAL_SECONDS""" log("Processing loop starting...") if not HF_TOKEN: log("ERROR: HF_TOKEN not set!") return # Initial processing try: log("Running initial processing...") run_processing_cycle() except Exception as e: log(f"Initial processing failed: {e}") log(traceback.format_exc()) # Then every configured interval while is_running: log(f"Sleeping {PROCESS_INTERVAL_SECONDS} seconds until next cycle...") for _ in range(PROCESS_INTERVAL_SECONDS): if not is_running: break time.sleep(1) if not is_running: break try: run_processing_cycle() except Exception as e: log(f"Processing cycle failed: {e}") log(traceback.format_exc()) @app.get("/") def root(): with state_lock: history = list(stats.get("processing_history", [])) return { "message": "ArcaThread Processor v1.0 - use /health for status", "recent_history": history[-5:], } @app.get("/health") def health(): with state_lock: return { "status": "healthy", "last_processing": last_processing, "stats": { "processings": stats["processings"], "champions_processed": stats["champions_processed"], "patches_processed": stats["patches_processed"], }, "config": { "process_interval_seconds": PROCESS_INTERVAL_SECONDS, "min_sample_size": MIN_SAMPLE_SIZE, "tier_min_games": TIER_MIN_GAMES, "tier_calibration_mode": TIER_CALIBRATION_MODE, } } @app.get("/trigger") def trigger_processing(): """Manually trigger a processing cycle""" log("Manual processing trigger received") thread = threading.Thread(target=run_processing_cycle, daemon=True) thread.start() return {"status": "processing_triggered"} @app.get("/patch/{patch}") def get_patch_status(patch: str): """Get processing status for a specific patch""" with state_lock: last_proc = stats["last_processing_per_patch"].get(patch) return { "patch": patch, "last_processing": last_proc, "dataset_url": f"https://huggingface.co/datasets/{DATASET_REPO}/tree/main/champ-stats/{patch}" } @app.on_event("startup") def startup(): log("ArcaThread Processor v1.0 starting...") thread = threading.Thread(target=processing_loop, daemon=True, name="Processor") thread.start() if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)