"""
Data utilities for handling site databases and CSV operations.
"""
|
|
import math
import os
from typing import Optional, Dict, Any

import pandas as pd
import streamlit as st
|
|
|
|
def _database_search_roots():
    """Return the directories that may contain deployment data files.

    The roots are, in priority order: the current working directory, the
    directory containing this module, that directory's ``solarseg``
    subdirectory, and ``/app``.  Identical entries (for example when the
    process is started from the module directory) are collapsed so each root
    is probed, and reported in diagnostics, only once.

    Returns:
        list[str]: Unique directory paths in search-priority order.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    roots = [
        os.getcwd(),
        module_dir,
        os.path.join(module_dir, 'solarseg'),
        '/app',
    ]
    # dict.fromkeys keeps first-seen order while dropping repeated entries.
    return list(dict.fromkeys(roots))
|
|
|
|
def _candidate_paths(*candidates: str):
    """Build every file path that is probed at runtime for the given names.

    Each search root is joined with each candidate name.  Roots vary slowest,
    so all names are listed for one root before moving on to the next root.
    """
    return [
        os.path.join(search_root, file_name)
        for search_root in _database_search_roots()
        for file_name in candidates
    ]
|
|
|
|
def _resolve_existing_path(*candidates: str) -> Optional[str]:
    """Return the first probed path that exists on disk, or None if none do."""
    existing = (
        probe for probe in _candidate_paths(*candidates)
        if os.path.exists(probe)
    )
    return next(existing, None)
|
|
|
|
def get_database_file_status() -> Dict[str, Any]:
    """Report which built-in site database files were looked for and found.

    Returns:
        A dict with ``search_files`` (the file names), ``searched_paths``
        (every path probed) and ``available_files`` (the probed paths that
        exist).
    """
    file_names = ('PT_sitesAll.csv', 'site_bounds.csv', 'solar_sites_bboxes.csv')
    probed = _candidate_paths(*file_names)
    found = list(filter(os.path.exists, probed))
    return {
        'search_files': list(file_names),
        'searched_paths': probed,
        'available_files': found,
    }
|
|
|
|
# Lower-cased source column name -> canonical column name, in priority order:
# when several aliases of the same canonical name are present, the first one
# listed here is the one that gets renamed.
_SITE_COLUMN_ALIASES = (
    ('site_id', 'Id'),
    ('id', 'Id'),
    ('site_name', 'SiteName'),
    ('name', 'SiteName'),
    ('sitename', 'SiteName'),
    ('developer', 'Developer'),
    ('country', 'Country'),
    ('capacity_kw', 'Capacity'),
    ('capacity', 'Capacity'),
    ('cod', 'COD'),
    ('latitude', 'Latitude'),
    ('lat', 'Latitude'),
    ('longitude', 'Longitude'),
    ('lng', 'Longitude'),
    ('lon', 'Longitude'),
)


def _normalize_site_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename known column aliases (case-insensitively) to canonical names.

    A canonical name is assigned at most once: if it already exists in the
    frame, or an earlier alias has claimed it, later aliases are left
    untouched.  This prevents two source columns (e.g. ``site_id`` and ``id``)
    from both being renamed to the same target and producing duplicate
    column labels.
    """
    by_lower = {str(col).lower(): col for col in df.columns}
    taken = set(df.columns)
    renames = {}
    for alias, canonical in _SITE_COLUMN_ALIASES:
        if canonical in taken or alias not in by_lower:
            continue
        renames[by_lower[alias]] = canonical
        taken.add(canonical)
    return df.rename(columns=renames)


