| | from os.path import join as pjoin |
| | from dataclasses import dataclass, field |
| | from typing import Set, Union |
| | import matplotlib.pyplot as plt |
| | import numpy as np |
| | import pandas as pd |
| | from scipy.spatial.distance import cdist |
| | import json |
| | import contextily as ctx |
| | import nltk |
| | from pdb import set_trace |
| |
|
| | from pyproj import Transformer |
| | from geopy.geocoders import Nominatim |
| | import utm |
| | from shapely.geometry import Point |
| | import geopandas as gpd |
| | import geodatasets |
| | import folium |
| | from folium import plugins |
| | import branca |
| | from shapely.geometry import Point |
| | from shapely.geometry.polygon import Polygon |
| | import folium |
| | import matplotlib.colors as mcolors |
| | from folium.plugins import MarkerCluster, BeautifyIcon |
| |
|
| | import pydantic_ai |
| | from pydantic_ai import RunContext |
| | from pydantic_ai.messages import ModelRequest, ToolReturnPart |
| |
|
| | from config import BASE_PATH, DATASETS, DATASET_LIST, DATASET_LEGEND_DICT, SCENARIOS |
| | from utils import calculate_distance, load_data_and_process |
| |
|
| |
|
class MCDAEngine:
    """Engine computing per-licence objective scores for MCDA ranking.

    All configured dataset layers are loaded once in ``__init__`` and kept in
    ``self.df_dict``. Each ``*_objective`` method returns one min-max
    normalized score per licence; the downstream ranking treats lower values
    as better.
    """

    def __init__(self):
        # Load every configured layer exactly once, keyed by layer name.
        self.df_dict = {layer_name: load_data_and_process(layer_name)
                        for layer_name in DATASET_LIST}

    def get_objective_list(self):
        """Return the names of the supported objectives."""
        return ["safety", "economic", "technical", "environment"]

    def calculate_objective(self, obj_name):
        """Compute the objective called ``obj_name``.

        Args:
            obj_name: One of the names returned by ``get_objective_list``.
        Returns:
            numpy array with one normalized score per licence.
        Raises:
            Exception: If ``obj_name`` is not a known objective.
        """
        dispatch = {
            "safety": self.safety_objective,
            "economic": self.economic_objective,
            "technical": self.technical_objective,
            "environment": self.environment_objective,
        }
        if obj_name not in dispatch:
            raise Exception(f"In class MCDAEngine, function calculate_objective: Undefined objective '{obj_name}.'")
        return dispatch[obj_name]()

    @staticmethod
    def _min_max_normalize(values):
        """Scale ``values`` into [0, 1].

        A constant array maps to all zeros instead of dividing by zero
        (the previous inline normalizations produced NaNs in that case).
        """
        vmin = np.min(values)
        span = np.max(values) - vmin
        if span == 0:
            return np.zeros_like(values, dtype=float)
        return (values - vmin) / span

    def safety_objective(self):
        """Safety score per licence: each well contained in the licence counts
        once, and decommissioned wells are counted a second time."""
        wells = self.df_dict["wells"]
        licences = self.df_dict["licences"]["geometry"]
        # Positional boolean mask. The previous code indexed
        # wells['ORIGINSTAT'][well_idx] by label, which breaks whenever the
        # frame does not carry a default 0..n-1 index.
        is_decommissioned = (wells["ORIGINSTAT"] == "Decommissioned").to_numpy()
        safety_obj = np.zeros(len(licences))
        for idx in range(len(licences)):
            contains = np.asarray(licences.iloc[idx].contains(wells["geometry"]))
            safety_obj[idx] = contains.sum() + np.sum(contains & is_decommissioned)
        return self._min_max_normalize(safety_obj)

    def environment_objective(self):
        """Environment score per licence: number of seismic records whose
        geometry lies inside the licence polygon."""
        licences = self.df_dict["licences"]["geometry"]
        seismic = self.df_dict["seismic"]["geometry"]
        env_obj = np.array(
            [np.sum(licences.iloc[idx].contains(seismic))
             for idx in range(len(licences))],
            dtype=float,
        )
        return self._min_max_normalize(env_obj)

    def technical_objective(self):
        """Technical score per licence: count of pipelines and (when the
        layer is available) windfarms intersecting the licence, inverted so
        that more existing infrastructure yields a lower (better) score."""
        licences = self.df_dict["licences"]["geometry"]
        n_licence = len(licences)
        pipelines = self.df_dict["pipelines"]["geometry"]
        tech_obj = np.zeros(n_licence)
        for idx in range(n_licence):
            tech_obj[idx] = np.sum(licences.iloc[idx].intersects(pipelines))
        if "windfarms" in self.df_dict:
            windfarms = self.df_dict["windfarms"]["geometry"]
            for idx in range(n_licence):
                tech_obj[idx] += np.sum(licences.iloc[idx].intersects(windfarms))
        return 1 - self._min_max_normalize(tech_obj)

    def economic_objective(self):
        """Economic score per licence: distance from the licence to the
        nearest offshore field, in the layers' CRS units
        (presumably km — TODO confirm against the data loading code)."""
        licences = self.df_dict["licences"]["geometry"]
        fields = self.df_dict["offshore_fields"]["geometry"]
        econ_obj = np.array(
            [np.min(licences.iloc[idx].distance(fields))
             for idx in range(len(licences))]
        )
        return self._min_max_normalize(econ_obj)
