# Privacy_Preservation/utils/processing.py
import cv2
import numpy as np
import time
import os
from ultralytics import YOLO
from PIL import Image
import matplotlib.pyplot as plt
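
# Every helper below expects a preloaded Ultralytics YOLO instance. A minimal
# loading sketch (the weights path is an assumption, not part of this module):
#
#     model = YOLO("models/privacy_best.pt")  # hypothetical fine-tuned weights
#
# Passing model=None skips detection and returns the input unblurred.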
def detect_and_blur(input_source, model=None):
    """Detect and blur sensitive elements in an image or video frame.

    Args:
        input_source: Image path, or a BGR frame as a numpy array
            (as produced by cv2.imread / cv2.VideoCapture)
        model: YOLO model instance (None skips detection)

    Returns:
        result_rgb: Processed RGB image with detected regions blurred
        detections: Dict with counts of detected objects
        boxes: Dict with bounding boxes of detected objects
    """
if isinstance(input_source, str): # Image path
frame = cv2.imread(input_source)
if frame is None:
raise ValueError(f"Could not read image from {input_source}")
else: # Video frame or numpy array
frame = input_source.copy()
    # cv2.imread and cv2.VideoCapture both produce BGR frames, so an array
    # input is assumed to be BGR; convert a copy to RGB for YOLO inference
    if len(frame.shape) != 3 or frame.shape[2] != 3:
        raise ValueError("Input must be a color image with 3 channels")
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
result_img = frame.copy()
detections = {'faces': 0, 'plates': 0}
boxes = {'faces': [], 'plates': []}
if model:
try:
# Run YOLOv8 inference
results = model.predict(frame_rgb, conf=0.5)
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
cls_id = int(box.cls[0])
conf = float(box.conf[0])
# Ensure coordinates are within image bounds
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
# Skip invalid boxes
if x2 <= x1 or y2 <= y1:
continue
# Apply Gaussian blur to the detected region
region = result_img[y1:y2, x1:x2]
                    # Blur strength by class: faces (cls 1) use a 55px kernel,
                    # plates/text (cls 0) a stronger 71px one; clamp to the
                    # region size and keep the value odd, as cv2.GaussianBlur requires
                    kernel_size = 55 if cls_id == 1 else 71
                    kernel_size = max(25, min(kernel_size, (x2 - x1) // 2 * 2 + 1, (y2 - y1) // 2 * 2 + 1))
                    kernel_size = kernel_size + 1 if kernel_size % 2 == 0 else kernel_size
# Apply blur only if kernel size is valid
if kernel_size >= 3:
blurred = cv2.GaussianBlur(region, (kernel_size, kernel_size), 15)
result_img[y1:y2, x1:x2] = blurred
# Update detection counts and boxes
if cls_id == 0: # Assuming 0 is plate/text
detections['plates'] += 1
boxes['plates'].append((x1, y1, x2, y2))
else: # Assuming 1 is face
detections['faces'] += 1
boxes['faces'].append((x1, y1, x2, y2))
except Exception as e:
print(f"Detection error: {e}")
    # The blurred frame is still BGR; convert to RGB for a consistent interface
    result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
return result_rgb, detections, boxes
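
# Illustrative call (the sample path and counts are assumptions, not fixtures):
#
#     frame = cv2.imread("samples/street.jpg")  # BGR, as the function expects
#     blurred_rgb, counts, _ = detect_and_blur(frame, model)
#     counts  # e.g. {'faces': 2, 'plates': 1}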
def process_video(input_path, output_path=None, model=None, frame_skip=3, output_fps=30):
    """Process a video with frame skipping for speed.

    Args:
        input_path: Path to input video
        output_path: Path to save processed video (if None, auto-generated)
        model: YOLO model instance
        frame_skip: Process 1 in every N frames
        output_fps: Fallback frame rate used when the source FPS cannot be read

    Returns:
        output_path: Path to the processed video file, or None on failure
    """
try:
# Open video file
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
raise ValueError(f"Could not open video file {input_path}")
# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
original_fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Set output parameters
fps = original_fps if original_fps > 0 else output_fps
if output_path is None:
# Create results directory if it doesn't exist
results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "results")
os.makedirs(results_dir, exist_ok=True)
output_path = os.path.join(results_dir, f"processed_{os.path.basename(input_path)}")
        # Create the video writer; writing at fps / frame_skip keeps the
        # skipped-frame output playing at roughly real-time speed
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out_fps = max(fps / frame_skip, 1.0)
        out = cv2.VideoWriter(output_path, fourcc, out_fps, (frame_width, frame_height))
        # Display processing information
        print(f"Processing video: {os.path.basename(input_path)}")
        print(f"Original: {frame_width}x{frame_height} @ {original_fps:.1f}fps")
        print(f"Processing: 1 in every {frame_skip} frames")
        print(f"Output: {out_fps:.1f}fps | Frames to process: {total_frames // frame_skip}")
# Process video frames
frame_count = 0
processed_frames = 0
start_time = time.time()
while True:
ret, frame = cap.read()
if not ret:
break
# Skip frames according to frame_skip
if frame_count % frame_skip != 0:
frame_count += 1
continue
try:
# Process frame
result_rgb, _, _ = detect_and_blur(frame, model)
result_bgr = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR)
out.write(result_bgr)
processed_frames += 1
                # Print progress periodically; processed_frames resets each
                # interval, so Current FPS is the rate over the last interval
                if time.time() - start_time >= 5:
                    elapsed = time.time() - start_time
                    current_fps = processed_frames / elapsed
                    print(f"Progress: {frame_count}/{total_frames} | "
                          f"Processed: {processed_frames} | "
                          f"Current FPS: {current_fps:.1f}")
                    start_time = time.time()
                    processed_frames = 0
except Exception as e:
print(f"Error processing frame {frame_count}: {e}")
frame_count += 1
# Clean up
cap.release()
out.release()
print(f"\nVideo processing complete! Saved to {output_path}")
return output_path
except Exception as e:
print(f"Video processing failed: {e}")
return None
def process_image(image_path, output_path=None, model=None, visualize=False):
    """Process a single image with optional visualization.

    Args:
        image_path: Path to input image, or a BGR numpy array
        output_path: Path to save processed image (if None, auto-generated)
        model: YOLO model instance
        visualize: Whether to show the original, detections, and blurred result

    Returns:
        output_path: Path to the processed image file (for file input),
        or result_rgb: the processed RGB array (for numpy array input)
    """
try:
# Process image
result_rgb, detections, boxes = detect_and_blur(image_path, model)
# Handle input as numpy array
if not isinstance(image_path, str):
if output_path is None:
return result_rgb
image_filename = "processed_image.jpg"
else:
image_filename = os.path.basename(image_path)
# Create visualization if requested
if visualize:
            # Load the original image in RGB for display
            if isinstance(image_path, str):
                original = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2RGB)
            else:
                original = cv2.cvtColor(image_path, cv2.COLOR_BGR2RGB)  # array input is BGR
# Create image with detection boxes
detection_img = original.copy()
# Draw face boxes
for box in boxes['faces']:
x1, y1, x2, y2 = box
cv2.rectangle(detection_img, (x1, y1), (x2, y2), (255, 0, 0), 2)
cv2.putText(detection_img, "Face", (x1, y1-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
# Draw plate/text boxes
for box in boxes['plates']:
x1, y1, x2, y2 = box
cv2.rectangle(detection_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
cv2.putText(detection_img, "Plate/Text", (x1, y1-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
# Create comparison visualization (only for local debugging)
fig, axes = plt.subplots(1, 3, figsize=(20, 7))
titles = ["Original", "Detections", "Blurred Result"]
images = [original, detection_img, result_rgb]
for ax, title, img in zip(axes, titles, images):
ax.imshow(img)
ax.set_title(title)
ax.axis("off")
plt.tight_layout()
plt.show()
# Save result if output path provided
if output_path is not None:
# Save the processed image
cv2.imwrite(output_path, cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR))
print(f"Saved result to {output_path}")
else:
# Auto-generate output path if not provided
results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "results")
os.makedirs(results_dir, exist_ok=True)
result_path = os.path.join(results_dir, f"processed_{image_filename}")
cv2.imwrite(result_path, cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR))
print(f"Saved result to {result_path}")
output_path = result_path
print(f"Detections: {detections['faces']} faces, {detections['plates']} plates/text regions")
return output_path if isinstance(image_path, str) else result_rgb
except Exception as e:
print(f"Image processing error: {e}")
return None
def process_pil_image(pil_image, model=None):
    """Process a PIL Image for the Gradio interface.

    Args:
        pil_image: PIL Image
        model: YOLO model instance

    Returns:
        result_pil: Processed PIL Image
        detections: Dict with counts of detected objects
    """
    try:
        # PIL images are RGB; convert to a BGR array because detect_and_blur
        # expects cv2-style BGR input (convert("RGB") also normalizes RGBA/L modes)
        img_bgr = cv2.cvtColor(np.array(pil_image.convert("RGB")), cv2.COLOR_RGB2BGR)
        # Process the image; detect_and_blur returns an RGB result
        result_rgb, detections, _ = detect_and_blur(img_bgr, model)
        # Convert back to a PIL image
        result_pil = Image.fromarray(result_rgb)
        return result_pil, detections
except Exception as e:
print(f"PIL image processing error: {e}")
return pil_image, {'faces': 0, 'plates': 0}
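
if __name__ == "__main__":
    # Example entry point (illustrative): the weights file and sample media
    # paths are assumptions, not assets shipped with this module.
    demo_model = YOLO("models/privacy_best.pt")  # hypothetical weights path
    process_image("samples/street.jpg", model=demo_model, visualize=True)
    process_video("samples/drive.mp4", model=demo_model, frame_skip=3)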