# Inverse_Model.py
"""Inverse-model backend: search process parameters that hit a target GAS_MMBTU.

A trained forward model (loaded from a joblib checkpoint) predicts GAS_MMBTU
from six process parameters.  For a requested target value this module uses
SciPy's Differential Evolution to search, inside per-product operational
bounds, for parameter values whose prediction matches the target.

This file contains no Streamlit code; it is meant to be imported by the
Dashboard.
"""

import numpy as np
import pandas as pd
from scipy.optimize import differential_evolution

try:
    import joblib
except ImportError:  # pragma: no cover - depends on the deployment environment
    # joblib is only needed by load_model(); keep the configuration and the
    # bound helpers importable even when it is not installed.
    joblib = None

# =========================================================
# GLOBAL CONFIGURATION (NO STREAMLIT IN THIS FILE)
# =========================================================

# Products supported by the inverse model; can also be imported by the Dashboard.
AVAILABLE_PRODUCTS = ["BMR BASE", "CKP BASE", "CKR BASE", "CMR BASE", "MORIGRO BASE"]

# Global (min, max) limits per parameter, used when computing soft bounds.
PARAMS_BOUNDS = {
    "D101330TT": (92, 99),
    "D102260TIC_CV": (35, 80),
    "D102265TIC_PV": (160, 195),
    "D102265TIC_CV": (10, 70),
    "D102266TIC": (15, 22),
    "D101264FTSCL": (3300, 4900),
}

# Per-product configuration:
#   gas_min / gas_max : GAS range used to normalise the target to [0, 1]
#   param_corr        : "positif" / "negatif" / "netral" label per parameter,
#                       used to decide which side of the range the soft
#                       bounds favour
#   binning           : {(gas_low, gas_high): {param: (min, max)}} operational
#                       (hard) bounds per GAS bin
PRODUCT_CONFIG = {
    "CKR BASE": {
        "gas_min": 0.20,
        "gas_max": 0.35,
        "param_corr": {
            "D101330TT": "negatif",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.20, 0.275): {
                "D101330TT": (92.01, 95.14),
                "D102260TIC_CV": (44.0, 70.0),
                "D102265TIC_PV": (173.96, 193.58),
                "D102265TIC_CV": (19.68, 52.23),
                "D102266TIC": (17.42, 18.59),
                "D101264FTSCL": (3710.76, 4690.91),
            },
            (0.275, 0.35): {
                "D101330TT": (92.01, 98.31),
                "D102260TIC_CV": (38.0, 68.0),
                "D102265TIC_PV": (171.59, 194.97),
                "D102265TIC_CV": (15.29, 63.62),
                "D102266TIC": (15.94, 19.59),
                "D101264FTSCL": (3496.96, 4888.82),
            },
        },
    },
    "BMR BASE": {
        "gas_min": 0.20,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.20, 0.275): {
                "D101330TT": (92.62, 97.05),
                "D102260TIC_CV": (38, 62),
                "D102265TIC_PV": (171.64, 190.05),
                "D102265TIC_CV": (14.47, 24.46),
                "D102266TIC": (17.01, 18.98),
                "D101264FTSCL": (3633.08, 4125.52),
            },
            (0.275, 0.375): {
                "D101330TT": (92.23, 98.96),
                "D102260TIC_CV": (36.0, 60.0),
                "D102265TIC_PV": (171.64, 192.53),
                "D102265TIC_CV": (11.75, 33.91),
                "D102266TIC": (16.16, 18.43),
                "D101264FTSCL": (3535.08, 4283.65),
            },
        },
    },
    "CKP BASE": {
        "gas_min": 0.18,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.18, 0.28): {
                "D101330TT": (92.01, 98.83),
                "D102260TIC_CV": (36, 68),
                "D102265TIC_PV": (168.11, 194.97),
                "D102265TIC_CV": (13.99, 49.36),
                "D102266TIC": (15.83, 18.84),
                "D101264FTSCL": (3632.62, 4890.58),
            },
            # NOTE(review): this bin ends at 0.38 while gas_max is 0.375 --
            # confirm which value is intended.
            (0.28, 0.38): {
                "D101330TT": (92.01, 99.00),
                "D102260TIC_CV": (38, 68),
                "D102265TIC_PV": (169.50, 194.97),
                "D102265TIC_CV": (13.93, 49.36),
                "D102266TIC": (15.86, 19.02),
                "D101264FTSCL": (3658.91, 4890.58),
            },
        },
    },
    "CMR BASE": {
        "gas_min": 0.19,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.19, 0.275): {
                "D101264FTSCL": (3618.73, 4539.96),
                "D101330TT": (92.1, 98.91),
                "D102260TIC_CV": (38, 62),
                "D102265TIC_CV": (15.3, 26.01),
                "D102265TIC_PV": (163.14, 192.25),
                "D102266TIC": (16.35, 19.55),
            },
            (0.275, 0.375): {
                "D101264FTSCL": (3445.31, 4684.92),
                "D101330TT": (92.06, 99.0),
                "D102260TIC_CV": (36, 64),
                "D102265TIC_CV": (14.75, 39.87),
                "D102265TIC_PV": (162.09, 191.96),
                "D102266TIC": (16.2, 19.55),
            },
        },
    },
    "MORIGRO BASE": {
        "gas_min": 0.12,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.12, 0.28): {
                "D101264FTSCL": (3437.81, 3922.18),
                "D101330TT": (92.01, 98.78),
                "D102260TIC_CV": (36, 70),
                "D102265TIC_CV": (20.0, 42.95),
                "D102265TIC_PV": (179.98, 194.97),
                "D102266TIC": (17.3, 18.32),
            },
            (0.28, 0.375): {
                "D101264FTSCL": (3389.88, 4072.64),
                "D101330TT": (92.01, 97.27),
                "D102260TIC_CV": (38, 66),
                "D102265TIC_CV": (19.65, 45.87),
                "D102265TIC_PV": (180.32, 189.0),
                "D102266TIC": (16.91, 18.32),
            },
        },
    },
}

