import os
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from ultralytics import YOLO


def detect_and_blur(input_source, model=None):
    """Detect and blur sensitive elements in an image or video frame.

    Args:
        input_source: Image path, or an RGB image as a numpy array.
        model: YOLO model instance.

    Returns:
        result_rgb: Processed RGB image with blurred regions.
        detections: Dict with counts of detected objects.
        boxes: Dict with bounding boxes of detected objects.
    """
    if isinstance(input_source, str):
        # Image path: cv2.imread returns BGR
        frame = cv2.imread(input_source)
        if frame is None:
            raise ValueError(f"Could not read image from {input_source}")
        is_bgr = True
    else:
        # Numpy array passed directly: assumed to already be RGB
        frame = input_source.copy()
        is_bgr = False

    if len(frame.shape) != 3 or frame.shape[2] != 3:
        raise ValueError("Input must be a color image with 3 channels")

    # Normalize to RGB for inference
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if is_bgr else frame

    result_img = frame.copy()
    detections = {'faces': 0, 'plates': 0}
    boxes = {'faces': [], 'plates': []}

    if model:
        try:
            # Run YOLOv8 inference
            results = model.predict(frame_rgb, conf=0.5)
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    cls_id = int(box.cls[0])
                    conf = float(box.conf[0])  # confidence, kept for debugging

                    # Clamp coordinates to the image bounds
                    x1, y1 = max(0, x1), max(0, y1)
                    x2 = min(frame.shape[1], x2)
                    y2 = min(frame.shape[0], y2)

                    # Skip degenerate boxes
                    if x2 <= x1 or y2 <= y1:
                        continue

                    # Apply Gaussian blur to the detected region
                    region = result_img[y1:y2, x1:x2]

                    # Pick a kernel size by detection type, then clamp it to an
                    # odd value no larger than the region itself
                    kernel_size = 55 if cls_id == 1 else 71  # faces vs plates/text
                    kernel_size = max(25, min(kernel_size,
                                              (x2 - x1) // 2 * 2 + 1,
                                              (y2 - y1) // 2 * 2 + 1))
                    if kernel_size % 2 == 0:
                        kernel_size += 1

                    # Blur only if the kernel size is valid
                    if kernel_size >= 3:
                        blurred = cv2.GaussianBlur(region, (kernel_size, kernel_size), 15)
                        result_img[y1:y2, x1:x2] = blurred

                    # Update detection counts and boxes
                    if cls_id == 0:  # Assuming class 0 is plate/text
                        detections['plates'] += 1
                        boxes['plates'].append((x1, y1, x2, y2))
                    else:  # Assuming class 1 is face
                        detections['faces'] += 1
                        boxes['faces'].append((x1, y1, x2, y2))
        except Exception as e:
            print(f"Detection error: {e}")

    # Return RGB for a consistent interface
    result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) if is_bgr else result_img
    return result_rgb, detections, boxes
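
# A minimal usage sketch for detect_and_blur. The weights path
# "models/privacy_blur.pt" and the class mapping (0 = plate/text, 1 = face)
# are assumptions; substitute your own trained model:
#
#   model = YOLO("models/privacy_blur.pt")
#   result_rgb, counts, boxes = detect_and_blur("street.jpg", model=model)
#   print(counts)  # e.g. {'faces': 2, 'plates': 1}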

def process_video(input_path, output_path=None, model=None, frame_skip=3, output_fps=30):
    """Process a video with optimized frame skipping.

    Args:
        input_path: Path to input video.
        output_path: Path to save processed video (if None, auto-generated).
        model: YOLO model instance.
        frame_skip: Process 1 in every N frames.
        output_fps: Fallback output frame rate if the source FPS is unavailable.

    Returns:
        output_path: Path to the processed video file, or None on failure.
    """
    try:
        # Open video file
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file {input_path}")

        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Set output parameters
        fps = original_fps if original_fps > 0 else output_fps
        out_fps = max(1.0, fps / frame_skip)
        if output_path is None:
            # Create the results directory if it doesn't exist
            results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                       "..", "results")
            os.makedirs(results_dir, exist_ok=True)
            output_path = os.path.join(results_dir,
                                       f"processed_{os.path.basename(input_path)}")

        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, out_fps, (frame_width, frame_height))

        # Display processing information
        print(f"Processing video: {os.path.basename(input_path)}")
        print(f"Original: {frame_width}x{frame_height} @ {original_fps:.1f}fps")
        print(f"Processing: 1 in every {frame_skip} frames")
        print(f"Output: {out_fps:.1f}fps | Frames to process: {total_frames // frame_skip}")

        # Process video frames
        frame_count = 0
        processed_frames = 0
        interval_frames = 0  # frames processed since the last progress report
        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Skip frames according to frame_skip
            if frame_count % frame_skip != 0:
                frame_count += 1
                continue

            try:
                # cap.read() returns BGR; detect_and_blur expects RGB arrays
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result_rgb, _, _ = detect_and_blur(frame_rgb, model)
                result_bgr = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR)
                out.write(result_bgr)
                processed_frames += 1
                interval_frames += 1

                # Print progress periodically
                if time.time() - start_time >= 5:
                    elapsed = time.time() - start_time
                    current_fps = interval_frames / elapsed
                    print(f"Progress: {frame_count}/{total_frames} | "
                          f"Processed: {processed_frames} | "
                          f"Current FPS: {current_fps:.1f}")
                    start_time = time.time()
                    interval_frames = 0
            except Exception as e:
                print(f"Error processing frame {frame_count}: {e}")

            frame_count += 1

        # Clean up
        cap.release()
        out.release()
        print(f"\nVideo processing complete! Saved to {output_path}")
        return output_path

    except Exception as e:
        print(f"Video processing failed: {e}")
        return None
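
# A minimal usage sketch for process_video. The input file and weights path
# are placeholders/assumptions:
#
#   model = YOLO("models/privacy_blur.pt")
#   out_path = process_video("dashcam.mp4", model=model, frame_skip=3)
#   print(out_path)  # e.g. .../results/processed_dashcam.mp4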

def process_image(image_path, output_path=None, model=None, visualize=False):
    """Process a single image with optional visualization.

    Args:
        image_path: Path to input image, or an RGB image as a numpy array.
        output_path: Path to save processed image (if None, auto-generated).
        model: YOLO model instance.
        visualize: Whether to show the original, detections, and result side by side.

    Returns:
        Path to the processed image file, or the processed image as a numpy
        array when a numpy array is passed in without an output path.
    """
    try:
        # Process image
        result_rgb, detections, boxes = detect_and_blur(image_path, model)

        # Handle input passed as a numpy array
        if not isinstance(image_path, str):
            if output_path is None:
                return result_rgb
            image_filename = "processed_image.jpg"
        else:
            image_filename = os.path.basename(image_path)

        # Create visualization if requested
        if visualize:
            # Load the original image as RGB
            if isinstance(image_path, str):
                original = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2RGB)
            else:
                original = image_path

            # Draw detection boxes on a copy
            detection_img = original.copy()

            # Draw face boxes (red in RGB)
            for x1, y1, x2, y2 in boxes['faces']:
                cv2.rectangle(detection_img, (x1, y1), (x2, y2), (255, 0, 0), 2)
                cv2.putText(detection_img, "Face", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

            # Draw plate/text boxes (blue in RGB)
            for x1, y1, x2, y2 in boxes['plates']:
                cv2.rectangle(detection_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(detection_img, "Plate/Text", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

            # Side-by-side comparison (only for local debugging)
            fig, axes = plt.subplots(1, 3, figsize=(20, 7))
            titles = ["Original", "Detections", "Blurred Result"]
            images = [original, detection_img, result_rgb]
            for ax, title, img in zip(axes, titles, images):
                ax.imshow(img)
                ax.set_title(title)
                ax.axis("off")
            plt.tight_layout()
            plt.show()

        # Save the result, auto-generating an output path if none was provided
        if output_path is None:
            results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                       "..", "results")
            os.makedirs(results_dir, exist_ok=True)
            output_path = os.path.join(results_dir, f"processed_{image_filename}")
        cv2.imwrite(output_path, cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR))
        print(f"Saved result to {output_path}")

        print(f"Detections: {detections['faces']} faces, "
              f"{detections['plates']} plates/text regions")
        return output_path if isinstance(image_path, str) else result_rgb

    except Exception as e:
        print(f"Image processing error: {e}")
        return None


def process_pil_image(pil_image, model=None):
    """Process a PIL Image for the Gradio interface.

    Args:
        pil_image: PIL Image.
        model: YOLO model instance.

    Returns:
        result_pil: Processed PIL Image.
        detections: Dict with counts of detected objects.
    """
    try:
        # Convert PIL to an RGB numpy array (handles RGBA/grayscale inputs)
        img_array = np.array(pil_image.convert("RGB"))

        # Process the image
        result_rgb, detections, _ = detect_and_blur(img_array, model)

        # Convert back to PIL
        result_pil = Image.fromarray(result_rgb)
        return result_pil, detections
    except Exception as e:
        print(f"PIL image processing error: {e}")
        return pil_image, {'faces': 0, 'plates': 0}
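
# A minimal smoke test, sketched under the assumption that a trained model
# lives at "models/privacy_blur.pt" and a test image named "sample.jpg" sits
# next to this script (both paths are placeholders):
if __name__ == "__main__":
    model = YOLO("models/privacy_blur.pt")
    processed = process_image("sample.jpg", model=model, visualize=True)
    print(f"Result written to: {processed}")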