""" Processing Service Module Orchestrates video processing jobs with real-time event streaming. Manages background processing threads, vehicle tracking, and WebSocket events. """ from __future__ import annotations import os import queue import threading from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import Enum from typing import Dict, List, Optional, Set, Tuple, Any import cv2 import numpy as np from PIL import Image import supervision as sv from app import socketio from app.config import Config from app.models import SessionData, VehicleEvent from app.services.video_processor import VideoProcessor from app.services.firebase_service import FirebaseService from app.state import frame_queues from app.utils.math_utils import calculate_line_signed_distance # ============================================================================= # CONSTANTS & CONFIGURATION # ============================================================================= class ProcessingStatus(str, Enum): """Processing job status states.""" PENDING = 'pending' PROCESSING = 'processing' COMPLETED = 'completed' STOPPED = 'stopped' ERROR = 'error' class CameraRole(str, Enum): """Camera role identifiers.""" ENTRY = 'ENTRY' EXIT = 'EXIT' class VideoSource(str, Enum): """Video input source types.""" FILE = 'file' LIVE_STREAM = 'live_stream' @dataclass class ProcessingConfig: """Configuration constants for video processing.""" # Frame processing PROGRESS_UPDATE_INTERVAL: int = 10 # Emit progress every N frames STREAM_FRAME_INTERVAL: int = 2 # Stream every Nth frame # Tracking thresholds TRACK_THRESH: float = 0.25 TRACK_BUFFER: int = 30 MATCH_THRESH: float = 0.8 CROSSING_DISTANCE_THRESHOLD: float = 25.0 # Annotation colors (hex) ANNOTATION_COLORS: Tuple[str, ...] = ( "#ffff00", "#ff9b00", "#ff66ff", "#3399ff", "#ff66b2", "#ff8080", "#b266ff" ) LINE_COLOR: Tuple[int, int, int] = (0, 255, 0) # BGR LINE_THICKNESS: int = 2 # Global processing configuration PROC_CONFIG = ProcessingConfig() # Global state to track processing jobs: {session_id: {camera_role: job}} processing_jobs: Dict[str, Dict[str, ProcessingJob]] = {} # ============================================================================= # DATA CLASSES # ============================================================================= @dataclass class TrackingState: """Maintains state for vehicle tracking across frames.""" track_classes: Dict[int, int] = field(default_factory=dict) track_distances: Dict[int, Tuple[float, bool]] = field(default_factory=dict) counted_ids: Set[int] = field(default_factory=set) def reset(self): """Clear all tracking state.""" self.track_classes.clear() self.track_distances.clear() self.counted_ids.clear() @dataclass class ProcessingJob: """Represents a video processing job with its configuration and state.""" session_id: str video_path: str line_points: List[List[float]] location: str camera_role: str = CameraRole.ENTRY.value video_start_time: datetime = field(default_factory=datetime.now) status: str = ProcessingStatus.PENDING.value progress: int = 0 error: Optional[str] = None thread: Optional[threading.Thread] = None # Live stream support is_live_stream: bool = False should_stop: bool = False frames_processed: int = 0 # Firebase batching state last_firebase_save_time: datetime = field(default_factory=datetime.now) last_event_count_saved: int = 0 def to_dict(self) -> dict: """Convert job to dictionary for serialization.""" return { 'session_id': self.session_id, 'status': self.status, 'progress': self.progress, 'error': self.error, 'location': self.location, 'camera_role': self.camera_role, 'is_live_stream': self.is_live_stream, 'frames_processed': self.frames_processed } def stop(self) -> None: """Signal the job to stop processing.""" self.should_stop = True @property def line_points_int(self) -> Tuple[Tuple[int, int], Tuple[int, int]]: """Get line points as integer tuples.""" return ( (int(self.line_points[0][0]), int(self.line_points[0][1])), (int(self.line_points[1][0]), int(self.line_points[1][1])) ) # ============================================================================= # PUBLIC API # ============================================================================= def start_processing( session_id: str, video_path: str, line_points: List[List[float]], location: str, video_start_time: datetime = None, camera_role: str = CameraRole.ENTRY.value, is_live_stream: bool = False ) -> ProcessingJob: """ Start video processing in a background thread. Args: session_id: Unique identifier for the session video_path: Path to the video file or stream URL line_points: Counting line coordinates [[x1,y1], [x2,y2]] location: Location name for the session video_start_time: Timestamp for the video start camera_role: 'ENTRY' or 'EXIT' camera designation is_live_stream: Whether the source is a live stream (RTSP/HTTP) Returns: ProcessingJob instance for tracking progress """ # Check for existing job and ensure it is fully stopped if session_id in processing_jobs and camera_role in processing_jobs[session_id]: existing_job = processing_jobs[session_id][camera_role] # Signal stop if it's still running if existing_job.status == ProcessingStatus.PROCESSING.value: print(f"Stopping existing thread for {camera_role} before restart...") existing_job.stop() # Wait for the thread to actually finish (prevent queue conflict) if existing_job.thread and existing_job.thread.is_alive(): existing_job.thread.join(timeout=5.0) print(f"Existing thread for {camera_role} terminated.") # Clear stale frames from queue _clear_frame_queue(camera_role) # Auto-detect live stream if not explicitly set if not is_live_stream: is_live_stream = _is_live_stream(video_path) # Create job job = ProcessingJob( session_id=session_id, video_path=video_path, line_points=line_points, location=location, camera_role=camera_role, video_start_time=video_start_time or datetime.now(), is_live_stream=is_live_stream ) # Store job in global registry if session_id not in processing_jobs: processing_jobs[session_id] = {} processing_jobs[session_id][camera_role] = job # Start background thread job.thread = threading.Thread( target=_run_processing_job, args=(job,), daemon=True, name=f"ProcessingJob-{session_id}-{camera_role}" ) job.thread.start() return job def stop_processing(session_id: str, camera_role: str = None) -> bool: """ Stop a running processing job. Args: session_id: The session ID to stop camera_role: Specific camera to stop, or None to stop all Returns: True if job(s) were signaled to stop, False if not found """ if session_id not in processing_jobs: return False stopped = False jobs = processing_jobs[session_id] if camera_role: # Stop specific camera if camera_role in jobs: jobs[camera_role].stop() stopped = True else: # Stop all cameras in session for job in jobs.values(): job.stop() stopped = True return stopped def _is_live_stream(video_path: str) -> bool: """Check if the video source is a live stream URL.""" if not video_path: return False return video_path.lower().startswith(('rtsp://', 'http://', 'https://', 'rtmp://')) def get_job_status(session_id: str) -> Optional[Dict[str, dict]]: """ Get the status of all processing jobs for a session. Args: session_id: The session to query Returns: Dict mapping camera roles to job status dicts, or None if not found """ if session_id not in processing_jobs: return None return { role: job.to_dict() for role, job in processing_jobs[session_id].items() } # ============================================================================= # BACKGROUND PROCESSING # ============================================================================= def _run_processing_job(job: ProcessingJob) -> None: """ Background thread function that processes video and emits events. Args: job: The processing job to execute """ job.status = ProcessingStatus.PROCESSING.value _emit_status_update(job) try: # Initialize services processor = VideoProcessor(model_path=Config.MODEL_PATH) firebase = FirebaseService() # Create session data container session_data = SessionData(job.session_id, job.location) session_data.line_coordinates = job.line_points # Process based on source type if job.is_live_stream: _process_live_stream(processor, firebase, job, session_data) else: _process_video(processor, firebase, job, session_data) # Save final results with all events firebase.save_session(session_data, update_events=True, camera_role=job.camera_role) print(f"[{job.camera_role}] Final save: {len(session_data.events)} total events saved to Firebase") # Mark complete (or stopped for live streams) if job.should_stop: job.status = ProcessingStatus.STOPPED.value else: job.status = ProcessingStatus.COMPLETED.value job.progress = 100 _emit_status_update(job) _emit_processing_complete(job, session_data.get_statistics()) except Exception as e: job.status = ProcessingStatus.ERROR.value job.error = str(e) _emit_status_update(job) _emit_error(job, str(e)) print(f"Processing error for {job.camera_role}: {e}") def _process_video( processor: VideoProcessor, firebase: FirebaseService, job: ProcessingJob, session_data: SessionData ) -> str: """ Process video frame by frame with tracking and event emission. Args: processor: VideoProcessor instance for detection firebase: FirebaseService for persistence job: Current processing job session_data: Session data container Returns: Path to the processed output video """ # Initialize video capture cap = cv2.VideoCapture(job.video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Setup video writer output_path = _create_video_writer(job, cap, fps) writer = cv2.VideoWriter( output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (Config.FRAME_WIDTH, Config.FRAME_HEIGHT) ) # Initialize tracker and annotators tracker = _create_tracker(fps) annotators = _create_annotators() # Tracking state tracking = TrackingState() # Processing state frame_queue = frame_queues.get(job.camera_role) frame_idx = 0 last_event_count = 0 try: while True: ret, frame = cap.read() if not ret: break # Process frame annotated_frame = _process_single_frame( frame=frame, processor=processor, tracker=tracker, annotators=annotators, tracking=tracking, job=job, session_data=session_data, frame_idx=frame_idx, fps=fps ) # Write to output writer.write(annotated_frame) # Stream frame for live display _stream_frame(frame_queue, annotated_frame, frame_idx) # Periodic updates if frame_idx % PROC_CONFIG.PROGRESS_UPDATE_INTERVAL == 0: last_event_count = _handle_periodic_updates( job=job, session_data=session_data, firebase=firebase, total_frames=total_frames, frame_idx=frame_idx, last_event_count=last_event_count ) frame_idx += 1 finally: cap.release() writer.release() return output_path def _process_live_stream( processor: VideoProcessor, firebase: FirebaseService, job: ProcessingJob, session_data: SessionData ) -> None: """ Process live stream continuously until stopped. Key differences from file processing: - No total_frames (runs indefinitely) - Handles reconnection on failure - Runs until job.should_stop is True - No video file output (streaming only) Args: processor: VideoProcessor instance for detection firebase: FirebaseService for persistence job: Current processing job session_data: Session data container """ import time cap = None retry_count = 0 max_retries = Config.LIVE_STREAM_RETRY_ATTEMPTS retry_delay = Config.LIVE_STREAM_RETRY_DELAY # Initialize tracker and annotators fps = 30.0 # Default for live streams tracker = _create_tracker(fps) annotators = _create_annotators() tracking = TrackingState() frame_queue = frame_queues.get(job.camera_role) frame_idx = 0 last_event_count = 0 print(f"Starting live stream processing for {job.camera_role}: {job.video_path}") try: while not job.should_stop: # Connect/reconnect to stream if cap is None or not cap.isOpened(): if retry_count >= max_retries: raise ConnectionError(f"Lost connection to stream after {max_retries} retries") if retry_count > 0: print(f"Reconnecting to stream (attempt {retry_count + 1}/{max_retries})...") _emit_status_update(job) # Notify client of reconnection attempt time.sleep(retry_delay) cap = cv2.VideoCapture(job.video_path) cap.set(cv2.CAP_PROP_BUFFERSIZE, Config.LIVE_STREAM_BUFFER_SIZE) if not cap.isOpened(): retry_count += 1 cap = None continue # Successfully connected actual_fps = cap.get(cv2.CAP_PROP_FPS) if actual_fps > 0: fps = actual_fps tracker = _create_tracker(fps) retry_count = 0 print(f"Connected to live stream at {fps:.1f} FPS") # Read frame ret, frame = cap.read() if not ret: retry_count += 1 cap.release() cap = None continue # Reset retry count on successful read retry_count = 0 # Process frame annotated_frame = _process_single_frame( frame=frame, processor=processor, tracker=tracker, annotators=annotators, tracking=tracking, job=job, session_data=session_data, frame_idx=frame_idx, fps=fps ) # Stream frame for live display _stream_frame(frame_queue, annotated_frame, frame_idx) # Update job stats job.frames_processed = frame_idx # Periodic updates (every 10 frames) if frame_idx % PROC_CONFIG.PROGRESS_UPDATE_INTERVAL == 0: last_event_count = _handle_live_stream_updates( job=job, session_data=session_data, firebase=firebase, frame_idx=frame_idx, last_event_count=last_event_count ) frame_idx += 1 finally: if cap: cap.release() print(f"Live stream processing stopped for {job.camera_role}. Frames processed: {frame_idx}") def _handle_live_stream_updates( job: ProcessingJob, session_data: SessionData, firebase: FirebaseService, frame_idx: int, last_event_count: int ) -> int: """ Handle periodic updates for live stream processing with batched Firebase writes. Returns: Updated event count """ # For live streams, emit a "live" status instead of progress percentage socketio.emit( 'processing_progress', { 'session_id': job.session_id, 'progress': -1, # -1 indicates live stream 'camera_role': job.camera_role, 'frames_processed': frame_idx, 'is_live': True }, room=job.session_id, namespace='/' ) # Check for new events and emit them immediately via WebSocket current_count = len(session_data.events) if current_count > last_event_count: # Emit new events to WebSocket (real-time UI updates) for event in session_data.events[last_event_count:]: _emit_vehicle_event(job, event) # Emit updated statistics to WebSocket _emit_statistics_update(job, session_data.get_statistics()) # Batched Firebase persistence (time-based) time_since_last_save = (datetime.now() - job.last_firebase_save_time).total_seconds() # Save to Firebase if interval elapsed if time_since_last_save >= Config.FIREBASE_LIVE_STREAM_INTERVAL: if current_count > job.last_event_count_saved: # Save session with batched events and updated statistics firebase.save_session( session_data, update_events=True, # Include all events in batch camera_role=job.camera_role ) job.last_event_count_saved = current_count print(f"[{job.camera_role}] Live stream: Batched {current_count - job.last_event_count_saved} events to Firebase") else: # Update statistics only (no new events) firebase.save_session( session_data, update_events=False, camera_role=job.camera_role ) job.last_firebase_save_time = datetime.now() return current_count def _process_single_frame( frame: np.ndarray, processor: VideoProcessor, tracker: sv.ByteTrack, annotators: dict, tracking: TrackingState, job: ProcessingJob, session_data: SessionData, frame_idx: int, fps: float ) -> np.ndarray: """ Process a single video frame: detect, track, annotate, count. Returns: Annotated frame ready for output/streaming """ # Resize and convert resized = cv2.resize(frame, (Config.FRAME_WIDTH, Config.FRAME_HEIGHT)) rgb_frame = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_frame) # Detect and track detections = processor.detect(pil_image) detections = tracker.update_with_detections(detections) # Build labels labels = _build_detection_labels(detections, processor) # Annotate frame annotated = _annotate_frame(resized, detections, labels, annotators, job) # Process crossings _detect_line_crossings( detections=detections, processor=processor, tracking=tracking, job=job, session_data=session_data, frame_idx=frame_idx, fps=fps ) return annotated # ============================================================================= # DETECTION & TRACKING HELPERS # ============================================================================= def _create_tracker(fps: float) -> sv.ByteTrack: """Create and configure ByteTrack tracker.""" return sv.ByteTrack( track_thresh=PROC_CONFIG.TRACK_THRESH, track_buffer=PROC_CONFIG.TRACK_BUFFER, match_thresh=PROC_CONFIG.MATCH_THRESH, frame_rate=int(fps) ) def _create_annotators() -> dict: """Create supervision annotators for frame visualization.""" colors = sv.ColorPalette.from_hex(list(PROC_CONFIG.ANNOTATION_COLORS)) return { 'bbox': sv.BoxAnnotator(color=colors), 'label': sv.LabelAnnotator(color=colors, text_color=sv.Color.BLACK), 'trace': sv.TraceAnnotator(color=colors) } def _build_detection_labels(detections: sv.Detections, processor: VideoProcessor) -> List[str]: """Build label strings for detected objects.""" if detections.tracker_id is None: return [] return [ f"#{tid} {processor.get_class_name(cid)} {conf:.2f}" for tid, cid, conf in zip( detections.tracker_id, detections.class_id, detections.confidence ) ] def _annotate_frame( frame: np.ndarray, detections: sv.Detections, labels: List[str], annotators: dict, job: ProcessingJob ) -> np.ndarray: """Apply all annotations to a frame.""" annotated = frame.copy() # Draw tracking traces, bounding boxes, and labels annotated = annotators['trace'].annotate(annotated, detections) annotated = annotators['bbox'].annotate(annotated, detections) if labels: annotated = annotators['label'].annotate(annotated, detections, labels) # Draw counting line pt1, pt2 = job.line_points_int cv2.line(annotated, pt1, pt2, PROC_CONFIG.LINE_COLOR, PROC_CONFIG.LINE_THICKNESS) return annotated def _detect_line_crossings( detections: sv.Detections, processor: VideoProcessor, tracking: TrackingState, job: ProcessingJob, session_data: SessionData, frame_idx: int, fps: float ) -> None: """ Detect vehicles crossing the counting line and create events. """ if detections.tracker_id is None: return pt1, pt2 = job.line_points_int for tracker_id, class_id, xyxy in zip( detections.tracker_id, detections.class_id, detections.xyxy ): if tracker_id is None: continue # Calculate centroid cx = int((xyxy[0] + xyxy[2]) / 2) cy = int((xyxy[1] + xyxy[3]) / 2) # Get signed distance to line dist, is_within = calculate_line_signed_distance(pt1, pt2, (cx, cy)) # Update tracking state tracking.track_classes[tracker_id] = int(class_id) prev_data = tracking.track_distances.get(tracker_id) # Check for line crossing if prev_data is not None and tracker_id not in tracking.counted_ids: prev_dist, prev_within = prev_data # Crossing detected: sign change + close to line + within bounds crossed = ( prev_dist * dist < 0 and min(abs(prev_dist), abs(dist)) < PROC_CONFIG.CROSSING_DISTANCE_THRESHOLD and (is_within or prev_within) ) if crossed: _create_crossing_event( tracker_id=tracker_id, processor=processor, tracking=tracking, job=job, session_data=session_data, frame_idx=frame_idx, fps=fps ) # Store current distance tracking.track_distances[tracker_id] = (dist, is_within) def _create_crossing_event( tracker_id: int, processor: VideoProcessor, tracking: TrackingState, job: ProcessingJob, session_data: SessionData, frame_idx: int, fps: float ) -> None: """Create and record a vehicle crossing event.""" class_id = tracking.track_classes[tracker_id] vehicle_type = processor.get_class_name(class_id) capacity = processor.get_vehicle_capacity(vehicle_type) # Direction based on camera role direction = 'OUT' if job.camera_role == CameraRole.EXIT.value else 'IN' # Calculate event timestamp time_offset = timedelta(seconds=frame_idx / fps) event_time = job.video_start_time + time_offset event = VehicleEvent( vehicle_type=vehicle_type, direction=direction, timestamp=event_time, seats_min=capacity['min'], seats_max=capacity['max'] ) session_data.add_event(event) tracking.counted_ids.add(tracker_id) # ============================================================================= # UTILITY FUNCTIONS # ============================================================================= def _clear_frame_queue(camera_role: str) -> None: """Clear any stale frames from the streaming queue.""" frame_queue = frame_queues.get(camera_role) if not frame_queue: return count = 0 while not frame_queue.empty(): try: frame_queue.get_nowait() count += 1 except queue.Empty: break if count > 0: print(f"Cleared {count} stale frames from {camera_role} queue") def _create_video_writer(job: ProcessingJob, cap: cv2.VideoCapture, fps: float) -> str: """Create output path for processed video.""" output_filename = f"{job.session_id}_{job.camera_role}_processed.mp4" return os.path.join(Config.OUTPUT_FOLDER, output_filename) def _stream_frame(frame_queue: Optional[queue.Queue], frame: np.ndarray, frame_idx: int) -> None: """Stream frame to queue for live display (non-blocking).""" if frame_queue is None: return if frame_idx % PROC_CONFIG.STREAM_FRAME_INTERVAL != 0: return try: frame_queue.put_nowait(frame.copy()) except queue.Full: pass # Skip frame if queue is full def _handle_periodic_updates( job: ProcessingJob, session_data: SessionData, firebase: FirebaseService, total_frames: int, frame_idx: int, last_event_count: int ) -> int: """ Handle periodic progress and event updates with batched Firebase writes. Returns: Updated event count """ # Update progress progress = int((frame_idx / total_frames) * 100) if total_frames > 0 else 0 if progress != job.progress: job.progress = progress _emit_progress_update(job) # Check for new events and emit them immediately via WebSocket current_count = len(session_data.events) if current_count > last_event_count: # Emit new events to WebSocket (real-time UI updates) for event in session_data.events[last_event_count:]: _emit_vehicle_event(job, event) # Emit updated statistics to WebSocket _emit_statistics_update(job, session_data.get_statistics()) # Batched Firebase persistence (time-based) time_since_last_save = (datetime.now() - job.last_firebase_save_time).total_seconds() # Save to Firebase if interval elapsed AND there are new events or statistics if time_since_last_save >= Config.FIREBASE_EVENT_BATCH_INTERVAL: if current_count > job.last_event_count_saved: # Save session with batched events and updated statistics firebase.save_session( session_data, update_events=True, # Include all events in batch camera_role=job.camera_role ) job.last_firebase_save_time = datetime.now() job.last_event_count_saved = current_count print(f"[{job.camera_role}] Batched {current_count - job.last_event_count_saved} events to Firebase") elif time_since_last_save >= Config.FIREBASE_STATISTICS_INTERVAL: # Update statistics only (no new events) firebase.save_session( session_data, update_events=False, camera_role=job.camera_role ) job.last_firebase_save_time = datetime.now() return current_count # ============================================================================= # SOCKETIO EVENT EMITTERS # ============================================================================= def _emit_status_update(job: ProcessingJob) -> None: """Emit job status update to connected clients.""" socketio.emit( 'processing_status', job.to_dict(), room=job.session_id, namespace='/' ) def _emit_progress_update(job: ProcessingJob) -> None: """Emit progress percentage update.""" socketio.emit( 'processing_progress', { 'session_id': job.session_id, 'progress': job.progress, 'camera_role': job.camera_role }, room=job.session_id, namespace='/' ) def _emit_vehicle_event(job: ProcessingJob, event: VehicleEvent) -> None: """Emit a new vehicle detection event.""" socketio.emit( 'vehicle_event', { 'session_id': job.session_id, 'event': event.to_dict(), 'camera_role': job.camera_role }, room=job.session_id, namespace='/' ) def _emit_statistics_update(job: ProcessingJob, stats: dict) -> None: """Emit updated session statistics.""" socketio.emit( 'statistics_update', { 'session_id': job.session_id, 'statistics': stats, 'camera_role': job.camera_role }, room=job.session_id, namespace='/' ) def _emit_processing_complete(job: ProcessingJob, final_stats: dict) -> None: """Emit processing completion event.""" socketio.emit( 'processing_complete', { 'session_id': job.session_id, 'statistics': final_stats, 'camera_role': job.camera_role }, room=job.session_id, namespace='/' ) def _emit_error(job: ProcessingJob, error_message: str) -> None: """Emit processing error event.""" socketio.emit( 'processing_error', { 'session_id': job.session_id, 'error': error_message, 'camera_role': job.camera_role }, room=job.session_id, namespace='/' )