Spaces:
Sleeping
Sleeping
| """ | |
| DarkMedia-X Studio — Backend API Server | |
| Deployed as Hugging Face Space (Docker) | |
| Serves REST API for the Vercel frontend dashboard. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import re | |
| import hashlib | |
| import psutil | |
| import requests | |
| import threading | |
| import io | |
| from pathlib import Path | |
| from functools import wraps | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse | |
| from dotenv import load_dotenv | |
# Load .env files: first the library's default lookup, then an explicit ".env"
# next to this file, then one in the parent directory (same order as before).
load_dotenv()
for _env_dir in (Path(__file__).parent, Path(__file__).parent.parent):
    load_dotenv(dotenv_path=_env_dir / ".env")

# Auth configuration.
# The AUTH_ROUTER_* variables take precedence over the *AUTH_PROVIDER* ones;
# an unset or empty value falls through to the next source.
_auth_base_url = os.getenv("AUTH_ROUTER_URL") or os.getenv(
    "AUTH_PROVIDER_URL", "https://auth-provider-api.vercel.app"
)
# Trailing slashes are stripped so endpoint paths can be appended with "/...".
AUTH_PROVIDER_URL = _auth_base_url.rstrip("/")
AUTH_PROVIDER_KEY = os.getenv("AUTH_ROUTER_HEADER_API_KEY") or os.getenv(
    "HEADER_AUTH_PROVIDER_API_KEY", ""
)
| # Custom Swagger UI CSS for DarkMedia-X theme | |
| DARKMEDIA_SWAGGER_CSS = """ | |
| <style> | |
| /* DarkMedia-X Theme Overrides */ | |
| :root { | |
| --dmx-bg: #0a0a0a; | |
| --dmx-bg-secondary: #111111; | |
| --dmx-border: #1a1a1a; | |
| --dmx-text: #e0e0e0; | |
| --dmx-text-muted: #888888; | |
| --dmx-accent: #e94560; | |
| --dmx-accent-hover: #ff6b81; | |
| --dmx-success: #4caf50; | |
| --dmx-warning: #ff9800; | |
| --dmx-error: #f44336; | |
| } | |
| body { | |
| background: var(--dmx-bg) !important; | |
| color: var(--dmx-text) !important; | |
| font-family: 'Courier New', monospace !important; | |
| } | |
| /* Header */ | |
| .swagger-ui .topbar { | |
| background: linear-gradient(135deg, #0a0a0a 0%, #1a0a0a 100%) !important; | |
| border-bottom: 2px solid var(--dmx-accent) !important; | |
| } | |
| .swagger-ui .topbar .download-url-wrapper input[type=text] { | |
| background: var(--dmx-bg-secondary) !important; | |
| color: var(--dmx-text) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| } | |
| .swagger-ui .topbar .download-url-wrapper .download-url-button { | |
| background: var(--dmx-accent) !important; | |
| color: #fff !important; | |
| } | |
| /* Title */ | |
| .swagger-ui .info .title { | |
| color: var(--dmx-accent) !important; | |
| font-family: 'Courier New', monospace !important; | |
| letter-spacing: 2px !important; | |
| } | |
| .swagger-ui .info .base-url { | |
| color: var(--dmx-text-muted) !important; | |
| } | |
| .swagger-ui .info a { | |
| color: var(--dmx-accent) !important; | |
| } | |
| /* Tags */ | |
| .swagger-ui .opblock-tag { | |
| background: var(--dmx-bg-secondary) !important; | |
| border-bottom: 1px solid var(--dmx-border) !important; | |
| color: var(--dmx-accent) !important; | |
| font-family: 'Courier New', monospace !important; | |
| letter-spacing: 1px !important; | |
| } | |
| .swagger-ui .opblock-tag:hover { | |
| background: #1a0a0a !important; | |
| } | |
| .swagger-ui .opblock-tag small { | |
| color: var(--dmx-text-muted) !important; | |
| } | |
| /* Operations */ | |
| .swagger-ui .opblock { | |
| background: var(--dmx-bg-secondary) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| border-radius: 4px !important; | |
| margin-bottom: 10px !important; | |
| } | |
| .swagger-ui .opblock:hover { | |
| border-color: var(--dmx-accent) !important; | |
| box-shadow: 0 0 10px rgba(233, 69, 96, 0.2) !important; | |
| } | |
| .swagger-ui .opblock .opblock-summary { | |
| border: none !important; | |
| } | |
| .swagger-ui .opblock .opblock-summary-method { | |
| background: var(--dmx-accent) !important; | |
| color: #fff !important; | |
| font-weight: bold !important; | |
| border-radius: 3px !important; | |
| } | |
| .swagger-ui .opblock .opblock-summary-path { | |
| color: var(--dmx-text) !important; | |
| font-family: 'Courier New', monospace !important; | |
| } | |
| .swagger-ui .opblock .opblock-summary-description { | |
| color: var(--dmx-text-muted) !important; | |
| } | |
| /* Method colors */ | |
| .swagger-ui .opblock.opblock-get { | |
| border-left: 4px solid #4caf50 !important; | |
| } | |
| .swagger-ui .opblock.opblock-get .opblock-summary-method { | |
| background: #4caf50 !important; | |
| } | |
| .swagger-ui .opblock.opblock-post { | |
| border-left: 4px solid #e94560 !important; | |
| } | |
| .swagger-ui .opblock.opblock-post .opblock-summary-method { | |
| background: #e94560 !important; | |
| } | |
| .swagger-ui .opblock.opblock-delete { | |
| border-left: 4px solid #f44336 !important; | |
| } | |
| .swagger-ui .opblock.opblock-delete .opblock-summary-method { | |
| background: #f44336 !important; | |
| } | |
| .swagger-ui .opblock.opblock-put { | |
| border-left: 4px solid #ff9800 !important; | |
| } | |
| .swagger-ui .opblock.opblock-put .opblock-summary-method { | |
| background: #ff9800 !important; | |
| } | |
| /* Parameters */ | |
| .swagger-ui .parameters-col_description input[type=text], | |
| .swagger-ui .parameters-col_description textarea { | |
| background: var(--dmx-bg) !important; | |
| color: var(--dmx-text) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| border-radius: 3px !important; | |
| } | |
| .swagger-ui .parameters-col_description input[type=text]:focus, | |
| .swagger-ui .parameters-col_description textarea:focus { | |
| border-color: var(--dmx-accent) !important; | |
| box-shadow: 0 0 5px rgba(233, 69, 96, 0.3) !important; | |
| } | |
| .swagger-ui table thead tr td, | |
| .swagger-ui table thead tr th { | |
| color: var(--dmx-accent) !important; | |
| border-bottom: 1px solid var(--dmx-border) !important; | |
| } | |
| .swagger-ui table tbody tr td { | |
| border-bottom: 1px solid var(--dmx-border) !important; | |
| } | |
| /* Execute button */ | |
| .swagger-ui .btn.execute { | |
| background: var(--dmx-accent) !important; | |
| color: #fff !important; | |
| border: none !important; | |
| border-radius: 3px !important; | |
| font-weight: bold !important; | |
| letter-spacing: 1px !important; | |
| } | |
| .swagger-ui .btn.execute:hover { | |
| background: var(--dmx-accent-hover) !important; | |
| box-shadow: 0 0 15px rgba(233, 69, 96, 0.4) !important; | |
| } | |
| .swagger-ui .btn.cancel { | |
| background: transparent !important; | |
| color: var(--dmx-text-muted) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| } | |
| /* Response */ | |
| .swagger-ui .responses-inner { | |
| background: var(--dmx-bg) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| border-radius: 4px !important; | |
| } | |
| .swagger-ui .response-col_status { | |
| color: var(--dmx-text) !important; | |
| } | |
| .swagger-ui .response-col_status .response-undocumented { | |
| color: var(--dmx-text-muted) !important; | |
| } | |
| .swagger-ui .highlight-code { | |
| background: var(--dmx-bg) !important; | |
| } | |
| .swagger-ui .highlight-code .microlight { | |
| background: var(--dmx-bg) !important; | |
| color: var(--dmx-text) !important; | |
| } | |
| /* Models */ | |
| .swagger-ui section.models { | |
| background: var(--dmx-bg-secondary) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| } | |
| .swagger-ui section.models h4 { | |
| color: var(--dmx-text-muted) !important; | |
| border-bottom: 1px solid var(--dmx-border) !important; | |
| } | |
| .swagger-ui .model-title { | |
| color: var(--dmx-accent) !important; | |
| } | |
| .swagger-ui .model { | |
| background: var(--dmx-bg) !important; | |
| border: 1px solid var(--dmx-border) !important; | |
| } | |
| /* Scrollbar */ | |
| ::-webkit-scrollbar { | |
| width: 8px !important; | |
| height: 8px !important; | |
| } | |
| ::-webkit-scrollbar-track { | |
| background: var(--dmx-bg) !important; | |
| } | |
| ::-webkit-scrollbar-thumb { | |
| background: var(--dmx-border) !important; | |
| border-radius: 4px !important; | |
| } | |
| ::-webkit-scrollbar-thumb:hover { | |
| background: var(--dmx-accent) !important; | |
| } | |
| /* Links */ | |
| .swagger-ui a { | |
| color: var(--dmx-accent) !important; | |
| } | |
| .swagger-ui a:hover { | |
| color: var(--dmx-accent-hover) !important; | |
| } | |
| /* Try it out */ | |
| .swagger-ui .try-out__btn { | |
| background: transparent !important; | |
| color: var(--dmx-accent) !important; | |
| border: 1px solid var(--dmx-accent) !important; | |
| } | |
| .swagger-ui .try-out__btn:hover { | |
| background: var(--dmx-accent) !important; | |
| color: #fff !important; | |
| } | |
| /* Auth */ | |
| .swagger-ui .auth-wrapper .authorize { | |
| background: transparent !important; | |
| color: var(--dmx-accent) !important; | |
| border: 1px solid var(--dmx-accent) !important; | |
| } | |
| /* Footer */ | |
| .swagger-ui .wrapper { | |
| max-width: 1400px !important; | |
| } | |
| </style> | |
| """ | |
# Markdown description shown on the API documentation pages.
_API_DESCRIPTION = (
    "### 🎬 Automated Horror/Anime Video Production System\n\n"
    "**Endpoints for the Vercel frontend dashboard.**\n\n"
    "All media is stored on Cloudflare R2. Images are proxied through this API to avoid CORS issues.\n\n"
    "---\n"
    "### Quick Links\n"
    "- **Frontend**: https://darkmedia-xstudio.vercel.app\n"
    "- **HF Space**: https://huggingface.co/spaces/cybermedia/darkmedia-x-api\n"
    "- **R2 Bucket**: darkmedia-x-studio"
)

# The built-in Swagger page is switched off (docs_url=None) because this module
# provides its own themed page; ReDoc stays available at /redoc.
app = FastAPI(
    title="DarkMedia-X Studio API",
    description=_API_DESCRIPTION,
    version="1.0.0",
    docs_url=None,
    redoc_url="/redoc",
)
| # Custom Swagger UI with DarkMedia-X theme | |
async def custom_swagger_ui_html():
    """Return the DarkMedia-X themed Swagger UI page.

    The page loads Swagger UI 5 from the jsDelivr CDN, injects
    ``DARKMEDIA_SWAGGER_CSS`` into ``<head>`` and tells the UI to fetch the
    schema from ``/openapi.json``.

    The previous version regenerated the OpenAPI schema on every call and
    defined a nested ``openapi_json`` coroutine that nothing referenced; both
    were dead work and have been removed, together with the unused
    ``get_swagger_ui_html`` import.

    NOTE(review): ``/openapi.json`` is presumably FastAPI's default
    ``openapi_url`` route (``app`` is created without overriding it) - confirm.
    No route decorator is visible for this function in the reviewed source.

    Returns:
        HTMLResponse containing the complete HTML document.
    """
    # Doubled braces are literal braces inside the f-string; only app.title
    # and DARKMEDIA_SWAGGER_CSS are interpolated.
    html_content = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{app.title} - Swagger UI</title>
<link rel="stylesheet" type="text/css" href="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css">
<link rel="icon" type="image/svg+xml" href="data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 100 100'%3E%3Ctext y='.9em' font-size='90'%3E🎬%3C/text%3E%3C/svg%3E">
{DARKMEDIA_SWAGGER_CSS}
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js" crossorigin></script>
<script src="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-standalone-preset.js" crossorigin></script>
<script>
window.onload = function() {{
const ui = SwaggerUIBundle({{
url: "/openapi.json",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout",
persistAuthorization: true,
displayRequestDuration: true,
filter: true,
tryItOutEnabled: true,
}});
window.ui = ui;
}};
</script>
</body>
</html>"""
    return HTMLResponse(content=html_content)
# CORS policy.
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; the FastAPI CORS documentation says origins
# must be listed explicitly when credentials are allowed. To tighten this
# without breaking existing deployments, set CORS_ALLOW_ORIGINS to a
# comma-separated list (e.g. "https://darkmedia-xstudio.vercel.app"); when the
# variable is unset or empty the previous wildcard behaviour is kept.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # --- Server-Side Cache --- | |
class Cache:
    """In-memory store with a per-entry TTL and a content-derived ETag.

    Entry age is measured with ``time.monotonic()`` rather than the wall
    clock, so system clock adjustments can neither expire entries early nor
    keep stale ones alive.
    """

    def __init__(self):
        # key -> {"data": ..., "etag": str, "ts": monotonic seconds, "ttl": seconds}
        self._store = {}

    def get(self, key):
        """Return ``(data, etag)`` for a live entry, else ``(None, None)``.

        An entry whose age has reached its TTL is removed as a side effect.
        """
        entry = self._store.get(key)
        if entry is None:
            return None, None
        if time.monotonic() - entry["ts"] < entry["ttl"]:
            return entry["data"], entry["etag"]
        # Expired: drop it (tolerating a concurrent removal).
        self._store.pop(key, None)
        return None, None

    def set(self, key, data, ttl=60):
        """Store *data* under *key* for *ttl* seconds and return its ETag.

        The ETag is the first 12 hex digits of the MD5 of the key-sorted JSON
        encoding, so *data* must be JSON-serialisable and equal payloads get
        equal ETags. MD5 is used as a fingerprint only, not for security.
        """
        payload = json.dumps(data, sort_keys=True).encode()
        etag = hashlib.md5(payload).hexdigest()[:12]
        self._store[key] = {
            "data": data,
            "etag": etag,
            "ts": time.monotonic(),
            "ttl": ttl,
        }
        return etag

    def invalidate(self, key):
        """Remove *key* if present; unknown keys are ignored."""
        self._store.pop(key, None)


# Process-wide cache instance shared by the endpoint decorators.
cache = Cache()
def cached_endpoint(key, ttl=60):
    """Decorator adding server-side caching, ETag and Cache-Control headers.

    Args:
        key: Name of the entry in the module-level ``cache``.
        ttl: Lifetime of the entry in seconds; also used for ``max-age``
            (``stale-while-revalidate`` is twice this value).

    The wrapped coroutine must accept the ``Request`` as its first parameter.
    Only ``dict`` results are cached; anything else is returned untouched.

    Fixes over the previous version:
      * a cached empty dict is now a hit (the old truthiness test treated it
        as a miss and re-ran the handler every time);
      * the 304 reply is sent without a body instead of a JSON ``{}``;
      * ``If-None-Match`` may carry several comma-separated validators, weak
        (``W/``) validators, or ``*``;
      * ``functools.wraps`` keeps the handler's name, docstring and signature.

    NOTE(review): the cache key is fixed per decorated function, so every call
    shares one entry regardless of path or query parameters.
    """
    cache_control = f"public, max-age={ttl}, stale-while-revalidate={ttl*2}"

    def _headers(etag):
        return {"ETag": f'"{etag}"', "Cache-Control": cache_control}

    def _etag_matches(header_value, etag):
        """Return True when any validator in If-None-Match equals *etag*."""
        if not header_value:
            return False
        for candidate in header_value.split(","):
            candidate = candidate.strip()
            if candidate == "*":
                return True
            if candidate.startswith("W/"):
                candidate = candidate[2:]
            if candidate.strip('"') == etag:
                return True
        return False

    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # Local import, matching how this module imports Response elsewhere.
            from fastapi.responses import Response

            cached_data, cached_etag = cache.get(key)
            # cache.get() reports a miss as (None, None); test the ETag so
            # falsy payloads such as {} still count as hits.
            if cached_etag is not None:
                if _etag_matches(request.headers.get("if-none-match"), cached_etag):
                    return Response(status_code=304, headers=_headers(cached_etag))
                return JSONResponse(content=cached_data, headers=_headers(cached_etag))

            result = await func(request, *args, **kwargs)
            if isinstance(result, dict):
                etag = cache.set(key, result, ttl)
                return JSONResponse(content=result, headers=_headers(etag))
            return result

        return wrapper

    return decorator
def no_cache(func):
    """Decorator for endpoints that must always return fresh data.

    ``dict`` results are wrapped in a ``JSONResponse`` carrying headers that
    forbid caching; any other result is passed through unchanged.

    ``functools.wraps`` is applied so the wrapper keeps the handler's name,
    docstring and signature (the bare ``*args, **kwargs`` wrapper used to hide
    them from anything that introspects the function).
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        result = await func(*args, **kwargs)
        if not isinstance(result, dict):
            return result
        return JSONResponse(content=result, headers={
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
        })

    return wrapper
# Paths
SPACE_DIR = Path(__file__).parent.resolve()

# Prefer a "data" directory beside this file's folder; if that does not exist
# (e.g. when running from a Hugging Face Space root) use one inside the folder.
_sibling_data_dir = SPACE_DIR.parent / "data"
DATA_DIR = _sibling_data_dir if _sibling_data_dir.exists() else SPACE_DIR / "data"

STORIES_DIR = DATA_DIR / "stories"
ASSETS_DIR = DATA_DIR / "assets"
VIDEOS_DIR = DATA_DIR / "videos"
STATE_DIR = DATA_DIR / "state"

# Create the whole tree at import time so handlers can assume it exists.
for d in (DATA_DIR, STORIES_DIR, ASSETS_DIR, VIDEOS_DIR, STATE_DIR):
    d.mkdir(parents=True, exist_ok=True)
| # --- R2 Client --- | |
def get_r2_client():
    """Build an S3-compatible client for R2 from environment variables.

    Reads ``R2_ENDPOINT``, ``R2_ACCESS_KEY_ID`` and ``R2_SECRET_ACCESS_KEY``.

    Returns:
        A new boto3 "s3" client using SigV4 signing, or ``None`` when any of
        the three settings is missing/empty or when importing or constructing
        the client fails for any reason.
    """
    settings = {
        "endpoint_url": os.getenv("R2_ENDPOINT", ""),
        "aws_access_key_id": os.getenv("R2_ACCESS_KEY_ID", ""),
        "aws_secret_access_key": os.getenv("R2_SECRET_ACCESS_KEY", ""),
    }
    if not all(settings.values()):
        return None

    try:
        # Imported lazily so a missing boto3 simply disables R2 access.
        import boto3
        from botocore.config import Config

        signing = Config(signature_version="s3v4")
        return boto3.client("s3", config=signing, **settings)
    except Exception:
        return None
def r2_list(prefix=""):
    """List every object key in the R2 bucket that starts with *prefix*.

    The bucket name comes from ``R2_BUCKET`` (default "darkmedia-x-studio").
    All result pages are followed.

    Returns:
        A list of key strings; an empty list when R2 is not configured or the
        listing raises.
    """
    client = get_r2_client()
    if client is None:
        return []

    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    try:
        pages = client.get_paginator("list_objects_v2").paginate(
            Bucket=bucket, Prefix=prefix
        )
        # A page without "Contents" contributes nothing.
        return [obj["Key"] for page in pages for obj in page.get("Contents", [])]
    except Exception:
        return []
def r2_read_text(key):
    """Fetch the object stored under *key* and decode it as UTF-8 text.

    Returns:
        The decoded string, or ``None`` when R2 is not configured or the
        download/decoding raises.
    """
    client = get_r2_client()
    if client is None:
        return None

    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    try:
        raw = client.get_object(Bucket=bucket, Key=key)["Body"].read()
        return raw.decode("utf-8")
    except Exception:
        return None
def r2_presigned_url(key, expires_in=3600):
    """Create a presigned ``get_object`` URL for *key*.

    Args:
        key: Object key inside the bucket named by ``R2_BUCKET``.
        expires_in: Validity of the URL in seconds.

    Returns:
        The URL string, or ``None`` when R2 is not configured or signing raises.
    """
    client = get_r2_client()
    if client is None:
        return None

    target = {
        "Bucket": os.getenv("R2_BUCKET", "darkmedia-x-studio"),
        "Key": key,
    }
    try:
        return client.generate_presigned_url(
            "get_object", Params=target, ExpiresIn=expires_in
        )
    except Exception:
        return None
| # --- Authentication --- | |
async def auth_login(redirect: str = "/index.html"):
    """Redirect to the external auth provider login page.

    Args:
        redirect: Location the provider should send the user back to.

    The value is URL-encoded before being placed in the query string;
    previously it was interpolated raw, so a value containing ``&``, ``#`` or
    ``=`` could truncate the parameter or inject additional ones.

    Returns:
        RedirectResponse pointing at ``{AUTH_PROVIDER_URL}/login``.
    """
    from urllib.parse import urlencode

    query = urlencode({"redirect": redirect})
    return RedirectResponse(url=f"{AUTH_PROVIDER_URL}/login?{query}")
async def auth_google(payload: dict):
    """Exchange a Google credential for a token via the auth provider.

    Args:
        payload: Request body; the credential is read from ``"credential"``
            or, failing that, ``"idToken"``.

    Returns:
        ``{"token": ...}`` using the provider's ``token`` or ``accessToken``
        field, falling back to the submitted credential when neither is set.

    Raises:
        HTTPException 400: no credential in the payload.
        HTTPException 401: provider answered non-200 or ``statut == "erreur"``.
        HTTPException 500: any other failure (network error, non-JSON reply).
    """
    # Local import: run the blocking HTTP call in a worker thread so a slow
    # provider (15 s timeout) does not stall the event loop.
    from fastapi.concurrency import run_in_threadpool

    credential = payload.get("credential") or payload.get("idToken")
    if not credential:
        raise HTTPException(status_code=400, detail="Missing Google credential")
    try:
        headers = {"Content-Type": "application/json"}
        if AUTH_PROVIDER_KEY:
            # The key is sent in both header styles.
            headers["X-API-Key"] = AUTH_PROVIDER_KEY
            headers["Authorization"] = f"Bearer {AUTH_PROVIDER_KEY}"
        res = await run_in_threadpool(
            requests.post,
            f"{AUTH_PROVIDER_URL}/api/auth/login",
            headers=headers,
            json={"idToken": credential},
            timeout=15,
        )
        data = res.json()
        if res.status_code != 200 or data.get("statut") == "erreur":
            raise HTTPException(
                status_code=401,
                detail=data.get("message", "Authentication failed"),
            )
        token = data.get("token") or data.get("accessToken") or credential
        return {"token": token}
    except HTTPException:
        # Keep the deliberate 401 instead of converting it to a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def auth_callback(token: str = None):
    """Handle the auth provider callback: store the token in a cookie and redirect.

    Without a token the user is sent to the login page with
    ``error=missing_token``. Otherwise the token is stored in an HTTP-only
    ``auth_token`` cookie (SameSite=Lax, one week) and the user is redirected
    to ``/index.html``.

    NOTE(review): the cookie is not marked ``secure`` - confirm this is
    intended for the deployed HTTPS environment.
    """
    if token:
        one_week_seconds = 3600 * 24 * 7
        response = RedirectResponse(url="/index.html")
        response.set_cookie(
            key="auth_token",
            value=token,
            httponly=True,
            max_age=one_week_seconds,
            samesite="lax",
        )
        return response
    return RedirectResponse(url="/login.html?error=missing_token")
async def get_auth_status(request: Request):
    """Check authentication status.

    The token is taken from an ``Authorization: Bearer ...`` header when
    present, otherwise from the ``auth_token`` cookie, and verified against
    ``{AUTH_PROVIDER_URL}/api/auth/verify``.

    Returns:
        dict with ``authenticated`` (provider answered 200), ``provider_url``
        and ``has_token``. Verification failures of any kind are reported as
        ``authenticated: False`` rather than raised.
    """
    # Local import: the verify call is blocking, so run it in a worker thread
    # instead of on the event loop.
    from fastapi.concurrency import run_in_threadpool

    auth = request.headers.get("Authorization", "")
    if auth.startswith("Bearer "):
        token = auth[7:]
    else:
        token = request.cookies.get("auth_token", "")

    is_valid = False
    if token:
        headers = {"Content-Type": "application/json"}
        if AUTH_PROVIDER_KEY:
            headers["X-API-Key"] = AUTH_PROVIDER_KEY
            headers["Authorization"] = f"Bearer {AUTH_PROVIDER_KEY}"
        try:
            res = await run_in_threadpool(
                requests.post,
                f"{AUTH_PROVIDER_URL}/api/auth/verify",
                headers=headers,
                json={"token": token},
                timeout=5,
            )
            is_valid = (res.status_code == 200)
        except Exception:
            # Best effort: an unreachable provider means "not authenticated".
            # `Exception` (not a bare except) lets task cancellation and
            # interpreter-exit signals propagate.
            is_valid = False

    return {
        "authenticated": is_valid,
        "provider_url": AUTH_PROVIDER_URL,
        "has_token": bool(token)
    }
async def proxy_image(image_path: str):
    """Proxy an image from R2 so the browser never talks to R2 directly.

    Args:
        image_path: Object key (URL-decoded here; one leading "/" is dropped).

    Returns:
        Response with the object bytes, a one-hour public cache header and a
        wildcard ``Access-Control-Allow-Origin``.

    Raises:
        HTTPException 500: R2 is not configured.
        HTTPException 404: fetching the object failed for any reason.
    """
    import urllib.parse
    from fastapi.responses import Response

    # Suffix -> MIME type, consulted only when R2 reports no usable type.
    suffix_types = (
        ((".png",), "image/png"),
        ((".jpg", ".jpeg"), "image/jpeg"),
        ((".gif",), "image/gif"),
        ((".webp",), "image/webp"),
    )

    decoded = urllib.parse.unquote(image_path)
    key = decoded[1:] if decoded.startswith("/") else decoded

    client = get_r2_client()
    if client is None:
        raise HTTPException(status_code=500, detail="R2 not configured")
    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")

    try:
        resp = client.get_object(Bucket=bucket, Key=key)
        content = resp["Body"].read()
        content_type = resp.get("ContentType", "image/png")
        # Determine content type from extension if not set
        if not content_type or content_type == "binary/octet-stream":
            lowered = key.lower()
            for suffixes, mime in suffix_types:
                if lowered.endswith(suffixes):
                    content_type = mime
                    break
        response_headers = {
            "Cache-Control": "public, max-age=3600",
            "Access-Control-Allow-Origin": "*",
        }
        return Response(content=content, media_type=content_type, headers=response_headers)
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Image not found: {str(e)}")
| # --- Stories --- | |
def _story_title(filename, story_id):
    """Derive a display title from the file name, or from the folder for generic names."""
    title = filename.replace(".md", "").replace("_", " ").strip()
    if title.lower() in {"story", "index", "readme"}:
        title = story_id.split("/")[-1].replace("_", " ").strip()
    return title


def _scan_local_stories(seen):
    """Collect story entries from STORIES_DIR, adding each story id to *seen*."""
    entries = []
    if not STORIES_DIR.exists():
        return entries
    image_suffixes = (".png", ".jpg", ".jpeg")
    for dirpath, _, filenames in os.walk(str(STORIES_DIR)):
        for filename in filenames:
            if not filename.endswith(".md") or filename.startswith("README") or filename == "music_prompt.md":
                continue
            file_path = Path(dirpath) / filename
            rel_path = file_path.relative_to(STORIES_DIR)
            # One story per folder: the first markdown file found wins.
            story_id = str(rel_path.parent).replace("\\", "/")
            if story_id in seen:
                continue
            seen.add(story_id)

            story_dir = file_path.parent
            img_dir = story_dir / "assets" / "images"
            if not img_dir.exists():
                img_dir = story_dir / "images"
            image_count = 0
            if img_dir.exists():
                image_count = sum(
                    1 for name in os.listdir(str(img_dir))
                    if name.lower().endswith(image_suffixes)
                )

            scenes_dir = story_dir / "assets" / "scenes"
            total_scenes = 0
            if scenes_dir.exists():
                total_scenes = sum(
                    1 for name in os.listdir(str(scenes_dir))
                    if name.startswith("scene_") and name.endswith(".txt")
                )
            # With no scene files on disk, 10 scenes are assumed.
            expected_scenes = total_scenes if total_scenes > 0 else 10

            word_count = len(file_path.read_text(encoding="utf-8").split())
            category = story_dir.parent.name if story_dir.parent != STORIES_DIR else "General"
            entries.append({
                "id": story_id,
                "title": _story_title(filename, story_id),
                "path": str(rel_path).replace("\\", "/"),
                "category": category,
                "processed": False,
                "image_count": image_count,
                "total_scenes": expected_scenes,
                "ready": image_count >= expected_scenes,
                "word_count": word_count,
                "is_empty": word_count < 20,
            })
    return entries


def _scan_r2_stories(seen):
    """Collect story entries from R2 for ids not already in *seen* (which is updated)."""
    entries = []
    r2_keys = r2_list(prefix="stories/")
    for key in r2_keys:
        if not key.endswith(".md") or "README" in key or "music_prompt" in key:
            continue
        parts = key.split("/")
        if len(parts) < 3:
            continue
        story_id = "/".join(parts[1:-1])
        if story_id in seen:
            continue
        seen.add(story_id)

        # Asset prefixes are anchored at the story's own folder. The earlier
        # code used the first three path segments, which pointed at the wrong
        # place for stories not exactly two levels deep and disagreed with
        # the metadata lookup below.
        story_prefix = f"stories/{story_id}"
        image_prefix = f"{story_prefix}/assets/images/"
        audio_prefix = f"{story_prefix}/assets/audio/"
        image_count = sum(
            1 for k in r2_keys
            if k.startswith(image_prefix) and k.endswith((".png", ".jpg", ".jpeg"))
        )
        audio_count = sum(
            1 for k in r2_keys
            if k.startswith(audio_prefix) and k.endswith(".mp3")
        )

        content = r2_read_text(key) or ""
        word_count = len(content.split())

        # Optional metadata: which image model produced the scenes.
        image_model = None
        try:
            metadata_content = r2_read_text(f"{story_prefix}/metadata.json")
            if metadata_content:
                image_model = json.loads(metadata_content).get("image_model")
        except Exception:
            # Malformed metadata must not hide the story itself.
            image_model = None

        entries.append({
            "id": story_id,
            "title": _story_title(parts[-1], story_id),
            "path": key,
            "category": parts[1],
            "processed": False,
            "image_count": image_count,
            "audio_count": audio_count,
            "total_scenes": 10,
            "ready": image_count >= 10,
            "word_count": word_count,
            "is_empty": word_count < 20,
            "image_model": image_model,
        })
    return entries


async def get_stories(request: Request):
    """List all stories found locally and on R2, sorted by title.

    Local stories are scanned first; an R2 story whose id was already seen
    locally is skipped. R2 entries additionally carry ``audio_count`` and
    ``image_model``.

    Returns:
        ``{"stories": [...]}``.
    """
    seen = set()
    stories = _scan_local_stories(seen)
    stories.extend(_scan_r2_stories(seen))
    return {"stories": sorted(stories, key=lambda s: s["title"])}
async def get_story_content(request: Request, story_path: str):
    """Get story content by path (frontend calls /api/stories/content/{id}).

    Looks for ``story.md`` then ``story.txt`` under ``STORIES_DIR/story_path``
    and falls back to ``stories/{story_path}/story.md`` on R2.

    Returns:
        dict with ``content``, ``status``, ``path`` and ``filename``.

    Raises:
        HTTPException 404: the story does not exist, or *story_path* is
            absolute or contains ``..`` (previously such paths could read
            files outside the stories directory).
    """
    relative = Path(story_path)
    if relative.is_absolute() or ".." in relative.parts:
        raise HTTPException(status_code=404, detail="Story not found")

    # Try local first
    base = STORIES_DIR / relative
    for f in ["story.md", "story.txt"]:
        candidate = base / f
        if candidate.is_file():
            return {
                "content": candidate.read_text(encoding="utf-8"),
                "status": "success",
                "path": story_path,
                "filename": f,
            }

    # Try R2
    content = r2_read_text(f"stories/{story_path}/story.md")
    if content:
        return {"content": content, "status": "success", "path": story_path, "filename": "story.md"}
    raise HTTPException(status_code=404, detail="Story not found")
async def get_story(request: Request, story_path: str):
    """Return the text of a story as ``{"content": ...}``.

    Looks for ``story.md`` then ``story.txt`` under ``STORIES_DIR/story_path``
    and falls back to ``stories/{story_path}/story.md`` on R2.

    Raises:
        HTTPException 404: the story does not exist, or *story_path* is
            absolute or contains ``..`` (previously such paths could read
            files outside the stories directory).
    """
    relative = Path(story_path)
    if relative.is_absolute() or ".." in relative.parts:
        raise HTTPException(status_code=404, detail="Story not found")

    base = STORIES_DIR / relative
    for f in ["story.md", "story.txt"]:
        candidate = base / f
        if candidate.is_file():
            return {"content": candidate.read_text(encoding="utf-8")}

    # Try R2
    content = r2_read_text(f"stories/{story_path}/story.md")
    if content:
        return {"content": content}
    raise HTTPException(status_code=404, detail="Story not found")
| # --- Assets --- | |
async def get_music(request: Request):
    """List available background-music file names (local and R2).

    Local ``*.mp3``/``*.wav`` files come from ``ASSETS_DIR/background_music``;
    R2 keys under ``assets/music/`` with an mp3/wav suffix (case-insensitive)
    contribute their base names. Duplicates are collapsed.

    The earlier code combined two ``glob()`` generators with ``|``, which
    raises ``TypeError`` as soon as the local directory exists.

    Returns:
        ``{"music": [...], "files": [...]}`` - the same sorted list under both keys.
    """
    names = set()

    # First check local files
    music_dir = ASSETS_DIR / "background_music"
    if music_dir.exists():
        for pattern in ("*.mp3", "*.wav"):
            names.update(path.name for path in music_dir.glob(pattern))

    # Also check R2 for music files
    for key in r2_list(prefix="assets/music/"):
        if key.lower().endswith((".mp3", ".wav")):
            names.add(key.split("/")[-1])

    return {"music": sorted(names), "files": sorted(names)}
async def get_voice_samples(request: Request):
    """List local voice-sample file names from ``ASSETS_DIR/voice_samples``.

    The earlier code combined two ``glob()`` generators with ``|``, which
    raises ``TypeError`` as soon as the directory exists.

    Returns:
        ``{"samples": [...]}`` with the sorted ``*.wav`` and ``*.mp3`` names.
    """
    voice_dir = ASSETS_DIR / "voice_samples"
    files = []
    if voice_dir.exists():
        for pattern in ("*.wav", "*.mp3"):
            files.extend(path.name for path in voice_dir.glob(pattern))
    return {"samples": sorted(files)}
| # --- Videos --- | |
async def get_videos(request: Request):
    """List videos from R2 storage, plus local ones for development.

    Every ``.mp4`` key under ``videos/`` on R2 is listed with a playback URL
    through ``/api/video/play/``. Local ``*.mp4`` files in ``VIDEOS_DIR`` are
    appended when no listed video already has the same file name.

    Returns:
        ``{"videos": [...]}`` sorted by file name.
    """
    videos = []
    known_names = set()

    # Scan R2 for videos
    for key in r2_list(prefix="videos/"):
        if not key.lower().endswith(".mp4"):
            continue
        segments = key.split("/")
        name = segments[-1]
        known_names.add(name)
        videos.append({
            "filename": name,
            "path": key,
            # Use proxy URL for video playback
            "url": f"/api/video/play/{key}",
            # Second path segment, when there is one.
            "story": segments[1] if len(segments) > 1 else "Unknown",
        })

    # Also scan local directory (for local development)
    if VIDEOS_DIR.exists():
        for local_file in VIDEOS_DIR.glob("*.mp4"):
            if local_file.name in known_names:
                continue
            known_names.add(local_file.name)
            videos.append({
                "filename": local_file.name,
                "path": f"videos/{local_file.name}",
                "url": f"/api/video/play/videos/{local_file.name}",
                "story": "local",
            })

    videos.sort(key=lambda video: video["filename"])
    return {"videos": videos}
async def proxy_video(video_path: str):
    """Proxy a video from R2 for playback, streaming it in chunks.

    Previously the whole object was read into memory and then "streamed" as a
    single chunk; now the R2 body is forwarded 64 KiB at a time and the
    object's ``ContentLength`` (when reported) is sent as ``Content-Length``.

    NOTE(review): ``Accept-Ranges: bytes`` is kept for compatibility, but
    Range requests are not actually honoured - the full object is always sent.

    Args:
        video_path: Object key (URL-decoded here; one leading "/" is dropped).

    Raises:
        HTTPException 500: R2 is not configured.
        HTTPException 404: the object could not be opened.
    """
    import urllib.parse
    from fastapi.responses import StreamingResponse

    key = urllib.parse.unquote(video_path)
    if key.startswith("/"):
        key = key[1:]
    client = get_r2_client()
    if client is None:
        raise HTTPException(status_code=500, detail="R2 not configured")
    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    try:
        resp = client.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Video not found: {str(e)}")

    def _chunks(body):
        # Close the R2 stream even if the client disconnects mid-transfer.
        try:
            yield from body.iter_chunks(chunk_size=64 * 1024)
        finally:
            body.close()

    headers = {
        "Cache-Control": "public, max-age=3600",
        "Access-Control-Allow-Origin": "*",
        "Accept-Ranges": "bytes",
    }
    length = resp.get("ContentLength")
    if length is not None:
        headers["Content-Length"] = str(length)
    return StreamingResponse(_chunks(resp["Body"]), media_type="video/mp4", headers=headers)
| # --- Audio / Narrations --- | |
async def proxy_audio(audio_path: str):
    """Proxy audio from R2 for playback, streaming it in chunks.

    Previously the whole object was read into memory and then "streamed" as a
    single chunk; now the R2 body is forwarded 64 KiB at a time and the
    object's ``ContentLength`` (when reported) is sent as ``Content-Length``.

    Args:
        audio_path: Object key (URL-decoded here; one leading "/" is dropped).

    Raises:
        HTTPException 500: R2 is not configured.
        HTTPException 404: the object could not be opened.
    """
    import urllib.parse
    from fastapi.responses import StreamingResponse

    key = urllib.parse.unquote(audio_path)
    if key.startswith("/"):
        key = key[1:]
    client = get_r2_client()
    if client is None:
        raise HTTPException(status_code=500, detail="R2 not configured")
    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    try:
        resp = client.get_object(Bucket=bucket, Key=key)
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"Audio not found: {str(e)}")

    def _chunks(body):
        # Close the R2 stream even if the client disconnects mid-transfer.
        try:
            yield from body.iter_chunks(chunk_size=64 * 1024)
        finally:
            body.close()

    headers = {
        "Cache-Control": "public, max-age=3600",
        "Access-Control-Allow-Origin": "*",
    }
    length = resp.get("ContentLength")
    if length is not None:
        headers["Content-Length"] = str(length)
    return StreamingResponse(_chunks(resp["Body"]), media_type="audio/mp3", headers=headers)
async def get_story_narrations(story_path: str):
    """Get list of generated narrations for a story.

    Lists ``.mp3`` keys under ``stories/{story}/assets/audio/`` on R2. The
    scene label is the key with the prefix, ``.mp3`` and ``scene_`` removed.

    Entries are ordered numerically when the scene label is all digits (so
    scene 2 precedes scene 10; the old plain string sort put "10" first).
    Non-numeric labels follow, in alphabetical order.

    Returns:
        ``{"narrations": [{"scene": str, "url": str}, ...]}``.
    """
    import urllib.parse

    story_id = urllib.parse.unquote(story_path)
    prefix = f"stories/{story_id}/assets/audio/"

    narrations = []
    for key in r2_list(prefix=prefix):
        if key.endswith(".mp3"):
            scene_num = key.replace(prefix, "").replace(".mp3", "").replace("scene_", "")
            narrations.append({
                "scene": scene_num,
                "url": f"/api/audio/{key}"
            })

    def _scene_order(item):
        scene = item["scene"]
        if scene.isdigit():
            return (0, int(scene), scene)
        return (1, 0, scene)

    return {"narrations": sorted(narrations, key=_scene_order)}
| # --- Generated Images (R2) --- | |
async def get_generated_images(request: Request):
    """List generated images stored on R2 under ``stories/``.

    A key qualifies when it ends in .png/.jpg/.jpeg and contains an
    ``/images/`` path segment. Each entry exposes a proxy URL through
    ``/api/image/`` instead of a direct R2 link (to avoid CORS issues).

    Returns:
        ``{"images": [...]}`` sorted by object key.
    """
    image_suffixes = (".png", ".jpg", ".jpeg")
    images = []
    for key in r2_list(prefix="stories/"):
        if "/images/" in key and key.endswith(image_suffixes):
            segments = key.split("/")
            file_name = segments[-1]
            images.append({
                "url": f"/api/image/{key}",
                "path": key,
                "filename": file_name,
                # Second and third segments joined: "{category}/{story}".
                "story_id": "/".join(segments[1:3]),
                "category": segments[1] if len(segments) > 1 else "General",
                # File name without its final extension.
                "scene": file_name.rsplit(".", 1)[0],
            })
    images.sort(key=lambda image: image.get("path", ""))
    return {"images": images}
| # --- Library (Gallery view) --- | |
async def get_library(request: Request):
    """Scan R2 for generated scene images and return a gallery-ready list.

    Only ``.png`` keys under ``stories/`` that contain ``/images/``,
    ``/depths/`` or ``/blender_test/`` and have at least four path segments
    are included. Entries are sorted by file name in descending order and
    capped at 200.

    Returns:
        ``{"library": [...]}``.
    """
    scene_markers = ("/images/", "/depths/", "/blender_test/")
    library = []
    for key in r2_list(prefix="stories/"):
        if not key.endswith(".png"):
            continue
        # Skip non-scene images (e.g. UI assets)
        if not any(marker in key for marker in scene_markers):
            continue
        segments = key.split("/")
        if len(segments) < 4:
            continue
        library.append({
            # Layout is stories/{category}/{story}/..., so index 2 is the story.
            "story": segments[2],
            "filename": segments[-1],
            # Use proxy URL to avoid CORS issues
            "url": f"/api/image/{key}",
            "path": key,
        })
    newest_first = sorted(library, key=lambda item: item["filename"], reverse=True)
    return {"library": newest_first[:200]}
async def delete_library_image(image_path: str):
    """Delete an image from R2 storage.

    The key comes straight from the request, so it is validated before use:
    only keys under ``stories/`` with a .png/.jpg/.jpeg suffix are accepted.
    Previously any object in the bucket (stories, videos, metadata) could be
    deleted through this function.

    Args:
        image_path: Object key (URL-decoded here; one leading "/" is dropped).

    Returns:
        ``{"status": "success" | "error", "message": str}``. On success the
        "library", "generated_images" and "stories" cache entries are dropped.
    """
    import urllib.parse

    client = get_r2_client()
    if client is None:
        return {"status": "error", "message": "R2 client not configured"}
    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")

    # URL-decode the path
    key = urllib.parse.unquote(image_path)
    if key.startswith("/"):
        key = key[1:]

    is_library_image = (
        key.startswith("stories/")
        and key.lower().endswith((".png", ".jpg", ".jpeg"))
    )
    if not is_library_image:
        return {"status": "error", "message": f"Refusing to delete non-library key: {key}"}

    try:
        client.delete_object(Bucket=bucket, Key=key)
    except Exception as e:
        return {"status": "error", "message": str(e)}

    # Invalidate caches
    for cache_key in ("library", "generated_images", "stories"):
        cache.invalidate(cache_key)
    return {"status": "success", "message": f"Deleted {key}"}
| # --- Current Frame / Preview --- | |
async def get_current_frame():
    """Return the preview payload; currently a fixed placeholder (no frame, empty narration)."""
    placeholder = dict(frame=None, narration="")
    return placeholder
| # --- Status --- | |
async def get_status():
    """Report the pipeline state recorded in ``STATE_DIR/current_task.json``.

    When the file is absent an idle task is assumed. Any exception while
    reading or parsing is reported as ``{"status": "error", "message": ...}``.
    """
    # (field, fallback) pairs copied from the task record into the response.
    task_fields = (
        ("step", "IDLE"),
        ("progress", 0),
        ("story", None),
        ("error", None),
        ("current_scene", 0),
        ("total_scenes", 0),
    )
    try:
        task_path = STATE_DIR / "current_task.json"
        if not task_path.exists():
            task = {"story": None, "step": "IDLE", "progress": 0}
        else:
            with open(task_path) as handle:
                task = json.load(handle)

        report = {"status": "ready", "debug_version": "v1.0.1-auth-fix"}
        for field, fallback in task_fields:
            report[field] = task.get(field, fallback)
        report["timestamp"] = time.time()
        return report
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
async def get_logs():
    """Return the log listing; this handler always answers with an empty list."""
    log_lines = []
    return {"logs": log_lines}
| # --- System Info --- | |
async def get_system():
    """Report host memory/CPU usage and the running Python version.

    Returns:
        dict with ``ram_used`` and ``ram_total`` in GiB, ``ram_percent``,
        ``cpu_percent`` (sampled over 0.1 s) and ``python_version``; on any
        failure ``{"error": <message>}``.
    """
    # `os.sys` only exists because the os module happens to import sys; it is
    # not a documented attribute, so read the version from sys itself.
    import sys

    try:
        ram = psutil.virtual_memory()
        return {
            "ram_used": ram.used / (1024**3),
            "ram_total": ram.total / (1024**3),
            "ram_percent": ram.percent,
            "cpu_percent": psutil.cpu_percent(interval=0.1),
            "python_version": sys.version.split()[0],
        }
    except Exception as e:
        return {"error": str(e)}
| # --- Engine Health --- | |
async def get_engine_health():
    """Summarise engine health from the current task file and local scene images.

    Reads ``STATE_DIR/current_task.json`` when present, counts
    ``scene_*.png`` files under ``STORIES_DIR/<story>/assets/images`` and
    derives a verdict from the progress value (0 -> HEALTHY, anything else ->
    RENDERING). Any exception yields an ``ERROR`` verdict with the message.
    """
    try:
        # Defaults used when no task has been recorded yet.
        story, step, progress = "AUCUN", "IDLE", 0

        task_path = STATE_DIR / "current_task.json"
        if task_path.exists():
            with open(task_path) as handle:
                task = json.load(handle)
            story = task.get("story", "UNKNOWN")
            step = task.get("step", "IDLE")
            progress = task.get("progress", 0)

        rendered = 0
        if story and story != "AUCUN":
            images_dir = STORIES_DIR / story / "assets" / "images"
            if images_dir.exists():
                rendered = sum(1 for _ in images_dir.glob("scene_*.png"))

        if progress == 0:
            verdict, verdict_color = "HEALTHY", "green"
        else:
            verdict, verdict_color = "RENDERING", "yellow"

        return {
            "verdict": verdict,
            "verdict_color": verdict_color,
            "story": story,
            "step": step,
            "progress": progress,
            "scenes_generated": rendered,
            "total_scenes": 10,  # fixed value, not read from the task file
            "logs": [],
        }
    except Exception as exc:
        return {"verdict": "ERROR", "verdict_color": "red", "message": str(exc)}
| # --- UI Settings --- | |
async def get_ui_settings(request: Request):
    """Return the saved UI settings, or an empty dict when none were saved.

    Args:
        request: Incoming request; not read by this handler.
    """
    settings_path = STATE_DIR / "ui_settings.json"
    if not settings_path.exists():
        return {}
    with open(settings_path) as handle:
        stored = json.load(handle)
    return stored
async def save_ui_settings(settings: dict):
    """Persist UI settings as JSON in ``STATE_DIR/ui_settings.json``.

    Args:
        settings: JSON-serialisable settings mapping sent by the dashboard.

    Returns:
        ``{"status": "ok"}`` once the file is written and the ``ui_settings``
        cache entry has been invalidated.
    """
    # The state directory is not guaranteed to exist yet (launch_pipeline also
    # creates it on demand); without this the first save fails with
    # FileNotFoundError.
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    settings_file = STATE_DIR / "ui_settings.json"
    with open(settings_file, "w") as f:
        json.dump(settings, f)
    cache.invalidate("ui_settings")
    return {"status": "ok"}
| # --- TTS Preview --- | |
async def get_tts_voices():
    """List the TTS voices offered to the dashboard.

    Returns:
        ``{"voices": [...]}`` where each entry has ``id``, ``name``,
        ``gender`` and ``language``.
    """
    # (locale, first name, gender); id and display name are derived from these.
    roster = (
        ("fr-FR", "Denise", "female"),
        ("fr-FR", "Henri", "male"),
        ("fr-CA", "Sylvie", "female"),
        ("fr-CA", "Jean", "male"),
        ("fr-FR", "Alain", "male"),
        ("fr-FR", "Brigitte", "female"),
        ("fr-FR", "Celeste", "female"),
        ("fr-FR", "Claude", "male"),
        ("fr-FR", "Coralie", "female"),
        ("fr-FR", "Eloise", "female"),
        ("fr-FR", "Jacqueline", "female"),
        ("fr-FR", "Jerome", "male"),
        ("fr-FR", "Josephine", "female"),
        ("fr-FR", "Maurice", "male"),
        ("fr-FR", "Yves", "male"),
        ("fr-FR", "Yvette", "female"),
    )
    voices = []
    for locale, first_name, gender in roster:
        region = locale.split("-")[1]
        voices.append({
            "id": f"{locale}-{first_name}Neural",
            "name": f"{first_name} ({region})",
            "gender": gender,
            "language": locale,
        })
    return {"voices": voices}
async def tts_preview(payload: dict):
    """Synthesise a short edge-tts preview and return it as a base64 data URL.

    Args:
        payload: Optional keys ``text``, ``voice``, ``rate`` (e.g. ``"+0%"``)
            and ``pitch`` (e.g. ``"+0Hz"``). Voices outside the supported set
            below fall back to ``fr-FR-DeniseNeural``.

    Returns:
        ``{"status": "success", "url": "data:audio/mp3;base64,..."}`` or
        ``{"status": "error", "message": ...}`` when synthesis fails.
    """
    import base64
    import tempfile
    import edge_tts

    text = payload.get("text", "Test audio DarkMedia-X.")
    voice = payload.get("voice", "fr-FR-DeniseNeural")
    rate = payload.get("rate", "+0%")
    pitch = payload.get("pitch", "+0Hz")

    # Map edge-tts voice names; anything else falls back to Denise.
    voice_map = {
        "fr-FR-DeniseNeural": "fr-FR-DeniseNeural",
        "fr-FR-HenriNeural": "fr-FR-HenriNeural",
        "fr-CA-SylvieNeural": "fr-CA-SylvieNeural",
        "fr-CA-JeanNeural": "fr-CA-JeanNeural",
    }
    tts_voice = voice_map.get(voice, "fr-FR-DeniseNeural")

    try:
        # A temporary directory is removed on exit, so previews no longer
        # accumulate as orphaned .mp3 files (the old delete=False temp file
        # was never unlinked).
        with tempfile.TemporaryDirectory() as tmp_dir:
            mp3_path = os.path.join(tmp_dir, "preview.mp3")
            comm = edge_tts.Communicate(text, tts_voice, rate=rate, pitch=pitch)
            await comm.save(mp3_path)
            with open(mp3_path, "rb") as audio_file:
                audio_data = audio_file.read()
        b64 = base64.b64encode(audio_data).decode()
        return {"status": "success", "url": f"data:audio/mp3;base64,{b64}"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
| # --- Story Images (R2) --- | |
async def get_story_images(story_id: str):
    """List the image objects stored for one story in R2.

    Args:
        story_id: URL-encoded story path; decoded and stripped of surrounding
            slashes before building the ``stories/<id>/assets/images/`` prefix.

    Returns:
        ``{"images": [...]}`` sorted by filename; each entry carries ``path``,
        ``filename``, a proxy ``url`` and a ``timestamp`` that is always 0.
    """
    from urllib.parse import unquote

    story_id = unquote(story_id).strip("/")
    image_suffixes = (".png", ".jpg", ".jpeg")
    listing = [
        {
            "path": object_key,
            "filename": object_key.split("/")[-1],
            # Proxy URL so the browser never hits R2 directly (avoids CORS).
            "url": f"/api/image/{object_key}",
            "timestamp": 0,
        }
        for object_key in r2_list(prefix=f"stories/{story_id}/assets/images/")
        if object_key.lower().endswith(image_suffixes)
    ]
    return {"images": sorted(listing, key=lambda item: item["filename"])}
async def delete_story_image(story_id: str, filename: str):
    """Delete one image of a story from R2.

    Args:
        story_id: URL-encoded story path.
        filename: URL-encoded image file name.

    Both values are decoded and stripped of surrounding slashes, then joined
    into ``stories/<story_id>/assets/images/<filename>``.

    Returns:
        ``{"status": "deleted", "filename": ...}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    from urllib.parse import unquote

    story_id = unquote(story_id).strip("/")
    filename = unquote(filename).strip("/")
    object_key = f"stories/{story_id}/assets/images/{filename}"

    r2 = get_r2_client()
    if r2 is None:
        return {"status": "error", "message": "R2 not configured"}

    bucket_name = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    try:
        r2.delete_object(Bucket=bucket_name, Key=object_key)
        for cache_name in ("generated_images", "library", "stories"):
            cache.invalidate(cache_name)
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    return {"status": "deleted", "filename": filename}
| # --- Story Mutations (R2) --- | |
async def delete_story(story_path: str):
    """Delete every R2 object stored under ``stories/<story_path>/``.

    Args:
        story_path: URL-encoded story path; decoded and stripped of
            surrounding slashes.

    Returns:
        ``{"status": "success", ...}`` after all listed objects were deleted,
        otherwise ``{"status": "error", "message": ...}``.
    """
    from urllib.parse import unquote

    story_path = unquote(story_path).strip("/")
    key_prefix = f"stories/{story_path}/"

    r2 = get_r2_client()
    if r2 is None:
        return {"status": "error", "message": "R2 not configured"}

    bucket_name = os.getenv("R2_BUCKET", "darkmedia-x-studio")
    try:
        pages = r2.get_paginator("list_objects_v2").paginate(
            Bucket=bucket_name, Prefix=key_prefix
        )
        # Lazily walk every page; pages without "Contents" contribute nothing.
        doomed_keys = (
            entry["Key"] for page in pages for entry in page.get("Contents", [])
        )
        for object_key in doomed_keys:
            r2.delete_object(Bucket=bucket_name, Key=object_key)
        for cache_name in ("stories", "generated_images", "library"):
            cache.invalidate(cache_name)
        return {"status": "success", "message": f"Story '{story_path}' deleted"}
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
async def save_story(payload: dict):
    """Write a story's markdown to ``stories/<story_id>/story.md`` in R2.

    Args:
        payload: Must contain non-empty ``story_id`` and ``content``.

    Returns:
        ``{"status": "success", "message": "Story saved"}`` or
        ``{"status": "error", "message": ...}`` (missing fields, R2 not
        configured, or the upload raised).
    """
    story_id = payload.get("story_id", "")
    markdown = payload.get("content", "")
    if not (story_id and markdown):
        return {"status": "error", "message": "story_id and content required"}

    r2 = get_r2_client()
    if r2 is None:
        return {"status": "error", "message": "R2 not configured"}

    upload = {
        "Bucket": os.getenv("R2_BUCKET", "darkmedia-x-studio"),
        "Key": f"stories/{story_id}/story.md",
        "Body": markdown.encode("utf-8"),
        "ContentType": "text/markdown",
    }
    try:
        r2.put_object(**upload)
        cache.invalidate("stories")
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    return {"status": "success", "message": "Story saved"}
async def normalize_ai(payload: dict = None):
    """Stub for prompt normalisation: ignores ``payload`` and reports success with zero fixed files."""
    outcome = dict(
        status="success",
        message="Normalization complete",
        fixed_files=0,
    )
    return outcome
async def generate_all_narrations(payload: dict = None):
    """Generate a narration line for every scene of every story that has none.

    For each ``stories/<id>/story.md`` in R2 that does not already contain a
    ``**Narration`` marker, each scene heading is sent to Gemini and the
    returned text is inserted directly under that heading as
    ``**Narration :** "<text>"``. A story is uploaded (and counted) only when
    at least one narration was actually inserted.

    Args:
        payload: Unused.

    Returns:
        ``{"status": "success", "message": ..., "errors": [...]}``, or
        ``{"status": "error", "message": ...}`` when the Gemini key or the R2
        client is not configured.
    """
    api_key = os.getenv("GEMINI_API_KEY", "")
    if not api_key:
        return {"status": "error", "message": "GEMINI_API_KEY not configured"}

    import requests

    # Fail before spending API calls whose results could not be stored.
    client = get_r2_client()
    if client is None:
        return {"status": "error", "message": "R2 not configured"}
    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")

    # Collect story ids by slicing the fixed prefix/suffix off each key;
    # str.replace() would also eat "stories/" inside later path segments.
    key_prefix = "stories/"
    key_suffix = "/story.md"
    story_ids = set()
    for key in r2_list(prefix=key_prefix):
        if key.startswith(key_prefix) and key.endswith(key_suffix):
            story_id = key[len(key_prefix):-len(key_suffix)]
            if story_id:
                story_ids.add(story_id)

    scene_pattern = re.compile(r'##\s*[Ss]c[eè]ne\s*(\d+)[^:]*:\s*([^\n]+)', re.MULTILINE)
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
    # Key goes in a header so it never shows up in exception text or logs.
    headers = {"x-goog-api-key": api_key}

    generated = 0
    errors = []
    for story_id in sorted(story_ids):
        try:
            story_key = f"stories/{story_id}/story.md"
            content = r2_read_text(story_key)
            # Skip missing stories and stories that already have narrations.
            if not content or "**Narration" in content:
                continue

            pieces = []
            cursor = 0
            inserted = 0
            for heading in scene_pattern.finditer(content):
                scene_num, scene_title = heading.groups()
                # Copy everything up to the end of the heading exactly as
                # written (Scene/Scène, any spacing), then insert after it.
                pieces.append(content[cursor:heading.end()])
                cursor = heading.end()

                prompt = (
                    "Génère une narration courte (15-25 mots) en français pour une scène d'horreur/anime sombre.\n"
                    f"Titre de la scène: {scene_title}\n"
                    "La narration doit être:\n"
                    "- En français québécois\n"
                    "- Terrifiante et mystérieuse\n"
                    "- Utiliser des mots évocateurs de mort, paranormal, mystère\n"
                    "- Maximum 25 mots\n"
                    "-Style: narration sombre et poétique\n"
                    "Réponds UNIQUEMENT avec la narration, sans guillemets ni ponctuation inutile."
                )
                try:
                    resp = requests.post(url, headers=headers, json={
                        "contents": [{"parts": [{"text": prompt}]}]
                    }, timeout=30)
                    resp.raise_for_status()
                    data = resp.json()
                    narration = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "").strip()
                    narration = narration.strip('"').strip("'")
                    if not narration:
                        raise ValueError("empty narration returned")
                    pieces.append(f"\n**Narration :** \"{narration}\"")
                    inserted += 1
                except Exception as e:
                    print(f"Error generating narration for scene {scene_num}: {e}")

            # Nothing inserted (no scenes, or every call failed): leave the
            # stored story untouched and do not count it.
            if not inserted:
                continue
            pieces.append(content[cursor:])
            new_content = "".join(pieces)

            client.put_object(
                Bucket=bucket,
                Key=story_key,
                Body=new_content.encode('utf-8'),
                ContentType="text/markdown",
            )
            generated += 1
            print(f"Generated narrations for: {story_id}")
        except Exception as e:
            errors.append(f"{story_id}: {str(e)}")

    return {"status": "success", "message": f"Generated narrations for {generated} stories", "errors": errors}
