Spaces:
Sleeping
Sleeping
| """ | |
| realtime_engine_v2.py β Advanced Real-Time Engine with Movement-Level Control | |
| Features: | |
| - Movement classification (Left/Straight/Right per lane) | |
| - Phase-based signal control (4 conflict-free phases) | |
| - User priority injection (boost specific routes) | |
| - Max-pressure phase decision algorithm | |
| - Real-time movement metrics | |
| """ | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import time | |
| from collections import deque | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| from movement_detector import HybridMovementDetector | |
| from phase_controller import PhaseManager | |
# ─────────────────────────────────────────────────────────────────
# Video Stream Generator (from v1 - unchanged)
# ─────────────────────────────────────────────────────────────────
class VideoStreamGenerator:
    """Endless frame iterator over a video file that rewinds to frame 0 at end of stream.

    Each item is ``(frame, frame_count)``:
    - ``frame`` is resized to ``(resize_width, resize_height)``.
    - ``frame_count`` is a 1-based counter that keeps increasing across rewinds.
    """

    def __init__(self, video_path: str, resize_width: int = 640, resize_height: int = 360):
        self.video_path = video_path
        self.resize_width = resize_width
        self.resize_height = resize_height
        self.frame_count = 0
        self.cap = None
        self._init_capture()

    def _init_capture(self):
        """Open ``video_path``; raise RuntimeError if OpenCV cannot open it."""
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            raise RuntimeError(f"Cannot open video: {self.video_path}")

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next ``(frame, frame_count)``, looping back at end of file.

        Raises:
            StopIteration: if no frame can be read even immediately after
                rewinding, e.g. an empty/corrupt file or a released capture.
        """
        rewound = False
        while True:
            ret, frame = self.cap.read()
            if ret:
                break
            if rewound:
                # The read right after seeking to frame 0 failed too: the stream
                # has no readable frames, and retrying would spin forever.
                raise StopIteration
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            rewound = True
        frame = cv2.resize(frame, (self.resize_width, self.resize_height))
        self.frame_count += 1
        return frame, self.frame_count

    def release(self):
        """Release the underlying ``cv2.VideoCapture`` handle, if one was opened."""
        if self.cap is not None:
            self.cap.release()
# ─────────────────────────────────────────────────────────────────
# Fast YOLO Detector (from v1, tuned for dense two-wheeler traffic)
# ─────────────────────────────────────────────────────────────────
class FastYOLODetector:
    """
    YOLOv8 vehicle detector tuned for dense Indian traffic (high two-wheeler density).

    Tuning notes:
    - The motorcycle shape check uses OR logic (wide OR large area OR confident)
      after rejecting small, narrow, person-like boxes.
    - Bicycles accept aspect ratios from 0.35 (covers head-on narrow bikes).
    - Height caps are raised to 180px/160px (closer cameras show taller boxes).
    - Same-class IoU deduplication removes ghost detections.
    - YOLO itself runs at conf=0.05; per-class confidence thresholds are
      applied afterwards.

    Frame skipping: ``detect`` runs the model only on every ``skip_frames``-th
    call and returns the cached result otherwise. Pass ``stream_id`` to keep a
    separate counter and cache per input stream when one instance serves
    several cameras.
    """

    def __init__(self, model_path: str = "yolov8n.pt", skip_frames: int = 2, conf: float = 0.5):
        self.model_path = model_path
        self.skip_frames = skip_frames
        # NOTE: kept for API compatibility only. Inference runs at conf=0.05 and
        # filtering uses the per-class thresholds below.
        self.conf = conf
        self.frame_counter = 0        # total detect() calls across all streams
        self.last_detections = None   # most recent successful result (any stream)
        self.last_error = None        # last exception swallowed by detect(), if any
        # Per-stream frame-skip state, keyed by detect()'s ``stream_id``.
        self._stream_counters = {}
        self._stream_cache = {}
        # YOLO COCO class ids
        self.vehicle_classes = [2, 5, 7]      # car, bus, truck
        self.two_wheeler_classes = [1, 3]     # bicycle, motorcycle
        self.person_class = 0                 # always excluded
        # Per-class confidence thresholds (applied after YOLO runs at 0.05)
        self.conf_car = 0.25
        self.conf_two_wheeler = 0.12  # very lenient — dense traffic, small boxes
        self.conf_bus_truck = 0.30
        # Minimum bounding-box areas (pixels²) — small values are intentional
        self.min_area_motorcycle = 5
        self.min_area_bicycle = 4
        self.min_area_car = 12
        self.min_area_bus_truck = 25
        # Height caps (the earlier 120/100 was too strict for close cameras)
        self.max_height_motorcycle = 180
        self.max_height_bicycle = 160
        # IoU threshold for same-class deduplication
        self.nms_iou_threshold = 0.45
        try:
            from ultralytics import YOLO
            self.model = YOLO(model_path, verbose=False)
        except Exception as e:
            raise RuntimeError(f"Cannot load YOLOv8: {e}") from e

    # ------------------------------------------------------------------
    def _box_stats(self, box):
        """Return (area, aspect_ratio w/h, height) from a YOLO box."""
        w, h = float(box.xywh[0][2]), float(box.xywh[0][3])
        area = w * h
        ar = w / h if h > 0 else 1.0
        return area, ar, h

    # ------------------------------------------------------------------
    def _is_valid_motorcycle(self, ar: float, area: float, height: float, conf: float) -> bool:
        """Shape gate for motorcycle boxes.

        Rejects, in order:
        - boxes that are both narrow and small (ar < 0.4 and area < 60), which
          look like distant person silhouettes;
        - boxes taller than 1.8 × ``max_height_motorcycle``;
        - boxes smaller than ``min_area_motorcycle``.

        Any remaining box is accepted if it is wide (ar >= 0.6), large
        (area >= 80) OR confident (conf >= 0.35). Otherwise it is rejected.
        """
        if ar < 0.4 and area < 60:
            return False
        if height > self.max_height_motorcycle * 1.8:
            return False
        if area < self.min_area_motorcycle:
            return False
        return ar >= 0.6 or area >= 80 or conf >= 0.35

    # ------------------------------------------------------------------
    def _is_valid_bicycle(self, ar: float, area: float, height: float, conf: float) -> bool:
        """Shape gate for bicycle boxes.

        Accepts boxes with 0.35 <= ar <= 2.5 (head-on narrow bikes included) that
        are at least ``min_area_bicycle`` in area and no taller than
        1.8 × ``max_height_bicycle``.
        """
        if ar < 0.35 and area < 50 and conf < 0.3:
            return False  # looks like a very narrow, distant person
        if height > self.max_height_bicycle * 1.8:
            return False
        if area < self.min_area_bicycle:
            return False
        # NOTE: this range check already rejects every ar < 0.35, so the
        # person-silhouette test above never changes the outcome on its own.
        return 0.35 <= ar <= 2.5

    # ------------------------------------------------------------------
    def _should_keep(self, cls_id: int, conf: float, area: float,
                     ar: float, height: float) -> bool:
        """Master gate: exclude people and non-vehicles, then apply per-class rules."""
        if cls_id == self.person_class:
            return False
        if cls_id not in self.vehicle_classes and cls_id not in self.two_wheeler_classes:
            return False
        if cls_id == 2:  # Car
            return conf >= self.conf_car and area >= self.min_area_car
        if cls_id in [5, 7]:  # Bus, Truck
            return conf >= self.conf_bus_truck and area >= self.min_area_bus_truck
        if cls_id == 3:  # Motorcycle
            return conf >= self.conf_two_wheeler and self._is_valid_motorcycle(ar, area, height, conf)
        if cls_id == 1:  # Bicycle
            return conf >= self.conf_two_wheeler and self._is_valid_bicycle(ar, area, height, conf)
        return False

    # ------------------------------------------------------------------
    def _nms_deduplicate(self, detections: list) -> list:
        """
        Drop detections that overlap a higher-ranked detection of the same class.

        ``detections`` must already be sorted by confidence (descending). A
        detection is suppressed when its IoU with an earlier kept detection of
        the same class exceeds ``nms_iou_threshold``. The input order is
        preserved for the survivors.
        """
        if not detections:
            return detections
        kept = []
        used = [False] * len(detections)
        for i, d in enumerate(detections):
            if used[i]:
                continue
            kept.append(d)
            x1i, y1i = d['x'] - d['w'] / 2, d['y'] - d['h'] / 2
            x2i, y2i = d['x'] + d['w'] / 2, d['y'] + d['h'] / 2
            for j in range(i + 1, len(detections)):
                if used[j]:
                    continue
                e = detections[j]
                # Only suppress same class
                if e['class'] != d['class']:
                    continue
                x1j, y1j = e['x'] - e['w'] / 2, e['y'] - e['h'] / 2
                x2j, y2j = e['x'] + e['w'] / 2, e['y'] + e['h'] / 2
                ix1, iy1 = max(x1i, x1j), max(y1i, y1j)
                ix2, iy2 = min(x2i, x2j), min(y2i, y2j)
                inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
                union = (x2i - x1i) * (y2i - y1i) + (x2j - x1j) * (y2j - y1j) - inter
                if union > 0 and inter / union > self.nms_iou_threshold:
                    used[j] = True
        return kept

    # ------------------------------------------------------------------
    def detect(self, frame: np.ndarray, lane_region: tuple = None, *, stream_id=None) -> list:
        """
        Detect vehicles in ``frame``.

        Args:
            frame: image passed straight to the YOLO model.
            lane_region: optional ``(x1, y1, x2, y2)``. Detections whose box
                centre falls outside it are dropped.
            stream_id: key for the frame-skip counter and cache. Calls with
                different ids never see each other's cached detections. The
                default ``None`` reproduces single-stream behaviour.

        Returns:
            A list (at most 120) of dicts with keys x, y, w, h, conf, class,
            size, is_two_wheeler, aspect_ratio and height, sorted by confidence
            descending. On a skipped call, or if inference raises (the exception
            is stored in ``last_error``), the cached list for ``stream_id`` is
            returned, or ``[]`` if there is none yet.
        """
        self.frame_counter += 1
        count = self._stream_counters.get(stream_id, 0) + 1
        self._stream_counters[stream_id] = count
        cached = self._stream_cache.get(stream_id)
        fallback = cached if cached is not None else []
        if count % self.skip_frames != 0:
            return fallback
        try:
            results = self.model(frame, verbose=False, conf=0.05)
            detections = []
            for result in results:
                for box in result.boxes:
                    cls_id = int(box.cls)
                    conf = float(box.conf)
                    area, ar, height = self._box_stats(box)
                    if not self._should_keep(cls_id, conf, area, ar, height):
                        continue
                    x, y = int(box.xywh[0][0]), int(box.xywh[0][1])
                    w, h = float(box.xywh[0][2]), float(box.xywh[0][3])
                    if lane_region:
                        x1r, y1r, x2r, y2r = lane_region
                        if not (x1r <= x <= x2r and y1r <= y <= y2r):
                            continue
                    detections.append({
                        'x': x, 'y': y, 'w': w, 'h': h,
                        'conf': min(conf, 1.0),
                        'class': cls_id,
                        'size': area,
                        'is_two_wheeler': cls_id in self.two_wheeler_classes,
                        'aspect_ratio': ar,
                        'height': height,
                    })
            # Sort by confidence descending, deduplicate, then cap at 120 for
            # dense scenes.
            detections.sort(key=lambda d: d['conf'], reverse=True)
            detections = self._nms_deduplicate(detections)[:120]
        except Exception as exc:
            # Best effort: a failed inference must not stop the caller's loop,
            # but keep the error inspectable instead of discarding it.
            self.last_error = exc
            return fallback
        self._stream_cache[stream_id] = detections
        self.last_detections = detections
        return detections
# ─────────────────────────────────────────────────────────────────
# Movement Queue Tracker (UPGRADED)
# ─────────────────────────────────────────────────────────────────
class MovementQueueTracker:
    """Smooth per-lane, per-movement queue counts with a rolling mean.

    One bounded ``deque`` holding the last ``window_size`` raw counts is kept
    for every (lane, movement) pair. Lanes are N/S/E/W; movements are
    left/straight/right.
    """

    LANES = ('N', 'S', 'E', 'W')
    MOVEMENTS = ('left', 'straight', 'right')

    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.history = {
            lane: {movement: deque(maxlen=window_size) for movement in self.MOVEMENTS}
            for lane in self.LANES
        }

    def update(self, raw_counts: dict) -> dict:
        """
        Push one frame of raw counts and return the smoothed counts.

        Args:
            raw_counts: ``{'N': {'left': int, 'straight': int, 'right': int}, ...}``.
                Missing lanes or movements count as 0.

        Returns:
            ``{lane: {movement: int}}`` for every lane and movement. Each value
            is the mean of that movement's window, rounded half-up (0 while the
            window is empty, e.g. ``window_size=0``).
        """
        for lane in self.LANES:
            lane_counts = raw_counts.get(lane, {})
            for movement in self.MOVEMENTS:
                self.history[lane][movement].append(lane_counts.get(movement, 0))

        smoothed = {}
        for lane in self.LANES:
            smoothed[lane] = {}
            for movement in self.MOVEMENTS:
                window = self.history[lane][movement]
                if len(window) > 0:
                    # Round half-up rather than truncate. int() on the mean
                    # floors toward zero, so a queue seen in 2 of the last 3
                    # frames (mean 0.67) would be reported as empty.
                    avg = np.mean(list(window))
                    smoothed[lane][movement] = int(np.floor(avg + 0.5))
                else:
                    smoothed[lane][movement] = 0
        return smoothed
# ─────────────────────────────────────────────────────────────────
# Parallel Stream Reader (from v1 - unchanged)
# ─────────────────────────────────────────────────────────────────
class ParallelStreamReader:
    """Read frames from several video streams in parallel, one daemon thread per lane.

    Each reader thread keeps overwriting ``frames[lane]`` with the most recently
    read frame. ``get_frames`` hands out a snapshot of those frames.
    """

    def __init__(self, video_paths: dict):
        self.streams = {
            lane: VideoStreamGenerator(path)
            for lane, path in video_paths.items()
        }
        self.frames = {'N': None, 'S': None, 'E': None, 'W': None}
        self.frame_numbers = {'N': 0, 'S': 0, 'E': 0, 'W': 0}
        self.running = False
        self.threads = {}

    def start(self):
        """Start one daemon reader thread per stream.

        Lanes whose reader thread is still alive are skipped, so calling
        ``start()`` twice does not spawn duplicate readers. Captures released by
        an earlier ``stop()`` are reopened before reading resumes.
        """
        self.running = True
        for lane, stream in self.streams.items():
            existing = self.threads.get(lane)
            if existing is not None and existing.is_alive():
                continue
            cap = getattr(stream, 'cap', None)
            if cap is not None and not cap.isOpened():
                stream._init_capture()
            thread = threading.Thread(target=self._read_stream, args=(lane,), daemon=True)
            thread.start()
            self.threads[lane] = thread

    def _read_stream(self, lane: str):
        """Thread body: publish frames from ``streams[lane]`` until stopped or exhausted."""
        for frame, frame_num in self.streams[lane]:
            if not self.running:
                break
            self.frames[lane] = frame
            self.frame_numbers[lane] = frame_num

    def get_frames(self) -> dict:
        """Return a shallow copy ``{lane: latest frame or None}``."""
        return self.frames.copy()

    def stop(self):
        """Signal the reader threads to exit, wait up to 1 s each, and release captures.

        A stream's capture is released only once its reader thread has exited,
        to avoid releasing it while that thread may still be inside ``read()``.
        """
        self.running = False
        for lane, stream in self.streams.items():
            thread = self.threads.get(lane)
            if thread is not None:
                thread.join(timeout=1)
                if thread.is_alive():
                    continue
            cap = getattr(stream, 'cap', None)
            if cap is not None:
                cap.release()
# ─────────────────────────────────────────────────────────────────
# User Priority Manager (NEW)
# ─────────────────────────────────────────────────────────────────
class UserPriorityManager:
    """
    Boost the queue of one user-selected route.

    Adds ``priority_boost`` to the count of a single (lane, movement) pair so
    that the phase decision favours the user's path.
    """

    VALID_LANES = ('N', 'S', 'E', 'W')
    VALID_DIRECTIONS = ('left', 'straight', 'right')

    def __init__(self, priority_boost: float = 3.0):
        self.priority_boost = priority_boost
        self.user_from = None
        self.user_direction = None

    def set_user_route(self, from_lane: str, direction: str):
        """
        Set the user's desired route.

        Args:
            from_lane: 'N', 'S', 'E', or 'W' (``None`` disables the boost).
            direction: 'left', 'straight', or 'right' (``None`` disables the boost).

        Raises:
            ValueError: if a non-None argument is not one of the names above.
                The stored route is left unchanged. Checking here surfaces a
                typo immediately rather than as a KeyError on every later
                ``apply_priority`` call.
        """
        if from_lane is not None and from_lane not in self.VALID_LANES:
            raise ValueError(f"from_lane must be one of {self.VALID_LANES}, got {from_lane!r}")
        if direction is not None and direction not in self.VALID_DIRECTIONS:
            raise ValueError(f"direction must be one of {self.VALID_DIRECTIONS}, got {direction!r}")
        self.user_from = from_lane
        self.user_direction = direction

    def clear_user_route(self):
        """Remove the user's route so ``apply_priority`` passes counts through unchanged."""
        self.user_from = None
        self.user_direction = None

    def apply_priority(self, counts: dict) -> dict:
        """
        Return ``counts`` with ``priority_boost`` added to the user's route.

        Args:
            counts: ``{lane: {movement: count}}`` movement queue counts.

        Returns:
            The same ``counts`` object when no route is set. Otherwise a
            two-level copy with the boost added; the input is never mutated. A
            lane or movement missing from ``counts`` is treated as 0.
        """
        if self.user_from is None or self.user_direction is None:
            return counts  # No user preference
        boosted = {lane: dict(movements) for lane, movements in counts.items()}
        lane_counts = boosted.setdefault(self.user_from, {})
        lane_counts[self.user_direction] = (
            lane_counts.get(self.user_direction, 0) + self.priority_boost
        )
        return boosted
