shimaa22 commited on
Commit
8deab67
ยท
verified ยท
1 Parent(s): 66e59ff

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +103 -57
app.py CHANGED
@@ -8,11 +8,19 @@ from deep_sort_realtime.deepsort_tracker import DeepSort
8
 
9
  model = YOLO("best.pt")
10
  class_names = model.names
11
- tracker = DeepSort(max_age=15)
12
 
13
 
 
 
 
14
  def analyze_articulated_motion(frame, prev_frame, bbox):
15
-
 
 
 
 
 
16
  x1, y1, x2, y2 = map(int, bbox)
17
 
18
  x1 = max(0, x1)
@@ -21,42 +29,69 @@ def analyze_articulated_motion(frame, prev_frame, bbox):
21
  y2 = min(frame.shape[0], y2)
22
 
23
  h = y2 - y1
24
- if h < 10:
 
 
25
  return False, "none"
26
 
27
  mid_y = y1 + h // 2
28
 
 
29
  upper_curr = cv2.cvtColor(frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
30
  upper_prev = cv2.cvtColor(prev_frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
31
 
 
32
  lower_curr = cv2.cvtColor(frame[mid_y:y2, x1:x2], cv2.COLOR_BGR2GRAY)
33
  lower_prev = cv2.cvtColor(prev_frame[mid_y:y2, x1:x2], cv2.COLOR_BGR2GRAY)
34
 
35
  upper_motion = np.mean(cv2.absdiff(upper_curr, upper_prev))
36
  lower_motion = np.mean(cv2.absdiff(lower_curr, lower_prev))
37
 
38
- if upper_motion > 8 and lower_motion < 5:
39
- return True, "arm_only"
40
- elif lower_motion > 8:
41
- return True, "full_body"
 
 
 
 
42
  else:
43
- return False, "none"
 
44
 
 
 
 
 
 
 
 
45
 
46
- def get_activity(history):
47
  if len(history) < 5:
48
  return "DIGGING"
49
 
50
  dx = history[-1][0] - history[0][0]
51
  dy = history[-1][1] - history[0][1]
52
 
53
- if abs(dx) + abs(dy) < 10:
 
 
 
 
 
 
 
 
 
 
 
54
  return "WAITING"
55
- if dy > 15:
56
  return "DIGGING"
57
  if abs(dx) > abs(dy):
58
  return "SWINGING/LOADING"
59
- if dy < -15:
60
  return "DUMPING"
61
 
62
  return "WAITING"
@@ -65,9 +100,9 @@ def get_activity(history):
65
  def process_video(video_file, selected_classes):
66
 
67
  cap = cv2.VideoCapture(video_file)
68
- fps = cap.get(cv2.CAP_PROP_FPS) or 25
69
 
70
-
71
  output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
72
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
73
  out = cv2.VideoWriter(output_video_path, fourcc, fps, (640, 360))
@@ -75,10 +110,10 @@ def process_video(video_file, selected_classes):
75
  frame_id = 0
76
  prev_frame = None
77
 
78
- track_memory = {}
79
- track_stats = {}
80
- track_bbox = {}
81
- track_motion = {}
82
 
83
  selected_ids = [k for k, v in class_names.items() if v in selected_classes]
84
 
@@ -89,9 +124,7 @@ def process_video(video_file, selected_classes):
89
 
90
  frame_id += 1
91
 
92
- if frame_id % 2 != 0:
93
- continue
94
-
95
  frame = cv2.resize(frame, (640, 360))
96
 
97
  results = model(frame)[0]
@@ -118,42 +151,47 @@ def process_video(video_file, selected_classes):
118
  cy = (y1 + y2) / 2
119
 
120
  if track_id not in track_memory:
121
- track_memory[track_id] = []
122
- track_stats[track_id] = {"active": 0, "idle": 0}
123
- track_motion[track_id] = "none"
 
124
 
125
  track_memory[track_id].append((cx, cy))
126
- if len(track_memory[track_id]) > 10:
127
  track_memory[track_id].pop(0)
128
 
129
- track_bbox[track_id] = bbox
130
-
131
  if prev_frame is not None:
132
- is_active, motion_source = analyze_articulated_motion(frame, prev_frame, bbox)
 
 
133
  track_motion[track_id] = motion_source
134
  else:
135
- is_active = False
136
  motion_source = "none"
137
 
138
- activity = get_activity(track_memory[track_id])
 
 
 
139
 
140
- if activity == "WAITING" and not is_active:
141
  track_stats[track_id]["idle"] += 1
142
  else:
143
  track_stats[track_id]["active"] += 1
144
 
145
-
146
  ix1, iy1, ix2, iy2 = map(int, bbox)
147
  color = (0, 255, 0) if activity != "WAITING" else (0, 0, 255)
148
 
149
  cv2.rectangle(frame, (ix1, iy1), (ix2, iy2), color, 2)
150
-
151
- label = f"EX-{track_id} | {activity}"
152
- cv2.putText(frame, label, (ix1, iy1 - 8),
153
  cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1)
154
-
155
- cv2.putText(frame, f"motion: {track_motion[track_id]}",
156
- (ix1, iy2 + 14),
157
  cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 255, 0), 1)
158
 
159
  out.write(frame)
@@ -162,46 +200,54 @@ def process_video(video_file, selected_classes):
162
  cap.release()
163
  out.release()
164
 
165
-
 
 
166
  final_output = []
167
 
168
  for track_id, data in track_stats.items():
169
- track_id = int(track_id)
170
  active = data["active"]
171
- idle = data["idle"]
172
- total = active + idle
173
 
174
  active_sec = active / fps
175
- idle_sec = idle / fps
176
- total_sec = total / fps
177
 
178
  utilization = (active_sec / (total_sec + 1e-6)) * 100
179
 
180
- timestamp = f"00:00:{(frame_id / fps):06.3f}"
 
 
 
 
 
 
 
181
 
182
  result = {
183
  "frame_id": int(frame_id),
184
- "equipment_id": f"EX-{track_id:03d}",
185
  "equipment_class": "excavator",
186
  "timestamp": timestamp,
187
  "utilization": {
188
- "current_state": "ACTIVE" if active_sec > idle_sec else "IDLE",
189
- "current_activity": "DIGGING",
190
- "motion_source": track_motion.get(track_id, track_motion.get(str(track_id), "none"))
191
  },
192
  "time_analytics": {
193
- "total_tracked_seconds": round(total_sec, 1),
194
- "total_active_seconds": round(active_sec, 1),
195
- "total_idle_seconds": round(idle_sec, 1),
196
- "utilization_percent": round(utilization, 1)
197
  }
198
  }
199
-
200
  final_output.append(result)
201
 
202
  json_file = tempfile.NamedTemporaryFile(delete=False, suffix=".json").name
203
  with open(json_file, "w") as f:
204
- json.dump(final_output, f, indent=2)
205
 
206
  return output_video_path, json.dumps(final_output, indent=2), json_file
207
 
@@ -216,11 +262,11 @@ demo = gr.Interface(
216
  gr.CheckboxGroup(choices=list(class_names.values()), label="Select Classes")
217
  ],
218
  outputs=[
219
- gr.Video(label="๐Ÿ“น Processed Video"),
220
  gr.Textbox(label="JSON Output", lines=25),
221
  gr.File(label="โฌ‡๏ธ Download JSON")
222
  ],
223
- title="Equipment Utilization & Activity Classification Prototype"
224
  )