| |
|
| |
|
def run_mcda(target: str, obj_1: str, obj_2: str, obj_3: str, obj_4: str,
             w_1: float, w_2: float, w_3: float, w_4: float):
    """ Do a Multi-Criterion Decision Analysis (MCDA) for the given target,
    ranking by a number of given objectives.
    Args:
        target: The target for analysis (e.g. "licence")
        obj_1: The 1st objective (e.g. "safety")
        obj_2: The 2nd objective (e.g. "environment")
        obj_3: The 3rd objective (e.g. "technical")
        obj_4: The 4th objective (e.g. "economic")
        w_1: The weight for the 1st objective
        w_2: The weight for the 2nd objective
        w_3: The weight for the 3rd objective
        w_4: The weight for the 4th objective
    Return:
        report: The final report in text form. Please note that for the objective scores, lower is better.
        df_rank: The top-20 locations with per-objective scores and centroid coordinates.
    """

    # The engine loads all dataset layers in its constructor; the extra
    # load_data_and_process() pass that used to live here duplicated that
    # work for no benefit and has been removed.
    mcda_engine = MCDAEngine()
    reference_obj_list = mcda_engine.get_objective_list()

    obj_list = [obj_1, obj_2, obj_3, obj_4]
    w_list = [w_1, w_2, w_3, w_4]
    print(f"Objectives ARE: {obj_list}")

    # Snap each requested objective onto the closest known objective name,
    # so small typos or aliases still resolve to a valid objective.
    obj_value_list = []
    for obj_name in obj_list:
        dist_list = [nltk.edit_distance(obj_name, elem) for elem in reference_obj_list]
        closest_obj_name = reference_obj_list[int(np.argmin(dist_list))]
        obj_value_list.append(mcda_engine.calculate_objective(closest_obj_name))

    # Weighted sum of the normalized per-licence objective arrays
    # (lower combined score is better).
    score = w_list[0] * obj_value_list[0]
    for obj_idx in range(1, len(obj_value_list)):
        score = score + w_list[obj_idx] * obj_value_list[obj_idx]

    df_licenses = mcda_engine.df_dict["licences"].copy()
    df_licenses["Score"] = score.tolist()
    # Score columns are keyed by the *requested* objective names.
    for obj_idx, obj_name in enumerate(obj_list):
        df_licenses[f"{obj_name}_score"] = obj_value_list[obj_idx].tolist()

    # Keep only the 20 best (lowest-scoring) licences, ranked 1..n.
    df_rank = df_licenses.sort_values("Score").iloc[:20, :].copy()
    df_rank.insert(0, 'Rank', range(1, len(df_rank) + 1))

    # Replace the polygon geometry by its centroid for compact reporting.
    df_rank["Coordinates"] = df_rank["geometry"].centroid
    df_rank = df_rank.drop('geometry', axis=1)

    report = generate_mcda_detailed_report(
        df_rank, target, obj_1, obj_2, obj_3, obj_4, w_1, w_2, w_3, w_4
    )

    return report, df_rank
