# Hugging Face Space header (scrape residue): "Spaces: Running"
| #!/usr/bin/env python3 | |
| """ | |
| ArcaThread Processor v1.0 | |
| - Generates pre-computed champion stats from matchup-matrix data | |
| - Runs hourly to update stats for new patches | |
| - Creates champ-stats/{patch}/{champion}.json files | |
| """ | |
| import os | |
| import json | |
| import time | |
| import re | |
| import threading | |
| import traceback | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any | |
| from collections import defaultdict | |
| from fastapi import FastAPI | |
| import uvicorn | |
| import pandas as pd | |
| import numpy as np | |
| from huggingface_hub import hf_hub_download, CommitOperationAdd, list_repo_files | |
| from hf_client import get_hf_api, get_hf_config | |
# --- Hugging Face configuration (shared client helper owns token handling) ---
HF_CFG = get_hf_config()
HF_TOKEN = HF_CFG.token
DATASET_REPO = HF_CFG.dataset_repo

# --- Tunables (environment-overridable, with sane floors) ---
PROCESS_INTERVAL_SECONDS = max(60, int(os.environ.get("PROCESS_INTERVAL_SECONDS", "3600")))
MIN_SAMPLE_SIZE = int(os.environ.get("MIN_SAMPLE_SIZE", "100"))
DATASET_FILE_CACHE_SECONDS = max(30, int(os.environ.get("DATASET_FILE_CACHE_SECONDS", "300")))
TIER_MIN_GAMES = max(1, int(os.environ.get("TIER_MIN_GAMES", "500")))
TIER_CALIBRATION_MODE = str(os.environ.get("TIER_CALIBRATION_MODE", "quantile")).strip().lower()
# Static fallback cutoffs (minimum win rates for S/A/B/C), used when quantile
# calibration is disabled or there are too few champions to calibrate on.
TIER_STATIC_THRESHOLDS = tuple(
    float(os.environ.get(env_key, default))
    for env_key, default in (
        ("TIER_STATIC_S_MIN_WR", "0.54"),
        ("TIER_STATIC_A_MIN_WR", "0.52"),
        ("TIER_STATIC_B_MIN_WR", "0.50"),
        ("TIER_STATIC_C_MIN_WR", "0.48"),
    )
)
RANKS = [
    "IRON", "BRONZE", "SILVER", "GOLD", "PLATINUM",
    "EMERALD", "DIAMOND", "MASTER", "GRANDMASTER", "CHALLENGER"
]

# --- Mutable global state ---
is_running = True            # cleared to stop the background loops
last_processing = None       # ISO timestamp of the last finished cycle
commit_cooldown_until = 0.0  # epoch seconds; uploads paused until then after a 429
stats = {
    "processings": 0,
    "champions_processed": 0,
    "patches_processed": [],
    "last_processing_per_patch": {},
    "processing_history": [],
}
state_lock = threading.Lock()

# Short-lived cache of the dataset's file listing (see list_dataset_files).
dataset_file_cache_lock = threading.Lock()
dataset_file_cache = {
    "timestamp": 0.0,
    "files": [],
}

app = FastAPI(title="ArcaThread Processor v1.0")
MAX_HISTORY = 20  # max processing_history entries retained
def log(msg: str):
    """Print *msg* prefixed with a local timestamp, flushing so container logs stream."""
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{stamp}] {msg}", flush=True)
| def _normalize_patch_token(value: str) -> Optional[str]: | |
| """Extract major.minor from patch string""" | |
| text = str(value or "").strip() | |
| match = re.match(r"^(\d+)\.(\d+)", text) | |
| if not match: | |
| return None | |
| return f"{match.group(1)}.{match.group(2)}" | |
def list_dataset_files(force_refresh: bool = False) -> List[str]:
    """List dataset files with a short-lived cache.

    Returns a copy of the cached listing when it is non-empty and younger than
    DATASET_FILE_CACHE_SECONDS; otherwise fetches from the Hub and refreshes
    the cache. ``force_refresh=True`` always re-fetches.
    """
    now = time.time()
    with dataset_file_cache_lock:
        snapshot = dataset_file_cache.get("files", [])
        fetched_at = float(dataset_file_cache.get("timestamp", 0.0) or 0.0)
        fresh = bool(snapshot) and (now - fetched_at) < DATASET_FILE_CACHE_SECONDS
        if fresh and not force_refresh:
            return list(snapshot)
    # Fetch outside the lock so a slow Hub call does not block other readers.
    files = list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
    with dataset_file_cache_lock:
        dataset_file_cache["files"] = list(files)
        dataset_file_cache["timestamp"] = now
    return files
