""" Data utilities for handling site databases and CSV operations. """ import pandas as pd import streamlit as st from typing import Optional, Dict, Any import os def _database_search_roots(): """Return directories that may contain deployment data files.""" module_dir = os.path.dirname(os.path.abspath(__file__)) return [ os.getcwd(), module_dir, os.path.join(module_dir, 'solarseg'), '/app', ] def _candidate_paths(*candidates: str): """Return all candidate file paths checked at runtime.""" paths = [] for root in _database_search_roots(): for candidate in candidates: paths.append(os.path.join(root, candidate)) return paths def _resolve_existing_path(*candidates: str) -> Optional[str]: """Return first existing path from known runtime locations.""" for path in _candidate_paths(*candidates): if os.path.exists(path): return path return None def get_database_file_status() -> Dict[str, Any]: """Return search diagnostics for built-in site database files.""" search_files = ('PT_sitesAll.csv', 'site_bounds.csv', 'solar_sites_bboxes.csv') searched_paths = _candidate_paths(*search_files) available_files = [path for path in searched_paths if os.path.exists(path)] return { 'search_files': list(search_files), 'searched_paths': searched_paths, 'available_files': available_files, } @st.cache_data(ttl=None) def load_sites_database() -> pd.DataFrame: """Load sites database from CSV file with caching and column mapping""" # Primary database: PowerTrust sites with full capacity data sites_file = _resolve_existing_path('PT_sitesAll.csv', 'site_bounds.csv') if sites_file: try: df = pd.read_csv(sites_file) print(f"✓ Loaded {len(df)} sites from {sites_file}") # Drop unnamed index column if present if 'Unnamed: 0' in df.columns: df = df.drop(columns=['Unnamed: 0']) # Map columns to expected names (handle various conventions) column_mapping = { # Common variations for site ID 'site_id': 'Id', 'id': 'Id', # Common variations for site name 'site_name': 'SiteName', 'name': 'SiteName', 'sitename': 'SiteName', # Other fields 'developer': 'Developer', 'country': 'Country', 'capacity_kw': 'Capacity', 'capacity': 'Capacity', 'cod': 'COD', # Common variations for coordinates 'latitude': 'Latitude', 'lat': 'Latitude', 'longitude': 'Longitude', 'lng': 'Longitude', 'lon': 'Longitude' } # Apply mapping (case-insensitive) existing_cols = {col.lower(): col for col in df.columns} rename_dict = {} for src, dst in column_mapping.items(): if src.lower() in existing_cols and dst not in df.columns: rename_dict[existing_cols[src.lower()]] = dst df = df.rename(columns=rename_dict) # Generate ID column if missing (use index) if 'Id' not in df.columns: df['Id'] = df.index # Add missing columns with default values if 'COD' not in df.columns: df['COD'] = '' if 'Technology Type' not in df.columns: df['Technology Type'] = 'Solar PV' if 'Capacity' not in df.columns: df['Capacity'] = 0.0 return df except Exception as e: st.warning(f"Error reading {sites_file}: {str(e)}") else: print("✗ Sites database file not found in known locations") return pd.DataFrame() # Return empty DataFrame if no file found @st.cache_data def load_bbox_sites_database() -> pd.DataFrame: """Load sites database with bounding box information from CSV file with caching""" # Try bbox-specific file first, fall back to main database bbox_file = _resolve_existing_path('solar_sites_bboxes.csv') if bbox_file: try: df = pd.read_csv(bbox_file) # Standardize column names for consistency if 'original_lat' in df.columns and 'original_lon' in df.columns: df['Latitude'] = df['original_lat'] df['Longitude'] = df['original_lon'] if 'capacity_kw' in df.columns: df['Capacity'] = df['capacity_kw'] if 'site' in df.columns: df['SiteName'] = df['site'] if 'country' in df.columns: df['Country'] = df['country'] return df except Exception as e: st.warning(f"Error reading {bbox_file}: {str(e)}") # Fall back to main database return load_sites_database() def get_combined_sites_database() -> pd.DataFrame: """Get combined sites database from both sources""" main_df = load_sites_database() bbox_df = load_bbox_sites_database() if main_df.empty and bbox_df.empty: return pd.DataFrame() elif main_df.empty: return bbox_df elif bbox_df.empty: return main_df # Combine both databases, prioritizing bbox data for sites that exist in both # Use outer join to include all sites try: # Create merge key based on coordinates (rounded to avoid floating point issues) if 'Latitude' in main_df.columns and 'Longitude' in main_df.columns: main_df['_merge_key'] = (main_df['Latitude'].round(4).astype(str) + '_' + main_df['Longitude'].round(4).astype(str)) if 'Latitude' in bbox_df.columns and 'Longitude' in bbox_df.columns: bbox_df['_merge_key'] = (bbox_df['Latitude'].round(4).astype(str) + '_' + bbox_df['Longitude'].round(4).astype(str)) # Merge, keeping all bbox sites and adding any additional main_df sites combined = pd.concat([bbox_df, main_df], ignore_index=True) combined = combined.drop_duplicates( subset=['_merge_key'], keep='first') combined = combined.drop(columns=['_merge_key']) return combined except Exception: # If merge fails, just concatenate return pd.concat([bbox_df, main_df], ignore_index=True) return main_df def search_site_by_coordinates(lat: float, lon: float, tolerance: float = 0.001) -> Optional[Dict[str, Any]]: """Search for site in database by coordinates within tolerance - checks both databases""" # First check bbox database (more precise data) bbox_site = get_site_bounding_box(lat, lon, tolerance) if bbox_site: return bbox_site # Fall back to main database sites_df = load_sites_database() if sites_df.empty: return None # Ensure required columns exist required_columns = ['Latitude', 'Longitude'] if not all(col in sites_df.columns for col in required_columns): return None # Find sites within tolerance lat_diff = abs(sites_df['Latitude'] - lat) lon_diff = abs(sites_df['Longitude'] - lon) mask = (lat_diff <= tolerance) & (lon_diff <= tolerance) matching_sites = sites_df[mask] if len(matching_sites) > 0: # Return the closest match distances = lat_diff + lon_diff closest_idx = distances[mask].idxmin() return sites_df.loc[closest_idx].to_dict() return None def search_site_by_name_or_id(query: str) -> Optional[Dict[str, Any]]: """Search for site by name or ID - checks both databases""" # First check bbox database bbox_site = search_bbox_site_by_name(query) if bbox_site: return bbox_site # Fall back to main database sites_df = load_sites_database() if sites_df.empty: return None # Search in multiple columns search_columns = ['SiteName', 'Id', 'Developer'] for col in search_columns: if col in sites_df.columns: mask = sites_df[col].astype(str).str.contains( query, case=False, na=False) matches = sites_df[mask] if len(matches) > 0: return matches.iloc[0].to_dict() return None def get_all_sites() -> pd.DataFrame: """Get all sites from database""" return load_sites_database() def validate_sites_database_format(df: pd.DataFrame) -> Dict[str, Any]: """Validate the format of sites database""" required_columns = [ 'Id', 'Developer', 'SiteName', 'COD', 'Latitude', 'Longitude', 'Country', 'Technology Type', 'Capacity' ] missing_columns = [] present_columns = [] for col in required_columns: if col in df.columns: present_columns.append(col) else: missing_columns.append(col) return { 'is_valid': len(missing_columns) == 0, 'missing_columns': missing_columns, 'present_columns': present_columns, 'total_rows': len(df), 'columns': list(df.columns) } def get_database_summary() -> Dict[str, Any]: """Get summary statistics of the sites database""" sites_df = load_sites_database() file_status = get_database_file_status() if sites_df.empty: return { 'status': 'empty', 'message': 'No sites database found', **file_status, } validation = validate_sites_database_format(sites_df) summary = { 'status': 'loaded', 'total_sites': len(sites_df), 'validation': validation, **file_status, } # Add statistics if data is present if 'Country' in sites_df.columns: summary['countries'] = sites_df['Country'].value_counts().to_dict() if 'Developer' in sites_df.columns: summary['developers'] = sites_df['Developer'].value_counts().to_dict() if 'Technology Type' in sites_df.columns: summary['technologies'] = sites_df['Technology Type'].value_counts().to_dict() if 'Capacity' in sites_df.columns: try: capacities = pd.to_numeric(sites_df['Capacity'], errors='coerce') summary['capacity_stats'] = { 'total_capacity': capacities.sum(), 'avg_capacity': capacities.mean(), 'min_capacity': capacities.min(), 'max_capacity': capacities.max() } except Exception: pass return summary def filter_sites_by_criteria(country: str = None, developer: str = None, technology: str = None, min_capacity: float = None, max_capacity: float = None) -> pd.DataFrame: """Filter sites database by various criteria""" sites_df = load_sites_database() if sites_df.empty: return sites_df filtered_df = sites_df.copy() if country and 'Country' in filtered_df.columns: filtered_df = filtered_df[filtered_df['Country'].str.contains( country, case=False, na=False)] if developer and 'Developer' in filtered_df.columns: filtered_df = filtered_df[filtered_df['Developer'].str.contains( developer, case=False, na=False)] if technology and 'Technology Type' in filtered_df.columns: filtered_df = filtered_df[filtered_df['Technology Type'].str.contains( technology, case=False, na=False)] if min_capacity is not None or max_capacity is not None: if 'Capacity' in filtered_df.columns: capacities = pd.to_numeric( filtered_df['Capacity'], errors='coerce') if min_capacity is not None: filtered_df = filtered_df[capacities >= min_capacity] if max_capacity is not None: filtered_df = filtered_df[capacities <= max_capacity] return filtered_df def filter_and_sort_sites_by_capacity(ascending: bool = True, min_capacity: float = None, max_capacity: float = None, exclude_incomplete: bool = False) -> pd.DataFrame: """Filter and sort sites by capacity with optional COD filtering""" sites_df = load_sites_database() if sites_df.empty: return sites_df filtered_df = sites_df.copy() # Convert capacity to numeric for proper filtering and sorting if 'Capacity' in filtered_df.columns: filtered_df['Capacity_numeric'] = pd.to_numeric( filtered_df['Capacity'], errors='coerce') # Filter by capacity range if min_capacity is not None: filtered_df = filtered_df[filtered_df['Capacity_numeric'] >= min_capacity] if max_capacity is not None: filtered_df = filtered_df[filtered_df['Capacity_numeric'] <= max_capacity] # Remove rows with invalid capacity values filtered_df = filtered_df.dropna(subset=['Capacity_numeric']) # Sort by capacity filtered_df = filtered_df.sort_values( 'Capacity_numeric', ascending=ascending) # Remove the temporary numeric column filtered_df = filtered_df.drop('Capacity_numeric', axis=1) # Filter out incomplete projects if requested if exclude_incomplete and 'COD' in filtered_df.columns: filtered_df = filter_completed_projects(filtered_df) return filtered_df def filter_completed_projects(sites_df: pd.DataFrame = None) -> pd.DataFrame: """Filter sites to show only completed projects based on COD (Commercial Operation Date)""" if sites_df is None: sites_df = load_sites_database() if sites_df.empty or 'COD' not in sites_df.columns: return sites_df filtered_df = sites_df.copy() # Convert COD to datetime for proper comparison try: # Try different date formats commonly used date_formats = ['%d/%m/%Y', '%m/%d/%Y', '%Y-%m-%d', '%d-%m-%Y'] for date_format in date_formats: try: filtered_df['COD_date'] = pd.to_datetime( filtered_df['COD'], format=date_format, errors='coerce') # If successful conversion, break if filtered_df['COD_date'].notna().sum() > 0: break except: continue # If no format worked, try pandas automatic parsing if filtered_df['COD_date'].notna().sum() == 0: filtered_df['COD_date'] = pd.to_datetime( filtered_df['COD'], errors='coerce') # Get current date current_date = pd.Timestamp.now() # Filter to show only projects with COD in the past (completed projects) mask = (filtered_df['COD_date'].notna()) & ( filtered_df['COD_date'] <= current_date) completed_projects = filtered_df[mask].copy() # Remove the temporary date column completed_projects = completed_projects.drop('COD_date', axis=1) return completed_projects except Exception as e: st.warning( f"Error parsing COD dates: {str(e)}. Returning all projects.") return filtered_df def filter_by_cod_range(start_date: str = None, end_date: str = None, date_format: str = '%d/%m/%Y') -> pd.DataFrame: """Filter sites by COD date range""" sites_df = load_sites_database() if sites_df.empty or 'COD' not in sites_df.columns: return sites_df filtered_df = sites_df.copy() try: # Convert COD to datetime filtered_df['COD_date'] = pd.to_datetime( filtered_df['COD'], format=date_format, errors='coerce') # Apply date filters if start_date: start_dt = pd.to_datetime(start_date, format=date_format) filtered_df = filtered_df[filtered_df['COD_date'] >= start_dt] if end_date: end_dt = pd.to_datetime(end_date, format=date_format) filtered_df = filtered_df[filtered_df['COD_date'] <= end_dt] # Remove rows with invalid dates and temporary column filtered_df = filtered_df.dropna(subset=['COD_date']) filtered_df = filtered_df.drop('COD_date', axis=1) return filtered_df except Exception as e: st.warning( f"Error filtering by COD range: {str(e)}. Returning original data.") return sites_df def get_capacity_statistics() -> Dict[str, Any]: """Get detailed capacity statistics from the sites database""" sites_df = load_sites_database() if sites_df.empty or 'Capacity' not in sites_df.columns: return {'status': 'no_data', 'message': 'No capacity data available'} try: capacities = pd.to_numeric( sites_df['Capacity'], errors='coerce').dropna() if len(capacities) == 0: return {'status': 'no_valid_data', 'message': 'No valid capacity values found'} stats = { 'status': 'success', 'total_projects': len(capacities), 'total_capacity_kwp': float(capacities.sum()), 'average_capacity_kwp': float(capacities.mean()), 'median_capacity_kwp': float(capacities.median()), 'min_capacity_kwp': float(capacities.min()), 'max_capacity_kwp': float(capacities.max()), 'std_capacity_kwp': float(capacities.std()), 'capacity_quartiles': { '25%': float(capacities.quantile(0.25)), '50%': float(capacities.quantile(0.50)), '75%': float(capacities.quantile(0.75)) } } return stats except Exception as e: return {'status': 'error', 'message': f'Error calculating statistics: {str(e)}'} def get_completion_status_summary() -> Dict[str, Any]: """Get summary of project completion status based on COD""" sites_df = load_sites_database() if sites_df.empty or 'COD' not in sites_df.columns: return {'status': 'no_data', 'message': 'No COD data available'} try: # Get completed and total projects completed_df = filter_completed_projects(sites_df) total_projects = len(sites_df) completed_projects = len(completed_df) incomplete_projects = total_projects - completed_projects # Calculate completion rate completion_rate = (completed_projects / total_projects * 100) if total_projects > 0 else 0 return { 'status': 'success', 'total_projects': total_projects, 'completed_projects': completed_projects, 'incomplete_projects': incomplete_projects, 'completion_rate_percent': round(completion_rate, 2) } except Exception as e: return {'status': 'error', 'message': f'Error analyzing completion status: {str(e)}'} def export_results_to_csv(results: list, filename: str) -> str: """Export validation results to CSV format""" if not results: return "No results to export" # Convert results to DataFrame df = pd.DataFrame(results) # Generate CSV string csv_string = df.to_csv(index=False) return csv_string def get_unique_values(column: str) -> list: """Get unique values from a specific column in the sites database""" sites_df = load_sites_database() if sites_df.empty or column not in sites_df.columns: return [] return sorted(sites_df[column].dropna().unique().tolist()) def get_site_bounding_box(lat: float, lon: float, tolerance: float = 0.001) -> Optional[Dict[str, Any]]: """ Get bounding box information for a site by coordinates Args: lat: Latitude of the site lon: Longitude of the site tolerance: Search tolerance in degrees Returns: Dictionary with bounding box info (north, south, east, west) if found, None otherwise """ bbox_df = load_bbox_sites_database() if bbox_df.empty: return None # Ensure required columns exist if 'Latitude' not in bbox_df.columns or 'Longitude' not in bbox_df.columns: return None # Find sites within tolerance lat_diff = abs(bbox_df['Latitude'] - lat) lon_diff = abs(bbox_df['Longitude'] - lon) mask = (lat_diff <= tolerance) & (lon_diff <= tolerance) matching_sites = bbox_df[mask] if len(matching_sites) > 0: # Return the closest match distances = lat_diff + lon_diff closest_idx = distances[mask].idxmin() site_info = bbox_df.loc[closest_idx].to_dict() # Extract bounding box if available if all(key in site_info for key in ['north', 'south', 'east', 'west']): site_info['has_bbox'] = True site_info['bbox'] = { 'north': site_info['north'], 'south': site_info['south'], 'east': site_info['east'], 'west': site_info['west'] } else: site_info['has_bbox'] = False return site_info return None def search_bbox_site_by_name(site_name: str) -> Optional[Dict[str, Any]]: """ Search for a site with bounding box information by name Args: site_name: Name of the site to search for Returns: Dictionary with site and bounding box info if found, None otherwise """ bbox_df = load_bbox_sites_database() if bbox_df.empty: return None # Search in site name column (case-insensitive) if 'SiteName' in bbox_df.columns: mask = bbox_df['SiteName'].astype(str).str.contains( site_name, case=False, na=False) matches = bbox_df[mask] if len(matches) > 0: site_info = matches.iloc[0].to_dict() # Add bounding box info if all(key in site_info for key in ['north', 'south', 'east', 'west']): site_info['has_bbox'] = True site_info['bbox'] = { 'north': site_info['north'], 'south': site_info['south'], 'east': site_info['east'], 'west': site_info['west'] } else: site_info['has_bbox'] = False return site_info return None def get_all_bbox_sites() -> pd.DataFrame: """Get all sites with bounding box information""" return load_bbox_sites_database() def get_bbox_sites_summary() -> Dict[str, Any]: """Get summary statistics of the bounding box sites database""" bbox_df = load_bbox_sites_database() if bbox_df.empty: return {'status': 'empty', 'message': 'No bounding box sites database found'} summary = { 'status': 'loaded', 'total_sites': len(bbox_df), 'has_bbox_data': all(col in bbox_df.columns for col in ['north', 'south', 'east', 'west']) } # Add statistics if data is present if 'Country' in bbox_df.columns: summary['countries'] = bbox_df['Country'].value_counts().to_dict() summary['total_countries'] = len(bbox_df['Country'].unique()) if 'Capacity' in bbox_df.columns: try: capacities = pd.to_numeric(bbox_df['Capacity'], errors='coerce') summary['capacity_stats'] = { 'total_capacity_kw': capacities.sum(), 'avg_capacity_kw': capacities.mean(), 'min_capacity_kw': capacities.min(), 'max_capacity_kw': capacities.max() } except Exception: pass return summary def calculate_bbox_area(north: float, south: float, east: float, west: float) -> float: """ Calculate approximate area of a bounding box in square meters Args: north, south, east, west: Bounding box coordinates in degrees Returns: Area in square meters (approximate) """ # Approximate conversion at mid-latitude mid_lat = (north + south) / 2 # Degrees to meters conversion (approximate) lat_to_m = 111320 # meters per degree latitude # meters per degree longitude (varies with latitude) lon_to_m = 111320 * abs(mid_lat) height_m = abs(north - south) * lat_to_m width_m = abs(east - west) * lon_to_m return height_m * width_m