async def improve_narrations(payload: dict):
    """Rewrite (or add) the narration of every scene in one story using Gemini.

    The story is split into sections at each scene heading. Within a section
    an existing quoted ``**Narration :** "..."`` is replaced in place;
    otherwise a new narration line is inserted directly under the heading.
    Text outside the narration line is never touched.

    Args:
        payload: ``story_id`` (required) and ``emotion`` (optional, defaults
            to ``"dramatic"``; unknown values also use the dramatic style).

    Returns:
        ``{"status": "success", "message": ...}`` with the number of scenes
        updated, or ``{"status": "error", "message": ...}``.
    """
    story_id = payload.get("story_id", "")
    emotion = payload.get("emotion", "dramatic")
    if not story_id:
        return {"status": "error", "message": "story_id required"}
    api_key = os.getenv("GEMINI_API_KEY", "")
    if not api_key:
        return {"status": "error", "message": "GEMINI_API_KEY not configured"}

    import requests

    story_key = f"stories/{story_id}/story.md"
    content = r2_read_text(story_key)
    if not content:
        return {"status": "error", "message": "Story not found"}

    # Emotion descriptions for the prompt
    emotion_styles = {
        "neutral": "narration neutre et descriptive",
        "dramatic": "narration dramatique, lente et grave, avec des pauses",
        "horror": "narration terrifiante, suspenseuse, voix glaciale",
        "whisper": "narration murmurée, intime, craintive",
        "tense": "narration tendue, stressante, pressée",
        "mysterious": "narration mystérieuse, énigmatique, envoûtante",
        "sad": "narration triste, mélancolique, plaintive"
    }
    emotion_style = emotion_styles.get(emotion, emotion_styles["dramatic"])

    # Extract scenes
    scene_pattern = re.compile(r'##\s*[Ss]c[eè]ne\s*(\d+)[^:]*:\s*([^\n]+)', re.MULTILINE)
    headings = list(scene_pattern.finditer(content))
    if not headings:
        return {"status": "error", "message": "No scenes found in story"}

    # Fail before spending API calls whose results could not be stored.
    client = get_r2_client()
    if client is None:
        return {"status": "error", "message": "R2 not configured"}

    existing_narration = re.compile(r'\*\*Narration\s*:\*\*\s*"[^"]*"')
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
    # Key goes in a header so it never shows up in exception text or logs.
    headers = {"x-goog-api-key": api_key}

    improved_count = 0
    pieces = []
    cursor = 0
    for index, heading in enumerate(headings):
        scene_num, scene_title = heading.groups()
        # A section runs from the end of its heading to the next heading, so
        # a match can never spill into (and wipe out) a following scene.
        if index + 1 < len(headings):
            section_end = headings[index + 1].start()
        else:
            section_end = len(content)
        pieces.append(content[cursor:heading.end()])
        section = content[heading.end():section_end]
        cursor = section_end

        prompt = (
            "Génère une narration courte (15-25 mots) en français québécois pour une scène d'horreur/anime sombre.\n"
            f"Titre de la scène: {scene_title}\n"
            f"Style demandé: {emotion_style}\n"
            "La narration doit:\n"
            "- Être en français québécois authentique\n"
            "- Avoir un ton émotionnel fort selon le style demandé\n"
            "- Utiliser des mots évocateurs de mort, paranormal, mystère\n"
            "- Maximum 25 mots\n"
            "- Être parfaite pour Parler-TTS (rythme fluide)\n"
            "Réponds UNIQUEMENT avec la narration, sans guillemets."
        )
        try:
            resp = requests.post(url, headers=headers, json={
                "contents": [{"parts": [{"text": prompt}]}]
            }, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            narration = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "").strip()
            narration = narration.strip('"').strip("'").strip()
            if not narration:
                raise ValueError("empty narration returned")

            narration_line = f'**Narration :** "{narration}"'
            if existing_narration.search(section):
                # Callable replacement: the AI text is inserted literally, so
                # backslashes or "\1" in it are not treated as a template.
                section = existing_narration.sub(lambda _m: narration_line, section, count=1)
            else:
                section = f"\n{narration_line}{section}"
            improved_count += 1
            print(f"Improved narration for scene {scene_num}: {narration[:50]}...")
        except Exception as e:
            print(f"Error improving scene {scene_num}: {e}")
        pieces.append(section)

    # Save updated story
    if improved_count > 0:
        new_content = "".join(pieces)
        bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
        client.put_object(
            Bucket=bucket,
            Key=story_key,
            Body=new_content.encode('utf-8'),
            ContentType="text/markdown",
        )
    return {"status": "success", "message": f"Improved {improved_count} narrations with {emotion} emotion"}
