import math
import sys
import textwrap
from pathlib import Path
import altair as alt
import contextily as ctx
import geopandas as gpd
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import scipy.stats as stats
import seaborn as sns
import streamlit as st
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.figure import Figure
from osgeo import gdal
from plotly.subplots import make_subplots
from utils.data_loading import timer
# Categorical hex-color palette for distinguishing many series in charts.
# Not referenced in this chunk of the module — presumably consumed by callers
# elsewhere in the file/package (TODO confirm).
COLOR_SCALE = [
    "#6D3E91",
    "#C05917",
    "#58AC8C",
    "#286BBB",
    "#883039",
    "#BC8E5A",
    "#00295B",
    "#C15065",
    "#18470F",
    "#9A5129",
    "#E56E5A",
    "#A2559C",
    "#38AABA",
    "#578145",
    "#970046",
    "#00847E",
    "#B13507",
    "#4C6A9C",
    "#CF0A66",
    "#00875E",
    "#B16214",
    "#8C4569",
    "#3B8E1D",
    "#D73C50",
]
@st.cache_data
@timer(include_params=True)
def plot_trends_by_station(
    df: pd.DataFrame, analyte_names: list[str], sample_position: str, figsize=(15, 12)
) -> Figure:
    """
    Create subplots of analyte trends for the given dataframe and analytes.

    Each analyte gets one panel in a 2-column grid: per-year box plots,
    an annual-mean line, and (when at least two years exist) a least-squares
    trend line annotated with R² and p-value.

    Parameters:
    -----------
    df : pandas DataFrame
        The filtered dataframe containing data for a specific station and position
    analyte_names : list[str]
        List of analyte names to plot
    sample_position : str
        Sample position label; "All" is displayed as "Surface and Bottom"
    figsize : tuple
        Figure size in inches (width, height)

    Returns:
    --------
    Figure
        The assembled matplotlib figure.
    """
    # Calculate number of rows needed (2 columns)
    n_rows = (len(analyte_names) + 1) // 2
    fig, axes = plt.subplots(n_rows, 2, figsize=figsize)
    axes = axes.flatten()  # Flatten axes array for easier indexing
    # df is pre-filtered to a single station, so the first row identifies it.
    station_number = df["Station_Number"].iloc[0]
    station_name = df["Name"].iloc[0]
    if sample_position == "All":
        sample_position_label = "Surface and Bottom"
    else:
        sample_position_label = sample_position
    for idx, analyte_name in enumerate(analyte_names):
        ax = axes[idx]
        # Derive a Year column: prefer Reporting_Year when the column exists,
        # otherwise fall back to the calendar year of the sample timestamp.
        # (The lambda's `df` shadows the outer df — it receives the filtered frame.)
        data = (
            df[df["Org_Analyte_Name"] == analyte_name]
            .assign(
                Year=lambda df: (
                    df["Reporting_Year"]
                    if "Reporting_Year" in df.columns
                    else df["Activity_Start_Date_Time"].dt.year
                )
            )
            .dropna(subset=["Org_Result_Value"])
        )
        if data.empty:
            # Leave an explanatory placeholder instead of an empty panel.
            ax.text(
                0.5,
                0.5,
                f"No data available for {analyte_name}",
                ha="center",
                va="center",
            )
            continue
        # Determine if log scale should be used
        log_scale_analytes = [
            "Turbidity",
            "Fecal Coliform (MPN)",
            "Total Nitrogen",
            "Total Phosphorus",
        ]
        log_scale = analyte_name in log_scale_analytes
        if log_scale:
            ax.set_yscale("log")
            # Plain numeric tick labels instead of scientific notation on the log axis.
            ax.yaxis.set_major_formatter(plt.ScalarFormatter())  # type: ignore
        # Create box plot: one box per year, positioned at the year value so
        # boxes, the mean line, and the trend line share the same x coordinates.
        groups = data.groupby("Year", observed=True)
        positions = np.array(list(groups.groups.keys()))
        group_data = [group["Org_Result_Value"] for name, group in groups]
        ax.boxplot(
            group_data,
            positions=positions,
            widths=0.6,
            patch_artist=True,
            boxprops=dict(facecolor="lightblue", color="blue", alpha=0.5),
            medianprops=dict(color="blue"),
            whiskerprops=dict(color="blue"),
            capprops=dict(color="blue"),
            flierprops=dict(color="blue", markeredgecolor="blue", alpha=0.5),
        )
        # Calculate and plot trend line
        yearly_means = data.groupby("Year", observed=True)["Org_Result_Value"].mean()
        X = yearly_means.index.values.reshape(-1, 1)
        y = yearly_means.values
        # Plot means
        ax.plot(X, y, "bo-", linewidth=1, markersize=4, label="Annual Mean")
        # Calculate trend line
        if len(X) > 1:  # Only calculate trend if we have more than one point
            slope, intercept, r_value, p_value, std_err = stats.linregress(X.ravel(), y)
            trend_line = slope * X.ravel() + intercept
            ax.plot(X, trend_line, "r--", alpha=0.8, linewidth=1, label="Trend")
            # Add regression statistics in the top-left corner (axes coords).
            stats_text = f"R²={r_value**2:.3f}\np={p_value:.3f}"  # type: ignore
            ax.text(
                0.02,
                0.98,
                stats_text,
                transform=ax.transAxes,
                verticalalignment="top",
                bbox=dict(boxstyle="round", facecolor="white", alpha=0.8),
                parse_math=False,  # render the label literally, no mathtext
            )
        # Customize subplot
        ax.set_title(f"{analyte_name}", pad=15)
        ax.set_xlabel("Year")
        # Analyte-specific y-axis label abbreviations.
        analyte_unit = data["Org_Result_Unit"].iloc[0]
        if analyte_name == "Depth, Secchi Disk Depth":
            y_label = f"Depth ({analyte_unit})"
        elif analyte_name == "pH":
            y_label = None  # pH is unitless
        elif analyte_name.startswith("Dissolved"):
            y_label = f"DO ({analyte_unit})"
        elif analyte_name.startswith("Fecal Coliform"):
            y_label = f"Fecal Coliform ({analyte_unit})"
        else:
            y_label = f"{analyte_name} ({analyte_unit})"
        ax.set_ylabel(y_label)
        ax.grid(True, alpha=0.3)
        # Add per-year sample sizes just above the top of the axis.
        for year, group in groups:
            ax.text(
                year,
                ax.get_ylim()[1],
                f"n={len(group)}",
                ha="center",
                va="bottom",
                fontsize=8,
            )
    # Remove any unused subplots
    for idx in range(len(analyte_names), len(axes)):
        fig.delaxes(axes[idx])
    # Add overall title with more space
    fig.suptitle(
        f"Water Quality Trends for {station_number} - {station_name} - {sample_position_label}",
        fontsize=14,
        y=0.95,
    )
    # Adjust layout with more space
    plt.tight_layout(rect=(0, 0, 1, 0.95))
    return fig
@timer(include_params=True)
def altair_plot_sector_trends(
    df: pd.DataFrame, analyte_names: list[str]
) -> alt.VConcatChart:
    """
    Create plots of mean annual analyte trends by sector using Altair.

    One interactive chart per analyte: annual mean per sector (line + points)
    with a +/- 1 standard-error shaded band, stacked vertically.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    analyte_names : list[str]
        List of analytes to plot

    Returns:
    --------
    alt.VConcatChart
        Vertically concatenated Altair charts for each analyte
    """
    # Custom color scheme matching the matplotlib version (plot_sector_trends)
    color_scale = alt.Scale(
        domain=df["Sector"].unique().tolist(),
        range=[
            "#1f77b4",  # blue
            "#ff7f0e",  # orange
            "#2ca02c",  # green
            "#d62728",  # red
            "#9467bd",  # purple
            "#8c564b",  # brown
            "#e377c2",  # pink
            "#7f7f7f",  # gray
        ],
    )
    charts = []
    for analyte_name in analyte_names:
        # Filter data for current analyte
        analyte_data = df[df["Org_Analyte_Name"] == analyte_name].copy()
        # For Salinity, exclude Fresh Water Lakes.
        # NOTE(review): plot_sector_trends filters "Freshwater Lakes" (one word)
        # for the same purpose — confirm which spelling exists in the data; the
        # mismatched one filters nothing.
        if analyte_name == "Salinity":
            analyte_data = analyte_data[analyte_data["Sector"] != "Fresh Water Lakes"]
        # Calculate annual means and standard errors using Reporting_Year
        processed_data = (
            analyte_data.groupby(["Reporting_Year", "Sector"], observed=True)[
                "Org_Result_Value"
            ]
            .agg(["mean", "sem"])
            .reset_index()
            .rename(columns={"mean": "Mean", "sem": "SE"})
        )
        # Add confidence interval bounds (+/- 1 SEM)
        processed_data["Upper"] = processed_data["Mean"] + processed_data["SE"]
        processed_data["Lower"] = processed_data["Mean"] - processed_data["SE"]
        # Get the unit for the y-axis label
        unit = analyte_data["Org_Result_Unit"].iloc[0] if not analyte_data.empty else ""
        # Determine if log scale should be used
        use_log_scale = analyte_name in [
            "Turbidity",
            "Fecal Coliform (MPN)",
            "Total Nitrogen",
            "Total Phosphorus",
        ]
        # Create base chart carrying the shared x/color/tooltip encodings
        base = alt.Chart(processed_data).encode(
            x=alt.X("Reporting_Year:O", axis=alt.Axis(title=None)),
            color=alt.Color("Sector:N", scale=color_scale),
            tooltip=[
                alt.Tooltip("Reporting_Year:O"),
                alt.Tooltip("Sector:N"),
                alt.Tooltip("Mean:Q", format=".2f"),
                alt.Tooltip("SE:Q", format=".2f"),
            ],
        )
        # Create line and point layers
        lines = base.mark_line().encode(
            y=alt.Y(
                "Mean:Q",
                title=f"({unit})",
                scale=alt.Scale(type="log" if use_log_scale else "linear"),
            )
        )
        points = base.mark_point(size=50).encode(y=alt.Y("Mean:Q"))
        # Create confidence interval area
        area = base.mark_area(opacity=0.15).encode(
            y=alt.Y("Lower:Q"), y2=alt.Y2("Upper:Q")
        )
        # Combine layers (band behind line behind points)
        chart = (
            (area + lines + points)
            .properties(
                width=600,
                height=300,
                title=alt.TitleParams(text=analyte_name, anchor="middle", fontSize=14),
            )
            .interactive()
        )
        charts.append(chart)
    # Combine all charts vertically
    final_chart = alt.vconcat(*charts).configure(
        view={"strokeWidth": 0}, axis={"grid": True, "gridOpacity": 0.2}
    )
    return final_chart
def plotly_plot_analyte_trends(df: pd.DataFrame, analyte_names: list[str]) -> go.Figure:
    """
    Create subplots of analyte trends using Plotly for the given dataframe and analytes.

    Each analyte gets per-year box plots, an annual-mean line, and (with at
    least two years of data) a linear trend line annotated with R² and p-value.

    Parameters:
    -----------
    df : pandas DataFrame
        The filtered dataframe containing data for a specific station and position
    analyte_names : list[str]
        List of analyte names to plot

    Returns:
    --------
    go.Figure
        Plotly figure containing the subplots
    """
    # Calculate number of rows needed (2 columns)
    n_rows = (len(analyte_names) + 1) // 2
    # Create subplot figure
    fig = make_subplots(
        rows=n_rows,
        cols=2,
        subplot_titles=analyte_names,
        vertical_spacing=0.12,
        horizontal_spacing=0.1,
    )
    # df is pre-filtered to a single station/position; first row identifies it.
    station_number = df["Station_Number"].iloc[0]
    sample_position = df["Sample_Position"].iloc[0]
    for idx, analyte_name in enumerate(analyte_names):
        row = idx // 2 + 1
        col = idx % 2 + 1
        data = (
            df[df["Org_Analyte_Name"] == analyte_name]
            .assign(Year=lambda df: df["Activity_Start_Date_Time"].dt.year)
            .dropna(subset=["Org_Result_Value"])
        )
        if data.empty:
            # Placeholder annotation instead of an empty panel.
            fig.add_annotation(
                text=f"No data available for {analyte_name}",
                xref=f"x{idx+1}",
                yref=f"y{idx+1}",
                x=0.5,
                y=0.5,
                showarrow=False,
                row=row,
                col=col,
            )
            continue
        # Determine if log scale should be used
        log_scale = analyte_name in ["Turbidity", "Fecal Coliform (MPN)"]
        # Group per year for sample-size annotations and the x positions.
        groups = data.groupby("Year", observed=True)
        years = list(groups.groups.keys())
        # Add box plot
        fig.add_trace(
            go.Box(
                x=data["Year"],
                y=data["Org_Result_Value"],
                name="Box Plot",
                boxpoints="outliers",
                line=dict(color="blue"),
                fillcolor="lightblue",
                showlegend=False,
            ),
            row=row,
            col=col,
        )
        # Calculate and plot means
        yearly_means = data.groupby("Year", observed=True)["Org_Result_Value"].mean()
        # Add mean line
        fig.add_trace(
            go.Scatter(
                x=years,
                y=yearly_means.values,
                mode="lines+markers",
                name="Annual Mean",
                line=dict(color="blue"),
                showlegend=False,
            ),
            row=row,
            col=col,
        )
        # Calculate and add trend line (needs at least two yearly means)
        if len(years) > 1:
            X = np.array(years)
            y = yearly_means.values
            slope, intercept, r_value, p_value, std_err = stats.linregress(X, y)
            trend_line = slope * X + intercept
            fig.add_trace(
                go.Scatter(
                    x=years,
                    y=trend_line,
                    mode="lines",
                    name="Trend",
                    line=dict(color="red", dash="dash"),
                    showlegend=False,
                ),
                row=row,
                col=col,
            )
            # Add statistics annotation. Plotly annotation text is HTML, so the
            # line break must be a <br> tag (a raw newline inside the f-string
            # was a SyntaxError in the previous version).
            stats_text = f"R² = {r_value**2:.3f}<br>p = {p_value:.3f}"  # type: ignore
            fig.add_annotation(
                text=stats_text,
                xref=f"x{idx+1}",
                yref=f"y{idx+1}",
                x=min(years),  # type: ignore
                y=max(data["Org_Result_Value"]),
                showarrow=False,
                bgcolor="white",
                bordercolor="black",
                borderwidth=1,
                row=row,
                col=col,
            )
        # Add sample size annotations at the top of each panel.
        for year, group in groups:
            fig.add_annotation(
                text=f"n={len(group)}",
                x=year,
                y=max(data["Org_Result_Value"]),
                showarrow=False,
                font=dict(size=8),
                row=row,
                col=col,
            )
        # Update axes
        if log_scale:
            fig.update_yaxes(type="log", row=row, col=col)
        fig.update_xaxes(title_text="Year", row=row, col=col)
        fig.update_yaxes(
            title_text=f'Value ({data["Org_Result_Unit"].iloc[0]})', row=row, col=col
        )
    # Update layout. The title uses an HTML <br> for the two-line heading.
    fig.update_layout(
        title=f"Water Quality Trends<br>Station {station_number} - {sample_position}",
        title_x=0.5,
        showlegend=False,
        height=300 * n_rows + 100,
        width=1000,
        template="plotly_white",
    )
    return fig
