heboya8 commited on
Commit
3db64d3
·
verified ·
1 Parent(s): 2eee82e

Delete components/model/evaluation.py

Browse files
Files changed (1) hide show
  1. components/model/evaluation.py +0 -239
components/model/evaluation.py DELETED
@@ -1,239 +0,0 @@
1
- import os
2
- import logging
3
- import pandas as pd
4
- import numpy as np
5
- from sklearn.preprocessing import MinMaxScaler
6
- import pickle
7
- from sklearn.metrics import (
8
- mean_squared_error,
9
- mean_absolute_error,
10
- mean_absolute_percentage_error
11
- )
12
- from typing import Dict, List, Tuple
13
- from datetime import datetime, timezone
14
- import tensorflow as tf
15
- import sys
16
- import ast
17
-
18
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
19
- from components.utils.file_utils import load_extract_config, get_parquet_file_names
20
- from components.model.model_utils import build_model_from_config
21
- from components.model.data_utils import create_data_loader
22
- from components.utils.utils import parse_timezone
23
-
24
# Configure root logging once at import time so every logger in the process
# shares the same timestamped format.
logging.basicConfig(
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S %Z',
    level=logging.INFO,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
30
-
31
- # def model_evaluate(model, scaler: MinMaxScaler, ds: tf.data.Dataset) -> Tuple[float, float]:
32
- # """Evaluate a model on a dataset and return RMSE and MAE.
33
-
34
- # Args:
35
- # model: Trained Keras model.
36
- # scaler (MinMaxScaler): Scaler used for data normalization.
37
- # ds (tf.data.Dataset): Dataset to evaluate on.
38
-
39
- # Returns:
40
- # Tuple[float, float]: RMSE and MAE metrics.
41
- # """
42
- # y_true, y_pred = [], []
43
- # for X, y in ds:
44
- # pred = model.predict(X, verbose=2)
45
- # y_true.append(y.numpy())
46
- # y_pred.append(pred)
47
- # y_true = np.concatenate(y_true)
48
- # y_pred = np.concatenate(y_pred)
49
- # y_true_orig = scaler.inverse_transform(y_true)
50
- # y_pred_orig = scaler.inverse_transform(y_pred)
51
- # return (np.sqrt(mean_squared_error(y_true_orig, y_pred_orig)),
52
- # mean_absolute_error(y_true_orig, y_pred_orig))
53
-
54
def model_evaluate(model, scaler: MinMaxScaler, ds: tf.data.Dataset) -> Tuple[float, float, float]:
    """Evaluate a model on a dataset and return RMSE, MAE and MAPE.

    True labels are collected from the dataset, predictions are made in a
    single ``model.predict`` pass, and both are inverse-transformed back to
    the original scale before the metrics are computed.

    Args:
        model: Trained Keras model.
        scaler (MinMaxScaler): Scaler used for data normalization.
        ds (tf.data.Dataset): Dataset of (X, y) batches to evaluate on.

    Returns:
        Tuple[float, float, float]: RMSE, MAE and MAPE on the original scale.

    Raises:
        ValueError: If the dataset yields no batches.
    """
    # NOTE(review): this iterates `ds` twice (labels here, predictions below)
    # and assumes both passes yield batches in the same order — i.e. the
    # dataset is not reshuffled per iteration. Confirm at the call site.
    y_true = [y.numpy() for _, y in ds]
    if not y_true:
        # np.concatenate([]) would raise an opaque error; fail clearly instead.
        raise ValueError("Dataset is empty; nothing to evaluate.")
    y_true = np.concatenate(y_true)

    # Predict the entire dataset in one silent pass.
    y_pred = model.predict(ds, verbose=0)

    # Inverse transform to original scale before computing metrics.
    y_true_orig = scaler.inverse_transform(y_true)
    y_pred_orig = scaler.inverse_transform(y_pred)

    # The original annotation/docstring claimed two return values; the
    # function has returned three (RMSE, MAE, MAPE) — fixed here.
    return (np.sqrt(mean_squared_error(y_true_orig, y_pred_orig)),
            mean_absolute_error(y_true_orig, y_pred_orig),
            mean_absolute_percentage_error(y_true_orig, y_pred_orig))
82
-
83
def metric_and_predict_lstm_model(train_result: Dict) -> Dict:
    """Evaluate the trained LSTM model and predict the next price.

    Loads the model weights and scaler produced by the training task,
    rebuilds the train/val/test splits from the parquet files, computes
    RMSE/MAE/MAPE per split, saves them to CSV, predicts the next close
    price from the most recent ``seq_length`` rows, and writes a text
    summary of the run.

    Args:
        train_result (Dict): Training result from the train_lstm_model task.
            May also be the ``str`` repr of that dict (as delivered via
            XCom), in which case it is parsed with ``ast.literal_eval``.

    Returns:
        Dict: ``metrics_path``, ``prediction_path`` and ``next_price``.

    Raises:
        ValueError: If no training result is given, there are not enough
            sequences for evaluation, or not enough recent data to predict.
    """
    if not train_result:
        raise ValueError("No training result provided.")

    # XCom may deliver the dict as its string repr; only parse in that case.
    # (The previous unconditional literal_eval crashed on an actual dict.)
    if isinstance(train_result, str):
        train_result = ast.literal_eval(train_result)

    cfg = load_extract_config('model_config.yml')
    parquet_folder = load_extract_config('pipeline_config.yml')['paths']['parquet_folder']
    os.makedirs(parquet_folder, exist_ok=True)

    model_cfg = cfg['model']
    data_cfg = cfg['data']
    out_cfg = cfg['output']
    dt_str = train_result['datetime']
    model_filename = train_result['model_filename']
    dataset_merge = train_result['dataset_merge']

    model_path = train_result['model_path']
    scaler_path = train_result['scaler_path']
    seq_length = data_cfg['seq_length']
    batch_size = cfg['evaluation'].get('eval_batch_size', 64)

    # Load the scaler, then rebuild the architecture and restore weights.
    with open(scaler_path, 'rb') as f:
        scaler = pickle.load(f)
    model = build_model_from_config(seq_length, cfg)
    model.load_weights(model_path)

    # BUG FIX: the original comprehension repeated `parquet_folder` and never
    # used the file names, so every "path" was the bare directory. Join the
    # folder with each file name to get real file paths.
    parquet_paths = [os.path.join(parquet_folder, name)
                     for name in get_parquet_file_names()]
    dataset = create_data_loader(parquet_paths, scaler, seq_length, batch_size)

    # Count usable sequences across all files to size the splits.
    total_seqs = sum(max(0, len(pd.read_parquet(path, columns=['Close'])) - seq_length)
                     for path in parquet_paths if os.path.exists(path))
    if total_seqs == 0:
        raise ValueError("Not enough sequences for evaluation.")

    steps_total = (total_seqs + batch_size - 1) // batch_size  # ceil division
    steps_train = int(steps_total * data_cfg['train_ratio'])
    steps_val = int(steps_total * data_cfg['val_ratio'])
    steps_test = steps_total - steps_train - steps_val

    train_ds = dataset.take(steps_train)
    val_ds = dataset.skip(steps_train).take(steps_val)
    test_ds = dataset.skip(steps_train + steps_val)

    # Evaluate each split (RMSE, MAE, MAPE on the original price scale).
    train_rmse, train_mae, train_mape = model_evaluate(model, scaler, train_ds)
    val_rmse, val_mae, val_mape = model_evaluate(model, scaler, val_ds)
    test_rmse, test_mae, test_mape = model_evaluate(model, scaler, test_ds)

    # Persist metrics as a long-format CSV (one row per split/metric pair).
    metrics_path = os.path.join(out_cfg['metrics']['metrics_dir'], f"metrics_{dt_str}.csv")
    os.makedirs(out_cfg['metrics']['metrics_dir'], exist_ok=True)

    metrics_data = [
        [model_filename, dataset_merge, split, metric, value]
        for split, triple in (("Train", (train_rmse, train_mae, train_mape)),
                              ("Val", (val_rmse, val_mae, val_mape)),
                              ("Test", (test_rmse, test_mae, test_mape)))
        for metric, value in zip(("RMSE", "MAE", "MAPE"), triple)
    ]

    metrics_df = pd.DataFrame(
        metrics_data,
        columns=['model_path', 'dataset_merge', 'Split', 'Metric', 'Value']
    )
    metrics_df.to_csv(metrics_path, index=False)

    # Predict the next price from the last seq_length closes available,
    # scanning the newest files first.
    last_chunk = None
    for path in reversed(parquet_paths):
        if os.path.exists(path):
            df_tail = pd.read_parquet(path).tail(seq_length)
            if len(df_tail) >= seq_length:
                last_chunk = df_tail['Close'].values.astype('float32').reshape(-1, 1)
                break
    if last_chunk is None:
        raise ValueError("Not enough recent data for prediction.")

    last_scaled = scaler.transform(last_chunk)
    next_scaled = model.predict(last_scaled.reshape(1, seq_length, 1), verbose=2)
    next_price = scaler.inverse_transform(next_scaled)[0][0]

    # Save a human-readable prediction summary.
    pred_path = os.path.join(out_cfg['predictions']['pred_dir'], f"prediction_{dt_str}.txt")
    os.makedirs(os.path.dirname(pred_path), exist_ok=True)

    with open(pred_path, 'w') as f:
        f.write(f"Model Run: {dt_str}\n")
        f.write(f"Model File: {model_filename}\n")
        f.write(f"Dataset Merged: {dataset_merge}\n")
        f.write(f"Architecture: {model_cfg['architecture'].upper()}\n")
        f.write(f"Predicted Next Close: {next_price:.6f}\n")
        f.write(f"Based on last {seq_length} timesteps.\n\n")
        f.write("Evaluation Metrics:\n")
        f.write(f" Train -> RMSE: {train_rmse:8.6f} | MAE: {train_mae:8.6f} | MAPE: {train_mape:8.6f}\n")
        f.write(f" Val -> RMSE: {val_rmse:8.6f} | MAE: {val_mae:8.6f} | MAPE: {val_mape:8.6f}\n")
        f.write(f" Test -> RMSE: {test_rmse:8.6f} | MAE: {test_mae:8.6f} | MAPE: {test_mape:8.6f}\n")

    # Use the module logger for consistency (original called the root
    # `logging` module directly).
    logger.info(f"Next price: {next_price:.4f} | Test RMSE: {test_rmse:.6f} | Dataset: {dataset_merge}")

    return {
        'metrics_path': metrics_path,
        'prediction_path': pred_path,
        'next_price': float(next_price)
    }
207
-
208
if __name__ == "__main__":
    logger.info("Running standalone evaluation test")

    # Simulate a training result for testing.
    cfg = load_extract_config('model_config.yml')
    out_cfg = cfg['output']
    data_cfg = cfg['data']

    # Mock training result (adjust paths to match an actual trained model
    # and scaler on disk before running).
    mock_train_result = {
        'model_path': os.path.join(out_cfg['checkpoints']['model_dir'],
                                   'model_2025-10-24-21-59-42-(+07).h5'),
        'model_filename': 'model_2025-10-24-18-40-00-(+07).h5',
        'scaler_path': os.path.join(out_cfg['checkpoints']['scaler_dir'],
                                    'scaler_2025-10-24-21-59-42-(+07).pkl'),
        # BUG FIX: the original string was missing its closing parenthesis.
        'datetime': '2025-10-24-21-59-42-(+07)',
        'dataset_merge': 'BTCUSDT-1s-2025-08 + BTCUSDT-1s-2025-09'
    }

    try:
        # BUG FIX: the original called metric_and_predict_lstm_model(ti=...)
        # with an unsupported keyword, raising TypeError before any work ran
        # (the unused MockTaskInstance stub has been removed accordingly).
        # Pass the dict's repr, matching how the task receives its argument
        # via XCom (the function parses strings with ast.literal_eval).
        result = metric_and_predict_lstm_model(str(mock_train_result))
        logger.info("Evaluation completed successfully!")
        logger.info(f"Result: {result}")
    except Exception as e:
        logger.error(f"Evaluation failed: {str(e)}")
    logger.info("Standalone evaluation run completed")