# EnergyInfrastructureAI / scenario_modeling.py
# Source: dangmanhtruong's repository, commit 845d5aa ("Clean commit with LFS-tracked images")
from os.path import join as pjoin
from dataclasses import dataclass, field
from typing import Set, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
import json
import contextily as ctx
import nltk
from pdb import set_trace
from pyproj import Transformer
from geopy.geocoders import Nominatim
import utm
from shapely.geometry import Point
import geopandas as gpd
import geodatasets
import folium
from folium import plugins
import branca
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon
import folium
import matplotlib.colors as mcolors
from folium.plugins import MarkerCluster, BeautifyIcon
import pydantic_ai
from pydantic_ai import RunContext
from pydantic_ai.messages import ModelRequest, ToolReturnPart
from config import BASE_PATH, DATASETS, DATASET_LIST, DATASET_LEGEND_DICT, SCENARIOS
from utils import calculate_distance, load_data_and_process
class MCDAEngine:
    """Multi-Criteria Decision Analysis engine for scoring licence blocks.

    Loads every dataset layer named in ``DATASET_LIST`` once at construction
    time and exposes one scoring method per objective.  Every objective
    returns one min-max-normalized score per licence in [0, 1], where
    LOWER is better (the "technical" score is inverted to keep that
    convention).
    """

    def __init__(self):
        # Load each spatial layer (licences, wells, pipelines, ...) exactly
        # once and keep them keyed by layer name for the objective methods.
        self.df_dict = {layer_name: load_data_and_process(layer_name)
                        for layer_name in DATASET_LIST}

    def get_objective_list(self):
        """Return the names of the supported objectives."""
        return ["safety", "economic", "technical", "environment"]

    @staticmethod
    def _normalize(values):
        """Min-max normalize *values* into [0, 1].

        Guards against a zero range (all values equal), which in the
        original code caused a division by zero and a NaN score vector.
        """
        values = np.asarray(values, dtype=float)
        v_min = np.min(values)
        v_range = np.max(values) - v_min
        if v_range == 0:
            return np.zeros_like(values)
        return (values - v_min) / v_range

    def calculate_objective(self, obj_name):
        """Compute the normalized score vector for *obj_name*.

        Args:
            obj_name: One of the names from ``get_objective_list()``.

        Returns:
            A numpy array with one score per licence (lower is better).

        Raises:
            Exception: when *obj_name* is not a supported objective.
        """
        dispatch = {
            "safety": self.safety_objective,
            "economic": self.economic_objective,
            "technical": self.technical_objective,
            "environment": self.environment_objective,
        }
        if obj_name not in dispatch:
            raise Exception(f"In class MCDAEngine, function calculate_objective: Undefined objective '{obj_name}'.")
        return dispatch[obj_name]()

    def safety_objective(self):
        """Safety score per licence: wells contained in the licence polygon,
        with decommissioned wells counted a second time, normalized so a
        higher raw count yields a higher (worse) score."""
        licences = self.df_dict["licences"]["geometry"]
        wells = self.df_dict["wells"]
        n_licence = len(licences)
        num_wells = np.zeros(n_licence)
        num_old_wells = np.zeros(n_licence)
        # Vectorized mask of decommissioned wells; the original iterated
        # every well in Python per licence (O(n_licence * n_well)).
        decommissioned = (wells["ORIGINSTAT"] == "Decommissioned").to_numpy()
        for idx in range(n_licence):
            contains = np.asarray(licences.iloc[idx].contains(wells["geometry"]))
            num_wells[idx] = contains.sum()
            num_old_wells[idx] = (contains & decommissioned).sum()
        return self._normalize(num_wells + num_old_wells)

    def environment_objective(self):
        """Environmental score per licence: number of seismic-survey
        geometries contained in the licence polygon, normalized (more
        surveys -> higher score -> worse)."""
        licences = self.df_dict["licences"]["geometry"]
        seismic = self.df_dict["seismic"]["geometry"]
        counts = np.array([np.sum(lic.contains(seismic)) for lic in licences])
        return self._normalize(counts)

    def technical_objective(self):
        """Technical score per licence: pipelines intersecting the licence
        plus, when the layer is loaded, windfarms intersecting it.  More
        nearby infrastructure means better feasibility, so the normalized
        count is inverted to keep "lower is better"."""
        licences = self.df_dict["licences"]["geometry"]
        n_licence = len(licences)
        counts = np.zeros(n_licence)
        for idx in range(n_licence):
            counts[idx] += np.sum(licences.iloc[idx].intersects(self.df_dict["pipelines"]["geometry"]))
        # Windfarms layer is optional: only counted when it was loaded.
        if "windfarms" in self.df_dict:
            for idx in range(n_licence):
                counts[idx] += np.sum(licences.iloc[idx].intersects(self.df_dict["windfarms"]["geometry"]))
        # Invert so that, like the other objectives, lower is better.
        return 1 - self._normalize(counts)

    def economic_objective(self):
        """Economic score per licence: distance to the nearest offshore
        field (in the layer's CRS units — presumably km; TODO confirm),
        normalized so farther from existing fields is worse."""
        licences = self.df_dict["licences"]["geometry"]
        fields = self.df_dict["offshore_fields"]["geometry"]
        dists = np.array([np.min(lic.distance(fields)) for lic in licences])
        return self._normalize(dists)