def load_existing_patch_meta(patch: str) -> Optional[Dict[str, Any]]:
    """Fetch champ-stats/{patch}/meta.json from the dataset.

    Returns the parsed dict, or None when the file is missing, unreadable,
    or does not contain a JSON object (best-effort by design).
    """
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=f"champ-stats/{patch}/meta.json",
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
        )
        with open(local_path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except Exception:
        # Missing file / network error / bad JSON all mean "no usable meta".
        return None
    return payload if isinstance(payload, dict) else None
def load_matchup_data_for_patch(patch: str) -> pd.DataFrame:
    """Download and concatenate every matchup parquet for *patch* (all ranks).

    Individual file failures are logged and skipped; any other error yields an
    empty DataFrame so the caller can treat "no data" uniformly.
    """
    log(f"Loading matchup data for patch {patch}...")
    try:
        patch_files = [
            name for name in list_dataset_files()
            if name.startswith("matchup-matrix/")
            and f"/{patch}/" in name
            and name.endswith('.parquet')
        ]
        log(f"Found {len(patch_files)} matchup files for patch {patch}")
        if not patch_files:
            return pd.DataFrame()
        frames = []
        for file_path in patch_files:
            try:
                local_path = hf_hub_download(
                    repo_id=DATASET_REPO,
                    filename=file_path,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    local_dir="/tmp",
                )
                frame = pd.read_parquet(local_path)
                frames.append(frame)
                log(f"  Loaded {file_path}: {len(frame)} rows")
            except Exception as e:
                # Best-effort: a single bad file should not sink the patch.
                log(f"  Failed to load {file_path}: {e}")
        if not frames:
            return pd.DataFrame()
        combined = pd.concat(frames, ignore_index=True)
        log(f"Combined patch {patch} data: {len(combined)} total rows")
        return combined
    except Exception as e:
        log(f"Error loading data for patch {patch}: {e}")
        log(traceback.format_exc())
        return pd.DataFrame()
def get_latest_patches(n: int = 3) -> List[str]:
    """Return up to *n* patch versions present under matchup-matrix/, newest first."""
    try:
        found = set()
        for name in list_dataset_files():
            if not name.startswith("matchup-matrix/"):
                continue
            segments = name.split("/")
            # Layout is matchup-matrix/<rank>/<patch>/..., so the patch is segment 2.
            if len(segments) >= 3:
                token = _normalize_patch_token(segments[2])
                if token:
                    found.add(token)
        # Numeric sort on (major, minor) so e.g. 14.10 outranks 14.9.
        ordered = sorted(
            found,
            key=lambda p: [int(part) for part in p.split(".")],
            reverse=True,
        )
        return ordered[:n]
    except Exception as e:
        log(f"Error getting latest patches: {e}")
        return []
def compute_champion_stats(
    df: pd.DataFrame,
    min_sample_size: Optional[int] = None,
) -> Dict[str, Dict[str, Any]]:
    """Aggregate per-champion stats (overall, by role, by rank, by matchup).

    Parameters
    ----------
    df : matchup rows; reads columns champion_id, enemy_champion_id, wins,
        sample_size, role, rank (absent columns fall back to 0 / "UNKNOWN").
    min_sample_size : minimum total games for a champion to appear in the
        result; defaults to the module-level MIN_SAMPLE_SIZE. Role/rank splits
        require half this floor and matchups a fifth of it.

    Returns
    -------
    Dict keyed by str(champion_id), each value holding total_games, win_rate,
    by_role, by_rank and the 20 most-played matchups (win rates rounded to 4dp).
    """
    if df.empty:
        return {}
    # Resolve floors once; the fix over the old comment: matchups keep the
    # top TWENTY most-played opponents (the code always did; comment said 10).
    threshold = int(MIN_SAMPLE_SIZE if min_sample_size is None else min_sample_size)
    role_rank_floor = threshold // 2
    matchup_floor = threshold // 5

    champion_stats = defaultdict(lambda: {
        "champion_id": 0,
        "total_games": 0,
        "wins": 0,
        "by_role": defaultdict(lambda: {"games": 0, "wins": 0}),
        "by_rank": defaultdict(lambda: {"games": 0, "wins": 0}),
        "matchups": defaultdict(lambda: {"games": 0, "wins": 0}),
    })
    for _, row in df.iterrows():
        champ_id = int(row.get('champion_id', 0))
        enemy_id = int(row.get('enemy_champion_id', 0))
        wins = float(row.get('wins', 0))
        sample_size = int(row.get('sample_size', 0))
        role = str(row.get('role', 'UNKNOWN')).upper()
        rank = str(row.get('rank', 'UNKNOWN')).upper()
        if champ_id <= 0 or sample_size <= 0:
            continue  # malformed or empty row
        entry = champion_stats[champ_id]
        entry["champion_id"] = champ_id
        entry["total_games"] += sample_size
        entry["wins"] += wins
        entry["by_role"][role]["games"] += sample_size
        entry["by_role"][role]["wins"] += wins
        entry["by_rank"][rank]["games"] += sample_size
        entry["by_rank"][rank]["wins"] += wins
        if enemy_id > 0:
            matchup_key = str(enemy_id)
            entry["matchups"][matchup_key]["games"] += sample_size
            entry["matchups"][matchup_key]["wins"] += wins

    # Convert accumulators to the final JSON-serializable shape with win rates.
    result: Dict[str, Dict[str, Any]] = {}
    for champ_id, data in champion_stats.items():
        total_games = data["total_games"]
        if total_games < threshold:
            continue
        win_rate = data["wins"] / total_games if total_games > 0 else 0.5
        by_role = {
            role: {
                "games": role_data["games"],
                "win_rate": round(role_data["wins"] / role_data["games"], 4),
            }
            for role, role_data in data["by_role"].items()
            if role_data["games"] >= role_rank_floor
        }
        by_rank = {
            rank: {
                "games": rank_data["games"],
                "win_rate": round(rank_data["wins"] / rank_data["games"], 4),
            }
            for rank, rank_data in data["by_rank"].items()
            if rank_data["games"] >= role_rank_floor
        }
        matchups = [
            {
                "enemy_champion_id": int(enemy_id),
                "games": matchup_data["games"],
                "win_rate": round(matchup_data["wins"] / matchup_data["games"], 4),
            }
            for enemy_id, matchup_data in data["matchups"].items()
            if matchup_data["games"] >= matchup_floor
        ]
        # Keep the 20 most-played matchups.
        matchups.sort(key=lambda x: x["games"], reverse=True)
        matchups = matchups[:20]
        result[str(champ_id)] = {
            "champion_id": champ_id,
            "total_games": total_games,
            "win_rate": round(win_rate, 4),
            "by_role": by_role,
            "by_rank": by_rank,
            "matchups": matchups,
        }
    return result
| def _resolve_tier_thresholds(win_rates: List[float]) -> tuple: | |
| """ | |
| Resolve tier thresholds. | |
| - quantile mode: patch-adaptive cutoffs from current win-rate distribution. | |
| - static mode: fixed win-rate cutoffs. | |
| """ | |
| if TIER_CALIBRATION_MODE == "quantile" and len(win_rates) >= 10: | |
| quantiles = np.quantile(np.asarray(win_rates, dtype=np.float32), [0.8, 0.6, 0.4, 0.2]) | |
| s_min, a_min, b_min, c_min = [float(v) for v in quantiles] | |
| return s_min, a_min, b_min, c_min, "quantile" | |
| s_min, a_min, b_min, c_min = TIER_STATIC_THRESHOLDS | |
| return float(s_min), float(a_min), float(b_min), float(c_min), "static" | |
| def _assign_tier(win_rate: float, thresholds: tuple) -> str: | |
| s_min, a_min, b_min, c_min = thresholds | |
| if win_rate >= s_min: | |
| return "S" | |
| if win_rate >= a_min: | |
| return "A" | |
| if win_rate >= b_min: | |
| return "B" | |
| if win_rate >= c_min: | |
| return "C" | |
| return "D" | |
def generate_tier_list(
    stats_by_champion: Dict[str, Dict],
    min_games: Optional[int] = None
) -> tuple[List[Dict], Dict[str, Any]]:
    """Build a tier list from champion stats plus the calibration metadata used."""
    games_floor = max(1, int(TIER_MIN_GAMES if min_games is None else min_games))
    eligible = [
        champ for champ in stats_by_champion.values()
        if int(champ.get("total_games", 0) or 0) >= games_floor
    ]
    if not eligible:
        # Nothing met the games floor: empty list, explicitly uncalibrated.
        return [], {
            "mode": "none",
            "min_games": games_floor,
            "thresholds": {"S": None, "A": None, "B": None, "C": None},
            "eligible_champions": 0,
        }
    rates = [float(champ.get("win_rate", 0.5) or 0.5) for champ in eligible]
    s_min, a_min, b_min, c_min, used_mode = _resolve_tier_thresholds(rates)
    cutoffs = (s_min, a_min, b_min, c_min)
    entries = []
    for champ in eligible:
        rate = float(champ.get("win_rate", 0.5) or 0.5)
        entries.append({
            "champion_id": int(champ.get("champion_id", 0) or 0),
            "tier": _assign_tier(rate, cutoffs),
            "win_rate": rate,
            "games": int(champ.get("total_games", 0) or 0),
        })
    entries.sort(key=lambda item: item["win_rate"], reverse=True)
    return entries, {
        "mode": used_mode,
        "min_games": games_floor,
        "thresholds": {
            "S": round(s_min, 4),
            "A": round(a_min, 4),
            "B": round(b_min, 4),
            "C": round(c_min, 4),
        },
        "eligible_champions": len(eligible),
    }
def build_upload_operation(local_path: str, repo_path: str) -> Optional[CommitOperationAdd]:
    """Wrap *local_path* in a commit-add operation, or None if the file is missing/empty."""
    if not os.path.exists(local_path):
        log(f"File not found: {local_path}")
        return None
    if os.path.getsize(local_path) == 0:
        log(f"File is empty: {local_path}")
        return None
    return CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=local_path)