@timer(include_params=True)
def plot_sector_trends(
    df: pd.DataFrame, analyte_names: list[str], base_height: float = 4
) -> Figure:
    """
    Create plots of mean annual analyte trends by sector.

    One subplot per analyte: each sector is drawn as a mean-per-Reporting_Year
    line with a +/- 1 SEM shaded band.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    analyte_names : list[str]
        List of analytes to plot
    base_height : float
        Height per subplot in inches (default=4)

    Returns:
    --------
    Figure
        Matplotlib figure with one row per analyte.
    """
    # Calculate figure dimensions
    n_rows = len(analyte_names)
    fig_height = base_height * n_rows
    # Create figure with dynamic height
    fig, axes = plt.subplots(n_rows, 1, figsize=(15, fig_height))
    if n_rows == 1:
        axes = [axes]  # normalize to a sequence so axes[idx] works below
    # Same palette as altair_plot_sector_trends, assigned in sector order.
    custom_colors = [
        "#1f77b4",  # blue
        "#ff7f0e",  # orange
        "#2ca02c",  # green
        "#d62728",  # red
        "#9467bd",  # purple
        "#8c564b",  # brown
        "#e377c2",  # pink
        "#7f7f7f",  # gray
    ]
    for idx, analyte_name in enumerate(analyte_names):
        ax = axes[idx]
        # Filter data for current analyte
        analyte_data = df[df["Org_Analyte_Name"] == analyte_name]
        # For Salinity, exclude Fresh Water Lakes.
        # NOTE(review): altair_plot_sector_trends uses the spelling
        # "Fresh Water Lakes" for the same filter — confirm which spelling
        # matches the data; the mismatched one filters nothing.
        if analyte_name == "Salinity":
            analyte_data = analyte_data[analyte_data["Sector"] != "Freshwater Lakes"]
        # Plot each sector with custom colors (sectors taken from the full df
        # so colors stay stable across analytes)
        for sector, color in zip(df["Sector"].unique(), custom_colors):
            sector_data = (
                analyte_data[analyte_data["Sector"] == sector]
                .groupby("Reporting_Year", observed=True)["Org_Result_Value"]
                .agg(["mean", "sem"])
                .reset_index()
            )
            if not sector_data.empty:
                # Plot mean line with error bands
                ax.plot(
                    sector_data["Reporting_Year"],
                    sector_data["mean"],
                    "-o",
                    color=color,
                    label=sector,
                    markersize=4,
                    linewidth=2,
                )
                # Add error bands with slightly reduced opacity
                ax.fill_between(
                    sector_data["Reporting_Year"],
                    sector_data["mean"] - sector_data["sem"],
                    sector_data["mean"] + sector_data["sem"],
                    color=color,
                    alpha=0.15,  # Reduced opacity for better visibility
                )
        # Set x-axis to show only whole years
        years = sorted(analyte_data["Reporting_Year"].unique())
        ax.set_xticks(years)
        ax.set_xticklabels(years)
        # Customize subplot with lighter titles and no x-label
        ax.set_title(analyte_name, pad=10, fontsize=11, fontweight="normal")
        ax.set_xlabel("")
        if not analyte_data.empty:
            analyte_unit = analyte_data["Org_Result_Unit"].iloc[0]
            ax.set_ylabel(f"({analyte_unit})", fontsize=10)
        # Improve grid appearance
        ax.grid(True, alpha=0.2, linestyle="--")
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        # Simplified legend appearance, placed outside the axes on the right
        ax.legend(
            bbox_to_anchor=(1.05, 1),
            loc="upper left",
            borderaxespad=0.0,
            frameon=True,
            fancybox=False,
            shadow=False,
            fontsize=9,
        )
        # Log scale for analytes whose values span orders of magnitude.
        if analyte_name in [
            "Turbidity",
            "Fecal Coliform (MPN)",
            "Total Nitrogen",
            "Total Phosphorus",
        ]:
            ax.set_yscale("log")
    # Adjust layout with more vertical space between subplots; the right 15%
    # of the figure is reserved for the legends.
    plt.tight_layout(rect=(0, 0, 0.85, 1), h_pad=2.0)
    return fig
@st.cache_data
@timer(include_params=True)
def plot_parameter_correlations(
    df: pd.DataFrame,
    analyte_names: list[str],
    subset_by: str,
    subset: str,
    filter_by: str,
    threshold: float = 0.2,
) -> tuple[Figure, pd.DataFrame]:
    """
    Creates a correlation heatmap showing relationships between water quality parameters,
    with additional information about data completeness.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing water quality measurements. Must have columns:
        - Org_Analyte_Name: Name of the analyte
        - Org_Result_Value: Measurement value
        - Activity_Start_Date_Time: Timestamp of measurement
        - Reporting_Year: Year of measurement
        - Station_Number: Monitoring station identifier
        - Name: Station name
        - Sample_Position: Sample depth position (e.g., "Surface", "Bottom")
    analyte_names : list[str]
        List of analyte names to include in correlation analysis
    subset_by : str
        Column name used for subsetting the data (e.g., "Sector", "Waterbody_Class")
    subset : str
        Value within subset_by column to filter data (e.g., specific sector name)
    filter_by : str
        Sample position filter ("Surface", "Bottom", or "All")
    threshold : float, default=0.2
        Minimum data completeness threshold (0-1). Parameters with completeness below
        this threshold will be excluded from correlation analysis but listed in footnote.

    Returns
    -------
    tuple[Figure, pd.DataFrame]
        - Figure: Matplotlib figure containing:
            - Correlation heatmap with values
            - Title showing subset and sample size
            - Footnote listing excluded parameters
        - DataFrame: Pivot table of filtered data used for correlation analysis

    Notes
    -----
    - Uses abbreviated parameter names for cleaner display (e.g., "DO" for "Dissolved Oxygen")
    - Completeness is the fraction of sampling timestamps with a non-null value
      for each parameter (``pivot_df.notna().mean()``)
    - Masks upper triangle of correlation matrix
    - Colors correlations using RdBu_r colormap centered at 0
    - Includes data completeness information in footnote
    - Caches results using streamlit cache decorator
    """
    # Count measurements per requested analyte (used to restrict the pivot to
    # analytes that actually occur in df).
    measured_params = (
        df[df["Org_Analyte_Name"].isin(analyte_names)]
        .groupby("Org_Analyte_Name", observed=True)
        .size()
    )
    # Create pivot table only for measured parameters that were requested
    pivot_df = df[
        df["Org_Analyte_Name"].isin(set(measured_params.index) & set(analyte_names))
    ].pivot_table(
        index="Activity_Start_Date_Time",
        columns="Org_Analyte_Name",
        values="Org_Result_Value",
        observed=False,
    )
    # Abbreviated display names for the heatmap axes.
    name_mapping = {
        "Depth, Secchi Disk Depth": "Secchi Depth",
        "Dissolved Oxygen": "DO",
        "Fecal Coliform (MPN)": "Fecal Coliform",
        "Total Nitrogen": "TN",
        "Total Phosphorus": "TP",
    }
    pivot_df = pivot_df.rename(columns=name_mapping)
    # Calculate data completeness for each parameter.
    # (A previous count-based completeness computation was removed here: its
    # result was unconditionally overwritten by this pivot-based one.)
    completeness = pivot_df.notna().mean()
    valid_params = completeness[completeness >= threshold].index
    excluded_params = completeness[completeness < threshold]
    # Filter pivot_df to only include parameters meeting the threshold
    pivot_df = pivot_df[valid_params]
    # Calculate correlation matrix
    corr = pivot_df.corr()
    # Calculate sample size
    n_samples = len(df)
    fig = plt.figure(figsize=(6, 7))
    # Three stacked areas: title strip, heatmap, footnote strip.
    gs = fig.add_gridspec(
        3,
        1,
        height_ratios=[
            1,  # Title space
            4,  # Heatmap
            1.5,  # Footnote
        ],
        hspace=0.4,
    )
    # Add title axes, heatmap axes, and footnote axes
    title_ax = fig.add_subplot(gs[0])
    heatmap_ax = fig.add_subplot(gs[1])
    footnote_ax = fig.add_subplot(gs[2])
    # Mask the redundant upper triangle of the symmetric correlation matrix.
    mask = np.triu(np.ones_like(corr, dtype=bool))
    heatmap = sns.heatmap(
        corr,
        mask=mask,
        annot=True,
        cmap="RdBu_r",
        center=0,
        vmin=-1,
        vmax=1,
        ax=heatmap_ax,
        yticklabels=1,
        cbar=True,
        xticklabels=1,
    )
    # Rotate x-axis labels and adjust their position
    heatmap_ax.set_xticklabels(
        heatmap_ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor"
    )
    heatmap_ax.tick_params(axis="x", pad=10)
    # Fix the colorbar ticks warning by setting ticks explicitly before labels
    colorbar = heatmap.figure.axes[-1]  # type: ignore
    ticks = colorbar.get_yticks()
    colorbar.set_yticks(ticks)
    tick_labels = [f"{x:>8.2f}" for x in ticks]
    colorbar.set_yticklabels(tick_labels)
    # Rotate y-axis labels to horizontal
    heatmap_ax.set_yticklabels(heatmap_ax.get_yticklabels(), rotation=0)
    # Remove axis labels
    heatmap_ax.set_xlabel("")
    heatmap_ax.set_ylabel("")
    # Configure footnote axis as a bare text area
    footnote_ax.set_frame_on(False)  # Hide the frame
    footnote_ax.set_xticks([])  # Remove x-ticks
    footnote_ax.set_yticks([])  # Remove y-ticks
    # Add footnote listing parameters excluded by the completeness threshold
    if not excluded_params.empty:
        footnote_text = "Excluded parameters (<{:.0%} data completeness):\n".format(
            threshold
        )
        for param, completeness_val in excluded_params.items():
            footnote_text += f" - {param}: {completeness_val:.1%} complete\n"
        footnote_ax.text(
            0.01,
            0.40,
            footnote_text.rstrip(),
            ha="left",
            va="center",
            fontsize=9,
            fontstyle="italic",
            transform=footnote_ax.transAxes,
        )
    title_ax.set_frame_on(False)
    title_ax.set_xticks([])
    title_ax.set_yticks([])
    display_filter = "Surface and Bottom" if filter_by == "All" else filter_by
    # Add year information to the subtitle
    year_info = (
        f"Reporting Year {df['Reporting_Year'].iloc[0]}"
        if len(df["Reporting_Year"].unique()) == 1
        else "All Years"
    )
    # Add titles - using figure coordinates with adjusted positions
    title_ax.text(
        0.45,
        0.8,
        f"{subset_by}: {subset}",
        ha="center",
        va="center",
        fontsize=12,
        fontweight="bold",
        transform=fig.transFigure,
    )
    title_ax.text(
        0.45,
        0.75,
        f"{display_filter}, {year_info} (n={n_samples:,})",
        ha="center",
        va="bottom",
        fontsize=10,
        fontstyle="italic",
        transform=fig.transFigure,
    )
    # Replace tight_layout with more explicit spacing control:
    # draw once so layout metrics exist, then position the subplots manually.
    fig.canvas.draw()
    renderer = fig.canvas.get_renderer()  # type: ignore
    fig.get_tightbbox(renderer)
    fig.subplots_adjust(left=0.1, right=0.95, bottom=0.02, top=0.85, hspace=0.4)
    return fig, pivot_df
