ltx2 / Wan2GP /shared /utils /notification_sound.py
vidfom's picture
Upload folder using huggingface_hub
31112ad verified
"""Notification sounds for Wan2GP video generation application.
Pure Python audio notification system with multiple backend support.
Set WAN2GP_DISABLE_AUDIO=1 on headless servers to skip pygame/sounddevice initialization.
Set WAN2GP_FORCE_AUDIO=1 to bypass the automatic device detection when audio hardware is available.
"""
import os
import sys
import threading
from pathlib import Path
import numpy as np
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
_cached_waveforms = {}
_sample_rate = 44100
_mixer_initialized = False
_mixer_lock = threading.Lock()
_TRUE_VALUES = {"1", "true", "yes", "on"}
def _env_flag(name):
value = os.environ.get(name, "")
return value.strip().lower() in _TRUE_VALUES
_FORCE_AUDIO_BACKENDS = _env_flag("WAN2GP_FORCE_AUDIO")
_DISABLE_AUDIO_BACKENDS = _env_flag("WAN2GP_DISABLE_AUDIO")
_audio_support_cache = None
_audio_support_reason = None
_audio_backends_failed = False
_last_beep_notice = None
def _linux_audio_available():
cards_path = Path("/proc/asound/cards")
if not cards_path.exists():
return False, "/proc/asound/cards is missing (no ALSA devices)"
try:
cards_content = cards_path.read_text(errors="ignore")
except (OSError, UnicodeDecodeError):
cards_content = ""
if not cards_content.strip():
return False, "ALSA reports no soundcards (/proc/asound/cards is empty)"
if "no soundcards" in cards_content.lower():
return False, "ALSA reports no soundcards (/proc/asound/cards)"
snd_path = Path("/dev/snd")
if not snd_path.exists():
return False, "/dev/snd is not available"
try:
device_names = [entry.name for entry in snd_path.iterdir()]
except PermissionError:
return False, "no permission to inspect /dev/snd"
pcm_like = [
name
for name in device_names
if name.startswith(("pcm", "controlC", "hwC", "midiC"))
]
if not pcm_like:
return False, "no ALSA pcm/control devices exposed under /dev/snd"
return True, None
def _detect_audio_support():
if _FORCE_AUDIO_BACKENDS:
return True, None
if _DISABLE_AUDIO_BACKENDS:
return False, "disabled via WAN2GP_DISABLE_AUDIO"
if sys.platform.startswith("linux"):
return _linux_audio_available()
return True, None
def _should_try_audio_backends():
global _audio_support_cache, _audio_support_reason
if _audio_backends_failed and not _FORCE_AUDIO_BACKENDS:
return False
if _audio_support_cache is None:
_audio_support_cache, _audio_support_reason = _detect_audio_support()
return _audio_support_cache
def _terminal_beep(reason=None):
global _last_beep_notice
message = "Audio notification fallback: using terminal beep."
if reason:
message = f"Audio notification fallback: {reason}; using terminal beep."
if message != _last_beep_notice:
print(message)
_last_beep_notice = message
sys.stdout.write("\a")
sys.stdout.flush()
def _generate_notification_beep(volume=50, sample_rate=_sample_rate):
"""Generate pleasant C major chord notification sound"""
if volume == 0:
return np.array([])
volume = max(0, min(100, volume))
# Volume curve mapping
if volume <= 25:
volume_mapped = (volume / 25.0) * 0.5
elif volume <= 50:
volume_mapped = 0.5 + ((volume - 25) / 25.0) * 0.25
elif volume <= 75:
volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25
else:
volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05
volume = volume_mapped
# C major chord frequencies
freq_c, freq_e, freq_g = 261.63, 329.63, 392.00
duration = 0.8
t = np.linspace(0, duration, int(sample_rate * duration), False)
# Generate chord components
wave = (
np.sin(freq_c * 2 * np.pi * t) * 0.4
+ np.sin(freq_e * 2 * np.pi * t) * 0.3
+ np.sin(freq_g * 2 * np.pi * t) * 0.2
)
# Normalize
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0:
wave = wave / max_amplitude * 0.8
# ADSR envelope
def apply_adsr_envelope(wave_data):
length = len(wave_data)
attack_time = int(0.2 * length)
decay_time = int(0.1 * length)
release_time = int(0.5 * length)
envelope = np.ones(length)
if attack_time > 0:
envelope[:attack_time] = np.power(np.linspace(0, 1, attack_time), 3)
if decay_time > 0:
start_idx, end_idx = attack_time, attack_time + decay_time
envelope[start_idx:end_idx] = np.linspace(1, 0.85, decay_time)
if release_time > 0:
start_idx = length - release_time
envelope[start_idx:] = 0.85 * np.exp(-4 * np.linspace(0, 1, release_time))
return wave_data * envelope
wave = apply_adsr_envelope(wave)
# Simple low-pass filter
def simple_lowpass_filter(signal, cutoff_ratio=0.8):
window_size = max(3, int(len(signal) * 0.001))
if window_size % 2 == 0:
window_size += 1
kernel = np.ones(window_size) / window_size
padded = np.pad(signal, window_size // 2, mode="edge")
filtered = np.convolve(padded, kernel, mode="same")
return filtered[window_size // 2 : -window_size // 2]
wave = simple_lowpass_filter(wave)
# Add reverb
if len(wave) > sample_rate // 4:
delay_samples = int(0.12 * sample_rate)
reverb = np.zeros_like(wave)
reverb[delay_samples:] = wave[:-delay_samples] * 0.08
wave = wave + reverb
# Apply volume & final normalize
wave = wave * volume * 0.5
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0.85:
wave = wave / max_amplitude * 0.85
return wave
def _get_cached_waveform(volume):
"""Return cached waveform for volume"""
if volume not in _cached_waveforms:
_cached_waveforms[volume] = _generate_notification_beep(volume)
return _cached_waveforms[volume]
def play_audio_with_pygame(audio_data, sample_rate=_sample_rate):
"""Play audio with pygame backend"""
global _mixer_initialized
try:
import pygame
with _mixer_lock:
if not _mixer_initialized:
pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=512)
pygame.mixer.init()
_mixer_initialized = True
mixer_info = pygame.mixer.get_init()
if mixer_info is None or mixer_info[2] != 2:
return False
audio_int16 = (audio_data * 32767).astype(np.int16)
if len(audio_int16.shape) > 1:
audio_int16 = audio_int16.flatten()
stereo_data = np.zeros((len(audio_int16), 2), dtype=np.int16)
stereo_data[:, 0] = audio_int16
stereo_data[:, 1] = audio_int16
sound = pygame.sndarray.make_sound(stereo_data)
pygame.mixer.stop()
sound.play()
duration_ms = int(len(audio_data) / sample_rate * 1000) + 50
pygame.time.wait(duration_ms)
return True
except ImportError:
return False
except Exception as e:
print(f"Pygame error: {e}")
return False
def play_audio_with_sounddevice(audio_data, sample_rate=_sample_rate):
"""Play audio using sounddevice backend"""
try:
import sounddevice as sd
sd.play(audio_data, sample_rate)
sd.wait()
return True
except ImportError:
return False
except Exception as e:
print(f"Sounddevice error: {e}")
return False
def play_audio_with_winsound(audio_data, sample_rate=_sample_rate):
"""Play audio using winsound backend (Windows only)"""
if sys.platform != "win32":
return False
try:
import winsound, wave, tempfile, uuid
temp_dir = tempfile.gettempdir()
temp_filename = os.path.join(temp_dir, f"notification_{uuid.uuid4().hex}.wav")
try:
with wave.open(temp_filename, "w") as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
audio_int16 = (audio_data * 32767).astype(np.int16)
wav_file.writeframes(audio_int16.tobytes())
winsound.PlaySound(temp_filename, winsound.SND_FILENAME)
finally:
try:
if os.path.exists(temp_filename):
os.unlink(temp_filename)
except:
pass
return True
except ImportError:
return False
except Exception as e:
print(f"Winsound error: {e}")
return False
def play_notification_sound(volume=50):
"""Play notification sound with specified volume"""
if volume == 0:
return
audio_data = _get_cached_waveform(volume)
if len(audio_data) == 0:
return
if not _should_try_audio_backends():
reason = _audio_support_reason or "audio backends unavailable"
_terminal_beep(reason)
return
audio_backends = [play_audio_with_pygame, play_audio_with_sounddevice, play_audio_with_winsound]
for backend in audio_backends:
try:
if backend(audio_data):
return
except Exception:
continue
global _audio_backends_failed
_audio_backends_failed = True
_terminal_beep("all audio backends failed")
def play_notification_async(volume=50):
"""Play notification sound asynchronously (non-blocking)"""
def play_sound():
try:
play_notification_sound(volume)
except Exception as e:
print(f"Error playing notification sound: {e}")
threading.Thread(target=play_sound, daemon=True).start()
def notify_video_completion(video_path=None, volume=50):
"""Notify about completed video generation"""
play_notification_async(volume)
for vol in (25, 50, 75, 100):
_get_cached_waveform(vol)