def upload_operations(operations: List[CommitOperationAdd], commit_message: str) -> bool:
    """Commit *operations* to the HF dataset in one push.

    Returns True on success (or nothing to do). A 429 response arms a one-hour
    commit cooldown during which further uploads are skipped.
    """
    global commit_cooldown_until
    if not operations:
        return True
    now = time.time()
    if now < commit_cooldown_until:
        remaining = int(commit_cooldown_until - now)
        log(f"Skipping upload (commit cooldown active for {remaining}s)")
        return False
    try:
        get_hf_api().create_commit(
            repo_id=DATASET_REPO,
            repo_type="dataset",
            operations=operations,
            commit_message=commit_message,
        )
    except Exception as e:
        err_text = str(e)
        if "429" in err_text or "Too Many Requests" in err_text:
            # Back off for an hour so we do not hammer a rate-limited endpoint.
            commit_cooldown_until = time.time() + 3600
            log(f"Upload rate-limited. Pausing for 1 hour")
        log(f"Upload failed: {e}")
        return False
    log(f"Uploaded {len(operations)} files")
    return True
def process_patch(patch: str) -> int:
    """Build and publish champ-stats artifacts for one patch.

    Loads matchup data, aggregates champion stats, writes per-champion JSON
    plus tier-list.json and meta.json under /tmp, and commits them to the
    dataset. Returns the number of champions processed, 0 on failure/no data.
    """
    banner = "=" * 60
    log(banner)
    log(f"Processing patch: {patch}")
    log(banner)
    df = load_matchup_data_for_patch(patch)
    if df.empty:
        log(f"No data found for patch {patch}")
        return 0
    log(f"Computing champion stats from {len(df)} rows...")
    champion_stats = compute_champion_stats(df)
    log(f"Generated stats for {len(champion_stats)} champions")
    if not champion_stats:
        log("No champions met the minimum sample size requirement")
        return 0
    tier_list, tier_calibration = generate_tier_list(champion_stats)
    log(f"Generated tier list with {len(tier_list)} champions")
    total_games = int(df['sample_size'].sum()) if 'sample_size' in df.columns else 0
    meta_core = {
        "patch": patch,
        "champions_count": len(champion_stats),
        "total_games": total_games,
        "min_sample_size": MIN_SAMPLE_SIZE,
    }
    # Compare against the previously committed meta; identical cores mean the
    # outputs would be byte-equivalent, so skip the commit entirely.
    existing_meta = load_existing_patch_meta(patch)
    if existing_meta:
        existing_core = {
            "patch": str(existing_meta.get("patch", "")),
            "champions_count": int(existing_meta.get("champions_count", -1) or -1),
            "total_games": int(existing_meta.get("total_games", -1) or -1),
            "min_sample_size": int(existing_meta.get("min_sample_size", -1) or -1),
        }
        if existing_core == meta_core:
            log(f"No material changes for patch {patch}; skipping upload")
            return len(champion_stats)
    temp_dir = f"/tmp/champ-stats/{patch}"
    os.makedirs(temp_dir, exist_ok=True)
    operations = []
    # One JSON file per champion.
    for champ_id, data in champion_stats.items():
        local_file = f"{temp_dir}/{champ_id}.json"
        with open(local_file, 'w') as handle:
            json.dump(data, handle, indent=2)
        champ_op = build_upload_operation(local_file, f"champ-stats/{patch}/{champ_id}.json")
        if champ_op:
            operations.append(champ_op)
    # Patch-wide tier list.
    tier_list_path = f"{temp_dir}/tier-list.json"
    with open(tier_list_path, 'w') as handle:
        json.dump({
            "patch": patch,
            "generated_at": datetime.now().isoformat(),
            "total_champions": len(tier_list),
            "calibration": tier_calibration,
            "tiers": tier_list,
        }, handle, indent=2)
    tier_op = build_upload_operation(tier_list_path, f"champ-stats/{patch}/tier-list.json")
    if tier_op:
        operations.append(tier_op)
    # Patch metadata (read back by the no-change check on later runs).
    meta_path = f"{temp_dir}/meta.json"
    with open(meta_path, 'w') as handle:
        json.dump({
            "patch": patch,
            "generated_at": datetime.now().isoformat(),
            "champions_count": len(champion_stats),
            "total_games": total_games,
            "min_sample_size": MIN_SAMPLE_SIZE,
        }, handle, indent=2)
    meta_op = build_upload_operation(meta_path, f"champ-stats/{patch}/meta.json")
    if meta_op:
        operations.append(meta_op)
    if operations:
        commit_msg = f"Update champ-stats for patch {patch} - {datetime.now().isoformat()}"
        if upload_operations(operations, commit_msg):
            log(f"Successfully uploaded {len(operations)} files for patch {patch}")
            return len(champion_stats)
    return 0
