"""
DarkMedia-X Studio — Backend API Server
Deployed as Hugging Face Space (Docker)
Serves REST API for the Vercel frontend dashboard.
"""
import hashlib
import io
import json
import logging
import os
import re
import threading
import time
from functools import wraps
from pathlib import Path

import psutil
import requests
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, Response
from PIL import Image, ImageFilter, ImageOps, ImageEnhance

load_dotenv()

logger = logging.getLogger(__name__)

# Custom Swagger UI CSS for DarkMedia-X theme
DARKMEDIA_SWAGGER_CSS = """ """

app = FastAPI(
    title="DarkMedia-X Studio API",
    description="### 🎬 Automated Horror/Anime Video Production System\n\n"
                "**Endpoints for the Vercel frontend dashboard.**\n\n"
                "All media is stored on Cloudflare R2. Images are proxied through this API to avoid CORS issues.\n\n"
                "---\n"
                "### Quick Links\n"
                "- **Frontend**: https://darkmedia-xstudio.vercel.app\n"
                "- **HF Space**: https://huggingface.co/spaces/cybermedia/darkmedia-x-api\n"
                "- **R2 Bucket**: darkmedia-x-studio",
    version="1.0.0",
    docs_url=None,  # Disable default docs
    redoc_url="/redoc",
)


# Custom Swagger UI with DarkMedia-X theme
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve the themed Swagger UI page.

    The handler used to register an extra ``/openapi.json`` route on every
    request, which grew the router without bound.  ``openapi_url`` is not
    overridden when ``app`` is created, so FastAPI's built-in OpenAPI route
    is used instead and nothing is registered at request time.
    """
    html_content = f""" {app.title} - Swagger UI {DARKMEDIA_SWAGGER_CSS}
"""
    return HTMLResponse(content=html_content)


# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for the deployed frontend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# --- Server-Side Cache ---
class Cache:
    """Simple in-memory TTL cache with ETag support.

    Keys are strings.  A key of the form ``"<base>:<suffix>"`` is treated as a
    variant of ``<base>``: ``invalidate("<base>")`` drops the base entry and
    every variant.  A lock guards the store because ``invalidate`` is also
    called from the generation worker thread.
    """

    def __init__(self):
        self._store = {}
        self._lock = threading.Lock()

    def get(self, key):
        """Return ``(data, etag)`` for a live entry, or ``(None, None)``."""
        with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None, None
            if time.time() - entry["ts"] < entry["ttl"]:
                return entry["data"], entry["etag"]
            # Expired: evict so the next call recomputes.
            del self._store[key]
            return None, None

    def set(self, key, data, ttl=60):
        """Store *data* under *key* for *ttl* seconds and return its ETag."""
        etag = hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()[:12]
        with self._lock:
            self._store[key] = {"data": data, "etag": etag, "ts": time.time(), "ttl": ttl}
        return etag

    def invalidate(self, key):
        """Drop *key* and every variant key that starts with ``"<key>:"``."""
        variant_prefix = f"{key}:"
        with self._lock:
            doomed = [k for k in self._store if k == key or k.startswith(variant_prefix)]
            for k in doomed:
                del self._store[k]


cache = Cache()


def _variant_key(base, handler_name, args, kwargs):
    """Build the cache key for one call of a cached handler.

    Handlers that take nothing but the request keep the plain *base* key, so
    ``cache.invalidate(base)`` call sites keep working.  Handlers with
    path/query arguments get one entry per handler and argument set.
    """
    if not args and not kwargs:
        return base
    fingerprint = json.dumps([args, kwargs], sort_keys=True, default=str)
    return f"{base}:{handler_name}:{fingerprint}"


def cached_endpoint(key, ttl=60):
    """Decorator that adds server-side caching + ETag + Cache-Control headers.

    Args:
        key: Base cache key; also the name used with ``cache.invalidate``.
        ttl: Entry lifetime in seconds (also used for ``max-age``).

    Only ``dict`` results are cached; anything else is returned untouched.
    The cache entry is derived from the handler's arguments, so two different
    path parameters never share a cached response.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            cache_key = _variant_key(key, func.__name__, args, kwargs)
            cache_control = f"public, max-age={ttl}, stale-while-revalidate={ttl*2}"

            cached_data, cached_etag = cache.get(cache_key)
            # Test the ETag, not the data: an empty dict is a valid cached value.
            if cached_etag is not None:
                headers = {"ETag": f'"{cached_etag}"', "Cache-Control": cache_control}
                if_none_match = request.headers.get("if-none-match")
                if if_none_match and if_none_match.strip('"') == cached_etag:
                    # A 304 must not carry a body.
                    return Response(status_code=304, headers=headers)
                return JSONResponse(content=cached_data, headers=headers)

            result = await func(request, *args, **kwargs)
            if isinstance(result, dict):
                etag = cache.set(cache_key, result, ttl)
                return JSONResponse(content=result, headers={
                    "ETag": f'"{etag}"',
                    "Cache-Control": cache_control,
                })
            return result
        return wrapper
    return decorator


