# kmi_dashboard / Inverse_Model.py
# delima1234-Sunbright
# KMI Dashboard
# 5e0490f
# Inverse_Model.py
import numpy as np
import pandas as pd
import joblib
from scipy.optimize import differential_evolution
# =========================================================
# GLOBAL CONFIGURATION (NO STREAMLIT IN THIS FILE)
# =========================================================
# Products supported by the inverse model; this list can also be imported by the Dashboard.
# Every name listed here has a matching entry in PRODUCT_CONFIG below.
AVAILABLE_PRODUCTS = ["BMR BASE", "CKP BASE", "CKR BASE", "CMR BASE", "MORIGRO BASE"]
# Global (min, max) limits per process parameter.
# calculate_bounds() interpolates inside these limits to obtain the
# "global ideal" value of a parameter before checking it against the
# product-specific bin.
PARAMS_BOUNDS = {
    "D101330TT": (92, 99),
    "D102260TIC_CV": (35, 80),
    "D102265TIC_PV": (160, 195),
    "D102265TIC_CV": (10, 70),
    "D102266TIC": (15, 22),
    "D101264FTSCL": (3300, 4900),
}
# Per-product configuration, keyed by product name. Each entry holds:
#   "gas_min" / "gas_max": the target range that calculate_bounds() uses to
#       normalise a target value into [0, 1].
#   "param_corr": per-parameter correlation label ("positif", "negatif" or
#       "netral") that calculate_bounds() uses to shape the soft bounds.
#   "binning": maps a (low, high) target range to the per-parameter
#       (min, max) pairs that are used as hard bounds for that range.
PRODUCT_CONFIG = {
    "CKR BASE": {
        "gas_min": 0.20,
        "gas_max": 0.35,
        "param_corr": {
            "D101330TT": "negatif",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.20, 0.275): {
                "D101330TT": (92.01, 95.14),
                "D102260TIC_CV": (44.0, 70.0),
                "D102265TIC_PV": (173.96, 193.58),
                "D102265TIC_CV": (19.68, 52.23),
                "D102266TIC": (17.42, 18.59),
                "D101264FTSCL": (3710.76, 4690.91),
            },
            (0.275, 0.35): {
                "D101330TT": (92.01, 98.31),
                "D102260TIC_CV": (38.0, 68.0),
                "D102265TIC_PV": (171.59, 194.97),
                "D102265TIC_CV": (15.29, 63.62),
                "D102266TIC": (15.94, 19.59),
                "D101264FTSCL": (3496.96, 4888.82),
            },
        },
    },
    "BMR BASE": {
        "gas_min": 0.20,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.20, 0.275): {
                "D101330TT": (92.62, 97.05),
                "D102260TIC_CV": (38, 62),
                "D102265TIC_PV": (171.64, 190.05),
                "D102265TIC_CV": (14.47, 24.46),
                "D102266TIC": (17.01, 18.98),
                "D101264FTSCL": (3633.08, 4125.52),
            },
            (0.275, 0.375): {
                "D101330TT": (92.23, 98.96),
                "D102260TIC_CV": (36.0, 60.0),
                "D102265TIC_PV": (171.64, 192.53),
                "D102265TIC_CV": (11.75, 33.91),
                "D102266TIC": (16.16, 18.43),
                "D101264FTSCL": (3535.08, 4283.65),
            },
        },
    },
    "CKP BASE": {
        "gas_min": 0.18,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.18, 0.28): {
                "D101330TT": (92.01, 98.83),
                "D102260TIC_CV": (36, 68),
                "D102265TIC_PV": (168.11, 194.97),
                "D102265TIC_CV": (13.99, 49.36),
                "D102266TIC": (15.83, 18.84),
                "D101264FTSCL": (3632.62, 4890.58),
            },
            # NOTE(review): this bin ends at 0.38 while "gas_max" above is
            # 0.375 -- confirm which upper limit is intended.
            (0.28, 0.38): {
                "D101330TT": (92.01, 99.00),
                "D102260TIC_CV": (38, 68),
                "D102265TIC_PV": (169.50, 194.97),
                "D102265TIC_CV": (13.93, 49.36),
                "D102266TIC": (15.86, 19.02),
                "D101264FTSCL": (3658.91, 4890.58),
            },
        },
    },
    "CMR BASE": {
        "gas_min": 0.19,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.19, 0.275): {
                "D101264FTSCL": (3618.73, 4539.96),
                "D101330TT": (92.1, 98.91),
                "D102260TIC_CV": (38, 62),
                "D102265TIC_CV": (15.3, 26.01),
                "D102265TIC_PV": (163.14, 192.25),
                "D102266TIC": (16.35, 19.55),
            },
            (0.275, 0.375): {
                "D101264FTSCL": (3445.31, 4684.92),
                "D101330TT": (92.06, 99.0),
                "D102260TIC_CV": (36, 64),
                "D102265TIC_CV": (14.75, 39.87),
                "D102265TIC_PV": (162.09, 191.96),
                "D102266TIC": (16.2, 19.55),
            },
        },
    },
    "MORIGRO BASE": {
        "gas_min": 0.12,
        "gas_max": 0.375,
        "param_corr": {
            "D101330TT": "netral",
            "D102260TIC_CV": "positif",
            "D102265TIC_PV": "positif",
            "D102265TIC_CV": "positif",
            "D102266TIC": "netral",
            "D101264FTSCL": "positif",
        },
        "binning": {
            (0.12, 0.28): {
                "D101264FTSCL": (3437.81, 3922.18),
                "D101330TT": (92.01, 98.78),
                "D102260TIC_CV": (36, 70),
                "D102265TIC_CV": (20.0, 42.95),
                "D102265TIC_PV": (179.98, 194.97),
                "D102266TIC": (17.3, 18.32),
            },
            (0.28, 0.375): {
                "D101264FTSCL": (3389.88, 4072.64),
                "D101330TT": (92.01, 97.27),
                "D102260TIC_CV": (38, 66),
                "D102265TIC_CV": (19.65, 45.87),
                "D102265TIC_PV": (180.32, 189.0),
                "D102266TIC": (16.91, 18.32),
            },
        },
    },
}
# =========================================================
# MODEL & OPTIMIZATION FUNCTIONS (BACKEND)
# =========================================================
def load_model(model_path: str):
    """Load the forward-model deployment bundle from ``model_path``.

    The file is read with ``joblib.load`` and must contain a mapping with
    the keys ``"model"``, ``"poly_transformer"``, ``"input_features"`` and
    ``"poly_feature_names"``.

    Returns:
        A 4-tuple ``(model, poly_transformer, input_features,
        poly_feature_names)`` taken from the bundle, in that order.

    Raises:
        KeyError: if one of the expected keys is missing from the bundle.
    """
    # NOTE: joblib.load unpickles the file, so only load bundles that come
    # from a trusted source.
    bundle = joblib.load(model_path)
    wanted_keys = ("model", "poly_transformer", "input_features", "poly_feature_names")
    return tuple(bundle[key] for key in wanted_keys)
