UrbanFlow / backend /engine.py
Subh775's picture
heavy add-ons: 13+ features implemented
d9ebe88
raw
history blame
10.5 kB
import os
import time
import tempfile
import threading
import queue
import numpy as np
import cv2
from collections import defaultdict
from constants import MODEL_CLASSES
from tracker_config import get_tracker_path
from speed import estimate_speeds
from pcu import compute_pcu
def _side(p, a, b):
return np.sign((b[0] - a[0]) * (p[1] - a[1]) - (b[1] - a[1]) * (p[0] - a[0]))
def _point_to_segment_dist(px, py, ax, ay, bx, by):
A = np.array([ax, ay], dtype=float)
B = np.array([bx, by], dtype=float)
P = np.array([px, py], dtype=float)
AB = B - A
t = np.clip(np.dot(P - A, AB) / np.dot(AB, AB), 0, 1)
return np.linalg.norm(P - (A + t * AB))
# Lightweight drawing colors (BGR for OpenCV)
_CLR_BOX = (230, 180, 50) # teal-ish
_CLR_LINE = (80, 220, 100) # green
_CLR_TEXT_BG = (30, 30, 30) # dark bg for text
class ThreadedVideoWriter:
def __init__(self, path, fps, size):
self.path = path
self.fps = fps
self.size = size
self.queue = queue.Queue(maxsize=128)
self.stopped = False
self.writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
self.thread = threading.Thread(target=self._run, daemon=True)
self.thread.start()
def _run(self):
while not self.stopped or not self.queue.empty():
try:
frame = self.queue.get(timeout=1.0)
if frame is not None:
self.writer.write(frame)
self.queue.task_done()
except queue.Empty:
continue
self.writer.release()
print(f"[BACKEND] Threaded writer finished: {self.path}")
def write(self, frame):
if not self.stopped:
try:
# If queue is full, we might want to wait or drop, but here we'll wait
# to ensure the export video is complete and accurate.
self.queue.put(frame.copy())
except Exception as e:
print(f"[BACKEND] Writer queue error: {e}")
def stop(self):
self.stopped = True
self.thread.join()
def _draw_annotations(frame, boxes, ids, clses, line_pts, options):
"""Draw bounding boxes, track IDs, and counting line on frame in-place."""
# Counting line (Spatial Boundary)
if options.get("spatial", True):
cv2.line(frame, tuple(line_pts[0]), tuple(line_pts[1]), _CLR_LINE, 3, cv2.LINE_AA)
if boxes is not None and ids is not None and clses is not None:
for box, obj_id, cls_idx in zip(boxes, ids, clses):
x1, y1, x2, y2 = map(int, box)
# Bounding Box
if options.get("bbox", True):
cv2.rectangle(frame, (x1, y1), (x2, y2), _CLR_BOX, 2)
# Labels
labels = []
if options.get("track_id", True):
labels.append(f"ID:{int(obj_id)}")
if options.get("class_name", True):
labels.append(MODEL_CLASSES.get(int(cls_idx), "Unknown"))
elif options.get("class_id", False):
labels.append(f"C:{int(cls_idx)}")
if labels:
label_text = " | ".join(labels)
(tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
cv2.rectangle(frame, (x1, y1 - th - 6), (x1 + tw + 6, y1), _CLR_TEXT_BG, -1)
cv2.putText(frame, label_text, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1, cv2.LINE_AA)
def run(model, video_path, line, config, on_frame, save_annotated=False, annotated_options=None):
"""
Runs YOLO tracking on video. Calls on_frame(update_dict) after each processed frame.
line: [[x1,y1], [x2,y2]]
save_annotated: if True, writes annotated MP4 with boxes + IDs + counting line
annotated_options: dict of toggleable visual overlays
"""
if annotated_options is None:
annotated_options = {"bbox": True, "track_id": True, "spatial": True}
# Force bbox to True if export is enabled (user requirement)
if save_annotated:
annotated_options["bbox"] = True
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
out_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
out_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# Dynamic crossing threshold: 5% of frame height, min 40px
cross_dist = max(40, int(out_h * 0.05))
stride = config["detect_stride"]
total_iters = total // stride
# Annotated video writer (temp directory — auto-cleaned on container shutdown)
annotated_path = None
writer = None
if save_annotated:
annotated_dir = os.path.join(tempfile.gettempdir(), "funky_reports")
os.makedirs(annotated_dir, exist_ok=True)
annotated_path = os.path.join(annotated_dir, f"annotated_{os.path.basename(video_path)}.mp4")
writer_fps = max(1.0, fps / stride)
writer = ThreadedVideoWriter(annotated_path, writer_fps, (out_w, out_h))
prev_side = {}
counted_ids = set()
class_in = defaultdict(int)
class_out = defaultdict(int)
congestion = []
flow_times = []
conf_scores = []
heatmap_points = []
track_positions = defaultdict(list)
raw_events = [["frame_index", "timestamp_sec", "vehicle_id", "class_name", "direction"]]
start = time.time()
# https://docs.ultralytics.com/modes/predict/#inference-sources
# https://docs.ultralytics.com/modes/track/#why-choose-ultralytics-yolo-for-object-tracking
# ExecuTorch: https://docs.ultralytics.com/integrations/executorch/#what-are-the-system-requirements-for-executorch-export
results = model.track(
source=video_path,
tracker=get_tracker_path(),
imgsz=736, # MUST match OpenVINO export imgsz — compiled graph is fixed shape
conf=config.get("conf", 0.12),
iou=config.get("iou", 0.6),
vid_stride=stride,
stream=True,
verbose=False,
persist=False, # MUST be False — True causes ByteTracker state leak across runs
batch=2 # MUST match OpenVINO export batch size
)
a = line[0]
b = line[1]
iterator = iter(enumerate(results))
while True:
try:
frame_idx, r = next(iterator)
except StopIteration:
break
except RuntimeError as e:
if "incompatible" in str(e) and "shape=" in str(e):
print(f"[BACKEND] Ignored OpenVINO shape mismatch on final trailing batch.")
break
raise e
active = 0
cur_boxes = None
cur_ids = None
if r.boxes.id is not None:
ids = r.boxes.id.cpu().numpy()
cls = r.boxes.cls.cpu().numpy()
xyxy = r.boxes.xyxy.cpu().numpy()
active = len(ids)
confs = r.boxes.conf.cpu().numpy().tolist()
conf_scores.extend(confs)
cur_boxes = xyxy
cur_ids = ids
for obj_id, c, box in zip(ids, cls, xyxy):
cx = int((box[0] + box[2]) / 2)
cy = int((box[1] + box[3]) / 2)
heatmap_points.append([cx, cy])
track_positions[obj_id].append((frame_idx, cx, cy))
current = _side((cx, cy), a, b)
# Skip if centroid is exactly on the line (cross-product == 0)
# — avoids misfired crossings due to floating-point boundary hits
if current == 0:
continue
if obj_id in prev_side and obj_id not in counted_ids:
if prev_side[obj_id] != current:
dist = _point_to_segment_dist(cx, cy, a[0], a[1], b[0], b[1])
if dist < cross_dist:
t = frame_idx * stride / fps
flow_times.append(round(t, 2))
if current > 0:
class_in[int(c)] += 1
raw_events.append([frame_idx + 1, round(t, 2), int(obj_id), MODEL_CLASSES.get(int(c), f"cls_{int(c)}"), "IN"])
else:
class_out[int(c)] += 1
raw_events.append([frame_idx + 1, round(t, 2), int(obj_id), MODEL_CLASSES.get(int(c), f"cls_{int(c)}"), "OUT"])
counted_ids.add(obj_id)
prev_side[obj_id] = current
# Write annotated frame
cur_clses = cls if r.boxes.id is not None else None
if writer is not None:
frame = r.orig_img.copy()
_draw_annotations(frame, cur_boxes, cur_ids, cur_clses, [a, b], annotated_options)
writer.write(frame)
congestion.append(active)
elapsed = time.time() - start
update = {
"frame_index": frame_idx + 1,
"total_iters": total_iters,
"total_frames": total,
"active": active,
"congestion": congestion.copy(),
"class_in": {str(k): v for k, v in class_in.items()},
"class_out": {str(k): v for k, v in class_out.items()},
"flow_times": flow_times.copy(),
"elapsed": round(elapsed, 2),
"fps": round((frame_idx + 1) / elapsed, 2) if elapsed > 0 else 0,
}
on_frame(update)
if writer is not None:
writer.stop()
processing_time = round(time.time() - start, 2)
actual_fps = round(total / processing_time, 2) if processing_time > 0 else 0
speed_vs_rt = round(actual_fps / fps, 2) if fps > 0 else 0
speed_data = estimate_speeds(dict(track_positions))
pcu_data = compute_pcu(
{str(k): v for k, v in class_in.items()},
{str(k): v for k, v in class_out.items()},
)
result = {
"class_in": dict(class_in),
"class_out": dict(class_out),
"congestion": congestion,
"flow_times": flow_times,
"conf_scores": conf_scores,
"heatmap_points": heatmap_points,
"raw_events": raw_events,
"processing_time": processing_time,
"actual_fps": actual_fps,
"speed_vs_realtime": speed_vs_rt,
"speed": speed_data,
"pcu": pcu_data,
}
if annotated_path and os.path.exists(annotated_path):
result["annotated_video"] = annotated_path
return result