"""Gradio app that runs an FFT-based ("spectral") analysis on stock closing prices.

The module exposes :class:`SpectralStockAnalyzer` (data download, low-pass
filtering, FFT, momentum signal, buy-signal detection), :func:`analyze_stock`
(the Gradio callback) and :func:`create_interface` (the UI definition).
"""

import json
import warnings
from datetime import datetime, timedelta

import gradio as gr
import numpy as np
import pandas as pd
import yfinance as yf
from scipy import signal
from scipy.fft import fft, fftfreq
from scipy.stats import norm, pearsonr

warnings.filterwarnings('ignore')


# Page styling passed to ``gr.Blocks(css=...)``.
_CUSTOM_CSS = """
.main-header {
    text-align: center;
    background: linear-gradient(90deg, #1976d2 0%, #42a5f5 100%);
    color: white;
    padding: 25px;
    border-radius: 8px;
    margin-bottom: 25px;
    box-shadow: 0 4px 6px rgba(0,0,0,0.1);
    font-family: 'Roboto', sans-serif;
}
.parameter-group {
    background-color: #f5f5f5;
    padding: 20px;
    border-radius: 8px;
    margin-bottom: 20px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.05);
    border: 1px solid #e0e0e0;
}
.results-group {
    background-color: #fafafa;
    padding: 20px;
    border-radius: 8px;
    margin-top: 20px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.05);
    border: 1px solid #e0e0e0;
}
.gr-button {
    background-color: #1976d2 !important;
    color: white !important;
    border: none !important;
    border-radius: 4px !important;
    font-weight: 500 !important;
    text-transform: uppercase !important;
    letter-spacing: 0.5px !important;
    box-shadow: 0 2px 4px rgba(0,0,0,0.2) !important;
    transition: all 0.3s ease !important;
}
.gr-button:hover {
    background-color: #1565c0 !important;
    box-shadow: 0 4px 8px rgba(0,0,0,0.3) !important;
}
.gr-textbox, .gr-dropdown {
    border-radius: 4px !important;
    border: 1px solid #bdbdbd !important;
    padding: 8px 12px !important;
    font-family: 'Roboto', sans-serif;
}
.gr-textbox:focus, .gr-dropdown:focus {
    border-color: #1976d2 !important;
    box-shadow: 0 0 0 2px rgba(25, 118, 210, 0.2) !important;
}
h1, h2, h3, h4 {
    font-family: 'Roboto', sans-serif;
    font-weight: 500;
}
body {
    font-family: 'Roboto', sans-serif;
    background-color: #f5f5f5;
}
"""


