| """ |
| GameForge Space Backend - With debug |
| """ |
| import os |
| import sys |
| import json |
| import traceback |
| from pathlib import Path |
|
|
| import spaces |
| from gradio import Server |
| from gradio.data_classes import FileData |
| from fastapi.responses import HTMLResponse, JSONResponse |
|
|
# Directory containing this file; prepended to sys.path so the bundled
# ``gameforge`` package can be imported without installation.
SPACE_DIR = Path(__file__).parent
sys.path.insert(0, str(SPACE_DIR))


# Initialization state, populated by the import guards below.
# _init_errors collects both success and failure messages for /debug.
_init_errors = []
_registry = None
_router = None
|
|
# Best-effort initialization: each subsystem is imported independently so a
# failure in one does not prevent the others (or the web app) from starting.
# Outcomes -- success or failure -- are recorded in _init_errors for /debug.
try:
    from gameforge.config.registry_loader import get_registry
    _registry = get_registry()
    _init_errors.append(f"Registry loaded: {_registry.summary()}")
except Exception as e:
    # Full traceback kept only here; registry failures need the most detail.
    _init_errors.append(f"Registry error: {e}\n{traceback.format_exc()}")


try:
    from gameforge.engine.router import get_router
    _router = get_router()
    _init_errors.append("Router loaded OK")
except Exception as e:
    _init_errors.append(f"Router error: {e}")


try:
    # Imported for the format_prompt / format_npc endpoints further below.
    from gameforge.config.prompts import get_template_for_asset, format_npc_dialogue
    _init_errors.append("Prompts loaded OK")
except Exception as e:
    _init_errors.append(f"Prompts error: {e}")
|
|
# Generated assets are written under /tmp (the writable location on a
# Hugging Face Space); created eagerly so endpoints can assume it exists.
STORAGE_DIR = Path("/tmp/gameforge_assets")
STORAGE_DIR.mkdir(parents=True, exist_ok=True)


# NOTE(review): `Server` comes from this project's gradio build -- confirm it
# exposes the FastAPI-style .get() and .api() decorators used below.
app = Server()
|
|
|
|
@app.get("/")
async def homepage():
    """Serve the bundled frontend page, or a minimal fallback if absent."""
    index = SPACE_DIR / "static" / "index.html"
    if not index.exists():
        return HTMLResponse("<h1>GameForge</h1><p>Frontend not found</p>")
    return HTMLResponse(index.read_text(encoding="utf-8"))
|
|
|
|
@app.get("/static/{path:path}")
async def static_files(path: str):
    """Serve a file from the static/ directory.

    The requested path is resolved and confined to the static root, so
    ``..`` segments cannot escape it (path traversal protection -- the
    original joined the untrusted path unchecked).
    """
    from fastapi.responses import FileResponse
    static_root = (SPACE_DIR / "static").resolve()
    file_path = (static_root / path).resolve()
    try:
        # Raises ValueError when file_path is outside static_root.
        file_path.relative_to(static_root)
    except ValueError:
        return {"error": "Not found"}
    if file_path.exists() and file_path.is_file():
        media_types = {".css": "text/css", ".js": "application/javascript", ".png": "image/png"}
        return FileResponse(str(file_path), media_type=media_types.get(file_path.suffix, "application/octet-stream"))
    return {"error": "Not found"}
|
|
|
|
@app.get("/debug")
async def debug():
    """Debug endpoint reporting initialization state.

    Returns the collected init messages, the number of registered asset
    types, and the names of bundled pipeline files.
    """
    pipes_dir = SPACE_DIR / "pipelines"
    # Bug fix: Path objects are not JSON serializable -- the original put
    # raw Path instances in the response, crashing JSONResponse encoding.
    pipelines = [p.name for p in pipes_dir.glob("*.yaml")] if pipes_dir.exists() else []
    return JSONResponse({
        "errors": _init_errors,
        "registry_models": len(_registry.list_asset_types()) if _registry else 0,
        "pipelines": pipelines,
    })
|
|
|
|
@app.api()
def registry_info():
    """Return the registry summary plus one record per model variant."""
    if not _registry:
        return {"summary": {"error": "Registry not loaded"}, "models": []}
    summary = _registry.summary()
    models = []
    for asset_type in _registry.list_asset_types():
        asset = _registry.get_asset(asset_type)
        if not asset:
            continue
        for variant_name, model in asset.variants.items():
            models.append({
                "asset_type": asset_type,
                "variant": variant_name,
                "model": model.model,
                "type": model.type,
                "license": model.license,
                "hardware": model.hardware,
                "status": model.status,
                "free": model.is_free,
                "commercial_safe": model.is_commercial_safe,
            })
    return {"summary": summary, "models": models}
|
|
|
|
@app.api()
def get_route(asset_type: str, variant: str = "primary"):
    """Resolve which model serves the given asset type and variant."""
    if not _router:
        return {"error": "Router not loaded"}
    route = _router.route(asset_type, variant)
    return route.to_dict()
|
|
|
|
@app.api()
def list_pipelines():
    """List bundled pipeline definitions (name, description, step count).

    Unreadable or malformed YAML files are skipped (best-effort listing);
    a missing pipelines directory yields an empty list.
    """
    import yaml  # hoisted out of the loop; local because yaml is optional

    pipes_dir = SPACE_DIR / "pipelines"
    if not pipes_dir.exists():
        return []
    result = []
    for path in sorted(pipes_dir.glob("*.yaml")):
        try:
            with open(path) as f:
                data = yaml.safe_load(f)
            result.append({
                "name": path.stem,
                "description": data.get("description", ""),
                "steps": len(data.get("steps", [])),
            })
        except (OSError, yaml.YAMLError, AttributeError):
            # Narrowed from a bare except: I/O errors, invalid YAML, and
            # files whose top level is not a mapping (AttributeError on .get).
            continue
    return result