@st.cache_data(ttl=None)
def load_sites_database() -> pd.DataFrame:
    """Load the sites database CSV, normalise its columns and cache the result.

    The first existing file among ``PT_sitesAll.csv`` and ``site_bounds.csv``
    (searched across the known runtime locations) is read.  A stray
    ``Unnamed: 0`` column is dropped, known column aliases are renamed to the
    canonical names, and the columns ``Id``, ``COD``, ``Technology Type`` and
    ``Capacity`` are created with defaults when absent.

    Returns:
        The normalised sites table, or an empty DataFrame when no file is
        found or the file cannot be read (a Streamlit warning is shown in the
        latter case).
    """
    sites_file = _resolve_existing_path('PT_sitesAll.csv', 'site_bounds.csv')
    if not sites_file:
        print("✗ Sites database file not found in known locations")
        return pd.DataFrame()

    try:
        df = pd.read_csv(sites_file)
        print(f"✓ Loaded {len(df)} sites from {sites_file}")

        # Index column left behind by DataFrame.to_csv() without index=False.
        if 'Unnamed: 0' in df.columns:
            df = df.drop(columns=['Unnamed: 0'])

        df = _normalize_site_columns(df)

        # Fall back to the row index when the file carries no identifier.
        if 'Id' not in df.columns:
            df['Id'] = df.index

        for column, default in (
            ('COD', ''),
            ('Technology Type', 'Solar PV'),
            ('Capacity', 0.0),
        ):
            if column not in df.columns:
                df[column] = default

        return df
    except Exception as e:
        # Best-effort loader: surface the problem in the UI and return an
        # empty frame rather than crashing the app.
        st.warning(f"Error reading {sites_file}: {str(e)}")
        return pd.DataFrame()
|
|
|
|
@st.cache_data
def load_bbox_sites_database() -> pd.DataFrame:
    """Load the bounding-box sites CSV (cached), adding canonical columns.

    When ``solar_sites_bboxes.csv`` is found, the raw columns
    ``original_lat``/``original_lon``, ``capacity_kw``, ``site`` and
    ``country`` are copied to ``Latitude``/``Longitude``, ``Capacity``,
    ``SiteName`` and ``Country`` respectively.  If the file is missing or
    unreadable, the regular sites database is returned instead.
    """
    bbox_file = _resolve_existing_path('solar_sites_bboxes.csv')

    if bbox_file:
        try:
            frame = pd.read_csv(bbox_file)

            # Coordinates are only copied when both halves are available.
            if 'original_lat' in frame.columns and 'original_lon' in frame.columns:
                frame['Latitude'] = frame['original_lat']
                frame['Longitude'] = frame['original_lon']

            for source, target in (
                ('capacity_kw', 'Capacity'),
                ('site', 'SiteName'),
                ('country', 'Country'),
            ):
                if source in frame.columns:
                    frame[target] = frame[source]
        except Exception as err:
            st.warning(f"Error reading {bbox_file}: {str(err)}")
        else:
            return frame

    # No usable bounding-box file: fall back to the main sites database.
    return load_sites_database()
|
|
|
|
def _coordinate_merge_keys(df: pd.DataFrame) -> pd.Series:
    """Return a per-row ``"<lat>_<lon>"`` key (rounded to 4 decimals).

    Rows whose coordinates are missing or non-numeric, and frames without
    ``Latitude``/``Longitude`` columns, get a null key so they are never
    treated as duplicates of each other.
    """
    if 'Latitude' not in df.columns or 'Longitude' not in df.columns:
        return pd.Series([None] * len(df), index=df.index, dtype=object)
    lat = pd.to_numeric(df['Latitude'], errors='coerce').round(4)
    lon = pd.to_numeric(df['Longitude'], errors='coerce').round(4)
    keys = lat.astype(str) + '_' + lon.astype(str)
    return keys.where(lat.notna() & lon.notna())


def get_combined_sites_database() -> pd.DataFrame:
    """Combine the bounding-box and main site databases into one table.

    Bounding-box rows come first, so when both sources contain a site at the
    same coordinates (compared after rounding to 4 decimal places) the
    bounding-box row is the one kept.  Rows without usable coordinates are
    always kept, because they cannot be matched reliably.

    Returns:
        The combined table; an empty DataFrame when both sources are empty;
        or the single non-empty source when the other one is empty.
    """
    main_df = load_sites_database()
    bbox_df = load_bbox_sites_database()

    if main_df.empty and bbox_df.empty:
        return pd.DataFrame()
    if main_df.empty:
        return bbox_df
    if bbox_df.empty:
        return main_df

    combined = pd.concat([bbox_df, main_df], ignore_index=True)
    try:
        # Keys are kept in a separate Series (same row order as `combined`)
        # so no helper column is ever added to the returned frames.
        keys = pd.concat(
            [_coordinate_merge_keys(bbox_df), _coordinate_merge_keys(main_df)],
            ignore_index=True,
        )
        is_duplicate = keys.notna() & keys.duplicated(keep='first')
        return combined[~is_duplicate]
    except Exception:
        # Best-effort de-duplication: if keys cannot be built, return the
        # plain concatenation instead of failing.
        return combined
