"""
main.py — Self-Contained, Fully Integrated Safe Driving Assistant
Consolidates all system configuration, custom non-blocking sound synthesizer, dlib Eye Landmark processor,
Ollama SLM action voice-assistant parser, Flask SSE Telemetry Dashboard, and main drowsiness timer logic
into one unified, ultra-premium script.
"""
import os
import sys
import time
import json
import queue
import collections
import threading
import logging
import urllib.request
import urllib.parse
import webbrowser
import re
# Default to SDL's dummy audio driver so the mixer can initialize in headless
# containers. setdefault (rather than plain assignment) keeps any driver the
# operator has already selected through the environment.
os.environ.setdefault("SDL_AUDIODRIVER", "dummy")
import numpy as np
import scipy.io.wavfile as wavfile
import pygame
import pyttsx3
import cv2
import face_recognition
import speech_recognition as sr
from flask import Flask, render_template, Response, jsonify, request
# 1. DriveSafe Assistant — Configuration Settings
# Values read through os.getenv can be overridden from the environment;
# everything else is a fixed tuning constant.

# Camera capture
CAMERA_ID = int(os.getenv("CAMERA_ID", 0))            # webcam index handed to OpenCV
FRAME_WIDTH = int(os.getenv("FRAME_WIDTH", 640))      # requested capture width (pixels)
FRAME_HEIGHT = int(os.getenv("FRAME_HEIGHT", 480))    # requested capture height (pixels)

# Drowsiness detection thresholds
EAR_THRESHOLD = 0.23      # an Eye Aspect Ratio below this counts as "eyes closed"
EAR_CONSEC_FRAMES = 3     # consecutive closed-eye frames before the closed-eye timer starts

# Alert severity levels (closed-eye durations, in seconds)
ALERT_LEVEL1_MIN = 3.0    # Level 1 ("stay focused") begins here
ALERT_LEVEL1_MAX = 5.0    # Level 1 ends here
ALERT_LEVEL2_MIN = 5.0    # Level 2 (louder "wake up") begins here

# Frequent-drowsiness pattern tracking
FREQUENT_DROWSY_WINDOW = 60.0   # sliding window length in seconds
FREQUENT_DROWSY_LIMIT = 2       # warnings allowed inside the window before the Level 3 break advisory

# Voice assistant & SLM settings
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "drivesafe")   # name of the local Ollama model
OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://localhost:11434/api/generate")   # generation endpoint
SPEECH_RECOGNITION_TIMEOUT = 10   # speech recognizer timeout
WAKE_WORD = "assistant"           # wake word for general conversations

# Web HUD dashboard server
FLASK_HOST = os.getenv("FLASK_HOST", "127.0.0.1")
FLASK_PORT = int(os.getenv("FLASK_PORT", 5000))

# High-energy music playlist
ENERGETIC_MUSIC_URL = "https://music.youtube.com/playlist?list=PLYBSqm--lNVt1H63PlRvigxvPU_unQe8m"
# 2. Flask Web HUD Server & Shared DashboardState
# Flask application object for the dashboard; templates are looked up in
# 'templates' and static assets in 'static'.
app = Flask(__name__, template_folder='templates', static_folder='static')
# Shared state exchanged between the Flask handlers and the main loop
class DashboardState:
    """Values shown on the dashboard; access them while holding ``lock``."""

    def __init__(self):
        # Guards every field below.
        self.lock = threading.Lock()

        # Video / measurement data
        self.latest_frame = None
        self.ear = 0.0
        self.fps = 0

        # Alerting status
        self.state = "NORMAL"
        self.alert_message = ""
        self.drowsiness_count = 0

        # Conversation log and the detection on/off switch
        self.chat_history = []
        self.detection_active = True


# Single module-wide instance used by the routes and the engine.
dashboard_state = DashboardState()
@app.route('/')
def index():
    """Serve the HUD dashboard page rendered from ``index.html``."""
    page = render_template('index.html')
    return page
def gen_video_feed():
    """Yield the latest dashboard frame as multipart JPEG chunks, forever.

    Each iteration copies ``dashboard_state.latest_frame`` under the lock,
    JPEG-encodes the copy and yields it as one ``--frame`` part, then sleeps
    1/30 s so the stream is capped at roughly 30 frames per second.
    """
    frame_interval = 1.0 / 30.0
    while True:
        # Copy under the lock so encoding works on a private buffer.
        with dashboard_state.lock:
            latest = dashboard_state.latest_frame
            snapshot = latest.copy() if latest is not None else None

        if snapshot is not None:
            encoded_ok, jpeg = cv2.imencode('.jpg', snapshot)
            if encoded_ok:
                part_header = (b'--frame\r\n'
                               b'Content-Type: image/jpeg\r\n\r\n')
                yield part_header + jpeg.tobytes() + b'\r\n\r\n'

        time.sleep(frame_interval)
@app.route('/video_feed')
def video_feed():
    """Return the annotated camera stream as a multipart response for <img> tags."""
    stream = gen_video_feed()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
def gen_telemetry_stream():
    """Yield dashboard telemetry as Server-Sent Events, forever.

    Roughly every 0.06 s a JSON snapshot of ``dashboard_state`` is emitted in
    SSE framing (``data: <json>`` followed by a blank line). Between emissions
    the generator polls in 10 ms sleeps.
    """
    min_interval = 0.06
    last_sent_time = None  # None forces the first snapshot out immediately
    while True:
        # Monotonic clock: the throttle must not stall or burst when the
        # wall clock is adjusted.
        current_time = time.monotonic()
        if last_sent_time is None or current_time - last_sent_time >= min_interval:
            with dashboard_state.lock:
                data = {
                    "ear": round(dashboard_state.ear, 3),
                    "state": dashboard_state.state,
                    "drowsiness_count": dashboard_state.drowsiness_count,
                    "fps": dashboard_state.fps,
                    "alert_message": dashboard_state.alert_message,
                    # Copy the list while the lock is held; serialization
                    # happens after release and other threads keep appending.
                    "chat_history": list(dashboard_state.chat_history),
                    "detection_active": dashboard_state.detection_active
                }
            # SSE data format: "data: <json>\n\n"
            yield f"data: {json.dumps(data)}\n\n"
            last_sent_time = current_time
        time.sleep(0.01)
