# polycorr-backend / data_analysis.py
# Author: dhruv575 — commit f890a6c ("yf version issue")
"""
Data Analysis Tools
Handles fetching asset prices, normalization, and correlation calculation
"""
import yfinance as yf
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from datetime import datetime, timedelta, timezone
import pytz
class DataAnalyzer:
    """Class to handle data normalization and correlation analysis"""

    def __init__(self):
        # Stateless: every method operates purely on its arguments,
        # so no configuration or caches are kept on the instance.
        pass
def fetch_asset_prices(
self,
ticker: str,
start_unix: int,
end_unix: int,
interval: str = "1m"
) -> List[Dict]:
"""
Fetch asset prices from Yahoo Finance
Args:
ticker: Stock/ETF ticker symbol
start_unix: Start time as Unix timestamp
end_unix: End time as Unix timestamp
interval: Time interval (1m, 5m, 15m, 30m, 1h, 1d)
Note: 1m data only available for last 7 days
Returns:
List of price points:
[
{"timestamp": 1234567890, "price": 150.25},
...
]
"""
try:
# Convert Unix timestamps to timezone-aware datetime (UTC)
start_dt = datetime.fromtimestamp(start_unix, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end_unix, tz=timezone.utc)
print(f"\n{'='*60}")
print(f"FETCHING DATA FOR {ticker}")
print(f"{'='*60}")
print(f"Requested range: {start_dt} to {end_dt}")
print(f"Requested interval: {interval}")
# Calculate time difference
time_diff = end_dt - start_dt
period_days = time_diff.days + (time_diff.seconds / 86400)
print(f"Time range: {period_days:.2f} days")
# Adjust interval based on data availability limits
original_interval = interval
if interval in ["1m", "2m", "5m"] and period_days > 7:
interval = "15m"
print(f"β†’ Adjusted from {original_interval} to {interval} (minute data limited to 7 days)")
if interval == "15m" and period_days > 60:
interval = "1h"
print(f"β†’ Adjusted from {original_interval} to {interval} (15m data limited to 60 days)")
if interval == "1h" and period_days > 730:
interval = "1d"
print(f"β†’ Adjusted from {original_interval} to {interval} (hourly data limited to ~2 years)")
print(f"Final interval: {interval}")
# Fetch data from Yahoo Finance
ticker_obj = yf.Ticker(ticker)
print(f"Calling yfinance history()...")
df = None
# Strategy 1: Use period parameter (more reliable for intraday data)
if interval in ["1m", "2m", "5m", "15m", "30m", "1h"]:
# Determine appropriate period
if period_days <= 1:
period = "1d"
elif period_days <= 5:
period = "5d"
elif period_days <= 30:
period = "1mo"
elif period_days <= 90:
period = "3mo"
else:
period = "1y"
print(f"Trying period='{period}' with interval='{interval}'")
try:
df = ticker_obj.history(period=period, interval=interval, auto_adjust=True, prepost=False)
# Filter to requested date range
if not df.empty:
df = df[(df.index >= start_dt) & (df.index <= end_dt)]
except Exception as e:
print(f"Period method failed: {e}")
# Strategy 2: Use start/end dates (better for daily+ intervals)
if df is None or df.empty:
print(f"Trying start/end dates with interval='{interval}'")
try:
# Convert to naive datetime for yfinance (it handles timezone internally)
start_naive = start_dt.replace(tzinfo=None)
end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1) # Add buffer
df = ticker_obj.history(start=start_naive, end=end_naive, interval=interval, auto_adjust=True, prepost=False)
except Exception as e:
print(f"Start/end method failed: {e}")
# Strategy 3: Fallback to daily data if intraday fails
if (df is None or df.empty) and interval != "1d":
print(f"Falling back to daily interval")
try:
start_naive = start_dt.replace(tzinfo=None)
end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1)
df = ticker_obj.history(start=start_naive, end=end_naive, interval="1d", auto_adjust=True, prepost=False)
except Exception as e:
print(f"Daily fallback failed: {e}")
# Check if we got data
if df is None or df.empty:
# Last resort: try with just period and no date filtering
print(f"Last resort: fetching with period='1y' and interval='1d'")
try:
df = ticker_obj.history(period="1y", interval="1d", auto_adjust=True, prepost=False)
if not df.empty:
df = df[(df.index >= start_dt.replace(tzinfo=None)) & (df.index <= end_dt.replace(tzinfo=None))]
except Exception as e:
print(f"Last resort failed: {e}")
print(f"Received {len(df) if df is not None else 0} rows from Yahoo Finance")
if df is not None and len(df) > 0:
print(f"First timestamp: {df.index[0]}")
print(f"Last timestamp: {df.index[-1]}")
print(f"Sample prices: {df['Close'].head(3).tolist()}")
else:
print(f"⚠️ WARNING: No data returned!")
# Final check - if still empty, raise error
if df is None or df.empty:
raise ValueError(
f"No data available for ticker {ticker}. "
f"Symbol may be delisted, invalid, or data unavailable for the requested period. "
f"Tried interval={interval}, period={period_days:.1f} days"
)
# Convert to our format
prices = []
for timestamp, row in df.iterrows():
prices.append({
"timestamp": int(timestamp.timestamp()),
"price": float(row['Close'])
})
print(f"βœ“ Successfully converted {len(prices)} data points")
print(f"{'='*60}\n")
return prices
except Exception as e:
print(f"βœ— ERROR for {ticker}: {str(e)}")
print(f"{'='*60}\n")
raise Exception(f"Error fetching data for {ticker}: {str(e)}")
def normalize_timeseries(
self,
timeseries: List[Dict],
method: str = "min-max"
) -> List[Dict]:
"""
Normalize a time series
Args:
timeseries: List of {"timestamp": int, "price": float}
method: Normalization method
- "min-max": Scale to 0-1 range
- "z-score": Standardize to mean=0, std=1
- "percent-change": Normalize to percent change from start
Returns:
Normalized timeseries with same structure
"""
if not timeseries:
return []
prices = np.array([point['price'] for point in timeseries])
if method == "min-max":
# Scale to 0-1 range
min_val = np.min(prices)
max_val = np.max(prices)
if max_val - min_val == 0:
normalized = np.zeros_like(prices)
else:
normalized = (prices - min_val) / (max_val - min_val)
elif method == "z-score":
# Standardize to mean=0, std=1
mean = np.mean(prices)
std = np.std(prices)
if std == 0:
normalized = np.zeros_like(prices)
else:
normalized = (prices - mean) / std
elif method == "percent-change":
# Normalize to percent change from first value
first_price = prices[0]
if first_price == 0:
normalized = np.zeros_like(prices)
else:
normalized = ((prices - first_price) / first_price) * 100
else:
raise ValueError(f"Unknown normalization method: {method}")
# Create normalized timeseries
normalized_ts = []
for i, point in enumerate(timeseries):
normalized_ts.append({
"timestamp": point['timestamp'],
"price": float(normalized[i]),
"original_price": point['price']
})
return normalized_ts
def align_timeseries(
self,
ts1: List[Dict],
ts2: List[Dict]
) -> Tuple[List[float], List[float]]:
"""
Align two time series so prices are comparable.
We treat ts1 as the reference timeline and forward-fill ts2 onto ts1's
timestamps. This avoids requiring exact matching timestamps (which is
unrealistic across different data sources).
Args:
ts1: First time series (reference), list of {"timestamp": int, "price": float}
ts2: Second time series, list of {"timestamp": int, "price": float}
Returns:
Tuple of (aligned_prices1, aligned_prices2)
"""
if not ts1 or not ts2:
raise ValueError("One or both time series are empty")
# Sort by timestamp just in case
ts1_sorted = sorted(ts1, key=lambda x: x["timestamp"])
ts2_sorted = sorted(ts2, key=lambda x: x["timestamp"])
print(f"\n--- ALIGNMENT PROCESS ---")
print(f"Polymarket (ts1): {len(ts1_sorted)} points")
print(f" Range: {datetime.fromtimestamp(ts1_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts1_sorted[-1]['timestamp'])}")
print(f"Asset (ts2): {len(ts2_sorted)} points")
print(f" Range: {datetime.fromtimestamp(ts2_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts2_sorted[-1]['timestamp'])}")
aligned1: List[float] = []
aligned2: List[float] = []
# Only include Polymarket points where we have recent stock data
# This creates distinct segments for each trading day instead of forward-filling 24/7
# We'll use a 2-hour window to match stock data to Polymarket timestamps
max_gap_seconds = 2 * 3600 # 2 hours - enough to cover after-hours but not all night
for point1 in ts1_sorted:
t1 = point1["timestamp"]
# Find the closest stock price to this Polymarket timestamp
closest_price = None
closest_time_diff = float('inf')
for point2 in ts2_sorted:
t2 = point2["timestamp"]
time_diff = abs(t1 - t2)
# Only consider stock prices within our time window
if time_diff <= max_gap_seconds and time_diff < closest_time_diff:
closest_price = point2["price"]
closest_time_diff = time_diff
# Only include this point if we found a close enough stock price
if closest_price is not None:
aligned1.append(point1["price"])
aligned2.append(closest_price)
print(f"After alignment: {len(aligned1)} overlapping points (out of {len(ts1_sorted)} Polymarket points)")
print(f" Coverage: {len(aligned1) / len(ts1_sorted) * 100:.1f}% of Polymarket timeline")
if len(aligned1) > 0:
# Find actual first and last timestamps
matched_indices = []
matched_count = 0
for i, point1 in enumerate(ts1_sorted):
if matched_count < len(aligned1):
t1 = point1["timestamp"]
# Check if this point was matched
for point2 in ts2_sorted:
if abs(t1 - point2["timestamp"]) <= max_gap_seconds:
matched_indices.append(i)
matched_count += 1
break
if matched_indices:
print(f" First aligned point: {datetime.fromtimestamp(ts1_sorted[matched_indices[0]]['timestamp'])}")
print(f" Last aligned point: {datetime.fromtimestamp(ts1_sorted[matched_indices[-1]]['timestamp'])}")
print(f"--- END ALIGNMENT ---\n")
if len(aligned1) < 2:
raise ValueError("Insufficient overlapping data between time series")
return aligned1, aligned2
def build_chart_series(
self,
reference_series: List[Dict],
target_series: List[Dict],
tolerance_seconds: int = 2 * 3600
) -> List[Dict]:
"""
Build a chart-ready series aligned to the reference timestamps.
We only include target prices that are within `tolerance_seconds` of a reference
timestamp. This creates distinct segments that match actual trading hours.
Args:
reference_series: Reference timeline (typically Polymarket)
target_series: Target asset series
tolerance_seconds: Maximum time difference to consider a match
Returns:
List of {"timestamp": int, "price": Optional[float]}
"""
if not reference_series:
return []
ref_sorted = sorted(reference_series, key=lambda x: x["timestamp"])
tgt_sorted = sorted(target_series, key=lambda x: x["timestamp"])
result = []
tgt_idx = 0
for ref_point in ref_sorted:
t_ref = ref_point["timestamp"]
closest_price = None
closest_diff = tolerance_seconds + 1
# Move pointer to relevant window
while tgt_idx < len(tgt_sorted) and tgt_sorted[tgt_idx]["timestamp"] < t_ref - tolerance_seconds:
tgt_idx += 1
# Check nearby target points (current and previous one)
for idx in [tgt_idx - 1, tgt_idx, tgt_idx + 1]:
if 0 <= idx < len(tgt_sorted):
t_target = tgt_sorted[idx]["timestamp"]
diff = abs(t_target - t_ref)
if diff <= tolerance_seconds and diff < closest_diff:
closest_diff = diff
closest_price = tgt_sorted[idx]["price"]
result.append({
"timestamp": t_ref,
"price": closest_price
})
return result
def calculate_correlation(
self,
prices1: List[float],
prices2: List[float]
) -> float:
"""
Calculate Pearson correlation coefficient between two price series
Args:
prices1: First price series
prices2: Second price series
Returns:
Correlation coefficient (-1 to 1)
"""
if len(prices1) != len(prices2):
raise ValueError("Price series must have same length")
if len(prices1) < 2:
raise ValueError("Need at least 2 data points to calculate correlation")
# Calculate Pearson correlation
correlation = np.corrcoef(prices1, prices2)[0, 1]
# Handle NaN (can occur if one series is constant)
if np.isnan(correlation):
return 0.0
return float(correlation)
def calculate_correlation_between_series(
self,
ts1: List[Dict],
ts2: List[Dict]
) -> float:
"""
Calculate correlation between two time series
Args:
ts1: First time series
ts2: Second time series
Returns:
Correlation coefficient (-1 to 1)
"""
# Align the time series
aligned1, aligned2 = self.align_timeseries(ts1, ts2)
# Calculate correlation
return self.calculate_correlation(aligned1, aligned2)
    def analyze_multiple_assets(
        self,
        polymarket_data: List[Dict],
        assets: List[Dict],
        start_unix: int,
        end_unix: int,
        interval: str = "1h"
    ) -> List[Dict]:
        """
        Fetch and analyze multiple assets against Polymarket data.

        Each asset is fetched, normalized, aligned to the Polymarket series,
        and scored with a Pearson correlation. A failure for one ticker is
        recorded in its result entry (success=False) instead of aborting the
        whole batch.

        Args:
            polymarket_data: Polymarket time series
            assets: List of assets with ticker and reason
                [{"ticker": "AAPL", "reason": "..."}, ...]
            start_unix: Start time
            end_unix: End time
            interval: Time interval for asset data

        Returns:
            List of per-asset result dicts; each has "ticker", "reason" and
            "success", plus either correlation/timeseries fields or "error".
        """
        results = []
        print(f"\n{'#'*60}")
        print(f"ANALYZING {len(assets)} ASSETS")
        print(f"Polymarket data: {len(polymarket_data)} points")
        print(f"Time range: {datetime.fromtimestamp(start_unix)} to {datetime.fromtimestamp(end_unix)}")
        print(f"Interval: {interval}")
        print(f"{'#'*60}\n")
        for asset in assets:
            ticker = asset['ticker']
            try:
                # Fetch asset prices
                asset_prices = self.fetch_asset_prices(
                    ticker,
                    start_unix,
                    end_unix,
                    interval
                )
                # Normalize both series to 0-1 so correlation compares shape
                norm_polymarket = self.normalize_timeseries(polymarket_data, "min-max")
                norm_asset = self.normalize_timeseries(asset_prices, "min-max")
                # Build chart-aligned series (None entries mark gaps)
                aligned_raw = self.build_chart_series(polymarket_data, asset_prices)
                aligned_norm = self.build_chart_series(norm_polymarket, norm_asset)
                # Calculate correlation using aligned normalized series (filter out None)
                print(f"Calculating correlation for {ticker}...")
                aligned_prices_polymarket, aligned_prices_asset = self.align_timeseries(
                    norm_polymarket,
                    norm_asset
                )
                correlation = self.calculate_correlation(
                    aligned_prices_polymarket,
                    aligned_prices_asset
                )
                print(f"βœ“ {ticker} correlation: {correlation:.4f}")
                results.append({
                    "ticker": ticker,
                    "reason": asset.get('reason', ''),
                    "correlation": correlation,
                    "data_points": len(asset_prices),
                    "timeseries": asset_prices,
                    "normalized_timeseries": norm_asset,
                    "aligned_timeseries": aligned_raw,
                    "aligned_normalized_timeseries": aligned_norm,
                    "success": True
                })
            except Exception as e:
                # Record the failure for this ticker and continue with the rest
                results.append({
                    "ticker": ticker,
                    "reason": asset.get('reason', ''),
                    "error": str(e),
                    "success": False
                })
        return results