|
|
|
|
def search_site_by_coordinates(lat: float, lon: float, tolerance: float = 0.001) -> Optional[Dict[str, Any]]:
    """Find the site closest to (lat, lon) within ``tolerance`` degrees.

    The bounding-box database is consulted first; only if it yields nothing
    is the main sites database searched.  A site matches when both its
    latitude and longitude differ from the query by at most ``tolerance``;
    among matches, the one with the smallest summed difference is returned
    as a dict.  Returns None when nothing matches.
    """
    bbox_match = get_site_bounding_box(lat, lon, tolerance)
    if bbox_match:
        return bbox_match

    sites = load_sites_database()
    if sites.empty:
        return None
    if 'Latitude' not in sites.columns or 'Longitude' not in sites.columns:
        return None

    d_lat = (sites['Latitude'] - lat).abs()
    d_lon = (sites['Longitude'] - lon).abs()
    within = (d_lat <= tolerance) & (d_lon <= tolerance)
    if not within.any():
        return None

    # Rank the candidates by the sum of the two coordinate differences.
    nearest = (d_lat + d_lon)[within].idxmin()
    return sites.loc[nearest].to_dict()
|
|
|
|
def search_site_by_name_or_id(query: str) -> Optional[Dict[str, Any]]:
    """Search for a site by name, ID or developer in both databases.

    The bounding-box database is searched first (by site name).  If it has no
    match, the ``SiteName``, ``Id`` and ``Developer`` columns of the main
    database are searched in that order, and the first row containing the
    query is returned.

    Matching is a case-insensitive *literal* substring test, so characters
    such as ``(``, ``+`` or ``.`` in a site name are matched as typed instead
    of being interpreted as a regular expression.

    Args:
        query: Text to look for.  A blank query matches nothing.

    Returns:
        The matching row as a dict, or None when nothing matches.
    """
    # An empty needle is contained in every string; treat it as "no match"
    # rather than returning an arbitrary first row.
    if query is None or not str(query).strip():
        return None
    needle = str(query)

    bbox_site = search_bbox_site_by_name(query)
    if bbox_site:
        return bbox_site

    sites_df = load_sites_database()
    if sites_df.empty:
        return None

    for col in ('SiteName', 'Id', 'Developer'):
        if col not in sites_df.columns:
            continue
        mask = sites_df[col].astype(str).str.contains(
            needle, case=False, na=False, regex=False)
        matches = sites_df[mask]
        if len(matches) > 0:
            return matches.iloc[0].to_dict()

    return None
|
|
|
|
def get_all_sites() -> pd.DataFrame:
    """Return the full sites table as provided by ``load_sites_database``."""
    all_sites = load_sites_database()
    return all_sites
|
|
|
|
def validate_sites_database_format(df: pd.DataFrame) -> Dict[str, Any]:
    """Check a sites table for the expected set of columns.

    Returns:
        A dict with ``is_valid`` (True when nothing is missing),
        ``missing_columns`` and ``present_columns`` (both in the order of the
        expected-column list), ``total_rows`` and ``columns`` (all column
        labels of ``df``).
    """
    expected = (
        'Id', 'Developer', 'SiteName', 'COD', 'Latitude',
        'Longitude', 'Country', 'Technology Type', 'Capacity',
    )
    present = [name for name in expected if name in df.columns]
    missing = [name for name in expected if name not in df.columns]

    return {
        'is_valid': not missing,
        'missing_columns': missing,
        'present_columns': present,
        'total_rows': len(df),
        'columns': list(df.columns),
    }
|
|
|
|
def get_database_summary() -> Dict[str, Any]:
    """Summarise the sites database together with file-search diagnostics.

    Returns:
        When the database is empty: ``status='empty'``, a message and the
        file diagnostics.  Otherwise: ``status='loaded'``, the site count,
        the column validation result, the file diagnostics, per-value counts
        for Country / Developer / Technology Type (when those columns exist)
        and basic capacity statistics (when a Capacity column exists).
    """
    sites = load_sites_database()
    diagnostics = get_database_file_status()

    if sites.empty:
        return {
            'status': 'empty',
            'message': 'No sites database found',
            **diagnostics,
        }

    summary = {
        'status': 'loaded',
        'total_sites': len(sites),
        'validation': validate_sites_database_format(sites),
        **diagnostics,
    }

    # Value counts per categorical column, stored under the given summary key.
    for column, key in (
        ('Country', 'countries'),
        ('Developer', 'developers'),
        ('Technology Type', 'technologies'),
    ):
        if column in sites.columns:
            summary[key] = sites[column].value_counts().to_dict()

    if 'Capacity' in sites.columns:
        try:
            # Non-numeric capacities become NaN and are skipped by the stats.
            numeric = pd.to_numeric(sites['Capacity'], errors='coerce')
            summary['capacity_stats'] = {
                'total_capacity': numeric.sum(),
                'avg_capacity': numeric.mean(),
                'min_capacity': numeric.min(),
                'max_capacity': numeric.max(),
            }
        except Exception:
            pass

    return summary
