# Source: pranit / realtime_engine_v2.py
# Uploaded by RushiMane2003 in commit 99f938a ("Upload 41 files").
"""
realtime_engine_v2.py — Advanced Real-Time Engine with Movement-Level Control
Features:
- Movement classification (Left/Straight/Right per lane)
- Phase-based signal control (4 conflict-free phases)
- User priority injection (boost specific routes)
- Max-pressure phase decision algorithm
- Real-time movement metrics
"""
import cv2
import numpy as np
import threading
import time
from collections import deque
import warnings
warnings.filterwarnings('ignore')
from movement_detector import HybridMovementDetector
from phase_controller import PhaseManager
# ─────────────────────────────────────────────────────────────────
# Video Stream Generator (from v1 - unchanged)
# ─────────────────────────────────────────────────────────────────
class VideoStreamGenerator:
    """Iterator that loops a video file forever, yielding resized frames.

    Each ``next()`` returns ``(frame, frame_count)``: the decoded frame resized
    to ``(resize_width, resize_height)`` and a 1-based counter that keeps
    growing across loops. Reaching the end of the file rewinds to frame 0.
    """

    # Number of rewind-and-read attempts before giving up. The original loop
    # retried without bound, so an empty/unreadable file (or a released
    # capture) made __next__ spin forever at full CPU.
    max_rewind_attempts = 3

    def __init__(self, video_path: str, resize_width: int = 640, resize_height: int = 360):
        self.video_path = video_path
        self.resize_width = resize_width
        self.resize_height = resize_height
        self.frame_count = 0
        self.cap = None
        self._init_capture()

    def _init_capture(self):
        """Open ``video_path``; raise RuntimeError if OpenCV cannot open it."""
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            raise RuntimeError(f"Cannot open video: {self.video_path}")

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next ``(frame, frame_count)`` pair.

        Raises:
            StopIteration: after ``release()``, or when no frame can be decoded
                even after rewinding ``max_rewind_attempts`` times.
        """
        if self.cap is None:
            raise StopIteration
        ret, frame = self.cap.read()
        attempts = 0
        while not ret:
            if attempts >= self.max_rewind_attempts:
                raise StopIteration
            attempts += 1
            # End of file (or a transient read failure): rewind and retry.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = self.cap.read()
        frame = cv2.resize(frame, (self.resize_width, self.resize_height))
        self.frame_count += 1
        return frame, self.frame_count

    def release(self):
        """Release the OpenCV capture; iteration stops afterwards. Idempotent."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None
