""" 🚀 Parallel Execution Engine Utilizes multi-core CPU architecture to accelerate model training and data processing. """ import concurrent.futures import streamlit as st import os def run_models_parallel(series, forecast_days, selected_models): """ Run multiple forecasting models in parallel using ThreadPoolExecutor. (Threads are generally safer for Streamlit session state and shared imports than Processes). """ results = {} # Define wrappers for each model to handle parameters def train_arima(): from models.arima_model import fit_arima return "ARIMA", fit_arima(series, forecast_steps=forecast_days, auto_order=False) def train_prophet(): from models.prophet_model import fit_prophet return "Prophet", fit_prophet(series, forecast_steps=forecast_days) def train_lstm(): from models.lstm_model import fit_lstm return "LSTM", fit_lstm(series, forecast_steps=forecast_days, epochs=30) # Slightly fewer epochs for speed def train_exp_smoothing(): from models.exp_smoothing import fit_exponential_smoothing return "Exponential Smoothing", fit_exponential_smoothing(series, forecast_steps=forecast_days) model_funcs = { "ARIMA": train_arima, "Prophet": train_prophet, "LSTM": train_lstm, "Exponential Smoothing": train_exp_smoothing } # Only run models that were selected tasks = [model_funcs[m] for m in selected_models if m in model_funcs] # Use max available workers (up to the number of tasks) max_workers = min(len(tasks), os.cpu_count() or 4) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_model = {executor.submit(func): func for func in tasks} for future in concurrent.futures.as_completed(future_to_model): try: name, result = future.result() results[name] = result except Exception as e: # Store the error to be handled by the UI results[future_to_model[future].__name__.split('_')[1].upper()] = {"error": str(e)} return results def optimize_tensorflow(): """Optimize TensorFlow to use available CPU resources efficiently.""" try: import tensorflow as tf # Set number of threads for intra and inter op parallelism cores = os.cpu_count() or 4 tf.config.threading.set_intra_op_parallelism_threads(cores) tf.config.threading.set_inter_op_parallelism_threads(cores) except ImportError: pass def pre_warm_cache(): """ Background cache warming. Uses spare RAM to pre-load popular data so the app feels instantaneous. """ from config.settings import POPULAR_STOCKS, ALPHA_VANTAGE_API_KEY from utils.api_clients import fetch_stock_daily import threading def warm_task(): # Only warm the top 5 mega-cap stocks top_symbols = ["AAPL", "NVDA", "MSFT", "GOOGL", "AMZN"] for symbol in top_symbols: try: fetch_stock_daily(symbol, ALPHA_VANTAGE_API_KEY) except: pass # Start warming in a separate thread to not block the main app load thread = threading.Thread(target=warm_task, daemon=True) thread.start()