Spaces:
Paused
Paused
| """ | |
| Metadata utilities for FramePack Studio. | |
| This module provides functions for generating and saving metadata. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import traceback # Moved to top | |
| import numpy as np # Added | |
| from PIL import Image, ImageDraw, ImageFont | |
| from PIL.PngImagePlugin import PngInfo | |
| from modules.version import APP_VERSION | |
| def get_placeholder_color(model_type): | |
| """ | |
| Get the placeholder image color for a specific model type. | |
| Args: | |
| model_type: The model type string | |
| Returns: | |
| RGB tuple for the placeholder image color | |
| """ | |
| # Define color mapping for different model types | |
| color_map = { | |
| "Original": (0, 0, 0), # Black | |
| "F1": (0, 0, 128), # Blue | |
| "Video": (0, 128, 0), # Green | |
| "XY Plot": (128, 128, 0), # Yellow | |
| "F1 with Endframe": (0, 128, 128), # Teal | |
| "Original with Endframe": (128, 0, 128), # Purple | |
| } | |
| # Return the color for the model type, or black as default | |
| return color_map.get(model_type, (0, 0, 0)) | |
| # Function to save the starting image with comprehensive metadata | |
| def save_job_start_image(job_params, job_id, settings): | |
| """ | |
| Saves the job's starting input image to the output directory with comprehensive metadata. | |
| This is intended to be called early in the job processing and is the ONLY place metadata should be saved. | |
| """ | |
| # Get output directory from settings or job_params | |
| output_dir_path = job_params.get("output_dir") or settings.get("output_dir") | |
| metadata_dir_path = job_params.get("metadata_dir") or settings.get("metadata_dir") | |
| if not output_dir_path: | |
| print(f"[JOB_START_IMG_ERROR] No output directory found in job_params or settings") | |
| return False | |
| # Ensure directories exist | |
| os.makedirs(output_dir_path, exist_ok=True) | |
| os.makedirs(metadata_dir_path, exist_ok=True) | |
| actual_start_image_target_path = os.path.join(output_dir_path, f'{job_id}.png') | |
| actual_input_image_np = job_params.get('input_image') | |
| # Create comprehensive metadata dictionary | |
| metadata_dict = create_metadata(job_params, job_id, settings) | |
| # Save JSON metadata with the same job_id | |
| json_metadata_path = os.path.join(metadata_dir_path, f'{job_id}.json') | |
| try: | |
| with open(json_metadata_path, 'w') as f: | |
| import json | |
| json.dump(metadata_dict, f, indent=2) | |
| except Exception as e: | |
| traceback.print_exc() | |
| # Save the input image if it's a numpy array | |
| if actual_input_image_np is not None and isinstance(actual_input_image_np, np.ndarray): | |
| try: | |
| # Create PNG metadata | |
| png_metadata = PngInfo() | |
| png_metadata.add_text("prompt", job_params.get('prompt_text', '')) | |
| png_metadata.add_text("seed", str(job_params.get('seed', 0))) | |
| png_metadata.add_text("model_type", job_params.get('model_type', "Unknown")) | |
| # Add more metadata fields | |
| for key, value in metadata_dict.items(): | |
| if isinstance(value, (str, int, float, bool)) or value is None: | |
| png_metadata.add_text(key, str(value)) | |
| # Convert image if needed | |
| image_to_save_np = actual_input_image_np | |
| if actual_input_image_np.dtype != np.uint8: | |
| if actual_input_image_np.max() <= 1.0 and actual_input_image_np.min() >= -1.0 and actual_input_image_np.dtype in [np.float32, np.float64]: | |
| image_to_save_np = ((actual_input_image_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8) | |
| elif actual_input_image_np.max() <= 1.0 and actual_input_image_np.min() >= 0.0 and actual_input_image_np.dtype in [np.float32, np.float64]: | |
| image_to_save_np = (actual_input_image_np * 255.0).clip(0,255).astype(np.uint8) | |
| else: | |
| image_to_save_np = actual_input_image_np.clip(0, 255).astype(np.uint8) | |
| # Save the image with metadata | |
| start_image_pil = Image.fromarray(image_to_save_np) | |
| start_image_pil.save(actual_start_image_target_path, pnginfo=png_metadata) | |
| return True # Indicate success | |
| except Exception as e: | |
| traceback.print_exc() | |
| return False # Indicate failure or inability to save | |
| def create_metadata(job_params, job_id, settings, save_placeholder=False): | |
| """ | |
| Create metadata for the job. | |
| Args: | |
| job_params: Dictionary of job parameters | |
| job_id: The job ID | |
| settings: Dictionary of settings | |
| save_placeholder: Whether to save the placeholder image (default: False) | |
| Returns: | |
| Metadata dictionary | |
| """ | |
| if not settings.get("save_metadata"): | |
| return None | |
| metadata_dir_path = settings.get("metadata_dir") | |
| output_dir_path = settings.get("output_dir") | |
| os.makedirs(metadata_dir_path, exist_ok=True) | |
| os.makedirs(output_dir_path, exist_ok=True) # Ensure output_dir also exists | |
| # Get model type and determine placeholder image color | |
| model_type = job_params.get('model_type', "Original") | |
| placeholder_color = get_placeholder_color(model_type) | |
| # Create a placeholder image | |
| height = job_params.get('height', 640) | |
| width = job_params.get('width', 640) | |
| # Use resolutionH and resolutionW if height and width are not available | |
| if not height: | |
| height = job_params.get('resolutionH', 640) | |
| if not width: | |
| width = job_params.get('resolutionW', 640) | |
| placeholder_img = Image.new('RGB', (width, height), placeholder_color) | |
| # Add XY plot parameters to the image if applicable | |
| if model_type == "XY Plot": | |
| x_param = job_params.get('x_param', '') | |
| y_param = job_params.get('y_param', '') | |
| x_values = job_params.get('x_values', []) | |
| y_values = job_params.get('y_values', []) | |
| draw = ImageDraw.Draw(placeholder_img) | |
| try: | |
| # Try to use a system font | |
| font = ImageFont.truetype("Arial", 20) | |
| except: | |
| # Fall back to default font | |
| font = ImageFont.load_default() | |
| text = f"X: {x_param} - {x_values}\nY: {y_param} - {y_values}" | |
| draw.text((10, 10), text, fill=(255, 255, 255), font=font) | |
| # Create PNG metadata | |
| metadata = PngInfo() | |
| metadata.add_text("prompt", job_params.get('prompt_text', '')) | |
| metadata.add_text("seed", str(job_params.get('seed', 0))) | |
| # Add model-specific metadata to PNG | |
| if model_type == "XY Plot": | |
| metadata.add_text("x_param", job_params.get('x_param', '')) | |
| metadata.add_text("y_param", job_params.get('y_param', '')) | |
| # Determine end_frame_used value safely (avoiding NumPy array boolean ambiguity) | |
| end_frame_image = job_params.get('end_frame_image') | |
| end_frame_used = False | |
| if end_frame_image is not None: | |
| if isinstance(end_frame_image, np.ndarray): | |
| end_frame_used = end_frame_image.any() # True if any element is non-zero | |
| else: | |
| end_frame_used = True | |
| # Create comprehensive JSON metadata with all possible parameters | |
| # This is created before file saving logic that might use it (e.g. JSON dump) | |
| # but PngInfo 'metadata' is used for images. | |
| metadata_dict = { | |
| # Version information | |
| "app_version": APP_VERSION, # Using numeric version without 'v' prefix for metadata | |
| # Common parameters | |
| "prompt": job_params.get('prompt_text', ''), | |
| "negative_prompt": job_params.get('n_prompt', ''), | |
| "seed": job_params.get('seed', 0), | |
| "steps": job_params.get('steps', 25), | |
| "cfg": job_params.get('cfg', 1.0), | |
| "gs": job_params.get('gs', 10.0), | |
| "rs": job_params.get('rs', 0.0), | |
| "latent_type": job_params.get('latent_type', 'Black'), | |
| "timestamp": time.time(), | |
| "resolutionW": job_params.get('resolutionW', 640), | |
| "resolutionH": job_params.get('resolutionH', 640), | |
| "model_type": model_type, | |
| "generation_type": job_params.get('generation_type', model_type), | |
| "has_input_image": job_params.get('has_input_image', False), | |
| "input_image_path": job_params.get('input_image_path', None), | |
| # Video-related parameters | |
| "total_second_length": job_params.get('total_second_length', 6), | |
| "blend_sections": job_params.get('blend_sections', 4), | |
| "latent_window_size": job_params.get('latent_window_size', 9), | |
| "num_cleaned_frames": job_params.get('num_cleaned_frames', 5), | |
| # Endframe-related parameters | |
| "end_frame_strength": job_params.get('end_frame_strength', None), | |
| "end_frame_image_path": job_params.get('end_frame_image_path', None), | |
| "end_frame_used": str(end_frame_used), | |
| # Video input-related parameters | |
| "input_video": os.path.basename(job_params.get('input_image', '')) if job_params.get('input_image') is not None and model_type == "Video" else None, | |
| "video_path": job_params.get('input_image') if model_type == "Video" else None, | |
| # XY Plot-related parameters | |
| "x_param": job_params.get('x_param', None), | |
| "y_param": job_params.get('y_param', None), | |
| "x_values": job_params.get('x_values', None), | |
| "y_values": job_params.get('y_values', None), | |
| # Combine with source video | |
| "combine_with_source": job_params.get('combine_with_source', False), | |
| # Tea cache parameters | |
| "use_teacache": job_params.get('use_teacache', False), | |
| "teacache_num_steps": job_params.get('teacache_num_steps', 0), | |
| "teacache_rel_l1_thresh": job_params.get('teacache_rel_l1_thresh', 0.0), | |
| # MagCache parameters | |
| "use_magcache": job_params.get('use_magcache', False), | |
| "magcache_threshold": job_params.get('magcache_threshold', 0.1), | |
| "magcache_max_consecutive_skips": job_params.get('magcache_max_consecutive_skips', 2), | |
| "magcache_retention_ratio": job_params.get('magcache_retention_ratio', 0.25), | |
| } | |
| # Add LoRA information if present | |
| selected_loras = job_params.get('selected_loras', []) | |
| lora_values = job_params.get('lora_values', []) | |
| lora_loaded_names = job_params.get('lora_loaded_names', []) | |
| if isinstance(selected_loras, list) and len(selected_loras) > 0: | |
| lora_data = {} | |
| for lora_name in selected_loras: | |
| try: | |
| idx = lora_loaded_names.index(lora_name) | |
| # Fix for NumPy array boolean ambiguity | |
| has_lora_values = lora_values is not None and len(lora_values) > 0 | |
| weight = lora_values[idx] if has_lora_values and idx < len(lora_values) else 1.0 | |
| # Handle different types of weight values | |
| if isinstance(weight, np.ndarray): | |
| # Convert NumPy array to a scalar value | |
| weight_value = float(weight.item()) if weight.size == 1 else float(weight.mean()) | |
| elif isinstance(weight, list): | |
| # Handle list type weights | |
| has_items = weight is not None and len(weight) > 0 | |
| weight_value = float(weight[0]) if has_items else 1.0 | |
| else: | |
| # Handle scalar weights | |
| weight_value = float(weight) if weight is not None else 1.0 | |
| lora_data[lora_name] = weight_value | |
| except ValueError: | |
| lora_data[lora_name] = 1.0 | |
| except Exception as e: | |
| lora_data[lora_name] = 1.0 | |
| traceback.print_exc() | |
| metadata_dict["loras"] = lora_data | |
| else: | |
| metadata_dict["loras"] = {} | |
| # This function now only creates the metadata dictionary without saving files | |
| # The actual saving is done by save_job_start_image() at the beginning of the generation process | |
| # This prevents duplicate metadata files from being created | |
| # For backward compatibility, we still create the placeholder image | |
| # and save it if explicitly requested | |
| placeholder_target_path = os.path.join(metadata_dir_path, f'{job_id}.png') | |
| # Save the placeholder image if requested | |
| if save_placeholder: | |
| try: | |
| placeholder_img.save(placeholder_target_path, pnginfo=metadata) | |
| except Exception as e: | |
| traceback.print_exc() | |
| return metadata_dict | |
| def save_last_video_frame(job_params, job_id, settings, last_frame_np): | |
| """ | |
| Saves the last frame of the input video to the output directory with metadata. | |
| """ | |
| output_dir_path = job_params.get("output_dir") or settings.get("output_dir") | |
| if not output_dir_path: | |
| print(f"[SAVE_LAST_FRAME_ERROR] No output directory found.") | |
| return False | |
| os.makedirs(output_dir_path, exist_ok=True) | |
| last_frame_path = os.path.join(output_dir_path, f'{job_id}.png') | |
| metadata_dict = create_metadata(job_params, job_id, settings) | |
| if last_frame_np is not None and isinstance(last_frame_np, np.ndarray): | |
| try: | |
| png_metadata = PngInfo() | |
| for key, value in metadata_dict.items(): | |
| if isinstance(value, (str, int, float, bool)) or value is None: | |
| png_metadata.add_text(key, str(value)) | |
| image_to_save_np = last_frame_np | |
| if last_frame_np.dtype != np.uint8: | |
| if last_frame_np.max() <= 1.0 and last_frame_np.min() >= -1.0 and last_frame_np.dtype in [np.float32, np.float64]: | |
| image_to_save_np = ((last_frame_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8) | |
| elif last_frame_np.max() <= 1.0 and last_frame_np.min() >= 0.0 and last_frame_np.dtype in [np.float32, np.float64]: | |
| image_to_save_np = (last_frame_np * 255.0).clip(0,255).astype(np.uint8) | |
| else: | |
| image_to_save_np = last_frame_np.clip(0, 255).astype(np.uint8) | |
| last_frame_pil = Image.fromarray(image_to_save_np) | |
| last_frame_pil.save(last_frame_path, pnginfo=png_metadata) | |
| print(f"Saved last video frame for job {job_id} to {last_frame_path}") | |
| return True | |
| except Exception as e: | |
| traceback.print_exc() | |
| return False | |