""" realtime_engine_v2.py — Advanced Real-Time Engine with Movement-Level Control Features: - Movement classification (Left/Straight/Right per lane) - Phase-based signal control (4 conflict-free phases) - User priority injection (boost specific routes) - Max-pressure phase decision algorithm - Real-time movement metrics """ import cv2 import numpy as np import threading import time from collections import deque import warnings warnings.filterwarnings('ignore') from movement_detector import HybridMovementDetector from phase_controller import PhaseManager # ───────────────────────────────────────────────────────────────── # Video Stream Generator (from v1 - unchanged) # ───────────────────────────────────────────────────────────────── class VideoStreamGenerator: """Generator that loops a video infinitely""" def __init__(self, video_path: str, resize_width: int = 640, resize_height: int = 360): self.video_path = video_path self.resize_width = resize_width self.resize_height = resize_height self.frame_count = 0 self.cap = None self._init_capture() def _init_capture(self): self.cap = cv2.VideoCapture(self.video_path) if not self.cap.isOpened(): raise RuntimeError(f"Cannot open video: {self.video_path}") def __iter__(self): return self def __next__(self): while True: ret, frame = self.cap.read() if not ret: self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) continue frame = cv2.resize(frame, (self.resize_width, self.resize_height)) self.frame_count += 1 return frame, self.frame_count # ───────────────────────────────────────────────────────────────── # Fast YOLO Detector (from v1 - unchanged) # ───────────────────────────────────────────────────────────────── class FastYOLODetector: """ YOLOv8 detector tuned for dense Indian traffic (high two-wheeler density). Key fixes vs original: - Motorcycle aspect ratio check now uses OR logic (wide OR large area OR tall-but-confident) - Bicycle accepts aspect ratios from 0.35 (covers head-on narrow bikes) - Height caps raised to 180px/160px (closer cameras show taller bounding boxes) - Added soft NMS via IoU deduplication to remove ghost detections - Class-specific confidence done AFTER YOLO inference at 0.05 (unchanged, correct) """ def __init__(self, model_path: str = "yolov8n.pt", skip_frames: int = 2, conf: float = 0.5): self.model_path = model_path self.skip_frames = skip_frames self.conf = conf self.frame_counter = 0 self.last_detections = None # YOLO COCO classes self.vehicle_classes = [2, 5, 7] # Car, Bus, Truck self.two_wheeler_classes = [1, 3] # Bicycle, Motorcycle self.person_class = 0 # MUST EXCLUDE # Per-class confidence thresholds (applied after YOLO runs at 0.05) self.conf_car = 0.25 self.conf_two_wheeler = 0.12 # Very lenient — dense traffic, small boxes self.conf_bus_truck = 0.30 # Minimum bounding box areas (pixels²) — small values intentional self.min_area_motorcycle = 5 self.min_area_bicycle = 4 self.min_area_car = 12 self.min_area_bus_truck = 25 # FIX #1: Raised height caps — original 120/100 was too strict for close cameras self.max_height_motorcycle = 180 self.max_height_bicycle = 160 # IoU threshold for deduplication self.nms_iou_threshold = 0.45 try: from ultralytics import YOLO self.model = YOLO(model_path, verbose=False) except Exception as e: raise RuntimeError(f"Cannot load YOLOv8: {e}") # ------------------------------------------------------------------ def _box_stats(self, box): """Return (area, aspect_ratio w/h, height) from a YOLO box.""" w, h = float(box.xywh[0][2]), float(box.xywh[0][3]) area = w * h ar = w / h if h > 0 else 1.0 return area, ar, h # ------------------------------------------------------------------ def _is_valid_motorcycle(self, ar: float, area: float, height: float, conf: float) -> bool: """ FIX #1 — original used AND logic that was too strict. A motorcycle is valid if ANY of these are true: a) Wide box (ar >= 0.6) — side view, typical b) Large enough area (area >= 80) — even a tall narrow box is probably real c) High confidence (conf >= 0.35) regardless of shape — model is sure d) NOT a person: people are very tall and narrow (ar < 0.4) AND small area Person rejection: only reject if BOTH ar < 0.4 AND area < 60 (people at a distance are small AND narrow; motorcycles are rarely both) """ # Hard reject: looks like a distant person silhouette if ar < 0.4 and area < 60: return False # Hard reject: implausibly tall (taller than 1.8× the max camera height cap) if height > self.max_height_motorcycle * 1.8: return False # Minimum area if area < self.min_area_motorcycle: return False # Accept if wide OR large OR confident if ar >= 0.6 or area >= 80 or conf >= 0.35: return True # Edge case: small, narrow, low-conf — skip return False # ------------------------------------------------------------------ def _is_valid_bicycle(self, ar: float, area: float, height: float, conf: float) -> bool: """ FIX #2 — original lower bound of 0.5 cut head-on bicycles (ar ~0.35). Accept if: - ar >= 0.35 (head-on narrow is OK) - area >= min_area_bicycle - NOT person silhouette: ar < 0.35 AND area < 50 AND conf < 0.3 """ if ar < 0.35 and area < 50 and conf < 0.3: return False # Looks like a very narrow distant person if height > self.max_height_bicycle * 1.8: return False if area < self.min_area_bicycle: return False return 0.35 <= ar <= 2.5 # ------------------------------------------------------------------ def _should_keep(self, cls_id: int, conf: float, area: float, ar: float, height: float) -> bool: """Master gate: exclude people, apply per-class rules.""" if cls_id == self.person_class: return False if cls_id not in self.vehicle_classes and cls_id not in self.two_wheeler_classes: return False if cls_id == 2: # Car return conf >= self.conf_car and area >= self.min_area_car if cls_id in [5, 7]: # Bus, Truck return conf >= self.conf_bus_truck and area >= self.min_area_bus_truck if cls_id == 3: # Motorcycle return conf >= self.conf_two_wheeler and self._is_valid_motorcycle(ar, area, height, conf) if cls_id == 1: # Bicycle return conf >= self.conf_two_wheeler and self._is_valid_bicycle(ar, area, height, conf) return False # ------------------------------------------------------------------ def _nms_deduplicate(self, detections: list) -> list: """ FIX #3 — Remove overlapping detections of the same class. Simple IoU-based NMS (already sorted by confidence descending). """ if not detections: return detections kept = [] used = [False] * len(detections) for i, d in enumerate(detections): if used[i]: continue kept.append(d) x1i, y1i = d['x'] - d['w'] / 2, d['y'] - d['h'] / 2 x2i, y2i = d['x'] + d['w'] / 2, d['y'] + d['h'] / 2 for j in range(i + 1, len(detections)): if used[j]: continue e = detections[j] # Only suppress same class if e['class'] != d['class']: continue x1j, y1j = e['x'] - e['w'] / 2, e['y'] - e['h'] / 2 x2j, y2j = e['x'] + e['w'] / 2, e['y'] + e['h'] / 2 ix1, iy1 = max(x1i, x1j), max(y1i, y1j) ix2, iy2 = min(x2i, x2j), min(y2i, y2j) inter = max(0, ix2 - ix1) * max(0, iy2 - iy1) union = (x2i - x1i) * (y2i - y1i) + (x2j - x1j) * (y2j - y1j) - inter if union > 0 and inter / union > self.nms_iou_threshold: used[j] = True return kept # ------------------------------------------------------------------ def detect(self, frame: np.ndarray, lane_region: tuple = None) -> list: """ Detect vehicles. Returns list of dicts with keys: x, y, w, h, conf, class, size, is_two_wheeler, aspect_ratio, height """ self.frame_counter += 1 if self.frame_counter % self.skip_frames != 0: return self.last_detections if self.last_detections is not None else [] try: results = self.model(frame, verbose=False, conf=0.05) detections = [] for result in results: for box in result.boxes: cls_id = int(box.cls) conf = float(box.conf) area, ar, height = self._box_stats(box) if not self._should_keep(cls_id, conf, area, ar, height): continue x, y = int(box.xywh[0][0]), int(box.xywh[0][1]) w, h = float(box.xywh[0][2]), float(box.xywh[0][3]) if lane_region: x1r, y1r, x2r, y2r = lane_region if not (x1r <= x <= x2r and y1r <= y <= y2r): continue detections.append({ 'x': x, 'y': y, 'w': w, 'h': h, 'conf': min(conf, 1.0), 'class': cls_id, 'size': area, 'is_two_wheeler': cls_id in self.two_wheeler_classes, 'aspect_ratio': ar, 'height': height, }) # Sort by confidence descending, then deduplicate detections.sort(key=lambda d: d['conf'], reverse=True) detections = self._nms_deduplicate(detections) # Cap at 120 (raised from 100 to handle dense scenes) self.last_detections = detections[:120] return self.last_detections except Exception: return self.last_detections if self.last_detections is not None else [] # ───────────────────────────────────────────────────────────────── # Movement Queue Tracker (UPGRADED) # ───────────────────────────────────────────────────────────────── class MovementQueueTracker: """Track queues per movement (left/straight/right)""" def __init__(self, window_size: int = 10): self.window_size = window_size self.history = { 'N': {'left': deque(maxlen=window_size), 'straight': deque(maxlen=window_size), 'right': deque(maxlen=window_size)}, 'S': {'left': deque(maxlen=window_size), 'straight': deque(maxlen=window_size), 'right': deque(maxlen=window_size)}, 'E': {'left': deque(maxlen=window_size), 'straight': deque(maxlen=window_size), 'right': deque(maxlen=window_size)}, 'W': {'left': deque(maxlen=window_size), 'straight': deque(maxlen=window_size), 'right': deque(maxlen=window_size)}, } def update(self, raw_counts: dict) -> dict: """ Update with new counts Args: raw_counts: { 'N': {'left': int, 'straight': int, 'right': int}, ... } Returns: Smoothed counts (rolling average) """ for lane in ['N', 'S', 'E', 'W']: for movement in ['left', 'straight', 'right']: count = raw_counts.get(lane, {}).get(movement, 0) self.history[lane][movement].append(count) # Return rolling average smoothed = {} for lane in ['N', 'S', 'E', 'W']: smoothed[lane] = {} for movement in ['left', 'straight', 'right']: if len(self.history[lane][movement]) > 0: avg = np.mean(list(self.history[lane][movement])) smoothed[lane][movement] = int(avg) else: smoothed[lane][movement] = 0 return smoothed # ───────────────────────────────────────────────────────────────── # Parallel Stream Reader (from v1 - unchanged) # ───────────────────────────────────────────────────────────────── class ParallelStreamReader: """Read frames from 4 video streams in parallel""" def __init__(self, video_paths: dict): self.streams = { lane: VideoStreamGenerator(path) for lane, path in video_paths.items() } self.frames = {'N': None, 'S': None, 'E': None, 'W': None} self.frame_numbers = {'N': 0, 'S': 0, 'E': 0, 'W': 0} self.running = False self.threads = {} def start(self): self.running = True for lane in self.streams: thread = threading.Thread(target=self._read_stream, args=(lane,), daemon=True) thread.start() self.threads[lane] = thread def _read_stream(self, lane: str): for frame, frame_num in self.streams[lane]: if not self.running: break self.frames[lane] = frame self.frame_numbers[lane] = frame_num def get_frames(self) -> dict: return self.frames.copy() def stop(self): self.running = False for thread in self.threads.values(): thread.join(timeout=1) # ───────────────────────────────────────────────────────────────── # User Priority Manager (NEW) # ───────────────────────────────────────────────────────────────── class UserPriorityManager: """ Boost queues for user-selected route Makes system optimize for user's path """ def __init__(self, priority_boost: float = 3.0): self.priority_boost = priority_boost self.user_from = None self.user_direction = None def set_user_route(self, from_lane: str, direction: str): """ Set user's desired route Args: from_lane: 'N', 'S', 'E', or 'W' direction: 'left', 'straight', or 'right' """ self.user_from = from_lane self.user_direction = direction def apply_priority(self, counts: dict) -> dict: """ Boost queue count for user's route Args: counts: Movement queue counts Returns: Modified counts with user boost applied """ if self.user_from is None or self.user_direction is None: return counts # No user preference # Deep copy boosted = { lane: { movement: count for movement, count in counts[lane].items() } for lane in counts } # Apply boost boosted[self.user_from][self.user_direction] += self.priority_boost return boosted # ───────────────────────────────────────────────────────────────── # Real-Time Decision Engine V2 # ───────────────────────────────────────────────────────────────── class RealtimeDecisionEngineV2: """ Advanced engine with movement-level control """ def __init__(self, video_paths: dict, skip_frames: int = 2): """ Args: video_paths: {'N': path, 'S': path, 'E': path, 'W': path} skip_frames: Frame skip rate """ self.stream_reader = ParallelStreamReader(video_paths) self.detector = FastYOLODetector(skip_frames=skip_frames) self.movement_detector = HybridMovementDetector(use_optical_flow=False) self.queue_tracker = MovementQueueTracker(window_size=10) self.phase_manager = PhaseManager() self.user_priority = UserPriorityManager() # Lane regions self.lane_regions = { 'N': (0, 0, 640, 180), 'S': (0, 180, 640, 360), 'E': None, 'W': None, } # Metrics self.metrics = { 'frame_count': 0, 'detection_time': 0, 'movement_time': 0, 'decision_time': 0, 'current_phase': 'NS_STRAIGHT', 'movement_counts': {lane: {'left': 0, 'straight': 0, 'right': 0} for lane in ['N', 'S', 'E', 'W']}, 'signal_state': {}, } def process_frame(self) -> dict: """ Process single frame with movement-level control """ start_time = time.time() # 1. Get frames raw_frames = self.stream_reader.get_frames() # 2. Detect vehicles detection_start = time.time() detections = {} for lane, frame in raw_frames.items(): if frame is not None: region = self.lane_regions.get(lane) detections[lane] = self.detector.detect(frame, region) else: detections[lane] = [] self.metrics['detection_time'] = time.time() - detection_start # 3. Classify movements movement_start = time.time() raw_movements = {} for lane, frame in raw_frames.items(): if frame is not None: raw_movements[lane] = self.movement_detector.get_movements( frame, detections[lane], lane ) else: raw_movements[lane] = {'left': 0, 'straight': 0, 'right': 0} self.metrics['movement_time'] = time.time() - movement_start # 4. Smooth movements smoothed_movements = self.queue_tracker.update(raw_movements) # 5. Apply user priority (if set) prioritized_movements = self.user_priority.apply_priority(smoothed_movements) self.metrics['movement_counts'] = smoothed_movements.copy() # 6. Make phase decision decision_start = time.time() phase = self.phase_manager.decide_phase(prioritized_movements) signal_state = self.phase_manager.get_signal_state() self.metrics['decision_time'] = time.time() - decision_start self.metrics['current_phase'] = phase.value self.metrics['signal_state'] = signal_state self.metrics['frame_count'] += 1 return { 'phase': phase.value, 'movements': smoothed_movements, 'signal_state': signal_state, 'raw_movements': raw_movements, 'metrics': self.metrics.copy(), 'total_time_ms': (time.time() - start_time) * 1000, } def set_user_route(self, from_lane: str, direction: str): """Set user's desired route for priority""" self.user_priority.set_user_route(from_lane, direction) def start(self): """Start engine""" self.stream_reader.start() def stop(self): """Stop engine""" self.stream_reader.stop() def get_metrics(self) -> dict: """Get current metrics""" return self.metrics.copy() # ───────────────────────────────────────────────────────────────── # Test/Standalone Usage # ───────────────────────────────────────────────────────────────── if __name__ == "__main__": print("🚦 Real-Time Decision Engine V2 (Movement-Level)") print("=" * 60) video_paths = { 'N': 'test_video_N.mp4', 'S': 'test_video_S.mp4', 'E': 'test_video_E.mp4', 'W': 'test_video_W.mp4', } # Initialize engine engine = RealtimeDecisionEngineV2(video_paths, skip_frames=2) engine.start() # Set user preference (optional) # engine.set_user_route(from_lane='N', direction='straight') print("✓ Engine started. Processing frames...") print("=" * 60) # Run for 10 seconds start = time.time() frame_count = 0 while time.time() - start < 10: result = engine.process_frame() frame_count += 1 if frame_count % 10 == 0: print(f"\nFrame {result['metrics']['frame_count']}") movements = result['movements'] print(f" Movements:") for lane in ['N', 'S', 'E', 'W']: print(f" {lane}: L={movements[lane]['left']} S={movements[lane]['straight']} R={movements[lane]['right']}") print(f" Phase: {result['phase']}") print(f" Time: {result['total_time_ms']:.1f}ms") time.sleep(0.05) engine.stop() print("\n✓ Engine stopped")