@app.route('/telemetry')
def telemetry():
    """Return the telemetry generator as a ``text/event-stream`` response."""
    events = gen_telemetry_stream()
    return Response(events, mimetype='text/event-stream')
# Interactive Control APIs
@app.route('/api/toggle_detection', methods=['POST'])
def toggle_detection():
    """Flip ``dashboard_state.detection_active`` and report the new value."""
    with dashboard_state.lock:
        status = not dashboard_state.detection_active
        dashboard_state.detection_active = status
    return jsonify({"status": "success", "detection_active": status})
@app.route('/api/reset', methods=['POST'])
def api_reset():
    """Invoke the reset callback attached to ``app``, if one has been set."""
    callback = getattr(app, 'reset_callback', None)
    if not callback:
        return jsonify({"status": "error", "message": "Reset callback not configured."})
    callback()
    return jsonify({"status": "success", "message": "System alerts and warning log reset."})
@app.route('/api/trigger_music', methods=['POST'])
def api_trigger_music():
    """Invoke the play-music callback attached to ``app``, if one has been set."""
    callback = getattr(app, 'play_music_callback', None)
    if not callback:
        return jsonify({"status": "error", "message": "Music callback not configured."})
    callback()
    return jsonify({"status": "success", "message": "Playing energetic synthwave music!"})
def start_server_async():
    """Start the Flask development server on a daemon thread and return at once."""
    # Only werkzeug errors reach the terminal; request logs are silenced.
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    def _serve():
        app.run(host=FLASK_HOST, port=FLASK_PORT, debug=False, use_reloader=False)

    threading.Thread(target=_serve, daemon=True).start()
    print(f"[Flask Server] Running in background at http://{FLASK_HOST}:{FLASK_PORT}")
# 3. AlertManager — Programmatic Sound Synthesis & Multi-Threaded Audio
class AlertManager:
    """Plays alert sounds through pygame and speaks messages on a worker thread.

    Construction creates the ``audio`` directory, writes two synthesized WAV
    files into it, initializes ``pygame.mixer`` and starts a daemon thread that
    speaks the entries of ``speech_queue`` one at a time. ``is_speaking`` is
    True while an utterance is being processed.
    """

    # Environment variable that carries the utterance to the PowerShell child.
    _TTS_TEXT_ENV = "DRIVESAFE_TTS_TEXT"

    def __init__(self):
        # Ensure directories exist
        os.makedirs("audio", exist_ok=True)
        # Programmatically synthesize our warning and chime audio files
        self._synthesize_audio_assets()
        # Initialize Pygame Mixer for non-blocking SFX playback
        pygame.mixer.init()
        # Audio file paths
        self.calm_beep_path = os.path.join("audio", "calm_beep.wav")
        self.urgent_beep_path = os.path.join("audio", "urgent_beep.wav")
        # Speech queue & worker setup
        self.speech_queue = queue.Queue()
        self.is_speaking = False
        self.speech_thread = threading.Thread(target=self._speech_worker, daemon=True)
        self.speech_thread.start()

    def _synthesize_audio_assets(self):
        """Write ``audio/calm_beep.wav`` and ``audio/urgent_beep.wav`` (16-bit, 44.1 kHz)."""
        sample_rate = 44100

        # 1. Calm chime: 0.4 s of a 550 Hz sine under an exponential decay.
        duration = 0.4
        t = np.linspace(0, duration, int(sample_rate * duration), False)
        envelope = np.exp(-5 * t)
        tone = np.sin(2 * np.pi * 550 * t) * envelope
        calm_data = (tone * 20000).astype(np.int16)
        wavfile.write(os.path.join("audio", "calm_beep.wav"), sample_rate, calm_data)

        # 2. Urgent alert: three 80 ms bursts at 1200 Hz, each followed by 50 ms of silence.
        burst_duration = 0.08
        gap_duration = 0.05
        t_burst = np.linspace(0, burst_duration, int(sample_rate * burst_duration), False)
        burst = np.sin(2 * np.pi * 1200 * t_burst) * 32000
        gap = np.zeros(int(sample_rate * gap_duration))
        urgent_np = np.tile(np.concatenate((burst, gap)), 3).astype(np.int16)
        wavfile.write(os.path.join("audio", "urgent_beep.wav"), sample_rate, urgent_np)

    @classmethod
    def _powershell_speech_args(cls, text, volume, rate):
        """Build the PowerShell invocation that speaks ``text``.

        Args:
            text: Utterance to speak.
            volume: Loudness in the range 0.0-1.0; values outside are clamped.
            rate: Speaking rate; above 180 maps to PowerShell rate 2, below 150
                to -2, anything else to 0.

        Returns:
            ``(argv, env)`` for ``subprocess.run``. The utterance travels in
            ``env`` and is read by the script through ``$env:``, so it is
            never parsed as PowerShell source regardless of the quotes or
            other characters it contains.
        """
        if rate > 180:
            ps_rate = 2
        elif rate < 150:
            ps_rate = -2
        else:
            ps_rate = 0
        # SpeechSynthesizer.Volume accepts 0-100 only.
        ps_volume = max(0, min(100, int(volume * 100)))
        ps_command = (
            "Add-Type -AssemblyName System.Speech; "
            "$speak = New-Object System.Speech.Synthesis.SpeechSynthesizer; "
            f"$speak.Rate = {ps_rate}; "
            f"$speak.Volume = {ps_volume}; "
            f"$speak.Speak($env:{cls._TTS_TEXT_ENV})"
        )
        argv = ["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", ps_command]
        env = dict(os.environ)
        env[cls._TTS_TEXT_ENV] = str(text)
        return argv, env

    def _speech_worker(self):
        """Worker loop: speak queued utterances sequentially via PowerShell.

        Each queue item is a ``(text, volume, rate)`` tuple. The PowerShell
        process is run synchronously so utterances never overlap. Failures are
        logged and the loop continues (best effort).
        """
        print("[AlertManager] Speech worker thread active.")
        import subprocess
        while True:
            # Blocks until an item is available
            item = self.speech_queue.get()
            self.is_speaking = True
            try:
                text, volume, rate = item
                # Nothing to say for empty/whitespace-only text.
                if str(text).strip():
                    argv, env = self._powershell_speech_args(text, volume, rate)
                    subprocess.run(
                        argv,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        env=env
                    )
            except Exception as e:
                print(f"[AlertManager] Speech worker exception: {e}")
                self.is_speaking = False
                time.sleep(0.5)
            finally:
                # Always clear the flag and balance the get(), even on failure.
                self.is_speaking = False
                self.speech_queue.task_done()

    def speak(self, text, volume=0.8, rate=170):
        """Enqueue ``text`` to be spoken by the worker thread; returns immediately."""
        self.speech_queue.put((text, volume, rate))

    def trigger_level1(self):
        """Level 1 Alert (3-5s closed): soft chime, then calm voice."""
        print("[AlertManager] Triggering Level 1 Alert: Calm Stay Focused")
        pygame.mixer.Sound(self.calm_beep_path).play()
        self.speak("Stay focused on the road", volume=0.7, rate=160)

    def trigger_level2(self):
        """Level 2 Alert (>5s closed): urgent beeps, then loud voice."""
        print("[AlertManager] Triggering Level 2 Alert: Loud WAKE UP!")
        pygame.mixer.Sound(self.urgent_beep_path).play()
        self.speak("Wake up! Stay focused on the road!", volume=1.0, rate=190)

    def trigger_level3_advisory(self):
        """Level 3 Alert (frequent drowsiness): urgent beeps, then a rest-break advisory."""
        print("[AlertManager] Triggering Level 3 Alert: Rest break advisory")
        pygame.mixer.Sound(self.urgent_beep_path).play()
        self.speak("You are getting drowsy frequently. Please pull over on the side and take a rest.", volume=0.9, rate=170)

    def ask_energetic_song(self):
        """Ask the driver whether they want to hear an energetic song."""
        print("[AlertManager] Querying driver for energetic song")
        self.speak("Alright. Would you like to listen to an energetic song to help you stay awake?", volume=0.85, rate=170)

    def play_energetic_music(self):
        """Announce the music and open ``ENERGETIC_MUSIC_URL`` in the web browser."""
        print("[AlertManager] Playing energetic music")
        self.speak("Playing some high energy synthwave beats. Turn it up and stay alert!", volume=0.9, rate=170)
        webbrowser.open(ENERGETIC_MUSIC_URL)
