"""Notification sounds for Wan2GP video generation application.

Pure Python audio notification system with multiple backend support.
Set WAN2GP_DISABLE_AUDIO=1 on headless servers to skip pygame/sounddevice initialization.
Set WAN2GP_FORCE_AUDIO=1 to bypass the automatic device detection when audio hardware is available.
"""
|
| |
|
| | import os
|
| | import sys
|
| | import threading
|
| | from pathlib import Path
|
| |
|
| | import numpy as np
|
| |
|
# Hide pygame's support prompt; pygame itself is only imported lazily, inside
# play_audio_with_pygame, so this is in place before that import happens.
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"

# Environment-variable spellings (compared lower-cased) that count as "on".
_TRUE_VALUES = {"1", "true", "yes", "on"}

# Default sample rate, in Hz, used for synthesis and by every backend.
_sample_rate = 44100

# Synthesised waveforms, keyed by the requested volume.
_cached_waveforms = {}

# pygame mixer bookkeeping: the lock guards one-time mixer initialisation.
_mixer_lock = threading.Lock()
_mixer_initialized = False
|
| |
|
| |
|
def _env_flag(name):
    """Return True when environment variable *name* is set to a truthy value.

    The value is stripped of surrounding whitespace and lower-cased before
    being looked up in ``_TRUE_VALUES``. An unset variable is treated as an
    empty string.
    """
    normalized = os.getenv(name, "").strip().lower()
    return normalized in _TRUE_VALUES
|
| |
|
| |
|
# Environment overrides, evaluated once when the module is imported.
_DISABLE_AUDIO_BACKENDS = _env_flag("WAN2GP_DISABLE_AUDIO")
_FORCE_AUDIO_BACKENDS = _env_flag("WAN2GP_FORCE_AUDIO")

# Memoised outcome of _detect_audio_support(): the verdict and, when audio is
# unavailable, the reason for it. None means "not probed yet".
_audio_support_cache = None
_audio_support_reason = None

# Set to True by play_notification_sound() once every backend has failed.
_audio_backends_failed = False

# Last fallback message printed by _terminal_beep(); used to avoid repeats.
_last_beep_notice = None
|
| |
|
| |
|
def _linux_audio_available():
    """Probe ALSA to decide whether this Linux host can play audio.

    The check never raises for filesystem problems: any ``OSError`` while
    reading ``/proc/asound/cards`` or listing ``/dev/snd`` is reported as
    "audio unavailable" so the caller can fall back to the terminal beep.

    Returns:
        A ``(available, reason)`` tuple. ``available`` is True when
        ``/proc/asound/cards`` lists a soundcard and ``/dev/snd`` exposes at
        least one pcm/control/hw/midi device node, in which case ``reason``
        is None. Otherwise ``available`` is False and ``reason`` is a short
        human-readable explanation.
    """
    cards_path = Path("/proc/asound/cards")
    if not cards_path.exists():
        return False, "/proc/asound/cards is missing (no ALSA devices)"
    try:
        cards_content = cards_path.read_text(errors="ignore")
    except (OSError, UnicodeDecodeError):
        # A failed read is not the same as an empty card list; say so.
        return False, "unable to read /proc/asound/cards"
    if not cards_content.strip():
        return False, "ALSA reports no soundcards (/proc/asound/cards is empty)"
    if "no soundcards" in cards_content.lower():
        return False, "ALSA reports no soundcards (/proc/asound/cards)"

    snd_path = Path("/dev/snd")
    if not snd_path.exists():
        return False, "/dev/snd is not available"
    try:
        device_names = [entry.name for entry in snd_path.iterdir()]
    except PermissionError:
        return False, "no permission to inspect /dev/snd"
    except OSError as exc:
        # e.g. /dev/snd is not a directory, or vanished after the exists() check.
        return False, f"unable to inspect /dev/snd ({exc.strerror or exc})"

    device_prefixes = ("pcm", "controlC", "hwC", "midiC")
    if not any(name.startswith(device_prefixes) for name in device_names):
        return False, "no ALSA pcm/control devices exposed under /dev/snd"
    return True, None
