""" Data Analysis Tools Handles fetching asset prices, normalization, and correlation calculation """ import yfinance as yf import numpy as np import pandas as pd from typing import Dict, List, Tuple from datetime import datetime, timedelta, timezone import pytz class DataAnalyzer: """Class to handle data normalization and correlation analysis""" def __init__(self): pass def fetch_asset_prices( self, ticker: str, start_unix: int, end_unix: int, interval: str = "1m" ) -> List[Dict]: """ Fetch asset prices from Yahoo Finance Args: ticker: Stock/ETF ticker symbol start_unix: Start time as Unix timestamp end_unix: End time as Unix timestamp interval: Time interval (1m, 5m, 15m, 30m, 1h, 1d) Note: 1m data only available for last 7 days Returns: List of price points: [ {"timestamp": 1234567890, "price": 150.25}, ... ] """ try: # Convert Unix timestamps to timezone-aware datetime (UTC) start_dt = datetime.fromtimestamp(start_unix, tz=timezone.utc) end_dt = datetime.fromtimestamp(end_unix, tz=timezone.utc) print(f"\n{'='*60}") print(f"FETCHING DATA FOR {ticker}") print(f"{'='*60}") print(f"Requested range: {start_dt} to {end_dt}") print(f"Requested interval: {interval}") # Calculate time difference time_diff = end_dt - start_dt period_days = time_diff.days + (time_diff.seconds / 86400) print(f"Time range: {period_days:.2f} days") # Adjust interval based on data availability limits original_interval = interval if interval in ["1m", "2m", "5m"] and period_days > 7: interval = "15m" print(f"→ Adjusted from {original_interval} to {interval} (minute data limited to 7 days)") if interval == "15m" and period_days > 60: interval = "1h" print(f"→ Adjusted from {original_interval} to {interval} (15m data limited to 60 days)") if interval == "1h" and period_days > 730: interval = "1d" print(f"→ Adjusted from {original_interval} to {interval} (hourly data limited to ~2 years)") print(f"Final interval: {interval}") # Fetch data from Yahoo Finance ticker_obj = yf.Ticker(ticker) print(f"Calling yfinance history()...") df = None # Strategy 1: Use period parameter (more reliable for intraday data) if interval in ["1m", "2m", "5m", "15m", "30m", "1h"]: # Determine appropriate period if period_days <= 1: period = "1d" elif period_days <= 5: period = "5d" elif period_days <= 30: period = "1mo" elif period_days <= 90: period = "3mo" else: period = "1y" print(f"Trying period='{period}' with interval='{interval}'") try: df = ticker_obj.history(period=period, interval=interval, auto_adjust=True, prepost=False) # Filter to requested date range if not df.empty: df = df[(df.index >= start_dt) & (df.index <= end_dt)] except Exception as e: print(f"Period method failed: {e}") # Strategy 2: Use start/end dates (better for daily+ intervals) if df is None or df.empty: print(f"Trying start/end dates with interval='{interval}'") try: # Convert to naive datetime for yfinance (it handles timezone internally) start_naive = start_dt.replace(tzinfo=None) end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1) # Add buffer df = ticker_obj.history(start=start_naive, end=end_naive, interval=interval, auto_adjust=True, prepost=False) except Exception as e: print(f"Start/end method failed: {e}") # Strategy 3: Fallback to daily data if intraday fails if (df is None or df.empty) and interval != "1d": print(f"Falling back to daily interval") try: start_naive = start_dt.replace(tzinfo=None) end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1) df = ticker_obj.history(start=start_naive, end=end_naive, interval="1d", auto_adjust=True, prepost=False) except Exception as e: print(f"Daily fallback failed: {e}") # Check if we got data if df is None or df.empty: # Last resort: try with just period and no date filtering print(f"Last resort: fetching with period='1y' and interval='1d'") try: df = ticker_obj.history(period="1y", interval="1d", auto_adjust=True, prepost=False) if not df.empty: df = df[(df.index >= start_dt.replace(tzinfo=None)) & (df.index <= end_dt.replace(tzinfo=None))] except Exception as e: print(f"Last resort failed: {e}") print(f"Received {len(df) if df is not None else 0} rows from Yahoo Finance") if df is not None and len(df) > 0: print(f"First timestamp: {df.index[0]}") print(f"Last timestamp: {df.index[-1]}") print(f"Sample prices: {df['Close'].head(3).tolist()}") else: print(f"⚠️ WARNING: No data returned!") # Final check - if still empty, raise error if df is None or df.empty: raise ValueError( f"No data available for ticker {ticker}. " f"Symbol may be delisted, invalid, or data unavailable for the requested period. " f"Tried interval={interval}, period={period_days:.1f} days" ) # Convert to our format prices = [] for timestamp, row in df.iterrows(): prices.append({ "timestamp": int(timestamp.timestamp()), "price": float(row['Close']) }) print(f"✓ Successfully converted {len(prices)} data points") print(f"{'='*60}\n") return prices except Exception as e: print(f"✗ ERROR for {ticker}: {str(e)}") print(f"{'='*60}\n") raise Exception(f"Error fetching data for {ticker}: {str(e)}") def normalize_timeseries( self, timeseries: List[Dict], method: str = "min-max" ) -> List[Dict]: """ Normalize a time series Args: timeseries: List of {"timestamp": int, "price": float} method: Normalization method - "min-max": Scale to 0-1 range - "z-score": Standardize to mean=0, std=1 - "percent-change": Normalize to percent change from start Returns: Normalized timeseries with same structure """ if not timeseries: return [] prices = np.array([point['price'] for point in timeseries]) if method == "min-max": # Scale to 0-1 range min_val = np.min(prices) max_val = np.max(prices) if max_val - min_val == 0: normalized = np.zeros_like(prices) else: normalized = (prices - min_val) / (max_val - min_val) elif method == "z-score": # Standardize to mean=0, std=1 mean = np.mean(prices) std = np.std(prices) if std == 0: normalized = np.zeros_like(prices) else: normalized = (prices - mean) / std elif method == "percent-change": # Normalize to percent change from first value first_price = prices[0] if first_price == 0: normalized = np.zeros_like(prices) else: normalized = ((prices - first_price) / first_price) * 100 else: raise ValueError(f"Unknown normalization method: {method}") # Create normalized timeseries normalized_ts = [] for i, point in enumerate(timeseries): normalized_ts.append({ "timestamp": point['timestamp'], "price": float(normalized[i]), "original_price": point['price'] }) return normalized_ts def align_timeseries( self, ts1: List[Dict], ts2: List[Dict] ) -> Tuple[List[float], List[float]]: """ Align two time series so prices are comparable. We treat ts1 as the reference timeline and forward-fill ts2 onto ts1's timestamps. This avoids requiring exact matching timestamps (which is unrealistic across different data sources). Args: ts1: First time series (reference), list of {"timestamp": int, "price": float} ts2: Second time series, list of {"timestamp": int, "price": float} Returns: Tuple of (aligned_prices1, aligned_prices2) """ if not ts1 or not ts2: raise ValueError("One or both time series are empty") # Sort by timestamp just in case ts1_sorted = sorted(ts1, key=lambda x: x["timestamp"]) ts2_sorted = sorted(ts2, key=lambda x: x["timestamp"]) print(f"\n--- ALIGNMENT PROCESS ---") print(f"Polymarket (ts1): {len(ts1_sorted)} points") print(f" Range: {datetime.fromtimestamp(ts1_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts1_sorted[-1]['timestamp'])}") print(f"Asset (ts2): {len(ts2_sorted)} points") print(f" Range: {datetime.fromtimestamp(ts2_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts2_sorted[-1]['timestamp'])}") aligned1: List[float] = [] aligned2: List[float] = [] # Only include Polymarket points where we have recent stock data # This creates distinct segments for each trading day instead of forward-filling 24/7 # We'll use a 2-hour window to match stock data to Polymarket timestamps max_gap_seconds = 2 * 3600 # 2 hours - enough to cover after-hours but not all night for point1 in ts1_sorted: t1 = point1["timestamp"] # Find the closest stock price to this Polymarket timestamp closest_price = None closest_time_diff = float('inf') for point2 in ts2_sorted: t2 = point2["timestamp"] time_diff = abs(t1 - t2) # Only consider stock prices within our time window if time_diff <= max_gap_seconds and time_diff < closest_time_diff: closest_price = point2["price"] closest_time_diff = time_diff # Only include this point if we found a close enough stock price if closest_price is not None: aligned1.append(point1["price"]) aligned2.append(closest_price) print(f"After alignment: {len(aligned1)} overlapping points (out of {len(ts1_sorted)} Polymarket points)") print(f" Coverage: {len(aligned1) / len(ts1_sorted) * 100:.1f}% of Polymarket timeline") if len(aligned1) > 0: # Find actual first and last timestamps matched_indices = [] matched_count = 0 for i, point1 in enumerate(ts1_sorted): if matched_count < len(aligned1): t1 = point1["timestamp"] # Check if this point was matched for point2 in ts2_sorted: if abs(t1 - point2["timestamp"]) <= max_gap_seconds: matched_indices.append(i) matched_count += 1 break if matched_indices: print(f" First aligned point: {datetime.fromtimestamp(ts1_sorted[matched_indices[0]]['timestamp'])}") print(f" Last aligned point: {datetime.fromtimestamp(ts1_sorted[matched_indices[-1]]['timestamp'])}") print(f"--- END ALIGNMENT ---\n") if len(aligned1) < 2: raise ValueError("Insufficient overlapping data between time series") return aligned1, aligned2 def build_chart_series( self, reference_series: List[Dict], target_series: List[Dict], tolerance_seconds: int = 2 * 3600 ) -> List[Dict]: """ Build a chart-ready series aligned to the reference timestamps. We only include target prices that are within `tolerance_seconds` of a reference timestamp. This creates distinct segments that match actual trading hours. Args: reference_series: Reference timeline (typically Polymarket) target_series: Target asset series tolerance_seconds: Maximum time difference to consider a match Returns: List of {"timestamp": int, "price": Optional[float]} """ if not reference_series: return [] ref_sorted = sorted(reference_series, key=lambda x: x["timestamp"]) tgt_sorted = sorted(target_series, key=lambda x: x["timestamp"]) result = [] tgt_idx = 0 for ref_point in ref_sorted: t_ref = ref_point["timestamp"] closest_price = None closest_diff = tolerance_seconds + 1 # Move pointer to relevant window while tgt_idx < len(tgt_sorted) and tgt_sorted[tgt_idx]["timestamp"] < t_ref - tolerance_seconds: tgt_idx += 1 # Check nearby target points (current and previous one) for idx in [tgt_idx - 1, tgt_idx, tgt_idx + 1]: if 0 <= idx < len(tgt_sorted): t_target = tgt_sorted[idx]["timestamp"] diff = abs(t_target - t_ref) if diff <= tolerance_seconds and diff < closest_diff: closest_diff = diff closest_price = tgt_sorted[idx]["price"] result.append({ "timestamp": t_ref, "price": closest_price }) return result def calculate_correlation( self, prices1: List[float], prices2: List[float] ) -> float: """ Calculate Pearson correlation coefficient between two price series Args: prices1: First price series prices2: Second price series Returns: Correlation coefficient (-1 to 1) """ if len(prices1) != len(prices2): raise ValueError("Price series must have same length") if len(prices1) < 2: raise ValueError("Need at least 2 data points to calculate correlation") # Calculate Pearson correlation correlation = np.corrcoef(prices1, prices2)[0, 1] # Handle NaN (can occur if one series is constant) if np.isnan(correlation): return 0.0 return float(correlation) def calculate_correlation_between_series( self, ts1: List[Dict], ts2: List[Dict] ) -> float: """ Calculate correlation between two time series Args: ts1: First time series ts2: Second time series Returns: Correlation coefficient (-1 to 1) """ # Align the time series aligned1, aligned2 = self.align_timeseries(ts1, ts2) # Calculate correlation return self.calculate_correlation(aligned1, aligned2) def analyze_multiple_assets( self, polymarket_data: List[Dict], assets: List[Dict], start_unix: int, end_unix: int, interval: str = "1h" ) -> Dict: """ Fetch and analyze multiple assets against Polymarket data Args: polymarket_data: Polymarket time series assets: List of assets with ticker and reason [{"ticker": "AAPL", "reason": "..."}, ...] start_unix: Start time end_unix: End time interval: Time interval for asset data Returns: Dictionary with analysis results for each asset """ results = [] print(f"\n{'#'*60}") print(f"ANALYZING {len(assets)} ASSETS") print(f"Polymarket data: {len(polymarket_data)} points") print(f"Time range: {datetime.fromtimestamp(start_unix)} to {datetime.fromtimestamp(end_unix)}") print(f"Interval: {interval}") print(f"{'#'*60}\n") for asset in assets: ticker = asset['ticker'] try: # Fetch asset prices asset_prices = self.fetch_asset_prices( ticker, start_unix, end_unix, interval ) # Normalize both series norm_polymarket = self.normalize_timeseries(polymarket_data, "min-max") norm_asset = self.normalize_timeseries(asset_prices, "min-max") # Build chart-aligned series (for gaps) aligned_raw = self.build_chart_series(polymarket_data, asset_prices) aligned_norm = self.build_chart_series(norm_polymarket, norm_asset) # Calculate correlation using aligned normalized series (filter out None) print(f"Calculating correlation for {ticker}...") aligned_prices_polymarket, aligned_prices_asset = self.align_timeseries( norm_polymarket, norm_asset ) correlation = self.calculate_correlation( aligned_prices_polymarket, aligned_prices_asset ) print(f"✓ {ticker} correlation: {correlation:.4f}") results.append({ "ticker": ticker, "reason": asset.get('reason', ''), "correlation": correlation, "data_points": len(asset_prices), "timeseries": asset_prices, "normalized_timeseries": norm_asset, "aligned_timeseries": aligned_raw, "aligned_normalized_timeseries": aligned_norm, "success": True }) except Exception as e: results.append({ "ticker": ticker, "reason": asset.get('reason', ''), "error": str(e), "success": False }) return results # Example usage if __name__ == "__main__": analyzer = DataAnalyzer() # Example: Fetch and analyze AAPL from time_utils import TimeConverter converter = TimeConverter() # Use a date range that's more likely to have data (last week) import time from datetime import datetime, timedelta # Get timestamps for last week end_time = datetime.now() start_time = end_time - timedelta(days=7) start_unix = int(start_time.timestamp()) end_unix = int(end_time.timestamp()) try: print(f"Testing data fetch for last 7 days...") print(f"Start: {start_time}") print(f"End: {end_time}\n") print("Fetching AAPL data...") prices = analyzer.fetch_asset_prices("AAPL", start_unix, end_unix, "1h") print(f"✓ Fetched {len(prices)} data points") print("\nNormalizing data...") normalized = analyzer.normalize_timeseries(prices, "min-max") print(f"✓ First 3 normalized points:") for point in normalized[:3]: dt = datetime.fromtimestamp(point['timestamp']) print(f" {dt}: {point['price']:.4f} (original: {point['original_price']:.2f})") except Exception as e: print(f"✗ Error: {str(e)}")