""" Finance data tools using Yahoo Finance and analysis utilities. Provides utilities for: - Historical price data retrieval - Portfolio optimization benchmarks - Option pricing models - Risk metrics calculation """ import numpy as np import pandas as pd from datetime import datetime, timedelta from typing import Optional class FinanceDataCollector: """Collect and process financial data for quantum finance experiments.""" # Default benchmark assets DEFAULT_TICKERS = [ 'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META', # Tech 'JPM', 'BAC', 'GS', 'MS', 'C', # Finance 'JNJ', 'PFE', 'UNH', 'MRK', 'ABBV', # Healthcare 'XOM', 'CVX', 'COP', 'SLB', 'EOG', # Energy 'PG', 'KO', 'PEP', 'WMT', 'COST' # Consumer ] def __init__(self, tickers: Optional[list] = None): """Initialize with list of tickers to track.""" self.tickers = tickers or self.DEFAULT_TICKERS def fetch_historical_data( self, start_date: str = None, end_date: str = None, period: str = "1y" ) -> pd.DataFrame: """ Fetch historical price data using yfinance. Args: start_date: Start date (YYYY-MM-DD format) end_date: End date (YYYY-MM-DD format) period: Alternative to start/end - e.g., "1y", "6mo", "1mo" Returns: DataFrame with adjusted close prices """ try: import yfinance as yf if start_date and end_date: data = yf.download( self.tickers, start=start_date, end=end_date, progress=False ) else: data = yf.download( self.tickers, period=period, progress=False ) # Extract adjusted close prices if len(data) == 0: print("No data from yfinance, using synthetic data...") return self._generate_synthetic_data() if 'Adj Close' in data.columns.get_level_values(0): prices = data['Adj Close'] else: prices = data['Close'] # If prices are empty, use synthetic if prices.empty or prices.isna().all().all(): print("Empty price data, using synthetic data...") return self._generate_synthetic_data() return prices except (ImportError, Exception) as e: print(f"yfinance error ({e}), using synthetic data...") return self._generate_synthetic_data() def _generate_synthetic_data(self, days: int = 252) -> pd.DataFrame: """Generate synthetic price data for testing when yfinance unavailable.""" np.random.seed(42) dates = pd.date_range(end=datetime.now(), periods=days, freq='D') data = {} for ticker in self.tickers: # Geometric Brownian Motion simulation initial_price = np.random.uniform(50, 500) returns = np.random.normal(0.0005, 0.02, days) prices = initial_price * np.cumprod(1 + returns) data[ticker] = prices return pd.DataFrame(data, index=dates) def calculate_returns(self, prices: pd.DataFrame) -> pd.DataFrame: """Calculate daily returns from price data.""" return prices.pct_change(fill_method=None).dropna() def calculate_covariance_matrix(self, returns: pd.DataFrame) -> pd.DataFrame: """Calculate annualized covariance matrix.""" return returns.cov() * 252 # Annualize def calculate_expected_returns(self, returns: pd.DataFrame) -> pd.Series: """Calculate annualized expected returns.""" return returns.mean() * 252 # Annualize class ClassicalPortfolioOptimizer: """Classical portfolio optimization for benchmarking quantum approaches.""" def __init__(self, expected_returns: np.ndarray, covariance_matrix: np.ndarray): """ Initialize optimizer. Args: expected_returns: Expected returns vector covariance_matrix: Covariance matrix of returns """ self.mu = expected_returns self.sigma = covariance_matrix self.n_assets = len(expected_returns) def optimize_markowitz(self, target_return: float = None) -> dict: """ Solve Markowitz mean-variance optimization. Args: target_return: Target portfolio return (if None, maximize Sharpe) Returns: Optimization results """ try: from scipy.optimize import minimize def portfolio_volatility(weights): return np.sqrt(weights @ self.sigma @ weights) def negative_sharpe(weights): ret = weights @ self.mu vol = portfolio_volatility(weights) return -ret / vol if vol > 0 else 0 # Constraints: weights sum to 1, all weights >= 0 constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}] bounds = [(0, 1) for _ in range(self.n_assets)] # Initial guess: equal weights w0 = np.ones(self.n_assets) / self.n_assets if target_return is not None: constraints.append({ 'type': 'eq', 'fun': lambda w: w @ self.mu - target_return }) result = minimize( portfolio_volatility, w0, method='SLSQP', bounds=bounds, constraints=constraints ) else: result = minimize( negative_sharpe, w0, method='SLSQP', bounds=bounds, constraints=constraints ) optimal_weights = result.x portfolio_return = optimal_weights @ self.mu portfolio_vol = portfolio_volatility(optimal_weights) sharpe = portfolio_return / portfolio_vol if portfolio_vol > 0 else 0 return { 'weights': optimal_weights, 'expected_return': portfolio_return, 'volatility': portfolio_vol, 'sharpe_ratio': sharpe, 'optimization_success': result.success, 'solver': 'scipy.optimize.minimize (SLSQP)' } except ImportError: return {'error': 'scipy not available'} def benchmark_timing(self, n_trials: int = 100) -> dict: """ Benchmark classical optimization timing. Args: n_trials: Number of optimization runs Returns: Timing statistics """ import time times = [] for _ in range(n_trials): start = time.perf_counter() self.optimize_markowitz() times.append(time.perf_counter() - start) return { 'n_assets': self.n_assets, 'n_trials': n_trials, 'mean_time_ms': np.mean(times) * 1000, 'std_time_ms': np.std(times) * 1000, 'min_time_ms': np.min(times) * 1000, 'max_time_ms': np.max(times) * 1000 } class OptionPricer: """Classical option pricing for benchmarking quantum amplitude estimation.""" @staticmethod def black_scholes_call(S: float, K: float, T: float, r: float, sigma: float) -> float: """ Black-Scholes European call option price. Args: S: Current stock price K: Strike price T: Time to maturity (years) r: Risk-free rate sigma: Volatility Returns: Call option price """ from scipy.stats import norm d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T)) d2 = d1 - sigma*np.sqrt(T) return S * norm.cdf(d1) - K * np.exp(-r*T) * norm.cdf(d2) @staticmethod def monte_carlo_call( S: float, K: float, T: float, r: float, sigma: float, n_paths: int = 100000 ) -> dict: """ Monte Carlo European call option pricing. Args: S: Current stock price K: Strike price T: Time to maturity (years) r: Risk-free rate sigma: Volatility n_paths: Number of simulation paths Returns: Price estimate with confidence interval """ np.random.seed(42) # Simulate terminal prices Z = np.random.standard_normal(n_paths) ST = S * np.exp((r - 0.5*sigma**2)*T + sigma*np.sqrt(T)*Z) # Calculate payoffs payoffs = np.maximum(ST - K, 0) discounted = np.exp(-r*T) * payoffs price = np.mean(discounted) std_error = np.std(discounted) / np.sqrt(n_paths) return { 'price': price, 'std_error': std_error, 'confidence_interval_95': (price - 1.96*std_error, price + 1.96*std_error), 'n_paths': n_paths } @staticmethod def benchmark_monte_carlo_timing( n_paths_list: list = None, n_trials: int = 10 ) -> list: """Benchmark Monte Carlo timing for different path counts.""" import time if n_paths_list is None: n_paths_list = [1000, 10000, 100000, 1000000] results = [] for n_paths in n_paths_list: times = [] for _ in range(n_trials): start = time.perf_counter() OptionPricer.monte_carlo_call( S=100, K=100, T=1, r=0.05, sigma=0.2, n_paths=n_paths ) times.append(time.perf_counter() - start) results.append({ 'n_paths': n_paths, 'mean_time_ms': np.mean(times) * 1000, 'std_time_ms': np.std(times) * 1000 }) return results def collect_experiment_data(n_assets: int = 25) -> dict: """ Collect data for quantum finance experiments. Args: n_assets: Number of assets to include Returns: Dictionary with prices, returns, and optimization inputs """ collector = FinanceDataCollector( tickers=FinanceDataCollector.DEFAULT_TICKERS[:n_assets] ) print(f"Fetching data for {n_assets} assets...") prices = collector.fetch_historical_data(period="1y") returns = collector.calculate_returns(prices) cov_matrix = collector.calculate_covariance_matrix(returns) exp_returns = collector.calculate_expected_returns(returns) return { 'prices': prices, 'returns': returns, 'covariance_matrix': cov_matrix, 'expected_returns': exp_returns, 'n_assets': n_assets, 'n_observations': len(returns) } if __name__ == "__main__": # Demo data collection and classical benchmarking print("Finance Data Collection Demo") print("=" * 50) data = collect_experiment_data(n_assets=10) print(f"Collected {data['n_observations']} days of data for {data['n_assets']} assets") print("\nClassical Portfolio Optimization:") print("-" * 50) optimizer = ClassicalPortfolioOptimizer( data['expected_returns'].values, data['covariance_matrix'].values ) result = optimizer.optimize_markowitz() print(f"Expected Return: {result['expected_return']:.4f}") print(f"Volatility: {result['volatility']:.4f}") print(f"Sharpe Ratio: {result['sharpe_ratio']:.4f}") print("\nOptimization Timing Benchmark:") timing = optimizer.benchmark_timing(n_trials=100) print(f"Mean time: {timing['mean_time_ms']:.3f} ms")