async def improve_visuals(payload: dict):
    """Rewrite (or add) the image prompt of every scene in one story using Gemini.

    The story is split into sections at each scene heading. Within a section
    an existing quoted ``**Prompt d'image :** "..."`` is replaced in place;
    otherwise a new prompt line is inserted directly under the heading. Text
    outside the prompt line is never touched.

    Args:
        payload: ``story_id`` (required) and ``style`` (optional, defaults to
            ``"dark anime"``).

    Returns:
        ``{"status": "success", "message": ...}`` with the number of scenes
        updated, or ``{"status": "error", "message": ...}``.
    """
    story_id = payload.get("story_id", "")
    style = payload.get("style", "dark anime")
    if not story_id:
        return {"status": "error", "message": "story_id required"}
    api_key = os.getenv("GEMINI_API_KEY", "")
    if not api_key:
        return {"status": "error", "message": "GEMINI_API_KEY not configured"}

    import requests

    story_key = f"stories/{story_id}/story.md"
    content = r2_read_text(story_key)
    if not content:
        return {"status": "error", "message": "Story not found"}

    # Extract scenes
    scene_pattern = re.compile(r'##\s*[Ss]c[eè]ne\s*(\d+)[^:]*:\s*([^\n]+)', re.MULTILINE)
    headings = list(scene_pattern.finditer(content))
    if not headings:
        return {"status": "error", "message": "No scenes found in story"}

    # Fail before spending API calls whose results could not be stored.
    client = get_r2_client()
    if client is None:
        return {"status": "error", "message": "R2 not configured"}

    existing_prompt = re.compile(r'\*\*Prompt d\'image\s*:\*\*\s*"[^"]*"')
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
    # Key goes in a header so it never shows up in exception text or logs.
    headers = {"x-goog-api-key": api_key}

    improved_count = 0
    pieces = []
    cursor = 0
    for index, heading in enumerate(headings):
        scene_num, scene_title = heading.groups()
        # A section runs from the end of its heading to the next heading, so
        # a match can never spill into (and wipe out) a following scene.
        if index + 1 < len(headings):
            section_end = headings[index + 1].start()
        else:
            section_end = len(content)
        pieces.append(content[cursor:heading.end()])
        section = content[heading.end():section_end]
        cursor = section_end

        prompt = (
            "Améliore ce prompt d'image pour une génération stable diffusion / FLUX.\n"
            f"Style: {style} (horreur, sombre, cinématographique)\n"
            f"Scène: {scene_title}\n"
            "Génère un prompt descriptif, riche en détails visuels (éclairage, textures, atmosphère), optimisé pour l'IA générative.\n"
            "Maximum 60 mots.\n"
            "Réponds UNIQUEMENT avec le prompt amélioré en anglais, sans guillemets."
        )
        try:
            resp = requests.post(url, headers=headers, json={
                "contents": [{"parts": [{"text": prompt}]}]
            }, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            visual_prompt = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "").strip()
            visual_prompt = visual_prompt.strip('"').strip("'").strip()
            if not visual_prompt:
                raise ValueError("empty visual prompt returned")

            prompt_line = f'**Prompt d\'image :** "{visual_prompt}"'
            if existing_prompt.search(section):
                # Callable replacement: the AI text is inserted literally, so
                # backslashes or "\1" in it are not treated as a template.
                section = existing_prompt.sub(lambda _m: prompt_line, section, count=1)
            else:
                # Inserted directly under the scene heading.
                section = f"\n{prompt_line}{section}"
            improved_count += 1
        except Exception as e:
            print(f"Error improving visuals for scene {scene_num}: {e}")
        pieces.append(section)

    if improved_count > 0:
        new_content = "".join(pieces)
        bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")
        client.put_object(
            Bucket=bucket,
            Key=story_key,
            Body=new_content.encode('utf-8'),
            ContentType="text/markdown",
        )
    return {"status": "success", "message": f"Improved {improved_count} visual prompts"}
