File size: 10,973 Bytes
7344bef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from __future__ import annotations

import os
import time
from datetime import datetime
from pathlib import Path

from shared.utils.video_metadata import read_metadata_from_video, save_video_metadata
from shared.utils.virtual_media import build_virtual_media_path


# Human-readable plugin name; written into the "comments" and "segments"
# metadata entries produced by this module.
PLUGIN_NAME = "Process Full Video"
# Top-level metadata key under which this plugin stores its own bookkeeping
# dict (e.g. "written_unique_frames" and "merged_continuations").
# NOTE(review): the value reads "fill_process_video" — possibly a typo for
# "full_process_video". It is a persisted key, so confirm against existing
# output files before changing it.
PROCESS_FULL_VIDEO_METADATA_KEY = "fill_process_video"


def format_time_hms(seconds: float | None) -> str:
    """Render a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    Falsy input (``None``, ``0``) counts as zero, the value is rounded with
    the built-in ``round`` and negative durations are clamped to zero. Hours
    are not wrapped, so 100 hours or more simply uses more digits.
    """
    rounded = int(round(float(seconds if seconds else 0.0)))
    remaining = rounded if rounded > 0 else 0
    hours = remaining // 3600
    minutes = (remaining % 3600) // 60
    secs = remaining % 60
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def make_continuation_signature(file_path: str) -> dict | None:
    """Build a ``{"path", "size", "mtime_ns"}`` fingerprint for an existing file.

    Returns ``None`` when *file_path* is not a string, does not point to a
    regular file, or cannot be stat'ed.
    """
    if not (isinstance(file_path, str) and os.path.isfile(file_path)):
        return None
    try:
        file_stats = os.stat(file_path)
    except OSError:
        return None
    # The float mtime is only used when the nanosecond field is unavailable.
    fallback_mtime_ns = int(file_stats.st_mtime * 1_000_000_000)
    mtime_ns = int(getattr(file_stats, "st_mtime_ns", fallback_mtime_ns))
    return {
        "path": str(Path(file_path).resolve()),
        "size": int(file_stats.st_size),
        "mtime_ns": mtime_ns,
    }


def continuation_signature_key(signature: dict) -> tuple[str, int, int] | None:
    """Return a hashable ``(resolved_path, size, mtime_ns)`` key for a signature.

    Args:
        signature: Mapping expected to carry ``"path"``, ``"size"`` and
            ``"mtime_ns"`` entries.

    Returns:
        The resolved path plus the size and mtime clamped to be non-negative,
        or ``None`` when *signature* is not a dict, has no usable path, or any
        field cannot be converted.
    """
    if not isinstance(signature, dict):
        return None
    path = str(signature.get("path") or "").strip()
    if not path:
        return None
    try:
        resolved = str(Path(path).resolve())
        size = max(0, int(signature.get("size")))
        mtime_ns = max(0, int(signature.get("mtime_ns")))
    except (TypeError, ValueError, OverflowError, OSError, RuntimeError):
        # OverflowError: int() of an infinite float.
        # OSError / RuntimeError: Path.resolve() failures; a malformed entry
        # must be skipped rather than abort the whole normalisation.
        return None
    return resolved, size, mtime_ns


def normalize_merged_continuation_signatures(signatures) -> list[dict]:
    """Return canonical, de-duplicated signature dicts in first-seen order.

    Entries for which ``continuation_signature_key`` yields ``None`` are
    dropped; a falsy *signatures* value produces an empty list.
    """
    # A dict doubles as an insertion-ordered set of signature keys.
    unique_keys: dict[tuple[str, int, int], None] = {}
    for entry in signatures or ():
        entry_key = continuation_signature_key(entry)
        if entry_key is not None:
            unique_keys.setdefault(entry_key, None)
    return [
        {"path": resolved_path, "size": byte_size, "mtime_ns": modified_ns}
        for resolved_path, byte_size, modified_ns in unique_keys
    ]


def append_merged_continuation_signature(signatures: list[dict], signature: dict | None) -> list[dict]:
    """Return the normalized signature list with *signature* added at the end.

    The input list is not modified; a ``None`` *signature* adds nothing, so
    the call then just normalizes *signatures*.
    """
    combined = list(signatures or [])
    if signature is not None:
        combined.append(signature)
    return normalize_merged_continuation_signatures(combined)