def plot_np_ratios(df: pd.DataFrame) -> Figure:
    """
    Plot nitrogen-to-phosphorus ratios: a sector-colored time series and a
    histogram, each with a red Redfield-ratio (16:1) reference line.
    """
    # Pivot so each row pairs a Total Nitrogen and Total Phosphorus reading
    # taken at the same timestamp/sector.
    nutrient_mask = df["Org_Analyte_Name"].isin(["Total Nitrogen", "Total Phosphorus"])
    ratios = (
        df[nutrient_mask]
        .pivot_table(
            index=["Activity_Start_Date_Time", "Sector"],
            columns="Org_Analyte_Name",
            values="Org_Result_Value",
            observed=True,
        )
        .reset_index()
    )
    ratios["N:P Ratio"] = ratios["Total Nitrogen"] / ratios["Total Phosphorus"]
    # Two stacked panels: time series on top, distribution below.
    fig, (ts_ax, hist_ax) = plt.subplots(2, 1, figsize=(12, 10))
    sns.scatterplot(
        data=ratios,
        x="Activity_Start_Date_Time",
        y="N:P Ratio",
        hue="Sector",
        ax=ts_ax,
        alpha=0.6,
    )
    ts_ax.axhline(y=16, color="r", linestyle="--", label="Redfield Ratio (16:1)")
    ts_ax.set_ylabel("N:P Ratio")
    ts_ax.set_xlabel("Date")
    ts_ax.set_title("N:P Ratio Over Time")
    # Legend lives outside the axes so it never covers points.
    ts_ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    sns.histplot(x=ratios["N:P Ratio"].dropna(), ax=hist_ax)
    hist_ax.axvline(x=16, color="r", linestyle="--", label="Redfield Ratio (16:1)")
    hist_ax.set_xlabel("N:P Ratio")
    hist_ax.set_title("Distribution of N:P Ratios")
    hist_ax.legend()
    # Reserve the right margin for the time-series legend.
    plt.tight_layout(rect=(0, 0, 0.9, 1))
    return fig
def altair_plot_np_ratios(df: pd.DataFrame) -> alt.VConcatChart:
    """
    Interactive Altair counterpart of plot_np_ratios: an N:P scatter over time
    (colored by sector) stacked above an N:P histogram, each with a red
    Redfield-ratio (16:1) reference rule.
    """
    # Create dataframe with N, P, and Sector information
    nutrients_df = (
        df[df["Org_Analyte_Name"].isin(["Total Nitrogen", "Total Phosphorus"])]
        .pivot_table(
            index=["Activity_Start_Date_Time", "Sector"],
            columns="Org_Analyte_Name",
            values="Org_Result_Value",
            observed=True,
        )
        .reset_index()
    )
    # Calculate N:P ratio
    nutrients_df["N:P Ratio"] = (
        nutrients_df["Total Nitrogen"] / nutrients_df["Total Phosphorus"]
    )
    # Time series plot with colors by sector.
    # In r"N\:P Ratio:Q" the backslash escapes the colon inside the column
    # name so Altair does not parse it as the field/type separator.
    time_series = (
        alt.Chart(nutrients_df)
        .mark_circle(size=60)
        .encode(
            x=alt.X(
                "Activity_Start_Date_Time:T",
                axis=alt.Axis(format="%Y", tickCount="year"),
                title="Date",
            ),
            y=alt.Y(r"N\:P Ratio:Q", title="N:P Ratio"),
            color="Sector:N",
            tooltip=[
                alt.Tooltip("Activity_Start_Date_Time:T", title="Date"),
                alt.Tooltip(r"N\:P Ratio:Q", format=".0f", title="N:P Ratio"),
                alt.Tooltip("Sector:N", title="Sector"),
            ],
        )
        .properties(title="N:P Ratio Over Time", width=600, height=300)
        .interactive()
    )
    # Add Redfield Ratio line
    redfield_line = (
        alt.Chart(pd.DataFrame({"y": [16]})).mark_rule(color="red").encode(y="y:Q")
    )
    # Histogram plot
    histogram = (
        alt.Chart(nutrients_df)
        .mark_bar()
        .encode(
            x=alt.X(r"N\:P Ratio:Q", bin=alt.Bin(maxbins=30), title="N:P Ratio"),
            y="count()",
            tooltip=["count()"],
        )
        .properties(title="Distribution of N:P Ratios", width=600, height=300)
        .interactive()
    )
    # Add Redfield Ratio line to histogram
    redfield_hist_line = (
        alt.Chart(pd.DataFrame({"x": [16]})).mark_rule(color="red").encode(x="x:Q")
    )
    # Combine plots; y scales stay independent since the panels measure
    # different quantities (ratio vs. count).
    combined_chart = alt.vconcat(
        time_series + redfield_line, histogram + redfield_hist_line
    ).resolve_scale(y="independent")
    return combined_chart
def plot_calendar_heatmap(
    df: pd.DataFrame,
    analyte: str,
    colormap: str | None = None,
    position_filter: str = "All",
) -> Figure:
    """
    Render a year-by-month heatmap of mean values for one analyte.

    Raises ValueError when no rows match the analyte.
    """
    analyte_rows = df[df["Org_Analyte_Name"] == analyte].copy()
    if analyte_rows.empty:
        raise ValueError(
            f"No data available for {analyte} with position filter: {position_filter}"
        )
    result_unit = analyte_rows["Org_Result_Unit"].iloc[0] if not analyte_rows.empty else ""
    # Split the timestamp into heatmap axes: years down, months across.
    analyte_rows["Year"] = analyte_rows["Activity_Start_Date_Time"].dt.year
    analyte_rows["Month"] = analyte_rows["Activity_Start_Date_Time"].dt.month
    monthly_means = analyte_rows.pivot_table(
        values="Org_Result_Value", index="Year", columns="Month", aggfunc="mean"
    )
    # Analyte-specific default colormaps; anything unlisted falls back to a
    # plain blue gradient. An explicit `colormap` argument wins over these.
    default_cmaps = {
        "Fecal Coliform (MPN)": "viridis",  # Blue-green-yellow
        "Temperature, Water": "coolwarm",
        "Dissolved Oxygen": "RdYlBu",
        "Total Nitrogen": "GnBu",  # Green-Blue
        "Total Phosphorus": "GnBu",  # Green-Blue
        "Depth, Secchi Disk Depth": "Blues_r",
    }
    cmap = colormap if colormap else default_cmaps.get(analyte, "Blues")
    # Scale figure height with the number of years shown.
    fig, ax = plt.subplots(figsize=(6, len(monthly_means) * 0.5))
    sns.heatmap(
        monthly_means,
        cmap=cmap,
        annot=True,
        fmt=".2f",
        cbar_kws={"label": result_unit},
        annot_kws={"size": 6},
    )
    position_label = "Surface and Bottom" if position_filter == "All" else position_filter
    ax.set_title(
        f"Monthly Averages: {analyte} ({position_label.lower()})", fontsize=10, pad=10
    )
    ax.tick_params(axis="both", which="major", labelsize=7)
    ax.set_xlabel("Month", fontsize=6)
    ax.set_ylabel("Year", fontsize=6)
    # Shrink the colorbar tick/label text to match the compact axes.
    colorbar = ax.collections[0].colorbar
    colorbar.ax.tick_params(labelsize=7)  # type: ignore
    colorbar.set_label(result_unit, size=7)  # type: ignore
    return fig