# ─────────────────────────────────────────────────────────────────
# Fast YOLO Detector (from v1 - unchanged)
# ─────────────────────────────────────────────────────────────────
class FastYOLODetector:
    """
    YOLOv8 vehicle detector tuned for dense Indian traffic (many two-wheelers).

    Pipeline:
      1. Run YOLO at a very low confidence (``inference_conf``).
      2. Keep only vehicle classes that pass per-class confidence and shape
         rules.
      3. Sort by confidence and drop same-class duplicates by IoU.

    Tuning notes carried over from earlier versions:
      - The motorcycle shape check uses OR logic (wide OR large area OR
        confident).
      - Bicycles accept aspect ratios from 0.35, which covers head-on narrow
        bikes.
      - Height caps are raised to 180px / 160px, since close cameras give
        taller boxes.
      - IoU de-duplication removes ghost detections.

    Frame skipping: only every ``skip_frames``-th call to ``detect`` runs the
    model; the other calls return the cached result. The call counter and the
    cache are instance attributes, so each video stream should use its own
    instance (a shallow ``copy.copy`` shares the loaded model).
    """

    # Confidence handed to YOLO itself. Deliberately very low: the per-class
    # thresholds in _should_keep do the real filtering.
    inference_conf = 0.05
    # Upper bound on detections returned per frame (raised for dense scenes).
    max_detections = 120
    # Exception from the most recent failed inference; None after a success.
    last_error = None

    def __init__(self, model_path: str = "yolov8n.pt", skip_frames: int = 2, conf: float = 0.5):
        self.model_path = model_path
        self.skip_frames = skip_frames
        # Stored for reference only; inference uses ``inference_conf`` and the
        # per-class thresholds below.
        self.conf = conf
        self.frame_counter = 0
        self.last_detections = None
        # YOLO COCO class ids
        self.vehicle_classes = [2, 5, 7]  # Car, Bus, Truck
        self.two_wheeler_classes = [1, 3]  # Bicycle, Motorcycle
        self.person_class = 0  # must always be excluded
        # Per-class confidence thresholds (applied after YOLO runs)
        self.conf_car = 0.25
        self.conf_two_wheeler = 0.12  # very lenient: dense traffic, small boxes
        self.conf_bus_truck = 0.30
        # Minimum bounding-box areas (pixels^2); small values are intentional
        self.min_area_motorcycle = 5
        self.min_area_bicycle = 4
        self.min_area_car = 12
        self.min_area_bus_truck = 25
        # Height caps raised from 120/100, which were too strict for close cameras
        self.max_height_motorcycle = 180
        self.max_height_bicycle = 160
        # IoU threshold for same-class de-duplication
        self.nms_iou_threshold = 0.45
        try:
            from ultralytics import YOLO
            self.model = YOLO(model_path, verbose=False)
        except Exception as e:
            raise RuntimeError(f"Cannot load YOLOv8: {e}") from e

    # ------------------------------------------------------------------
    def _box_stats(self, box):
        """Return (area, aspect_ratio w/h, height) for a YOLO box."""
        w, h = float(box.xywh[0][2]), float(box.xywh[0][3])
        area = w * h
        ar = w / h if h > 0 else 1.0
        return area, ar, h

    # ------------------------------------------------------------------
    def _is_valid_motorcycle(self, ar: float, area: float, height: float, conf: float) -> bool:
        """
        Check whether a motorcycle box has a plausible shape and size.

        Rejects:
          - distant-person silhouettes: narrow (ar < 0.4) AND small (area < 60);
          - boxes taller than 1.8x ``max_height_motorcycle``;
          - boxes smaller than ``min_area_motorcycle``.
        Otherwise accepts the box if it is wide (ar >= 0.6) OR large
        (area >= 80) OR confident (conf >= 0.35).
        """
        if ar < 0.4 and area < 60:
            return False
        if height > self.max_height_motorcycle * 1.8:
            return False
        if area < self.min_area_motorcycle:
            return False
        return ar >= 0.6 or area >= 80 or conf >= 0.35

    # ------------------------------------------------------------------
    def _is_valid_bicycle(self, ar: float, area: float, height: float, conf: float) -> bool:
        """
        Check whether a bicycle box has a plausible shape and size.

        Rejects:
          - very narrow, small, low-confidence boxes (likely a distant person);
          - boxes taller than 1.8x ``max_height_bicycle``;
          - boxes smaller than ``min_area_bicycle``.
        Otherwise requires 0.35 <= ar <= 2.5 (head-on narrow bikes are allowed).
        """
        if ar < 0.35 and area < 50 and conf < 0.3:
            return False
        if height > self.max_height_bicycle * 1.8:
            return False
        if area < self.min_area_bicycle:
            return False
        return 0.35 <= ar <= 2.5

    # ------------------------------------------------------------------
    def _should_keep(self, cls_id: int, conf: float, area: float,
                     ar: float, height: float) -> bool:
        """Master gate: exclude people and non-vehicles, then apply per-class rules."""
        if cls_id == self.person_class:
            return False
        if cls_id not in self.vehicle_classes and cls_id not in self.two_wheeler_classes:
            return False
        if cls_id == 2:  # Car
            return conf >= self.conf_car and area >= self.min_area_car
        if cls_id in [5, 7]:  # Bus, Truck
            return conf >= self.conf_bus_truck and area >= self.min_area_bus_truck
        if cls_id == 3:  # Motorcycle
            return conf >= self.conf_two_wheeler and self._is_valid_motorcycle(ar, area, height, conf)
        if cls_id == 1:  # Bicycle
            return conf >= self.conf_two_wheeler and self._is_valid_bicycle(ar, area, height, conf)
        return False

    # ------------------------------------------------------------------
    def _nms_deduplicate(self, detections: list) -> list:
        """
        Remove overlapping detections of the same class.

        ``detections`` must already be sorted by confidence (descending); a
        detection is dropped when its IoU with an earlier kept detection of the
        same class exceeds ``nms_iou_threshold``.
        """
        if not detections:
            return detections
        kept = []
        used = [False] * len(detections)
        for i, d in enumerate(detections):
            if used[i]:
                continue
            kept.append(d)
            x1i, y1i = d['x'] - d['w'] / 2, d['y'] - d['h'] / 2
            x2i, y2i = d['x'] + d['w'] / 2, d['y'] + d['h'] / 2
            for j in range(i + 1, len(detections)):
                if used[j]:
                    continue
                e = detections[j]
                if e['class'] != d['class']:  # only suppress same class
                    continue
                x1j, y1j = e['x'] - e['w'] / 2, e['y'] - e['h'] / 2
                x2j, y2j = e['x'] + e['w'] / 2, e['y'] + e['h'] / 2
                ix1, iy1 = max(x1i, x1j), max(y1i, y1j)
                ix2, iy2 = min(x2i, x2j), min(y2i, y2j)
                inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
                union = (x2i - x1i) * (y2i - y1i) + (x2j - x1j) * (y2j - y1j) - inter
                if union > 0 and inter / union > self.nms_iou_threshold:
                    used[j] = True
        return kept

    # ------------------------------------------------------------------
    def _box_to_detection(self, box, lane_region):
        """Convert one YOLO box to a detection dict, or None if it is filtered out."""
        cls_id = int(box.cls)
        conf = float(box.conf)
        area, ar, height = self._box_stats(box)
        if not self._should_keep(cls_id, conf, area, ar, height):
            return None
        xywh = box.xywh[0]
        x, y = int(xywh[0]), int(xywh[1])
        if lane_region:
            x1r, y1r, x2r, y2r = lane_region
            if not (x1r <= x <= x2r and y1r <= y <= y2r):
                return None
        return {
            'x': x, 'y': y, 'w': float(xywh[2]), 'h': float(xywh[3]),
            'conf': min(conf, 1.0),
            'class': cls_id,
            'size': area,
            'is_two_wheeler': cls_id in self.two_wheeler_classes,
            'aspect_ratio': ar,
            'height': height,
        }

    # ------------------------------------------------------------------
    def detect(self, frame: np.ndarray, lane_region: tuple = None) -> list:
        """
        Detect vehicles in ``frame``.

        Args:
            frame: image passed directly to the YOLO model.
            lane_region: optional ``(x1, y1, x2, y2)``. Detections whose centre
                lies outside it (inclusive bounds) are dropped.

        Returns:
            A list of at most ``max_detections`` dicts, sorted by confidence
            descending. Each dict has the keys x, y, w, h, conf, class, size,
            is_two_wheeler, aspect_ratio and height. On skipped frames, or when
            inference fails, the cached result of the last successful
            inference is returned ([] if there is none). The failure itself is
            kept in ``last_error``.
        """
        self.frame_counter += 1
        # skip_frames <= 0 used to raise ZeroDivisionError; treat it as "every frame".
        step = max(1, int(self.skip_frames))
        if self.frame_counter % step != 0:
            return self.last_detections if self.last_detections is not None else []
        try:
            results = self.model(frame, verbose=False, conf=self.inference_conf)
            detections = []
            for result in results:
                for box in result.boxes:
                    detection = self._box_to_detection(box, lane_region)
                    if detection is not None:
                        detections.append(detection)
            detections.sort(key=lambda d: d['conf'], reverse=True)
            detections = self._nms_deduplicate(detections)
        except Exception as exc:  # best-effort: keep the real-time loop alive
            self.last_error = exc
            return self.last_detections if self.last_detections is not None else []
        self.last_error = None
        self.last_detections = detections[:self.max_detections]
        return self.last_detections
