# NOTE: Hugging Face Hub page header captured along with the raw file (not
# part of the program): "turtle170's picture / Update app.py / c3aee78
# verified / raw / history blame / 38.2 kB"
import gradio as gr
import json
import time
import hashlib
import logging
import datetime
import pytz
from typing import Dict, Optional
# Initialize logging for backend; force=True replaces any handlers another
# library (e.g. gradio) may have installed before this runs.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - BACKEND - %(message)s', force=True)
logger = logging.getLogger(__name__)
# Suppress asyncio RuntimeWarnings emitted during interpreter shutdown
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning, message=".*asyncio.*")
# ============================================================================
# ZEROENGINE-BACKEND: Background Processing Service
# ============================================================================
# This space handles:
# - Tokenization pre-processing
# - Prompt caching
# - Token accounting calculations
# - Response caching
# ============================================================================
# ----------------------------------------------------------------------------
# In-memory backend state (resets whenever the space restarts).
#   prompt_cache   : key -> {"value": str, "timestamp": float}
#   response_cache : prompt_hash -> {"response": str, "timestamp": float}
#   token_ledger   : username -> cumulative usage/accounting record
# ----------------------------------------------------------------------------
prompt_cache = {}
response_cache = {}
token_ledger = {}
backend_start_time = time.time()  # read by get_backend_health() for uptime

# Module logger; re-bound here so this section is self-contained (it is the
# same logger instance configured at the top of the file).
logger = logging.getLogger(__name__)


def tokenize_text(text: str) -> str:
    """Estimate the token count of *text* and record it in the prompt cache.

    The estimate is deliberately cheap: word count plus one token per four
    characters (no real tokenizer involved).

    Args:
        text: Arbitrary input text.

    Returns:
        A pretty-printed JSON string. On success it carries
        ``estimated_tokens``, ``processing_time_ms``, ``text_length``,
        ``word_count``, ``char_count``, ``timestamp`` and ``request_id``;
        on failure, ``success: false`` plus the error details.
    """
    logger.info("[TOKENIZE] ===== TOKENIZE REQUEST START =====")
    logger.info(f"[TOKENIZE] Input text length: {len(text)} characters")
    logger.info(f"[TOKENIZE] Input text preview: '{text[:100]}{'...' if len(text) > 100 else ''}'")
    logger.info(f"[TOKENIZE] Input text hash: {hashlib.md5(text.encode()).hexdigest()[:16]}")
    start_time = time.time()
    try:
        # Simple character-based estimation (can be swapped for a real tokenizer).
        word_count = len(text.split())  # hoisted: reused in the result and logs below
        estimated_tokens = word_count + len(text) // 4
        processing_time = time.time() - start_time
        result = {
            "success": True,
            "estimated_tokens": estimated_tokens,
            "processing_time_ms": round(processing_time * 1000, 2),
            "text_length": len(text),
            "word_count": word_count,
            "char_count": len(text),
            # datetime.timezone.utc yields the same ISO-8601 "+00:00" string
            # as pytz.UTC did, without needing the third-party dependency.
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "request_id": hashlib.md5(f"{text}{time.time()}".encode()).hexdigest()[:8]
        }
        logger.info("[TOKENIZE] βœ… Tokenization completed successfully")
        logger.info(f"[TOKENIZE] Estimated tokens: {estimated_tokens}")
        logger.info(f"[TOKENIZE] Processing time: {processing_time:.4f}s ({processing_time*1000:.2f}ms)")
        logger.info(f"[TOKENIZE] Word count: {word_count}")
        logger.info(f"[TOKENIZE] Character count: {len(text)}")
        logger.info(f"[TOKENIZE] Request ID: {result['request_id']}")
        logger.info("[TOKENIZE] ===== TOKENIZE REQUEST END =====")
        # Cache under the text's digest using the SAME entry schema as
        # cache_prompt() ({"value", "timestamp"}).  The previous layout used
        # "cached_at" instead of "timestamp", which made cache_prompt()'s
        # eviction pass (which sorts entries on ["timestamp"]) raise KeyError
        # as soon as one of these entries was present in the cache.
        text_hash = hashlib.md5(text.encode()).hexdigest()[:16]
        prompt_cache[text_hash] = {
            "value": text[:100] + "..." if len(text) > 100 else text,
            "tokens": estimated_tokens,
            "timestamp": time.time()
        }
        logger.info(f"[TOKENIZE] Cached tokenization result for key: {text_hash}")
        return json.dumps(result, indent=2)
    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"[TOKENIZE] ❌ Tokenization failed after {processing_time:.4f}s: {e}")
        logger.error(f"[TOKENIZE] Error type: {type(e).__name__}")
        logger.error(f"[TOKENIZE] Error details: {str(e)}")
        logger.error(f"[TOKENIZE] Input text that caused error: '{text[:200]}{'...' if len(text) > 200 else ''}'")
        logger.error("[TOKENIZE] ===== TOKENIZE REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(processing_time * 1000, 2),
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat()
        }, indent=2)