| |
|
| |
|
def normalize_weights(weights: dict) -> dict:
    """Rescale a weight mapping so that its values sum to one."""
    total = sum(weights.values())
    if not total:
        raise ValueError("All weights are zero!")
    return {name: value / total for name, value in weights.items()}
| |
|
| |
|
def run_scenario_analysis(scenario_name: str, adjust: dict = None):
    """Execute an MCDA run for a named weighting scenario.

    Args:
        scenario_name: Name of the scenario (e.g. 'economic_focus').
        adjust: Optional per-objective weight deltas
            (e.g. {'technical': +0.5, 'economic': -0.2}); applied to the
            scenario weights, clamped at zero, then renormalized.
    Returns:
        Tuple of (report text, ranked dataframe, weights actually used).
    Raises:
        ValueError: For an unknown scenario or an unknown weight key.
    """
    if scenario_name not in SCENARIOS:
        raise ValueError(f"Scenario '{scenario_name}' not found. Available: {list(SCENARIOS.keys())}")

    # Work on a copy so the scenario definition itself stays untouched.
    weights = SCENARIOS[scenario_name].copy()

    for key, delta in (adjust or {}).items():
        if key not in weights:
            raise ValueError(f"Unknown weight '{key}'. Must be one of {list(weights.keys())}")
        # Negative adjustments may not push a weight below zero.
        weights[key] = max(0, weights[key] + delta)

    weights = normalize_weights(weights)

    report, df_rank = run_mcda(
        "licences", "safety", "environment", "technical", "economic",
        weights["safety"], weights["environment"],
        weights["technical"], weights["economic"]
    )

    return report, df_rank, weights
| |
|
| |
|
def _format_coordinates(geom):
    """Render a geometry-like object as 'lat°N, lon°E/W'; 'N/A' if unavailable.

    Accepts either a polygon (uses its centroid) or a point-like object
    exposing ``x``/``y`` attributes.
    """
    try:
        if hasattr(geom, 'centroid'):
            geom = geom.centroid
        if hasattr(geom, 'y') and hasattr(geom, 'x'):
            return f"{geom.y:.2f}°N, {abs(geom.x):.2f}°{'W' if geom.x < 0 else 'E'}"
    except Exception:
        # Fall through to "N/A" for malformed geometries.
        pass
    return "N/A"


def _score_indicator(score):
    """Emoji marker matching the report's performance bands (lower is better)."""
    if score <= 0.3:
        return "⭐"
    if score <= 0.5:
        return "✅"
    if score <= 0.7:
        return "⚠️"
    return "❌"