# ─────────────────────────────────────────────────────────────────
# Real-Time Decision Engine V2
# ─────────────────────────────────────────────────────────────────
class RealtimeDecisionEngineV2:
    """
    Advanced engine with movement-level control.

    Each ``process_frame`` call runs this pipeline:
    1. take the latest frame of every lane;
    2. detect vehicles;
    3. count left/straight/right movements;
    4. smooth the counts;
    5. apply the optional user-route boost;
    6. ask the phase manager for the phase to serve.
    """

    def __init__(self, video_paths: dict, skip_frames: int = 2):
        """
        Args:
            video_paths: {'N': path, 'S': path, 'E': path, 'W': path}
            skip_frames: each lane re-runs detection once every ``skip_frames``
                ``process_frame`` calls and reuses its own previous result in
                between. Values below 1 are treated as 1.
        """
        self.stream_reader = ParallelStreamReader(video_paths)
        # Skipping is done per lane in this class (see _detect_lane), so the
        # detector itself runs on every call. Its built-in skip cache is a
        # single slot shared by all callers: with one detector serving the lanes
        # in N,S,E,W order, a skipped lane received the boxes of the lane
        # detected just before it (N got W's, E got S's).
        self.detector = FastYOLODetector(skip_frames=1)
        self._skip_frames = max(1, int(skip_frames))
        self._lane_detections = {}  # lane -> that lane's most recent detections
        self.movement_detector = HybridMovementDetector(use_optical_flow=False)
        self.queue_tracker = MovementQueueTracker(window_size=10)
        self.phase_manager = PhaseManager()
        self.user_priority = UserPriorityManager()
        # Lane regions (x1, y1, x2, y2) used to filter detections; None = whole frame
        self.lane_regions = {
            'N': (0, 0, 640, 180),
            'S': (0, 180, 640, 360),
            'E': None,
            'W': None,
        }
        # Metrics (stage timings are in seconds)
        self.metrics = {
            'frame_count': 0,
            'detection_time': 0,
            'movement_time': 0,
            'decision_time': 0,
            'current_phase': 'NS_STRAIGHT',
            'movement_counts': {lane: {'left': 0, 'straight': 0, 'right': 0} for lane in ['N', 'S', 'E', 'W']},
            'signal_state': {},
        }

    def _detect_lane(self, lane: str, frame, lane_index: int) -> list:
        """Return one lane's detections, re-running the detector only when due.

        A lane is due every ``_skip_frames`` calls. Lanes are staggered by
        ``lane_index`` so the detector load is spread across calls instead of
        every lane running on the same call. A lane with no cached result
        always runs.
        """
        cached = self._lane_detections.get(lane)
        due = (self.metrics['frame_count'] + lane_index) % self._skip_frames == 0
        if cached is None or due:
            # NOTE(review): if inference raises, FastYOLODetector returns its
            # last successful result, which may belong to a different lane.
            cached = self.detector.detect(frame, self.lane_regions.get(lane))
            self._lane_detections[lane] = cached
        return cached

    def process_frame(self) -> dict:
        """
        Process one frame per lane with movement-level control.

        Returns:
            dict with the chosen 'phase', smoothed 'movements', 'signal_state',
            'raw_movements', a copy of 'metrics' and 'total_time_ms'.
        """
        start_time = time.time()
        # 1. Get frames
        raw_frames = self.stream_reader.get_frames()
        # 2. Detect vehicles (per-lane frame skipping)
        detection_start = time.time()
        detections = {}
        for lane_index, (lane, frame) in enumerate(raw_frames.items()):
            if frame is not None:
                detections[lane] = self._detect_lane(lane, frame, lane_index)
            else:
                detections[lane] = []
        self.metrics['detection_time'] = time.time() - detection_start
        # 3. Classify movements
        movement_start = time.time()
        raw_movements = {}
        for lane, frame in raw_frames.items():
            if frame is not None:
                raw_movements[lane] = self.movement_detector.get_movements(
                    frame, detections[lane], lane
                )
            else:
                raw_movements[lane] = {'left': 0, 'straight': 0, 'right': 0}
        self.metrics['movement_time'] = time.time() - movement_start
        # 4. Smooth movements
        smoothed_movements = self.queue_tracker.update(raw_movements)
        # 5. Apply user priority (if set); metrics report the un-boosted counts
        prioritized_movements = self.user_priority.apply_priority(smoothed_movements)
        self.metrics['movement_counts'] = smoothed_movements.copy()
        # 6. Make phase decision
        decision_start = time.time()
        phase = self.phase_manager.decide_phase(prioritized_movements)
        signal_state = self.phase_manager.get_signal_state()
        self.metrics['decision_time'] = time.time() - decision_start
        self.metrics['current_phase'] = phase.value
        self.metrics['signal_state'] = signal_state
        self.metrics['frame_count'] += 1
        return {
            'phase': phase.value,
            'movements': smoothed_movements,
            'signal_state': signal_state,
            'raw_movements': raw_movements,
            'metrics': self.metrics.copy(),
            'total_time_ms': (time.time() - start_time) * 1000,
        }

    def set_user_route(self, from_lane: str, direction: str):
        """Set user's desired route for priority"""
        self.user_priority.set_user_route(from_lane, direction)

    def start(self):
        """Start the background stream readers."""
        self.stream_reader.start()

    def stop(self):
        """Stop the background stream readers."""
        self.stream_reader.stop()

    def get_metrics(self) -> dict:
        """Return a shallow copy of the current metrics."""
        return self.metrics.copy()
