dbase.v1 / app.py
kaganseyda's picture
Update app.py
9358033 verified
import gradio as gr
import yfinance as yf
import numpy as np
import pandas as pd
import json
from scipy import signal
from scipy.fft import fft, fftfreq
from scipy.stats import norm, pearsonr
import warnings
from datetime import datetime, timedelta
warnings.filterwarnings('ignore')
class SpectralStockAnalyzer:
    """FFT-based cycle and momentum analysis of a daily price history.

    Intended call order: ``fetch_data`` -> ``optimize_parameters`` ->
    ``apply_antialiasing_filter`` -> ``perform_spectral_analysis`` ->
    ``calculate_multi_frequency_momentum`` -> ``get_analysis_data``.
    Each step stores its result on the instance for the following steps.
    """

    def __init__(self):
        self.data = None  # price DataFrame (needs 'High', 'Low', 'Close')
        self.filtered_prices = None  # low-pass filtered closing prices
        self.spectrum = None  # FFT coefficients at positive frequencies
        self.frequencies = None  # positive frequencies in cycles per sample
        self.momentum_signal = None  # one momentum value per price sample
        self.optimized_params = {}  # filled by optimize_parameters()
        self.market_regime = None  # "Trending" / "Range-Bound" / "Transitional"
        self.volatility = None  # rolling annualised volatility Series

    @staticmethod
    def _phase_momentum(phase):
        """Map an FFT phase angle (radians) to the momentum score used here."""
        next_peak_distance = (2 * np.pi - phase) / (2 * np.pi)
        return np.cos(phase) * (1 - next_peak_distance)

    def fetch_data(self, symbol, period="1y"):
        """Download price history for *symbol* into ``self.data``.

        Args:
            symbol: Ticker text; surrounding whitespace is removed and the
                text is upper-cased before the lookup.
            period: History span forwarded to ``Ticker.history``.

        Returns:
            ``(success, message)``. ``success`` is False when fewer than 60
            rows are returned or when the download raises.
        """
        try:
            symbol = symbol.strip().upper()
            self.data = yf.Ticker(symbol).history(period=period)
            rows = len(self.data)
            if rows < 60:
                return False, f"Insufficient data: Only {rows} days found for {symbol}"
            return True, f"{symbol} data successfully fetched: {rows} days"
        except Exception as e:
            # Reported to the caller as a status message instead of raising.
            return False, f"Data fetch error: {str(e)}"

    def calculate_volatility(self, window=20):
        """Compute rolling volatility of close-to-close returns.

        The rolling standard deviation is scaled by ``sqrt(252)``. Missing
        values are filled with the first valid value; if there is no valid
        value at all, a constant 0.2 is used.

        Returns:
            The most recent volatility value.
        """
        returns = self.data['Close'].pct_change()
        rolling_vol = returns.rolling(window=window).std() * np.sqrt(252)
        first_valid = rolling_vol.first_valid_index()
        if first_valid is None:
            self.volatility = pd.Series(0.2, index=self.data.index)
        else:
            self.volatility = rolling_vol.fillna(rolling_vol.loc[first_valid])
        return self.volatility.iloc[-1]

    def detect_market_regime(self, lookback=50):
        """Classify the market from an ADX-style directional index.

        Returns:
            "Trending" when the latest index value is above 25,
            "Range-Bound" when it is below 20, otherwise "Transitional".
            When the index cannot be computed (too little history) the
            value 20 is substituted, which yields "Transitional".
        """
        high = self.data['High']
        low = self.data['Low']
        prev_close = self.data['Close'].shift()

        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)

        up_move = high - high.shift()
        down_move = low.shift() - low
        # The directional-movement series must carry the same index as the
        # prices: a default integer index would not align with the
        # date-indexed ATR and every ratio below would become NaN.
        plus_dm = pd.Series(
            np.where((up_move > down_move) & (up_move > 0), up_move, 0.0),
            index=self.data.index,
        )
        minus_dm = pd.Series(
            np.where((down_move > up_move) & (down_move > 0), down_move, 0.0),
            index=self.data.index,
        )

        atr = true_range.rolling(window=lookback).mean()
        plus_di = plus_dm.rolling(window=lookback).mean() / atr * 100
        minus_di = minus_dm.rolling(window=lookback).mean() / atr * 100
        dx = (plus_di - minus_di).abs() / (plus_di + minus_di) * 100
        adx = dx.rolling(window=lookback).mean().fillna(20)

        current_adx = adx.iloc[-1]
        if current_adx > 25:
            self.market_regime = "Trending"
        elif current_adx < 20:
            self.market_regime = "Range-Bound"
        else:
            self.market_regime = "Transitional"
        return self.market_regime

    def optimize_parameters(self):
        """Derive filter and signal thresholds from volatility and regime.

        Returns:
            The dict stored in ``self.optimized_params`` with keys
            'cutoff_freq', 'filter_order', 'momentum_threshold',
            'price_threshold', 'volatility' and 'market_regime'.
        """
        vol = self.calculate_volatility()
        regime = self.detect_market_regime()

        base_cutoff = 0.1
        base_order = 4
        base_momentum_threshold = -0.3
        base_price_threshold = -0.05

        # Higher volatility -> lower cutoff and higher order (more smoothing).
        if vol > 0.3:
            cutoff_freq = base_cutoff * 0.7
            filter_order = base_order + 1
        elif vol < 0.15:
            cutoff_freq = base_cutoff * 1.3
            filter_order = base_order - 1
        else:
            cutoff_freq = base_cutoff
            filter_order = base_order

        # Thresholds are negative, so a factor below 1 moves them toward 0.
        if regime == "Trending":
            threshold_scale = 0.8
        elif regime == "Range-Bound":
            threshold_scale = 1.2
        else:
            threshold_scale = None

        if threshold_scale is None:
            momentum_threshold = base_momentum_threshold
            price_threshold = base_price_threshold
        else:
            momentum_threshold = base_momentum_threshold * threshold_scale
            price_threshold = base_price_threshold * threshold_scale

        self.optimized_params = {
            'cutoff_freq': cutoff_freq,
            'filter_order': max(2, min(6, filter_order)),
            'momentum_threshold': momentum_threshold,
            'price_threshold': price_threshold,
            'volatility': vol,
            'market_regime': regime,
        }
        return self.optimized_params

    def apply_antialiasing_filter(self):
        """Low-pass filter the closing prices into ``self.filtered_prices``.

        Uses a Butterworth design applied forward and backward
        (``filtfilt``). Parameters are optimised first if not yet available.
        """
        if not self.optimized_params:
            self.optimize_parameters()
        prices = self.data['Close'].values
        nyquist = 0.5  # cycles per sample at a sampling interval of 1
        normalized_cutoff = self.optimized_params['cutoff_freq'] / nyquist
        b, a = signal.butter(
            self.optimized_params['filter_order'], normalized_cutoff, btype='low'
        )
        self.filtered_prices = signal.filtfilt(b, a, prices)

    def perform_spectral_analysis(self):
        """FFT the filtered prices and keep only strictly positive frequencies."""
        sample_count = len(self.filtered_prices)
        full_spectrum = fft(self.filtered_prices)
        all_frequencies = fftfreq(sample_count, d=1.0)
        positive = all_frequencies > 0
        self.frequencies = all_frequencies[positive]
        self.spectrum = full_spectrum[positive]

    def find_dominant_frequencies(self, top_n=5):
        """Return the strongest spectral components, largest amplitude first.

        The reference level is the 95th percentile of the magnitudes below
        0.05 cycles/sample. Components above 1.5x that level are candidates;
        if none qualify, every component is a candidate.

        Returns:
            A list of at most ``top_n`` dicts with keys 'frequency',
            'period_days', 'amplitude', 'phase', 'significance', 'z_score'.
            Empty when the spectrum itself is empty.
        """
        magnitude = np.abs(self.spectrum)
        if magnitude.size == 0:
            return []

        reference_band = magnitude[self.frequencies < 0.05]
        if reference_band.size == 0:
            # No bins below 0.05: use the whole spectrum as reference
            # instead of failing on an empty percentile.
            reference_band = magnitude
        noise_floor = np.percentile(reference_band, 95)
        noise_spread = np.std(reference_band)

        candidates = np.flatnonzero(magnitude > noise_floor * 1.5)
        if candidates.size == 0:
            candidates = np.arange(magnitude.size)
        strongest_first = candidates[np.argsort(magnitude[candidates])[-top_n:][::-1]]

        dominant_freqs = []
        for position in strongest_first:
            freq = float(self.frequencies[position])
            amplitude = float(magnitude[position])
            # A zero spread would otherwise produce inf/NaN scores.
            z_score = float((amplitude - noise_floor) / noise_spread) if noise_spread > 0 else 0.0
            p_value = 1 - norm.cdf(z_score)
            dominant_freqs.append({
                'frequency': freq,
                'period_days': 1.0 / freq if freq != 0 else float('inf'),
                'amplitude': amplitude,
                'phase': float(np.angle(self.spectrum[position])),
                'significance': float(1 - p_value),
                'z_score': z_score,
            })
        return dominant_freqs

    def calculate_multi_frequency_momentum(self, window_size=30):
        """Build ``self.momentum_signal`` from the three dominant frequencies.

        For every trailing window of ``window_size`` samples the phase at the
        bin closest to each dominant frequency is converted to a momentum
        score; scores are combined with amplitude*significance weights. The
        first ``window_size`` entries of the signal are zero. Falls back to
        ``calculate_momentum_signal`` when no usable weights exist.
        """
        dominant_freqs = self.find_dominant_frequencies(top_n=3)
        raw_weights = [f['amplitude'] * f['significance'] for f in dominant_freqs]
        total_weight = sum(raw_weights)
        if not dominant_freqs or not total_weight > 0:
            return self.calculate_momentum_signal(window_size)
        weights = [w / total_weight for w in raw_weights]

        # Every window has the same length, so the bin nearest to each
        # dominant frequency is the same for all windows.
        window_freqs = fftfreq(window_size, d=1.0)
        bins = [
            int(np.argmin(np.abs(window_freqs - f['frequency'])))
            for f in dominant_freqs
        ]

        prices = self.filtered_prices
        momentum_values = []
        for end in range(window_size, len(prices)):
            window_spectrum = fft(prices[end - window_size:end])
            momentum_values.append(sum(
                self._phase_momentum(np.angle(window_spectrum[bin_idx])) * weight
                for bin_idx, weight in zip(bins, weights)
            ))

        self.momentum_signal = np.concatenate([
            np.zeros(window_size),
            np.array(momentum_values),
        ])

    def calculate_momentum_signal(self, window_size=30):
        """Build ``self.momentum_signal`` from each window's strongest bin.

        The DC bin (index 0) is excluded when looking for the strongest
        component. The first ``window_size`` entries of the signal are zero.
        """
        prices = self.filtered_prices
        momentum_values = []
        for end in range(window_size, len(prices)):
            window_spectrum = fft(prices[end - window_size:end])
            dominant_idx = np.argmax(np.abs(window_spectrum)[1:]) + 1
            momentum_values.append(
                self._phase_momentum(np.angle(window_spectrum[dominant_idx]))
            )
        self.momentum_signal = np.concatenate([
            np.zeros(window_size),
            np.array(momentum_values),
        ])

    def detect_bottom_signals(self):
        """Find samples where momentum turns up after a recent price drop.

        A signal at sample ``i`` requires the previous momentum value to be
        below the momentum threshold, the current value to be higher than
        the previous one, and the filtered price to have changed by less
        than the price threshold over the last five samples.

        Returns:
            A list of dicts with keys 'date', 'price', 'momentum' and
            'price_change'.
        """
        if not self.optimized_params:
            self.optimize_parameters()
        momentum_threshold = self.optimized_params['momentum_threshold']
        price_change_threshold = self.optimized_params['price_threshold']
        momentum = self.momentum_signal
        prices = self.filtered_prices

        signals = []
        # Starts at 5 because the price test needs five samples of history;
        # the final sample is not evaluated.
        for i in range(5, len(momentum) - 1):
            turning_up = momentum[i - 1] < momentum_threshold and momentum[i] > momentum[i - 1]
            if not turning_up:
                continue
            recent_change = (prices[i] - prices[i - 5]) / prices[i - 5]
            if recent_change < price_change_threshold:
                signals.append({
                    'date': self.data.index[i].strftime('%Y-%m-%d'),
                    'price': float(self.data['Close'].iloc[i]),
                    'momentum': float(momentum[i]),
                    'price_change': float(recent_change),
                })
        return signals

    def validate_signals(self, signals, forward_days=20):
        """Score signals by the return ``forward_days`` calendar days later.

        The forward price is the close of the first row dated on or after
        ``signal date + forward_days``; when the history ends earlier, the
        last available close is used. Signals with no later row are skipped.

        Returns:
            ``{}`` when nothing can be scored, otherwise a dict with
            'success_rate', 'avg_return', 'sharpe_ratio' (mean/std of the
            forward returns, 0 when undefined) and 'num_signals'.
        """
        if not signals:
            return {}

        closes = self.data['Close']
        # Signal dates are naive 'YYYY-MM-DD' strings, so compare them with
        # naive calendar days even when the price index is timezone-aware.
        trading_days = self.data.index
        if getattr(trading_days, 'tz', None) is not None:
            trading_days = trading_days.tz_localize(None)
        trading_days = trading_days.normalize()
        last_row = len(trading_days) - 1

        forward_returns = []
        for entry in signals:
            signal_day = pd.Timestamp(entry['date'])
            first_later_row = int(trading_days.searchsorted(signal_day, side='right'))
            if first_later_row > last_row:
                continue  # no data after this signal
            target_day = signal_day + pd.Timedelta(days=forward_days)
            target_row = int(trading_days.searchsorted(target_day, side='left'))
            row = min(max(target_row, first_later_row), last_row)
            signal_price = entry['price']
            forward_returns.append(float((closes.iloc[row] - signal_price) / signal_price))

        if not forward_returns:
            return {}

        wins = sum(1 for r in forward_returns if r > 0)
        avg_return = float(np.mean(forward_returns))
        spread = float(np.std(forward_returns))
        if len(forward_returns) > 1 and spread > 0:
            sharpe_ratio = avg_return / spread
        else:
            sharpe_ratio = 0
        return {
            'success_rate': wins / len(forward_returns),
            'avg_return': avg_return,
            'sharpe_ratio': sharpe_ratio,
            'num_signals': len(signals),
        }

    def predict_momentum_direction(self):
        """Estimate current momentum from the three dominant frequencies.

        Each component's phase is advanced to the last sample and the
        phases, frequencies and periods are averaged with
        amplitude*significance weights.

        Returns:
            None when no usable components exist, otherwise a dict with
            'momentum_status', 'momentum_color', 'momentum_value',
            'current_phase', 'days_to_peak', 'days_to_trough',
            'dominant_period' and 'weighted_frequency'. Exactly one of
            'days_to_peak' / 'days_to_trough' is not None.
        """
        dominant_freqs = self.find_dominant_frequencies(top_n=3)
        if not dominant_freqs:
            return None
        raw_weights = [f['amplitude'] * f['significance'] for f in dominant_freqs]
        total_weight = sum(raw_weights)
        if not total_weight > 0:
            return None

        last_sample = len(self.filtered_prices) - 1
        weighted_phase = 0.0
        weighted_freq = 0.0
        weighted_period = 0.0
        for info, raw_weight in zip(dominant_freqs, raw_weights):
            weight = raw_weight / total_weight
            current_phase = (
                info['phase'] + 2 * np.pi * info['frequency'] * last_sample
            ) % (2 * np.pi)
            weighted_phase += current_phase * weight
            weighted_freq += info['frequency'] * weight
            weighted_period += info['period_days'] * weight

        momentum_direction = np.cos(weighted_phase)
        if momentum_direction > 0.2:
            momentum_status, momentum_color = "Strong Positive Momentum", "green"
        elif momentum_direction > 0:
            momentum_status, momentum_color = "Weak Positive Momentum", "lightgreen"
        elif momentum_direction > -0.2:
            momentum_status, momentum_color = "Weak Negative Momentum", "orange"
        else:
            momentum_status, momentum_color = "Strong Negative Momentum", "red"

        radians_per_day = 2 * np.pi * weighted_freq
        days_to_peak = None
        days_to_trough = None
        if momentum_direction > 0:
            days_to_peak = float(max(0.0, (np.pi / 2 - weighted_phase) / radians_per_day))
        else:
            days_to_trough = float(max(0.0, (3 * np.pi / 2 - weighted_phase) / radians_per_day))

        return {
            'momentum_status': momentum_status,
            'momentum_color': momentum_color,
            'momentum_value': float(momentum_direction),
            'current_phase': float(weighted_phase),
            'days_to_peak': days_to_peak,
            'days_to_trough': days_to_trough,
            'dominant_period': float(weighted_period),
            'weighted_frequency': float(weighted_freq),
        }

    def get_analysis_data(self, symbol):
        """Collect every analysis result into one JSON-serialisable dict.

        Requires the filter, spectrum and momentum steps to have run.
        """
        dates = [d.strftime('%Y-%m-%d') for d in self.data.index]
        original_prices = [float(p) for p in self.data['Close'].values]
        filtered_prices = [float(p) for p in self.filtered_prices]

        # Volatility bands around the filtered price.
        # NOTE(review): volatility is divided by 100 here although it is
        # computed as a fraction in calculate_volatility - confirm intent.
        upper_band = [float(p * (1 + v / 100)) for p, v in zip(self.filtered_prices, self.volatility)]
        lower_band = [float(p * (1 - v / 100)) for p, v in zip(self.filtered_prices, self.volatility)]

        frequencies = [float(f) for f in self.frequencies]
        magnitude = [float(m) for m in np.abs(self.spectrum)]
        momentum_signal = [float(m) for m in self.momentum_signal]

        signals = self.detect_bottom_signals()
        validation = self.validate_signals(signals)
        momentum_pred = self.predict_momentum_direction()
        dominant_freqs = self.find_dominant_frequencies(top_n=5)
        optimized_params = self.optimize_parameters()

        return {
            'symbol': symbol.upper(),
            'dates': dates,
            'original_prices': original_prices,
            'filtered_prices': filtered_prices,
            'upper_band': upper_band,
            'lower_band': lower_band,
            'frequencies': frequencies,
            'magnitude': magnitude,
            'momentum_signal': momentum_signal,
            'signals': signals,
            'validation': validation,
            'momentum_prediction': momentum_pred,
            'dominant_frequencies': dominant_freqs[:3],
            'optimized_params': optimized_params,
            'current_price': float(self.data['Close'].iloc[-1]),
            'current_momentum': float(self.momentum_signal[-1]),
            'market_regime': self.market_regime,
            'volatility': float(self.volatility.iloc[-1]),
        }