def run_mcda(target: str, obj_1: str, obj_2: str, obj_3: str, obj_4: str,
             w_1: float, w_2: float, w_3: float, w_4: float):
    """ Do a Multi-Criterion Decision Analysis (MCDA) for the given target,
    ranking by a number of given objectives.
    Args:
        target: The target for analysis (e.g. "licence")
        obj_1: The 1st objective (e.g. "safety")
        obj_2: The 2nd objective (e.g. "environment")
        obj_3: The 3rd objective (e.g. "technical")
        obj_4: The 4th objective (e.g. "economic")
        w_1: The weight for the 1st objective
        w_2: The weight for the 2nd objective
        w_3: The weight for the 3rd objective
        w_4: The weight for the 4th objective
    Return:
        report: The final report in text form. Please note that for the objective scores, lower is better.
        df_rank: GeoDataFrame of the 20 best-scoring licence blocks.
    """
    mcda_engine = MCDAEngine()
    # Reuse the engine's already-loaded layers; the original re-loaded every
    # dataset here a second time, doubling all the I/O.
    df_dict = mcda_engine.df_dict
    reference_obj_list = mcda_engine.get_objective_list()
    obj_list = [obj_1, obj_2, obj_3, obj_4]
    w_list = [w_1, w_2, w_3, w_4]
    print(f"Objectives ARE: {obj_list}")
    obj_value_list = []
    for obj_name in obj_list:
        # Fuzzy-match the requested name against the supported objectives so
        # minor misspellings from the caller still resolve.
        dist_list = [nltk.edit_distance(obj_name, elem) for elem in reference_obj_list]
        closest_obj_name = reference_obj_list[np.argmin(dist_list)]
        obj_value_list.append(mcda_engine.calculate_objective(closest_obj_name))
    # Weighted sum of the normalized objective vectors (lower is better).
    score = sum(w * v for w, v in zip(w_list, obj_value_list))
    df_licenses = df_dict["licences"].copy()
    df_licenses["Score"] = score.tolist()
    for obj_name, obj_value in zip(obj_list, obj_value_list):
        df_licenses[f"{obj_name}_score"] = obj_value.tolist()
    # Keep the 20 best (lowest-score) licence blocks.
    df_rank = df_licenses.sort_values("Score").iloc[:20, :].copy()
    df_rank.insert(0, 'Rank', range(1, len(df_rank) + 1))
    # Preserve the centroid before dropping the (verbose) geometry column.
    df_rank["Coordinates"] = df_rank["geometry"].centroid
    df_rank = df_rank.drop('geometry', axis=1)
    report = generate_mcda_detailed_report(
        df_rank, target, obj_1, obj_2, obj_3, obj_4, w_1, w_2, w_3, w_4
    )
    return report, df_rank
