Spaces:
Sleeping
Sleeping
File size: 4,033 Bytes
0d4a0ba | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | import pandas as pd
import altair as alt
from src.utils import PERIODS, CURRENT_YEAR, UNIT_NAME_COL
def build_trend_chart(trend_df: pd.DataFrame, species_name: str,
                      year_from: int = None, year_to: int = None) -> alt.Chart:
    """Build the layered yearly-count trend chart for one species.

    The chart consists of three layers drawn over ``trend_df``: a line, a
    small circle per year, and a larger highlight circle for the row whose
    ``year`` equals ``CURRENT_YEAR`` (if any).

    Args:
        trend_df: Frame with a ``year`` and a ``count`` column.
        species_name: Name shown in the chart title.
        year_from: First year shown in the subtitle label only; a falsy value
            falls back to ``PERIODS[0][0]``. The data is not filtered by it.
        year_to: Last year shown in the subtitle label only; a falsy value
            falls back to ``CURRENT_YEAR``. The data is not filtered by it.

    Returns:
        The interactive layered chart with a height of 300.
    """
    def _tooltips() -> list:
        # Each layer gets its own tooltip objects, as in a hand-written spec.
        return [
            alt.Tooltip("year:O", title="Jahr"),
            alt.Tooltip("count:Q", title="Funde"),
        ]

    x_axis = alt.X("year:O", title="Jahr", axis=alt.Axis(labelAngle=-45))
    y_axis = alt.Y("count:Q", title="Anzahl Funde", scale=alt.Scale(zero=True))
    base = alt.Chart(trend_df).encode(x=x_axis, y=y_axis, tooltip=_tooltips())

    line = base.mark_line(color="#00cc6a", strokeWidth=2)
    points = base.mark_circle(color="#00cc6a", size=40)

    # Separate chart on the current-year subset so it can use its own marker.
    is_current = trend_df["year"] == CURRENT_YEAR
    highlight = (
        alt.Chart(trend_df[is_current])
        .mark_circle(color="#ff6b35", size=80)
        .encode(
            x=alt.X("year:O"),
            y=alt.Y("count:Q"),
            tooltip=_tooltips(),
        )
    )

    y_label = f"{year_from or PERIODS[0][0]}–{year_to or CURRENT_YEAR}"
    title = alt.TitleParams(
        text=f"Fundtrend: {species_name}",
        subtitle=f"Jährliche GBIF-Funde {y_label} | Rot = aktuelles Jahr",
        fontSize=14,
        subtitleFontSize=11,
    )

    layered = line + points + highlight
    return layered.properties(title=title, height=300).interactive()
def prepare_per_unit_data(
    df: pd.DataFrame,
    unit_col: str,
    max_units: int = 20,
) -> tuple[list[str], pd.DataFrame]:
    """Return (ordered list of top unit names, aggregated year/count DataFrame).

    Rows with a missing ``year`` or ``unit_col`` value are dropped, ``year``
    is cast to ``int``, and the remaining rows are counted per
    ``(unit_col, year)`` pair. Only the ``max_units`` units with the largest
    total count are kept.

    Args:
        df: Input rows; must contain ``unit_col`` and ``year`` to yield data.
        unit_col: Name of the column identifying the unit.
        max_units: Maximum number of units to keep.

    Returns:
        A tuple ``(top_units, data)``. ``top_units`` lists the unit names
        ordered by total count, descending. ``data`` always has the columns
        ``[unit_col, 'year', 'count']``; when a required column is missing or
        no usable rows remain, ``top_units`` is ``[]`` and ``data`` is an
        empty frame that still carries those columns.
    """
    def _no_data() -> tuple[list[str], pd.DataFrame]:
        # Keep the documented schema even when there is nothing to report, so
        # callers can reference the columns without guarding against KeyError.
        return [], pd.DataFrame(columns=[unit_col, "year", "count"])

    if unit_col not in df.columns or "year" not in df.columns:
        return _no_data()

    usable = df.dropna(subset=["year", unit_col])
    if usable.empty:
        return _no_data()

    data = (
        usable.assign(year=lambda d: d["year"].astype(int))
        .groupby([unit_col, "year"])
        .size()
        .reset_index(name="count")
    )
    if data.empty:
        return _no_data()

    # nlargest returns the units already ordered by total, descending.
    top_units = (
        data.groupby(unit_col)["count"].sum()
        .nlargest(max_units).index.tolist()
    )
    data = data[data[unit_col].isin(top_units)].copy()
    return top_units, data
def build_unit_row_chart(unit_data: pd.DataFrame,
                         y_max: int | None = None) -> alt.Chart:
    """Build the compact trend chart for the rows of a single unit.

    Uses the same three layers as the overall trend chart (line, per-year
    circles, highlight circle for ``CURRENT_YEAR``) but without a title and
    with a height of 140.

    Args:
        unit_data: Frame with a ``year`` and a ``count`` column.
        y_max: Optional upper bound of the Y axis. When given, the Y domain
            is fixed to ``[0, y_max]`` so that several unit charts share the
            same scale and can be compared; when ``None`` the scale is only
            anchored at zero.

    Returns:
        The interactive layered chart.
    """
    if y_max is None:
        y_scale = alt.Scale(zero=True)
    else:
        y_scale = alt.Scale(zero=True, domain=[0, y_max])

    def _tooltips() -> list:
        # Each layer gets its own tooltip objects.
        return [
            alt.Tooltip("year:O", title="Jahr"),
            alt.Tooltip("count:Q", title="Funde"),
        ]

    x_axis = alt.X("year:O", title="Jahr", axis=alt.Axis(labelAngle=-45))
    y_axis = alt.Y("count:Q", title="Funde", scale=y_scale)
    base = alt.Chart(unit_data).encode(x=x_axis, y=y_axis, tooltip=_tooltips())

    line = base.mark_line(color="#00cc6a", strokeWidth=2)
    points = base.mark_circle(color="#00cc6a", size=40)

    # The highlight layer shares the Y scale so it lines up with the others.
    is_current = unit_data["year"] == CURRENT_YEAR
    highlight = (
        alt.Chart(unit_data[is_current])
        .mark_circle(color="#ff6b35", size=80)
        .encode(
            x=alt.X("year:O"),
            y=alt.Y("count:Q", scale=y_scale),
            tooltip=_tooltips(),
        )
    )

    layered = line + points + highlight
    return layered.properties(height=140).interactive()
def compute_trend_from_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return the number of rows per year for a frame with a ``year`` column.

    Rows whose ``year`` is missing are ignored and the remaining years are
    cast to ``int``. The result has the columns ``['year', 'count']`` and is
    sorted by year, ascending. An empty input, or one without a ``year``
    column, yields an empty frame with those same two columns.
    """
    if not df.empty and "year" in df.columns:
        with_year = df.dropna(subset=["year"])
        with_year = with_year.assign(year=with_year["year"].astype(int))
        per_year = with_year.groupby("year").size().reset_index(name="count")
        return per_year.sort_values("year")

    return pd.DataFrame(columns=["year", "count"])
|