Spaces:
Running
Running
File size: 3,333 Bytes
"""
🚀 Parallel Execution Engine
Utilizes multi-core CPU architecture to accelerate model training and data processing.
"""
import concurrent.futures
import streamlit as st
import os
def run_models_parallel(series, forecast_days, selected_models):
    """
    Fit the selected forecasting models concurrently in a thread pool.

    (Threads are used rather than processes: they are generally safer for
    Streamlit session state and shared imports.)

    Args:
        series: Time series handed unchanged to every model's fit function.
        forecast_days: Forecast horizon, passed to each model as
            ``forecast_steps``.
        selected_models: Iterable of model names. Recognised names are
            "ARIMA", "Prophet", "LSTM" and "Exponential Smoothing"; any other
            name is ignored and duplicates are run only once.

    Returns:
        dict keyed by model name. Each value is whatever that model's fit
        function returned, or ``{"error": "<message>"}`` if importing or
        fitting the model raised. Empty when no recognised model was selected.
    """
    # Model modules are imported lazily inside each wrapper, so only the
    # libraries behind the selected models are ever loaded.
    def train_arima():
        from models.arima_model import fit_arima
        return fit_arima(series, forecast_steps=forecast_days, auto_order=False)

    def train_prophet():
        from models.prophet_model import fit_prophet
        return fit_prophet(series, forecast_steps=forecast_days)

    def train_lstm():
        from models.lstm_model import fit_lstm
        # Slightly fewer epochs for speed
        return fit_lstm(series, forecast_steps=forecast_days, epochs=30)

    def train_exp_smoothing():
        from models.exp_smoothing import fit_exponential_smoothing
        return fit_exponential_smoothing(series, forecast_steps=forecast_days)

    model_funcs = {
        "ARIMA": train_arima,
        "Prophet": train_prophet,
        "LSTM": train_lstm,
        "Exponential Smoothing": train_exp_smoothing,
    }

    # Only run models that were selected; dict.fromkeys drops duplicates while
    # keeping the caller's order, so no model is trained twice.
    names = [name for name in dict.fromkeys(selected_models) if name in model_funcs]
    if not names:
        # ThreadPoolExecutor rejects max_workers=0, so bail out before building it.
        return {}

    # Use max available workers (up to the number of tasks)
    max_workers = min(len(names), os.cpu_count() or 4)

    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_name = {
            executor.submit(model_funcs[name]): name for name in names
        }
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # Store the error under the same key a success would use, so
                # the UI can look it up by the model name it requested.
                results[name] = {"error": str(exc)}
    return results
def optimize_tensorflow():
    """Set TensorFlow's intra-op and inter-op thread counts to the CPU count.

    Best-effort tuning: the call is a silent no-op when TensorFlow is not
    installed, or when TensorFlow rejects the change because its runtime has
    already been initialised. Falls back to 4 threads when the CPU count
    cannot be determined.
    """
    try:
        import tensorflow as tf

        # Set number of threads for intra and inter op parallelism
        cores = os.cpu_count() or 4
        tf.config.threading.set_intra_op_parallelism_threads(cores)
        tf.config.threading.set_inter_op_parallelism_threads(cores)
    except ImportError:
        # TensorFlow is optional; nothing to tune without it.
        pass
    except RuntimeError:
        # TensorFlow raises RuntimeError when thread settings are changed after
        # its runtime is initialised (e.g. a model already ran in this
        # process). Keep the existing configuration instead of crashing.
        pass
def pre_warm_cache():
    """
    Start background cache warming and return immediately.

    Launches a daemon thread that calls ``fetch_stock_daily`` once for each of
    five hard-coded mega-cap tickers, using the configured Alpha Vantage key.
    Presumably ``fetch_stock_daily`` caches what it fetches so later requests
    for these symbols are served quickly — confirm against utils.api_clients.

    Failures for individual symbols are logged at DEBUG level and otherwise
    ignored; warming is strictly best-effort.
    """
    import logging
    import threading

    from config.settings import ALPHA_VANTAGE_API_KEY
    from utils.api_clients import fetch_stock_daily

    logger = logging.getLogger(__name__)

    def warm_task():
        # Only warm the top 5 mega-cap stocks
        for symbol in ("AAPL", "NVDA", "MSFT", "GOOGL", "AMZN"):
            try:
                fetch_stock_daily(symbol, ALPHA_VANTAGE_API_KEY)
            except Exception:
                # One failed symbol must not stop the remaining ones; record
                # the reason so a failed warm-up can still be diagnosed.
                logger.debug("Cache warm-up failed for %s", symbol, exc_info=True)

    # Start warming in a separate thread to not block the main app load
    threading.Thread(target=warm_task, daemon=True).start()
|