def plot_seasonal_salinity(
    salinity_data: pd.DataFrame,
    year: str,
    basemap_provider,
    alpha=0.5,
    shapefile_path="data/SAB/SAB.shp",
    reporting_end_month: int = 10,
):
    """
    Create seasonal plots of mean salinity values by WBID with basemap.
    Uses configurable Reporting Year with meteorological seasons.

    Produces a 2x2 grid of quarterly choropleth maps sharing one colorbar.

    Args:
        salinity_data: DataFrame containing salinity measurements
        year: Reporting Year to filter data for (str)
        basemap_provider: contextily tile provider for the background map
        alpha: basemap tile opacity
        shapefile_path: path to the WBID boundary shapefile
        reporting_end_month: Last month of the reporting year (1-12, default=10 for October)

    Returns:
        Matplotlib Figure with four quarterly maps and a shared colorbar.
    """
    # Read and filter WBIDs to those present in the salinity data
    wbids = gpd.read_file(shapefile_path)
    relevant_wbids = salinity_data["WBID"].unique()
    wbids = wbids[wbids["WBID"].isin(relevant_wbids)]
    # Web Mercator (EPSG:3857) so polygons align with contextily basemap tiles
    wbids = wbids.to_crs(epsg=3857)
    # Process data - create a copy to avoid SettingWithCopyWarning
    year_data = salinity_data[salinity_data["Reporting_Year"] == int(year)].copy()

    # Function to determine quarter based on date and reporting year end
    def get_quarter(date, reporting_end_month):
        month = date.month
        # Calculate month offset to align with reporting year
        month_offset = (12 - reporting_end_month) % 12
        # Adjust month to align with reporting year (1..12, never 0)
        adjusted_month = ((month + month_offset) % 12) or 12
        # Determine quarter (1-4)
        return f"Q{((adjusted_month - 1) // 3) + 1}"

    # Add quarter column
    year_data.loc[:, "quarter"] = year_data["Activity_Start_Date_Time"].apply(
        lambda x: get_quarter(x, reporting_end_month)
    )
    # Calculate quarterly means
    seasonal_means = (
        year_data.groupby(["WBID", "quarter"], observed=True)["Salinity"]
        .mean()
        .reset_index()
    )
    fig = plt.figure(figsize=(20, 14))
    # Create custom colormap with focused range (blue -> yellow -> red)
    colors = ["#08519c", "#73a9cf", "#fee090", "#fc8d59", "#d73027"]
    cmap = LinearSegmentedColormap.from_list("custom", colors, N=100)
    # Get global min/max for consistent colormap across all four panels
    vmin = seasonal_means["Salinity"].min()
    vmax = 40  # fixed ceiling so the scale is comparable between years
    # Calculate map extent with a 5% buffer around the WBID bounds
    bounds = wbids.total_bounds
    x_buffer = (bounds[2] - bounds[0]) * 0.05
    y_buffer = (bounds[3] - bounds[1]) * 0.05
    extent = [
        bounds[0] - x_buffer,
        bounds[2] + x_buffer,
        bounds[1] - y_buffer,
        bounds[3] + y_buffer,
    ]
    # Create subplots with tighter spacing
    gs = fig.add_gridspec(
        2,
        2,
        width_ratios=[1, 1],
        wspace=0.05,  # Minimal horizontal space between plots
        hspace=-0.15,  # More negative value to further reduce vertical space
        left=0.02,  # Left margin
        right=0.98,  # Right margin
        top=0.95,  # Slightly reduced top margin to give more space
        bottom=0.05,  # Slightly increased bottom margin to give more space
    )

    # Function to get quarter date range (formatted for panel subtitles)
    def get_quarter_dates(quarter: str, year: int, reporting_end_month: int) -> str:
        # Calculate first month of reporting year
        first_month = (reporting_end_month % 12) + 1
        # Calculate start month for each quarter
        quarter_num = int(quarter[1])
        start_month = ((first_month - 1 + ((quarter_num - 1) * 3)) % 12) + 1
        end_month = ((start_month + 2) % 12) or 12
        # For Reporting Year X, the start date is actually in year X-1 if the month
        # is after the reporting end month
        start_year = int(year) - 1 if start_month > reporting_end_month else int(year)
        end_year = start_year
        if end_month < start_month:
            end_year += 1
        start_date = pd.Timestamp(f"{start_year}-{start_month:02d}-01")
        # End on the last calendar day of the end month.
        end_date = pd.Timestamp(
            f"{end_year}-{end_month:02d}-{pd.Timestamp(f'{end_year}-{end_month:02d}').days_in_month}"
        )
        return f"{start_date.strftime('%b %d, %Y')} - {end_date.strftime('%b %d, %Y')}"

    # Use quarters instead of seasons
    quarters = ["Q1", "Q2", "Q3", "Q4"]
    for idx, quarter in enumerate(quarters):
        ax = fig.add_subplot(gs[idx // 2, idx % 2])
        quarter_data = seasonal_means[seasonal_means["quarter"] == quarter]
        # Left-join so WBIDs with no data this quarter still draw (in grey)
        merged = wbids.merge(quarter_data, on="WBID", how="left")
        # Plot WBIDs
        merged.plot(
            column="Salinity",
            ax=ax,
            cmap=cmap,
            vmin=vmin,
            vmax=vmax,
            alpha=0.7,
            missing_kwds={"color": "lightgrey", "alpha": 0.5},
        )
        ctx.add_basemap(ax, source=basemap_provider, zoom=11, alpha=alpha)  # type: ignore
        ax.set_xlim(extent[0], extent[1])
        ax.set_ylim(extent[2], extent[3])
        # Get date range for this quarter
        date_range = get_quarter_dates(quarter, int(year), reporting_end_month)
        # Create title with two lines; top row gets extra padding
        if idx < 2:  # Top row
            ax.set_title(
                f"Quarter {quarter[1]} Mean Salinity\n{date_range}",
                pad=15,
                fontsize=10,
            )
        else:  # Bottom row
            ax.set_title(
                f"Quarter {quarter[1]} Mean Salinity\n{date_range}",
                pad=5,
                fontsize=10,
            )
        ax.set_axis_off()
    # Add one shared colorbar for all four panels
    norm = plt.Normalize(vmin=vmin, vmax=vmax)  # type: ignore
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    fig.colorbar(
        sm,
        ax=fig.axes,
        orientation="vertical",
        label="Salinity (ppt)",
        pad=0.01,
        fraction=0.015,
        ticks=np.arange(0, 45, 5),  # Add ticks every 5 units
    )
    return fig
def plot_seasonal_salinity_for_bays(
    salinity_data: pd.DataFrame,
    year: str,
    basemap_provider=ctx.providers.USGS.USTopo,  # type: ignore
    alpha=0.5,
    shapefile_path="data/SAB/SAB.shp",
    wbids=None,
    reporting_end_month: int = 10,
):
    """
    Create seasonal plots of mean salinity values by WBID for N, E, W, SAB, GL and Lake Powell.

    Restricts the data to a fixed set of bay WBIDs and delegates to
    plot_seasonal_salinity.
    """
    # NOTE(review): the GeoDataFrame prepared below is never used —
    # plot_seasonal_salinity re-reads the shapefile itself and `wbids` is not
    # passed through. Confirm whether it should be forwarded or the parameter
    # and this block removed.
    if wbids is None:
        wbids = gpd.read_file(shapefile_path)
    if wbids.crs is None:
        wbids.set_crs(epsg=6439, inplace=True)
    wbids = wbids.to_crs(epsg=3857)
    fig = plot_seasonal_salinity(
        salinity_data.query(
            "WBID.isin(['1061A', '1061B', '1061C', '1061D', '1061E', '1061F', '1061G', '1061H', '1055A'])"
        ),
        year=year,
        basemap_provider=basemap_provider,
        alpha=alpha,
        shapefile_path=shapefile_path,
        reporting_end_month=reporting_end_month,
    )
    return fig
def plot_do_temp_relationship(df: pd.DataFrame) -> Figure:
    """
    Create a scatter plot of DO vs temperature with regression line using seaborn.

    Points are colored by Sample_Position (Surface/Bottom), each with its own
    regression line, plus a dashed 4.8 mg/L DO threshold reference line.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing DO and temperature measurements

    Returns:
    --------
    Figure
        Matplotlib figure containing the plot
    """
    # Pair DO and temperature readings taken at the same timestamp, station,
    # and sample position; drop rows missing either value.
    do_temp_data = (
        df[df["Org_Analyte_Name"].isin(["Dissolved Oxygen", "Temperature, Water"])]
        .pivot_table(
            index=["Activity_Start_Date_Time", "Station_Number", "Sample_Position"],
            columns="Org_Analyte_Name",
            values="Org_Result_Value",
            observed=True,
        )
        .reset_index()
        .dropna(subset=["Dissolved Oxygen", "Temperature, Water"])
    )
    # Create custom color palette matching DO timeseries
    custom_palette = {"Surface": "#5AA4D8", "Bottom": "#1B4B8A"}
    # Create plot with regression line and adjust the hue order
    g = sns.lmplot(
        data=do_temp_data,
        x="Temperature, Water",
        y="Dissolved Oxygen",
        hue="Sample_Position",
        hue_order=["Bottom", "Surface"],  # Plot 'Bottom' first
        palette=custom_palette,
        scatter_kws={"alpha": 0.5, "zorder": 2, "s": 20},  # Scatter plots at zorder=2
        line_kws={"zorder": 3, "linewidth": 1},  # Trend lines at zorder=3
        height=8,
        aspect=1.5,
        legend=False,
    )
    # Add DO threshold and set z-order
    ax = g.axes[0, 0]
    ax.axhline(
        y=4.8, color="#FF8C00", linestyle="--", alpha=0.9, zorder=1, linewidth=1
    )  # Threshold line at zorder=1 (behind data)
    # Label the threshold just above the line, at the left edge of the axis.
    ax.text(
        ax.get_xlim()[0],
        4.9,
        " 4.8 mg/L DO threshold",
        ha="left",
        va="bottom",
        color="#FF8C00",
        alpha=0.9,
    )
    # Customize spines - only show bottom spine
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    ax.spines["bottom"].set_color("black")
    ax.spines["bottom"].set_linewidth(0.5)
    g.set_axis_labels("Water Temperature (°C)", "Dissolved Oxygen (mg/L)")
    ax.set_title("Dissolved Oxygen vs Water Temperature", pad=20, fontsize=16)
    # Adjust legend to show 'Surface' first
    handles, labels = ax.get_legend_handles_labels()
    # Reverse the order of handles and labels
    handles = handles[::-1]
    labels = labels[::-1]
    ax.legend(
        handles,
        labels,
        bbox_to_anchor=(1.0, 1.0),
        loc="upper right",
        frameon=False,
        handletextpad=0.5,
    )
    # Add grid with matching style
    ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray")
    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)
    # Set y-axis limits with some padding; floor at 0 and always include the
    # 4.8 mg/L threshold line.
    ymin = max(int(min(do_temp_data["Dissolved Oxygen"].min(), 4.8) * 0.9) - 1, 0)
    ymax = do_temp_data["Dissolved Oxygen"].max() * 1.1
    ax.set_ylim(ymin, ymax)
    yticks = np.arange(ymin, ymax, 2)
    ax.set_yticks(yticks)
    return g.figure
def plotly_plot_do_temp_relationship(df: pd.DataFrame) -> go.Figure:
    """
    Create an interactive scatter plot of DO vs temperature with regression lines using Plotly.
    Matches the style and features of the original matplotlib/seaborn plot.
    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing DO and temperature measurements
    Returns:
    --------
    go.Figure
        Plotly figure object
    """
    # Pivot so each sampling event carries both its DO and temperature
    # reading, then keep only events where both values are present.
    do_temp_data = (
        df[df["Org_Analyte_Name"].isin(["Dissolved Oxygen", "Temperature, Water"])]
        .pivot_table(
            index=[
                "Activity_Start_Date_Time",
                "Station_Number",
                "Sample_Position",
                "Sector",  # Added for tooltip
            ],
            columns="Org_Analyte_Name",
            values="Org_Result_Value",
            observed=True,
        )
        .reset_index()
        .dropna(subset=["Dissolved Oxygen", "Temperature, Water"])
    )
    # Create figure
    fig = go.Figure()
    # Colors matching seaborn's muted palette
    colors = {"Surface": "#8da0cb", "Bottom": "#fc8d62"}
    # Add scatter plots and regression lines for each position
    for position in ["Surface", "Bottom"]:
        pos_data = do_temp_data[do_temp_data["Sample_Position"] == position]
        # Skip positions with no paired measurements: np.polyfit raises on
        # empty input and an empty scatter trace adds nothing.
        if pos_data.empty:
            continue
        # Add scatter plot
        fig.add_trace(
            go.Scatter(
                x=pos_data["Temperature, Water"],
                y=pos_data["Dissolved Oxygen"],
                mode="markers",
                name=position,
                marker=dict(color=colors[position], size=8, opacity=0.6),
                # Plotly hover templates use "<br>" for line breaks; the
                # previous version had raw newlines inside the string
                # literals, which is a Python syntax error.
                hovertemplate=(
                    "Temperature: %{x:.1f}°C<br>"
                    "DO: %{y:.1f} mg/L<br>"
                    "Position: " + position + "<br>"
                    "Station: %{customdata[0]}<br>"
                    "Sector: %{customdata[1]}"
                ),
                customdata=pos_data[["Station_Number", "Sector"]],
            )
        )
        # Calculate and add least-squares linear regression line over the
        # observed temperature range for this position.
        z = np.polyfit(pos_data["Temperature, Water"], pos_data["Dissolved Oxygen"], 1)
        p = np.poly1d(z)
        x_range = np.linspace(
            pos_data["Temperature, Water"].min(),
            pos_data["Temperature, Water"].max(),
            100,
        )
        fig.add_trace(
            go.Scatter(
                x=x_range,
                y=p(x_range),
                mode="lines",
                line=dict(color=colors[position], dash="dash"),
                name=f"{position} Trend",
                hovertemplate=None,
                hoverinfo="skip",
                showlegend=False,
            )
        )
    # Add DO threshold line
    fig.add_hline(
        y=4.8,
        line=dict(color="#FF8C00", width=1, dash="dash"),
        opacity=0.5,
        annotation_text="4.8 mg/L DO threshold",
        annotation_position="left",
        annotation=dict(
            font=dict(color="#FF8C00", size=12),
            xanchor="left",
            yanchor="bottom",
            opacity=0.8,
        ),
    )
    # Update layout
    fig.update_layout(
        title=dict(
            text="Dissolved Oxygen vs Water Temperature",
            x=0.5,
            y=0.95,
            xanchor="center",
            yanchor="top",
            font=dict(size=16),
        ),
        xaxis_title="Water Temperature (°C)",
        yaxis_title="Dissolved Oxygen (mg/L)",
        legend_title="Sample Position",
        legend=dict(
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.05,
        ),
        template="plotly_white",
        width=800,
        height=600,
        showlegend=True,
    )
    # Light gray grid on both axes
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="rgba(128, 128, 128, 0.2)")
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="rgba(128, 128, 128, 0.2)")
    return fig
def altair_plot_do_temp_relationship(df: pd.DataFrame) -> alt.LayerChart:
    """
    Create an interactive scatter plot of DO vs temperature with regression lines using Altair.
    Matches the style and features of the original matplotlib/seaborn plot.
    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing DO and temperature measurements
    Returns:
    --------
    alt.LayerChart
        Layered Altair chart (scatter + regression lines + threshold rule/label)
    """
    # One row per sampling event, holding both the DO and temperature
    # values; events missing either measurement are dropped.
    paired = df[
        df["Org_Analyte_Name"].isin(["Dissolved Oxygen", "Temperature, Water"])
    ]
    do_temp_data = (
        paired.pivot_table(
            index=[
                "Activity_Start_Date_Time",
                "Station_Number",
                "Sample_Position",
                "Sector",
            ],
            columns="Org_Analyte_Name",
            values="Org_Result_Value",
            observed=True,
        )
        .reset_index()
        .dropna(subset=["Dissolved Oxygen", "Temperature, Water"])
    )
    # Shared color encoding for both the points and their trend lines.
    position_colors = alt.Scale(
        domain=["Surface", "Bottom"],
        range=["#8da0cb", "#fc8d62"],  # muted blue / muted orange
    )
    # Scatter layer with per-point tooltips.
    points = (
        alt.Chart(do_temp_data)
        .mark_circle(size=60, opacity=0.6)
        .encode(
            x=alt.X(
                "Temperature, Water:Q",
                title="Water Temperature (°C)",
                scale=alt.Scale(zero=False),
            ),
            y=alt.Y(
                "Dissolved Oxygen:Q",
                title="Dissolved Oxygen (mg/L)",
                scale=alt.Scale(zero=False),
            ),
            color=alt.Color(
                "Sample_Position:N",
                scale=position_colors,
                legend=alt.Legend(title="Sample Position"),
            ),
            tooltip=[
                alt.Tooltip("Temperature, Water:Q", title="Temperature", format=".1f"),
                alt.Tooltip("Dissolved Oxygen:Q", title="DO", format=".1f"),
                alt.Tooltip("Sample_Position:N", title="Position"),
                alt.Tooltip("Sector:N", title="Sector"),
                alt.Tooltip("Station_Number:N", title="Station"),
            ],
        )
    )
    # Per-position linear fits derived from the scatter layer's data.
    trend_lines = (
        points.transform_regression(
            "Temperature, Water", "Dissolved Oxygen", groupby=["Sample_Position"]
        )
        .mark_line(size=2)
        .encode(color=alt.Color("Sample_Position:N", scale=position_colors))
    )
    # Horizontal DO threshold rule plus its text label.
    # NOTE(review): this chart draws the threshold at 5 mg/L while the
    # matplotlib/plotly variants in this module use 4.8 mg/L — confirm
    # which value is intended.
    threshold_rule = (
        alt.Chart(pd.DataFrame({"y": [5]}))
        .mark_rule(strokeDash=[4, 4], color="red", opacity=0.5)
        .encode(y="y:Q")
    )
    threshold_text = (
        alt.Chart(
            pd.DataFrame({"x": [do_temp_data["Temperature, Water"].min()], "y": [5.1]})
        )
        .mark_text(
            align="left",
            baseline="bottom",
            color="red",
            opacity=0.5,
            text=" 5 mg/L DO threshold",
        )
        .encode(x="x:Q", y="y:Q")
    )
    # Stack the layers, size the canvas, and enable pan/zoom.
    return (
        (points + trend_lines + threshold_rule + threshold_text)
        .properties(width=800, height=750)
        .configure_axis(grid=True, gridOpacity=0.3)
        .interactive()
    )
@timer(include_params=True)
def generate_seasonal_plot(data, year, shapefile_path):
    """Generate the seasonal trends plot"""
    boundaries = gpd.read_file(shapefile_path)
    # Make sure both layers carry a CRS before any reprojection.
    if isinstance(data, gpd.GeoDataFrame) and data.crs is None:
        # Input coordinates are assumed to be WGS84 (EPSG:4326).
        data.set_crs(epsg=4326, inplace=True)
    if boundaries.crs is None:
        boundaries.set_crs(epsg=6439, inplace=True)
    # Reproject to Web Mercator (EPSG:3857) up front so the plotting
    # function never has to deal with mismatched projections.
    boundaries = boundaries.to_crs(epsg=3857)
    if st.session_state.get("DEBUG", False):
        shp_path = Path(shapefile_path)
        input_crs = (
            data.crs if isinstance(data, gpd.GeoDataFrame) else "Not a GeoDataFrame"
        )
        gdal_version = (
            gdal.VersionInfo() if "osgeo.gdal" in sys.modules else "Not available"
        )
        st.write("Debug Info:")
        st.write(
            {
                "Shapefile CRS": boundaries.crs,
                "Input Data CRS": input_crs,
                "GDAL Version": gdal_version,
                "GeoPandas Version": gpd.__version__,
                "Python Version": sys.version,
                "File exists": shp_path.exists(),
                "Associated files": list(shp_path.parent.glob("*.*")),
            }
        )
    return plot_seasonal_salinity_for_bays(
        data,
        year,
        shapefile_path=shapefile_path,
        wbids=boundaries,
        reporting_end_month=st.session_state.reporting_month,
    )
