File size: 3,376 Bytes
ccdd4a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import cv2
from ultralytics import YOLO
import json
import os

def analyze_video_for_ppe(video_path: str, model_path: str = 'yolov8n.pt', frames_per_sec: float = 1.0) -> list:
    """
    Analyzes a video for PPE compliance using a YOLOv8 model.

    The video is sampled at roughly ``frames_per_sec`` frames per second of
    footage; the model is run on every sampled frame and its detections are
    collected into plain, JSON-serializable dictionaries.

    Args:
        video_path: Path of the video file to analyze.
        model_path: Path (or name) of the YOLO weights passed to ``YOLO``.
            The default 'yolov8n.pt' is a placeholder; a fine-tuned PPE model
            is expected for real PPE labels.
        frames_per_sec: Desired number of analyzed frames per second of video.
            Must be a positive number. If it exceeds the video's own FPS,
            every frame is analyzed.

    Returns:
        A list with one dict per analyzed frame, each holding 'video_id'
        (base name of ``video_path``), 'frame_id', 'timestamp_sec' and
        'detections' (a list of dicts with 'label', 'confidence', 'bbox').
        An empty list is returned, after printing an error, when the sampling
        rate is invalid, the model cannot be loaded, the video cannot be
        opened, or the video reports no usable FPS.
    """
    # Reject a non-positive sampling rate up front: it would otherwise cause a
    # division by zero (0) or a meaningless negative frame interval (< 0).
    if not frames_per_sec > 0:
        print(f"Error: frames_per_sec must be a positive number, got {frames_per_sec!r}")
        return []

    # 1. Load the YOLOv8 model (You'd replace 'yolov8n.pt' with a fine-tuned PPE model)
    try:
        model = YOLO(model_path)
    except Exception as e:
        print(f"Error loading model: {e}. Ensure you have a valid YOLOv8 model path.")
        return []

    # 2. Open the video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return []

    analysis_results = []
    video_id = os.path.basename(video_path)  # Invariant across frames

    # From here on the capture is open; make sure it is released even if
    # reading or inference raises part-way through the video.
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not 0 < fps < float('inf'):
            # Without a usable FPS neither the sampling interval nor the
            # timestamps can be computed.
            print(f"Error: Could not determine a valid FPS for {video_path} (got {fps})")
            return []

        # Interval (in frames) between samples. Clamp to at least 1 so that a
        # requested rate above the video's FPS analyzes every frame instead of
        # producing a zero interval (and a modulo-by-zero below).
        frame_interval = max(1, int(fps / frames_per_sec))
        frame_count = 0

        print(f"Video FPS: {fps}, Analyzing every {frame_interval} frames...")

        while cap.isOpened():
            # Read the next frame
            ret, frame = cap.read()
            if not ret:
                break

            # Check if the current frame is a sample frame
            if frame_count % frame_interval == 0:
                timestamp_sec = frame_count / fps

                # 3. Run detection on the frame
                results = model(frame, verbose=False)

                # 4. Process and structure results
                detections = []
                for r in results:
                    # r.boxes.data is a tensor with [x1, y1, x2, y2, confidence, class_id]
                    for x1, y1, x2, y2, conf, cls in r.boxes.data.tolist():
                        # Store only the necessary info
                        detections.append({
                            'label': model.names[int(cls)],
                            'confidence': round(conf, 2),
                            'bbox': [int(x1), int(y1), int(x2), int(y2)]  # Bounding Box
                        })

                # Store structured result
                analysis_results.append({
                    'video_id': video_id,
                    'frame_id': frame_count,
                    'timestamp_sec': round(timestamp_sec, 2),
                    'detections': detections
                })

            frame_count += 1
    finally:
        # 5. Release video object
        cap.release()

    print(f"Analysis complete. Total frames analyzed: {len(analysis_results)}")
    return analysis_results

# Example usage: analyze a sample video and dump the raw detections to JSON.
if __name__ == '__main__':
    # NOTE: A sample video file ('construction.mp4') must sit in the current
    # directory, together with a trained PPE model file. For a quick test the
    # default 'yolov8n.pt' can be used; it detects general objects (like
    # 'person') until a PPE model has been fine-tuned.
    sample_video = 'construction.mp4'
    output_json = 'raw_analysis.json'

    if os.path.exists(sample_video):
        # Sample one frame every two seconds of footage.
        results = analyze_video_for_ppe(sample_video, frames_per_sec=0.5)

        # Persist the raw results (optional, but handy for debugging).
        with open(output_json, 'w') as f:
            json.dump(results, f, indent=4)

        print(f"Raw analysis saved to raw_analysis.json. {len(results)} records created.")
    else:
        print("Please place a video file named 'construction.mp4' in the current directory.")