225
 
226
  demo.launch()
 
8
 
9
  model = YOLO("best.pt")
10
  class_names = model.names
11
+ tracker = DeepSort(max_age=30)
12
 
13
 
14
+ # ===================================================
15
+ # Articulated Motion โ€” ู†ู‚ุณู… ุงู„ู€ box ู„ู€ zones
16
+ # ===================================================
17
  def analyze_articulated_motion(frame, prev_frame, bbox):
18
+ """
19
+ ู†ู‚ุณู… ุงู„ู€ bounding box ู„ุฌุฒุฆูŠู†:
20
+ - upper zone = ุงู„ู€ arm / bucket (ููˆู‚)
21
+ - lower zone = ุงู„ู€ tracks / body (ุชุญุช)
22
+ ู„ูˆ ุงู„ุฌุฒุก ุงู„ุนู„ูˆูŠ ุจูŠุชุญุฑูƒ โ†’ ACTIVE ุญุชู‰ ู„ูˆ ุงู„ุฌุณู… ูˆุงู‚ู
23
+ """
24
  x1, y1, x2, y2 = map(int, bbox)
25
 
26
  x1 = max(0, x1)
 
29
  y2 = min(frame.shape[0], y2)
30
 
31
  h = y2 - y1
32
+ w = x2 - x1
33
+
34
+ if h < 20 or w < 20:
35
  return False, "none"
36
 
37
  mid_y = y1 + h // 2
38
 
