File size: 11,335 Bytes
83aae9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import cv2
import numpy as np
import time
import os
from ultralytics import YOLO
from PIL import Image
import matplotlib.pyplot as plt

def detect_and_blur(input_source, model=None, frame_skip=3):
    """Detect and blur sensitive elements in images or video frames
    
    Args:
        input_source: Image path or video frame (numpy array)
        model: YOLO model instance
        frame_skip: Frame skip rate for video processing
        
    Returns:
        result_rgb: Processed image with blurred regions
        detections: Dict with counts of detected objects
        boxes: Dict with bounding boxes of detected objects
    """
    if isinstance(input_source, str):  # Image path
        frame = cv2.imread(input_source)
        if frame is None:
            raise ValueError(f"Could not read image from {input_source}")
    else:  # Video frame or numpy array
        frame = input_source.copy()

    # Handle RGB vs BGR input
    if len(frame.shape) == 3 and frame.shape[2] == 3:
        if isinstance(input_source, str) or input_source is not None:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        else:
            frame_rgb = frame  # Assume RGB if directly passed
    else:
        raise ValueError("Input must be a color image with 3 channels")

    result_img = frame.copy()
    detections = {'faces': 0, 'plates': 0}
    boxes = {'faces': [], 'plates': []}

    if model:
        try:
            # Run YOLOv8 inference
            results = model.predict(frame_rgb, conf=0.5)
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    cls_id = int(box.cls[0])
                    conf = float(box.conf[0])

                    # Ensure coordinates are within image bounds
                    x1, y1 = max(0, x1), max(0, y1)
                    x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

                    # Skip invalid boxes
                    if x2 <= x1 or y2 <= y1:
                        continue

                    # Apply Gaussian blur to the detected region
                    region = result_img[y1:y2, x1:x2]
                    # Adjust kernel size based on detection type and region size
                    kernel_size = 55 if cls_id == 1 else 71  # Different blur for faces vs plates/text
                    kernel_size = max(25, min(kernel_size, (x2-x1)//2*2+1, (y2-y1)//2*2+1))
                    kernel_size = kernel_size + 1 if kernel_size % 2 == 0 else kernel_size

                    # Apply blur only if kernel size is valid
                    if kernel_size >= 3:
                        blurred = cv2.GaussianBlur(region, (kernel_size, kernel_size), 15)
                        result_img[y1:y2, x1:x2] = blurred

                        # Update detection counts and boxes
                        if cls_id == 0:  # Assuming 0 is plate/text
                            detections['plates'] += 1
                            boxes['plates'].append((x1, y1, x2, y2))
                        else:  # Assuming 1 is face
                            detections['faces'] += 1
                            boxes['faces'].append((x1, y1, x2, y2))
        except Exception as e:
            print(f"Detection error: {e}")

    # Ensure output is RGB for consistent interface
    if isinstance(input_source, str) or input_source is not None:
        result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
    else:
        result_rgb = result_img

    return result_rgb, detections, boxes

def process_video(input_path, output_path=None, model=None, frame_skip=3, output_fps=30):
    """Process video with optimized frame skipping
    
    Args:
        input_path: Path to input video
        output_path: Path to save processed video (if None, auto-generated)
        model: YOLO model instance
        frame_skip: Process 1 in every N frames
        output_fps: Output video frame rate
        
    Returns:
        output_path: Path to processed video file
    """
    try:
        # Open video file
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file {input_path}")

        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Set output parameters
        fps = original_fps if original_fps > 0 else output_fps
        if output_path is None:
            # Create results directory if it doesn't exist
            results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "results")
            os.makedirs(results_dir, exist_ok=True)
            output_path = os.path.join(results_dir, f"processed_{os.path.basename(input_path)}")
        
        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps//frame_skip, (frame_width, frame_height))

        # Display processing information
        print(f"Processing video: {os.path.basename(input_path)}")
        print(f"Original: {frame_width}x{frame_height} @ {original_fps:.1f}fps")
        print(f"Processing: 1 every {frame_skip} frames")
        print(f"Output: {fps//frame_skip:.1f}fps | Estimated time: {total_frames/(fps*frame_skip):.1f}s")

        # Process video frames
        frame_count = 0
        processed_frames = 0
        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Skip frames according to frame_skip
            if frame_count % frame_skip != 0:
                frame_count += 1
                continue

            try:
                # Process frame
                result_rgb, _, _ = detect_and_blur(frame, model)
                result_bgr = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR)
                out.write(result_bgr)
                processed_frames += 1

                # Print progress periodically
                if time.time() - start_time >= 5:
                    elapsed = time.time() - start_time
                    fps = processed_frames / elapsed
                    print(f"Progress: {frame_count}/{total_frames} | "
                          f"Processed: {processed_frames} | "
                          f"Current FPS: {fps:.1f}")
                    start_time = time.time()
                    processed_frames = 0

            except Exception as e:
                print(f"Error processing frame {frame_count}: {e}")

            frame_count += 1

        # Clean up
        cap.release()
        out.release()
        print(f"\nVideo processing complete! Saved to {output_path}")
        return output_path

    except Exception as e:
        print(f"Video processing failed: {e}")
        return None

