""" main.py — Self-Contained, Fully Integrated Safe Driving Assistant Consolidates all system configuration, custom non-blocking sound synthesizer, dlib Eye Landmark processor, Ollama SLM action voice-assistant parser, Flask SSE Telemetry Dashboard, and main drowsiness timer logic into one unified, ultra-premium script. """ import os import sys import time import json import queue import collections import threading import logging import urllib.request import urllib.parse import webbrowser import re # Force dummy audio driver for headless container environments os.environ["SDL_AUDIODRIVER"] = "dummy" import numpy as np import scipy.io.wavfile as wavfile import pygame import pyttsx3 import cv2 import face_recognition import speech_recognition as sr from flask import Flask, render_template, Response, jsonify, request # 1. DriveSafe Assistant — Configuration Settings CAMERA_ID = int(os.environ.get("CAMERA_ID", 0)) # Index of the webcam (usually 0) FRAME_WIDTH = int(os.environ.get("FRAME_WIDTH", 640)) # Video capture width FRAME_HEIGHT = int(os.environ.get("FRAME_HEIGHT", 480)) # Video capture height # Drowsiness Detection Thresholds EAR_THRESHOLD = 0.23 # Eye Aspect Ratio below this indicates closed eyes EAR_CONSEC_FRAMES = 3 # Consecutive frames below threshold to trigger eye-closed timer # Alert Severity Levels (Durations in Seconds) ALERT_LEVEL1_MIN = 3.0 # Min duration of closed eyes for Level 1 ("stay focused") ALERT_LEVEL1_MAX = 5.0 # Max duration of closed eyes for Level 1 ALERT_LEVEL2_MIN = 5.0 # Closed eyes duration for Level 2 ("wake up stay focus on road" louder) # Frequent Drowsiness Pattern Tracking FREQUENT_DROWSY_WINDOW = 60.0 # Sliding window (seconds) to monitor drowsiness event frequency FREQUENT_DROWSY_LIMIT = 2 # Max drowsiness warnings allowed in window before advising a break (Level 3) # Voice Assistant & SLM Settings OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "drivesafe") # Our custom local Ollama model OLLAMA_API_URL = os.environ.get("OLLAMA_API_URL", "http://localhost:11434/api/generate") # Ollama generation endpoint SPEECH_RECOGNITION_TIMEOUT = 10 # Timeout for speech recognizer WAKE_WORD = "assistant" # Wake word for general conversations # Web HUD Dashboard Server FLASK_HOST = os.environ.get("FLASK_HOST", "127.0.0.1") FLASK_PORT = int(os.environ.get("FLASK_PORT", 5000)) # High Energy Music Links ENERGETIC_MUSIC_URL = "https://music.youtube.com/playlist?list=PLYBSqm--lNVt1H63PlRvigxvPU_unQe8m" # 2. Flask Web HUD Server & Shared DashboardState # Initialize Flask app app = Flask(__name__, template_folder='templates', static_folder='static') # Thread-safe global state for Flask-main loop communication class DashboardState: def __init__(self): self.lock = threading.Lock() self.latest_frame = None self.ear = 0.0 self.state = "NORMAL" self.drowsiness_count = 0 self.fps = 0 self.alert_message = "" self.chat_history = [] self.detection_active = True dashboard_state = DashboardState() @app.route('/') def index(): """Renders the futuristic cyberpunk HUD dashboard.""" return render_template('index.html') def gen_video_feed(): """Generator function that yields JPEG frames for the live camera stream.""" while True: with dashboard_state.lock: if dashboard_state.latest_frame is None: frame_to_send = None else: frame_to_send = dashboard_state.latest_frame.copy() if frame_to_send is not None: # Encode BGR OpenCV frame to standard JPEG ret, jpeg = cv2.imencode('.jpg', frame_to_send) if ret: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n') # Frame-rate limiter (30 FPS max for the web stream to keep networking lightweight) time.sleep(1.0 / 30.0) @app.route('/video_feed') def video_feed(): """Serves the real-time annotated video stream inside standard HTML img tags.""" return Response(gen_video_feed(), mimetype='multipart/x-mixed-replace; boundary=frame') def gen_telemetry_stream(): """Streams real-time system diagnostics to the browser via HTML5 Server-Sent Events (SSE).""" last_sent_time = 0 while True: # Throttle telemetry updates slightly (e.g. 15 updates/second) to keep browser rendering butter-smooth current_time = time.time() if current_time - last_sent_time >= 0.06: with dashboard_state.lock: data = { "ear": round(dashboard_state.ear, 3), "state": dashboard_state.state, "drowsiness_count": dashboard_state.drowsiness_count, "fps": dashboard_state.fps, "alert_message": dashboard_state.alert_message, "chat_history": dashboard_state.chat_history, "detection_active": dashboard_state.detection_active } # SSE data format: "data: \n\n" yield f"data: {json.dumps(data)}\n\n" last_sent_time = current_time time.sleep(0.01) @app.route('/telemetry') def telemetry(): """SSE endpoint for high-speed diagnostic telemetry streaming.""" return Response(gen_telemetry_stream(), mimetype='text/event-stream') # Interactive Control APIs @app.route('/api/toggle_detection', methods=['POST']) def toggle_detection(): """Enables or disables active face and eye tracking.""" with dashboard_state.lock: dashboard_state.detection_active = not dashboard_state.detection_active status = dashboard_state.detection_active return jsonify({"status": "success", "detection_active": status}) @app.route('/api/reset', methods=['POST']) def api_reset(): """Triggers a complete system reset from the dashboard panel.""" if hasattr(app, 'reset_callback') and app.reset_callback: app.reset_callback() return jsonify({"status": "success", "message": "System alerts and warning log reset."}) return jsonify({"status": "error", "message": "Reset callback not configured."}) @app.route('/api/trigger_music', methods=['POST']) def api_trigger_music(): """Manually triggers the energetic song from the dashboard panel.""" if hasattr(app, 'play_music_callback') and app.play_music_callback: app.play_music_callback() return jsonify({"status": "success", "message": "Playing energetic synthwave music!"}) return jsonify({"status": "error", "message": "Music callback not configured."}) def start_server_async(): """Runs the Flask development server on a dedicated background thread.""" # Suppress Flask development server startup messages to keep terminal clean log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) server_thread = threading.Thread( target=lambda: app.run(host=FLASK_HOST, port=FLASK_PORT, debug=False, use_reloader=False), daemon=True ) server_thread.start() print(f"[Flask Server] Running in background at http://{FLASK_HOST}:{FLASK_PORT}") # 3. AlertManager — Programmatic Sound Synthesis & Multi-Threaded Audio class AlertManager: def __init__(self): # Ensure directories exist os.makedirs("audio", exist_ok=True) # Programmatically synthesize our warning and chime audio files self._synthesize_audio_assets() # Initialize Pygame Mixer for non-blocking SFX playback pygame.mixer.init() # Audio file paths self.calm_beep_path = os.path.join("audio", "calm_beep.wav") self.urgent_beep_path = os.path.join("audio", "urgent_beep.wav") # Thread-safe speech queue & worker setup self.speech_queue = queue.Queue() self.is_speaking = False self.speech_thread = threading.Thread(target=self._speech_worker, daemon=True) self.speech_thread.start() def _synthesize_audio_assets(self): """Synthesizes custom chime and alert WAV files using numpy and scipy.""" sample_rate = 44100 # 1. Calm chime (gentle 550Hz sine wave decaying) duration = 0.4 t = np.linspace(0, duration, int(sample_rate * duration), False) envelope = np.exp(-5 * t) # decay envelope tone = np.sin(2 * np.pi * 550 * t) * envelope calm_data = (tone * 20000).astype(np.int16) wavfile.write(os.path.join("audio", "calm_beep.wav"), sample_rate, calm_data) # 2. Urgent pulsing beeps (three rapid 1200Hz pulse bursts) urgent_data = [] burst_duration = 0.08 gap_duration = 0.05 t_burst = np.linspace(0, burst_duration, int(sample_rate * burst_duration), False) burst = np.sin(2 * np.pi * 1200 * t_burst) * 32000 gap = np.zeros(int(sample_rate * gap_duration)) # Combine three bursts for _ in range(3): urgent_data.extend(burst) urgent_data.extend(gap) urgent_np = np.array(urgent_data, dtype=np.int16) wavfile.write(os.path.join("audio", "urgent_beep.wav"), sample_rate, urgent_np) def _speech_worker(self): """Background worker thread that serializes all speech requests using native PowerShell synthesis to prevent COM/threading locks.""" print("[AlertManager] Speech worker thread active.") import subprocess while True: try: # Blocks until an item is available text, volume, rate = self.speech_queue.get() self.is_speaking = True # Escape single quotes and backslashes for PowerShell safety escaped_text = text.replace("\\", "\\\\").replace("'", "''") # Map rate (150-190) to PowerShell Rate (-10 to 10) ps_rate = 0 if rate > 180: ps_rate = 2 elif rate < 150: ps_rate = -2 # Map volume (0.0 to 1.0) to PowerShell Volume (0 to 100) ps_volume = int(volume * 100) ps_command = ( f"Add-Type -AssemblyName System.Speech; " f"$speak = New-Object System.Speech.Synthesis.SpeechSynthesizer; " f"$speak.Rate = {ps_rate}; " f"$speak.Volume = {ps_volume}; " f"$speak.Speak('{escaped_text}')" ) # Run synchronously inside the worker thread to maintain sequential speech subprocess.run( ["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", ps_command], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) self.is_speaking = False self.speech_queue.task_done() except Exception as e: print(f"[AlertManager] Speech worker exception: {e}") self.is_speaking = False time.sleep(0.5) def speak(self, text, volume=0.8, rate=170): """Enqueues a text string to be spoken in the background thread.""" self.speech_queue.put((text, volume, rate)) def trigger_level1(self): """Level 1 Alert (3-5s closed): Soft chime, then calm voice.""" print("[AlertManager] Triggering Level 1 Alert: Calm Stay Focused") pygame.mixer.Sound(self.calm_beep_path).play() self.speak("Stay focused on the road", volume=0.7, rate=160) def trigger_level2(self): """Level 2 Alert (>5s closed): Loud siren beep, then loud voice.""" print("[AlertManager] Triggering Level 2 Alert: Loud WAKE UP!") pygame.mixer.Sound(self.urgent_beep_path).play() self.speak("Wake up! Stay focused on the road!", volume=1.0, rate=190) def trigger_level3_advisory(self): """Level 3 Alert (Frequent drowsiness): Ask to take a rest break on the side.""" print("[AlertManager] Triggering Level 3 Alert: Rest break advisory") pygame.mixer.Sound(self.urgent_beep_path).play() self.speak("You are getting drowsy frequently. Please pull over on the side and take a rest.", volume=0.9, rate=170) def ask_energetic_song(self): """Ask the driver if they want to listen to an energetic song.""" print("[AlertManager] Querying driver for energetic song") self.speak("Alright. Would you like to listen to an energetic song to help you stay awake?", volume=0.85, rate=170) def play_energetic_music(self): """Announce and play energetic music.""" print("[AlertManager] Playing energetic music") self.speak("Playing some high energy synthwave beats. Turn it up and stay alert!", volume=0.9, rate=170) webbrowser.open(ENERGETIC_MUSIC_URL) # 4. EyeDetector — 2x Downsampling dlib Eye Landmark Processor with Fallback class EyeDetector: def __init__(self): self.scale_factor = 2 # Resizes to 50% width/height (4x speedup) self.last_warning_time = 0 def _calculate_ear(self, eye_points): """Calculates the Eye Aspect Ratio (EAR) for a single eye list of 6 points.""" p1 = np.array(eye_points[0]) p2 = np.array(eye_points[1]) p3 = np.array(eye_points[2]) p4 = np.array(eye_points[3]) p5 = np.array(eye_points[4]) p6 = np.array(eye_points[5]) vertical1 = np.linalg.norm(p2 - p6) vertical2 = np.linalg.norm(p3 - p5) horizontal = np.linalg.norm(p1 - p4) if horizontal == 0: return 0.0 return (vertical1 + vertical2) / (2.0 * horizontal) def process_frame(self, frame): """Processes a single BGR camera frame with a robust full-res fallback.""" height, width, _ = frame.shape debug_frame = frame.copy() # 1. Downsample the frame for high-speed face detection small_frame = cv2.resize(frame, (0, 0), fx=1.0/self.scale_factor, fy=1.0/self.scale_factor) rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB) # 2. Try fast downscaled detection first face_landmarks_list = face_recognition.face_landmarks(rgb_small_frame) current_scale = self.scale_factor # 3. Fallback: If no face found in small frame, try the full-resolution frame! if not face_landmarks_list: rgb_full_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) face_landmarks_list = face_recognition.face_landmarks(rgb_full_frame) current_scale = 1 avg_ear = None landmarks_found = None if face_landmarks_list: face_landmarks = face_landmarks_list[0] landmarks_found = face_landmarks left_eye_raw = face_landmarks.get('left_eye', []) right_eye_raw = face_landmarks.get('right_eye', []) if len(left_eye_raw) == 6 and len(right_eye_raw) == 6: # Scale coordinates back up to original frame dimensions left_eye = [(int(x * current_scale), int(y * current_scale)) for (x, y) in left_eye_raw] right_eye = [(int(x * current_scale), int(y * current_scale)) for (x, y) in right_eye_raw] left_ear = self._calculate_ear(left_eye) right_ear = self._calculate_ear(right_eye) avg_ear = (left_ear + right_ear) / 2.0 # Draw the glowing tech HUD outlines self._draw_eye_hud(debug_frame, left_eye, right_eye, avg_ear) else: # No face detected! Print throttled console warning and show overlay text current_time = time.time() if current_time - self.last_warning_time > 2.5: print("[EyeDetector] WARNING: No face detected in camera stream! Adjust position or lighting.") self.last_warning_time = current_time # Draw warning overlay on dashboard feed cv2.putText(debug_frame, "NO FACE DETECTED", (width // 2 - 120, height // 2), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) cv2.putText(debug_frame, "Adjust Camera / Lighting", (width // 2 - 140, height // 2 + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1) return avg_ear, landmarks_found, debug_frame def _draw_eye_hud(self, frame, left_eye, right_eye, ear): """Draws glowing HUD tech contours on eyes and shows EAR readout.""" if ear is not None and ear < EAR_THRESHOLD: color = (0, 0, 255) # Red: Closed/Drowsy thickness = 2 else: color = (0, 255, 0) # Green: Open/Safe thickness = 1 left_pts = np.array(left_eye, np.int32) cv2.polylines(frame, [left_pts], True, color, thickness) right_pts = np.array(right_eye, np.int32) cv2.polylines(frame, [right_pts], True, color, thickness) for (x, y) in left_eye + right_eye: cv2.circle(frame, (x, y), 2, (255, 255, 0), -1) if ear is not None: text = f"EAR: {ear:.2f}" cv2.putText(frame, text, (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) # 5. VoiceAssistant — Dynamic Action Router & Background Speech Recognition class VoiceAssistant: def __init__(self, alert_manager, state_callbacks): self.alert_manager = alert_manager self.callbacks = state_callbacks self.recognizer = sr.Recognizer() self.recognizer.energy_threshold = 4000 self.recognizer.dynamic_energy_threshold = True self.running = True self.thread = threading.Thread(target=self._assistant_loop, daemon=True) self.thread.start() def query_ollama_slm(self, prompt): """Sends user transcription to the local custom drivesafe SLM on Ollama.""" payload = { "model": OLLAMA_MODEL, "prompt": prompt, "stream": False } headers = {"Content-Type": "application/json"} try: req = urllib.request.Request( OLLAMA_API_URL, data=json.dumps(payload).encode("utf-8"), headers=headers, method="POST" ) # Use a 10-second timeout to accommodate initial Ollama cold start weight loading with urllib.request.urlopen(req, timeout=10) as response: res_data = json.loads(response.read().decode("utf-8")) reply = res_data.get("response", "").strip() # Clean up any quotes or markdown from the SLM reply = reply.replace('"', '').replace('*', '').strip() return reply except Exception as e: print(f"[Ollama SLM] Error or timeout querying local model: {e}") lower_prompt = prompt.lower() if "hello" in lower_prompt or "hi" in lower_prompt: return "Hello! I am here. Eyes on the road, friend." elif "joke" in lower_prompt: return "Why did the scarecrow win an award? Because he was outstanding in his field. Stay alert!" else: return "Understood. Keep driving safely, stay focused on the road." def _assistant_loop(self): """Background continuous microphone listening loop.""" print("[VoiceAssistant] Speech recognizer thread started.") try: mic = sr.Microphone() except Exception as e: print(f"[VoiceAssistant] Error accessing microphone: {e}. Voice controls disabled.") return with mic as source: print("[VoiceAssistant] Calibrating microphone for driving background noise...") self.recognizer.adjust_for_ambient_noise(source, duration=2) print("[VoiceAssistant] Calibration complete. Ready for voice interaction.") while self.running: if self.alert_manager.is_speaking: time.sleep(0.3) continue try: audio = self.recognizer.listen(source, timeout=1.5, phrase_time_limit=4.0) except sr.WaitTimeoutError: continue except Exception as e: print(f"[VoiceAssistant] Microphone capture error: {e}") time.sleep(0.5) continue if self.alert_manager.is_speaking: continue # Run speech recognition in a separate thread to keep mic pipeline responsive threading.Thread(target=self._process_audio, args=(audio,), daemon=True).start() def _process_audio(self, audio): """Recognizes speech and routes commands dynamically.""" try: text = self.recognizer.recognize_google(audio) print(f"[Driver Heard] {text}") except sr.UnknownValueError: return except sr.RequestError: try: print("[VoiceAssistant] Cloud Speech API unavailable. Attempting local Whisper...") text = self.recognizer.recognize_whisper(audio, model="base.en") print(f"[Driver Heard (Whisper)] {text}") except Exception as e: print(f"[VoiceAssistant] Offline recognition failed: {e}") return cleaned_text = text.strip().lower() if not cleaned_text: return # STATE-SPECIFIC ROUTING (Emergency Rest / Song Prompts) current_state = self.callbacks['get_system_state']() # 1. State: Driver has been warned of frequent drowsiness (Level 3 Advisory) if current_state == "WAITING_REST_RESPONSE": refusal_words = ["no", "never", "can't", "wont", "won't", "refuse", "impossible", "fine", "good", "no thanks", "no rest", "keep driving"] accepted_words = ["yes", "yeah", "ok", "okay", "fine I will", "sure", "pulling over"] if any(word in cleaned_text for word in refusal_words): print("[VoiceAssistant] Driver refused rest. Prompting for energetic song.") self.callbacks['set_system_state']("WAITING_SONG_RESPONSE") self.callbacks['add_chat_log'](text, "No, I'm fine. I won't stop.") time.sleep(0.5) self.alert_manager.ask_energetic_song() self.callbacks['add_chat_log']("System", "Alright. Would you like to listen to an energetic song to help you stay awake?") return elif any(word in cleaned_text for word in accepted_words) or "pull" in cleaned_text: print("[VoiceAssistant] Driver accepted rest.") self.callbacks['reset_warnings']() self.callbacks['add_chat_log'](text, "Okay, pulling over.") self.alert_manager.speak("Good decision. Pull over safely and take some rest.") self.callbacks['add_chat_log']("System", "Good decision. Pull over safely and take some rest.") return # 2. State: Driver refused rest, now confirming if they want a song elif current_state == "WAITING_SONG_RESPONSE": accepted_words = ["yes", "yeah", "sure", "ok", "okay", "play", "song", "music", "please"] if any(word in cleaned_text for word in accepted_words): print("[VoiceAssistant] Driver accepted song.") self.callbacks['add_chat_log'](text, "Yes, play some music.") self.alert_manager.play_energetic_music() self.callbacks['add_chat_log']("System", "Playing energetic synthwave beats! Stay awake!") self.callbacks['set_system_state']("PLAYING_MUSIC") return else: print("[VoiceAssistant] Driver declined song.") self.callbacks['add_chat_log'](text, "No, I'm okay.") self.alert_manager.speak("Understood. Keep your eyes on the road. Stay focused.") self.callbacks['add_chat_log']("System", "Understood. Keep your eyes on the road. Stay focused.") self.callbacks['reset_warnings']() return # DIRECT SYSTEM BACKUP COMMANDS (Local Regex Override) if "reset" in cleaned_text or "clear" in cleaned_text or "awake" in cleaned_text or "focused" in cleaned_text: print("[VoiceAssistant] Safe state reset command received.") self.callbacks['reset_warnings']() self.callbacks['add_chat_log'](text, "Reset assistant") self.alert_manager.speak("System reset. Let's keep driving safely.") self.callbacks['add_chat_log']("System", "System reset. Let's keep driving safely.") return has_play = any(p in cleaned_text for p in ["play", "start", "turn on", "listen", "put on", "launch"]) has_music_kw = any(kw in cleaned_text for kw in ["music", "song", "beat", "tune", "track", "musc", "melody", "audio", "lofi"]) if has_play: # Extract query after the play keyword play_keyword = next((p for p in ["play", "start", "turn on", "listen to", "put on", "launch"] if p in cleaned_text), "play") idx = cleaned_text.find(play_keyword) music_query = cleaned_text[idx + len(play_keyword):].strip() # Clean common filler words for filler in ["some", "a", "the", "music", "song", "track", "musc"]: if music_query.startswith(filler): music_query = music_query[len(filler):].strip() if music_query.endswith(filler): music_query = music_query[:-len(filler)].strip() # If the remaining query is empty or generic, play the custom playlist if not music_query or music_query in ["music", "song", "beat", "tune", "track", "musc", "melody"]: print("[VoiceAssistant] General music command recognized locally. Playing playlist.") self.callbacks['add_chat_log'](text, "Requested general music playback") self.alert_manager.play_energetic_music() self.callbacks['set_system_state']("PLAYING_MUSIC") return else: # Play specific song directly! print(f"[VoiceAssistant] Specific song command recognized locally: {music_query}") confirm_msg = f"Sure thing! Autoplay in progress for {music_query}." self.callbacks['add_chat_log'](text, confirm_msg) self.alert_manager.speak(confirm_msg) # Fetch first search result and autoplay! search_url = f"https://www.youtube.com/results?search_query={urllib.parse.quote(music_query)}" headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} try: req = urllib.request.Request(search_url, headers=headers) with urllib.request.urlopen(req, timeout=5) as res: html = res.read().decode('utf-8') video_ids = re.findall(r'/watch\?v=([a-zA-Z0-9_-]{11})', html) if video_ids: first_video_id = video_ids[0] direct_url = f"https://www.youtube.com/watch?v={first_video_id}&autoplay=1" print(f"[VoiceAssistant] Auto-playing first matching YouTube video: {direct_url}") webbrowser.open(direct_url) else: webbrowser.open(search_url) except Exception as e: print(f"[VoiceAssistant] Autoplay scraper failed: {e}. Falling back to search page.") webbrowser.open(search_url) self.callbacks['set_system_state']("PLAYING_MUSIC") return has_stop = any(s in cleaned_text for s in ["stop", "pause", "turn off", "mute", "quiet", "halt", "shut up"]) if has_stop and (has_music_kw or "music" in cleaned_text or "song" in cleaned_text or "sound" in cleaned_text or "radio" in cleaned_text): print("[VoiceAssistant] Flexible stop music command recognized.") # Simulate media play/pause key to halt browser/audio stream import ctypes VK_MEDIA_PLAY_PAUSE = 0xB3 try: ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 0, 0) ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 2, 0) except Exception as e: print(f"[VoiceAssistant] Failed simulating media key: {e}") self.callbacks['set_system_state']("NORMAL") self.callbacks['add_chat_log'](text, "Stop the music") self.alert_manager.speak("Stopping the music. Keep your eyes on the road.") self.callbacks['add_chat_log']("System", "Music stopped.") return # CONVERSATIONAL LOCAL SLM (Always Active - No Wake Word/Filters Required!) # Route ANY general speech dynamically straight to our local Ollama custom model! print(f"[Ollama Query] {text}") reply = self.query_ollama_slm(text) print(f"[SLM Reply] {reply}") # Check if Ollama returned a dynamic PLAY action tag (e.g. "[PLAY] paint it black") if "[play]" in reply.lower(): match = re.search(r'\[play\]\s*(.*)', reply, re.IGNORECASE) if match: music_query = match.group(1).strip() music_query = music_query.replace('"', '').replace('[', '').replace(']', '').strip() confirm_msg = f"Sure thing! Autoplay in progress for {music_query}." self.callbacks['add_chat_log'](text, confirm_msg) self.alert_manager.speak(confirm_msg) # Fetch the first search result from YouTube dynamically and play it directly! search_url = f"https://www.youtube.com/results?search_query={urllib.parse.quote(music_query)}" headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} try: req = urllib.request.Request(search_url, headers=headers) with urllib.request.urlopen(req, timeout=5) as res: html = res.read().decode('utf-8') # Search for video watch paths video_ids = re.findall(r'/watch\?v=([a-zA-Z0-9_-]{11})', html) if video_ids: first_video_id = video_ids[0] direct_url = f"https://www.youtube.com/watch?v={first_video_id}&autoplay=1" print(f"[VoiceAssistant] Auto-playing first matching YouTube video: {direct_url}") webbrowser.open(direct_url) else: webbrowser.open(search_url) except Exception as e: print(f"[VoiceAssistant] Autoplay scraper failed: {e}. Falling back to search page.") webbrowser.open(search_url) self.callbacks['set_system_state']("PLAYING_MUSIC") return # Check if Ollama returned a dynamic STOP action tag (e.g. "[STOP]") if "[stop]" in reply.lower(): match = re.search(r'\[stop\]\s*(.*)', reply, re.IGNORECASE) clean_reply = match.group(1).strip() if match else "Stopping the music. Keep your eyes on the road!" clean_reply = clean_reply.replace('[', '').replace(']', '').strip() print("[VoiceAssistant] Action STOP triggered dynamically by Ollama.") # Simulate media play/pause key to stop the browser audio stream import ctypes VK_MEDIA_PLAY_PAUSE = 0xB3 try: ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 0, 0) ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 2, 0) except Exception as e: print(f"[VoiceAssistant] Failed simulating media key: {e}") self.callbacks['set_system_state']("NORMAL") self.callbacks['add_chat_log'](text, clean_reply) self.alert_manager.speak(clean_reply) return # Check if Ollama returned a dynamic RESET action tag (e.g. "[RESET]") if "[reset]" in reply.lower(): match = re.search(r'\[reset\]\s*(.*)', reply, re.IGNORECASE) clean_reply = match.group(1).strip() if match else "System warnings cleared. Drive safely!" clean_reply = clean_reply.replace('[', '').replace(']', '').strip() print("[VoiceAssistant] Action RESET triggered dynamically by Ollama.") self.callbacks['reset_warnings']() self.callbacks['add_chat_log'](text, clean_reply) self.alert_manager.speak(clean_reply) return # General conversational response self.callbacks['add_chat_log'](text, reply) self.alert_manager.speak(reply) def stop(self): """Stops the assistant background thread.""" self.running = False # 6. SafeDrivingAssistant Core Engine & Orchestrator Coordinator class SafeDrivingAssistant: def __init__(self): print("[CoreEngine] Initializing Safe Driving Assistant...") # Initialize Audio Alert & Sound Synthesizer self.alert_manager = AlertManager() # Initialize face_recognition Eye Landmark Processor self.detector = EyeDetector() # Tracking states and timelines self.consec_closed_frames = 0 self.eyes_closed_start_time = None self.active_alert_level = 0 # 0: None, 1: Stay Focused, 2: Wake Up Loud # Rolling log of drowsiness timestamps to monitor frequency self.drowsiness_events = collections.deque() # Keyboard reset helper self.last_key_press = None # Setup conversational callbacks for our Voice Assistant & SLM self.callbacks = { 'get_system_state': self.get_system_state, 'set_system_state': self.set_system_state, 'reset_warnings': self.reset_warnings, 'add_chat_log': self.add_chat_log } # Bind callbacks back to Flask REST API endpoints app.reset_callback = self.reset_warnings app.play_music_callback = self.play_energetic_music # Initialize speech listener thread self.assistant = VoiceAssistant(self.alert_manager, self.callbacks) # Boot Flask HUD Web Dashboard in the background start_server_async() # Coordinator Callback Handlers def get_system_state(self): """Thread-safe state getter for the Voice Assistant.""" with dashboard_state.lock: return dashboard_state.state def set_system_state(self, new_state): """Thread-safe state setter for the Voice Assistant.""" with dashboard_state.lock: dashboard_state.state = new_state if new_state == "NORMAL": dashboard_state.alert_message = "" elif new_state == "WAITING_REST_RESPONSE": dashboard_state.alert_message = "ADVISING REST BREAK" elif new_state == "WAITING_SONG_RESPONSE": dashboard_state.alert_message = "OFFERING ENERGETIC MUSIC" elif new_state == "PLAYING_MUSIC": dashboard_state.alert_message = "PLAYING HIGH ENERGY BEATS" def reset_warnings(self): """Complete reset of all active alarms, timers, and warning metrics.""" print("[CoreEngine] Performing comprehensive system alert reset.") with dashboard_state.lock: dashboard_state.state = "NORMAL" dashboard_state.alert_message = "" dashboard_state.drowsiness_count = 0 self.consec_closed_frames = 0 self.eyes_closed_start_time = None self.active_alert_level = 0 self.drowsiness_events.clear() # Enqueue a log message self.add_chat_log("System", "System alerts and warnings reset to NORMAL.") def add_chat_log(self, user_query, slm_reply=""): """Pushes voice transcripts to the dashboard log log history.""" with dashboard_state.lock: if user_query == "System": dashboard_state.chat_history.append({ "speaker": "System", "message": slm_reply }) else: dashboard_state.chat_history.append({ "speaker": "Driver", "query": user_query, "message": slm_reply }) def play_energetic_music(self): """Orchestrator hook to trigger the energetic music sequence.""" self.set_system_state("PLAYING_MUSIC") self.alert_manager.play_energetic_music() self.add_chat_log("System", "Energetic synthwave music started. Stay alert!") # Core Drowsiness Evaluation & Loop def run(self): """Main camera acquisition loop that drives the safe assistant.""" print("[CoreEngine] Accessing camera stream...") cap = cv2.VideoCapture(CAMERA_ID) # Configure video dimension overrides from settings cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT) # Detect if we are in a headless cloud environment without a webcam use_simulation = False if not cap.isOpened(): print("[CoreEngine] WARNING: Could not access physical web camera.") print("[CoreEngine] Pivoting to Cloud Simulation Mode to keep web HUD alive...") use_simulation = True else: print("[CoreEngine] Camera stream operational. System fully active.") print("[CoreEngine] System loop running. Use the Web Dashboard to monitor telemetry.") prev_time = time.time() try: while True: current_time = time.time() if use_simulation: # Generate an animated cyberpunk grid frame for the headless dashboard frame = np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 3), dtype=np.uint8) # Create a scrolling scan line scan_y = int(current_time * 120) % FRAME_HEIGHT cv2.line(frame, (0, scan_y), (FRAME_WIDTH, scan_y), (40, 40, 40), 2) cv2.putText(frame, "CLOUD SIMULATION FEED (NO PHYSICAL CAM)", (20, FRAME_HEIGHT - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1) ret = True else: ret, frame = cap.read() if not ret: time.sleep(0.01) continue # Mirror frame for intuitive pilot HUD overlay frame = cv2.flip(frame, 1) # Check if tracking is active (controlled from Dashboard) with dashboard_state.lock: active = dashboard_state.detection_active if active: if use_simulation: # Cloud Demo Mode: Automatically simulate a drowsy event cycle every 25 seconds # to let you test your Flask dashboard overlays and system responses safely! cycle = int(current_time) % 25 if cycle > 18: # Simulate closed eyes for 7 seconds ear = 0.16 cv2.putText(frame, "SIMULATING DROWSINESS (EYES CLOSED)", (FRAME_WIDTH // 2 - 180, FRAME_HEIGHT // 2), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) else: ear = 0.28 processed_frame = frame.copy() # Draw virtual telemetry eye dots onto the matrix background cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.4), int(FRAME_HEIGHT * 0.45)), 8, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), -1) cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.6), int(FRAME_HEIGHT * 0.45)), 8, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), -1) if ear is not None: cv2.putText(processed_frame, f"EAR: {ear:.2f}", (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), 2) landmarks = {} else: # Calculate EAR and overlay glow contours on frame via physical camera ear, landmarks, processed_frame = self.detector.process_frame(frame) else: processed_frame = frame.copy() cv2.putText(processed_frame, "TRACKING PAUSED", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2) ear = None # Calculate processing frame rate (FPS) fps = int(1.0 / (current_time - prev_time)) if (current_time - prev_time) > 0 else 30 prev_time = current_time # Drowsiness Logic Decision Engine if ear is not None and active: if ear < EAR_THRESHOLD: self.consec_closed_frames += 1 # Once consecutive frames pass noise filter, start duration timer if self.consec_closed_frames >= EAR_CONSEC_FRAMES: if self.eyes_closed_start_time is None: self.eyes_closed_start_time = current_time else: closed_duration = current_time - self.eyes_closed_start_time # LEVEL 1: Eyes Closed 3-5 seconds if ALERT_LEVEL1_MIN <= closed_duration < ALERT_LEVEL1_MAX: if self.active_alert_level < 1: self.active_alert_level = 1 self.set_system_state("CLOSED_3S") self.alert_manager.trigger_level1() # Record timestamp to sliding frequency tracker self.drowsiness_events.append(current_time) with dashboard_state.lock: dashboard_state.drowsiness_count += 1 self.add_chat_log("System", "WARNING: Eyes closed for 3 seconds! Stay focused!") # LEVEL 2: Eyes Closed > 5 seconds (Louder Warning!) elif closed_duration >= ALERT_LEVEL2_MIN: if self.active_alert_level < 2: self.active_alert_level = 2 self.set_system_state("CLOSED_5S") self.alert_manager.trigger_level2() # Record second timestamp self.drowsiness_events.append(current_time) with dashboard_state.lock: dashboard_state.drowsiness_count += 1 self.add_chat_log("System", "CRITICAL ALARM: Eyes closed for 5+ seconds! WAKE UP!") else: # Eyes are open! Reset filters and check for Level 3 Advisory escalation self.consec_closed_frames = 0 if self.eyes_closed_start_time is not None: self.eyes_closed_start_time = None while self.drowsiness_events and (current_time - self.drowsiness_events[0] > FREQUENT_DROWSY_WINDOW): self.drowsiness_events.popleft() # LEVEL 3: Frequent Drowsiness check (if closed events occur >= limit in last 60s) if len(self.drowsiness_events) >= FREQUENT_DROWSY_LIMIT: print(f"[CoreEngine] Frequent drowsiness detected ({len(self.drowsiness_events)} events in 60s). Escalating to Level 3.") self.set_system_state("WAITING_REST_RESPONSE") self.alert_manager.trigger_level3_advisory() self.add_chat_log("System", "FREQUENT DROWSINESS DETECTED. Prompting driver to pull over.") else: # Normal recovery current_state = self.get_system_state() if current_state not in ["WAITING_REST_RESPONSE", "WAITING_SONG_RESPONSE"]: self.set_system_state("NORMAL") self.active_alert_level = 0 else: self.consec_closed_frames = 0 self.eyes_closed_start_time = None # Update Global Telemetry Buffer for Flask Server with dashboard_state.lock: dashboard_state.latest_frame = processed_frame.copy() if ear is not None: dashboard_state.ear = ear else: dashboard_state.ear = 0.30 # Default baseline when no face present dashboard_state.fps = fps # OpenCV display output fallback (wrapped safely to prevent headless display context drops) try: cv2.imshow("DriveSafe HUD AI Console", processed_frame) key = cv2.waitKey(1) & 0xFF if key == ord('q') or key == 27: print("[CoreEngine] Exit key received. Terminating system.") break elif key == ord('r'): self.reset_warnings() except Exception: # Prevents crashes on platforms where standard desktop window pipelines are fully restricted time.sleep(0.03) except KeyboardInterrupt: print("[CoreEngine] Keyboard interrupt. Shutting down.") finally: print("[CoreEngine] Releasing resources...") cap.release() try: cv2.destroyAllWindows() except Exception: pass self.assistant.stop() sys.exit(0) if __name__ == "__main__": assistant_app = SafeDrivingAssistant() assistant_app.run()