themehmi's picture
Update main.py
1e64e4d verified
"""
main.py — Self-Contained, Fully Integrated Safe Driving Assistant
Consolidates all system configuration, custom non-blocking sound synthesizer, dlib Eye Landmark processor,
Ollama SLM action voice-assistant parser, Flask SSE Telemetry Dashboard, and main drowsiness timer logic
into one unified, ultra-premium script.
"""
import os
import sys
import time
import json
import queue
import collections
import threading
import logging
import urllib.request
import urllib.parse
import webbrowser
import re
# Force dummy audio driver for headless container environments
os.environ["SDL_AUDIODRIVER"] = "dummy"
import numpy as np
import scipy.io.wavfile as wavfile
import pygame
import pyttsx3
import cv2
import face_recognition
import speech_recognition as sr
from flask import Flask, render_template, Response, jsonify, request
# 1. DriveSafe Assistant — Configuration Settings
CAMERA_ID = int(os.environ.get("CAMERA_ID", 0)) # Index of the webcam (usually 0)
FRAME_WIDTH = int(os.environ.get("FRAME_WIDTH", 640)) # Video capture width
FRAME_HEIGHT = int(os.environ.get("FRAME_HEIGHT", 480)) # Video capture height
# Drowsiness Detection Thresholds
EAR_THRESHOLD = 0.23 # Eye Aspect Ratio below this indicates closed eyes
EAR_CONSEC_FRAMES = 3 # Consecutive frames below threshold to trigger eye-closed timer
# Alert Severity Levels (Durations in Seconds)
ALERT_LEVEL1_MIN = 3.0 # Min duration of closed eyes for Level 1 ("stay focused")
ALERT_LEVEL1_MAX = 5.0 # Max duration of closed eyes for Level 1
ALERT_LEVEL2_MIN = 5.0 # Closed eyes duration for Level 2 ("wake up stay focus on road" louder)
# Frequent Drowsiness Pattern Tracking
FREQUENT_DROWSY_WINDOW = 60.0 # Sliding window (seconds) to monitor drowsiness event frequency
FREQUENT_DROWSY_LIMIT = 2 # Max drowsiness warnings allowed in window before advising a break (Level 3)
# Voice Assistant & SLM Settings
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "drivesafe") # Our custom local Ollama model
OLLAMA_API_URL = os.environ.get("OLLAMA_API_URL", "http://localhost:11434/api/generate") # Ollama generation endpoint
SPEECH_RECOGNITION_TIMEOUT = 10 # Timeout for speech recognizer
WAKE_WORD = "assistant" # Wake word for general conversations
# Web HUD Dashboard Server
FLASK_HOST = os.environ.get("FLASK_HOST", "127.0.0.1")
FLASK_PORT = int(os.environ.get("FLASK_PORT", 5000))
# High Energy Music Links
ENERGETIC_MUSIC_URL = "https://music.youtube.com/playlist?list=PLYBSqm--lNVt1H63PlRvigxvPU_unQe8m"
# 2. Flask Web HUD Server & Shared DashboardState
# Initialize Flask app
app = Flask(__name__, template_folder='templates', static_folder='static')
# Thread-safe global state for Flask-main loop communication
class DashboardState:
def __init__(self):
self.lock = threading.Lock()
self.latest_frame = None
self.ear = 0.0
self.state = "NORMAL"
self.drowsiness_count = 0
self.fps = 0
self.alert_message = ""
self.chat_history = []
self.detection_active = True
dashboard_state = DashboardState()
@app.route('/')
def index():
"""Renders the futuristic cyberpunk HUD dashboard."""
return render_template('index.html')
def gen_video_feed():
"""Generator function that yields JPEG frames for the live camera stream."""
while True:
with dashboard_state.lock:
if dashboard_state.latest_frame is None:
frame_to_send = None
else:
frame_to_send = dashboard_state.latest_frame.copy()
if frame_to_send is not None:
# Encode BGR OpenCV frame to standard JPEG
ret, jpeg = cv2.imencode('.jpg', frame_to_send)
if ret:
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n')
# Frame-rate limiter (30 FPS max for the web stream to keep networking lightweight)
time.sleep(1.0 / 30.0)
@app.route('/video_feed')
def video_feed():
"""Serves the real-time annotated video stream inside standard HTML img tags."""
return Response(gen_video_feed(),
mimetype='multipart/x-mixed-replace; boundary=frame')
def gen_telemetry_stream():
"""Streams real-time system diagnostics to the browser via HTML5 Server-Sent Events (SSE)."""
last_sent_time = 0
while True:
# Throttle telemetry updates slightly (e.g. 15 updates/second) to keep browser rendering butter-smooth
current_time = time.time()
if current_time - last_sent_time >= 0.06:
with dashboard_state.lock:
data = {
"ear": round(dashboard_state.ear, 3),
"state": dashboard_state.state,
"drowsiness_count": dashboard_state.drowsiness_count,
"fps": dashboard_state.fps,
"alert_message": dashboard_state.alert_message,
"chat_history": dashboard_state.chat_history,
"detection_active": dashboard_state.detection_active
}
# SSE data format: "data: <json>\n\n"
yield f"data: {json.dumps(data)}\n\n"
last_sent_time = current_time
time.sleep(0.01)
@app.route('/telemetry')
def telemetry():
"""SSE endpoint for high-speed diagnostic telemetry streaming."""
return Response(gen_telemetry_stream(), mimetype='text/event-stream')
# Interactive Control APIs
@app.route('/api/toggle_detection', methods=['POST'])
def toggle_detection():
"""Enables or disables active face and eye tracking."""
with dashboard_state.lock:
dashboard_state.detection_active = not dashboard_state.detection_active
status = dashboard_state.detection_active
return jsonify({"status": "success", "detection_active": status})
@app.route('/api/reset', methods=['POST'])
def api_reset():
"""Triggers a complete system reset from the dashboard panel."""
if hasattr(app, 'reset_callback') and app.reset_callback:
app.reset_callback()
return jsonify({"status": "success", "message": "System alerts and warning log reset."})
return jsonify({"status": "error", "message": "Reset callback not configured."})
@app.route('/api/trigger_music', methods=['POST'])
def api_trigger_music():
"""Manually triggers the energetic song from the dashboard panel."""
if hasattr(app, 'play_music_callback') and app.play_music_callback:
app.play_music_callback()
return jsonify({"status": "success", "message": "Playing energetic synthwave music!"})
return jsonify({"status": "error", "message": "Music callback not configured."})
def start_server_async():
"""Runs the Flask development server on a dedicated background thread."""
# Suppress Flask development server startup messages to keep terminal clean
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
server_thread = threading.Thread(
target=lambda: app.run(host=FLASK_HOST, port=FLASK_PORT, debug=False, use_reloader=False),
daemon=True
)
server_thread.start()
print(f"[Flask Server] Running in background at http://{FLASK_HOST}:{FLASK_PORT}")
# 3. AlertManager — Programmatic Sound Synthesis & Multi-Threaded Audio
class AlertManager:
def __init__(self):
# Ensure directories exist
os.makedirs("audio", exist_ok=True)
# Programmatically synthesize our warning and chime audio files
self._synthesize_audio_assets()
# Initialize Pygame Mixer for non-blocking SFX playback
pygame.mixer.init()
# Audio file paths
self.calm_beep_path = os.path.join("audio", "calm_beep.wav")
self.urgent_beep_path = os.path.join("audio", "urgent_beep.wav")
# Thread-safe speech queue & worker setup
self.speech_queue = queue.Queue()
self.is_speaking = False
self.speech_thread = threading.Thread(target=self._speech_worker, daemon=True)
self.speech_thread.start()
def _synthesize_audio_assets(self):
"""Synthesizes custom chime and alert WAV files using numpy and scipy."""
sample_rate = 44100
# 1. Calm chime (gentle 550Hz sine wave decaying)
duration = 0.4
t = np.linspace(0, duration, int(sample_rate * duration), False)
envelope = np.exp(-5 * t) # decay envelope
tone = np.sin(2 * np.pi * 550 * t) * envelope
calm_data = (tone * 20000).astype(np.int16)
wavfile.write(os.path.join("audio", "calm_beep.wav"), sample_rate, calm_data)
# 2. Urgent pulsing beeps (three rapid 1200Hz pulse bursts)
urgent_data = []
burst_duration = 0.08
gap_duration = 0.05
t_burst = np.linspace(0, burst_duration, int(sample_rate * burst_duration), False)
burst = np.sin(2 * np.pi * 1200 * t_burst) * 32000
gap = np.zeros(int(sample_rate * gap_duration))
# Combine three bursts
for _ in range(3):
urgent_data.extend(burst)
urgent_data.extend(gap)
urgent_np = np.array(urgent_data, dtype=np.int16)
wavfile.write(os.path.join("audio", "urgent_beep.wav"), sample_rate, urgent_np)
def _speech_worker(self):
"""Background worker thread that serializes all speech requests using native PowerShell synthesis to prevent COM/threading locks."""
print("[AlertManager] Speech worker thread active.")
import subprocess
while True:
try:
# Blocks until an item is available
text, volume, rate = self.speech_queue.get()
self.is_speaking = True
# Escape single quotes and backslashes for PowerShell safety
escaped_text = text.replace("\\", "\\\\").replace("'", "''")
# Map rate (150-190) to PowerShell Rate (-10 to 10)
ps_rate = 0
if rate > 180:
ps_rate = 2
elif rate < 150:
ps_rate = -2
# Map volume (0.0 to 1.0) to PowerShell Volume (0 to 100)
ps_volume = int(volume * 100)
ps_command = (
f"Add-Type -AssemblyName System.Speech; "
f"$speak = New-Object System.Speech.Synthesis.SpeechSynthesizer; "
f"$speak.Rate = {ps_rate}; "
f"$speak.Volume = {ps_volume}; "
f"$speak.Speak('{escaped_text}')"
)
# Run synchronously inside the worker thread to maintain sequential speech
subprocess.run(
["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", ps_command],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
self.is_speaking = False
self.speech_queue.task_done()
except Exception as e:
print(f"[AlertManager] Speech worker exception: {e}")
self.is_speaking = False
time.sleep(0.5)
def speak(self, text, volume=0.8, rate=170):
"""Enqueues a text string to be spoken in the background thread."""
self.speech_queue.put((text, volume, rate))
def trigger_level1(self):
"""Level 1 Alert (3-5s closed): Soft chime, then calm voice."""
print("[AlertManager] Triggering Level 1 Alert: Calm Stay Focused")
pygame.mixer.Sound(self.calm_beep_path).play()
self.speak("Stay focused on the road", volume=0.7, rate=160)
def trigger_level2(self):
"""Level 2 Alert (>5s closed): Loud siren beep, then loud voice."""
print("[AlertManager] Triggering Level 2 Alert: Loud WAKE UP!")
pygame.mixer.Sound(self.urgent_beep_path).play()
self.speak("Wake up! Stay focused on the road!", volume=1.0, rate=190)
def trigger_level3_advisory(self):
"""Level 3 Alert (Frequent drowsiness): Ask to take a rest break on the side."""
print("[AlertManager] Triggering Level 3 Alert: Rest break advisory")
pygame.mixer.Sound(self.urgent_beep_path).play()
self.speak("You are getting drowsy frequently. Please pull over on the side and take a rest.", volume=0.9, rate=170)
def ask_energetic_song(self):
"""Ask the driver if they want to listen to an energetic song."""
print("[AlertManager] Querying driver for energetic song")
self.speak("Alright. Would you like to listen to an energetic song to help you stay awake?", volume=0.85, rate=170)
def play_energetic_music(self):
"""Announce and play energetic music."""
print("[AlertManager] Playing energetic music")
self.speak("Playing some high energy synthwave beats. Turn it up and stay alert!", volume=0.9, rate=170)
webbrowser.open(ENERGETIC_MUSIC_URL)
# 4. EyeDetector — 2x Downsampling dlib Eye Landmark Processor with Fallback
class EyeDetector:
def __init__(self):
self.scale_factor = 2 # Resizes to 50% width/height (4x speedup)
self.last_warning_time = 0
def _calculate_ear(self, eye_points):
"""Calculates the Eye Aspect Ratio (EAR) for a single eye list of 6 points."""
p1 = np.array(eye_points[0])
p2 = np.array(eye_points[1])
p3 = np.array(eye_points[2])
p4 = np.array(eye_points[3])
p5 = np.array(eye_points[4])
p6 = np.array(eye_points[5])
vertical1 = np.linalg.norm(p2 - p6)
vertical2 = np.linalg.norm(p3 - p5)
horizontal = np.linalg.norm(p1 - p4)
if horizontal == 0:
return 0.0
return (vertical1 + vertical2) / (2.0 * horizontal)
def process_frame(self, frame):
"""Processes a single BGR camera frame with a robust full-res fallback."""
height, width, _ = frame.shape
debug_frame = frame.copy()
# 1. Downsample the frame for high-speed face detection
small_frame = cv2.resize(frame, (0, 0), fx=1.0/self.scale_factor, fy=1.0/self.scale_factor)
rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
# 2. Try fast downscaled detection first
face_landmarks_list = face_recognition.face_landmarks(rgb_small_frame)
current_scale = self.scale_factor
# 3. Fallback: If no face found in small frame, try the full-resolution frame!
if not face_landmarks_list:
rgb_full_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
face_landmarks_list = face_recognition.face_landmarks(rgb_full_frame)
current_scale = 1
avg_ear = None
landmarks_found = None
if face_landmarks_list:
face_landmarks = face_landmarks_list[0]
landmarks_found = face_landmarks
left_eye_raw = face_landmarks.get('left_eye', [])
right_eye_raw = face_landmarks.get('right_eye', [])
if len(left_eye_raw) == 6 and len(right_eye_raw) == 6:
# Scale coordinates back up to original frame dimensions
left_eye = [(int(x * current_scale), int(y * current_scale)) for (x, y) in left_eye_raw]
right_eye = [(int(x * current_scale), int(y * current_scale)) for (x, y) in right_eye_raw]
left_ear = self._calculate_ear(left_eye)
right_ear = self._calculate_ear(right_eye)
avg_ear = (left_ear + right_ear) / 2.0
# Draw the glowing tech HUD outlines
self._draw_eye_hud(debug_frame, left_eye, right_eye, avg_ear)
else:
# No face detected! Print throttled console warning and show overlay text
current_time = time.time()
if current_time - self.last_warning_time > 2.5:
print("[EyeDetector] WARNING: No face detected in camera stream! Adjust position or lighting.")
self.last_warning_time = current_time
# Draw warning overlay on dashboard feed
cv2.putText(debug_frame, "NO FACE DETECTED", (width // 2 - 120, height // 2),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
cv2.putText(debug_frame, "Adjust Camera / Lighting", (width // 2 - 140, height // 2 + 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
return avg_ear, landmarks_found, debug_frame
def _draw_eye_hud(self, frame, left_eye, right_eye, ear):
"""Draws glowing HUD tech contours on eyes and shows EAR readout."""
if ear is not None and ear < EAR_THRESHOLD:
color = (0, 0, 255) # Red: Closed/Drowsy
thickness = 2
else:
color = (0, 255, 0) # Green: Open/Safe
thickness = 1
left_pts = np.array(left_eye, np.int32)
cv2.polylines(frame, [left_pts], True, color, thickness)
right_pts = np.array(right_eye, np.int32)
cv2.polylines(frame, [right_pts], True, color, thickness)
for (x, y) in left_eye + right_eye:
cv2.circle(frame, (x, y), 2, (255, 255, 0), -1)
if ear is not None:
text = f"EAR: {ear:.2f}"
cv2.putText(frame, text, (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# 5. VoiceAssistant — Dynamic Action Router & Background Speech Recognition
class VoiceAssistant:
def __init__(self, alert_manager, state_callbacks):
self.alert_manager = alert_manager
self.callbacks = state_callbacks
self.recognizer = sr.Recognizer()
self.recognizer.energy_threshold = 4000
self.recognizer.dynamic_energy_threshold = True
self.running = True
self.thread = threading.Thread(target=self._assistant_loop, daemon=True)
self.thread.start()
def query_ollama_slm(self, prompt):
"""Sends user transcription to the local custom drivesafe SLM on Ollama."""
payload = {
"model": OLLAMA_MODEL,
"prompt": prompt,
"stream": False
}
headers = {"Content-Type": "application/json"}
try:
req = urllib.request.Request(
OLLAMA_API_URL,
data=json.dumps(payload).encode("utf-8"),
headers=headers,
method="POST"
)
# Use a 10-second timeout to accommodate initial Ollama cold start weight loading
with urllib.request.urlopen(req, timeout=10) as response:
res_data = json.loads(response.read().decode("utf-8"))
reply = res_data.get("response", "").strip()
# Clean up any quotes or markdown from the SLM
reply = reply.replace('"', '').replace('*', '').strip()
return reply
except Exception as e:
print(f"[Ollama SLM] Error or timeout querying local model: {e}")
lower_prompt = prompt.lower()
if "hello" in lower_prompt or "hi" in lower_prompt:
return "Hello! I am here. Eyes on the road, friend."
elif "joke" in lower_prompt:
return "Why did the scarecrow win an award? Because he was outstanding in his field. Stay alert!"
else:
return "Understood. Keep driving safely, stay focused on the road."
def _assistant_loop(self):
"""Background continuous microphone listening loop."""
print("[VoiceAssistant] Speech recognizer thread started.")
try:
mic = sr.Microphone()
except Exception as e:
print(f"[VoiceAssistant] Error accessing microphone: {e}. Voice controls disabled.")
return
with mic as source:
print("[VoiceAssistant] Calibrating microphone for driving background noise...")
self.recognizer.adjust_for_ambient_noise(source, duration=2)
print("[VoiceAssistant] Calibration complete. Ready for voice interaction.")
while self.running:
if self.alert_manager.is_speaking:
time.sleep(0.3)
continue
try:
audio = self.recognizer.listen(source, timeout=1.5, phrase_time_limit=4.0)
except sr.WaitTimeoutError:
continue
except Exception as e:
print(f"[VoiceAssistant] Microphone capture error: {e}")
time.sleep(0.5)
continue
if self.alert_manager.is_speaking:
continue
# Run speech recognition in a separate thread to keep mic pipeline responsive
threading.Thread(target=self._process_audio, args=(audio,), daemon=True).start()
def _process_audio(self, audio):
"""Recognizes speech and routes commands dynamically."""
try:
text = self.recognizer.recognize_google(audio)
print(f"[Driver Heard] {text}")
except sr.UnknownValueError:
return
except sr.RequestError:
try:
print("[VoiceAssistant] Cloud Speech API unavailable. Attempting local Whisper...")
text = self.recognizer.recognize_whisper(audio, model="base.en")
print(f"[Driver Heard (Whisper)] {text}")
except Exception as e:
print(f"[VoiceAssistant] Offline recognition failed: {e}")
return
cleaned_text = text.strip().lower()
if not cleaned_text:
return
# STATE-SPECIFIC ROUTING (Emergency Rest / Song Prompts)
current_state = self.callbacks['get_system_state']()
# 1. State: Driver has been warned of frequent drowsiness (Level 3 Advisory)
if current_state == "WAITING_REST_RESPONSE":
refusal_words = ["no", "never", "can't", "wont", "won't", "refuse", "impossible", "fine", "good", "no thanks", "no rest", "keep driving"]
accepted_words = ["yes", "yeah", "ok", "okay", "fine I will", "sure", "pulling over"]
if any(word in cleaned_text for word in refusal_words):
print("[VoiceAssistant] Driver refused rest. Prompting for energetic song.")
self.callbacks['set_system_state']("WAITING_SONG_RESPONSE")
self.callbacks['add_chat_log'](text, "No, I'm fine. I won't stop.")
time.sleep(0.5)
self.alert_manager.ask_energetic_song()
self.callbacks['add_chat_log']("System", "Alright. Would you like to listen to an energetic song to help you stay awake?")
return
elif any(word in cleaned_text for word in accepted_words) or "pull" in cleaned_text:
print("[VoiceAssistant] Driver accepted rest.")
self.callbacks['reset_warnings']()
self.callbacks['add_chat_log'](text, "Okay, pulling over.")
self.alert_manager.speak("Good decision. Pull over safely and take some rest.")
self.callbacks['add_chat_log']("System", "Good decision. Pull over safely and take some rest.")
return
# 2. State: Driver refused rest, now confirming if they want a song
elif current_state == "WAITING_SONG_RESPONSE":
accepted_words = ["yes", "yeah", "sure", "ok", "okay", "play", "song", "music", "please"]
if any(word in cleaned_text for word in accepted_words):
print("[VoiceAssistant] Driver accepted song.")
self.callbacks['add_chat_log'](text, "Yes, play some music.")
self.alert_manager.play_energetic_music()
self.callbacks['add_chat_log']("System", "Playing energetic synthwave beats! Stay awake!")
self.callbacks['set_system_state']("PLAYING_MUSIC")
return
else:
print("[VoiceAssistant] Driver declined song.")
self.callbacks['add_chat_log'](text, "No, I'm okay.")
self.alert_manager.speak("Understood. Keep your eyes on the road. Stay focused.")
self.callbacks['add_chat_log']("System", "Understood. Keep your eyes on the road. Stay focused.")
self.callbacks['reset_warnings']()
return
# DIRECT SYSTEM BACKUP COMMANDS (Local Regex Override)
if "reset" in cleaned_text or "clear" in cleaned_text or "awake" in cleaned_text or "focused" in cleaned_text:
print("[VoiceAssistant] Safe state reset command received.")
self.callbacks['reset_warnings']()
self.callbacks['add_chat_log'](text, "Reset assistant")
self.alert_manager.speak("System reset. Let's keep driving safely.")
self.callbacks['add_chat_log']("System", "System reset. Let's keep driving safely.")
return
has_play = any(p in cleaned_text for p in ["play", "start", "turn on", "listen", "put on", "launch"])
has_music_kw = any(kw in cleaned_text for kw in ["music", "song", "beat", "tune", "track", "musc", "melody", "audio", "lofi"])
if has_play:
# Extract query after the play keyword
play_keyword = next((p for p in ["play", "start", "turn on", "listen to", "put on", "launch"] if p in cleaned_text), "play")
idx = cleaned_text.find(play_keyword)
music_query = cleaned_text[idx + len(play_keyword):].strip()
# Clean common filler words
for filler in ["some", "a", "the", "music", "song", "track", "musc"]:
if music_query.startswith(filler):
music_query = music_query[len(filler):].strip()
if music_query.endswith(filler):
music_query = music_query[:-len(filler)].strip()
# If the remaining query is empty or generic, play the custom playlist
if not music_query or music_query in ["music", "song", "beat", "tune", "track", "musc", "melody"]:
print("[VoiceAssistant] General music command recognized locally. Playing playlist.")
self.callbacks['add_chat_log'](text, "Requested general music playback")
self.alert_manager.play_energetic_music()
self.callbacks['set_system_state']("PLAYING_MUSIC")
return
else:
# Play specific song directly!
print(f"[VoiceAssistant] Specific song command recognized locally: {music_query}")
confirm_msg = f"Sure thing! Autoplay in progress for {music_query}."
self.callbacks['add_chat_log'](text, confirm_msg)
self.alert_manager.speak(confirm_msg)
# Fetch first search result and autoplay!
search_url = f"https://www.youtube.com/results?search_query={urllib.parse.quote(music_query)}"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
try:
req = urllib.request.Request(search_url, headers=headers)
with urllib.request.urlopen(req, timeout=5) as res:
html = res.read().decode('utf-8')
video_ids = re.findall(r'/watch\?v=([a-zA-Z0-9_-]{11})', html)
if video_ids:
first_video_id = video_ids[0]
direct_url = f"https://www.youtube.com/watch?v={first_video_id}&autoplay=1"
print(f"[VoiceAssistant] Auto-playing first matching YouTube video: {direct_url}")
webbrowser.open(direct_url)
else:
webbrowser.open(search_url)
except Exception as e:
print(f"[VoiceAssistant] Autoplay scraper failed: {e}. Falling back to search page.")
webbrowser.open(search_url)
self.callbacks['set_system_state']("PLAYING_MUSIC")
return
has_stop = any(s in cleaned_text for s in ["stop", "pause", "turn off", "mute", "quiet", "halt", "shut up"])
if has_stop and (has_music_kw or "music" in cleaned_text or "song" in cleaned_text or "sound" in cleaned_text or "radio" in cleaned_text):
print("[VoiceAssistant] Flexible stop music command recognized.")
# Simulate media play/pause key to halt browser/audio stream
import ctypes
VK_MEDIA_PLAY_PAUSE = 0xB3
try:
ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 0, 0)
ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 2, 0)
except Exception as e:
print(f"[VoiceAssistant] Failed simulating media key: {e}")
self.callbacks['set_system_state']("NORMAL")
self.callbacks['add_chat_log'](text, "Stop the music")
self.alert_manager.speak("Stopping the music. Keep your eyes on the road.")
self.callbacks['add_chat_log']("System", "Music stopped.")
return
# CONVERSATIONAL LOCAL SLM (Always Active - No Wake Word/Filters Required!)
# Route ANY general speech dynamically straight to our local Ollama custom model!
print(f"[Ollama Query] {text}")
reply = self.query_ollama_slm(text)
print(f"[SLM Reply] {reply}")
# Check if Ollama returned a dynamic PLAY action tag (e.g. "[PLAY] paint it black")
if "[play]" in reply.lower():
match = re.search(r'\[play\]\s*(.*)', reply, re.IGNORECASE)
if match:
music_query = match.group(1).strip()
music_query = music_query.replace('"', '').replace('[', '').replace(']', '').strip()
confirm_msg = f"Sure thing! Autoplay in progress for {music_query}."
self.callbacks['add_chat_log'](text, confirm_msg)
self.alert_manager.speak(confirm_msg)
# Fetch the first search result from YouTube dynamically and play it directly!
search_url = f"https://www.youtube.com/results?search_query={urllib.parse.quote(music_query)}"
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
try:
req = urllib.request.Request(search_url, headers=headers)
with urllib.request.urlopen(req, timeout=5) as res:
html = res.read().decode('utf-8')
# Search for video watch paths
video_ids = re.findall(r'/watch\?v=([a-zA-Z0-9_-]{11})', html)
if video_ids:
first_video_id = video_ids[0]
direct_url = f"https://www.youtube.com/watch?v={first_video_id}&autoplay=1"
print(f"[VoiceAssistant] Auto-playing first matching YouTube video: {direct_url}")
webbrowser.open(direct_url)
else:
webbrowser.open(search_url)
except Exception as e:
print(f"[VoiceAssistant] Autoplay scraper failed: {e}. Falling back to search page.")
webbrowser.open(search_url)
self.callbacks['set_system_state']("PLAYING_MUSIC")
return
# Check if Ollama returned a dynamic STOP action tag (e.g. "[STOP]")
if "[stop]" in reply.lower():
match = re.search(r'\[stop\]\s*(.*)', reply, re.IGNORECASE)
clean_reply = match.group(1).strip() if match else "Stopping the music. Keep your eyes on the road!"
clean_reply = clean_reply.replace('[', '').replace(']', '').strip()
print("[VoiceAssistant] Action STOP triggered dynamically by Ollama.")
# Simulate media play/pause key to stop the browser audio stream
import ctypes
VK_MEDIA_PLAY_PAUSE = 0xB3
try:
ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 0, 0)
ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 2, 0)
except Exception as e:
print(f"[VoiceAssistant] Failed simulating media key: {e}")
self.callbacks['set_system_state']("NORMAL")
self.callbacks['add_chat_log'](text, clean_reply)
self.alert_manager.speak(clean_reply)
return
# Check if Ollama returned a dynamic RESET action tag (e.g. "[RESET]")
if "[reset]" in reply.lower():
match = re.search(r'\[reset\]\s*(.*)', reply, re.IGNORECASE)
clean_reply = match.group(1).strip() if match else "System warnings cleared. Drive safely!"
clean_reply = clean_reply.replace('[', '').replace(']', '').strip()
print("[VoiceAssistant] Action RESET triggered dynamically by Ollama.")
self.callbacks['reset_warnings']()
self.callbacks['add_chat_log'](text, clean_reply)
self.alert_manager.speak(clean_reply)
return
# General conversational response
self.callbacks['add_chat_log'](text, reply)
self.alert_manager.speak(reply)
def stop(self):
"""Stops the assistant background thread."""
self.running = False
# 6. SafeDrivingAssistant Core Engine & Orchestrator Coordinator
class SafeDrivingAssistant:
def __init__(self):
print("[CoreEngine] Initializing Safe Driving Assistant...")
# Initialize Audio Alert & Sound Synthesizer
self.alert_manager = AlertManager()
# Initialize face_recognition Eye Landmark Processor
self.detector = EyeDetector()
# Tracking states and timelines
self.consec_closed_frames = 0
self.eyes_closed_start_time = None
self.active_alert_level = 0 # 0: None, 1: Stay Focused, 2: Wake Up Loud
# Rolling log of drowsiness timestamps to monitor frequency
self.drowsiness_events = collections.deque()
# Keyboard reset helper
self.last_key_press = None
# Setup conversational callbacks for our Voice Assistant & SLM
self.callbacks = {
'get_system_state': self.get_system_state,
'set_system_state': self.set_system_state,
'reset_warnings': self.reset_warnings,
'add_chat_log': self.add_chat_log
}
# Bind callbacks back to Flask REST API endpoints
app.reset_callback = self.reset_warnings
app.play_music_callback = self.play_energetic_music
# Initialize speech listener thread
self.assistant = VoiceAssistant(self.alert_manager, self.callbacks)
# Boot Flask HUD Web Dashboard in the background
start_server_async()
# Coordinator Callback Handlers
def get_system_state(self):
"""Thread-safe state getter for the Voice Assistant."""
with dashboard_state.lock:
return dashboard_state.state
def set_system_state(self, new_state):
"""Thread-safe state setter for the Voice Assistant."""
with dashboard_state.lock:
dashboard_state.state = new_state
if new_state == "NORMAL":
dashboard_state.alert_message = ""
elif new_state == "WAITING_REST_RESPONSE":
dashboard_state.alert_message = "ADVISING REST BREAK"
elif new_state == "WAITING_SONG_RESPONSE":
dashboard_state.alert_message = "OFFERING ENERGETIC MUSIC"
elif new_state == "PLAYING_MUSIC":
dashboard_state.alert_message = "PLAYING HIGH ENERGY BEATS"
def reset_warnings(self):
"""Complete reset of all active alarms, timers, and warning metrics."""
print("[CoreEngine] Performing comprehensive system alert reset.")
with dashboard_state.lock:
dashboard_state.state = "NORMAL"
dashboard_state.alert_message = ""
dashboard_state.drowsiness_count = 0
self.consec_closed_frames = 0
self.eyes_closed_start_time = None
self.active_alert_level = 0
self.drowsiness_events.clear()
# Enqueue a log message
self.add_chat_log("System", "System alerts and warnings reset to NORMAL.")
def add_chat_log(self, user_query, slm_reply=""):
"""Pushes voice transcripts to the dashboard log log history."""
with dashboard_state.lock:
if user_query == "System":
dashboard_state.chat_history.append({
"speaker": "System",
"message": slm_reply
})
else:
dashboard_state.chat_history.append({
"speaker": "Driver",
"query": user_query,
"message": slm_reply
})
def play_energetic_music(self):
"""Orchestrator hook to trigger the energetic music sequence."""
self.set_system_state("PLAYING_MUSIC")
self.alert_manager.play_energetic_music()
self.add_chat_log("System", "Energetic synthwave music started. Stay alert!")
# Core Drowsiness Evaluation & Loop
def run(self):
"""Main camera acquisition loop that drives the safe assistant."""
print("[CoreEngine] Accessing camera stream...")
cap = cv2.VideoCapture(CAMERA_ID)
# Configure video dimension overrides from settings
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
# Detect if we are in a headless cloud environment without a webcam
use_simulation = False
if not cap.isOpened():
print("[CoreEngine] WARNING: Could not access physical web camera.")
print("[CoreEngine] Pivoting to Cloud Simulation Mode to keep web HUD alive...")
use_simulation = True
else:
print("[CoreEngine] Camera stream operational. System fully active.")
print("[CoreEngine] System loop running. Use the Web Dashboard to monitor telemetry.")
prev_time = time.time()
try:
while True:
current_time = time.time()
if use_simulation:
# Generate an animated cyberpunk grid frame for the headless dashboard
frame = np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 3), dtype=np.uint8)
# Create a scrolling scan line
scan_y = int(current_time * 120) % FRAME_HEIGHT
cv2.line(frame, (0, scan_y), (FRAME_WIDTH, scan_y), (40, 40, 40), 2)
cv2.putText(frame, "CLOUD SIMULATION FEED (NO PHYSICAL CAM)", (20, FRAME_HEIGHT - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
ret = True
else:
ret, frame = cap.read()
if not ret:
time.sleep(0.01)
continue
# Mirror frame for intuitive pilot HUD overlay
frame = cv2.flip(frame, 1)
# Check if tracking is active (controlled from Dashboard)
with dashboard_state.lock:
active = dashboard_state.detection_active
if active:
if use_simulation:
# Cloud Demo Mode: Automatically simulate a drowsy event cycle every 25 seconds
# to let you test your Flask dashboard overlays and system responses safely!
cycle = int(current_time) % 25
if cycle > 18: # Simulate closed eyes for 7 seconds
ear = 0.16
cv2.putText(frame, "SIMULATING DROWSINESS (EYES CLOSED)", (FRAME_WIDTH // 2 - 180, FRAME_HEIGHT // 2),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
else:
ear = 0.28
processed_frame = frame.copy()
# Draw virtual telemetry eye dots onto the matrix background
cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.4), int(FRAME_HEIGHT * 0.45)), 8, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), -1)
cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.6), int(FRAME_HEIGHT * 0.45)), 8, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), -1)
if ear is not None:
cv2.putText(processed_frame, f"EAR: {ear:.2f}", (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), 2)
landmarks = {}
else:
# Calculate EAR and overlay glow contours on frame via physical camera
ear, landmarks, processed_frame = self.detector.process_frame(frame)
else:
processed_frame = frame.copy()
cv2.putText(processed_frame, "TRACKING PAUSED", (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2)
ear = None
# Calculate processing frame rate (FPS)
fps = int(1.0 / (current_time - prev_time)) if (current_time - prev_time) > 0 else 30
prev_time = current_time
# Drowsiness Logic Decision Engine
if ear is not None and active:
if ear < EAR_THRESHOLD:
self.consec_closed_frames += 1
# Once consecutive frames pass noise filter, start duration timer
if self.consec_closed_frames >= EAR_CONSEC_FRAMES:
if self.eyes_closed_start_time is None:
self.eyes_closed_start_time = current_time
else:
closed_duration = current_time - self.eyes_closed_start_time
# LEVEL 1: Eyes Closed 3-5 seconds
if ALERT_LEVEL1_MIN <= closed_duration < ALERT_LEVEL1_MAX:
if self.active_alert_level < 1:
self.active_alert_level = 1
self.set_system_state("CLOSED_3S")
self.alert_manager.trigger_level1()
# Record timestamp to sliding frequency tracker
self.drowsiness_events.append(current_time)
with dashboard_state.lock:
dashboard_state.drowsiness_count += 1
self.add_chat_log("System", "WARNING: Eyes closed for 3 seconds! Stay focused!")
# LEVEL 2: Eyes Closed > 5 seconds (Louder Warning!)
elif closed_duration >= ALERT_LEVEL2_MIN:
if self.active_alert_level < 2:
self.active_alert_level = 2
self.set_system_state("CLOSED_5S")
self.alert_manager.trigger_level2()
# Record second timestamp
self.drowsiness_events.append(current_time)
with dashboard_state.lock:
dashboard_state.drowsiness_count += 1
self.add_chat_log("System", "CRITICAL ALARM: Eyes closed for 5+ seconds! WAKE UP!")
else:
# Eyes are open! Reset filters and check for Level 3 Advisory escalation
self.consec_closed_frames = 0
if self.eyes_closed_start_time is not None:
self.eyes_closed_start_time = None
while self.drowsiness_events and (current_time - self.drowsiness_events[0] > FREQUENT_DROWSY_WINDOW):
self.drowsiness_events.popleft()
# LEVEL 3: Frequent Drowsiness check (if closed events occur >= limit in last 60s)
if len(self.drowsiness_events) >= FREQUENT_DROWSY_LIMIT:
print(f"[CoreEngine] Frequent drowsiness detected ({len(self.drowsiness_events)} events in 60s). Escalating to Level 3.")
self.set_system_state("WAITING_REST_RESPONSE")
self.alert_manager.trigger_level3_advisory()
self.add_chat_log("System", "FREQUENT DROWSINESS DETECTED. Prompting driver to pull over.")
else:
# Normal recovery
current_state = self.get_system_state()
if current_state not in ["WAITING_REST_RESPONSE", "WAITING_SONG_RESPONSE"]:
self.set_system_state("NORMAL")
self.active_alert_level = 0
else:
self.consec_closed_frames = 0
self.eyes_closed_start_time = None
# Update Global Telemetry Buffer for Flask Server
with dashboard_state.lock:
dashboard_state.latest_frame = processed_frame.copy()
if ear is not None:
dashboard_state.ear = ear
else:
dashboard_state.ear = 0.30 # Default baseline when no face present
dashboard_state.fps = fps
# OpenCV display output fallback (wrapped safely to prevent headless display context drops)
try:
cv2.imshow("DriveSafe HUD AI Console", processed_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27:
print("[CoreEngine] Exit key received. Terminating system.")
break
elif key == ord('r'):
self.reset_warnings()
except Exception:
# Prevents crashes on platforms where standard desktop window pipelines are fully restricted
time.sleep(0.03)
except KeyboardInterrupt:
print("[CoreEngine] Keyboard interrupt. Shutting down.")
finally:
print("[CoreEngine] Releasing resources...")
cap.release()
try:
cv2.destroyAllWindows()
except Exception:
pass
self.assistant.stop()
sys.exit(0)
if __name__ == "__main__":
assistant_app = SafeDrivingAssistant()
assistant_app.run()