def read_merged_continuation_signatures(output_path: str) -> list[dict]:
    """Load the normalized ``"merged_continuations"`` list stored in a video.

    Returns an empty list when *output_path* is not an existing file, its
    metadata is not a dict, or the plugin section is missing or not a dict.
    """
    if not (isinstance(output_path, str) and os.path.isfile(output_path)):
        return []
    video_metadata = read_metadata_from_video(output_path)
    if not isinstance(video_metadata, dict):
        return []
    plugin_section = video_metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
    if not isinstance(plugin_section, dict):
        return []
    return normalize_merged_continuation_signatures(plugin_section.get("merged_continuations"))


def store_process_progress(output_path: str, *, written_unique_frames: int, merged_signatures: list[dict], verbose_level: int = 0) -> bool:
    """Record the frame count and merged signatures in a video's metadata.

    The existing metadata is read, the plugin section is updated with
    ``written_unique_frames`` and the normalized ``merged_continuations``,
    ``video_length`` / ``frame_count`` are set to the same frame count, and
    the result is saved with ``allow_inplace_update=True``.

    Returns:
        ``True`` when the save succeeds; ``False`` when *output_path* is not
        an existing file, no non-empty metadata dict could be read, or the
        save fails (a warning is printed in that last case).
    """
    if not (isinstance(output_path, str) and os.path.isfile(output_path)):
        return False
    metadata = read_metadata_from_video(output_path)
    if not (isinstance(metadata, dict) and metadata):
        return False

    previous_section = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
    # Work on a copy so the dict returned by the reader is not mutated in place.
    progress = previous_section.copy() if isinstance(previous_section, dict) else {}
    frame_total = int(written_unique_frames)
    progress["written_unique_frames"] = frame_total
    progress["merged_continuations"] = normalize_merged_continuation_signatures(merged_signatures)

    metadata[PROCESS_FULL_VIDEO_METADATA_KEY] = progress
    metadata["video_length"] = frame_total
    metadata["frame_count"] = frame_total

    saved = save_video_metadata(output_path, metadata, allow_inplace_update=True, verbose_level=verbose_level)
    if not saved:
        print(f"[Process Full Video] Warning: failed to store process progress in {output_path}")
        return False
    return True


def read_recorded_written_unique_frames(output_path: str) -> int:
    """Return the ``written_unique_frames`` counter stored in a video's metadata.

    Yields ``0`` when the file, its metadata dict or the plugin section is
    unavailable, or when the stored value cannot be converted to an int.
    Negative values are clamped to ``0``.
    """
    if not (isinstance(output_path, str) and os.path.isfile(output_path)):
        return 0
    video_metadata = read_metadata_from_video(output_path)
    if not isinstance(video_metadata, dict):
        return 0
    plugin_section = video_metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
    if not isinstance(plugin_section, dict):
        return 0
    recorded = plugin_section.get("written_unique_frames") or 0
    try:
        frames = int(recorded)
    except (TypeError, ValueError):
        return 0
    return max(0, frames)


def normalize_identity_path(path_value: str) -> str:
    """Return a case-folded, resolved form of a path for identity comparison.

    Blank or falsy input gives ``""``. If the path cannot be resolved, the
    stripped text itself is case-folded and returned.
    """
    cleaned = str(path_value or "").strip()
    if not cleaned:
        return ""
    try:
        comparable = str(Path(cleaned).resolve())
    except (OSError, RuntimeError, ValueError):
        # Resolution failed: fall back to the raw text.
        comparable = cleaned
    return comparable.casefold()


def read_output_identity(output_path: str) -> tuple[str, str, str] | None:
    """Read the ``(process, source_video, source_segment)`` triple from a video.

    Each value is looked up in the plugin section first. ``process`` falls
    back to the first dict found in ``metadata["segments"]``; the two source
    fields fall back to the top-level metadata. Missing values become ``""``.

    Returns ``None`` when *output_path* is not an existing file or no
    non-empty metadata dict could be read.
    """
    if not (isinstance(output_path, str) and os.path.isfile(output_path)):
        return None
    metadata = read_metadata_from_video(output_path)
    if not (isinstance(metadata, dict) and metadata):
        return None

    plugin_section = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
    if not isinstance(plugin_section, dict):
        plugin_section = {}

    def _source_field(field: str) -> str:
        # Plugin section first, then the top-level metadata entry.
        return str(plugin_section.get(field) or metadata.get(field) or "").strip()

    process_name = str(plugin_section.get("process") or "").strip()
    if not process_name:
        for candidate in metadata.get("segments") or []:
            if isinstance(candidate, dict):
                process_name = str(candidate.get("process") or "").strip()
                break
    return process_name, _source_field("source_video"), _source_field("source_segment")