def analyze_stock(stock_symbol, analysis_period):
    """Run the complete spectral analysis for one ticker and format the output.

    Args:
        stock_symbol: Ticker text as entered by the user.
        analysis_period: History span forwarded to ``fetch_data``.

    Returns:
        A ``(json_data, status_message, markdown_report)`` tuple. For blank
        input, a failed fetch or any exception raised during the analysis,
        ``json_data`` is None and the report is an empty string.
    """
    if not (stock_symbol and stock_symbol.strip()):
        return None, "Please enter a stock symbol!", ""

    try:
        analyzer = SpectralStockAnalyzer()
        fetched, fetch_message = analyzer.fetch_data(stock_symbol, analysis_period)
        if not fetched:
            return None, f"Error: {fetch_message}", ""

        # One entry per finished step; joined with newlines at the end.
        progress = [fetch_message]

        params = analyzer.optimize_parameters()
        regime = params['market_regime']
        volatility = params['volatility']
        progress.append(f"Optimized parameters for {regime} market (Vol: {volatility:.1%})")

        analyzer.apply_antialiasing_filter()
        progress.append("Applied adaptive anti-aliasing filter")

        analyzer.perform_spectral_analysis()
        progress.append("Completed spectral analysis")

        dominant = analyzer.find_dominant_frequencies(top_n=5)
        progress.append(f"Identified {len(dominant)} significant frequencies")

        analyzer.calculate_multi_frequency_momentum(window_size=30)
        progress.append("Calculated multi-frequency momentum")

        analysis = analyzer.get_analysis_data(stock_symbol)
        progress.append("Generated analysis data")

        json_data = json.dumps(analysis)

        # The markdown report is assembled from sections and joined once.
        report = [f"""
## ENHANCED ANALYSIS RESULTS: {stock_symbol.upper()}
### Market Conditions:
- **Market Regime:** {regime}
- **Volatility:** {volatility:.1%}
- **Optimized Parameters:**
- Cutoff Frequency: {params['cutoff_freq']:.3f}
- Filter Order: {params['filter_order']}
- Momentum Threshold: {params['momentum_threshold']:.3f}
- Price Threshold: {params['price_threshold']:.3f}
### Significant Frequencies:
"""]

        for rank, component in enumerate(dominant[:3], start=1):
            report.append(f"""
**{rank}. Frequency:**
- Frequency: {component['frequency']:.4f} cycles/day
- Period: {component['period_days']:.1f} days
- Amplitude: {component['amplitude']:.0f}
- Phase: {np.degrees(component['phase']):.1f}°
- Significance: {component['significance']:.1%}
- Z-Score: {component['z_score']:.2f}
""")

        detected = analysis['signals']
        report.append(f"\n### Buy Signals: **{len(detected)} detected**\n")
        if detected:
            report.append("\n| Date | Price | Momentum | Change |\n|-------|-------|----------|----------|\n")
            # Only the ten most recent signals are listed.
            report.extend(
                f"| {row['date']} | ${row['price']:.2f} | {row['momentum']:.3f} | {row['price_change']:.1%} |\n"
                for row in detected[-10:]
            )
        else:
            report.append("\nNo buy signals detected in this period.\n")

        stats = analysis['validation']
        if stats:
            report.append(f"""
### Signal Validation (20-day forward):
- **Success Rate:** {stats['success_rate']:.1%}
- **Average Return:** {stats['avg_return']:.1%}
- **Sharpe Ratio:** {stats['sharpe_ratio']:.2f}
- **Total Signals:** {stats['num_signals']}
""")

        last_price = analysis['current_price']
        latest_momentum = analysis['current_momentum']

        forecast = analysis['momentum_prediction']
        if forecast:
            status_text = forecast['momentum_status']
            status_color = forecast['momentum_color']
            report.append(f"""
### Multi-Frequency Momentum Prediction:
- **Current Phase Position:** {np.degrees(forecast['current_phase']):.1f}°
- **Predicted Momentum:** <span style="color:{status_color}">{status_text}</span> ({forecast['momentum_value']:.3f})
- **Weighted Frequency:** {forecast['weighted_frequency']:.4f} cycles/day
- **Dominant Cycle Period:** {forecast['dominant_period']:.1f} days
""")
            peak_in = forecast['days_to_peak']
            trough_in = forecast['days_to_trough']
            if peak_in is not None:
                report.append(f"- **Days to Next Peak:** {peak_in:.1f} days\n")
            if trough_in is not None:
                report.append(f"- **Days to Next Trough:** {trough_in:.1f} days\n")

        if latest_momentum < params['momentum_threshold']:
            zone = 'Potential Buy Zone'
        elif latest_momentum < 0.1:
            zone = 'Normal'
        else:
            zone = 'High Level'

        report.append(f"""
### Current Status:
- **Last Price:** ${last_price:.2f}
- **Current Momentum:** {latest_momentum:.3f}
- **Status:** {zone}
### Enhanced Model Features:
- **Dynamic Parameter Optimization:** Adapts to volatility and market regime
- **Multi-Frequency Integration:** Combines multiple significant cycles
- **Statistical Validation:** Tests signal quality with forward returns
- **Market Regime Detection:** Identifies trending vs range-bound conditions
- **Volatility-Based Filtering:** Adjusts noise reduction based on market conditions
**Risk Warning:** This enhanced model provides deeper insights but still requires professional validation for investment decisions.
""")

        progress.append("Enhanced analysis completed successfully!")
        return json_data, "\n".join(progress), "".join(report)
    except Exception as exc:
        # Any failure in the pipeline is shown to the user as a status line.
        return None, f"Analysis error: {exc!s}", ""
