File size: 12,556 Bytes
343eed9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
#!/usr/bin/env python3
"""
scripts/utils.py

Utility functions used across the DarkMedia‑X dashboard server.
All helpers are pure, type‑annotated and focus on a single task,
making the codebase easier to test and maintain.
"""

import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import psutil


# ----------------------------------------------------------------------
# JSON helpers
# ----------------------------------------------------------------------
def load_json(file_path: Path) -> Dict[str, Any]:
    """Load a JSON file and return its content as a dict."""
    with file_path.open("r", encoding="utf-8") as f:
        return json.load(f)


def write_json(file_path: Path, data: Dict[str, Any]) -> None:
    """Write a dict to a JSON file with UTF‑8 encoding."""
    with file_path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


# ----------------------------------------------------------------------
# Process helpers
# ----------------------------------------------------------------------
def is_process_running(command_substring: str) -> bool:
    """
    Return True if any running process contains ``command_substring`` in its
    name or full command line (case‑insensitive).
    """
    for proc in psutil.process_iter(["name", "cmdline"]):
        try:
            name = proc.info["name"] or ""
            cmdline = " ".join(proc.info["cmdline"] or [])
            if command_substring.lower() in name.lower() or command_substring.lower() in cmdline.lower():
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return False


def start_detached_process(
    executable: str,
    args: List[str],
    cwd: Path,
    creationflags: int = 0,
) -> subprocess.Popen:
    """
    Launch a detached subprocess (cross‑platform).  Returns the Popen object.
    """
    kwargs: Dict[str, Any] = {"cwd": str(cwd), "close_fds": True}
    if sys.platform == "win32":
        kwargs["creationflags"] = creationflags
    return subprocess.Popen([executable] + args, **kwargs)


# ----------------------------------------------------------------------
# GPU statistics
# ----------------------------------------------------------------------
def get_gpu_stats() -> Optional[Dict[str, str]]:
    """Query nvidia‑smi and return a dict with load, temperature and VRAM usage."""
    try:
        cmd = (
            "nvidia-smi --query-gpu=utilization.gpu,temperature.gpu,"
            "memory.used,memory.total --format=csv,noheader,nounits"
        )
        kwargs: Dict[str, Any] = {"shell": True}
        if sys.platform == "win32":
            kwargs["creationflags"] = 0x08000000
        raw = subprocess.check_output(cmd, **kwargs).decode("utf-8").strip()
        if raw:
            load, temp, used, total = [p.strip() for p in raw.split(",")]
            return {
                "load": f"{load}%",
                "temp": f"{temp}°C",
                "vram_used": f"{round(int(used) / 1024, 1)} GB",
                "vram_total": f"{round(int(total) / 1024, 1)} GB",
            }
    except Exception as exc:
        print(f"[GPU Stats] {exc}")
    return None


# ----------------------------------------------------------------------
# File‑system helpers
# ----------------------------------------------------------------------
def ensure_directory(path: Path) -> None:
    """Create ``path`` if it does not exist."""
    path.mkdir(parents=True, exist_ok=True)


def count_scenes(file_path: Path) -> int:
    """
    Count the number of scenes in a markdown file by looking for '## Scene' or '## Scène'.
    """
    if not file_path.exists():
        return 0
    try:
        content = file_path.read_text(encoding="utf-8")
        # Matches '## Scene X' or '## Scène X' at the beginning of any line
        matches = re.findall(r"^##\s*(?:Scene|Scène)\s*\d+", content, flags=re.MULTILINE | re.IGNORECASE)
        return len(matches)
    except Exception:
        return 0


def list_markdown_stories(root: Path) -> List[Dict[str, Any]]:
    """
    Scan ``root`` for markdown story files and return a list of dictionaries
    containing id, title, relative path, category and processed flag.
    """
    stories: List[Dict[str, Any]] = []
    if not root.exists():
        return stories

    for dirpath, _, filenames in os.walk(str(root)):
        for filename in filenames:
            if not filename.endswith(".md") or filename.startswith("README") or filename == "music_prompt.md":
                continue
            file_path = Path(dirpath) / filename
            rel_path = file_path.relative_to(root)
            story_id = str(rel_path.parent).replace("\\", "/")
            title = filename.replace(".md", "").replace("_", " ").strip()
            if title.lower() in {"story", "index", "readme"}:
                title = story_id.replace("_", " ").strip()
            story_dir = file_path.parent
            image_dir = story_dir / "assets" / "images"
            if not image_dir.exists():
                image_dir = story_dir / "images"
            
            image_count = 0
            if image_dir.exists() and image_dir.is_dir():
                image_count = len([f for f in os.listdir(str(image_dir)) if f.lower().endswith(('.png', '.jpg', '.jpeg'))])

            total_scenes = count_scenes(file_path)
            if total_scenes == 0: total_scenes = 10 # Fallback

            processed = (story_dir / "final_video.mp4").exists() or (
                story_dir / f"TT_{story_id.replace(' ', '_').replace('/', '_')}_final.mp4"
            ).exists()
            
            stories.append(
                {
                    "id": story_id,
                    "title": title,
                    "path": str(rel_path).replace("\\", "/"),
                    "category": file_path.parent.parent.name
                    if file_path.parent.parent != root
                    else "General",
                    "processed": processed,
                    "image_count": image_count,
                    "total_scenes": total_scenes,
                    "ready": image_count >= total_scenes
                }
            )
    return sorted(stories, key=lambda s: s["title"])


