Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import gc | |
| import re | |
| import cv2 | |
| import time | |
| import zipfile | |
| import tempfile | |
| import traceback | |
| import numpy as np | |
| import gradio as gr | |
| import imgutils.detect.person as person_detector | |
| import imgutils.detect.halfbody as halfbody_detector | |
| import imgutils.detect.head as head_detector | |
| import imgutils.detect.face as face_detector | |
| import imgutils.metrics.ccip as ccip_analyzer | |
| import imgutils.metrics.dbaesthetic as dbaesthetic_analyzer | |
| import imgutils.metrics.lpips as lpips_module | |
| from PIL import Image | |
| from typing import List, Tuple, Dict, Any, Union, Optional, Iterator | |
| # --- Constants for File Types --- | |
| IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff', '.tif', '.gif') | |
| VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.webm', '.mpeg', '.mpg') | |
| # --- Helper Functions --- | |
| def sanitize_filename(filename: str, max_len: int = 50) -> str: | |
| """Removes invalid characters and shortens a filename for safe use.""" | |
| # Remove path components | |
| base_name = os.path.basename(filename) | |
| # Remove extension | |
| name_part, _ = os.path.splitext(base_name) | |
| # Replace spaces and problematic characters with underscores | |
| sanitized = re.sub(r'[\\/*?:"<>|\s]+', '_', name_part) | |
| # Remove leading/trailing underscores/periods | |
| sanitized = sanitized.strip('._') | |
| # Limit length (important for temp paths and OS limits) | |
| sanitized = sanitized[:max_len] | |
| # Ensure it's not empty after sanitization | |
| if not sanitized: | |
| return "file" | |
| return sanitized | |
| def convert_to_pil(frame: np.ndarray) -> Image.Image: | |
| """Converts an OpenCV frame (BGR) to a PIL Image (RGB).""" | |
| # Add error handling for potentially empty frames | |
| if frame is None or frame.size == 0: | |
| raise ValueError("Cannot convert empty frame to PIL Image") | |
| try: | |
| return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
| except Exception as e: | |
| # Re-raise with more context if conversion fails | |
| raise RuntimeError(f"Failed to convert frame to PIL Image: {e}") | |
| def image_to_bytes(img: Image.Image, format: str = 'PNG') -> bytes: | |
| """Converts a PIL Image to bytes.""" | |
| if img is None: | |
| raise ValueError("Cannot convert None image to bytes") | |
| byte_arr = io.BytesIO() | |
| img.save(byte_arr, format=format) | |
| return byte_arr.getvalue() | |
| def create_zip_file(image_data: Dict[str, bytes], output_path: str) -> None: | |
| """ | |
| Creates a zip file containing the provided images directly at the output_path. | |
| Args: | |
| image_data: A dictionary where keys are filenames (including paths within zip) | |
| and values are image bytes. | |
| output_path: The full path where the zip file should be created. | |
| """ | |
| if not image_data: | |
| raise ValueError("No image data provided to create zip file.") | |
| if not output_path: | |
| raise ValueError("No output path provided for the zip file.") | |
| print(f"Creating zip file at: {output_path}") | |
| try: | |
| # Ensure parent directory exists (useful if output_path is nested) | |
| # Though NamedTemporaryFile usually handles this for its own path. | |
| parent_dir = os.path.dirname(output_path) | |
| if parent_dir: # Check if there is a parent directory component | |
| os.makedirs(parent_dir, exist_ok=True) | |
| with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
| # Sort items for potentially better organization and predictability | |
| for filename, img_bytes in sorted(image_data.items()): | |
| zipf.writestr(filename, img_bytes) | |
| print(f"Successfully created zip file with {len(image_data)} items at {output_path}.") | |
| # No return value needed as we are writing to a path | |
| except Exception as e: | |
| print(f"Error creating zip file at {output_path}: {e}") | |
| # If zip creation fails, attempt to remove the partially created file | |
| if os.path.exists(output_path): | |
| try: | |
| os.remove(output_path) | |
| print(f"Removed partially created/failed zip file: {output_path}") | |
| except OSError as remove_err: | |
| print(f"Warning: Could not remove failed zip file {output_path}: {remove_err}") | |
| raise # Re-raise the original exception | |
| def generate_filename( | |
| base_name: str, # Should be the core identifier, e.g., "frame_X_person_Y_scoreZ" | |
| aesthetic_label: Optional[str] = None, | |
| ccip_cluster_id_for_lpips_logic: Optional[int] = None, # Original CCIP ID, used to decide if LPIPS is sub-cluster | |
| ccip_folder_naming_index: Optional[int] = None, # The new 000, 001, ... index based on image count | |
| source_prefix_for_ccip_folder: Optional[str] = None, # The source filename prefix for CCIP folder | |
| lpips_folder_naming_index: Optional[Union[int, str]] = None, # New: Can be int (0,1,2...) or "noise" | |
| file_extension: str = '.png', | |
| # Suffix flags for this specific image: | |
| is_halfbody_primary_target_type: bool = False, # If this image itself was a halfbody primary target | |
| is_derived_head_crop: bool = False, | |
| is_derived_face_crop: bool = False, | |
| ) -> str: | |
| """ | |
| Generates the final filename, incorporating aesthetic label, cluster directory, | |
| and crop indicators. CCIP and LPIPS folder names are sorted by image count. | |
| """ | |
| filename_stem = base_name | |
| # Add suffixes for derived crops. | |
| # For halfbody primary targets, the base_name should already contain "halfbody". | |
| # This flag is more for potentially adding a suffix if desired, but currently not used to add a suffix. | |
| # if is_halfbody_primary_target_type: | |
| # filename_stem += "_halfbody" # Potentially redundant if base_name good. | |
| if is_derived_head_crop: | |
| filename_stem += "_headCrop" | |
| if is_derived_face_crop: | |
| filename_stem += "_faceCrop" | |
| filename_with_extension = filename_stem + file_extension | |
| path_parts = [] | |
| # New CCIP folder naming based on source prefix and sorted index | |
| if ccip_folder_naming_index is not None and source_prefix_for_ccip_folder is not None: | |
| path_parts.append(f"{source_prefix_for_ccip_folder}_ccip_{ccip_folder_naming_index:03d}") | |
| # LPIPS folder naming based on the new sorted index or "noise" | |
| if lpips_folder_naming_index is not None: | |
| lpips_folder_name_part_str: Optional[str] = None | |
| if isinstance(lpips_folder_naming_index, str) and lpips_folder_naming_index == "noise": | |
| lpips_folder_name_part_str = "noise" | |
| elif isinstance(lpips_folder_naming_index, int): | |
| lpips_folder_name_part_str = f"{lpips_folder_naming_index:03d}" | |
| if lpips_folder_name_part_str is not None: | |
| # Determine prefix based on whether the item was originally in a CCIP cluster | |
| if ccip_cluster_id_for_lpips_logic is not None: # LPIPS is sub-cluster if item had an original CCIP ID | |
| lpips_folder_name_base = "lpips_sub_" | |
| else: # No CCIP, LPIPS is primary | |
| lpips_folder_name_base = "lpips_" | |
| path_parts.append(f"{lpips_folder_name_base}{lpips_folder_name_part_str}") | |
| final_filename_part = filename_with_extension | |
| if aesthetic_label: | |
| final_filename_part = f"{aesthetic_label}_{filename_with_extension}" | |
| if path_parts: | |
| return f"{'/'.join(path_parts)}/{final_filename_part}" | |
| else: | |
| return final_filename_part | |
| # --- Core Processing Function for a single source (video or image sequence) --- | |
| def _process_input_source_frames( | |
| source_file_prefix: str, # Sanitized name for this source (e.g., "myvideo" or "ImageGroup123") | |
| # Iterator yielding: (PIL.Image, frame_identifier_string, current_item_index, total_items_for_desc) | |
| # For videos, current_item_index is the 1-based raw frame number. | |
| # For images, current_item_index is the 1-based image number in the sequence. | |
| frames_provider: Iterator[Tuple[Image.Image, int, int, int]], | |
| is_video_source: bool, # To adjust some logging/stats messages | |
| # Person Detection | |
| enable_person_detection: bool, | |
| min_target_width_person_percentage: float, | |
| person_model_name: str, | |
| person_conf_threshold: float, | |
| person_iou_threshold: float, | |
| # Half-Body Detection | |
| enable_halfbody_detection: bool, | |
| enable_halfbody_cropping: bool, | |
| min_target_width_halfbody_percentage: float, | |
| halfbody_model_name: str, | |
| halfbody_conf_threshold: float, | |
| halfbody_iou_threshold: float, | |
| # Head Detection | |
| enable_head_detection: bool, | |
| enable_head_cropping: bool, | |
| min_crop_width_head_percentage: float, | |
| enable_head_filtering: bool, | |
| head_model_name: str, | |
| head_conf_threshold: float, | |
| head_iou_threshold: float, | |
| # Face Detection | |
| enable_face_detection: bool, | |
| enable_face_cropping: bool, | |
| min_crop_width_face_percentage: float, | |
| enable_face_filtering: bool, | |
| face_model_name: str, | |
| face_conf_threshold: float, | |
| face_iou_threshold: float, | |
| # CCIP Classification | |
| enable_ccip_classification: bool, | |
| ccip_model_name: str, | |
| ccip_threshold: float, | |
| # LPIPS Clustering | |
| enable_lpips_clustering: bool, | |
| lpips_threshold: float, | |
| # Aesthetic Analysis | |
| enable_aesthetic_analysis: bool, | |
| aesthetic_model_name: str, | |
| # Gradio Progress (specific to this source's processing) | |
| progress_updater # Function: (progress_value: float, desc: str) -> None | |
| ) -> Tuple[str | None, str]: | |
| """ | |
| Processes frames from a given source (video or image sequence) according to the specified parameters. | |
| Order: Person => Half-Body (alternative) => Face Detection => Head Detection => CCIP => Aesthetic. | |
| Returns: | |
| A tuple containing: | |
| - Path to the output zip file (or None if error). | |
| - Status message string. | |
| """ | |
| # This list will hold data for images that pass all filters, BEFORE LPIPS and final zipping | |
| images_pending_final_processing: List[Dict[str, Any]] = [] | |
| # CCIP specific data | |
| ccip_clusters_info: List[Tuple[int, np.ndarray]] = [] | |
| next_ccip_cluster_id = 0 | |
| # Stats | |
| processed_items_count = 0 | |
| total_persons_detected_raw, total_halfbodies_detected_raw = 0, 0 | |
| person_targets_processed_count, halfbody_targets_processed_count, fullframe_targets_processed_count = 0, 0, 0 | |
| total_faces_detected_on_targets, total_heads_detected_on_targets = 0, 0 | |
| # These count items added to images_pending_final_processing | |
| main_targets_pending_count, face_crops_pending_count, head_crops_pending_count = 0, 0, 0 | |
| items_filtered_by_face_count, items_filtered_by_head_count = 0, 0 | |
| ccip_applied_count, aesthetic_applied_count = 0, 0 | |
| # LPIPS stats | |
| lpips_images_subject_to_clustering, total_lpips_clusters_created, total_lpips_noise_samples = 0, 0, 0 | |
| gc_interval = 100 # items from provider | |
| start_time = time.time() | |
| # Progress update for initializing this specific video | |
| progress_updater(0, desc=f"Initializing {source_file_prefix}...") | |
| output_zip_path_temp = None | |
| output_zip_path_final = None | |
| try: | |
| # --- Main Loop for processing items from the frames_provider --- | |
| for pil_image_full_frame, frame_specific_index, current_item_index, total_items_for_desc in frames_provider: | |
| progress_value_for_updater = (current_item_index) / total_items_for_desc if total_items_for_desc > 0 else 1.0 | |
| # The description string should reflect what current_item_index means | |
| item_description = "" | |
| if is_video_source: | |
| # For video, total_items_in_source_for_description is total raw frames. | |
| # current_item_index is the raw frame index of the *sampled* frame. | |
| # We also need a counter for *sampled* frames for a "processed X of Y (sampled)" message. | |
| # processed_items_count counts sampled frames. | |
| item_description = f"Scanning frame {current_item_index}/{total_items_for_desc} (processed {processed_items_count + 1} sampled)" | |
| else: # For images | |
| item_description = f"image {current_item_index}/{total_items_for_desc}" | |
| progress_updater( | |
| min(progress_value_for_updater, 1.0), # Cap progress at 1.0 | |
| desc=f"Processing {item_description} for {source_file_prefix}" | |
| ) | |
| # processed_items_count still counts how many items are yielded by the provider | |
| # (i.e., how many sampled frames for video, or how many images for image sequence) | |
| processed_items_count += 1 | |
| try: | |
| full_frame_width = pil_image_full_frame.width # Store for percentage calculations | |
| print(f"--- Processing item ID {frame_specific_index} (Width: {full_frame_width}px) for {source_file_prefix} ---") | |
| # List to hold PIL images that are the primary subjects for this frame | |
| # Each element: {'pil': Image, 'base_name': str, 'source_type': 'person'/'halfbody'/'fullframe'} | |
| primary_targets_for_frame: List[Dict[str, Any]] = [] | |
| processed_primary_source_this_frame = False # Flag if Person or HalfBody yielded targets | |
| # --- 1. Person Detection --- | |
| if enable_person_detection and full_frame_width > 0: | |
| print(" Attempting Person Detection...") | |
| min_person_target_px_width = full_frame_width * min_target_width_person_percentage | |
| person_detections = person_detector.detect_person( | |
| pil_image_full_frame, model_name=person_model_name, | |
| conf_threshold=person_conf_threshold, iou_threshold=person_iou_threshold | |
| ) | |
| total_persons_detected_raw += len(person_detections) | |
| if person_detections: | |
| print(f" Detected {len(person_detections)} raw persons.") | |
| valid_person_targets = 0 | |
| for i, (bbox, _, score) in enumerate(person_detections): | |
| # Check width before full crop for minor optimization | |
| detected_person_width = bbox[2] - bbox[0] | |
| if detected_person_width >= min_person_target_px_width: | |
| primary_targets_for_frame.append({ | |
| 'pil': pil_image_full_frame.crop(bbox), | |
| 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_person_{i}_score{int(score*100)}", | |
| 'source_type': 'person'}) | |
| person_targets_processed_count +=1 | |
| valid_person_targets +=1 | |
| else: | |
| print(f" Person {i} width {detected_person_width}px < min {min_person_target_px_width:.0f}px. Skipping.") | |
| if valid_person_targets > 0: | |
| processed_primary_source_this_frame = True | |
| print(f" Added {valid_person_targets} persons as primary targets.") | |
| # --- 2. Half-Body Detection (if Person not processed and HBD enabled) --- | |
| if not processed_primary_source_this_frame and enable_halfbody_detection and full_frame_width > 0: | |
| print(" Attempting Half-Body Detection (on full item)...") | |
| min_halfbody_target_px_width = full_frame_width * min_target_width_halfbody_percentage | |
| halfbody_detections = halfbody_detector.detect_halfbody( | |
| pil_image_full_frame, model_name=halfbody_model_name, | |
| conf_threshold=halfbody_conf_threshold, iou_threshold=halfbody_iou_threshold | |
| ) | |
| total_halfbodies_detected_raw += len(halfbody_detections) | |
| if halfbody_detections: | |
| print(f" Detected {len(halfbody_detections)} raw half-bodies.") | |
| valid_halfbody_targets = 0 | |
| for i, (bbox, _, score) in enumerate(halfbody_detections): | |
| detected_hb_width = bbox[2] - bbox[0] | |
| # Cropping must be enabled and width must be sufficient for it to be a target | |
| if enable_halfbody_cropping and detected_hb_width >= min_halfbody_target_px_width: | |
| primary_targets_for_frame.append({ | |
| 'pil': pil_image_full_frame.crop(bbox), | |
| 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_halfbody_{i}_score{int(score*100)}", | |
| 'source_type': 'halfbody'}) | |
| halfbody_targets_processed_count +=1 | |
| valid_halfbody_targets +=1 | |
| elif enable_halfbody_cropping: | |
| print(f" Half-body {i} width {detected_hb_width}px < min {min_halfbody_target_px_width:.0f}px. Skipping.") | |
| if valid_halfbody_targets > 0: | |
| processed_primary_source_this_frame = True | |
| print(f" Added {valid_halfbody_targets} half-bodies as primary targets.") | |
| # --- 3. Full Frame/Image (fallback) --- | |
| if not processed_primary_source_this_frame: | |
| print(" Processing Full Item as primary target.") | |
| primary_targets_for_frame.append({ | |
| 'pil': pil_image_full_frame.copy(), | |
| 'base_name': f"{source_file_prefix}_item_{frame_specific_index}_full", | |
| 'source_type': 'fullframe'}) | |
| fullframe_targets_processed_count += 1 | |
| # --- Process each identified primary_target_for_frame --- | |
| for target_data in primary_targets_for_frame: | |
| current_pil: Image.Image = target_data['pil'] | |
| current_base_name: str = target_data['base_name'] # Base name for this main target | |
| current_source_type: str = target_data['source_type'] | |
| current_pil_width = current_pil.width # For sub-crop percentage calculations | |
| print(f" Processing target: {current_base_name} (type: {current_source_type}, width: {current_pil_width}px)") | |
| # Store PILs of successful crops from current_pil for this target | |
| keep_this_target = True | |
| item_area = current_pil_width * current_pil.height | |
| potential_face_crops_pil: List[Image.Image] = [] | |
| potential_head_crops_pil: List[Image.Image] = [] | |
| # --- A. Face Detection --- | |
| if keep_this_target and enable_face_detection and current_pil_width > 0: | |
| print(f" Detecting faces in {current_base_name}...") | |
| min_face_crop_px_width = current_pil_width * min_crop_width_face_percentage | |
| face_detections = face_detector.detect_faces( | |
| current_pil, model_name=face_model_name, | |
| conf_threshold=face_conf_threshold, iou_threshold=face_iou_threshold | |
| ) | |
| total_faces_detected_on_targets += len(face_detections) | |
| if not face_detections and enable_face_filtering: | |
| keep_this_target = False | |
| items_filtered_by_face_count += 1 | |
| print(f" FILTERING TARGET {current_base_name} (no face).") | |
| elif face_detections and enable_face_cropping: | |
| for f_idx, (f_bbox, _, _) in enumerate(face_detections): | |
| if (f_bbox[2]-f_bbox[0]) >= min_face_crop_px_width: | |
| potential_face_crops_pil.append(current_pil.crop(f_bbox)) | |
| else: | |
| print(f" Face {f_idx} too small. Skipping crop.") | |
| # --- B. Head Detection --- | |
| if keep_this_target and enable_head_detection and current_pil_width > 0: | |
| print(f" Detecting heads in {current_base_name}...") | |
| min_head_crop_px_width = current_pil_width * min_crop_width_head_percentage | |
| head_detections = head_detector.detect_heads( | |
| current_pil, model_name=head_model_name, | |
| conf_threshold=head_conf_threshold, iou_threshold=head_iou_threshold | |
| ) | |
| total_heads_detected_on_targets += len(head_detections) | |
| if not head_detections and enable_head_filtering: | |
| keep_this_target = False | |
| items_filtered_by_head_count += 1 | |
| print(f" FILTERING TARGET {current_base_name} (no head).") | |
| potential_face_crops_pil.clear() # Clear faces if head filter removed target | |
| elif head_detections and enable_head_cropping: | |
| for h_idx, (h_bbox, _, _) in enumerate(head_detections): | |
| h_w = h_bbox[2]-h_bbox[0] # h_h = h_bbox[3]-h_bbox[1] | |
| if h_w >= min_head_crop_px_width and item_area > 0: | |
| potential_head_crops_pil.append(current_pil.crop(h_bbox)) | |
| else: | |
| print(f" Head {h_idx} too small or too large relative to parent. Skipping crop.") | |
| # --- If target is filtered, clean up and skip to next target --- | |
| if not keep_this_target: | |
| print(f" Target {current_base_name} was filtered by face/head presence rules. Discarding it and its potential crops.") | |
| if current_pil is not None: | |
| del current_pil | |
| potential_face_crops_pil.clear() | |
| potential_head_crops_pil.clear() | |
| continue # To the next primary_target_for_frame | |
| # --- C. CCIP Classification (on current_pil, if it's kept) --- | |
| assigned_ccip_id = None # This is the original CCIP ID | |
| if enable_ccip_classification: | |
| print(f" Classifying {current_base_name} with CCIP...") | |
| try: | |
| feature = ccip_analyzer.ccip_extract_feature(current_pil, model=ccip_model_name) | |
| best_match_cid = None | |
| min_diff = float('inf') | |
| # Find the best potential match among existing clusters | |
| if ccip_clusters_info: # Only loop if there are clusters to compare against | |
| for cid, rep_f in ccip_clusters_info: | |
| diff = ccip_analyzer.ccip_difference(feature, rep_f, model=ccip_model_name) | |
| if diff < min_diff: | |
| min_diff = diff | |
| best_match_cid = cid | |
| # Decide whether to use the best match or create a new cluster | |
| if best_match_cid is not None and min_diff < ccip_threshold: | |
| assigned_ccip_id = best_match_cid | |
| print(f" -> Matched Cluster {assigned_ccip_id} (Diff: {min_diff:.6f} <= Threshold {ccip_threshold:.3f})") | |
| else: | |
| # No suitable match found (either no clusters existed, or the best match's diff was strictly greater than threshold) | |
| # Create a new cluster | |
| assigned_ccip_id = next_ccip_cluster_id | |
| ccip_clusters_info.append((assigned_ccip_id, feature)) | |
| if not ccip_clusters_info or len(ccip_clusters_info) == 1: | |
| print(f" -> New Cluster {assigned_ccip_id} (First item or no prior suitable clusters)") | |
| else: | |
| # MODIFIED: Log message reflecting that new cluster is formed if diff > threshold | |
| print(f" -> New Cluster {assigned_ccip_id} (Min diff to others: {min_diff:.6f} > Threshold {ccip_threshold:.3f})") | |
| next_ccip_cluster_id += 1 | |
| print(f" CCIP: Target {current_base_name} -> Original Cluster ID {assigned_ccip_id}") | |
| del feature | |
| ccip_applied_count += 1 | |
| except Exception as e_ccip: | |
| print(f" Error CCIP: {e_ccip}") | |
| # --- D. Aesthetic Analysis (on current_pil, if it's kept) --- | |
| item_aesthetic_label = None | |
| if enable_aesthetic_analysis: | |
| print(f" Analyzing {current_base_name} for aesthetics...") | |
| try: | |
| res = dbaesthetic_analyzer.anime_dbaesthetic(current_pil, model_name=aesthetic_model_name) | |
| if isinstance(res, tuple) and len(res) >= 1: | |
| item_aesthetic_label = res[0] | |
| print(f" Aesthetic: Target {current_base_name} -> {item_aesthetic_label}") | |
| aesthetic_applied_count += 1 | |
| except Exception as e_aes: | |
| print(f" Error Aesthetic: {e_aes}") | |
| add_current_pil_to_pending_list = True | |
| if current_source_type == 'fullframe': | |
| can_skip_fullframe_target = False | |
| if enable_face_detection or enable_head_detection: | |
| found_valid_sub_crop_from_enabled_detector = False | |
| if enable_face_detection and len(potential_face_crops_pil) > 0: | |
| found_valid_sub_crop_from_enabled_detector = True | |
| if not found_valid_sub_crop_from_enabled_detector and \ | |
| enable_head_detection and len(potential_head_crops_pil) > 0: | |
| found_valid_sub_crop_from_enabled_detector = True | |
| if not found_valid_sub_crop_from_enabled_detector: # No valid crops from any enabled sub-detector | |
| can_skip_fullframe_target = True # All enabled sub-detectors failed | |
| if can_skip_fullframe_target: | |
| add_current_pil_to_pending_list = False | |
| print(f" Skipping save of fullframe target '{current_base_name}' because all enabled sub-detectors (Face/Head) yielded no valid-width crops.") | |
| if add_current_pil_to_pending_list: | |
| # --- E. Save current_pil (if it passed all filters) --- | |
| # Add main target to pending list | |
| images_pending_final_processing.append({ | |
| 'pil_image': current_pil.copy(), 'base_name_for_filename': current_base_name, | |
| 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, | |
| 'is_halfbody_primary_target_type': (current_source_type == 'halfbody'), | |
| 'is_derived_head_crop': False, 'is_derived_face_crop': False, | |
| 'lpips_cluster_id': None, # Will be filled by LPIPS clustering | |
| 'lpips_folder_naming_index': None # Will be filled by LPIPS renaming | |
| }) | |
| main_targets_pending_count +=1 | |
| # --- F. Save Face Crops (derived from current_pil) --- | |
| for i, fc_pil in enumerate(potential_face_crops_pil): | |
| images_pending_final_processing.append({ | |
| 'pil_image': fc_pil, 'base_name_for_filename': f"{current_base_name}_face{i}", | |
| 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, | |
| 'is_halfbody_primary_target_type': False, | |
| 'is_derived_head_crop': False, 'is_derived_face_crop': True, | |
| 'lpips_cluster_id': None, | |
| 'lpips_folder_naming_index': None | |
| }) | |
| face_crops_pending_count+=1 | |
| potential_face_crops_pil.clear() | |
| # --- G. Save Head Crops (derived from current_pil) --- | |
| for i, hc_pil in enumerate(potential_head_crops_pil): | |
| images_pending_final_processing.append({ | |
| 'pil_image': hc_pil, 'base_name_for_filename': f"{current_base_name}_head{i}", | |
| 'ccip_cluster_id': assigned_ccip_id, 'aesthetic_label': item_aesthetic_label, | |
| 'is_halfbody_primary_target_type': False, | |
| 'is_derived_head_crop': True, 'is_derived_face_crop': False, | |
| 'lpips_cluster_id': None, | |
| 'lpips_folder_naming_index': None | |
| }) | |
| head_crops_pending_count+=1 | |
| potential_head_crops_pil.clear() | |
| if current_pil is not None: # Ensure current_pil exists before attempting to delete | |
| del current_pil # Clean up the PIL for this target_data | |
| primary_targets_for_frame.clear() | |
| except Exception as item_proc_err: | |
| print(f"!! Major Error processing item ID {frame_specific_index} for {source_file_prefix}: {item_proc_err}") | |
| traceback.print_exc() | |
| # Cleanup local vars for this item if error | |
| if 'primary_targets_for_frame' in locals(): | |
| primary_targets_for_frame.clear() | |
| # Also ensure current_pil from inner loop is cleaned up if error happened mid-loop | |
| if 'current_pil' in locals() and current_pil is not None: | |
| del current_pil | |
| if processed_items_count % gc_interval == 0: | |
| gc.collect() | |
| print(f" [GC triggered at {processed_items_count} items for {source_file_prefix}]") | |
| # --- End of Main Item Processing Loop --- | |
| print(f"\nRunning final GC before LPIPS/Zipping for {source_file_prefix}...") | |
| gc.collect() | |
| if not images_pending_final_processing: | |
| status_message = f"Processing for {source_file_prefix} finished, but no images were generated or passed filters for LPIPS/Zipping." | |
| print(status_message) | |
| return None, status_message | |
| # --- LPIPS Clustering Stage --- | |
| print(f"\n--- LPIPS Clustering Stage for {source_file_prefix} (Images pending: {len(images_pending_final_processing)}) ---") | |
| if enable_lpips_clustering: | |
| print(f" LPIPS Clustering enabled with threshold: {lpips_threshold}") | |
| lpips_images_subject_to_clustering = len(images_pending_final_processing) | |
| if enable_ccip_classification and next_ccip_cluster_id > 0: # CCIP was used | |
| print(" LPIPS clustering within CCIP clusters.") | |
| images_by_ccip: Dict[Optional[int], List[int]] = {} # ccip_id -> list of original indices | |
| for i, item_data in enumerate(images_pending_final_processing): | |
| ccip_id = item_data['ccip_cluster_id'] # Original CCIP ID | |
| if ccip_id not in images_by_ccip: | |
| images_by_ccip[ccip_id] = [] | |
| images_by_ccip[ccip_id].append(i) | |
| for ccip_id, indices_in_ccip_cluster in images_by_ccip.items(): | |
| pils_for_lpips_sub_cluster = [images_pending_final_processing[idx]['pil_image'] for idx in indices_in_ccip_cluster] | |
| if len(pils_for_lpips_sub_cluster) > 1: | |
| print(f" Clustering {len(pils_for_lpips_sub_cluster)} images in CCIP cluster {ccip_id}...") | |
| try: | |
| lpips_sub_ids = lpips_module.lpips_clustering(pils_for_lpips_sub_cluster, threshold=lpips_threshold) | |
| for i_sub, lpips_id in enumerate(lpips_sub_ids): | |
| original_idx = indices_in_ccip_cluster[i_sub] | |
| images_pending_final_processing[original_idx]['lpips_cluster_id'] = lpips_id | |
| except Exception as e_lpips_sub: | |
| print(f" Error LPIPS sub-cluster CCIP {ccip_id}: {e_lpips_sub}") | |
| elif len(pils_for_lpips_sub_cluster) == 1: | |
| images_pending_final_processing[indices_in_ccip_cluster[0]]['lpips_cluster_id'] = 0 # type: ignore | |
| del images_by_ccip | |
| if 'pils_for_lpips_sub_cluster' in locals(): | |
| del pils_for_lpips_sub_cluster # Ensure cleanup | |
| else: # LPIPS on all images globally | |
| print(" LPIPS clustering on all collected images.") | |
| all_pils_for_global_lpips = [item['pil_image'] for item in images_pending_final_processing] | |
| if len(all_pils_for_global_lpips) > 1: | |
| try: | |
| lpips_global_ids = lpips_module.lpips_clustering(all_pils_for_global_lpips, threshold=lpips_threshold) | |
| for i, lpips_id in enumerate(lpips_global_ids): | |
| images_pending_final_processing[i]['lpips_cluster_id'] = lpips_id | |
| except Exception as e_lpips_global: | |
| print(f" Error LPIPS global: {e_lpips_global}") | |
| elif len(all_pils_for_global_lpips) == 1: | |
| images_pending_final_processing[0]['lpips_cluster_id'] = 0 # type: ignore | |
| del all_pils_for_global_lpips | |
| # Calculate LPIPS stats | |
| all_final_lpips_ids = [item.get('lpips_cluster_id') for item in images_pending_final_processing if item.get('lpips_cluster_id') is not None] | |
| if all_final_lpips_ids: | |
| unique_lpips_clusters = set(filter(lambda x: x != -1, all_final_lpips_ids)) | |
| total_lpips_clusters_created = len(unique_lpips_clusters) | |
| total_lpips_noise_samples = sum(1 for x in all_final_lpips_ids if x == -1) | |
| else: | |
| print(" LPIPS Clustering disabled.") | |
| # --- CCIP Folder Renaming Logic --- | |
| original_ccip_id_to_new_naming_index: Dict[int, int] = {} | |
| if enable_ccip_classification: | |
| print(f" Preparing CCIP folder renaming for {source_file_prefix}...") | |
| ccip_image_counts: Dict[int, int] = {} # original_ccip_id -> count of images in it | |
| for item_data_for_count in images_pending_final_processing: | |
| original_ccip_id_val = item_data_for_count.get('ccip_cluster_id') | |
| if original_ccip_id_val is not None: | |
| ccip_image_counts[original_ccip_id_val] = ccip_image_counts.get(original_ccip_id_val, 0) + 1 | |
| if ccip_image_counts: | |
| # Sort original ccip_ids by their counts in descending order | |
| sorted_ccip_groups_by_count: List[Tuple[int, int]] = sorted( | |
| ccip_image_counts.items(), | |
| key=lambda item: item[1], # Sort by count | |
| reverse=True | |
| ) | |
| for new_idx, (original_id, count) in enumerate(sorted_ccip_groups_by_count): | |
| original_ccip_id_to_new_naming_index[original_id] = new_idx | |
| print(f" CCIP Remap for {source_file_prefix}: Original ID {original_id} (count: {count}) -> New Naming Index {new_idx:03d}") | |
| else: | |
| print(f" No CCIP-assigned images found for {source_file_prefix} to perform renaming.") | |
| # --- LPIPS Folder Renaming Logic --- | |
| if enable_lpips_clustering: | |
| print(f" Preparing LPIPS folder renaming for {source_file_prefix}...") | |
| # Initialize/Reset lpips_folder_naming_index for all items | |
| for item_data in images_pending_final_processing: | |
| item_data['lpips_folder_naming_index'] = None | |
| if enable_ccip_classification and next_ccip_cluster_id > 0: # LPIPS within CCIP | |
| print(f" LPIPS renaming within CCIP clusters for {source_file_prefix}.") | |
| items_grouped_by_original_ccip: Dict[Optional[int], List[Dict[str, Any]]] = {} | |
| for item_data in images_pending_final_processing: | |
| original_ccip_id = item_data.get('ccip_cluster_id') | |
| if original_ccip_id not in items_grouped_by_original_ccip: items_grouped_by_original_ccip[original_ccip_id] = [] | |
| items_grouped_by_original_ccip[original_ccip_id].append(item_data) | |
| for original_ccip_id, items_in_ccip in items_grouped_by_original_ccip.items(): | |
| lpips_counts_in_ccip: Dict[int, int] = {} # original_lpips_id (non-noise) -> count | |
| for item_data in items_in_ccip: | |
| lpips_id = item_data.get('lpips_cluster_id') | |
| if lpips_id is not None and lpips_id != -1: | |
| lpips_counts_in_ccip[lpips_id] = lpips_counts_in_ccip.get(lpips_id, 0) + 1 | |
| lpips_id_to_naming_in_ccip: Dict[int, Union[int, str]] = {} | |
| if lpips_counts_in_ccip: | |
| sorted_lpips = sorted(lpips_counts_in_ccip.items(), key=lambda x: x[1], reverse=True) | |
| for new_idx, (lpips_id, count) in enumerate(sorted_lpips): | |
| lpips_id_to_naming_in_ccip[lpips_id] = new_idx | |
| ccip_disp = f"OrigCCIP-{original_ccip_id}" if original_ccip_id is not None else "NoCCIP" | |
| print(f" LPIPS Remap in {ccip_disp}: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}") | |
| for item_data in items_in_ccip: | |
| lpips_id = item_data.get('lpips_cluster_id') | |
| if lpips_id is not None: | |
| if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise" | |
| elif lpips_id in lpips_id_to_naming_in_ccip: | |
| item_data['lpips_folder_naming_index'] = lpips_id_to_naming_in_ccip[lpips_id] | |
| del items_grouped_by_original_ccip | |
| else: # Global LPIPS | |
| print(f" Global LPIPS renaming for {source_file_prefix}.") | |
| global_lpips_counts: Dict[int, int] = {} | |
| for item_data in images_pending_final_processing: | |
| lpips_id = item_data.get('lpips_cluster_id') | |
| if lpips_id is not None and lpips_id != -1: | |
| global_lpips_counts[lpips_id] = global_lpips_counts.get(lpips_id, 0) + 1 | |
| global_lpips_id_to_naming: Dict[int, Union[int, str]] = {} | |
| if global_lpips_counts: | |
| sorted_global_lpips = sorted(global_lpips_counts.items(), key=lambda x: x[1], reverse=True) | |
| for new_idx, (lpips_id, count) in enumerate(sorted_global_lpips): | |
| global_lpips_id_to_naming[lpips_id] = new_idx | |
| print(f" Global LPIPS Remap: OrigLPIPS ID {lpips_id} (count: {count}) -> New Naming Index {new_idx:03d}") | |
| for item_data in images_pending_final_processing: | |
| lpips_id = item_data.get('lpips_cluster_id') | |
| if lpips_id is not None: | |
| if lpips_id == -1: item_data['lpips_folder_naming_index'] = "noise" | |
| elif lpips_id in global_lpips_id_to_naming: | |
| item_data['lpips_folder_naming_index'] = global_lpips_id_to_naming[lpips_id] | |
| gc.collect() | |
| # --- Final Zipping Stage --- | |
| images_to_zip: Dict[str, bytes] = {} | |
| print(f"\n--- Final Zipping Stage for {source_file_prefix} ({len(images_pending_final_processing)} items) ---") | |
| for item_data in images_pending_final_processing: | |
| original_ccip_id_for_item = item_data.get('ccip_cluster_id') | |
| current_ccip_naming_idx_for_folder: Optional[int] = None | |
| if enable_ccip_classification and original_ccip_id_for_item is not None and \ | |
| original_ccip_id_for_item in original_ccip_id_to_new_naming_index: | |
| current_ccip_naming_idx_for_folder = original_ccip_id_to_new_naming_index[original_ccip_id_for_item] | |
| current_lpips_naming_idx_for_folder = item_data.get('lpips_folder_naming_index') | |
| final_filename = generate_filename( | |
| base_name=item_data['base_name_for_filename'], | |
| aesthetic_label=item_data.get('aesthetic_label'), | |
| ccip_cluster_id_for_lpips_logic=original_ccip_id_for_item, | |
| ccip_folder_naming_index=current_ccip_naming_idx_for_folder, | |
| source_prefix_for_ccip_folder=source_file_prefix if current_ccip_naming_idx_for_folder is not None else None, | |
| lpips_folder_naming_index=current_lpips_naming_idx_for_folder, | |
| is_halfbody_primary_target_type=item_data['is_halfbody_primary_target_type'], | |
| is_derived_head_crop=item_data['is_derived_head_crop'], | |
| is_derived_face_crop=item_data['is_derived_face_crop'] | |
| ) | |
| try: | |
| images_to_zip[final_filename] = image_to_bytes(item_data['pil_image']) | |
| except Exception as e_bytes: | |
| print(f" Error converting/adding {final_filename} to zip: {e_bytes}") | |
| finally: | |
| if 'pil_image' in item_data and item_data['pil_image'] is not None: | |
| del item_data['pil_image'] | |
| images_pending_final_processing.clear() | |
| if not images_to_zip: | |
| status_message = f"Processing for {source_file_prefix} finished, but no images were converted for zipping." | |
| print(status_message) | |
| return None, status_message | |
| print(f"Preparing zip file for {source_file_prefix} with {len(images_to_zip)} images...") | |
| progress_updater(1.0, desc=f"Creating Zip File for {source_file_prefix}...") | |
| zip_start_time = time.time() | |
| # Use NamedTemporaryFile with delete=False for the final output path | |
| # This file will persist until manually cleaned or OS cleanup | |
| temp_zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip") | |
| output_zip_path_temp = temp_zip_file.name | |
| temp_zip_file.close() # Close the handle, but file remains | |
| try: | |
| # Write data to the temporary file path | |
| create_zip_file(images_to_zip, output_zip_path_temp) | |
| zip_duration = time.time() - zip_start_time | |
| print(f"Temporary zip file for {source_file_prefix} created in {zip_duration:.2f} seconds at {output_zip_path_temp}") | |
| # Construct the new, desired filename | |
| temp_dir = os.path.dirname(output_zip_path_temp) | |
| timestamp = int(time.time()) | |
| desired_filename = f"{source_file_prefix}_processed_{timestamp}.zip" | |
| output_zip_path_final = os.path.join(temp_dir, desired_filename) | |
| # Rename the temporary file to the desired name | |
| print(f"Renaming temp file for {source_file_prefix} to: {output_zip_path_final}") | |
| os.rename(output_zip_path_temp, output_zip_path_final) | |
| print("Rename successful.") | |
| output_zip_path_temp = None # Clear temp path as it's been renamed | |
| except Exception as zip_or_rename_err: | |
| print(f"Error during zip creation or renaming for {source_file_prefix}: {zip_or_rename_err}") | |
| # Clean up the *original* temp file if it still exists and renaming failed | |
| if output_zip_path_temp and os.path.exists(output_zip_path_temp): | |
| try: | |
| os.remove(output_zip_path_temp) | |
| except OSError: | |
| pass | |
| if output_zip_path_final and os.path.exists(output_zip_path_final): # Check if rename partially happened | |
| try: | |
| os.remove(output_zip_path_final) | |
| except OSError: | |
| pass | |
| raise zip_or_rename_err # Re-raise the error | |
| # --- Prepare Status Message --- | |
| processing_duration = time.time() - start_time - zip_duration # Exclude zipping time from processing time | |
| total_duration = time.time() - start_time # Includes zipping/renaming | |
| # --- Build final status message --- | |
| person_stats = "N/A" | |
| if enable_person_detection: | |
| person_stats = f"{total_persons_detected_raw} raw, {person_targets_processed_count} targets (>{min_target_width_person_percentage*100:.1f}% itemW)" | |
| halfbody_stats = "N/A" | |
| if enable_halfbody_detection: | |
| halfbody_stats = f"{total_halfbodies_detected_raw} raw, {halfbody_targets_processed_count} targets (>{min_target_width_halfbody_percentage*100:.1f}% itemW)" | |
| fullframe_stats = f"{fullframe_targets_processed_count} targets" | |
| face_stats = "N/A" | |
| if enable_face_detection: | |
| face_stats = f"{total_faces_detected_on_targets} on targets, {face_crops_pending_count} crops pending (>{min_crop_width_face_percentage*100:.1f}% parentW)" | |
| if enable_face_filtering: | |
| face_stats += f", {items_filtered_by_face_count} targets filtered" | |
| head_stats = "N/A" | |
| if enable_head_detection: | |
| head_stats = f"{total_heads_detected_on_targets} on targets, {head_crops_pending_count} crops pending (>{min_crop_width_head_percentage*100:.1f}% parentW)" | |
| if enable_head_filtering: | |
| head_stats += f", {items_filtered_by_head_count} targets filtered" | |
| ccip_stats = "N/A" | |
| if enable_ccip_classification: | |
| ccip_stats = f"{next_ccip_cluster_id} original clusters created, on {ccip_applied_count} targets. Folders renamed by image count." | |
| lpips_stats = "N/A" | |
| if enable_lpips_clustering: | |
| lpips_stats = f"{lpips_images_subject_to_clustering} images processed, {total_lpips_clusters_created} clusters, {total_lpips_noise_samples} noise. Folders renamed by image count." | |
| aesthetic_stats = "N/A" | |
| if enable_aesthetic_analysis: | |
| aesthetic_stats = f"On {aesthetic_applied_count} targets" | |
| item_desc_for_stats = "Items from Provider" if not is_video_source else "Sampled Frames" | |
| status_message = ( | |
| f"Processing for '{source_file_prefix}' Complete!\n" | |
| f"Total time: {total_duration:.2f}s (Proc: {processing_duration:.2f}s, Zip: {zip_duration:.2f}s)\n" | |
| f"{item_desc_for_stats}: {total_items_for_desc}, Processed Items: {processed_items_count}\n" | |
| f"--- Primary Targets Processed ---\n" | |
| f" Person Detection: {person_stats}\n" | |
| f" Half-Body Detection: {halfbody_stats}\n" | |
| f" Full Item Processing: {fullframe_stats}\n" | |
| f"--- Items Pending Final Processing ({main_targets_pending_count} main, {face_crops_pending_count} face, {head_crops_pending_count} head) ---\n" | |
| f" Face Detection: {face_stats}\n" | |
| f" Head Detection: {head_stats}\n" | |
| f" CCIP Classification: {ccip_stats}\n" | |
| f" LPIPS Clustering: {lpips_stats}\n" | |
| f" Aesthetic Analysis: {aesthetic_stats}\n" | |
| f"Zip file contains {len(images_to_zip)} images.\n" | |
| f"Output Zip: {output_zip_path_final}" | |
| ) | |
| print(status_message) | |
| progress_updater(1.0, desc=f"Finished {source_file_prefix}!") | |
| # Return the path to the zip file | |
| return output_zip_path_final, status_message | |
| except Exception as e: | |
| print(f"!! An unhandled error occurred during processing of {source_file_prefix}: {e}") | |
| traceback.print_exc() # Print detailed traceback for debugging | |
| # Clean up main data structures | |
| images_pending_final_processing.clear() | |
| ccip_clusters_info.clear() | |
| gc.collect() | |
| # Clean up temp file if it exists on general error | |
| if output_zip_path_temp and os.path.exists(output_zip_path_temp): | |
| try: | |
| os.remove(output_zip_path_temp) | |
| except OSError: | |
| pass | |
| # Clean up final file if it exists on general error (maybe renaming succeeded but later code failed) | |
| if output_zip_path_final and os.path.exists(output_zip_path_final): | |
| try: | |
| os.remove(output_zip_path_final) | |
| except OSError: | |
| pass | |
| return None, f"An error occurred with {source_file_prefix}: {e}" | |
| # --- Main Processing Function for Input files --- | |
| def process_inputs_main( | |
| input_file_objects: List[Any], # Gradio File component gives list of tempfile._TemporaryFileWrapper | |
| sample_interval_ms: int, # Relevant for videos only | |
| # Person Detection | |
| enable_person_detection: bool, | |
| min_target_width_person_percentage: float, | |
| person_model_name: str, | |
| person_conf_threshold: float, | |
| person_iou_threshold: float, | |
| # Half-Body Detection | |
| enable_halfbody_detection: bool, | |
| enable_halfbody_cropping: bool, | |
| min_target_width_halfbody_percentage: float, | |
| halfbody_model_name: str, | |
| halfbody_conf_threshold: float, | |
| halfbody_iou_threshold: float, | |
| # Head Detection | |
| enable_head_detection: bool, | |
| enable_head_cropping: bool, | |
| min_crop_width_head_percentage: float, | |
| enable_head_filtering: bool, | |
| head_model_name: str, | |
| head_conf_threshold: float, | |
| head_iou_threshold: float, | |
| # Face Detection | |
| enable_face_detection: bool, | |
| enable_face_cropping: bool, | |
| min_crop_width_face_percentage: float, | |
| enable_face_filtering: bool, | |
| face_model_name: str, | |
| face_conf_threshold: float, | |
| face_iou_threshold: float, | |
| # CCIP Classification | |
| enable_ccip_classification: bool, | |
| ccip_model_name: str, | |
| ccip_threshold: float, | |
| # LPIPS Clustering | |
| enable_lpips_clustering: bool, | |
| lpips_threshold: float, | |
| # Aesthetic Analysis | |
| enable_aesthetic_analysis: bool, | |
| aesthetic_model_name: str, | |
| progress=gr.Progress(track_tqdm=True) # Gradio progress for overall processing | |
| ) -> Tuple[Optional[List[str]], str]: # Returns list of ZIP paths and combined status | |
| if not input_file_objects: | |
| return [], "Error: No files provided." | |
| video_file_temp_objects: List[Any] = [] | |
| image_file_temp_objects: List[Any] = [] | |
| for file_obj in input_file_objects: | |
| # gr.Files returns a list of tempfile._TemporaryFileWrapper objects | |
| # We need the .name attribute to get the actual file path | |
| file_name = getattr(file_obj, 'orig_name', file_obj.name) # Use original name if available | |
| if isinstance(file_name, str): | |
| lower_file_name = file_name.lower() | |
| if any(lower_file_name.endswith(ext) for ext in VIDEO_EXTENSIONS): | |
| video_file_temp_objects.append(file_obj) | |
| elif any(lower_file_name.endswith(ext) for ext in IMAGE_EXTENSIONS): | |
| image_file_temp_objects.append(file_obj) | |
| else: | |
| print(f"Warning: File '{file_name}' has an unrecognized extension and will be skipped.") | |
| else: | |
| print(f"Warning: File object {file_obj} does not have a valid name and will be skipped.") | |
| output_zip_paths_all_sources = [] | |
| all_status_messages = [] | |
| total_processing_tasks = (1 if image_file_temp_objects else 0) + len(video_file_temp_objects) | |
| if total_processing_tasks == 0: | |
| return [], "No processable video or image files found in the input." | |
| tasks_completed_count = 0 | |
| # Print overall settings once | |
| print(f"--- Overall Batch Processing Settings ---") | |
| print(f" Number of image sequences to process: {1 if image_file_temp_objects else 0}") | |
| print(f" Number of videos to process: {len(video_file_temp_objects)}") | |
| print(f" Sample Interval (for videos): {sample_interval_ms}ms") | |
| print(f" Detection Order: Person => Half-Body (alt) => Face => Head. Then: CCIP => LPIPS => Aesthetic.") | |
| print(f" Person Detect = {enable_person_detection}" + (f" (MinW:{min_target_width_person_percentage*100:.1f}%, Mdl:{person_model_name}, Conf:{person_conf_threshold:.2f}, IoU:{person_iou_threshold:.2f})" if enable_person_detection else "")) | |
| print(f" HalfBody Detect = {enable_halfbody_detection}" + (f" (FullFrameOnly, Crop:{enable_halfbody_cropping}, MinW:{min_target_width_halfbody_percentage*100:.1f}%, Mdl:{halfbody_model_name}, Conf:{halfbody_conf_threshold:.2f}, IoU:{halfbody_iou_threshold:.2f})" if enable_halfbody_detection else "")) | |
| print(f" Face Detect = {enable_face_detection}" + (f" (Crop:{enable_face_cropping}, MinW:{min_crop_width_face_percentage*100:.1f}%, Filter:{enable_face_filtering}, Mdl:{face_model_name}, Conf:{face_conf_threshold:.2f}, IoU:{face_iou_threshold:.2f})" if enable_face_detection else "")) | |
| print(f" Head Detect = {enable_head_detection}" + (f" (Crop:{enable_head_cropping}, MinW:{min_crop_width_head_percentage*100:.1f}%, Filter:{enable_head_filtering}, Mdl:{head_model_name}, Conf:{head_conf_threshold:.2f}, IoU:{head_iou_threshold:.2f})" if enable_head_detection else "")) | |
| print(f" CCIP Classify = {enable_ccip_classification}" + (f" (Mdl:{ccip_model_name}, Thr:{ccip_threshold:.3f})" if enable_ccip_classification else "")) | |
| print(f" LPIPS Clustering = {enable_lpips_clustering}" + (f" (Thr:{lpips_threshold:.3f})" if enable_lpips_clustering else "")) | |
| print(f" Aesthetic Analyze = {enable_aesthetic_analysis}" + (f" (Mdl:{aesthetic_model_name})" if enable_aesthetic_analysis else "")) | |
| print(f"--- End of Overall Settings ---") | |
| # --- Process Image Sequence (if any) --- | |
| if image_file_temp_objects: | |
| image_group_label_base = "ImageGroup" | |
| # Attempt to use first image name for more uniqueness, fallback to timestamp | |
| try: | |
| first_image_orig_name = getattr(image_file_temp_objects[0], 'orig_name', image_file_temp_objects[0].name) | |
| image_group_label_base = sanitize_filename(first_image_orig_name, max_len=20) | |
| except: | |
| pass # Stick with "ImageGroup" | |
| image_source_file_prefix = f"{image_group_label_base}_{int(time.time())}" | |
| current_task_number = tasks_completed_count + 1 | |
| progress_description_prefix = f"Image Seq. {current_task_number}/{total_processing_tasks} ({image_source_file_prefix})" | |
| progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting...") | |
| print(f"\n>>> Processing Image Sequence: {image_source_file_prefix} ({len(image_file_temp_objects)} images) <<<") | |
| def image_frames_provider_generator() -> Iterator[Tuple[Image.Image, int, int, int]]: | |
| num_images = len(image_file_temp_objects) | |
| for idx, img_obj in enumerate(image_file_temp_objects): | |
| try: | |
| pil_img = Image.open(img_obj.name).convert('RGB') | |
| yield pil_img, idx, idx + 1, num_images | |
| except Exception as e_load: | |
| print(f"Error loading image {getattr(img_obj, 'orig_name', img_obj.name)}: {e_load}. Skipping.") | |
| # If we skip, the total_items_in_source for _process_input_source_frames might be off | |
| # For simplicity, we'll proceed, but this could be refined to adjust total_items dynamically. | |
| # Or, pre-filter loadable images. For now, just skip. | |
| continue | |
| def image_group_progress_updater(item_progress_value: float, desc: str): | |
| overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks | |
| progress(overall_progress, desc=f"{progress_description_prefix}: {desc}") | |
| try: | |
| zip_file_path_single, status_message_single = _process_input_source_frames( | |
| source_file_prefix=image_source_file_prefix, | |
| frames_provider=image_frames_provider_generator(), | |
| is_video_source=False, | |
| enable_person_detection=enable_person_detection, | |
| min_target_width_person_percentage=min_target_width_person_percentage, | |
| person_model_name=person_model_name, | |
| person_conf_threshold=person_conf_threshold, | |
| person_iou_threshold=person_iou_threshold, | |
| enable_halfbody_detection=enable_halfbody_detection, | |
| enable_halfbody_cropping=enable_halfbody_cropping, | |
| min_target_width_halfbody_percentage=min_target_width_halfbody_percentage, | |
| halfbody_model_name=halfbody_model_name, | |
| halfbody_conf_threshold=halfbody_conf_threshold, | |
| halfbody_iou_threshold=halfbody_iou_threshold, | |
| enable_head_detection=enable_head_detection, | |
| enable_head_cropping=enable_head_cropping, | |
| min_crop_width_head_percentage=min_crop_width_head_percentage, | |
| enable_head_filtering=enable_head_filtering, | |
| head_model_name=head_model_name, | |
| head_conf_threshold=head_conf_threshold, | |
| head_iou_threshold=head_iou_threshold, | |
| enable_face_detection=enable_face_detection, | |
| enable_face_cropping=enable_face_cropping, | |
| min_crop_width_face_percentage=min_crop_width_face_percentage, | |
| enable_face_filtering=enable_face_filtering, | |
| face_model_name=face_model_name, | |
| face_conf_threshold=face_conf_threshold, | |
| face_iou_threshold=face_iou_threshold, | |
| enable_ccip_classification=enable_ccip_classification, | |
| ccip_model_name=ccip_model_name, | |
| ccip_threshold=ccip_threshold, | |
| enable_lpips_clustering=enable_lpips_clustering, | |
| lpips_threshold=lpips_threshold, | |
| enable_aesthetic_analysis=enable_aesthetic_analysis, | |
| aesthetic_model_name=aesthetic_model_name, | |
| progress_updater=image_group_progress_updater | |
| ) | |
| if zip_file_path_single: | |
| output_zip_paths_all_sources.append(zip_file_path_single) | |
| all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Succeeded ---\n{status_message_single}") | |
| else: | |
| all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing Failed ---\n{status_message_single}") | |
| except Exception as e_img_seq: | |
| error_msg = f"Critical error during processing of image sequence {image_source_file_prefix}: {e_img_seq}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| all_status_messages.append(f"--- Image Sequence ({image_source_file_prefix}) Processing CRITICALLY FAILED ---\n{error_msg}") | |
| tasks_completed_count += 1 | |
| print(f">>> Finished attempt for Image Sequence: {image_source_file_prefix} <<<") | |
| # --- Process Video Files (if any) --- | |
| for video_idx, video_file_temp_obj in enumerate(video_file_temp_objects): | |
| video_path_temp = video_file_temp_obj.name | |
| video_original_filename = os.path.basename(getattr(video_file_temp_obj, 'orig_name', video_path_temp)) | |
| video_source_file_prefix = sanitize_filename(video_original_filename) | |
| current_task_number = tasks_completed_count + 1 | |
| progress_description_prefix = f"Video {current_task_number}/{total_processing_tasks}" | |
| print(f"\n>>> Processing Video: {video_original_filename} (Sanitized Prefix: {video_source_file_prefix}) <<<") | |
| progress(tasks_completed_count / total_processing_tasks, desc=f"{progress_description_prefix}: Starting processing...") | |
| # It yields: (PIL.Image, frame_identifier_string, current_raw_frame_index_from_video, total_items_for_desc) | |
| # The third element will be the raw frame number based on CAP_PROP_POS_FRAMES or current_pos_ms | |
| # to align progress with total_items_for_desc (raw frame count). | |
| def video_frames_provider_generator(video_path: str, interval_ms: int) -> Iterator[Tuple[Image.Image, int, int, int]]: | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| print(f"Error: Could not open video file for provider: {video_path}") | |
| return | |
| total_items_for_desc = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| if total_items_for_desc <= 0: | |
| print(f"Warning: Video {video_original_filename} reported {total_items_for_desc} frames. This might be inaccurate. Proceeding...") | |
| # If it's 0, the progress in _process_input_source_frames might behave unexpectedly. | |
| # Setting to 1 to avoid division by zero, but this means progress won't be very useful. | |
| total_items_for_desc = 1 # Fallback to prevent division by zero | |
| # processed_count_in_provider = 0 # Counts *sampled* frames, not used for progress index | |
| last_processed_ms = -float('inf') | |
| raw_frames_read_by_provider = 0 # Counts all frames read by cap.read() | |
| try: | |
| while True: | |
| # For progress, use current_pos_ms or CAP_PROP_POS_FRAMES | |
| # CAP_PROP_POS_FRAMES is a 0-based index of the next frame to be decoded/captured. | |
| current_raw_frame_index = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) # Use this for progress | |
| current_pos_ms_in_provider = cap.get(cv2.CAP_PROP_POS_MSEC) | |
| # Loop break condition (more robust) | |
| if raw_frames_read_by_provider > 0 and current_pos_ms_in_provider <= last_processed_ms and interval_ms > 0 : | |
| # If interval_ms is 0 or very small, current_pos_ms might not advance much for consecutive reads. | |
| # Adding a check for raw_frames_read_by_provider against a large number or CAP_PROP_FRAME_COUNT | |
| # could be an additional safety, but CAP_PROP_FRAME_COUNT can be unreliable. | |
| # The ret_frame check is the primary exit. | |
| pass # Let ret_frame handle the actual end. This check is for stuck videos. | |
| should_process_this_frame = current_pos_ms_in_provider >= last_processed_ms + interval_ms - 1 | |
| ret_frame, frame_cv_data = cap.read() | |
| if not ret_frame: # Primary exit point for the loop | |
| break | |
| raw_frames_read_by_provider +=1 # Incremented after successful read | |
| if should_process_this_frame: | |
| try: | |
| pil_img = convert_to_pil(frame_cv_data) | |
| last_processed_ms = current_pos_ms_in_provider | |
| yield pil_img, int(current_pos_ms_in_provider), current_raw_frame_index + 1, total_items_for_desc # Yield 1-based raw frame index | |
| except Exception as e_conv: | |
| print(f"Error converting frame at {current_pos_ms_in_provider}ms (raw index {current_raw_frame_index}) for {video_original_filename}: {e_conv}. Skipping.") | |
| finally: | |
| pass | |
| finally: | |
| if cap.isOpened(): | |
| cap.release() | |
| print(f" Video capture for provider ({video_original_filename}) released.") | |
| def video_progress_updater(item_progress_value: float, desc: str): | |
| overall_progress = (tasks_completed_count + item_progress_value) / total_processing_tasks | |
| progress(overall_progress, desc=f"{progress_description_prefix}: {desc}") | |
| try: | |
| zip_file_path_single, status_message_single = _process_input_source_frames( | |
| source_file_prefix=video_source_file_prefix, | |
| frames_provider=video_frames_provider_generator(video_path_temp, sample_interval_ms), | |
| is_video_source=True, | |
| enable_person_detection=enable_person_detection, | |
| min_target_width_person_percentage=min_target_width_person_percentage, | |
| person_model_name=person_model_name, | |
| person_conf_threshold=person_conf_threshold, | |
| person_iou_threshold=person_iou_threshold, | |
| enable_halfbody_detection=enable_halfbody_detection, | |
| enable_halfbody_cropping=enable_halfbody_cropping, | |
| min_target_width_halfbody_percentage=min_target_width_halfbody_percentage, | |
| halfbody_model_name=halfbody_model_name, | |
| halfbody_conf_threshold=halfbody_conf_threshold, | |
| halfbody_iou_threshold=halfbody_iou_threshold, | |
| enable_head_detection=enable_head_detection, | |
| enable_head_cropping=enable_head_cropping, | |
| min_crop_width_head_percentage=min_crop_width_head_percentage, | |
| enable_head_filtering=enable_head_filtering, | |
| head_model_name=head_model_name, | |
| head_conf_threshold=head_conf_threshold, | |
| head_iou_threshold=head_iou_threshold, | |
| enable_face_detection=enable_face_detection, | |
| enable_face_cropping=enable_face_cropping, | |
| min_crop_width_face_percentage=min_crop_width_face_percentage, | |
| enable_face_filtering=enable_face_filtering, | |
| face_model_name=face_model_name, | |
| face_conf_threshold=face_conf_threshold, | |
| face_iou_threshold=face_iou_threshold, | |
| enable_ccip_classification=enable_ccip_classification, | |
| ccip_model_name=ccip_model_name, | |
| ccip_threshold=ccip_threshold, | |
| enable_lpips_clustering=enable_lpips_clustering, | |
| lpips_threshold=lpips_threshold, | |
| enable_aesthetic_analysis=enable_aesthetic_analysis, | |
| aesthetic_model_name=aesthetic_model_name, | |
| progress_updater=video_progress_updater | |
| ) | |
| if zip_file_path_single: | |
| output_zip_paths_all_sources.append(zip_file_path_single) | |
| all_status_messages.append(f"--- Video ({video_original_filename}) Processing Succeeded ---\n{status_message_single}") | |
| else: | |
| all_status_messages.append(f"--- Video ({video_original_filename}) Processing Failed ---\n{status_message_single}") | |
| except Exception as e_vid: | |
| # This catches errors if process_video itself raises an unhandled exception | |
| # (though process_video has its own try-except) | |
| error_msg = f"Critical error during processing of video {video_original_filename}: {e_vid}" | |
| print(error_msg) | |
| traceback.print_exc() | |
| all_status_messages.append(f"--- Video ({video_original_filename}) Processing CRITICALLY FAILED ---\n{error_msg}") | |
| tasks_completed_count += 1 | |
| print(f">>> Finished attempt for Video: {video_original_filename} <<<") | |
| # Gradio manages the lifecycle of video_path_temp (the uploaded temp file) | |
| final_summary_message = "\n\n==============================\n\n".join(all_status_messages) | |
| successful_zips_count = len(output_zip_paths_all_sources) | |
| if successful_zips_count == 0 and total_processing_tasks > 0: | |
| final_summary_message = f"ALL {total_processing_tasks} INPUT SOURCE(S) FAILED TO PRODUCE A ZIP FILE.\n\n" + final_summary_message | |
| elif total_processing_tasks > 0: | |
| final_summary_message = f"Successfully processed {successful_zips_count} out of {total_processing_tasks} input source(s).\n\n" + final_summary_message | |
| else: # Should be caught earlier by "No processable files" | |
| final_summary_message = "No inputs were processed." | |
| progress(1.0, desc="All processing attempts finished.") | |
| # gr.Files output expects a list of file paths. An empty list is fine if no files. | |
| return output_zip_paths_all_sources, final_summary_message | |
| # --- Gradio Interface Setup --- | |
| css = """ | |
| /* Default (Light Mode) Styles */ | |
| #warning { | |
| background-color: #FFCCCB; /* Light red background */ | |
| padding: 10px; | |
| border-radius: 5px; | |
| color: #A00000; /* Dark red text */ | |
| border: 1px solid #E5B8B7; /* A slightly darker border for more definition */ | |
| } | |
| /* Dark Mode Styles */ | |
| @media (prefers-color-scheme: dark) { | |
| #warning { | |
| background-color: #5C1A1A; /* Darker red background, suitable for dark mode */ | |
| color: #FFDDDD; /* Light pink text, for good contrast against the dark red background */ | |
| border: 1px solid #8B0000; /* A more prominent dark red border in dark mode */ | |
| } | |
| } | |
| #status_box { | |
| white-space: pre-wrap !important; /* Ensure status messages show newlines */ | |
| font-family: monospace; /* Optional: Use monospace for better alignment */ | |
| } | |
| """ | |
| # --- Define Model Lists --- | |
| person_models = ['person_detect_v1.3_s', 'person_detect_v1.2_s', 'person_detect_v1.1_s', 'person_detect_v1.1_m', 'person_detect_v1_m', 'person_detect_v1.1_n', 'person_detect_v0_s', 'person_detect_v0_m', 'person_detect_v0_x'] | |
| halfbody_models = ['halfbody_detect_v1.0_s', 'halfbody_detect_v1.0_n', 'halfbody_detect_v0.4_s', 'halfbody_detect_v0.3_s', 'halfbody_detect_v0.2_s'] | |
| head_models = ['head_detect_v2.0_s', 'head_detect_v2.0_m', 'head_detect_v2.0_n', 'head_detect_v2.0_x', 'head_detect_v2.0_s_yv11', 'head_detect_v2.0_m_yv11', 'head_detect_v2.0_n_yv11', 'head_detect_v2.0_x_yv11', 'head_detect_v2.0_l_yv11'] | |
| face_models = ['face_detect_v1.4_s', 'face_detect_v1.4_n', 'face_detect_v1.3_s', 'face_detect_v1.3_n', 'face_detect_v1.2_s', 'face_detect_v1.1_s', 'face_detect_v1.1_n', 'face_detect_v1_s', 'face_detect_v1_n', 'face_detect_v0_s', 'face_detect_v0_n'] | |
| ccip_models = ['ccip-caformer-24-randaug-pruned', 'ccip-caformer-6-randaug-pruned_fp32', 'ccip-caformer-5_fp32'] | |
| aesthetic_models = ['swinv2pv3_v0_448_ls0.2_x', 'swinv2pv3_v0_448_ls0.2', 'caformer_s36_v0_ls0.2'] | |
| with gr.Blocks(css=css) as demo: | |
| gr.Markdown("# Video Processor using dghs-imgutils") | |
| gr.Markdown("Upload one or more videos, or a sequence of images. Videos are processed individually, while multiple images are treated as a single sequence. Each processed source (video or image sequence) is then sequentially analyzed by [dghs-imgutils](https://github.com/deepghs/imgutils) to detect subjects, classify items, and process its content according to your settings, ultimately generating a ZIP file with the extracted images.") | |
| gr.Markdown("**Detection Flow:** " + | |
| "[Person](https://dghs-imgutils.deepghs.org/main/api_doc/detect/person.html) ⇒ " + | |
| "[Half-Body](https://dghs-imgutils.deepghs.org/main/api_doc/detect/halfbody.html) (if no person) ⇒ " + | |
| "[Face](https://dghs-imgutils.deepghs.org/main/api_doc/detect/face.html) (on target) ⇒ " + | |
| "[Head](https://dghs-imgutils.deepghs.org/main/api_doc/detect/head.html) (on target).") | |
| gr.Markdown("**Analysis Flow:** " + | |
| "[CCIP](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/ccip.html) Clustering ⇒ " + | |
| "[LPIPS](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/lpips.html) Clustering ⇒ " + | |
| "[Aesthetic](https://dghs-imgutils.deepghs.org/main/api_doc/metrics/dbaesthetic.html) Labeling.") | |
| gr.Markdown("**Note on CCIP Folders:** CCIP cluster folders are named `{source_prefix}_ccip_XXX`, sorted by image count (most images = `_ccip_000`).") | |
| gr.Markdown("**Note on LPIPS Folders:** LPIPS cluster folders (e.g., `lpips_XXX` or `lpips_sub_XXX`) are also sorted by image count within their scope. 'noise' folders are named explicitly.") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # --- Input Components --- | |
| process_button = gr.Button("Process Input(s) & Generate ZIP(s)", variant="primary") | |
| input_files = gr.Files(label="Upload Videos or Image Sequences", file_types=['video', 'image'], file_count="multiple") | |
| sample_interval_ms = gr.Number(label="Sample Interval (ms, for videos)", value=1000, minimum=1, step=100) | |
| # --- Detection Options --- | |
| gr.Markdown("**Detection Options**") | |
| # --- Person Detection Block --- | |
| with gr.Accordion("Person Detection Options", open=True): | |
| enable_person_detection = gr.Checkbox(label="Enable Person Detection", value=True) | |
| with gr.Group() as person_detection_params_group: | |
| min_target_width_person_percentage_slider = gr.Slider( | |
| minimum=0.0, maximum=1.0, value=0.25, step=0.01, | |
| label="Min Target Width (% of Item Width)", | |
| info="Minimum width for a detected person to be processed (e.g., 0.25 = 25%)." | |
| ) | |
| person_model_name_dd = gr.Dropdown(person_models, label="PD Model", value=person_models[0]) | |
| person_conf_threshold = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="PD Conf") | |
| person_iou_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="PD IoU") | |
| enable_person_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_person_detection, outputs=person_detection_params_group) | |
| # --- Half-Body Detection Block --- | |
| with gr.Accordion("Half-Body Detection Options", open=True): | |
| enable_halfbody_detection = gr.Checkbox(label="Enable Half-Body Detection", value=True) | |
| with gr.Group() as halfbody_params_group: | |
| gr.Markdown("<small>_Detects half-bodies in full items if Person Detection is off/fails._</small>") | |
| enable_halfbody_cropping = gr.Checkbox(label="Use Half-Bodies as Targets", value=True) | |
| min_target_width_halfbody_percentage_slider = gr.Slider( | |
| minimum=0.0, maximum=1.0, value=0.25, step=0.01, | |
| label="Min Target Width (% of Item Width)", | |
| info="Minimum width for a detected half-body to be processed (e.g., 0.25 = 25%)." | |
| ) | |
| halfbody_model_name_dd = gr.Dropdown(halfbody_models, label="HBD Model", value=halfbody_models[0]) | |
| halfbody_conf_threshold = gr.Slider(0.0, 1.0, value=0.5, step=0.05, label="HBD Conf") | |
| halfbody_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HBD IoU") | |
| enable_halfbody_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_halfbody_detection, outputs=halfbody_params_group) | |
| # --- Face Detection Block --- | |
| with gr.Accordion("Face Detection Options", open=True): | |
| enable_face_detection = gr.Checkbox(label="Enable Face Detection", value=True) | |
| with gr.Group() as face_params_group: | |
| enable_face_filtering = gr.Checkbox(label="Filter Targets Without Detected Faces", value=True) | |
| enable_face_cropping = gr.Checkbox(label="Crop Detected Faces", value=False) | |
| min_crop_width_face_percentage_slider = gr.Slider( | |
| minimum=0.0, maximum=1.0, value=0.2, step=0.01, | |
| label="Min Crop Width (% of Parent Width)", | |
| info="Minimum width for a face crop relative to its parent image's width (e.g., 0.2 = 20%)." | |
| ) | |
| face_model_name_dd = gr.Dropdown(face_models, label="FD Model", value=face_models[0]) | |
| face_conf_threshold = gr.Slider(0.0, 1.0, value=0.25, step=0.05, label="FD Conf") | |
| face_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="FD IoU") | |
| enable_face_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_face_detection, outputs=face_params_group) | |
| # --- Head Detection Block --- | |
| with gr.Accordion("Head Detection Options", open=True): | |
| enable_head_detection = gr.Checkbox(label="Enable Head Detection", value=True) | |
| with gr.Group() as head_params_group: | |
| gr.Markdown("<small>_Detects heads in targets. Crops if meets width req._</small>") | |
| enable_head_filtering = gr.Checkbox(label="Filter Targets Without Heads", value=True) | |
| enable_head_cropping = gr.Checkbox(label="Crop Detected Heads", value=False) | |
| min_crop_width_head_percentage_slider = gr.Slider( | |
| minimum=0.0, maximum=1.0, value=0.2, step=0.01, | |
| label="Min Crop Width (% of Parent Width)", | |
| info="Minimum width for a head crop relative to its parent image's width (e.g., 0.2 = 20%)." | |
| ) | |
| head_model_name_dd = gr.Dropdown(head_models, label="HD Model", value=head_models[0]) | |
| head_conf_threshold = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="HD Conf") | |
| head_iou_threshold = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="HD IoU") | |
| enable_head_detection.change(fn=lambda e: gr.update(visible=e), inputs=enable_head_detection, outputs=head_params_group) | |
| # --- Analysis/Classification Options --- | |
| gr.Markdown("**Analysis & Classification**") | |
| # --- CCIP Classification Block --- | |
| with gr.Accordion("CCIP Classification Options", open=True): | |
| enable_ccip_classification = gr.Checkbox(label="Enable CCIP Classification", value=True) | |
| with gr.Group() as ccip_params_group: | |
| gr.Markdown("<small>_Clusters results by similarity. Folders sorted by image count._</small>") | |
| ccip_model_name_dd = gr.Dropdown(ccip_models, label="CCIP Model", value=ccip_models[0]) | |
| ccip_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.20, label="CCIP Similarity Threshold") | |
| enable_ccip_classification.change(fn=lambda e: gr.update(visible=e), inputs=enable_ccip_classification, outputs=ccip_params_group) | |
| # LPIPS Clustering Options | |
| with gr.Accordion("LPIPS Clustering Options", open=True): | |
| enable_lpips_clustering = gr.Checkbox(label="Enable LPIPS Clustering", value=True) | |
| with gr.Group() as lpips_params_group: | |
| gr.Markdown("<small>_Clusters images by LPIPS similarity. Applied after CCIP (if enabled) or globally. Folders sorted by image count._</small>") | |
| lpips_threshold_slider = gr.Slider(0.0, 1.0, step=0.01, value=0.45, label="LPIPS Similarity Threshold") | |
| enable_lpips_clustering.change(fn=lambda e: gr.update(visible=e), inputs=enable_lpips_clustering, outputs=lpips_params_group) | |
| # --- Aesthetic Analysis Block --- | |
| with gr.Accordion("Aesthetic Analysis Options", open=True): | |
| enable_aesthetic_analysis = gr.Checkbox(label="Enable Aesthetic Analysis (Anime)", value=True) | |
| with gr.Group() as aesthetic_params_group: | |
| gr.Markdown("<small>_Prepends aesthetic label to filenames._</small>") | |
| aesthetic_model_name_dd = gr.Dropdown(aesthetic_models, label="Aesthetic Model", value=aesthetic_models[0]) | |
| enable_aesthetic_analysis.change(fn=lambda e: gr.update(visible=e), inputs=enable_aesthetic_analysis, outputs=aesthetic_params_group) | |
| gr.Markdown("---") | |
| gr.Markdown("**Warning:** Complex combinations can be slow. Models downloaded on first use.", elem_id="warning") | |
| with gr.Column(scale=1): | |
| # --- Output Components --- | |
| status_text = gr.Textbox(label="Processing Status", interactive=False, lines=20, elem_id="status_box") | |
| output_zips = gr.Files(label="Download Processed Images (ZIPs)") | |
| # Connect button click | |
| process_button.click( | |
| fn=process_inputs_main, | |
| inputs=[ | |
| input_files, sample_interval_ms, | |
| # Person Detect | |
| enable_person_detection, min_target_width_person_percentage_slider, | |
| person_model_name_dd, person_conf_threshold, person_iou_threshold, | |
| # HalfBody Detect | |
| enable_halfbody_detection, enable_halfbody_cropping, min_target_width_halfbody_percentage_slider, | |
| halfbody_model_name_dd, halfbody_conf_threshold, halfbody_iou_threshold, | |
| # Head Detect | |
| enable_head_detection, enable_head_cropping, min_crop_width_head_percentage_slider, | |
| enable_head_filtering, head_model_name_dd, head_conf_threshold, head_iou_threshold, | |
| # Face Detect | |
| enable_face_detection, enable_face_cropping, min_crop_width_face_percentage_slider, | |
| enable_face_filtering, face_model_name_dd, face_conf_threshold, face_iou_threshold, | |
| # CCIP | |
| enable_ccip_classification, ccip_model_name_dd, ccip_threshold_slider, | |
| # LPIPS | |
| enable_lpips_clustering, lpips_threshold_slider, | |
| # Aesthetic | |
| enable_aesthetic_analysis, aesthetic_model_name_dd, | |
| ], | |
| outputs=[output_zips, status_text] | |
| ) | |
| # --- Launch Script --- | |
| if __name__ == "__main__": | |
| print("Starting Gradio App...") | |
| # Model pre-check | |
| try: | |
| print("Checking/Downloading models (this might take a moment)...") | |
| # Use simple, small images for checks | |
| dummy_img_pil = Image.new('RGB', (64, 64), color = 'orange') | |
| print(" - Person detection...") | |
| _ = person_detector.detect_person(dummy_img_pil, model_name=person_models[0]) | |
| print(" - HalfBody detection...") | |
| _ = halfbody_detector.detect_halfbody(dummy_img_pil, model_name=halfbody_models[0]) | |
| print(" - Head detection...") | |
| _ = head_detector.detect_heads(dummy_img_pil, model_name=head_models[0]) | |
| print(" - Face detection...") | |
| _ = face_detector.detect_faces(dummy_img_pil, model_name=face_models[0]) | |
| print(" - CCIP feature extraction...") | |
| _ = ccip_analyzer.ccip_extract_feature(dummy_img_pil, size=384, model=ccip_models[0]) | |
| print(" - LPIPS feature extraction...") | |
| _ = lpips_module.lpips_extract_feature(dummy_img_pil) | |
| print(" - Aesthetic analysis...") | |
| _ = dbaesthetic_analyzer.anime_dbaesthetic(dummy_img_pil, model_name=aesthetic_models[0]) | |
| print("Models seem ready or downloaded.") | |
| del dummy_img_pil | |
| gc.collect() | |
| except Exception as model_err: | |
| print(f"\n--- !!! WARNING !!! ---") | |
| print(f"Could not pre-check/download all models: {model_err}") | |
| print(f"Models will be downloaded when first used by the application, which may cause a delay on the first run.") | |
| print(f"Check your internet connection and library installation (pip install \"dghs-imgutils[gpu]\").") | |
| print(f"-----------------------\n") | |
| # Launch the app | |
| demo.launch(inbrowser=True) | |