"""
Data preparation script for DVC pipeline
Prepares market data and technical indicators
"""
import os
import sys
import pandas as pd
from datetime import datetime
import httpx
import asyncio
import yaml

# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

def load_params():
    """Load parameters from params.yaml"""
    with open("params.yaml", "r") as f:
        return yaml.safe_load(f)

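async def fetch_market_data(symbol: str, asset_type: str, days: int) -> pd.DataFrame:
    """Fetch daily prices for a symbol; return an empty DataFrame on failure.

    Only CRYPTO symbols are fetched (CoinGecko market_chart endpoint).
    """
    if asset_type != "CRYPTO":
        # No data source is implemented for other asset types.
        print(f"Skipping {symbol}: no data source for asset type {asset_type}")
        return pd.DataFrame()

    url = f"https://api.coingecko.com/api/v3/coins/{symbol.lower()}/market_chart"
    params = {"vs_currency": "usd", "days": str(days), "interval": "daily"}
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params, timeout=10.0)
        if response.status_code != 200:
            print(f"Error fetching {symbol}: HTTP {response.status_code}")
            return pd.DataFrame()
        prices = response.json().get("prices", [])
        if prices:
            return pd.DataFrame({
                # Timestamps are milliseconds since the epoch; parse them as UTC
                # so the output does not depend on the machine's local timezone.
                "date": pd.to_datetime([p[0] for p in prices], unit="ms"),
                "symbol": symbol,
                "asset_type": asset_type,
                "price": [p[1] for p in prices],
            })
        print(f"No price data returned for {symbol}")
    except Exception as e:
        print(f"Error fetching {symbol}: {e}")
    return pd.DataFrame()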

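def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate technical indicators for a single symbol.

    Adds sma_10, sma_20, rsi, volatility, high_window, low_window and
    price_position columns. Inputs with fewer than two rows are returned
    unchanged, without the indicator columns.
    """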
    if len(df) < 2:
        return df
    
    df = df.sort_values("date")
    
    # Moving averages
    df["sma_10"] = df["price"].rolling(window=min(10, len(df))).mean()
    df["sma_20"] = df["price"].rolling(window=min(20, len(df))).mean()
    
    # RSI from simple rolling means of gains and losses (not Wilder's smoothing)
    delta = df["price"].diff()
    gain = delta.where(delta > 0, 0).rolling(window=min(14, len(df))).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=min(14, len(df))).mean()
    rs = gain / loss
    df["rsi"] = 100 - (100 / (1 + rs))
    
    # Volatility
    df["volatility"] = df["price"].pct_change().rolling(window=min(14, len(df))).std() * 100
    
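    # Price position within the rolling high/low range, in percent
    # (NaN when the range is zero, i.e. high_window == low_window)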
    window = min(30, len(df))
    df["high_window"] = df["price"].rolling(window=window).max()
    df["low_window"] = df["price"].rolling(window=window).min()
    df["price_position"] = ((df["price"] - df["low_window"]) / 
                            (df["high_window"] - df["low_window"]) * 100)
    
    return df

async def main():
    """Main data preparation function"""
    params = load_params()
    config = params["data"]["prepare"]
    
    # Create output directory
    os.makedirs("data/processed", exist_ok=True)
    
    # Fetch market data
    all_data = []
    for symbol in config["symbols"]:
        asset_type = "CRYPTO" if symbol.lower() in ["bitcoin", "ethereum"] else "ETF"
        print(f"Fetching {symbol} ({asset_type})...")
        df = await fetch_market_data(symbol, asset_type, config["days"])
        if not df.empty:
            all_data.append(df)
    
    if not all_data:
        print("No data fetched")
        return
    
    # Combine all data
    market_df = pd.concat(all_data, ignore_index=True)
    market_df.to_parquet("data/processed/market_data.parquet")
    print(f"Saved market data: {len(market_df)} rows")
    
    # Calculate indicators
    indicators_list = []
    for symbol in market_df["symbol"].unique():
        symbol_df = market_df[market_df["symbol"] == symbol].copy()
        symbol_df = calculate_technical_indicators(symbol_df)
        indicators_list.append(symbol_df)
    
    indicators_df = pd.concat(indicators_list, ignore_index=True)
    indicators_df.to_parquet("data/processed/indicators.parquet")
    print(f"Saved indicators: {len(indicators_df)} rows")

if __name__ == "__main__":
    asyncio.run(main())