def get_output_identity_mismatch_message(output_path: str, *, process_name: str, source_path: str, source_segment: str) -> str | None:
    """Explain why an existing output file cannot be continued, if it cannot.

    Returns ``None`` when *output_path* is not an existing file or when its
    recorded process, source video and source segment all match the supplied
    values. Otherwise returns a message saying either that the metadata is
    unreadable or which of the fields differ.
    """
    if not (isinstance(output_path, str) and os.path.isfile(output_path)):
        return None
    identity = read_output_identity(output_path)
    if identity is None:
        return f"Output file already exists at {output_path}, but it does not contain readable WanGP metadata. Processing was stopped."
    recorded_process, recorded_source, recorded_segment = identity

    def _clean(value) -> str:
        return str(value or "").strip()

    checks = (
        ("process", _clean(recorded_process) != _clean(process_name)),
        # Source paths are compared via their resolved, case-folded form.
        ("source_video", normalize_identity_path(recorded_source) != normalize_identity_path(source_path)),
        ("source_segment", _clean(recorded_segment) != _clean(source_segment)),
    )
    differing = [label for label, differs in checks if differs]
    if not differing:
        return None
    joined_labels = ", ".join(differing)
    return f"Output file already exists at {output_path}, but its metadata does not match the current {joined_labels}. Processing was stopped."


def log_existing_output_metadata(output_path: str, verbose_level: int) -> None:
    """Print a short summary of an existing output file's metadata.

    Nothing is printed unless *verbose_level* is at least 2 and *output_path*
    is an existing file. When the metadata is missing or empty, a
    "not found" line is printed instead of the summary.

    Args:
        output_path: Path of the existing output video. Values that are not
            path-like are ignored instead of raising ``TypeError``.
        verbose_level: Verbosity setting; falsy values count as 0.
    """
    if int(verbose_level or 0) < 2:
        return
    # Guard the type first: os.path.isfile raises TypeError for None and
    # other non-path values, and a logging helper must not abort the run.
    if not isinstance(output_path, (str, bytes, os.PathLike)) or not os.path.isfile(output_path):
        return
    metadata = read_metadata_from_video(output_path)
    if not isinstance(metadata, dict) or len(metadata) == 0:
        print(f"[Process Full Video] Existing output metadata not found in {output_path}")
        return
    creation_date = str(metadata.get("creation_date") or "unknown")
    generation_time = metadata.get("generation_time")
    # A generation_time of 0 is a real value, so only None and "" are "unknown".
    generation_time_text = "unknown" if generation_time in (None, "") else str(generation_time)
    print(f"[Process Full Video] Existing output metadata found: creation_date={creation_date}, generation_time={generation_time_text}")