def process_image(image_path, output_path=None, model=None, visualize=False):
    """Process single image with optional visualization
    
    Args:
        image_path: Path to input image or numpy array
        output_path: Path to save processed image (if None, auto-generated)
        model: YOLO model instance
        visualize: Whether to create visualization with original, detections, and result
        
    Returns:
        result_path: Path to processed image file
        or
        result_rgb: Processed image as numpy array (if output_path is None)
    """
    try:
        # Process image
        result_rgb, detections, boxes = detect_and_blur(image_path, model)
        
        # Handle input as numpy array
        if not isinstance(image_path, str):
            if output_path is None:
                return result_rgb
            image_filename = "processed_image.jpg"
        else:
            image_filename = os.path.basename(image_path)
        
        # Create visualization if requested
        if visualize:
            # Load original image
            if isinstance(image_path, str):
                original = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2RGB)
            else:
                original = image_path
                
            # Create image with detection boxes
            detection_img = original.copy()
            
            # Draw face boxes
            for box in boxes['faces']:
                x1, y1, x2, y2 = box
                cv2.rectangle(detection_img, (x1, y1), (x2, y2), (255, 0, 0), 2)
                cv2.putText(detection_img, "Face", (x1, y1-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
            
            # Draw plate/text boxes
            for box in boxes['plates']:
                x1, y1, x2, y2 = box
                cv2.rectangle(detection_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(detection_img, "Plate/Text", (x1, y1-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
            
            # Create comparison visualization (only for local debugging)
            fig, axes = plt.subplots(1, 3, figsize=(20, 7))
            titles = ["Original", "Detections", "Blurred Result"]
            images = [original, detection_img, result_rgb]
            
            for ax, title, img in zip(axes, titles, images):
                ax.imshow(img)
                ax.set_title(title)
                ax.axis("off")
            
            plt.tight_layout()
            plt.show()
        
        # Save result if output path provided
        if output_path is not None:
            # Save the processed image
            cv2.imwrite(output_path, cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR))
            print(f"Saved result to {output_path}")
        else:
            # Auto-generate output path if not provided
            results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "results")
            os.makedirs(results_dir, exist_ok=True)
            result_path = os.path.join(results_dir, f"processed_{image_filename}")
            cv2.imwrite(result_path, cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR))
            print(f"Saved result to {result_path}")
            output_path = result_path
            
        print(f"Detections: {detections['faces']} faces, {detections['plates']} plates/text regions")
        return output_path if isinstance(image_path, str) else result_rgb

    except Exception as e:
        print(f"Image processing error: {e}")
        return None

def process_pil_image(pil_image, model=None):
    """Process PIL Image for Gradio interface
    
    Args:
        pil_image: PIL Image
        model: YOLO model instance
        
    Returns:
        result_pil: Processed PIL Image
        detections: Dict with counts of detected objects
    """
    try:
        # Convert PIL to numpy array
        img_array = np.array(pil_image)
        
        # Process the image
        result_rgb, detections, _ = detect_and_blur(img_array, model)
        
        # Convert back to PIL if needed
        result_pil = Image.fromarray(result_rgb)
        
        return result_pil, detections
    
    except Exception as e:
        print(f"PIL image processing error: {e}")
        return pil_image, {'faces': 0, 'plates': 0}