def normalize_weights(weights: dict) -> dict:
    """Rescale *weights* so its values sum to 1.

    Raises:
        ValueError: when every weight is zero.
    """
    total = 0
    for value in weights.values():
        total += value
    if not total:
        raise ValueError("All weights are zero!")
    normalized = {}
    for key, value in weights.items():
        normalized[key] = value / total
    return normalized
def run_scenario_analysis(scenario_name: str, adjust: Union[dict, None] = None):
    """
    Run MCDA scenario analysis. Optionally adjust weights after choosing scenario.

    Args:
        scenario_name: Name of the scenario (e.g. 'economic_focus').
        adjust: Dictionary of weight adjustments applied on top of the
            scenario's base weights (e.g. {'technical': +0.5, 'economic': -0.2}).
            Adjusted weights are floored at 0 and then re-normalized.

    Returns:
        (report, df_rank, weights): the MCDA text report, the ranked
        results from run_mcda, and the final normalized weight dict
        actually used.

    Raises:
        ValueError: for an unknown scenario name or adjustment key, or if
            every adjusted weight ends up zero.
    """
    if scenario_name not in SCENARIOS:
        raise ValueError(f"Scenario '{scenario_name}' not found. Available: {list(SCENARIOS.keys())}")
    # Start from the scenario's preset weights (copy: never mutate SCENARIOS).
    weights = SCENARIOS[scenario_name].copy()
    # Apply caller adjustments, flooring at zero so no weight goes negative.
    if adjust:
        for key, delta in adjust.items():
            if key not in weights:
                raise ValueError(f"Unknown weight '{key}'. Must be one of {list(weights.keys())}")
            weights[key] = max(0, weights[key] + delta)
    # Re-normalize so the weights sum to 1.
    weights = normalize_weights(weights)
    # Run MCDA with the updated weights.
    report, df_rank = run_mcda(
        "licences", "safety", "environment", "technical", "economic",
        weights["safety"], weights["environment"],
        weights["technical"], weights["economic"]
    )
    return report, df_rank, weights
def _format_coordinates(row) -> str:
    """Best-effort coordinate string for one ranked result row.

    Prefers the raw 'geometry' column and falls back to the precomputed
    'Coordinates' centroid (run_mcda drops 'geometry' before reporting,
    which previously made every coordinate render as "N/A").  Returns
    "N/A" when no point can be extracted.
    """
    try:
        geom = row['geometry'] if 'geometry' in row else row['Coordinates']
        if hasattr(geom, 'centroid'):
            geom = geom.centroid
        if hasattr(geom, 'y'):
            return f"{geom.y:.2f}°N, {abs(geom.x):.2f}°{'W' if geom.x < 0 else 'E'}"
    except Exception:  # was a bare except; keep best-effort but don't mask SystemExit etc.
        pass
    return "N/A"


def _performance_indicator(score: float) -> str:
    """Emoji bucket for an overall score (lower score is better)."""
    if score <= 0.3:
        return "⭐"
    if score <= 0.5:
        return "✅"
    if score <= 0.7:
        return "⚠️"
    return "❌"


