solarseg / data_utils.py
0x1ay's picture
Include deployment data assets for Hugging Face
eb68124
"""
Data utilities for handling site databases and CSV operations.
"""
import math
import os
from typing import Any, Dict, Optional

import pandas as pd
import streamlit as st
def _database_search_roots():
    """List the directories probed for data files, in priority order.

    The order is: the current working directory, the directory containing
    this module, a ``solarseg`` subdirectory of that directory, and finally
    the absolute path ``/app``.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    roots = [os.getcwd(), here]
    roots.append(os.path.join(here, 'solarseg'))
    roots.append('/app')
    return roots
def _candidate_paths(*candidates: str):
    """Build every root/filename combination that a lookup would probe.

    Paths are grouped by search root (in the order returned by
    ``_database_search_roots``) and, within one root, follow the order in
    which ``candidates`` were passed.
    """
    return [
        os.path.join(base_dir, file_name)
        for base_dir in _database_search_roots()
        for file_name in candidates
    ]
def _resolve_existing_path(*candidates: str) -> Optional[str]:
    """Pick the first candidate path that exists on disk.

    Candidates are examined in the order produced by ``_candidate_paths``,
    so an earlier search root takes precedence over an earlier filename.

    Returns:
        The first existing path, or ``None`` when none of them exists.
    """
    existing = (
        probe for probe in _candidate_paths(*candidates)
        if os.path.exists(probe)
    )
    return next(existing, None)
def get_database_file_status() -> Dict[str, Any]:
    """Report which built-in database files were looked for and found.

    Returns:
        A dict with three keys:
        ``search_files`` - the file names that are searched for,
        ``searched_paths`` - every full path that was probed,
        ``available_files`` - the probed paths that exist on disk.
    """
    file_names = ('PT_sitesAll.csv', 'site_bounds.csv', 'solar_sites_bboxes.csv')
    probed = _candidate_paths(*file_names)

    status: Dict[str, Any] = {'search_files': list(file_names)}
    status['searched_paths'] = probed
    status['available_files'] = list(filter(os.path.exists, probed))
    return status
def _standardize_site_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename known column aliases to canonical names and add default columns."""
    # Drop unnamed index column if present
    if 'Unnamed: 0' in df.columns:
        df = df.drop(columns=['Unnamed: 0'])

    # Lower-cased alias -> canonical column name. Several aliases share one
    # target; the alias listed first wins when a file contains more than one.
    column_mapping = {
        # Common variations for site ID
        'site_id': 'Id',
        'id': 'Id',
        # Common variations for site name
        'site_name': 'SiteName',
        'name': 'SiteName',
        'sitename': 'SiteName',
        # Other fields
        'developer': 'Developer',
        'country': 'Country',
        'capacity_kw': 'Capacity',
        'capacity': 'Capacity',
        'cod': 'COD',
        # Common variations for coordinates
        'latitude': 'Latitude',
        'lat': 'Latitude',
        'longitude': 'Longitude',
        'lng': 'Longitude',
        'lon': 'Longitude',
    }

    # Case-insensitive lookup of the columns actually present in the file.
    # ``str(col)`` keeps this working if a header was parsed as a non-string.
    existing_cols = {str(col).lower(): col for col in df.columns}

    # Track canonical names that are already present OR already assigned by
    # an earlier alias, so two aliases never collapse onto the same target
    # (which would leave the frame with duplicate column names).
    claimed = set(df.columns)
    rename_dict = {}
    for alias, canonical in column_mapping.items():
        if alias in existing_cols and canonical not in claimed:
            rename_dict[existing_cols[alias]] = canonical
            claimed.add(canonical)
    df = df.rename(columns=rename_dict)

    # Generate ID column if missing (use index)
    if 'Id' not in df.columns:
        df['Id'] = df.index

    # Add missing columns with default values
    if 'COD' not in df.columns:
        df['COD'] = ''
    if 'Technology Type' not in df.columns:
        df['Technology Type'] = 'Solar PV'
    if 'Capacity' not in df.columns:
        df['Capacity'] = 0.0
    return df