|
| |
|
| |
|
def _detect_audio_support():
    """Decide whether the real audio backends should be attempted.

    Returns:
        A ``(supported, reason)`` tuple. The force flag is checked first and
        wins over the disable flag; on Linux the decision is delegated to
        ``_linux_audio_available()``; every other platform is assumed to
        have audio. ``reason`` is None whenever ``supported`` is True.
    """
    if _FORCE_AUDIO_BACKENDS:
        verdict = (True, None)
    elif _DISABLE_AUDIO_BACKENDS:
        verdict = (False, "disabled via WAN2GP_DISABLE_AUDIO")
    elif sys.platform.startswith("linux"):
        verdict = _linux_audio_available()
    else:
        verdict = (True, None)
    return verdict
|
| |
|
| |
|
def _should_try_audio_backends():
    """Return whether a real audio backend is worth attempting.

    Returns False once every backend has already failed, unless audio is
    forced. Otherwise the result of ``_detect_audio_support()`` is computed
    on first use and stored in the module-level cache together with its
    reason.
    """
    global _audio_support_cache, _audio_support_reason

    if not _FORCE_AUDIO_BACKENDS and _audio_backends_failed:
        return False

    if _audio_support_cache is not None:
        return _audio_support_cache

    supported, why = _detect_audio_support()
    _audio_support_cache = supported
    _audio_support_reason = why
    return supported
|
| |
|
| |
|
def _terminal_beep(reason=None):
    """Ring the terminal bell as a last-resort notification.

    A one-line notice explaining the fallback is printed first, but only
    when it differs from the notice printed by the previous call.

    Args:
        reason: Optional text explaining why the fallback is being used.
    """
    global _last_beep_notice

    if reason:
        notice = f"Audio notification fallback: {reason}; using terminal beep."
    else:
        notice = "Audio notification fallback: using terminal beep."

    if notice != _last_beep_notice:
        print(notice)
        _last_beep_notice = notice

    # BEL character; flush so it is emitted right away.
    sys.stdout.write("\a")
    sys.stdout.flush()
|
| |
|
def _map_volume_to_gain(volume):
    """Map a 0-100 volume setting onto the piecewise-linear gain curve."""
    # Breakpoints: 0 -> 0.0, 25 -> 0.5, 50 -> 0.75, 75 -> 1.0, 100 -> 1.05.
    return float(
        np.interp(volume, (0, 25, 50, 75, 100), (0.0, 0.5, 0.75, 1.0, 1.05))
    )


def _apply_adsr_envelope(wave_data):
    """Multiply *wave_data* by an attack/decay/release amplitude envelope."""
    length = len(wave_data)
    attack_len = int(0.2 * length)
    decay_len = int(0.1 * length)
    release_len = int(0.5 * length)

    # NOTE(review): samples between the end of the decay and the start of the
    # release keep an envelope of 1.0 although the decay ends at 0.85, so the
    # level steps back up there. Kept as-is to preserve the existing sound.
    envelope = np.ones(length)

    if attack_len > 0:
        # Cubic ramp from silence to full level.
        envelope[:attack_len] = np.power(np.linspace(0, 1, attack_len), 3)

    if decay_len > 0:
        decay_end = attack_len + decay_len
        envelope[attack_len:decay_end] = np.linspace(1, 0.85, decay_len)

    if release_len > 0:
        # Exponential fade-out over the final half of the sound.
        fade = np.exp(-4 * np.linspace(0, 1, release_len))
        envelope[length - release_len:] = 0.85 * fade

    return wave_data * envelope


def _moving_average_lowpass(signal):
    """Smooth *signal* with an odd-width moving average of the same length."""
    window_size = max(3, int(len(signal) * 0.001))
    if window_size % 2 == 0:
        window_size += 1
    half = window_size // 2

    kernel = np.ones(window_size) / window_size
    padded = np.pad(signal, half, mode="edge")
    # "valid" yields len(padded) - window_size + 1 == len(signal) samples.
    # The previous slice [half : -window_size // 2] parsed the stop index as
    # (-window_size) // 2 and therefore dropped the last sample.
    return np.convolve(padded, kernel, mode="valid")


