Investment_Assistant / scripts /prepare_data.py
Egeekle's picture
Add MLOps, RAG, monitoring, and utility dependencies to requirements.txt
7a658e1
"""
Data preparation script for DVC pipeline
Prepares market data and technical indicators
"""
import os
import sys
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import httpx
import asyncio
import yaml
# Append the project root (the directory two levels above this file's
# absolute path) to the module search path.
sys.path.append(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def load_params(path: str = "params.yaml"):
    """Load pipeline parameters from a YAML file.

    Args:
        path: File to read. Defaults to ``params.yaml``, resolved relative to
            the current working directory.

    Returns:
        The document parsed with ``yaml.safe_load``.
    """
    # Explicit encoding so parsing does not depend on the platform default.
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
def _prices_frame(payload: object, symbol: str, asset_type: str) -> pd.DataFrame:
    """Convert a CoinGecko ``market_chart`` payload into a price DataFrame.

    Returns an empty DataFrame when the payload is not a dict or has no
    ``prices`` entries. Otherwise returns columns ``date``, ``symbol``,
    ``asset_type`` and ``price``.
    """
    points = payload.get("prices") if isinstance(payload, dict) else None
    if not points:
        return pd.DataFrame()
    return pd.DataFrame({
        # Entries are [epoch_milliseconds, price]. Converting with pandas yields
        # naive UTC timestamps, independent of the host's local time zone.
        "date": pd.to_datetime([p[0] for p in points], unit="ms"),
        "symbol": symbol,
        "asset_type": asset_type,
        "price": [p[1] for p in points],
    })


async def fetch_market_data(symbol: str, asset_type: str, days: int) -> pd.DataFrame:
    """Fetch daily USD prices for ``symbol`` over the last ``days`` days.

    Only ``asset_type == "CRYPTO"`` is supported; it is fetched from the
    CoinGecko ``market_chart`` endpoint. This is best-effort: any failure
    (unsupported asset type, HTTP error status, network or parsing error)
    prints a message and returns an empty DataFrame, so one bad symbol does
    not abort the caller's loop.

    Args:
        symbol: CoinGecko coin id (lower-cased before use in the URL).
        asset_type: Asset class label, copied into the ``asset_type`` column.
        days: Number of days of history to request.

    Returns:
        A DataFrame with ``date``, ``symbol``, ``asset_type`` and ``price``
        columns, or an empty DataFrame on failure.
    """
    if asset_type != "CRYPTO":
        # No data source is implemented for other asset types; say so rather
        # than silently returning nothing.
        print(f"Skipping {symbol}: unsupported asset type {asset_type}")
        return pd.DataFrame()

    url = f"https://api.coingecko.com/api/v3/coins/{symbol.lower()}/market_chart"
    params = {"vs_currency": "usd", "days": str(days), "interval": "daily"}
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params, timeout=10.0)
            if response.status_code != 200:
                print(f"Error fetching {symbol}: HTTP {response.status_code}")
                return pd.DataFrame()
            return _prices_frame(response.json(), symbol, asset_type)
    except Exception as e:  # best-effort boundary: report and continue
        print(f"Error fetching {symbol}: {e}")
        return pd.DataFrame()
def calculate_technical_indicators(df: pd.DataFrame):
    """Add rolling technical-indicator columns to one symbol's price history.

    Expects ``date`` and ``price`` columns. Frames with fewer than two rows
    are returned unchanged (same object). Otherwise a date-sorted copy is
    returned with these columns appended, in order: ``sma_10``, ``sma_20``,
    ``rsi``, ``volatility``, ``high_window``, ``low_window`` and
    ``price_position``.

    Every nominal window (10, 20, 14, 30) is capped at the number of rows, so
    short histories still get values.
    """
    n_rows = len(df)
    if n_rows < 2:
        return df

    def capped(window):
        # Shrink the nominal window to the series length for short histories.
        return min(window, n_rows)

    out = df.sort_values("date")
    price = out["price"]

    # Simple moving averages.
    out["sma_10"] = price.rolling(window=capped(10)).mean()
    out["sma_20"] = price.rolling(window=capped(20)).mean()

    # RSI from simple rolling means of gains and losses. `.where` maps the
    # leading NaN diff to 0, so it does not poison the first window.
    change = price.diff()
    avg_gain = change.where(change > 0, 0).rolling(window=capped(14)).mean()
    avg_loss = (-change.where(change < 0, 0)).rolling(window=capped(14)).mean()
    out["rsi"] = 100 - 100 / (1 + avg_gain / avg_loss)

    # Rolling standard deviation of returns, in percent.
    out["volatility"] = price.pct_change().rolling(window=capped(14)).std() * 100

    # Where the price sits within its rolling high/low band (0 = low, 100 = high).
    span = capped(30)
    out["high_window"] = price.rolling(window=span).max()
    out["low_window"] = price.rolling(window=span).min()
    band = out["high_window"] - out["low_window"]
    out["price_position"] = (price - out["low_window"]) / band * 100
    return out
async def main():
    """Fetch market data for the configured symbols and write processed outputs.

    Reads the ``data.prepare`` section of ``params.yaml``, using ``symbols``
    and ``days``. An optional ``crypto_symbols`` list in the same section
    selects which symbols are treated as crypto; it defaults to bitcoin and
    ethereum, and every other symbol is labelled ETF.

    Writes, under ``data/processed``:
      * ``market_data.parquet``: the combined price history of all fetched symbols.
      * ``indicators.parquet``: the same rows plus per-symbol technical indicators.

    Raises:
        SystemExit: if no symbol returned any data. The process then exits
            non-zero instead of succeeding without writing its outputs.
    """
    params = load_params()
    config = params["data"]["prepare"]
    crypto_symbols = {
        s.lower() for s in config.get("crypto_symbols", ["bitcoin", "ethereum"])
    }

    # Create output directory
    os.makedirs("data/processed", exist_ok=True)

    # Fetch sequentially, skipping symbols that returned nothing.
    all_data = []
    for symbol in config["symbols"]:
        asset_type = "CRYPTO" if symbol.lower() in crypto_symbols else "ETF"
        print(f"Fetching {symbol} ({asset_type})...")
        df = await fetch_market_data(symbol, asset_type, config["days"])
        if not df.empty:
            all_data.append(df)

    if not all_data:
        # Returning normally would exit 0 even though neither parquet file
        # below gets written; fail loudly instead.
        raise SystemExit("No data fetched")

    market_df = pd.concat(all_data, ignore_index=True)
    market_df.to_parquet("data/processed/market_data.parquet")
    print(f"Saved market data: {len(market_df)} rows")

    # groupby(sort=False) visits symbols in first-appearance order (as the
    # former unique() loop did) without re-scanning the frame per symbol.
    indicators_df = pd.concat(
        [
            calculate_technical_indicators(group.copy())
            for _, group in market_df.groupby("symbol", sort=False)
        ],
        ignore_index=True,
    )
    indicators_df.to_parquet("data/processed/indicators.parquet")
    print(f"Saved indicators: {len(indicators_df)} rows")
if __name__ == "__main__":
asyncio.run(main())