def plot_do_timeseries(
    df: pd.DataFrame,
    period: str = "Yearly",
    sector: str = "All",
    epa_thresh: float = 4.8,
) -> Figure:
    """
    Create a time series plot of dissolved oxygen levels for surface and bottom measurements.

    For each period the surface and bottom means are drawn as a pair of points
    joined by a light-gray vertical segment, with the EPA threshold shown as a
    dashed horizontal line.

    Reference:
    https://www.hudsonriver.org/ccmp/soe/water-quality/do
    Parameters:
    -----------
    df : pd.DataFrame
        Filtered dataframe containing dissolved oxygen measurements
    period : str
        'yearly' or 'monthly' aggregation period (compared case-insensitively)
    sector : str
        Accepted for signature parity with the sibling plotting helpers;
        not referenced inside this function.
    epa_thresh : float
        EPA threshold value for DO in mg/L
    Returns:
    --------
    Figure
        Matplotlib figure containing the plot
    """
    period = period.lower()
    # Filter for DO data and pivot for surface/bottom
    do_data = df[
        (df["Org_Analyte_Name"] == "Dissolved Oxygen")
        & (df["Sample_Position"].isin(["Surface", "Bottom"]))
    ].copy()
    # Create time grouping based on period
    if period == "yearly":
        do_data["Period"] = do_data["Reporting_Year"]
    else:  # monthly
        do_data["Period"] = pd.to_datetime(
            do_data["Activity_Start_Date_Time"]
        ).dt.to_period("M")
        # NOTE(review): Period_Start is assigned but never read below —
        # possibly left over from an earlier revision; confirm before removing.
        do_data["Period_Start"] = do_data["Period"].dt.to_timestamp()
    # Mean DO per period, pivoted to one column per sample position
    # (columns "Surface" and "Bottom").
    means = (
        do_data.groupby(["Period", "Sample_Position"], observed=True)[
            "Org_Result_Value"
        ]
        .mean()
        .reset_index()
        .pivot(index="Period", columns="Sample_Position", values="Org_Result_Value")
    )
    # Create figure
    fig, ax = plt.subplots(figsize=(15, 8))
    # Convert Period index to a plottable x array: floats (years) for yearly,
    # datetime64 timestamps for monthly.
    if period == "yearly":
        x_values = np.array(means.index.astype(float))  # Explicitly create numpy array
    else:
        # Convert to numpy array of datetime64
        x_values = np.array(
            [pd.Period(idx).to_timestamp() for idx in means.index],
            dtype="datetime64[ns]",
        )
    # Draw a light-gray vertical connector between each period's bottom and
    # surface means (lines only, markers are added by the scatters below).
    for i, (idx, row) in enumerate(means.iterrows()):
        x_val = x_values[i]
        ax.plot(
            [x_val, x_val],  # Same x twice: a vertical segment at this period
            [row["Bottom"], row["Surface"]],
            color="lightgray",
            linewidth=1,
            zorder=1,
            solid_capstyle="round",
        )
    # Calculate dynamic point size based on number of points
    n_points = len(x_values)
    base_size = 80  # Maximum point size
    min_size = 20  # Minimum point size
    # Exponential decay formula: size decreases as number of points increases
    point_size = max(
        min_size,
        base_size * math.exp(-0.0015 * n_points),
    )
    # Surface means: lighter blue, drawn above the connector lines.
    surface_scatter = ax.scatter(
        x_values,
        means["Surface"],
        color="#5AA4D8",
        s=point_size,
        zorder=2,
        label="Surface",
        edgecolors="white",
        linewidth=1,
        alpha=0.9,
    )
    # Bottom means: darker blue.
    bottom_scatter = ax.scatter(
        x_values,
        means["Bottom"],
        color="#1B4B8A",
        s=point_size,
        zorder=2,
        label="Bottom",
        edgecolors="white",
        linewidth=1,
        alpha=0.9,
    )
    # Dashed EPA threshold line, drawn behind everything else.
    threshold_line = ax.axhline(
        y=epa_thresh,
        color="#FF8C00",
        linestyle="--",
        alpha=0.9,
        linewidth=1,
        label=f"EPA threshold: {epa_thresh} mg/L",
        zorder=0,
    )
    # Customize legend
    ax.legend(
        handles=[surface_scatter, bottom_scatter, threshold_line],
        loc="upper right",
        frameon=False,
        ncol=1,  # Stack legend items vertically
        bbox_to_anchor=(1.0, 1.0),  # Position at top right
        handletextpad=0.5,  # Reduce space between handle and text
    )
    # Customize spines - only show bottom spine
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    ax.spines["bottom"].set_color("black")
    ax.spines["bottom"].set_linewidth(0.5)
    # Customize plot with modified grid and axis settings
    ax.set_xlabel("Year" if period == "yearly" else "Month")
    ax.set_ylabel("Dissolved Oxygen (mg/L)")
    ax.set_title("Long-term Dissolved Oxygen Trends")
    ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray")
    # Set y-axis limits with some padding; the lower bound never drops
    # below 0 and includes the threshold line.
    ymin = max(int(min(means["Bottom"].min(), epa_thresh) * 0.9) - 1, 0)
    ymax = means["Surface"].max() * 1.1
    ax.set_ylim(ymin, ymax)
    yticks = np.arange(ymin, ymax, 2)
    ax.set_yticks(yticks)
    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)
    # Adjust x-axis ticks and limits
    if period == "monthly":
        # Year-level major ticks even for monthly data, plus one month of
        # padding on each side.
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y"))
        ax.xaxis.set_major_locator(mdates.YearLocator())
        plt.xticks(rotation=0)
        # Convert to datetime for padding
        start_date = mdates.date2num(
            pd.Timestamp(min(x_values)) - pd.DateOffset(months=1)
        )
        end_date = mdates.date2num(
            pd.Timestamp(max(x_values)) + pd.DateOffset(months=1)
        )
        ax.set_xlim(mdates.num2date(start_date), mdates.num2date(end_date))
    else:
        # For yearly data, ensure whole number ticks but month-based padding
        min_year = float(np.floor(min(x_values)))
        max_year = float(np.ceil(max(x_values)))
        # Set whole number ticks
        years = np.arange(min_year, max_year + 1)
        ax.set_xticks(years)
        # Set limits with one month padding
        ax.set_xlim(
            min_year - 0.083, max_year + 0.083
        )  # ~1/12 of a year for month padding
    # Move y-axis labels to the left of the gridlines
    ax.yaxis.tick_left()
    ax.yaxis.set_label_position("left")
    plt.tight_layout()
    return fig
