des137's picture
Deploy Quantum Finance Analyzer
6413ecc
"""
Finance data tools using Yahoo Finance and analysis utilities.
Provides utilities for:
- Historical price data retrieval
- Portfolio optimization benchmarks
- Option pricing models
- Risk metrics calculation
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional
class FinanceDataCollector:
"""Collect and process financial data for quantum finance experiments."""
# Default benchmark assets
DEFAULT_TICKERS = [
'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META', # Tech
'JPM', 'BAC', 'GS', 'MS', 'C', # Finance
'JNJ', 'PFE', 'UNH', 'MRK', 'ABBV', # Healthcare
'XOM', 'CVX', 'COP', 'SLB', 'EOG', # Energy
'PG', 'KO', 'PEP', 'WMT', 'COST' # Consumer
]
def __init__(self, tickers: Optional[list] = None):
"""Initialize with list of tickers to track."""
self.tickers = tickers or self.DEFAULT_TICKERS
def fetch_historical_data(
self,
start_date: str = None,
end_date: str = None,
period: str = "1y"
) -> pd.DataFrame:
"""
Fetch historical price data using yfinance.
Args:
start_date: Start date (YYYY-MM-DD format)
end_date: End date (YYYY-MM-DD format)
period: Alternative to start/end - e.g., "1y", "6mo", "1mo"
Returns:
DataFrame with adjusted close prices
"""
try:
import yfinance as yf
if start_date and end_date:
data = yf.download(
self.tickers,
start=start_date,
end=end_date,
progress=False
)
else:
data = yf.download(
self.tickers,
period=period,
progress=False
)
# Extract adjusted close prices
if len(data) == 0:
print("No data from yfinance, using synthetic data...")
return self._generate_synthetic_data()
if 'Adj Close' in data.columns.get_level_values(0):
prices = data['Adj Close']
else:
prices = data['Close']
# If prices are empty, use synthetic
if prices.empty or prices.isna().all().all():
print("Empty price data, using synthetic data...")
return self._generate_synthetic_data()
return prices
except (ImportError, Exception) as e:
print(f"yfinance error ({e}), using synthetic data...")
return self._generate_synthetic_data()
def _generate_synthetic_data(self, days: int = 252) -> pd.DataFrame:
"""Generate synthetic price data for testing when yfinance unavailable."""
np.random.seed(42)
dates = pd.date_range(end=datetime.now(), periods=days, freq='D')
data = {}
for ticker in self.tickers:
# Geometric Brownian Motion simulation
initial_price = np.random.uniform(50, 500)
returns = np.random.normal(0.0005, 0.02, days)
prices = initial_price * np.cumprod(1 + returns)
data[ticker] = prices
return pd.DataFrame(data, index=dates)
def calculate_returns(self, prices: pd.DataFrame) -> pd.DataFrame:
"""Calculate daily returns from price data."""
return prices.pct_change(fill_method=None).dropna()
def calculate_covariance_matrix(self, returns: pd.DataFrame) -> pd.DataFrame:
"""Calculate annualized covariance matrix."""
return returns.cov() * 252 # Annualize
def calculate_expected_returns(self, returns: pd.DataFrame) -> pd.Series:
"""Calculate annualized expected returns."""
return returns.mean() * 252 # Annualize
class ClassicalPortfolioOptimizer:
"""Classical portfolio optimization for benchmarking quantum approaches."""
def __init__(self, expected_returns: np.ndarray, covariance_matrix: np.ndarray):
"""
Initialize optimizer.
Args:
expected_returns: Expected returns vector
covariance_matrix: Covariance matrix of returns
"""
self.mu = expected_returns
self.sigma = covariance_matrix
self.n_assets = len(expected_returns)
def optimize_markowitz(self, target_return: float = None) -> dict:
"""
Solve Markowitz mean-variance optimization.
Args:
target_return: Target portfolio return (if None, maximize Sharpe)
Returns:
Optimization results
"""
try:
from scipy.optimize import minimize
def portfolio_volatility(weights):
return np.sqrt(weights @ self.sigma @ weights)
def negative_sharpe(weights):
ret = weights @ self.mu
vol = portfolio_volatility(weights)
return -ret / vol if vol > 0 else 0
# Constraints: weights sum to 1, all weights >= 0
constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}]
bounds = [(0, 1) for _ in range(self.n_assets)]
# Initial guess: equal weights
w0 = np.ones(self.n_assets) / self.n_assets
if target_return is not None:
constraints.append({
'type': 'eq',
'fun': lambda w: w @ self.mu - target_return
})
result = minimize(
portfolio_volatility, w0,
method='SLSQP', bounds=bounds, constraints=constraints
)
else:
result = minimize(
negative_sharpe, w0,
method='SLSQP', bounds=bounds, constraints=constraints
)
optimal_weights = result.x
portfolio_return = optimal_weights @ self.mu
portfolio_vol = portfolio_volatility(optimal_weights)
sharpe = portfolio_return / portfolio_vol if portfolio_vol > 0 else 0
return {
'weights': optimal_weights,
'expected_return': portfolio_return,
'volatility': portfolio_vol,
'sharpe_ratio': sharpe,
'optimization_success': result.success,
'solver': 'scipy.optimize.minimize (SLSQP)'
}
except ImportError:
return {'error': 'scipy not available'}
def benchmark_timing(self, n_trials: int = 100) -> dict:
"""
Benchmark classical optimization timing.
Args:
n_trials: Number of optimization runs
Returns:
Timing statistics
"""
import time
times = []
for _ in range(n_trials):
start = time.perf_counter()
self.optimize_markowitz()
times.append(time.perf_counter() - start)
return {
'n_assets': self.n_assets,
'n_trials': n_trials,
'mean_time_ms': np.mean(times) * 1000,
'std_time_ms': np.std(times) * 1000,
'min_time_ms': np.min(times) * 1000,
'max_time_ms': np.max(times) * 1000
}
class OptionPricer:
"""Classical option pricing for benchmarking quantum amplitude estimation."""
@staticmethod
def black_scholes_call(S: float, K: float, T: float, r: float, sigma: float) -> float:
"""
Black-Scholes European call option price.
Args:
S: Current stock price
K: Strike price
T: Time to maturity (years)
r: Risk-free rate
sigma: Volatility
Returns:
Call option price
"""
from scipy.stats import norm
d1 = (np.log(S/K) + (r + 0.5*sigma**2)*T) / (sigma*np.sqrt(T))
d2 = d1 - sigma*np.sqrt(T)
return S * norm.cdf(d1) - K * np.exp(-r*T) * norm.cdf(d2)
@staticmethod
def monte_carlo_call(
S: float, K: float, T: float, r: float, sigma: float,
n_paths: int = 100000
) -> dict:
"""
Monte Carlo European call option pricing.
Args:
S: Current stock price
K: Strike price
T: Time to maturity (years)
r: Risk-free rate
sigma: Volatility
n_paths: Number of simulation paths
Returns:
Price estimate with confidence interval
"""
np.random.seed(42)
# Simulate terminal prices
Z = np.random.standard_normal(n_paths)
ST = S * np.exp((r - 0.5*sigma**2)*T + sigma*np.sqrt(T)*Z)
# Calculate payoffs
payoffs = np.maximum(ST - K, 0)
discounted = np.exp(-r*T) * payoffs
price = np.mean(discounted)
std_error = np.std(discounted) / np.sqrt(n_paths)
return {
'price': price,
'std_error': std_error,
'confidence_interval_95': (price - 1.96*std_error, price + 1.96*std_error),
'n_paths': n_paths
}
@staticmethod
def benchmark_monte_carlo_timing(
n_paths_list: list = None,
n_trials: int = 10
) -> list:
"""Benchmark Monte Carlo timing for different path counts."""
import time
if n_paths_list is None:
n_paths_list = [1000, 10000, 100000, 1000000]
results = []
for n_paths in n_paths_list:
times = []
for _ in range(n_trials):
start = time.perf_counter()
OptionPricer.monte_carlo_call(
S=100, K=100, T=1, r=0.05, sigma=0.2,
n_paths=n_paths
)
times.append(time.perf_counter() - start)
results.append({
'n_paths': n_paths,
'mean_time_ms': np.mean(times) * 1000,
'std_time_ms': np.std(times) * 1000
})
return results
def collect_experiment_data(n_assets: int = 25) -> dict:
"""
Collect data for quantum finance experiments.
Args:
n_assets: Number of assets to include
Returns:
Dictionary with prices, returns, and optimization inputs
"""
collector = FinanceDataCollector(
tickers=FinanceDataCollector.DEFAULT_TICKERS[:n_assets]
)
print(f"Fetching data for {n_assets} assets...")
prices = collector.fetch_historical_data(period="1y")
returns = collector.calculate_returns(prices)
cov_matrix = collector.calculate_covariance_matrix(returns)
exp_returns = collector.calculate_expected_returns(returns)
return {
'prices': prices,
'returns': returns,
'covariance_matrix': cov_matrix,
'expected_returns': exp_returns,
'n_assets': n_assets,
'n_observations': len(returns)
}
if __name__ == "__main__":
# Demo data collection and classical benchmarking
print("Finance Data Collection Demo")
print("=" * 50)
data = collect_experiment_data(n_assets=10)
print(f"Collected {data['n_observations']} days of data for {data['n_assets']} assets")
print("\nClassical Portfolio Optimization:")
print("-" * 50)
optimizer = ClassicalPortfolioOptimizer(
data['expected_returns'].values,
data['covariance_matrix'].values
)
result = optimizer.optimize_markowitz()
print(f"Expected Return: {result['expected_return']:.4f}")
print(f"Volatility: {result['volatility']:.4f}")
print(f"Sharpe Ratio: {result['sharpe_ratio']:.4f}")
print("\nOptimization Timing Benchmark:")
timing = optimizer.benchmark_timing(n_trials=100)
print(f"Mean time: {timing['mean_time_ms']:.3f} ms")