def _generate_notification_beep(volume=50, sample_rate=_sample_rate):
    """Generate pleasant C major chord notification sound.

    The chord (C4, E4, G4) is synthesised for 0.8 seconds, normalised, shaped
    with an ADSR-style envelope, smoothed, given a faint single-tap echo and
    finally scaled by the volume-dependent gain.

    Args:
        volume: Loudness setting; clamped to the 0-100 range.
        sample_rate: Sample rate in Hz.

    Returns:
        A 1-D float ``numpy`` array of ``int(sample_rate * 0.8)`` samples, or
        an empty array when the clamped volume is 0 or no samples fit.
    """
    # Clamp first so negative volumes are treated as silence too.
    volume = max(0, min(100, volume))
    if volume == 0:
        return np.array([])
    gain = _map_volume_to_gain(volume)

    duration = 0.8
    num_samples = int(sample_rate * duration)
    if num_samples <= 0:
        return np.array([])
    t = np.linspace(0, duration, num_samples, False)

    # (frequency in Hz, mixing weight) for C4, E4 and G4.
    chord = ((261.63, 0.4), (329.63, 0.3), (392.00, 0.2))
    wave = sum(np.sin(freq * 2 * np.pi * t) * weight for freq, weight in chord)

    peak = np.max(np.abs(wave))
    if peak > 0:
        wave = wave / peak * 0.8

    wave = _apply_adsr_envelope(wave)
    wave = _moving_average_lowpass(wave)

    # Faint echo 120 ms later; skipped for very short buffers and when the
    # delay would be zero samples (wave[:-0] would be empty).
    delay_samples = int(0.12 * sample_rate)
    if len(wave) > sample_rate // 4 and 0 < delay_samples < len(wave):
        reverb = np.zeros_like(wave)
        reverb[delay_samples:] = wave[:-delay_samples] * 0.08
        wave = wave + reverb

    wave = wave * gain * 0.5
    peak = np.max(np.abs(wave))
    if peak > 0.85:
        # Safety limiter: keep headroom below full scale.
        wave = wave / peak * 0.85

    return wave
|
| |
|
def _get_cached_waveform(volume):
    """Return the waveform for *volume*, synthesising and caching it on a miss."""
    try:
        return _cached_waveforms[volume]
    except KeyError:
        waveform = _generate_notification_beep(volume)
        _cached_waveforms[volume] = waveform
        return waveform
|
| |
|
| |
|
def play_audio_with_pygame(audio_data, sample_rate=_sample_rate):
    """Play audio with pygame backend.

    The mixer is initialised once (stereo, 16-bit) and is then used while
    holding ``_mixer_lock``, so concurrent notifications are played one
    after another instead of cutting each other off mid-call.

    Args:
        audio_data: Float samples, nominally in [-1.0, 1.0]. Multi-dimensional
            input is flattened; out-of-range values are clipped.
        sample_rate: Sample rate in Hz used for mixer setup and timing.

    Returns:
        True when the sound was played; False when pygame is missing, the
        mixer is not available in stereo, or playback raised an error.
    """
    global _mixer_initialized
    try:
        import pygame

        with _mixer_lock:
            if not _mixer_initialized:
                pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=512)
                pygame.mixer.init()
                _mixer_initialized = True

            mixer_info = pygame.mixer.get_init()
            # The stereo buffer built below only fits a two-channel mixer.
            if mixer_info is None or mixer_info[2] != 2:
                return False

            # Clip before the cast: values outside [-1, 1] would otherwise
            # overflow the int16 range and wrap around.
            samples = np.clip(np.asarray(audio_data, dtype=np.float64).ravel(), -1.0, 1.0)
            audio_int16 = (samples * 32767).astype(np.int16)

            # Duplicate the mono signal into left and right channels.
            stereo_data = np.ascontiguousarray(np.column_stack((audio_int16, audio_int16)))

            sound = pygame.sndarray.make_sound(stereo_data)
            pygame.mixer.stop()
            sound.play()

            # Block until playback should be finished (plus a small margin);
            # timed from the flattened sample count, not the row count.
            duration_ms = int(len(audio_int16) / sample_rate * 1000) + 50
            pygame.time.wait(duration_ms)

        return True

    except ImportError:
        return False
    except Exception as e:
        print(f"Pygame error: {e}")
        return False
