|
|
import os |
|
|
import requests |
|
|
import pandas as pd |
|
|
from prefect import flow, task |
|
|
from src.ingestion.ingest import fetch_daily_data |
|
|
from src.processing.features import process_data |
|
|
from src.processing.split import split_data |
|
|
from src.models.train import ModelTrainer |
|
|
from tests.data_validation import validate_data |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
from src.orchestration.notifications import notify_discord |
|
|
|
|
|
@task(retries=3, retry_delay_seconds=60) |
|
|
def fetch_stock_data(symbol: str): |
|
|
"""Task to fetch stock data with retries.""" |
|
|
try: |
|
|
file_path = fetch_daily_data(symbol) |
|
|
return file_path |
|
|
except Exception as e: |
|
|
raise e |
|
|
|
|
|
@task |
|
|
def process_stock_data(file_path: str, symbol: str): |
|
|
"""Task to process stock data.""" |
|
|
output_path = f"data/processed/{symbol}_processed.csv" |
|
|
os.makedirs("data/processed", exist_ok=True) |
|
|
df = process_data(file_path, output_path) |
|
|
return df |
|
|
|
|
|
@task |
|
|
def train_and_evaluate(df: pd.DataFrame, symbol: str): |
|
|
"""Task to train models and evaluate.""" |
|
|
train_df, test_df = split_data(df) |
|
|
|
|
|
|
|
|
validate_data(train_df, test_df, output_dir=f"reports/{symbol}") |
|
|
|
|
|
|
|
|
trainer = ModelTrainer(output_dir=f"models/{symbol}", metrics_dir=f"reports/{symbol}") |
|
|
trainer.train_regression(train_df, test_df) |
|
|
trainer.train_classification(train_df, test_df) |
|
|
trainer.train_clustering(df) |
|
|
trainer.train_pca(df) |
|
|
trainer.save_metrics() |
|
|
|
|
|
return True |
|
|
|
|
|
@flow(name="End-to-End Stock Prediction Pipeline") |
|
|
def main_pipeline(symbols: list[str] = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "NVDA"]): |
|
|
"""Main flow to run the entire pipeline.""" |
|
|
notify_discord("π Starting End-to-End Pipeline...") |
|
|
|
|
|
for symbol in symbols: |
|
|
try: |
|
|
print(f"Processing {symbol}...") |
|
|
raw_path = fetch_stock_data(symbol) |
|
|
df = process_stock_data(raw_path, symbol) |
|
|
train_and_evaluate(df, symbol) |
|
|
notify_discord(f"β
Pipeline completed for {symbol}") |
|
|
except Exception as e: |
|
|
notify_discord(f"β Pipeline failed for {symbol}: {e}") |
|
|
print(f"Error processing {symbol}: {e}") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main_pipeline() |
|
|
|