import math import sys import textwrap from pathlib import Path import altair as alt import contextily as ctx import geopandas as gpd import matplotlib.dates as mdates import matplotlib.pyplot as plt import numpy as np import pandas as pd import plotly.graph_objects as go import scipy.stats as stats import seaborn as sns import streamlit as st from matplotlib.colors import LinearSegmentedColormap from matplotlib.figure import Figure from osgeo import gdal from plotly.subplots import make_subplots from utils.data_loading import timer COLOR_SCALE = [ "#6D3E91", "#C05917", "#58AC8C", "#286BBB", "#883039", "#BC8E5A", "#00295B", "#C15065", "#18470F", "#9A5129", "#E56E5A", "#A2559C", "#38AABA", "#578145", "#970046", "#00847E", "#B13507", "#4C6A9C", "#CF0A66", "#00875E", "#B16214", "#8C4569", "#3B8E1D", "#D73C50", ] @st.cache_data @timer(include_params=True) def plot_trends_by_station( df: pd.DataFrame, analyte_names: list[str], sample_position: str, figsize=(15, 12) ) -> Figure: """ Create subplots of analyte trends for the given dataframe and analytes. 
Parameters: ----------- df : pandas DataFrame The filtered dataframe containing data for a specific station and position analyte_names : list[str] List of analyte names to plot figsize : tuple Figure size in inches (width, height) """ # Calculate number of rows needed (2 columns) n_rows = (len(analyte_names) + 1) // 2 fig, axes = plt.subplots(n_rows, 2, figsize=figsize) axes = axes.flatten() # Flatten axes array for easier indexing station_number = df["Station_Number"].iloc[0] station_name = df["Name"].iloc[0] if sample_position == "All": sample_position_label = "Surface and Bottom" else: sample_position_label = sample_position for idx, analyte_name in enumerate(analyte_names): ax = axes[idx] data = ( df[df["Org_Analyte_Name"] == analyte_name] .assign( Year=lambda df: ( df["Reporting_Year"] if "Reporting_Year" in df.columns else df["Activity_Start_Date_Time"].dt.year ) ) .dropna(subset=["Org_Result_Value"]) ) if data.empty: ax.text( 0.5, 0.5, f"No data available for {analyte_name}", ha="center", va="center", ) continue # Determine if log scale should be used log_scale_analytes = [ "Turbidity", "Fecal Coliform (MPN)", "Total Nitrogen", "Total Phosphorus", ] log_scale = analyte_name in log_scale_analytes if log_scale: ax.set_yscale("log") ax.yaxis.set_major_formatter(plt.ScalarFormatter()) # type: ignore # Create box plot groups = data.groupby("Year", observed=True) positions = np.array(list(groups.groups.keys())) group_data = [group["Org_Result_Value"] for name, group in groups] ax.boxplot( group_data, positions=positions, widths=0.6, patch_artist=True, boxprops=dict(facecolor="lightblue", color="blue", alpha=0.5), medianprops=dict(color="blue"), whiskerprops=dict(color="blue"), capprops=dict(color="blue"), flierprops=dict(color="blue", markeredgecolor="blue", alpha=0.5), ) # Calculate and plot trend line yearly_means = data.groupby("Year", observed=True)["Org_Result_Value"].mean() X = yearly_means.index.values.reshape(-1, 1) y = yearly_means.values # Plot means 
ax.plot(X, y, "bo-", linewidth=1, markersize=4, label="Annual Mean") # Calculate trend line if len(X) > 1: # Only calculate trend if we have more than one point slope, intercept, r_value, p_value, std_err = stats.linregress(X.ravel(), y) trend_line = slope * X.ravel() + intercept ax.plot(X, trend_line, "r--", alpha=0.8, linewidth=1, label="Trend") # Add statistics stats_text = f"R²={r_value**2:.3f}\np={p_value:.3f}" # type: ignore ax.text( 0.02, 0.98, stats_text, transform=ax.transAxes, verticalalignment="top", bbox=dict(boxstyle="round", facecolor="white", alpha=0.8), parse_math=False, ) # Customize subplot ax.set_title(f"{analyte_name}", pad=15) ax.set_xlabel("Year") analyte_unit = data["Org_Result_Unit"].iloc[0] if analyte_name == "Depth, Secchi Disk Depth": y_label = f"Depth ({analyte_unit})" elif analyte_name == "pH": y_label = None elif analyte_name.startswith("Dissolved"): y_label = f"DO ({analyte_unit})" elif analyte_name.startswith("Fecal Coliform"): y_label = f"Fecal Coliform ({analyte_unit})" else: y_label = f"{analyte_name} ({analyte_unit})" ax.set_ylabel(y_label) ax.grid(True, alpha=0.3) # Add sample sizes for year, group in groups: ax.text( year, ax.get_ylim()[1], f"n={len(group)}", ha="center", va="bottom", fontsize=8, ) # Remove any unused subplots for idx in range(len(analyte_names), len(axes)): fig.delaxes(axes[idx]) # Add overall title with more space fig.suptitle( f"Water Quality Trends for {station_number} - {station_name} - {sample_position_label}", fontsize=14, y=0.95, ) # Adjust layout with more space plt.tight_layout(rect=(0, 0, 1, 0.95)) return fig @timer(include_params=True) def altair_plot_sector_trends( df: pd.DataFrame, analyte_names: list[str] ) -> alt.VConcatChart: """ Create plots of mean annual analyte trends by sector using Altair. 
Parameters: ----------- df : pd.DataFrame Input dataframe analyte_names : list[str] List of analytes to plot Returns: -------- alt.VConcatChart Vertically concatenated Altair charts for each analyte """ # Custom color scheme matching the matplotlib version color_scale = alt.Scale( domain=df["Sector"].unique().tolist(), range=[ "#1f77b4", # blue "#ff7f0e", # orange "#2ca02c", # green "#d62728", # red "#9467bd", # purple "#8c564b", # brown "#e377c2", # pink "#7f7f7f", # gray ], ) charts = [] for analyte_name in analyte_names: # Filter data for current analyte analyte_data = df[df["Org_Analyte_Name"] == analyte_name].copy() # For Salinity, exclude Fresh Water Lakes if analyte_name == "Salinity": analyte_data = analyte_data[analyte_data["Sector"] != "Fresh Water Lakes"] # Calculate annual means and standard errors using Reporting_Year processed_data = ( analyte_data.groupby(["Reporting_Year", "Sector"], observed=True)[ "Org_Result_Value" ] .agg(["mean", "sem"]) .reset_index() .rename(columns={"mean": "Mean", "sem": "SE"}) ) # Add confidence interval bounds processed_data["Upper"] = processed_data["Mean"] + processed_data["SE"] processed_data["Lower"] = processed_data["Mean"] - processed_data["SE"] # Get the unit for the y-axis label unit = analyte_data["Org_Result_Unit"].iloc[0] if not analyte_data.empty else "" # Determine if log scale should be used use_log_scale = analyte_name in [ "Turbidity", "Fecal Coliform (MPN)", "Total Nitrogen", "Total Phosphorus", ] # Create base chart base = alt.Chart(processed_data).encode( x=alt.X("Reporting_Year:O", axis=alt.Axis(title=None)), color=alt.Color("Sector:N", scale=color_scale), tooltip=[ alt.Tooltip("Reporting_Year:O"), alt.Tooltip("Sector:N"), alt.Tooltip("Mean:Q", format=".2f"), alt.Tooltip("SE:Q", format=".2f"), ], ) # Create line and point layers lines = base.mark_line().encode( y=alt.Y( "Mean:Q", title=f"({unit})", scale=alt.Scale(type="log" if use_log_scale else "linear"), ) ) points = 
base.mark_point(size=50).encode(y=alt.Y("Mean:Q")) # Create confidence interval area area = base.mark_area(opacity=0.15).encode( y=alt.Y("Lower:Q"), y2=alt.Y2("Upper:Q") ) # Combine layers chart = ( (area + lines + points) .properties( width=600, height=300, title=alt.TitleParams(text=analyte_name, anchor="middle", fontSize=14), ) .interactive() ) charts.append(chart) # Combine all charts vertically final_chart = alt.vconcat(*charts).configure( view={"strokeWidth": 0}, axis={"grid": True, "gridOpacity": 0.2} ) return final_chart def plotly_plot_analyte_trends(df: pd.DataFrame, analyte_names: list[str]) -> go.Figure: """ Create subplots of analyte trends using Plotly for the given dataframe and analytes. Parameters: ----------- df : pandas DataFrame The filtered dataframe containing data for a specific station and position analyte_names : list[str] List of analyte names to plot Returns: -------- go.Figure Plotly figure containing the subplots """ # Calculate number of rows needed (2 columns) n_rows = (len(analyte_names) + 1) // 2 # Create subplot figure fig = make_subplots( rows=n_rows, cols=2, subplot_titles=analyte_names, vertical_spacing=0.12, horizontal_spacing=0.1, ) station_number = df["Station_Number"].iloc[0] sample_position = df["Sample_Position"].iloc[0] for idx, analyte_name in enumerate(analyte_names): row = idx // 2 + 1 col = idx % 2 + 1 data = ( df[df["Org_Analyte_Name"] == analyte_name] .assign(Year=lambda df: df["Activity_Start_Date_Time"].dt.year) .dropna(subset=["Org_Result_Value"]) ) if data.empty: fig.add_annotation( text=f"No data available for {analyte_name}", xref=f"x{idx+1}", yref=f"y{idx+1}", x=0.5, y=0.5, showarrow=False, row=row, col=col, ) continue # Determine if log scale should be used log_scale = analyte_name in ["Turbidity", "Fecal Coliform (MPN)"] # Create box plot groups = data.groupby("Year", observed=True) years = list(groups.groups.keys()) # Add box plot fig.add_trace( go.Box( x=data["Year"], y=data["Org_Result_Value"], name="Box 
Plot", boxpoints="outliers", line=dict(color="blue"), fillcolor="lightblue", showlegend=False, ), row=row, col=col, ) # Calculate and plot means yearly_means = data.groupby("Year", observed=True)["Org_Result_Value"].mean() # Add mean line fig.add_trace( go.Scatter( x=years, y=yearly_means.values, mode="lines+markers", name="Annual Mean", line=dict(color="blue"), showlegend=False, ), row=row, col=col, ) # Calculate and add trend line if len(years) > 1: X = np.array(years) y = yearly_means.values slope, intercept, r_value, p_value, std_err = stats.linregress(X, y) trend_line = slope * X + intercept fig.add_trace( go.Scatter( x=years, y=trend_line, mode="lines", name="Trend", line=dict(color="red", dash="dash"), showlegend=False, ), row=row, col=col, ) # Add statistics annotation stats_text = f"R² = {r_value**2:.3f}
p = {p_value:.3f}" # type: ignore fig.add_annotation( text=stats_text, xref=f"x{idx+1}", yref=f"y{idx+1}", x=min(years), # type: ignore y=max(data["Org_Result_Value"]), showarrow=False, bgcolor="white", bordercolor="black", borderwidth=1, row=row, col=col, ) # Add sample size annotations for year, group in groups: fig.add_annotation( text=f"n={len(group)}", x=year, y=max(data["Org_Result_Value"]), showarrow=False, font=dict(size=8), row=row, col=col, ) # Update axes if log_scale: fig.update_yaxes(type="log", row=row, col=col) fig.update_xaxes(title_text="Year", row=row, col=col) fig.update_yaxes( title_text=f'Value ({data["Org_Result_Unit"].iloc[0]})', row=row, col=col ) # Update layout fig.update_layout( title=f"Water Quality Trends
Station {station_number} - {sample_position}", title_x=0.5, showlegend=False, height=300 * n_rows + 100, width=1000, template="plotly_white", ) return fig @timer(include_params=True) def plot_sector_trends( df: pd.DataFrame, analyte_names: list[str], base_height: float = 4 ) -> Figure: """ Create plots of mean annual analyte trends by sector. Parameters: ----------- df : pd.DataFrame Input dataframe analyte_names : list[str] List of analytes to plot base_height : float Height per subplot in inches (default=4) """ # Calculate figure dimensions n_rows = len(analyte_names) fig_height = base_height * n_rows # Create figure with dynamic height fig, axes = plt.subplots(n_rows, 1, figsize=(15, fig_height)) if n_rows == 1: axes = [axes] custom_colors = [ "#1f77b4", # blue "#ff7f0e", # orange "#2ca02c", # green "#d62728", # red "#9467bd", # purple "#8c564b", # brown "#e377c2", # pink "#7f7f7f", # gray ] for idx, analyte_name in enumerate(analyte_names): ax = axes[idx] # Filter data for current analyte analyte_data = df[df["Org_Analyte_Name"] == analyte_name] # For Salinity, exclude Fresh Water Lakes if analyte_name == "Salinity": analyte_data = analyte_data[analyte_data["Sector"] != "Freshwater Lakes"] # Plot each sector with custom colors for sector, color in zip(df["Sector"].unique(), custom_colors): sector_data = ( analyte_data[analyte_data["Sector"] == sector] .groupby("Reporting_Year", observed=True)["Org_Result_Value"] .agg(["mean", "sem"]) .reset_index() ) if not sector_data.empty: # Plot mean line with error bands ax.plot( sector_data["Reporting_Year"], sector_data["mean"], "-o", color=color, label=sector, markersize=4, linewidth=2, ) # Add error bands with slightly reduced opacity ax.fill_between( sector_data["Reporting_Year"], sector_data["mean"] - sector_data["sem"], sector_data["mean"] + sector_data["sem"], color=color, alpha=0.15, # Reduced opacity for better visibility ) # Set x-axis to show only whole years years = 
sorted(analyte_data["Reporting_Year"].unique()) ax.set_xticks(years) ax.set_xticklabels(years) # Customize subplot with lighter titles and no x-label ax.set_title(analyte_name, pad=10, fontsize=11, fontweight="normal") ax.set_xlabel("") if not analyte_data.empty: analyte_unit = analyte_data["Org_Result_Unit"].iloc[0] ax.set_ylabel(f"({analyte_unit})", fontsize=10) # Improve grid appearance ax.grid(True, alpha=0.2, linestyle="--") ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) # Simplified legend appearance (removed 3D effects) ax.legend( bbox_to_anchor=(1.05, 1), loc="upper left", borderaxespad=0.0, frameon=True, fancybox=False, shadow=False, fontsize=9, ) if analyte_name in [ "Turbidity", "Fecal Coliform (MPN)", "Total Nitrogen", "Total Phosphorus", ]: ax.set_yscale("log") # Adjust layout with more vertical space between subplots plt.tight_layout(rect=(0, 0, 0.85, 1), h_pad=2.0) return fig @st.cache_data @timer(include_params=True) def plot_parameter_correlations( df: pd.DataFrame, analyte_names: list[str], subset_by: str, subset: str, filter_by: str, threshold: float = 0.2, ) -> tuple[Figure, pd.DataFrame]: """ Creates a correlation heatmap showing relationships between water quality parameters, with additional information about data completeness. Parameters ---------- df : pd.DataFrame Input DataFrame containing water quality measurements. 
Must have columns: - Org_Analyte_Name: Name of the analyte - Org_Result_Value: Measurement value - Activity_Start_Date_Time: Timestamp of measurement - Reporting_Year: Year of measurement - Station_Number: Monitoring station identifier - Name: Station name - Sample_Position: Sample depth position (e.g., "Surface", "Bottom") analyte_names : list[str] List of analyte names to include in correlation analysis subset_by : str Column name used for subsetting the data (e.g., "Sector", "Waterbody_Class") subset : str Value within subset_by column to filter data (e.g., specific sector name) filter_by : str Sample position filter ("Surface", "Bottom", or "All") threshold : float, default=0.2 Minimum data completeness threshold (0-1). Parameters with completeness below this threshold will be excluded from correlation analysis but listed in footnote. Returns ------- tuple[Figure, pd.DataFrame] - Figure: Matplotlib figure containing: - Correlation heatmap with values - Title showing subset and sample size - Footnote listing excluded parameters - DataFrame: Pivot table of filtered data used for correlation analysis Notes ----- - Uses abbreviated parameter names for cleaner display (e.g., "DO" for "Dissolved Oxygen") - Masks upper triangle of correlation matrix - Colors correlations using RdBu_r colormap centered at 0 - Includes data completeness information in footnote - Caches results using streamlit cache decorator """ measured_params = ( df[df["Org_Analyte_Name"].isin(analyte_names)] .groupby("Org_Analyte_Name", observed=True) .size() ) # Create pivot table only for measured parameters that were requested pivot_df = df[ df["Org_Analyte_Name"].isin(set(measured_params.index) & set(analyte_names)) ].pivot_table( index="Activity_Start_Date_Time", columns="Org_Analyte_Name", values="Org_Result_Value", observed=False, ) name_mapping = { "Depth, Secchi Disk Depth": "Secchi Depth", "Dissolved Oxygen": "DO", "Fecal Coliform (MPN)": "Fecal Coliform", "Total Nitrogen": "TN", "Total 
Phosphorus": "TP", } # Calculate completeness based on number of measurements completeness = {} for param in measured_params.index: if param in analyte_names and param in pivot_df.columns: total_measurements = measured_params[param] # Use original name to get values from pivot_df valid_values = pivot_df[param].notna().sum() # Store result using new name if it exists new_name = name_mapping.get(param, param) completeness[new_name] = valid_values / total_measurements completeness = pd.Series(completeness) pivot_df = pivot_df.rename(columns=name_mapping) # Calculate data completeness for each parameter completeness = pivot_df.notna().mean() valid_params = completeness[completeness >= threshold].index excluded_params = completeness[completeness < threshold] # Filter pivot_df to only include parameters meeting the threshold pivot_df = pivot_df[valid_params] # Calculate correlation matrix corr = pivot_df.corr() # Calculate sample size n_samples = len(df) fig = plt.figure(figsize=(6, 7)) # Adjust gridspec ratios and spacing gs = fig.add_gridspec( 3, 1, height_ratios=[ 1, # Title space 4, # Heatmap 1.5, # Footnote ], hspace=0.4, ) # Add title axes, heatmap axes, and footnote axes title_ax = fig.add_subplot(gs[0]) heatmap_ax = fig.add_subplot(gs[1]) footnote_ax = fig.add_subplot(gs[2]) # Create heatmap mask = np.triu(np.ones_like(corr, dtype=bool)) heatmap = sns.heatmap( corr, mask=mask, annot=True, cmap="RdBu_r", center=0, vmin=-1, vmax=1, ax=heatmap_ax, yticklabels=1, cbar=True, xticklabels=1, ) # Rotate x-axis labels and adjust their position heatmap_ax.set_xticklabels( heatmap_ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor" ) heatmap_ax.tick_params(axis="x", pad=10) # Fix the colorbar ticks warning by setting ticks first colorbar = heatmap.figure.axes[-1] # type: ignore ticks = colorbar.get_yticks() colorbar.set_yticks(ticks) tick_labels = [f"{x:>8.2f}" for x in ticks] colorbar.set_yticklabels(tick_labels) # Rotate y-axis labels to horizontal 
heatmap_ax.set_yticklabels(heatmap_ax.get_yticklabels(), rotation=0) # Remove axis labels heatmap_ax.set_xlabel("") heatmap_ax.set_ylabel("") # Configure footnote axis footnote_ax.set_frame_on(False) # Hide the frame footnote_ax.set_xticks([]) # Remove x-ticks footnote_ax.set_yticks([]) # Remove y-ticks # Add footnote with adjusted position if not excluded_params.empty: footnote_text = "Excluded parameters (<{:.0%} data completeness):\n".format( threshold ) for param, completeness_val in excluded_params.items(): footnote_text += f" - {param}: {completeness_val:.1%} complete\n" footnote_ax.text( 0.01, 0.40, footnote_text.rstrip(), ha="left", va="center", fontsize=9, fontstyle="italic", transform=footnote_ax.transAxes, ) title_ax.set_frame_on(False) title_ax.set_xticks([]) title_ax.set_yticks([]) display_filter = "Surface and Bottom" if filter_by == "All" else filter_by # Add year information to the subtitle year_info = ( f"Reporting Year {df['Reporting_Year'].iloc[0]}" if len(df["Reporting_Year"].unique()) == 1 else "All Years" ) # Add titles - using figure coordinates with adjusted positions title_ax.text( 0.45, 0.8, f"{subset_by}: {subset}", ha="center", va="center", fontsize=12, fontweight="bold", transform=fig.transFigure, ) title_ax.text( 0.45, 0.75, f"{display_filter}, {year_info} (n={n_samples:,})", ha="center", va="bottom", fontsize=10, fontstyle="italic", transform=fig.transFigure, ) # Replace tight_layout with more explicit spacing control # First, calculate the figure bounds fig.canvas.draw() # Get the tight_bbox renderer = fig.canvas.get_renderer() # type: ignore fig.get_tightbbox(renderer) # Adjust the subplot positions manually fig.subplots_adjust(left=0.1, right=0.95, bottom=0.02, top=0.85, hspace=0.4) return fig, pivot_df def plot_np_ratios(df: pd.DataFrame) -> Figure: # Create dataframe with N, P, and Sector information nutrients_df = ( df[df["Org_Analyte_Name"].isin(["Total Nitrogen", "Total Phosphorus"])] .pivot_table( 
index=["Activity_Start_Date_Time", "Sector"], columns="Org_Analyte_Name", values="Org_Result_Value", observed=True, ) .reset_index() ) # Calculate N:P ratio nutrients_df["N:P Ratio"] = ( nutrients_df["Total Nitrogen"] / nutrients_df["Total Phosphorus"] ) # Create figure with two subplots fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10)) # Time series plot with colors by sector sns.scatterplot( data=nutrients_df, x="Activity_Start_Date_Time", y="N:P Ratio", hue="Sector", ax=ax1, alpha=0.6, ) ax1.axhline(y=16, color="r", linestyle="--", label="Redfield Ratio (16:1)") ax1.set_ylabel("N:P Ratio") ax1.set_xlabel("Date") ax1.set_title("N:P Ratio Over Time") # Adjust legend position ax1.legend(bbox_to_anchor=(1.05, 1), loc="upper left") # Histogram plot sns.histplot(x=nutrients_df["N:P Ratio"].dropna(), ax=ax2) ax2.axvline(x=16, color="r", linestyle="--", label="Redfield Ratio (16:1)") ax2.set_xlabel("N:P Ratio") ax2.set_title("Distribution of N:P Ratios") ax2.legend() # Adjust layout to accommodate legend plt.tight_layout(rect=(0, 0, 0.9, 1)) return fig def altair_plot_np_ratios(df: pd.DataFrame) -> alt.VConcatChart: # Create dataframe with N, P, and Sector information nutrients_df = ( df[df["Org_Analyte_Name"].isin(["Total Nitrogen", "Total Phosphorus"])] .pivot_table( index=["Activity_Start_Date_Time", "Sector"], columns="Org_Analyte_Name", values="Org_Result_Value", observed=True, ) .reset_index() ) # Calculate N:P ratio nutrients_df["N:P Ratio"] = ( nutrients_df["Total Nitrogen"] / nutrients_df["Total Phosphorus"] ) # Time series plot with colors by sector time_series = ( alt.Chart(nutrients_df) .mark_circle(size=60) .encode( x=alt.X( "Activity_Start_Date_Time:T", axis=alt.Axis(format="%Y", tickCount="year"), title="Date", ), y=alt.Y(r"N\:P Ratio:Q", title="N:P Ratio"), color="Sector:N", tooltip=[ alt.Tooltip("Activity_Start_Date_Time:T", title="Date"), alt.Tooltip(r"N\:P Ratio:Q", format=".0f", title="N:P Ratio"), alt.Tooltip("Sector:N", title="Sector"), ], ) 
.properties(title="N:P Ratio Over Time", width=600, height=300) .interactive() ) # Add Redfield Ratio line redfield_line = ( alt.Chart(pd.DataFrame({"y": [16]})).mark_rule(color="red").encode(y="y:Q") ) # Histogram plot histogram = ( alt.Chart(nutrients_df) .mark_bar() .encode( x=alt.X(r"N\:P Ratio:Q", bin=alt.Bin(maxbins=30), title="N:P Ratio"), y="count()", tooltip=["count()"], ) .properties(title="Distribution of N:P Ratios", width=600, height=300) .interactive() ) # Add Redfield Ratio line to histogram redfield_hist_line = ( alt.Chart(pd.DataFrame({"x": [16]})).mark_rule(color="red").encode(x="x:Q") ) # Combine plots combined_chart = alt.vconcat( time_series + redfield_line, histogram + redfield_hist_line ).resolve_scale(y="independent") return combined_chart def plot_calendar_heatmap( df: pd.DataFrame, analyte: str, colormap: str | None = None, position_filter: str = "All", ) -> Figure: data = df[df["Org_Analyte_Name"] == analyte].copy() if data.empty: raise ValueError( f"No data available for {analyte} with position filter: {position_filter}" ) result_unit = data["Org_Result_Unit"].iloc[0] if not data.empty else "" data["Year"] = data["Activity_Start_Date_Time"].dt.year data["Month"] = data["Activity_Start_Date_Time"].dt.month pivot_data = data.pivot_table( values="Org_Result_Value", index="Year", columns="Month", aggfunc="mean" ) # Choose appropriate colormap based on analyte type if analyte in ["Fecal Coliform (MPN)"]: cmap = "viridis" # Blue-green-yellow elif analyte in ["Temperature, Water"]: cmap = "coolwarm" elif analyte in ["Dissolved Oxygen"]: cmap = "RdYlBu" elif analyte in ["Total Nitrogen", "Total Phosphorus"]: cmap = "GnBu" # Green-Blue elif analyte in ["Depth, Secchi Disk Depth"]: cmap = "Blues_r" else: cmap = "Blues" # Default blue gradient # If colormap is set, override the analyte-specific default if colormap: cmap = colormap fig, ax = plt.subplots(figsize=(6, len(pivot_data) * 0.5)) # Create heatmap sns.heatmap( pivot_data, cmap=cmap, 
annot=True, fmt=".2f", cbar_kws={"label": result_unit}, annot_kws={"size": 6}, ) if position_filter == "All": position_filter = "Surface and Bottom" ax.set_title( f"Monthly Averages: {analyte} ({position_filter.lower()})", fontsize=10, pad=10 ) ax.tick_params(axis="both", which="major", labelsize=7) ax.set_xlabel("Month", fontsize=6) ax.set_ylabel("Year", fontsize=6) # Get the colorbar and adjust its label size colorbar = ax.collections[0].colorbar colorbar.ax.tick_params(labelsize=7) # type: ignore colorbar.set_label(result_unit, size=7) # type: ignore return fig def plot_seasonal_salinity( salinity_data: pd.DataFrame, year: str, basemap_provider, alpha=0.5, shapefile_path="data/SAB/SAB.shp", reporting_end_month: int = 10, ): """ Create seasonal plots of mean salinity values by WBID with basemap. Uses configurable Reporting Year with meteorological seasons. Args: salinity_data: DataFrame containing salinity measurements year: Reporting Year to filter data for (str) reporting_end_month: Last month of the reporting year (1-12, default=10 for October) """ # Read and filter WBIDs wbids = gpd.read_file(shapefile_path) relevant_wbids = salinity_data["WBID"].unique() wbids = wbids[wbids["WBID"].isin(relevant_wbids)] wbids = wbids.to_crs(epsg=3857) # Process data - create a copy to avoid SettingWithCopyWarning year_data = salinity_data[salinity_data["Reporting_Year"] == int(year)].copy() # Function to determine quarter based on date and reporting year end def get_quarter(date, reporting_end_month): month = date.month # Calculate month offset to align with reporting year month_offset = (12 - reporting_end_month) % 12 # Adjust month to align with reporting year adjusted_month = ((month + month_offset) % 12) or 12 # Determine quarter (1-4) return f"Q{((adjusted_month - 1) // 3) + 1}" # Add quarter column year_data.loc[:, "quarter"] = year_data["Activity_Start_Date_Time"].apply( lambda x: get_quarter(x, reporting_end_month) ) # Calculate quarterly means seasonal_means = ( 
year_data.groupby(["WBID", "quarter"], observed=True)["Salinity"] .mean() .reset_index() ) fig = plt.figure(figsize=(20, 14)) # Create custom colormap with focused range colors = ["#08519c", "#73a9cf", "#fee090", "#fc8d59", "#d73027"] cmap = LinearSegmentedColormap.from_list("custom", colors, N=100) # Get global min/max for consistent colormap vmin = seasonal_means["Salinity"].min() vmax = 40 # Calculate map extent bounds = wbids.total_bounds x_buffer = (bounds[2] - bounds[0]) * 0.05 y_buffer = (bounds[3] - bounds[1]) * 0.05 extent = [ bounds[0] - x_buffer, bounds[2] + x_buffer, bounds[1] - y_buffer, bounds[3] + y_buffer, ] # Create subplots with tighter spacing gs = fig.add_gridspec( 2, 2, width_ratios=[1, 1], wspace=0.05, # Minimal horizontal space between plots hspace=-0.15, # More negative value to further reduce vertical space left=0.02, # Left margin right=0.98, # Right margin top=0.95, # Slightly reduced top margin to give more space bottom=0.05, # Slightly increased bottom margin to give more space ) # Function to get quarter date range def get_quarter_dates(quarter: str, year: int, reporting_end_month: int) -> str: # Calculate first month of reporting year first_month = (reporting_end_month % 12) + 1 # Calculate start month for each quarter quarter_num = int(quarter[1]) start_month = ((first_month - 1 + ((quarter_num - 1) * 3)) % 12) + 1 end_month = ((start_month + 2) % 12) or 12 # For Reporting Year X, the start date is actually in year X-1 if the month # is after the reporting end month start_year = int(year) - 1 if start_month > reporting_end_month else int(year) end_year = start_year if end_month < start_month: end_year += 1 start_date = pd.Timestamp(f"{start_year}-{start_month:02d}-01") end_date = pd.Timestamp( f"{end_year}-{end_month:02d}-{pd.Timestamp(f'{end_year}-{end_month:02d}').days_in_month}" ) return f"{start_date.strftime('%b %d, %Y')} - {end_date.strftime('%b %d, %Y')}" # Use quarters instead of seasons quarters = ["Q1", "Q2", "Q3", "Q4"] 
for idx, quarter in enumerate(quarters): ax = fig.add_subplot(gs[idx // 2, idx % 2]) quarter_data = seasonal_means[seasonal_means["quarter"] == quarter] merged = wbids.merge(quarter_data, on="WBID", how="left") # Plot WBIDs merged.plot( column="Salinity", ax=ax, cmap=cmap, vmin=vmin, vmax=vmax, alpha=0.7, missing_kwds={"color": "lightgrey", "alpha": 0.5}, ) ctx.add_basemap(ax, source=basemap_provider, zoom=11, alpha=alpha) # type: ignore ax.set_xlim(extent[0], extent[1]) ax.set_ylim(extent[2], extent[3]) # Get date range for this quarter date_range = get_quarter_dates(quarter, int(year), reporting_end_month) # Create title with two lines if idx < 2: # Top row ax.set_title( f"Quarter {quarter[1]} Mean Salinity\n{date_range}", pad=15, fontsize=10, ) else: # Bottom row ax.set_title( f"Quarter {quarter[1]} Mean Salinity\n{date_range}", pad=5, fontsize=10, ) ax.set_axis_off() # Add colorbar norm = plt.Normalize(vmin=vmin, vmax=vmax) # type: ignore sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm) sm.set_array([]) fig.colorbar( sm, ax=fig.axes, orientation="vertical", label="Salinity (ppt)", pad=0.01, fraction=0.015, ticks=np.arange(0, 45, 5), # Add ticks every 5 units ) return fig def plot_seasonal_salinity_for_bays( salinity_data: pd.DataFrame, year: str, basemap_provider=ctx.providers.USGS.USTopo, # type: ignore alpha=0.5, shapefile_path="data/SAB/SAB.shp", wbids=None, reporting_end_month: int = 10, ): """ Create seasonal plots of mean salinity values by WBID for N, E, W, SAB, GL and Lake Powell. 
""" if wbids is None: wbids = gpd.read_file(shapefile_path) if wbids.crs is None: wbids.set_crs(epsg=6439, inplace=True) wbids = wbids.to_crs(epsg=3857) fig = plot_seasonal_salinity( salinity_data.query( "WBID.isin(['1061A', '1061B', '1061C', '1061D', '1061E', '1061F', '1061G', '1061H', '1055A'])" ), year=year, basemap_provider=basemap_provider, alpha=alpha, shapefile_path=shapefile_path, reporting_end_month=reporting_end_month, ) return fig def plot_do_temp_relationship(df: pd.DataFrame) -> Figure: """ Create a scatter plot of DO vs temperature with regression line using seaborn. Parameters: ----------- df : pd.DataFrame Input dataframe containing DO and temperature measurements Returns: -------- Figure Matplotlib figure containing the plot """ do_temp_data = ( df[df["Org_Analyte_Name"].isin(["Dissolved Oxygen", "Temperature, Water"])] .pivot_table( index=["Activity_Start_Date_Time", "Station_Number", "Sample_Position"], columns="Org_Analyte_Name", values="Org_Result_Value", observed=True, ) .reset_index() .dropna(subset=["Dissolved Oxygen", "Temperature, Water"]) ) # Create custom color palette matching DO timeseries custom_palette = {"Surface": "#5AA4D8", "Bottom": "#1B4B8A"} # Create plot with regression line and adjust the hue order g = sns.lmplot( data=do_temp_data, x="Temperature, Water", y="Dissolved Oxygen", hue="Sample_Position", hue_order=["Bottom", "Surface"], # Plot 'Bottom' first palette=custom_palette, scatter_kws={"alpha": 0.5, "zorder": 2, "s": 20}, # Scatter plots at zorder=2 line_kws={"zorder": 3, "linewidth": 1}, # Trend lines at zorder=3 height=8, aspect=1.5, legend=False, ) # Add DO threshold and set z-order ax = g.axes[0, 0] ax.axhline( y=4.8, color="#FF8C00", linestyle="--", alpha=0.9, zorder=1, linewidth=1 ) # Threshold line at zorder=1 ax.text( ax.get_xlim()[0], 4.9, " 4.8 mg/L DO threshold", ha="left", va="bottom", color="#FF8C00", alpha=0.9, ) # Customize spines - only show bottom spine ax.spines["top"].set_visible(False) 
def plotly_plot_do_temp_relationship(df: pd.DataFrame) -> go.Figure:
    """
    Create an interactive scatter plot of DO vs temperature with regression lines using Plotly.

    Matches the style and features of the original matplotlib/seaborn plot.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing DO and temperature measurements. The columns
        read here are "Org_Analyte_Name", "Org_Result_Value",
        "Activity_Start_Date_Time", "Station_Number", "Sample_Position" and
        "Sector".

    Returns:
    --------
    go.Figure
        Plotly figure with one marker trace per sample position, a dashed
        linear trend per position (when it can be fitted) and the 4.8 mg/L
        DO threshold line.
    """
    # Put DO and water temperature side by side for each
    # timestamp/station/position; "Sector" is part of the index only so that it
    # survives the pivot and can be shown in the tooltip.
    do_temp_data = (
        df[df["Org_Analyte_Name"].isin(["Dissolved Oxygen", "Temperature, Water"])]
        .pivot_table(
            index=[
                "Activity_Start_Date_Time",
                "Station_Number",
                "Sample_Position",
                "Sector",
            ],
            columns="Org_Analyte_Name",
            values="Org_Result_Value",
            observed=True,
        )
        .reset_index()
        .dropna(subset=["Dissolved Oxygen", "Temperature, Water"])
    )

    fig = go.Figure()

    # Marker / trend-line colour for each sample position
    colors = {"Surface": "#8da0cb", "Bottom": "#fc8d62"}

    for position in ["Surface", "Bottom"]:
        pos_data = do_temp_data[do_temp_data["Sample_Position"] == position]
        temperature = pos_data["Temperature, Water"]
        oxygen = pos_data["Dissolved Oxygen"]

        # NOTE(review): the line breaks between tooltip rows are written as
        # "<br>" (Plotly's hover line-break markup); confirm the rendered
        # tooltip, and whether a trailing "<extra></extra>" is wanted to hide
        # the secondary trace-name box.
        fig.add_trace(
            go.Scatter(
                x=temperature,
                y=oxygen,
                mode="markers",
                name=position,
                marker=dict(color=colors[position], size=8, opacity=0.6),
                hovertemplate=(
                    "Temperature: %{x:.1f}°C<br>"
                    "DO: %{y:.1f} mg/L<br>"
                    "Position: " + position + "<br>"
                    "Station: %{customdata[0]}<br>"
                    "Sector: %{customdata[1]}<br>"
                ),
                customdata=pos_data[["Station_Number", "Sector"]],
            )
        )

        # A straight-line fit needs at least two distinct temperatures.
        # np.polyfit raises on an empty vector and is ill-conditioned on a
        # single x value, so positions without enough data get no trend line.
        if temperature.nunique() < 2:
            continue

        trend = np.poly1d(np.polyfit(temperature, oxygen, 1))
        x_range = np.linspace(temperature.min(), temperature.max(), 100)
        fig.add_trace(
            go.Scatter(
                x=x_range,
                y=trend(x_range),
                mode="lines",
                line=dict(color=colors[position], dash="dash"),
                name=f"{position} Trend",
                hovertemplate=None,
                hoverinfo="skip",
                showlegend=False,
            )
        )

    # Add DO threshold line
    fig.add_hline(
        y=4.8,
        line=dict(color="#FF8C00", width=1, dash="dash"),
        opacity=0.5,
        annotation_text="4.8 mg/L DO threshold",
        annotation_position="left",
        annotation=dict(
            font=dict(color="#FF8C00", size=12),
            xanchor="left",
            yanchor="bottom",
            opacity=0.8,
        ),
    )

    # Update layout
    fig.update_layout(
        title=dict(
            text="Dissolved Oxygen vs Water Temperature",
            x=0.5,
            y=0.95,
            xanchor="center",
            yanchor="top",
            font=dict(size=16),
        ),
        xaxis_title="Water Temperature (°C)",
        yaxis_title="Dissolved Oxygen (mg/L)",
        legend_title="Sample Position",
        legend=dict(
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.05,
        ),
        template="plotly_white",
        width=800,
        height=600,
        showlegend=True,
    )

    # Update axes
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="rgba(128, 128, 128, 0.2)")
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="rgba(128, 128, 128, 0.2)")

    return fig
Parameters: ----------- df : pd.DataFrame Input dataframe containing DO and temperature measurements Returns: -------- alt.Chart Altair chart object """ # Prepare the data similarly to the original function do_temp_data = ( df[df["Org_Analyte_Name"].isin(["Dissolved Oxygen", "Temperature, Water"])] .pivot_table( index=[ "Activity_Start_Date_Time", "Station_Number", "Sample_Position", "Sector", ], columns="Org_Analyte_Name", values="Org_Result_Value", observed=True, ) .reset_index() .dropna(subset=["Dissolved Oxygen", "Temperature, Water"]) ) # Create the base scatter plot scatter = ( alt.Chart(do_temp_data) .mark_circle(size=60, opacity=0.6) .encode( x=alt.X( "Temperature, Water:Q", title="Water Temperature (°C)", scale=alt.Scale(zero=False), ), y=alt.Y( "Dissolved Oxygen:Q", title="Dissolved Oxygen (mg/L)", scale=alt.Scale(zero=False), ), color=alt.Color( "Sample_Position:N", scale=alt.Scale( domain=["Surface", "Bottom"], range=["#8da0cb", "#fc8d62"], # Muted blue and orange ), legend=alt.Legend(title="Sample Position"), ), tooltip=[ alt.Tooltip("Temperature, Water:Q", title="Temperature", format=".1f"), alt.Tooltip("Dissolved Oxygen:Q", title="DO", format=".1f"), alt.Tooltip("Sample_Position:N", title="Position"), alt.Tooltip("Sector:N", title="Sector"), alt.Tooltip("Station_Number:N", title="Station"), ], ) ) # Add regression lines for each Sample_Position regression = ( scatter.transform_regression( "Temperature, Water", "Dissolved Oxygen", groupby=["Sample_Position"] ) .mark_line(size=2) .encode( color=alt.Color( "Sample_Position:N", scale=alt.Scale( domain=["Surface", "Bottom"], range=["#8da0cb", "#fc8d62"] ), ) ) ) # Create DO threshold line threshold_df = pd.DataFrame({"y": [5]}) threshold_line = ( alt.Chart(threshold_df) .mark_rule(strokeDash=[4, 4], color="red", opacity=0.5) .encode(y="y:Q") ) # Add threshold label threshold_label = ( alt.Chart( pd.DataFrame({"x": [do_temp_data["Temperature, Water"].min()], "y": [5.1]}) ) .mark_text( align="left", 
@timer(include_params=True)
def generate_seasonal_plot(data, year, shapefile_path):
    """
    Generate the seasonal trends plot.

    Reads the shapefile at `shapefile_path`, makes sure both the shapefile and
    (when it is a GeoDataFrame) the input data carry a CRS, reprojects the
    shapefile to Web Mercator and delegates the drawing to
    `plot_seasonal_salinity_for_bays`.

    Parameters:
    -----------
    data : pd.DataFrame | gpd.GeoDataFrame
        Data forwarded to `plot_seasonal_salinity_for_bays`. The caller's
        object is never modified.
    year :
        Year forwarded to `plot_seasonal_salinity_for_bays`.
    shapefile_path :
        Path of the shapefile to read.

    Returns:
    --------
    Whatever `plot_seasonal_salinity_for_bays` returns.
    """
    wbids = gpd.read_file(shapefile_path)

    # Ensure input data has CRS set. Rebind to the copy returned by set_crs
    # instead of using inplace=True, so the caller's GeoDataFrame is not
    # mutated as a side effect of plotting.
    if isinstance(data, gpd.GeoDataFrame) and data.crs is None:
        # Assuming the input coordinates are in WGS84 (EPSG:4326)
        data = data.set_crs(epsg=4326)

    # Ensure shapefile has CRS set and transform to Web Mercator
    if wbids.crs is None:
        wbids = wbids.set_crs(epsg=6439)

    # Pre-transform to Web Mercator (EPSG:3857) here to avoid issues in plotting function
    wbids = wbids.to_crs(epsg=3857)

    # Optional diagnostics, shown only when the session has DEBUG switched on
    if st.session_state.get("DEBUG", False):
        st.write("Debug Info:")
        st.write(
            {
                "Shapefile CRS": wbids.crs,
                "Input Data CRS": data.crs
                if isinstance(data, gpd.GeoDataFrame)
                else "Not a GeoDataFrame",
                "GDAL Version": gdal.VersionInfo()
                if "osgeo.gdal" in sys.modules
                else "Not available",
                "GeoPandas Version": gpd.__version__,
                "Python Version": sys.version,
                "File exists": Path(shapefile_path).exists(),
                "Associated files": list(Path(shapefile_path).parent.glob("*.*")),
            }
        )

    return plot_seasonal_salinity_for_bays(
        data,
        year,
        shapefile_path=shapefile_path,
        wbids=wbids,
        reporting_end_month=st.session_state.reporting_month,
    )