# Normalised-target cut-offs separating the "rendah" / "menengah" / "tinggi" levels.
_LEVEL_LOW_CUTOFF = 0.33
_LEVEL_HIGH_CUTOFF = 0.67

# Half-width of the soft window around the ideal value, as a fraction of the
# hard (operational) range.
_SOFT_BUFFER_FRACTION = 0.2

# When |prediction - target| is below this, the objective drops the squared
# error term and returns only the soft-bound penalty.
_ERROR_THRESHOLD = 1e-4


# =========================================================
# MODEL & OPTIMISATION FUNCTIONS (BACKEND)
# =========================================================

def load_model(model_path: str):
    """Load the forward model and its polynomial transformer from a checkpoint.

    The checkpoint is a joblib bundle (described by the original author as an
    XGBoost forward model plus a polynomial feature transformer) that must
    contain the keys ``"model"``, ``"poly_transformer"``, ``"input_features"``
    and ``"poly_feature_names"``.

    Args:
        model_path: Path to the joblib checkpoint.

    Returns:
        Tuple ``(model, poly_transformer, input_features, poly_feature_names)``.

    Raises:
        ImportError: If joblib is not installed.
        KeyError: If the bundle lacks one of the expected keys.
    """
    if joblib is None:
        raise ImportError("joblib is required to load the model checkpoint")
    # SECURITY NOTE: joblib.load unpickles the file and can therefore execute
    # arbitrary code; only load checkpoints from a trusted source.
    deployment_bundle = joblib.load(model_path)
    return (
        deployment_bundle["model"],
        deployment_bundle["poly_transformer"],
        deployment_bundle["input_features"],
        deployment_bundle["poly_feature_names"],
    )


def predict_mmbtu(params_array, model, poly_transformer, input_features, poly_feature_names):
    """Predict GAS_MMBTU for a single parameter vector.

    Args:
        params_array: Parameter values, in the same order as ``input_features``.
        model: Object exposing ``predict(X)``.
        poly_transformer: Object exposing ``transform(X)``.
        input_features: Names of the raw input columns.
        poly_feature_names: Column names for the transformed feature matrix.

    Returns:
        The first element of the model prediction, as a Python ``float``.
    """
    params_dict = dict(zip(input_features, params_array))
    # One-row frame with the columns in the order given by input_features.
    X = pd.DataFrame([params_dict])[input_features]
    X_poly = poly_transformer.transform(X)
    X_poly_df = pd.DataFrame(X_poly, columns=poly_feature_names)
    return float(model.predict(X_poly_df)[0])