# ─────────────────────────────────────────────────────────────────
# Movement Queue Tracker (UPGRADED)
# ─────────────────────────────────────────────────────────────────
class MovementQueueTracker:
    """Smooth per-movement queue counts with a rolling mean.

    Keeps the last ``window_size`` raw counts for every (lane, movement) pair
    and reports their mean, rounded half-up to an int.
    """

    LANES = ('N', 'S', 'E', 'W')
    MOVEMENTS = ('left', 'straight', 'right')

    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.history = {
            lane: {movement: deque(maxlen=window_size) for movement in self.MOVEMENTS}
            for lane in self.LANES
        }

    def update(self, raw_counts: dict) -> dict:
        """
        Record one frame of counts and return the smoothed counts.

        Args:
            raw_counts: ``{'N': {'left': int, 'straight': int, 'right': int}, ...}``.
                Missing lanes or movements (or a lane mapped to None) count as 0.

        Returns:
            The same nested shape covering all four lanes. Each value is the
            rolling mean rounded half-up; plain ``int()`` truncation made a
            movement that was present in most frames (mean 0.9) report 0.
        """
        for lane in self.LANES:
            lane_counts = raw_counts.get(lane) or {}
            for movement in self.MOVEMENTS:
                self.history[lane][movement].append(lane_counts.get(movement, 0))

        smoothed = {}
        for lane in self.LANES:
            smoothed[lane] = {}
            for movement in self.MOVEMENTS:
                window = self.history[lane][movement]
                if window:
                    # Counts are non-negative, so +0.5 then int() rounds half-up.
                    smoothed[lane][movement] = int(sum(window) / len(window) + 0.5)
                else:
                    smoothed[lane][movement] = 0
        return smoothed