Reference: https://www.hudsonriver.org/ccmp/soe/water-quality/do Parameters: ----------- df : pd.DataFrame Filtered dataframe containing dissolved oxygen measurements period : str 'yearly' or 'monthly' aggregation period epa_thresh : float EPA threshold value for DO in mg/L Returns: -------- Figure Matplotlib figure containing the plot """ period = period.lower() # Filter for DO data and pivot for surface/bottom do_data = df[ (df["Org_Analyte_Name"] == "Dissolved Oxygen") & (df["Sample_Position"].isin(["Surface", "Bottom"])) ].copy() # Create time grouping based on period if period == "yearly": do_data["Period"] = do_data["Reporting_Year"] else: # monthly do_data["Period"] = pd.to_datetime( do_data["Activity_Start_Date_Time"] ).dt.to_period("M") do_data["Period_Start"] = do_data["Period"].dt.to_timestamp() # Calculate means for each position and period means = ( do_data.groupby(["Period", "Sample_Position"], observed=True)[ "Org_Result_Value" ] .mean() .reset_index() .pivot(index="Period", columns="Sample_Position", values="Org_Result_Value") ) # Create figure fig, ax = plt.subplots(figsize=(15, 8)) # Convert Period index to proper format for plotting if period == "yearly": x_values = np.array(means.index.astype(float)) # Explicitly create numpy array else: # Convert to numpy array of datetime64 x_values = np.array( [pd.Period(idx).to_timestamp() for idx in means.index], dtype="datetime64[ns]", ) # Plot connecting lines only (no markers) for i, (idx, row) in enumerate(means.iterrows()): x_val = x_values[i] ax.plot( [x_val, x_val], # Use scalar value instead of list [row["Bottom"], row["Surface"]], color="lightgray", linewidth=1, zorder=1, solid_capstyle="round", ) # Calculate dynamic point size based on number of points n_points = len(x_values) base_size = 80 # Maximum point size min_size = 20 # Minimum point size # Exponential decay formula: size decreases as number of points increases point_size = max( min_size, base_size * math.exp(-0.0015 * n_points), ) # 
Update scatter plot styling surface_scatter = ax.scatter( x_values, means["Surface"], color="#5AA4D8", s=point_size, zorder=2, label="Surface", edgecolors="white", linewidth=1, alpha=0.9, ) bottom_scatter = ax.scatter( x_values, means["Bottom"], color="#1B4B8A", s=point_size, zorder=2, label="Bottom", edgecolors="white", linewidth=1, alpha=0.9, ) # Update EPA threshold line threshold_line = ax.axhline( y=epa_thresh, color="#FF8C00", linestyle="--", alpha=0.9, linewidth=1, label=f"EPA threshold: {epa_thresh} mg/L", zorder=0, ) # Customize legend ax.legend( handles=[surface_scatter, bottom_scatter, threshold_line], loc="upper right", frameon=False, ncol=1, # Stack legend items vertically bbox_to_anchor=(1.0, 1.0), # Position at top right handletextpad=0.5, # Reduce space between handle and text ) # Customize spines - only show bottom spine ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["left"].set_visible(False) ax.spines["bottom"].set_color("black") ax.spines["bottom"].set_linewidth(0.5) # Customize plot with modified grid and axis settings ax.set_xlabel("Year" if period == "yearly" else "Month") ax.set_ylabel("Dissolved Oxygen (mg/L)") ax.set_title("Long-term Dissolved Oxygen Trends") ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray") # Set y-axis limits with some padding ymin = max(int(min(means["Bottom"].min(), epa_thresh) * 0.9) - 1, 0) # ymin = 0 ymax = means["Surface"].max() * 1.1 ax.set_ylim(ymin, ymax) yticks = np.arange(ymin, ymax, 2) ax.set_yticks(yticks) # Remove tick marks but keep labels ax.tick_params(axis="y", which="both", length=0) # Adjust x-axis ticks and limits if period == "monthly": ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y")) ax.xaxis.set_major_locator(mdates.YearLocator()) plt.xticks(rotation=0) # Convert to datetime for padding start_date = mdates.date2num( pd.Timestamp(min(x_values)) - pd.DateOffset(months=1) ) end_date = mdates.date2num( pd.Timestamp(max(x_values)) + 
def plot_do_scatter(
    df: pd.DataFrame,
    sector: str = "All",
    thresh: float = 3.0,
) -> Figure:
    """
    Create a scatter plot of all dissolved oxygen measurements.

    Parameters:
    -----------
    df : pd.DataFrame
        Filtered dataframe containing dissolved oxygen measurements
    sector : str
        Sector name appended to the title unless it is 'All'. No filtering by
        sector happens here; `df` is plotted as given.
    thresh : float
        Threshold value for DO in mg/L

    Returns:
    --------
    Figure
        Matplotlib figure containing the plot

    Raises:
    -------
    ValueError
        If `df` holds no Surface/Bottom dissolved oxygen rows.
    """
    # Filter for DO data
    do_data = df[
        (df["Org_Analyte_Name"] == "Dissolved Oxygen")
        & (df["Sample_Position"].isin(["Surface", "Bottom"]))
    ]

    # Fail before a figure is created, so an empty selection does not leave an
    # orphaned pyplot figure behind.
    if do_data.empty:
        raise ValueError("No data found for parameter: Dissolved Oxygen")

    # Create figure with specific dimensions
    fig, ax = plt.subplots(figsize=(15, 8))

    # Fix the y-axis first: the hurricane marker below is positioned relative
    # to these limits, so they must be final before it is drawn.
    ymin = max(int(min(do_data["Org_Result_Value"].min(), thresh) * 0.9) - 1, 0)
    ymax = do_data["Org_Result_Value"].max() * 1.1
    ax.set_ylim(ymin, ymax)
    ax.set_yticks(np.arange(ymin, ymax, 2))

    surface_data = do_data[do_data["Sample_Position"] == "Surface"]
    bottom_data = do_data[do_data["Sample_Position"] == "Bottom"]

    # Plot points
    ax.scatter(
        surface_data["Activity_Start_Date_Time"],
        surface_data["Org_Result_Value"],
        color="#1f77b4",  # Darker blue for surface
        s=25,
        alpha=0.5,
        label="Surface",
        zorder=2,
    )
    ax.scatter(
        bottom_data["Activity_Start_Date_Time"],
        bottom_data["Org_Result_Value"],
        color="#7fbf7b",  # Muted green for bottom
        s=25,
        alpha=0.5,
        label="Bottom",
        zorder=2,
    )

    # Add Hurricane Michael vertical line and annotation if within date range
    hurricane_date = pd.Timestamp("2018-10-10")
    # Series.min()/max() skip missing timestamps, unlike the builtins
    data_start = do_data["Activity_Start_Date_Time"].min()
    data_end = do_data["Activity_Start_Date_Time"].max()

    if data_start <= hurricane_date <= data_end:
        # axvline takes its vertical extent as a fraction of the axes height;
        # convert that same fraction to data units so the dot and the labels
        # sit exactly on the top of the line.
        line_fraction = 0.95
        line_height = ymin + line_fraction * (ymax - ymin)

        ax.axvline(
            x=hurricane_date,  # type: ignore
            color="gray",
            linestyle="-",
            alpha=0.6,
            linewidth=1,
            ymin=0,
            ymax=line_fraction,
            zorder=1,
        )

        # Add dot at top of line
        ax.scatter(
            [hurricane_date],  # type: ignore
            [line_height],
            color="gray",
            s=25,
            alpha=0.8,
            zorder=2,
        )

        # Add two-line annotation with bold date
        ax.annotate(
            "Oct 2018",
            xy=(hurricane_date, line_height),  # type: ignore
            xytext=(5, 0),
            textcoords="offset points",
            ha="left",
            va="bottom",
            color="gray",
            fontsize=10,
            weight="bold",
        )
        ax.annotate(
            "Hurricane Michael",
            xy=(hurricane_date, line_height),  # type: ignore
            xytext=(5, -12),
            textcoords="offset points",
            ha="left",
            va="bottom",
            color="gray",
            fontsize=10,
        )

    # Add threshold line
    ax.axhline(
        y=thresh,
        color="red",
        linestyle=":",
        alpha=0.9,
        linewidth=1.5,
        label=f"Threshold: {thresh} mg/L",
        zorder=1,
    )

    # Customize legend with larger font
    ax.legend(
        loc="upper right",
        frameon=True,
        ncol=1,
        bbox_to_anchor=(1.0, 1.0),
        handletextpad=0.5,
        fontsize=12,
    )

    # Customize spines - only show bottom spine
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    ax.spines["bottom"].set_color("black")
    ax.spines["bottom"].set_linewidth(0.5)

    # Set title, naming the sector when a single one is shown
    title = "DO mg/L"
    if sector != "All":
        title += f" - {sector}"
    ax.set_title(title, fontsize=14)

    # Add grid
    ax.grid(True, axis="both", alpha=0.15, linestyle="-", color="gray")

    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)

    # Format x-axis: one labelled tick per year
    ax.xaxis.set_major_locator(mdates.YearLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y"))

    plt.tight_layout()
    return fig
ymax = param_data["Org_Result_Value"].max() * 2 if thresh is not None: ymin = min(ymin, thresh * 0.5) ax.set_ylim(ymin, ymax) # Generate log-spaced ticks log_ymin = np.floor(np.log10(ymin)) log_ymax = np.ceil(np.log10(ymax)) yticks = np.logspace(log_ymin, log_ymax, int(log_ymax - log_ymin) + 1) ax.set_yticks(yticks) ax.yaxis.set_major_formatter(plt.ScalarFormatter()) # type: ignore ax.yaxis.set_minor_formatter(plt.NullFormatter()) # type: ignore else: # Existing linear scale code ymin = param_data["Org_Result_Value"].min() * 0.9 ymax = param_data["Org_Result_Value"].max() * 1.1 if thresh is not None: ymin = min(ymin, thresh * 0.9) ax.set_ylim(ymin, ymax) # Set y-axis ticks for linear scale tick_range = ymax - ymin if tick_range > 10: tick_spacing = 2.0 elif tick_range > 5: tick_spacing = 1.0 else: tick_spacing = 0.5 yticks = np.arange(np.floor(ymin), np.ceil(ymax), tick_spacing) ax.set_yticks(yticks) # Plot points and collect legend handles/labels handles = [] labels = [] # Always plot surface data surface_scatter = ax.scatter( surface_data["Activity_Start_Date_Time"], surface_data["Org_Result_Value"], color="#1f77b4", # Darker blue for surface s=25, alpha=0.5, label="Surface", zorder=2, ) handles.append(surface_scatter) labels.append("Surface") # Only plot and add to legend if bottom data exists if not bottom_data.empty: bottom_scatter = ax.scatter( bottom_data["Activity_Start_Date_Time"], bottom_data["Org_Result_Value"], color="#7fbf7b", # Muted green for bottom s=25, alpha=0.5, label="Bottom", zorder=2, ) handles.append(bottom_scatter) labels.append("Bottom") # Add Hurricane Michael vertical line and annotation if within date range hurricane_date = pd.Timestamp("2018-10-10") # Get the date range of the plotted data data_start = min(param_data["Activity_Start_Date_Time"]) data_end = max(param_data["Activity_Start_Date_Time"]) # Only add hurricane line and annotation if the date falls within the data range if data_start <= hurricane_date <= data_end: # Get y-axis 
limits for line placement ymin, ymax = ax.get_ylim() line_height = ymax * 0.95 # Add vertical line with dot at top ax.axvline( x=hurricane_date, # type: ignore color="gray", linestyle="-", alpha=0.6, linewidth=1, ymin=0, ymax=line_height / ymax, zorder=1, ) # Add dot at top of line ax.scatter( [hurricane_date], # type: ignore [line_height], color="gray", s=25, alpha=0.8, zorder=2, ) # Add two-line annotation with bold date ax.annotate( "Oct 2018", xy=(hurricane_date, line_height), # type: ignore xytext=(5, 0), textcoords="offset points", ha="left", va="bottom", color="gray", fontsize=10, weight="bold", ) ax.annotate( "Hurricane Michael", xy=(hurricane_date, line_height), # type: ignore xytext=(5, -12), textcoords="offset points", ha="left", va="bottom", color="gray", fontsize=10, ) # Add threshold line if specified if thresh is not None: threshold_line = ax.axhline( y=thresh, color="red", linestyle=":", alpha=0.9, linewidth=1.5, label=f"Threshold: {thresh} {unit}", zorder=1, ) handles.append(threshold_line) labels.append(f"Threshold: {thresh} {unit}") # Update legend with collected handles and labels if parameter not in ["Depth, Secchi Disk Depth", "Temperature, Air"]: ax.legend( handles=handles, labels=labels, loc="upper right", frameon=True, ncol=1, bbox_to_anchor=(1.0, 1.0), handletextpad=0.5, fontsize=12, ) # Customize spines - only show bottom spine ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["left"].set_visible(False) ax.spines["bottom"].set_color("black") ax.spines["bottom"].set_linewidth(0.5) # Set labels and title title = parameter if sector != "All": title += f" - {sector}" ax.set_title(title, fontsize=14) # ax.set_xlabel("Date", fontsize=12) ax.set_ylabel(f"{unit}", fontsize=12) # Add grid ax.grid(True, axis="both", alpha=0.15, linestyle="-", color="gray") # Remove tick marks but keep labels ax.tick_params(axis="y", which="both", length=0) # Format x-axis years = mdates.YearLocator() 
@timer(include_params=True)
def plot_grouped_bars(
    df: pd.DataFrame,
    parameter: str,
    year_range: tuple[int, int],
    group_by: str = "sector",
) -> tuple[Figure, pd.DataFrame]:
    """
    Create a grouped bar chart showing means by sector or year for a selected parameter.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing water quality measurements
    parameter : str
        Name of the parameter to plot
    year_range : tuple[int, int]
        Start and end years (both inclusive) to include in plot
    group_by : str
        What goes on the x-axis: "year" puts reporting years on the x-axis with
        one bar per sector; any other value (default "sector") puts sectors on
        the x-axis with one bar per year.

    Returns:
    --------
    tuple[Figure, pd.DataFrame]
        - Figure: Matplotlib figure containing the grouped bar chart
        - DataFrame: One row per (Reporting_Year, Sector) with the "mean" and
          "sem" that were plotted, plus a leading "parameter" column

    Raises:
    -------
    ValueError
        If there is no data for `parameter` inside `year_range`.
    """
    # Filter data for parameter and year range
    plot_df = df[
        (df["Org_Analyte_Name"] == parameter)
        & (df["Reporting_Year"] >= year_range[0])
        & (df["Reporting_Year"] <= year_range[1])
    ].copy()

    if plot_df.empty:
        raise ValueError(
            f"No data available for {parameter} between {year_range[0]}-{year_range[1]}"
        )

    # Calculate annual means (and standard errors) by sector
    means_df = (
        plot_df.groupby(["Reporting_Year", "Sector"], observed=True)["Org_Result_Value"]
        .agg(["mean", "sem"])
        .reset_index()
    )

    years = sorted(means_df["Reporting_Year"].unique())
    sectors = sorted(means_df["Sector"].unique())

    # "slots" are the x-axis groups, "series" are the bars inside each group
    if group_by == "year":
        slot_column, slot_values = "Reporting_Year", years
        series_column, series_values = "Sector", sectors
        x_label = "Reporting Year"
    else:  # group_by == "sector"
        slot_column, slot_values = "Sector", sectors
        series_column, series_values = "Reporting_Year", years
        x_label = "Sector"

    # Lay the statistics out on a complete slot x series grid. Combinations
    # with no data become NaN, so every series has exactly one value per x
    # slot and bars can never shift into a neighbouring group.
    mean_grid = means_df.pivot(
        index=slot_column, columns=series_column, values="mean"
    ).reindex(index=slot_values, columns=series_values)
    sem_grid = means_df.pivot(
        index=slot_column, columns=series_column, values="sem"
    ).reindex(index=slot_values, columns=series_values)

    n_series = len(series_values)
    colors = [
        "#E69F00",  # Orange
        "#56B4E9",  # Sky Blue
        "#009E73",  # Bluish Green
        "#F0E442",  # Yellow
        "#0072B2",  # Blue
        "#D55E00",  # Vermilion
        "#CC79A7",  # Reddish Purple
        "#999999",  # Gray
        "#F5C710",  # Golden Yellow
        "#93AA00",  # Lime Green
        "#482677",  # Dark Purple
        "#DA5724",  # Rust
        "#5082CF",  # Steel Blue
        "#CD9BCD",  # Lavender
        "#C1A43A",  # Olive Green
    ]

    # Create figure
    fig, ax = plt.subplots(figsize=(12, 6))

    # Each group of bars occupies 0.8 of the unit spacing between slots
    bar_width = 0.8 / n_series
    slot_positions = np.arange(len(slot_values))

    # Tick labels sit under the middle of each group of bars
    group_centers = slot_positions + (bar_width * (n_series - 1)) / 2

    for i, series in enumerate(series_values):
        bar_positions = slot_positions + i * bar_width
        bar_means = mean_grid[series].to_numpy(dtype=float)
        bar_sems = sem_grid[series].to_numpy(dtype=float)

        # Reuse the palette from the start when there are more series than
        # colours, rather than silently dropping the extra series.
        ax.bar(
            bar_positions,
            bar_means,
            bar_width,
            label=str(series),
            color=colors[i % len(colors)],
            alpha=0.7,
            zorder=2,
        )

        # Add error bars
        ax.errorbar(
            bar_positions,
            bar_means,
            yerr=bar_sems,
            fmt="none",
            color="black",
            capsize=3,
            capthick=1,
            linewidth=1,
            alpha=0.5,
            zorder=3,
        )

    # Customize plot; a missing (non-string) unit is simply left out of the title
    unit = plot_df["Org_Result_Unit"].iloc[0]
    unit_suffix = f" {unit}" if isinstance(unit, str) and unit else ""
    ax.set_xlabel(x_label)
    ax.set_title(f"{parameter} (Mean Annual{unit_suffix})")

    def wrap_labels(text, width=10):
        """Wrap text at specified width using textwrap."""
        text_str = str(text)
        if len(text_str) > width:
            return textwrap.fill(text_str, width=width)
        return text_str

    # Set x-axis ticks and labels with wrapping using centered positions
    ax.set_xticks(group_centers)
    ax.set_xticklabels(
        [wrap_labels(label) for label in slot_values],
        ha="center",
        va="top",
        rotation=0,
    )

    # Remove x-axis tick marks
    ax.tick_params(axis="x", length=0)

    # Add error bar note below the axes
    ax.text(
        0.99,
        -0.15,
        "Error bars represent ±1 standard error of the mean",
        ha="right",
        va="top",
        transform=ax.transAxes,
        fontsize=9,
        fontstyle="italic",
    )

    # Adjust layout with more vertical space for wrapped labels
    plt.tight_layout(rect=(0, 0.2, 1, 1))

    # Add grid
    ax.grid(True, axis="y", alpha=0.2, linestyle="-", zorder=1)

    # Customize spines
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)

    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)

    ax.legend(
        bbox_to_anchor=(1.02, 1),  # Anchor just outside the axes, top-right
        loc="upper left",
        frameon=False,
        ncol=1,
        handletextpad=0.5,
        fontsize=9,
    )

    # Determine if log scale should be used
    if parameter in [
        # "Turbidity",
        "Fecal Coliform (MPN)",
        "Total Nitrogen",
        "Total Phosphorus",
    ]:
        ax.set_yscale("log")
        ax.yaxis.set_major_formatter(plt.ScalarFormatter())  # type: ignore

    means_df.insert(0, "parameter", parameter)

    return fig, means_df