def get_operational_bounds(target_mmbtu: float, binning: dict):
    """Return the operational bounds of the bin that best matches the target.

    - If the target lies inside a bin (edges inclusive), that bin is used;
      when two bins share an edge the one with the lower start wins.
    - Otherwise the nearest bin is used: the first bin for targets below the
      minimum, the last bin for targets above the maximum, and the closest
      neighbour for targets that fall in a gap between bins.

    Args:
        target_mmbtu: Target GAS_MMBTU value.
        binning: Mapping ``{(low, high): {param: (min, max)}}``.

    Returns:
        The ``{param: (min, max)}`` dict of the selected bin.

    Raises:
        ValueError: If ``binning`` is empty.
    """
    if not binning:
        raise ValueError("binning must contain at least one (low, high) range")

    bins_sorted = sorted(binning, key=lambda bin_range: bin_range[0])  # by lower edge

    for bin_range in bins_sorted:
        lo, hi = bin_range
        if lo <= target_mmbtu <= hi:
            return binning[bin_range]

    def _distance(bin_range):
        # The target is outside every bin here, so it is strictly below the
        # lower edge or strictly above the upper edge of each one.
        lo, hi = bin_range
        return lo - target_mmbtu if target_mmbtu < lo else target_mmbtu - hi

    # min() keeps the first of equally distant bins, i.e. the lower one.
    return binning[min(bins_sorted, key=_distance)]


def calculate_bounds(target_mmbtu, input_features, product_cfg):
    """Compute hard and soft bounds for one product and one target MMBTU.

    Hard bounds come straight from the operational bin selected for the
    target.  Soft bounds are a narrower window inside the hard bounds, placed
    according to the normalised target and the parameter's correlation label.

    Args:
        target_mmbtu: Target GAS_MMBTU value.
        input_features: Parameter names to compute bounds for.
        product_cfg: One entry of ``PRODUCT_CONFIG`` (keys ``gas_min``,
            ``gas_max``, ``binning``, ``param_corr``).

    Returns:
        Tuple ``(hard_bounds, soft_bounds, level)`` where both bounds are
        ``{param: (min, max)}`` dicts and ``level`` is ``"rendah"``,
        ``"menengah"`` or ``"tinggi"``.

    Raises:
        ValueError: If ``gas_max`` is not greater than ``gas_min``.
        KeyError: If a parameter is missing from the selected bin or from
            ``PARAMS_BOUNDS``.
    """
    gas_min = product_cfg["gas_min"]
    gas_max = product_cfg["gas_max"]
    binning = product_cfg["binning"]
    param_corr = product_cfg["param_corr"]

    # Step 1: normalise the target into [0, 1] and classify its level.
    gas_range = gas_max - gas_min
    if gas_range <= 0:
        raise ValueError(
            f"gas_max ({gas_max}) must be greater than gas_min ({gas_min})"
        )
    sp_target = float(np.clip((target_mmbtu - gas_min) / gas_range, 0, 1))
    sp_inverse = 1.0 - sp_target

    if sp_target < _LEVEL_LOW_CUTOFF:
        level = "rendah"
    elif sp_target < _LEVEL_HIGH_CUTOFF:
        level = "menengah"
    else:
        level = "tinggi"

    # Step 2: hard bounds from the operational bin.
    operational_bounds = get_operational_bounds(target_mmbtu, binning)

    hard_bounds = {}
    soft_bounds = {}

    # Step 3: soft bounds per parameter.
    for param in input_features:
        keras_min, keras_max = operational_bounds[param]
        keras_range = keras_max - keras_min
        hard_bounds[param] = (keras_min, keras_max)

        korelasi = param_corr.get(param, "netral")

        # Negatively correlated parameters use the inverted set-point.
        sp = sp_inverse if korelasi == "negatif" else sp_target

        # Ideal value interpolated on the global parameter range ...
        min_global, max_global = PARAMS_BOUNDS[param]
        target_ideal_global = min_global + (sp * (max_global - min_global))

        # ... used only if it still lies within the bin; otherwise
        # interpolate on the bin range instead.
        if keras_min <= target_ideal_global <= keras_max:
            target_ideal = target_ideal_global
        else:
            target_ideal = keras_min + (sp * keras_range)

        buffer = _SOFT_BUFFER_FRACTION * keras_range

        if korelasi == "netral":
            # Neutral parameters are free to use the whole hard range.
            ideal_min = keras_min
            ideal_max = keras_max
        elif level == "rendah":
            if korelasi == "positif":
                ideal_min = keras_min
                ideal_max = target_ideal + buffer
            else:  # negatif
                ideal_min = target_ideal - buffer
                ideal_max = keras_max
        elif level == "menengah":
            ideal_min = target_ideal - buffer
            ideal_max = target_ideal + buffer
        else:  # tinggi
            if korelasi == "positif":
                ideal_min = target_ideal - buffer
                ideal_max = keras_max
            else:  # negatif
                ideal_min = keras_min
                ideal_max = target_ideal + buffer

        # Soft bounds never extend beyond the hard bounds.
        ideal_min = max(ideal_min, keras_min)
        ideal_max = min(ideal_max, keras_max)

        soft_bounds[param] = (ideal_min, ideal_max)

    return hard_bounds, soft_bounds, level


