Spaces:
Sleeping
Sleeping
| """ | |
| Module: ml.spectral_analyzer | |
| Description: Spectral analysis using Fourier transforms for government transparency data | |
| Author: Anderson H. Silva | |
| Date: 2025-07-19 | |
| License: Proprietary - All rights reserved | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Optional, Tuple, Any | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| from scipy.fft import fft, fftfreq, ifft, rfft, rfftfreq | |
| from scipy.signal import find_peaks, welch, periodogram, spectrogram | |
| from scipy.stats import zscore | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| from src.core import get_logger | |
| logger = get_logger(__name__) | |
| class SpectralFeatures: | |
| """Spectral characteristics of a time series.""" | |
| dominant_frequencies: List[float] | |
| dominant_periods: List[float] | |
| spectral_entropy: float | |
| power_spectrum: np.ndarray | |
| frequencies: np.ndarray | |
| peak_frequencies: List[float] | |
| seasonal_components: Dict[str, float] | |
| anomaly_score: float | |
| trend_component: np.ndarray | |
| residual_component: np.ndarray | |
| class SpectralAnomaly: | |
| """Spectral anomaly detection result.""" | |
| timestamp: datetime | |
| anomaly_type: str | |
| severity: str # "low", "medium", "high", "critical" | |
| frequency_band: Tuple[float, float] | |
| anomaly_score: float | |
| description: str | |
| evidence: Dict[str, Any] | |
| recommendations: List[str] | |
| class PeriodicPattern: | |
| """Detected periodic pattern in spending data.""" | |
| period_days: float | |
| frequency_hz: float | |
| amplitude: float | |
| confidence: float | |
| pattern_type: str # "seasonal", "cyclical", "irregular", "suspicious" | |
| business_interpretation: str | |
| statistical_significance: float | |
| class SpectralAnalyzer: | |
| """ | |
| Advanced spectral analysis for government transparency data using Fourier transforms. | |
| Capabilities: | |
| - Seasonal pattern detection in public spending | |
| - Cyclical anomaly identification | |
| - Frequency-domain correlation analysis | |
| - Spectral anomaly detection | |
| - Periodic pattern classification | |
| - Cross-spectral analysis between entities | |
| """ | |
| def __init__( | |
| self, | |
| sampling_frequency: float = 1.0, # Daily sampling by default | |
| anomaly_threshold: float = 2.5, # Z-score threshold for anomalies | |
| min_period_days: int = 7, # Minimum period for pattern detection | |
| max_period_days: int = 365, # Maximum period for pattern detection | |
| ): | |
| """ | |
| Initialize the Spectral Analyzer. | |
| Args: | |
| sampling_frequency: Sampling frequency in Hz (1.0 = daily) | |
| anomaly_threshold: Z-score threshold for anomaly detection | |
| min_period_days: Minimum period in days for pattern detection | |
| max_period_days: Maximum period in days for pattern detection | |
| """ | |
| self.fs = sampling_frequency | |
| self.anomaly_threshold = anomaly_threshold | |
| self.min_period = min_period_days | |
| self.max_period = max_period_days | |
| self.logger = logger | |
| # Pre-computed frequency bands for Brazilian government patterns | |
| self.frequency_bands = { | |
| "daily": (1/1, 1/3), # 1-3 day cycles | |
| "weekly": (1/7, 1/10), # Weekly patterns | |
| "biweekly": (1/14, 1/21), # Bi-weekly patterns | |
| "monthly": (1/30, 1/45), # Monthly cycles | |
| "quarterly": (1/90, 1/120), # Quarterly patterns | |
| "semester": (1/180, 1/200), # Semester patterns | |
| "annual": (1/365, 1/400), # Annual cycles | |
| "suspicious": (1/2, 1/5) # Very high frequency (potentially manipulated) | |
| } | |
| def analyze_time_series( | |
| self, | |
| data: pd.Series, | |
| timestamps: Optional[pd.DatetimeIndex] = None | |
| ) -> SpectralFeatures: | |
| """ | |
| Perform comprehensive spectral analysis of a time series. | |
| Args: | |
| data: Time series data (spending amounts, contract counts, etc.) | |
| timestamps: Optional datetime index | |
| Returns: | |
| SpectralFeatures object with complete spectral characteristics | |
| """ | |
| try: | |
| # Prepare data | |
| if timestamps is None: | |
| timestamps = pd.date_range(start='2020-01-01', periods=len(data), freq='D') | |
| # Ensure data is numeric and handle missing values | |
| data_clean = self._preprocess_data(data) | |
| # Compute FFT | |
| fft_values = rfft(data_clean) | |
| frequencies = rfftfreq(len(data_clean), d=1/self.fs) | |
| # Power spectrum | |
| power_spectrum = np.abs(fft_values) ** 2 | |
| # Find dominant frequencies | |
| dominant_freqs, dominant_periods = self._find_dominant_frequencies( | |
| frequencies, power_spectrum | |
| ) | |
| # Calculate spectral entropy | |
| spectral_entropy = self._calculate_spectral_entropy(power_spectrum) | |
| # Find peaks in spectrum | |
| peak_frequencies = self._find_peak_frequencies(frequencies, power_spectrum) | |
| # Detect seasonal components | |
| seasonal_components = self._detect_seasonal_components( | |
| frequencies, power_spectrum | |
| ) | |
| # Decompose signal | |
| trend, residual = self._decompose_signal(data_clean) | |
| # Calculate anomaly score | |
| anomaly_score = self._calculate_spectral_anomaly_score( | |
| power_spectrum, frequencies | |
| ) | |
| return SpectralFeatures( | |
| dominant_frequencies=dominant_freqs, | |
| dominant_periods=dominant_periods, | |
| spectral_entropy=spectral_entropy, | |
| power_spectrum=power_spectrum, | |
| frequencies=frequencies, | |
| peak_frequencies=peak_frequencies, | |
| seasonal_components=seasonal_components, | |
| anomaly_score=anomaly_score, | |
| trend_component=trend, | |
| residual_component=residual | |
| ) | |
| except Exception as e: | |
| self.logger.error(f"Error in spectral analysis: {str(e)}") | |
| raise | |
| def detect_anomalies( | |
| self, | |
| data: pd.Series, | |
| timestamps: pd.DatetimeIndex, | |
| context: Optional[Dict[str, Any]] = None | |
| ) -> List[SpectralAnomaly]: | |
| """ | |
| Detect anomalies using spectral analysis techniques. | |
| Args: | |
| data: Time series data | |
| timestamps: Datetime index | |
| context: Additional context (entity name, spending category, etc.) | |
| Returns: | |
| List of detected spectral anomalies | |
| """ | |
| anomalies = [] | |
| try: | |
| # Get spectral features | |
| features = self.analyze_time_series(data, timestamps) | |
| # Anomaly 1: Unusual frequency peaks | |
| freq_anomalies = self._detect_frequency_anomalies(features) | |
| anomalies.extend(freq_anomalies) | |
| # Anomaly 2: Sudden spectral changes | |
| spectral_change_anomalies = self._detect_spectral_changes(data, timestamps) | |
| anomalies.extend(spectral_change_anomalies) | |
| # Anomaly 3: Suspicious periodic patterns | |
| suspicious_patterns = self._detect_suspicious_patterns(features, context) | |
| anomalies.extend(suspicious_patterns) | |
| # Anomaly 4: High-frequency noise (potential manipulation) | |
| noise_anomalies = self._detect_high_frequency_noise(features) | |
| anomalies.extend(noise_anomalies) | |
| # Sort by severity and timestamp | |
| anomalies.sort(key=lambda x: ( | |
| {"critical": 4, "high": 3, "medium": 2, "low": 1}[x.severity], | |
| x.timestamp | |
| ), reverse=True) | |
| return anomalies | |
| except Exception as e: | |
| self.logger.error(f"Error detecting spectral anomalies: {str(e)}") | |
| return [] | |
| def find_periodic_patterns( | |
| self, | |
| data: pd.Series, | |
| timestamps: pd.DatetimeIndex, | |
| entity_name: Optional[str] = None | |
| ) -> List[PeriodicPattern]: | |
| """ | |
| Find and classify periodic patterns in spending data. | |
| Args: | |
| data: Time series data | |
| timestamps: Datetime index | |
| entity_name: Name of the entity being analyzed | |
| Returns: | |
| List of detected periodic patterns | |
| """ | |
| patterns = [] | |
| try: | |
| features = self.analyze_time_series(data, timestamps) | |
| # Analyze each frequency band | |
| for band_name, (min_freq, max_freq) in self.frequency_bands.items(): | |
| pattern = self._analyze_frequency_band( | |
| features, band_name, min_freq, max_freq, entity_name | |
| ) | |
| if pattern: | |
| patterns.append(pattern) | |
| # Sort by amplitude (strongest patterns first) | |
| patterns.sort(key=lambda x: x.amplitude, reverse=True) | |
| return patterns | |
| except Exception as e: | |
| self.logger.error(f"Error finding periodic patterns: {str(e)}") | |
| return [] | |
| def cross_spectral_analysis( | |
| self, | |
| data1: pd.Series, | |
| data2: pd.Series, | |
| entity1_name: str, | |
| entity2_name: str, | |
| timestamps: Optional[pd.DatetimeIndex] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Perform cross-spectral analysis between two entities. | |
| Args: | |
| data1: First time series | |
| data2: Second time series | |
| entity1_name: Name of first entity | |
| entity2_name: Name of second entity | |
| timestamps: Datetime index | |
| Returns: | |
| Cross-spectral analysis results | |
| """ | |
| try: | |
| # Ensure same length | |
| min_len = min(len(data1), len(data2)) | |
| data1_clean = self._preprocess_data(data1[:min_len]) | |
| data2_clean = self._preprocess_data(data2[:min_len]) | |
| # Cross-power spectrum | |
| fft1 = rfft(data1_clean) | |
| fft2 = rfft(data2_clean) | |
| cross_spectrum = fft1 * np.conj(fft2) | |
| frequencies = rfftfreq(min_len, d=1/self.fs) | |
| # Coherence | |
| coherence = np.abs(cross_spectrum) ** 2 / ( | |
| (np.abs(fft1) ** 2) * (np.abs(fft2) ** 2) | |
| ) | |
| # Phase difference | |
| phase_diff = np.angle(cross_spectrum) | |
| # Find highly correlated frequency bands | |
| high_coherence_indices = np.where(coherence > 0.7)[0] | |
| correlated_frequencies = frequencies[high_coherence_indices] | |
| correlated_periods = 1 / correlated_frequencies[correlated_frequencies > 0] | |
| # Statistical significance | |
| correlation_coeff = np.corrcoef(data1_clean, data2_clean)[0, 1] | |
| return { | |
| "entities": [entity1_name, entity2_name], | |
| "correlation_coefficient": correlation_coeff, | |
| "coherence_spectrum": coherence, | |
| "phase_spectrum": phase_diff, | |
| "frequencies": frequencies, | |
| "correlated_frequencies": correlated_frequencies.tolist(), | |
| "correlated_periods_days": correlated_periods.tolist(), | |
| "max_coherence": np.max(coherence), | |
| "mean_coherence": np.mean(coherence), | |
| "synchronization_score": self._calculate_synchronization_score(coherence), | |
| "business_interpretation": self._interpret_cross_spectral_results( | |
| correlation_coeff, coherence, correlated_periods, | |
| entity1_name, entity2_name | |
| ) | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Error in cross-spectral analysis: {str(e)}") | |
| return {} | |
| def _preprocess_data(self, data: pd.Series) -> np.ndarray: | |
| """Preprocess time series data for spectral analysis.""" | |
| # Convert to numeric and handle missing values | |
| data_numeric = pd.to_numeric(data, errors='coerce') | |
| # Fill missing values with interpolation | |
| data_filled = data_numeric.interpolate(method='linear') | |
| # Fill remaining NaN values with median | |
| data_filled = data_filled.fillna(data_filled.median()) | |
| # Remove trend (detrending) | |
| data_detrended = data_filled - data_filled.rolling(window=30, center=True).mean().fillna(data_filled.mean()) | |
| # Apply window function to reduce spectral leakage | |
| window = np.hanning(len(data_detrended)) | |
| data_windowed = data_detrended * window | |
| return data_windowed.values | |
| def _find_dominant_frequencies( | |
| self, | |
| frequencies: np.ndarray, | |
| power_spectrum: np.ndarray | |
| ) -> Tuple[List[float], List[float]]: | |
| """Find dominant frequencies in the power spectrum.""" | |
| # Find peaks in power spectrum | |
| peaks, properties = find_peaks( | |
| power_spectrum, | |
| height=np.mean(power_spectrum) + 2*np.std(power_spectrum), | |
| distance=5 | |
| ) | |
| # Get frequencies and periods for peaks | |
| dominant_freqs = frequencies[peaks].tolist() | |
| dominant_periods = [1/f if f > 0 else np.inf for f in dominant_freqs] | |
| # Sort by power (strongest first) | |
| peak_powers = power_spectrum[peaks] | |
| sorted_indices = np.argsort(peak_powers)[::-1] | |
| dominant_freqs = [dominant_freqs[i] for i in sorted_indices] | |
| dominant_periods = [dominant_periods[i] for i in sorted_indices] | |
| return dominant_freqs[:10], dominant_periods[:10] # Top 10 | |
| def _calculate_spectral_entropy(self, power_spectrum: np.ndarray) -> float: | |
| """Calculate spectral entropy as a measure of spectral complexity.""" | |
| # Normalize power spectrum | |
| normalized_spectrum = power_spectrum / np.sum(power_spectrum) | |
| # Avoid log(0) | |
| normalized_spectrum = normalized_spectrum[normalized_spectrum > 0] | |
| # Calculate entropy | |
| entropy = -np.sum(normalized_spectrum * np.log2(normalized_spectrum)) | |
| # Normalize by maximum possible entropy | |
| max_entropy = np.log2(len(normalized_spectrum)) | |
| return entropy / max_entropy if max_entropy > 0 else 0 | |
| def _find_peak_frequencies( | |
| self, | |
| frequencies: np.ndarray, | |
| power_spectrum: np.ndarray | |
| ) -> List[float]: | |
| """Find significant peak frequencies.""" | |
| # Use adaptive threshold | |
| threshold = np.mean(power_spectrum) + np.std(power_spectrum) | |
| peaks, _ = find_peaks(power_spectrum, height=threshold) | |
| peak_frequencies = frequencies[peaks] | |
| # Filter by relevant frequency range | |
| relevant_peaks = peak_frequencies[ | |
| (peak_frequencies >= 1/self.max_period) & | |
| (peak_frequencies <= 1/self.min_period) | |
| ] | |
| return relevant_peaks.tolist() | |
| def _detect_seasonal_components( | |
| self, | |
| frequencies: np.ndarray, | |
| power_spectrum: np.ndarray | |
| ) -> Dict[str, float]: | |
| """Detect seasonal components in the spectrum.""" | |
| seasonal_components = {} | |
| # Define seasonal frequencies (cycles per day) | |
| seasonal_freqs = { | |
| "weekly": 1/7, | |
| "monthly": 1/30, | |
| "quarterly": 1/91, | |
| "biannual": 1/182, | |
| "annual": 1/365 | |
| } | |
| for component, target_freq in seasonal_freqs.items(): | |
| # Find closest frequency in spectrum | |
| freq_idx = np.argmin(np.abs(frequencies - target_freq)) | |
| if freq_idx < len(power_spectrum): | |
| # Calculate relative power in this component | |
| window_size = max(1, len(frequencies) // 50) | |
| start_idx = max(0, freq_idx - window_size//2) | |
| end_idx = min(len(power_spectrum), freq_idx + window_size//2) | |
| component_power = np.mean(power_spectrum[start_idx:end_idx]) | |
| total_power = np.mean(power_spectrum) | |
| seasonal_components[component] = component_power / total_power if total_power > 0 else 0 | |
| return seasonal_components | |
| def _decompose_signal(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
| """Decompose signal into trend and residual components.""" | |
| # Simple trend extraction using moving average | |
| window_size = min(30, len(data) // 4) | |
| trend = np.convolve(data, np.ones(window_size)/window_size, mode='same') | |
| # Residual after removing trend | |
| residual = data - trend | |
| return trend, residual | |
| def _calculate_spectral_anomaly_score( | |
| self, | |
| power_spectrum: np.ndarray, | |
| frequencies: np.ndarray | |
| ) -> float: | |
| """Calculate overall anomaly score based on spectral characteristics.""" | |
| # Factor 1: Spectral entropy (lower entropy = more anomalous) | |
| entropy = self._calculate_spectral_entropy(power_spectrum) | |
| entropy_score = 1 - entropy # Invert so higher = more anomalous | |
| # Factor 2: High-frequency content | |
| high_freq_mask = frequencies > 1/self.min_period | |
| high_freq_power = np.sum(power_spectrum[high_freq_mask]) | |
| total_power = np.sum(power_spectrum) | |
| high_freq_ratio = high_freq_power / total_power if total_power > 0 else 0 | |
| # Factor 3: Peak concentration | |
| peak_indices, _ = find_peaks(power_spectrum) | |
| if len(peak_indices) > 0: | |
| peak_concentration = np.sum(power_spectrum[peak_indices]) / total_power | |
| else: | |
| peak_concentration = 0 | |
| # Combine factors | |
| anomaly_score = ( | |
| 0.4 * entropy_score + | |
| 0.3 * high_freq_ratio + | |
| 0.3 * peak_concentration | |
| ) | |
| return min(anomaly_score, 1.0) | |
| def _detect_frequency_anomalies(self, features: SpectralFeatures) -> List[SpectralAnomaly]: | |
| """Detect anomalies in frequency domain.""" | |
| anomalies = [] | |
| # Check for unusual dominant frequencies | |
| for freq in features.dominant_frequencies: | |
| if freq > 0: | |
| period_days = 1 / freq | |
| # Very short periods might indicate manipulation | |
| if period_days < 3: | |
| anomalies.append(SpectralAnomaly( | |
| timestamp=datetime.now(), | |
| anomaly_type="high_frequency_pattern", | |
| severity="high", | |
| frequency_band=(freq * 0.9, freq * 1.1), | |
| anomaly_score=0.8, | |
| description=f"Suspicious high-frequency pattern detected (period: {period_days:.1f} days)", | |
| evidence={"frequency_hz": freq, "period_days": period_days}, | |
| recommendations=[ | |
| "Investigate potential data manipulation", | |
| "Check for automated/systematic processes", | |
| "Verify data source integrity" | |
| ] | |
| )) | |
| return anomalies | |
| def _detect_spectral_changes( | |
| self, | |
| data: pd.Series, | |
| timestamps: pd.DatetimeIndex | |
| ) -> List[SpectralAnomaly]: | |
| """Detect sudden changes in spectral characteristics.""" | |
| anomalies = [] | |
| if len(data) < 60: # Need sufficient data | |
| return anomalies | |
| # Split data into segments | |
| segment_size = len(data) // 4 | |
| segments = [data[i:i+segment_size] for i in range(0, len(data)-segment_size, segment_size)] | |
| # Compare spectral entropy between segments | |
| entropies = [] | |
| for segment in segments: | |
| if len(segment) > 10: | |
| features = self.analyze_time_series(segment) | |
| entropies.append(features.spectral_entropy) | |
| if len(entropies) > 1: | |
| entropy_changes = np.diff(entropies) | |
| # Detect significant changes | |
| for i, change in enumerate(entropy_changes): | |
| if abs(change) > 0.3: # Significant spectral change | |
| timestamp = timestamps[i * segment_size] if i * segment_size < len(timestamps) else datetime.now() | |
| anomalies.append(SpectralAnomaly( | |
| timestamp=timestamp, | |
| anomaly_type="spectral_regime_change", | |
| severity="medium", | |
| frequency_band=(0, 0.5), | |
| anomaly_score=abs(change), | |
| description=f"Significant change in spending pattern complexity detected", | |
| evidence={"entropy_change": change, "segment": i}, | |
| recommendations=[ | |
| "Investigate policy or procedural changes", | |
| "Check for organizational restructuring", | |
| "Verify data consistency" | |
| ] | |
| )) | |
| return anomalies | |
| def _detect_suspicious_patterns( | |
| self, | |
| features: SpectralFeatures, | |
| context: Optional[Dict[str, Any]] | |
| ) -> List[SpectralAnomaly]: | |
| """Detect patterns that might indicate irregular activities.""" | |
| anomalies = [] | |
| # Check seasonal components for anomalies | |
| seasonal = features.seasonal_components | |
| # Excessive quarterly activity might indicate budget manipulation | |
| if seasonal.get("quarterly", 0) > 0.4: | |
| anomalies.append(SpectralAnomaly( | |
| timestamp=datetime.now(), | |
| anomaly_type="excessive_quarterly_pattern", | |
| severity="medium", | |
| frequency_band=(1/120, 1/60), | |
| anomaly_score=seasonal["quarterly"], | |
| description="Excessive quarterly spending pattern detected", | |
| evidence={"quarterly_component": seasonal["quarterly"]}, | |
| recommendations=[ | |
| "Investigate budget execution practices", | |
| "Check for end-of-quarter rushing", | |
| "Review budget planning processes" | |
| ] | |
| )) | |
| # Very regular weekly patterns in government spending might be suspicious | |
| if seasonal.get("weekly", 0) > 0.3: | |
| anomalies.append(SpectralAnomaly( | |
| timestamp=datetime.now(), | |
| anomaly_type="unusual_weekly_regularity", | |
| severity="low", | |
| frequency_band=(1/10, 1/5), | |
| anomaly_score=seasonal["weekly"], | |
| description="Unusually regular weekly spending pattern", | |
| evidence={"weekly_component": seasonal["weekly"]}, | |
| recommendations=[ | |
| "Verify if pattern matches business processes", | |
| "Check for automated payments", | |
| "Review spending authorization patterns" | |
| ] | |
| )) | |
| return anomalies | |
| def _detect_high_frequency_noise(self, features: SpectralFeatures) -> List[SpectralAnomaly]: | |
| """Detect high-frequency noise that might indicate data manipulation.""" | |
| anomalies = [] | |
| # Check power in high-frequency band | |
| high_freq_mask = features.frequencies > 0.2 # > 5 day period | |
| high_freq_power = np.sum(features.power_spectrum[high_freq_mask]) | |
| total_power = np.sum(features.power_spectrum) | |
| high_freq_ratio = high_freq_power / total_power if total_power > 0 else 0 | |
| if high_freq_ratio > 0.3: # More than 30% power in high frequencies | |
| anomalies.append(SpectralAnomaly( | |
| timestamp=datetime.now(), | |
| anomaly_type="high_frequency_noise", | |
| severity="medium", | |
| frequency_band=(0.2, np.max(features.frequencies)), | |
| anomaly_score=high_freq_ratio, | |
| description="High-frequency noise detected in spending data", | |
| evidence={"high_freq_ratio": high_freq_ratio}, | |
| recommendations=[ | |
| "Check data collection processes", | |
| "Investigate potential data manipulation", | |
| "Verify data source reliability" | |
| ] | |
| )) | |
| return anomalies | |
| def _analyze_frequency_band( | |
| self, | |
| features: SpectralFeatures, | |
| band_name: str, | |
| min_freq: float, | |
| max_freq: float, | |
| entity_name: Optional[str] | |
| ) -> Optional[PeriodicPattern]: | |
| """Analyze specific frequency band for patterns.""" | |
| # Find frequencies in this band | |
| mask = (features.frequencies >= min_freq) & (features.frequencies <= max_freq) | |
| if not np.any(mask): | |
| return None | |
| band_power = features.power_spectrum[mask] | |
| band_frequencies = features.frequencies[mask] | |
| if len(band_power) == 0: | |
| return None | |
| # Find peak in this band | |
| max_idx = np.argmax(band_power) | |
| peak_frequency = band_frequencies[max_idx] | |
| peak_power = band_power[max_idx] | |
| # Calculate relative amplitude | |
| total_power = np.sum(features.power_spectrum) | |
| relative_amplitude = peak_power / total_power if total_power > 0 else 0 | |
| # Skip if amplitude is too low | |
| if relative_amplitude < 0.05: | |
| return None | |
| # Calculate confidence based on peak prominence | |
| mean_power = np.mean(band_power) | |
| confidence = (peak_power - mean_power) / mean_power if mean_power > 0 else 0 | |
| confidence = min(confidence / 3, 1.0) # Normalize | |
| # Determine pattern type and business interpretation | |
| period_days = 1 / peak_frequency if peak_frequency > 0 else 0 | |
| pattern_type = self._classify_pattern_type(band_name, period_days, relative_amplitude) | |
| business_interpretation = self._interpret_pattern( | |
| band_name, period_days, relative_amplitude, entity_name | |
| ) | |
| return PeriodicPattern( | |
| period_days=period_days, | |
| frequency_hz=peak_frequency, | |
| amplitude=relative_amplitude, | |
| confidence=confidence, | |
| pattern_type=pattern_type, | |
| business_interpretation=business_interpretation, | |
| statistical_significance=confidence | |
| ) | |
| def _classify_pattern_type( | |
| self, | |
| band_name: str, | |
| period_days: float, | |
| amplitude: float | |
| ) -> str: | |
| """Classify the type of periodic pattern.""" | |
| if band_name in ["weekly", "monthly", "quarterly", "annual"]: | |
| if amplitude > 0.2: | |
| return "seasonal" | |
| else: | |
| return "cyclical" | |
| elif band_name == "suspicious" or period_days < 3: | |
| return "suspicious" | |
| else: | |
| return "irregular" | |
| def _interpret_pattern( | |
| self, | |
| band_name: str, | |
| period_days: float, | |
| amplitude: float, | |
| entity_name: Optional[str] | |
| ) -> str: | |
| """Provide business interpretation of detected pattern.""" | |
| entity_str = f" for {entity_name}" if entity_name else "" | |
| interpretations = { | |
| "weekly": f"Weekly spending cycle detected{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})", | |
| "monthly": f"Monthly budget cycle identified{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})", | |
| "quarterly": f"Quarterly spending pattern found{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})", | |
| "annual": f"Annual budget cycle detected{entity_str} (period: {period_days:.1f} days, strength: {amplitude:.1%})", | |
| "suspicious": f"Potentially suspicious high-frequency pattern{entity_str} (period: {period_days:.1f} days)" | |
| } | |
| return interpretations.get(band_name, f"Periodic pattern detected{entity_str} (period: {period_days:.1f} days)") | |
| def _calculate_synchronization_score(self, coherence: np.ndarray) -> float: | |
| """Calculate synchronization score between two entities.""" | |
| # Weight higher frequencies less (focus on meaningful business cycles) | |
| weights = np.exp(-np.linspace(0, 5, len(coherence))) | |
| weighted_coherence = coherence * weights | |
| return np.mean(weighted_coherence) | |
| def _interpret_cross_spectral_results( | |
| self, | |
| correlation: float, | |
| coherence: np.ndarray, | |
| correlated_periods: List[float], | |
| entity1: str, | |
| entity2: str | |
| ) -> str: | |
| """Interpret cross-spectral analysis results.""" | |
| if correlation > 0.7: | |
| correlation_strength = "strong" | |
| elif correlation > 0.4: | |
| correlation_strength = "moderate" | |
| else: | |
| correlation_strength = "weak" | |
| interpretation = f"{correlation_strength.capitalize()} correlation detected between {entity1} and {entity2} (r={correlation:.3f}). " | |
| if len(correlated_periods) > 0: | |
| main_periods = [p for p in correlated_periods if 7 <= p <= 365] # Focus on business-relevant periods | |
| if main_periods: | |
| interpretation += f"Synchronized patterns found at periods: {', '.join([f'{p:.0f} days' for p in main_periods[:3]])}." | |
| max_coherence = np.max(coherence) | |
| if max_coherence > 0.8: | |
| interpretation += " High spectral coherence suggests systematic coordination or shared external factors." | |
| elif max_coherence > 0.6: | |
| interpretation += " Moderate spectral coherence indicates some shared patterns or influences." | |
| return interpretation |