def store_output_metadata(output_path: str, last_segment_path: str | None, *, source_path: str, process_name: str, source_start_seconds: float, start_frame: int, fps_float: float, selected_audio_track: int | None, total_generation_time: float, actual_frame_count: int, process_metadata: dict | None = None, verbose_level: int = 0) -> bool:
    """Write the final metadata block into a processed output video.

    The metadata read from *last_segment_path* (when available) is used as the
    base. On top of it this function sets ``video_guide``, ``video_length``,
    ``frame_count`` and ``generation_time``, appends a line to ``comments``,
    replaces ``segments`` with a single entry for this run, stores a copy of
    *process_metadata* under the plugin key, removes the top-level
    ``plugin`` / ``process`` / ``source_video`` / ``source_segment`` /
    ``video_source`` keys and stamps the creation date and timestamp.

    Args:
        output_path: Existing output video to update.
        last_segment_path: Video whose metadata seeds the result; may be
            ``None`` or the output file itself.
        source_path: Source video; only its basename is recorded.
        process_name: Name of the process that was applied.
        source_start_seconds: Start offset in the source; negative or falsy
            values are treated as 0.
        start_frame: First source frame covered by the output.
        fps_float: Frame rate used to derive the end time. A missing,
            non-positive or NaN value yields a zero-length duration instead
            of raising.
        selected_audio_track: Audio track number passed to the virtual media
            path builder.
        total_generation_time: Accumulated generation time; clamped to >= 0.
        actual_frame_count: Number of frames actually written.
        process_metadata: Plugin bookkeeping dict to store (copied).
        verbose_level: Forwarded to ``save_video_metadata``.

    Returns:
        ``True`` when the metadata was saved; ``False`` when *output_path* is
        not an existing file or the save fails (a warning is printed then).
    """
    # Type guard first: os.path.isfile raises TypeError for None and similar.
    if not isinstance(output_path, (str, bytes, os.PathLike)) or not os.path.isfile(output_path):
        return False
    metadata = {}
    if last_segment_path and os.path.isfile(last_segment_path):
        loaded_metadata = read_metadata_from_video(last_segment_path)
        if isinstance(loaded_metadata, dict) and len(loaded_metadata) > 0:
            metadata = loaded_metadata
        elif Path(last_segment_path).resolve() != Path(output_path).resolve():
            # No warning when the "segment" is the output file itself.
            print(f"[Process Full Video] Warning: failed to read WanGP metadata from {last_segment_path}")
    elif last_segment_path:
        print(f"[Process Full Video] Warning: no segment metadata source was available for {output_path}")
    final_metadata = metadata.copy()
    source_name = os.path.basename(source_path)
    frame_total = max(0, int(actual_frame_count))
    # With zero frames the end frame stays at the start frame.
    end_frame = max(int(start_frame), int(start_frame) + frame_total - 1)
    start_seconds = max(0.0, float(source_start_seconds or 0.0))
    # An unusable frame rate must not abort the metadata write after
    # processing is done: 0 would divide by zero and NaN would break the
    # time formatting below.
    fps_value = float(fps_float or 0.0)
    duration_seconds = frame_total / fps_value if fps_value > 0.0 else 0.0
    end_seconds = start_seconds + duration_seconds
    final_metadata["video_guide"] = build_virtual_media_path(source_path, start_frame=start_frame, end_frame=end_frame, audio_track_no=selected_audio_track)
    final_metadata["video_length"] = int(actual_frame_count)
    final_metadata["frame_count"] = int(actual_frame_count)
    final_metadata["generation_time"] = max(0.0, float(total_generation_time))
    operation_comment = f'{PLUGIN_NAME}: {process_name} on "{source_name}" Start { format_time_hms(start_seconds) } End { format_time_hms(end_seconds) }'
    existing_comments = str(final_metadata.get("comments") or "").strip()
    final_metadata["comments"] = operation_comment if len(existing_comments) == 0 else f"{existing_comments}\n{operation_comment}"
    final_metadata["segments"] = [{
        "plugin": PLUGIN_NAME,
        "process": process_name,
        "source_file": source_name,
        "start_frame": int(start_frame),
        "end_frame": end_frame,
        "start_seconds": start_seconds,
        "end_seconds": end_seconds,
    }]
    final_metadata[PROCESS_FULL_VIDEO_METADATA_KEY] = process_metadata.copy() if isinstance(process_metadata, dict) else {}
    # Remove these top-level keys if the seed metadata carried them.
    for key in ("plugin", "process", "source_video", "source_segment", "video_source"):
        final_metadata.pop(key, None)
    final_metadata["creation_date"] = datetime.now().isoformat(timespec="seconds")
    final_metadata["creation_timestamp"] = int(time.time())
    if save_video_metadata(output_path, final_metadata, allow_inplace_update=True, verbose_level=verbose_level):
        return True
    print(f"[Process Full Video] Warning: failed to write metadata to {output_path}")
    return False


def read_metadata_generation_time(video_path: str | None) -> float:
    """Return the non-negative ``generation_time`` stored in a video's metadata.

    Yields ``0.0`` when *video_path* is falsy or not an existing file, the
    metadata is not a dict, or the stored value cannot be converted to float.
    """
    if not (video_path and os.path.isfile(video_path)):
        return 0.0
    video_metadata = read_metadata_from_video(video_path)
    if not isinstance(video_metadata, dict):
        return 0.0
    raw_value = video_metadata.get("generation_time") or 0.0
    try:
        elapsed = float(raw_value)
    except (TypeError, ValueError):
        return 0.0
    return max(0.0, elapsed)