# NOTE(review): Hugging Face Spaces status banner left over from a web scrape
# ("Spaces: Sleeping") — not part of the program; kept as a comment.
| import gradio as gr | |
| import json | |
| import time | |
| import hashlib | |
| from typing import Dict, Optional | |
# ============================================================================
# ZEROENGINE-BACKEND: Background Processing Service
# ============================================================================
# This space handles:
#   - Tokenization pre-processing
#   - Prompt caching
#   - Token accounting calculations
#   - Response caching
# ============================================================================
# In-memory caches (will reset on space restart).
# NOTE(review): these are process-local and unsynchronized — fine for a
# single-worker Gradio space, but not shared across replicas.
prompt_cache: Dict[str, dict] = {}    # text_hash / user key -> prompt metadata + timestamp
response_cache: Dict[str, dict] = {}  # prompt_hash -> {"response": str, "timestamp": float}
token_ledger: Dict[str, dict] = {}    # username -> {"total_cost", "total_duration_ms", "requests"}
def tokenize_text(text: str) -> str:
    """
    Estimate token usage for *text* without loading a tokenizer model.

    Uses the rough 4-characters-per-token heuristic for English, records a
    truncated preview in the module-level ``prompt_cache``, and returns the
    estimate as a pretty-printed JSON string.

    Returns:
        JSON string with ``success``, ``text_hash``, ``estimated_tokens``,
        ``word_count``, ``char_count`` and ``timestamp`` on success, or
        ``{"success": false, "error": ...}`` on failure.
    """
    try:
        # Cheap heuristic: ~4 chars per token for English text. Fast and
        # good enough for pre-processing; not an exact tokenizer count.
        estimated_tokens = len(text) // 4
        word_count = len(text.split())

        # md5 is used purely as a cache key here, not for security.
        text_hash = hashlib.md5(text.encode()).hexdigest()[:16]

        result = {
            "success": True,
            "text_hash": text_hash,
            "estimated_tokens": estimated_tokens,
            "word_count": word_count,
            "char_count": len(text),
            "timestamp": time.time(),
        }

        # Cache a truncated preview alongside the estimate.
        prompt_cache[text_hash] = {
            "text": text[:100] + "..." if len(text) > 100 else text,
            "tokens": estimated_tokens,
            "cached_at": time.time(),
        }

        # FIX: bound the cache on this write path too. cache_prompt()
        # enforces a 100-entry limit but this function previously grew
        # prompt_cache without bound. Entries written here use "cached_at"
        # while cache_prompt() uses "timestamp", so eviction must tolerate
        # both schemas.
        if len(prompt_cache) > 100:
            oldest_key = min(
                prompt_cache,
                key=lambda k: prompt_cache[k].get(
                    "timestamp", prompt_cache[k].get("cached_at", 0.0)
                ),
            )
            del prompt_cache[oldest_key]

        return json.dumps(result, indent=2)
    except Exception as e:
        # Service boundary: report failures as JSON instead of raising.
        return json.dumps({
            "success": False,
            "error": str(e)
        }, indent=2)
def cache_prompt(key: str, value: str) -> str:
    """Store *value* under *key* in the prompt cache.

    Returns a JSON string carrying the stored key and the resulting cache
    size on success, or the error message on failure.
    """
    try:
        prompt_cache[key] = {
            "value": value,
            "timestamp": time.time(),
        }

        # Keep at most 100 entries: drop the stalest one when over budget.
        if len(prompt_cache) > 100:
            stalest = min(prompt_cache, key=lambda k: prompt_cache[k]["timestamp"])
            del prompt_cache[stalest]

        payload = {
            "success": True,
            "cached": key,
            "cache_size": len(prompt_cache),
        }
    except Exception as e:
        payload = {"success": False, "error": str(e)}
    return json.dumps(payload, indent=2)
def get_cached_prompt(key: str) -> str:
    """Look up *key* in the prompt cache and report hit/miss as JSON."""
    try:
        entry = prompt_cache.get(key)
        if entry is None:
            # Miss: the key was never stored or has been evicted.
            return json.dumps({
                "success": False,
                "error": "Cache key not found"
            }, indent=2)
        age = round(time.time() - entry["timestamp"], 2)
        return json.dumps({
            "success": True,
            "value": entry["value"],
            "age_seconds": age
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e)
        }, indent=2)
def cache_response(prompt_hash: str, response: str) -> str:
    """Cache a complete response under *prompt_hash* for instant retrieval.

    Returns a JSON string with the stored hash and the resulting cache size,
    or the error message on failure.
    """
    try:
        response_cache[prompt_hash] = {
            "response": response,
            "timestamp": time.time(),
        }

        # Keep at most 50 responses: evict the stalest when over budget.
        if len(response_cache) > 50:
            stalest = min(response_cache, key=lambda k: response_cache[k]["timestamp"])
            del response_cache[stalest]

        payload = {
            "success": True,
            "cached": prompt_hash,
            "cache_size": len(response_cache),
        }
    except Exception as e:
        payload = {"success": False, "error": str(e)}
    return json.dumps(payload, indent=2)
def get_cached_response(prompt_hash: str) -> str:
    """Retrieve a previously cached response; report hit/miss as JSON."""
    try:
        entry = response_cache.get(prompt_hash)
        if entry is None:
            # Miss: never cached, or already evicted by the size bound.
            return json.dumps({
                "success": False,
                "error": "Response not cached"
            }, indent=2)
        age = round(time.time() - entry["timestamp"], 2)
        return json.dumps({
            "success": True,
            "response": entry["response"],
            "age_seconds": age
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e)
        }, indent=2)