def generate_mcda_detailed_report(df_rank: "gpd.GeoDataFrame",
                  target: str, obj_1: str, obj_2: str, obj_3: str, obj_4: str,
                  w_1: float, w_2: float, w_3: float, w_4: float) -> str:
    """Generate detailed MCDA report with proper formatting and analysis.

    Args:
        df_rank: Ranked locations. Expects 'Rank', 'Name', 'Score', one
            '<objective>_score' column per objective, and either a
            'Coordinates' or 'geometry' column for positions.
        target: Label for the analyzed entities (e.g. "licences").
        obj_1..obj_4: Objective names; must match the '*_score' columns
            written by run_mcda.
        w_1..w_4: Objective weights (normalized for display only).
    Returns:
        The full report as a Markdown string.
    """
    obj_names = [obj_1, obj_2, obj_3, obj_4]
    weights = [w_1, w_2, w_3, w_4]
    # Columns as written by run_mcda — looked up dynamically so the report
    # stays correct when objectives are passed in a different order.
    score_cols = [f"{name}_score" for name in obj_names]

    report_lines = []

    # Header.
    report_lines.append(f"# MULTI-CRITERIA DECISION ANALYSIS REPORT")
    report_lines.append(f"## Analysis Target: {target.upper()}")
    report_lines.append("=" * 60)
    report_lines.append(f"**Analysis Date:** {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')}")
    report_lines.append(f"**Total {target} Analyzed:** {len(df_rank)}")
    report_lines.append("")

    # Criteria and weights (shown as shares of the total weight).
    report_lines.append("## EVALUATION CRITERIA & WEIGHTS")
    total_weight = w_1 + w_2 + w_3 + w_4
    for name, weight in zip(obj_names, weights):
        report_lines.append(f"- **{name.title()}:** {weight/total_weight:.1%} (weight: {weight})")
    report_lines.append("")
    report_lines.append("**Scoring Note:** Lower scores indicate better performance")
    report_lines.append("")

    # Aggregate statistics over the combined score.
    best_score = df_rank['Score'].min()
    worst_score = df_rank['Score'].max()
    avg_score = df_rank['Score'].mean()

    report_lines.append("## RESULTS OVERVIEW")
    report_lines.append(f"- **Best Overall Score:** {best_score:.3f}")
    report_lines.append(f"- **Average Score:** {avg_score:.3f}")
    report_lines.append(f"- **Worst Score:** {worst_score:.3f}")
    report_lines.append(f"- **Score Range:** {worst_score - best_score:.3f}")
    report_lines.append("")

    # Bucket locations into quality bands.
    excellent_count = len(df_rank[df_rank['Score'] <= 0.3])
    good_count = len(df_rank[(df_rank['Score'] > 0.3) & (df_rank['Score'] <= 0.5)])
    fair_count = len(df_rank[(df_rank['Score'] > 0.5) & (df_rank['Score'] <= 0.7)])
    poor_count = len(df_rank[df_rank['Score'] > 0.7])

    report_lines.append("## PERFORMANCE DISTRIBUTION")
    report_lines.append(f"- **Excellent (≤0.3):** {excellent_count} {target}s")
    report_lines.append(f"- **Good (0.3-0.5):** {good_count} {target}s")
    report_lines.append(f"- **Fair (0.5-0.7):** {fair_count} {target}s")
    report_lines.append(f"- **Poor (>0.7):** {poor_count} {target}s")
    report_lines.append("")

    # Ranking table; column headers follow the requested objective order.
    report_lines.append("## TOP PERFORMING LOCATIONS")
    header_cells = " | ".join(name.title() for name in obj_names)
    report_lines.append(f"| Rank | Name | Overall Score | {header_cells} | Coordinates |")
    report_lines.append("|------|------|---------------|--------|-------------|-----------|----------|-------------|")

    for _, row in df_rank.head(15).iterrows():
        # BUG FIX: run_mcda drops 'geometry' and provides 'Coordinates';
        # the old code read row['geometry'] and always printed "N/A".
        geom = row.get("Coordinates")
        if geom is None:
            geom = row.get("geometry")
        coord_str = _format_coordinates(geom)

        score = row['Score']
        indicator = _score_indicator(score)
        obj_cells = " | ".join(f"{row[col]:.3f}" for col in score_cols)

        report_lines.append(
            f"| {int(row['Rank'])} {indicator} | {row['Name']} | {score:.3f} | "
            f"{obj_cells} | {coord_str} |"
        )

    report_lines.append("")

    report_lines.append("## DETAILED CRITERIA ANALYSIS")

    # First objective. BUG FIX: previously hard-coded 'safety_score', which
    # mislabeled the stats whenever obj_1 was not "safety".
    obj1_avg = df_rank[score_cols[0]].mean()
    obj1_best = df_rank[score_cols[0]].min()
    report_lines.append(f"### {obj_1.title()} Performance")
    report_lines.append(f"- **Average Score:** {obj1_avg:.3f}")
    report_lines.append(f"- **Best Score:** {obj1_best:.3f}")
    if obj1_avg < 0.3:
        report_lines.append(f"- **Assessment:** Excellent {obj_1} conditions across the region")
    elif obj1_avg < 0.5:
        report_lines.append(f"- **Assessment:** Good {obj_1} performance with some variation")
    else:
        report_lines.append(f"- **Assessment:** {obj_1.title()} concerns identified - requires attention")
    report_lines.append("")

    # Second objective (same fix: column resolved from obj_2).
    obj2_avg = df_rank[score_cols[1]].mean()
    obj2_best = df_rank[score_cols[1]].min()
    report_lines.append(f"### {obj_2.title()} Performance")
    report_lines.append(f"- **Average Score:** {obj2_avg:.3f}")
    report_lines.append(f"- **Best Score:** {obj2_best:.3f}")
    if obj2_avg < 0.3:
        report_lines.append(f"- **Assessment:** Low {obj_2} impact across most locations")
    elif obj2_avg < 0.5:
        report_lines.append(f"- **Assessment:** Moderate {obj_2} considerations")
    else:
        report_lines.append(f"- **Assessment:** Significant {obj_2} factors require careful management")
    report_lines.append("")

    report_lines.append("## STRATEGIC RECOMMENDATIONS")

    if excellent_count > 0:
        report_lines.append(f"**Immediate Development Opportunities ({excellent_count} sites):**")
        top_sites = df_rank.head(min(5, excellent_count))['Name'].tolist()
        report_lines.append(f"- Priority sites: {', '.join(top_sites[:3])}")
        report_lines.append("- These locations show optimal performance across all criteria")
        report_lines.append("- Proceed with detailed feasibility studies")
        report_lines.append("")

    if good_count > 0:
        report_lines.append(f"**Secondary Development Candidates ({good_count} sites):**")
        report_lines.append("- Suitable for development with appropriate risk management")
        report_lines.append("- Focus on addressing specific criterion weaknesses")
        report_lines.append("")

    if poor_count > len(df_rank) * 0.5:
        report_lines.append("**Regional Assessment:**")
        report_lines.append("- High proportion of challenging locations identified")
        report_lines.append("- Consider expanding analysis to adjacent regions")
        report_lines.append("- Review weighting criteria for regional appropriateness")
        report_lines.append("")

    report_lines.append("## RECOMMENDED NEXT STEPS")
    report_lines.append("1. **Detailed Site Assessment:** Focus on top 5 ranked locations")
    report_lines.append("2. **Risk Mitigation Planning:** Address specific criteria weaknesses")
    report_lines.append("3. **Stakeholder Engagement:** Begin consultation for priority sites")
    report_lines.append("4. **Regulatory Compliance:** Ensure alignment with licensing requirements")
    report_lines.append("5. **Environmental Management:** Develop site-specific environmental plans")
    report_lines.append("")

    report_lines.append("---")
    report_lines.append("*Analysis completed using Multi-Criteria Decision Analysis (MCDA)*")
    report_lines.append(f"*Weighted scoring across {obj_1}, {obj_2}, {obj_3}, and {obj_4} criteria*")
    report_lines.append("*Lower scores indicate better performance - prioritize top-ranked locations*")

    return "\n".join(report_lines)
| |
|
| |
|
def main():
    """Demo driver: run the 'economic_focus' scenario, then re-run it with
    the technical weight boosted by +0.5 before renormalization."""
    report, df_rank, used_weights = run_scenario_analysis("economic_focus")

    print("Scenario used:", used_weights)
    print(report)

    print("")
    print(df_rank)

    print("")
    print("Used weights")
    print(used_weights)

    # Same scenario, but nudge the technical weight upward.
    report, df_rank, used_weights = run_scenario_analysis("economic_focus", adjust={"technical": +0.5})

    print("Scenario used:", used_weights)
    print(report)

    print("")
    print(df_rank)

    print("")
    print("Used weights")
    print(used_weights)
    # Removed: a dead commented-out run_mcda invocation and a leftover
    # pdb.set_trace() debugger breakpoint that halted every run here.
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |