""" Data preparation script for DVC pipeline Prepares market data and technical indicators """ import os import sys import pandas as pd import numpy as np from datetime import datetime, timedelta import httpx import asyncio import yaml # Add parent directory to path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) def load_params(): """Load parameters from params.yaml""" with open("params.yaml", "r") as f: return yaml.safe_load(f) async def fetch_market_data(symbol: str, asset_type: str, days: int): """Fetch market data for a symbol""" try: if asset_type == "CRYPTO": url = f"https://api.coingecko.com/api/v3/coins/{symbol.lower()}/market_chart" params = {"vs_currency": "usd", "days": str(days), "interval": "daily"} async with httpx.AsyncClient() as client: response = await client.get(url, params=params, timeout=10.0) if response.status_code == 200: data = response.json() if "prices" in data and len(data["prices"]) > 0: prices = [p[1] for p in data["prices"]] dates = [datetime.fromtimestamp(p[0]/1000) for p in data["prices"]] return pd.DataFrame({ "date": dates, "symbol": symbol, "asset_type": asset_type, "price": prices }) except Exception as e: print(f"Error fetching {symbol}: {e}") return pd.DataFrame() def calculate_technical_indicators(df: pd.DataFrame): """Calculate technical indicators""" if len(df) < 2: return df df = df.sort_values("date") # Moving averages df["sma_10"] = df["price"].rolling(window=min(10, len(df))).mean() df["sma_20"] = df["price"].rolling(window=min(20, len(df))).mean() # RSI delta = df["price"].diff() gain = delta.where(delta > 0, 0).rolling(window=min(14, len(df))).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=min(14, len(df))).mean() rs = gain / loss df["rsi"] = 100 - (100 / (1 + rs)) # Volatility df["volatility"] = df["price"].pct_change().rolling(window=min(14, len(df))).std() * 100 # Price position in range window = min(30, len(df)) df["high_window"] = df["price"].rolling(window=window).max() df["low_window"] = df["price"].rolling(window=window).min() df["price_position"] = ((df["price"] - df["low_window"]) / (df["high_window"] - df["low_window"]) * 100) return df async def main(): """Main data preparation function""" params = load_params() config = params["data"]["prepare"] # Create output directory os.makedirs("data/processed", exist_ok=True) # Fetch market data all_data = [] for symbol in config["symbols"]: asset_type = "CRYPTO" if symbol.lower() in ["bitcoin", "ethereum"] else "ETF" print(f"Fetching {symbol} ({asset_type})...") df = await fetch_market_data(symbol, asset_type, config["days"]) if not df.empty: all_data.append(df) if not all_data: print("No data fetched") return # Combine all data market_df = pd.concat(all_data, ignore_index=True) market_df.to_parquet("data/processed/market_data.parquet") print(f"Saved market data: {len(market_df)} rows") # Calculate indicators indicators_list = [] for symbol in market_df["symbol"].unique(): symbol_df = market_df[market_df["symbol"] == symbol].copy() symbol_df = calculate_technical_indicators(symbol_df) indicators_list.append(symbol_df) indicators_df = pd.concat(indicators_list, ignore_index=True) indicators_df.to_parquet("data/processed/indicators.parquet") print(f"Saved indicators: {len(indicators_df)} rows") if __name__ == "__main__": asyncio.run(main())