async def gemini_ask(payload: dict):
    """Forward a free-form prompt to the Gemini ``generateContent`` endpoint.

    Args:
        payload: ``prompt`` (required) and ``model`` (optional, defaults to
            ``"gemini-2.0-flash"``).

    Returns:
        ``{"status": "success", "response": <text>}`` or
        ``{"status": "error", "message": ...}`` when the key or prompt is
        missing, the API answers with an HTTP error, or the request raises.
    """
    api_key = os.getenv("GEMINI_API_KEY", "")
    if not api_key:
        return {"status": "error", "message": "GEMINI_API_KEY not configured"}
    prompt = payload.get("prompt", "")
    if not prompt:
        return {"status": "error", "message": "prompt required"}
    model = payload.get("model", "gemini-2.0-flash")

    import requests
    from urllib.parse import quote

    # The model name comes from the caller: escape it so it cannot alter the
    # request path. The key is sent as a header, not in the query string,
    # because requests embeds the full URL in exception messages and those
    # messages are returned to the caller below.
    url = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        f"{quote(str(model), safe='')}:generateContent"
    )
    try:
        resp = requests.post(url, headers={"x-goog-api-key": api_key}, json={
            "contents": [{"parts": [{"text": prompt}]}]
        }, timeout=60)
        data = resp.json()
        if not resp.ok:
            # Surface the API's own error text instead of an empty "success".
            detail = data.get("error", {}).get("message") if isinstance(data, dict) else None
            return {"status": "error", "message": detail or f"Gemini API returned HTTP {resp.status_code}"}
        text = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
        return {"status": "success", "response": text}
    except Exception as e:
        return {"status": "error", "message": str(e)}