def predict_mmbtu(params_array, model, poly_transformer, input_features, poly_feature_names):
    """Run the forward model on a single parameter vector.

    Args:
        params_array: parameter values, ordered like ``input_features``.
        model: object exposing ``predict(frame)``; the first element of its
            result is used.
        poly_transformer: object exposing ``transform(frame)``.
        input_features: column names for the raw parameter values.
        poly_feature_names: column names for the transformed features.

    Returns:
        The first prediction of ``model`` as a built-in ``float``.
    """
    # Pair each feature name with its value, then build a one-row frame
    # whose columns follow the order of ``input_features``.
    named_values = {name: value for name, value in zip(input_features, params_array)}
    raw_frame = pd.DataFrame([named_values])
    raw_frame = raw_frame[input_features]

    # Expand the raw features and label the resulting columns before
    # handing them to the model.
    expanded = poly_transformer.transform(raw_frame)
    model_input = pd.DataFrame(expanded, columns=poly_feature_names)

    predictions = model.predict(model_input)
    return float(predictions[0])
def get_operational_bounds(target_mmbtu: float, binning: dict):
    """Return the operational bounds of the bin that best matches the target.

    Selection rules:
    - If the target lies inside a bin (edges inclusive), that bin is used.
      When two bins share an edge, the one with the smaller lower edge wins.
    - Otherwise the nearest bin is used. A target below every bin resolves
      to the lowest bin, and a target above every bin to the highest. A
      target in a gap between two bins resolves to the closer one, with
      ties going to the lower bin.

    Args:
        target_mmbtu: target value to look up.
        binning: mapping of ``(low, high)`` tuples to the bounds for that
            range.

    Returns:
        The value stored in ``binning`` for the selected bin.

    Raises:
        ValueError: if ``binning`` is empty.
    """
    if not binning:
        raise ValueError("binning must contain at least one (low, high) range")

    # Sort by lower edge so that overlap and tie resolution is deterministic.
    ordered_bins = sorted(binning, key=lambda edges: edges[0])

    for edges in ordered_bins:
        low, high = edges
        if low <= target_mmbtu <= high:
            return binning[edges]

    def _distance(edges):
        # Distance from the target to the closest edge of a bin that does
        # not contain it.
        low, high = edges
        return low - target_mmbtu if target_mmbtu < low else target_mmbtu - high

    # min() keeps the first of several equally distant bins, i.e. the lower one.
    return binning[min(ordered_bins, key=_distance)]
