Spaces:
Sleeping
Sleeping
| """ | |
| Data Analysis Tools | |
| Handles fetching asset prices, normalization, and correlation calculation | |
| """ | |
import bisect
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import pytz
import yfinance as yf
class DataAnalyzer:
    """Normalization, alignment, and correlation analysis for price time series.

    Works on time series represented as lists of ``{"timestamp": int, "price": float}``
    dicts, and fetches asset prices from Yahoo Finance via ``yfinance``.
    """

    def __init__(self):
        pass

    # ------------------------------------------------------------------
    # Fetching helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _adjust_interval(interval: str, period_days: float) -> str:
        """Downgrade the requested interval to respect Yahoo Finance history limits.

        Minute data is limited to ~7 days, 15m to ~60 days, hourly to ~2 years.
        """
        original_interval = interval
        if interval in ["1m", "2m", "5m"] and period_days > 7:
            interval = "15m"
            print(f"β Adjusted from {original_interval} to {interval} (minute data limited to 7 days)")
        if interval == "15m" and period_days > 60:
            interval = "1h"
            print(f"β Adjusted from {original_interval} to {interval} (15m data limited to 60 days)")
        if interval == "1h" and period_days > 730:
            interval = "1d"
            print(f"β Adjusted from {original_interval} to {interval} (hourly data limited to ~2 years)")
        return interval

    @staticmethod
    def _period_for_days(period_days: float) -> str:
        """Map a day span to the smallest yfinance ``period`` string covering it."""
        if period_days <= 1:
            return "1d"
        if period_days <= 5:
            return "5d"
        if period_days <= 30:
            return "1mo"
        if period_days <= 90:
            return "3mo"
        return "1y"

    @staticmethod
    def _filter_to_range(df, start_dt: datetime, end_dt: datetime):
        """Filter a price DataFrame to [start_dt, end_dt].

        BUG FIX: the original compared a (usually tz-aware) yfinance index
        against naive datetimes in the last-resort path, which raises
        TypeError in pandas. Here the bounds are matched to the index's
        awareness before comparing.
        """
        if df is None or df.empty:
            return df
        if getattr(df.index, "tz", None) is not None:
            lo, hi = start_dt, end_dt
        else:
            lo, hi = start_dt.replace(tzinfo=None), end_dt.replace(tzinfo=None)
        return df[(df.index >= lo) & (df.index <= hi)]

    def fetch_asset_prices(
        self,
        ticker: str,
        start_unix: int,
        end_unix: int,
        interval: str = "1m"
    ) -> List[Dict]:
        """
        Fetch asset prices from Yahoo Finance.

        Args:
            ticker: Stock/ETF ticker symbol
            start_unix: Start time as Unix timestamp
            end_unix: End time as Unix timestamp
            interval: Time interval (1m, 5m, 15m, 30m, 1h, 1d)
                Note: 1m data only available for last 7 days

        Returns:
            List of price points:
            [
                {"timestamp": 1234567890, "price": 150.25},
                ...
            ]

        Raises:
            Exception: if no data could be retrieved after all strategies.
        """
        try:
            # Convert Unix timestamps to timezone-aware datetime (UTC)
            start_dt = datetime.fromtimestamp(start_unix, tz=timezone.utc)
            end_dt = datetime.fromtimestamp(end_unix, tz=timezone.utc)
            print(f"\n{'='*60}")
            print(f"FETCHING DATA FOR {ticker}")
            print(f"{'='*60}")
            print(f"Requested range: {start_dt} to {end_dt}")
            print(f"Requested interval: {interval}")
            time_diff = end_dt - start_dt
            period_days = time_diff.days + (time_diff.seconds / 86400)
            print(f"Time range: {period_days:.2f} days")
            # Adjust interval based on data availability limits
            interval = self._adjust_interval(interval, period_days)
            print(f"Final interval: {interval}")
            ticker_obj = yf.Ticker(ticker)
            print(f"Calling yfinance history()...")
            df = None
            # Strategy 1: period parameter (more reliable for intraday data)
            if interval in ["1m", "2m", "5m", "15m", "30m", "1h"]:
                period = self._period_for_days(period_days)
                print(f"Trying period='{period}' with interval='{interval}'")
                try:
                    df = ticker_obj.history(period=period, interval=interval, auto_adjust=True, prepost=False)
                    df = self._filter_to_range(df, start_dt, end_dt)
                except Exception as e:
                    print(f"Period method failed: {e}")
            # Strategy 2: start/end dates (better for daily+ intervals)
            if df is None or df.empty:
                print(f"Trying start/end dates with interval='{interval}'")
                try:
                    # yfinance accepts naive datetimes and handles tz internally
                    start_naive = start_dt.replace(tzinfo=None)
                    end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1)  # buffer for tz rounding
                    df = ticker_obj.history(start=start_naive, end=end_naive, interval=interval, auto_adjust=True, prepost=False)
                except Exception as e:
                    print(f"Start/end method failed: {e}")
            # Strategy 3: fall back to daily data if intraday fails
            if (df is None or df.empty) and interval != "1d":
                print(f"Falling back to daily interval")
                try:
                    start_naive = start_dt.replace(tzinfo=None)
                    end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1)
                    df = ticker_obj.history(start=start_naive, end=end_naive, interval="1d", auto_adjust=True, prepost=False)
                except Exception as e:
                    print(f"Daily fallback failed: {e}")
            # Strategy 4: last resort — fetch a year of daily data, then trim
            if df is None or df.empty:
                print(f"Last resort: fetching with period='1y' and interval='1d'")
                try:
                    df = ticker_obj.history(period="1y", interval="1d", auto_adjust=True, prepost=False)
                    df = self._filter_to_range(df, start_dt, end_dt)
                except Exception as e:
                    print(f"Last resort failed: {e}")
            print(f"Received {len(df) if df is not None else 0} rows from Yahoo Finance")
            if df is not None and len(df) > 0:
                print(f"First timestamp: {df.index[0]}")
                print(f"Last timestamp: {df.index[-1]}")
                print(f"Sample prices: {df['Close'].head(3).tolist()}")
            else:
                print(f"β οΈ WARNING: No data returned!")
            # Final check — if still empty, raise error
            if df is None or df.empty:
                raise ValueError(
                    f"No data available for ticker {ticker}. "
                    f"Symbol may be delisted, invalid, or data unavailable for the requested period. "
                    f"Tried interval={interval}, period={period_days:.1f} days"
                )
            # Convert to our list-of-dicts format
            prices = [
                {"timestamp": int(timestamp.timestamp()), "price": float(row['Close'])}
                for timestamp, row in df.iterrows()
            ]
            print(f"β Successfully converted {len(prices)} data points")
            print(f"{'='*60}\n")
            return prices
        except Exception as e:
            print(f"β ERROR for {ticker}: {str(e)}")
            print(f"{'='*60}\n")
            raise Exception(f"Error fetching data for {ticker}: {str(e)}")

    # ------------------------------------------------------------------
    # Normalization
    # ------------------------------------------------------------------

    def normalize_timeseries(
        self,
        timeseries: List[Dict],
        method: str = "min-max"
    ) -> List[Dict]:
        """
        Normalize a time series.

        Args:
            timeseries: List of {"timestamp": int, "price": float}
            method: Normalization method
                - "min-max": Scale to 0-1 range
                - "z-score": Standardize to mean=0, std=1
                - "percent-change": Normalize to percent change from start

        Returns:
            Normalized timeseries with same structure, plus "original_price".

        Raises:
            ValueError: for an unknown method.
        """
        if not timeseries:
            return []
        prices = np.array([point['price'] for point in timeseries])
        if method == "min-max":
            # Scale to 0-1; a constant series maps to all zeros
            min_val = np.min(prices)
            max_val = np.max(prices)
            if max_val - min_val == 0:
                normalized = np.zeros_like(prices)
            else:
                normalized = (prices - min_val) / (max_val - min_val)
        elif method == "z-score":
            # Standardize to mean=0, std=1; constant series maps to zeros
            mean = np.mean(prices)
            std = np.std(prices)
            if std == 0:
                normalized = np.zeros_like(prices)
            else:
                normalized = (prices - mean) / std
        elif method == "percent-change":
            # Percent change relative to the first value; zero start maps to zeros
            first_price = prices[0]
            if first_price == 0:
                normalized = np.zeros_like(prices)
            else:
                normalized = ((prices - first_price) / first_price) * 100
        else:
            raise ValueError(f"Unknown normalization method: {method}")
        return [
            {
                "timestamp": point['timestamp'],
                "price": float(normalized[i]),
                "original_price": point['price']
            }
            for i, point in enumerate(timeseries)
        ]

    # ------------------------------------------------------------------
    # Alignment
    # ------------------------------------------------------------------

    def align_timeseries(
        self,
        ts1: List[Dict],
        ts2: List[Dict]
    ) -> Tuple[List[float], List[float]]:
        """
        Align two time series so prices are comparable.

        ts1 is the reference timeline; for each ts1 point we pick the closest
        ts2 point within a 2-hour window. Points with no close ts2 match are
        dropped, producing distinct segments that follow actual trading hours.

        Args:
            ts1: First time series (reference), list of {"timestamp": int, "price": float}
            ts2: Second time series, list of {"timestamp": int, "price": float}

        Returns:
            Tuple of (aligned_prices1, aligned_prices2)

        Raises:
            ValueError: if either series is empty or fewer than 2 points overlap.
        """
        if not ts1 or not ts2:
            raise ValueError("One or both time series are empty")
        # Sort by timestamp just in case
        ts1_sorted = sorted(ts1, key=lambda x: x["timestamp"])
        ts2_sorted = sorted(ts2, key=lambda x: x["timestamp"])
        print(f"\n--- ALIGNMENT PROCESS ---")
        print(f"Polymarket (ts1): {len(ts1_sorted)} points")
        print(f" Range: {datetime.fromtimestamp(ts1_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts1_sorted[-1]['timestamp'])}")
        print(f"Asset (ts2): {len(ts2_sorted)} points")
        print(f" Range: {datetime.fromtimestamp(ts2_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts2_sorted[-1]['timestamp'])}")
        # 2 hours — enough to cover after-hours but not all night
        max_gap_seconds = 2 * 3600
        # BUG FIX / PERF: the original scanned all of ts2 for every ts1 point
        # (O(n*m)) and then re-derived matched indices with a *different*
        # criterion (first-within-gap instead of closest), so the printed
        # first/last aligned points could be wrong. We use bisect for an
        # O(n log m) closest-neighbor lookup and record matches as we go.
        ts2_times = [p["timestamp"] for p in ts2_sorted]
        aligned1: List[float] = []
        aligned2: List[float] = []
        matched_timestamps: List[int] = []
        for point1 in ts1_sorted:
            t1 = point1["timestamp"]
            # The closest ts2 timestamp is adjacent to the insertion point
            idx = bisect.bisect_left(ts2_times, t1)
            closest_price = None
            closest_time_diff = float('inf')
            for j in (idx - 1, idx):
                if 0 <= j < len(ts2_times):
                    time_diff = abs(t1 - ts2_times[j])
                    # strict '<' keeps the earlier point on ties, as before
                    if time_diff <= max_gap_seconds and time_diff < closest_time_diff:
                        closest_price = ts2_sorted[j]["price"]
                        closest_time_diff = time_diff
            if closest_price is not None:
                aligned1.append(point1["price"])
                aligned2.append(closest_price)
                matched_timestamps.append(t1)
        print(f"After alignment: {len(aligned1)} overlapping points (out of {len(ts1_sorted)} Polymarket points)")
        print(f" Coverage: {len(aligned1) / len(ts1_sorted) * 100:.1f}% of Polymarket timeline")
        if matched_timestamps:
            print(f" First aligned point: {datetime.fromtimestamp(matched_timestamps[0])}")
            print(f" Last aligned point: {datetime.fromtimestamp(matched_timestamps[-1])}")
        print(f"--- END ALIGNMENT ---\n")
        if len(aligned1) < 2:
            raise ValueError("Insufficient overlapping data between time series")
        return aligned1, aligned2

    def build_chart_series(
        self,
        reference_series: List[Dict],
        target_series: List[Dict],
        tolerance_seconds: int = 2 * 3600
    ) -> List[Dict]:
        """
        Build a chart-ready series aligned to the reference timestamps.

        Only target prices within `tolerance_seconds` of a reference timestamp
        are included; unmatched points carry price None, creating gaps that
        match actual trading hours.

        Args:
            reference_series: Reference timeline (typically Polymarket)
            target_series: Target asset series
            tolerance_seconds: Maximum time difference to consider a match

        Returns:
            List of {"timestamp": int, "price": Optional[float]}
        """
        if not reference_series:
            return []
        ref_sorted = sorted(reference_series, key=lambda x: x["timestamp"])
        tgt_sorted = sorted(target_series, key=lambda x: x["timestamp"])
        tgt_times = [p["timestamp"] for p in tgt_sorted]
        result = []
        for ref_point in ref_sorted:
            t_ref = ref_point["timestamp"]
            closest_price = None
            closest_diff = tolerance_seconds + 1
            # BUG FIX: the original advanced a pointer past out-of-window points
            # and then inspected only three candidates around it, which could
            # miss the true closest target when several in-window points precede
            # t_ref. bisect finds the neighbors of t_ref directly.
            idx = bisect.bisect_left(tgt_times, t_ref)
            for j in (idx - 1, idx):
                if 0 <= j < len(tgt_times):
                    diff = abs(tgt_times[j] - t_ref)
                    if diff <= tolerance_seconds and diff < closest_diff:
                        closest_diff = diff
                        closest_price = tgt_sorted[j]["price"]
            result.append({
                "timestamp": t_ref,
                "price": closest_price
            })
        return result

    # ------------------------------------------------------------------
    # Correlation
    # ------------------------------------------------------------------

    def calculate_correlation(
        self,
        prices1: List[float],
        prices2: List[float]
    ) -> float:
        """
        Calculate the Pearson correlation coefficient between two price series.

        Args:
            prices1: First price series
            prices2: Second price series

        Returns:
            Correlation coefficient (-1 to 1); 0.0 if undefined (constant series).

        Raises:
            ValueError: if lengths differ or fewer than 2 points are given.
        """
        if len(prices1) != len(prices2):
            raise ValueError("Price series must have same length")
        if len(prices1) < 2:
            raise ValueError("Need at least 2 data points to calculate correlation")
        correlation = np.corrcoef(prices1, prices2)[0, 1]
        # NaN occurs when one series has zero variance
        if np.isnan(correlation):
            return 0.0
        return float(correlation)

    def calculate_correlation_between_series(
        self,
        ts1: List[Dict],
        ts2: List[Dict]
    ) -> float:
        """
        Calculate correlation between two time series after aligning them.

        Args:
            ts1: First time series
            ts2: Second time series

        Returns:
            Correlation coefficient (-1 to 1)
        """
        aligned1, aligned2 = self.align_timeseries(ts1, ts2)
        return self.calculate_correlation(aligned1, aligned2)

    # ------------------------------------------------------------------
    # Batch analysis
    # ------------------------------------------------------------------

    def analyze_multiple_assets(
        self,
        polymarket_data: List[Dict],
        assets: List[Dict],
        start_unix: int,
        end_unix: int,
        interval: str = "1h"
    ) -> List[Dict]:
        """
        Fetch and analyze multiple assets against Polymarket data.

        Args:
            polymarket_data: Polymarket time series
            assets: List of assets with ticker and reason
                [{"ticker": "AAPL", "reason": "..."}, ...]
            start_unix: Start time
            end_unix: End time
            interval: Time interval for asset data

        Returns:
            List of per-asset result dicts. On success each dict contains
            ticker, reason, correlation, data_points, the raw/normalized/
            aligned series, and success=True; on failure it contains ticker,
            reason, error, and success=False.
            (BUG FIX: the original annotated/documented a Dict return, but a
            list has always been returned.)
        """
        results = []
        print(f"\n{'#'*60}")
        print(f"ANALYZING {len(assets)} ASSETS")
        print(f"Polymarket data: {len(polymarket_data)} points")
        print(f"Time range: {datetime.fromtimestamp(start_unix)} to {datetime.fromtimestamp(end_unix)}")
        print(f"Interval: {interval}")
        print(f"{'#'*60}\n")
        for asset in assets:
            ticker = asset['ticker']
            try:
                asset_prices = self.fetch_asset_prices(
                    ticker,
                    start_unix,
                    end_unix,
                    interval
                )
                # Normalize both series to 0-1 so they are visually comparable
                norm_polymarket = self.normalize_timeseries(polymarket_data, "min-max")
                norm_asset = self.normalize_timeseries(asset_prices, "min-max")
                # Build chart-aligned series (with gaps for non-trading hours)
                aligned_raw = self.build_chart_series(polymarket_data, asset_prices)
                aligned_norm = self.build_chart_series(norm_polymarket, norm_asset)
                print(f"Calculating correlation for {ticker}...")
                aligned_prices_polymarket, aligned_prices_asset = self.align_timeseries(
                    norm_polymarket,
                    norm_asset
                )
                correlation = self.calculate_correlation(
                    aligned_prices_polymarket,
                    aligned_prices_asset
                )
                print(f"β {ticker} correlation: {correlation:.4f}")
                results.append({
                    "ticker": ticker,
                    "reason": asset.get('reason', ''),
                    "correlation": correlation,
                    "data_points": len(asset_prices),
                    "timeseries": asset_prices,
                    "normalized_timeseries": norm_asset,
                    "aligned_timeseries": aligned_raw,
                    "aligned_normalized_timeseries": aligned_norm,
                    "success": True
                })
            except Exception as e:
                # Record the failure but continue with remaining assets
                results.append({
                    "ticker": ticker,
                    "reason": asset.get('reason', ''),
                    "error": str(e),
                    "success": False
                })
        return results
# Example usage
if __name__ == "__main__":
    analyzer = DataAnalyzer()

    # Demo: fetch and normalize AAPL prices for the last 7 days.
    # (Removed the unused `import time`, the unused TimeConverter instance,
    # and the redundant re-import of datetime/timedelta already at file top.)
    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)
    start_unix = int(start_time.timestamp())
    end_unix = int(end_time.timestamp())
    try:
        print(f"Testing data fetch for last 7 days...")
        print(f"Start: {start_time}")
        print(f"End: {end_time}\n")
        print("Fetching AAPL data...")
        prices = analyzer.fetch_asset_prices("AAPL", start_unix, end_unix, "1h")
        print(f"β Fetched {len(prices)} data points")
        print("\nNormalizing data...")
        normalized = analyzer.normalize_timeseries(prices, "min-max")
        print(f"β First 3 normalized points:")
        for point in normalized[:3]:
            dt = datetime.fromtimestamp(point['timestamp'])
            print(f" {dt}: {point['price']:.4f} (original: {point['original_price']:.2f})")
    except Exception as e:
        # Demo script: report and exit normally rather than crash
        print(f"β Error: {str(e)}")