#!/usr/bin/env python3
"""
ArcaThread Processor v1.0

- Generates pre-computed champion stats from matchup-matrix data
- Runs hourly to update stats for new patches
- Creates champ-stats/{patch}/{champion}.json files
"""

import os
import sys
import json
import time
import re
import threading
import traceback
from datetime import datetime
from typing import Dict, List, Optional, Any
from collections import defaultdict

from fastapi import FastAPI
import uvicorn
import pandas as pd
import numpy as np
from huggingface_hub import hf_hub_download, CommitOperationAdd, list_repo_files

from hf_client import get_hf_api, get_hf_config

HF_CFG = get_hf_config()
HF_TOKEN = HF_CFG.token
DATASET_REPO = HF_CFG.dataset_repo

# Never cycle faster than once a minute, regardless of configuration.
PROCESS_INTERVAL_SECONDS = max(60, int(os.environ.get("PROCESS_INTERVAL_SECONDS", "3600")))
# Champions with fewer total games than this are dropped from the output.
MIN_SAMPLE_SIZE = int(os.environ.get("MIN_SAMPLE_SIZE", "100"))

RANKS = [
    "IRON", "BRONZE", "SILVER", "GOLD", "PLATINUM",
    "EMERALD", "DIAMOND", "MASTER", "GRANDMASTER", "CHALLENGER"
]
ROLE_MAPPING = {'TOP': 0, 'JUNGLE': 1, 'MIDDLE': 2, 'BOTTOM': 3, 'SUPPORT': 4, 'UNKNOWN': 5}

# Global state (shared between the FastAPI handlers and the background
# processing thread; mutable fields are guarded by state_lock below).
is_running = True
last_processing = None
commit_cooldown_until = 0.0
stats = {
    "processings": 0,
    "champions_processed": 0,
    "patches_processed": [],
    "last_processing_per_patch": {},
    "processing_history": []
}
state_lock = threading.Lock()

app = FastAPI(title="ArcaThread Processor v1.0")

# Cap on the length of stats["processing_history"].
MAX_HISTORY = 20