Parameters: ----------- df : pd.DataFrame Input dataframe containing measurements parameter : str Name of the parameter to plot period : str 'monthly' or 'quarterly' aggregation period thresh : float | None Optional threshold value to display on plot sector : str | None Optional sector name to include in title Returns: -------- tuple[Figure, pd.DataFrame] - Figure: Matplotlib figure containing the plot - DataFrame: Filtered dataframe containing the data used in the plot - DataFrame: Stats dataframe containing the mean, min, max, and overall average """ # Filter for parameter data param_data = df[df["Org_Analyte_Name"] == parameter].copy() if param_data.empty: raise ValueError(f"No data found for parameter: {parameter}") # Add month and quarter columns param_data["Month"] = param_data["Activity_Start_Date_Time"].dt.month param_data["Quarter"] = param_data["Activity_Start_Date_Time"].dt.quarter # Group by period if period.lower() == "monthly": group_col = "Month" x_ticks = range(1, 13) x_label = "Month" else: # quarterly group_col = "Quarter" x_ticks = range(1, 5) x_label = "Quarter" # Calculate means, min, and max stats_df = ( param_data.groupby(group_col, observed=True)["Org_Result_Value"] .agg(["mean", "min", "max"]) .reset_index() ) # Calculate overall average for dotted line stats_df["overall_avg"] = param_data["Org_Result_Value"].mean() fig, ax = plt.subplots(figsize=(10, 6)) # Get the unit unit = param_data["Org_Result_Unit"].iloc[0] # Set log scale for specific parameters if parameter in [ "Turbidity", "Fecal Coliform (MPN)", "Total Nitrogen", "Total Phosphorus", ]: ax.set_yscale("log") ax.yaxis.set_major_formatter( plt.ScalarFormatter() # type: ignore ) # Plot mean line mean_line = ax.plot( stats_df[group_col], stats_df["mean"], "b-", linewidth=2, marker="s", label="Mean", zorder=3, )[0] # Add label at the beginning of mean line ax.annotate( "Mean", xy=(stats_df[group_col].iloc[0], stats_df["mean"].iloc[0]), xytext=(-5, 0), textcoords="offset points", 
ha="right", va="center", color=mean_line.get_color(), fontsize=9, ) # Plot min line min_line = ax.plot( stats_df[group_col], stats_df["min"], "--", color="gray", linewidth=1, label="Min", zorder=2, )[0] # Add label at the end of min line ax.annotate( "Min", xy=(stats_df[group_col].iloc[-1], stats_df["min"].iloc[-1]), xytext=(5, 0), textcoords="offset points", va="center", color=min_line.get_color(), fontsize=9, ) # Plot max line max_line = ax.plot( stats_df[group_col], stats_df["max"], "--", color="orange", linewidth=1, label="Max", zorder=2, )[0] # Add label at the end of max line ax.annotate( "Max", xy=(stats_df[group_col].iloc[-1], stats_df["max"].iloc[-1]), xytext=(5, 0), textcoords="offset points", va="center", color=max_line.get_color(), fontsize=9, ) # Add overall average line avg_value = stats_df["overall_avg"].iloc[0] ax.axhline( y=avg_value, color="blue", linestyle=":", alpha=0.5, linewidth=1, label="Average", zorder=1, ) # Add label for overall average below the line ax.annotate( "Average", xy=(stats_df[group_col].iloc[-1], avg_value), xytext=(27, -5), # Moved down 5 points textcoords="offset points", va="top", # Text aligns above the point ha="right", # Right-align the text color="blue", alpha=0.5, fontsize=9, ) # Remove the legend if it exists legend = ax.get_legend() if legend is not None: legend.remove() # Add threshold line if specified if thresh is not None: ax.axhline( y=thresh, color="red", linestyle=":", alpha=0.9, linewidth=1.5, label=f"Threshold: {thresh} {unit}", zorder=1, ) # Add legend for threshold only ax.legend( [ ax.axhline( y=thresh, color="red", linestyle=":", alpha=0.9, linewidth=1.5 ) ], [f"Threshold: {thresh} {unit}"], loc="upper right", frameon=False, handletextpad=0.5, fontsize=9, ) # Customize plot ax.set_xticks(x_ticks) if period.lower() == "quarterly": # Convert quarters to seasons season_labels = ["Spring", "Summer", "Fall", "Winter"] ax.set_xticklabels(season_labels) # Remove x-axis tick marks for quarterly view 
ax.tick_params(axis="x", which="both", length=0) ax.set_xlabel(x_label) # Add secondary y-axis for temperature if unit is Celsius if unit == "deg C": def celsius_to_fahrenheit(temp_c): return (temp_c * 9 / 5) + 32 # Get the primary y-axis limits y1_min, y1_max = ax.get_ylim() # Create secondary axis that aligns with primary axis values ax2 = ax.secondary_yaxis( "right", functions=(celsius_to_fahrenheit, lambda f: (f - 32) * 5 / 9), # type: ignore ) # Set the same limits as primary axis but converted to Fahrenheit ax2.set_ylim(celsius_to_fahrenheit(y1_min), celsius_to_fahrenheit(y1_max)) # Get primary axis ticks and convert them for secondary axis primary_ticks = ax.get_yticks() ax2.set_yticks([celsius_to_fahrenheit(t) for t in primary_ticks]) # Format tick labels with degree symbols ax.yaxis.set_major_formatter(lambda x, p: f"{x:.0f}°C") ax2.yaxis.set_major_formatter(lambda x, p: f"{x:.0f}°F") # Remove right spine for consistency ax2.spines["right"].set_visible(False) # Remove tick marks but keep labels ax2.tick_params(axis="y", which="both", length=0) # Add secondary y-axis for depth if unit is feet elif unit == "ft": def feet_to_meters(feet): return feet * 0.3048 ax2 = ax.secondary_yaxis( "right", functions=(feet_to_meters, lambda m: m / 0.3048), # type: ignore ) ax2.set_ylabel("Depth (m)") ax.set_ylabel("Depth (ft)") # Remove right spine for consistency ax2.spines["right"].set_visible(False) # Remove tick marks but keep labels ax2.tick_params(axis="y", which="both", length=0) else: ax.set_ylabel(f"{unit}") # Get year range for title start_year = param_data["Activity_Start_Date_Time"].dt.year.min() end_year = param_data["Activity_Start_Date_Time"].dt.year.max() year_range = ( f" ({start_year}-{end_year})" if start_year != end_year else f" ({start_year})" ) title = f"Seasonal {parameter} Trends{year_range}" if sector: title = f"{title} - {sector}" ax.set_title(title) ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray") # Customize spines 
ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["left"].set_visible(False) # Remove tick marks but keep labels ax.tick_params(axis="y", which="both", length=0) # Adjust layout based on unit type if unit == "deg C": plt.tight_layout(rect=(0, 0, 0.95, 1)) else: plt.tight_layout(rect=(0, 0, 0.9, 1)) stats_df.insert(0, "parameter", parameter) return fig, param_data, stats_df @timer(include_params=True) def plot_sector_line_charts( df: pd.DataFrame, parameter: str, show_sem: bool = True, panel_chart: bool = False, color_scale: list[str] = COLOR_SCALE, ) -> tuple[Figure, pd.DataFrame, pd.DataFrame]: """ Create a plot of mean annual parameter trends by sector. Parameters: ----------- df : pd.DataFrame Input dataframe parameter : str Name of the parameter to plot show_sem : bool, default=True Whether to show the standard error of the mean bands panel_chart : bool, default=False If True, creates a grid of individual sector charts instead of overlapping lines Returns: -------- tuple[Figure, pd.DataFrame, pd.DataFrame] - Figure: Matplotlib figure containing the line chart(s) - DataFrame: Filtered dataframe containing the data used in the plot - DataFrame: Contains the plotted data points with means and standard errors """ GREY10 = "#1a1a1a" # noqa: F841 GREY30 = "#4d4d4d" # noqa: F841 GREY40 = "#666666" # noqa: F841 GREY75 = "#bfbfbf" # noqa: F841 GREY91 = "#e8e8e8" # noqa: F841 # 1. Data preparation param_data = df[df["Org_Analyte_Name"] == parameter].copy() if parameter == "Salinity": param_data = param_data[param_data["Sector"] != "Freshwater Lakes"] sectors = sorted(param_data["Sector"].unique()) years = sorted(param_data["Reporting_Year"].unique()) param_unit = param_data["Org_Result_Unit"].iloc[0] if not param_data.empty else "" # 2. 
Compute all sector data sector_data_dict = {} for sector in sectors: sector_data = ( param_data[param_data["Sector"] == sector] .groupby("Reporting_Year", observed=True)["Org_Result_Value"] .agg(["mean", "sem"]) .reset_index() ) sector_data["Sector"] = sector sector_data_dict[sector] = sector_data # 3. Determine global y-limits use_log_scale = parameter in [ "Turbidity", "Fecal Coliform (MPN)", "Total Nitrogen", "Total Phosphorus", ] y_min = float("inf") y_max = float("-inf") for data in sector_data_dict.values(): if not data.empty: y_min = min(y_min, (data["mean"] - data["sem"]).min()) y_max = max(y_max, (data["mean"] + data["sem"]).max()) # Add padding to y-axis limits if use_log_scale: y_min = y_min / 1.2 y_max = y_max * 1.2 else: y_range = y_max - y_min y_min = y_min - (y_range * 0.05) y_max = y_max + (y_range * 0.05) # 4. Create figure and determine layout if panel_chart: n_cols = min(3, len(sectors)) n_rows = (len(sectors) + n_cols - 1) // n_cols fig = plt.figure(figsize=(5 * n_cols, 3 * n_rows)) else: fig, main_ax = plt.subplots(figsize=(14, 4)) # 5. 
Helper function to plot a single sector def plot_sector_on_axis( ax: plt.Axes, # type: ignore sector_data: pd.DataFrame, color: str, show_label: bool = False, ): line = ax.plot( sector_data["Reporting_Year"], sector_data["mean"], "-o", color=color, label=sector if show_label else None, markersize=4, linewidth=2, ) if show_sem: ax.fill_between( sector_data["Reporting_Year"], sector_data["mean"] - sector_data["sem"], sector_data["mean"] + sector_data["sem"], color=color, alpha=0.15, ) # Configure axis ax.grid(True, axis="y", which="major", alpha=0.2, linestyle="--") ax.grid(True, axis="y", which="minor", alpha=0.1, linestyle="--") ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["left"].set_visible(False) ax.spines["bottom"].set_color(GREY40) ax.tick_params(axis="both", which="both", length=0, colors=GREY40) ax.set_xticks(years) if use_log_scale: ax.set_yscale("log") ax.set_ylim(y_min, y_max) def format_func(x, _): # Determine if we need decimal places based on data range min_value = min(sector_data["mean"].min(), y_min) needs_decimals = min_value < 1 or not all( val.is_integer() for val in sector_data["mean"] ) if x == 0: return "0" elif needs_decimals: return f"{x:.1f}" else: return f"{int(x)}" ax.yaxis.set_major_formatter(plt.FuncFormatter(format_func)) # type: ignore # Calculate the range ratio and absolute values range_ratio = y_max / y_min abs_min = min(abs(sector_data["mean"].min()), abs(y_min)) abs_max = max(abs(sector_data["mean"].max()), abs(y_max)) if parameter == "Total Phosphorus": # Custom ticks for Total Phosphorus major_ticks = np.array([10, 13, 15, 17, 20, 30, 40, 50]) major_ticks = major_ticks[ (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1) ] ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks)) # type: ignore ax.yaxis.set_minor_locator(plt.NullLocator()) # type: ignore elif abs_min >= 100: # For larger numbers (e.g., Total Nitrogen) major_ticks = np.array([100, 200, 300, 400, 500]) major_ticks = 
major_ticks[ (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1) ] ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks)) # type: ignore ax.yaxis.set_minor_locator(plt.NullLocator()) # type: ignore elif abs_min >= 10 and abs_max <= 100: # For medium numbers (excluding Total Phosphorus) major_ticks = np.array([10, 20, 30, 40, 50, 60, 70, 80, 90, 100]) major_ticks = major_ticks[ (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1) ] ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks)) # type: ignore ax.yaxis.set_minor_locator(plt.NullLocator()) # type: ignore elif range_ratio > 10: # Wide range but smaller numbers (e.g., Turbidity) ax.yaxis.set_major_locator(plt.LogLocator(base=10.0, numticks=5)) # type: ignore ax.yaxis.set_minor_locator( plt.LogLocator(base=10.0, subs=(2, 5), numticks=5) # type: ignore ) ax.yaxis.set_minor_formatter(plt.FuncFormatter(format_func)) # type: ignore else: # Narrow range with small numbers if y_min < 1: major_ticks = np.array([0.5, 1, 1.5, 2, 2.5, 3, 4, 5]) else: major_ticks = np.arange( np.floor(y_min), np.ceil(y_max) + 1, 1 if y_max - y_min < 5 else 2, ) major_ticks = major_ticks[ (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1) ] ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks)) # type: ignore ax.yaxis.set_minor_locator(plt.NullLocator()) # type: ignore # Adjust tick parameters ax.tick_params(axis="y", which="both", labelsize=9) else: ax.set_ylim(y_min, y_max) # Determine if we need decimal places for linear scale min_value = min(sector_data["mean"].min(), y_min) needs_decimals = min_value < 1 or not all( val.is_integer() for val in sector_data["mean"] ) def linear_format_func(x, _): if needs_decimals: return f"{x:.1f}" return f"{int(x)}" ax.yaxis.set_major_formatter(plt.FuncFormatter(linear_format_func)) # type: ignore return line # 6. 
Plot sectors # custom_colors = [ # "#1f77b4", # "#ff7f0e", # "#2ca02c", # "#d62728", # "#9467bd", # "#8c564b", # "#e377c2", # "#7f7f7f", # ] for i, (sector, color) in enumerate(zip(sectors, color_scale)): sector_data = sector_data_dict[sector] if sector_data.empty: continue if panel_chart: ax = fig.add_subplot(n_rows, n_cols, i + 1) plot_sector_on_axis(ax, sector_data, color) ax.set_title(sector, pad=10, fontsize=10, color=GREY30) # Limit number of x-axis ticks to maximum of 8 if len(years) > 8: # Show roughly every nth tick to get 8 or fewer ticks n = len(years) // 8 + 1 visible_ticks = years[::n] ax.set_xticks(visible_ticks) ax.set_xticklabels(visible_ticks, rotation=0, weight=500, color=GREY40) # Show tick marks since we're hiding some labels ax.tick_params(axis="x", which="major", length=4, colors=GREY40) else: ax.set_xticklabels(years, rotation=0, weight=500, color=GREY40) # Hide tick marks when showing all labels ax.tick_params(axis="x", which="major", length=0) else: plot_sector_on_axis(main_ax, sector_data, color, show_label=True) # 7. Final customization if panel_chart: title = f"{parameter}{' (' + param_unit + ')' if param_unit else ''}" fig.suptitle(title, fontsize=14, y=1.02, color=GREY30) # Updated color else: main_ax.set_title( parameter, pad=10, fontsize=14, fontweight="normal", color=GREY30 ) # Updated color main_ax.set_ylabel(param_unit, fontsize=12, color=GREY40) main_ax.set_xticklabels(years, weight=500, color=GREY40) main_ax.yaxis.label.set_color(GREY40) main_ax.legend( bbox_to_anchor=(1.05, 1), loc="upper left", borderaxespad=0.0, frameon=False, fontsize=9, ) if use_log_scale: main_ax.yaxis.set_major_formatter(plt.ScalarFormatter()) # type: ignore main_ax.yaxis.get_major_formatter().set_scientific(False) # type: ignore plt.tight_layout() # 8. 
Prepare return data plot_data = pd.concat(sector_data_dict.values(), ignore_index=True) plot_data.insert(0, "parameter", parameter) return fig, param_data, plot_data @timer(include_params=True) def plot_sector_box_charts( df: pd.DataFrame, parameter: str, color_scale: list[str] = COLOR_SCALE, show_trend: bool = True, # New parameter ) -> tuple[Figure, pd.DataFrame, pd.DataFrame]: """ Create box plots showing the distribution of parameter values by sector and year, with optional trend lines and statistics. Parameters: ----------- df : pd.DataFrame Input dataframe parameter : str Parameter to plot color_scale : list[str] List of colors to use for sectors show_trend : bool, default=True Whether to show trend lines and statistics Returns: -------- tuple[Figure, pd.DataFrame, pd.DataFrame] - Figure: Matplotlib figure containing the box plots - DataFrame: Filtered dataframe containing the raw data used in the plot - DataFrame: Contains the plotted data points: mean, median, and quartiles """ from scipy import stats # Define consistent colors for styling GREY30 = "#4d4d4d" GREY40 = "#666666" # Filter data for parameter param_data = df[df["Org_Analyte_Name"] == parameter].copy() # For Salinity, exclude Fresh Water Lakes if parameter == "Salinity": param_data = param_data[param_data["Sector"] != "Freshwater Lakes"] # Calculate year and prepare data param_data["Reporting_Year"] = param_data["Activity_Start_Date_Time"].dt.year sectors = sorted(param_data["Sector"].unique()) years = sorted(param_data["Reporting_Year"].unique()) # Determine if log scale should be used use_log_scale = parameter in [ "Turbidity", "Fecal Coliform (MPN)", "Total Nitrogen", "Total Phosphorus", ] # Create figure with single column layout - increased width from 8 to 12 fig = plt.figure(figsize=(15, 2.5 * len(sectors))) # Create box plots for idx, sector in enumerate(sectors): ax = plt.subplot(len(sectors), 1, idx + 1) sector_data = param_data[param_data["Sector"] == sector] bp = ax.boxplot( # noqa: 
F841 [ sector_data[sector_data["Reporting_Year"] == year][ "Org_Result_Value" ].dropna() for year in years ], labels=years, # type: ignore patch_artist=True, medianprops=dict(color="black"), flierprops=dict( marker="o", markerfacecolor=color_scale[idx], alpha=0.5, markersize=4, ), boxprops=dict(facecolor=color_scale[idx], alpha=0.6), widths=0.6, positions=range(len(years)), ) # Only add trend line and stats if show_trend is True if show_trend: # Calculate annual means for trend line annual_means = [ sector_data[sector_data["Reporting_Year"] == year][ "Org_Result_Value" ].mean() for year in years ] # Remove any NaN values for regression valid_points = [ (x, y) for x, y in enumerate(annual_means) if not np.isnan(y) ] if valid_points: x_valid, y_valid = zip(*valid_points) # Perform linear regression slope, intercept, r_value, p_value, std_err = stats.linregress( x_valid, y_valid ) # Plot trend line line_x = np.array(x_valid) line_y = slope * line_x + intercept ax.plot(line_x, line_y, "--", color="red", alpha=0.7, linewidth=1.5) # Add statistics text stats_text = f"R² = {r_value**2:.3f}\np = {p_value:.3f}" # type: ignore ax.text( 0.02, 0.98, stats_text, transform=ax.transAxes, verticalalignment="top", fontsize=8, bbox=dict(facecolor="white", alpha=0.8, edgecolor="none"), ) # Set proper x-axis limits with padding ax.set_xlim(-0.5, len(years) - 0.5) ax.set_title(sector, pad=10, fontsize=10, color=GREY30) if use_log_scale: ax.set_yscale("log") # Customize appearance ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray") ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["left"].set_visible(False) ax.spines["bottom"].set_color(GREY40) ax.spines["bottom"].set_linewidth(0.5) # Customize tick parameters ax.tick_params(axis="both", which="both", length=0, colors=GREY40) ax.set_xticks(range(len(years))) ax.set_xticklabels(years, ha="center", weight=500, color=GREY40) # Add overall title fig.suptitle( f"{parameter} Distribution by 
Sector", fontsize=14, y=1.02, color=GREY30 ) # Adjust layout - removed bottom adjustment since we no longer have rotated labels plt.tight_layout() plt.subplots_adjust(hspace=0.4) # Create stats DataFrame to store box plot statistics stats_data = [] for sector in sectors: sector_data = param_data[param_data["Sector"] == sector] for year in years: year_data = sector_data[sector_data["Reporting_Year"] == year][ "Org_Result_Value" ] if not year_data.empty: stats = { "Sector": sector, "Reporting_Year": year, "mean": year_data.mean(), "median": year_data.median(), "q1": year_data.quantile(0.25), "q3": year_data.quantile(0.75), "min": year_data.min(), "max": year_data.max(), "count": len(year_data), } stats_data.append(stats) # Create stats DataFrame and add parameter column stats_df = pd.DataFrame(stats_data) stats_df.insert(0, "parameter", parameter) return fig, param_data, stats_df @timer(include_params=True) def plot_sector_heatmap( df: pd.DataFrame, parameter: str, show_values: bool = False, ) -> tuple[Figure, pd.DataFrame, pd.DataFrame]: """ Create a heatmap showing annual means by sector and year. 
Parameters: ----------- df : pd.DataFrame Input dataframe parameter : str Name of the parameter to plot show_values : bool, default=False Whether to display mean values inside each cell Returns: -------- tuple[Figure, pd.DataFrame, pd.DataFrame] - Figure: Matplotlib figure containing the heatmap - DataFrame: Filtered dataframe containing the raw data used in the plot - DataFrame: Contains the plotted data points: mean values for each sector and year """ # Filter data for selected parameter param_data = df[df["Org_Analyte_Name"] == parameter].copy() # For Salinity, exclude Fresh Water Lakes if parameter == "Salinity": param_data = param_data[param_data["Sector"] != "Fresh Water Lakes"] # Calculate annual means plot_data = ( param_data.groupby(["Reporting_Year", "Sector"], observed=True)[ "Org_Result_Value" ] .mean() .reset_index() .pivot(index="Sector", columns="Reporting_Year", values="Org_Result_Value") ) # Create figure with extra space at bottom for colorbar fig, ax = plt.subplots(figsize=(12, len(plot_data) * 0.8)) # Create heatmap with small gaps between cells im = ax.imshow(plot_data, aspect="auto", cmap="YlOrRd") # Customize appearance ax.set_xticks(np.arange(len(plot_data.columns))) ax.set_yticks(np.arange(len(plot_data.index))) ax.set_xticklabels(plot_data.columns) ax.set_yticklabels(plot_data.index) # Remove all spines ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["left"].set_visible(False) ax.spines["bottom"].set_visible(False) # Remove all tick marks but keep labels ax.tick_params(axis="both", which="both", length=0) # Add small gaps between cells ax.set_xticks(np.arange(plot_data.shape[1] + 1) - 0.5, minor=True) ax.set_yticks(np.arange(plot_data.shape[0] + 1) - 0.5, minor=True) ax.grid(which="minor", color="w", linestyle="-", linewidth=2) # Set x-axis labels horizontal plt.setp(ax.get_xticklabels(), rotation=0) # Add value annotations if requested if show_values: for i in range(len(plot_data.index)): for j in 
range(len(plot_data.columns)): value = plot_data.iloc[i, j] if not pd.isna(value): text = f"{value:.1f}" ax.text(j, i, text, ha="center", va="center", color="black") # Add colorbar at the bottom with reduced padding and no border cbar = ax.figure.colorbar(im, ax=ax, orientation="horizontal", pad=0.1) # type: ignore unit = param_data["Org_Result_Unit"].iloc[0] if not param_data.empty else "" cbar.ax.set_xlabel(f"Mean ({unit})") cbar.outline.set_visible(False) # type: ignore # Set title ax.set_title(parameter) plt.tight_layout() # Reset index to make Sector a column and add parameter column plot_data = plot_data.reset_index() plot_data.insert(0, "parameter", parameter) return fig, param_data, plot_data