def objective_function(
    params_array,
    target_mmbtu,
    model,
    poly_transformer,
    input_features,
    poly_feature_names,
    hard_bounds,
    soft_bounds,
):
    """Objective minimised by Differential Evolution.

    The value is the squared prediction error plus a penalty for every
    parameter lying outside its soft bounds (the violation distance divided
    by the hard range).  Once the absolute prediction error is below
    ``_ERROR_THRESHOLD`` only the penalty is returned.

    Args:
        params_array: Candidate parameter values, ordered as ``input_features``.
        target_mmbtu: Target GAS_MMBTU value.
        model, poly_transformer, input_features, poly_feature_names:
            Forwarded to :func:`predict_mmbtu`.
        hard_bounds: ``{param: (min, max)}`` operational bounds.
        soft_bounds: ``{param: (min, max)}`` preferred bounds.

    Returns:
        The objective value as a float (lower is better).
    """
    prediction = predict_mmbtu(
        params_array, model, poly_transformer, input_features, poly_feature_names
    )
    error_pred = (prediction - target_mmbtu) ** 2

    total_penalty = 0.0
    for value, param in zip(params_array, input_features):
        ideal_min, ideal_max = soft_bounds[param]
        keras_min, keras_max = hard_bounds[param]
        param_range = keras_max - keras_min

        violation = 0.0
        if value < ideal_min:
            violation = ideal_min - value
        elif value > ideal_max:
            violation = value - ideal_max

        # A zero-width hard range cannot be normalised; it adds no penalty.
        if param_range > 0:
            total_penalty += violation / param_range

    if abs(prediction - target_mmbtu) < _ERROR_THRESHOLD:
        return total_penalty
    return error_pred + total_penalty


def _percent_error(error, target_mmbtu):
    """Return ``error`` as a percentage of the target (NaN for a zero target)."""
    if target_mmbtu == 0:
        # A relative error is undefined for a zero target; avoid ZeroDivisionError.
        return float("nan")
    return error / target_mmbtu * 100.0


