Spaces:
Build error
Build error
| """ | |
| Data preparation script for DVC pipeline | |
| Prepares market data and technical indicators | |
| """ | |
| import os | |
| import sys | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| import httpx | |
| import asyncio | |
| import yaml | |
# Append the parent of this script's directory to the import search path.
# Presumably this lets project-local modules be imported; nothing in this
# file currently imports one.
_script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(_script_dir))
def load_params(path: str = "params.yaml"):
    """Load parameters from a YAML params file (``params.yaml`` by default).

    Args:
        path: File to read. The default is resolved against the current
            working directory, matching the original behaviour.

    Returns:
        Whatever ``yaml.safe_load`` yields for the file: normally a dict,
        ``None`` for an empty file.
    """
    # Explicit encoding: the platform default (e.g. cp1252 on Windows) can
    # mis-decode a UTF-8 params file.
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
def _prices_to_frame(symbol: str, asset_type: str, raw_prices) -> pd.DataFrame:
    """Build the output frame from CoinGecko ``[[epoch_ms, price], ...]`` pairs."""
    # unit="ms" interprets the epoch timestamps as UTC and returns naive UTC
    # datetimes, so the result does not depend on the host machine's timezone.
    dates = pd.to_datetime([point[0] for point in raw_prices], unit="ms")
    return pd.DataFrame({
        "date": dates,
        "symbol": symbol,
        "asset_type": asset_type,
        "price": [point[1] for point in raw_prices],
    })


async def fetch_market_data(symbol: str, asset_type: str, days: int) -> pd.DataFrame:
    """Fetch daily USD prices for *symbol*; return an empty frame on failure.

    Only ``asset_type == "CRYPTO"`` has a data source: CoinGecko's
    ``/coins/{id}/market_chart`` endpoint, with the lower-cased symbol used
    as the coin id. Any other asset type is reported and yields an empty
    DataFrame.

    Args:
        symbol: Coin identifier, e.g. ``"bitcoin"``.
        asset_type: Asset-class label; also copied into the ``asset_type`` column.
        days: Number of days of history to request.

    Returns:
        A DataFrame with columns ``date`` (naive UTC), ``symbol``,
        ``asset_type`` and ``price``. Returns an empty DataFrame if nothing
        was fetched. Errors are printed rather than raised, so one bad symbol
        does not abort the whole run.
    """
    if asset_type != "CRYPTO":
        # Previously these symbols were dropped silently; make the skip visible.
        print(f"Skipping {symbol}: no data source for asset type {asset_type}")
        return pd.DataFrame()

    try:
        url = f"https://api.coingecko.com/api/v3/coins/{symbol.lower()}/market_chart"
        params = {"vs_currency": "usd", "days": str(days), "interval": "daily"}
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params, timeout=10.0)
            if response.status_code != 200:
                print(f"Error fetching {symbol}: HTTP {response.status_code}")
                return pd.DataFrame()
            data = response.json()
            if "prices" in data and len(data["prices"]) > 0:
                return _prices_to_frame(symbol, asset_type, data["prices"])
    except Exception as e:
        # Deliberate best-effort boundary: report and fall through to empty.
        print(f"Error fetching {symbol}: {e}")
    return pd.DataFrame()
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add moving-average, RSI, volatility and range-position columns.

    Expects one symbol's rows with ``date`` and ``price`` columns. Rows are
    sorted by ``date`` before any rolling computation. Every rolling window
    is clamped to ``len(df)``, so on short histories e.g. ``sma_20`` is
    computed over fewer than 20 rows.

    Added columns:
        sma_10, sma_20: simple moving averages of ``price``.
        rsi: RSI built from simple rolling means (not Wilder smoothing) of
            gains and losses over up to 14 rows.
        volatility: rolling std of percentage returns over up to 14 rows, x100.
        high_window, low_window: rolling max/min of ``price`` over up to 30 rows.
        price_position: ``price`` between low_window (0) and high_window (100).

    Inputs with fewer than 2 rows still get every column above, filled with
    NaN, so all symbols' frames share one schema when concatenated.

    Args:
        df: Price rows for a single symbol.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    indicator_columns = (
        "sma_10", "sma_20", "rsi", "volatility",
        "high_window", "low_window", "price_position",
    )
    if len(df) < 2:
        df = df.copy()
        for column in indicator_columns:
            df[column] = np.nan
        return df

    df = df.sort_values("date")  # returns a new frame; caller's df untouched
    n_rows = len(df)
    price = df["price"]

    # Moving averages
    df["sma_10"] = price.rolling(window=min(10, n_rows)).mean()
    df["sma_20"] = price.rolling(window=min(20, n_rows)).mean()

    # RSI: loss == 0 gives rs = inf and therefore rsi = 100;
    # gain == loss == 0 gives NaN.
    rsi_window = min(14, n_rows)
    delta = price.diff()
    gain = delta.where(delta > 0, 0).rolling(window=rsi_window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_window).mean()
    rs = gain / loss
    df["rsi"] = 100 - (100 / (1 + rs))

    # Volatility
    df["volatility"] = price.pct_change().rolling(window=min(14, n_rows)).std() * 100

    # Price position in range
    range_window = min(30, n_rows)
    df["high_window"] = price.rolling(window=range_window).max()
    df["low_window"] = price.rolling(window=range_window).min()
    df["price_position"] = ((price - df["low_window"]) /
                            (df["high_window"] - df["low_window"]) * 100)
    return df