# 4. EyeDetector — 2x Downsampling dlib Eye Landmark Processor with Fallback
class EyeDetector:
    """Locates eye landmarks with ``face_recognition`` and computes the Eye Aspect Ratio.

    Detection first runs on a frame shrunk by ``scale_factor``; if that finds
    no face, it is retried on the full-resolution frame.
    """

    def __init__(self):
        self.scale_factor = 2  # detection runs on a frame shrunk by this factor per axis
        self.last_warning_time = 0  # time.time() of the last "no face" console warning

    def _calculate_ear(self, eye_points):
        """Return the Eye Aspect Ratio for one eye given its 6 landmark points.

        EAR = (|p2 - p6| + |p3 - p5|) / (2 * |p1 - p4|); 0.0 is returned when
        the horizontal distance |p1 - p4| is zero.
        """
        p1, p2, p3, p4, p5, p6 = (np.array(eye_points[i]) for i in range(6))
        horizontal = np.linalg.norm(p1 - p4)
        if horizontal == 0:
            return 0.0
        vertical_sum = np.linalg.norm(p2 - p6) + np.linalg.norm(p3 - p5)
        return vertical_sum / (2.0 * horizontal)

    def process_frame(self, frame):
        """Detect the first face in a BGR frame and measure its average EAR.

        Returns:
            ``(avg_ear, landmarks_found, debug_frame)``. ``avg_ear`` is None
            when no face is found or either eye lacks 6 points;
            ``landmarks_found`` is the first face's landmark dict or None;
            ``debug_frame`` is an annotated copy of ``frame``.
        """
        height, width, _ = frame.shape
        debug_frame = frame.copy()

        # Fast path: detect on the downsampled frame.
        shrink = 1.0 / self.scale_factor
        small_bgr = cv2.resize(frame, (0, 0), fx=shrink, fy=shrink)
        detections = face_recognition.face_landmarks(cv2.cvtColor(small_bgr, cv2.COLOR_BGR2RGB))
        current_scale = self.scale_factor

        # Fallback: retry on the full-resolution frame.
        if not detections:
            detections = face_recognition.face_landmarks(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            current_scale = 1

        if not detections:
            # Console warning is throttled to one message per 2.5 seconds.
            now = time.time()
            if now - self.last_warning_time > 2.5:
                print("[EyeDetector] WARNING: No face detected in camera stream! Adjust position or lighting.")
                self.last_warning_time = now
            cv2.putText(debug_frame, "NO FACE DETECTED", (width // 2 - 120, height // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            cv2.putText(debug_frame, "Adjust Camera / Lighting", (width // 2 - 140, height // 2 + 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
            return None, None, debug_frame

        face_landmarks = detections[0]
        left_raw = face_landmarks.get('left_eye', [])
        right_raw = face_landmarks.get('right_eye', [])
        if len(left_raw) != 6 or len(right_raw) != 6:
            return None, face_landmarks, debug_frame

        def _to_frame_coords(points):
            # Map detection coordinates back onto the original frame.
            return [(int(x * current_scale), int(y * current_scale)) for (x, y) in points]

        left_eye = _to_frame_coords(left_raw)
        right_eye = _to_frame_coords(right_raw)
        avg_ear = (self._calculate_ear(left_eye) + self._calculate_ear(right_eye)) / 2.0
        self._draw_eye_hud(debug_frame, left_eye, right_eye, avg_ear)
        return avg_ear, face_landmarks, debug_frame

    def _draw_eye_hud(self, frame, left_eye, right_eye, ear):
        """Outline both eyes on ``frame`` and print the EAR value.

        Contours are red (thickness 2) when ``ear`` is below ``EAR_THRESHOLD``
        and green (thickness 1) otherwise; each landmark gets a small dot.
        """
        eyes_closed = ear is not None and ear < EAR_THRESHOLD
        color, thickness = ((0, 0, 255), 2) if eyes_closed else ((0, 255, 0), 1)

        for eye in (left_eye, right_eye):
            contour = np.array(eye, np.int32)
            cv2.polylines(frame, [contour], True, color, thickness)

        for (x, y) in left_eye + right_eye:
            cv2.circle(frame, (x, y), 2, (255, 255, 0), -1)

        if ear is None:
            return
        cv2.putText(frame, f"EAR: {ear:.2f}", (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# 5. VoiceAssistant — Dynamic Action Router & Background Speech Recognition
class VoiceAssistant:
    """Listens on the microphone in the background and routes what it hears.

    Recognized speech is handled in this order:
      1. replies to a pending rest-break or song prompt (based on the system state),
      2. local commands (reset, play music, stop music),
      3. everything else is sent to the Ollama model, whose reply may carry a
         ``[PLAY]``, ``[STOP]`` or ``[RESET]`` action tag.

    Args:
        alert_manager: Object providing ``speak``, ``is_speaking``,
            ``ask_energetic_song`` and ``play_energetic_music``.
        state_callbacks: Dict with the callables ``get_system_state``,
            ``set_system_state``, ``reset_warnings`` and ``add_chat_log``.
    """

    # Whole-word phrases that decline / accept the rest-break advisory.
    _REST_REFUSALS = ("no", "nope", "nah", "never", "can't", "wont", "won't", "refuse",
                      "impossible", "fine", "good", "no thanks", "no rest", "keep driving")
    _REST_ACCEPTS = ("yes", "yeah", "ok", "okay", "sure", "pulling over")
    # Whole-word phrases that accept the song offer.
    _SONG_ACCEPTS = ("yes", "yeah", "sure", "ok", "okay", "play", "song", "music", "please")
    # Substrings that mark a play request, and the prefixes used to cut out the query.
    _PLAY_TRIGGERS = ("play", "start", "turn on", "listen", "put on", "launch")
    _PLAY_PREFIXES = ("play", "start", "turn on", "listen to", "listen", "put on", "launch")
    _FILLER_WORDS = ("some", "a", "the", "music", "song", "track", "musc")
    _GENERIC_MUSIC_QUERIES = ("music", "song", "beat", "tune", "track", "musc", "melody")
    _MUSIC_KEYWORDS = ("music", "song", "beat", "tune", "track", "musc", "melody", "audio", "lofi")
    _STOP_KEYWORDS = ("stop", "pause", "turn off", "mute", "quiet", "halt", "shut up")

    def __init__(self, alert_manager, state_callbacks):
        self.alert_manager = alert_manager
        self.callbacks = state_callbacks
        self.recognizer = sr.Recognizer()
        self.recognizer.energy_threshold = 4000
        self.recognizer.dynamic_energy_threshold = True
        self.running = True
        self.thread = threading.Thread(target=self._assistant_loop, daemon=True)
        self.thread.start()

    # ------------------------------------------------------------------
    # Pure text helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _mentions(text, phrases):
        """Return True if any of ``phrases`` occurs in ``text`` as whole words.

        Plain substring tests misfire ("no" in "know", "hi" in "this"), so
        each phrase must not be adjacent to another word character.
        """
        return any(
            re.search(r"(?<!\w)" + re.escape(phrase) + r"(?!\w)", text)
            for phrase in phrases
        )

    @classmethod
    def _offline_reply(cls, prompt):
        """Return the canned reply used when the Ollama model cannot be reached."""
        lowered = prompt.lower()
        if cls._mentions(lowered, ("hello", "hi")):
            return "Hello! I am here. Eyes on the road, friend."
        if "joke" in lowered:
            return "Why did the scarecrow win an award? Because he was outstanding in his field. Stay alert!"
        return "Understood. Keep driving safely, stay focused on the road."

    @classmethod
    def _classify_rest_reply(cls, cleaned_text):
        """Classify a reply to the rest advisory as "accept", "refuse" or None."""
        # "fine i will" contains the refusal word "fine", so test it first.
        if cls._mentions(cleaned_text, ("fine i will",)):
            return "accept"
        if cls._mentions(cleaned_text, cls._REST_REFUSALS):
            return "refuse"
        if cls._mentions(cleaned_text, cls._REST_ACCEPTS) or "pull" in cleaned_text:
            return "accept"
        return None

    @classmethod
    def _extract_music_query(cls, cleaned_text):
        """Return what follows a play keyword, or None if there is no play keyword.

        Filler words ("some", "the", "music", ...) are removed from both ends
        as whole words, so titles such as "adele" or "hey ya" stay intact. An
        empty string means the request named nothing specific.
        """
        if not any(trigger in cleaned_text for trigger in cls._PLAY_TRIGGERS):
            return None
        keyword = next(p for p in cls._PLAY_PREFIXES if p in cleaned_text)
        start = cleaned_text.find(keyword) + len(keyword)
        # Skip the rest of an inflected keyword ("playing", "started").
        while start < len(cleaned_text) and cleaned_text[start].isalnum():
            start += 1
        words = cleaned_text[start:].split()
        while words and words[0] in cls._FILLER_WORDS:
            del words[0]
        while words and words[-1] in cls._FILLER_WORDS:
            del words[-1]
        return " ".join(words)

    @staticmethod
    def _tag_remainder(reply, tag, default):
        """Return the text after ``[tag]`` in ``reply`` (brackets removed), or ``default`` if empty."""
        match = re.search(r'\[' + re.escape(tag) + r'\]\s*(.*)', reply, re.IGNORECASE)
        remainder = match.group(1) if match else ""
        remainder = remainder.replace('[', '').replace(']', '').strip()
        return remainder or default

    # ------------------------------------------------------------------
    # Ollama
    # ------------------------------------------------------------------
    def query_ollama_slm(self, prompt):
        """POST ``prompt`` to ``OLLAMA_API_URL`` and return the cleaned reply text.

        Double quotes and asterisks are removed from the model output. On any
        error or timeout a canned offline reply is returned instead.
        """
        payload = {
            "model": OLLAMA_MODEL,
            "prompt": prompt,
            "stream": False
        }
        headers = {"Content-Type": "application/json"}
        try:
            req = urllib.request.Request(
                OLLAMA_API_URL,
                data=json.dumps(payload).encode("utf-8"),
                headers=headers,
                method="POST"
            )
            # Use a 10-second timeout to accommodate initial Ollama cold start weight loading
            with urllib.request.urlopen(req, timeout=10) as response:
                res_data = json.loads(response.read().decode("utf-8"))
            reply = res_data.get("response", "").strip()
            # Clean up any quotes or markdown from the SLM
            return reply.replace('"', '').replace('*', '').strip()
        except Exception as e:
            # Deliberate best effort: the assistant must keep answering offline.
            print(f"[Ollama SLM] Error or timeout querying local model: {e}")
            return self._offline_reply(prompt)

    # ------------------------------------------------------------------
    # Microphone loop
    # ------------------------------------------------------------------
    def _assistant_loop(self):
        """Capture phrases from the microphone until ``running`` is cleared.

        Capture is skipped while the alert manager is speaking, and a phrase
        captured during speech is discarded. Each remaining phrase is handed
        to ``_process_audio`` on its own daemon thread.
        """
        print("[VoiceAssistant] Speech recognizer thread started.")
        try:
            mic = sr.Microphone()
        except Exception as e:
            print(f"[VoiceAssistant] Error accessing microphone: {e}. Voice controls disabled.")
            return
        with mic as source:
            print("[VoiceAssistant] Calibrating microphone for driving background noise...")
            self.recognizer.adjust_for_ambient_noise(source, duration=2)
            print("[VoiceAssistant] Calibration complete. Ready for voice interaction.")
            while self.running:
                if self.alert_manager.is_speaking:
                    time.sleep(0.3)
                    continue
                try:
                    audio = self.recognizer.listen(source, timeout=1.5, phrase_time_limit=4.0)
                except sr.WaitTimeoutError:
                    continue
                except Exception as e:
                    print(f"[VoiceAssistant] Microphone capture error: {e}")
                    time.sleep(0.5)
                    continue
                if self.alert_manager.is_speaking:
                    continue
                # Run speech recognition in a separate thread to keep mic pipeline responsive
                threading.Thread(target=self._process_audio, args=(audio,), daemon=True).start()

    def _transcribe(self, audio):
        """Return the recognized text, or None if nothing could be recognized.

        Google recognition is tried first; if that service is unavailable,
        local Whisper (``base.en``) is attempted.
        """
        try:
            text = self.recognizer.recognize_google(audio)
            print(f"[Driver Heard] {text}")
            return text
        except sr.UnknownValueError:
            return None
        except sr.RequestError:
            try:
                print("[VoiceAssistant] Cloud Speech API unavailable. Attempting local Whisper...")
                text = self.recognizer.recognize_whisper(audio, model="base.en")
                print(f"[Driver Heard (Whisper)] {text}")
                return text
            except Exception as e:
                print(f"[VoiceAssistant] Offline recognition failed: {e}")
                return None

    def _process_audio(self, audio):
        """Recognize one captured phrase and route it (see the class docstring)."""
        text = self._transcribe(audio)
        if text is None:
            return
        cleaned_text = text.strip().lower()
        if not cleaned_text:
            return

        # STATE-SPECIFIC ROUTING (Emergency Rest / Song Prompts)
        current_state = self.callbacks['get_system_state']()
        if current_state == "WAITING_REST_RESPONSE":
            # An unclassifiable reply falls through to the general commands.
            if self._handle_rest_response(text, cleaned_text):
                return
        elif current_state == "WAITING_SONG_RESPONSE":
            self._handle_song_response(text, cleaned_text)
            return

        if self._handle_local_command(text, cleaned_text):
            return
        self._handle_slm_conversation(text)

    # ------------------------------------------------------------------
    # Routing steps
    # ------------------------------------------------------------------
    def _handle_rest_response(self, text, cleaned_text):
        """Handle a reply to the rest advisory; return True if it was consumed."""
        decision = self._classify_rest_reply(cleaned_text)
        if decision == "refuse":
            print("[VoiceAssistant] Driver refused rest. Prompting for energetic song.")
            self.callbacks['set_system_state']("WAITING_SONG_RESPONSE")
            self.callbacks['add_chat_log'](text, "No, I'm fine. I won't stop.")
            time.sleep(0.5)
            self.alert_manager.ask_energetic_song()
            self.callbacks['add_chat_log']("System", "Alright. Would you like to listen to an energetic song to help you stay awake?")
            return True
        if decision == "accept":
            print("[VoiceAssistant] Driver accepted rest.")
            self.callbacks['reset_warnings']()
            self.callbacks['add_chat_log'](text, "Okay, pulling over.")
            self.alert_manager.speak("Good decision. Pull over safely and take some rest.")
            self.callbacks['add_chat_log']("System", "Good decision. Pull over safely and take some rest.")
            return True
        return False

    def _handle_song_response(self, text, cleaned_text):
        """Handle a reply to the song offer; anything but an acceptance declines."""
        if self._mentions(cleaned_text, self._SONG_ACCEPTS):
            print("[VoiceAssistant] Driver accepted song.")
            self.callbacks['add_chat_log'](text, "Yes, play some music.")
            self.alert_manager.play_energetic_music()
            self.callbacks['add_chat_log']("System", "Playing energetic synthwave beats! Stay awake!")
            self.callbacks['set_system_state']("PLAYING_MUSIC")
            return
        print("[VoiceAssistant] Driver declined song.")
        self.callbacks['add_chat_log'](text, "No, I'm okay.")
        self.alert_manager.speak("Understood. Keep your eyes on the road. Stay focused.")
        self.callbacks['add_chat_log']("System", "Understood. Keep your eyes on the road. Stay focused.")
        self.callbacks['reset_warnings']()

    def _handle_local_command(self, text, cleaned_text):
        """Handle reset / play / stop commands locally; return True if one matched."""
        if any(word in cleaned_text for word in ("reset", "clear", "awake", "focused")):
            print("[VoiceAssistant] Safe state reset command received.")
            self.callbacks['reset_warnings']()
            self.callbacks['add_chat_log'](text, "Reset assistant")
            self.alert_manager.speak("System reset. Let's keep driving safely.")
            self.callbacks['add_chat_log']("System", "System reset. Let's keep driving safely.")
            return True

        music_query = self._extract_music_query(cleaned_text)
        if music_query is not None:
            if not music_query or music_query in self._GENERIC_MUSIC_QUERIES:
                print("[VoiceAssistant] General music command recognized locally. Playing playlist.")
                self._play_default_playlist(text)
            else:
                print(f"[VoiceAssistant] Specific song command recognized locally: {music_query}")
                self._announce_and_autoplay(text, music_query)
            return True

        has_stop = any(s in cleaned_text for s in self._STOP_KEYWORDS)
        names_audio = any(kw in cleaned_text for kw in self._MUSIC_KEYWORDS + ("sound", "radio"))
        if has_stop and names_audio:
            print("[VoiceAssistant] Flexible stop music command recognized.")
            self._send_media_play_pause()
            self.callbacks['set_system_state']("NORMAL")
            self.callbacks['add_chat_log'](text, "Stop the music")
            self.alert_manager.speak("Stopping the music. Keep your eyes on the road.")
            self.callbacks['add_chat_log']("System", "Music stopped.")
            return True
        return False

    def _handle_slm_conversation(self, text):
        """Send ``text`` to the Ollama model and act on its reply or action tag."""
        print(f"[Ollama Query] {text}")
        reply = self.query_ollama_slm(text)
        print(f"[SLM Reply] {reply}")
        lowered_reply = reply.lower()

        # [PLAY] <query>: play the named song (or the playlist if nothing is named).
        if "[play]" in lowered_reply:
            match = re.search(r'\[play\]\s*(.*)', reply, re.IGNORECASE)
            if match:
                music_query = match.group(1).strip()
                music_query = music_query.replace('"', '').replace('[', '').replace(']', '').strip()
                if music_query:
                    self._announce_and_autoplay(text, music_query)
                else:
                    self._play_default_playlist(text)
                return

        # [STOP] <message>: toggle media playback and speak the message.
        if "[stop]" in lowered_reply:
            clean_reply = self._tag_remainder(reply, "stop", "Stopping the music. Keep your eyes on the road!")
            print("[VoiceAssistant] Action STOP triggered dynamically by Ollama.")
            self._send_media_play_pause()
            self.callbacks['set_system_state']("NORMAL")
            self.callbacks['add_chat_log'](text, clean_reply)
            self.alert_manager.speak(clean_reply)
            return

        # [RESET] <message>: clear the warnings and speak the message.
        if "[reset]" in lowered_reply:
            clean_reply = self._tag_remainder(reply, "reset", "System warnings cleared. Drive safely!")
            print("[VoiceAssistant] Action RESET triggered dynamically by Ollama.")
            self.callbacks['reset_warnings']()
            self.callbacks['add_chat_log'](text, clean_reply)
            self.alert_manager.speak(clean_reply)
            return

        # General conversational response
        self.callbacks['add_chat_log'](text, reply)
        self.alert_manager.speak(reply)

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------
    def _play_default_playlist(self, text):
        """Log the request, start the default playlist and mark music as playing."""
        self.callbacks['add_chat_log'](text, "Requested general music playback")
        self.alert_manager.play_energetic_music()
        self.callbacks['set_system_state']("PLAYING_MUSIC")

    def _announce_and_autoplay(self, text, music_query):
        """Confirm the request aloud, then open the first YouTube match for ``music_query``."""
        confirm_msg = f"Sure thing! Autoplay in progress for {music_query}."
        self.callbacks['add_chat_log'](text, confirm_msg)
        self.alert_manager.speak(confirm_msg)
        self._autoplay_youtube(music_query)

    def _autoplay_youtube(self, music_query):
        """Open the first video found on the YouTube results page for ``music_query``.

        If no video id is found, or the fetch fails, the results page itself is
        opened. The system state is set to ``PLAYING_MUSIC`` in every case.
        """
        search_url = f"https://www.youtube.com/results?search_query={urllib.parse.quote(music_query)}"
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        try:
            req = urllib.request.Request(search_url, headers=headers)
            with urllib.request.urlopen(req, timeout=5) as res:
                html = res.read().decode('utf-8')
            # Search for video watch paths
            video_ids = re.findall(r'/watch\?v=([a-zA-Z0-9_-]{11})', html)
            if video_ids:
                first_video_id = video_ids[0]
                direct_url = f"https://www.youtube.com/watch?v={first_video_id}&autoplay=1"
                print(f"[VoiceAssistant] Auto-playing first matching YouTube video: {direct_url}")
                webbrowser.open(direct_url)
            else:
                webbrowser.open(search_url)
        except Exception as e:
            print(f"[VoiceAssistant] Autoplay scraper failed: {e}. Falling back to search page.")
            webbrowser.open(search_url)
        self.callbacks['set_system_state']("PLAYING_MUSIC")

    @staticmethod
    def _send_media_play_pause():
        """Simulate the media play/pause key through the Windows ``user32`` API.

        Best effort: any failure (for example ``ctypes.windll`` being
        unavailable) is logged and ignored.
        """
        import ctypes
        VK_MEDIA_PLAY_PAUSE = 0xB3
        try:
            ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 0, 0)  # key down
            ctypes.windll.user32.keybd_event(VK_MEDIA_PLAY_PAUSE, 0, 2, 0)  # key up
        except Exception as e:
            print(f"[VoiceAssistant] Failed simulating media key: {e}")

    def stop(self):
        """Ask the background listening loop to exit."""
        self.running = False
# 6. SafeDrivingAssistant Core Engine & Orchestrator Coordinator
class SafeDrivingAssistant:
def __init__(self):
    """Build every subsystem, wire the callbacks and start the web dashboard.

    Order matters: the Flask callbacks are attached before the voice
    assistant thread and the web server are started.
    """
    print("[CoreEngine] Initializing Safe Driving Assistant...")

    # Audio alerts / speech, and the eye-landmark processor
    self.alert_manager = AlertManager()
    self.detector = EyeDetector()

    # Closed-eye tracking
    self.consec_closed_frames = 0
    self.eyes_closed_start_time = None
    self.active_alert_level = 0  # 0: None, 1: Stay Focused, 2: Wake Up Loud

    # Timestamps of drowsiness events, used for frequency monitoring
    self.drowsiness_events = collections.deque()

    # Keyboard reset helper
    self.last_key_press = None

    # Hooks handed to the voice assistant
    self.callbacks = dict(
        get_system_state=self.get_system_state,
        set_system_state=self.set_system_state,
        reset_warnings=self.reset_warnings,
        add_chat_log=self.add_chat_log,
    )

    # Hooks used by the Flask REST endpoints
    app.reset_callback = self.reset_warnings
    app.play_music_callback = self.play_energetic_music

    # Start the speech listener, then the dashboard server
    self.assistant = VoiceAssistant(self.alert_manager, self.callbacks)
    start_server_async()
# Coordinator Callback Handlers
def get_system_state(self):
"""Thread-safe state getter for the Voice Assistant."""
with dashboard_state.lock:
return dashboard_state.state
def set_system_state(self, new_state):
"""Thread-safe state setter for the Voice Assistant."""
with dashboard_state.lock:
dashboard_state.state = new_state
if new_state == "NORMAL":
dashboard_state.alert_message = ""
elif new_state == "WAITING_REST_RESPONSE":
dashboard_state.alert_message = "ADVISING REST BREAK"
elif new_state == "WAITING_SONG_RESPONSE":
dashboard_state.alert_message = "OFFERING ENERGETIC MUSIC"
elif new_state == "PLAYING_MUSIC":
dashboard_state.alert_message = "PLAYING HIGH ENERGY BEATS"
def reset_warnings(self):
"""Complete reset of all active alarms, timers, and warning metrics."""
print("[CoreEngine] Performing comprehensive system alert reset.")
with dashboard_state.lock:
dashboard_state.state = "NORMAL"
dashboard_state.alert_message = ""
dashboard_state.drowsiness_count = 0
self.consec_closed_frames = 0
self.eyes_closed_start_time = None
self.active_alert_level = 0
self.drowsiness_events.clear()
# Enqueue a log message
self.add_chat_log("System", "System alerts and warnings reset to NORMAL.")
def add_chat_log(self, user_query, slm_reply=""):
"""Pushes voice transcripts to the dashboard log log history."""
with dashboard_state.lock:
if user_query == "System":
dashboard_state.chat_history.append({
"speaker": "System",
"message": slm_reply
})
else:
dashboard_state.chat_history.append({
"speaker": "Driver",
"query": user_query,
"message": slm_reply
})
def play_energetic_music(self):
"""Orchestrator hook to trigger the energetic music sequence."""
self.set_system_state("PLAYING_MUSIC")
self.alert_manager.play_energetic_music()
self.add_chat_log("System", "Energetic synthwave music started. Stay alert!")
# Core Drowsiness Evaluation & Loop
def run(self):
"""Main camera acquisition loop that drives the safe assistant."""
print("[CoreEngine] Accessing camera stream...")
cap = cv2.VideoCapture(CAMERA_ID)
# Configure video dimension overrides from settings
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
# Detect if we are in a headless cloud environment without a webcam
use_simulation = False
if not cap.isOpened():
print("[CoreEngine] WARNING: Could not access physical web camera.")
print("[CoreEngine] Pivoting to Cloud Simulation Mode to keep web HUD alive...")
use_simulation = True
else:
print("[CoreEngine] Camera stream operational. System fully active.")
print("[CoreEngine] System loop running. Use the Web Dashboard to monitor telemetry.")
prev_time = time.time()
try:
while True:
current_time = time.time()
if use_simulation:
# Generate an animated cyberpunk grid frame for the headless dashboard
frame = np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 3), dtype=np.uint8)
# Create a scrolling scan line
scan_y = int(current_time * 120) % FRAME_HEIGHT
cv2.line(frame, (0, scan_y), (FRAME_WIDTH, scan_y), (40, 40, 40), 2)
cv2.putText(frame, "CLOUD SIMULATION FEED (NO PHYSICAL CAM)", (20, FRAME_HEIGHT - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
ret = True
else:
ret, frame = cap.read()
if not ret:
time.sleep(0.01)
continue
# Mirror frame for intuitive pilot HUD overlay
frame = cv2.flip(frame, 1)
# Check if tracking is active (controlled from Dashboard)
with dashboard_state.lock:
active = dashboard_state.detection_active
if active:
if use_simulation:
# Cloud Demo Mode: Automatically simulate a drowsy event cycle every 25 seconds
# to let you test your Flask dashboard overlays and system responses safely!
cycle = int(current_time) % 25
if cycle > 18: # Simulate closed eyes for 7 seconds
ear = 0.16
cv2.putText(frame, "SIMULATING DROWSINESS (EYES CLOSED)", (FRAME_WIDTH // 2 - 180, FRAME_HEIGHT // 2),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
else:
ear = 0.28
processed_frame = frame.copy()
# Draw virtual telemetry eye dots onto the matrix background
cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.4), int(FRAME_HEIGHT * 0.45)), 8, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), -1)
cv2.circle(processed_frame, (int(FRAME_WIDTH * 0.6), int(FRAME_HEIGHT * 0.45)), 8, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), -1)
if ear is not None:
cv2.putText(processed_frame, f"EAR: {ear:.2f}", (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0) if ear > EAR_THRESHOLD else (0, 0, 255), 2)
landmarks = {}
else:
# Calculate EAR and overlay glow contours on frame via physical camera
ear, landmarks, processed_frame = self.detector.process_frame(frame)
else:
processed_frame = frame.copy()
cv2.putText(processed_frame, "TRACKING PAUSED", (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2)
ear = None
# Calculate processing frame rate (FPS)
fps = int(1.0 / (current_time - prev_time)) if (current_time - prev_time) > 0 else 30
prev_time = current_time
# Drowsiness Logic Decision Engine
if ear is not None and active:
if ear < EAR_THRESHOLD:
self.consec_closed_frames += 1
# Once consecutive frames pass noise filter, start duration timer
if self.consec_closed_frames >= EAR_CONSEC_FRAMES:
if self.eyes_closed_start_time is None:
self.eyes_closed_start_time = current_time
else:
closed_duration = current_time - self.eyes_closed_start_time
# LEVEL 1: Eyes Closed 3-5 seconds
if ALERT_LEVEL1_MIN <= closed_duration < ALERT_LEVEL1_MAX:
if self.active_alert_level < 1:
self.active_alert_level = 1
self.set_system_state("CLOSED_3S")
self.alert_manager.trigger_level1()
# Record timestamp to sliding frequency tracker
self.drowsiness_events.append(current_time)
with dashboard_state.lock:
dashboard_state.drowsiness_count += 1
self.add_chat_log("System", "WARNING: Eyes closed for 3 seconds! Stay focused!")
# LEVEL 2: Eyes Closed > 5 seconds (Louder Warning!)
elif closed_duration >= ALERT_LEVEL2_MIN:
if self.active_alert_level < 2:
self.active_alert_level = 2
self.set_system_state("CLOSED_5S")
self.alert_manager.trigger_level2()
# Record second timestamp
self.drowsiness_events.append(current_time)
with dashboard_state.lock:
dashboard_state.drowsiness_count += 1
self.add_chat_log("System", "CRITICAL ALARM: Eyes closed for 5+ seconds! WAKE UP!")
else:
# Eyes are open! Reset filters and check for Level 3 Advisory escalation
self.consec_closed_frames = 0
if self.eyes_closed_start_time is not None:
self.eyes_closed_start_time = None
while self.drowsiness_events and (current_time - self.drowsiness_events[0] > FREQUENT_DROWSY_WINDOW):
self.drowsiness_events.popleft()
# LEVEL 3: Frequent Drowsiness check (if closed events occur >= limit in last 60s)
if len(self.drowsiness_events) >= FREQUENT_DROWSY_LIMIT:
print(f"[CoreEngine] Frequent drowsiness detected ({len(self.drowsiness_events)} events in 60s). Escalating to Level 3.")
self.set_system_state("WAITING_REST_RESPONSE")
self.alert_manager.trigger_level3_advisory()
self.add_chat_log("System", "FREQUENT DROWSINESS DETECTED. Prompting driver to pull over.")
else:
# Normal recovery
current_state = self.get_system_state()
if current_state not in ["WAITING_REST_RESPONSE", "WAITING_SONG_RESPONSE"]:
self.set_system_state("NORMAL")
self.active_alert_level = 0
else:
self.consec_closed_frames = 0
self.eyes_closed_start_time = None
# Update Global Telemetry Buffer for Flask Server
with dashboard_state.lock:
dashboard_state.latest_frame = processed_frame.copy()
if ear is not None:
dashboard_state.ear = ear
else:
dashboard_state.ear = 0.30 # Default baseline when no face present
dashboard_state.fps = fps
# OpenCV display output fallback (wrapped safely to prevent headless display context drops)
try:
cv2.imshow("DriveSafe HUD AI Console", processed_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27:
print("[CoreEngine] Exit key received. Terminating system.")
break
elif key == ord('r'):
self.reset_warnings()
except Exception:
# Prevents crashes on platforms where standard desktop window pipelines are fully restricted
time.sleep(0.03)
except KeyboardInterrupt:
print("[CoreEngine] Keyboard interrupt. Shutting down.")
finally:
print("[CoreEngine] Releasing resources...")
cap.release()
try:
cv2.destroyAllWindows()
except Exception:
pass
self.assistant.stop()
sys.exit(0)
if __name__ == "__main__":
    # Script entry point: constructing the engine also creates the voice
    # assistant and starts the dashboard server; run() then enters the
    # camera loop.
    assistant_app = SafeDrivingAssistant()
    assistant_app.run()
|