@st.cache_data(ttl=None)
def load_sites_database() -> pd.DataFrame:
    """Load the main sites database from CSV and normalise its columns.

    The first existing file among ``PT_sitesAll.csv`` and ``site_bounds.csv``
    (searched in the known runtime locations) is read. Known column-name
    variants are renamed case-insensitively to the canonical names ``Id``,
    ``SiteName``, ``Developer``, ``Country``, ``Capacity``, ``COD``,
    ``Latitude`` and ``Longitude``; missing ``Id``, ``COD``,
    ``Technology Type`` and ``Capacity`` columns are filled with defaults.

    The result is cached through ``st.cache_data`` with ``ttl=None``.

    Returns:
        The normalised DataFrame, or an empty DataFrame when no file is found
        or the file cannot be read (a Streamlit warning is shown in the
        latter case).
    """
    # Primary database: PowerTrust sites with full capacity data
    sites_file = _resolve_existing_path('PT_sitesAll.csv', 'site_bounds.csv')
    if not sites_file:
        print("✗ Sites database file not found in known locations")
        return pd.DataFrame()  # Return empty DataFrame if no file found

    try:
        df = pd.read_csv(sites_file)
        print(f"✓ Loaded {len(df)} sites from {sites_file}")
        return _standardize_site_columns(df)
    except Exception as e:
        # Best effort: surface the problem in the UI and fall back to empty.
        st.warning(f"Error reading {sites_file}: {str(e)}")
    return pd.DataFrame()
@st.cache_data
def load_bbox_sites_database() -> pd.DataFrame:
    """Load the bounding-box sites table, falling back to the main database.

    When ``solar_sites_bboxes.csv`` is found it is read and canonical columns
    are added as copies of the raw ones: ``Latitude``/``Longitude`` from
    ``original_lat``/``original_lon`` (only when both are present),
    ``Capacity`` from ``capacity_kw``, ``SiteName`` from ``site`` and
    ``Country`` from ``country``. The raw columns are kept.

    If the file is missing, or reading it fails (a Streamlit warning is
    shown), the result of ``load_sites_database`` is returned instead.
    """
    csv_path = _resolve_existing_path('solar_sites_bboxes.csv')
    if not csv_path:
        # No bbox-specific file: fall back to main database
        return load_sites_database()

    try:
        frame = pd.read_csv(csv_path)

        # Coordinates are only mirrored when the pair is complete.
        if {'original_lat', 'original_lon'}.issubset(frame.columns):
            frame['Latitude'] = frame['original_lat']
            frame['Longitude'] = frame['original_lon']

        # Remaining raw -> canonical copies, applied independently.
        simple_copies = (
            ('capacity_kw', 'Capacity'),
            ('site', 'SiteName'),
            ('country', 'Country'),
        )
        for raw_name, canonical in simple_copies:
            if raw_name in frame.columns:
                frame[canonical] = frame[raw_name]
        return frame
    except Exception as e:
        st.warning(f"Error reading {csv_path}: {str(e)}")

    # Reading failed: fall back to main database
    return load_sites_database()
def _coordinate_merge_key(df: pd.DataFrame) -> pd.Series:
    """Return a per-row 'lat_lon' key (rounded to 4 decimals), NaN where unknown."""
    if 'Latitude' not in df.columns or 'Longitude' not in df.columns:
        # No coordinates at all: every row gets a missing key.
        return pd.Series([None] * len(df), index=df.index, dtype=object)

    # Coerce to float so that e.g. 10 and 10.0 produce the same key text and
    # non-numeric cells become NaN instead of raising.
    lat = pd.to_numeric(df['Latitude'], errors='coerce').astype('float64').round(4)
    lon = pd.to_numeric(df['Longitude'], errors='coerce').astype('float64').round(4)
    key = lat.astype(str) + '_' + lon.astype(str)
    # Rows with a missing coordinate must not share the literal 'nan_nan' key.
    return key.where(lat.notna() & lon.notna())