39
+ # ุงู„ุฌุฒุก ุงู„ุนู„ูˆูŠ (ุงู„ู€ arm)
40
  upper_curr = cv2.cvtColor(frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
41
  upper_prev = cv2.cvtColor(prev_frame[y1:mid_y, x1:x2], cv2.COLOR_BGR2GRAY)
42
 
43
+ # ุงู„ุฌุฒุก ุงู„ุณูู„ูŠ (ุงู„ู€ tracks)
44
  lower_curr = cv2.cvtColor(frame[mid_y:y2, x1:x2], cv2.COLOR_BGR2GRAY)
45
  lower_prev = cv2.cvtColor(prev_frame[mid_y:y2, x1:x2], cv2.COLOR_BGR2GRAY)
46
 
47
  upper_motion = np.mean(cv2.absdiff(upper_curr, upper_prev))
48
  lower_motion = np.mean(cv2.absdiff(lower_curr, lower_prev))
49
 
50
+ # threshold ู…ู†ุฎูุถ ุนุดุงู† ูŠูƒุชุดู ุญุฑูƒุฉ ุงู„ู€ arm ุงู„ุตุบูŠุฑุฉ
51
+ if upper_motion > 3:
52
+ if lower_motion < 3:
53
+ return True, "arm_only" # ุงู„ู€ arm ุจุณ ุจูŠุชุญุฑูƒ
54
+ else:
55
+ return True, "full_body" # ุงู„ุฌุณู… ูƒู„ู‡ ุจูŠุชุญุฑูƒ
56
+ elif lower_motion > 3:
57
+ return True, "full_body"
58
  else:
59
+ return False, "none" # ูˆุงู‚ู ุชู…ุงู…ุงู‹
60
+
61
 
62
+ # ===================================================
63
+ # Activity Classification
64
+ # ===================================================
65
+ def get_activity(history, is_active, motion_source):
66
+ # ู„ูˆ ู…ููŠุด motion ุฎุงู„ุต โ†’ WAITING
67
+ if not is_active and motion_source == "none":
68
+ return "WAITING"
69
 
70
+ # ู„ูˆ ุงู„ุชุงุฑูŠุฎ ู‚ุตูŠุฑ ุจุณ ููŠ motion โ†’ DIGGING
71
  if len(history) < 5:
72
  return "DIGGING"
73
 
74
  dx = history[-1][0] - history[0][0]
75
  dy = history[-1][1] - history[0][1]
76
 
77
+ # ู„ูˆ ุงู„ู€ arm ุจุณ ุจูŠุชุญุฑูƒ (ุฌุณู… ูˆุงู‚ู)
78
+ if motion_source == "arm_only":
79
+ if dy > 5:
80
+ return "DIGGING"
81
+ elif dy < -5:
82
+ return "DUMPING"
83
+ else:
84
+ return "DIGGING"
85
+
86
+ # ู„ูˆ ุงู„ุฌุณู… ุจูŠุชุญุฑูƒ
87
+ total_move = abs(dx) + abs(dy)
88
+ if total_move < 8:
89
  return "WAITING"
90
+ if dy > 10:
91
  return "DIGGING"
92
  if abs(dx) > abs(dy):
93
  return "SWINGING/LOADING"
94
+ if dy < -10:
95
  return "DUMPING"
96
 
97
  return "WAITING"
 
100
  def process_video(video_file, selected_classes):
101
 
102
  cap = cv2.VideoCapture(video_file)
103
+ fps = cap.get(cv2.CAP_PROP_FPS) or 24
104
 
105
+ # VideoWriter ู„ู†ุฑุฌุน ููŠุฏูŠูˆ ุจุงู„ู€ bounding boxes
106
  output_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
107
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
108
  out = cv2.VideoWriter(output_video_path, fourcc, fps, (640, 360))
 
110
  frame_id = 0
111
  prev_frame = None
112
 
113
+ track_memory = {}
114
+ track_stats = {}
115
+ track_motion = {}
116
+ track_activity = {}
117
 
118
  selected_ids = [k for k, v in class_names.items() if v in selected_classes]
119
 
 
124
 
125
  frame_id += 1
126
 
127
+ # โœ… ู†ุดุชุบู„ ุนู„ู‰ ูƒู„ ุงู„ูุฑูŠู…ุงุช (ู…ุด ูƒู„ ูุฑูŠู… ุชุงู†ูŠ)
 
 
128
  frame = cv2.resize(frame, (640, 360))
129
 
130
  results = model(frame)[0]
 
151
  cy = (y1 + y2) / 2
152
 
153
  if track_id not in track_memory:
154
+ track_memory[track_id] = []
155
+ track_stats[track_id] = {"active": 0, "idle": 0}
156
+ track_motion[track_id] = "none"
157
+ track_activity[track_id] = "WAITING"
158
 
159
  track_memory[track_id].append((cx, cy))
160
+ if len(track_memory[track_id]) > 15:
161
  track_memory[track_id].pop(0)
162
 
163
+ # Zone-based articulated motion
 
164
  if prev_frame is not None:
165
+ is_active, motion_source = analyze_articulated_motion(
166
+ frame, prev_frame, bbox
167
+ )
168
  track_motion[track_id] = motion_source
169
  else:
170
+ is_active = False
171
  motion_source = "none"
172
 
173
+ activity = get_activity(
174
+ track_memory[track_id], is_active, motion_source
175
+ )
176
+ track_activity[track_id] = activity
177
 
178
+ if activity == "WAITING":
179
  track_stats[track_id]["idle"] += 1
180
  else:
181
  track_stats[track_id]["active"] += 1
182
 
183
+ # ุฑุณู… ุนู„ู‰ ุงู„ูุฑูŠู…
184
  ix1, iy1, ix2, iy2 = map(int, bbox)
185
  color = (0, 255, 0) if activity != "WAITING" else (0, 0, 255)
186
 
187
  cv2.rectangle(frame, (ix1, iy1), (ix2, iy2), color, 2)
188
+ cv2.putText(frame,
189
+ f"EX-{track_id} | {activity}",
190
+ (ix1, max(iy1 - 8, 10)),
191
  cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1)
192
+ cv2.putText(frame,
193
+ f"motion: {motion_source}",
194
+ (ix1, min(iy2 + 14, frame.shape[0] - 4)),
195
  cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 255, 0), 1)