def calculate_bounds(target_mmbtu, input_features, product_cfg):
    """Compute hard and soft parameter bounds for one product and one target.

    Args:
        target_mmbtu: target value the optimiser should reach.
        input_features: parameter names to compute bounds for.
        product_cfg: one product entry with the keys ``"gas_min"``,
            ``"gas_max"``, ``"binning"`` and ``"param_corr"``.

    Returns:
        ``(hard_bounds, soft_bounds, level)`` where both bounds objects map
        a parameter name to a ``(min, max)`` tuple and ``level`` is one of
        ``"rendah"``, ``"menengah"`` or ``"tinggi"``.
    """
    gas_floor = product_cfg["gas_min"]
    gas_ceiling = product_cfg["gas_max"]
    bin_table = product_cfg["binning"]
    correlations = product_cfg["param_corr"]

    # Step 1: position of the target inside the product's gas range,
    # clipped to [0, 1], and the level derived from that position.
    gas_span = gas_ceiling - gas_floor
    sp_target = float(np.clip((target_mmbtu - gas_floor) / gas_span, 0, 1))
    sp_inverse = 1.0 - sp_target
    level = "rendah" if sp_target < 0.33 else ("menengah" if sp_target < 0.67 else "tinggi")

    # Step 2: the hard bounds come from the bin selected for the target.
    window = get_operational_bounds(target_mmbtu, bin_table)

    hard_bounds, soft_bounds = {}, {}

    # Step 3: derive a soft window for every parameter.
    for name in input_features:
        hard_lo, hard_hi = window[name]
        hard_span = hard_hi - hard_lo
        hard_bounds[name] = (hard_lo, hard_hi)

        direction = correlations.get(name, "netral")

        # Negatively correlated parameters follow the mirrored position.
        fraction = sp_inverse if direction == "negatif" else sp_target

        # Ideal value according to the global limits; keep it only when it
        # falls inside the bin, otherwise interpolate within the bin.
        global_lo, global_hi = PARAMS_BOUNDS[name]
        global_span = global_hi - global_lo
        global_ideal = global_lo + (fraction * global_span)
        if hard_lo <= global_ideal <= hard_hi:
            ideal = global_ideal
        else:
            ideal = hard_lo + (fraction * hard_span)

        margin = 0.2 * hard_span  # 20% of the bin width

        # Decide which side of the soft window stays pinned to the hard
        # bound. Any label other than "positif"/"netral" is treated like
        # "negatif" here.
        is_neutral = direction == "netral"
        is_positive = direction == "positif"
        pin_low = (
            is_neutral
            or (level == "rendah" and is_positive)
            or (level == "tinggi" and not is_positive)
        )
        pin_high = (
            is_neutral
            or (level == "rendah" and not is_positive)
            or (level == "tinggi" and is_positive)
        )

        soft_lo = hard_lo if pin_low else ideal - margin
        soft_hi = hard_hi if pin_high else ideal + margin

        # The soft window never extends past the hard bounds.
        soft_bounds[name] = (max(soft_lo, hard_lo), min(soft_hi, hard_hi))

    return hard_bounds, soft_bounds, level
