| from __future__ import annotations |
|
|
| import os |
| import time |
| from datetime import datetime |
| from pathlib import Path |
|
|
| from shared.utils.video_metadata import read_metadata_from_video, save_video_metadata |
| from shared.utils.virtual_media import build_virtual_media_path |
|
|
|
|
| PLUGIN_NAME = "Process Full Video" |
| PROCESS_FULL_VIDEO_METADATA_KEY = "fill_process_video" |
|
|
|
|
| def format_time_hms(seconds: float | None) -> str: |
| total_seconds = max(0, int(round(float(seconds or 0.0)))) |
| minutes, seconds_only = divmod(total_seconds, 60) |
| hours, minutes = divmod(minutes, 60) |
| return f"{hours:02d}:{minutes:02d}:{seconds_only:02d}" |
|
|
|
|
| def make_continuation_signature(file_path: str) -> dict | None: |
| if not isinstance(file_path, str) or not os.path.isfile(file_path): |
| return None |
| try: |
| stats = os.stat(file_path) |
| except OSError: |
| return None |
| return {"path": str(Path(file_path).resolve()), "size": int(stats.st_size), "mtime_ns": int(getattr(stats, "st_mtime_ns", int(stats.st_mtime * 1_000_000_000)))} |
|
|
|
|
| def continuation_signature_key(signature: dict) -> tuple[str, int, int] | None: |
| if not isinstance(signature, dict): |
| return None |
| path = str(signature.get("path") or "").strip() |
| if len(path) == 0: |
| return None |
| try: |
| return str(Path(path).resolve()), max(0, int(signature.get("size"))), max(0, int(signature.get("mtime_ns"))) |
| except (TypeError, ValueError): |
| return None |
|
|
|
|
| def normalize_merged_continuation_signatures(signatures) -> list[dict]: |
| normalized: list[dict] = [] |
| seen: set[tuple[str, int, int]] = set() |
| for signature in list(signatures or []): |
| key = continuation_signature_key(signature) |
| if key is None or key in seen: |
| continue |
| seen.add(key) |
| path, size, mtime_ns = key |
| normalized.append({"path": path, "size": size, "mtime_ns": mtime_ns}) |
| return normalized |
|
|
|
|
| def append_merged_continuation_signature(signatures: list[dict], signature: dict | None) -> list[dict]: |
| return normalize_merged_continuation_signatures([*list(signatures or []), *( [] if signature is None else [signature] )]) |
|
|
|
|
| def read_merged_continuation_signatures(output_path: str) -> list[dict]: |
| if not isinstance(output_path, str) or not os.path.isfile(output_path): |
| return [] |
| metadata = read_metadata_from_video(output_path) |
| if not isinstance(metadata, dict): |
| return [] |
| process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY) |
| return [] if not isinstance(process_metadata, dict) else normalize_merged_continuation_signatures(process_metadata.get("merged_continuations")) |
|
|
|
|
| def store_process_progress(output_path: str, *, written_unique_frames: int, merged_signatures: list[dict], verbose_level: int = 0) -> bool: |
| if not isinstance(output_path, str) or not os.path.isfile(output_path): |
| return False |
| metadata = read_metadata_from_video(output_path) |
| if not isinstance(metadata, dict) or len(metadata) == 0: |
| return False |
| process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY) |
| process_metadata = {} if not isinstance(process_metadata, dict) else process_metadata.copy() |
| process_metadata["written_unique_frames"] = int(written_unique_frames) |
| process_metadata["merged_continuations"] = normalize_merged_continuation_signatures(merged_signatures) |
| metadata[PROCESS_FULL_VIDEO_METADATA_KEY] = process_metadata |
| metadata["video_length"] = int(written_unique_frames) |
| metadata["frame_count"] = int(written_unique_frames) |
| if save_video_metadata(output_path, metadata, allow_inplace_update=True, verbose_level=verbose_level): |
| return True |
| print(f"[Process Full Video] Warning: failed to store process progress in {output_path}") |
| return False |
|
|
|
|
| def read_recorded_written_unique_frames(output_path: str) -> int: |
| if not isinstance(output_path, str) or not os.path.isfile(output_path): |
| return 0 |
| metadata = read_metadata_from_video(output_path) |
| if not isinstance(metadata, dict): |
| return 0 |
| process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY) |
| if not isinstance(process_metadata, dict): |
| return 0 |
| try: |
| return max(0, int(process_metadata.get("written_unique_frames") or 0)) |
| except (TypeError, ValueError): |
| return 0 |
|
|
|
|
| def normalize_identity_path(path_value: str) -> str: |
| text = str(path_value or "").strip() |
| if len(text) == 0: |
| return "" |
| try: |
| return str(Path(text).resolve()).casefold() |
| except (OSError, RuntimeError, ValueError): |
| return text.casefold() |
|
|
|
|
| def read_output_identity(output_path: str) -> tuple[str, str, str] | None: |
| if not isinstance(output_path, str) or not os.path.isfile(output_path): |
| return None |
| metadata = read_metadata_from_video(output_path) |
| if not isinstance(metadata, dict) or len(metadata) == 0: |
| return None |
| process_metadata = metadata.get(PROCESS_FULL_VIDEO_METADATA_KEY) |
| process_metadata = process_metadata if isinstance(process_metadata, dict) else {} |
| process_name = str(process_metadata.get("process") or "").strip() |
| if len(process_name) == 0: |
| segments = metadata.get("segments") |
| first_segment = next((segment for segment in list(segments or []) if isinstance(segment, dict)), None) |
| if first_segment is not None: |
| process_name = str(first_segment.get("process") or "").strip() |
| source_video = str(process_metadata.get("source_video") or metadata.get("source_video") or "").strip() |
| source_segment = str(process_metadata.get("source_segment") or metadata.get("source_segment") or "").strip() |
| return process_name, source_video, source_segment |
|
|
|
|
| def get_output_identity_mismatch_message(output_path: str, *, process_name: str, source_path: str, source_segment: str) -> str | None: |
| if not isinstance(output_path, str) or not os.path.isfile(output_path): |
| return None |
| identity = read_output_identity(output_path) |
| if identity is None: |
| return f"Output file already exists at {output_path}, but it does not contain readable WanGP metadata. Processing was stopped." |
| existing_process, existing_source_video, existing_source_segment = identity |
| mismatches: list[str] = [] |
| if str(existing_process or "").strip() != str(process_name or "").strip(): |
| mismatches.append("process") |
| if normalize_identity_path(existing_source_video) != normalize_identity_path(source_path): |
| mismatches.append("source_video") |
| if str(existing_source_segment or "").strip() != str(source_segment or "").strip(): |
| mismatches.append("source_segment") |
| if len(mismatches) == 0: |
| return None |
| mismatch_text = ", ".join(mismatches) |
| return f"Output file already exists at {output_path}, but its metadata does not match the current {mismatch_text}. Processing was stopped." |
|
|
|
|
| def log_existing_output_metadata(output_path: str, verbose_level: int) -> None: |
| if int(verbose_level or 0) < 2 or not os.path.isfile(output_path): |
| return |
| metadata = read_metadata_from_video(output_path) |
| if not isinstance(metadata, dict) or len(metadata) == 0: |
| print(f"[Process Full Video] Existing output metadata not found in {output_path}") |
| return |
| creation_date = str(metadata.get("creation_date") or "unknown") |
| generation_time = metadata.get("generation_time") |
| generation_time_text = "unknown" if generation_time in (None, "") else str(generation_time) |
| print(f"[Process Full Video] Existing output metadata found: creation_date={creation_date}, generation_time={generation_time_text}") |
|
|
|
|
| def store_output_metadata(output_path: str, last_segment_path: str | None, *, source_path: str, process_name: str, source_start_seconds: float, start_frame: int, fps_float: float, selected_audio_track: int | None, total_generation_time: float, actual_frame_count: int, process_metadata: dict | None = None, verbose_level: int = 0) -> bool: |
| if not os.path.isfile(output_path): |
| return False |
| metadata = {} |
| if last_segment_path and os.path.isfile(last_segment_path): |
| loaded_metadata = read_metadata_from_video(last_segment_path) |
| if isinstance(loaded_metadata, dict) and len(loaded_metadata) > 0: |
| metadata = loaded_metadata |
| elif Path(last_segment_path).resolve() != Path(output_path).resolve(): |
| print(f"[Process Full Video] Warning: failed to read WanGP metadata from {last_segment_path}") |
| elif last_segment_path: |
| print(f"[Process Full Video] Warning: no segment metadata source was available for {output_path}") |
| final_metadata = metadata.copy() |
| source_name = os.path.basename(source_path) |
| end_frame = max(int(start_frame), int(start_frame) + max(0, int(actual_frame_count)) - 1) |
| start_seconds = max(0.0, float(source_start_seconds or 0.0)) |
| end_seconds = start_seconds + max(0, int(actual_frame_count)) / float(fps_float) |
| final_metadata["video_guide"] = build_virtual_media_path(source_path, start_frame=start_frame, end_frame=end_frame, audio_track_no=selected_audio_track) |
| final_metadata["video_length"] = int(actual_frame_count) |
| final_metadata["frame_count"] = int(actual_frame_count) |
| final_metadata["generation_time"] = max(0.0, float(total_generation_time)) |
| operation_comment = f'{PLUGIN_NAME}: {process_name} on "{source_name}" Start { format_time_hms(start_seconds) } End { format_time_hms(end_seconds) }' |
| existing_comments = str(final_metadata.get("comments") or "").strip() |
| final_metadata["comments"] = operation_comment if len(existing_comments) == 0 else f"{existing_comments}\n{operation_comment}" |
| final_metadata["segments"] = [{ |
| "plugin": PLUGIN_NAME, |
| "process": process_name, |
| "source_file": source_name, |
| "start_frame": int(start_frame), |
| "end_frame": end_frame, |
| "start_seconds": start_seconds, |
| "end_seconds": end_seconds, |
| }] |
| final_metadata[PROCESS_FULL_VIDEO_METADATA_KEY] = process_metadata.copy() if isinstance(process_metadata, dict) else {} |
| for key in ("plugin", "process", "source_video", "source_segment", "video_source"): |
| final_metadata.pop(key, None) |
| final_metadata["creation_date"] = datetime.now().isoformat(timespec="seconds") |
| final_metadata["creation_timestamp"] = int(time.time()) |
| if save_video_metadata(output_path, final_metadata, allow_inplace_update=True, verbose_level=verbose_level): |
| return True |
| print(f"[Process Full Video] Warning: failed to write metadata to {output_path}") |
| return False |
|
|
|
|
| def read_metadata_generation_time(video_path: str | None) -> float: |
| if not video_path or not os.path.isfile(video_path): |
| return 0.0 |
| metadata = read_metadata_from_video(video_path) |
| if not isinstance(metadata, dict): |
| return 0.0 |
| try: |
| return max(0.0, float(metadata.get("generation_time") or 0.0)) |
| except (TypeError, ValueError): |
| return 0.0 |
|
|