# Uploaded by umer6016 using huggingface_hub (commit ce3d808, verified).
import os
import requests
import pandas as pd
from prefect import flow, task
from src.ingestion.ingest import fetch_daily_data
from src.processing.features import process_data
from src.processing.split import split_data
from src.models.train import ModelTrainer
from tests.data_validation import validate_data
from dotenv import load_dotenv
load_dotenv()
from src.orchestration.notifications import notify_discord
@task(retries=3, retry_delay_seconds=60)
def fetch_stock_data(symbol: str):
    """Fetch daily data for ``symbol`` as a Prefect task.

    Delegates to ``fetch_daily_data`` and returns its result unchanged; the
    flow hands that value to ``process_stock_data`` as ``file_path``.

    Any exception raised by ``fetch_daily_data`` propagates out of the task,
    which is what lets the ``retries=3`` / ``retry_delay_seconds=60`` settings
    on the decorator apply.
    """
    # No try/except: catching an exception only to re-raise it unchanged
    # added nothing and obscured the single call this task makes.
    return fetch_daily_data(symbol)
@task
def process_stock_data(file_path: str, symbol: str):
    """Run feature processing for one symbol and return the result.

    Makes sure the ``data/processed`` directory exists, then calls
    ``process_data`` with the raw ``file_path`` and a per-symbol output path,
    returning whatever ``process_data`` returns.
    """
    # Create the output directory up front so process_data can write into it.
    os.makedirs("data/processed", exist_ok=True)
    destination = f"data/processed/{symbol}_processed.csv"
    return process_data(file_path, destination)
@task
def train_and_evaluate(df: pd.DataFrame, symbol: str):
    """Split, validate, and train all models for one symbol.

    The frame is split with ``split_data``; the two parts are passed to
    ``validate_data`` and to the regression and classification trainers,
    while clustering and PCA receive the full, unsplit ``df``. Models are
    written under ``models/<symbol>`` and reports/metrics under
    ``reports/<symbol>``.

    Returns ``True`` once every step has completed without raising.
    """
    report_dir = f"reports/{symbol}"
    model_dir = f"models/{symbol}"

    train_part, test_part = split_data(df)

    # Validate the split before spending time on training.
    validate_data(train_part, test_part, output_dir=report_dir)

    model_trainer = ModelTrainer(output_dir=model_dir, metrics_dir=report_dir)

    # Models trained on the train/test split.
    model_trainer.train_regression(train_part, test_part)
    model_trainer.train_classification(train_part, test_part)

    # Models trained on the full frame.
    model_trainer.train_clustering(df)
    model_trainer.train_pca(df)

    model_trainer.save_metrics()
    return True
@flow(name="End-to-End Stock Prediction Pipeline")
def main_pipeline(symbols: list[str] = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "NVDA"]):
    """Run fetch -> process -> train for each symbol, reporting to Discord.

    A start message is sent once, then each symbol is handled independently:
    ``fetch_stock_data`` -> ``process_stock_data`` -> ``train_and_evaluate``.
    A success or failure message is sent per symbol via ``notify_discord``.

    Args:
        symbols: Ticker symbols to process, in order.

    A failure for one symbol is reported and printed but does not stop the
    remaining symbols from being processed.
    """
    # NOTE: the list default is only iterated here, never mutated, and it is
    # kept in the signature because the flow's parameters are declared there.

    # Emoji are written as \N{...} escapes so the source stays ASCII-safe;
    # the previous literals had been corrupted into mojibake.
    notify_discord("\N{ROCKET} Starting End-to-End Pipeline...")

    for symbol in symbols:
        try:
            print(f"Processing {symbol}...")
            raw_path = fetch_stock_data(symbol)
            df = process_stock_data(raw_path, symbol)
            train_and_evaluate(df, symbol)
            notify_discord(f"\N{WHITE HEAVY CHECK MARK} Pipeline completed for {symbol}")
        except Exception as e:
            # Deliberate best-effort boundary: report the failure and move on
            # to the next symbol instead of aborting the whole run.
            notify_discord(f"\N{CROSS MARK} Pipeline failed for {symbol}: {e}")
            print(f"Error processing {symbol}: {e}")
# Run the full pipeline with its default symbols when executed as a script.
if __name__ == "__main__":
    main_pipeline()