def plot_do_scatter(
    df: pd.DataFrame,
    sector: str = "All",
    thresh: float = 3.0,
) -> Figure:
    """
    Create a scatter plot of all dissolved oxygen measurements.
    Parameters:
    -----------
    df : pd.DataFrame
        Filtered dataframe containing dissolved oxygen measurements
    sector : str
        Sector to filter by, or 'All' for all sectors (used in the title only)
    thresh : float
        Threshold value for DO in mg/L
    Returns:
    --------
    Figure
        Matplotlib figure containing the plot
    Raises:
    -------
    ValueError
        If the dataframe contains no dissolved oxygen measurements.
    """
    # Filter for DO data (surface/bottom samples only)
    do_data = df[
        (df["Org_Analyte_Name"] == "Dissolved Oxygen")
        & (df["Sample_Position"].isin(["Surface", "Bottom"]))
    ].copy()
    # Fail early with a clear message (consistent with plot_scatter) instead
    # of an obscure error from min()/NaN arithmetic below.
    if do_data.empty:
        raise ValueError("No dissolved oxygen data found")
    # Create figure with specific dimensions
    fig, ax = plt.subplots(figsize=(15, 8))
    # Plot surface and bottom measurements with smaller points
    surface_data = do_data[do_data["Sample_Position"] == "Surface"]
    bottom_data = do_data[do_data["Sample_Position"] == "Bottom"]
    # Plot points
    ax.scatter(
        surface_data["Activity_Start_Date_Time"],
        surface_data["Org_Result_Value"],
        color="#1f77b4",  # Darker blue for surface
        s=25,
        alpha=0.5,
        label="Surface",
        zorder=2,
    )
    ax.scatter(
        bottom_data["Activity_Start_Date_Time"],
        bottom_data["Org_Result_Value"],
        color="#7fbf7b",  # Muted green for bottom
        s=25,
        alpha=0.5,
        label="Bottom",
        zorder=2,
    )
    # Fix: finalize the y-limits BEFORE placing the hurricane marker.
    # Previously the marker height was computed from matplotlib's autoscaled
    # limits and set_ylim was called afterwards, which could push the marker
    # outside the visible axis range.
    ymin = max(int(min(do_data["Org_Result_Value"].min(), thresh) * 0.9) - 1, 0)
    ymax = do_data["Org_Result_Value"].max() * 1.1
    ax.set_ylim(ymin, ymax)
    yticks = np.arange(ymin, ymax, 2)
    ax.set_yticks(yticks)
    # Add Hurricane Michael vertical line and annotation if within date range
    hurricane_date = pd.Timestamp("2018-10-10")
    data_start = do_data["Activity_Start_Date_Time"].min()
    data_end = do_data["Activity_Start_Date_Time"].max()
    # Only add hurricane line and annotation if the date falls within the data range
    if data_start <= hurricane_date <= data_end:
        # Marker sits 95% of the way up the (final) y-range. Drawing in data
        # coordinates with vlines avoids the old axvline fraction math, which
        # implicitly assumed the bottom limit was 0.
        line_top = ymin + 0.95 * (ymax - ymin)
        ax.vlines(
            hurricane_date,
            ymin,
            line_top,
            color="gray",
            linestyle="-",
            alpha=0.6,
            linewidth=1,
            zorder=1,
        )
        # Add dot at top of line
        ax.scatter(
            [hurricane_date],  # type: ignore
            [line_top],
            color="gray",
            s=25,
            alpha=0.8,
            zorder=2,
        )
        # Add two-line annotation with bold date
        ax.annotate(
            "Oct 2018",
            xy=(hurricane_date, line_top),  # type: ignore
            xytext=(5, 0),
            textcoords="offset points",
            ha="left",
            va="bottom",
            color="gray",
            fontsize=10,
            weight="bold",
        )
        ax.annotate(
            "Hurricane Michael",
            xy=(hurricane_date, line_top),  # type: ignore
            xytext=(5, -12),
            textcoords="offset points",
            ha="left",
            va="bottom",
            color="gray",
            fontsize=10,
        )
    # Add threshold line
    ax.axhline(
        y=thresh,
        color="red",
        linestyle=":",
        alpha=0.9,
        linewidth=1.5,
        label=f"Threshold: {thresh} mg/L",
        zorder=1,
    )
    # Customize legend with larger font
    ax.legend(
        loc="upper right",
        frameon=True,
        ncol=1,
        bbox_to_anchor=(1.0, 1.0),
        handletextpad=0.5,
        fontsize=12,  # Increased font size
    )
    # Customize spines - only show bottom spine
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    ax.spines["bottom"].set_color("black")
    ax.spines["bottom"].set_linewidth(0.5)
    # Set labels and title
    title = "DO mg/L"
    if sector != "All":
        title += f" - {sector}"
    ax.set_title(title, fontsize=14)  # Increased font size
    # Add grid
    ax.grid(True, axis="both", alpha=0.15, linestyle="-", color="gray")
    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)
    # Format x-axis with one labeled tick per year
    ax.xaxis.set_major_locator(mdates.YearLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y"))
    plt.tight_layout()
    return fig
def plot_scatter(
    df: pd.DataFrame,
    parameter: str,
    sector: str = "All",
    thresh: float | None = None,
) -> tuple[Figure, pd.DataFrame]:
    """
    Create a scatter plot of water quality measurements for any parameter.
    Parameters:
    -----------
    df : pd.DataFrame
        Filtered dataframe containing water quality measurements
    parameter : str
        Name of the parameter to plot (e.g., "Dissolved Oxygen", "Temperature, Water")
    sector : str
        Sector to filter by, or 'All' for all sectors
    thresh : float | None
        Optional threshold value to display on plot
    Returns:
    --------
    tuple[Figure, pd.DataFrame]
        - Figure: Matplotlib figure containing the scatter plot
        - DataFrame: Filtered dataframe containing the parameter data used in the plot
    Raises:
    -------
    ValueError
        If no rows match `parameter`.
    """
    # Filter for parameter data (surface/bottom samples only)
    param_data = df[
        (df["Org_Analyte_Name"] == parameter)
        & (df["Sample_Position"].isin(["Surface", "Bottom"]))
    ].copy()
    if param_data.empty:
        raise ValueError(f"No data found for parameter: {parameter}")
    # Unit from the first row labels the y-axis (assumes one unit per parameter)
    unit = param_data["Org_Result_Unit"].iloc[0]
    # Create figure with specific dimensions
    fig, ax = plt.subplots(figsize=(15, 8))
    # Plot surface and bottom measurements
    surface_data = param_data[param_data["Sample_Position"] == "Surface"]
    bottom_data = param_data[param_data["Sample_Position"] == "Bottom"]
    # Parameters whose values span orders of magnitude get a log y-axis
    log_scale_parameters = [
        "Turbidity",
        "Fecal Coliform (MPN)",
        "Total Nitrogen",
        "Total Phosphorus",
        "Color",
    ]
    log_scale = parameter in log_scale_parameters
    if log_scale:
        ax.set_yscale("log")
        # For log scale, set limits based on order of magnitude
        ymin = max(
            param_data["Org_Result_Value"].min() * 0.5, 0.1
        )  # Don't go below 0.1
        ymax = param_data["Org_Result_Value"].max() * 2
        if thresh is not None:
            ymin = min(ymin, thresh * 0.5)
        ax.set_ylim(ymin, ymax)
        # Generate log-spaced ticks
        log_ymin = np.floor(np.log10(ymin))
        log_ymax = np.ceil(np.log10(ymax))
        yticks = np.logspace(log_ymin, log_ymax, int(log_ymax - log_ymin) + 1)
        ax.set_yticks(yticks)
        # Plain-number major labels, no minor labels. (A duplicate
        # ScalarFormatter call that used to precede the limit setup was
        # redundant and has been removed.)
        ax.yaxis.set_major_formatter(plt.ScalarFormatter())  # type: ignore
        ax.yaxis.set_minor_formatter(plt.NullFormatter())  # type: ignore
    else:
        # Linear scale with 10% padding
        ymin = param_data["Org_Result_Value"].min() * 0.9
        ymax = param_data["Org_Result_Value"].max() * 1.1
        if thresh is not None:
            ymin = min(ymin, thresh * 0.9)
        ax.set_ylim(ymin, ymax)
        # Tick spacing scales with the data range
        tick_range = ymax - ymin
        if tick_range > 10:
            tick_spacing = 2.0
        elif tick_range > 5:
            tick_spacing = 1.0
        else:
            tick_spacing = 0.5
        yticks = np.arange(np.floor(ymin), np.ceil(ymax), tick_spacing)
        ax.set_yticks(yticks)
    # Plot points and collect legend handles/labels
    handles = []
    labels = []
    # Always plot surface data
    surface_scatter = ax.scatter(
        surface_data["Activity_Start_Date_Time"],
        surface_data["Org_Result_Value"],
        color="#1f77b4",  # Darker blue for surface
        s=25,
        alpha=0.5,
        label="Surface",
        zorder=2,
    )
    handles.append(surface_scatter)
    labels.append("Surface")
    # Only plot and add to legend if bottom data exists
    if not bottom_data.empty:
        bottom_scatter = ax.scatter(
            bottom_data["Activity_Start_Date_Time"],
            bottom_data["Org_Result_Value"],
            color="#7fbf7b",  # Muted green for bottom
            s=25,
            alpha=0.5,
            label="Bottom",
            zorder=2,
        )
        handles.append(bottom_scatter)
        labels.append("Bottom")
    # Add Hurricane Michael vertical line and annotation if within date range
    hurricane_date = pd.Timestamp("2018-10-10")
    data_start = param_data["Activity_Start_Date_Time"].min()
    data_end = param_data["Activity_Start_Date_Time"].max()
    # Only add hurricane line and annotation if the date falls within the data range
    if data_start <= hurricane_date <= data_end:
        y_lo, y_hi = ax.get_ylim()
        # Fix: place the marker 95% of the way up the visible axis. The old
        # axvline call used ymax=line_height/ymax as an axes fraction, which
        # is only correct when the bottom limit is 0 and the scale is linear;
        # on log axes or nonzero lower limits the line ended at the wrong
        # height. Compute the data coordinate explicitly instead.
        if log_scale:
            line_top = 10 ** (
                np.log10(y_lo) + 0.95 * (np.log10(y_hi) - np.log10(y_lo))
            )
        else:
            line_top = y_lo + 0.95 * (y_hi - y_lo)
        # Vertical line in data coordinates, with a dot at the top
        ax.vlines(
            hurricane_date,
            y_lo,
            line_top,
            color="gray",
            linestyle="-",
            alpha=0.6,
            linewidth=1,
            zorder=1,
        )
        ax.scatter(
            [hurricane_date],  # type: ignore
            [line_top],
            color="gray",
            s=25,
            alpha=0.8,
            zorder=2,
        )
        # Add two-line annotation with bold date
        ax.annotate(
            "Oct 2018",
            xy=(hurricane_date, line_top),  # type: ignore
            xytext=(5, 0),
            textcoords="offset points",
            ha="left",
            va="bottom",
            color="gray",
            fontsize=10,
            weight="bold",
        )
        ax.annotate(
            "Hurricane Michael",
            xy=(hurricane_date, line_top),  # type: ignore
            xytext=(5, -12),
            textcoords="offset points",
            ha="left",
            va="bottom",
            color="gray",
            fontsize=10,
        )
    # Add threshold line if specified
    if thresh is not None:
        threshold_line = ax.axhline(
            y=thresh,
            color="red",
            linestyle=":",
            alpha=0.9,
            linewidth=1.5,
            label=f"Threshold: {thresh} {unit}",
            zorder=1,
        )
        handles.append(threshold_line)
        labels.append(f"Threshold: {thresh} {unit}")
    # Update legend with collected handles and labels; these two parameters
    # are intentionally shown without a legend.
    if parameter not in ["Depth, Secchi Disk Depth", "Temperature, Air"]:
        ax.legend(
            handles=handles,
            labels=labels,
            loc="upper right",
            frameon=True,
            ncol=1,
            bbox_to_anchor=(1.0, 1.0),
            handletextpad=0.5,
            fontsize=12,
        )
    # Customize spines - only show bottom spine
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    ax.spines["bottom"].set_color("black")
    ax.spines["bottom"].set_linewidth(0.5)
    # Set labels and title
    title = parameter
    if sector != "All":
        title += f" - {sector}"
    ax.set_title(title, fontsize=14)
    ax.set_ylabel(f"{unit}", fontsize=12)
    # Add grid
    ax.grid(True, axis="both", alpha=0.15, linestyle="-", color="gray")
    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)
    # Format x-axis with one labeled tick per year
    ax.xaxis.set_major_locator(mdates.YearLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y"))
    plt.tight_layout()
    return (fig, param_data)
@timer(include_params=True)
def plot_grouped_bars(
    df: pd.DataFrame,
    parameter: str,
    year_range: tuple[int, int],
    group_by: str = "sector",
) -> tuple[Figure, pd.DataFrame]:
    """
    Create a grouped bar chart showing means by sector or year for a selected parameter.
    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing water quality measurements
    parameter : str
        Name of the parameter to plot
    year_range : tuple[int, int]
        Start and end years to include in plot (inclusive)
    group_by : str
        How to group the bars - either "sector" (default) or "year"
    Returns:
    --------
    tuple[Figure, pd.DataFrame]
        - Figure: Matplotlib figure containing the grouped bar chart
        - DataFrame: Contains the plotted data points with means and standard errors
    Raises:
    -------
    ValueError
        If no rows match the parameter/year filter.
    """
    # Filter data for parameter and year range
    plot_df = df[
        (df["Org_Analyte_Name"] == parameter)
        & (df["Reporting_Year"] >= year_range[0])
        & (df["Reporting_Year"] <= year_range[1])
    ].copy()
    if plot_df.empty:
        raise ValueError(
            f"No data available for {parameter} between {year_range[0]}-{year_range[1]}"
        )
    # Annual mean and standard error per (year, sector)
    means_df = (
        plot_df.groupby(["Reporting_Year", "Sector"], observed=True)["Org_Result_Value"]
        .agg(["mean", "sem"])
        .reset_index()
    )
    # Get unique years and sectors for plotting
    years = sorted(means_df["Reporting_Year"].unique())
    sectors = sorted(means_df["Sector"].unique())
    # "Primary" categories become the legend entries (one bar color each);
    # "secondary" categories become the x-axis groups.
    if group_by == "year":
        primary_categories = sectors
        secondary_categories = years
        category_column = "Sector"
        secondary_column = "Reporting_Year"
        x_label = "Reporting Year"
    else:  # group_by == "sector"
        primary_categories = years
        secondary_categories = sectors
        category_column = "Reporting_Year"
        secondary_column = "Sector"
        x_label = "Sector"
    n_groups = len(primary_categories)
    colors = [
        "#E69F00",  # Orange
        "#56B4E9",  # Sky Blue
        "#009E73",  # Bluish Green
        "#F0E442",  # Yellow
        "#0072B2",  # Blue
        "#D55E00",  # Vermilion
        "#CC79A7",  # Reddish Purple
        "#999999",  # Gray
        "#F5C710",  # Golden Yellow
        "#93AA00",  # Lime Green
        "#482677",  # Dark Purple
        "#DA5724",  # Rust
        "#5082CF",  # Steel Blue
        "#CD9BCD",  # Lavender
        "#C1A43A",  # Olive Green
    ]
    # Create figure
    fig, ax = plt.subplots(figsize=(12, 6))
    # Calculate bar positions
    bar_width = 0.8 / n_groups  # Standard bar width
    # Calculate center positions for x-axis labels
    group_centers = (
        np.arange(len(secondary_categories)) + (bar_width * (n_groups - 1)) / 2
    )
    # Plot bars for each primary category
    for i, category in enumerate(primary_categories):
        # Cycle through the palette instead of zip-truncating: previously any
        # primary category beyond the 15 available colors was silently dropped.
        color = colors[i % len(colors)]
        # Fix: align values to the x-axis categories. The old code plotted the
        # filtered rows positionally, so a missing (year, sector) combination
        # shifted every following bar onto the wrong x position (or raised a
        # length-mismatch error). Reindexing leaves NaN gaps instead, which
        # matplotlib simply skips.
        category_data = (
            means_df[means_df[category_column] == category]
            .set_index(secondary_column)
            .reindex(secondary_categories)
        )
        x_pos = np.arange(len(secondary_categories)) + i * bar_width
        ax.bar(
            x_pos,
            category_data["mean"],
            bar_width,
            label=str(category),
            color=color,
            alpha=0.7,
            zorder=2,
        )
        # Error bars: ±1 standard error of the mean
        ax.errorbar(
            x_pos,
            category_data["mean"],
            yerr=category_data["sem"],
            fmt="none",
            color="black",
            capsize=3,
            capthick=1,
            linewidth=1,
            alpha=0.5,
            zorder=3,
        )
    # Customize plot
    unit = plot_df["Org_Result_Unit"].iloc[0]
    ax.set_xlabel(x_label)
    title = f"{parameter} (Mean Annual{' ' + unit if unit else ''})"
    ax.set_title(title)

    # Function to wrap text
    def wrap_labels(text, width=10):
        """Wrap text at specified width using textwrap."""
        # Convert to string and wrap if needed
        text_str = str(text)
        if len(text_str) > width:
            return textwrap.fill(text_str, width=width)
        return text_str

    # Set x-axis ticks and labels with wrapping using centered positions
    ax.set_xticks(group_centers)
    wrapped_labels = [wrap_labels(str(label)) for label in secondary_categories]
    ax.set_xticklabels(
        wrapped_labels,
        ha="center",
        va="top",
        rotation=0,
    )
    # Remove x-axis tick marks
    ax.tick_params(axis="x", length=0)
    # Add error bar note with adjusted position
    ax.text(
        0.99,
        -0.15,
        "Error bars represent ±1 standard error of the mean",
        ha="right",
        va="top",
        transform=ax.transAxes,
        fontsize=9,
        fontstyle="italic",
    )
    # Adjust layout with more vertical space for wrapped labels
    plt.tight_layout(rect=(0, 0.2, 1, 1))
    # Add grid
    ax.grid(True, axis="y", alpha=0.2, linestyle="-", zorder=1)
    # Customize spines
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)
    ax.legend(
        bbox_to_anchor=(1.02, 1),  # Position at top-right
        loc="upper left",
        frameon=False,
        ncol=1,
        handletextpad=0.5,
        fontsize=9,
    )
    # Skewed, strictly-positive parameters get a log y-axis
    if parameter in [
        "Fecal Coliform (MPN)",
        "Total Nitrogen",
        "Total Phosphorus",
    ]:
        ax.set_yscale("log")
        ax.yaxis.set_major_formatter(plt.ScalarFormatter())  # type: ignore
    means_df.insert(0, "parameter", parameter)
    return fig, means_df