def objective_function(
    params_array,
    target_mmbtu,
    model,
    poly_transformer,
    input_features,
    poly_feature_names,
    hard_bounds,
    soft_bounds,
):
    """Objective minimised by Differential Evolution.

    The value is the squared prediction error plus a penalty for leaving
    the soft bounds. Each parameter contributes its distance outside the
    soft window divided by the width of its hard bounds. Parameters whose
    hard-bound width is not positive contribute nothing. Once the absolute
    prediction error drops below ``1e-4``, only the penalty is returned.

    Args:
        params_array: candidate parameter values, ordered like
            ``input_features``.
        target_mmbtu: value the prediction should match.
        model, poly_transformer, input_features, poly_feature_names:
            forwarded to ``predict_mmbtu``.
        hard_bounds: mapping of parameter name to ``(min, max)``.
        soft_bounds: mapping of parameter name to ``(min, max)``.

    Returns:
        The objective value for the candidate.
    """
    tolerance = 1e-4

    predicted = predict_mmbtu(
        params_array, model, poly_transformer, input_features, poly_feature_names
    )
    squared_error = (predicted - target_mmbtu) ** 2

    penalty = 0.0
    for position, name in enumerate(input_features):
        candidate = params_array[position]
        soft_lo, soft_hi = soft_bounds[name]
        hard_lo, hard_hi = hard_bounds[name]
        width = hard_hi - hard_lo

        # Distance by which the candidate leaves the soft window.
        if candidate < soft_lo:
            overshoot = soft_lo - candidate
        elif candidate > soft_hi:
            overshoot = candidate - soft_hi
        else:
            overshoot = 0.0

        # Normalise by the hard-bound width; skip degenerate widths.
        if width > 0:
            penalty += (overshoot / width)

    # Close enough to the target: rank candidates by penalty alone.
    if abs(predicted - target_mmbtu) < tolerance:
        return penalty
    return squared_error + penalty
def optimize_one_target(
    target_mmbtu,
    model,
    poly_transformer,
    input_features,
    poly_feature_names,
    product_cfg,
    maxiter=100,
    popsize=30,
):
    """Run the inverse-model optimisation for a single target value.

    Bounds come from ``calculate_bounds``; ``objective_function`` is then
    minimised with SciPy's ``differential_evolution`` inside the hard
    bounds, using a fixed seed of 42.

    Args:
        target_mmbtu: target value to reach.
        model, poly_transformer, input_features, poly_feature_names:
            forward-model pieces as returned by ``load_model``.
        product_cfg: product entry consumed by ``calculate_bounds``.
        maxiter: maximum number of generations for the optimiser.
        popsize: population-size multiplier for the optimiser.

    Returns:
        A dict with the keys ``target``, ``level``, ``optimal_params``,
        ``prediction``, ``error``, ``error_pct``, ``objective_value``,
        ``converged``, ``iterations``, ``soft_violations``, ``hard_bounds``
        and ``soft_bounds``. Numeric values are built-in Python numbers.
        For a target of 0 the relative error is undefined, so
        ``error_pct`` is ``inf`` when the absolute error is non-zero and
        ``nan`` otherwise.
    """
    hard_bounds, soft_bounds, level = calculate_bounds(target_mmbtu, input_features, product_cfg)
    optimizer_bounds = [hard_bounds[param] for param in input_features]

    # The extra objective arguments go through ``args`` instead of a closure.
    result = differential_evolution(
        func=objective_function,
        bounds=optimizer_bounds,
        args=(
            target_mmbtu,
            model,
            poly_transformer,
            input_features,
            poly_feature_names,
            hard_bounds,
            soft_bounds,
        ),
        strategy="best1bin",
        maxiter=maxiter,
        popsize=popsize,
        tol=1e-4,
        mutation=(0.5, 1),
        recombination=0.7,
        seed=42,
        polish=True,
        atol=1e-6,
        disp=False,
    )

    # Convert NumPy scalars to built-in floats so the result is plain data.
    optimal_params = {
        param: float(value) for param, value in zip(input_features, result.x)
    }
    final_pred = predict_mmbtu(
        result.x, model, poly_transformer, input_features, poly_feature_names
    )

    abs_error = float(abs(final_pred - target_mmbtu))
    if target_mmbtu != 0:
        error_pct = float(abs_error / target_mmbtu * 100.0)
    else:
        # Avoid ZeroDivisionError after an expensive optimisation run.
        error_pct = float("inf") if abs_error > 0 else float("nan")

    # Parameters whose optimum lies outside their soft window.
    violations = [
        param
        for param, value in optimal_params.items()
        if not (soft_bounds[param][0] <= value <= soft_bounds[param][1])
    ]

    return {
        "target": float(target_mmbtu),
        "level": level,
        "optimal_params": optimal_params,
        "prediction": float(final_pred),
        "error": abs_error,
        "error_pct": error_pct,
        "objective_value": float(result.fun),
        "converged": bool(result.success),
        "iterations": int(result.nit),
        "soft_violations": violations,
        "hard_bounds": hard_bounds,
        "soft_bounds": soft_bounds,
    }
