Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| scripts/utils.py | |
| Utility functions used across the DarkMedia‑X dashboard server. | |
| All helpers are pure, type‑annotated and focus on a single task, | |
| making the codebase easier to test and maintain. | |
| """ | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import psutil | |
| # ---------------------------------------------------------------------- | |
| # JSON helpers | |
| # ---------------------------------------------------------------------- | |
| def load_json(file_path: Path) -> Dict[str, Any]: | |
| """Load a JSON file and return its content as a dict.""" | |
| with file_path.open("r", encoding="utf-8") as f: | |
| return json.load(f) | |
| def write_json(file_path: Path, data: Dict[str, Any]) -> None: | |
| """Write a dict to a JSON file with UTF‑8 encoding.""" | |
| with file_path.open("w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False, indent=2) | |
| # ---------------------------------------------------------------------- | |
| # Process helpers | |
| # ---------------------------------------------------------------------- | |
| def is_process_running(command_substring: str) -> bool: | |
| """ | |
| Return True if any running process contains ``command_substring`` in its | |
| name or full command line (case‑insensitive). | |
| """ | |
| for proc in psutil.process_iter(["name", "cmdline"]): | |
| try: | |
| name = proc.info["name"] or "" | |
| cmdline = " ".join(proc.info["cmdline"] or []) | |
| if command_substring.lower() in name.lower() or command_substring.lower() in cmdline.lower(): | |
| return True | |
| except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| continue | |
| return False | |
| def start_detached_process( | |
| executable: str, | |
| args: List[str], | |
| cwd: Path, | |
| creationflags: int = 0, | |
| ) -> subprocess.Popen: | |
| """ | |
| Launch a detached subprocess (cross‑platform). Returns the Popen object. | |
| """ | |
| kwargs: Dict[str, Any] = {"cwd": str(cwd), "close_fds": True} | |
| if sys.platform == "win32": | |
| kwargs["creationflags"] = creationflags | |
| return subprocess.Popen([executable] + args, **kwargs) | |
| # ---------------------------------------------------------------------- | |
| # GPU statistics | |
| # ---------------------------------------------------------------------- | |
| def get_gpu_stats() -> Optional[Dict[str, str]]: | |
| """Query nvidia‑smi and return a dict with load, temperature and VRAM usage.""" | |
| try: | |
| cmd = ( | |
| "nvidia-smi --query-gpu=utilization.gpu,temperature.gpu," | |
| "memory.used,memory.total --format=csv,noheader,nounits" | |
| ) | |
| kwargs: Dict[str, Any] = {"shell": True} | |
| if sys.platform == "win32": | |
| kwargs["creationflags"] = 0x08000000 | |
| raw = subprocess.check_output(cmd, **kwargs).decode("utf-8").strip() | |
| if raw: | |
| load, temp, used, total = [p.strip() for p in raw.split(",")] | |
| return { | |
| "load": f"{load}%", | |
| "temp": f"{temp}°C", | |
| "vram_used": f"{round(int(used) / 1024, 1)} GB", | |
| "vram_total": f"{round(int(total) / 1024, 1)} GB", | |
| } | |
| except Exception as exc: | |
| print(f"[GPU Stats] {exc}") | |
| return None | |
| # ---------------------------------------------------------------------- | |
| # File‑system helpers | |
| # ---------------------------------------------------------------------- | |
| def ensure_directory(path: Path) -> None: | |
| """Create ``path`` if it does not exist.""" | |
| path.mkdir(parents=True, exist_ok=True) | |
| def count_scenes(file_path: Path) -> int: | |
| """ | |
| Count the number of scenes in a markdown file by looking for '## Scene' or '## Scène'. | |
| """ | |
| if not file_path.exists(): | |
| return 0 | |
| try: | |
| content = file_path.read_text(encoding="utf-8") | |
| # Matches '## Scene X' or '## Scène X' at the beginning of any line | |
| matches = re.findall(r"^##\s*(?:Scene|Scène)\s*\d+", content, flags=re.MULTILINE | re.IGNORECASE) | |
| return len(matches) | |
| except Exception: | |
| return 0 | |
| def list_markdown_stories(root: Path) -> List[Dict[str, Any]]: | |
| """ | |
| Scan ``root`` for markdown story files and return a list of dictionaries | |
| containing id, title, relative path, category and processed flag. | |
| """ | |
| stories: List[Dict[str, Any]] = [] | |
| if not root.exists(): | |
| return stories | |
| for dirpath, _, filenames in os.walk(str(root)): | |
| for filename in filenames: | |
| if not filename.endswith(".md") or filename.startswith("README") or filename == "music_prompt.md": | |
| continue | |
| file_path = Path(dirpath) / filename | |
| rel_path = file_path.relative_to(root) | |
| story_id = str(rel_path.parent).replace("\\", "/") | |
| title = filename.replace(".md", "").replace("_", " ").strip() | |
| if title.lower() in {"story", "index", "readme"}: | |
| title = story_id.replace("_", " ").strip() | |
| story_dir = file_path.parent | |
| image_dir = story_dir / "assets" / "images" | |
| if not image_dir.exists(): | |
| image_dir = story_dir / "images" | |
| image_count = 0 | |
| if image_dir.exists() and image_dir.is_dir(): | |
| image_count = len([f for f in os.listdir(str(image_dir)) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]) | |
| total_scenes = count_scenes(file_path) | |
| if total_scenes == 0: total_scenes = 10 # Fallback | |
| processed = (story_dir / "final_video.mp4").exists() or ( | |
| story_dir / f"TT_{story_id.replace(' ', '_').replace('/', '_')}_final.mp4" | |
| ).exists() | |
| stories.append( | |
| { | |
| "id": story_id, | |
| "title": title, | |
| "path": str(rel_path).replace("\\", "/"), | |
| "category": file_path.parent.parent.name | |
| if file_path.parent.parent != root | |
| else "General", | |
| "processed": processed, | |
| "image_count": image_count, | |
| "total_scenes": total_scenes, | |
| "ready": image_count >= total_scenes | |
| } | |
| ) | |
| return sorted(stories, key=lambda s: s["title"]) | |
| def list_videos(videos_dir: Path) -> List[Dict[str, Any]]: | |
| """Return a list of video metadata dictionaries from ``videos_dir``.""" | |
| videos: List[Dict[str, Any]] = [] | |
| if not videos_dir.exists(): | |
| return videos | |
| for dirpath, _, filenames in os.walk(str(videos_dir)): | |
| for filename in filenames: | |
| if filename.endswith(".mp4") and "_published" not in dirpath: | |
| file_path = Path(dirpath) / filename | |
| rel_path = file_path.relative_to(videos_dir) | |
| videos.append( | |
| { | |
| "title": filename.replace(".mp4", "").replace("_", " "), | |
| "filename": filename, | |
| "path": str(rel_path), | |
| "story": file_path.parent.name.replace("_", " "), | |
| "timestamp": file_path.stat().st_mtime, | |
| } | |
| ) | |
| return sorted(videos, key=lambda v: v["timestamp"], reverse=True) | |
| def list_generated_images(task_file: Path, stories_root: Path) -> List[Dict[str, Any]]: | |
| """ | |
| Return the most recent generated images (max 50) for the currently active story. | |
| """ | |
| images: List[Dict[str, Any]] = [] | |
| if not task_file.exists(): | |
| return images | |
| task = load_json(task_file) | |
| story_path = Path(task.get("story_path", "")) | |
| story_id = task.get("story_id", "") | |
| # Primary location: assets/images inside the explicit story_path | |
| img_dir = story_path / "assets" / "images" | |
| if not img_dir.is_dir() and story_id: | |
| # Fallback: search for a folder that contains the story_id and an assets/images sub‑folder | |
| for dirpath, _, _ in os.walk(str(stories_root)): | |
| if story_id in dirpath and "assets" in dirpath: | |
| candidate = Path(dirpath) / "images" | |
| if candidate.is_dir(): | |
| img_dir = candidate | |
| break | |
| if img_dir.is_dir(): | |
| for img_file in sorted(img_dir.glob("*.png")) + sorted(img_dir.glob("*.jpg")): | |
| if img_file.is_file(): | |
| images.append( | |
| { | |
| "path": f"/assets/images/{img_file.name}", | |
| "filename": img_file.name, | |
| "timestamp": img_file.stat().st_mtime, | |
| "size": img_file.stat().st_size, | |
| } | |
| ) | |
| images.sort(key=lambda i: i["timestamp"], reverse=True) | |
| return images[:50] | |
| def list_library_images(stories_root: Path) -> List[Dict[str, Any]]: | |
| """ | |
| Scan all story folders for generated scene images and return a flat list | |
| (max 200 entries) ordered by newest first. | |
| """ | |
| library: List[Dict[str, Any]] = [] | |
| for dirpath, _, filenames in os.walk(str(stories_root)): | |
| if "assets" in dirpath and "images" in dirpath: | |
| story_dir = Path(dirpath).parent.parent | |
| story_name = story_dir.name | |
| for filename in filenames: | |
| if filename.endswith(".png") and filename.startswith("scene_"): | |
| rel_path = Path(dirpath).joinpath(filename).relative_to(stories_root) | |
| url = f"/stories/{rel_path.as_posix()}" | |
| library.append( | |
| { | |
| "story": story_name, | |
| "filename": filename, | |
| "url": url, | |
| "timestamp": (Path(dirpath) / filename).stat().st_mtime, | |
| } | |
| ) | |
| library.sort(key=lambda i: i["timestamp"], reverse=True) | |
| return library[:200] | |
| # ---------------------------------------------------------------------- | |
| # Audio / TTS helpers | |
| # ---------------------------------------------------------------------- | |
| def clean_old_previews(voice_dir: Path, max_age_seconds: int = 300) -> None: | |
| """Remove preview files older than ``max_age_seconds``.""" | |
| for old_file in voice_dir.glob("ui_preview_*.mp3"): | |
| if time.time() - old_file.stat().st_mtime > max_age_seconds: | |
| try: | |
| old_file.unlink() | |
| except Exception: | |
| pass | |
| def generate_tts( | |
| text: str, | |
| voice: str, | |
| rate: str, | |
| pitch: str, | |
| effect: str, | |
| voice_dir: Path, | |
| python_exe: str, | |
| ) -> Tuple[bool, str]: | |
| """ | |
| Run ``edge_tts`` to synthesize ``text`` and optionally post‑process with FFmpeg. | |
| Returns ``(success, message_or_url)``. | |
| """ | |
| import imageio_ffmpeg | |
| ts = int(time.time() * 1000) | |
| raw_path = voice_dir / f"raw_{ts}.mp3" | |
| out_path = voice_dir / f"ui_preview_{ts}.mp3" | |
| # 1️⃣ Generate raw TTS | |
| tts_cmd = [ | |
| python_exe, | |
| "-m", | |
| "edge_tts", | |
| "--voice", | |
| voice, | |
| "--text", | |
| text[:200], | |
| "--write-media", | |
| str(raw_path), | |
| f"--rate={rate}", | |
| f"--pitch={pitch}", | |
| ] | |
| result = subprocess.run(tts_cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| return False, f"TTS engine error: {result.stderr}" | |
| # 2️⃣ Apply optional FFmpeg filter | |
| ffmpeg_filters = { | |
| "demonic": "aecho=0.8:0.88:60:0.4,asetrate=44100*0.8,aresample=44100", | |
| "ghostly": "aecho=0.8:0.88:100:0.6,asetrate=44100*1.2,aresample=44100", | |
| "radio": "highpass=f=500,lowpass=f=3000", | |
| "reverb": "aecho=0.8:0.88:60:0.4", | |
| "none": "anull", | |
| } | |
| filter_chain = ffmpeg_filters.get(effect, "anull") | |
| ffmpeg_exe = "ffmpeg" | |
| try: | |
| subprocess.run([ffmpeg_exe, "-version"], capture_output=True, check=True) | |
| except Exception: | |
| ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() | |
| ffmpeg_cmd = [ | |
| ffmpeg_exe, | |
| "-y", | |
| "-i", | |
| str(raw_path), | |
| "-af", | |
| filter_chain, | |
| str(out_path), | |
| ] | |
| ffmpeg_res = subprocess.run(ffmpeg_cmd, capture_output=True, text=True) | |
| if ffmpeg_res.returncode != 0: | |
| # Fallback: copy raw file unchanged | |
| out_path.write_bytes(raw_path.read_bytes()) | |
| # Clean up temporary raw file | |
| try: | |
| raw_path.unlink() | |
| except Exception: | |
| pass | |
| return True, f"/assets/voice_samples/{out_path.name}" |