"""
Data Analysis Tools
Handles fetching asset prices, normalization, and correlation calculation
"""
import yfinance as yf
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from datetime import datetime, timedelta, timezone
import pytz
class DataAnalyzer:
    """Class to handle data normalization and correlation analysis"""

    def __init__(self) -> None:
        # Stateless helper class: every method takes its data as arguments,
        # so there is no per-instance state to initialize.
        pass
def fetch_asset_prices(
self,
ticker: str,
start_unix: int,
end_unix: int,
interval: str = "1m"
) -> List[Dict]:
"""
Fetch asset prices from Yahoo Finance
Args:
ticker: Stock/ETF ticker symbol
start_unix: Start time as Unix timestamp
end_unix: End time as Unix timestamp
interval: Time interval (1m, 5m, 15m, 30m, 1h, 1d)
Note: 1m data only available for last 7 days
Returns:
List of price points:
[
{"timestamp": 1234567890, "price": 150.25},
...
]
"""
try:
# Convert Unix timestamps to timezone-aware datetime (UTC)
start_dt = datetime.fromtimestamp(start_unix, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end_unix, tz=timezone.utc)
print(f"\n{'='*60}")
print(f"FETCHING DATA FOR {ticker}")
print(f"{'='*60}")
print(f"Requested range: {start_dt} to {end_dt}")
print(f"Requested interval: {interval}")
# Calculate time difference
time_diff = end_dt - start_dt
period_days = time_diff.days + (time_diff.seconds / 86400)
print(f"Time range: {period_days:.2f} days")
# Adjust interval based on data availability limits
original_interval = interval
if interval in ["1m", "2m", "5m"] and period_days > 7:
interval = "15m"
print(f"β Adjusted from {original_interval} to {interval} (minute data limited to 7 days)")
if interval == "15m" and period_days > 60:
interval = "1h"
print(f"β Adjusted from {original_interval} to {interval} (15m data limited to 60 days)")
if interval == "1h" and period_days > 730:
interval = "1d"
print(f"β Adjusted from {original_interval} to {interval} (hourly data limited to ~2 years)")
print(f"Final interval: {interval}")
# Fetch data from Yahoo Finance
ticker_obj = yf.Ticker(ticker)
print(f"Calling yfinance history()...")
df = None
# Strategy 1: Use period parameter (more reliable for intraday data)
if interval in ["1m", "2m", "5m", "15m", "30m", "1h"]:
# Determine appropriate period
if period_days <= 1:
period = "1d"
elif period_days <= 5:
period = "5d"
elif period_days <= 30:
period = "1mo"
elif period_days <= 90:
period = "3mo"
else:
period = "1y"
print(f"Trying period='{period}' with interval='{interval}'")
try:
df = ticker_obj.history(period=period, interval=interval, auto_adjust=True, prepost=False)
# Filter to requested date range
if not df.empty:
df = df[(df.index >= start_dt) & (df.index <= end_dt)]
except Exception as e:
print(f"Period method failed: {e}")
# Strategy 2: Use start/end dates (better for daily+ intervals)
if df is None or df.empty:
print(f"Trying start/end dates with interval='{interval}'")
try:
# Convert to naive datetime for yfinance (it handles timezone internally)
start_naive = start_dt.replace(tzinfo=None)
end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1) # Add buffer
df = ticker_obj.history(start=start_naive, end=end_naive, interval=interval, auto_adjust=True, prepost=False)
except Exception as e:
print(f"Start/end method failed: {e}")
# Strategy 3: Fallback to daily data if intraday fails
if (df is None or df.empty) and interval != "1d":
print(f"Falling back to daily interval")
try:
start_naive = start_dt.replace(tzinfo=None)
end_naive = end_dt.replace(tzinfo=None) + timedelta(days=1)
df = ticker_obj.history(start=start_naive, end=end_naive, interval="1d", auto_adjust=True, prepost=False)
except Exception as e:
print(f"Daily fallback failed: {e}")
# Check if we got data
if df is None or df.empty:
# Last resort: try with just period and no date filtering
print(f"Last resort: fetching with period='1y' and interval='1d'")
try:
df = ticker_obj.history(period="1y", interval="1d", auto_adjust=True, prepost=False)
if not df.empty:
df = df[(df.index >= start_dt.replace(tzinfo=None)) & (df.index <= end_dt.replace(tzinfo=None))]
except Exception as e:
print(f"Last resort failed: {e}")
print(f"Received {len(df) if df is not None else 0} rows from Yahoo Finance")
if df is not None and len(df) > 0:
print(f"First timestamp: {df.index[0]}")
print(f"Last timestamp: {df.index[-1]}")
print(f"Sample prices: {df['Close'].head(3).tolist()}")
else:
print(f"β οΈ WARNING: No data returned!")
# Final check - if still empty, raise error
if df is None or df.empty:
raise ValueError(
f"No data available for ticker {ticker}. "
f"Symbol may be delisted, invalid, or data unavailable for the requested period. "
f"Tried interval={interval}, period={period_days:.1f} days"
)
# Convert to our format
prices = []
for timestamp, row in df.iterrows():
prices.append({
"timestamp": int(timestamp.timestamp()),
"price": float(row['Close'])
})
print(f"β Successfully converted {len(prices)} data points")
print(f"{'='*60}\n")
return prices
except Exception as e:
print(f"β ERROR for {ticker}: {str(e)}")
print(f"{'='*60}\n")
raise Exception(f"Error fetching data for {ticker}: {str(e)}")
def normalize_timeseries(
self,
timeseries: List[Dict],
method: str = "min-max"
) -> List[Dict]:
"""
Normalize a time series
Args:
timeseries: List of {"timestamp": int, "price": float}
method: Normalization method
- "min-max": Scale to 0-1 range
- "z-score": Standardize to mean=0, std=1
- "percent-change": Normalize to percent change from start
Returns:
Normalized timeseries with same structure
"""
if not timeseries:
return []
prices = np.array([point['price'] for point in timeseries])
if method == "min-max":
# Scale to 0-1 range
min_val = np.min(prices)
max_val = np.max(prices)
if max_val - min_val == 0:
normalized = np.zeros_like(prices)
else:
normalized = (prices - min_val) / (max_val - min_val)
elif method == "z-score":
# Standardize to mean=0, std=1
mean = np.mean(prices)
std = np.std(prices)
if std == 0:
normalized = np.zeros_like(prices)
else:
normalized = (prices - mean) / std
elif method == "percent-change":
# Normalize to percent change from first value
first_price = prices[0]
if first_price == 0:
normalized = np.zeros_like(prices)
else:
normalized = ((prices - first_price) / first_price) * 100
else:
raise ValueError(f"Unknown normalization method: {method}")
# Create normalized timeseries
normalized_ts = []
for i, point in enumerate(timeseries):
normalized_ts.append({
"timestamp": point['timestamp'],
"price": float(normalized[i]),
"original_price": point['price']
})
return normalized_ts
def align_timeseries(
self,
ts1: List[Dict],
ts2: List[Dict]
) -> Tuple[List[float], List[float]]:
"""
Align two time series so prices are comparable.
We treat ts1 as the reference timeline and forward-fill ts2 onto ts1's
timestamps. This avoids requiring exact matching timestamps (which is
unrealistic across different data sources).
Args:
ts1: First time series (reference), list of {"timestamp": int, "price": float}
ts2: Second time series, list of {"timestamp": int, "price": float}
Returns:
Tuple of (aligned_prices1, aligned_prices2)
"""
if not ts1 or not ts2:
raise ValueError("One or both time series are empty")
# Sort by timestamp just in case
ts1_sorted = sorted(ts1, key=lambda x: x["timestamp"])
ts2_sorted = sorted(ts2, key=lambda x: x["timestamp"])
print(f"\n--- ALIGNMENT PROCESS ---")
print(f"Polymarket (ts1): {len(ts1_sorted)} points")
print(f" Range: {datetime.fromtimestamp(ts1_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts1_sorted[-1]['timestamp'])}")
print(f"Asset (ts2): {len(ts2_sorted)} points")
print(f" Range: {datetime.fromtimestamp(ts2_sorted[0]['timestamp'])} to {datetime.fromtimestamp(ts2_sorted[-1]['timestamp'])}")
aligned1: List[float] = []
aligned2: List[float] = []
# Only include Polymarket points where we have recent stock data
# This creates distinct segments for each trading day instead of forward-filling 24/7
# We'll use a 2-hour window to match stock data to Polymarket timestamps
max_gap_seconds = 2 * 3600 # 2 hours - enough to cover after-hours but not all night
for point1 in ts1_sorted:
t1 = point1["timestamp"]
# Find the closest stock price to this Polymarket timestamp
closest_price = None
closest_time_diff = float('inf')
for point2 in ts2_sorted:
t2 = point2["timestamp"]
time_diff = abs(t1 - t2)
# Only consider stock prices within our time window
if time_diff <= max_gap_seconds and time_diff < closest_time_diff:
closest_price = point2["price"]
closest_time_diff = time_diff
# Only include this point if we found a close enough stock price
if closest_price is not None:
aligned1.append(point1["price"])
aligned2.append(closest_price)
print(f"After alignment: {len(aligned1)} overlapping points (out of {len(ts1_sorted)} Polymarket points)")
print(f" Coverage: {len(aligned1) / len(ts1_sorted) * 100:.1f}% of Polymarket timeline")
if len(aligned1) > 0:
# Find actual first and last timestamps
matched_indices = []
matched_count = 0
for i, point1 in enumerate(ts1_sorted):
if matched_count < len(aligned1):
t1 = point1["timestamp"]
# Check if this point was matched
for point2 in ts2_sorted:
if abs(t1 - point2["timestamp"]) <= max_gap_seconds:
matched_indices.append(i)
matched_count += 1
break
if matched_indices:
print(f" First aligned point: {datetime.fromtimestamp(ts1_sorted[matched_indices[0]]['timestamp'])}")
print(f" Last aligned point: {datetime.fromtimestamp(ts1_sorted[matched_indices[-1]]['timestamp'])}")
print(f"--- END ALIGNMENT ---\n")
if len(aligned1) < 2:
raise ValueError("Insufficient overlapping data between time series")
return aligned1, aligned2
def build_chart_series(
self,
reference_series: List[Dict],
target_series: List[Dict],
tolerance_seconds: int = 2 * 3600
) -> List[Dict]:
"""
Build a chart-ready series aligned to the reference timestamps.
We only include target prices that are within `tolerance_seconds` of a reference
timestamp. This creates distinct segments that match actual trading hours.
Args:
reference_series: Reference timeline (typically Polymarket)
target_series: Target asset series
tolerance_seconds: Maximum time difference to consider a match
Returns:
List of {"timestamp": int, "price": Optional[float]}
"""
if not reference_series:
return []
ref_sorted = sorted(reference_series, key=lambda x: x["timestamp"])
tgt_sorted = sorted(target_series, key=lambda x: x["timestamp"])
result = []
tgt_idx = 0
for ref_point in ref_sorted:
t_ref = ref_point["timestamp"]
closest_price = None
closest_diff = tolerance_seconds + 1
# Move pointer to relevant window
while tgt_idx < len(tgt_sorted) and tgt_sorted[tgt_idx]["timestamp"] < t_ref - tolerance_seconds:
tgt_idx += 1
# Check nearby target points (current and previous one)
for idx in [tgt_idx - 1, tgt_idx, tgt_idx + 1]:
if 0 <= idx < len(tgt_sorted):
t_target = tgt_sorted[idx]["timestamp"]
diff = abs(t_target - t_ref)
if diff <= tolerance_seconds and diff < closest_diff:
closest_diff = diff
closest_price = tgt_sorted[idx]["price"]
result.append({
"timestamp": t_ref,
"price": closest_price
})
return result
def calculate_correlation(
self,
prices1: List[float],
prices2: List[float]
) -> float:
"""
Calculate Pearson correlation coefficient between two price series
Args:
prices1: First price series
prices2: Second price series
Returns:
Correlation coefficient (-1 to 1)
"""
if len(prices1) != len(prices2):
raise ValueError("Price series must have same length")
if len(prices1) < 2:
raise ValueError("Need at least 2 data points to calculate correlation")
# Calculate Pearson correlation
correlation = np.corrcoef(prices1, prices2)[0, 1]
# Handle NaN (can occur if one series is constant)
if np.isnan(correlation):
return 0.0
return float(correlation)
def calculate_correlation_between_series(
self,
ts1: List[Dict],
ts2: List[Dict]
) -> float:
"""
Calculate correlation between two time series
Args:
ts1: First time series
ts2: Second time series
Returns:
Correlation coefficient (-1 to 1)
"""
# Align the time series
aligned1, aligned2 = self.align_timeseries(ts1, ts2)
# Calculate correlation
return self.calculate_correlation(aligned1, aligned2)
def analyze_multiple_assets(
self,
polymarket_data: List[Dict],
assets: List[Dict],
start_unix: int,
end_unix: int,
interval: str = "1h"
) -> Dict:
"""
Fetch and analyze multiple assets against Polymarket data
Args:
polymarket_data: Polymarket time series
assets: List of assets with ticker and reason
[{"ticker": "AAPL", "reason": "..."}, ...]
start_unix: Start time
end_unix: End time
interval: Time interval for asset data
Returns:
Dictionary with analysis results for each asset
"""
results = []
print(f"\n{'#'*60}")
print(f"ANALYZING {len(assets)} ASSETS")
print(f"Polymarket data: {len(polymarket_data)} points")
print(f"Time range: {datetime.fromtimestamp(start_unix)} to {datetime.fromtimestamp(end_unix)}")
print(f"Interval: {interval}")
print(f"{'#'*60}\n")
for asset in assets:
ticker = asset['ticker']
try:
# Fetch asset prices
asset_prices = self.fetch_asset_prices(
ticker,
start_unix,
end_unix,
interval
)
# Normalize both series
norm_polymarket = self.normalize_timeseries(polymarket_data, "min-max")
norm_asset = self.normalize_timeseries(asset_prices, "min-max")
# Build chart-aligned series (for gaps)
aligned_raw = self.build_chart_series(polymarket_data, asset_prices)
aligned_norm = self.build_chart_series(norm_polymarket, norm_asset)
# Calculate correlation using aligned normalized series (filter out None)
print(f"Calculating correlation for {ticker}...")
aligned_prices_polymarket, aligned_prices_asset = self.align_timeseries(
norm_polymarket,
norm_asset
)
correlation = self.calculate_correlation(
aligned_prices_polymarket,
aligned_prices_asset
)
print(f"β {ticker} correlation: {correlation:.4f}")
results.append({
"ticker": ticker,
"reason": asset.get('reason', ''),
"correlation": correlation,
"data_points": len(asset_prices),
"timeseries": asset_prices,
"normalized_timeseries": norm_asset,
"aligned_timeseries": aligned_raw,
"aligned_normalized_timeseries": aligned_norm,
"success": True
})
except Exception as e:
results.append({
"ticker": ticker,
"reason": asset.get('reason', ''),
"error": str(e),
"success": False
})
return results
# Example usage
if __name__ == "__main__":
    # Demo: fetch and normalize AAPL data for the last 7 days (a range that
    # is likely to have intraday data available).
    # FIX: dropped the unused `import time`, the unused TimeConverter
    # import/instantiation (a hard dependency on a project module this demo
    # never used), and the shadowing re-import of datetime/timedelta.
    analyzer = DataAnalyzer()

    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)
    start_unix = int(start_time.timestamp())
    end_unix = int(end_time.timestamp())

    try:
        print(f"Testing data fetch for last 7 days...")
        print(f"Start: {start_time}")
        print(f"End: {end_time}\n")
        print("Fetching AAPL data...")
        prices = analyzer.fetch_asset_prices("AAPL", start_unix, end_unix, "1h")
        print(f"β Fetched {len(prices)} data points")
        print("\nNormalizing data...")
        normalized = analyzer.normalize_timeseries(prices, "min-max")
        print(f"β First 3 normalized points:")
        for point in normalized[:3]:
            dt = datetime.fromtimestamp(point['timestamp'])
            print(f" {dt}: {point['price']:.4f} (original: {point['original_price']:.2f})")
    except Exception as e:
        print(f"β Error: {str(e)}")