def get_combined_sites_database() -> pd.DataFrame:
    """Combine the bounding-box and main site tables into one DataFrame.

    Rows from the bounding-box table come first. Rows are de-duplicated on
    their coordinates rounded to 4 decimal places, keeping the first
    occurrence, so bounding-box data wins for sites present in both tables.
    Rows whose coordinates are missing or non-numeric are never treated as
    duplicates of each other and are all kept.

    The loaded frames are not modified; the merge key is computed separately
    rather than being added as a temporary column.

    Returns:
        An empty DataFrame when both sources are empty, the non-empty source
        when only one has rows, otherwise the de-duplicated concatenation
        (row labels of the kept rows are those of the concatenated frame).
    """
    main_df = load_sites_database()
    bbox_df = load_bbox_sites_database()

    if main_df.empty and bbox_df.empty:
        return pd.DataFrame()
    if main_df.empty:
        return bbox_df
    if bbox_df.empty:
        return main_df

    # bbox rows first so that keep-first prefers them over main rows.
    combined = pd.concat([bbox_df, main_df], ignore_index=True)
    keys = pd.concat(
        [_coordinate_merge_key(bbox_df), _coordinate_merge_key(main_df)],
        ignore_index=True,
    )

    # Keep the first row for every real key, and every row without a key.
    keep = keys.isna() | ~keys.duplicated(keep='first')
    return combined[keep]
def search_site_by_coordinates(lat: float, lon: float, tolerance: float = 0.001) -> Optional[Dict[str, Any]]:
    """Find the site nearest to a coordinate pair, within a tolerance.

    The bounding-box table is consulted first through
    ``get_site_bounding_box``; only when it yields nothing is the main sites
    database searched.

    Args:
        lat: Latitude to search for.
        lon: Longitude to search for.
        tolerance: Maximum allowed absolute difference, applied separately to
            latitude and longitude.

    Returns:
        The matching row as a dict (the one with the smallest sum of absolute
        latitude and longitude differences), or ``None`` when nothing matches
        or the coordinate columns are unavailable.
    """
    # First check bbox database (more precise data)
    hit = get_site_bounding_box(lat, lon, tolerance)
    if hit:
        return hit

    # Fall back to main database
    sites = load_sites_database()
    if sites.empty:
        return None
    if 'Latitude' not in sites.columns or 'Longitude' not in sites.columns:
        return None

    d_lat = (sites['Latitude'] - lat).abs()
    d_lon = (sites['Longitude'] - lon).abs()
    within = (d_lat <= tolerance) & (d_lon <= tolerance)
    if not within.any():
        return None

    # Among the candidates, pick the smallest combined offset.
    nearest = (d_lat + d_lon)[within].idxmin()
    return sites.loc[nearest].to_dict()
def search_site_by_name_or_id(query: str) -> Optional[Dict[str, Any]]:
    """Search for a site by name, ID or developer in both databases.

    The bounding-box table is checked first via ``search_bbox_site_by_name``.
    If it yields nothing, the ``SiteName``, ``Id`` and ``Developer`` columns
    of the main database are searched in that order and the first row of the
    first column with a hit is returned.

    Matching is a case-insensitive *literal* substring test: characters such
    as ``(``, ``.`` or ``+`` in the query are matched as-is instead of being
    interpreted as a regular expression (which previously raised on
    unbalanced brackets and over-matched on ``.``).

    Args:
        query: Text to look for. Non-string values (e.g. an integer ID) are
            converted with ``str`` for the main-database search.

    Returns:
        The matching row as a dict, or ``None`` when nothing matches.
    """
    # First check bbox database
    bbox_site = search_bbox_site_by_name(query)
    if bbox_site:
        return bbox_site

    # Fall back to main database
    sites_df = load_sites_database()
    if sites_df.empty:
        return None

    needle = str(query)
    for col in ('SiteName', 'Id', 'Developer'):
        if col not in sites_df.columns:
            continue
        # regex=False: treat the user-supplied text literally.
        mask = sites_df[col].astype(str).str.contains(
            needle, case=False, na=False, regex=False)
        matches = sites_df[mask]
        if len(matches) > 0:
            return matches.iloc[0].to_dict()
    return None