class SpectralStockAnalyzer:
    """Download a price history and derive FFT-based momentum and buy signals.

    Typical call order (as used by :func:`analyze_stock`): ``fetch_data`` ->
    ``optimize_parameters`` -> ``apply_antialiasing_filter`` ->
    ``perform_spectral_analysis`` -> ``calculate_multi_frequency_momentum`` ->
    ``get_analysis_data``. Each step stores its result on the instance.
    """

    def __init__(self):
        self.data = None              # price history DataFrame from yfinance
        self.filtered_prices = None   # low-pass filtered closing prices (ndarray)
        self.spectrum = None          # FFT coefficients at the positive frequencies
        self.frequencies = None       # positive FFT frequencies, cycles per sample
        self.momentum_signal = None   # per-bar momentum values (ndarray)
        self.optimized_params = {}    # filled by optimize_parameters()
        self.market_regime = None     # "Trending" / "Range-Bound" / "Transitional"
        self.volatility = None        # rolling annualised volatility (Series)

    def fetch_data(self, symbol, period="1y"):
        """Download the price history for *symbol* into ``self.data``.

        Returns:
            ``(ok, message)``. ``ok`` is False when fewer than 60 rows were
            returned or when the download raised; the message explains why.
        """
        try:
            symbol = symbol.strip().upper()
            ticker = yf.Ticker(symbol)
            self.data = ticker.history(period=period)
            if len(self.data) < 60:
                return False, f"Insufficient data: Only {len(self.data)} days found for {symbol}"
            return True, f"{symbol} data successfully fetched: {len(self.data)} days"
        except Exception as e:
            # Boundary with a network library: report the failure to the UI
            # instead of crashing the callback.
            return False, f"Data fetch error: {str(e)}"

    def calculate_volatility(self, window=20):
        """Compute rolling annualised volatility and return its latest value.

        The rolling standard deviation of daily returns is scaled by
        ``sqrt(252)``. The leading NaNs produced by the rolling window are
        back-filled with the first valid value; if there is no valid value at
        all, a flat 0.2 is used.
        """
        returns = self.data['Close'].pct_change()
        self.volatility = returns.rolling(window=window).std() * np.sqrt(252)

        first_valid = self.volatility.first_valid_index()
        if first_valid is not None:
            first_vol = self.volatility.loc[first_valid]
            self.volatility = self.volatility.fillna(first_vol)
        else:
            self.volatility = pd.Series(0.2, index=self.data.index)
        return self.volatility.iloc[-1]

    def detect_market_regime(self, lookback=50):
        """Classify the market from an ADX-style indicator.

        Returns "Trending" when the latest ADX is above 25, "Range-Bound" when
        it is below 20 and "Transitional" otherwise. Bars for which the ADX
        cannot be computed yet (roughly the first ``2 * lookback`` bars) are
        treated as ADX = 20, so short histories end up "Transitional".
        """
        high = self.data['High']
        low = self.data['Low']
        prev_close = self.data['Close'].shift()

        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)

        up_move = high - high.shift()
        down_move = low.shift() - low

        # np.where returns a bare ndarray. The directional-movement series must
        # be re-indexed on the price index: dividing a RangeIndex series by the
        # date-indexed ATR aligns on labels and yields NaN everywhere.
        index = self.data.index
        plus_dm = pd.Series(
            np.where((up_move > down_move) & (up_move > 0), up_move, 0),
            index=index,
        )
        minus_dm = pd.Series(
            np.where((down_move > up_move) & (down_move > 0), down_move, 0),
            index=index,
        )

        atr = true_range.rolling(window=lookback).mean()
        plus_di = plus_dm.rolling(window=lookback).mean() / atr * 100
        minus_di = minus_dm.rolling(window=lookback).mean() / atr * 100

        dx = (plus_di - minus_di).abs() / (plus_di + minus_di) * 100
        adx = dx.rolling(window=lookback).mean().fillna(20)

        current_adx = adx.iloc[-1]
        if current_adx > 25:
            self.market_regime = "Trending"
        elif current_adx < 20:
            self.market_regime = "Range-Bound"
        else:
            self.market_regime = "Transitional"
        return self.market_regime

    def optimize_parameters(self):
        """Derive filter and signal thresholds from volatility and regime.

        High volatility (> 0.3) lowers the cutoff and raises the filter order;
        low volatility (< 0.15) does the opposite. Thresholds are scaled by 0.8
        in a trending market and by 1.2 in a range-bound one. The result is
        stored in ``self.optimized_params`` and returned.
        """
        vol = self.calculate_volatility()
        regime = self.detect_market_regime()

        base_cutoff = 0.1
        base_order = 4
        base_momentum_threshold = -0.3
        base_price_threshold = -0.05

        if vol > 0.3:
            cutoff_freq = base_cutoff * 0.7
            filter_order = base_order + 1
        elif vol < 0.15:
            cutoff_freq = base_cutoff * 1.3
            filter_order = base_order - 1
        else:
            cutoff_freq = base_cutoff
            filter_order = base_order

        if regime == "Trending":
            momentum_threshold = base_momentum_threshold * 0.8
            price_threshold = base_price_threshold * 0.8
        elif regime == "Range-Bound":
            momentum_threshold = base_momentum_threshold * 1.2
            price_threshold = base_price_threshold * 1.2
        else:
            momentum_threshold = base_momentum_threshold
            price_threshold = base_price_threshold

        self.optimized_params = {
            'cutoff_freq': cutoff_freq,
            'filter_order': max(2, min(6, filter_order)),
            'momentum_threshold': momentum_threshold,
            'price_threshold': price_threshold,
            'volatility': vol,
            'market_regime': regime,
        }
        return self.optimized_params

    def apply_antialiasing_filter(self):
        """Low-pass filter the closing prices into ``self.filtered_prices``.

        Uses a zero-phase (forward-backward) Butterworth filter. The cutoff is
        expressed in cycles per sample and normalised by the Nyquist frequency
        of 0.5.
        """
        if not self.optimized_params:
            self.optimize_parameters()

        prices = self.data['Close'].values
        nyquist = 0.5
        normalized_cutoff = self.optimized_params['cutoff_freq'] / nyquist
        b, a = signal.butter(
            self.optimized_params['filter_order'], normalized_cutoff, btype='low'
        )
        self.filtered_prices = signal.filtfilt(b, a, prices)

    def perform_spectral_analysis(self):
        """FFT the filtered prices, keeping only strictly positive frequencies."""
        n = len(self.filtered_prices)
        spectrum = fft(self.filtered_prices)
        frequencies = fftfreq(n, d=1.0)

        positive = frequencies > 0
        self.frequencies = frequencies[positive]
        self.spectrum = spectrum[positive]

    def find_dominant_frequencies(self, top_n=5):
        """Return up to *top_n* strongest spectral components, strongest first.

        The noise floor is the 95th percentile of the magnitudes below
        0.05 cycles/sample. Components above 1.5x that floor are candidates;
        if none qualify, every component is considered.

        Returns:
            A list of dicts with keys ``frequency``, ``period_days``,
            ``amplitude``, ``phase``, ``significance`` and ``z_score``.
        """
        magnitude = np.abs(self.spectrum)
        if magnitude.size == 0:
            return []

        low_band = magnitude[self.frequencies < 0.05]
        if low_band.size == 0:
            # Very short series: no bin falls below 0.05, use the full spectrum.
            low_band = magnitude
        noise_floor = np.percentile(low_band, 95)
        noise_std = np.std(low_band)

        candidates = np.flatnonzero(magnitude > noise_floor * 1.5)
        if candidates.size == 0:
            candidates = np.arange(magnitude.size)
        # Keep original spectrum indices so the phase can be read directly
        # instead of being looked up again by float equality on the frequency.
        ranked = candidates[np.argsort(magnitude[candidates])[-top_n:][::-1]]

        dominant_freqs = []
        for idx in ranked:
            freq = self.frequencies[idx]
            amplitude = magnitude[idx]
            period_days = 1.0 / freq if freq != 0 else np.inf
            phase = np.angle(self.spectrum[idx])

            # A zero spread would make the z-score a division by zero.
            z_score = (amplitude - noise_floor) / noise_std if noise_std > 0 else 0.0
            p_value = 1 - norm.cdf(z_score)

            dominant_freqs.append({
                'frequency': float(freq),
                'period_days': float(period_days),
                'amplitude': float(amplitude),
                'phase': float(phase),
                'significance': float(1 - p_value),
                'z_score': float(z_score),
            })
        return dominant_freqs

    @staticmethod
    def _phase_momentum(phase):
        """Map an FFT phase (radians, scalar or array) to a momentum value."""
        next_peak_distance = (2 * np.pi - phase) / (2 * np.pi)
        return np.cos(phase) * (1 - next_peak_distance)

    def calculate_multi_frequency_momentum(self, window_size=30):
        """Build ``self.momentum_signal`` from the three dominant frequencies.

        For every trailing window of *window_size* bars the phase at the FFT
        bin closest to each dominant frequency is turned into a momentum value,
        and the values are blended with weights proportional to
        ``amplitude * significance``. The first *window_size* entries are zero.
        Falls back to :meth:`calculate_momentum_signal` when no usable weights
        exist.
        """
        dominant_freqs = self.find_dominant_frequencies(top_n=3)
        raw_weights = np.array(
            [f['amplitude'] * f['significance'] for f in dominant_freqs]
        )
        total_weight = raw_weights.sum()
        if not dominant_freqs or not np.isfinite(total_weight) or total_weight <= 0:
            return self.calculate_momentum_signal(window_size)
        weights = raw_weights / total_weight

        # Every window has the same length, so the bin closest to each
        # dominant frequency is the same for all windows.
        window_freqs = fftfreq(window_size, d=1.0)
        bins = [
            int(np.argmin(np.abs(window_freqs - f['frequency'])))
            for f in dominant_freqs
        ]

        momentum_values = []
        for end in range(window_size, len(self.filtered_prices)):
            window_spectrum = fft(self.filtered_prices[end - window_size:end])
            phases = np.angle(window_spectrum[bins])
            momentum_values.append(float(np.sum(self._phase_momentum(phases) * weights)))

        self.momentum_signal = np.concatenate([
            np.zeros(window_size),
            np.array(momentum_values),
        ])

    def calculate_momentum_signal(self, window_size=30):
        """Build ``self.momentum_signal`` from each window's strongest bin.

        Single-frequency fallback: for every trailing window the non-DC bin
        with the largest magnitude supplies the phase. The first *window_size*
        entries are zero.
        """
        momentum_values = []
        for end in range(window_size, len(self.filtered_prices)):
            window_spectrum = fft(self.filtered_prices[end - window_size:end])
            magnitude = np.abs(window_spectrum)
            dominant_idx = np.argmax(magnitude[1:]) + 1  # skip the DC bin
            phase = np.angle(window_spectrum[dominant_idx])
            momentum_values.append(self._phase_momentum(phase))

        self.momentum_signal = np.concatenate([
            np.zeros(window_size),
            np.array(momentum_values),
        ])

    def detect_bottom_signals(self):
        """Return buy signals where momentum turns up after a price drop.

        A bar qualifies when the previous momentum value is below the momentum
        threshold, momentum is rising, and the filtered price fell by more than
        the price threshold over the last five bars. The final bar is never
        evaluated.

        Returns:
            A list of dicts with keys ``date``, ``price``, ``momentum`` and
            ``price_change``.
        """
        if not self.optimized_params:
            self.optimize_parameters()

        momentum_threshold = self.optimized_params['momentum_threshold']
        price_change_threshold = self.optimized_params['price_threshold']
        momentum = self.momentum_signal
        prices = self.filtered_prices

        signals = []
        # The five-bar look-back means no bar before index 5 can qualify.
        for i in range(5, len(momentum) - 1):
            turning_up = momentum[i - 1] < momentum_threshold and momentum[i] > momentum[i - 1]
            if not turning_up:
                continue
            recent_change = (prices[i] - prices[i - 5]) / prices[i - 5]
            if recent_change < price_change_threshold:
                signals.append({
                    'date': self.data.index[i].strftime('%Y-%m-%d'),
                    'price': float(self.data['Close'].iloc[i]),
                    'momentum': float(momentum[i]),
                    'price_change': float(recent_change),
                })
        return signals

    def validate_signals(self, signals, forward_days=20):
        """Score *signals* by the return *forward_days* calendar days later.

        For each signal the exit price is the close of the first bar on or
        after ``signal date + forward_days``; if the history ends before that,
        the last available close is used. Signals with no later bar are
        skipped.

        Returns:
            ``{}`` when nothing could be scored, otherwise a dict with
            ``success_rate``, ``avg_return``, ``sharpe_ratio`` (0 when fewer
            than two returns or zero spread) and ``num_signals``.
        """
        if not signals:
            return {}

        index = self.data.index
        closes = self.data['Close'].to_numpy()
        last_pos = len(index) - 1
        position_by_date = {
            stamp.strftime('%Y-%m-%d'): pos for pos, stamp in enumerate(index)
        }

        forward_returns = []
        for sig in signals:
            pos = position_by_date.get(sig['date'])
            if pos is None or pos >= last_pos:
                continue  # unknown date, or no bar after the signal
            # Work from the index's own timestamps so a timezone-aware index
            # is never compared with a naive datetime. DateOffset keeps the
            # wall-clock time across DST changes.
            target = index[pos] + pd.DateOffset(days=forward_days)
            exit_pos = min(int(index.searchsorted(target)), last_pos)
            entry_price = sig['price']
            forward_returns.append((closes[exit_pos] - entry_price) / entry_price)

        if not forward_returns:
            return {}

        returns = np.asarray(forward_returns, dtype=float)
        spread = returns.std()
        sharpe_ratio = returns.mean() / spread if len(returns) > 1 and spread > 0 else 0
        return {
            'success_rate': float(np.mean(returns > 0)),
            'avg_return': float(returns.mean()),
            'sharpe_ratio': float(sharpe_ratio),
            'num_signals': len(signals),
        }

    def predict_momentum_direction(self):
        """Estimate the current momentum direction from the dominant cycles.

        The three dominant components are advanced to the last sample and
        blended with weights proportional to ``amplitude * significance``.

        Returns:
            ``None`` when no usable components exist, otherwise a dict with
            ``momentum_status``, ``momentum_color``, ``momentum_value``,
            ``current_phase``, ``days_to_peak``, ``days_to_trough``,
            ``dominant_period`` and ``weighted_frequency``. Exactly one of
            ``days_to_peak`` / ``days_to_trough`` is not ``None``.
        """
        dominant_freqs = self.find_dominant_frequencies(top_n=3)
        if not dominant_freqs:
            return None

        total_weight = sum(f['amplitude'] * f['significance'] for f in dominant_freqs)
        if not np.isfinite(total_weight) or total_weight <= 0:
            return None  # weights would be undefined

        n = len(self.filtered_prices)
        weighted_phase = 0
        weighted_freq = 0
        weighted_period = 0
        for f in dominant_freqs:
            weight = (f['amplitude'] * f['significance']) / total_weight
            current_phase = (f['phase'] + 2 * np.pi * f['frequency'] * (n - 1)) % (2 * np.pi)
            weighted_phase += current_phase * weight
            weighted_freq += f['frequency'] * weight
            weighted_period += f['period_days'] * weight

        momentum_direction = np.cos(weighted_phase)
        if momentum_direction > 0.2:
            momentum_status, momentum_color = "Strong Positive Momentum", "green"
        elif momentum_direction > 0:
            momentum_status, momentum_color = "Weak Positive Momentum", "lightgreen"
        elif momentum_direction > -0.2:
            momentum_status, momentum_color = "Weak Negative Momentum", "orange"
        else:
            momentum_status, momentum_color = "Strong Negative Momentum", "red"

        # NOTE(review): the pi/2 and 3*pi/2 targets are kept from the original;
        # confirm they match the intended phase convention.
        angular_freq = 2 * np.pi * weighted_freq
        if momentum_direction > 0:
            days_to_peak = float(max(0, (np.pi / 2 - weighted_phase) / angular_freq))
            days_to_trough = None
        else:
            days_to_trough = float(max(0, (3 * np.pi / 2 - weighted_phase) / angular_freq))
            days_to_peak = None

        return {
            'momentum_status': momentum_status,
            'momentum_color': momentum_color,
            'momentum_value': float(momentum_direction),
            'current_phase': float(weighted_phase),
            'days_to_peak': days_to_peak,
            'days_to_trough': days_to_trough,
            'dominant_period': float(weighted_period),
            'weighted_frequency': float(weighted_freq),
        }

    def get_analysis_data(self, symbol):
        """Collect every result into a JSON-serialisable dict for *symbol*.

        Requires that the filter, spectrum and momentum steps have already
        run; also runs signal detection, validation and momentum prediction.
        """
        dates = [d.strftime('%Y-%m-%d') for d in self.data.index]
        original_prices = [float(p) for p in self.data['Close'].values]
        filtered_prices = [float(p) for p in self.filtered_prices]

        # Volatility bands
        # NOTE(review): volatility is an annualised fraction (see
        # calculate_volatility), so v/100 gives very narrow bands - confirm
        # the intended scaling.
        upper_band = [
            float(p * (1 + v / 100)) for p, v in zip(self.filtered_prices, self.volatility)
        ]
        lower_band = [
            float(p * (1 - v / 100)) for p, v in zip(self.filtered_prices, self.volatility)
        ]

        # Frequency spectrum
        frequencies = [float(f) for f in self.frequencies]
        magnitude = [float(m) for m in np.abs(self.spectrum)]

        # Momentum signal
        momentum_signal = [float(m) for m in self.momentum_signal]

        # Buy signals
        signals = self.detect_bottom_signals()

        # Validation
        validation = self.validate_signals(signals)

        # Momentum prediction
        momentum_pred = self.predict_momentum_direction()

        # Dominant frequencies
        dominant_freqs = self.find_dominant_frequencies(top_n=5)

        # Optimized parameters
        optimized_params = self.optimize_parameters()

        # Prepare data for JSON serialization
        analysis_data = {
            'symbol': symbol.upper(),
            'dates': dates,
            'original_prices': original_prices,
            'filtered_prices': filtered_prices,
            'upper_band': upper_band,
            'lower_band': lower_band,
            'frequencies': frequencies,
            'magnitude': magnitude,
            'momentum_signal': momentum_signal,
            'signals': signals,
            'validation': validation,
            'momentum_prediction': momentum_pred,
            'dominant_frequencies': dominant_freqs[:3],
            'optimized_params': optimized_params,
            'current_price': float(self.data['Close'].iloc[-1]),
            'current_momentum': float(self.momentum_signal[-1]),
            'market_regime': self.market_regime,
            'volatility': float(self.volatility.iloc[-1]),
        }
        return analysis_data


