Spaces:
Build error
Build error
| import yfinance as yf | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime | |
| from PIL import Image | |
| import io | |
| import gradio as gr | |
| from cachetools import cached, TTLCache | |
| import cProfile | |
| import pstats | |
| # Global fontsize variable | |
| FONT_SIZE = 32 | |
| # Company ticker mapping | |
| COMPANY_TICKERS = { | |
| 'iShares Core US Aggregate Bond': 'AGG', | |
| 'iShares 3-7 Year Treasury Bond': 'IEI', | |
| 'iShares 20+ Year Treasury Bond': 'TLT', | |
| 'Communication Services': 'XLC', | |
| 'Financial SPDR': 'XLF', | |
| 'Technology SPDR': 'XLK', | |
| 'Utilities SPDR': 'XLU', | |
| 'Consumer Discretionary': 'XLY', | |
| 'SPDR Gold Shares': 'GLD', | |
| 'SPDR S&P 500 ETF Trust': 'SPY', | |
| 'Materials SPDR': 'XLB', | |
| 'Energy SPDR': 'XLE', | |
| 'Industrial SPDR': 'XLI', | |
| 'Consumer Staples SPDR': 'XLP', | |
| 'Health Care SPDR': 'XLV' | |
| } | |
| # Cache with 1-day TTL | |
| cache = TTLCache(maxsize=100, ttl=86400) | |
| def fetch_historical_data(ticker, start_date, end_date): | |
| """Fetch historical stock data from Yahoo Finance.""" | |
| try: | |
| data = yf.download(ticker, start=start_date, end=end_date) | |
| if data.empty: | |
| raise ValueError(f"No data found for ticker {ticker}") | |
| return data | |
| except Exception as e: | |
| print(f"Error fetching data for {ticker}: {e}") | |
| return None | |
| def plot_to_image(plt, title): | |
| """Convert plot to a PIL Image object.""" | |
| plt.title(title, fontsize=FONT_SIZE + 1, pad=40) | |
| plt.legend(fontsize=FONT_SIZE) | |
| plt.xlabel('Date', fontsize=FONT_SIZE) | |
| plt.ylabel('', fontsize=FONT_SIZE) | |
| plt.grid(True) | |
| plt.xticks(rotation=45, ha='right', fontsize=FONT_SIZE) | |
| plt.yticks(fontsize=FONT_SIZE) | |
| plt.tight_layout(rect=[0, 0, 1, 0.88]) | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=300) | |
| plt.close() | |
| buf.seek(0) | |
| return Image.open(buf) | |
| def plot_indicator(data, company_name, ticker, indicator): | |
| """Plot selected technical indicator for a single company.""" | |
| plt.figure(figsize=(16, 10)) | |
| if indicator == "SMA": | |
| sma_55 = data['Close'].rolling(window=55).mean() | |
| sma_200 = data['Close'].rolling(window=252).mean() | |
| plt.plot(data.index, data['Close'], label='Close') | |
| plt.plot(data.index, sma_55, label='55-day SMA') | |
| plt.plot(data.index, sma_200, label='252-day SMA') | |
| plt.ylabel('Price', fontsize=FONT_SIZE) | |
| elif indicator == "MACD": | |
| exp1 = data['Close'].ewm(span=12, adjust=False).mean() | |
| exp2 = data['Close'].ewm(span=26, adjust=False).mean() | |
| macd = exp1 - exp2 | |
| signal = macd.ewm(span=9, adjust=False).mean() | |
| plt.plot(data.index, macd, label='MACD') | |
| plt.plot(data.index, signal, label='Signal Line') | |
| plt.bar(data.index, macd - signal, label='MACD Histogram') | |
| plt.ylabel('MACD', fontsize=FONT_SIZE) | |
| return plot_to_image(plt, f'{company_name} ({ticker}) {indicator}') | |
| def plot_indicators(company_names, indicator_types): | |
| """Plot the selected indicators for the selected companies.""" | |
| images = [] | |
| if len(company_names) > 7: | |
| return None, "You can select up to 7 companies at the same time.", None | |
| if len(company_names) > 1 and len(indicator_types) > 1: | |
| return None, "You can only select one indicator when selecting multiple companies.", None | |
| with ThreadPoolExecutor() as executor: | |
| future_to_company = { | |
| executor.submit(fetch_historical_data, COMPANY_TICKERS[company], '2000-01-01', datetime.now().strftime('%Y-%m-%d')): (company, indicator) | |
| for company in company_names | |
| for indicator in indicator_types | |
| } | |
| for future in as_completed(future_to_company): | |
| company, indicator = future_to_company[future] | |
| ticker = COMPANY_TICKERS[company] | |
| data = future.result() | |
| if data is None: | |
| continue | |
| images.append(plot_indicator(data, company, ticker, indicator)) | |
| return images, "", None | |
| def select_all_indicators(select_all): | |
| """Select or deselect all indicators based on the select_all flag.""" | |
| indicators = ["SMA", "MACD"] | |
| return indicators if select_all else [] | |
| def launch_gradio_app(): | |
| """Launch the Gradio app for interactive plotting.""" | |
| company_choices = list(COMPANY_TICKERS.keys()) | |
| indicators = ["SMA", "MACD"] | |
| def fetch_and_plot(company_names, indicator_types): | |
| images, error_message, _ = plot_indicators(company_names, indicator_types) | |
| if error_message: | |
| return [None] * len(indicator_types), error_message, None | |
| return images, "", "" | |
| with gr.Blocks() as demo: | |
| company_checkboxgroup = gr.CheckboxGroup(choices=company_choices, label="Select Companies") | |
| select_all_checkbox = gr.Checkbox(label="Select All Indicators", value=False, interactive=True) | |
| indicator_types_checkboxgroup = gr.CheckboxGroup(choices=indicators, label="Select Technical Indicators") | |
| select_all_checkbox.change(select_all_indicators, inputs=select_all_checkbox, outputs=indicator_types_checkboxgroup) | |
| plot_gallery = gr.Gallery(label="Indicator Plots") | |
| error_markdown = gr.Markdown() | |
| gr.Interface( | |
| fetch_and_plot, | |
| [company_checkboxgroup, indicator_types_checkboxgroup], | |
| [plot_gallery, error_markdown] | |
| ) | |
| demo.launch() | |
| def profile_code(): | |
| """Profile the main functions to find speed bottlenecks.""" | |
| profiler = cProfile.Profile() | |
| profiler.enable() | |
| launch_gradio_app() | |
| profiler.disable() | |
| stats = pstats.Stats(profiler).sort_stats('cumtime') | |
| stats.print_stats(10) | |
| if __name__ == "__main__": | |
| profile_code() | |