|
|
|
|
def filter_sites_by_criteria(country: str = None, developer: str = None,
                             technology: str = None, min_capacity: float = None,
                             max_capacity: float = None) -> pd.DataFrame:
    """Filter the sites database by text and capacity criteria.

    All supplied criteria must hold for a row to be kept.  Criteria whose
    column is absent from the database are ignored.

    Args:
        country: Case-insensitive pattern searched in ``Country``.
        developer: Case-insensitive pattern searched in ``Developer``.
        technology: Case-insensitive pattern searched in ``Technology Type``.
        min_capacity: Inclusive lower bound on ``Capacity``.
        max_capacity: Inclusive upper bound on ``Capacity``.

    Text patterns are passed to pandas ``str.contains`` with its default
    regular-expression interpretation.  Rows with a missing value in a
    filtered text column, or a non-numeric capacity when a capacity bound is
    given, are excluded.

    Returns:
        A filtered copy of the sites table (the empty table is returned
        as-is).
    """
    sites_df = load_sites_database()
    if sites_df.empty:
        return sites_df

    # One boolean mask on the original index is accumulated and applied once,
    # so no mask is ever applied to an already-filtered frame.
    keep = pd.Series(True, index=sites_df.index)

    for column, pattern in (
        ('Country', country),
        ('Developer', developer),
        ('Technology Type', technology),
    ):
        if pattern and column in sites_df.columns:
            values = sites_df[column]
            # astype(str) lets the .str accessor work on non-string columns;
            # notna() stops missing values matching via their "nan" text.
            keep &= values.notna() & values.astype(str).str.contains(
                pattern, case=False, na=False)

    has_bound = min_capacity is not None or max_capacity is not None
    if has_bound and 'Capacity' in sites_df.columns:
        capacities = pd.to_numeric(sites_df['Capacity'], errors='coerce')
        if min_capacity is not None:
            keep &= capacities >= min_capacity
        if max_capacity is not None:
            keep &= capacities <= max_capacity

    return sites_df[keep].copy()
|
|
|
|
def filter_and_sort_sites_by_capacity(ascending: bool = True, min_capacity: float = None,
                                      max_capacity: float = None, exclude_incomplete: bool = False) -> pd.DataFrame:
    """Return sites ordered by capacity, optionally bounded and COD-filtered.

    When a ``Capacity`` column exists, rows with a non-numeric capacity are
    dropped, the inclusive ``min_capacity`` / ``max_capacity`` bounds are
    applied, and the rows are sorted by numeric capacity.  With
    ``exclude_incomplete`` set (and a ``COD`` column present) the result is
    further reduced to completed projects.
    """
    sites = load_sites_database()
    if sites.empty:
        return sites

    result = sites.copy()

    if 'Capacity' in result.columns:
        # Temporary numeric view of the capacity, removed again after sorting.
        result['Capacity_numeric'] = pd.to_numeric(
            result['Capacity'], errors='coerce')

        keep = result['Capacity_numeric'].notna()
        if min_capacity is not None:
            keep &= result['Capacity_numeric'] >= min_capacity
        if max_capacity is not None:
            keep &= result['Capacity_numeric'] <= max_capacity

        result = (
            result[keep]
            .sort_values('Capacity_numeric', ascending=ascending)
            .drop(columns='Capacity_numeric')
        )

    if exclude_incomplete and 'COD' in result.columns:
        result = filter_completed_projects(result)

    return result
|
|
|
|
# Explicit COD formats, tried in this order; for ambiguous values such as
# "01/02/2020" the earlier (day-first) format wins.
_COD_DATE_FORMATS = ('%d/%m/%Y', '%m/%d/%Y', '%Y-%m-%d', '%d-%m-%Y')