def calculate_token_cost(username: str, duration_ms: float) -> str:
    """Compute the token cost for one request and accumulate per-user totals.

    Pricing is 0.001 tokens per 100 ms of duration. The per-user running
    totals live in the module-level ``token_ledger`` (analytics only).

    Returns a JSON string with this request's cost plus the user's running
    totals, or the error message on failure.
    """
    try:
        cost = (duration_ms / 100.0) * 0.001  # 0.001 tokens per 100ms

        # First sighting of this user initializes an empty ledger row.
        ledger = token_ledger.setdefault(username, {
            "total_cost": 0.0,
            "total_duration_ms": 0.0,
            "requests": 0,
        })
        ledger["total_cost"] += cost
        ledger["total_duration_ms"] += duration_ms
        ledger["requests"] += 1

        return json.dumps({
            "success": True,
            "username": username,
            "duration_ms": duration_ms,
            "cost": round(cost, 6),
            "total_cost": round(ledger["total_cost"], 4),
            "total_requests": ledger["requests"]
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e)
        }, indent=2)
def get_cache_stats() -> str:
    """Summarize current cache and ledger usage as a JSON string."""
    try:
        total_requests = sum(row["requests"] for row in token_ledger.values())
        stats = {
            "success": True,
            "prompt_cache_size": len(prompt_cache),
            "response_cache_size": len(response_cache),
            "users_tracked": len(token_ledger),
            "total_requests": total_requests,
            "timestamp": time.time(),
        }
        return json.dumps(stats, indent=2)
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e)
        }, indent=2)
| # ============================================================================ | |
| # GRADIO INTERFACE | |
| # ============================================================================ | |
# Gradio UI: one tab per backend operation, each wiring a button click to
# the matching handler above. All handlers return JSON strings rendered in
# gr.Code components.
with gr.Blocks(title="ZeroEngine-Backend", theme=gr.themes.Monochrome()) as demo:
    gr.HTML("""
        <div style='text-align: center; padding: 20px;'>
            <h1>🔧 ZeroEngine-Backend</h1>
            <p style='color: #888;'>Background Processing Service for ZeroEngine</p>
        </div>
    """)
    # Tab 1: token-count estimation via tokenize_text().
    with gr.Tab("🔢 Tokenize"):
        gr.Markdown("### Fast Tokenization Pre-Processing")
        with gr.Row():
            with gr.Column():
                tokenize_input = gr.Textbox(
                    label="Text to Tokenize",
                    placeholder="Enter text here...",
                    lines=5
                )
                tokenize_btn = gr.Button("Tokenize", variant="primary")
            with gr.Column():
                tokenize_output = gr.Code(label="Result (JSON)", language="json")
        tokenize_btn.click(tokenize_text, [tokenize_input], [tokenize_output])
    # Tab 2: key/value prompt cache (store + retrieve columns).
    with gr.Tab("💾 Prompt Cache"):
        gr.Markdown("### Store and Retrieve Prompts")
        with gr.Row():
            with gr.Column():
                cache_key_input = gr.Textbox(label="Cache Key")
                cache_value_input = gr.Textbox(label="Value to Cache", lines=3)
                cache_store_btn = gr.Button("Store", variant="primary")
                cache_store_output = gr.Code(label="Result", language="json")
            with gr.Column():
                cache_get_input = gr.Textbox(label="Key to Retrieve")
                cache_get_btn = gr.Button("Retrieve", variant="secondary")
                cache_get_output = gr.Code(label="Result", language="json")
        cache_store_btn.click(cache_prompt, [cache_key_input, cache_value_input], [cache_store_output])
        cache_get_btn.click(get_cached_prompt, [cache_get_input], [cache_get_output])
    # Tab 3: hash-keyed response cache (store + retrieve columns).
    with gr.Tab("⚡ Response Cache"):
        gr.Markdown("### Cache Complete Responses")
        with gr.Row():
            with gr.Column():
                resp_hash_input = gr.Textbox(label="Prompt Hash")
                resp_value_input = gr.Textbox(label="Response to Cache", lines=5)
                resp_cache_btn = gr.Button("Cache Response", variant="primary")
                resp_cache_output = gr.Code(label="Result", language="json")
            with gr.Column():
                resp_get_input = gr.Textbox(label="Hash to Retrieve")
                resp_get_btn = gr.Button("Get Response", variant="secondary")
                resp_get_output = gr.Code(label="Result", language="json")
        resp_cache_btn.click(cache_response, [resp_hash_input, resp_value_input], [resp_cache_output])
        resp_get_btn.click(get_cached_response, [resp_get_input], [resp_get_output])
    # Tab 4: per-user token-cost accounting via calculate_token_cost().
    with gr.Tab("💰 Token Accounting"):
        gr.Markdown("### Calculate Token Costs")
        with gr.Row():
            username_input = gr.Textbox(label="Username", value="turtle170")
            duration_input = gr.Number(label="Duration (ms)", value=5000)
        calc_btn = gr.Button("Calculate Cost", variant="primary")
        calc_output = gr.Code(label="Result (JSON)", language="json")
        calc_btn.click(calculate_token_cost, [username_input, duration_input], [calc_output])
    # Tab 5: read-only cache/ledger statistics.
    with gr.Tab("📊 Stats"):
        gr.Markdown("### Cache Statistics")
        stats_btn = gr.Button("Get Stats", variant="primary")
        stats_output = gr.Code(label="Statistics (JSON)", language="json")
        stats_btn.click(get_cache_stats, None, [stats_output])
if __name__ == "__main__":
    # Bind on all interfaces for the hosting container; SSR disabled and a
    # public share link requested.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False,
        share=True,
    )