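# Gaglevision: Equipment Activity Tracker (Gradio Space app).
# Detects construction equipment (class "C_E") with a fine-tuned YOLO model,
# tracks it with Deep SORT, classifies each machine's activity per frame, and
# returns an annotated video plus a per-second JSON utilization report.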
import gradio as gr
import cv2
import tempfile
import numpy as np
import json
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
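
# Fine-tuned detector weights plus the class-id -> name map stored with them.
# Deep SORT keeps a lost track alive for up to max_age frames before dropping it.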
model = YOLO("best.pt")
class_names = model.names
tracker = DeepSort(max_age=30)
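
# An excavator can be working (swinging its arm) while its chassis stays put,
# so centroid motion alone undercounts activity. This helper frame-differences
# the upper half of the bounding box, where the boom/arm typically sits.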
def analyze_articulated_motion(frame, prev_frame, bbox):
    """Detect arm/boom motion by frame-differencing the upper half of the box."""
    x1, y1, x2, y2 = map(int, bbox)
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
    h, w = y2 - y1, x2 - x1
    if h < 10 or w < 10:
        return False, "none", 0
    mid_y = y1 + int(h * 0.5)  # only the upper half, where the arm appears
    try:
        roi_curr = cv2.cvtColor(frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
        roi_prev = cv2.cvtColor(prev_frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
        diff = cv2.absdiff(roi_curr, roi_prev)
        _, thresh = cv2.threshold(diff, 12, 255, cv2.THRESH_BINARY)
        motion_score = np.mean(thresh)  # mean of a 0/255 mask: fraction of changed pixels x 255
        if motion_score > 0.15:
            return True, "arm_only", motion_score
    except Exception:
        pass  # degenerate ROI near frame edges; treat as no motion
    return False, "none", 0
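
# Heuristic activity classifier. Note: motion_source is accepted but not used
# by the current thresholds.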
def classify_activity(history, is_active, motion_source):
    """Map recent centroid displacement to a coarse activity label."""
    if not is_active:
        return "Waiting"
    if len(history) < 10:
        return "Digging"  # too little history to judge direction yet
    # Displacement over the last 10 tracked centroid positions.
    dx = history[-1][0] - history[-10][0]
    dy = history[-1][1] - history[-10][1]
    if abs(dx) > abs(dy) * 2:
        return "Swinging/Loading"  # predominantly horizontal motion
    if dy > 1.5:
        return "Digging"  # moving down in image coordinates
    if dy < -1.5:
        return "Dumping"  # moving up in image coordinates
    return "Digging"
def process_video(video_file):
    cap = cv2.VideoCapture(video_file)
    fps = cap.get(cv2.CAP_PROP_FPS) or 24  # fall back to 24 if FPS metadata is missing
    output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (640, 360))
    frame_id = 0
    prev_frame = None
    track_stats = {}    # track_id -> {"active_f": ..., "total_f": ...} frame counts
    track_history = {}  # track_id -> recent centroid positions
    final_json_data = []
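
    # Process the video frame by frame.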
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_id += 1
        frame_resized = cv2.resize(frame, (640, 360))
        results = model(frame_resized, verbose=False)[0]
        detections = []
        for box in results.boxes:
            cls_id = int(box.cls[0])
            if class_names[cls_id] == "C_E":
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = float(box.conf[0])
                detections.append(([x1, y1, x2 - x1, y2 - y1], conf, "C_E"))
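        # deep_sort_realtime consumes ([left, top, width, height], confidence, class)
        # tuples and returns Track objects with persistent IDs across frames.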
        tracks = tracker.update_tracks(detections, frame=frame_resized)
        for t in tracks:
            if not t.is_confirmed():
                continue
            track_id = t.track_id
            bbox = t.to_ltrb()
            cx, cy = (bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2
            track_history.setdefault(track_id, []).append((cx, cy))
            if len(track_history[track_id]) > 30:  # keep roughly the last second of motion
                track_history[track_id].pop(0)
            if prev_frame is not None:
                is_active, motion_src, _ = analyze_articulated_motion(frame_resized, prev_frame, bbox)
            else:
                is_active, motion_src = False, "none"
            current_act = classify_activity(track_history[track_id], is_active, motion_src)
            stats = track_stats.setdefault(track_id, {"active_f": 0, "total_f": 0})
            stats["total_f"] += 1
            if current_act != "Waiting":
                stats["active_f"] += 1
            # Green box for active equipment, red for waiting.
            color = (0, 255, 0) if current_act != "Waiting" else (0, 0, 255)
            ix1, iy1, ix2, iy2 = map(int, bbox)
            cv2.rectangle(frame_resized, (ix1, iy1), (ix2, iy2), color, 2)
            cv2.putText(frame_resized, f"EX-{track_id}: {current_act}", (ix1, iy1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
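            # Once per second of video time, snapshot this track's utilization
            # into the JSON report.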
            if frame_id % int(fps) == 0:
                total_s = track_stats[track_id]["total_f"] / fps
                act_s = track_stats[track_id]["active_f"] / fps
                elapsed = frame_id / fps
                final_json_data.append({
                    "frame_id": frame_id,
                    "equipment_id": f"EX-{track_id}",
                    "timestamp": f"{int(elapsed // 3600):02d}:{int(elapsed % 3600 // 60):02d}:{elapsed % 60:06.3f}",
                    "utilization": {
                        "current_state": "ACTIVE" if current_act != "Waiting" else "INACTIVE",
                        "current_activity": current_act.upper(),
                        "motion_source": motion_src
                    },
                    "time_analytics": {
                        "total_tracked_seconds": round(total_s, 1),
                        "total_active_seconds": round(act_s, 1),
                        "utilization_percent": round((act_s / total_s) * 100, 1)
                    }
                })
        out.write(frame_resized)
        prev_frame = frame_resized.copy()
    cap.release()
    out.release()
    json_path = tempfile.NamedTemporaryFile(delete=False, suffix=".json").name
    with open(json_path, "w") as f:
        json.dump(final_json_data, f, indent=2)
    return output_video_path, json.dumps(final_json_data, indent=2), json_path
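
# Gradio UI: one uploaded video in; annotated video, inline JSON report, and a
# downloadable JSON file out.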
demo = gr.Interface(
    fn=process_video,
    inputs=gr.Video(label="Upload Construction Video"),
    outputs=[
        gr.Video(label="Analysis"),
        gr.Textbox(label="JSON Report", lines=15),
        gr.File(label="Download"),
    ],
    title="Gaglevision: Equipment Activity Tracker",
)

demo.launch()