File size: 6,232 Bytes
845d5aa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import os
import glob
from os.path import join as pjoin
import math
import numpy as np
import sys
import time
from config import get_dataset_path, get_shapefile_from_s3, DATASET_S3_KEYS
def add_data_source(run_context, layer_list):
for layer_name in layer_list:
if layer_name == "seismic":
run_context.deps.add_source("UK BGS earthquake data (https://www.earthquakes.bgs.ac.uk)")
elif layer_name == "drilling":
run_context.deps.add_source("UKCS daily production data")
elif layer_name == "licences":
run_context.deps.add_source("UKCS licensed blocks data (https://www.arcgis.com/home/item.html?id=92b08a672721407ca90ed26e67514af8)")
elif layer_name == "wells":
run_context.deps.add_source("UKCS wells data (https://www.arcgis.com/home/item.html?id=92b08a672721407ca90ed26e67514af8)")
elif layer_name == "pipelines":
run_context.deps.add_source("UKCS pipeline data (https://www.arcgis.com/home/item.html?id=92b08a672721407ca90ed26e67514af8)")
elif layer_name == "offshore_fields":
run_context.deps.add_source("UKCS offshore fields data (https://www.arcgis.com/home/item.html?id=92b08a672721407ca90ed26e67514af8)")
elif layer_name == "windfarms":
run_context.deps.add_source("EMODNet active offshore wind farms data (https://emodnet.ec.europa.eu/)")
elif layer_name == "copernicus_wind":
run_context.deps.add_source("Copernicus Marine Data Service's Global Ocean Hourly Reprocessed Sea Surface Wind and Stress from Scatterometer and Model API, https://data.marine.copernicus.eu/product/WIND_GLO_PHY_L4_MY_012_006/description.")
elif layer_name == "copernicus_wave":
run_context.deps.add_source("Copernicus Marine Data Service's Global Ocean L 4 Significant Wave Height From Reprocessed Satellite Measurements API, https://data.marine.copernicus.eu/product/WAVE_GLO_PHY_SWH_L4_MY_014_007/description.")
def load_data_and_process(layer_name: str):
"""
Load and process datasets from S3.
Handles both CSV and shapefile formats.
"""
print(f" β load_data_and_process({layer_name})", flush=True)
sys.stdout.flush()
start = time.time()
# Determine file type from S3 key
s3_key = DATASET_S3_KEYS[layer_name]
print(f" β S3 key: {s3_key}", flush=True)
sys.stdout.flush()
# Load data based on file type
if s3_key.endswith('.csv'):
print(f" β Loading CSV...", flush=True)
sys.stdout.flush()
local_path = get_dataset_path(layer_name)
print(f" β Local path: {local_path}", flush=True)
df = pd.read_csv(local_path)
print(f" β CSV loaded: {len(df)} rows", flush=True)
sys.stdout.flush()
# Convert to GeoDataFrame for CSV files with Lat/Lon
if 'Lat' in df.columns and 'Lon' in df.columns:
print(f" β Converting to GeoDataFrame...", flush=True)
sys.stdout.flush()
df = gpd.GeoDataFrame(
df,
geometry=gpd.points_from_xy(df['Lon'], df['Lat']),
crs='EPSG:4326'
)
elif s3_key.endswith('.shp'):
print(f" β Loading shapefile...", flush=True)
sys.stdout.flush()
local_path = get_shapefile_from_s3(layer_name)
print(f" β Local path: {local_path}", flush=True)
sys.stdout.flush()
df = gpd.read_file(local_path)
print(f" β Shapefile loaded: {len(df)} rows", flush=True)
sys.stdout.flush()
else:
raise ValueError(f"Unsupported file type for {layer_name}")
print(f" β Processing {layer_name}...", flush=True)
sys.stdout.flush()
# Process data based on layer type
if layer_name == "seismic":
df["Name"] = df["Region"] + ", " + df["Comment"]
if 'geometry' not in df.columns:
df["geometry"] = gpd.points_from_xy(df['Lon'], df['Lat'])
df = df[["geometry", "Name"]]
elif layer_name == "drilling":
if 'geometry' not in df.columns:
df["geometry"] = gpd.points_from_xy(df['Lon'], df['Lat'])
df.rename(columns={'Field': 'Name'}, inplace=True)
df = df[["geometry", "Name", "GasAvg"]]
elif layer_name == "licences":
df['Name'] = df['LICTYPE'] + df['LICNO'].astype(str) + '_' + df['BLOCKREF'] + '_' + df['BLOCKSUFFI']
df = df[["geometry", "Name"]]
df['Name'] = df['Name'].fillna("")
elif layer_name == "pipelines":
df.rename(columns={'PIPE_NAME': 'Name'}, inplace=True)
df = df[["geometry", "Name"]]
elif layer_name == "offshore_fields":
df.rename(columns={'FIELDNAME': 'Name'}, inplace=True)
df = df[["geometry", "Name"]]
elif layer_name == "wells":
df.rename(columns={'WELLREGNO': 'Name'}, inplace=True)
df = df[["geometry", "Name", "ORIGINSTAT"]]
elif layer_name == "windfarms":
# Filter for active windfarms only
df = df[df["STATUS"].isin(["Construction", "Production"])]
df['Name'] = df['NAME'].fillna('Unnamed') + ' (' + df['POWER_MW'].astype(str) + 'MW)'
df = df[["geometry", "Name"]]
# Ensure it's a GeoDataFrame
if not isinstance(df, gpd.GeoDataFrame):
df = gpd.GeoDataFrame(df, geometry='geometry', crs='EPSG:4326')
elapsed = time.time() - start
print(f" β load_data_and_process({layer_name}) complete in {elapsed:.2f}s", flush=True)
sys.stdout.flush()
return df
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Calculate distance between two points using Haversine formula.
Returns:
Distance in kilometers
"""
R = 6371 # Earth's radius in km
# Convert to radians
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = (math.sin(dlat/2)**2 +
math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2)
c = 2 * math.asin(math.sqrt(a))
return R * c
|