def log(msg: str):
    """Thread-safe logging"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{timestamp}] {msg}", flush=True)


def _normalize_patch_token(value: str) -> Optional[str]:
    """Extract major.minor from patch string.

    Returns e.g. "14.3" for "14.3.1", or None when the value does not
    start with a "<digits>.<digits>" prefix.
    """
    text = str(value or "").strip()
    match = re.match(r"^(\d+)\.(\d+)", text)
    if not match:
        return None
    return f"{match.group(1)}.{match.group(2)}"


def _extract_champion_name(champion_id: int) -> str:
    """Convert champion ID to name (placeholder - will use ID as key)"""
    return str(champion_id)


def load_matchup_data_for_patch(patch: str) -> pd.DataFrame:
    """Load all matchup data for a specific patch across all ranks.

    Downloads every ``matchup-matrix/*/{patch}/*.parquet`` file from the
    HF dataset and concatenates them. Individual file failures are logged
    and skipped; an empty DataFrame is returned when nothing loads.
    """
    log(f"Loading matchup data for patch {patch}...")
    try:
        all_files = list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)

        # Filter for this patch's matchup files
        patch_files = [
            f for f in all_files
            if f.startswith("matchup-matrix/")
            and f"/{patch}/" in f
            and f.endswith('.parquet')
        ]
        log(f"Found {len(patch_files)} matchup files for patch {patch}")
        if not patch_files:
            return pd.DataFrame()

        # Download and combine all files
        all_data = []
        for file_path in patch_files:
            try:
                local_path = hf_hub_download(
                    repo_id=DATASET_REPO,
                    filename=file_path,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    local_dir="/tmp",
                )
                df = pd.read_parquet(local_path)
                all_data.append(df)
                log(f" Loaded {file_path}: {len(df)} rows")
            except Exception as e:
                # Best-effort: one bad file should not sink the whole patch.
                log(f" Failed to load {file_path}: {e}")
                continue

        if not all_data:
            return pd.DataFrame()

        combined = pd.concat(all_data, ignore_index=True)
        log(f"Combined patch {patch} data: {len(combined)} total rows")
        return combined
    except Exception as e:
        log(f"Error loading data for patch {patch}: {e}")
        log(traceback.format_exc())
        return pd.DataFrame()


def get_latest_patches(n: int = 3) -> List[str]:
    """Get the n latest patches from the dataset.

    Patch tokens are taken from the third path segment of
    ``matchup-matrix/<rank>/<patch>/...`` files and sorted numerically
    (newest first). Returns [] on any error.
    """
    try:
        all_files = list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
        patches = set()
        for f in all_files:
            if not f.startswith("matchup-matrix/"):
                continue
            parts = f.split("/")
            if len(parts) >= 3:
                patch = _normalize_patch_token(parts[2])
                if patch:
                    patches.add(patch)
        # Sort by version number (newest first); numeric sort so that
        # e.g. 14.10 orders after 14.9.
        sorted_patches = sorted(patches, key=lambda p: [int(x) for x in p.split(".")], reverse=True)
        return sorted_patches[:n]
    except Exception as e:
        log(f"Error getting latest patches: {e}")
        return []


def compute_champion_stats(df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
    """Compute aggregated stats per champion from matchup data.

    Expects rows with champion_id, enemy_champion_id, wins, sample_size,
    role and rank columns (missing columns fall back to defaults).
    Returns {champion_id_str: stats_dict} for champions whose total games
    meet MIN_SAMPLE_SIZE.
    """
    if df.empty:
        return {}

    # Accumulators: overall totals plus per-role / per-rank / per-enemy splits.
    champion_stats = defaultdict(lambda: {
        "champion_id": 0,
        "total_games": 0,
        "wins": 0,
        "by_role": defaultdict(lambda: {"games": 0, "wins": 0}),
        "by_rank": defaultdict(lambda: {"games": 0, "wins": 0}),
        "matchups": defaultdict(lambda: {"games": 0, "wins": 0}),
    })

    for _, row in df.iterrows():
        champ_id = int(row.get('champion_id', 0))
        enemy_id = int(row.get('enemy_champion_id', 0))
        wins = float(row.get('wins', 0))
        sample_size = int(row.get('sample_size', 0))
        role = str(row.get('role', 'UNKNOWN')).upper()
        rank = str(row.get('rank', 'UNKNOWN')).upper()

        if champ_id <= 0 or sample_size <= 0:
            continue

        stats_entry = champion_stats[champ_id]
        stats_entry["champion_id"] = champ_id
        stats_entry["total_games"] += sample_size
        stats_entry["wins"] += wins

        # By role
        stats_entry["by_role"][role]["games"] += sample_size
        stats_entry["by_role"][role]["wins"] += wins

        # By rank
        stats_entry["by_rank"][rank]["games"] += sample_size
        stats_entry["by_rank"][rank]["wins"] += wins

        # Matchups (keyed by enemy champion id as a string)
        if enemy_id > 0:
            matchup_key = str(enemy_id)
            stats_entry["matchups"][matchup_key]["games"] += sample_size
            stats_entry["matchups"][matchup_key]["wins"] += wins

    # Convert to final format with win rates
    result = {}
    for champ_id, data in champion_stats.items():
        total_games = data["total_games"]
        if total_games < MIN_SAMPLE_SIZE:
            continue
        win_rate = data["wins"] / total_games if total_games > 0 else 0.5

        # Per-role breakdown: keep roles with at least half the min sample.
        by_role = {}
        for role, role_data in data["by_role"].items():
            if role_data["games"] >= MIN_SAMPLE_SIZE // 2:
                by_role[role] = {
                    "games": role_data["games"],
                    "win_rate": round(role_data["wins"] / role_data["games"], 4)
                }

        # Per-rank breakdown: same threshold as roles.
        by_rank = {}
        for rank, rank_data in data["by_rank"].items():
            if rank_data["games"] >= MIN_SAMPLE_SIZE // 2:
                by_rank[rank] = {
                    "games": rank_data["games"],
                    "win_rate": round(rank_data["wins"] / rank_data["games"], 4)
                }

        # Matchups: keep those with at least a fifth of the min sample,
        # then the 20 most-played.
        matchups = []
        for enemy_id, matchup_data in data["matchups"].items():
            if matchup_data["games"] >= MIN_SAMPLE_SIZE // 5:
                matchups.append({
                    "enemy_champion_id": int(enemy_id),
                    "games": matchup_data["games"],
                    "win_rate": round(matchup_data["wins"] / matchup_data["games"], 4)
                })
        # Sort matchups by games played and take top 20
        matchups.sort(key=lambda x: x["games"], reverse=True)
        matchups = matchups[:20]

        result[str(champ_id)] = {
            "champion_id": champ_id,
            "total_games": total_games,
            "win_rate": round(win_rate, 4),
            "by_role": by_role,
            "by_rank": by_rank,
            "matchups": matchups,
        }
    return result


def generate_tier_list(stats_by_champion: Dict[str, Dict], min_games: int = 500) -> List[Dict]:
    """Generate tier list from champion stats.

    Champions below min_games are excluded; tiers are assigned purely by
    overall win rate, and the result is sorted by win rate descending.
    """
    tiers = []
    for champ_id, data in stats_by_champion.items():
        if data["total_games"] < min_games:
            continue
        win_rate = data["win_rate"]

        # Determine tier based on win rate
        if win_rate >= 0.54:
            tier = "S"
        elif win_rate >= 0.52:
            tier = "A"
        elif win_rate >= 0.50:
            tier = "B"
        elif win_rate >= 0.48:
            tier = "C"
        else:
            tier = "D"

        tiers.append({
            "champion_id": data["champion_id"],
            "tier": tier,
            "win_rate": win_rate,
            "games": data["total_games"],
        })

    # Sort by win rate descending
    tiers.sort(key=lambda x: x["win_rate"], reverse=True)
    return tiers


def build_upload_operation(local_path: str, repo_path: str) -> Optional[CommitOperationAdd]:
    """Validate and build a single upload operation.

    Returns None (and logs) when the local file is missing or empty so
    callers can skip it without aborting the whole commit.
    """
    if not os.path.exists(local_path):
        log(f"File not found: {local_path}")
        return None
    size = os.path.getsize(local_path)
    if size == 0:
        log(f"File is empty: {local_path}")
        return None
    return CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=local_path)


def upload_operations(operations: List[CommitOperationAdd], commit_message: str) -> bool:
    """Upload files to HF dataset.

    Honors a global cooldown set after a 429 response; returns True on
    success (or when there is nothing to upload), False otherwise.
    """
    global commit_cooldown_until
    if not operations:
        return True

    now = time.time()
    if now < commit_cooldown_until:
        remaining = int(commit_cooldown_until - now)
        log(f"Skipping upload (commit cooldown active for {remaining}s)")
        return False

    try:
        api = get_hf_api()
        api.create_commit(
            repo_id=DATASET_REPO,
            repo_type="dataset",
            operations=operations,
            commit_message=commit_message,
        )
        log(f"Uploaded {len(operations)} files")
        return True
    except Exception as e:
        err_text = str(e)
        if "429" in err_text or "Too Many Requests" in err_text:
            # Rate-limited: back off for an hour before trying again.
            commit_cooldown_until = time.time() + 3600
            log("Upload rate-limited. Pausing for 1 hour")
        log(f"Upload failed: {e}")
        return False


def process_patch(patch: str) -> int:
    """Process a single patch and generate champion stats.

    Loads the patch's matchup data, computes per-champion stats plus a
    tier list, writes JSON files under /tmp and uploads them to the HF
    dataset. Returns the number of champions processed, or 0 when there
    was no data or the upload did not succeed.
    """
    log("=" * 60)
    log(f"Processing patch: {patch}")
    log("=" * 60)

    # Load matchup data
    df = load_matchup_data_for_patch(patch)
    if df.empty:
        log(f"No data found for patch {patch}")
        return 0

    log(f"Computing champion stats from {len(df)} rows...")
    champion_stats = compute_champion_stats(df)
    log(f"Generated stats for {len(champion_stats)} champions")
    if not champion_stats:
        log("No champions met the minimum sample size requirement")
        return 0

    # Generate tier list
    tier_list = generate_tier_list(champion_stats)
    log(f"Generated tier list with {len(tier_list)} champions")

    # Save files locally
    temp_dir = f"/tmp/champ-stats/{patch}"
    os.makedirs(temp_dir, exist_ok=True)

    # Save individual champion files
    operations = []
    for champ_id, data in champion_stats.items():
        file_path = f"{temp_dir}/{champ_id}.json"
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)
        repo_path = f"champ-stats/{patch}/{champ_id}.json"
        op = build_upload_operation(file_path, repo_path)
        if op:
            operations.append(op)

    # Save tier list
    tier_list_path = f"{temp_dir}/tier-list.json"
    with open(tier_list_path, 'w') as f:
        json.dump({
            "patch": patch,
            "generated_at": datetime.now().isoformat(),
            "total_champions": len(tier_list),
            "tiers": tier_list,
        }, f, indent=2)
    tier_op = build_upload_operation(tier_list_path, f"champ-stats/{patch}/tier-list.json")
    if tier_op:
        operations.append(tier_op)

    # Save patch metadata
    meta_path = f"{temp_dir}/meta.json"
    with open(meta_path, 'w') as f:
        json.dump({
            "patch": patch,
            "generated_at": datetime.now().isoformat(),
            "champions_count": len(champion_stats),
            "total_games": int(df['sample_size'].sum()) if 'sample_size' in df.columns else 0,
            "min_sample_size": MIN_SAMPLE_SIZE,
        }, f, indent=2)
    meta_op = build_upload_operation(meta_path, f"champ-stats/{patch}/meta.json")
    if meta_op:
        operations.append(meta_op)

    # Upload to HF
    if operations:
        commit_msg = f"Update champ-stats for patch {patch} - {datetime.now().isoformat()}"
        success = upload_operations(operations, commit_msg)
        if success:
            log(f"Successfully uploaded {len(operations)} files for patch {patch}")
            return len(champion_stats)
    return 0


def run_processing_cycle():
    """Run a complete processing cycle for latest patches."""
    global stats, last_processing
    log("=" * 60)
    log("STARTING PROCESSING CYCLE")
    log("=" * 60)

    # Get latest patches
    patches = get_latest_patches(n=3)
    log(f"Found patches to process: {patches}")

    total_champions = 0
    processed_patches = []
    for patch in patches:
        if not is_running:
            break
        try:
            count = process_patch(patch)
            if count > 0:
                total_champions += count
                processed_patches.append(patch)
                with state_lock:
                    stats["last_processing_per_patch"][patch] = datetime.now().isoformat()
            # Small delay between patches
            time.sleep(2)
        except Exception as e:
            log(f"Error processing patch {patch}: {e}")
            log(traceback.format_exc())
            continue

    cycle_finished_at = datetime.now().isoformat()
    with state_lock:
        stats["processings"] += 1
        stats["champions_processed"] = total_champions
        stats["patches_processed"] = processed_patches
        cycle_history = {
            "timestamp": cycle_finished_at,
            "patches": processed_patches,
            "champions": total_champions,
        }
        stats["processing_history"].append(cycle_history)
        if len(stats["processing_history"]) > MAX_HISTORY:
            stats["processing_history"] = stats["processing_history"][-MAX_HISTORY:]
        last_processing = cycle_finished_at

    log("=" * 60)
    log(f"PROCESSING CYCLE COMPLETE - {total_champions} champions across {len(processed_patches)} patches")
    log("=" * 60)


def processing_loop():
    """Main processing loop - runs every PROCESS_INTERVAL_SECONDS"""
    log("Processing loop starting...")
    if not HF_TOKEN:
        log("ERROR: HF_TOKEN not set!")
        return

    # Initial processing
    try:
        log("Running initial processing...")
        run_processing_cycle()
    except Exception as e:
        log(f"Initial processing failed: {e}")
        log(traceback.format_exc())

    # Then every configured interval
    while is_running:
        log(f"Sleeping {PROCESS_INTERVAL_SECONDS} seconds until next cycle...")
        # Sleep in 1s slices so a shutdown (is_running = False) is noticed quickly.
        for _ in range(PROCESS_INTERVAL_SECONDS):
            if not is_running:
                break
            time.sleep(1)
        if not is_running:
            break
        try:
            run_processing_cycle()
        except Exception as e:
            log(f"Processing cycle failed: {e}")
            log(traceback.format_exc())


@app.get("/")
def root():
    with state_lock:
        history = list(stats.get("processing_history", []))
    return {
        "message": "ArcaThread Processor v1.0 - use /health for status",
        "recent_history": history[-5:],
    }


@app.get("/health")
def health():
    with state_lock:
        return {
            "status": "healthy",
            "last_processing": last_processing,
            "stats": {
                "processings": stats["processings"],
                "champions_processed": stats["champions_processed"],
                "patches_processed": stats["patches_processed"],
            },
            "config": {
                "process_interval_seconds": PROCESS_INTERVAL_SECONDS,
                "min_sample_size": MIN_SAMPLE_SIZE,
            }
        }


@app.get("/trigger")
def trigger_processing():
    """Manually trigger a processing cycle"""
    log("Manual processing trigger received")
    thread = threading.Thread(target=run_processing_cycle, daemon=True)
    thread.start()
    return {"status": "processing_triggered"}


@app.get("/patch/{patch}")
def get_patch_status(patch: str):
    """Get processing status for a specific patch"""
    with state_lock:
        last_proc = stats["last_processing_per_patch"].get(patch)
    return {
        "patch": patch,
        "last_processing": last_proc,
        "dataset_url": f"https://huggingface.co/datasets/{DATASET_REPO}/tree/main/champ-stats/{patch}"
    }


@app.on_event("startup")
def startup():
    log("ArcaThread Processor v1.0 starting...")
    thread = threading.Thread(target=processing_loop, daemon=True, name="Processor")
    thread.start()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)