| async def launch_pipeline(payload: dict): | |
| """Trigger image generation pipeline using HF Inference API (FLUX).""" | |
| import json | |
| import threading | |
| import io | |
| import re | |
| story_id = payload.get("story_id", "") | |
| story_path = payload.get("story_path", "") | |
| config = payload.get("config", {}) | |
| regenerate = payload.get("regenerate_all", False) | |
| if not story_id: | |
| return {"status": "error", "message": "story_id required"} | |
| # Write initial task status | |
| task_file = STATE_DIR / "current_task.json" | |
| task_data = { | |
| "story": story_id, | |
| "story_path": story_path, | |
| "step": "READING_STORY", | |
| "progress": 0, | |
| "config": config, | |
| "timestamp": time.time() | |
| } | |
| try: | |
| STATE_DIR.mkdir(parents=True, exist_ok=True) | |
| with open(task_file, "w") as f: | |
| json.dump(task_data, f) | |
| except Exception: | |
| pass | |
| cache.invalidate("status") | |
| def run_voice_generation(client, bucket, task_file, task_data, config): | |
| """Generate TTS audio for all scenes.""" | |
| import edge_tts | |
| import tempfile | |
| import asyncio | |
| story_key = f"stories/{story_id}/story.md" | |
| try: | |
| story_content = r2_read_text(story_key) | |
| if not story_content: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": "Story not found"}, f) | |
| return | |
| except Exception as e: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": str(e)}, f) | |
| return | |
| scenes = [] | |
| scene_pattern = re.compile(r'#+\s*[Ss]c[eè]ne\s*(\d+)[\s:]*\n*(.*?)(?=#+\s*[Ss]c[eè]ne|\Z)', re.DOTALL) | |
| matches = scene_pattern.findall(story_content) | |
| if not matches: | |
| blocks = [b.strip() for b in story_content.split('\n\n') if b.strip()] | |
| for i, block in enumerate(blocks[:10], 1): | |
| scenes.append({"id": i, "text": block[:500]}) | |
| else: | |
| for num, content in matches: | |
| # Extract only the Narration text (not the visual prompt) | |
| # Handle both "Narration:" and "Narration :" formats | |
| narration_match = re.search(r'\*\*Narration\s*:\*\*\s*["\']?([^"\']+)["\']?', content, re.IGNORECASE) | |
| if narration_match: | |
| scene_text = narration_match.group(1).strip() | |
| else: | |
| # Fallback: remove Visual Prompt sections | |
| clean_content = re.sub(r'\*\*Prompt d\'image:.*?(?=\*\*|\Z)', '', content, flags=re.DOTALL) | |
| clean_content = re.sub(r'\*\*Visual Prompt:.*?(?=\*\*|\Z)', '', clean_content, flags=re.DOTALL) | |
| scene_text = clean_content.strip()[:500] | |
| scenes.append({"id": int(num), "text": scene_text[:500]}) | |
| if not scenes: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": "No scenes found"}, f) | |
| return | |
| voice = config.get("voice", "fr-FR-DeniseNeural") | |
| rate = config.get("rate", "+0%") | |
| total_scenes = len(scenes) | |
| # Check if using Parler-TTS | |
| use_parler = voice.startswith("parler-tts") | |
| # Get voice style for edge-tts modifications | |
| voice_style = config.get("voice_style", "neutral") | |
| # Style modifications for edge-tts (rate and pitch adjustments) | |
| style_mods = { | |
| "neutral": {"rate": "+0%", "pitch": "+0Hz"}, | |
| "dramatic": {"rate": "-20%", "pitch": "-20Hz"}, # Slow, grave | |
| "horror": {"rate": "-10%", "pitch": "+10Hz"}, # Suspenseful | |
| "whisper": {"rate": "-30%", "pitch": "-30Hz"}, # Quiet | |
| "tense": {"rate": "+10%", "pitch": "+15Hz"}, # Stressed | |
| "mysterious": {"rate": "-15%", "pitch": "-10Hz"}, # Enigmatic | |
| "sad": {"rate": "-25%", "pitch": "-25Hz"}, # Melancholic | |
| } | |
| # Use style rate/pitch if using edge-tts, otherwise use config | |
| style_rate = style_mods.get(voice_style, {}).get("rate", rate) | |
| style_pitch = style_mods.get(voice_style, {}).get("pitch", "+0Hz") | |
| async def generate_single_voice(scene_text, audio_key): | |
| """Async helper to generate a single voice file.""" | |
| if use_parler: | |
| # Use Parler-TTS via HF Inference API | |
| try: | |
| from huggingface_hub import InferenceClient | |
| hf_token = os.getenv("HF_TOKEN") | |
| if not hf_token: | |
| raise Exception("HF_TOKEN not configured") | |
| client_tts = InferenceClient("parler-tts/parler-tts-mini-v1", token=hf_token) | |
| audio_data = client_tts.text_to_speech(scene_text) | |
| # Upload to R2 | |
| client.put_object( | |
| Bucket=bucket, | |
| Key=audio_key, | |
| Body=audio_data, | |
| ContentType="audio/wav" | |
| ) | |
| except Exception as e: | |
| print(f"ERROR: Parler-TTS failed: {e}") | |
| # Fallback to edge-tts | |
| use_parler = False | |
| # Retry with edge-tts | |
| with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: | |
| comm = edge_tts.Communicate(scene_text, "fr-FR-DeniseNeural", rate=rate) | |
| await comm.save(tmp.name) | |
| tmp.seek(0) | |
| with open(tmp.name, "rb") as f: | |
| client.put_object( | |
| Bucket=bucket, | |
| Key=audio_key, | |
| Body=f.read(), | |
| ContentType="audio/mp3" | |
| ) | |
| else: | |
| # Use edge-tts with style modifications | |
| with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: | |
| # Use edge-tts with direct rate/pitch parameters | |
| # Edge-tts accepts rate like "+20%" or "-20%" and pitch like "+50Hz" or "-20Hz" | |
| if voice_style != "neutral": | |
| # Convert style_rate and style_pitch to edge-tts format | |
| # Remove the % for pitch handling | |
| comm = edge_tts.Communicate(scene_text, voice, rate=style_rate, pitch=style_pitch) | |
| await comm.save(tmp.name) | |
| else: | |
| comm = edge_tts.Communicate(scene_text, voice, rate=rate) | |
| await comm.save(tmp.name) | |
| tmp.seek(0) | |
| with open(tmp.name, "rb") as f: | |
| client.put_object( | |
| Bucket=bucket, | |
| Key=audio_key, | |
| Body=f.read(), | |
| ContentType="audio/mp3" | |
| ) | |
| for idx, scene in enumerate(scenes): | |
| scene_num = scene["id"] | |
| scene_text = scene["text"] | |
| progress = int((idx / total_scenes) * 100) | |
| with open(task_file, "w") as f: | |
| json.dump({ | |
| **task_data, | |
| "step": f"GENERATING_VOICE_{scene_num}", | |
| "progress": progress, | |
| "current_scene": scene_num, | |
| "total_scenes": total_scenes | |
| }, f) | |
| cache.invalidate("status") | |
| audio_key = f"stories/{story_id}/assets/audio/scene_{scene_num}.mp3" | |
| try: | |
| asyncio.run(generate_single_voice(scene_text, audio_key)) | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"TTS error scene {scene_num}: {str(e)}\n{traceback.format_exc()}" | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": error_msg, "progress": progress}, f) | |
| return | |
| with open(task_file, "w") as f: | |
| json.dump({ | |
| **task_data, | |
| "step": "DONE", | |
| "progress": 100, | |
| "voices_generated": total_scenes | |
| }, f) | |
| cache.invalidate("status") | |
| cache.invalidate("stories") | |
| def run_music_generation(client, bucket, task_file, task_data, config): | |
| """Generate background music for the story.""" | |
| music_style = config.get("music_style", "dark_ambient") | |
| with open(task_file, "w") as f: | |
| json.dump({ | |
| **task_data, | |
| "step": "GENERATING_MUSIC", | |
| "progress": 50, | |
| "music_style": music_style | |
| }, f) | |
| cache.invalidate("status") | |
| # Placeholder: music generation not yet implemented | |
| # TODO: Integrate with MusicGen or similar | |
| with open(task_file, "w") as f: | |
| json.dump({ | |
| **task_data, | |
| "step": "DONE", | |
| "progress": 100, | |
| "music_generated": True | |
| }, f) | |
| cache.invalidate("status") | |
| def run_generation(): | |
| """Background thread for image/voice/music generation.""" | |
| hf_token = os.getenv("HF_TOKEN", "") | |
| if not hf_token: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": "HF_TOKEN not configured"}, f) | |
| return | |
| client = get_r2_client() | |
| bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio") | |
| # Check if this is a voice-only generation | |
| if config.get("voice_only", False): | |
| run_voice_generation(client, bucket, task_file, task_data, config) | |
| return | |
| # Check if this is a music-only generation | |
| if config.get("music_only", False): | |
| run_music_generation(client, bucket, task_file, task_data, config) | |
| return | |
| # Check if this is an images-only generation | |
| is_images_only = config.get("images_only", False) | |
| # 1. Read story content | |
| story_key = f"stories/{story_id}/story.md" | |
| try: | |
| story_content = r2_read_text(story_key) | |
| if not story_content: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": "Story not found in R2"}, f) | |
| return | |
| except Exception as e: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": str(e)}, f) | |
| return | |
| # 2. Parse scenes from story markdown | |
| scenes = [] | |
| scene_pattern = re.compile(r'#+\s*[Ss]c[eè]ne\s*(\d+)[\s:]*\n*(.*?)(?=#+\s*[Ss]c[eè]ne|\Z)', re.DOTALL) | |
| matches = scene_pattern.findall(story_content) | |
| if not matches: | |
| blocks = [b.strip() for b in story_content.split('\n\n') if b.strip()] | |
| for i, block in enumerate(blocks[:10], 1): | |
| scenes.append({"id": i, "prompt": block[:500]}) | |
| else: | |
| for num, content in matches: | |
| scenes.append({"id": int(num), "prompt": content.strip()[:500]}) | |
| if not scenes: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": "No scenes found in story"}, f) | |
| return | |
| total_scenes = len(scenes) | |
| # 3. Generate images for each scene via Gradio Client (ZeroGPU) | |
| image_model = config.get("image_model", "flux") | |
| model_spaces = { | |
| "flux": os.getenv("HF_IMAGE_SPACE_FLUX", "cybermedia/flux-zerogpu"), | |
| "ssd": os.getenv("HF_IMAGE_SPACE_SSD", "cybermedia/ssd-zerogpu"), | |
| "sdxl": os.getenv("HF_IMAGE_SPACE_SDXL", "cybermedia/sdxl-zerogpu"), | |
| "playground": os.getenv("HF_IMAGE_SPACE_PLAYGROUND", "cybermedia/playground-zerogpu"), | |
| } | |
| gradio_space = model_spaces.get(image_model, model_spaces["flux"]) | |
| fallback_to_flux = False | |
| for idx, scene in enumerate(scenes): | |
| scene_num = scene["id"] | |
| scene_prompt = scene["prompt"] | |
| progress = int((idx / total_scenes) * 100) | |
| with open(task_file, "w") as f: | |
| json.dump({ | |
| **task_data, | |
| "step": f"GENERATING_SCENE_{scene_num}", | |
| "progress": progress, | |
| "current_scene": scene_num, | |
| "total_scenes": total_scenes | |
| }, f) | |
| cache.invalidate("status") | |
| img_key = f"stories/{story_id}/assets/images/scene_{scene_num}.png" | |
| if not regenerate: | |
| try: | |
| client.head_object(Bucket=bucket, Key=img_key) | |
| continue | |
| except Exception: | |
| pass | |
| enhanced_prompt = f"horror anime style, dark atmosphere, cinematic, {scene_prompt}" | |
| # Try selected model, fallback to FLUX on failure | |
| spaces_to_try = [gradio_space] | |
| if image_model != "flux" and not fallback_to_flux: | |
| spaces_to_try.append(model_spaces["flux"]) | |
| generation_success = False | |
| for space_id in spaces_to_try: | |
| if space_id != gradio_space: | |
| fallback_to_flux = True | |
| print(f"WARNING: {image_model.upper()} failed, falling back to FLUX") | |
| try: | |
| from gradio_client import Client | |
| gradio_client = Client(space_id) | |
| result = gradio_client.predict( | |
| prompt=enhanced_prompt, | |
| steps=20, | |
| seed=42, | |
| api_name="/generate" | |
| ) | |
| if isinstance(result, str): | |
| with open(result, "rb") as f: | |
| client.put_object( | |
| Bucket=bucket, | |
| Key=img_key, | |
| Body=f.read(), | |
| ContentType="image/png" | |
| ) | |
| generation_success = True | |
| break | |
| else: | |
| error_msg = f"{space_id.split('/')[-1].upper()} Gradio returned unexpected result: {result}" | |
| print(f"WARNING: {error_msg}") | |
| continue | |
| except Exception as e: | |
| error_msg = str(e) | |
| # Check for quota exhaustion - fail fast instead of retrying | |
| if "quota" in error_msg.lower() or "exceeded" in error_msg.lower() or "rate limit" in error_msg.lower(): | |
| quota_error = f"⚠️ QUOTA ÉPUISÉ: {space_id.split('/')[-1].upper()} - {error_msg[:150]}" | |
| print(quota_error) | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": quota_error, "progress": progress, "quota_exhausted": True}, f) | |
| cache.invalidate("status") | |
| return | |
| # Check for specific model errors - "does not support image input" means model type doesn't support this feature | |
| elif "does not support image input" in error_msg: | |
| model_error = f"⚠️ MODÈLE INCOMPATIBLE: {space_id.split('/')[-1].upper()} - Ce modèle ne supporte pas cette fonctionnalité" | |
| print(model_error) | |
| # Don't try other models if they're all ZeroGPU models - they'll likely fail too | |
| if "zerogpu" in space_id.lower(): | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": model_error, "progress": progress}, f) | |
| cache.invalidate("status") | |
| return | |
| elif "Cannot read" in error_msg: | |
| # This usually means model can't handle the input | |
| cannot_read_error = f"⚠️ ERREUR MODÈLE: {space_id.split('/')[-1].upper()} - {error_msg[:100]}" | |
| print(cannot_read_error) | |
| # Fail fast on ZeroGPU models | |
| if "zerogpu" in space_id.lower() or "does not support" in error_msg.lower(): | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": cannot_read_error, "progress": progress}, f) | |
| cache.invalidate("status") | |
| return | |
| else: | |
| print(f"WARNING: {space_id.split('/')[-1].upper()} Gradio error: {error_msg[:200]}") | |
| continue | |
| if not generation_success: | |
| with open(task_file, "w") as f: | |
| json.dump({**task_data, "step": "ERROR", "error": f"All models failed for scene {scene_num}", "progress": progress}, f) | |
| return | |
| # Save model used in story metadata | |
| model_used = image_model if not fallback_to_flux or image_model == "flux" else image_model | |
| metadata_key = f"stories/{story_id}/metadata.json" | |
| import json as json_module | |
| try: | |
| existing_metadata = {} | |
| try: | |
| existing = r2_read_text(metadata_key) | |
| if existing: | |
| existing_metadata = json_module.loads(existing) | |
| except: | |
| pass | |
| existing_metadata["image_model"] = model_used | |
| client.put_object( | |
| Bucket=bucket, | |
| Key=metadata_key, | |
| Body=json_module.dumps(existing_metadata), | |
| ContentType="application/json" | |
| ) | |
| except Exception as e: | |
| print(f"WARNING: Could not save model metadata: {e}") | |
| with open(task_file, "w") as f: | |
| json.dump({ | |
| **task_data, | |
| "step": "DONE", | |
| "progress": 100, | |
| "images_generated": total_scenes, | |
| "image_model_used": model_used | |
| }, f) | |
| cache.invalidate("status") | |
| cache.invalidate("stories") | |
| cache.invalidate("generated_images") | |
| # Start generation in background thread | |
| thread = threading.Thread(target=run_generation, daemon=True) | |
| thread.start() | |
| return {"status": "success", "message": f"Generation started for '{story_id}'", "job_id": story_id} | |
async def stop_pipeline():
    """Mark the task recorded in the state directory as stopped.

    When ``current_task.json`` exists, its ``step`` is set to ``"STOPPED"``
    and ``stopped`` to ``True`` and the file is rewritten. The ``status``
    cache entry is invalidated whether or not a task file was found.

    Returns:
        A success payload, or an error payload carrying the exception text
        when reading, updating or rewriting the task file fails.
    """
    state_path = STATE_DIR / "current_task.json"
    try:
        import json

        if state_path.exists():
            # Load the persisted task, flag it, then write it back in full.
            with open(state_path, "r") as reader:
                current = json.load(reader)
            current["step"] = "STOPPED"
            current["stopped"] = True
            with open(state_path, "w") as writer:
                json.dump(current, writer)
        cache.invalidate("status")
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    else:
        return {"status": "success", "message": "Task stopped"}