def list_videos(videos_dir: Path) -> List[Dict[str, Any]]:
    """Return a list of video metadata dictionaries from ``videos_dir``."""
    videos: List[Dict[str, Any]] = []
    if not videos_dir.exists():
        return videos
    for dirpath, _, filenames in os.walk(str(videos_dir)):
        for filename in filenames:
            if filename.endswith(".mp4") and "_published" not in dirpath:
                file_path = Path(dirpath) / filename
                rel_path = file_path.relative_to(videos_dir)
                videos.append(
                    {
                        "title": filename.replace(".mp4", "").replace("_", " "),
                        "filename": filename,
                        "path": str(rel_path),
                        "story": file_path.parent.name.replace("_", " "),
                        "timestamp": file_path.stat().st_mtime,
                    }
                )
    return sorted(videos, key=lambda v: v["timestamp"], reverse=True)


def list_generated_images(task_file: Path, stories_root: Path) -> List[Dict[str, Any]]:
    """
    Return the most recent generated images (max 50) for the currently active story.
    """
    images: List[Dict[str, Any]] = []
    if not task_file.exists():
        return images
    task = load_json(task_file)
    story_path = Path(task.get("story_path", ""))
    story_id = task.get("story_id", "")

    # Primary location: assets/images inside the explicit story_path
    img_dir = story_path / "assets" / "images"
    if not img_dir.is_dir() and story_id:
        # Fallback: search for a folder that contains the story_id and an assets/images sub‑folder
        for dirpath, _, _ in os.walk(str(stories_root)):
            if story_id in dirpath and "assets" in dirpath:
                candidate = Path(dirpath) / "images"
                if candidate.is_dir():
                    img_dir = candidate
                    break

    if img_dir.is_dir():
        for img_file in sorted(img_dir.glob("*.png")) + sorted(img_dir.glob("*.jpg")):
            if img_file.is_file():
                images.append(
                    {
                        "path": f"/assets/images/{img_file.name}",
                        "filename": img_file.name,
                        "timestamp": img_file.stat().st_mtime,
                        "size": img_file.stat().st_size,
                    }
                )
    images.sort(key=lambda i: i["timestamp"], reverse=True)
    return images[:50]


def list_library_images(stories_root: Path) -> List[Dict[str, Any]]:
    """
    Scan all story folders for generated scene images and return a flat list
    (max 200 entries) ordered by newest first.
    """
    library: List[Dict[str, Any]] = []
    for dirpath, _, filenames in os.walk(str(stories_root)):
        if "assets" in dirpath and "images" in dirpath:
            story_dir = Path(dirpath).parent.parent
            story_name = story_dir.name
            for filename in filenames:
                if filename.endswith(".png") and filename.startswith("scene_"):
                    rel_path = Path(dirpath).joinpath(filename).relative_to(stories_root)
                    url = f"/stories/{rel_path.as_posix()}"
                    library.append(
                        {
                            "story": story_name,
                            "filename": filename,
                            "url": url,
                            "timestamp": (Path(dirpath) / filename).stat().st_mtime,
                        }
                    )
    library.sort(key=lambda i: i["timestamp"], reverse=True)
    return library[:200]


# ----------------------------------------------------------------------
# Audio / TTS helpers
# ----------------------------------------------------------------------
def clean_old_previews(voice_dir: Path, max_age_seconds: int = 300) -> None:
    """Remove preview files older than ``max_age_seconds``."""
    for old_file in voice_dir.glob("ui_preview_*.mp3"):
        if time.time() - old_file.stat().st_mtime > max_age_seconds:
            try:
                old_file.unlink()
            except Exception:
                pass


def generate_tts(
    text: str,
    voice: str,
    rate: str,
    pitch: str,
    effect: str,
    voice_dir: Path,
    python_exe: str,
) -> Tuple[bool, str]:
    """
    Run ``edge_tts`` to synthesize ``text`` and optionally post‑process with FFmpeg.
    Returns ``(success, message_or_url)``.
    """
    import imageio_ffmpeg

    ts = int(time.time() * 1000)
    raw_path = voice_dir / f"raw_{ts}.mp3"
    out_path = voice_dir / f"ui_preview_{ts}.mp3"

    # 1️⃣  Generate raw TTS
    tts_cmd = [
        python_exe,
        "-m",
        "edge_tts",
        "--voice",
        voice,
        "--text",
        text[:200],
        "--write-media",
        str(raw_path),
        f"--rate={rate}",
        f"--pitch={pitch}",
    ]
    result = subprocess.run(tts_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        return False, f"TTS engine error: {result.stderr}"

    # 2️⃣  Apply optional FFmpeg filter
    ffmpeg_filters = {
        "demonic": "aecho=0.8:0.88:60:0.4,asetrate=44100*0.8,aresample=44100",
        "ghostly": "aecho=0.8:0.88:100:0.6,asetrate=44100*1.2,aresample=44100",
        "radio": "highpass=f=500,lowpass=f=3000",
        "reverb": "aecho=0.8:0.88:60:0.4",
        "none": "anull",
    }
    filter_chain = ffmpeg_filters.get(effect, "anull")
    ffmpeg_exe = "ffmpeg"
    try:
        subprocess.run([ffmpeg_exe, "-version"], capture_output=True, check=True)
    except Exception:
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

    ffmpeg_cmd = [
        ffmpeg_exe,
        "-y",
        "-i",
        str(raw_path),
        "-af",
        filter_chain,
        str(out_path),
    ]
    ffmpeg_res = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
    if ffmpeg_res.returncode != 0:
        # Fallback: copy raw file unchanged
        out_path.write_bytes(raw_path.read_bytes())

    # Clean up temporary raw file
    try:
        raw_path.unlink()
    except Exception:
        pass

    return True, f"/assets/voice_samples/{out_path.name}"