def run_processing_cycle():
    """Run one full processing pass over the latest patches, updating global stats."""
    global stats, last_processing
    separator = "=" * 60
    log(separator)
    log("STARTING PROCESSING CYCLE")
    log(separator)
    # Refresh the repo listing once per cycle so patch discovery is current.
    list_dataset_files(force_refresh=True)
    patches = get_latest_patches(n=3)
    log(f"Found patches to process: {patches}")
    total_champions = 0
    processed_patches = []
    for patch in patches:
        if not is_running:
            break
        try:
            champion_count = process_patch(patch)
            if champion_count > 0:
                total_champions += champion_count
                processed_patches.append(patch)
                with state_lock:
                    stats["last_processing_per_patch"][patch] = datetime.now().isoformat()
            time.sleep(2)  # brief pause between patches
        except Exception as e:
            log(f"Error processing patch {patch}: {e}")
            log(traceback.format_exc())
            continue
    cycle_finished_at = datetime.now().isoformat()
    with state_lock:
        stats["processings"] += 1
        stats["champions_processed"] = total_champions
        stats["patches_processed"] = processed_patches
        stats["processing_history"].append({
            "timestamp": cycle_finished_at,
            "patches": processed_patches,
            "champions": total_champions,
        })
        # Cap the rolling history at MAX_HISTORY entries.
        if len(stats["processing_history"]) > MAX_HISTORY:
            stats["processing_history"] = stats["processing_history"][-MAX_HISTORY:]
    last_processing = cycle_finished_at
    log(separator)
    log(f"PROCESSING CYCLE COMPLETE - {total_champions} champions across {len(processed_patches)} patches")
    log(separator)