# ─────────────────────────────────────────────────────────────────
# Test/Standalone Usage
# ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("π¦ Real-Time Decision Engine V2 (Movement-Level)")
    print("=" * 60)
    video_paths = {
        'N': 'test_video_N.mp4',
        'S': 'test_video_S.mp4',
        'E': 'test_video_E.mp4',
        'W': 'test_video_W.mp4',
    }
    # Initialize engine
    engine = RealtimeDecisionEngineV2(video_paths, skip_frames=2)
    engine.start()
    # try/finally: stop the stream readers even if processing raises or the
    # demo is interrupted with Ctrl+C.
    try:
        # Set user preference (optional)
        # engine.set_user_route(from_lane='N', direction='straight')
        print("β Engine started. Processing frames...")
        print("=" * 60)
        # Run for 10 seconds
        start = time.time()
        frame_count = 0
        while time.time() - start < 10:
            result = engine.process_frame()
            frame_count += 1
            if frame_count % 10 == 0:
                print(f"\nFrame {result['metrics']['frame_count']}")
                movements = result['movements']
                print(" Movements:")
                for lane in ['N', 'S', 'E', 'W']:
                    print(f" {lane}: L={movements[lane]['left']} S={movements[lane]['straight']} R={movements[lane]['right']}")
                print(f" Phase: {result['phase']}")
                print(f" Time: {result['total_time_ms']:.1f}ms")
            time.sleep(0.05)
    finally:
        engine.stop()
    print("\nβ Engine stopped")