def get_all_sites() -> pd.DataFrame:
    """Return the full main sites table as produced by ``load_sites_database``."""
    all_sites = load_sites_database()
    return all_sites
def validate_sites_database_format(df: pd.DataFrame) -> Dict[str, Any]:
    """Check a sites DataFrame for the expected set of columns.

    Args:
        df: The table to inspect.

    Returns:
        A dict with ``is_valid`` (True when no expected column is missing),
        ``missing_columns`` and ``present_columns`` (both in the expected
        column order), ``total_rows`` and ``columns`` (all column names of
        ``df``).
    """
    expected = (
        'Id', 'Developer', 'SiteName', 'COD', 'Latitude',
        'Longitude', 'Country', 'Technology Type', 'Capacity',
    )
    found = [name for name in expected if name in df.columns]
    absent = [name for name in expected if name not in df.columns]

    return {
        'is_valid': not absent,
        'missing_columns': absent,
        'present_columns': found,
        'total_rows': len(df),
        'columns': list(df.columns),
    }
def get_database_summary() -> Dict[str, Any]:
    """Summarise the main sites database together with file diagnostics.

    Returns:
        When the database is empty: ``status`` ``'empty'``, a ``message`` and
        the keys from ``get_database_file_status``. Otherwise: ``status``
        ``'loaded'``, ``total_sites``, the ``validation`` report, the file
        diagnostics, and - for each column that exists - value counts under
        ``countries``, ``developers`` and ``technologies`` plus
        ``capacity_stats`` (total, average, minimum and maximum of the
        numerically coerced ``Capacity`` column).
    """
    sites = load_sites_database()
    files = get_database_file_status()

    if sites.empty:
        return {
            'status': 'empty',
            'message': 'No sites database found',
            **files,
        }

    report = {
        'status': 'loaded',
        'total_sites': len(sites),
        'validation': validate_sites_database_format(sites),
        **files,
    }

    # Per-category counts, added only for columns that are present.
    count_targets = (
        ('Country', 'countries'),
        ('Developer', 'developers'),
        ('Technology Type', 'technologies'),
    )
    for column, key in count_targets:
        if column in sites.columns:
            report[key] = sites[column].value_counts().to_dict()

    if 'Capacity' in sites.columns:
        try:
            numeric = pd.to_numeric(sites['Capacity'], errors='coerce')
            report['capacity_stats'] = {
                'total_capacity': numeric.sum(),
                'avg_capacity': numeric.mean(),
                'min_capacity': numeric.min(),
                'max_capacity': numeric.max(),
            }
        except Exception:
            # Capacity statistics are optional; leave them out on failure.
            pass

    return report
