"""Gradio app that detects, tracks and labels equipment activity in a video.

Each frame is resized, run through a YOLO detector, and detections of one
class are tracked with DeepSort.  Per track, frame differencing over the
upper half of the bounding box decides whether the equipment is active, and
the recent motion of the box centre picks an activity label.  The app returns
an annotated video plus a JSON utilisation report.
"""

import json
import tempfile
from collections import defaultdict, deque

import cv2
import gradio as gr
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
from ultralytics import YOLO

model = YOLO("best.pt")
class_names = model.names
tracker = DeepSort(max_age=30)

# (width, height) every frame is resized to before detection and writing.
_FRAME_SIZE = (640, 360)
# Label of the only detector class that is tracked.
# NOTE(review): presumably excavators, given the "EX-" id prefix -- confirm
# against the model's class list.
_TARGET_CLASS = "C_E"
# Number of recent box centres kept per track.
_HISTORY_LEN = 30
# Frame rate assumed when the container does not report a usable one.
_DEFAULT_FPS = 24.0


def analyze_articulated_motion(frame, prev_frame, bbox):
    """Detect motion in the upper half of a bounding box by frame differencing.

    Args:
        frame: Current BGR frame.
        prev_frame: Previous BGR frame, expected to have the same size.
        bbox: ``(left, top, right, bottom)`` box; clamped to the frame.

    Returns:
        ``(is_active, motion_source, motion_score)``.  When motion is found
        this is ``(True, "arm_only", score)``, otherwise ``(False, "none", 0)``.
        ``score`` is the mean of a 0/255 binary change mask, so it lies on a
        0-255 scale.
    """
    if prev_frame is None:
        return False, "none", 0

    x1, y1, x2, y2 = map(int, bbox)
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
    h, w = y2 - y1, x2 - x1
    # Boxes that are tiny (or entirely off-frame, giving negative sizes)
    # carry no usable signal.
    if h < 10 or w < 10:
        return False, "none", 0

    # Only the top half of the box is inspected.
    mid_y = y1 + int(h * 0.5)
    try:
        roi_curr = cv2.cvtColor(frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
        roi_prev = cv2.cvtColor(prev_frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(roi_curr, roi_prev)
        _, thresh = cv2.threshold(diff, 12, 255, cv2.THRESH_BINARY)
    except cv2.error:
        # Best effort: e.g. mismatched frame sizes -> treat as "no motion"
        # instead of aborting the whole video.
        return False, "none", 0

    motion_score = float(np.mean(thresh))
    if motion_score > 0.15:
        return True, "arm_only", motion_score
    return False, "none", 0


def classify_activity(history, is_active, motion_source):
    """Map recent box-centre motion to an activity label.

    Args:
        history: Sequence of ``(cx, cy)`` box centres, oldest first.
        is_active: Result of the motion analysis for the current frame.
        motion_source: Accepted for interface compatibility; not used.

    Returns:
        ``"Waiting"`` when inactive; otherwise ``"Swinging/Loading"`` when the
        horizontal displacement over the last 10 samples dominates,
        ``"Dumping"`` when the centre moved up by more than 1.5 px, and
        ``"Digging"`` in every other case (including short histories).
    """
    if not is_active:
        return "Waiting"
    if len(history) < 10:
        return "Digging"

    dx = history[-1][0] - history[-10][0]
    dy = history[-1][1] - history[-10][1]
    if abs(dx) > abs(dy) * 2:
        return "Swinging/Loading"
    if dy < -1.5:
        return "Dumping"
    return "Digging"


def _target_detections(results):
    """Convert detector boxes of the target class to DeepSort input tuples."""
    detections = []
    for box in results.boxes:
        if class_names[int(box.cls[0])] != _TARGET_CLASS:
            continue
        x1, y1, x2, y2 = box.xyxy[0].tolist()
        # DeepSort is fed ([left, top, width, height], confidence, class).
        detections.append(
            ([x1, y1, x2 - x1, y2 - y1], float(box.conf[0]), _TARGET_CLASS)
        )
    return detections


def _format_timestamp(seconds):
    """Format a duration in seconds as ``HH:MM:SS.mmm``."""
    total_ms = int(round(seconds * 1000))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, millis = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"


def process_video(video_file):
    """Annotate a video with tracked equipment activity and build a report.

    Args:
        video_file: Path of the uploaded video.

    Returns:
        ``(annotated_video_path, report_json_text, report_json_path)``.

    Raises:
        gr.Error: If no video was supplied or it cannot be opened.
    """
    if not video_file:
        raise gr.Error("Please upload a video file.")

    cap = cv2.VideoCapture(video_file)
    out = None
    final_json_data = []
    try:
        if not cap.isOpened():
            raise gr.Error("Could not open the uploaded video.")

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not (fps and fps > 0):  # also rejects NaN
            fps = _DEFAULT_FPS
        # One report entry roughly every second; never 0, which would make
        # the modulo below raise for frame rates under 1 fps.
        report_every = max(1, int(fps))

        # Close the temp handle immediately so the writer can reopen the path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
            output_video_path = tmp.name
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_video_path, fourcc, fps, _FRAME_SIZE)

        # The tracker is shared at module level; drop tracks left over from a
        # previously processed video so they do not leak into this one.
        tracker.delete_all_tracks()

        frame_id = 0
        prev_frame = None
        track_stats = defaultdict(lambda: {"active_f": 0, "total_f": 0})
        track_history = defaultdict(lambda: deque(maxlen=_HISTORY_LEN))

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1

            frame_resized = cv2.resize(frame, _FRAME_SIZE)
            # Draw on a copy: motion analysis must compare clean frames,
            # otherwise the overlay itself registers as motion.
            annotated = frame_resized.copy()

            results = model(frame_resized, verbose=False)[0]
            detections = _target_detections(results)

            for t in tracker.update_tracks(detections, frame=frame_resized):
                if not t.is_confirmed():
                    continue
                track_id = t.track_id
                bbox = t.to_ltrb()

                history = track_history[track_id]
                history.append(((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2))

                is_active, motion_src, _ = analyze_articulated_motion(
                    frame_resized, prev_frame, bbox
                )
                current_act = classify_activity(history, is_active, motion_src)
                working = current_act != "Waiting"

                stats = track_stats[track_id]
                stats["total_f"] += 1
                if working:
                    stats["active_f"] += 1

                color = (0, 255, 0) if working else (0, 0, 255)
                ix1, iy1, ix2, iy2 = map(int, bbox)
                cv2.rectangle(annotated, (ix1, iy1), (ix2, iy2), color, 2)
                cv2.putText(
                    annotated,
                    f"EX-{track_id}: {current_act}",
                    (ix1, iy1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    color,
                    2,
                )

                if frame_id % report_every == 0:
                    total_s = stats["total_f"] / fps
                    act_s = stats["active_f"] / fps
                    final_json_data.append({
                        "frame_id": frame_id,
                        "equipment_id": f"EX-{track_id}",
                        "timestamp": _format_timestamp(frame_id / fps),
                        "utilization": {
                            "current_state": "ACTIVE" if working else "INACTIVE",
                            "current_activity": current_act.upper(),
                            "motion_source": motion_src,
                        },
                        "time_analytics": {
                            "total_tracked_seconds": round(total_s, 1),
                            "total_active_seconds": round(act_s, 1),
                            "utilization_percent": round((act_s / total_s) * 100, 1),
                        },
                    })

            out.write(annotated)
            # frame_resized is never drawn on, so it can be kept as-is.
            prev_frame = frame_resized
    finally:
        cap.release()
        if out is not None:
            out.release()

    report_json = json.dumps(final_json_data, indent=2)
    with tempfile.NamedTemporaryFile(
        "w", delete=False, suffix=".json", encoding="utf-8"
    ) as f:
        f.write(report_json)
        json_path = f.name
    return output_video_path, report_json, json_path


demo = gr.Interface(
    fn=process_video,
    inputs=gr.Video(label="Upload Construction Video"),
    outputs=[
        gr.Video(label="Analysis"),
        gr.Textbox(label="JSON Report", lines=15),
        gr.File(label="Download"),
    ],
    title="Gaglevision: Equipment Activity Tracker",
)

if __name__ == "__main__":
    demo.launch()