import json
import os
import time
from contextlib import nullcontext
from datetime import datetime
from typing import Any, Callable

from shared.utils.audio_metadata import save_audio_metadata
from shared.utils.audio_video import save_image_metadata
from shared.utils.video_metadata import save_video_metadata


def _ensure_creation_metadata(configs: Any) -> Any:
    """Return *configs* with creation date/timestamp entries filled in.

    Non-dict values (including ``None``) are returned unchanged. For a dict,
    a shallow copy is returned so the caller's mapping is never mutated;
    ``"creation_date"`` (ISO-8601 string, second precision, built with
    ``datetime.fromtimestamp``) and ``"creation_timestamp"`` (integer epoch
    seconds) are added only when absent. Existing values are kept as-is.
    """
    if not isinstance(configs, dict):
        return configs
    saved_configs = configs.copy()
    if "creation_date" in saved_configs and "creation_timestamp" in saved_configs:
        return saved_configs
    # Read the clock once so both derived fields describe the same instant.
    recorded_time = time.time()
    if "creation_date" not in saved_configs:
        saved_configs["creation_date"] = datetime.fromtimestamp(recorded_time).isoformat(timespec="seconds")
    if "creation_timestamp" not in saved_configs:
        saved_configs["creation_timestamp"] = int(recorded_time)
    return saved_configs


def _sidecar_json_path(path: str) -> str:
    """Return the path of the ``.json`` sidecar that sits next to *path*."""
    return os.path.splitext(path)[0] + ".json"


def record_file_metadata(
    video_path: str | list[str],
    configs: Any,
    is_image: bool,
    audio_only: bool,
    gen: dict[str, Any],
    *,
    get_processed_queue: Callable[[dict[str, Any]], tuple[list[Any], list[Any], list[Any], list[Any]]],
    metadata_choice: str = "metadata",
    embedded_images: Any = None,
    replace_last_file: bool = False,
    lock: Any = None,
    verbose_level: int = 0,
) -> None:
    """Persist metadata for generated file(s) and register them in the queue lists.

    For every path in *video_path* (a single path or a list of paths):

    1. If *configs* is not ``None``, write it (with creation date/timestamp
       added when it is a dict) either to a ``.json`` sidecar
       (``metadata_choice == "json"``) or through the audio / image / video
       metadata saver (``metadata_choice == "metadata"``). Any other
       *metadata_choice* writes nothing.
    2. If ``verbose_level > 0``, print where the file was saved.
    3. Under *lock*, append the path and its settings to the audio lists
       (when *audio_only*) or to the file lists, and set
       ``gen["last_was_audio"]``.
    4. If the last file-list entry was replaced, delete the replaced file and,
       in ``"json"`` mode, its sidecar.

    Args:
        video_path: Output path, or list of output paths.
        configs: Settings to store; ``None`` skips the metadata write but the
            path is still registered (with ``None`` as its settings).
        is_image: Selects the image metadata saver and the image log message;
            also disables ``replace_last_file``.
        audio_only: Selects the audio metadata saver and the audio lists.
        gen: Generation state; passed to *get_processed_queue* and updated
            with ``"last_was_audio"``.
        get_processed_queue: Returns ``(file_list, file_settings_list,
            audio_file_list, audio_file_settings_list)`` for *gen*.
        metadata_choice: ``"json"`` or ``"metadata"``.
        embedded_images: Forwarded to ``save_video_metadata``.
        replace_last_file: When true, the first path overwrites the last
            entry of the (non-empty) file list instead of being appended.
            Ignored for images and for audio-only output.
        lock: Optional context manager entered around each list update; a
            no-op context is used when ``None``.
        verbose_level: Values above 0 enable the "saved" messages; the value
            is also forwarded to ``save_video_metadata``.
    """
    file_list, file_settings_list, audio_file_list, audio_file_settings_list = get_processed_queue(gen)
    paths = video_path if isinstance(video_path, list) else [video_path]
    queue_lock = lock if lock is not None else nullcontext()

    for no, path in enumerate(paths):
        previous_path = None
        new_json_path = None  # sidecar written for `path` in this iteration, if any
        # A fresh copy per path, so every queue entry owns its own settings dict.
        saved_configs = _ensure_creation_metadata(configs)

        if configs is not None:
            if metadata_choice == "json":
                new_json_path = _sidecar_json_path(path)
                with open(new_json_path, "w", encoding="utf-8") as f:
                    json.dump(saved_configs, f, indent=4)
            elif metadata_choice == "metadata":
                if audio_only:
                    save_audio_metadata(path, saved_configs)
                elif is_image:
                    save_image_metadata(path, saved_configs)
                else:
                    save_video_metadata(
                        path,
                        saved_configs,
                        embedded_images,
                        allow_inplace_update=True,
                        verbose_level=verbose_level,
                    )

        if verbose_level > 0:
            if audio_only:
                print(f"New audio file saved to Path: {path}")
            elif is_image:
                print(f"New image saved to Path: {path}")
            else:
                print(f"New video saved to Path: {path}")

        with queue_lock:
            if audio_only:
                audio_file_list.append(path)
                audio_file_settings_list.append(saved_configs)
            else:
                # Only the first path may take over the slot of the last
                # recorded file; any further paths are appended.
                if replace_last_file and not is_image and no == 0 and len(file_list) > 0:
                    previous_path = file_list[-1]
                    file_list[-1] = path
                    file_settings_list[-1] = saved_configs
                else:
                    file_list.append(path)
                    file_settings_list.append(saved_configs)
            gen["last_was_audio"] = audio_only

        # Clean up the file that was replaced, unless it was overwritten in place.
        if previous_path is not None and previous_path != path:
            if metadata_choice == "json":
                previous_json_path = _sidecar_json_path(previous_path)
                # When the old and new outputs share a stem (only the extension
                # differs) they also share one sidecar, which was just rewritten
                # above for the new file -- deleting it would lose that metadata.
                if previous_json_path != new_json_path and os.path.isfile(previous_json_path):
                    os.remove(previous_json_path)
            if os.path.isfile(previous_path):
                os.remove(previous_path)