def optimize_one_target(
    target_mmbtu,
    model,
    poly_transformer,
    input_features,
    poly_feature_names,
    product_cfg,
    maxiter=100,
    popsize=30,
):
    """Run the inverse-model optimisation for a single target MMBTU.

    Args:
        target_mmbtu: Target GAS_MMBTU value.
        model, poly_transformer, input_features, poly_feature_names:
            Forward-model pieces as returned by :func:`load_model`.
        product_cfg: One entry of ``PRODUCT_CONFIG``.
        maxiter: ``maxiter`` passed to ``differential_evolution``.
        popsize: ``popsize`` passed to ``differential_evolution``.

    Returns:
        Dict with keys ``target``, ``level``, ``optimal_params``,
        ``prediction``, ``error``, ``error_pct`` (NaN when the target is 0),
        ``objective_value``, ``converged``, ``iterations``,
        ``soft_violations`` (names of parameters outside their soft bounds),
        ``hard_bounds`` and ``soft_bounds``.
    """
    hard_bounds, soft_bounds, level = calculate_bounds(
        target_mmbtu, input_features, product_cfg
    )
    # The optimiser searches strictly inside the hard (operational) bounds.
    optimizer_bounds = [hard_bounds[param] for param in input_features]

    def obj_wrapper(params_array):
        return objective_function(
            params_array,
            target_mmbtu,
            model,
            poly_transformer,
            input_features,
            poly_feature_names,
            hard_bounds,
            soft_bounds,
        )

    result = differential_evolution(
        func=obj_wrapper,
        bounds=optimizer_bounds,
        strategy="best1bin",
        maxiter=maxiter,
        popsize=popsize,
        tol=1e-4,
        mutation=(0.5, 1),
        recombination=0.7,
        seed=42,  # fixed seed so repeated runs give the same result
        polish=True,
        atol=1e-6,
        disp=False,
    )

    optimal_params = dict(zip(input_features, result.x))
    final_pred = predict_mmbtu(
        result.x, model, poly_transformer, input_features, poly_feature_names
    )
    error = abs(final_pred - target_mmbtu)

    violations = [
        param
        for param, value in optimal_params.items()
        if not (soft_bounds[param][0] <= value <= soft_bounds[param][1])
    ]

    return {
        "target": float(target_mmbtu),
        "level": level,
        "optimal_params": optimal_params,
        "prediction": float(final_pred),
        "error": error,
        "error_pct": _percent_error(error, target_mmbtu),
        "objective_value": float(result.fun),
        "converged": bool(result.success),
        "iterations": int(result.nit),
        "soft_violations": violations,
        "hard_bounds": hard_bounds,
        "soft_bounds": soft_bounds,
    }


def run_inverse_for_targets(model_path, product_name, targets, maxiter=100, popsize=30):
    """Load the model and run the optimisation for a list of targets.

    Called from the Dashboard.

    Args:
        model_path: Path to the joblib checkpoint.
        product_name: Key of ``PRODUCT_CONFIG``.
        targets: Iterable of target GAS_MMBTU values.
        maxiter: ``maxiter`` forwarded to :func:`optimize_one_target`.
        popsize: ``popsize`` forwarded to :func:`optimize_one_target`.

    Returns:
        List of result dicts, one per target, in input order.

    Raises:
        KeyError: If ``product_name`` is not configured.
    """
    try:
        product_cfg = PRODUCT_CONFIG[product_name]
    except KeyError:
        raise KeyError(
            f"Unknown product {product_name!r}; expected one of {sorted(PRODUCT_CONFIG)}"
        ) from None

    model, poly_transformer, input_features, poly_feature_names = load_model(model_path)

    return [
        optimize_one_target(
            target_mmbtu=t,
            model=model,
            poly_transformer=poly_transformer,
            input_features=input_features,
            poly_feature_names=poly_feature_names,
            product_cfg=product_cfg,
            maxiter=maxiter,
            popsize=popsize,
        )
        for t in targets
    ]


def results_to_dataframe(results, product_name):
    """Flatten a list of result dicts into a DataFrame for display or saving.

    Args:
        results: Result dicts as returned by :func:`optimize_one_target`.
        product_name: Value written to the ``Product`` column of every row.

    Returns:
        ``pandas.DataFrame`` with one row per result: the summary columns
        followed by one column per optimised parameter.
    """
    rows = []
    for r in results:
        row = {
            "Product": product_name,
            "Target_MMBTU": r["target"],
            "Level": r["level"],
            "Predicted_MMBTU": r["prediction"],
            "Error": r["error"],
            "Error_Pct": r["error_pct"],
            "Objective_Value": r["objective_value"],
            "Converged": r["converged"],
            "Iterations": r["iterations"],
            "Soft_Violations": ", ".join(r["soft_violations"]) if r["soft_violations"] else "",
        }
        # One column per optimised parameter.
        row.update(r["optimal_params"])
        rows.append(row)
    return pd.DataFrame(rows)