Spaces:
Configuration error
Configuration error
| # Inverse_Model.py | |
| import numpy as np | |
| import pandas as pd | |
| import joblib | |
| from scipy.optimize import differential_evolution | |
| # ========================================================= | |
| # KONFIGURASI GLOBAL (TIDAK ADA STREAMLIT DI FILE INI) | |
| # ========================================================= | |
| # List produk yang digunakan untuk inverse model dan dapat juga di-import ke Dashboard | |
| AVAILABLE_PRODUCTS = ["BMR BASE", "CKP BASE", "CKR BASE", "CMR BASE", "MORIGRO BASE"] | |
| # Batas global parameter (dipakai di perhitungan bounds) | |
| PARAMS_BOUNDS = { | |
| "D101330TT": (92, 99), | |
| "D102260TIC_CV": (35, 80), | |
| "D102265TIC_PV": (160, 195), | |
| "D102265TIC_CV": (10, 70), | |
| "D102266TIC": (15, 22), | |
| "D101264FTSCL": (3300, 4900), | |
| } | |
| # Konfigurasi per-produk: GAS range, korelasi, dan BINNING_DATA | |
| PRODUCT_CONFIG = { | |
| "CKR BASE": { | |
| "gas_min": 0.20, | |
| "gas_max": 0.35, | |
| "param_corr": { | |
| "D101330TT": "negatif", | |
| "D102260TIC_CV": "positif", | |
| "D102265TIC_PV": "positif", | |
| "D102265TIC_CV": "positif", | |
| "D102266TIC": "netral", | |
| "D101264FTSCL": "positif", | |
| }, | |
| "binning": { | |
| (0.20, 0.275): { | |
| "D101330TT": (92.01, 95.14), | |
| "D102260TIC_CV": (44.0, 70.0), | |
| "D102265TIC_PV": (173.96, 193.58), | |
| "D102265TIC_CV": (19.68, 52.23), | |
| "D102266TIC": (17.42, 18.59), | |
| "D101264FTSCL": (3710.76, 4690.91), | |
| }, | |
| (0.275, 0.35): { | |
| "D101330TT": (92.01, 98.31), | |
| "D102260TIC_CV": (38.0, 68.0), | |
| "D102265TIC_PV": (171.59, 194.97), | |
| "D102265TIC_CV": (15.29, 63.62), | |
| "D102266TIC": (15.94, 19.59), | |
| "D101264FTSCL": (3496.96, 4888.82), | |
| }, | |
| }, | |
| }, | |
| "BMR BASE": { | |
| "gas_min": 0.20, | |
| "gas_max": 0.375, | |
| "param_corr": { | |
| "D101330TT": "netral", | |
| "D102260TIC_CV": "positif", | |
| "D102265TIC_PV": "positif", | |
| "D102265TIC_CV": "positif", | |
| "D102266TIC": "netral", | |
| "D101264FTSCL": "positif", | |
| }, | |
| "binning": { | |
| (0.20, 0.275): { | |
| "D101330TT": (92.62, 97.05), | |
| "D102260TIC_CV": (38, 62), | |
| "D102265TIC_PV": (171.64, 190.05), | |
| "D102265TIC_CV": (14.47, 24.46), | |
| "D102266TIC": (17.01, 18.98), | |
| "D101264FTSCL": (3633.08, 4125.52), | |
| }, | |
| (0.275, 0.375): { | |
| "D101330TT": (92.23, 98.96), | |
| "D102260TIC_CV": (36.0, 60.0), | |
| "D102265TIC_PV": (171.64, 192.53), | |
| "D102265TIC_CV": (11.75, 33.91), | |
| "D102266TIC": (16.16, 18.43), | |
| "D101264FTSCL": (3535.08, 4283.65), | |
| }, | |
| }, | |
| }, | |
| "CKP BASE": { | |
| "gas_min": 0.18, | |
| "gas_max": 0.375, | |
| "param_corr": { | |
| "D101330TT": "netral", | |
| "D102260TIC_CV": "positif", | |
| "D102265TIC_PV": "positif", | |
| "D102265TIC_CV": "positif", | |
| "D102266TIC": "netral", | |
| "D101264FTSCL": "positif", | |
| }, | |
| "binning": { | |
| (0.18, 0.28): { | |
| "D101330TT": (92.01, 98.83), | |
| "D102260TIC_CV": (36, 68), | |
| "D102265TIC_PV": (168.11, 194.97), | |
| "D102265TIC_CV": (13.99, 49.36), | |
| "D102266TIC": (15.83, 18.84), | |
| "D101264FTSCL": (3632.62, 4890.58), | |
| }, | |
| (0.28, 0.38): { | |
| "D101330TT": (92.01, 99.00), | |
| "D102260TIC_CV": (38, 68), | |
| "D102265TIC_PV": (169.50, 194.97), | |
| "D102265TIC_CV": (13.93, 49.36), | |
| "D102266TIC": (15.86, 19.02), | |
| "D101264FTSCL": (3658.91, 4890.58), | |
| }, | |
| }, | |
| }, | |
| "CMR BASE": { | |
| "gas_min": 0.19, | |
| "gas_max": 0.375, | |
| "param_corr": { | |
| "D101330TT": "netral", | |
| "D102260TIC_CV": "positif", | |
| "D102265TIC_PV": "positif", | |
| "D102265TIC_CV": "positif", | |
| "D102266TIC": "netral", | |
| "D101264FTSCL": "positif", | |
| }, | |
| "binning": { | |
| (0.19, 0.275): { | |
| "D101264FTSCL": (3618.73, 4539.96), | |
| "D101330TT": (92.1, 98.91), | |
| "D102260TIC_CV": (38, 62), | |
| "D102265TIC_CV": (15.3, 26.01), | |
| "D102265TIC_PV": (163.14, 192.25), | |
| "D102266TIC": (16.35, 19.55), | |
| }, | |
| (0.275, 0.375): { | |
| "D101264FTSCL": (3445.31, 4684.92), | |
| "D101330TT": (92.06, 99.0), | |
| "D102260TIC_CV": (36, 64), | |
| "D102265TIC_CV": (14.75, 39.87), | |
| "D102265TIC_PV": (162.09, 191.96), | |
| "D102266TIC": (16.2, 19.55), | |
| }, | |
| }, | |
| }, | |
| "MORIGRO BASE": { | |
| "gas_min": 0.12, | |
| "gas_max": 0.375, | |
| "param_corr": { | |
| "D101330TT": "netral", | |
| "D102260TIC_CV": "positif", | |
| "D102265TIC_PV": "positif", | |
| "D102265TIC_CV": "positif", | |
| "D102266TIC": "netral", | |
| "D101264FTSCL": "positif", | |
| }, | |
| "binning": { | |
| (0.12, 0.28): { | |
| "D101264FTSCL": (3437.81, 3922.18), | |
| "D101330TT": (92.01, 98.78), | |
| "D102260TIC_CV": (36, 70), | |
| "D102265TIC_CV": (20.0, 42.95), | |
| "D102265TIC_PV": (179.98, 194.97), | |
| "D102266TIC": (17.3, 18.32), | |
| }, | |
| (0.28, 0.375): { | |
| "D101264FTSCL": (3389.88, 4072.64), | |
| "D101330TT": (92.01, 97.27), | |
| "D102260TIC_CV": (38, 66), | |
| "D102265TIC_CV": (19.65, 45.87), | |
| "D102265TIC_PV": (180.32, 189.0), | |
| "D102266TIC": (16.91, 18.32), | |
| }, | |
| }, | |
| }, | |
| } | |
| # ========================================================= | |
| # FUNGSI MODEL & OPTIMISASI (BACKEND) | |
| # ========================================================= | |
| def load_model(model_path: str): | |
| """Load forward model XGBoost + poly_transformer dari checkpoint.""" | |
| deployment_bundle = joblib.load(model_path) | |
| return ( | |
| deployment_bundle["model"], | |
| deployment_bundle["poly_transformer"], | |
| deployment_bundle["input_features"], | |
| deployment_bundle["poly_feature_names"], | |
| ) | |
| def predict_mmbtu(params_array, model, poly_transformer, input_features, poly_feature_names): | |
| """Prediksi GAS_MMBTU dari array parameter.""" | |
| params_dict = dict(zip(input_features, params_array)) | |
| X = pd.DataFrame([params_dict])[input_features] | |
| X_poly = poly_transformer.transform(X) | |
| X_poly_df = pd.DataFrame(X_poly, columns=poly_feature_names) | |
| return float(model.predict(X_poly_df)[0]) | |
| def get_operational_bounds(target_mmbtu: float, binning: dict): | |
| """ | |
| Ambil bounds operasional dari BINNING_DATA terdekat. | |
| - Jika target di dalam salah satu bin => pakai bin itu | |
| - Jika di bawah minimum => pakai bin pertama | |
| - Jika di atas maksimum => pakai bin terakhir | |
| """ | |
| bins_sorted = sorted(binning.keys(), key=lambda x: x[0]) # sort by lower bound | |
| for (lo, hi) in bins_sorted: | |
| if lo <= target_mmbtu <= hi: | |
| return binning[(lo, hi)] | |
| # fallback | |
| if target_mmbtu < bins_sorted[0][0]: | |
| return binning[bins_sorted[0]] | |
| else: | |
| return binning[bins_sorted[-1]] | |
| def calculate_bounds(target_mmbtu, input_features, product_cfg): | |
| """ | |
| Hitung hard_bounds & soft_bounds untuk satu produk dan satu target MMBTU. | |
| """ | |
| gas_min = product_cfg["gas_min"] | |
| gas_max = product_cfg["gas_max"] | |
| binning = product_cfg["binning"] | |
| param_corr = product_cfg["param_corr"] | |
| # Step 1: SP_target dan level | |
| gas_range = gas_max - gas_min | |
| sp_target = (target_mmbtu - gas_min) / gas_range | |
| sp_target = float(np.clip(sp_target, 0, 1)) | |
| sp_inverse = 1.0 - sp_target | |
| if sp_target < 0.33: | |
| level = "rendah" | |
| elif sp_target < 0.67: | |
| level = "menengah" | |
| else: | |
| level = "tinggi" | |
| # Step 2: hard bounds dari binning | |
| operational_bounds = get_operational_bounds(target_mmbtu, binning) | |
| hard_bounds = {} | |
| soft_bounds = {} | |
| # Step 3: soft bounds per parameter | |
| for param in input_features: | |
| keras_min, keras_max = operational_bounds[param] | |
| keras_range = keras_max - keras_min | |
| hard_bounds[param] = (keras_min, keras_max) | |
| korelasi = param_corr.get(param, "netral") | |
| # SP relevan tergantung korelasi | |
| sp = sp_inverse if korelasi == "negatif" else sp_target | |
| # Target ideal global dari global bounds | |
| min_global, max_global = PARAMS_BOUNDS[param] | |
| range_global = max_global - min_global | |
| target_ideal_global = min_global + (sp * range_global) | |
| # Jika target ideal global masih within bin => pakai | |
| if keras_min <= target_ideal_global <= keras_max: | |
| target_ideal = target_ideal_global | |
| else: | |
| target_ideal = keras_min + (sp * keras_range) | |
| buffer = 0.2 * keras_range # 20% dari range | |
| if korelasi == "netral": | |
| ideal_min = keras_min | |
| ideal_max = keras_max | |
| elif level == "rendah": | |
| if korelasi == "positif": | |
| ideal_min = keras_min | |
| ideal_max = target_ideal + buffer | |
| else: # negatif | |
| ideal_min = target_ideal - buffer | |
| ideal_max = keras_max | |
| elif level == "menengah": | |
| ideal_min = target_ideal - buffer | |
| ideal_max = target_ideal + buffer | |
| else: # tinggi | |
| if korelasi == "positif": | |
| ideal_min = target_ideal - buffer | |
| ideal_max = keras_max | |
| else: # negatif | |
| ideal_min = keras_min | |
| ideal_max = target_ideal + buffer | |
| ideal_min = max(ideal_min, keras_min) | |
| ideal_max = min(ideal_max, keras_max) | |
| soft_bounds[param] = (ideal_min, ideal_max) | |
| return hard_bounds, soft_bounds, level | |
| def objective_function( | |
| params_array, | |
| target_mmbtu, | |
| model, | |
| poly_transformer, | |
| input_features, | |
| poly_feature_names, | |
| hard_bounds, | |
| soft_bounds, | |
| ): | |
| """Fungsi objektif untuk Differential Evolution.""" | |
| prediction = predict_mmbtu(params_array, model, poly_transformer, input_features, poly_feature_names) | |
| error_pred = (prediction - target_mmbtu) ** 2 | |
| total_penalty = 0.0 | |
| for i, param in enumerate(input_features): | |
| value = params_array[i] | |
| ideal_min, ideal_max = soft_bounds[param] | |
| keras_min, keras_max = hard_bounds[param] | |
| param_range = keras_max - keras_min | |
| violation = 0.0 | |
| if value < ideal_min: | |
| violation = ideal_min - value | |
| elif value > ideal_max: | |
| violation = value - ideal_max | |
| if param_range > 0: | |
| total_penalty += (violation / param_range) | |
| ERROR_THRESHOLD = 1e-4 | |
| if abs(prediction - target_mmbtu) < ERROR_THRESHOLD: | |
| return total_penalty | |
| else: | |
| return error_pred + total_penalty | |
| def optimize_one_target( | |
| target_mmbtu, | |
| model, | |
| poly_transformer, | |
| input_features, | |
| poly_feature_names, | |
| product_cfg, | |
| maxiter=100, | |
| popsize=30, | |
| ): | |
| """Optimasi inverse model untuk satu nilai target MMBTU.""" | |
| hard_bounds, soft_bounds, level = calculate_bounds(target_mmbtu, input_features, product_cfg) | |
| optimizer_bounds = [hard_bounds[param] for param in input_features] | |
| def obj_wrapper(params_array): | |
| return objective_function( | |
| params_array, | |
| target_mmbtu, | |
| model, | |
| poly_transformer, | |
| input_features, | |
| poly_feature_names, | |
| hard_bounds, | |
| soft_bounds, | |
| ) | |
| result = differential_evolution( | |
| func=obj_wrapper, | |
| bounds=optimizer_bounds, | |
| strategy="best1bin", | |
| maxiter=maxiter, | |
| popsize=popsize, | |
| tol=1e-4, | |
| mutation=(0.5, 1), | |
| recombination=0.7, | |
| seed=42, | |
| polish=True, | |
| atol=1e-6, | |
| disp=False, | |
| ) | |
| optimal_params = dict(zip(input_features, result.x)) | |
| final_pred = predict_mmbtu(result.x, model, poly_transformer, input_features, poly_feature_names) | |
| violations = [] | |
| for param, value in optimal_params.items(): | |
| ideal_min, ideal_max = soft_bounds[param] | |
| if not (ideal_min <= value <= ideal_max): | |
| violations.append(param) | |
| return { | |
| "target": float(target_mmbtu), | |
| "level": level, | |
| "optimal_params": optimal_params, | |
| "prediction": float(final_pred), | |
| "error": abs(final_pred - target_mmbtu), | |
| "error_pct": abs(final_pred - target_mmbtu) / target_mmbtu * 100.0, | |
| "objective_value": float(result.fun), | |
| "converged": bool(result.success), | |
| "iterations": int(result.nit), | |
| "soft_violations": violations, | |
| "hard_bounds": hard_bounds, | |
| "soft_bounds": soft_bounds, | |
| } | |
| def run_inverse_for_targets(model_path, product_name, targets): | |
| """ | |
| Wrapper: load model dan jalankan optimasi untuk list target. | |
| Dipanggil dari Dashboard. | |
| """ | |
| product_cfg = PRODUCT_CONFIG[product_name] | |
| model, poly_transformer, input_features, poly_feature_names = load_model(model_path) | |
| results = [] | |
| for t in targets: | |
| res = optimize_one_target( | |
| target_mmbtu=t, | |
| model=model, | |
| poly_transformer=poly_transformer, | |
| input_features=input_features, | |
| poly_feature_names=poly_feature_names, | |
| product_cfg=product_cfg, | |
| maxiter=100, | |
| popsize=30, | |
| ) | |
| results.append(res) | |
| return results | |
| def results_to_dataframe(results, product_name): | |
| """Convert list of result dicts menjadi DataFrame flat untuk ditampilkan / disimpan.""" | |
| rows = [] | |
| for r in results: | |
| base = { | |
| "Product": product_name, | |
| "Target_MMBTU": r["target"], | |
| "Level": r["level"], | |
| "Predicted_MMBTU": r["prediction"], | |
| "Error": r["error"], | |
| "Error_Pct": r["error_pct"], | |
| "Objective_Value": r["objective_value"], | |
| "Converged": r["converged"], | |
| "Iterations": r["iterations"], | |
| "Soft_Violations": ", ".join(r["soft_violations"]) if r["soft_violations"] else "", | |
| } | |
| for param, value in r["optimal_params"].items(): | |
| base[param] = value | |
| rows.append(base) | |
| return pd.DataFrame(rows) | |