def create_interface():
"""Build the Gradio Blocks UI for the analyzer and return it (not launched)."""
# NOTE(review): indentation is missing in this copy of the file, so which
# components sit inside which Row/Column/Group cannot be read off the code
# below - restore the nesting from the original before running.
with gr.Blocks(
theme=gr.themes.Soft(),
title="Enhanced Spectral Stock Analysis",
# Custom CSS; the .main-header, .parameter-group and .results-group
# classes are referenced by the HTML header and the elem_classes below.
css="""
.main-header {
text-align: center;
background: linear-gradient(90deg, #1976d2 0%, #42a5f5 100%);
color: white;
padding: 25px;
border-radius: 8px;
margin-bottom: 25px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
font-family: 'Roboto', sans-serif;
}
.parameter-group {
background-color: #f5f5f5;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
border: 1px solid #e0e0e0;
}
.results-group {
background-color: #fafafa;
padding: 20px;
border-radius: 8px;
margin-top: 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
border: 1px solid #e0e0e0;
}
.gr-button {
background-color: #1976d2 !important;
color: white !important;
border: none !important;
border-radius: 4px !important;
font-weight: 500 !important;
text-transform: uppercase !important;
letter-spacing: 0.5px !important;
box-shadow: 0 2px 4px rgba(0,0,0,0.2) !important;
transition: all 0.3s ease !important;
}
.gr-button:hover {
background-color: #1565c0 !important;
box-shadow: 0 4px 8px rgba(0,0,0,0.3) !important;
}
.gr-textbox, .gr-dropdown {
border-radius: 4px !important;
border: 1px solid #bdbdbd !important;
padding: 8px 12px !important;
font-family: 'Roboto', sans-serif;
}
.gr-textbox:focus, .gr-dropdown:focus {
border-color: #1976d2 !important;
box-shadow: 0 0 0 2px rgba(25, 118, 210, 0.2) !important;
}
h1, h2, h3, h4 {
font-family: 'Roboto', sans-serif;
font-weight: 500;
}
body {
font-family: 'Roboto', sans-serif;
background-color: #f5f5f5;
}
"""
) as demo:
# Page banner.
gr.HTML("""
<div class="main-header">
<h1>Enhanced Spectral Stock Analysis</h1>
<p>Multi-Frequency Integration + Dynamic Optimization + Statistical Validation</p>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
gr.HTML("<h3>Analysis Parameters</h3>")
with gr.Group(elem_classes="parameter-group"):
# Inputs passed to analyze_stock: ticker text and history period.
stock_input = gr.Textbox(
label="Stock Symbol",
placeholder="AAPL, MSFT, GOOGL, TSLA...",
info="Enter symbols from NYSE, NASDAQ or other major exchanges"
)
period_input = gr.Dropdown(
label="Analysis Period",
choices=["6mo", "1y", "2y", "5y"],
value="1y",
info="Longer periods provide more stable results"
)
analyze_btn = gr.Button(
"START ENHANCED ANALYSIS",
variant="primary",
size="lg"
)
# Read-only box that receives the status text returned by analyze_stock.
status_output = gr.Textbox(
label="Analysis Status",
interactive=False,
lines=10,
max_lines=12
)
with gr.Column(scale=2):
# We'll show a placeholder for the plot
plot_placeholder = gr.HTML(
value="<div style='padding: 20px; text-align: center; color: #666;'>Analysis charts will appear here after processing</div>"
)
gr.HTML("<hr>")
with gr.Group(elem_classes="results-group"):
results_output = gr.Markdown(
label="Detailed Results",
value="Enhanced analysis results will appear here..."
)
# analyze_stock returns three values, mapped in order onto the outputs.
# NOTE(review): its first value is a JSON string (or None), which is sent to
# the HTML placeholder as-is - no chart is rendered from it here.
analyze_btn.click(
fn=analyze_stock,
inputs=[stock_input, period_input],
outputs=[plot_placeholder, status_output, results_output]
)
# Static footer listing the model features.
gr.HTML("""
<div style="margin-top: 30px; padding: 20px; background-color: #e3f2fd; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.05); border: 1px solid #bbdefb;">
<h4>Enhanced Features:</h4>
<ul>
<li><strong>Dynamic Parameter Optimization:</strong> Adapts to market volatility and regime</li>
<li><strong>Multi-Frequency Integration:</strong> Combines multiple significant cycles</li>
<li><strong>Statistical Validation:</strong> Tests signal quality with forward returns</li>
<li><strong>Market Regime Detection:</strong> Identifies trending vs range-bound conditions</li>
<li><strong>Volatility-Based Filtering:</strong> Adjusts noise reduction based on market conditions</li>
</ul>
<p><em>This enhanced model provides deeper insights into market cycles and momentum dynamics.</em></p>
</div>
""")
return demo
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all interfaces at
    # port 7860 with share=True.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    demo = create_interface()
    demo.launch(**launch_options)