|
| |
|
def play_audio_with_sounddevice(audio_data, sample_rate=_sample_rate):
    """Play audio using sounddevice backend.

    Returns:
        True after playback has been started and waited on; False when
        sounddevice cannot be imported or playback raises (the error is
        printed in the latter case).
    """
    played = False
    try:
        import sounddevice

        sounddevice.play(audio_data, sample_rate)
        sounddevice.wait()
        played = True
    except ImportError:
        pass
    except Exception as exc:
        print(f"Sounddevice error: {exc}")
    return played
|
| |
|
def play_audio_with_winsound(audio_data, sample_rate=_sample_rate):
    """Play audio using winsound backend (Windows only).

    The samples are written to a uniquely named mono 16-bit WAV file in the
    temp directory, played with ``winsound.PlaySound`` and the file is
    removed afterwards.

    Args:
        audio_data: Float samples, nominally in [-1.0, 1.0]. Multi-dimensional
            input is flattened; out-of-range values are clipped.
        sample_rate: Sample rate in Hz written to the WAV header.

    Returns:
        True when the sound was played; False on non-Windows platforms, when
        a required module is missing, or when writing/playing failed.
    """
    if sys.platform != "win32":
        return False
    try:
        import tempfile
        import uuid
        import wave
        import winsound

        # Clip before the cast: values outside [-1, 1] would otherwise
        # overflow the int16 range and wrap around.
        samples = np.clip(np.asarray(audio_data, dtype=np.float64).ravel(), -1.0, 1.0)
        audio_int16 = (samples * 32767).astype(np.int16)

        temp_filename = os.path.join(
            tempfile.gettempdir(), f"notification_{uuid.uuid4().hex}.wav"
        )

        try:
            with wave.open(temp_filename, "w") as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_int16.tobytes())

            winsound.PlaySound(temp_filename, winsound.SND_FILENAME)

        finally:
            # Best-effort cleanup: a missing or still-locked temp file must
            # not mask the playback result, but only filesystem errors are
            # ignored.
            try:
                os.unlink(temp_filename)
            except OSError:
                pass

        return True
    except ImportError:
        return False
    except Exception as e:
        print(f"Winsound error: {e}")
        return False
|
| |
|
def play_notification_sound(volume=50):
    """Play notification sound with specified volume.

    Does nothing for volume 0 or an empty waveform. When the audio backends
    are not to be tried, or when all of them fail, the terminal bell is used
    instead; a total failure is remembered in ``_audio_backends_failed``.
    """
    global _audio_backends_failed

    if volume == 0:
        return

    audio_data = _get_cached_waveform(volume)
    if len(audio_data) == 0:
        return

    if not _should_try_audio_backends():
        _terminal_beep(_audio_support_reason or "audio backends unavailable")
        return

    # Backends in order of preference; the first one reporting success wins.
    for backend in (
        play_audio_with_pygame,
        play_audio_with_sounddevice,
        play_audio_with_winsound,
    ):
        try:
            played = backend(audio_data)
        except Exception:
            continue
        if played:
            return

    _audio_backends_failed = True
    _terminal_beep("all audio backends failed")
|
| |
|
def play_notification_async(volume=50):
    """Play notification sound asynchronously (non-blocking).

    Playback runs on a daemon thread; any exception raised by it is printed
    rather than propagated.
    """

    def play_sound():
        try:
            play_notification_sound(volume)
        except Exception as exc:
            print(f"Error playing notification sound: {exc}")

    worker = threading.Thread(target=play_sound, daemon=True)
    worker.start()
|
| |
|
def notify_video_completion(video_path=None, volume=50):
    """Notify about completed video generation.

    Args:
        video_path: Accepted but not used by this function.
        volume: Volume passed on to ``play_notification_async``.
    """
    play_notification_async(volume=volume)
|
| |
|
# Pre-compute the waveforms for volumes 25, 50, 75 and 100 at import time.
for vol in range(25, 101, 25):
    _get_cached_waveform(vol)