def _parse_cod_dates(cod: pd.Series) -> pd.Series:
    """Parse COD values to datetimes, trying each known format per row.

    Every format is applied only to the rows that are still unparsed, so a
    column mixing several formats is parsed row by row instead of being
    forced into whichever format happens to match first.  Rows that match no
    explicit format get one last attempt with pandas' generic parser;
    anything still unparsed is NaT.
    """
    # Work on a positional index so the fills align unambiguously even when
    # the caller's index has repeated labels.
    raw = cod.reset_index(drop=True)
    parsed = pd.to_datetime(raw, format=_COD_DATE_FORMATS[0], errors='coerce')

    for date_format in _COD_DATE_FORMATS[1:]:
        pending = parsed.isna()
        if not pending.any():
            break
        parsed = parsed.fillna(
            pd.to_datetime(raw[pending], format=date_format, errors='coerce'))

    pending = parsed.isna()
    if pending.any():
        parsed = parsed.fillna(pd.to_datetime(raw[pending], errors='coerce'))

    parsed.index = cod.index
    return parsed


def filter_completed_projects(sites_df: pd.DataFrame = None) -> pd.DataFrame:
    """Keep only projects whose COD (Commercial Operation Date) has passed.

    Args:
        sites_df: Table to filter.  When None, the sites database is loaded.

    Returns:
        A copy containing the rows whose COD parses to a date that is not
        later than the current time.  Rows with a missing or unparseable COD
        are excluded.  If ``sites_df`` is empty or has no ``COD`` column it
        is returned unchanged.  If date handling fails, a Streamlit warning
        is shown and an unfiltered copy is returned.  The input frame is
        never modified.
    """
    if sites_df is None:
        sites_df = load_sites_database()

    if sites_df.empty or 'COD' not in sites_df.columns:
        return sites_df

    try:
        cod_dates = _parse_cod_dates(sites_df['COD'])
        current_date = pd.Timestamp.now()
        completed = cod_dates.notna() & (cod_dates <= current_date)
    except Exception as e:
        st.warning(
            f"Error parsing COD dates: {str(e)}. Returning all projects.")
        return sites_df.copy()

    return sites_df[completed].copy()
|
|
|
|
def filter_by_cod_range(start_date: str = None, end_date: str = None,
                        date_format: str = '%d/%m/%Y') -> pd.DataFrame:
    """Select sites whose COD lies within an inclusive date range.

    Args:
        start_date: Earliest COD to keep, written in ``date_format``.
        end_date: Latest COD to keep, written in ``date_format``.
        date_format: strptime format used for both the COD column and the
            two bounds.

    Returns:
        The matching rows (rows whose COD does not parse are dropped).  The
        unfiltered table is returned when it is empty, has no ``COD`` column,
        or when filtering raises (a Streamlit warning is shown in that case).
    """
    sites = load_sites_database()
    if sites.empty or 'COD' not in sites.columns:
        return sites

    try:
        work = sites.copy()
        # Temporary parsed-date column, dropped again before returning.
        work['COD_date'] = pd.to_datetime(
            work['COD'], format=date_format, errors='coerce')

        keep = work['COD_date'].notna()
        if start_date:
            lower = pd.to_datetime(start_date, format=date_format)
            keep &= work['COD_date'] >= lower
        if end_date:
            upper = pd.to_datetime(end_date, format=date_format)
            keep &= work['COD_date'] <= upper

        return work[keep].drop(columns='COD_date')
    except Exception as err:
        st.warning(
            f"Error filtering by COD range: {str(err)}. Returning original data.")
        return sites
|
|
|
|
def get_capacity_statistics() -> Dict[str, Any]:
    """Compute descriptive statistics over the numeric capacities.

    Returns:
        ``status='success'`` with count, sum, mean, median, min, max,
        standard deviation and quartiles of the numeric ``Capacity`` values;
        ``status='no_data'`` when the table is empty or has no ``Capacity``
        column; ``status='no_valid_data'`` when no value is numeric; or
        ``status='error'`` with a message if the computation raises.
    """
    sites = load_sites_database()
    if sites.empty or 'Capacity' not in sites.columns:
        return {'status': 'no_data', 'message': 'No capacity data available'}

    try:
        valid = pd.to_numeric(sites['Capacity'], errors='coerce').dropna()
        if valid.empty:
            return {'status': 'no_valid_data', 'message': 'No valid capacity values found'}

        quartiles = {
            label: float(valid.quantile(fraction))
            for label, fraction in (('25%', 0.25), ('50%', 0.50), ('75%', 0.75))
        }
        return {
            'status': 'success',
            'total_projects': len(valid),
            'total_capacity_kwp': float(valid.sum()),
            'average_capacity_kwp': float(valid.mean()),
            'median_capacity_kwp': float(valid.median()),
            'min_capacity_kwp': float(valid.min()),
            'max_capacity_kwp': float(valid.max()),
            'std_capacity_kwp': float(valid.std()),
            'capacity_quartiles': quartiles,
        }
    except Exception as err:
        return {'status': 'error', 'message': f'Error calculating statistics: {str(err)}'}