def generate_mcda_detailed_report(df_rank: "gpd.GeoDataFrame",
        target: str, obj_1: str, obj_2: str, obj_3: str, obj_4: str,
        w_1: float, w_2: float, w_3: float, w_4: float) -> str:
    """Generate detailed MCDA report with proper formatting and analysis.

    Args:
        df_rank: Ranked results with 'Rank', 'Name', 'Score' and one
            '<objective>_score' column per objective.
        target: The analysis target (e.g. "licences"), used in headings.
        obj_1..obj_4: Objective names, in the same order as the weights.
        w_1..w_4: Objective weights (need not be pre-normalized).

    Returns:
        The full Markdown report as a single string.

    Raises:
        ValueError: if all four weights are zero.
    """
    report_lines = []
    # Header
    report_lines.append(f"# MULTI-CRITERIA DECISION ANALYSIS REPORT")
    report_lines.append(f"## Analysis Target: {target.upper()}")
    report_lines.append("=" * 60)
    report_lines.append(f"**Analysis Date:** {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')}")
    report_lines.append(f"**Total {target} Analyzed:** {len(df_rank)}")
    report_lines.append("")
    # Criteria and weights
    report_lines.append("## EVALUATION CRITERIA & WEIGHTS")
    total_weight = w_1 + w_2 + w_3 + w_4
    if total_weight == 0:
        # Guard the percentage formatting below against division by zero.
        raise ValueError("All four objective weights are zero.")
    report_lines.append(f"- **{obj_1.title()}:** {w_1/total_weight:.1%} (weight: {w_1})")
    report_lines.append(f"- **{obj_2.title()}:** {w_2/total_weight:.1%} (weight: {w_2})")
    report_lines.append(f"- **{obj_3.title()}:** {w_3/total_weight:.1%} (weight: {w_3})")
    report_lines.append(f"- **{obj_4.title()}:** {w_4/total_weight:.1%} (weight: {w_4})")
    report_lines.append("")
    report_lines.append("**Scoring Note:** Lower scores indicate better performance")
    report_lines.append("")
    # Results summary
    best_score = df_rank['Score'].min()
    worst_score = df_rank['Score'].max()
    avg_score = df_rank['Score'].mean()
    report_lines.append("## RESULTS OVERVIEW")
    report_lines.append(f"- **Best Overall Score:** {best_score:.3f}")
    report_lines.append(f"- **Average Score:** {avg_score:.3f}")
    report_lines.append(f"- **Worst Score:** {worst_score:.3f}")
    report_lines.append(f"- **Score Range:** {worst_score - best_score:.3f}")
    report_lines.append("")
    # Performance categories
    excellent_count = len(df_rank[df_rank['Score'] <= 0.3])
    good_count = len(df_rank[(df_rank['Score'] > 0.3) & (df_rank['Score'] <= 0.5)])
    fair_count = len(df_rank[(df_rank['Score'] > 0.5) & (df_rank['Score'] <= 0.7)])
    poor_count = len(df_rank[df_rank['Score'] > 0.7])
    report_lines.append("## PERFORMANCE DISTRIBUTION")
    report_lines.append(f"- **Excellent (≤0.3):** {excellent_count} {target}s")
    report_lines.append(f"- **Good (0.3-0.5):** {good_count} {target}s")
    report_lines.append(f"- **Fair (0.5-0.7):** {fair_count} {target}s")
    report_lines.append(f"- **Poor (>0.7):** {poor_count} {target}s")
    report_lines.append("")
    # Top performers table (first 15 rows only)
    report_lines.append("## TOP PERFORMING LOCATIONS")
    report_lines.append("| Rank | Name | Overall Score | Safety | Environment | Technical | Economic | Coordinates |")
    report_lines.append("|------|------|---------------|--------|-------------|-----------|----------|-------------|")
    for _, row in df_rank.head(15).iterrows():
        coord_str = _format_coordinates(row)
        score = row['Score']
        indicator = _performance_indicator(score)
        report_lines.append(
            f"| {int(row['Rank'])} {indicator} | {row['Name']} | {score:.3f} | "
            f"{row['safety_score']:.3f} | {row['environment_score']:.3f} | "
            f"{row['technical_score']:.3f} | {row['economic_score']:.3f} | {coord_str} |"
        )
    report_lines.append("")
    # Criteria analysis
    report_lines.append("## DETAILED CRITERIA ANALYSIS")
    # First-objective analysis (column keyed by the actual objective name so
    # the section heading and the data always agree; previously hardcoded
    # to 'safety_score' regardless of obj_1)
    safety_avg = df_rank[f"{obj_1}_score"].mean()
    safety_best = df_rank[f"{obj_1}_score"].min()
    report_lines.append(f"### {obj_1.title()} Performance")
    report_lines.append(f"- **Average Score:** {safety_avg:.3f}")
    report_lines.append(f"- **Best Score:** {safety_best:.3f}")
    if safety_avg < 0.3:
        report_lines.append(f"- **Assessment:** Excellent {obj_1} conditions across the region")
    elif safety_avg < 0.5:
        report_lines.append(f"- **Assessment:** Good {obj_1} performance with some variation")
    else:
        report_lines.append(f"- **Assessment:** {obj_1.title()} concerns identified - requires attention")
    report_lines.append("")
    # Second-objective analysis
    env_avg = df_rank[f"{obj_2}_score"].mean()
    env_best = df_rank[f"{obj_2}_score"].min()
    report_lines.append(f"### {obj_2.title()} Performance")
    report_lines.append(f"- **Average Score:** {env_avg:.3f}")
    report_lines.append(f"- **Best Score:** {env_best:.3f}")
    if env_avg < 0.3:
        report_lines.append(f"- **Assessment:** Low {obj_2} impact across most locations")
    elif env_avg < 0.5:
        report_lines.append(f"- **Assessment:** Moderate {obj_2} considerations")
    else:
        report_lines.append(f"- **Assessment:** Significant {obj_2} factors require careful management")
    report_lines.append("")
    # Recommendations
    report_lines.append("## STRATEGIC RECOMMENDATIONS")
    if excellent_count > 0:
        report_lines.append(f"**Immediate Development Opportunities ({excellent_count} sites):**")
        top_sites = df_rank.head(min(5, excellent_count))['Name'].tolist()
        report_lines.append(f"- Priority sites: {', '.join(top_sites[:3])}")
        report_lines.append("- These locations show optimal performance across all criteria")
        report_lines.append("- Proceed with detailed feasibility studies")
        report_lines.append("")
    if good_count > 0:
        report_lines.append(f"**Secondary Development Candidates ({good_count} sites):**")
        report_lines.append("- Suitable for development with appropriate risk management")
        report_lines.append("- Focus on addressing specific criterion weaknesses")
        report_lines.append("")
    if poor_count > len(df_rank) * 0.5:
        report_lines.append("**Regional Assessment:**")
        report_lines.append("- High proportion of challenging locations identified")
        report_lines.append("- Consider expanding analysis to adjacent regions")
        report_lines.append("- Review weighting criteria for regional appropriateness")
        report_lines.append("")
    # Next steps
    report_lines.append("## RECOMMENDED NEXT STEPS")
    report_lines.append("1. **Detailed Site Assessment:** Focus on top 5 ranked locations")
    report_lines.append("2. **Risk Mitigation Planning:** Address specific criteria weaknesses")
    report_lines.append("3. **Stakeholder Engagement:** Begin consultation for priority sites")
    report_lines.append("4. **Regulatory Compliance:** Ensure alignment with licensing requirements")
    report_lines.append("5. **Environmental Management:** Develop site-specific environmental plans")
    report_lines.append("")
    # Footer
    report_lines.append("---")
    report_lines.append("*Analysis completed using Multi-Criteria Decision Analysis (MCDA)*")
    report_lines.append(f"*Weighted scoring across {obj_1}, {obj_2}, {obj_3}, and {obj_4} criteria*")
    report_lines.append("*Lower scores indicate better performance - prioritize top-ranked locations*")
    return "\n".join(report_lines)
def main():
    """Demo entry point: run the 'economic_focus' scenario as-is, then again
    with the 'technical' weight boosted by 0.5, printing each report.

    Removed from the original: a leftover `set_trace()` debugger breakpoint
    and a dead triple-quoted block of commented-out run_mcda calls.
    """
    for adjust in (None, {"technical": +0.5}):
        report, df_rank, used_weights = run_scenario_analysis("economic_focus", adjust=adjust)
        print("Scenario used:", used_weights)
        print(report)
        print("")
        print(df_rank)
        print("")
        print("Used weights")
        print(used_weights)


if __name__ == "__main__":
    main()