File size: 5,263 Bytes
76d374e
 
14f9889
8103160
ee68890
76d374e
c0e04d3
76d374e
8103160
76d374e
8deab67
c0e04d3
3a2601c
 
baf5377
 
 
97fe512
3a2601c
ee9ebff
 
 
 
 
 
 
 
 
97fe512
ee9ebff
 
97fe512
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8deab67
ee9ebff
c0e04d3
8deab67
3a2601c
 
 
76d374e
14f9889
3a2601c
ee9ebff
97fe512
ee9ebff
14f9889
76d374e
 
baf5377
76d374e
14f9889
baf5377
ee9ebff
c0e04d3
ee9ebff
c0e04d3
 
97fe512
baf5377
 
97fe512
76d374e
baf5377
76d374e
c0e04d3
baf5377
c0e04d3
3a2601c
97fe512
 
 
 
 
 
 
 
 
 
ee9ebff
97fe512
3a2601c
97fe512
baf5377
 
97fe512
 
ee9ebff
97fe512
 
 
ee9ebff
 
 
 
 
97fe512
 
ee9ebff
 
 
97fe512
 
 
ee9ebff
 
3a2601c
baf5377
 
3a2601c
76d374e
3a2601c
97fe512
baf5377
97fe512
76d374e
baf5377
76d374e
ee68890
 
ee9ebff
97fe512
 
ee68890
97fe512
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import gradio as gr
import cv2
import tempfile
import numpy as np
import json
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

model = YOLO("best.pt")
class_names = model.names
tracker = DeepSort(max_age=30)

def analyze_articulated_motion(frame, prev_frame, bbox):
    x1, y1, x2, y2 = map(int, bbox)
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
    h, w = y2 - y1, x2 - x1
    if h < 10 or w < 10: return False, "none", 0

    mid_y = y1 + int(h * 0.5)
    try:
        roi_curr = cv2.cvtColor(frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
        roi_prev = cv2.cvtColor(prev_frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(roi_curr, roi_prev)
        _, thresh = cv2.threshold(diff, 12, 255, cv2.THRESH_BINARY)
        motion_score = np.mean(thresh)
        
        if motion_score > 0.15:
            return True, "arm_only", motion_score
    except:
        pass
    return False, "none", 0

def classify_activity(history, is_active, motion_source):
    if not is_active:
        return "Waiting"
    
    if len(history) < 10: 
        return "Digging"

    dx = history[-1][0] - history[-10][0] 
    dy = history[-1][1] - history[-10][1] 

    if abs(dx) > abs(dy) * 2:
        return "Swinging/Loading"
    
    if dy > 1.5:
        return "Digging"
    
    if dy < -1.5:
        return "Dumping"

    return "Digging"

def process_video(video_file):
    cap = cv2.VideoCapture(video_file)
    fps = cap.get(cv2.CAP_PROP_FPS) or 24
    output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (640, 360))

    frame_id = 0
    prev_frame = None
    track_stats = {}
    track_history = {} 
    final_json_data = []

    while True:
        ret, frame = cap.read()
        if not ret: break

        frame_id += 1
        frame_resized = cv2.resize(frame, (640, 360))
        results = model(frame_resized, verbose=False)[0]
        detections = []
        
        for box in results.boxes:
            cls_id = int(box.cls[0])
            if class_names[cls_id] == "C_E":
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = float(box.conf[0])
                detections.append(([x1, y1, x2-x1, y2-y1], conf, "C_E"))

        tracks = tracker.update_tracks(detections, frame=frame_resized)

        for t in tracks:
            if not t.is_confirmed(): continue
            track_id = t.track_id
            bbox = t.to_ltrb()
            cx, cy = (bbox[0]+bbox[2])/2, (bbox[1]+bbox[3])/2

            if track_id not in track_history: track_history[track_id] = []
            track_history[track_id].append((cx, cy))
            if len(track_history[track_id]) > 30: track_history[track_id].pop(0)

            is_active, motion_src, _ = analyze_articulated_motion(frame_resized, prev_frame, bbox) if prev_frame is not None else (False, "none", 0)
            current_act = classify_activity(track_history[track_id], is_active, motion_src)

            if track_id not in track_stats: track_stats[track_id] = {"active_f": 0, "total_f": 0}
            track_stats[track_id]["total_f"] += 1
            if current_act != "Waiting": track_stats[track_id]["active_f"] += 1

            color = (0, 255, 0) if current_act != "Waiting" else (0, 0, 255)
            ix1, iy1, ix2, iy2 = map(int, bbox)
            cv2.rectangle(frame_resized, (ix1, iy1), (ix2, iy2), color, 2)
            cv2.putText(frame_resized, f"EX-{track_id}: {current_act}", (ix1, iy1-10), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            if frame_id % int(fps) == 0:
                total_s = track_stats[track_id]["total_f"]/fps
                act_s = track_stats[track_id]["active_f"]/fps
                final_json_data.append({
                    "frame_id": frame_id,
                    "equipment_id": f"EX-{track_id}",
                    "timestamp": f"00:00:{frame_id/fps:06.3f}",
                    "utilization": {
                        "current_state": "ACTIVE" if current_act != "Waiting" else "INACTIVE",
                        "current_activity": current_act.upper(),
                        "motion_source": motion_src
                    },
                    "time_analytics": {
                        "total_tracked_seconds": round(total_s, 1),
                        "total_active_seconds": round(act_s, 1),
                        "utilization_percent": round((act_s/total_s)*100, 1)
                    }
                })

        out.write(frame_resized)
        prev_frame = frame_resized.copy()

    cap.release()
    out.release()
    
    json_path = tempfile.NamedTemporaryFile(delete=False, suffix=".json").name
    with open(json_path, "w") as f: json.dump(final_json_data, f, indent=2)

    return output_video_path, json.dumps(final_json_data, indent=2), json_path

demo = gr.Interface(
    fn=process_video,
    inputs=gr.Video(label="Upload Construction Video"),
    outputs=[gr.Video(label="Analysis"), gr.Textbox(label="JSON Report", lines=15), gr.File(label="Download")],
    title="Gaglevision: Equipment Activity Tracker"
)
demo.launch()