| import numpy as np |
| from scipy.fft import rfft, rfftfreq |
| from scipy.signal import find_peaks |
| from typing import Dict, List, Optional, Any, Tuple |
|
|
| def analyze_cognitive_signal( |
| state_deltas: np.ndarray, |
| sampling_rate: float = 1.0, |
| num_peaks: int = 3 |
| ) -> Dict[str, Any]: |
| """ |
| Führt eine polyrhythmische Spektralanalyse mit einer robusten, |
| zweistufigen Schwellenwert-Methode durch, um die Signifikanz von Frequenz-Peaks zu gewährleisten. |
| """ |
| analysis_results: Dict[str, Any] = { |
| "dominant_periods_steps": None, |
| "spectral_entropy": None, |
| } |
|
|
| |
| if len(state_deltas) < 20: |
| return analysis_results |
|
|
| n = len(state_deltas) |
| |
| yf = rfft(state_deltas - np.mean(state_deltas)) |
| xf = rfftfreq(n, 1 / sampling_rate) |
|
|
| power_spectrum = np.abs(yf)**2 |
|
|
| |
| spectral_entropy: Optional[float] = None |
| if np.sum(power_spectrum) > 1e-9: |
| prob_dist = power_spectrum / np.sum(power_spectrum) |
| |
| prob_dist = prob_dist[prob_dist > 1e-12] |
| spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist)) |
| analysis_results["spectral_entropy"] = float(spectral_entropy) |
|
|
| |
| if len(power_spectrum) > 1: |
| |
| min_height = np.median(power_spectrum) + np.std(power_spectrum) |
| |
| min_prominence = np.std(power_spectrum) * 0.5 |
| else: |
| min_height, min_prominence = 1.0, 1.0 |
|
|
| |
| peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence) |
|
|
| if peaks.size > 0 and "peak_heights" in properties: |
| |
| sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]] |
|
|
| dominant_periods = [] |
| for i in range(min(num_peaks, len(sorted_peak_indices))): |
| peak_index = sorted_peak_indices[i] |
| frequency = xf[peak_index + 1] |
| if frequency > 1e-9: |
| period = 1 / frequency |
| dominant_periods.append(round(period, 2)) |
|
|
| if dominant_periods: |
| analysis_results["dominant_periods_steps"] = dominant_periods |
|
|
| return analysis_results |
|
|
| def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
| """ |
| Berechnet das Leistungsspektrum und gibt Frequenzen und Power als separate Arrays zurück, |
| die direkt für die Visualisierung verwendet werden können. |
| """ |
| if len(state_deltas) < 10: |
| return np.array([]), np.array([]) |
|
|
| n = len(state_deltas) |
| yf = rfft(state_deltas - np.mean(state_deltas)) |
| xf = rfftfreq(n, 1.0) |
|
|
| power_spectrum = np.abs(yf)**2 |
| return xf, power_spectrum |