from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
import httpx
import asyncio
import time
from datetime import datetime
from typing import Dict, List, Optional
from contextlib import asynccontextmanager

# Configuration
PING_INTERVAL = 600           # 10 minutes
HEALTH_CHECK_INTERVAL = 1800  # 30 minutes

# List of other pinger Spaces
pinger_spaces = [
    "https://rajhuggingface4253-ping.hf.space",
]

# Regular servers to ping (HTTP GET)
regular_servers = [
    "https://rajhuggingface4253-backend-compressorpro.hf.space",
    "https://rajhuggingface4253-backend-compressorpro2.hf.space",
    "https://rajhuggingface4253-compressor3pro.hf.space",
    "https://rajhuggingface4253-bgr.hf.space",
    "https://rajhuggingface4253-real.hf.space",
    "https://rajhuggingface4253-cmy.hf.space",
    "https://rajhuggingface4253-waif.hf.space",
    "https://rajhuggingface4253-naf.hf.space",
    "https://rajhuggingface4253-grammar.hf.space"
]

# Models to warm, each with its endpoint, request type, timeout and payload
models_to_warm = [
    {
        "name": "gemma",
        "url": "https://rajhuggingface4253-gemma-checking.hf.space",
        "endpoint": "/fix",
        "type": "streaming_chat",
        "timeout": 45.0,
        "payload": {
            "text": "warmup",
        }
    },
    {
        "name": "NLLB Translator",
        "url": "https://rajhuggingface4253-translate.hf.space",
        "endpoint": "/translate",
        "type": "streaming_chat",
        "timeout": 45.0,
        "payload": {
            "text": "warmup",
            "src_lang": "eng_Latn",
            "tgt_lang": "hin_Deva"
        }
    },
    {
        "name": "Qwen 1",
        "url": "https://rajhuggingface4253-qwen.hf.space",
        "endpoint": "/chat",
        "type": "streaming_chat",
        "timeout": 45.0,
        "payload": {
            "prompt": "--- HISTORY START ---\nUser: Say 'ready' if you're working\n--- HISTORY END ---\n\nUser's latest message: \"Say 'ready' if you're working\"",
            "max_new_tokens": 50,
            "temperature": 0.1,
            "enable_code_execution": False,
            "enable_web_search": False,
            "enable_thinking": False
        }
    },
    {
        "name": "Qwen 2",
        "url": "https://rajhuggingface4253-qwe.hf.space",
        "endpoint": "/chat",
        "type": "streaming_chat",
        "timeout": 45.0,
        "payload": {
            "prompt": "--- HISTORY START ---\nUser: Say 'ready' if you're working\n--- HISTORY END ---\n\nUser's latest message: \"Say 'ready' if you're working\"",
            "max_new_tokens": 50,
            "temperature": 0.1,
            "enable_code_execution": False,
            "enable_web_search": False,
            "enable_thinking": False
        }
    },
    {
        "name": "Qwen 3",
        "url": "https://rajhuggingface4253-qwen3.hf.space",
        "endpoint": "/chat",
        "type": "streaming_chat",
        "timeout": 45.0,
        "payload": {
            "prompt": "--- HISTORY START ---\nUser: Say 'ready' if you're working\n--- HISTORY END ---\n\nUser's latest message: \"Say 'ready' if you're working\"",
            "max_new_tokens": 50,
            "temperature": 0.1,
            "enable_code_execution": False,
            "enable_web_search": False,
            "enable_thinking": False
        }
    },
    {
        "name": "Kokoro TTS",
        "url": "https://rajhuggingface4253-koko.hf.space",
        "endpoint": "/health",
        "type": "health_check",
        "timeout": 15.0
    },
    {
        "name": "Kitten TTS",
        "url": "https://rajhuggingface4253-kitten.hf.space",
        "endpoint": "/health",
        "type": "health_check",
        "timeout": 15.0
    }
]

# Global state
ping_results: Dict[str, Dict] = {}
model_warmup_results: Dict[str, Dict] = {}
health_results: Dict[str, Dict] = {}
last_ping_run: Optional[datetime] = None
last_model_warmup: Optional[datetime] = None
last_health_check_time: float = 0


class ParallelWarmer:
    def __init__(self):
        self.max_retries = 3
        self.retry_delay = 1  # Base delay in seconds for exponential backoff

    async def ping_server_with_retry(self, url: str) -> Dict:
        """Ping a server with automatic retry on failure."""
        last_error = None
        response_time = 0  # Keeps the final error report valid even if the first attempt fails early
        for attempt in range(self.max_retries):
            start_time = time.time()  # Defined before the try block so error paths can still be timed
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    response = await client.get(url)
                    response_time = round((time.time() - start_time) * 1000, 1)
                    if response.status_code < 500:  # Only retry on server errors
                        return {
                            'status': 'success',
                            'response_time_ms': response_time,
                            'status_code': response.status_code,
                            'timestamp': datetime.now().isoformat(),
                            'attempts': attempt + 1
                        }
                    else:
                        last_error = f"HTTP {response.status_code}"
            except Exception as e:
                last_error = str(e)
                response_time = round((time.time() - start_time) * 1000, 1)
            # Exponential backoff between retries
            if attempt < self.max_retries - 1:
                await asyncio.sleep(self.retry_delay * (2 ** attempt))
        return {
            'status': 'error',
            'error': last_error if last_error else 'Max retries exceeded',
            'timestamp': datetime.now().isoformat(),
            'attempts': self.max_retries,
            'response_time_ms': response_time
        }

    async def warmup_chat_model_with_retry(self, model_config: Dict) -> Dict:
        """Warm up chat models with retry logic."""
        last_error = None
        response_time = 0
        for attempt in range(self.max_retries):
            start_time = time.time()  # Defined before the try block
            try:
                async with httpx.AsyncClient(timeout=model_config.get('timeout', 45.0)) as client:
                    payload = model_config['payload']
                    api_url = f"{model_config['url']}{model_config['endpoint']}"
                    response = await client.post(api_url, json=payload)
                    response_time = round((time.time() - start_time) * 1000, 1)
                    if response.status_code == 200:
                        try:
                            # Collect the (already buffered) response text
                            collected_response = ""
                            async for chunk in response.aiter_text():
                                if chunk.strip():
                                    collected_response += chunk
                            if collected_response and len(collected_response.strip()) > 5:
                                return {
                                    'status': 'success',
                                    'response_time_ms': response_time,
                                    'status_code': response.status_code,
                                    'ai_response': collected_response[:100].strip(),
                                    'got_ai_response': True,
                                    'timestamp': datetime.now().isoformat(),
                                    'attempts': attempt + 1
                                }
                            else:
                                return {
                                    'status': 'success',
                                    'response_time_ms': response_time,
                                    'status_code': response.status_code,
                                    'ai_response': 'Empty response',
                                    'got_ai_response': False,
                                    'timestamp': datetime.now().isoformat(),
                                    'attempts': attempt + 1
                                }
                        except Exception as e:
                            # Stream error, but the request itself returned HTTP 200
                            return {
                                'status': 'success',
                                'response_time_ms': response_time,
                                'status_code': response.status_code,
                                'ai_response': f'Stream error: {str(e)}',
                                'got_ai_response': False,
                                'timestamp': datetime.now().isoformat(),
                                'attempts': attempt + 1
                            }
                    else:
                        last_error = f"HTTP {response.status_code}"
            except httpx.TimeoutException:  # httpx timeouts don't raise asyncio.TimeoutError
                last_error = 'Request timeout'
            except Exception as e:
                last_error = str(e)
            # Calculate response time even on error
            response_time = round((time.time() - start_time) * 1000, 1)
            # Exponential backoff between retries
            if attempt < self.max_retries - 1:
                await asyncio.sleep(self.retry_delay * (2 ** attempt))
        return {
            'status': 'error',
            'error': last_error if last_error else 'Max retries exceeded',
            'response_time_ms': response_time,
            'timestamp': datetime.now().isoformat(),
            'attempts': self.max_retries
        }
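    # Note (hedged): client.post() above buffers the full body before
    # aiter_text() replays it, so "streaming_chat" warmups are not truly
    # streamed. If incremental chunks mattered, httpx's streaming API could
    # be used instead, e.g.:
    #
    #   async with client.stream("POST", api_url, json=payload) as response:
    #       async for chunk in response.aiter_text():
    #           collected_response += chunk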
    async def warmup_health_model_with_retry(self, model_config: Dict) -> Dict:
        """Warm up health-endpoint models with retry."""
        last_error = None
        response_time = 0
        for attempt in range(self.max_retries):
            start_time = time.time()  # Defined before the try block
            try:
                async with httpx.AsyncClient(timeout=model_config.get('timeout', 15.0)) as client:
                    api_url = f"{model_config['url']}{model_config['endpoint']}"
                    response = await client.get(api_url)
                    response_time = round((time.time() - start_time) * 1000, 1)
                    if response.status_code == 200:
                        return {
                            'status': 'success',
                            'response_time_ms': response_time,
                            'status_code': response.status_code,
                            'timestamp': datetime.now().isoformat(),
                            'attempts': attempt + 1
                        }
                    else:
                        last_error = f"HTTP {response.status_code}"
            except Exception as e:
                last_error = str(e)
                # Calculate response time even on error
                response_time = round((time.time() - start_time) * 1000, 1)
            # Exponential backoff between retries
            if attempt < self.max_retries - 1:
                await asyncio.sleep(self.retry_delay * (2 ** attempt))
        return {
            'status': 'error',
            'error': last_error if last_error else 'Max retries exceeded',
            'response_time_ms': response_time,
            'timestamp': datetime.now().isoformat(),
            'attempts': self.max_retries
        }

    async def warmup_single_model_with_retry(self, model_config: Dict) -> Dict:
        """Route to the appropriate warming method with retry."""
        if model_config.get('type') == 'streaming_chat':
            return await self.warmup_chat_model_with_retry(model_config)
        else:
            return await self.warmup_health_model_with_retry(model_config)


# Initialize the parallel warmer
warmer = ParallelWarmer()
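# A standalone usage sketch (illustrative, not part of the app): the warmer
# can be exercised from a plain script. The URL below is a placeholder.
#
#   async def _demo():
#       result = await warmer.ping_server_with_retry("https://example.com")
#       print(result['status'], result.get('response_time_ms'))
#
#   asyncio.run(_demo())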
async def ping_all_in_parallel():
    """Ping ALL targets in parallel with isolated error handling."""
    global ping_results, model_warmup_results, health_results, last_ping_run, last_model_warmup, last_health_check_time

    all_tasks = []
    task_mapping = {}

    # 1. Create tasks for regular servers
    for server in regular_servers:
        task = asyncio.create_task(warmer.ping_server_with_retry(server))
        all_tasks.append(task)
        task_mapping[task] = ('server', server)

    # 2. Create tasks for model warmups
    for model in models_to_warm:
        task = asyncio.create_task(warmer.warmup_single_model_with_retry(model))
        all_tasks.append(task)
        task_mapping[task] = ('model', model['url'])

    # 3. Create tasks for peer health checks (only every HEALTH_CHECK_INTERVAL)
    current_time = time.time()
    if (current_time - last_health_check_time) >= HEALTH_CHECK_INTERVAL and pinger_spaces:
        for space_url in pinger_spaces:
            health_url = f"{space_url}/health"
            task = asyncio.create_task(warmer.ping_server_with_retry(health_url))
            all_tasks.append(task)
            task_mapping[task] = ('health', space_url)
        last_health_check_time = current_time

    # 4. Execute ALL tasks in parallel
    if all_tasks:
        results = await asyncio.gather(*all_tasks, return_exceptions=True)

        # 5. Process results (isolated - one failure doesn't affect the others)
        for task, result in zip(all_tasks, results):
            task_type, identifier = task_mapping[task]

            if isinstance(result, Exception):
                # Task crashed, but the failure stays isolated
                print(f"⚠️ Task crashed with exception: {type(result).__name__}: {result}")
                result = {
                    'status': 'error',
                    'error': f"{type(result).__name__}: {str(result)}",
                    'timestamp': datetime.now().isoformat()
                }

            if task_type == 'server':
                ping_results[identifier] = result
            elif task_type == 'model':
                # Find the model name for the URL
                model_name = next((m['name'] for m in models_to_warm if m['url'] == identifier), identifier)
                model_warmup_results[identifier] = {
                    'model_info': {'name': model_name, 'url': identifier},
                    'health_check': result
                }
            elif task_type == 'health':
                health_results[identifier] = result

    # Update timestamps
    last_ping_run = datetime.now()
    last_model_warmup = datetime.now()

    # Log summary
    server_success = sum(1 for r in ping_results.values() if r.get('status') == 'success')
    model_success = sum(1 for r in model_warmup_results.values() if r['health_check'].get('status') == 'success')
    model_ai_response = sum(1 for r in model_warmup_results.values() if r['health_check'].get('got_ai_response'))
    print(f"✅ {datetime.now().strftime('%H:%M:%S')} - Parallel ping complete: "
          f"{server_success}/{len(regular_servers)} servers OK, "
          f"{model_success}/{len(models_to_warm)} models healthy "
          f"({model_ai_response} AI responding)")


async def continuous_parallel_pinging():
    """Main pinging loop with fully parallel execution."""
    print("🚀 Smart Model Warmer Started (Fully Parallel)!")
    print(f"🌐 Regular servers: {len(regular_servers)}")
    print(f"🤖 Models to warm: {len(models_to_warm)}")
    print(f"🔗 Pinger network: {len(pinger_spaces)}")

    while True:
        try:
            start_cycle = time.time()
            await ping_all_in_parallel()

            # Sleep so consecutive cycle starts are exactly PING_INTERVAL apart
            cycle_duration = time.time() - start_cycle
            sleep_time = max(0, PING_INTERVAL - cycle_duration)
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
            else:
                print(f"⚠️ Warning: Ping cycle took {cycle_duration:.1f}s "
                      f"(longer than {PING_INTERVAL}s interval)!")
                await asyncio.sleep(1)  # Minimum delay
        except Exception as e:
            print(f"❌ Error in main loop: {e}")
            await asyncio.sleep(60)  # Recover after an error


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: launch the background pinging loop
    asyncio.create_task(continuous_parallel_pinging())
    yield
    # Shutdown
    print("Shutting down...")


app = FastAPI(title="Smart Model Warmer", lifespan=lifespan)
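# Hedged sketch, not shown in this section: the pinger network polls
# "{space_url}/health" on peer Spaces, so this Space plausibly exposes the
# same route. A minimal version, assuming peers only need an HTTP 200:
@app.get("/health")
async def health():
    return JSONResponse({
        "status": "ok",
        "last_ping_run": last_ping_run.isoformat() if last_ping_run else None
    })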
@app.get("/", response_class=HTMLResponse)
async def home():
    """Dashboard showing warming status."""
    regular_success = sum(1 for r in ping_results.values() if r.get('status') == 'success')
    model_success = sum(1 for r in model_warmup_results.values() if r['health_check'].get('status') == 'success')
    health_success = sum(1 for r in health_results.values() if r.get('status') == 'success')
    ai_success = sum(1 for r in model_warmup_results.values() if r['health_check'].get('got_ai_response'))

    # Collect per-model details for display
    model_statuses = []
    for url, data in model_warmup_results.items():
        model_info = data['model_info']
        health = data['health_check']
        status_display = "success" if health['status'] == 'success' else "error"
        ai_indicator = " ✓AI" if health.get('got_ai_response') else ""
        error_display = f" - {health['error']}" if health.get('error') else ""
        preview = f" - '{health.get('ai_response', '')[:50]}...'" if health.get('ai_response') else ""
        model_statuses.append({
            'name': model_info['name'],
            'type': next((m.get('type', 'health_check') for m in models_to_warm if m['url'] == url), 'health_check'),
            'status_display': status_display,
            'response_time': health.get('response_time_ms', 0),
            'ai_indicator': ai_indicator,
            'error_display': error_display,
            'preview': preview
        })

    model_status_html = "".join([
        f"<li class='{m['status_display']}'>"
        f"{m['name']} ({m['type']}) - {m['response_time']}ms"
        f"{m['ai_indicator']}{m['error_display']}{m['preview']}</li>"
        for m in model_statuses
    ])

    # NOTE: the original template's markup was lost; this is a minimal
    # reconstruction that keeps the stats the dashboard reported.
    return f"""
    <html>
    <head><title>Smart Model Warmer</title></head>
    <body>
        <h1>Smart Model Warmer</h1>
        <p>Servers: {regular_success}/{len(regular_servers)} OK</p>
        <p>Models: {model_success}/{len(models_to_warm)} Healthy</p>
        <p>AI: {ai_success}/{len(models_to_warm)} AI Responding</p>
        <p>Pinger network: {health_success}/{len(pinger_spaces)} OK</p>
        <ul>{model_status_html}</ul>
        <p>Last Model Check: {last_model_warmup.strftime('%Y-%m-%d %H:%M:%S') if last_model_warmup else 'Never'}</p>
        <p>Last Server Check: {last_ping_run.strftime('%Y-%m-%d %H:%M:%S') if last_ping_run else 'Never'}</p>
        <p>Next check in: ~{PING_INTERVAL // 60} minutes</p>
        <p>All checks run in parallel with automatic retries</p>
    </body>
    </html>
    """
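# Entry-point sketch (assumed, not shown in this section); 7860 is the
# default port for Hugging Face Spaces.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)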