|
|
|
|
def get_completion_status_summary() -> Dict[str, Any]:
    """Count completed versus incomplete projects using the COD column.

    Returns:
        ``status='success'`` with total, completed and incomplete counts plus
        the completion rate in percent (rounded to 2 decimals);
        ``status='no_data'`` when the table is empty or lacks a ``COD``
        column; or ``status='error'`` with a message if the analysis raises.
    """
    sites = load_sites_database()
    if sites.empty or 'COD' not in sites.columns:
        return {'status': 'no_data', 'message': 'No COD data available'}

    try:
        done = len(filter_completed_projects(sites))
        total = len(sites)
        rate = done / total * 100 if total > 0 else 0

        return {
            'status': 'success',
            'total_projects': total,
            'completed_projects': done,
            'incomplete_projects': total - done,
            'completion_rate_percent': round(rate, 2),
        }
    except Exception as err:
        return {'status': 'error', 'message': f'Error analyzing completion status: {str(err)}'}
|
|
|
|
def export_results_to_csv(results: list, filename: str) -> str:
    """Serialise validation results to CSV text.

    Args:
        results: Records to export; they are turned into a DataFrame.
        filename: Accepted for interface compatibility; this function does
            not use it and writes no file.

    Returns:
        The CSV text (without an index column), or the message
        ``"No results to export"`` when ``results`` is empty.
    """
    if results:
        return pd.DataFrame(results).to_csv(index=False)
    return "No results to export"
|
|
|
|
def get_unique_values(column: str) -> list:
    """List the distinct non-null values of one sites-database column, sorted.

    Returns an empty list when the database is empty or the column does not
    exist.
    """
    sites = load_sites_database()
    if sites.empty or column not in sites.columns:
        return []

    distinct = sites[column].dropna().unique().tolist()
    distinct.sort()
    return distinct
|
|
|
|
def get_site_bounding_box(lat: float, lon: float, tolerance: float = 0.001) -> Optional[Dict[str, Any]]:
    """
    Look up the bounding-box record of the site nearest to a coordinate.

    Args:
        lat: Latitude of the site
        lon: Longitude of the site
        tolerance: Maximum allowed difference, in degrees, on each axis

    Returns:
        The closest matching row as a dict with an added ``has_bbox`` flag.
        When the row has ``north``, ``south``, ``east`` and ``west`` entries,
        ``has_bbox`` is True and a ``bbox`` dict with those four values is
        added.  None when the database is empty, lacks coordinate columns,
        or no row lies within the tolerance.
    """
    bbox_sites = load_bbox_sites_database()
    if bbox_sites.empty:
        return None
    if 'Latitude' not in bbox_sites.columns or 'Longitude' not in bbox_sites.columns:
        return None

    d_lat = (bbox_sites['Latitude'] - lat).abs()
    d_lon = (bbox_sites['Longitude'] - lon).abs()
    within = (d_lat <= tolerance) & (d_lon <= tolerance)
    if not within.any():
        return None

    # Among rows inside the tolerance box, take the smallest summed offset.
    nearest = (d_lat + d_lon)[within].idxmin()
    record = bbox_sites.loc[nearest].to_dict()

    edges = ('north', 'south', 'east', 'west')
    if all(edge in record for edge in edges):
        record['has_bbox'] = True
        record['bbox'] = {edge: record[edge] for edge in edges}
    else:
        record['has_bbox'] = False

    return record
