Spaces:
Sleeping
Sleeping
| """ | |
| Finance data tools built on Yahoo Finance (via yfinance), plus analysis utilities. | |
| Provides utilities for: | |
| - Historical price data retrieval | |
| - Portfolio optimization benchmarks | |
| - Option pricing models | |
| - Risk metrics calculation | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
class FinanceDataCollector:
    """Collect and process financial data for quantum finance experiments.

    Prices are downloaded with ``yfinance`` when it can be imported and
    returns usable data; in every other case a deterministic synthetic price
    panel is returned instead so downstream code can still run.
    """

    # Default benchmark assets
    DEFAULT_TICKERS = [
        'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META',  # Tech
        'JPM', 'BAC', 'GS', 'MS', 'C',            # Finance
        'JNJ', 'PFE', 'UNH', 'MRK', 'ABBV',       # Healthcare
        'XOM', 'CVX', 'COP', 'SLB', 'EOG',        # Energy
        'PG', 'KO', 'PEP', 'WMT', 'COST',         # Consumer
    ]

    def __init__(self, tickers: Optional[list] = None):
        """Initialize with list of tickers to track.

        Args:
            tickers: Ticker symbols to track. ``None`` or an empty list
                selects ``DEFAULT_TICKERS``. A single symbol given as a
                string is treated as a one-element list.
        """
        if isinstance(tickers, str):
            # list("AAPL") would split the symbol into characters.
            tickers = [tickers]
        # Always copy: the instance must not alias the caller's list or the
        # class-level default, otherwise mutating self.tickers leaks out.
        self.tickers = list(tickers) if tickers else list(self.DEFAULT_TICKERS)

    def fetch_historical_data(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        period: str = "1y",
    ) -> pd.DataFrame:
        """
        Fetch historical price data using yfinance.

        Args:
            start_date: Start date (YYYY-MM-DD format)
            end_date: End date (YYYY-MM-DD format)
            period: Alternative to start/end - e.g., "1y", "6mo", "1mo".
                Only used when neither ``start_date`` nor ``end_date`` is
                given.

        Returns:
            DataFrame of 'Adj Close' prices when that column is present,
            otherwise 'Close' prices. Falls back to synthetic data when
            yfinance is missing, raises, or returns nothing usable.
        """
        try:
            import yfinance as yf

            if start_date or end_date:
                # Honour a one-sided range too; yfinance fills in the
                # missing bound itself.
                data = yf.download(
                    self.tickers,
                    start=start_date,
                    end=end_date,
                    progress=False,
                )
            else:
                data = yf.download(
                    self.tickers,
                    period=period,
                    progress=False,
                )

            if len(data) == 0:
                print("No data from yfinance, using synthetic data...")
                return self._generate_synthetic_data()

            # Extract adjusted close prices when available.
            if 'Adj Close' in data.columns.get_level_values(0):
                prices = data['Adj Close']
            else:
                prices = data['Close']

            # If prices are empty or entirely NaN, use synthetic data.
            if prices.empty or prices.isna().all().all():
                print("Empty price data, using synthetic data...")
                return self._generate_synthetic_data()
            return prices
        except Exception as e:
            # Deliberate best-effort: any failure (missing package, network,
            # unexpected frame layout) degrades to synthetic data.
            print(f"yfinance error ({e}), using synthetic data...")
            return self._generate_synthetic_data()

    def _generate_synthetic_data(self, days: int = 252) -> pd.DataFrame:
        """Generate synthetic price data for testing when yfinance unavailable.

        Args:
            days: Number of daily rows to generate, ending at the current
                time.

        Returns:
            DataFrame with one column per ticker and a daily DatetimeIndex.
        """
        # A private legacy RandomState seeded with 42 yields the same numbers
        # as np.random.seed(42) would, without resetting the global RNG that
        # other code may depend on.
        rng = np.random.RandomState(42)
        dates = pd.date_range(end=datetime.now(), periods=days, freq='D')
        data = {}
        for ticker in self.tickers:
            # Random-walk price path: compound normally distributed returns.
            initial_price = rng.uniform(50, 500)
            returns = rng.normal(0.0005, 0.02, days)
            data[ticker] = initial_price * np.cumprod(1 + returns)
        return pd.DataFrame(data, index=dates)

    def calculate_returns(self, prices: pd.DataFrame) -> pd.DataFrame:
        """Calculate daily returns from price data.

        Rows containing any NaN (including the first row) are dropped.
        """
        return prices.pct_change(fill_method=None).dropna()

    def calculate_covariance_matrix(self, returns: pd.DataFrame) -> pd.DataFrame:
        """Calculate annualized covariance matrix (252 periods per year)."""
        return returns.cov() * 252  # Annualize

    def calculate_expected_returns(self, returns: pd.DataFrame) -> pd.Series:
        """Calculate annualized expected returns (252 periods per year)."""
        return returns.mean() * 252  # Annualize
class ClassicalPortfolioOptimizer:
    """Classical portfolio optimization for benchmarking quantum approaches."""

    def __init__(self, expected_returns: np.ndarray, covariance_matrix: np.ndarray):
        """
        Initialize optimizer.

        Args:
            expected_returns: Expected returns vector
            covariance_matrix: Covariance matrix of returns

        Raises:
            ValueError: If the covariance matrix is not square with one
                row/column per entry of ``expected_returns``.
        """
        # Coerce once so lists / pandas objects behave like plain arrays in
        # the matrix products below.
        self.mu = np.asarray(expected_returns, dtype=float)
        self.sigma = np.asarray(covariance_matrix, dtype=float)
        self.n_assets = len(self.mu)
        if self.sigma.shape != (self.n_assets, self.n_assets):
            raise ValueError(
                f"covariance_matrix must have shape "
                f"({self.n_assets}, {self.n_assets}), got {self.sigma.shape}"
            )

    def optimize_markowitz(
        self,
        target_return: Optional[float] = None,
        risk_free_rate: float = 0.0,
    ) -> dict:
        """
        Solve Markowitz mean-variance optimization (long-only, fully invested).

        Args:
            target_return: Target portfolio return (if None, maximize Sharpe)
            risk_free_rate: Rate subtracted from the portfolio return when
                computing the Sharpe ratio. The default of 0 reproduces the
                plain return/volatility ratio.

        Returns:
            Dict with 'weights', 'expected_return', 'volatility',
            'sharpe_ratio', 'optimization_success' and 'solver'; or
            ``{'error': 'scipy not available'}`` when scipy cannot be
            imported.
        """
        try:
            from scipy.optimize import minimize
        except ImportError:
            return {'error': 'scipy not available'}

        def portfolio_volatility(weights):
            variance = weights @ self.sigma @ weights
            # Rounding can push a near-zero variance slightly negative;
            # clamp so sqrt does not turn it into NaN.
            return np.sqrt(max(variance, 0.0))

        def negative_sharpe(weights):
            excess = weights @ self.mu - risk_free_rate
            vol = portfolio_volatility(weights)
            return -excess / vol if vol > 0 else 0

        # Constraints: weights sum to 1, all weights in [0, 1]
        constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]
        bounds = [(0, 1)] * self.n_assets

        # Initial guess: equal weights
        w0 = np.ones(self.n_assets) / self.n_assets

        if target_return is not None:
            # Minimum-volatility portfolio that hits the requested return.
            constraints.append({
                'type': 'eq',
                'fun': lambda w: w @ self.mu - target_return,
            })
            objective = portfolio_volatility
        else:
            objective = negative_sharpe

        result = minimize(
            objective, w0,
            method='SLSQP', bounds=bounds, constraints=constraints,
        )

        optimal_weights = result.x
        portfolio_return = optimal_weights @ self.mu
        portfolio_vol = portfolio_volatility(optimal_weights)
        if portfolio_vol > 0:
            sharpe = (portfolio_return - risk_free_rate) / portfolio_vol
        else:
            sharpe = 0
        return {
            'weights': optimal_weights,
            'expected_return': portfolio_return,
            'volatility': portfolio_vol,
            'sharpe_ratio': sharpe,
            'optimization_success': result.success,
            'solver': 'scipy.optimize.minimize (SLSQP)',
        }

    def benchmark_timing(self, n_trials: int = 100) -> dict:
        """
        Benchmark classical optimization timing.

        Args:
            n_trials: Number of optimization runs (must be at least 1)

        Returns:
            Timing statistics in milliseconds (mean, std, min, max) together
            with 'n_assets' and 'n_trials'.

        Raises:
            ValueError: If ``n_trials`` is less than 1.
        """
        import time

        if n_trials < 1:
            # Statistics over zero samples are undefined; fail with a clear
            # message instead of an opaque NumPy reduction error.
            raise ValueError("n_trials must be at least 1")

        times = []
        for _ in range(n_trials):
            start = time.perf_counter()
            self.optimize_markowitz()
            times.append(time.perf_counter() - start)
        return {
            'n_assets': self.n_assets,
            'n_trials': n_trials,
            'mean_time_ms': np.mean(times) * 1000,
            'std_time_ms': np.std(times) * 1000,
            'min_time_ms': np.min(times) * 1000,
            'max_time_ms': np.max(times) * 1000,
        }
class OptionPricer:
    """Classical option pricing for benchmarking quantum amplitude estimation.

    All methods are static: they take no instance state and can be called on
    the class or on an instance.
    """

    @staticmethod
    def black_scholes_call(S: float, K: float, T: float, r: float, sigma: float) -> float:
        """
        Black-Scholes European call option price.

        Args:
            S: Current stock price
            K: Strike price
            T: Time to maturity (years)
            r: Risk-free rate
            sigma: Volatility

        Returns:
            Call option price. For scalar ``T <= 0`` the intrinsic value
            ``max(S - K, 0)`` is returned; for scalar ``sigma <= 0`` the
            deterministic value ``max(S - K*exp(-r*T), 0)``.
        """
        from scipy.stats import norm

        # The closed form divides by sigma*sqrt(T); handle the degenerate
        # scalar cases explicitly instead of producing NaN/inf. Array-valued
        # T or sigma skip the guard and go through the closed form unchanged.
        if np.ndim(T) == 0 and np.ndim(sigma) == 0:
            if T <= 0:
                return np.maximum(S - K, 0.0)
            if sigma <= 0:
                return np.maximum(S - K * np.exp(-r * T), 0.0)

        vol_sqrt_t = sigma * np.sqrt(T)
        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / vol_sqrt_t
        d2 = d1 - vol_sqrt_t
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

    @staticmethod
    def monte_carlo_call(
        S: float, K: float, T: float, r: float, sigma: float,
        n_paths: int = 100000
    ) -> dict:
        """
        Monte Carlo European call option pricing.

        Args:
            S: Current stock price
            K: Strike price
            T: Time to maturity (years)
            r: Risk-free rate
            sigma: Volatility
            n_paths: Number of simulation paths

        Returns:
            Dict with 'price', 'std_error', 'confidence_interval_95'
            (price +/- 1.96 standard errors) and 'n_paths'. The draw is
            seeded with 42, so repeated calls give identical results.
        """
        # Private legacy RandomState(42): same draws as np.random.seed(42),
        # but the process-wide RNG is left untouched.
        rng = np.random.RandomState(42)

        # Simulate terminal prices
        Z = rng.standard_normal(n_paths)
        ST = S * np.exp((r - 0.5 * sigma**2) * T + sigma * np.sqrt(T) * Z)

        # Calculate discounted payoffs
        payoffs = np.maximum(ST - K, 0)
        discounted = np.exp(-r * T) * payoffs

        price = np.mean(discounted)
        std_error = np.std(discounted) / np.sqrt(n_paths)
        return {
            'price': price,
            'std_error': std_error,
            'confidence_interval_95': (price - 1.96 * std_error, price + 1.96 * std_error),
            'n_paths': n_paths,
        }

    @staticmethod
    def benchmark_monte_carlo_timing(
        n_paths_list: list = None,
        n_trials: int = 10
    ) -> list:
        """Benchmark Monte Carlo timing for different path counts.

        Args:
            n_paths_list: Path counts to time; defaults to
                [1000, 10000, 100000, 1000000].
            n_trials: Number of timed runs per path count.

        Returns:
            One dict per path count with 'n_paths', 'mean_time_ms' and
            'std_time_ms'.
        """
        import time

        if n_paths_list is None:
            n_paths_list = [1000, 10000, 100000, 1000000]

        results = []
        for n_paths in n_paths_list:
            times = []
            for _ in range(n_trials):
                start = time.perf_counter()
                OptionPricer.monte_carlo_call(
                    S=100, K=100, T=1, r=0.05, sigma=0.2,
                    n_paths=n_paths
                )
                times.append(time.perf_counter() - start)
            results.append({
                'n_paths': n_paths,
                'mean_time_ms': np.mean(times) * 1000,
                'std_time_ms': np.std(times) * 1000,
            })
        return results
def collect_experiment_data(n_assets: int = 25) -> dict:
    """
    Collect data for quantum finance experiments.

    Args:
        n_assets: Number of assets to include, taken from the front of
            ``FinanceDataCollector.DEFAULT_TICKERS``. Requests larger than
            that list are capped at its length.

    Returns:
        Dictionary with prices, returns, and optimization inputs. 'n_assets'
        is the number of tickers actually used, which can be smaller than
        the number requested.

    Raises:
        ValueError: If ``n_assets`` is less than 1.
    """
    if n_assets < 1:
        # An empty slice would silently fall back to the full default list,
        # and a negative one would slice from the end.
        raise ValueError("n_assets must be at least 1")

    collector = FinanceDataCollector(
        tickers=FinanceDataCollector.DEFAULT_TICKERS[:n_assets]
    )
    # Report what was really selected, not what was asked for.
    n_selected = len(collector.tickers)

    print(f"Fetching data for {n_selected} assets...")
    prices = collector.fetch_historical_data(period="1y")
    returns = collector.calculate_returns(prices)
    cov_matrix = collector.calculate_covariance_matrix(returns)
    exp_returns = collector.calculate_expected_returns(returns)
    return {
        'prices': prices,
        'returns': returns,
        'covariance_matrix': cov_matrix,
        'expected_returns': exp_returns,
        'n_assets': n_selected,
        'n_observations': len(returns),
    }
if __name__ == "__main__":
    # Demo data collection and classical benchmarking
    print("Finance Data Collection Demo")
    print("=" * 50)
    data = collect_experiment_data(n_assets=10)
    print(f"Collected {data['n_observations']} days of data for {data['n_assets']} assets")

    print("\nClassical Portfolio Optimization:")
    print("-" * 50)
    optimizer = ClassicalPortfolioOptimizer(
        data['expected_returns'].values,
        data['covariance_matrix'].values
    )
    result = optimizer.optimize_markowitz()

    if 'error' in result:
        # optimize_markowitz returns {'error': ...} when scipy is missing;
        # indexing the metric keys below would raise KeyError.
        print(f"Optimization unavailable: {result['error']}")
    else:
        print(f"Expected Return: {result['expected_return']:.4f}")
        print(f"Volatility: {result['volatility']:.4f}")
        print(f"Sharpe Ratio: {result['sharpe_ratio']:.4f}")

        print("\nOptimization Timing Benchmark:")
        timing = optimizer.benchmark_timing(n_trials=100)
        print(f"Mean time: {timing['mean_time_ms']:.3f} ms")