"""analyze_trend — time-series trend over a period (KM-608). An analytical "family" tool: in ONE call it buckets rows into time periods (day/week/month/quarter/year), aggregates a value per period, and summarizes the movement (first vs last, absolute & percent change, direction, linear slope). Answers questions like "how did revenue trend month over month?". STATUS: compute layer only — the function takes an already-materialized DataFrame. The wrapper layer (fetching data from the catalog via source_id, the ToolOutput envelope, ToolSpec registration) is added once the Planner seam (KM-418) is settled. Keeping compute separate from data-fetching makes this function easy to unit-test in isolation and stable when wrapped. """ from __future__ import annotations import numpy as np import pandas as pd from src.tools.analytics.descriptive import ColumnNotFoundError # Friendly period name -> pandas resample rule. Using the non-deprecated # pandas 2.2 codes (ME/QE/YE) avoids FutureWarnings. FREQ_MAP = { "day": "D", "week": "W", "month": "ME", "quarter": "QE", "year": "YE", } # How to aggregate the value within each period. 
SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")


class InvalidFrequencyError(ValueError):
    """The requested period is not in FREQ_MAP (maps to error_code INVALID_FREQUENCY)."""


class UnsupportedAggregationError(ValueError):
    """The requested aggregation is not supported (maps to error_code UNSUPPORTED_AGG)."""


def _clean(value: object) -> object:
    """Convert numpy scalars to plain Python; NaN -> None for JSON-clean output.

    Args:
        value: a Python or numpy scalar (or None).

    Returns:
        The unwrapped Python value, or None when the value is None or NaN.
    """
    # numpy scalars expose .item(); unwrap first so one NaN check covers both
    # plain Python floats and numpy floats.
    if hasattr(value, "item"):
        value = value.item()
    if isinstance(value, float) and np.isnan(value):
        return None
    return value


def _period_label(ts: pd.Timestamp, freq: str) -> str:
    """Human-readable period label keyed off the friendly frequency name.

    Args:
        ts: timestamp identifying the period bucket.
        freq: friendly frequency name ("day", "week", "month", ...).

    Returns:
        "YYYY-MM" for month, "YYYY-Qn" for quarter, "YYYY" for year, and a
        full "YYYY-MM-DD" date for anything else (day / week).
    """
    if freq == "month":
        return str(ts.strftime("%Y-%m"))
    if freq == "quarter":
        return f"{ts.year}-Q{ts.quarter}"
    if freq == "year":
        return str(ts.strftime("%Y"))
    return str(ts.strftime("%Y-%m-%d"))  # day / week


# Prompt-style description read by the Planner to decide WHEN to pick this tool.
# Final destination is ToolSpec.description once the wrapper layer is built.
DESCRIPTION = """\
Summary: Time-series trend of one metric over evenly-spaced periods (day, week, \
month, quarter, year). Reports per-period points plus direction, absolute and \
percent change, and a linear slope.

USE WHEN the question is about movement over time — growth, decline, trend, \
seasonality. Trigger words: "over time" (dari waktu ke waktu), "trend" (tren), \
"monthly/yearly" (bulanan/tahunan), "growth" (pertumbuhan), "since/last N months".

DON'T USE WHEN:
- it groups by a non-time category -> analyze_aggregate
- it compares two specific groups (A vs B) -> analyze_comparison
- it summarizes a column with no time axis -> analyze_descriptive

Example questions:
- "how did monthly revenue change this year?"
- "show the sales trend over the last 12 months"
- "is the number of signups growing quarter over quarter?"
- "yearly profit from 2019 to 2024"
"""


def analyze_trend(
    df: pd.DataFrame,
    date_column: str,
    value_column: str,
    freq: str = "month",
    agg: str = "sum",
) -> dict[str, object]:
    """Time-series trend of one value over evenly-spaced periods.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        date_column: column holding dates/timestamps.
        value_column: numeric column to aggregate per period.
        freq: period granularity — one of FREQ_MAP keys (default "month").
        agg: how to aggregate within a period — one of SUPPORTED_AGGS.

    Returns:
        dict with:
            freq, agg   — echo of the chosen settings
            points      — [{"period": str, "value": number|None}, ...]
            first, last — value of the first/last non-empty period
            change_abs  — last - first
            change_pct  — (last - first) / abs(first), or None if first == 0;
                          its sign always matches change_abs and direction,
                          also when the first value is negative
            direction   — "up" | "down" | "flat"
            slope       — least-squares slope in value units per period, or
                          None if < 2 points; empty periods are left out of
                          the fit but still occupy their place on the time
                          axis

        NOTE: a period counts as empty when its aggregate is NaN. For "sum"
        and "count" pandas reports a period without rows as 0, so such
        periods take part in the summary as zeros.

    Raises:
        ColumnNotFoundError: if date_column or value_column is absent.
        InvalidFrequencyError: if freq is not a known period.
        UnsupportedAggregationError: if agg is not supported.
    """
    missing = [c for c in (date_column, value_column) if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if freq not in FREQ_MAP:
        raise InvalidFrequencyError(
            f"unknown frequency '{freq}'; supported: {list(FREQ_MAP)}"
        )
    if agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )

    # Build a clean datetime-indexed series; rows without a date are dropped.
    frame = df[[date_column, value_column]].copy()
    frame[date_column] = pd.to_datetime(frame[date_column])
    frame = frame.dropna(subset=[date_column]).set_index(date_column).sort_index()
    series = frame[value_column]

    # With no dated rows there is nothing to bucket; skip resampling so the
    # result is simply an empty list of points and an all-None summary.
    resampled = series if series.empty else series.resample(FREQ_MAP[freq]).agg(agg)

    points = [
        {"period": _period_label(ts, freq), "value": _clean(val)}
        for ts, val in resampled.items()
    ]

    # Summary stats are computed over non-empty periods only.
    has_value = resampled.notna().to_numpy()
    non_null = resampled[has_value]

    first: float | None = None
    last: float | None = None
    change_abs: float | None = None
    change_pct: float | None = None
    slope: float | None = None
    direction = "flat"

    if not non_null.empty:
        first = float(non_null.iloc[0])
        last = float(non_null.iloc[-1])
        change_abs = last - first
        # Divide by the magnitude of the baseline: with a negative first value
        # (e.g. a loss shrinking from -100 to -50) dividing by `first` itself
        # would report a negative percentage for an upward move.
        change_pct = (change_abs / abs(first)) if first != 0 else None

        if change_abs > 0:
            direction = "up"
        elif change_abs < 0:
            direction = "down"

        if non_null.shape[0] > 1:
            # Use each bucket's position in the full resampled series as x so
            # gaps keep their width; re-numbering the surviving points 0..n-1
            # would overstate the per-period slope whenever periods are empty.
            x = np.flatnonzero(has_value).astype(float)
            slope = float(np.polyfit(x, non_null.to_numpy(dtype=float), 1)[0])

    return {
        "freq": freq,
        "agg": agg,
        "points": points,
        "first": first,
        "last": last,
        "change_abs": change_abs,
        "change_pct": change_pct,
        "direction": direction,
        "slope": slope,
    }