|
|
|
|
def search_bbox_site_by_name(site_name: str) -> Optional[Dict[str, Any]]:
    """
    Search the bounding-box database for a site by name.

    Matching is a case-insensitive *literal* substring test against the
    ``SiteName`` column, so characters such as ``(`` or ``+`` are matched as
    typed rather than being interpreted as a regular expression.

    Args:
        site_name: Name (or part of a name) to search for.  A blank value
            matches nothing.

    Returns:
        The first matching row as a dict with an added ``has_bbox`` flag.
        When the row has ``north``, ``south``, ``east`` and ``west`` entries,
        ``has_bbox`` is True and a ``bbox`` dict with those four values is
        added.  None when nothing matches, the database is empty, or it has
        no ``SiteName`` column.
    """
    # An empty needle is contained in every string; treat it as "no match"
    # rather than returning an arbitrary first row.
    if site_name is None or not str(site_name).strip():
        return None

    bbox_df = load_bbox_sites_database()
    if bbox_df.empty or 'SiteName' not in bbox_df.columns:
        return None

    mask = bbox_df['SiteName'].astype(str).str.contains(
        str(site_name), case=False, na=False, regex=False)
    matches = bbox_df[mask]
    if len(matches) == 0:
        return None

    site_info = matches.iloc[0].to_dict()

    edges = ('north', 'south', 'east', 'west')
    if all(key in site_info for key in edges):
        site_info['has_bbox'] = True
        site_info['bbox'] = {key: site_info[key] for key in edges}
    else:
        site_info['has_bbox'] = False

    return site_info
|
|
|
|
def get_all_bbox_sites() -> pd.DataFrame:
    """Return the table provided by ``load_bbox_sites_database`` unchanged."""
    all_bbox_sites = load_bbox_sites_database()
    return all_bbox_sites
|
|
|
|
def get_bbox_sites_summary() -> Dict[str, Any]:
    """Get summary statistics of the bounding box sites database.

    Returns:
        ``status='empty'`` with a message when no rows are available.
        Otherwise ``status='loaded'`` with:

        * ``total_sites``: number of rows;
        * ``has_bbox_data``: whether the ``north``, ``south``, ``east`` and
          ``west`` columns are all present;
        * ``countries`` / ``total_countries``: per-country row counts and the
          number of distinct countries (missing values are excluded from
          both, so the two always agree), when a ``Country`` column exists;
        * ``capacity_stats``: sum, mean, min and max of the numeric
          capacities as plain floats, when a ``Capacity`` column exists and
          can be converted.
    """
    bbox_df = load_bbox_sites_database()

    if bbox_df.empty:
        return {'status': 'empty', 'message': 'No bounding box sites database found'}

    summary = {
        'status': 'loaded',
        'total_sites': len(bbox_df),
        'has_bbox_data': all(
            col in bbox_df.columns for col in ('north', 'south', 'east', 'west')),
    }

    if 'Country' in bbox_df.columns:
        summary['countries'] = bbox_df['Country'].value_counts().to_dict()
        # nunique() skips missing values, exactly like value_counts() above.
        summary['total_countries'] = int(bbox_df['Country'].nunique())

    if 'Capacity' in bbox_df.columns:
        try:
            # Non-numeric capacities become NaN and are skipped by the stats.
            capacities = pd.to_numeric(bbox_df['Capacity'], errors='coerce')
            summary['capacity_stats'] = {
                'total_capacity_kw': float(capacities.sum()),
                'avg_capacity_kw': float(capacities.mean()),
                'min_capacity_kw': float(capacities.min()),
                'max_capacity_kw': float(capacities.max()),
            }
        except (TypeError, ValueError):
            # Capacity statistics are optional: if the column cannot be
            # converted, the summary is returned without them.
            pass

    return summary
|
|
|
|
def calculate_bbox_area(north: float, south: float, east: float, west: float) -> float:
    """
    Calculate the approximate area of a bounding box in square meters.

    The box is treated as a flat rectangle: one degree of latitude is taken
    as 111320 m, and one degree of longitude as 111320 m scaled by the cosine
    of the box's mid latitude (meridians converge towards the poles).

    Args:
        north, south, east, west: Bounding box coordinates in degrees

    Returns:
        Area in square meters (approximate); 0 for a degenerate box
    """
    mid_lat = (north + south) / 2

    meters_per_degree = 111320
    lat_to_m = meters_per_degree
    # Longitude degrees shrink with cos(latitude); abs() keeps the width
    # non-negative even for out-of-range latitudes.
    lon_to_m = meters_per_degree * abs(math.cos(math.radians(mid_lat)))

    height_m = abs(north - south) * lat_to_m
    width_m = abs(east - west) * lon_to_m

    return height_m * width_m
|
|