ishaq101's picture
feat/Planner Agent (#2)
81e5fe7
Raw
History Blame
6.75 kB
"""analyze_trend — time-series trend over a period (KM-608).
An analytical "family" tool: in ONE call it buckets rows into time periods
(day/week/month/quarter/year), aggregates a value per period, and summarizes
the movement (first vs last, absolute & percent change, direction, linear
slope). Answers questions like "how did revenue trend month over month?".
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from src.tools.analytics.descriptive import ColumnNotFoundError
# Maps each user-facing period name to the pandas resample rule that buckets
# rows into that period. Month, quarter and year use the end-anchored aliases
# ("ME", "QE", "YE") that pandas 2.2 introduced; the older spellings warn there.
# NOTE(review): older pandas releases do not accept these aliases — confirm the
# pinned pandas version is >= 2.2.
FREQ_MAP = dict(
    day="D",
    week="W",
    month="ME",
    quarter="QE",
    year="YE",
)

# Aggregations that may be used to reduce the values inside one period.
SUPPORTED_AGGS = (
    "sum",
    "mean",
    "count",
    "min",
    "max",
    "median",
)
class InvalidFrequencyError(ValueError):
    """Raised when ``freq`` names a period that FREQ_MAP does not define.

    Intended to surface to callers as error_code INVALID_FREQUENCY.
    """
class UnsupportedAggregationError(ValueError):
    """Raised when ``agg`` is not one of the names in SUPPORTED_AGGS.

    Intended to surface to callers as error_code UNSUPPORTED_AGG.
    """
def _clean(value: object) -> object:
"""Convert numpy scalars to plain Python; NaN -> None for JSON-clean output."""
if value is None:
return None
if isinstance(value, float) and np.isnan(value):
return None
if hasattr(value, "item"):
value = value.item()
return None if isinstance(value, float) and np.isnan(value) else value
return value
def _period_label(ts: pd.Timestamp, freq: str) -> str:
"""Human-readable period label keyed off the friendly frequency name."""
if freq == "month":
return str(ts.strftime("%Y-%m"))
if freq == "quarter":
return f"{ts.year}-Q{ts.quarter}"
if freq == "year":
return str(ts.strftime("%Y"))
return str(ts.strftime("%Y-%m-%d")) # day / week
# Prompt-style description read by the Planner to decide WHEN to pick this tool.
# Final destination is ToolSpec.description once the wrapper layer is built.
# The Planner reads this text verbatim, so keep it correctly encoded. The
# parenthesised phrases are Indonesian equivalents of the English trigger words.
DESCRIPTION = """\
Summary: Time-series trend of one metric over evenly-spaced periods (day, week, \
month, quarter, year). Reports per-period points plus direction, absolute and \
percent change, and a linear slope.
USE WHEN the question is about movement over time — growth, decline, trend, \
seasonality. Trigger words: "over time" (dari waktu ke waktu), "trend" (tren), \
"monthly/yearly" (bulanan/tahunan), "growth" (pertumbuhan), "since/last N months".
DON'T USE WHEN:
- it groups by a non-time category -> analyze_aggregate
- it compares two specific groups (A vs B) -> analyze_comparison
- it summarizes a column with no time axis -> analyze_descriptive
Example questions:
- "how did monthly revenue change this year?"
- "show the sales trend over the last 12 months"
- "is the number of signups growing quarter over quarter?"
- "yearly profit from 2019 to 2024"
"""
def analyze_trend(
    df: pd.DataFrame,
    date_column: str,
    value_column: str,
    freq: str = "month",
    agg: str = "sum",
) -> dict[str, object]:
    """Time-series trend of one value over evenly-spaced periods.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        date_column: column holding dates/timestamps. Values are parsed with
            ``pd.to_datetime``; rows whose date is missing are dropped.
        value_column: numeric column to aggregate per period.
        freq: period granularity — one of FREQ_MAP keys (default "month").
        agg: how to aggregate within a period — one of SUPPORTED_AGGS.

    Returns:
        dict with:
            freq, agg    — echo of the chosen settings
            points       — [{"period": str, "value": number|None}, ...], one
                           entry per period produced by the resample
            first, last  — value of the first/last period whose aggregate is
                           not missing
            change_abs   — last - first
            change_pct   — (last - first) / |first|, or None if first == 0;
                           dividing by |first| keeps its sign in line with
                           ``direction`` even for a negative baseline
            direction    — "up" | "down" | "flat"
            slope        — least-squares change in value per period (gaps
                           still count as elapsed periods), or None if fewer
                           than 2 non-missing periods

    Raises:
        ColumnNotFoundError: if date_column or value_column is absent.
        InvalidFrequencyError: if freq is not a known period.
        UnsupportedAggregationError: if agg is not supported.
    """
    missing = [c for c in (date_column, value_column) if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if freq not in FREQ_MAP:
        raise InvalidFrequencyError(
            f"unknown frequency '{freq}'; supported: {list(FREQ_MAP)}"
        )
    if agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )

    # Build a clean datetime-indexed series, then resample into periods.
    s = df[[date_column, value_column]].copy()
    s[date_column] = pd.to_datetime(s[date_column])
    s = s.dropna(subset=[date_column]).set_index(date_column).sort_index()
    resampled = s[value_column].resample(FREQ_MAP[freq]).agg(agg)

    points = [
        {"period": _period_label(ts, freq), "value": _clean(val)}
        for ts, val in resampled.items()
    ]

    return {
        "freq": freq,
        "agg": agg,
        "points": points,
        **_summarize_trend(resampled),
    }


def _summarize_trend(resampled: pd.Series) -> dict[str, object]:
    """Reduce per-period aggregates to first/last/change/direction/slope.

    Only periods whose aggregate is not missing take part. Note that pandas
    fills empty periods with 0 (not NaN) for "sum" and "count", so under those
    aggregations empty periods are included. Keys come back in the order
    analyze_trend exposes them.
    """
    non_null = resampled.dropna()
    if non_null.empty:
        return {
            "first": None,
            "last": None,
            "change_abs": None,
            "change_pct": None,
            "direction": "flat",
            "slope": None,
        }

    first = float(non_null.iloc[0])
    last = float(non_null.iloc[-1])
    change_abs = last - first
    # Divide by |first| so change_pct has the same sign as direction even from
    # a negative baseline (a loss shrinking from -10 to -5 is +50%, not -50%).
    change_pct = change_abs / abs(first) if first != 0 else None
    if change_abs > 0:
        direction = "up"
    elif change_abs < 0:
        direction = "down"
    else:
        direction = "flat"

    slope: float | None = None
    if len(non_null) > 1:
        # x is each value's index in the FULL period grid, not in non_null: a
        # period with a missing aggregate still counts as one elapsed period,
        # so the slope really is "change per period".
        x = np.flatnonzero(resampled.notna().to_numpy())
        slope = float(np.polyfit(x, non_null.to_numpy(dtype=float), 1)[0])

    return {
        "first": first,
        "last": last,
        "change_abs": change_abs,
        "change_pct": change_pct,
        "direction": direction,
        "slope": slope,
    }