def run_inverse_for_targets(model_path, product_name, targets, *, maxiter=100, popsize=30):
    """Load the model once and optimise every target for one product.

    This is the entry point called from the Dashboard.

    Args:
        model_path: path of the deployment bundle passed to ``load_model``.
        product_name: key into ``PRODUCT_CONFIG``.
        targets: iterable of target values, optimised in order.
        maxiter: maximum generations per target (keyword-only, default 100).
        popsize: population-size multiplier per target (keyword-only,
            default 30).

    Returns:
        A list with one result dict from ``optimize_one_target`` per target.

    Raises:
        KeyError: if ``product_name`` is not configured.
    """
    # Validate the product before the model file is loaded.
    try:
        product_cfg = PRODUCT_CONFIG[product_name]
    except KeyError:
        raise KeyError(
            f"Unknown product {product_name!r}; expected one of {sorted(PRODUCT_CONFIG)}"
        ) from None

    model, poly_transformer, input_features, poly_feature_names = load_model(model_path)

    return [
        optimize_one_target(
            target_mmbtu=target,
            model=model,
            poly_transformer=poly_transformer,
            input_features=input_features,
            poly_feature_names=poly_feature_names,
            product_cfg=product_cfg,
            maxiter=maxiter,
            popsize=popsize,
        )
        for target in targets
    ]
def results_to_dataframe(results, product_name):
    """Flatten optimisation results into one DataFrame row per target.

    Args:
        results: list of result dicts with the keys ``target``, ``level``,
            ``prediction``, ``error``, ``error_pct``, ``objective_value``,
            ``converged``, ``iterations``, ``soft_violations`` and
            ``optimal_params``.
        product_name: value written to the ``Product`` column of every row.

    Returns:
        A ``pandas.DataFrame`` with the summary columns followed by one
        column per optimised parameter.
    """

    def _flatten(entry):
        # Summary columns first; the violated parameters are joined into a
        # single comma-separated string (empty when there are none).
        row = {
            "Product": product_name,
            "Target_MMBTU": entry["target"],
            "Level": entry["level"],
            "Predicted_MMBTU": entry["prediction"],
            "Error": entry["error"],
            "Error_Pct": entry["error_pct"],
            "Objective_Value": entry["objective_value"],
            "Converged": entry["converged"],
            "Iterations": entry["iterations"],
            "Soft_Violations": ", ".join(entry["soft_violations"] or ()),
        }
        # Then one column per optimised parameter.
        row.update(entry["optimal_params"])
        return row

    return pd.DataFrame([_flatten(entry) for entry in results])