async def remix_story(payload: dict):
    """Acknowledge a remix request for the story named in *payload*.

    Args:
        payload: Request body; only its ``story_id`` entry is read.

    Returns:
        A success payload quoting the story id, or an error payload when
        ``story_id`` is missing or empty.
    """
    story_id = payload.get("story_id", "")
    if story_id:
        return {"status": "success", "message": f"Remix queued for '{story_id}'"}
    return {"status": "error", "message": "story_id required"}
async def apply_art_style(payload: dict):
    """Apply an art style filter to every source image of a story.

    Keys under ``stories/<story_id>/assets/images/`` ending in ``.png``,
    ``.jpg`` or ``.jpeg`` (any letter case) are downloaded, passed through
    ``apply_style_filter`` and uploaded again as PNG under
    ``<stem>_styled.png``. Images that are themselves earlier ``_styled``
    outputs are skipped, so repeated calls refresh the styled copies instead
    of stacking ``_styled_styled`` files. A failure on one image is recorded
    and does not abort the remaining ones.

    Args:
        payload: Request body; ``story_id`` and ``style`` are read from it.

    Returns:
        An error payload when ``story_id`` is empty, ``style`` is ``"none"``,
        no R2 client is available or no source image is found; otherwise a
        success payload with the processed count and at most five error
        strings.
    """
    story_id = payload.get("story_id", "")
    style = payload.get("style", "none")
    if not story_id or style == "none":
        return {"status": "error", "message": "story_id and style required"}
    client = get_r2_client()
    if client is None:
        return {"status": "error", "message": "R2 not configured"}
    bucket = os.getenv("R2_BUCKET", "darkmedia-x-studio")

    # List images for this story, leaving out copies produced by earlier runs.
    prefix = f"stories/{story_id}/assets/images/"
    image_keys = [
        k for k in r2_list(prefix=prefix)
        if k.lower().endswith((".png", ".jpg", ".jpeg")) and not _is_styled_key(k)
    ]
    if not image_keys:
        return {"status": "error", "message": "No images found for this story"}

    processed = 0
    errors = []
    # NOTE(review): the R2 calls below look synchronous; confirm that blocking
    # the event loop for the duration of the batch is acceptable.
    for key in image_keys:
        try:
            # Download image from R2
            resp = client.get_object(Bucket=bucket, Key=key)
            img = Image.open(io.BytesIO(resp["Body"].read()))
            styled_img = apply_style_filter(img, style)
            # Upload next to the source; the source object itself is kept.
            output = io.BytesIO()
            styled_img.save(output, format="PNG")
            client.put_object(
                Bucket=bucket,
                Key=_styled_key_for(key),
                Body=output.getvalue(),
                ContentType="image/png",
            )
            processed += 1
        except Exception as e:
            errors.append(f"{key}: {str(e)}")

    cache.invalidate("generated_images")
    cache.invalidate("library")
    cache.invalidate("stories")
    return {
        "status": "success",
        "message": f"Applied '{style}' to {processed} images",
        "processed": processed,
        "errors": errors[:5]  # Limit error output
    }


