Spaces:
Sleeping
Sleeping
| """ | |
| Sound Management System for EEG Motor Imagery Classification (Clean Transition Version) | |
| ------------------------------------------------------------------------------- | |
| Handles sound mapping, layering, and music composition based on motor imagery predictions. | |
| Supports seamless transition from building (layering) to DJ (effects) phase. | |
| """ | |
| import numpy as np | |
| import soundfile as sf | |
| import os | |
| from typing import Dict, Optional, List | |
| from pathlib import Path | |
class AudioEffectsProcessor:
    """Stateless collection of audio effects that operate on sample arrays.

    Every effect is a static method, so it can be called either on the class
    (``AudioEffectsProcessor.apply_echo(...)``) or on an instance. Effects
    return a new array and never modify the input. Samples are processed
    along the first axis of ``data``.
    """

    @staticmethod
    def _as_float(data: np.ndarray) -> np.ndarray:
        """Return ``data`` as a floating-point array.

        Floating inputs are returned as-is (dtype preserved); any other dtype
        is converted to ``float`` so that fractional gains are not truncated
        when they are written back into the working buffer.
        """
        samples = np.asarray(data)
        if np.issubdtype(samples.dtype, np.floating):
            return samples
        return samples.astype(float)

    @staticmethod
    def _butterworth(data: np.ndarray, samplerate: int, cutoff: float, btype: str) -> np.ndarray:
        """Apply a 4th-order zero-phase Butterworth filter along axis 0.

        ``cutoff`` is given in Hz and normalised by the Nyquist frequency
        (``samplerate / 2``). scipy is expected to reject a cutoff that is
        not strictly between 0 and the Nyquist frequency.
        """
        # Imported lazily so scipy is only required when a filter is used.
        from scipy import signal

        nyquist = samplerate / 2
        b, a = signal.butter(4, cutoff / nyquist, btype=btype, analog=False)
        # axis=0 keeps the filters consistent with reverb/echo, which also
        # treat the first axis as time.
        return signal.filtfilt(b, a, data, axis=0)

    @staticmethod
    def apply_high_pass_filter(data: np.ndarray, samplerate: int, cutoff: float = 800.0) -> np.ndarray:
        """Attenuate content below ``cutoff`` Hz.

        Args:
            data: Audio samples, time along the first axis.
            samplerate: Sampling rate of ``data`` in Hz.
            cutoff: Corner frequency in Hz.

        Returns:
            The filtered samples as a new array.
        """
        return AudioEffectsProcessor._butterworth(data, samplerate, cutoff, 'high')

    @staticmethod
    def apply_low_pass_filter(data: np.ndarray, samplerate: int, cutoff: float = 1200.0) -> np.ndarray:
        """Attenuate content above ``cutoff`` Hz.

        Args:
            data: Audio samples, time along the first axis.
            samplerate: Sampling rate of ``data`` in Hz.
            cutoff: Corner frequency in Hz.

        Returns:
            The filtered samples as a new array.
        """
        return AudioEffectsProcessor._butterworth(data, samplerate, cutoff, 'low')

    @staticmethod
    def apply_reverb(data: np.ndarray, samplerate: int, room_size: float = 0.5) -> np.ndarray:
        """Add three delayed, decaying copies of the signal and mix 70/30.

        The taps sit at multiples of 80 ms and the gain of tap ``k`` is
        ``(0.4 * room_size) ** k``.

        Args:
            data: Audio samples, time along the first axis.
            samplerate: Sampling rate of ``data`` in Hz.
            room_size: Scales the per-tap decay.

        Returns:
            ``0.7 * data + 0.3 * (data + taps)`` as a new array.
        """
        dry = AudioEffectsProcessor._as_float(data)
        base_delay = int(0.08 * samplerate)
        decay = 0.4 * room_size
        wet = dry.copy()
        for tap in range(1, 4):
            delay = base_delay * tap
            # A zero delay would make ``dry[:-delay]`` empty and break the
            # in-place add; a delay past the end contributes nothing.
            if 0 < delay < len(dry):
                wet[delay:] += dry[:-delay] * decay ** tap
        return 0.7 * dry + 0.3 * wet

    @staticmethod
    def apply_echo(data: np.ndarray, samplerate: int, delay_time: float = 0.3, feedback: float = 0.4) -> np.ndarray:
        """Apply a feedback echo and mix it 70/30 with the dry signal.

        The wet signal follows ``y[n] = x[n] + feedback * y[n - D]`` with
        ``D = int(delay_time * samplerate)``.

        Args:
            data: Audio samples, time along the first axis.
            samplerate: Sampling rate of ``data`` in Hz.
            delay_time: Echo spacing in seconds.
            feedback: Gain applied to each successive repeat.

        Returns:
            ``0.7 * data + 0.3 * wet`` as a new array. When the delay rounds
            to zero samples or less there is no echo to add and a copy of the
            input is returned.
        """
        dry = AudioEffectsProcessor._as_float(data)
        delay = int(delay_time * samplerate)
        if delay <= 0:
            return dry.copy()

        wet = dry.copy()
        total = len(wet)
        # Each block of ``delay`` samples depends only on the block before
        # it, so the recursion can be evaluated one block at a time instead
        # of one sample at a time.
        for start in range(delay, total, delay):
            stop = min(start + delay, total)
            wet[start:stop] += feedback * wet[start - delay:stop - delay]
        return 0.7 * dry + 0.3 * wet

    @staticmethod
    def apply_compressor(data: np.ndarray, samplerate: int, threshold: float = 0.2, ratio: float = 4.0) -> np.ndarray:
        """Reduce the level of samples whose magnitude exceeds ``threshold``.

        The part of a sample above the threshold is divided by ``ratio``;
        samples at or below the threshold pass through unchanged.

        Args:
            data: Audio samples.
            samplerate: Unused; accepted so every effect shares a signature.
            threshold: Magnitude above which gain reduction starts.
            ratio: Compression ratio applied above the threshold.

        Returns:
            The compressed samples as a new array.
        """
        samples = AudioEffectsProcessor._as_float(data)
        magnitude = np.abs(samples)
        squeezed = np.sign(samples) * (threshold + (magnitude - threshold) / ratio)
        return np.where(magnitude > threshold, squeezed, samples)

    @staticmethod
    def process_layer_with_effects(audio_data: np.ndarray, samplerate: int, movement: str, active_effects: Dict[str, bool]) -> np.ndarray:
        """Apply the effect bound to ``movement`` if it is currently active.

        Args:
            audio_data: Samples of the layer.
            samplerate: Sampling rate of ``audio_data`` in Hz.
            movement: Movement name used to look up the effect.
            active_effects: Mapping of movement name to on/off state.

        Returns:
            A processed copy of ``audio_data``; an unmodified copy when the
            movement has no effect or its effect is switched off.
        """
        effect_map = {
            "left_hand": AudioEffectsProcessor.apply_echo,  # Echo
            "right_hand": AudioEffectsProcessor.apply_low_pass_filter,  # Low Pass
            "left_leg": AudioEffectsProcessor.apply_compressor,  # Compressor
            "right_leg": AudioEffectsProcessor.apply_high_pass_filter,  # High Pass
        }
        processed_data = np.copy(audio_data)
        if not active_effects.get(movement, False):
            return processed_data
        effect_func = effect_map.get(movement)
        if effect_func is None:
            return processed_data
        return effect_func(processed_data, samplerate)
