import os import time import tempfile import numpy as np import cv2 from collections import defaultdict def _side(p, a, b): return np.sign((b[0] - a[0]) * (p[1] - a[1]) - (b[1] - a[1]) * (p[0] - a[0])) def _point_to_segment_dist(px, py, ax, ay, bx, by): A = np.array([ax, ay], dtype=float) B = np.array([bx, by], dtype=float) P = np.array([px, py], dtype=float) AB = B - A t = np.clip(np.dot(P - A, AB) / np.dot(AB, AB), 0, 1) return np.linalg.norm(P - (A + t * AB)) # Lightweight drawing colors (BGR for OpenCV) _CLR_BOX = (230, 180, 50) # teal-ish _CLR_LINE = (80, 220, 100) # green _CLR_TEXT_BG = (30, 30, 30) # dark bg for text def _draw_annotations(frame, boxes, ids, line_pts): """Draw bounding boxes, track IDs, and counting line on frame in-place.""" # Counting line cv2.line(frame, tuple(line_pts[0]), tuple(line_pts[1]), _CLR_LINE, 3, cv2.LINE_AA) if boxes is not None and ids is not None: for box, obj_id in zip(boxes, ids): x1, y1, x2, y2 = map(int, box) cv2.rectangle(frame, (x1, y1), (x2, y2), _CLR_BOX, 2) label = f"ID:{int(obj_id)}" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1) cv2.rectangle(frame, (x1, y1 - th - 6), (x1 + tw + 6, y1), _CLR_TEXT_BG, -1) cv2.putText(frame, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1, cv2.LINE_AA) def run(model, video_path, line, config, on_frame, save_annotated=False): """ Runs YOLO tracking on video. Calls on_frame(update_dict) after each processed frame. line: [[x1,y1], [x2,y2]] save_annotated: if True, writes annotated MP4 with boxes + IDs + counting line """ cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) out_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) out_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() stride = config["detect_stride"] total_iters = total // stride # Annotated video writer (temp directory — auto-cleaned on container shutdown) annotated_path = None writer = None if save_annotated: annotated_dir = os.path.join(tempfile.gettempdir(), "funky_reports") os.makedirs(annotated_dir, exist_ok=True) annotated_path = os.path.join(annotated_dir, "annotated.mp4") fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(annotated_path, fourcc, fps / stride, (out_w, out_h)) prev_side = {} counted_ids = set() class_in = defaultdict(int) class_out = defaultdict(int) congestion = [] flow_times = [] conf_scores = [] start = time.time() results = model.track( source=video_path, tracker="bytetrack.yaml", imgsz=config["imgsz"], conf=config.get("conf", 0.12), iou=config.get("iou", 0.6), vid_stride=stride, stream=True, verbose=False, persist=True ) a = line[0] b = line[1] for frame_idx, r in enumerate(results): active = 0 cur_boxes = None cur_ids = None if r.boxes.id is not None: ids = r.boxes.id.cpu().numpy() cls = r.boxes.cls.cpu().numpy() xyxy = r.boxes.xyxy.cpu().numpy() active = len(ids) confs = r.boxes.conf.cpu().numpy().tolist() conf_scores.extend(confs) cur_boxes = xyxy cur_ids = ids for obj_id, c, box in zip(ids, cls, xyxy): cx = int((box[0] + box[2]) / 2) cy = int((box[1] + box[3]) / 2) current = _side((cx, cy), a, b) if obj_id in prev_side and obj_id not in counted_ids: if prev_side[obj_id] != current: dist = _point_to_segment_dist(cx, cy, a[0], a[1], b[0], b[1]) if dist < 12: t = frame_idx * stride / fps flow_times.append(round(t, 2)) if current > 0: class_in[int(c)] += 1 else: class_out[int(c)] += 1 counted_ids.add(obj_id) prev_side[obj_id] = current # Write annotated frame (only for frames we already process) if writer is not None: frame = r.orig_img.copy() _draw_annotations(frame, cur_boxes, cur_ids, [a, b]) writer.write(frame) congestion.append(active) elapsed = time.time() - start update = { "frame_index": frame_idx + 1, "total_iters": total_iters, "total_frames": total, "active": active, "congestion": congestion.copy(), "class_in": {str(k): v for k, v in class_in.items()}, "class_out": {str(k): v for k, v in class_out.items()}, "flow_times": flow_times.copy(), "elapsed": round(elapsed, 2), "fps": round((frame_idx + 1) / elapsed, 2) if elapsed > 0 else 0, } on_frame(update) if writer is not None: writer.release() processing_time = round(time.time() - start, 2) actual_fps = round(config["frames"] / processing_time, 2) if processing_time > 0 else 0 speed_vs_rt = round(actual_fps / fps, 2) if fps > 0 else 0 result = { "class_in": dict(class_in), "class_out": dict(class_out), "congestion": congestion, "flow_times": flow_times, "conf_scores": conf_scores, "processing_time": processing_time, "actual_fps": actual_fps, "speed_vs_realtime": speed_vs_rt, } if annotated_path and os.path.exists(annotated_path): result["annotated_video"] = annotated_path return result