def plot_seasonal_line(
    df: pd.DataFrame,
    parameter: str,
    period: str = "quarterly",
    thresh: float | None = None,
    sector: str | None = None,
) -> tuple[Figure, pd.DataFrame, pd.DataFrame]:
    """
    Create a line chart showing seasonal trends for a parameter across all years.
    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe containing measurements
    parameter : str
        Name of the parameter to plot
    period : str
        'monthly' or 'quarterly' aggregation period (compared case-insensitively)
    thresh : float | None
        Optional threshold value to display on plot
    sector : str | None
        Optional sector name to include in title
    Returns:
    --------
    tuple[Figure, pd.DataFrame, pd.DataFrame]
        - Figure: Matplotlib figure containing the plot
        - DataFrame: Filtered dataframe containing the data used in the plot
        - DataFrame: Stats dataframe containing the mean, min, max, and overall average
    Raises:
    -------
    ValueError
        If no rows match `parameter`.
    """
    # Filter for parameter data
    param_data = df[df["Org_Analyte_Name"] == parameter].copy()
    if param_data.empty:
        raise ValueError(f"No data found for parameter: {parameter}")
    # Add month and quarter columns derived from the sample timestamp
    param_data["Month"] = param_data["Activity_Start_Date_Time"].dt.month
    param_data["Quarter"] = param_data["Activity_Start_Date_Time"].dt.quarter
    # Group by period
    if period.lower() == "monthly":
        group_col = "Month"
        x_ticks = range(1, 13)
        x_label = "Month"
    else:  # quarterly
        group_col = "Quarter"
        x_ticks = range(1, 5)
        x_label = "Quarter"
    # Per-period mean, min, and max across all years
    stats_df = (
        param_data.groupby(group_col, observed=True)["Org_Result_Value"]
        .agg(["mean", "min", "max"])
        .reset_index()
    )
    # Overall (all-period) average, repeated per row for the dotted line
    stats_df["overall_avg"] = param_data["Org_Result_Value"].mean()
    fig, ax = plt.subplots(figsize=(10, 6))
    # Unit from the first row labels the y-axis (assumes one unit per parameter)
    unit = param_data["Org_Result_Unit"].iloc[0]
    # Skewed, strictly-positive parameters get a log y-axis
    if parameter in [
        "Turbidity",
        "Fecal Coliform (MPN)",
        "Total Nitrogen",
        "Total Phosphorus",
    ]:
        ax.set_yscale("log")
        ax.yaxis.set_major_formatter(
            plt.ScalarFormatter()  # type: ignore
        )
    # Plot mean line (solid blue with square markers, drawn on top)
    mean_line = ax.plot(
        stats_df[group_col],
        stats_df["mean"],
        "b-",
        linewidth=2,
        marker="s",
        label="Mean",
        zorder=3,
    )[0]
    # Direct label at the beginning of the mean line (the legend is removed below)
    ax.annotate(
        "Mean",
        xy=(stats_df[group_col].iloc[0], stats_df["mean"].iloc[0]),
        xytext=(-5, 0),
        textcoords="offset points",
        ha="right",
        va="center",
        color=mean_line.get_color(),
        fontsize=9,
    )
    # Plot min line (dashed gray)
    min_line = ax.plot(
        stats_df[group_col],
        stats_df["min"],
        "--",
        color="gray",
        linewidth=1,
        label="Min",
        zorder=2,
    )[0]
    # Direct label at the end of the min line
    ax.annotate(
        "Min",
        xy=(stats_df[group_col].iloc[-1], stats_df["min"].iloc[-1]),
        xytext=(5, 0),
        textcoords="offset points",
        va="center",
        color=min_line.get_color(),
        fontsize=9,
    )
    # Plot max line (dashed orange)
    max_line = ax.plot(
        stats_df[group_col],
        stats_df["max"],
        "--",
        color="orange",
        linewidth=1,
        label="Max",
        zorder=2,
    )[0]
    # Direct label at the end of the max line
    ax.annotate(
        "Max",
        xy=(stats_df[group_col].iloc[-1], stats_df["max"].iloc[-1]),
        xytext=(5, 0),
        textcoords="offset points",
        va="center",
        color=max_line.get_color(),
        fontsize=9,
    )
    # Dotted horizontal line at the overall average (same value in every row)
    avg_value = stats_df["overall_avg"].iloc[0]
    ax.axhline(
        y=avg_value,
        color="blue",
        linestyle=":",
        alpha=0.5,
        linewidth=1,
        label="Average",
        zorder=1,
    )
    # Direct label for the overall average, placed below the line
    ax.annotate(
        "Average",
        xy=(stats_df[group_col].iloc[-1], avg_value),
        xytext=(27, -5),  # Moved down 5 points
        textcoords="offset points",
        va="top",  # Text aligns above the point
        ha="right",  # Right-align the text
        color="blue",
        alpha=0.5,
        fontsize=9,
    )
    # Remove any auto-created legend: the lines above are labeled directly
    # with annotations instead.
    legend = ax.get_legend()
    if legend is not None:
        legend.remove()
    # Add threshold line if specified
    if thresh is not None:
        ax.axhline(
            y=thresh,
            color="red",
            linestyle=":",
            alpha=0.9,
            linewidth=1.5,
            label=f"Threshold: {thresh} {unit}",
            zorder=1,
        )
        # Legend showing only the threshold entry.
        # NOTE(review): the handle passed here is a second axhline call, so a
        # duplicate threshold artist is drawn on the axes just to obtain a
        # legend handle — consider reusing the line above instead.
        ax.legend(
            [
                ax.axhline(
                    y=thresh, color="red", linestyle=":", alpha=0.9, linewidth=1.5
                )
            ],
            [f"Threshold: {thresh} {unit}"],
            loc="upper right",
            frameon=False,
            handletextpad=0.5,
            fontsize=9,
        )
    # Customize plot
    ax.set_xticks(x_ticks)
    if period.lower() == "quarterly":
        # Label quarters 1-4 as seasons.
        # NOTE(review): Q1 maps to "Spring" here — looks like a reporting-year
        # convention rather than calendar seasons; confirm against the
        # reporting-period definition used elsewhere in the app.
        season_labels = ["Spring", "Summer", "Fall", "Winter"]
        ax.set_xticklabels(season_labels)
        # Remove x-axis tick marks for quarterly view
        ax.tick_params(axis="x", which="both", length=0)
    ax.set_xlabel(x_label)
    # Add secondary y-axis for temperature if unit is Celsius
    if unit == "deg C":
        def celsius_to_fahrenheit(temp_c):
            return (temp_c * 9 / 5) + 32
        # Get the primary y-axis limits
        y1_min, y1_max = ax.get_ylim()
        # Secondary axis mirrors the primary via the °C↔°F conversion pair
        ax2 = ax.secondary_yaxis(
            "right",
            functions=(celsius_to_fahrenheit, lambda f: (f - 32) * 5 / 9),  # type: ignore
        )
        # Set the same limits as primary axis but converted to Fahrenheit
        ax2.set_ylim(celsius_to_fahrenheit(y1_min), celsius_to_fahrenheit(y1_max))
        # Get primary axis ticks and convert them for secondary axis
        primary_ticks = ax.get_yticks()
        ax2.set_yticks([celsius_to_fahrenheit(t) for t in primary_ticks])
        # Format tick labels with degree symbols
        ax.yaxis.set_major_formatter(lambda x, p: f"{x:.0f}°C")
        ax2.yaxis.set_major_formatter(lambda x, p: f"{x:.0f}°F")
        # Remove right spine for consistency
        ax2.spines["right"].set_visible(False)
        # Remove tick marks but keep labels
        ax2.tick_params(axis="y", which="both", length=0)
    # Add secondary y-axis for depth if unit is feet
    elif unit == "ft":
        def feet_to_meters(feet):
            return feet * 0.3048
        # Secondary axis mirrors the primary via the ft↔m conversion pair
        ax2 = ax.secondary_yaxis(
            "right",
            functions=(feet_to_meters, lambda m: m / 0.3048),  # type: ignore
        )
        ax2.set_ylabel("Depth (m)")
        ax.set_ylabel("Depth (ft)")
        # Remove right spine for consistency
        ax2.spines["right"].set_visible(False)
        # Remove tick marks but keep labels
        ax2.tick_params(axis="y", which="both", length=0)
    else:
        ax.set_ylabel(f"{unit}")
    # Get year range for title
    start_year = param_data["Activity_Start_Date_Time"].dt.year.min()
    end_year = param_data["Activity_Start_Date_Time"].dt.year.max()
    year_range = (
        f" ({start_year}-{end_year})" if start_year != end_year else f" ({start_year})"
    )
    title = f"Seasonal {parameter} Trends{year_range}"
    if sector:
        title = f"{title} - {sector}"
    ax.set_title(title)
    ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray")
    # Customize spines
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    # Remove tick marks but keep labels
    ax.tick_params(axis="y", which="both", length=0)
    # Adjust layout based on unit type: leave more room on the right when
    # a secondary temperature axis is present
    if unit == "deg C":
        plt.tight_layout(rect=(0, 0, 0.95, 1))
    else:
        plt.tight_layout(rect=(0, 0, 0.9, 1))
    stats_df.insert(0, "parameter", parameter)
    return fig, param_data, stats_df
@timer(include_params=True)
def plot_sector_line_charts(
    df: pd.DataFrame,
    parameter: str,
    show_sem: bool = True,
    panel_chart: bool = False,
    color_scale: list[str] = COLOR_SCALE,
) -> tuple[Figure, pd.DataFrame, pd.DataFrame]:
    """
    Create a plot of mean annual parameter trends by sector.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    parameter : str
        Name of the parameter to plot
    show_sem : bool, default=True
        Whether to show the standard error of the mean bands
    panel_chart : bool, default=False
        If True, creates a grid of individual sector charts instead of overlapping lines
    color_scale : list[str], default=COLOR_SCALE
        Colors assigned to sectors in sorted-sector order. NOTE(review):
        ``zip(sectors, color_scale)`` below silently drops any sectors beyond
        ``len(color_scale)`` — confirm the palette is always long enough.

    Returns:
    --------
    tuple[Figure, pd.DataFrame, pd.DataFrame]
        - Figure: Matplotlib figure containing the line chart(s)
        - DataFrame: Filtered dataframe containing the data used in the plot
        - DataFrame: Contains the plotted data points with means and standard errors
    """
    # Greyscale palette used for titles, labels and spines below.
    GREY10 = "#1a1a1a"  # noqa: F841
    GREY30 = "#4d4d4d"  # noqa: F841
    GREY40 = "#666666"  # noqa: F841
    GREY75 = "#bfbfbf"  # noqa: F841
    GREY91 = "#e8e8e8"  # noqa: F841
    # 1. Data preparation
    param_data = df[df["Org_Analyte_Name"] == parameter].copy()
    if parameter == "Salinity":
        # Salinity readings from the freshwater sector are excluded.
        param_data = param_data[param_data["Sector"] != "Freshwater Lakes"]
    sectors = sorted(param_data["Sector"].unique())
    years = sorted(param_data["Reporting_Year"].unique())
    # Assumes all rows for one parameter share the same unit — TODO confirm.
    param_unit = param_data["Org_Result_Unit"].iloc[0] if not param_data.empty else ""
    # 2. Compute all sector data: per-sector annual mean and standard error.
    sector_data_dict = {}
    for sector in sectors:
        sector_data = (
            param_data[param_data["Sector"] == sector]
            .groupby("Reporting_Year", observed=True)["Org_Result_Value"]
            .agg(["mean", "sem"])
            .reset_index()
        )
        sector_data["Sector"] = sector
        sector_data_dict[sector] = sector_data
    # 3. Determine global y-limits shared by every axis so panels are comparable.
    # These parameters are heavily right-skewed, hence the log scale.
    use_log_scale = parameter in [
        "Turbidity",
        "Fecal Coliform (MPN)",
        "Total Nitrogen",
        "Total Phosphorus",
    ]
    y_min = float("inf")
    y_max = float("-inf")
    # Limits cover mean ± sem across all sectors. NOTE(review): if every
    # sector frame is empty, y_min/y_max remain ±inf and the set_ylim calls
    # below would receive infinite limits — verify upstream guarantees data.
    for data in sector_data_dict.values():
        if not data.empty:
            y_min = min(y_min, (data["mean"] - data["sem"]).min())
            y_max = max(y_max, (data["mean"] + data["sem"]).max())
    # Add padding to y-axis limits (multiplicative on log scale, additive otherwise).
    if use_log_scale:
        y_min = y_min / 1.2
        y_max = y_max * 1.2
    else:
        y_range = y_max - y_min
        y_min = y_min - (y_range * 0.05)
        y_max = y_max + (y_range * 0.05)
    # 4. Create figure and determine layout: up to 3 columns in panel mode,
    # one wide axis otherwise.
    if panel_chart:
        n_cols = min(3, len(sectors))
        n_rows = (len(sectors) + n_cols - 1) // n_cols
        fig = plt.figure(figsize=(5 * n_cols, 3 * n_rows))
    else:
        fig, main_ax = plt.subplots(figsize=(14, 4))
    # 5. Helper function to plot a single sector.
    # NOTE: this closure reads `sector`, `years`, `y_min`, `y_max`,
    # `parameter`, `use_log_scale` and `show_sem` from the enclosing scope;
    # in particular `label=sector` refers to the *current* loop variable of
    # the plotting loop in step 6 at call time.
    def plot_sector_on_axis(
        ax: plt.Axes,  # type: ignore
        sector_data: pd.DataFrame,
        color: str,
        show_label: bool = False,
    ):
        # Mean trend line with circular markers.
        line = ax.plot(
            sector_data["Reporting_Year"],
            sector_data["mean"],
            "-o",
            color=color,
            label=sector if show_label else None,
            markersize=4,
            linewidth=2,
        )
        # Shaded mean ± sem band.
        if show_sem:
            ax.fill_between(
                sector_data["Reporting_Year"],
                sector_data["mean"] - sector_data["sem"],
                sector_data["mean"] + sector_data["sem"],
                color=color,
                alpha=0.15,
            )
        # Configure axis: light horizontal grid, minimal spines, no tick marks.
        ax.grid(True, axis="y", which="major", alpha=0.2, linestyle="--")
        ax.grid(True, axis="y", which="minor", alpha=0.1, linestyle="--")
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.spines["left"].set_visible(False)
        ax.spines["bottom"].set_color(GREY40)
        ax.tick_params(axis="both", which="both", length=0, colors=GREY40)
        ax.set_xticks(years)
        if use_log_scale:
            ax.set_yscale("log")
            ax.set_ylim(y_min, y_max)

            def format_func(x, _):
                # Determine if we need decimal places based on data range:
                # decimals when values dip below 1 or any mean is non-integer.
                min_value = min(sector_data["mean"].min(), y_min)
                needs_decimals = min_value < 1 or not all(
                    val.is_integer() for val in sector_data["mean"]
                )
                if x == 0:
                    return "0"
                elif needs_decimals:
                    return f"{x:.1f}"
                else:
                    return f"{int(x)}"

            ax.yaxis.set_major_formatter(plt.FuncFormatter(format_func))  # type: ignore
            # Calculate the range ratio and absolute values used to pick a
            # tick strategy below.
            range_ratio = y_max / y_min
            abs_min = min(abs(sector_data["mean"].min()), abs(y_min))
            abs_max = max(abs(sector_data["mean"].max()), abs(y_max))
            if parameter == "Total Phosphorus":
                # Custom ticks for Total Phosphorus, clipped to the padded range.
                major_ticks = np.array([10, 13, 15, 17, 20, 30, 40, 50])
                major_ticks = major_ticks[
                    (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1)
                ]
                ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks))  # type: ignore
                ax.yaxis.set_minor_locator(plt.NullLocator())  # type: ignore
            elif abs_min >= 100:
                # For larger numbers (e.g., Total Nitrogen).
                major_ticks = np.array([100, 200, 300, 400, 500])
                major_ticks = major_ticks[
                    (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1)
                ]
                ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks))  # type: ignore
                ax.yaxis.set_minor_locator(plt.NullLocator())  # type: ignore
            elif abs_min >= 10 and abs_max <= 100:
                # For medium numbers (excluding Total Phosphorus, handled above).
                major_ticks = np.array([10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
                major_ticks = major_ticks[
                    (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1)
                ]
                ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks))  # type: ignore
                ax.yaxis.set_minor_locator(plt.NullLocator())  # type: ignore
            elif range_ratio > 10:
                # Wide range but smaller numbers (e.g., Turbidity): decade
                # majors with 2x/5x minors.
                ax.yaxis.set_major_locator(plt.LogLocator(base=10.0, numticks=5))  # type: ignore
                ax.yaxis.set_minor_locator(
                    plt.LogLocator(base=10.0, subs=(2, 5), numticks=5)  # type: ignore
                )
                ax.yaxis.set_minor_formatter(plt.FuncFormatter(format_func))  # type: ignore
            else:
                # Narrow range with small numbers.
                if y_min < 1:
                    major_ticks = np.array([0.5, 1, 1.5, 2, 2.5, 3, 4, 5])
                else:
                    major_ticks = np.arange(
                        np.floor(y_min),
                        np.ceil(y_max) + 1,
                        1 if y_max - y_min < 5 else 2,
                    )
                major_ticks = major_ticks[
                    (major_ticks >= y_min * 0.9) & (major_ticks <= y_max * 1.1)
                ]
                ax.yaxis.set_major_locator(plt.FixedLocator(major_ticks))  # type: ignore
                ax.yaxis.set_minor_locator(plt.NullLocator())  # type: ignore
            # Adjust tick parameters
            ax.tick_params(axis="y", which="both", labelsize=9)
        else:
            ax.set_ylim(y_min, y_max)
            # Determine if we need decimal places for linear scale.
            min_value = min(sector_data["mean"].min(), y_min)
            needs_decimals = min_value < 1 or not all(
                val.is_integer() for val in sector_data["mean"]
            )

            def linear_format_func(x, _):
                if needs_decimals:
                    return f"{x:.1f}"
                return f"{int(x)}"

            ax.yaxis.set_major_formatter(plt.FuncFormatter(linear_format_func))  # type: ignore
        # Returned for API symmetry; callers currently ignore it.
        return line

    # 6. Plot sectors. Sectors beyond len(color_scale) are dropped by zip().
    for i, (sector, color) in enumerate(zip(sectors, color_scale)):
        sector_data = sector_data_dict[sector]
        if sector_data.empty:
            continue
        if panel_chart:
            ax = fig.add_subplot(n_rows, n_cols, i + 1)
            plot_sector_on_axis(ax, sector_data, color)
            ax.set_title(sector, pad=10, fontsize=10, color=GREY30)
            # Limit number of x-axis ticks to maximum of 8
            if len(years) > 8:
                # Show roughly every nth tick to get 8 or fewer ticks
                n = len(years) // 8 + 1
                visible_ticks = years[::n]
                ax.set_xticks(visible_ticks)
                ax.set_xticklabels(visible_ticks, rotation=0, weight=500, color=GREY40)
                # Show tick marks since we're hiding some labels
                ax.tick_params(axis="x", which="major", length=4, colors=GREY40)
            else:
                ax.set_xticklabels(years, rotation=0, weight=500, color=GREY40)
                # Hide tick marks when showing all labels
                ax.tick_params(axis="x", which="major", length=0)
        else:
            plot_sector_on_axis(main_ax, sector_data, color, show_label=True)
    # 7. Final customization: shared suptitle in panel mode, single title,
    # unit label and outside legend otherwise.
    if panel_chart:
        title = f"{parameter}{' (' + param_unit + ')' if param_unit else ''}"
        fig.suptitle(title, fontsize=14, y=1.02, color=GREY30)  # Updated color
    else:
        main_ax.set_title(
            parameter, pad=10, fontsize=14, fontweight="normal", color=GREY30
        )  # Updated color
        main_ax.set_ylabel(param_unit, fontsize=12, color=GREY40)
        main_ax.set_xticklabels(years, weight=500, color=GREY40)
        main_ax.yaxis.label.set_color(GREY40)
        # Legend placed outside the axes to the right.
        main_ax.legend(
            bbox_to_anchor=(1.05, 1),
            loc="upper left",
            borderaxespad=0.0,
            frameon=False,
            fontsize=9,
        )
        if use_log_scale:
            # Plain numbers (no scientific notation) on the log axis.
            main_ax.yaxis.set_major_formatter(plt.ScalarFormatter())  # type: ignore
            main_ax.yaxis.get_major_formatter().set_scientific(False)  # type: ignore
    plt.tight_layout()
    # 8. Prepare return data: one long frame of per-sector annual stats.
    plot_data = pd.concat(sector_data_dict.values(), ignore_index=True)
    plot_data.insert(0, "parameter", parameter)
    return fig, param_data, plot_data