def filter_sites_by_criteria(country: str = None, developer: str = None,
                             technology: str = None, min_capacity: float = None,
                             max_capacity: float = None) -> pd.DataFrame:
    """Filter the main sites database by text and capacity criteria.

    Text criteria are case-insensitive *literal* substring matches, so a
    value such as ``"Congo (DRC)"`` matches itself instead of being parsed as
    a regular expression. Rows whose value is missing never match a text
    criterion. Capacity bounds are inclusive and compare against the
    ``Capacity`` column coerced to numbers; rows with a non-numeric capacity
    are dropped whenever a bound is given.

    Args:
        country: Substring to look for in ``Country``.
        developer: Substring to look for in ``Developer``.
        technology: Substring to look for in ``Technology Type``.
        min_capacity: Lower capacity bound (inclusive).
        max_capacity: Upper capacity bound (inclusive).

    Returns:
        A filtered copy of the database; the (empty) loaded frame itself when
        the database is empty. Criteria whose column is absent are ignored.
    """
    sites_df = load_sites_database()
    if sites_df.empty:
        return sites_df

    filtered_df = sites_df.copy()

    text_filters = (
        ('Country', country),
        ('Developer', developer),
        ('Technology Type', technology),
    )
    for column, needle in text_filters:
        if not needle or column not in filtered_df.columns:
            continue
        values = filtered_df[column]
        # astype(str) keeps this working for non-string columns; the notna()
        # guard stops missing cells (rendered as 'nan') from matching.
        matches = values.notna() & values.astype(str).str.contains(
            str(needle), case=False, regex=False)
        filtered_df = filtered_df[matches]

    has_bound = min_capacity is not None or max_capacity is not None
    if has_bound and 'Capacity' in filtered_df.columns:
        capacities = pd.to_numeric(filtered_df['Capacity'], errors='coerce')
        # Build ONE mask aligned with the current frame rather than indexing
        # an already-filtered frame with a mask computed on the larger one.
        in_range = pd.Series(True, index=filtered_df.index)
        if min_capacity is not None:
            in_range &= capacities >= min_capacity
        if max_capacity is not None:
            in_range &= capacities <= max_capacity
        filtered_df = filtered_df[in_range]

    return filtered_df
def filter_and_sort_sites_by_capacity(ascending: bool = True, min_capacity: float = None,
                                      max_capacity: float = None, exclude_incomplete: bool = False) -> pd.DataFrame:
    """Return sites ordered by capacity, optionally bounded and COD-filtered.

    Args:
        ascending: Sort direction for capacity.
        min_capacity: Inclusive lower bound, or ``None`` for no bound.
        max_capacity: Inclusive upper bound, or ``None`` for no bound.
        exclude_incomplete: When True (and a ``COD`` column exists), keep only
            rows accepted by ``filter_completed_projects``.

    Returns:
        A filtered, sorted copy of the database. Rows whose ``Capacity``
        cannot be converted to a number are removed. The loaded frame itself
        is returned when the database is empty.
    """
    source = load_sites_database()
    if source.empty:
        return source

    result = source.copy()

    if 'Capacity' in result.columns:
        # Work on a temporary numeric view of the capacity column.
        result['Capacity_numeric'] = pd.to_numeric(
            result['Capacity'], errors='coerce')

        if min_capacity is not None:
            result = result[result['Capacity_numeric'] >= min_capacity]
        if max_capacity is not None:
            result = result[result['Capacity_numeric'] <= max_capacity]

        # Discard unparseable capacities, order, then remove the helper column.
        result = result.dropna(subset=['Capacity_numeric'])
        result = result.sort_values('Capacity_numeric', ascending=ascending)
        result = result.drop(columns=['Capacity_numeric'])

    if exclude_incomplete and 'COD' in result.columns:
        result = filter_completed_projects(result)

    return result
def _parse_cod_dates(cod: pd.Series) -> pd.Series:
    """Parse COD values using whichever known date format fits the most rows."""
    # Listed in priority order; on a tie the earlier format is kept.
    date_formats = ('%d/%m/%Y', '%m/%d/%Y', '%Y-%m-%d', '%d-%m-%Y')

    best = None
    best_count = 0
    for date_format in date_formats:
        try:
            parsed = pd.to_datetime(cod, format=date_format, errors='coerce')
        except (ValueError, TypeError):
            continue
        parsed_count = int(parsed.notna().sum())
        # Choose the format that explains the MOST values. Stopping at the
        # first format that parses anything would turn most month-first
        # dates into NaT just because a few also read as day-first.
        if parsed_count > best_count:
            best, best_count = parsed, parsed_count

    if best is None:
        # If no format worked, try pandas automatic parsing
        best = pd.to_datetime(cod, errors='coerce')
    return best