# ─────────────────────────────────────────────────────────────────
# Parallel Stream Reader (from v1 - unchanged)
# ─────────────────────────────────────────────────────────────────
class ParallelStreamReader:
    """Read frames from several video streams on background threads.

    One daemon thread per lane keeps overwriting ``frames[lane]`` (and
    ``frame_numbers[lane]``) with the most recent frame, so ``get_frames()``
    returns the latest frame of each lane, or None until one arrives.
    """

    def __init__(self, video_paths: dict):
        self.streams = {
            lane: VideoStreamGenerator(path)
            for lane, path in video_paths.items()
        }
        self.frames = {'N': None, 'S': None, 'E': None, 'W': None}
        self.frame_numbers = {'N': 0, 'S': 0, 'E': 0, 'W': 0}
        self.running = False
        self.threads = {}

    def start(self):
        """Start one reader thread per stream; a no-op if already running."""
        if self.running:
            # A second call used to start duplicate readers that competed for
            # the same capture object.
            return
        self.running = True
        for lane, stream in self.streams.items():
            self._reopen_if_released(stream)
            thread = threading.Thread(target=self._read_stream, args=(lane,), daemon=True)
            thread.start()
            self.threads[lane] = thread

    @staticmethod
    def _reopen_if_released(stream):
        """Reopen a capture released by a previous stop(), if the stream supports it."""
        reopen = getattr(stream, '_init_capture', None)
        cap = getattr(stream, 'cap', None)
        if reopen is not None and (cap is None or not cap.isOpened()):
            reopen()

    def _read_stream(self, lane: str):
        """Thread body: publish frames for ``lane`` until stop() or the stream ends."""
        for frame, frame_num in self.streams[lane]:
            if not self.running:
                break
            self.frames[lane] = frame
            self.frame_numbers[lane] = frame_num

    def get_frames(self) -> dict:
        """Return a shallow snapshot ``{lane: latest frame or None}``."""
        return self.frames.copy()

    def stop(self):
        """Ask reader threads to stop, wait up to 1s each, then release captures.

        A capture is released only if its reader thread has exited, so it is
        never freed while another thread may still be inside ``read()``.
        """
        self.running = False
        for lane, thread in self.threads.items():
            thread.join(timeout=1)
            if thread.is_alive():
                continue
            cap = getattr(self.streams.get(lane), 'cap', None)
            if cap is not None:
                cap.release()