@timer(include_params=True)
def plot_sector_box_charts(
    df: pd.DataFrame,
    parameter: str,
    color_scale: list[str] = COLOR_SCALE,
    show_trend: bool = True,
) -> tuple[Figure, pd.DataFrame, pd.DataFrame]:
    """
    Create box plots showing the distribution of parameter values by sector and year,
    with optional trend lines and statistics.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    parameter : str
        Parameter to plot
    color_scale : list[str]
        List of colors to use for sectors (indexed by sorted-sector position)
    show_trend : bool, default=True
        Whether to show trend lines and statistics

    Returns:
    --------
    tuple[Figure, pd.DataFrame, pd.DataFrame]
        - Figure: Matplotlib figure containing the box plots
        - DataFrame: Filtered dataframe containing the raw data used in the plot
        - DataFrame: Contains the plotted data points: mean, median, and quartiles
    """
    from scipy import stats

    # Consistent greys for titles and axis styling.
    GREY30 = "#4d4d4d"
    GREY40 = "#666666"
    # Filter data for the requested parameter.
    param_data = df[df["Org_Analyte_Name"] == parameter].copy()
    # For Salinity, exclude the freshwater sector (same label as the other
    # sector plots in this module).
    if parameter == "Salinity":
        param_data = param_data[param_data["Sector"] != "Freshwater Lakes"]
    # Derive the reporting year from the sample timestamp (overwrites any
    # pre-existing Reporting_Year column).
    param_data["Reporting_Year"] = param_data["Activity_Start_Date_Time"].dt.year
    sectors = sorted(param_data["Sector"].unique())
    years = sorted(param_data["Reporting_Year"].unique())
    # Heavily right-skewed parameters are displayed on a log scale.
    use_log_scale = parameter in [
        "Turbidity",
        "Fecal Coliform (MPN)",
        "Total Nitrogen",
        "Total Phosphorus",
    ]
    # Single-column layout: one stacked row per sector.
    fig = plt.figure(figsize=(15, 2.5 * len(sectors)))
    for idx, sector in enumerate(sectors):
        ax = plt.subplot(len(sectors), 1, idx + 1)
        sector_data = param_data[param_data["Sector"] == sector]
        # The deprecated `labels=` kwarg is intentionally omitted: tick labels
        # are set explicitly via set_xticks/set_xticklabels below, which
        # overrode it anyway.
        ax.boxplot(
            [
                sector_data[sector_data["Reporting_Year"] == year][
                    "Org_Result_Value"
                ].dropna()
                for year in years
            ],
            patch_artist=True,
            medianprops=dict(color="black"),
            flierprops=dict(
                marker="o",
                markerfacecolor=color_scale[idx],
                alpha=0.5,
                markersize=4,
            ),
            boxprops=dict(facecolor=color_scale[idx], alpha=0.6),
            widths=0.6,
            positions=range(len(years)),
        )
        # Only add trend line and stats if show_trend is True.
        if show_trend:
            # Annual means drive the trend line.
            annual_means = [
                sector_data[sector_data["Reporting_Year"] == year][
                    "Org_Result_Value"
                ].mean()
                for year in years
            ]
            # Drop NaN years before regression.
            valid_points = [
                (x, y) for x, y in enumerate(annual_means) if not np.isnan(y)
            ]
            # linregress requires at least two points; with one it raises.
            if len(valid_points) >= 2:
                x_valid, y_valid = zip(*valid_points)
                # Perform linear regression on (year-index, annual mean).
                slope, intercept, r_value, p_value, std_err = stats.linregress(
                    x_valid, y_valid
                )
                # Plot trend line over the valid year positions.
                line_x = np.array(x_valid)
                line_y = slope * line_x + intercept
                ax.plot(line_x, line_y, "--", color="red", alpha=0.7, linewidth=1.5)
                # Annotate fit quality in the top-left corner of the axes.
                stats_text = f"R² = {r_value**2:.3f}\np = {p_value:.3f}"  # type: ignore
                ax.text(
                    0.02,
                    0.98,
                    stats_text,
                    transform=ax.transAxes,
                    verticalalignment="top",
                    fontsize=8,
                    bbox=dict(facecolor="white", alpha=0.8, edgecolor="none"),
                )
        # Set proper x-axis limits with padding around the box positions.
        ax.set_xlim(-0.5, len(years) - 0.5)
        ax.set_title(sector, pad=10, fontsize=10, color=GREY30)
        if use_log_scale:
            ax.set_yscale("log")
        # Customize appearance: light grid, minimal spines.
        ax.grid(True, axis="y", alpha=0.15, linestyle="-", color="gray")
        ax.spines["top"].set_visible(False)
        ax.spines["right"].set_visible(False)
        ax.spines["left"].set_visible(False)
        ax.spines["bottom"].set_color(GREY40)
        ax.spines["bottom"].set_linewidth(0.5)
        # Remove tick marks but keep labels.
        ax.tick_params(axis="both", which="both", length=0, colors=GREY40)
        ax.set_xticks(range(len(years)))
        ax.set_xticklabels(years, ha="center", weight=500, color=GREY40)
    # Add overall title.
    fig.suptitle(
        f"{parameter} Distribution by Sector", fontsize=14, y=1.02, color=GREY30
    )
    # Adjust layout; extra vertical space between the stacked rows.
    plt.tight_layout()
    plt.subplots_adjust(hspace=0.4)
    # Build the summary-statistics table returned to the caller.
    stats_data = []
    for sector in sectors:
        sector_data = param_data[param_data["Sector"] == sector]
        for year in years:
            year_data = sector_data[sector_data["Reporting_Year"] == year][
                "Org_Result_Value"
            ]
            if not year_data.empty:
                # Named `year_stats` (not `stats`) so it does not shadow the
                # scipy.stats module imported above.
                year_stats = {
                    "Sector": sector,
                    "Reporting_Year": year,
                    "mean": year_data.mean(),
                    "median": year_data.median(),
                    "q1": year_data.quantile(0.25),
                    "q3": year_data.quantile(0.75),
                    "min": year_data.min(),
                    "max": year_data.max(),
                    "count": len(year_data),
                }
                stats_data.append(year_stats)
    # Create stats DataFrame and add parameter column.
    stats_df = pd.DataFrame(stats_data)
    stats_df.insert(0, "parameter", parameter)
    return fig, param_data, stats_df
@timer(include_params=True)
def plot_sector_heatmap(
    df: pd.DataFrame,
    parameter: str,
    show_values: bool = False,
) -> tuple[Figure, pd.DataFrame, pd.DataFrame]:
    """
    Create a heatmap showing annual means by sector and year.

    Parameters:
    -----------
    df : pd.DataFrame
        Input dataframe
    parameter : str
        Name of the parameter to plot
    show_values : bool, default=False
        Whether to display mean values inside each cell

    Returns:
    --------
    tuple[Figure, pd.DataFrame, pd.DataFrame]
        - Figure: Matplotlib figure containing the heatmap
        - DataFrame: Filtered dataframe containing the raw data used in the plot
        - DataFrame: Contains the plotted data points: mean values for each sector and year
    """
    # Filter data for selected parameter.
    param_data = df[df["Org_Analyte_Name"] == parameter].copy()
    # For Salinity, exclude the freshwater sector. Fixed to use the
    # "Freshwater Lakes" label used everywhere else in this module; the
    # previous "Fresh Water Lakes" spelling matched no rows, so the filter
    # silently did nothing.
    if parameter == "Salinity":
        param_data = param_data[param_data["Sector"] != "Freshwater Lakes"]
    # Calculate annual means and pivot to a Sector x Reporting_Year grid.
    plot_data = (
        param_data.groupby(["Reporting_Year", "Sector"], observed=True)[
            "Org_Result_Value"
        ]
        .mean()
        .reset_index()
        .pivot(index="Sector", columns="Reporting_Year", values="Org_Result_Value")
    )
    # Figure height scales with the number of sectors (rows).
    fig, ax = plt.subplots(figsize=(12, len(plot_data) * 0.8))
    # Render the grid; NaN cells (sector/year combos with no data) are blank.
    im = ax.imshow(plot_data, aspect="auto", cmap="YlOrRd")
    # One tick per column/row, labelled with year and sector respectively.
    ax.set_xticks(np.arange(len(plot_data.columns)))
    ax.set_yticks(np.arange(len(plot_data.index)))
    ax.set_xticklabels(plot_data.columns)
    ax.set_yticklabels(plot_data.index)
    # Remove all spines.
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_visible(False)
    ax.spines["bottom"].set_visible(False)
    # Remove all tick marks but keep labels.
    ax.tick_params(axis="both", which="both", length=0)
    # White minor-grid lines create small gaps between cells.
    ax.set_xticks(np.arange(plot_data.shape[1] + 1) - 0.5, minor=True)
    ax.set_yticks(np.arange(plot_data.shape[0] + 1) - 0.5, minor=True)
    ax.grid(which="minor", color="w", linestyle="-", linewidth=2)
    # Set x-axis labels horizontal.
    plt.setp(ax.get_xticklabels(), rotation=0)
    # Add value annotations if requested (skipping empty cells).
    if show_values:
        for i in range(len(plot_data.index)):
            for j in range(len(plot_data.columns)):
                value = plot_data.iloc[i, j]
                if not pd.isna(value):
                    text = f"{value:.1f}"
                    ax.text(j, i, text, ha="center", va="center", color="black")
    # Add colorbar at the bottom with reduced padding and no border.
    cbar = ax.figure.colorbar(im, ax=ax, orientation="horizontal", pad=0.1)  # type: ignore
    # Assumes all rows for one parameter share the same unit — TODO confirm.
    unit = param_data["Org_Result_Unit"].iloc[0] if not param_data.empty else ""
    cbar.ax.set_xlabel(f"Mean ({unit})")
    cbar.outline.set_visible(False)  # type: ignore
    # Set title.
    ax.set_title(parameter)
    plt.tight_layout()
    # Reset index to make Sector a column and add parameter column.
    plot_data = plot_data.reset_index()
    plot_data.insert(0, "parameter", parameter)
    return fig, param_data, plot_data