def _format_results(stock_symbol, optimized_params, dominant_freqs, analysis_data):
    """Render the Markdown report shown in the "Detailed Results" panel."""
    # Markdown headings and list items must start their own line, so the
    # report is assembled line by line and joined with newlines.
    md = [
        "",
        f"## ENHANCED ANALYSIS RESULTS: {stock_symbol.upper()}",
        "",
        "### Market Conditions:",
        f"- **Market Regime:** {optimized_params['market_regime']}",
        f"- **Volatility:** {optimized_params['volatility']:.1%}",
        "- **Optimized Parameters:**",
        f"  - Cutoff Frequency: {optimized_params['cutoff_freq']:.3f}",
        f"  - Filter Order: {optimized_params['filter_order']}",
        f"  - Momentum Threshold: {optimized_params['momentum_threshold']:.3f}",
        f"  - Price Threshold: {optimized_params['price_threshold']:.3f}",
        "",
        "### Significant Frequencies:",
    ]

    for i, freq_info in enumerate(dominant_freqs[:3]):
        md += [
            "",
            f"**{i+1}. Frequency:**",
            f"- Frequency: {freq_info['frequency']:.4f} cycles/day",
            f"- Period: {freq_info['period_days']:.1f} days",
            f"- Amplitude: {freq_info['amplitude']:.0f}",
            f"- Phase: {np.degrees(freq_info['phase']):.1f}°",
            f"- Significance: {freq_info['significance']:.1%}",
            f"- Z-Score: {freq_info['z_score']:.2f}",
        ]

    md += ["", f"### Buy Signals: **{len(analysis_data['signals'])} detected**", ""]
    if analysis_data['signals']:
        md += [
            "| Date | Price | Momentum | Change |",
            "|-------|-------|----------|----------|",
        ]
        # Only the ten most recent signals are listed.
        for sig in analysis_data['signals'][-10:]:
            md.append(
                f"| {sig['date']} | ${sig['price']:.2f} | {sig['momentum']:.3f} "
                f"| {sig['price_change']:.1%} |"
            )
    else:
        md.append("No buy signals detected in this period.")

    validation = analysis_data['validation']
    if validation:
        md += [
            "",
            "### Signal Validation (20-day forward):",
            f"- **Success Rate:** {validation['success_rate']:.1%}",
            f"- **Average Return:** {validation['avg_return']:.1%}",
            f"- **Sharpe Ratio:** {validation['sharpe_ratio']:.2f}",
            f"- **Total Signals:** {validation['num_signals']}",
        ]

    prediction = analysis_data['momentum_prediction']
    if prediction:
        momentum_status = prediction['momentum_status']
        md += [
            "",
            "### Multi-Frequency Momentum Prediction:",
            f"- **Current Phase Position:** {np.degrees(prediction['current_phase']):.1f}°",
            f"- **Predicted Momentum:** {momentum_status} ({prediction['momentum_value']:.3f})",
            f"- **Weighted Frequency:** {prediction['weighted_frequency']:.4f} cycles/day",
            f"- **Dominant Cycle Period:** {prediction['dominant_period']:.1f} days",
        ]
        if prediction['days_to_peak'] is not None:
            md.append(f"- **Days to Next Peak:** {prediction['days_to_peak']:.1f} days")
        if prediction['days_to_trough'] is not None:
            md.append(f"- **Days to Next Trough:** {prediction['days_to_trough']:.1f} days")

    current_price = analysis_data['current_price']
    current_momentum = analysis_data['current_momentum']
    if current_momentum < optimized_params['momentum_threshold']:
        status = 'Potential Buy Zone'
    elif current_momentum < 0.1:
        status = 'Normal'
    else:
        status = 'High Level'

    md += [
        "",
        "### Current Status:",
        f"- **Last Price:** ${current_price:.2f}",
        f"- **Current Momentum:** {current_momentum:.3f}",
        f"- **Status:** {status}",
        "",
        "### Enhanced Model Features:",
        "- **Dynamic Parameter Optimization:** Adapts to volatility and market regime",
        "- **Multi-Frequency Integration:** Combines multiple significant cycles",
        "- **Statistical Validation:** Tests signal quality with forward returns",
        "- **Market Regime Detection:** Identifies trending vs range-bound conditions",
        "- **Volatility-Based Filtering:** Adjusts noise reduction based on market conditions",
        "",
        "**Risk Warning:** This enhanced model provides deeper insights but still requires "
        "professional validation for investment decisions.",
        "",
    ]
    return "\n".join(md)


