Spaces:
Runtime error
Runtime error
| """ | |
| main.py — Self-Contained, Fully Integrated Safe Driving Assistant | |
| Consolidates all system configuration, custom non-blocking sound synthesizer, dlib Eye Landmark processor, | |
| Ollama SLM action voice-assistant parser, Flask SSE Telemetry Dashboard, and main drowsiness timer logic | |
| into one unified, ultra-premium script. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import queue | |
| import collections | |
| import threading | |
| import logging | |
| import urllib.request | |
| import urllib.parse | |
| import webbrowser | |
| import re | |
| # Force dummy audio driver for headless container environments | |
| os.environ["SDL_AUDIODRIVER"] = "dummy" | |
| import numpy as np | |
| import scipy.io.wavfile as wavfile | |
| import pygame | |
| import pyttsx3 | |
| import cv2 | |
| import face_recognition | |
| import speech_recognition as sr | |
| from flask import Flask, render_template, Response, jsonify, request | |
# 1. DriveSafe Assistant — Configuration Settings


def _env_int(name, default):
    """Return environment variable ``name`` parsed as an int, or ``default`` when unset."""
    return int(os.environ.get(name, default))


# Camera capture (overridable through environment variables)
CAMERA_ID = _env_int("CAMERA_ID", 0)  # webcam index, usually 0
FRAME_WIDTH = _env_int("FRAME_WIDTH", 640)  # capture width in pixels
FRAME_HEIGHT = _env_int("FRAME_HEIGHT", 480)  # capture height in pixels

# Drowsiness detection thresholds
EAR_THRESHOLD = 0.23  # an Eye Aspect Ratio below this counts as "eyes closed"
EAR_CONSEC_FRAMES = 3  # consecutive below-threshold frames before the eye-closed timer starts

# Alert severity levels — closed-eye durations in seconds
ALERT_LEVEL1_MIN = 3.0  # lower bound for Level 1 ("stay focused")
ALERT_LEVEL1_MAX = 5.0  # upper bound for Level 1
ALERT_LEVEL2_MIN = 5.0  # Level 2 ("wake up, stay focused on the road", louder)

# Frequent-drowsiness pattern tracking
FREQUENT_DROWSY_WINDOW = 60.0  # sliding window (seconds) over which drowsiness events are counted
FREQUENT_DROWSY_LIMIT = 2  # warnings allowed inside the window before the Level 3 rest advisory

# Voice assistant & local SLM (Ollama)
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "drivesafe")
OLLAMA_API_URL = os.environ.get("OLLAMA_API_URL", "http://localhost:11434/api/generate")
SPEECH_RECOGNITION_TIMEOUT = 10  # recognizer timeout (seconds)
WAKE_WORD = "assistant"  # wake word for general conversation

# Web HUD dashboard server
FLASK_HOST = os.environ.get("FLASK_HOST", "127.0.0.1")
FLASK_PORT = _env_int("FLASK_PORT", 5000)

# High-energy music playlist
ENERGETIC_MUSIC_URL = "https://music.youtube.com/playlist?list=PLYBSqm--lNVt1H63PlRvigxvPU_unQe8m"
# 2. Flask Web HUD Server & Shared DashboardState
# The Flask application object; the HUD view functions in this module use it.
# Jinja templates are loaded from ./templates and static assets from ./static.
app = Flask(
    __name__,
    static_folder='static',
    template_folder='templates',
)
class DashboardState:
    """Lock-protected state shared between the main loop and the Flask server threads.

    Readers and writers in this module acquire ``lock`` before touching any of
    the fields below.
    """

    def __init__(self):
        # Guards every field below; threading.Lock is not re-entrant.
        self.lock = threading.Lock()
        # Most recent annotated camera frame; None until one is published.
        self.latest_frame = None
        # Live telemetry readouts.
        self.fps = 0
        self.ear = 0.0
        self.drowsiness_count = 0
        # System state-machine value and the banner text shown on the HUD.
        self.state = "NORMAL"
        self.alert_message = ""
        # Driver / System conversation entries rendered by the dashboard.
        self.chat_history = []
        # Whether face/eye tracking is enabled (toggled from the dashboard).
        self.detection_active = True


# Single process-wide instance shared by every thread.
dashboard_state = DashboardState()
@app.route('/')
def index():
    """Render the HUD dashboard page (``templates/index.html``).

    Registered at ``/``; without the route decorator Flask never dispatches to
    this view and the dashboard URL returns 404.
    """
    return render_template('index.html')
def gen_video_feed():
    """Yield the latest camera frame as multipart JPEG chunks, forever.

    Each chunk is one ``image/jpeg`` part delimited by the ``frame`` boundary
    that ``video_feed`` advertises. Every iteration sleeps 1/30 s, so the web
    stream is capped at roughly 30 FPS whatever the camera rate is.
    """
    frame_interval = 1.0 / 30.0
    while True:
        # Take a private copy under the lock; JPEG encoding happens outside it.
        with dashboard_state.lock:
            snapshot = dashboard_state.latest_frame
            if snapshot is not None:
                snapshot = snapshot.copy()

        if snapshot is not None:
            encoded_ok, jpeg = cv2.imencode('.jpg', snapshot)
            if encoded_ok:
                part = (b'--frame\r\n'
                        b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n')
                yield part

        time.sleep(frame_interval)
# NOTE(review): confirm this path matches the <img src> used in templates/index.html.
@app.route('/video_feed')
def video_feed():
    """Serve the live annotated camera stream as ``multipart/x-mixed-replace`` MJPEG.

    The response can be used directly as the ``src`` of an HTML ``<img>`` tag.
    Without the route decorator Flask never dispatches to this view.
    """
    return Response(gen_video_feed(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
def gen_telemetry_stream():
    """Yield dashboard telemetry as Server-Sent Events, forever.

    A JSON snapshot of ``dashboard_state`` is emitted at most once every 0.06 s
    (about 16 events per second). Each event is framed as ``data: <json>``
    followed by a blank line, as the SSE format requires.
    """
    send_interval = 0.06
    last_sent_time = 0
    while True:
        current_time = time.time()
        if current_time - last_sent_time >= send_interval:
            with dashboard_state.lock:
                data = {
                    "ear": round(dashboard_state.ear, 3),
                    "state": dashboard_state.state,
                    "drowsiness_count": dashboard_state.drowsiness_count,
                    "fps": dashboard_state.fps,
                    "alert_message": dashboard_state.alert_message,
                    # Copy while the lock is held: json.dumps below runs after the
                    # lock is released, and other threads append to this list.
                    "chat_history": list(dashboard_state.chat_history),
                    "detection_active": dashboard_state.detection_active
                }
            yield f"data: {json.dumps(data)}\n\n"
            last_sent_time = current_time
        # Short poll so the loop does not spin at full CPU between events.
        time.sleep(0.01)
# NOTE(review): confirm this path matches the EventSource URL in templates/index.html.
@app.route('/telemetry')
def telemetry():
    """SSE endpoint streaming live diagnostics produced by ``gen_telemetry_stream``.

    Without the route decorator Flask never dispatches to this view.
    """
    return Response(gen_telemetry_stream(), mimetype='text/event-stream')
# Interactive Control APIs
# NOTE(review): path and accepted methods are assumptions — confirm against the
# fetch() calls in templates/index.html. GET is accepted for compatibility even
# though the endpoint mutates state.
@app.route('/api/toggle_detection', methods=['GET', 'POST'])
def toggle_detection():
    """Flip face/eye tracking on or off and return the new setting as JSON.

    Returns:
        ``{"status": "success", "detection_active": <bool>}``
    """
    with dashboard_state.lock:
        dashboard_state.detection_active = not dashboard_state.detection_active
        status = dashboard_state.detection_active
    return jsonify({"status": "success", "detection_active": status})
# NOTE(review): path is an assumption — confirm against templates/index.html.
@app.route('/api/reset', methods=['GET', 'POST'])
def api_reset():
    """Run the reset callback registered on ``app`` and report the outcome as JSON.

    ``app.reset_callback`` is attached by the orchestrator at start-up; until
    then an error payload is returned.
    """
    reset_callback = getattr(app, 'reset_callback', None)
    if reset_callback:
        reset_callback()
        return jsonify({"status": "success", "message": "System alerts and warning log reset."})
    return jsonify({"status": "error", "message": "Reset callback not configured."})
# NOTE(review): path is an assumption — confirm against templates/index.html.
@app.route('/api/trigger_music', methods=['GET', 'POST'])
def api_trigger_music():
    """Run the music callback registered on ``app`` and report the outcome as JSON.

    ``app.play_music_callback`` is attached by the orchestrator at start-up;
    until then an error payload is returned.
    """
    play_music_callback = getattr(app, 'play_music_callback', None)
    if play_music_callback:
        play_music_callback()
        return jsonify({"status": "success", "message": "Playing energetic synthwave music!"})
    return jsonify({"status": "error", "message": "Music callback not configured."})
def start_server_async():
    """Start the Flask HUD server on a daemon thread and return immediately.

    Werkzeug's logger is raised to ERROR first so request logs do not clutter
    the terminal. Debug mode and the auto-reloader are explicitly disabled.
    """
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    def _serve():
        app.run(host=FLASK_HOST, port=FLASK_PORT, debug=False, use_reloader=False)

    threading.Thread(target=_serve, daemon=True).start()
    print(f"[Flask Server] Running in background at http://{FLASK_HOST}:{FLASK_PORT}")
# 3. AlertManager — Programmatic Sound Synthesis & Multi-Threaded Audio
class AlertManager:
    """Plays synthesized alert tones and speaks alert messages without blocking callers.

    Tones are generated into ``audio/`` at construction and played through
    ``pygame.mixer``. Speech requests are queued and rendered one at a time by a
    daemon worker thread that runs Windows PowerShell's ``System.Speech``
    synthesizer in a subprocess.
    """

    # Environment variable through which the text to speak reaches PowerShell.
    _TTS_TEXT_ENV = "DRIVESAFE_TTS_TEXT"

    def __init__(self):
        # The generated WAV assets live here.
        os.makedirs("audio", exist_ok=True)
        self._synthesize_audio_assets()
        # pygame's mixer gives fire-and-forget playback for the alert tones.
        pygame.mixer.init()
        self.calm_beep_path = os.path.join("audio", "calm_beep.wav")
        self.urgent_beep_path = os.path.join("audio", "urgent_beep.wav")
        # Speech requests are serialized through this queue by _speech_worker.
        self.speech_queue = queue.Queue()
        # True while the worker is rendering speech; the voice assistant checks
        # it to pause microphone capture.
        self.is_speaking = False
        self.speech_thread = threading.Thread(target=self._speech_worker, daemon=True)
        self.speech_thread.start()

    def _synthesize_audio_assets(self):
        """Write ``audio/calm_beep.wav`` and ``audio/urgent_beep.wav`` (int16 samples, 44.1 kHz)."""
        sample_rate = 44100

        # Calm chime: 0.4 s of a 550 Hz sine under an exponential decay envelope.
        duration = 0.4
        t = np.linspace(0, duration, int(sample_rate * duration), False)
        tone = np.sin(2 * np.pi * 550 * t) * np.exp(-5 * t)
        wavfile.write(os.path.join("audio", "calm_beep.wav"), sample_rate,
                      (tone * 20000).astype(np.int16))

        # Urgent alert: three 80 ms bursts of a 1200 Hz sine, each followed by 50 ms of silence.
        burst_duration = 0.08
        gap_duration = 0.05
        t_burst = np.linspace(0, burst_duration, int(sample_rate * burst_duration), False)
        burst = np.sin(2 * np.pi * 1200 * t_burst) * 32000
        gap = np.zeros(int(sample_rate * gap_duration))
        urgent = np.concatenate([burst, gap] * 3).astype(np.int16)
        wavfile.write(os.path.join("audio", "urgent_beep.wav"), sample_rate, urgent)

    @staticmethod
    def _powershell_rate(rate):
        """Map :meth:`speak`'s ``rate`` onto System.Speech's -10..10 scale.

        Only three steps are used: above 180 -> 2, below 150 -> -2, otherwise 0.
        """
        if rate > 180:
            return 2
        if rate < 150:
            return -2
        return 0

    @classmethod
    def _build_tts_command(cls, volume, rate):
        """Return the PowerShell script that speaks the text held in ``$env:DRIVESAFE_TTS_TEXT``.

        The text is handed over through the environment rather than spliced into
        a quoted string literal. PowerShell also treats typographic single quotes
        as string delimiters, so splicing SLM replies such as "I’m" breaks the
        command; with the environment variable no escaping is needed at all.
        """
        # System.Speech volume is 0..100; clamp so out-of-range input cannot make Speak() fail.
        ps_volume = max(0, min(100, int(volume * 100)))
        return (
            "Add-Type -AssemblyName System.Speech; "
            "$speak = New-Object System.Speech.Synthesis.SpeechSynthesizer; "
            f"$speak.Rate = {cls._powershell_rate(rate)}; "
            f"$speak.Volume = {ps_volume}; "
            f"$speak.Speak($env:{cls._TTS_TEXT_ENV})"
        )

    def _speech_worker(self):
        """Speak queued requests one at a time, forever (runs on the speech daemon thread).

        Each request is rendered by a synchronous PowerShell subprocess, so
        utterances never overlap. Failures (e.g. no ``powershell`` executable on
        this host) are logged, followed by a 0.5 s back-off, and the worker keeps
        serving the queue. ``is_speaking`` and ``task_done()`` are always
        restored/called, even on failure.
        """
        print("[AlertManager] Speech worker thread active.")
        import subprocess
        while True:
            text, volume, rate = self.speech_queue.get()
            failed = False
            try:
                text = str(text)
                if not text.strip():
                    continue  # nothing to say; `finally` still acknowledges the item
                self.is_speaking = True
                subprocess.run(
                    ["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass",
                     "-Command", self._build_tts_command(volume, rate)],
                    env={**os.environ, self._TTS_TEXT_ENV: text},
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL
                )
            except Exception as e:
                print(f"[AlertManager] Speech worker exception: {e}")
                failed = True
            finally:
                self.is_speaking = False
                self.speech_queue.task_done()
            if failed:
                time.sleep(0.5)

    def speak(self, text, volume=0.8, rate=170):
        """Queue ``text`` for the speech worker; returns immediately."""
        self.speech_queue.put((text, volume, rate))

    def trigger_level1(self):
        """Level 1 alert (3-5 s closed): soft chime, then a calm voice prompt."""
        print("[AlertManager] Triggering Level 1 Alert: Calm Stay Focused")
        pygame.mixer.Sound(self.calm_beep_path).play()
        self.speak("Stay focused on the road", volume=0.7, rate=160)

    def trigger_level2(self):
        """Level 2 alert (>5 s closed): urgent beeps, then a loud, fast voice prompt."""
        print("[AlertManager] Triggering Level 2 Alert: Loud WAKE UP!")
        pygame.mixer.Sound(self.urgent_beep_path).play()
        self.speak("Wake up! Stay focused on the road!", volume=1.0, rate=190)

    def trigger_level3_advisory(self):
        """Level 3 alert (frequent drowsiness): urgent beeps and a rest-break advisory."""
        print("[AlertManager] Triggering Level 3 Alert: Rest break advisory")
        pygame.mixer.Sound(self.urgent_beep_path).play()
        self.speak("You are getting drowsy frequently. Please pull over on the side and take a rest.", volume=0.9, rate=170)

    def ask_energetic_song(self):
        """Ask the driver whether they want an energetic song."""
        print("[AlertManager] Querying driver for energetic song")
        self.speak("Alright. Would you like to listen to an energetic song to help you stay awake?", volume=0.85, rate=170)

    def play_energetic_music(self):
        """Announce the playlist, then open ENERGETIC_MUSIC_URL in the web browser."""
        print("[AlertManager] Playing energetic music")
        self.speak("Playing some high energy synthwave beats. Turn it up and stay alert!", volume=0.9, rate=170)
        webbrowser.open(ENERGETIC_MUSIC_URL)
# 4. EyeDetector — 2x Downsampling dlib Eye Landmark Processor with Fallback
class EyeDetector:
    """Finds the driver's eye landmarks with ``face_recognition`` and measures eye openness.

    Detection first runs on a frame downscaled by ``scale_factor``. Only when no
    face is found there is the full-resolution frame tried.
    """

    def __init__(self):
        # Downscale divisor for the fast pass (2 -> half width and height).
        self.scale_factor = 2
        # time.time() of the last "no face" console warning, for throttling.
        self.last_warning_time = 0

    def _calculate_ear(self, eye_points):
        """Return the Eye Aspect Ratio for one eye from its six landmark points.

        EAR = (|p2 - p6| + |p3 - p5|) / (2 * |p1 - p4|), with p1..p6 being
        ``eye_points[0..5]``. Returns 0.0 when the horizontal span is zero.
        """
        p = [np.array(eye_points[i]) for i in range(6)]
        horizontal = np.linalg.norm(p[0] - p[3])
        if horizontal == 0:
            return 0.0
        vertical = np.linalg.norm(p[1] - p[5]) + np.linalg.norm(p[2] - p[4])
        return vertical / (2.0 * horizontal)

    def process_frame(self, frame):
        """Measure eye openness in one BGR frame and return an annotated copy.

        Returns:
            ``(avg_ear, landmarks, debug_frame)``. ``avg_ear`` is the mean EAR of
            both eyes, or None when no face or incomplete eye landmarks were
            found. ``landmarks`` is the landmark dict of the first face, or None
            when no face was found. ``debug_frame`` is a copy of ``frame`` with
            the HUD overlay drawn on it.
        """
        height, width, _ = frame.shape
        debug_frame = frame.copy()

        landmarks_list, scale = self._detect_landmarks(frame)
        if not landmarks_list:
            self._report_missing_face(debug_frame, width, height)
            return None, None, debug_frame

        face = landmarks_list[0]
        avg_ear = None
        left_raw = face.get('left_eye', [])
        right_raw = face.get('right_eye', [])
        if len(left_raw) == 6 and len(right_raw) == 6:
            left_eye = self._rescale(left_raw, scale)
            right_eye = self._rescale(right_raw, scale)
            avg_ear = (self._calculate_ear(left_eye) + self._calculate_ear(right_eye)) / 2.0
            self._draw_eye_hud(debug_frame, left_eye, right_eye, avg_ear)
        return avg_ear, face, debug_frame

    def _detect_landmarks(self, frame):
        """Return ``(landmarks_list, scale)``, trying the downscaled frame before the full one.

        ``scale`` is the factor that maps the returned coordinates back onto ``frame``.
        """
        shrink = 1.0 / self.scale_factor
        small = cv2.resize(frame, (0, 0), fx=shrink, fy=shrink)
        found = face_recognition.face_landmarks(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
        if found:
            return found, self.scale_factor
        # Nothing found at reduced scale: retry on the full-resolution frame.
        return face_recognition.face_landmarks(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), 1

    @staticmethod
    def _rescale(points, scale):
        """Map landmark coordinates back to integer pixel positions in the full frame."""
        return [(int(x * scale), int(y * scale)) for (x, y) in points]

    def _report_missing_face(self, debug_frame, width, height):
        """Log a throttled (2.5 s) console warning and draw the no-face overlay."""
        now = time.time()
        if now - self.last_warning_time > 2.5:
            print("[EyeDetector] WARNING: No face detected in camera stream! Adjust position or lighting.")
            self.last_warning_time = now
        cv2.putText(debug_frame, "NO FACE DETECTED", (width // 2 - 120, height // 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        cv2.putText(debug_frame, "Adjust Camera / Lighting", (width // 2 - 140, height // 2 + 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)

    def _draw_eye_hud(self, frame, left_eye, right_eye, ear):
        """Draw eye outlines, landmark dots and the EAR readout onto ``frame`` in place.

        Outlines are red (thickness 2) when ``ear`` is below ``EAR_THRESHOLD``,
        otherwise green (thickness 1). Colors are BGR tuples.
        """
        eyes_closed = ear is not None and ear < EAR_THRESHOLD
        color, thickness = ((0, 0, 255), 2) if eyes_closed else ((0, 255, 0), 1)
        for eye in (left_eye, right_eye):
            cv2.polylines(frame, [np.array(eye, np.int32)], True, color, thickness)
        for x, y in (*left_eye, *right_eye):
            cv2.circle(frame, (x, y), 2, (255, 255, 0), -1)
        if ear is not None:
            cv2.putText(frame, f"EAR: {ear:.2f}", (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# 5. VoiceAssistant — Dynamic Action Router & Background Speech Recognition
class VoiceAssistant:
    """Background voice interface: microphone capture, transcription and command routing.

    Each transcript is handled in priority order:

    1. a reply to a pending system prompt (rest-break advisory or song offer);
    2. local commands — reset, play music, stop music;
    3. free-form conversation with the local Ollama model, whose reply may carry
       a ``[PLAY] <query>``, ``[STOP]`` or ``[RESET]`` action tag.

    Keyword checks for prompts, reset and play match whole words only, so "now"
    does not count as "no" and "playing" does not count as "play".
    """

    # Refusals of the rest-break advisory. They are checked before acceptances,
    # so a reply containing any of them (including "fine i will") counts as a refusal.
    REST_REFUSAL_WORDS = (
        "no", "nope", "not", "never", "cannot", "can't", "wont", "won't",
        "refuse", "impossible", "fine", "good", "no thanks", "no rest", "keep driving",
    )
    REST_ACCEPT_WORDS = ("yes", "yeah", "ok", "okay", "fine i will", "sure", "pulling over")
    # Any of these declines the song offer, even alongside "okay" or "please".
    SONG_DECLINE_WORDS = ("no", "nope", "not", "never", "don't", "dont")
    SONG_ACCEPT_WORDS = ("yes", "yeah", "sure", "ok", "okay", "play", "song", "music", "please")
    RESET_WORDS = ("reset", "clear", "awake", "focused")
    # Play-intent phrases in priority order; "listen to" precedes "listen" so the
    # longer phrase is removed from the song query.
    PLAY_PHRASES = ("play", "start", "turn on", "listen to", "listen", "put on", "launch")
    # Whole words trimmed from both ends of a spoken song query.
    MUSIC_FILLER_WORDS = frozenset({"some", "a", "the", "music", "song", "track", "musc"})
    # A query this generic plays the built-in energetic playlist instead of a search.
    GENERIC_MUSIC_WORDS = ("music", "song", "beat", "tune", "track", "musc", "melody")
    # Stop detection keeps substring matching ("stopping" still counts as "stop").
    STOP_WORDS = ("stop", "pause", "turn off", "mute", "quiet", "halt", "shut up")
    STOP_TARGET_WORDS = ("music", "song", "beat", "tune", "track", "musc", "melody",
                         "audio", "lofi", "sound", "radio")

    def __init__(self, alert_manager, state_callbacks):
        """Create the recognizer and start the background listening thread.

        Args:
            alert_manager: used for speech output; while its ``is_speaking`` flag
                is set, microphone capture pauses.
            state_callbacks: dict of callables keyed ``get_system_state``,
                ``set_system_state``, ``reset_warnings`` and ``add_chat_log``.
        """
        self.alert_manager = alert_manager
        self.callbacks = state_callbacks
        self.recognizer = sr.Recognizer()
        self.recognizer.energy_threshold = 4000
        self.recognizer.dynamic_energy_threshold = True
        self.running = True
        self.thread = threading.Thread(target=self._assistant_loop, daemon=True)
        self.thread.start()

    # ---------------------------------------------------------------- SLM access
    def query_ollama_slm(self, prompt):
        """Ask the local Ollama model for a reply to ``prompt``.

        Returns the model's ``response`` text with double quotes and asterisks
        removed. If the request fails, times out (10 s) or the reply is empty, a
        canned offline reply from :meth:`_offline_reply` is returned instead.
        """
        payload = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "stream": False
        }
        headers = {"Content-Type": "application/json"}
        try:
            req = urllib.request.Request(
                OLLAMA_API_URL,
                data=json.dumps(payload).encode("utf-8"),
                headers=headers,
                method="POST"
            )
            # 10-second timeout accommodates Ollama's cold-start model loading.
            with urllib.request.urlopen(req, timeout=10) as response:
                res_data = json.loads(response.read().decode("utf-8"))
            reply = str(res_data.get("response") or "").strip()
            # Drop quotes and markdown emphasis so the reply reads cleanly aloud.
            reply = reply.replace('"', '').replace('*', '').strip()
            if reply:
                return reply
            print("[Ollama SLM] Local model returned an empty reply; using offline fallback.")
        except Exception as e:
            print(f"[Ollama SLM] Error or timeout querying local model: {e}")
        return self._offline_reply(prompt)

    @staticmethod
    def _offline_reply(prompt):
        """Canned reply used when the SLM is unavailable (greeting / joke / default)."""
        lower_prompt = prompt.lower()
        # Whole-word match: a plain substring test would hit "this", "think", "nothing"...
        if re.search(r"\b(hello|hi)\b", lower_prompt):
            return "Hello! I am here. Eyes on the road, friend."
        if "joke" in lower_prompt:
            return "Why did the scarecrow win an award? Because he was outstanding in his field. Stay alert!"
        return "Understood. Keep driving safely, stay focused on the road."

    # ------------------------------------------------------------ audio capture
    def _assistant_loop(self):
        """Capture phrases from the system microphone until :meth:`stop` is called.

        Capture pauses while the alert manager is speaking. Each captured phrase
        is transcribed and routed on its own daemon thread, so the microphone
        keeps listening in the meantime.
        """
        print("[VoiceAssistant] Speech recognizer thread started.")
        try:
            mic = sr.Microphone()
        except Exception as e:
            print(f"[VoiceAssistant] Error accessing microphone: {e}. Voice controls disabled.")
            return
        with mic as source:
            print("[VoiceAssistant] Calibrating microphone for driving background noise...")
            self.recognizer.adjust_for_ambient_noise(source, duration=2)
            print("[VoiceAssistant] Calibration complete. Ready for voice interaction.")
            while self.running:
                if self.alert_manager.is_speaking:
                    time.sleep(0.3)
                    continue
                try:
                    audio = self.recognizer.listen(source, timeout=1.5, phrase_time_limit=4.0)
                except sr.WaitTimeoutError:
                    continue
                except Exception as e:
                    print(f"[VoiceAssistant] Microphone capture error: {e}")
                    time.sleep(0.5)
                    continue
                # Discard the phrase if speech output started while it was being captured.
                if self.alert_manager.is_speaking:
                    continue
                threading.Thread(target=self._process_audio, args=(audio,), daemon=True).start()

    def _transcribe(self, audio):
        """Return the transcript of ``audio``, or None if it could not be recognized.

        Google's recognizer is tried first; if it is unreachable, a local Whisper
        model ("base.en") is attempted.
        """
        try:
            text = self.recognizer.recognize_google(audio)
            print(f"[Driver Heard] {text}")
            return text
        except sr.UnknownValueError:
            return None
        except sr.RequestError:
            try:
                print("[VoiceAssistant] Cloud Speech API unavailable. Attempting local Whisper...")
                text = self.recognizer.recognize_whisper(audio, model="base.en")
                print(f"[Driver Heard (Whisper)] {text}")
                return text
            except Exception as e:
                print(f"[VoiceAssistant] Offline recognition failed: {e}")
                return None

    # ---------------------------------------------------------------- routing
    def _process_audio(self, audio):
        """Transcribe one captured phrase and dispatch it (see the class docstring for order)."""
        text = self._transcribe(audio)
        if text is None:
            return
        cleaned_text = text.strip().lower()
        if not cleaned_text:
            return
        if self._handle_pending_prompt(text, cleaned_text):
            return
        if self._handle_local_command(text, cleaned_text):
            return
        self._handle_conversation(text)

    @staticmethod
    def _contains_any(text, phrases):
        """Return True if any of ``phrases`` occurs in ``text`` as whole word(s)."""
        return any(re.search(r"\b" + re.escape(p) + r"\b", text) for p in phrases)

    def _handle_pending_prompt(self, text, cleaned_text):
        """Answer a pending rest/song prompt. Return True when the transcript was consumed."""
        current_state = self.callbacks['get_system_state']()
        if current_state == "WAITING_REST_RESPONSE":
            return self._handle_rest_response(text, cleaned_text)
        if current_state == "WAITING_SONG_RESPONSE":
            self._handle_song_response(text, cleaned_text)
            return True
        return False

    def _handle_rest_response(self, text, cleaned_text):
        """Driver's reply to the Level 3 rest advisory. Unrecognized replies return False."""
        if self._contains_any(cleaned_text, self.REST_REFUSAL_WORDS):
            print("[VoiceAssistant] Driver refused rest. Prompting for energetic song.")
            self.callbacks['set_system_state']("WAITING_SONG_RESPONSE")
            self.callbacks['add_chat_log'](text, "No, I'm fine. I won't stop.")
            time.sleep(0.5)
            self.alert_manager.ask_energetic_song()
            self.callbacks['add_chat_log']("System", "Alright. Would you like to listen to an energetic song to help you stay awake?")
            return True
        if self._contains_any(cleaned_text, self.REST_ACCEPT_WORDS) or "pull" in cleaned_text:
            print("[VoiceAssistant] Driver accepted rest.")
            self.callbacks['reset_warnings']()
            self.callbacks['add_chat_log'](text, "Okay, pulling over.")
            self.alert_manager.speak("Good decision. Pull over safely and take some rest.")
            self.callbacks['add_chat_log']("System", "Good decision. Pull over safely and take some rest.")
            return True
        return False

    def _handle_song_response(self, text, cleaned_text):
        """Driver's reply to the energetic-song offer; anything but a clear yes is a decline."""
        declined = self._contains_any(cleaned_text, self.SONG_DECLINE_WORDS)
        if not declined and self._contains_any(cleaned_text, self.SONG_ACCEPT_WORDS):
            print("[VoiceAssistant] Driver accepted song.")
            self.callbacks['add_chat_log'](text, "Yes, play some music.")
            self.alert_manager.play_energetic_music()
            self.callbacks['add_chat_log']("System", "Playing energetic synthwave beats! Stay awake!")
            self.callbacks['set_system_state']("PLAYING_MUSIC")
            return
        print("[VoiceAssistant] Driver declined song.")
        self.callbacks['add_chat_log'](text, "No, I'm okay.")
        self.alert_manager.speak("Understood. Keep your eyes on the road. Stay focused.")
        self.callbacks['add_chat_log']("System", "Understood. Keep your eyes on the road. Stay focused.")
        self.callbacks['reset_warnings']()

    def _handle_local_command(self, text, cleaned_text):
        """Handle reset / play / stop without the SLM. Return True when handled."""
        # NOTE(review): "awake"/"focused" also match e.g. "I can't stay awake";
        # confirm such a transcript should really clear the warnings.
        if self._contains_any(cleaned_text, self.RESET_WORDS):
            print("[VoiceAssistant] Safe state reset command received.")
            self.callbacks['reset_warnings']()
            self.callbacks['add_chat_log'](text, "Reset assistant")
            self.alert_manager.speak("System reset. Let's keep driving safely.")
            self.callbacks['add_chat_log']("System", "System reset. Let's keep driving safely.")
            return True

        if self._contains_any(cleaned_text, self.PLAY_PHRASES):
            music_query = self._extract_music_query(cleaned_text)
            if not music_query or music_query in self.GENERIC_MUSIC_WORDS:
                print("[VoiceAssistant] General music command recognized locally. Playing playlist.")
                self._play_default_playlist(text)
            else:
                print(f"[VoiceAssistant] Specific song command recognized locally: {music_query}")
                self._play_specific_song(text, music_query)
            return True

        has_stop = any(word in cleaned_text for word in self.STOP_WORDS)
        if has_stop and any(word in cleaned_text for word in self.STOP_TARGET_WORDS):
            print("[VoiceAssistant] Flexible stop music command recognized.")
            self._press_media_play_pause()
            self.callbacks['set_system_state']("NORMAL")
            self.callbacks['add_chat_log'](text, "Stop the music")
            self.alert_manager.speak("Stopping the music. Keep your eyes on the road.")
            self.callbacks['add_chat_log']("System", "Music stopped.")
            return True
        return False

    def _handle_conversation(self, text):
        """Send ``text`` to the local SLM, then perform any action tag in its reply or speak it."""
        print(f"[Ollama Query] {text}")
        reply = self.query_ollama_slm(text)
        print(f"[SLM Reply] {reply}")
        lowered = reply.lower()

        if "[play]" in lowered:
            music_query = self._tag_payload(reply, "play")
            if music_query:
                self._play_specific_song(text, music_query)
            else:
                # Bare "[PLAY]" tag: nothing to search for, so use the default playlist.
                self._play_default_playlist(text)
            return

        if "[stop]" in lowered:
            print("[VoiceAssistant] Action STOP triggered dynamically by Ollama.")
            clean_reply = self._tag_payload(reply, "stop") or "Stopping the music. Keep your eyes on the road!"
            self._press_media_play_pause()
            self.callbacks['set_system_state']("NORMAL")
            self.callbacks['add_chat_log'](text, clean_reply)
            self.alert_manager.speak(clean_reply)
            return

        if "[reset]" in lowered:
            print("[VoiceAssistant] Action RESET triggered dynamically by Ollama.")
            clean_reply = self._tag_payload(reply, "reset") or "System warnings cleared. Drive safely!"
            self.callbacks['reset_warnings']()
            self.callbacks['add_chat_log'](text, clean_reply)
            self.alert_manager.speak(clean_reply)
            return

        # Plain conversational reply.
        self.callbacks['add_chat_log'](text, reply)
        self.alert_manager.speak(reply)

    # ---------------------------------------------------------------- actions
    @staticmethod
    def _tag_payload(reply, tag):
        """Return the text after ``[tag]`` in ``reply`` (case-insensitive) with brackets/quotes removed."""
        match = re.search(r'\[' + tag + r'\]\s*(.*)', reply, re.IGNORECASE)
        payload = match.group(1) if match else ""
        return payload.replace('"', '').replace('[', '').replace(']', '').strip()

    @classmethod
    def _extract_music_query(cls, cleaned_text):
        """Return the song request following the first matching play phrase.

        Filler words are trimmed from both ends as whole words, so "play adele"
        yields "adele" and "play some music" yields ''. Returns '' when no play
        phrase is present.
        """
        for phrase in cls.PLAY_PHRASES:
            match = re.search(r"\b" + re.escape(phrase) + r"\b", cleaned_text)
            if match:
                break
        else:
            return ""
        words = [w.strip(".,!?") for w in cleaned_text[match.end():].split()]
        words = [w for w in words if w]
        start, end = 0, len(words)
        while start < end and words[start] in cls.MUSIC_FILLER_WORDS:
            start += 1
        while end > start and words[end - 1] in cls.MUSIC_FILLER_WORDS:
            end -= 1
        return " ".join(words[start:end])

    def _play_default_playlist(self, text):
        """Log the request, start the built-in energetic playlist and enter PLAYING_MUSIC."""
        self.callbacks['add_chat_log'](text, "Requested general music playback")
        self.alert_manager.play_energetic_music()
        self.callbacks['set_system_state']("PLAYING_MUSIC")

    def _play_specific_song(self, text, music_query):
        """Confirm aloud, open the first YouTube match for ``music_query`` and enter PLAYING_MUSIC."""
        confirm_msg = f"Sure thing! Autoplay in progress for {music_query}."
        self.callbacks['add_chat_log'](text, confirm_msg)
        self.alert_manager.speak(confirm_msg)
        self._autoplay_youtube(music_query)
        self.callbacks['set_system_state']("PLAYING_MUSIC")

    @staticmethod
    def _autoplay_youtube(music_query):
        """Open the first YouTube search hit for ``music_query``; fall back to the results page.

        The video id is scraped from the search page HTML with a regex, so this
        breaks silently (falling back to the search page) if YouTube's markup changes.
        """
        search_url = f"https://www.youtube.com/results?search_query={urllib.parse.quote(music_query)}"
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        try:
            req = urllib.request.Request(search_url, headers=headers)
            with urllib.request.urlopen(req, timeout=5) as res:
                html = res.read().decode('utf-8', errors='replace')
            first = re.search(r'/watch\?v=([a-zA-Z0-9_-]{11})', html)
            if first:
                direct_url = f"https://www.youtube.com/watch?v={first.group(1)}&autoplay=1"
                print(f"[VoiceAssistant] Auto-playing first matching YouTube video: {direct_url}")
                webbrowser.open(direct_url)
            else:
                webbrowser.open(search_url)
        except Exception as e:
            print(f"[VoiceAssistant] Autoplay scraper failed: {e}. Falling back to search page.")
            webbrowser.open(search_url)

    @staticmethod
    def _press_media_play_pause():
        """Simulate the media Play/Pause key via Windows ``user32.keybd_event``.

        On non-Windows hosts ``ctypes.windll`` does not exist; the failure is logged.
        """
        import ctypes
        VK_MEDIA_PLAY_PAUSE = 0xB3
        KEYEVENTF_KEYUP = 0x0002
        try:
            ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 0, 0)
            ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, KEYEVENTF_KEYUP, 0)
        except Exception as e:
            print(f"[VoiceAssistant] Failed simulating media key: {e}")

    def stop(self):
        """Ask the listening loop to exit after its current iteration."""
        self.running = False
