"""
ZEROENGINE KERNEL V0.1
Target SDK: Gradio 6.5.0
Optimized for: 2 vCPU / 16GB RAM
Features: KV-Cache Stitching, Hard Partitioning, Resource Gatekeeper, Ghost Terminal
"""
import os
import json
import time
import psutil
import threading
import logging
from datetime import datetime
from typing import List, Dict, Optional, Generator, Tuple
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
try:
from llama_cpp import Llama
except ImportError:
from llama_cpp_pydist import Llama
# ==========================================
# SYSTEM CONFIGURATION & CONSTANTS
# ==========================================
HF_TOKEN = os.environ.get("HF_TOKEN")
SPACE_ID = os.environ.get("SPACE_ID")
LOG_FILE = "engine_telemetry.json"
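# Resource Gatekeeper thresholds: never load a GGUF larger than RAM_LIMIT_PCT of
# total RAM, and always keep SYSTEM_RESERVE_MB of free memory as headroom for the
# Gradio server and OS on the 2 vCPU / 16GB Space.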
RAM_LIMIT_PCT = 0.50
SYSTEM_RESERVE_MB = 250
DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF"
DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - ZEROENGINE - %(message)s')
logger = logging.getLogger(__name__)
# ==========================================
# CORE TELEMETRY & PERSISTENCE
# ==========================================
class TelemetryManager:
"""Handles JSON-based usage tracking and HF Space persistence."""
def __init__(self, api: HfApi):
self.api = api
self.stats = self._load_initial_stats()
def _load_initial_stats(self) -> Dict:
if os.path.exists(LOG_FILE):
try:
with open(LOG_FILE, "r") as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load telemetry: {e}")
return {
"session_start": str(datetime.now()),
"load_count": {},
"total_tokens_generated": 0,
"popular_repos": []
}
def track_load(self, repo: str, filename: str):
key = f"{repo}/{filename}"
self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1
self._sync_to_cloud()
def track_generation(self, tokens: int):
self.stats["total_tokens_generated"] += tokens
def _sync_to_cloud(self):
if not HF_TOKEN or not SPACE_ID:
return
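        # Write the stats file locally, then push it back into the Space's own
        # repo so telemetry survives restarts and rebuilds.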
try:
with open(LOG_FILE, "w") as f:
json.dump(self.stats, f, indent=4)
self.api.upload_file(
path_or_fileobj=LOG_FILE,
path_in_repo=LOG_FILE,
repo_id=SPACE_ID,
repo_type="space"
)
logger.info("Telemetry synced to Space repository.")
except Exception as e:
logger.warning(f"Telemetry sync failed: {e}")
# ==========================================
# RESOURCE GATEKEEPER
# ==========================================
class ResourceMonitor:
"""Monitors vCPU and RAM to prevent Kernel Panics."""
@staticmethod
def get_metrics() -> Dict:
vm = psutil.virtual_memory()
return {
"ram_used_gb": round(vm.used / (1024**3), 2),
"ram_avail_gb": round(vm.available / (1024**3), 2),
"ram_total_gb": round(vm.total / (1024**3), 2),
"ram_pct": vm.percent,
"cpu_usage_pct": psutil.cpu_percent(interval=None),
"load_avg": os.getloadavg()[0] if hasattr(os, 'getloadavg') else 0
}
@staticmethod
    def validate_deployment(file_path: str) -> Tuple[bool, str]:
vm = psutil.virtual_memory()
file_size_mb = os.path.getsize(file_path) / (1024**2)
total_ram_mb = vm.total / (1024**2)
avail_ram_mb = vm.available / (1024**2)
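        # Two gates: the GGUF must fit within RAM_LIMIT_PCT of total RAM, and
        # loading it must still leave SYSTEM_RESERVE_MB of available memory.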
if file_size_mb > (total_ram_mb * RAM_LIMIT_PCT):
return False, f"Model size ({file_size_mb:.1f}MB) exceeds 50% System RAM limit."
if (file_size_mb + SYSTEM_RESERVE_MB) > avail_ram_mb:
return False, f"Insufficient headroom. Need {SYSTEM_RESERVE_MB}MB buffer."
return True, "Resource check passed."
# ==========================================
# THE ZEROENGINE KERNEL
# ==========================================
class ZeroEngine:
def __init__(self):
self.api = HfApi(token=HF_TOKEN)
self.telemetry = TelemetryManager(self.api)
self.llm: Optional[Llama] = None
self.active_model_info = {"repo": "", "file": ""}
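        # Serialises model loads so two concurrent boot requests can't race.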
self.kernel_lock = threading.Lock()
self.is_prefilling = False
def list_ggufs(self, repo_id: str) -> List[str]:
try:
files = self.api.list_repo_files(repo_id=repo_id)
return [f for f in files if f.endswith(".gguf")]
except Exception as e:
logger.error(f"HF API Error: {e}")
return []
def boot_kernel(self, repo: str, filename: str) -> str:
"""Downloads and initializes the llama-cpp-python instance."""
try:
logger.info(f"Booting Kernel with {filename}...")
path = hf_hub_download(repo_id=repo, filename=filename, token=HF_TOKEN)
valid, msg = ResourceMonitor.validate_deployment(path)
if not valid:
return msg
            with self.kernel_lock:
                if self.llm:
                    # Release the previous model first and reset the attribute so
                    # it stays defined even if the new Llama() load fails below.
                    del self.llm
                    self.llm = None
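                # use_mmap keeps the GGUF memory-mapped from disk, and a single
                # inference thread leaves the second vCPU free for the web server
                # (the "Hard Partitioning" described in the module docstring).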
self.llm = Llama(
model_path=path,
n_ctx=4096,
n_threads=1,
use_mmap=True,
n_batch=512,
last_n_tokens_size=64,
verbose=False
)
self.active_model_info = {"repo": repo, "file": filename}
self.telemetry.track_load(repo, filename)
return f"🟒 KERNEL ONLINE: {filename} loaded successfully."
except Exception as e:
return f"πŸ”΄ BOOT FAILURE: {str(e)}"
def stitch_cache(self, ghost_text: str) -> str:
"""KV-CACHE STITCHING: Pre-processes queue tokens in background."""
if not self.llm or not ghost_text:
return "Kernel Idle"
if self.is_prefilling:
return "Kernel Busy"
def _bg_eval():
self.is_prefilling = True
            try:
                # Reset the context, then pre-evaluate the same "User: " prefix
                # that inference_generator will submit, so llama.cpp's longest-
                # prefix reuse can skip re-prefilling these tokens later.
                self.llm.reset()
                tokens = self.llm.tokenize(f"User: {ghost_text}".encode("utf-8"))
                self.llm.eval(tokens)
                logger.info(f"KV-Cache stitched for {len(tokens)} tokens.")
except Exception as e:
logger.error(f"Stitching failed: {e}")
finally:
self.is_prefilling = False
threading.Thread(target=_bg_eval, daemon=True).start()
return "⚑ Ghost Cache Primed"
def inference_generator(self, prompt: str, history: List, ghost_context: str) -> Generator:
"""Main chat generator using prefix-matched context."""
if not self.llm:
yield history + [{"role": "assistant", "content": "Engine offline. Please load a model in the Sidebar."}]
return
full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt
formatted_prompt = f"User: {full_input}\nAssistant: "
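        # If the ghost buffer was stitched and unchanged, this prompt shares its
        # "User: ..." prefix with the pre-evaluated tokens, so llama.cpp can
        # reuse that part of the KV cache instead of prefilling it again.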
response_text = ""
start_time = time.time()
tokens_count = 0
try:
stream = self.llm(
formatted_prompt,
max_tokens=1024,
stop=["User:", "\n\n"],
stream=True
)
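            # Stream token by token, re-yielding the full transcript each chunk so
            # the Chatbot updates live with a running tokens-per-second figure.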
for chunk in stream:
token = chunk["choices"][0]["text"]
response_text += token
tokens_count += 1
elapsed = time.time() - start_time
tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0
yield history + [
{"role": "user", "content": prompt},
{"role": "assistant", "content": f"{response_text}\n\n`[{tps} t/s]`"}
]
self.telemetry.track_generation(tokens_count)
except Exception as e:
yield history + [{"role": "assistant", "content": f"Inference Error: {str(e)}"}]
# ==========================================
# GRADIO INTERFACE (DASHBOARD)
# ==========================================
kernel = ZeroEngine()
with gr.Blocks(
title="ZeroEngine Kernel",
theme=gr.themes.Monochrome(primary_hue="blue", radius_size="none"),
css=".gradio-container {background-color: #fafafa;} #sidebar {border-left: 1px solid #ddd;}"
) as demo:
gr.HTML("""
<div style="text-align: center; padding: 10px; border-bottom: 2px solid #000;">
<h1 style="margin: 0;">πŸ›°οΈ ZEROENGINE V0.1</h1>
<p style="margin: 0; font-family: monospace;">STATUS: HIGH-PERFORMANCE KERNEL / VCPU-PARTITIONED</p>
</div>
""")
with gr.Row():
with gr.Column(scale=8):
chat_box = gr.Chatbot(
type="messages",
label="Active Slot Inference",
height=650,
show_label=False,
bubble_full_width=False
)
with gr.Row():
with gr.Column(scale=9):
user_input = gr.Textbox(
placeholder="Input command for active processing core...",
label="Active Terminal",
container=False
)
with gr.Column(scale=1, min_width=50):
send_btn = gr.Button("EXE", variant="primary")
with gr.Sidebar(label="Engine Room", open=True) as sidebar:
gr.Markdown("### πŸ“Š Resource Gauges")
with gr.Row():
ram_metric = gr.Label(label="RAM Allocation", value="0/16 GB")
cpu_metric = gr.Label(label="CPU Load", value="0%")
gr.Markdown("---")
gr.Markdown("### πŸ› οΈ Kernel Control")
repo_input = gr.Textbox(label="HF Repo ID", value=DEFAULT_MODEL)
quant_dropdown = gr.Dropdown(label="Quantization Target", choices=[])
with gr.Row():
scan_btn = gr.Button("Scan Repo", size="sm")
boot_btn = gr.Button("BOOT KERNEL", variant="primary", size="sm")
boot_status = gr.Markdown("*Standby: Kernel not initialized.*")
gr.Markdown("---")
gr.Markdown("### πŸ‘» Ghost Terminal")
ghost_buffer = gr.Textbox(
label="Pre-typing Buffer (Queue)",
placeholder="Queue users type here to prime KV-cache...",
lines=3
)
stitch_status = gr.Markdown("Cache State: `EMPTY`")
stitch_btn = gr.Button("STITCH CACHE", size="sm")
gr.Markdown("---")
gr.Markdown("### πŸ“‰ System Logs")
log_output = gr.Code(label="Kernel Output", language="shell", value="[INIT] ZeroEngine Ready.")
# --- UI LOGIC ---
def update_system_stats():
m = ResourceMonitor.get_metrics()
ram_str = f"{m['ram_used_gb']} / {m['ram_total_gb']} GB"
cpu_str = f"{m['cpu_usage_pct']}%"
return ram_str, cpu_str
def on_scan(repo):
files = kernel.list_ggufs(repo)
if not files:
return gr.update(choices=[], value=None), "Repo scan failed or no GGUFs found."
return gr.update(choices=files, value=files[0]), f"Found {len(files)} quants."
def on_boot(repo, file):
yield "Initialising boot sequence...", gr.update(open=True)
res = kernel.boot_kernel(repo, file)
yield res, gr.update(open=True)
def on_stitch(text):
res = kernel.stitch_cache(text)
return f"Cache State: `{res}`"
demo.load(update_system_stats, None, [ram_metric, cpu_metric], every=2)
scan_btn.click(on_scan, [repo_input], [quant_dropdown, log_output])
boot_btn.click(
on_boot,
[repo_input, quant_dropdown],
[boot_status, sidebar]
)
stitch_btn.click(on_stitch, [ghost_buffer], [stitch_status])
input_args = [user_input, chat_box, ghost_buffer]
user_input.submit(kernel.inference_generator, input_args, [chat_box], concurrency_limit=2)
send_btn.click(kernel.inference_generator, input_args, [chat_box], concurrency_limit=2)
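    # Clear the terminal input and ghost buffer after a keyboard submit.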
user_input.submit(lambda: "", None, [user_input])
user_input.submit(lambda: "", None, [ghost_buffer])
# ==========================================
# KERNEL EXECUTION
# ==========================================
if __name__ == "__main__":
demo.queue(max_size=20).launch(show_api=False)