# Example usage: smoke-test the fetch + normalize pipeline against AAPL
if __name__ == "__main__":
    demo_analyzer = DataAnalyzer()

    # Example: Fetch and analyze AAPL
    from time_utils import TimeConverter
    converter = TimeConverter()

    # Use a date range that's more likely to have data (last week)
    import time
    from datetime import datetime, timedelta

    # Timestamps covering the trailing 7 days
    window_end = datetime.now()
    window_start = window_end - timedelta(days=7)
    start_unix = int(window_start.timestamp())
    end_unix = int(window_end.timestamp())

    try:
        print(f"Testing data fetch for last 7 days...")
        print(f"Start: {window_start}")
        print(f"End: {window_end}\n")

        print("Fetching AAPL data...")
        fetched = demo_analyzer.fetch_asset_prices("AAPL", start_unix, end_unix, "1h")
        print(f"βœ“ Fetched {len(fetched)} data points")

        print("\nNormalizing data...")
        scaled = demo_analyzer.normalize_timeseries(fetched, "min-max")
        print(f"βœ“ First 3 normalized points:")
        for entry in scaled[:3]:
            when = datetime.fromtimestamp(entry['timestamp'])
            print(f" {when}: {entry['price']:.4f} (original: {entry['original_price']:.2f})")
    except Exception as exc:
        print(f"βœ— Error: {str(exc)}")