neuralworm's picture
Update cognitive_mapping_probe/signal_analysis.py
75e847d verified
import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks
from typing import Dict, List, Optional, Any, Tuple
def analyze_cognitive_signal(
state_deltas: np.ndarray,
sampling_rate: float = 1.0,
num_peaks: int = 3
) -> Dict[str, Any]:
"""
Führt eine polyrhythmische Spektralanalyse mit einer robusten,
zweistufigen Schwellenwert-Methode durch, um die Signifikanz von Frequenz-Peaks zu gewährleisten.
"""
analysis_results: Dict[str, Any] = {
"dominant_periods_steps": None,
"spectral_entropy": None,
}
# Eine robuste Analyse erfordert eine minimale Signallänge.
if len(state_deltas) < 20:
return analysis_results
n = len(state_deltas)
# Entferne den DC-Anteil (Mittelwert) für eine saubere Frequenzanalyse.
yf = rfft(state_deltas - np.mean(state_deltas))
xf = rfftfreq(n, 1 / sampling_rate)
power_spectrum = np.abs(yf)**2
# Berechnung der spektralen Entropie zur Messung der Signal-Komplexität.
spectral_entropy: Optional[float] = None
if np.sum(power_spectrum) > 1e-9:
prob_dist = power_spectrum / np.sum(power_spectrum)
# Ignoriere numerisch instabile Nullen.
prob_dist = prob_dist[prob_dist > 1e-12]
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
analysis_results["spectral_entropy"] = float(spectral_entropy)
# Robuste, zweistufige Schwellenwert-Bestimmung für die Peak-Detektion.
if len(power_spectrum) > 1:
# 1. Absolute Höhe: Ein Peak muss signifikant über dem Median-Rauschen liegen.
min_height = np.median(power_spectrum) + np.std(power_spectrum)
# 2. Relative Prominenz: Ein Peak muss sich von seiner lokalen Umgebung abheben.
min_prominence = np.std(power_spectrum) * 0.5
else:
min_height, min_prominence = 1.0, 1.0
# Finde Peaks im Spektrum (ignoriere die Frequenz 0).
peaks, properties = find_peaks(power_spectrum[1:], height=min_height, prominence=min_prominence)
if peaks.size > 0 and "peak_heights" in properties:
# Sortiere die gefundenen Peaks nach ihrer Höhe (Power).
sorted_peak_indices = peaks[np.argsort(properties["peak_heights"])[::-1]]
dominant_periods = []
for i in range(min(num_peaks, len(sorted_peak_indices))):
peak_index = sorted_peak_indices[i]
frequency = xf[peak_index + 1] # +1, da wir den DC-Offset ignoriert haben.
if frequency > 1e-9:
period = 1 / frequency
dominant_periods.append(round(period, 2))
if dominant_periods:
analysis_results["dominant_periods_steps"] = dominant_periods
return analysis_results
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Berechnet das Leistungsspektrum und gibt Frequenzen und Power als separate Arrays zurück,
die direkt für die Visualisierung verwendet werden können.
"""
if len(state_deltas) < 10:
return np.array([]), np.array([])
n = len(state_deltas)
yf = rfft(state_deltas - np.mean(state_deltas))
xf = rfftfreq(n, 1.0)
power_spectrum = np.abs(yf)**2
return xf, power_spectrum