def analyze_stock(stock_symbol, analysis_period):
    """Gradio callback: run the full analysis pipeline for one symbol.

    Returns:
        ``(json_data, status_message, markdown_report)``. On any failure
        ``json_data`` is ``None`` and the report is an empty string; the status
        message carries the error text.
    """
    if not stock_symbol or stock_symbol.strip() == "":
        return None, "Please enter a stock symbol!", ""

    try:
        analyzer = SpectralStockAnalyzer()

        success, message = analyzer.fetch_data(stock_symbol, analysis_period)
        if not success:
            return None, f"Error: {message}", ""
        status_msg = f"{message}\n"

        optimized_params = analyzer.optimize_parameters()
        status_msg += (
            f"Optimized parameters for {optimized_params['market_regime']} market "
            f"(Vol: {optimized_params['volatility']:.1%})\n"
        )

        analyzer.apply_antialiasing_filter()
        status_msg += "Applied adaptive anti-aliasing filter\n"

        analyzer.perform_spectral_analysis()
        status_msg += "Completed spectral analysis\n"

        dominant_freqs = analyzer.find_dominant_frequencies(top_n=5)
        status_msg += f"Identified {len(dominant_freqs)} significant frequencies\n"

        analyzer.calculate_multi_frequency_momentum(window_size=30)
        status_msg += "Calculated multi-frequency momentum\n"

        analysis_data = analyzer.get_analysis_data(stock_symbol)
        status_msg += "Generated analysis data\n"

        # Convert to JSON string
        json_data = json.dumps(analysis_data)

        # Generate markdown results
        results = _format_results(stock_symbol, optimized_params, dominant_freqs, analysis_data)

        return json_data, status_msg + "Enhanced analysis completed successfully!", results
    except Exception as e:
        # UI boundary: surface the failure in the status box rather than
        # letting the callback raise.
        error_msg = f"Analysis error: {str(e)}"
        return None, error_msg, ""