# ─────────────────────────────────────────────────────────────────
# User Priority Manager (NEW)
# ─────────────────────────────────────────────────────────────────
class UserPriorityManager:
    """
    Bias movement queue counts toward one user-selected route.

    The selected ``(from_lane, direction)`` count is increased by
    ``priority_boost`` so the phase decision favours the user's path.
    """

    VALID_LANES = ('N', 'S', 'E', 'W')
    VALID_DIRECTIONS = ('left', 'straight', 'right')

    def __init__(self, priority_boost: float = 3.0):
        self.priority_boost = priority_boost
        self.user_from = None
        self.user_direction = None

    def set_user_route(self, from_lane: str, direction: str):
        """
        Set the user's desired route.

        Args:
            from_lane: 'N', 'S', 'E', or 'W'
            direction: 'left', 'straight', or 'right'
            Passing None for either argument clears the preference.

        Raises:
            ValueError: for any other lane or direction. Previously bad values
                were stored and then raised KeyError on every apply_priority().
        """
        if from_lane is None or direction is None:
            self.clear_user_route()
            return
        if from_lane not in self.VALID_LANES:
            raise ValueError(f"from_lane must be one of {self.VALID_LANES}, got {from_lane!r}")
        if direction not in self.VALID_DIRECTIONS:
            raise ValueError(f"direction must be one of {self.VALID_DIRECTIONS}, got {direction!r}")
        self.user_from = from_lane
        self.user_direction = direction

    def clear_user_route(self):
        """Remove any user route preference."""
        self.user_from = None
        self.user_direction = None

    def apply_priority(self, counts: dict) -> dict:
        """
        Return the counts with the user's route boosted.

        Args:
            counts: ``{lane: {movement: count}}`` movement queue counts.

        Returns:
            ``counts`` itself when no route is set. Otherwise it returns a new
            two-level copy in which ``counts[user_from][user_direction]`` is
            increased by ``priority_boost``; a missing lane or movement starts
            from 0. The input is never mutated.
        """
        if self.user_from is None or self.user_direction is None:
            return counts  # no user preference
        boosted = {lane: dict(movements) for lane, movements in counts.items()}
        lane_counts = boosted.setdefault(self.user_from, {})
        lane_counts[self.user_direction] = lane_counts.get(self.user_direction, 0) + self.priority_boost
        return boosted
