File size: 10,973 Bytes
7344bef | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 | from __future__ import annotations
import os
import time
from datetime import datetime
from pathlib import Path
from shared.utils.video_metadata import read_metadata_from_video, save_video_metadata
from shared.utils.virtual_media import build_virtual_media_path
PLUGIN_NAME = "Process Full Video"
PROCESS_FULL_VIDEO_METADATA_KEY = "fill_process_video"
def format_time_hms(seconds: float | None) -> str:
total_seconds = max(0, int(round(float(seconds or 0.0))))
minutes, seconds_only = divmod(total_seconds, 60)
hours, minutes = divmod(minutes, 60)
return f"{hours:02d}:{minutes:02d}:{seconds_only:02d}"
def make_continuation_signature(file_path: str) -> dict | None:
if not isinstance(file_path, str) or not os.path.isfile(file_path):
return None
try:
stats = os.stat(file_path)
except OSError:
return None
return {"path": str(Path(file_path).resolve()), "size": int(stats.st_size), "mtime_ns": int(getattr(stats, "st_mtime_ns", int(stats.st_mtime * 1_000_000_000)))}
def continuation_signature_key(signature: dict) -> tuple[str, int, int] | None:
if not isinstance(signature, dict):
return None
path = str(signature.get("path") or "").strip()
if len(path) == 0:
return None
try:
return str(Path(path).resolve()), max(0, int(signature.get("size"))), max(0, int(signature.get("mtime_ns")))
except (TypeError, ValueError):
return None
def normalize_merged_continuation_signatures(signatures) -> list[dict]:
normalized: list[dict] = []
seen: set[tuple[str, int, int]] = set()
for signature in list(signatures or []):
key = continuation_signature_key(signature)
if key is None or key in seen:
continue
seen.add(key)
path, size, mtime_ns = key
normalized.append({"path": path, "size": size, "mtime_ns": mtime_ns})
return normalized
def append_merged_continuation_signature(signatures: list[dict], signature: dict | None) -> list[dict]:
return normalize_merged_continuation_signatures([*list(signatures or []), *( [] if signature is None else [signature] )])
def read_merged_continuation_signatures(output_path: str) -> list[dict]:
if not isinstance(output_path, str) or not os.path.isfile(output_path):
return []
metadata = read_metadata_from_video(output_path)
if not isinstance(metadata, dict):
return []
process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
return [] if not isinstance(process_metadata, dict) else normalize_merged_continuation_signatures(process_metadata.get("merged_continuations"))
def store_process_progress(output_path: str, *, written_unique_frames: int, merged_signatures: list[dict], verbose_level: int = 0) -> bool:
if not isinstance(output_path, str) or not os.path.isfile(output_path):
return False
metadata = read_metadata_from_video(output_path)
if not isinstance(metadata, dict) or len(metadata) == 0:
return False
process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
process_metadata = {} if not isinstance(process_metadata, dict) else process_metadata.copy()
process_metadata["written_unique_frames"] = int(written_unique_frames)
process_metadata["merged_continuations"] = normalize_merged_continuation_signatures(merged_signatures)
metadata[PROCESS_FULL_VIDEO_METADATA_KEY] = process_metadata
metadata["video_length"] = int(written_unique_frames)
metadata["frame_count"] = int(written_unique_frames)
if save_video_metadata(output_path, metadata, allow_inplace_update=True, verbose_level=verbose_level):
return True
print(f"[Process Full Video] Warning: failed to store process progress in {output_path}")
return False
def read_recorded_written_unique_frames(output_path: str) -> int:
if not isinstance(output_path, str) or not os.path.isfile(output_path):
return 0
metadata = read_metadata_from_video(output_path)
if not isinstance(metadata, dict):
return 0
process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
if not isinstance(process_metadata, dict):
return 0
try:
return max(0, int(process_metadata.get("written_unique_frames") or 0))
except (TypeError, ValueError):
return 0
def normalize_identity_path(path_value: str) -> str:
text = str(path_value or "").strip()
if len(text) == 0:
return ""
try:
return str(Path(text).resolve()).casefold()
except (OSError, RuntimeError, ValueError):
return text.casefold()
def read_output_identity(output_path: str) -> tuple[str, str, str] | None:
if not isinstance(output_path, str) or not os.path.isfile(output_path):
return None
metadata = read_metadata_from_video(output_path)
if not isinstance(metadata, dict) or len(metadata) == 0:
return None
process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY)
process_metadata = process_metadata if isinstance(process_metadata, dict) else {}
process_name = str(process_metadata.get("process") or "").strip()
if len(process_name) == 0:
segments = metadata.get("segments")
first_segment = next((segment for segment in list(segments or []) if isinstance(segment, dict)), None)
if first_segment is not None:
process_name = str(first_segment.get("process") or "").strip()
source_video = str(process_metadata.get("source_video") or metadata.get("source_video") or "").strip()
source_segment = str(process_metadata.get("source_segment") or metadata.get("source_segment") or "").strip()
return process_name, source_video, source_segment
def get_output_identity_mismatch_message(output_path: str, *, process_name: str, source_path: str, source_segment: str) -> str | None:
if not isinstance(output_path, str) or not os.path.isfile(output_path):
return None
identity = read_output_identity(output_path)
if identity is None:
return f"Output file already exists at {output_path}, but it does not contain readable WanGP metadata. Processing was stopped."
existing_process, existing_source_video, existing_source_segment = identity
mismatches: list[str] = []
if str(existing_process or "").strip() != str(process_name or "").strip():
mismatches.append("process")
if normalize_identity_path(existing_source_video) != normalize_identity_path(source_path):
mismatches.append("source_video")
if str(existing_source_segment or "").strip() != str(source_segment or "").strip():
mismatches.append("source_segment")
if len(mismatches) == 0:
return None
mismatch_text = ", ".join(mismatches)
return f"Output file already exists at {output_path}, but its metadata does not match the current {mismatch_text}. Processing was stopped."
def log_existing_output_metadata(output_path: str, verbose_level: int) -> None:
if int(verbose_level or 0) < 2 or not os.path.isfile(output_path):
return
metadata = read_metadata_from_video(output_path)
if not isinstance(metadata, dict) or len(metadata) == 0:
print(f"[Process Full Video] Existing output metadata not found in {output_path}")
return
creation_date = str(metadata.get("creation_date") or "unknown")
generation_time = metadata.get("generation_time")
generation_time_text = "unknown" if generation_time in (None, "") else str(generation_time)
print(f"[Process Full Video] Existing output metadata found: creation_date={creation_date}, generation_time={generation_time_text}")
def store_output_metadata(output_path: str, last_segment_path: str | None, *, source_path: str, process_name: str, source_start_seconds: float, start_frame: int, fps_float: float, selected_audio_track: int | None, total_generation_time: float, actual_frame_count: int, process_metadata: dict | None = None, verbose_level: int = 0) -> bool:
if not os.path.isfile(output_path):
return False
metadata = {}
if last_segment_path and os.path.isfile(last_segment_path):
loaded_metadata = read_metadata_from_video(last_segment_path)
if isinstance(loaded_metadata, dict) and len(loaded_metadata) > 0:
metadata = loaded_metadata
elif Path(last_segment_path).resolve() != Path(output_path).resolve():
print(f"[Process Full Video] Warning: failed to read WanGP metadata from {last_segment_path}")
elif last_segment_path:
print(f"[Process Full Video] Warning: no segment metadata source was available for {output_path}")
final_metadata = metadata.copy()
source_name = os.path.basename(source_path)
end_frame = max(int(start_frame), int(start_frame) + max(0, int(actual_frame_count)) - 1)
start_seconds = max(0.0, float(source_start_seconds or 0.0))
end_seconds = start_seconds + max(0, int(actual_frame_count)) / float(fps_float)
final_metadata["video_guide"] = build_virtual_media_path(source_path, start_frame=start_frame, end_frame=end_frame, audio_track_no=selected_audio_track)
final_metadata["video_length"] = int(actual_frame_count)
final_metadata["frame_count"] = int(actual_frame_count)
final_metadata["generation_time"] = max(0.0, float(total_generation_time))
operation_comment = f'{PLUGIN_NAME}: {process_name} on "{source_name}" Start { format_time_hms(start_seconds) } End { format_time_hms(end_seconds) }'
existing_comments = str(final_metadata.get("comments") or "").strip()
final_metadata["comments"] = operation_comment if len(existing_comments) == 0 else f"{existing_comments}\n{operation_comment}"
final_metadata["segments"] = [{
"plugin": PLUGIN_NAME,
"process": process_name,
"source_file": source_name,
"start_frame": int(start_frame),
"end_frame": end_frame,
"start_seconds": start_seconds,
"end_seconds": end_seconds,
}]
final_metadata[PROCESS_FULL_VIDEO_METADATA_KEY] = process_metadata.copy() if isinstance(process_metadata, dict) else {}
for key in ("plugin", "process", "source_video", "source_segment", "video_source"):
final_metadata.pop(key, None)
final_metadata["creation_date"] = datetime.now().isoformat(timespec="seconds")
final_metadata["creation_timestamp"] = int(time.time())
if save_video_metadata(output_path, final_metadata, allow_inplace_update=True, verbose_level=verbose_level):
return True
print(f"[Process Full Video] Warning: failed to write metadata to {output_path}")
return False
def read_metadata_generation_time(video_path: str | None) -> float:
if not video_path or not os.path.isfile(video_path):
return 0.0
metadata = read_metadata_from_video(video_path)
if not isinstance(metadata, dict):
return 0.0
try:
return max(0.0, float(metadata.get("generation_time") or 0.0))
except (TypeError, ValueError):
return 0.0
|