def no_cache(func):
    """Decorator for endpoints that must always return fresh data.

    ``dict`` results are wrapped in a ``JSONResponse`` carrying headers that
    forbid caching; other results pass through unchanged.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        result = await func(*args, **kwargs)
        if isinstance(result, dict):
            return JSONResponse(content=result, headers={
                "Cache-Control": "no-cache, no-store, must-revalidate",
                "Pragma": "no-cache",
                "Expires": "0",
            })
        return result
    return wrapper


# Paths
SPACE_DIR = Path(__file__).parent.resolve()
DATA_DIR = SPACE_DIR / "data"
STORIES_DIR = DATA_DIR / "stories"
ASSETS_DIR = DATA_DIR / "assets"
VIDEOS_DIR = DATA_DIR / "videos"
STATE_DIR = DATA_DIR / "state"

for d in [DATA_DIR, STORIES_DIR, ASSETS_DIR, VIDEOS_DIR, STATE_DIR]:
    d.mkdir(parents=True, exist_ok=True)


# --- R2 Client ---
def _r2_bucket():
    """Return the configured R2 bucket name (``R2_BUCKET`` or the default)."""
    return os.getenv("R2_BUCKET", "darkmedia-x-studio")


def get_r2_client():
    """Create an S3-compatible client for R2 from environment variables.

    Returns:
        A boto3 S3 client, or ``None`` when any of ``R2_ENDPOINT``,
        ``R2_ACCESS_KEY_ID`` or ``R2_SECRET_ACCESS_KEY`` is missing or the
        client cannot be created (the failure is logged).
    """
    endpoint = os.getenv("R2_ENDPOINT", "")
    access_key = os.getenv("R2_ACCESS_KEY_ID", "")
    secret_key = os.getenv("R2_SECRET_ACCESS_KEY", "")
    if not all([endpoint, access_key, secret_key]):
        return None
    try:
        import boto3
        from botocore.config import Config
        return boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version="s3v4"),
        )
    except Exception:
        logger.exception("Could not create R2 client")
        return None


def r2_list(prefix=""):
    """Return every object key under *prefix*; ``[]`` if R2 is unavailable or listing fails."""
    client = get_r2_client()
    if client is None:
        return []
    try:
        paginator = client.get_paginator("list_objects_v2")
        return [
            obj["Key"]
            for page in paginator.paginate(Bucket=_r2_bucket(), Prefix=prefix)
            for obj in page.get("Contents", [])
        ]
    except Exception:
        logger.exception("R2 listing failed for prefix %r", prefix)
        return []


def r2_read_text(key):
    """Return the UTF-8 text of object *key*, or ``None`` if it cannot be read."""
    client = get_r2_client()
    if client is None:
        return None
    try:
        resp = client.get_object(Bucket=_r2_bucket(), Key=key)
        return resp["Body"].read().decode("utf-8")
    except Exception as exc:
        # Callers probe for keys that may not exist, so this is not an error.
        logger.warning("R2 read failed for %r: %s", key, exc)
        return None


def r2_presigned_url(key, expires_in=3600):
    """Return a presigned GET URL for *key*, or ``None`` on failure."""
    client = get_r2_client()
    if client is None:
        return None
    try:
        return client.generate_presigned_url(
            "get_object",
            Params={"Bucket": _r2_bucket(), "Key": key},
            ExpiresIn=expires_in,
        )
    except Exception:
        logger.exception("Could not presign %r", key)
        return None


# Extension -> MIME type, used when R2 reports no useful content type.
_IMAGE_CONTENT_TYPES = (
    (".png", "image/png"),
    (".jpg", "image/jpeg"),
    (".jpeg", "image/jpeg"),
    (".gif", "image/gif"),
    (".webp", "image/webp"),
)


@app.get("/api/image/{image_path:path}", tags=["Images"])
async def proxy_image(image_path: str):
    """Proxy image from R2 to avoid CORS issues with direct R2 URLs.

    Raises:
        HTTPException: 500 when R2 is not configured, 404 when the object
            cannot be fetched.
    """
    import urllib.parse

    key = urllib.parse.unquote(image_path)
    # Remove leading slash if present
    if key.startswith("/"):
        key = key[1:]

    client = get_r2_client()
    if client is None:
        raise HTTPException(status_code=500, detail="R2 not configured")

    try:
        resp = client.get_object(Bucket=_r2_bucket(), Key=key)
        content = resp["Body"].read()
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Image not found: {str(e)}")

    content_type = resp.get("ContentType", "image/png")
    # Determine content type from extension if not set
    if not content_type or content_type == "binary/octet-stream":
        lowered = key.lower()
        content_type = next(
            (mime for ext, mime in _IMAGE_CONTENT_TYPES if lowered.endswith(ext)),
            content_type,
        )

    return Response(content=content, media_type=content_type, headers={
        "Cache-Control": "public, max-age=3600",
        "Access-Control-Allow-Origin": "*",
    })


# --- Stories ---
_STORY_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg")
_GENERIC_STORY_TITLES = {"story", "index", "readme"}
_DEFAULT_SCENE_COUNT = 10


def _story_title(filename, story_id):
    """Derive a display title from the file name, falling back to the story folder name."""
    title = filename.replace(".md", "").replace("_", " ").strip()
    if title.lower() in _GENERIC_STORY_TITLES:
        title = story_id.split("/")[-1].replace("_", " ").strip()
    return title


def _scan_local_stories(seen):
    """Collect story records from STORIES_DIR, adding each story id to *seen*."""
    stories = []
    if not STORIES_DIR.exists():
        return stories

    for dirpath, _, filenames in os.walk(str(STORIES_DIR)):
        for filename in filenames:
            if not filename.endswith(".md") or filename.startswith("README") or filename == "music_prompt.md":
                continue
            file_path = Path(dirpath) / filename
            rel_path = file_path.relative_to(STORIES_DIR)
            story_id = str(rel_path.parent).replace("\\", "/")
            if story_id in seen:
                continue
            seen.add(story_id)

            story_dir = file_path.parent
            img_dir = story_dir / "assets" / "images"
            if not img_dir.exists():
                img_dir = story_dir / "images"
            image_count = 0
            if img_dir.exists():
                image_count = sum(
                    1 for name in os.listdir(str(img_dir))
                    if name.lower().endswith(_STORY_IMAGE_SUFFIXES)
                )

            scenes_dir = story_dir / "assets" / "scenes"
            total_scenes = 0
            if scenes_dir.exists():
                total_scenes = sum(
                    1 for name in os.listdir(str(scenes_dir))
                    if name.startswith("scene_") and name.endswith(".txt")
                )
            expected_scenes = total_scenes if total_scenes > 0 else _DEFAULT_SCENE_COUNT

            word_count = len(file_path.read_text(encoding="utf-8").split())
            category = story_dir.parent.name if story_dir.parent != STORIES_DIR else "General"

            stories.append({
                "id": story_id,
                "title": _story_title(filename, story_id),
                "path": str(rel_path).replace("\\", "/"),
                "category": category,
                "processed": False,
                "image_count": image_count,
                "total_scenes": expected_scenes,
                "ready": image_count >= expected_scenes,
                "word_count": word_count,
                "is_empty": word_count < 20,
            })
    return stories


def _scan_r2_stories(seen):
    """Collect story records from R2 for stories not already in *seen*."""
    stories = []
    r2_keys = r2_list(prefix="stories/")
    for key in r2_keys:
        if not key.endswith(".md") or "README" in key or "music_prompt" in key:
            continue
        parts = key.split("/")
        if len(parts) < 3:
            continue
        story_id = "/".join(parts[1:-1])
        if story_id in seen:
            continue
        seen.add(story_id)

        # A story stored directly under "stories/" has no category folder;
        # report it as "General" like the local scan does.
        category = parts[1] if len(parts) > 3 else "General"

        # Count images in the story's own folder.  The old prefix was built
        # from parts[:3], which pointed at the wrong folder for keys that
        # are not exactly "stories/<category>/<story>/<file>".
        image_prefix = f"stories/{story_id}/assets/images/"
        image_count = sum(
            1 for k in r2_keys
            if k.startswith(image_prefix) and k.lower().endswith(_STORY_IMAGE_SUFFIXES)
        )

        content = r2_read_text(key) or ""
        word_count = len(content.split())

        stories.append({
            "id": story_id,
            "title": _story_title(parts[-1], story_id),
            "path": key,
            "category": category,
            "processed": False,
            "image_count": image_count,
            "total_scenes": _DEFAULT_SCENE_COUNT,
            "ready": image_count >= _DEFAULT_SCENE_COUNT,
            "word_count": word_count,
            "is_empty": word_count < 20,
        })
    return stories


@app.get("/api/stories", tags=["Stories"])
@cached_endpoint(key="stories", ttl=30)
async def get_stories(request: Request):
    """List stories found locally and on R2, sorted by title.

    Local stories win when the same story id exists in both places.
    """
    seen = set()
    stories = _scan_local_stories(seen)
    stories.extend(_scan_r2_stories(seen))
    return {"stories": sorted(stories, key=lambda s: s["title"])}
def _resolve_story_dir(story_path):
    """Return the local folder for *story_path*, or ``None`` if it escapes STORIES_DIR.

    Guards the local lookup against ``..`` segments and absolute paths in the
    request path.
    """
    root = STORIES_DIR.resolve()
    candidate = (root / story_path).resolve()
    if candidate != root and root not in candidate.parents:
        return None
    return candidate


def _load_story_text(story_path):
    """Load a story's text, trying local files first and then R2.

    Returns:
        ``(content, filename)``; ``(None, None)`` when the story is not found.
    """
    base = _resolve_story_dir(story_path)
    if base is not None:
        for name in ("story.md", "story.txt"):
            candidate = base / name
            if candidate.exists():
                return candidate.read_text(encoding="utf-8"), name

    # Try R2
    content = r2_read_text(f"stories/{story_path}/story.md")
    if content:
        return content, "story.md"
    return None, None


@app.get("/api/stories/content/{story_path:path}", tags=["Stories"])
@cached_endpoint(key="story_content", ttl=300)
async def get_story_content(request: Request, story_path: str):
    """Get story content by path (frontend calls /api/stories/content/{id}).

    Raises:
        HTTPException: 404 when the story exists neither locally nor on R2.
    """
    content, filename = _load_story_text(story_path)
    if filename is None:
        raise HTTPException(status_code=404, detail="Story not found")
    return {"content": content, "status": "success", "path": story_path, "filename": filename}


@app.get("/api/stories/{story_path:path}", tags=["Stories"])
@cached_endpoint(key="story_content", ttl=300)
async def get_story(request: Request, story_path: str):
    """Return only the story text for *story_path* (404 when missing)."""
    content, filename = _load_story_text(story_path)
    if filename is None:
        raise HTTPException(status_code=404, detail="Story not found")
    return {"content": content}


# --- Assets ---
def _list_audio_names(directory):
    """Return the sorted names of ``.mp3``/``.wav`` entries in *directory*.

    The previous code joined two ``glob()`` generators with ``|``, which
    raises ``TypeError`` as soon as the directory exists.
    """
    if not directory.exists():
        return []
    return sorted(p.name for p in directory.iterdir() if p.suffix in (".mp3", ".wav"))


@app.get("/api/assets/music", tags=["Assets"])
@cached_endpoint(key="music", ttl=60)
async def get_music(request: Request):
    """List background music files available locally."""
    return {"music": _list_audio_names(ASSETS_DIR / "background_music")}


@app.get("/api/assets/voice_samples", tags=["Assets"])
@cached_endpoint(key="voice_samples", ttl=60)
async def get_voice_samples(request: Request):
    """List voice sample files available locally."""
    return {"samples": _list_audio_names(ASSETS_DIR / "voice_samples")}


# --- Videos ---
@app.get("/api/videos", tags=["Videos"])
@cached_endpoint(key="videos", ttl=30)
async def get_videos(request: Request):
    """List videos from R2 storage, plus local files not already listed."""
    videos = []

    # Scan R2 for videos
    for key in r2_list(prefix="videos/"):
        if not key.lower().endswith(".mp4"):
            continue
        parts = key.split("/")
        videos.append({
            "filename": parts[-1],
            "path": key,
            # Use proxy URL for video playback
            "url": f"/api/video/play/{key}",
            # Extract story info from path if available
            "story": parts[1] if len(parts) > 1 else "Unknown",
        })

    # Also scan local directory (for local development)
    if VIDEOS_DIR.exists():
        known = {v["filename"] for v in videos}
        for f in VIDEOS_DIR.glob("*.mp4"):
            if f.name in known:
                continue
            known.add(f.name)
            videos.append({
                "filename": f.name,
                "path": f"videos/{f.name}",
                "url": f"/api/video/play/videos/{f.name}",
                "story": "local",
            })

    return {"videos": sorted(videos, key=lambda x: x["filename"])}


def _iter_r2_body(body, chunk_size=256 * 1024):
    """Yield the R2 object body in chunks and close it when done."""
    try:
        for chunk in body.iter_chunks(chunk_size=chunk_size):
            yield chunk
    finally:
        body.close()


@app.get("/api/video/play/{video_path:path}", tags=["Videos"])
async def proxy_video(video_path: str, request: Request):
    """Proxy video from R2 for playback.

    The object is streamed in chunks instead of being read fully into memory.
    The response advertises ``Accept-Ranges: bytes``, so an incoming ``Range``
    header is forwarded to R2 and answered with ``206`` + ``Content-Range``
    (players need this to seek).

    Raises:
        HTTPException: 500 when R2 is not configured, 404 when the object
            cannot be fetched.
    """
    import urllib.parse
    from fastapi.responses import StreamingResponse

    key = urllib.parse.unquote(video_path)
    if key.startswith("/"):
        key = key[1:]

    client = get_r2_client()
    if client is None:
        raise HTTPException(status_code=500, detail="R2 not configured")

    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    get_params = {"Bucket": bucket, "Key": key}
    range_header = request.headers.get("range")
    if range_header:
        get_params["Range"] = range_header

    try:
        resp = client.get_object(**get_params)
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Video not found: {str(e)}")

    headers = {
        "Cache-Control": "public, max-age=3600",
        "Access-Control-Allow-Origin": "*",
        "Accept-Ranges": "bytes",
    }
    if resp.get("ContentLength") is not None:
        headers["Content-Length"] = str(resp["ContentLength"])

    status_code = 200
    if range_header and resp.get("ContentRange"):
        headers["Content-Range"] = resp["ContentRange"]
        status_code = 206

    return StreamingResponse(
        _iter_r2_body(resp["Body"]),
        status_code=status_code,
        media_type="video/mp4",
        headers=headers,
    )


# --- Generated Images (R2) ---
@app.get("/api/generated_images", tags=["Images"])
@cached_endpoint(key="generated_images", ttl=60)
async def get_generated_images(request: Request):
    """List generated images under ``stories/`` on R2, sorted by key."""
    images = []
    for key in r2_list(prefix="stories/"):
        if not key.endswith((".png", ".jpg", ".jpeg")) or "/images/" not in key:
            continue
        parts = key.split("/")
        images.append({
            # Use proxy URL to avoid CORS issues
            "url": f"/api/image/{key}",
            "path": key,
            "filename": parts[-1],
            "story_id": "/".join(parts[1:3]),
            "category": parts[1] if len(parts) > 1 else "General",
            "scene": parts[-1].rsplit(".", 1)[0],
        })
    return {"images": sorted(images, key=lambda x: x.get("path", ""))}


# --- Library (Gallery view) ---
_LIBRARY_FOLDERS = ("/images/", "/depths/", "/blender_test/")


@app.get("/api/library", tags=["Images"])
@cached_endpoint(key="library", ttl=30)
async def get_library(request: Request):
    """Scan R2 for generated scene images, return gallery-ready list.

    Only ``.png`` keys inside one of the known asset folders are returned,
    sorted by file name descending and capped at 200 entries.
    """
    library = []
    for key in r2_list(prefix="stories/"):
        if not key.endswith(".png"):
            continue
        # Skip non-scene images (e.g. UI assets)
        if not any(folder in key for folder in _LIBRARY_FOLDERS):
            continue
        parts = key.split("/")
        if len(parts) < 4:
            continue
        library.append({
            # story name is usually at index 2 (stories/{category}/{story}/...)
            "story": parts[2],
            "filename": parts[-1],
            # Use proxy URL to avoid CORS issues
            "url": f"/api/image/{key}",
            "path": key,
        })

    library.sort(key=lambda x: x["filename"], reverse=True)
    return {"library": library[:200]}


_DELETABLE_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".gif", ".webp")


@app.delete("/api/library/{image_path:path}", tags=["Images"])
async def delete_library_image(image_path: str):
    """Delete an image from R2 storage.

    Only image keys under ``stories/`` are accepted; without this check the
    endpoint could delete any object in the bucket (stories, videos, ...).

    Returns:
        ``{"status": "success" | "error", "message": ...}``.
    """
    import urllib.parse

    client = get_r2_client()
    if client is None:
        return {"status": "error", "message": "R2 client not configured"}

    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")

    # URL-decode the path
    key = urllib.parse.unquote(image_path)
    if key.startswith("/"):
        key = key[1:]
    if not key.startswith("stories/") or not key.lower().endswith(_DELETABLE_IMAGE_SUFFIXES):
        return {"status": "error", "message": "Only story images can be deleted"}

    try:
        client.delete_object(Bucket=bucket, Key=key)
        # Invalidate caches
        cache.invalidate("library")
        cache.invalidate("generated_images")
        cache.invalidate("stories")
        return {"status": "success", "message": f"Deleted {key}"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
Current Frame / Preview --- @app.get("/api/current_frame", tags=["Preview"]) @no_cache async def get_current_frame(): return {"frame": None, "narration": ""} # --- Status --- @app.get("/api/status", tags=["System"]) @no_cache async def get_status(): try: task_file = STATE_DIR / "current_task.json" if task_file.exists(): with open(task_file) as f: task = json.load(f) else: task = {"story": None, "step": "IDLE", "progress": 0} return { "status": "ready", "step": task.get("step", "IDLE"), "progress": task.get("progress", 0), "story": task.get("story"), "error": task.get("error"), "current_scene": task.get("current_scene", 0), "total_scenes": task.get("total_scenes", 0), "timestamp": time.time(), } except Exception as e: return {"status": "error", "message": str(e)} @app.get("/api/logs", tags=["System"]) async def get_logs(): return {"logs": []} # --- System Info --- @app.get("/api/system", tags=["System"]) @no_cache async def get_system(): try: ram = psutil.virtual_memory() return { "ram_used": ram.used / (1024**3), "ram_total": ram.total / (1024**3), "ram_percent": ram.percent, "cpu_percent": psutil.cpu_percent(interval=0.1), "python_version": os.sys.version.split()[0], } except Exception as e: return {"error": str(e)} # --- Engine Health --- @app.get("/api/engine/health", tags=["System"]) @no_cache async def get_engine_health(): try: task_file = STATE_DIR / "current_task.json" if task_file.exists(): with open(task_file) as f: task = json.load(f) story = task.get("story", "UNKNOWN") step = task.get("step", "IDLE") progress = task.get("progress", 0) else: story = "AUCUN" step = "IDLE" progress = 0 scenes_count = 0 total_scenes = 10 if story and story != "AUCUN": story_dir = STORIES_DIR / story img_dir = story_dir / "assets" / "images" if img_dir.exists(): scenes_count = len(list(img_dir.glob("scene_*.png"))) verdict = "HEALTHY" if progress == 0 else "RENDERING" verdict_color = "green" if progress == 0 else "yellow" return { "verdict": verdict, "verdict_color": 
verdict_color, "story": story, "step": step, "progress": progress, "scenes_generated": scenes_count, "total_scenes": total_scenes, "logs": [], } except Exception as e: return {"verdict": "ERROR", "verdict_color": "red", "message": str(e)} # --- UI Settings --- @app.get("/api/ui/settings", tags=["Settings"]) @cached_endpoint(key="ui_settings", ttl=10) async def get_ui_settings(request: Request): settings_file = STATE_DIR / "ui_settings.json" if settings_file.exists(): with open(settings_file) as f: return json.load(f) return {} @app.post("/api/ui/settings", tags=["Settings"]) async def save_ui_settings(settings: dict): settings_file = STATE_DIR / "ui_settings.json" with open(settings_file, "w") as f: json.dump(settings, f) cache.invalidate("ui_settings") return {"status": "ok"} # --- TTS Preview --- @app.get("/api/tts/voices", tags=["Audio"]) async def get_tts_voices(): """List available TTS voices.""" return { "voices": [ {"id": "fr-FR-DeniseNeural", "name": "Denise (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-HenriNeural", "name": "Henri (FR)", "gender": "male", "language": "fr-FR"}, {"id": "fr-CA-SylvieNeural", "name": "Sylvie (CA)", "gender": "female", "language": "fr-CA"}, {"id": "fr-CA-JeanNeural", "name": "Jean (CA)", "gender": "male", "language": "fr-CA"}, {"id": "fr-FR-AlainNeural", "name": "Alain (FR)", "gender": "male", "language": "fr-FR"}, {"id": "fr-FR-BrigitteNeural", "name": "Brigitte (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-CelesteNeural", "name": "Celeste (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-ClaudeNeural", "name": "Claude (FR)", "gender": "male", "language": "fr-FR"}, {"id": "fr-FR-CoralieNeural", "name": "Coralie (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-EloiseNeural", "name": "Eloise (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-JacquelineNeural", "name": "Jacqueline (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-JeromeNeural", 
"name": "Jerome (FR)", "gender": "male", "language": "fr-FR"}, {"id": "fr-FR-JosephineNeural", "name": "Josephine (FR)", "gender": "female", "language": "fr-FR"}, {"id": "fr-FR-MauriceNeural", "name": "Maurice (FR)", "gender": "male", "language": "fr-FR"}, {"id": "fr-FR-YvesNeural", "name": "Yves (FR)", "gender": "male", "language": "fr-FR"}, {"id": "fr-FR-YvetteNeural", "name": "Yvette (FR)", "gender": "female", "language": "fr-FR"}, ] } @app.post("/api/tts/preview", tags=["Audio"]) async def tts_preview(payload: dict): import tempfile import edge_tts text = payload.get("text", "Test audio DarkMedia-X.") voice = payload.get("voice", "fr-FR-DeniseNeural") rate = payload.get("rate", "+0%") pitch = payload.get("pitch", "+0Hz") # Map edge-tts voice names voice_map = { "fr-FR-DeniseNeural": "fr-FR-DeniseNeural", "fr-FR-HenriNeural": "fr-FR-HenriNeural", "fr-CA-SylvieNeural": "fr-CA-SylvieNeural", "fr-CA-JeanNeural": "fr-CA-JeanNeural", } tts_voice = voice_map.get(voice, "fr-FR-DeniseNeural") try: with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: comm = edge_tts.Communicate(text, tts_voice, rate=rate, pitch=pitch) await comm.save(tmp.name) tmp.seek(0) audio_data = tmp.read() import base64 b64 = base64.b64encode(audio_data).decode() return {"status": "success", "url": f"data:audio/mp3;base64,{b64}"} except Exception as e: return {"status": "error", "message": str(e)} # --- Story Images (R2) --- @app.get("/api/images", tags=["Images"]) async def get_story_images(story_id: str): """List images for a story from R2.""" import urllib.parse story_id = urllib.parse.unquote(story_id).strip("/") images = [] r2_keys = r2_list(prefix=f"stories/{story_id}/assets/images/") for key in r2_keys: if key.lower().endswith((".png", ".jpg", ".jpeg")): filename = key.split("/")[-1] # Use proxy URL to avoid CORS issues proxy_url = f"/api/image/{key}" images.append({ "path": key, "filename": filename, "url": proxy_url, "timestamp": 0, }) images.sort(key=lambda x: 
x["filename"]) return {"images": images} @app.delete("/api/images", tags=["Images"]) async def delete_story_image(story_id: str, filename: str): """Delete an image from R2.""" import urllib.parse story_id = urllib.parse.unquote(story_id).strip("/") filename = urllib.parse.unquote(filename).strip("/") key = f"stories/{story_id}/assets/images/{filename}" client = get_r2_client() if client is None: return {"status": "error", "message": "R2 not configured"} bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio") try: client.delete_object(Bucket=bucket, Key=key) cache.invalidate("generated_images") cache.invalidate("library") cache.invalidate("stories") return {"status": "deleted", "filename": filename} except Exception as e: return {"status": "error", "message": str(e)} # --- Story Mutations (R2) --- @app.delete("/api/stories/{story_path:path}", tags=["Stories"]) async def delete_story(story_path: str): """Delete a story and all its assets from R2.""" import urllib.parse story_path = urllib.parse.unquote(story_path).strip("/") prefix = f"stories/{story_path}/" client = get_r2_client() if client is None: return {"status": "error", "message": "R2 not configured"} bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio") try: # List and delete all objects under the story prefix paginator = client.get_paginator("list_objects_v2") for page in paginator.paginate(Bucket=bucket, Prefix=prefix): for obj in page.get("Contents", []): client.delete_object(Bucket=bucket, Key=obj["Key"]) cache.invalidate("stories") cache.invalidate("generated_images") cache.invalidate("library") return {"status": "success", "message": f"Story '{story_path}' deleted"} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/stories/save", tags=["Stories"]) async def save_story(payload: dict): """Save story content to R2.""" story_id = payload.get("story_id", "") content = payload.get("content", "") if not story_id or not content: return {"status": "error", "message": "story_id 
and content required"} client = get_r2_client() if client is None: return {"status": "error", "message": "R2 not configured"} bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio") key = f"stories/{story_id}/story.md" try: client.put_object(Bucket=bucket, Key=key, Body=content.encode("utf-8"), ContentType="text/markdown") cache.invalidate("stories") return {"status": "success", "message": "Story saved"} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/stories/normalize_ai", tags=["Stories"]) async def normalize_ai(payload: dict = None): """Normalize story prompts (stub - returns success).""" return {"status": "success", "message": "Normalization complete", "fixed_files": 0} @app.post("/api/gemini/ask", tags=["AI"]) async def gemini_ask(payload: dict): """Call Gemini API for AI assistance.""" import requests api_key = os.getenv("GEMINI_API_KEY", "") if not api_key: return {"status": "error", "message": "GEMINI_API_KEY not configured"} prompt = payload.get("prompt", "") model = payload.get("model", "gemini-2.0-flash") try: url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}" resp = requests.post(url, json={ "contents": [{"parts": [{"text": prompt}]}] }, timeout=60) data = resp.json() text = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") return {"status": "success", "response": text} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/stories/launch", tags=["Pipeline"]) async def launch_pipeline(payload: dict): """Trigger image generation pipeline using HF Inference API (FLUX).""" import json import threading import io import re story_id = payload.get("story_id", "") story_path = payload.get("story_path", "") config = payload.get("config", {}) regenerate = payload.get("regenerate_all", False) if not story_id: return {"status": "error", "message": "story_id required"} # Write initial task status task_file = 
STATE_DIR / "current_task.json" task_data = { "story": story_id, "story_path": story_path, "step": "READING_STORY", "progress": 0, "config": config, "timestamp": time.time() } try: STATE_DIR.mkdir(parents=True, exist_ok=True) with open(task_file, "w") as f: json.dump(task_data, f) except Exception: pass cache.invalidate("status") def run_voice_generation(client, bucket, task_file, task_data, config): """Generate TTS audio for all scenes.""" import edge_tts import tempfile story_key = f"stories/{story_id}/story.md" try: story_content = r2_read_text(story_key) if not story_content: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": "Story not found"}, f) return except Exception as e: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": str(e)}, f) return scenes = [] scene_pattern = re.compile(r'#+\s*[Ss]c[eè]ne\s*(\d+)[\s:]*\n*(.*?)(?=#+\s*[Ss]c[eè]ne|\Z)', re.DOTALL) matches = scene_pattern.findall(story_content) if not matches: blocks = [b.strip() for b in story_content.split('\n\n') if b.strip()] for i, block in enumerate(blocks[:10], 1): scenes.append({"id": i, "text": block[:500]}) else: for num, content in matches: scenes.append({"id": int(num), "text": content.strip()[:500]}) if not scenes: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": "No scenes found"}, f) return voice = config.get("voice", "fr-FR-DeniseNeural") rate = config.get("rate", "+0%") total_scenes = len(scenes) for idx, scene in enumerate(scenes): scene_num = scene["id"] scene_text = scene["text"] progress = int((idx / total_scenes) * 100) with open(task_file, "w") as f: json.dump({ **task_data, "step": f"GENERATING_VOICE_{scene_num}", "progress": progress, "current_scene": scene_num, "total_scenes": total_scenes }, f) cache.invalidate("status") audio_key = f"stories/{story_id}/assets/audio/scene_{scene_num}.mp3" try: with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: comm = 
edge_tts.Communicate(scene_text, voice, rate=rate) await comm.save(tmp.name) tmp.seek(0) with open(tmp.name, "rb") as f: client.put_object( Bucket=bucket, Key=audio_key, Body=f.read(), ContentType="audio/mp3" ) except Exception as e: import traceback error_msg = f"TTS error scene {scene_num}: {str(e)}\n{traceback.format_exc()}" with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": error_msg, "progress": progress}, f) return with open(task_file, "w") as f: json.dump({ **task_data, "step": "DONE", "progress": 100, "voices_generated": total_scenes }, f) cache.invalidate("status") cache.invalidate("stories") def run_music_generation(client, bucket, task_file, task_data, config): """Generate background music for the story.""" music_style = config.get("music_style", "dark_ambient") with open(task_file, "w") as f: json.dump({ **task_data, "step": "GENERATING_MUSIC", "progress": 50, "music_style": music_style }, f) cache.invalidate("status") # Placeholder: music generation not yet implemented # TODO: Integrate with MusicGen or similar with open(task_file, "w") as f: json.dump({ **task_data, "step": "DONE", "progress": 100, "music_generated": True }, f) cache.invalidate("status") def run_generation(): """Background thread for image/voice/music generation.""" hf_token = os.getenv("HF_TOKEN", "") if not hf_token: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": "HF_TOKEN not configured"}, f) return client = get_r2_client() bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio") # Check if this is a voice-only generation if config.get("voice_only", False): run_voice_generation(client, bucket, task_file, task_data, config) return # Check if this is a music-only generation if config.get("music_only", False): run_music_generation(client, bucket, task_file, task_data, config) return # 1. 
Read story content story_key = f"stories/{story_id}/story.md" try: story_content = r2_read_text(story_key) if not story_content: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": "Story not found in R2"}, f) return except Exception as e: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": str(e)}, f) return # 2. Parse scenes from story markdown scenes = [] scene_pattern = re.compile(r'#+\s*[Ss]c[eè]ne\s*(\d+)[\s:]*\n*(.*?)(?=#+\s*[Ss]c[eè]ne|\Z)', re.DOTALL) matches = scene_pattern.findall(story_content) if not matches: blocks = [b.strip() for b in story_content.split('\n\n') if b.strip()] for i, block in enumerate(blocks[:10], 1): scenes.append({"id": i, "prompt": block[:500]}) else: for num, content in matches: scenes.append({"id": int(num), "prompt": content.strip()[:500]}) if not scenes: with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": "No scenes found in story"}, f) return total_scenes = len(scenes) # 3. 
Generate images for each scene via Gradio Client (ZeroGPU) image_model = config.get("image_model", "flux") model_spaces = { "flux": os.getenv("HF_IMAGE_SPACE_FLUX", "cybermedia/flux-zerogpu"), "ssd": os.getenv("HF_IMAGE_SPACE_SSD", "cybermedia/ssd-zerogpu"), "sdxl": os.getenv("HF_IMAGE_SPACE_SDXL", "cybermedia/sdxl-zerogpu"), "playground": os.getenv("HF_IMAGE_SPACE_PLAYGROUND", "cybermedia/playground-zerogpu"), } gradio_space = model_spaces.get(image_model, model_spaces["flux"]) for idx, scene in enumerate(scenes): scene_num = scene["id"] scene_prompt = scene["prompt"] progress = int((idx / total_scenes) * 100) with open(task_file, "w") as f: json.dump({ **task_data, "step": f"GENERATING_SCENE_{scene_num}", "progress": progress, "current_scene": scene_num, "total_scenes": total_scenes }, f) cache.invalidate("status") img_key = f"stories/{story_id}/assets/images/scene_{scene_num}.png" if not regenerate: try: client.head_object(Bucket=bucket, Key=img_key) continue except Exception: pass enhanced_prompt = f"horror anime style, dark atmosphere, cinematic, {scene_prompt}" try: from gradio_client import Client gradio_client = Client(gradio_space) result = gradio_client.predict( prompt=enhanced_prompt, steps=20, seed=42, api_name="/generate" ) if isinstance(result, str): with open(result, "rb") as f: client.put_object( Bucket=bucket, Key=img_key, Body=f.read(), ContentType="image/png" ) else: error_msg = f"{image_model.upper()} Gradio returned unexpected result: {result}" with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": error_msg, "progress": progress}, f) return except Exception as e: import traceback error_msg = f"{image_model.upper()} Gradio error: {str(e)}\n{traceback.format_exc()}" with open(task_file, "w") as f: json.dump({**task_data, "step": "ERROR", "error": error_msg, "progress": progress}, f) return with open(task_file, "w") as f: json.dump({ **task_data, "step": "DONE", "progress": 100, "images_generated": total_scenes }, f) 
cache.invalidate("status") cache.invalidate("stories") cache.invalidate("generated_images") # Start generation in background thread thread = threading.Thread(target=run_generation, daemon=True) thread.start() return {"status": "success", "message": f"Generation started for '{story_id}'", "job_id": story_id} @app.post("/api/stories/remix", tags=["Pipeline"]) async def remix_story(payload: dict): """Remix a story with new settings.""" story_id = payload.get("story_id", "") if not story_id: return {"status": "error", "message": "story_id required"} return {"status": "success", "message": f"Remix queued for '{story_id}'"} @app.post("/api/stories/apply_art_style", tags=["Images"]) async def apply_art_style(payload: dict): """Apply an art style filter to all images of a story.""" story_id = payload.get("story_id", "") style = payload.get("style", "none") if not story_id or style == "none": return {"status": "error", "message": "story_id and style required"} client = get_r2_client() if client is None: return {"status": "error", "message": "R2 not configured"} bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio") # List images for this story prefix = f"stories/{story_id}/assets/images/" r2_keys = r2_list(prefix=prefix) image_keys = [k for k in r2_keys if k.lower().endswith((".png", ".jpg", ".jpeg"))] if not image_keys: return {"status": "error", "message": "No images found for this story"} processed = 0 errors = [] for key in image_keys: try: # Download image from R2 resp = client.get_object(Bucket=bucket, Key=key) img_data = resp["Body"].read() img = Image.open(io.BytesIO(img_data)) # Apply style styled_img = apply_style_filter(img, style) # Save back to R2 with _styled suffix output = io.BytesIO() styled_img.save(output, format="PNG") output.seek(0) styled_key = key.replace(".png", "_styled.png").replace(".jpg", "_styled.png").replace(".jpeg", "_styled.png") client.put_object(Bucket=bucket, Key=styled_key, Body=output.getvalue(), ContentType="image/png") processed += 1 
except Exception as e: errors.append(f"{key}: {str(e)}") cache.invalidate("generated_images") cache.invalidate("library") cache.invalidate("stories") return { "status": "success", "message": f"Applied '{style}' to {processed} images", "processed": processed, "errors": errors[:5] # Limit error output } def apply_style_filter(img, style): """Apply a style filter to a PIL Image.""" if style == "oil_paint": return img.filter(ImageFilter.SMOOTH_MORE).filter(ImageFilter.EDGE_ENHANCE_MORE) elif style == "charcoal": return ImageOps.grayscale(img).point(lambda x: 255 if x > 128 else 0, mode="1").convert("RGB") elif style == "sketch": gray = ImageOps.grayscale(img) inverted = ImageOps.invert(gray) blurred = inverted.filter(ImageFilter.GaussianBlur(radius=2)) return ImageOps.grayscale(Image.blend(gray, blurred, 0.5)).convert("RGB") elif style == "vintage": enhancer = ImageEnhance.Color(img) img = enhancer.enhance(0.7) enhancer = ImageEnhance.Brightness(img) img = enhancer.enhance(0.9) enhancer = ImageEnhance.Contrast(img) img = enhancer.enhance(1.2) # Warm tint r, g, b = img.split() r = r.point(lambda x: min(255, int(x * 1.15 + 15))) b = b.point(lambda x: max(0, int(x * 0.85))) return Image.merge("RGB", (r, g, b)) elif style == "night_vision": gray = ImageOps.grayscale(img) r = gray.point(lambda x: int(x * 0.2)) g = gray.point(lambda x: min(255, int(x * 1.3))) b = gray.point(lambda x: int(x * 0.2)) return Image.merge("RGB", (r, g, b)) elif style == "pixel_art": w, h = img.size small = img.resize((w // 8, h // 8), Image.NEAREST) return small.resize((w, h), Image.NEAREST) elif style == "vhs_static": import random pixels = img.load() w, h = img.size for y in range(0, h, 3): for x in range(w): r, g, b = pixels[x, y][:3] noise = random.randint(-30, 30) pixels[x, y] = ( min(255, max(0, r + noise + 10)), min(255, max(0, g + noise)), min(255, max(0, b + noise - 10)) ) return img return img # --- Config --- @app.get("/api/config", tags=["System"]) @cached_endpoint(key="config", 
ttl=3600) async def get_config(request: Request): return { "ai_mode": os.getenv("AI_MODE", "cloud"), "image_gen_mode": os.getenv("IMAGE_GEN_MODE", "gemini"), "voice_gen_mode": os.getenv("VOICE_GEN_MODE", "edge-tts"), } # --- Health Check --- @app.get("/health", tags=["System"]) async def health(): return {"status": "ok", "service": "darkmedia-x-api"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)