class SoundManager:
    """Track the layered composition built from motor-imagery predictions.

    A cycle starts in the "building" phase, where each movement adds its
    sound layer once. When every active movement has a layer the manager
    switches to the "dj_effects" phase, where movements switch effects on.
    """

    def __init__(self, sound_dir: str = "sounds"):
        """Initialise state and load the sounds found under ``sound_dir``.

        Args:
            sound_dir: Directory containing the stem ``.wav`` files.
        """
        self.available_sounds = [
            "SoundHelix-Song-4_bass.wav",
            "SoundHelix-Song-4_drums.wav",
            "SoundHelix-Song-4_instruments.wav",
            "SoundHelix-Song-4_vocals.wav"
        ]
        self.sound_dir = Path(sound_dir)
        self.current_cycle = 0
        self.current_step = 0
        self.cycle_complete = False
        self.completed_cycles = 0
        self.max_cycles = 2
        self.composition_layers = {}
        self.current_phase = "building"
        self.active_movements = ["left_hand", "right_hand", "left_leg", "right_leg"]
        self.active_effects = {m: False for m in self.active_movements}
        self.current_movement_sequence = []
        self.movements_completed = set()
        self.active_layers: Dict[str, str] = {}
        self.loaded_sounds = {}
        # Target most recently handed out by get_current_target_movement().
        self._announced_target: Optional[str] = None
        # Pending auto-off timers for brief DJ effects, keyed by movement.
        self._effect_timers = {}
        # Sets current_sound_mapping and loads the files once. The mapping is
        # deliberately not overwritten afterwards so that it always matches
        # what is stored in loaded_sounds.
        self._generate_new_sequence()
        # Track DJ effect trigger counts for each movement
        self.dj_effect_counters = {m: 0 for m in self.active_movements}
        self.cycle_stats = {'total_cycles': 0, 'successful_classifications': 0, 'total_attempts': 0}

    def _load_sound_files(self):
        """Read every mapped sound file that exists into ``loaded_sounds``.

        Multi-channel files are averaged down to a single channel. Files that
        do not exist are reported and skipped.
        """
        self.loaded_sounds = {}
        for movement, filename in self.current_sound_mapping.items():
            file_path = self.sound_dir / filename
            if not file_path.exists():
                print(f"DEBUG: Sound file not found for {movement}: {file_path}")
                continue
            data, sample_rate = sf.read(str(file_path))
            if data.ndim > 1:
                data = np.mean(data, axis=1)
            self.loaded_sounds[movement] = {'data': data, 'sample_rate': sample_rate, 'sound_file': str(file_path)}

    def _generate_new_sequence(self):
        """Reset the movement order, sound mapping and progress for a cycle."""
        # Fixed movement order and mapping
        self.current_movement_sequence = ["left_hand", "right_hand", "left_leg", "right_leg"]
        self.current_sound_mapping = {
            "left_hand": "SoundHelix-Song-4_instruments.wav",
            "right_hand": "SoundHelix-Song-4_bass.wav",
            "left_leg": "SoundHelix-Song-4_drums.wav",
            "right_leg": "SoundHelix-Song-4_vocals.wav"
        }
        print(f"DEBUG: Fixed sound mapping for this cycle: {self.current_sound_mapping}")
        self.movements_completed = set()
        self.current_step = 0
        self._announced_target = None
        self._load_sound_files()

    def get_current_target_movement(self) -> str:
        """Pick a random movement that has not been completed yet.

        The choice is remembered so that the next call to
        ``process_classification`` checks against the same target.

        Returns:
            The chosen movement name, or ``"cycle_complete"`` when every
            active movement is already completed.
        """
        import random

        incomplete = [m for m in self.active_movements if m not in self.movements_completed]
        if not incomplete:
            print("DEBUG: All movements completed, cycle complete.")
            self._announced_target = None
            return "cycle_complete"
        movement = random.choice(incomplete)
        self._announced_target = movement
        print(f"DEBUG: Next target is {movement}, completed: {self.movements_completed}")
        return movement

    def _target_for_classification(self) -> str:
        """Return the announced target if still pending, else draw a new one."""
        target = self._announced_target
        if target is not None and target not in self.movements_completed:
            return target
        return self.get_current_target_movement()

    def _add_layer(self, movement: str, confidence: float) -> None:
        """Store the loaded sound for ``movement`` as a composition layer."""
        sound_info = dict(self.loaded_sounds[movement])
        sound_info['confidence'] = confidence
        self.composition_layers[movement] = sound_info
        self.movements_completed.add(movement)

    def process_classification(self, predicted_class: str, confidence: float, threshold: float = 0.7, force_add: bool = False) -> Dict:
        """Add the predicted movement's layer when the prediction qualifies.

        A layer is added when ``confidence`` reaches ``threshold``, a sound
        is loaded for ``predicted_class`` and it has no layer yet. Unless
        ``force_add`` is set, the prediction must also equal the current
        target movement.

        Args:
            predicted_class: Movement name predicted by the classifier.
            confidence: Confidence of the prediction.
            threshold: Minimum confidence required to add a layer.
            force_add: Skip the target check when True.

        Returns:
            Dict with ``sound_added``, ``cycle_complete`` and ``audio_file``
            (always ``None``).
        """
        result = {'sound_added': False, 'cycle_complete': False, 'audio_file': None}
        addable = (
            confidence >= threshold and
            predicted_class in self.loaded_sounds and
            predicted_class not in self.composition_layers
        )
        # If force_add is True, allow adding sound for any valid movement not already completed
        if force_add:
            if addable:
                print(f"DEBUG: [FORCE] Adding sound for {predicted_class}")
                self._add_layer(predicted_class, confidence)
                result['sound_added'] = True
            else:
                print("DEBUG: [FORCE] Not adding sound. Condition failed.")
        else:
            # Compare against the target the caller was last given rather
            # than an unrelated fresh random draw.
            current_target = self._target_for_classification()
            print(f"DEBUG: process_classification: predicted={predicted_class}, target={current_target}, confidence={confidence}, completed={self.movements_completed}")
            if predicted_class == current_target and addable:
                print(f"DEBUG: Adding sound for {predicted_class} (target={current_target})")
                self._add_layer(predicted_class, confidence)
                result['sound_added'] = True
            else:
                print("DEBUG: Not adding sound. Condition failed.")
        if len(self.movements_completed) >= len(self.active_movements):
            result['cycle_complete'] = True
            self.cycle_complete = True
            self.current_phase = "dj_effects"
        return result

    def _cancel_effect_timer(self, movement: str) -> None:
        """Cancel the pending auto-off timer for ``movement``, if any."""
        stale = self._effect_timers.pop(movement, None)
        if stale is not None:
            stale.cancel()

    def start_new_cycle(self):
        """Advance to the next cycle and return to the building phase."""
        self.current_cycle += 1
        self.current_step = 0
        self.cycle_complete = False
        self.cycle_stats['total_cycles'] += 1
        self._generate_new_sequence()
        self.composition_layers = {}  # Clear layers for new cycle
        self.movements_completed = set()
        self.current_phase = "building"
        self.active_layers = {}
        # Effects cannot be switched in the building phase, so none may be
        # left on (or scheduled to change) from the previous DJ phase.
        for movement in list(self._effect_timers):
            self._cancel_effect_timer(movement)
        for movement in self.active_effects:
            self.active_effects[movement] = False

    def transition_to_dj_phase(self):
        """Switch to the DJ phase if every active movement has a layer.

        Returns:
            True when the phase was switched, False otherwise.
        """
        if len(self.composition_layers) >= len(self.active_movements):
            self.current_phase = "dj_effects"
            return True
        return False

    def toggle_dj_effect(self, movement: str, brief: bool = True, duration: float = 1.0) -> dict:
        """Switch the effect for ``movement`` on at every 4th detection.

        Args:
            movement: Movement whose effect should be triggered.
            brief: When True the effect is switched off again after
                ``duration`` seconds by a daemon timer.
            duration: Lifetime of a brief effect in seconds.

        Returns:
            Dict with ``effect_applied`` and either a ``message`` (nothing
            applied) or ``effect_name``, ``effect_status`` and ``count``.
        """
        import threading

        if self.current_phase != "dj_effects":
            return {"effect_applied": False, "message": "Not in DJ effects phase"}
        if movement not in self.active_effects:
            return {"effect_applied": False, "message": f"Unknown movement: {movement}"}
        # Only toggle effect every 4th time this movement is detected
        self.dj_effect_counters[movement] += 1
        if self.dj_effect_counters[movement] % 4 != 0:
            print(f"🎛️ {movement}: Skipped effect toggle (count={self.dj_effect_counters[movement]})")
            return {"effect_applied": False, "message": f"Effect for {movement} only toggled every 4th time (count={self.dj_effect_counters[movement]})"}
        # A timer left over from an earlier activation would otherwise cut
        # this activation short.
        self._cancel_effect_timer(movement)
        # Toggle effect ON
        self.active_effects[movement] = True
        effect_status = "ON"
        print(f"🎛️ {movement}: {effect_status} (brief={brief}) [count={self.dj_effect_counters[movement]}]")

        # Schedule effect OFF after duration if brief
        def turn_off_effect():
            self.active_effects[movement] = False
            print(f"🎛️ {movement}: OFF (auto)")

        if brief:
            timer = threading.Timer(duration, turn_off_effect)
            timer.daemon = True
            self._effect_timers[movement] = timer
            timer.start()
        return {"effect_applied": True, "effect_name": movement, "effect_status": effect_status, "count": self.dj_effect_counters[movement]}

    def get_composition_info(self) -> Dict:
        """Summarise the current layers and the on/off state of each effect.

        Returns:
            Dict with ``layers_by_cycle`` (all layers listed under key 0) and
            ``dj_effects_status``.
        """
        layers_by_cycle = {0: []}
        for movement, layer_info in self.composition_layers.items():
            confidence = layer_info.get('confidence', 0) if isinstance(layer_info, dict) else 0
            layers_by_cycle[0].append({'movement': movement, 'confidence': confidence})
        # Add DJ effect status for each movement
        dj_effects_status = {m: self.active_effects.get(m, False) for m in self.active_movements}
        return {'layers_by_cycle': layers_by_cycle, 'dj_effects_status': dj_effects_status}

    def get_sound_mapping_options(self) -> Dict:
        """Return the movements, available sounds and loaded file per movement."""
        return {
            'movements': self.active_movements,
            'available_sounds': self.available_sounds,
            'current_mapping': {m: self.loaded_sounds[m]['sound_file'] for m in self.loaded_sounds}
        }

    def get_all_layers(self):
        """Return a mapping of movement to sound file for every added layer."""
        return {m: info['sound_file'] for m, info in self.composition_layers.items() if 'sound_file' in info}