def create_interface():
    """Build and return the Gradio Blocks UI (not yet launched)."""
    # NOTE(review): the HTML snippets below reached this file as bare text
    # spread over several lines. Minimal markup was reconstructed around the
    # text; ".main-header" is taken from the CSS defined in this file. The
    # exact nesting of the layout containers is a best guess - confirm both
    # against the intended design.
    with gr.Blocks(
        theme=gr.themes.Soft(),
        title="Enhanced Spectral Stock Analysis",
        css=_CUSTOM_CSS,
    ) as demo:
        gr.HTML(
            '<div class="main-header">'
            "<h1>Enhanced Spectral Stock Analysis</h1>"
            "<p>Multi-Frequency Integration + Dynamic Optimization + "
            "Statistical Validation</p>"
            "</div>"
        )

        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML("<h3>Analysis Parameters</h3>")

                with gr.Group(elem_classes="parameter-group"):
                    stock_input = gr.Textbox(
                        label="Stock Symbol",
                        placeholder="AAPL, MSFT, GOOGL, TSLA...",
                        info="Enter symbols from NYSE, NASDAQ or other major exchanges",
                    )
                    period_input = gr.Dropdown(
                        label="Analysis Period",
                        choices=["6mo", "1y", "2y", "5y"],
                        value="1y",
                        info="Longer periods provide more stable results",
                    )
                    analyze_btn = gr.Button(
                        "START ENHANCED ANALYSIS",
                        variant="primary",
                        size="lg",
                    )

                status_output = gr.Textbox(
                    label="Analysis Status",
                    interactive=False,
                    lines=10,
                    max_lines=12,
                )

            with gr.Column(scale=2):
                # We'll show a placeholder for the plot
                plot_placeholder = gr.HTML(
                    value="<p>Analysis charts will appear here after processing</p>"
                )

        gr.HTML("\n")

        with gr.Group(elem_classes="results-group"):
            results_output = gr.Markdown(
                label="Detailed Results",
                value="Enhanced analysis results will appear here...",
            )

        analyze_btn.click(
            fn=analyze_stock,
            inputs=[stock_input, period_input],
            outputs=[plot_placeholder, status_output, results_output],
        )

        gr.HTML(
            "<h4>Enhanced Features:</h4>"
            "<p>This enhanced model provides deeper insights into market cycles "
            "and momentum dynamics.</p>"
        )

    return demo


if __name__ == "__main__":
    demo = create_interface()
    # NOTE(review): server_name="0.0.0.0" listens on all interfaces and
    # share=True requests a public Gradio link - confirm both are intended.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
    )