| # 6. SafeDrivingAssistant Core Engine & Orchestrator Coordinator | |
class SafeDrivingAssistant:
    """Core engine / coordinator of the Safe Driving Assistant.

    Owns the AlertManager, EyeDetector and VoiceAssistant, exposes state
    callbacks to the voice assistant and the Flask app, starts the web HUD and
    runs the camera loop that turns eye-aspect-ratio (EAR) readings into
    escalating alerts:

    * Level 1: eyes closed for [ALERT_LEVEL1_MIN, ALERT_LEVEL1_MAX) seconds
      -> state "CLOSED_3S", ``trigger_level1``.
    * Level 2: eyes closed for >= ALERT_LEVEL2_MIN seconds
      -> state "CLOSED_5S", ``trigger_level2``.
    * Level 3: when the eyes reopen and at least FREQUENT_DROWSY_LIMIT alert
      events fall inside the last FREQUENT_DROWSY_WINDOW seconds
      -> state "WAITING_REST_RESPONSE", ``trigger_level3_advisory``.
    """

    # Dashboard banner text for states that carry one; any other state leaves
    # the current alert message untouched.
    _STATE_ALERT_MESSAGES = {
        "NORMAL": "",
        "WAITING_REST_RESPONSE": "ADVISING REST BREAK",
        "WAITING_SONG_RESPONSE": "OFFERING ENERGETIC MUSIC",
        "PLAYING_MUSIC": "PLAYING HIGH ENERGY BEATS",
    }

    def __init__(self):
        """Build all subsystems, register callbacks and start the dashboard server."""
        print("[CoreEngine] Initializing Safe Driving Assistant...")
        # Audio alert & sound synthesizer
        self.alert_manager = AlertManager()
        # face_recognition eye-landmark processor
        self.detector = EyeDetector()
        # Per-closure tracking state
        self.consec_closed_frames = 0
        self.eyes_closed_start_time = None
        self.active_alert_level = 0  # 0: None, 1: Stay Focused, 2: Wake Up Loud
        # Rolling log of drowsiness-alert timestamps used for the frequency check
        self.drowsiness_events = collections.deque()
        # Keyboard reset helper
        self.last_key_press = None
        # Callbacks handed to the Voice Assistant & SLM
        self.callbacks = {
            'get_system_state': self.get_system_state,
            'set_system_state': self.set_system_state,
            'reset_warnings': self.reset_warnings,
            'add_chat_log': self.add_chat_log,
        }
        # Bind callbacks back to the Flask REST API endpoints
        app.reset_callback = self.reset_warnings
        app.play_music_callback = self.play_energetic_music
        # Speech listener
        self.assistant = VoiceAssistant(self.alert_manager, self.callbacks)
        # Boot the Flask HUD web dashboard in the background
        start_server_async()

    # Coordinator callback handlers

    def get_system_state(self):
        """Return the current dashboard state string, read under the dashboard lock."""
        with dashboard_state.lock:
            return dashboard_state.state

    def set_system_state(self, new_state):
        """Set the dashboard state (under its lock) and, for known states, its banner text."""
        with dashboard_state.lock:
            dashboard_state.state = new_state
            if new_state in self._STATE_ALERT_MESSAGES:
                dashboard_state.alert_message = self._STATE_ALERT_MESSAGES[new_state]

    def reset_warnings(self):
        """Clear all active alarms, closure timers and drowsiness counters.

        Resets the dashboard to NORMAL with no alert message and a zero
        drowsiness count, clears this engine's closure tracking and event
        window, then logs a System chat entry.
        """
        print("[CoreEngine] Performing comprehensive system alert reset.")
        with dashboard_state.lock:
            dashboard_state.state = "NORMAL"
            dashboard_state.alert_message = ""
            dashboard_state.drowsiness_count = 0
        self._clear_closure_tracking()
        self.drowsiness_events.clear()
        # Logged after releasing the lock: add_chat_log acquires dashboard_state.lock itself.
        self.add_chat_log("System", "System alerts and warnings reset to NORMAL.")

    def add_chat_log(self, user_query, slm_reply=""):
        """Append an entry to the dashboard chat history.

        ``user_query == "System"`` records a system notice whose message is
        ``slm_reply``; any other value records a driver exchange holding both
        the query and the reply.
        """
        if user_query == "System":
            entry = {"speaker": "System", "message": slm_reply}
        else:
            entry = {"speaker": "Driver", "query": user_query, "message": slm_reply}
        with dashboard_state.lock:
            dashboard_state.chat_history.append(entry)

    def play_energetic_music(self):
        """Orchestrator hook: switch to PLAYING_MUSIC, start the music and log it."""
        self.set_system_state("PLAYING_MUSIC")
        self.alert_manager.play_energetic_music()
        self.add_chat_log("System", "Energetic synthwave music started. Stay alert!")

    # Core drowsiness evaluation & loop

    def run(self):
        """Main camera acquisition loop that drives the safe assistant.

        Falls back to a synthetic "cloud simulation" feed when the camera cannot
        be opened. Leaving the loop via 'q'/Esc or Ctrl+C releases resources and
        exits the process with status 0; any other exception still triggers the
        cleanup but then propagates, keeping its traceback and failure status.
        """
        print("[CoreEngine] Accessing camera stream...")
        cap = cv2.VideoCapture(CAMERA_ID)
        # Configure video dimension overrides from settings
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

        # Without a camera (e.g. a headless cloud host) simulate a feed so the web HUD stays alive.
        use_simulation = not cap.isOpened()
        if use_simulation:
            print("[CoreEngine] WARNING: Could not access physical web camera.")
            print("[CoreEngine] Pivoting to Cloud Simulation Mode to keep web HUD alive...")
        else:
            print("[CoreEngine] Camera stream operational. System fully active.")
        print("[CoreEngine] System loop running. Use the Web Dashboard to monitor telemetry.")

        prev_time = time.time()
        display_available = True
        try:
            while True:
                current_time = time.time()
                frame = self._acquire_frame(cap, use_simulation, current_time)
                if frame is None:
                    # Camera read failed; retry shortly.
                    time.sleep(0.01)
                    continue

                # Tracking can be paused/resumed from the dashboard.
                with dashboard_state.lock:
                    active = dashboard_state.detection_active

                if not active:
                    ear = None
                    processed_frame = frame.copy()
                    cv2.putText(processed_frame, "TRACKING PAUSED", (50, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2)
                elif use_simulation:
                    ear, processed_frame = self._simulate_detection(frame, current_time)
                else:
                    # EAR + glow-contour overlay from the physical camera (landmarks unused here).
                    ear, _landmarks, processed_frame = self.detector.process_frame(frame)

                # Processing frame rate (FPS)
                elapsed = current_time - prev_time
                fps = int(1.0 / elapsed) if elapsed > 0 else 30
                prev_time = current_time

                # Drowsiness decision engine
                if active and ear is not None:
                    if ear < EAR_THRESHOLD:
                        self._on_eyes_closed(current_time)
                    else:
                        self._on_eyes_open(current_time)
                else:
                    self._clear_closure_tracking()

                self._publish_telemetry(processed_frame, ear, fps)

                # Optional local OpenCV window; once it has failed, don't retry every frame.
                if display_available:
                    display_available, quit_requested = self._poll_local_display(processed_frame)
                    if quit_requested:
                        break
                if not display_available:
                    # Headless pacing, same delay the old per-frame exception fallback used.
                    time.sleep(0.03)
        except KeyboardInterrupt:
            print("[CoreEngine] Keyboard interrupt. Shutting down.")
        finally:
            print("[CoreEngine] Releasing resources...")
            cap.release()
            try:
                cv2.destroyAllWindows()
            except Exception:
                pass  # No window system available: nothing to destroy.
            self.assistant.stop()
        # Outside `finally` on purpose: an unexpected exception must not be
        # converted into a clean exit(0) that hides its traceback.
        sys.exit(0)

    def _acquire_frame(self, cap, use_simulation, current_time):
        """Return the next frame (mirrored camera image or synthetic feed), or None if the camera read failed."""
        if use_simulation:
            # Animated dark frame with a scrolling scan line for the headless dashboard
            frame = np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 3), dtype=np.uint8)
            scan_y = int(current_time * 120) % FRAME_HEIGHT
            cv2.line(frame, (0, scan_y), (FRAME_WIDTH, scan_y), (40, 40, 40), 2)
            cv2.putText(frame, "CLOUD SIMULATION FEED (NO PHYSICAL CAM)", (20, FRAME_HEIGHT - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
            # Not mirrored: flipping would reverse the caption text.
            return frame
        ret, frame = cap.read()
        if not ret:
            return None
        # Mirror camera frames for an intuitive pilot HUD view.
        return cv2.flip(frame, 1)

    def _simulate_detection(self, frame, current_time):
        """Fake an EAR reading for the cloud demo and draw matching overlays.

        In every 25-second cycle, seconds 19-24 (6 s) read as closed eyes
        (EAR 0.16) and the rest as open eyes (EAR 0.28), which exercises the
        dashboard's alert path. Returns ``(ear, processed_frame)``.
        """
        if int(current_time) % 25 > 18:
            ear = 0.16
            cv2.putText(frame, "SIMULATING DROWSINESS (EYES CLOSED)", (FRAME_WIDTH // 2 - 180, FRAME_HEIGHT // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
        else:
            ear = 0.28
        processed_frame = frame.copy()
        color = (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255)
        eye_y = int(FRAME_HEIGHT * 0.45)
        # Virtual telemetry "eye" dots
        cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.4), eye_y), 8, color, -1)
        cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.6), eye_y), 8, color, -1)
        cv2.putText(processed_frame, f"EAR: {ear:.2f}", (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
        return ear, processed_frame

    def _on_eyes_closed(self, current_time):
        """Advance the closure timer for a below-threshold EAR and raise Level 1/2 alerts once each."""
        self.consec_closed_frames += 1
        # Noise filter: only start timing after EAR_CONSEC_FRAMES consecutive closed frames.
        if self.consec_closed_frames < EAR_CONSEC_FRAMES:
            return
        if self.eyes_closed_start_time is None:
            self.eyes_closed_start_time = current_time
            return
        closed_duration = current_time - self.eyes_closed_start_time
        if ALERT_LEVEL1_MIN <= closed_duration < ALERT_LEVEL1_MAX:
            # LEVEL 1: "stay focused"
            if self.active_alert_level < 1:
                self.active_alert_level = 1
                self.set_system_state("CLOSED_3S")
                self.alert_manager.trigger_level1()
                self._record_drowsiness_event(current_time, "WARNING: Eyes closed for 3 seconds! Stay focused!")
        elif closed_duration >= ALERT_LEVEL2_MIN:
            # LEVEL 2: louder "wake up" warning
            if self.active_alert_level < 2:
                self.active_alert_level = 2
                self.set_system_state("CLOSED_5S")
                self.alert_manager.trigger_level2()
                self._record_drowsiness_event(current_time, "CRITICAL ALARM: Eyes closed for 5+ seconds! WAKE UP!")

    def _on_eyes_open(self, current_time):
        """Handle an above-threshold EAR: end any timed closure and check for Level 3 escalation."""
        self.consec_closed_frames = 0
        # The episode is over, so the next closure must be able to alarm again.
        # Previously the Level 3 path kept the old level and muted later Level 1/2 alarms.
        self.active_alert_level = 0
        if self.eyes_closed_start_time is None:
            return
        self.eyes_closed_start_time = None
        # Drop events that have slid out of the frequency window.
        while self.drowsiness_events and (current_time - self.drowsiness_events[0] > FREQUENT_DROWSY_WINDOW):
            self.drowsiness_events.popleft()
        # LEVEL 3: frequent drowsiness (>= limit events inside the window)
        if len(self.drowsiness_events) >= FREQUENT_DROWSY_LIMIT:
            print(f"[CoreEngine] Frequent drowsiness detected ({len(self.drowsiness_events)} events in 60s). Escalating to Level 3.")
            self.set_system_state("WAITING_REST_RESPONSE")
            self.alert_manager.trigger_level3_advisory()
            self.add_chat_log("System", "FREQUENT DROWSINESS DETECTED. Prompting driver to pull over.")
        elif self.get_system_state() not in ("WAITING_REST_RESPONSE", "WAITING_SONG_RESPONSE"):
            # Normal recovery, unless a voice prompt is still awaiting the driver's answer.
            self.set_system_state("NORMAL")

    def _clear_closure_tracking(self):
        """Forget any in-progress closure and its alert level (paused tracking, no EAR reading, or reset)."""
        self.consec_closed_frames = 0
        self.eyes_closed_start_time = None
        self.active_alert_level = 0

    def _record_drowsiness_event(self, current_time, message):
        """Count one drowsiness alert: window timestamp, dashboard counter and a System chat entry."""
        self.drowsiness_events.append(current_time)
        with dashboard_state.lock:
            dashboard_state.drowsiness_count += 1
        self.add_chat_log("System", message)

    def _publish_telemetry(self, processed_frame, ear, fps):
        """Publish the latest frame, EAR and FPS to the shared dashboard buffer."""
        snapshot = processed_frame.copy()  # copy outside the lock to keep the critical section short
        with dashboard_state.lock:
            dashboard_state.latest_frame = snapshot
            # 0.30 is the baseline shown when there is no EAR reading (no face / tracking paused).
            dashboard_state.ear = ear if ear is not None else 0.30
            dashboard_state.fps = fps

    def _poll_local_display(self, processed_frame):
        """Show the HUD in an OpenCV window and handle hotkeys.

        Returns ``(display_ok, quit_requested)``. ``display_ok`` is False when
        the window pipeline raised (platforms where desktop windows are
        restricted), so the caller can stop retrying. 'q'/Esc request exit and
        'r' resets all warnings.
        """
        try:
            cv2.imshow("DriveSafe HUD AI Console", processed_frame)
            key = cv2.waitKey(1) & 0xFF
        except Exception:
            print("[CoreEngine] Local OpenCV display unavailable; continuing headless (use the Web Dashboard).")
            return False, False
        if key in (ord('q'), 27):
            print("[CoreEngine] Exit key received. Terminating system.")
            return True, True
        if key == ord('r'):
            self.reset_warnings()
        return True, False
if __name__ == "__main__":
    # Construction boots the HUD server and the voice-assistant listener;
    # run() then blocks in the camera loop until the process exits.
    assistant_app = SafeDrivingAssistant()
    assistant_app.run()