def filter_completed_projects(sites_df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """Keep only projects whose COD (Commercial Operation Date) is not in the future.

    The ``COD`` column is parsed with the best-fitting format among
    ``%d/%m/%Y``, ``%m/%d/%Y``, ``%Y-%m-%d`` and ``%d-%m-%Y`` (the one that
    parses the most rows), falling back to pandas' automatic parsing when
    none of them parses anything. Rows with an unparseable COD are treated as
    not completed.

    Args:
        sites_df: Table to filter. When ``None`` the main sites database is
            loaded.

    Returns:
        A copy containing only completed projects, with no helper columns
        added. The input is returned unchanged when it is empty or has no
        ``COD`` column. If date handling fails, a Streamlit warning is shown
        and a copy of all rows is returned.
    """
    if sites_df is None:
        sites_df = load_sites_database()

    if sites_df.empty or 'COD' not in sites_df.columns:
        return sites_df

    try:
        # Parsed dates live in a separate Series so the frame never carries
        # a temporary column, even on the error path.
        cod_dates = _parse_cod_dates(sites_df['COD'])
        current_date = pd.Timestamp.now()
        completed = cod_dates.notna() & (cod_dates <= current_date)
        return sites_df[completed].copy()
    except Exception as e:
        st.warning(
            f"Error parsing COD dates: {str(e)}. Returning all projects.")
        return sites_df.copy()
def filter_by_cod_range(start_date: str = None, end_date: str = None,
                        date_format: str = '%d/%m/%Y') -> pd.DataFrame:
    """Select sites whose COD falls inside an inclusive date range.

    Args:
        start_date: Earliest COD to keep, written in ``date_format``; falsy
            for no lower bound.
        end_date: Latest COD to keep, written in ``date_format``; falsy for
            no upper bound.
        date_format: ``strptime``-style format used for both the ``COD``
            column and the two bounds.

    Returns:
        A filtered copy without rows whose COD could not be parsed. The
        loaded frame is returned as-is when it is empty, has no ``COD``
        column, or when filtering raises (a Streamlit warning is shown).
    """
    original = load_sites_database()
    if original.empty or 'COD' not in original.columns:
        return original

    working = original.copy()
    try:
        # Temporary parsed column; unparseable values become NaT.
        working['COD_date'] = pd.to_datetime(
            working['COD'], format=date_format, errors='coerce')

        if start_date:
            lower = pd.to_datetime(start_date, format=date_format)
            working = working[working['COD_date'] >= lower]

        if end_date:
            upper = pd.to_datetime(end_date, format=date_format)
            working = working[working['COD_date'] <= upper]

        # Drop rows with invalid dates, then the helper column itself.
        working = working.dropna(subset=['COD_date'])
        return working.drop(columns=['COD_date'])
    except Exception as e:
        st.warning(
            f"Error filtering by COD range: {str(e)}. Returning original data.")
        return original
def get_capacity_statistics() -> Dict[str, Any]:
    """Compute descriptive statistics over the ``Capacity`` column.

    Returns:
        A dict whose ``status`` is one of ``'no_data'`` (empty database or no
        ``Capacity`` column), ``'no_valid_data'`` (no numeric capacity
        values), ``'error'`` (an exception occurred; see ``message``) or
        ``'success'``. On success it also holds the project count, total,
        average, median, minimum, maximum, standard deviation and the
        25%/50%/75% quantiles, all as plain floats.
    """
    sites = load_sites_database()
    if sites.empty or 'Capacity' not in sites.columns:
        return {'status': 'no_data', 'message': 'No capacity data available'}

    try:
        # Non-numeric capacities are coerced to NaN and then discarded.
        numeric = pd.to_numeric(sites['Capacity'], errors='coerce').dropna()
        if numeric.empty:
            return {'status': 'no_valid_data', 'message': 'No valid capacity values found'}

        quartile_points = (('25%', 0.25), ('50%', 0.50), ('75%', 0.75))
        return {
            'status': 'success',
            'total_projects': len(numeric),
            'total_capacity_kwp': float(numeric.sum()),
            'average_capacity_kwp': float(numeric.mean()),
            'median_capacity_kwp': float(numeric.median()),
            'min_capacity_kwp': float(numeric.min()),
            'max_capacity_kwp': float(numeric.max()),
            'std_capacity_kwp': float(numeric.std()),
            'capacity_quartiles': {
                label: float(numeric.quantile(point))
                for label, point in quartile_points
            },
        }
    except Exception as e:
        return {'status': 'error', 'message': f'Error calculating statistics: {str(e)}'}
def get_completion_status_summary() -> Dict[str, Any]:
    """Count completed versus incomplete projects according to COD.

    A project counts as completed when ``filter_completed_projects`` keeps
    its row.

    Returns:
        ``{'status': 'no_data', ...}`` when the database is empty or lacks a
        ``COD`` column, ``{'status': 'error', ...}`` if the analysis raises,
        otherwise a ``'success'`` dict with total, completed and incomplete
        counts plus the completion rate in percent (rounded to 2 decimals).
    """
    sites = load_sites_database()
    if sites.empty or 'COD' not in sites.columns:
        return {'status': 'no_data', 'message': 'No COD data available'}

    try:
        done = len(filter_completed_projects(sites))
        total = len(sites)

        # Guard against division by zero.
        if total > 0:
            rate = done / total * 100
        else:
            rate = 0

        return {
            'status': 'success',
            'total_projects': total,
            'completed_projects': done,
            'incomplete_projects': total - done,
            'completion_rate_percent': round(rate, 2),
        }
    except Exception as e:
        return {'status': 'error', 'message': f'Error analyzing completion status: {str(e)}'}
def export_results_to_csv(results: list, filename: str) -> str:
    """Serialise validation results as CSV text.

    Args:
        results: Records to export; passed to the ``pd.DataFrame``
            constructor.
        filename: Accepted for interface compatibility; this function does
            not use it and writes no file.

    Returns:
        The CSV text (without an index column), or the string
        ``"No results to export"`` when ``results`` is empty.
    """
    if results:
        table = pd.DataFrame(results)
        return table.to_csv(index=False)
    return "No results to export"
def get_unique_values(column: str) -> list:
    """List the distinct non-null values of one column, in sorted order.

    Args:
        column: Name of the column in the main sites database.

    Returns:
        The sorted distinct values, or an empty list when the database is
        empty or has no such column.
    """
    sites = load_sites_database()
    if not sites.empty and column in sites.columns:
        distinct = sites[column].dropna().unique().tolist()
        distinct.sort()
        return distinct
    return []
def get_site_bounding_box(lat: float, lon: float, tolerance: float = 0.001) -> Optional[Dict[str, Any]]:
    """
    Look up the nearest bounding-box site for a coordinate pair.

    Args:
        lat: Latitude of the site
        lon: Longitude of the site
        tolerance: Maximum absolute difference allowed for latitude and for
            longitude, each checked separately

    Returns:
        The closest matching row as a dict, with ``has_bbox`` added. When the
        row has ``north``, ``south``, ``east`` and ``west`` entries,
        ``has_bbox`` is True and a nested ``bbox`` dict with those four
        values is added as well. ``None`` when the table is empty, lacks
        coordinate columns, or no row is within tolerance.
    """
    table = load_bbox_sites_database()
    if table.empty:
        return None
    if not {'Latitude', 'Longitude'}.issubset(table.columns):
        return None

    d_lat = (table['Latitude'] - lat).abs()
    d_lon = (table['Longitude'] - lon).abs()
    within = (d_lat <= tolerance) & (d_lon <= tolerance)
    if not within.any():
        return None

    # Closest candidate by summed latitude/longitude offset.
    nearest = (d_lat + d_lon)[within].idxmin()
    record = table.loc[nearest].to_dict()

    edges = ('north', 'south', 'east', 'west')
    record['has_bbox'] = all(edge in record for edge in edges)
    if record['has_bbox']:
        record['bbox'] = {edge: record[edge] for edge in edges}
    return record
def search_bbox_site_by_name(site_name: str) -> Optional[Dict[str, Any]]:
    """
    Search the bounding-box table for a site by name.

    Matching is a case-insensitive *literal* substring test on the
    ``SiteName`` column, so characters such as ``(`` or ``.`` in the name are
    matched as-is rather than being interpreted as a regular expression.

    Args:
        site_name: Name (or part of the name) of the site to search for.
            Non-string values are converted with ``str``.

    Returns:
        The first matching row as a dict with ``has_bbox`` added; when the
        row has ``north``, ``south``, ``east`` and ``west`` entries a nested
        ``bbox`` dict is included too. ``None`` when the table is empty, has
        no ``SiteName`` column, or nothing matches.
    """
    bbox_df = load_bbox_sites_database()
    if bbox_df.empty or 'SiteName' not in bbox_df.columns:
        return None

    # regex=False: treat the user-supplied text literally.
    mask = bbox_df['SiteName'].astype(str).str.contains(
        str(site_name), case=False, na=False, regex=False)
    matches = bbox_df[mask]
    if len(matches) == 0:
        return None

    site_info = matches.iloc[0].to_dict()

    # Add bounding box info
    edges = ('north', 'south', 'east', 'west')
    if all(key in site_info for key in edges):
        site_info['has_bbox'] = True
        site_info['bbox'] = {key: site_info[key] for key in edges}
    else:
        site_info['has_bbox'] = False
    return site_info
def get_all_bbox_sites() -> pd.DataFrame:
    """Return the full table produced by ``load_bbox_sites_database``."""
    all_bbox_sites = load_bbox_sites_database()
    return all_bbox_sites
def get_bbox_sites_summary() -> Dict[str, Any]:
    """Summarise the bounding-box sites table.

    Returns:
        ``{'status': 'empty', ...}`` when the table has no rows. Otherwise a
        dict with ``status`` ``'loaded'``, ``total_sites``, ``has_bbox_data``
        (True when all of ``north``/``south``/``east``/``west`` are columns)
        and, when the columns exist, per-country counts with the number of
        distinct countries and capacity statistics (total, average, minimum,
        maximum of the numerically coerced ``Capacity`` column).
    """
    table = load_bbox_sites_database()
    if table.empty:
        return {'status': 'empty', 'message': 'No bounding box sites database found'}

    edge_columns = ('north', 'south', 'east', 'west')
    report = {
        'status': 'loaded',
        'total_sites': len(table),
        'has_bbox_data': all(edge in table.columns for edge in edge_columns),
    }

    if 'Country' in table.columns:
        countries = table['Country']
        report['countries'] = countries.value_counts().to_dict()
        report['total_countries'] = len(countries.unique())

    if 'Capacity' in table.columns:
        try:
            numeric = pd.to_numeric(table['Capacity'], errors='coerce')
            report['capacity_stats'] = {
                'total_capacity_kw': numeric.sum(),
                'avg_capacity_kw': numeric.mean(),
                'min_capacity_kw': numeric.min(),
                'max_capacity_kw': numeric.max(),
            }
        except Exception:
            # Capacity statistics are optional; leave them out on failure.
            pass

    return report
def calculate_bbox_area(north: float, south: float, east: float, west: float) -> float:
    """
    Calculate the approximate area of a bounding box in square meters.

    Uses a flat (equirectangular) approximation: one degree of latitude is
    taken as 111,320 m, and one degree of longitude as that value scaled by
    the cosine of the box's mid-latitude, since meridians converge towards
    the poles.

    Args:
        north, south, east, west: Bounding box coordinates in degrees

    Returns:
        Area in square meters (approximate)
    """
    # Latitude at which the east-west extent is measured.
    mid_lat = (north + south) / 2

    # Degrees to meters conversion (approximate)
    lat_to_m = 111320  # meters per degree latitude
    # Meters per degree longitude shrink with cos(latitude): full size at
    # the equator, zero at the poles. abs() only guards against tiny
    # negative values from rounding near +/-90 degrees.
    lon_to_m = lat_to_m * abs(math.cos(math.radians(mid_lat)))

    height_m = abs(north - south) * lat_to_m
    width_m = abs(east - west) * lon_to_m
    return height_m * width_m