async def main():
    """Fetch prices for the configured symbols and write two parquet files.

    Reads ``params["data"]["prepare"]`` from params.yaml, which must contain:
        symbols: list of symbol names to fetch.
        days: days of history passed to ``fetch_market_data``.

    It may optionally contain:
        crypto_symbols: list of symbols (case-insensitive) to fetch as
            "CRYPTO". Defaults to ``["bitcoin", "ethereum"]``, the
            previously hard-coded set. All other symbols are fetched as
            "ETF".

    Writes:
        data/processed/market_data.parquet: combined raw price rows.
        data/processed/indicators.parquet: the same rows plus the columns
            added by ``calculate_technical_indicators``, processed per symbol.

    Raises:
        SystemExit: status 1 when no symbol returned data, so the pipeline
            stage fails loudly instead of exiting 0 without its outputs.
    """
    params = load_params()
    config = params["data"]["prepare"]

    # Create output directory
    os.makedirs("data/processed", exist_ok=True)

    # `or` also covers an explicit `crypto_symbols: null` in the YAML.
    crypto_symbols = {
        s.lower() for s in (config.get("crypto_symbols") or ["bitcoin", "ethereum"])
    }

    # Fetch market data (sequentially, one request per symbol)
    all_data = []
    for symbol in config["symbols"]:
        asset_type = "CRYPTO" if symbol.lower() in crypto_symbols else "ETF"
        print(f"Fetching {symbol} ({asset_type})...")
        df = await fetch_market_data(symbol, asset_type, config["days"])
        if not df.empty:
            all_data.append(df)

    if not all_data:
        print("No data fetched")
        raise SystemExit(1)

    # Combine all data
    market_df = pd.concat(all_data, ignore_index=True)
    market_df.to_parquet("data/processed/market_data.parquet")
    print(f"Saved market data: {len(market_df)} rows")

    # Calculate indicators per symbol; sort=False keeps first-appearance order.
    indicators_list = [
        calculate_technical_indicators(group.copy())
        for _, group in market_df.groupby("symbol", sort=False)
    ]
    indicators_df = pd.concat(indicators_list, ignore_index=True)
    indicators_df.to_parquet("data/processed/indicators.parquet")
    print(f"Saved indicators: {len(indicators_df)} rows")
if __name__ == "__main__":
    # Run the async entry point only when executed as a script (e.g. as the
    # DVC pipeline stage command); importing this module does not run it.
    asyncio.run(main())