# ─────────────────────────────────────────────────────────────────
# Real-Time Decision Engine V2
# ─────────────────────────────────────────────────────────────────
class RealtimeDecisionEngineV2:
    """
    Movement-level signal engine.

    Each ``process_frame`` call runs this pipeline on the latest frame of every
    lane: vehicle detection, then left/straight/right movement classification,
    rolling-mean smoothing, the optional user-route boost, and finally the
    phase decision.
    """

    def __init__(self, video_paths: dict, skip_frames: int = 2):
        """
        Args:
            video_paths: {'N': path, 'S': path, 'E': path, 'W': path}
            skip_frames: frame skip rate passed to FastYOLODetector (per lane)
        """
        import copy  # only needed to clone the per-lane detectors

        self.stream_reader = ParallelStreamReader(video_paths)
        self.detector = FastYOLODetector(skip_frames=skip_frames)
        # FastYOLODetector keeps ONE call counter and ONE cached result. With a
        # single instance shared by all lanes, a "skipped" call returned the
        # detections of whichever lane ran last (N reused W's boxes, E reused
        # S's). Each lane gets a shallow copy instead: the loaded model is
        # shared, while frame_counter / last_detections are per lane. Note that
        # threshold changes made on ``self.detector`` after construction are
        # not seen by these copies.
        lanes = set(self.stream_reader.frames) | set(video_paths)
        self.lane_detectors = {lane: copy.copy(self.detector) for lane in lanes}
        self.movement_detector = HybridMovementDetector(use_optical_flow=False)
        self.queue_tracker = MovementQueueTracker(window_size=10)
        self.phase_manager = PhaseManager()
        self.user_priority = UserPriorityManager()
        # Per-lane (x1, y1, x2, y2) regions passed to detect(); N and S use the
        # top and bottom halves of a 640x360 frame, None means the whole frame.
        self.lane_regions = {
            'N': (0, 0, 640, 180),
            'S': (0, 180, 640, 360),
            'E': None,
            'W': None,
        }
        # Metrics (timings in seconds)
        self.metrics = {
            'frame_count': 0,
            'detection_time': 0,
            'movement_time': 0,
            'decision_time': 0,
            'current_phase': 'NS_STRAIGHT',
            'movement_counts': {lane: {'left': 0, 'straight': 0, 'right': 0} for lane in ['N', 'S', 'E', 'W']},
            'signal_state': {},
        }

    def process_frame(self) -> dict:
        """
        Run one decision cycle on the latest frame of every lane.

        Returns:
            A dict with the keys:
              - 'phase' (``phase.value``);
              - 'movements' (smoothed counts);
              - 'signal_state';
              - 'raw_movements';
              - 'metrics' (a shallow copy);
              - 'total_time_ms'.
        """
        start_time = time.perf_counter()

        # 1. Latest frame per lane (None until its reader thread delivers one)
        raw_frames = self.stream_reader.get_frames()

        # 2. Detect vehicles, each lane with its own detector state
        detection_start = time.perf_counter()
        detections = {}
        for lane, frame in raw_frames.items():
            if frame is None:
                detections[lane] = []
                continue
            # lane_detectors covers every lane the reader can report; the
            # fallback to the shared detector is purely defensive.
            detector = self.lane_detectors.get(lane, self.detector)
            detections[lane] = detector.detect(frame, self.lane_regions.get(lane))
        self.metrics['detection_time'] = time.perf_counter() - detection_start

        # 3. Classify movements
        movement_start = time.perf_counter()
        raw_movements = {}
        for lane, frame in raw_frames.items():
            if frame is None:
                raw_movements[lane] = {'left': 0, 'straight': 0, 'right': 0}
            else:
                raw_movements[lane] = self.movement_detector.get_movements(
                    frame, detections[lane], lane
                )
        self.metrics['movement_time'] = time.perf_counter() - movement_start

        # 4. Smooth movements, then 5. apply user priority (if set)
        smoothed_movements = self.queue_tracker.update(raw_movements)
        prioritized_movements = self.user_priority.apply_priority(smoothed_movements)
        self.metrics['movement_counts'] = smoothed_movements.copy()

        # 6. Phase decision (uses the boosted counts; metrics report unboosted)
        decision_start = time.perf_counter()
        phase = self.phase_manager.decide_phase(prioritized_movements)
        signal_state = self.phase_manager.get_signal_state()
        self.metrics['decision_time'] = time.perf_counter() - decision_start

        self.metrics['current_phase'] = phase.value
        self.metrics['signal_state'] = signal_state
        self.metrics['frame_count'] += 1
        return {
            'phase': phase.value,
            'movements': smoothed_movements,
            'signal_state': signal_state,
            'raw_movements': raw_movements,
            'metrics': self.metrics.copy(),
            'total_time_ms': (time.perf_counter() - start_time) * 1000,
        }

    def set_user_route(self, from_lane: str, direction: str):
        """Set the user's desired route for priority boosting."""
        self.user_priority.set_user_route(from_lane, direction)

    def start(self):
        """Start the background stream readers."""
        self.stream_reader.start()

    def stop(self):
        """Stop the background stream readers."""
        self.stream_reader.stop()

    def get_metrics(self) -> dict:
        """Return a shallow copy of the current metrics."""
        return self.metrics.copy()
# ─────────────────────────────────────────────────────────────────
# Test/Standalone Usage
# ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("🚦 Real-Time Decision Engine V2 (Movement-Level)")
    print("=" * 60)
    video_paths = {
        'N': 'test_video_N.mp4',
        'S': 'test_video_S.mp4',
        'E': 'test_video_E.mp4',
        'W': 'test_video_W.mp4',
    }
    # Initialize engine
    engine = RealtimeDecisionEngineV2(video_paths, skip_frames=2)
    engine.start()
    # Set user preference (optional)
    # engine.set_user_route(from_lane='N', direction='straight')
    print("βœ“ Engine started. Processing frames...")
    print("=" * 60)
    # Run for 10 seconds. try/finally guarantees the reader threads are
    # stopped even on Ctrl+C or a processing error; monotonic() is immune to
    # wall-clock adjustments.
    try:
        start = time.monotonic()
        frame_count = 0
        while time.monotonic() - start < 10:
            result = engine.process_frame()
            frame_count += 1
            if frame_count % 10 == 0:
                print(f"\nFrame {result['metrics']['frame_count']}")
                movements = result['movements']
                print(" Movements:")
                for lane in ['N', 'S', 'E', 'W']:
                    print(f" {lane}: L={movements[lane]['left']} S={movements[lane]['straight']} R={movements[lane]['right']}")
                print(f" Phase: {result['phase']}")
                print(f" Time: {result['total_time_ms']:.1f}ms")
            time.sleep(0.05)
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        engine.stop()
    print("\nβœ“ Engine stopped")