def processing_loop():
    """Background worker: run a cycle, then repeat every PROCESS_INTERVAL_SECONDS."""
    log("Processing loop starting...")
    if not HF_TOKEN:
        log("ERROR: HF_TOKEN not set!")
        return
    # First cycle immediately on startup.
    try:
        log("Running initial processing...")
        run_processing_cycle()
    except Exception as e:
        log(f"Initial processing failed: {e}")
        log(traceback.format_exc())
    # Then repeat on the configured interval until shut down.
    while is_running:
        log(f"Sleeping {PROCESS_INTERVAL_SECONDS} seconds until next cycle...")
        # Sleep in one-second slices, checking the shutdown flag before each,
        # so a stop request is honored promptly.
        elapsed = 0
        while elapsed < PROCESS_INTERVAL_SECONDS:
            if not is_running:
                break
            time.sleep(1)
            elapsed += 1
        if not is_running:
            break
        try:
            run_processing_cycle()
        except Exception as e:
            log(f"Processing cycle failed: {e}")
            log(traceback.format_exc())
@app.get("/")
def root():
    """Landing endpoint: service banner plus the five most recent cycles.

    NOTE(review): no route decorators appeared anywhere in the original file,
    so this handler was never registered and the app served no routes. The
    "/" path is inferred from the handler's role — confirm against the
    deployed Space.
    """
    with state_lock:
        history = list(stats.get("processing_history", []))
    return {
        "message": "ArcaThread Processor v1.0 - use /health for status",
        "recent_history": history[-5:],
    }