def cache_prompt(key: str, value: str) -> str:
    """Store *value* in the prompt cache under *key*.

    Entries are stored as ``{"value": ..., "timestamp": ...}``. The cache is
    capped at 100 entries; the entry with the oldest timestamp is evicted
    when the cap is exceeded.

    Args:
        key: Cache key chosen by the caller.
        value: Text to store.

    Returns:
        A JSON string describing the outcome (or the error on failure).
    """
    logger.info("[CACHE-PROMPT] ===== CACHE PROMPT REQUEST START =====")
    logger.info(f"[CACHE-PROMPT] Requested key: '{key}'")
    logger.info(f"[CACHE-PROMPT] Key length: {len(key)} characters")
    logger.info(f"[CACHE-PROMPT] Key hash: {hashlib.md5(key.encode()).hexdigest()[:16]}")
    logger.info(f"[CACHE-PROMPT] Value length: {len(value)} characters")
    logger.info(f"[CACHE-PROMPT] Value preview: '{value[:100]}{'...' if len(value) > 100 else ''}'")
    logger.info(f"[CACHE-PROMPT] Current cache size: {len(prompt_cache)} entries")
    # Entries are dicts, so measure their string form, matching get_cache_stats().
    # The previous `len(v)` counted dict KEYS (always 2), not characters.
    logger.info(f"[CACHE-PROMPT] Current cache memory usage: {sum(len(str(v)) for v in prompt_cache.values())} characters")
    logger.info(f"[CACHE-PROMPT] Available keys: {list(prompt_cache.keys())[:10]}{'...' if len(prompt_cache) > 10 else ''}")
    start_time = time.time()
    try:
        prompt_cache[key] = {
            "value": value,
            "timestamp": time.time()
        }
        processing_time = time.time() - start_time
        # Limit cache size to 100 entries.  Fall back to "cached_at"/0.0 so a
        # legacy entry lacking a "timestamp" field cannot crash eviction with
        # a KeyError (the old code hard-indexed ["timestamp"]).
        if len(prompt_cache) > 100:
            oldest_key = min(
                prompt_cache,
                key=lambda k: prompt_cache[k].get("timestamp", prompt_cache[k].get("cached_at", 0.0))
            )
            del prompt_cache[oldest_key]
            logger.info(f"[CACHE-PROMPT] Removed oldest entry: {oldest_key}")
        result = {
            "success": True,
            "key": key,
            "value_length": len(value),
            "cache_size": len(prompt_cache),
            "processing_time_ms": round(processing_time * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
            "request_id": hashlib.md5(f"{key}{time.time()}".encode()).hexdigest()[:8]
        }
        logger.info("[CACHE-PROMPT] βœ… Prompt cached successfully")
        logger.info(f"[CACHE-PROMPT] Cached key: '{key}'")
        logger.info(f"[CACHE-PROMPT] Value length: {len(value)} characters")
        logger.info(f"[CACHE-PROMPT] Value preview: '{value[:100]}{'...' if len(value) > 100 else ''}'")
        logger.info(f"[CACHE-PROMPT] Processing time: {processing_time:.4f}s ({processing_time*1000:.2f}ms)")
        logger.info(f"[CACHE-PROMPT] Request ID: {result['request_id']}")
        logger.info("[CACHE-PROMPT] ===== CACHE PROMPT REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"[CACHE-PROMPT] ❌ Cache prompt failed after {processing_time:.4f}s: {e}")
        logger.error(f"[CACHE-PROMPT] Error type: {type(e).__name__}")
        logger.error(f"[CACHE-PROMPT] Error details: {str(e)}")
        logger.error(f"[CACHE-PROMPT] Key that caused error: '{key}'")
        logger.error(f"[CACHE-PROMPT] Value that caused error: '{value[:200]}{'...' if len(value) > 200 else ''}'")
        logger.error("[CACHE-PROMPT] ===== CACHE PROMPT REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(processing_time * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
def get_cached_prompt(key: str) -> str:
    """Look up a previously cached prompt by *key*.

    Args:
        key: Cache key to look up.

    Returns:
        A JSON string: a hit carries the cached value, a miss sets
        ``found``/``cache_hit`` to false, and a failure reports the error.
    """
    logger.info("[GET-PROMPT] ===== GET CACHED PROMPT REQUEST START =====")
    logger.info(f"[GET-PROMPT] Requested key: '{key}'")
    logger.info(f"[GET-PROMPT] Key length: {len(key)} characters")
    logger.info(f"[GET-PROMPT] Key hash: {hashlib.md5(key.encode()).hexdigest()[:16]}")
    logger.info(f"[GET-PROMPT] Current cache size: {len(prompt_cache)} entries")
    # Entries are dicts; len(str(v)) measures characters (len(v) would count keys).
    logger.info(f"[GET-PROMPT] Current cache memory usage: {sum(len(str(v)) for v in prompt_cache.values())} characters")
    logger.info(f"[GET-PROMPT] Available keys: {list(prompt_cache.keys())[:10]}{'...' if len(prompt_cache) > 10 else ''}")
    start_time = time.time()
    try:
        entry = prompt_cache.get(key)
        processing_time = time.time() - start_time
        if entry is not None:
            # cache_prompt() wraps values as {"value": ..., "timestamp": ...};
            # unwrap so the caller gets the cached text back.  The previous
            # code returned the raw dict, so "value_length" was always 2 (the
            # dict's key count) and the preview slice below raised TypeError
            # on a dict — every cache hit actually produced an error response.
            if isinstance(entry, dict) and "value" in entry:
                cached_value = entry["value"]
            else:
                cached_value = entry  # legacy/foreign entry: return as stored
            value_text = cached_value if isinstance(cached_value, str) else str(cached_value)
            result = {
                "success": True,
                "found": True,
                "key": key,
                "value": cached_value,
                "value_length": len(value_text),
                "cache_size": len(prompt_cache),
                "processing_time_ms": round(processing_time * 1000, 2),
                "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
                "request_id": hashlib.md5(f"{key}{time.time()}".encode()).hexdigest()[:8],
                "cache_hit": True
            }
            logger.info("[GET-PROMPT] βœ… Cache HIT - prompt found")
            logger.info(f"[GET-PROMPT] Found key: '{key}'")
            logger.info(f"[GET-PROMPT] Value length: {len(value_text)} characters")
            logger.info(f"[GET-PROMPT] Value preview: '{value_text[:100]}{'...' if len(value_text) > 100 else ''}'")
            logger.info(f"[GET-PROMPT] Processing time: {processing_time:.4f}s ({processing_time*1000:.2f}ms)")
            logger.info(f"[GET-PROMPT] Request ID: {result['request_id']}")
        else:
            result = {
                "success": True,
                "found": False,
                "key": key,
                "value": None,
                "cache_size": len(prompt_cache),
                "processing_time_ms": round(processing_time * 1000, 2),
                "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
                "request_id": hashlib.md5(f"{key}{time.time()}".encode()).hexdigest()[:8],
                "cache_hit": False
            }
            logger.warning("[GET-PROMPT] ⚠️ Cache MISS - prompt not found")
            logger.warning(f"[GET-PROMPT] Missing key: '{key}'")
            logger.warning(f"[GET-PROMPT] Processing time: {processing_time:.4f}s ({processing_time*1000:.2f}ms)")
            logger.warning(f"[GET-PROMPT] Request ID: {result['request_id']}")
        logger.info("[GET-PROMPT] ===== GET CACHED PROMPT REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"[GET-PROMPT] ❌ Get cached prompt failed after {processing_time:.4f}s: {e}")
        logger.error(f"[GET-PROMPT] Error type: {type(e).__name__}")
        logger.error(f"[GET-PROMPT] Error details: {str(e)}")
        logger.error(f"[GET-PROMPT] Key that caused error: '{key}'")
        logger.error("[GET-PROMPT] ===== GET CACHED PROMPT REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(processing_time * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
def cache_response(prompt_hash: str, response: str) -> str:
    """Store a complete response keyed by its prompt hash.

    The cache is capped at 50 entries; the entry with the oldest timestamp
    is evicted on overflow. Returns a JSON string describing the outcome.
    """
    logger.info("[CACHE-RESPONSE] ===== CACHE RESPONSE REQUEST START =====")
    logger.info(f"[CACHE-RESPONSE] Prompt hash: '{prompt_hash}'")
    logger.info(f"[CACHE-RESPONSE] Hash length: {len(prompt_hash)} characters")
    logger.info(f"[CACHE-RESPONSE] Response length: {len(response)} characters")
    preview = response[:150] + ('...' if len(response) > 150 else '')
    logger.info(f"[CACHE-RESPONSE] Response preview: '{preview}'")
    logger.info(f"[CACHE-RESPONSE] Current response cache size: {len(response_cache)} entries")
    mem_before = sum(len(v['response']) for v in response_cache.values())
    logger.info(f"[CACHE-RESPONSE] Current cache memory usage: {mem_before} characters")
    visible = list(response_cache.keys())[:10]
    marker = '...' if len(response_cache) > 10 else ''
    logger.info(f"[CACHE-RESPONSE] Available hashes: {visible}{marker}")
    started = time.time()
    try:
        response_cache[prompt_hash] = {"response": response, "timestamp": time.time()}
        elapsed = time.time() - started
        # Evict the stalest entry once the 50-entry cap is exceeded.
        if len(response_cache) > 50:
            stalest = min(response_cache, key=lambda h: response_cache[h]["timestamp"])
            del response_cache[stalest]
            logger.info(f"[CACHE-RESPONSE] Removed oldest entry: {stalest}")
        mem_after = sum(len(v['response']) for v in response_cache.values())
        result = {
            "success": True,
            "cached_hash": prompt_hash,
            "response_length": len(response),
            "cache_size": len(response_cache),
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
            "request_id": hashlib.md5(f"{prompt_hash}{time.time()}".encode()).hexdigest()[:8],
            "cache_memory_usage": mem_after
        }
        logger.info("[CACHE-RESPONSE] βœ… Response cached successfully")
        logger.info(f"[CACHE-RESPONSE] Cached hash: '{prompt_hash}'")
        logger.info(f"[CACHE-RESPONSE] Response length: {len(response)} characters")
        logger.info(f"[CACHE-RESPONSE] New cache size: {len(response_cache)} entries")
        logger.info(f"[CACHE-RESPONSE] New cache memory usage: {mem_after} characters")
        logger.info(f"[CACHE-RESPONSE] Processing time: {elapsed:.4f}s ({elapsed*1000:.2f}ms)")
        logger.info(f"[CACHE-RESPONSE] Request ID: {result['request_id']}")
        logger.info("[CACHE-RESPONSE] ===== CACHE RESPONSE REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        elapsed = time.time() - started
        logger.error(f"[CACHE-RESPONSE] ❌ Cache response failed after {elapsed:.4f}s: {e}")
        logger.error(f"[CACHE-RESPONSE] Error type: {type(e).__name__}")
        logger.error(f"[CACHE-RESPONSE] Error details: {str(e)}")
        logger.error(f"[CACHE-RESPONSE] Hash that caused error: '{prompt_hash}'")
        err_preview = response[:300] + ('...' if len(response) > 300 else '')
        logger.error(f"[CACHE-RESPONSE] Response preview that caused error: '{err_preview}'")
        logger.error("[CACHE-RESPONSE] ===== CACHE RESPONSE REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
def get_cached_response(prompt_hash: str) -> str:
    """Look up a previously cached response by prompt hash.

    Returns a JSON string: a hit includes the stored response plus its age,
    a miss sets "found"/"cache_hit" to false, a failure reports the error.
    """
    logger.info("[GET-RESPONSE] ===== GET CACHED RESPONSE REQUEST START =====")
    logger.info(f"[GET-RESPONSE] Requested hash: '{prompt_hash}'")
    logger.info(f"[GET-RESPONSE] Hash length: {len(prompt_hash)} characters")
    logger.info(f"[GET-RESPONSE] Current response cache size: {len(response_cache)} entries")
    logger.info(f"[GET-RESPONSE] Current cache memory usage: {sum(len(v['response']) for v in response_cache.values())} characters")
    logger.info(f"[GET-RESPONSE] Available hashes: {list(response_cache.keys())[:10]}{'...' if len(response_cache) > 10 else ''}")
    started = time.time()
    try:
        entry = response_cache.get(prompt_hash)
        elapsed = time.time() - started
        stamp = datetime.datetime.now(pytz.UTC).isoformat()
        req_id = hashlib.md5(f"{prompt_hash}{time.time()}".encode()).hexdigest()[:8]
        if entry is None:
            # Miss: report the lookup without a payload.
            result = {
                "success": True,
                "found": False,
                "hash": prompt_hash,
                "response": None,
                "cache_size": len(response_cache),
                "processing_time_ms": round(elapsed * 1000, 2),
                "timestamp": stamp,
                "request_id": req_id,
                "cache_hit": False
            }
            logger.warning("[GET-RESPONSE] ⚠️ Cache MISS - response not found")
            logger.warning(f"[GET-RESPONSE] Missing hash: '{prompt_hash}'")
            logger.warning(f"[GET-RESPONSE] Processing time: {elapsed:.4f}s ({elapsed*1000:.2f}ms)")
            logger.warning(f"[GET-RESPONSE] Request ID: {req_id}")
        else:
            body = entry["response"]
            age = round(time.time() - entry["timestamp"], 2)
            stored_at = datetime.datetime.fromtimestamp(entry["timestamp"], pytz.UTC).isoformat()
            result = {
                "success": True,
                "found": True,
                "hash": prompt_hash,
                "response": body,
                "response_length": len(body),
                "age_seconds": age,
                "cache_size": len(response_cache),
                "processing_time_ms": round(elapsed * 1000, 2),
                "timestamp": stamp,
                "request_id": req_id,
                "cache_hit": True,
                "cached_at": stored_at
            }
            logger.info("[GET-RESPONSE] βœ… Cache HIT - response found")
            logger.info(f"[GET-RESPONSE] Found hash: '{prompt_hash}'")
            logger.info(f"[GET-RESPONSE] Response length: {len(body)} characters")
            logger.info(f"[GET-RESPONSE] Response preview: '{body[:150]}{'...' if len(body) > 150 else ''}'")
            logger.info(f"[GET-RESPONSE] Response age: {age} seconds")
            logger.info(f"[GET-RESPONSE] Cached at: {stored_at}")
            logger.info(f"[GET-RESPONSE] Processing time: {elapsed:.4f}s ({elapsed*1000:.2f}ms)")
            logger.info(f"[GET-RESPONSE] Request ID: {req_id}")
        logger.info("[GET-RESPONSE] ===== GET CACHED RESPONSE REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        elapsed = time.time() - started
        logger.error(f"[GET-RESPONSE] ❌ Get cached response failed after {elapsed:.4f}s: {e}")
        logger.error(f"[GET-RESPONSE] Error type: {type(e).__name__}")
        logger.error(f"[GET-RESPONSE] Error details: {str(e)}")
        logger.error(f"[GET-RESPONSE] Hash that caused error: '{prompt_hash}'")
        logger.error("[GET-RESPONSE] ===== GET CACHED RESPONSE REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
def calculate_token_cost(username: str, duration_ms: float) -> str:
    """Bill a user for GPU time (0.001 tokens per 100 ms) and update totals.

    Returns a JSON string with the per-request cost plus the user's
    cumulative statistics from the in-memory ledger.
    """
    logger.info("[TOKEN-COST] ===== TOKEN COST REQUEST START =====")
    logger.info(f"[TOKEN-COST] Username: '{username}'")
    logger.info(f"[TOKEN-COST] Username length: {len(username)} characters")
    logger.info(f"[TOKEN-COST] Duration: {duration_ms}ms")
    logger.info(f"[TOKEN-COST] Current users tracked: {len(token_ledger)}")
    logger.info(f"[TOKEN-COST] User ledger keys: {list(token_ledger.keys())[:10]}{'...' if len(token_ledger) > 10 else ''}")
    existing = token_ledger.get(username)
    if existing is None:
        logger.info("[TOKEN-COST] New user - creating ledger entry")
    else:
        logger.info("[TOKEN-COST] Existing user data found:")
        logger.info(f"[TOKEN-COST] - Total cost: {existing['total_cost']} tokens")
        logger.info(f"[TOKEN-COST] - Total duration: {existing['total_duration_ms']}ms")
        logger.info(f"[TOKEN-COST] - Previous requests: {existing['requests']}")
    started = time.time()
    try:
        request_cost = (duration_ms / 100.0) * 0.001  # 0.001 tokens per 100ms
        elapsed = time.time() - started
        # First-time users get a zeroed ledger record (analytics only).
        record = token_ledger.setdefault(username, {
            "total_cost": 0.0,
            "total_duration_ms": 0.0,
            "requests": 0,
            "first_seen": time.time(),
            "last_seen": time.time()
        })
        record["total_cost"] += request_cost
        record["total_duration_ms"] += duration_ms
        record["requests"] += 1
        record["last_seen"] = time.time()
        mean_cost = record["total_cost"] / record["requests"]
        mean_duration = record["total_duration_ms"] / record["requests"]
        ledger_age = round(time.time() - record["first_seen"], 2)
        result = {
            "success": True,
            "username": username,
            "duration_ms": duration_ms,
            "cost": round(request_cost, 6),
            "total_cost": round(record["total_cost"], 4),
            "total_requests": record["requests"],
            "total_duration_ms": round(record["total_duration_ms"], 2),
            "avg_cost_per_request": round(mean_cost, 6),
            "avg_duration_per_request": round(mean_duration, 2),
            "account_age_seconds": ledger_age,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
            "request_id": hashlib.md5(f"{username}{duration_ms}{time.time()}".encode()).hexdigest()[:8]
        }
        logger.info("[TOKEN-COST] βœ… Token cost calculated successfully")
        logger.info(f"[TOKEN-COST] Request cost: {request_cost} tokens")
        logger.info(f"[TOKEN-COST] User total cost: {record['total_cost']} tokens")
        logger.info(f"[TOKEN-COST] User total requests: {record['requests']}")
        logger.info(f"[TOKEN-COST] User avg cost per request: {mean_cost} tokens")
        logger.info(f"[TOKEN-COST] User avg duration per request: {mean_duration}ms")
        logger.info(f"[TOKEN-COST] User account age: {ledger_age} seconds")
        logger.info(f"[TOKEN-COST] Processing time: {elapsed:.4f}s ({elapsed*1000:.2f}ms)")
        logger.info(f"[TOKEN-COST] Request ID: {result['request_id']}")
        logger.info("[TOKEN-COST] ===== TOKEN COST REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        elapsed = time.time() - started
        logger.error(f"[TOKEN-COST] ❌ Token cost calculation failed after {elapsed:.4f}s: {e}")
        logger.error(f"[TOKEN-COST] Error type: {type(e).__name__}")
        logger.error(f"[TOKEN-COST] Error details: {str(e)}")
        logger.error(f"[TOKEN-COST] Username that caused error: '{username}'")
        logger.error(f"[TOKEN-COST] Duration that caused error: {duration_ms}ms")
        logger.error("[TOKEN-COST] ===== TOKEN COST REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
def get_cache_stats() -> str:
    """Report cache sizes, memory footprint and per-user usage as JSON."""
    logger.info("[CACHE-STATS] ===== CACHE STATS REQUEST START =====")
    logger.info(f"[CACHE-STATS] Current prompt cache size: {len(prompt_cache)} entries")
    logger.info(f"[CACHE-STATS] Current response cache size: {len(response_cache)} entries")
    logger.info(f"[CACHE-STATS] Current users tracked: {len(token_ledger)}")
    logger.info(f"[CACHE-STATS] Prompt cache memory usage: {sum(len(str(v)) for v in prompt_cache.values())} characters")
    logger.info(f"[CACHE-STATS] Response cache memory usage: {sum(len(v['response']) for v in response_cache.values())} characters")
    logger.info(f"[CACHE-STATS] Total requests processed: {sum(u['requests'] for u in token_ledger.values())}")
    started = time.time()
    try:
        # Aggregate totals across both caches and the user ledger.
        prompt_mem = sum(len(str(v)) for v in prompt_cache.values())
        response_mem = sum(len(v['response']) for v in response_cache.values())
        request_total = sum(u['requests'] for u in token_ledger.values())
        token_total = sum(u['total_cost'] for u in token_ledger.values())
        duration_total = sum(u['total_duration_ms'] for u in token_ledger.values())
        # A user counts as "active" if seen within the last hour.
        active = sum(
            1 for u in token_ledger.values()
            if time.time() - u.get('last_seen', u.get('first_seen', 0)) < 3600
        )
        user_count = len(token_ledger)
        mean_requests = request_total / user_count if user_count else 0
        mean_tokens = token_total / user_count if user_count else 0
        elapsed = time.time() - started
        result = {
            "success": True,
            "prompt_cache_size": len(prompt_cache),
            "response_cache_size": len(response_cache),
            "users_tracked": user_count,
            "active_users_last_hour": active,
            "total_requests": request_total,
            "total_tokens_spent": round(token_total, 4),
            "total_duration_ms": round(duration_total, 2),
            "avg_requests_per_user": round(mean_requests, 2),
            "avg_tokens_per_user": round(mean_tokens, 4),
            "prompt_cache_memory_bytes": prompt_mem,
            "response_cache_memory_bytes": response_mem,
            "total_cache_memory_bytes": prompt_mem + response_mem,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
            "request_id": hashlib.md5(f"stats{time.time()}".encode()).hexdigest()[:8]
        }
        logger.info("[CACHE-STATS] βœ… Cache statistics retrieved successfully")
        logger.info(f"[CACHE-STATS] Prompt cache: {len(prompt_cache)} entries ({prompt_mem} chars)")
        logger.info(f"[CACHE-STATS] Response cache: {len(response_cache)} entries ({response_mem} chars)")
        logger.info(f"[CACHE-STATS] Users tracked: {user_count} ({active} active last hour)")
        logger.info(f"[CACHE-STATS] Total requests: {request_total}")
        logger.info(f"[CACHE-STATS] Total tokens spent: {token_total}")
        logger.info(f"[CACHE-STATS] Total duration: {duration_total}ms")
        logger.info(f"[CACHE-STATS] Avg requests per user: {mean_requests}")
        logger.info(f"[CACHE-STATS] Avg tokens per user: {mean_tokens}")
        logger.info(f"[CACHE-STATS] Processing time: {elapsed:.4f}s ({elapsed*1000:.2f}ms)")
        logger.info(f"[CACHE-STATS] Request ID: {result['request_id']}")
        logger.info("[CACHE-STATS] ===== CACHE STATS REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        elapsed = time.time() - started
        logger.error(f"[CACHE-STATS] ❌ Cache statistics retrieval failed after {elapsed:.4f}s: {e}")
        logger.error(f"[CACHE-STATS] Error type: {type(e).__name__}")
        logger.error(f"[CACHE-STATS] Error details: {str(e)}")
        logger.error("[CACHE-STATS] ===== CACHE STATS REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
def get_backend_health() -> str:
    """Evaluate simple health heuristics and return a JSON status report."""
    logger.info("[BACKEND-HEALTH] ===== BACKEND HEALTH REQUEST START =====")
    logger.info("[BACKEND-HEALTH] Checking backend health status...")
    logger.info(f"[BACKEND-HEALTH] Current prompt cache size: {len(prompt_cache)} entries")
    logger.info(f"[BACKEND-HEALTH] Current response cache size: {len(response_cache)} entries")
    logger.info(f"[BACKEND-HEALTH] Current users tracked: {len(token_ledger)}")
    logger.info(f"[BACKEND-HEALTH] Total requests processed: {sum(u['requests'] for u in token_ledger.values())}")
    started = time.time()
    try:
        combined_cache_size = len(prompt_cache) + len(response_cache)
        request_total = sum(u['requests'] for u in token_ledger.values())
        memory_total = sum(len(str(v)) for v in prompt_cache.values()) + sum(len(v['response']) for v in response_cache.values())
        # Any breached threshold degrades the status and adds an issue string.
        thresholds = [
            (combined_cache_size > 200, "High cache usage"),
            (len(token_ledger) > 1000, "High user count"),
            (memory_total > 10000000, "High memory usage"),  # 10MB
        ]
        issues = [message for breached, message in thresholds if breached]
        health_status = "degraded" if issues else "healthy"
        elapsed = time.time() - started
        result = {
            "success": True,
            "status": health_status,
            "issues": issues,
            "prompt_cache_size": len(prompt_cache),
            "response_cache_size": len(response_cache),
            "total_cache_size": combined_cache_size,
            "users_tracked": len(token_ledger),
            "total_requests": request_total,
            "total_memory_usage_bytes": memory_total,
            "uptime_seconds": round(time.time() - backend_start_time, 2) if 'backend_start_time' in globals() else 0,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat(),
            "request_id": hashlib.md5(f"health{time.time()}".encode()).hexdigest()[:8]
        }
        logger.info("[BACKEND-HEALTH] βœ… Backend health check completed successfully")
        logger.info(f"[BACKEND-HEALTH] Health status: {health_status}")
        if issues:
            logger.warning(f"[BACKEND-HEALTH] Issues detected: {', '.join(issues)}")
        logger.info(f"[BACKEND-HEALTH] Total cache size: {combined_cache_size} entries")
        logger.info(f"[BACKEND-HEALTH] Users tracked: {len(token_ledger)}")
        logger.info(f"[BACKEND-HEALTH] Total requests: {request_total}")
        logger.info(f"[BACKEND-HEALTH] Memory usage: {memory_total} bytes")
        logger.info(f"[BACKEND-HEALTH] Processing time: {elapsed:.4f}s ({elapsed*1000:.2f}ms)")
        logger.info(f"[BACKEND-HEALTH] Request ID: {result['request_id']}")
        logger.info("[BACKEND-HEALTH] ===== BACKEND HEALTH REQUEST END =====")
        return json.dumps(result, indent=2)
    except Exception as e:
        elapsed = time.time() - started
        logger.error(f"[BACKEND-HEALTH] ❌ Backend health check failed after {elapsed:.4f}s: {e}")
        logger.error(f"[BACKEND-HEALTH] Error type: {type(e).__name__}")
        logger.error(f"[BACKEND-HEALTH] Error details: {str(e)}")
        logger.error("[BACKEND-HEALTH] ===== BACKEND HEALTH REQUEST END (ERROR) =====")
        return json.dumps({
            "success": False,
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__,
            "processing_time_ms": round(elapsed * 1000, 2),
            "timestamp": datetime.datetime.now(pytz.UTC).isoformat()
        }, indent=2)
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
# Each tab exposes one backend function for manual testing / API access.
with gr.Blocks(title="ZeroEngine-Backend") as demo:
    # Apply theme after Blocks creation for Gradio 6.5.0 compatibility
    if hasattr(demo, 'theme'):
        demo.theme = gr.themes.Monochrome()
    # Static page header banner.
    gr.HTML("""
<div style='text-align: center; padding: 20px;'>
<h1>πŸ”§ ZeroEngine-Backend</h1>
<p style='color: #888;'>Background Processing Service for ZeroEngine</p>
</div>
""")
    # --- Tokenization: estimate token counts for arbitrary text -------------
    with gr.Tab("πŸ”’ Tokenize"):
        gr.Markdown("### Fast Tokenization Pre-Processing")
        with gr.Row():
            with gr.Column():
                tokenize_input = gr.Textbox(
                    label="Text to Tokenize",
                    placeholder="Enter text here...",
                    lines=5
                )
                tokenize_btn = gr.Button("Tokenize", variant="primary")
            with gr.Column():
                tokenize_output = gr.Code(label="Result (JSON)", language="json")
        tokenize_btn.click(tokenize_text, [tokenize_input], [tokenize_output])
    # --- Prompt cache: key/value store for prompts ---------------------------
    with gr.Tab("πŸ’Ύ Prompt Cache"):
        gr.Markdown("### Store and Retrieve Prompts")
        with gr.Row():
            with gr.Column():
                cache_key_input = gr.Textbox(label="Cache Key")
                cache_value_input = gr.Textbox(label="Value to Cache", lines=3)
                cache_store_btn = gr.Button("Store", variant="primary")
                cache_store_output = gr.Code(label="Result", language="json")
            with gr.Column():
                cache_get_input = gr.Textbox(label="Key to Retrieve")
                cache_get_btn = gr.Button("Retrieve", variant="secondary")
                cache_get_output = gr.Code(label="Result", language="json")
        cache_store_btn.click(cache_prompt, [cache_key_input, cache_value_input], [cache_store_output])
        cache_get_btn.click(get_cached_prompt, [cache_get_input], [cache_get_output])
    # --- Response cache: full responses keyed by prompt hash -----------------
    with gr.Tab("⚑ Response Cache"):
        gr.Markdown("### Cache Complete Responses")
        with gr.Row():
            with gr.Column():
                resp_hash_input = gr.Textbox(label="Prompt Hash")
                resp_value_input = gr.Textbox(label="Response to Cache", lines=5)
                resp_cache_btn = gr.Button("Cache Response", variant="primary")
                resp_cache_output = gr.Code(label="Result", language="json")
            with gr.Column():
                resp_get_input = gr.Textbox(label="Hash to Retrieve")
                resp_get_btn = gr.Button("Get Response", variant="secondary")
                resp_get_output = gr.Code(label="Result", language="json")
        resp_cache_btn.click(cache_response, [resp_hash_input, resp_value_input], [resp_cache_output])
        resp_get_btn.click(get_cached_response, [resp_get_input], [resp_get_output])
    # --- Token accounting: per-user cost calculation -------------------------
    with gr.Tab("πŸ’° Token Accounting"):
        gr.Markdown("### Calculate Token Costs")
        # NOTE(review): original indentation was lost in this file; assuming
        # only the two input widgets share the Row — confirm against the
        # intended layout.
        with gr.Row():
            username_input = gr.Textbox(label="Username", value="turtle170")
            duration_input = gr.Number(label="Duration (ms)", value=5000)
        calc_btn = gr.Button("Calculate Cost", variant="primary")
        calc_output = gr.Code(label="Result (JSON)", language="json")
        calc_btn.click(calculate_token_cost, [username_input, duration_input], [calc_output])
    # --- Aggregate statistics ------------------------------------------------
    with gr.Tab("πŸ“Š Stats"):
        gr.Markdown("### Cache Statistics")
        stats_btn = gr.Button("Get Stats", variant="primary")
        stats_output = gr.Code(label="Statistics (JSON)", language="json")
        stats_btn.click(get_cache_stats, None, [stats_output])
    # --- Health check --------------------------------------------------------
    with gr.Tab("πŸ₯ Health"):
        gr.Markdown("### Backend Health Status")
        health_btn = gr.Button("Check Health", variant="primary")
        health_output = gr.Code(label="Health Status (JSON)", language="json")
        health_btn.click(get_backend_health, None, [health_output])
if __name__ == "__main__":
    import atexit
    import signal
    # FIX: the startup logging below reads sys.version, but `sys` was only
    # imported inside signal_handler(), so launch crashed with a NameError
    # before the server ever started.
    import sys

    def cleanup_on_exit():
        """Log and clear all in-memory caches on interpreter exit."""
        logger.info("[CLEANUP] Backend shutting down...")
        # Clear caches so shutdown logs reflect what was discarded.
        global prompt_cache, response_cache, token_ledger
        logger.info(f"[CLEANUP] Clearing {len(prompt_cache)} prompt cache entries")
        logger.info(f"[CLEANUP] Clearing {len(response_cache)} response cache entries")
        logger.info(f"[CLEANUP] Clearing {len(token_ledger)} user token records")
        prompt_cache.clear()
        response_cache.clear()
        token_ledger.clear()
        logger.info("[CLEANUP] Backend shutdown complete")

    # Run the same cleanup for normal exits and for SIGTERM/SIGINT.
    atexit.register(cleanup_on_exit)

    def signal_handler(signum, frame):
        """Handle shutdown signals gracefully."""
        logger.info(f"[CLEANUP] Received signal {signum}")
        cleanup_on_exit()
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    logger.info("[INIT] ===== BACKEND APPLICATION STARTUP =====")
    logger.info("[INIT] ZeroEngine-Backend starting up...")
    logger.info(f"[INIT] Backend start time: {datetime.datetime.fromtimestamp(backend_start_time, pytz.UTC).isoformat()}")
    logger.info(f"[INIT] Python version: {sys.version}")
    logger.info(f"[INIT] Gradio version: {gr.__version__}")
    logger.info(f"[INIT] Cache sizes - Prompt: {len(prompt_cache)}, Response: {len(response_cache)}")
    logger.info(f"[INIT] Users tracked: {len(token_ledger)}")
    logger.info("[INIT] Server will launch on port 7861")
    logger.info("[INIT] ===== BACKEND APPLICATION STARTUP END =====")
    demo.launch(server_name="0.0.0.0", server_port=7861, ssr_mode=False)