|
|
|
|
@app.api()
def format_prompt(asset_type: str, user_prompt: str, model_family: str = ""):
    """Expand a user prompt through the asset-type template, if available.

    Falls back to the raw prompt with an empty negative prompt when the
    prompts module failed to load at startup or no template matches.
    """
    try:
        template = get_template_for_asset(asset_type, model_family)
        if template:
            return template.format(user_prompt)
    except Exception:
        # Narrowed from a bare except (which also swallowed SystemExit /
        # KeyboardInterrupt).  Exception still covers the NameError raised
        # when the prompts import failed during init -- fall through.
        pass
    return {"prompt": user_prompt, "negative_prompt": ""}
|
|
|
|
@app.api()
def format_npc(text: str, emotion: str = "neutral", speaker: str = ""):
    """Format a line of NPC dialogue with its emotion and speaker."""
    formatted = format_npc_dialogue(text, emotion, speaker)
    return {"formatted": formatted}
|
|
|
|
@app.api()
def list_assets(folder: str = ""):
    """List generated asset files, optionally within a sub-folder.

    The folder argument is confined to STORAGE_DIR so traversal input
    (e.g. ``folder="../../etc"``) cannot escape the storage root -- the
    original joined the untrusted folder unchecked.
    """
    root = STORAGE_DIR.resolve()
    search_dir = (root / folder).resolve() if folder else root
    try:
        # Raises ValueError when search_dir is outside the storage root.
        search_dir.relative_to(root)
    except ValueError:
        return []
    if not search_dir.exists():
        return []
    return [
        {"name": f.name, "path": str(f), "size": f.stat().st_size, "format": f.suffix.lower()}
        for f in sorted(search_dir.rglob("*"))
        if f.is_file()
    ]
|
|
|
|
@spaces.GPU(duration=60)
def _generate_image(prompt: str, negative_prompt: str = "", steps: int = 4):
    """Generate an image via the HF Inference API and save it as a PNG.

    Returns the path of the saved file.  The HF token is read from the
    standard cache locations when present; an empty token (anonymous
    call) is used otherwise.
    """
    from huggingface_hub import InferenceClient

    token = ""
    for tp in (Path.home() / ".cache/huggingface/token", Path.home() / ".huggingface/token"):
        if tp.is_file():
            # read_text closes the file handle (original used a bare
            # open(...).read() which leaked it).
            token = tp.read_text().strip()
            break
    client = InferenceClient(token=token, provider="hf-inference")
    image = client.text_to_image(
        prompt,
        model="black-forest-labs/FLUX.1-schnell",
        negative_prompt=negative_prompt or None,
        num_inference_steps=steps,
    )
    # NOTE: str hash() is salted per process, so this filename is only
    # stable within one process run.
    out_path = str(STORAGE_DIR / f"img_{hash(prompt) % 100000}.png")
    image.save(out_path)
    return out_path
|
|
|
|
@app.api()
def generate_image(prompt: str, negative_prompt: str = "", steps: int = 4):
    """API wrapper: run GPU image generation and return the saved file."""
    saved = _generate_image(prompt, negative_prompt, steps)
    return FileData(path=saved)
|
|
|
|
@spaces.GPU(duration=60)
def _generate_voice(text: str):
    """Synthesize speech for *text* via the MeloTTS Space.

    Returns the path of the copied wav file, or None when the remote
    call produced no usable audio file.
    """
    from gradio_client import Client as GradioClient
    import shutil

    tts = GradioClient("mrfakename/MeloTTS")
    result = tts.predict(text, api_name="/synthesize")
    audio_path = result[0] if isinstance(result, tuple) else result
    out_path = str(STORAGE_DIR / f"voice_{hash(text) % 100000}.wav")
    if not os.path.isfile(str(audio_path)):
        return None
    shutil.copy2(str(audio_path), out_path)
    return out_path
|
|
|
|
@app.api()
def generate_voice(text: str):
    """API wrapper: synthesize speech and return the audio file, if any."""
    audio = _generate_voice(text)
    if audio is None:
        return None
    return FileData(path=audio)
|
|
|
|
@spaces.GPU(duration=120)
def _generate_video(prompt: str):
    """Generate a short video for *prompt* via the LTX-2 TURBO Space.

    Returns the path of the copied mp4, or None when the remote call
    produced no usable video file.
    """
    from gradio_client import Client as GradioClient
    import shutil

    remote = GradioClient("alexnasa/ltx-2-TURBO")
    result = remote.predict(prompt, api_name="/generate")
    video_path = result[0] if isinstance(result, tuple) else result
    out_path = str(STORAGE_DIR / f"video_{hash(prompt) % 100000}.mp4")
    if not os.path.isfile(str(video_path)):
        return None
    shutil.copy2(str(video_path), out_path)
    return out_path
|
|
|
|
@app.api()
def generate_video(prompt: str):
    """API wrapper: generate a video and return the file, if any."""
    video = _generate_video(prompt)
    if video is None:
        return None
    return FileData(path=video)
|
|
|
|
if __name__ == "__main__":
    # Launch the server; show_error surfaces tracebacks to the client UI.
    app.launch(show_error=True)
|
|