@app.get("/health")
def health():
    """Liveness/status endpoint (the "/health" path is named in the root message).

    NOTE(review): the original file never registered this handler with FastAPI;
    the decorator restores it.
    """
    with state_lock:
        return {
            "status": "healthy",
            "last_processing": last_processing,
            "stats": {
                "processings": stats["processings"],
                "champions_processed": stats["champions_processed"],
                "patches_processed": stats["patches_processed"],
            },
            "config": {
                "process_interval_seconds": PROCESS_INTERVAL_SECONDS,
                "min_sample_size": MIN_SAMPLE_SIZE,
                "tier_min_games": TIER_MIN_GAMES,
                "tier_calibration_mode": TIER_CALIBRATION_MODE,
            }
        }
# NOTE(review): the original file registered no routes; "/process" is a guessed
# path for this manual trigger — confirm against the deployed Space.
@app.post("/process")
def trigger_processing():
    """Manually trigger a processing cycle in a background thread (non-blocking)."""
    log("Manual processing trigger received")
    thread = threading.Thread(target=run_processing_cycle, daemon=True)
    thread.start()
    return {"status": "processing_triggered"}
# NOTE(review): the original file registered no routes; "/patch/{patch}" is an
# inferred path for this per-patch status endpoint — confirm against the Space.
@app.get("/patch/{patch}")
def get_patch_status(patch: str):
    """Return the last processing time and dataset URL for one patch."""
    with state_lock:
        last_proc = stats["last_processing_per_patch"].get(patch)
    return {
        "patch": patch,
        "last_processing": last_proc,
        "dataset_url": f"https://huggingface.co/datasets/{DATASET_REPO}/tree/main/champ-stats/{patch}"
    }
@app.on_event("startup")
def startup():
    """FastAPI startup hook: launch the background processing thread.

    NOTE(review): in the original file this function was defined but never
    registered or called, so the processor loop never started. Registering it
    as a startup event fixes that; FastAPI now prefers lifespan handlers over
    on_event, but on_event matches this codebase's vintage.
    """
    log("ArcaThread Processor v1.0 starting...")
    thread = threading.Thread(target=processing_loop, daemon=True, name="Processor")
    thread.start()

if __name__ == "__main__":
    # Bind all interfaces; 7860 is the conventional Hugging Face Spaces port.
    uvicorn.run(app, host="0.0.0.0", port=7860)