def _is_styled_key(key):
    """Tell whether *key* names a ``*_styled.<ext>`` output of this endpoint."""
    return key.rsplit(".", 1)[0].lower().endswith("_styled")


def _styled_key_for(key):
    """Return the key of the styled PNG copy: final extension swapped for ``_styled.png``."""
    # Only the last extension is replaced, whatever its letter case, so the
    # result can never equal the source key.
    return f"{key.rsplit('.', 1)[0]}_styled.png"
def apply_style_filter(img, style):
    """Return a styled rendition of a PIL image.

    Supported styles are ``oil_paint``, ``charcoal``, ``sketch``,
    ``vintage``, ``night_vision``, ``pixel_art`` and ``vhs_static``; any
    other value returns *img* untouched. The input image is never modified:
    every recognised style builds and returns a new image.

    Args:
        img: Source PIL image, in any mode.
        style: Name of the style to apply.

    Returns:
        The styled image, or *img* itself for an unrecognised style.
    """
    if style == "oil_paint":
        # Kernel filters reject palette images, so expand those to RGB first.
        base = img.convert("RGB") if img.mode == "P" else img
        return base.filter(ImageFilter.SMOOTH_MORE).filter(ImageFilter.EDGE_ENHANCE_MORE)

    if style == "charcoal":
        # Hard threshold at mid-grey, then back to RGB.
        return ImageOps.grayscale(img).point(lambda x: 255 if x > 128 else 0, mode="1").convert("RGB")

    if style == "sketch":
        gray = ImageOps.grayscale(img)
        inverted = ImageOps.invert(gray)
        blurred = inverted.filter(ImageFilter.GaussianBlur(radius=2))
        return ImageOps.grayscale(Image.blend(gray, blurred, 0.5)).convert("RGB")

    if style == "vintage":
        # The channel split below needs exactly three bands; RGBA, L or P
        # inputs are normalised here instead of failing on unpacking.
        toned = img.convert("RGB")
        toned = ImageEnhance.Color(toned).enhance(0.7)
        toned = ImageEnhance.Brightness(toned).enhance(0.9)
        toned = ImageEnhance.Contrast(toned).enhance(1.2)
        # Warm tint: lift red, pull blue down.
        r, g, b = toned.split()
        r = r.point(lambda x: min(255, int(x * 1.15 + 15)))
        b = b.point(lambda x: max(0, int(x * 0.85)))
        return Image.merge("RGB", (r, g, b))

    if style == "night_vision":
        gray = ImageOps.grayscale(img)
        r = gray.point(lambda x: int(x * 0.2))
        g = gray.point(lambda x: min(255, int(x * 1.3)))
        b = gray.point(lambda x: int(x * 0.2))
        return Image.merge("RGB", (r, g, b))

    if style == "pixel_art":
        w, h = img.size
        # Keep at least one pixel per axis; images under 8 px would otherwise
        # be resized to a zero dimension, which resize() rejects.
        small = img.resize((max(1, w // 8), max(1, h // 8)), Image.NEAREST)
        return small.resize((w, h), Image.NEAREST)

    if style == "vhs_static":
        import random

        # convert() returns a new image, so the caller's image stays intact
        # and every pixel is a plain (r, g, b) triple.
        frame = img.convert("RGB")
        pixels = frame.load()
        w, h = frame.size
        # Every third scanline gets noise plus a slight red/blue offset.
        for y in range(0, h, 3):
            for x in range(w):
                r, g, b = pixels[x, y][:3]
                noise = random.randint(-30, 30)
                pixels[x, y] = (
                    min(255, max(0, r + noise + 10)),
                    min(255, max(0, g + noise)),
                    min(255, max(0, b + noise - 10)),
                )
        return frame

    return img
| # --- Config --- | |
async def get_config(request: Request):
    """Report the generation modes selected through environment variables.

    Each entry falls back to a built-in default when its environment
    variable is unset. The *request* argument is not used.
    """
    # (response key, environment variable, default)
    settings = (
        ("ai_mode", "AI_MODE", "cloud"),
        ("image_gen_mode", "IMAGE_GEN_MODE", "gemini"),
        ("voice_gen_mode", "VOICE_GEN_MODE", "edge-tts"),
    )
    return {field: os.getenv(env_name, fallback) for field, env_name, fallback in settings}
| # --- Health Check --- | |
async def health():
    """Return a fixed payload naming the service and reporting it as ok."""
    return dict(status="ok", service="darkmedia-x-api")
if __name__ == "__main__":
    import uvicorn

    # Direct execution: serve the app on every interface, port 7860.
    listen = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **listen)