"""
🚀 Parallel Execution Engine
Utilizes multi-core CPU architecture to accelerate model training and data processing.
"""
import concurrent.futures
import os

def run_models_parallel(series, forecast_days, selected_models):
    """
    Run multiple forecasting models in parallel using ThreadPoolExecutor.
    (Threads are generally safer for Streamlit session state and shared imports than Processes).
    """
    results = {}
    
    # Define wrappers for each model to handle parameters
    def train_arima():
        from models.arima_model import fit_arima
        return "ARIMA", fit_arima(series, forecast_steps=forecast_days, auto_order=False)

    def train_prophet():
        from models.prophet_model import fit_prophet
        return "Prophet", fit_prophet(series, forecast_steps=forecast_days)

    def train_lstm():
        from models.lstm_model import fit_lstm
        return "LSTM", fit_lstm(series, forecast_steps=forecast_days, epochs=30) # Slightly fewer epochs for speed

    def train_exp_smoothing():
        from models.exp_smoothing import fit_exponential_smoothing
        return "Exponential Smoothing", fit_exponential_smoothing(series, forecast_steps=forecast_days)

    model_funcs = {
        "ARIMA": train_arima,
        "Prophet": train_prophet,
        "LSTM": train_lstm,
        "Exponential Smoothing": train_exp_smoothing
    }

    # Only run the models that were selected
    tasks = {m: model_funcs[m] for m in selected_models if m in model_funcs}
    if not tasks:
        return results

    # Use max available workers (up to the number of tasks)
    max_workers = min(len(tasks), os.cpu_count() or 4)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Key each future by its model name so successes and failures
        # land under the same keys in the results dict
        future_to_name = {executor.submit(func): name for name, func in tasks.items()}
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            try:
                _, result = future.result()
                results[name] = result
            except Exception as e:
                # Store the error to be handled by the UI
                results[name] = {"error": str(e)}

    return results
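
# Illustrative usage (a sketch, not app code: `load_series` is a hypothetical
# loader, and the handling below stands in for the real Streamlit UI):
#
#     series = load_series("AAPL")
#     results = run_models_parallel(series, forecast_days=30,
#                                   selected_models=["ARIMA", "Prophet"])
#     for name, res in results.items():
#         if isinstance(res, dict) and "error" in res:
#             print(f"{name} failed: {res['error']}")
#         else:
#             print(f"{name} forecast ready")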

def optimize_tensorflow():
    """
    Configure TensorFlow to use all available CPU cores.
    Must run before any TensorFlow ops execute; the thread pool sizes
    cannot be changed once the runtime has initialized.
    """
    try:
        import tensorflow as tf
        # Intra-op: threads within a single op; inter-op: independent ops
        cores = os.cpu_count() or 4
        tf.config.threading.set_intra_op_parallelism_threads(cores)
        tf.config.threading.set_inter_op_parallelism_threads(cores)
    except ImportError:
        # TensorFlow is not installed; nothing to configure
        pass
    except RuntimeError:
        # TensorFlow already initialized; settings can no longer change
        pass
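
# Ordering note (an assumed startup sequence, not code from this app):
# optimize_tensorflow() only takes effect if it runs before anything touches
# TensorFlow, so a typical entry point would call:
#
#     optimize_tensorflow()   # first, before model modules build any TF state
#     pre_warm_cache()        # then start background warming
#     # ... then render the UI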

def pre_warm_cache():
    """
    Background cache warming.
    Pre-fetches data for popular symbols on a daemon thread so that the
    first user requests hit a warm cache instead of a cold API call.
    """
    from config.settings import ALPHA_VANTAGE_API_KEY
    from utils.api_clients import fetch_stock_daily
    import threading

    def warm_task():
        # Only warm the top 5 mega-cap stocks
        top_symbols = ["AAPL", "NVDA", "MSFT", "GOOGL", "AMZN"]
        for symbol in top_symbols:
            try:
                fetch_stock_daily(symbol, ALPHA_VANTAGE_API_KEY)
            except Exception:
                # Best-effort warming: ignore failures (e.g. rate limits, network)
                pass

    # Start warming in a separate thread to not block the main app load
    thread = threading.Thread(target=warm_task, daemon=True)
    thread.start()
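
if __name__ == "__main__":
    # Smoke-test sketch: assumes this file lives inside the app package so
    # the config/ and utils/ imports above resolve. Tune TF threading first,
    # start background warming, then give the daemon thread a moment to run
    # before the process (and the daemon thread with it) exits.
    import time
    optimize_tensorflow()
    pre_warm_cache()
    time.sleep(2)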