196
 
197
  out.write(frame)
 
200
  cap.release()
201
  out.release()
202
 
203
+ # ===================================================
204
+ # Build JSON โ€” ูƒู„ ุงู„ู€ tracks โœ…
205
+ # ===================================================
206
  final_output = []
207
 
208
  for track_id, data in track_stats.items():
209
+ tid = int(track_id)
210
  active = data["active"]
211
+ idle = data["idle"]
212
+ total = active + idle
213
 
214
  active_sec = active / fps
215
+ idle_sec = idle / fps
216
+ total_sec = total / fps
217
 
218
  utilization = (active_sec / (total_sec + 1e-6)) * 100
219
 
220
+ total_video_sec = frame_id / fps
221
+ mm = int(total_video_sec // 60)
222
+ ss = total_video_sec % 60
223
+ timestamp = f"00:{mm:02d}:{ss:06.3f}"
224
+
225
+ last_activity = track_activity.get(track_id, "WAITING")
226
+ motion_src = track_motion.get(track_id,
227
+ track_motion.get(str(track_id), "none"))
228
 
229
  result = {
230
  "frame_id": int(frame_id),
231
+ "equipment_id": f"EX-{tid:03d}",
232
  "equipment_class": "excavator",
233
  "timestamp": timestamp,
234
  "utilization": {
235
+ "current_state": "ACTIVE" if active_sec > idle_sec else "IDLE",
236
+ "current_activity": last_activity,
237
+ "motion_source": motion_src
238
  },
239
  "time_analytics": {
240
+ "total_tracked_seconds": round(total_sec, 1),
241
+ "total_active_seconds": round(active_sec, 1),
242
+ "total_idle_seconds": round(idle_sec, 1),
243
+ "utilization_percent": round(utilization, 1)
244
  }
245
  }
 
246
  final_output.append(result)
247
 
248
  json_file = tempfile.NamedTemporaryFile(delete=False, suffix=".json").name
249
  with open(json_file, "w") as f:
250
+ json.dump(final_output, f, indent=2)
251
 
252
  return output_video_path, json.dumps(final_output, indent=2), json_file
253
 
 
262
  gr.CheckboxGroup(choices=list(class_names.values()), label="Select Classes")
263
  ],
264
  outputs=[
265
+ gr.Video(label="๐Ÿ“น Processed Video"),
266
  gr.Textbox(label="JSON Output", lines=25),
267
  gr.File(label="โฌ‡๏ธ Download JSON")
268
  ],
269
+ title="๐Ÿšœ Excavator Activity Analyzer"
270
  )
271
 
272
  demo.launch()