| from datetime import date, datetime, timedelta |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
| def _to_timestamp(value) -> pd.Timestamp | None: |
| if value is None: |
| return None |
| if isinstance(value, pd.Timestamp): |
| return value |
| if isinstance(value, datetime): |
| return pd.Timestamp(value) |
| if isinstance(value, date): |
| return pd.Timestamp(value) |
| try: |
| v = pd.to_datetime(value, errors="coerce") |
| return v if pd.notna(v) else None |
| except Exception: |
| return None |
|
|
|
|
def window_bounds_period(
    end_dt: pd.Timestamp,
    periods: int,
    step: timedelta,
) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Return the inclusive (start, end) bounds of a window of *periods*
    steps of size *step* ending at *end_dt*.

    A window of N periods spans N-1 steps, so the start is
    ``end_dt - step * (periods - 1)``.
    """
    span = step * (int(periods) - 1)
    return end_dt - span, end_dt
|
|
|
|
def window_bounds(end_date: date, days: int) -> tuple[date, date]:
    """Return the inclusive (start, end) date window of *days* calendar
    days ending at *end_date* (N days span N-1 day offsets)."""
    return end_date - timedelta(days=days - 1), end_date
|
|
|
|
| def is_bad( |
| value: float | None, |
| baseline: float | None, |
| direction: str, |
| rel_threshold_pct: float, |
| sla: float | None, |
| ) -> bool: |
| if value is None or (isinstance(value, float) and np.isnan(value)): |
| return False |
| bad = False |
| if sla is not None and not (isinstance(sla, float) and np.isnan(sla)): |
| if direction == "higher_is_better": |
| bad = bad or (value < float(sla)) |
| else: |
| bad = bad or (value > float(sla)) |
|
|
| if baseline is None or (isinstance(baseline, float) and np.isnan(baseline)): |
| return bad |
|
|
| thr = float(rel_threshold_pct) / 100.0 |
| if direction == "higher_is_better": |
| return bad or (value < baseline - abs(baseline) * thr) |
| return bad or (value > baseline + abs(baseline) * thr) |
|
|
|
|
def max_consecutive_periods(values: list, step: timedelta) -> int:
    """Return the length of the longest run of timestamps in *values*
    that are spaced exactly *step* apart.

    Inputs are coerced via ``_to_timestamp``; unparseable entries are
    dropped and duplicates collapse before runs are measured. Returns 0
    when nothing usable remains.
    """
    if not values:
        return 0
    stamps = sorted(
        {t for t in (_to_timestamp(v) for v in values) if t is not None}
    )
    if not stamps:
        return 0
    longest = run = 1
    for idx in range(1, len(stamps)):
        run = run + 1 if stamps[idx] - stamps[idx - 1] == step else 1
        longest = max(longest, run)
    return longest
|
|
|
|
def max_consecutive_days(dates: list[date]) -> int:
    """Longest run of consecutive calendar days in *dates* (1-day step)."""
    one_day = timedelta(days=1)
    return max_consecutive_periods(dates, step=one_day)
|
|
|
|
def evaluate_health_check(
    daily: pd.DataFrame,
    rat: str,
    rules_df: pd.DataFrame,
    baseline_days_n: int,
    recent_days_n: int,
    rel_threshold_pct: float,
    min_consecutive_days: int,
    granularity: str = "Daily",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Evaluate per-site KPI health for one RAT against baseline/SLA rules.

    For every (site, KPI) pair, the median over a recent window is compared
    against the median over the immediately preceding baseline window and,
    when the rule defines one, against an SLA limit. Each pair is assigned a
    status: OK, DEGRADED, PERSISTENT_DEGRADED, RESOLVED, NO_DATA, or — for
    "notify"-policy rules — NOTIFY / NOTIFY_RESOLVED.

    Args:
        daily: Per-period KPI values. Must contain ``site_code``, a time
            column (``period_start`` for hourly data when present, else
            ``date_only``), optionally ``City``, and one column per KPI.
        rat: RAT label; filters ``rules_df`` and tags output rows.
        rules_df: Rule table with columns ``RAT``, ``KPI`` and optionally
            ``direction``, ``policy``, ``sla``.
        baseline_days_n: Baseline window length, in days.
        recent_days_n: Recent (evaluation) window length, in days.
        rel_threshold_pct: Allowed relative deviation from baseline, in %.
        min_consecutive_days: Consecutive bad days required to mark a
            degradation as persistent.
        granularity: "Daily", or any string starting with "h" for hourly.

    Returns:
        ``(status_df, summary_df)``: per-(site, KPI) statuses and a per-site
        rollup sorted by degraded counts. Both are empty when the input is
        empty, has no parseable timestamps, or no rule/KPI applies.
    """
    if daily.empty:
        return pd.DataFrame(), pd.DataFrame()

    # Resolve granularity: hourly data uses "period_start" and 1h steps;
    # otherwise daily "date_only" and 1d steps.
    g = str(granularity or "Daily").strip().lower()
    is_hourly = g.startswith("h")  # matches "h", "hour", "hourly", ...
    # NOTE: hourly data without a "period_start" column falls back to
    # "date_only" while keeping hourly steps (pre-existing behavior).
    time_col = (
        "period_start"
        if (is_hourly and "period_start" in daily.columns)
        else "date_only"
    )

    step = timedelta(hours=1) if is_hourly else timedelta(days=1)
    # Window/streak lengths are given in days; scale to periods for hourly.
    baseline_periods = int(baseline_days_n) * 24 if is_hourly else int(baseline_days_n)
    recent_periods = int(recent_days_n) * 24 if is_hourly else int(recent_days_n)
    min_periods = (
        int(min_consecutive_days) * 24 if is_hourly else int(min_consecutive_days)
    )

    end_dt = _to_timestamp(pd.to_datetime(daily[time_col], errors="coerce").max())
    if end_dt is None:
        return pd.DataFrame(), pd.DataFrame()

    # Recent window ends at the latest observation; the baseline window
    # immediately precedes it with no overlap.
    recent_start_dt, recent_end_dt = window_bounds_period(end_dt, recent_periods, step)
    baseline_end_dt = recent_start_dt - step
    baseline_start_dt, _ = window_bounds_period(baseline_end_dt, baseline_periods, step)

    # Only evaluate rules for this RAT whose KPI column actually exists.
    rat_rules = rules_df[rules_df["RAT"] == rat].copy()
    kpis = [k for k in rat_rules["KPI"].tolist() if k in daily.columns]
    rules_by_kpi = {
        str(r["KPI"]): r
        for r in rat_rules.to_dict(orient="records")
        if str(r.get("KPI", ""))
    }

    rows = []

    for site_code, g_site in daily.groupby("site_code"):
        city = (
            g_site["City"].dropna().iloc[0]
            if ("City" in g_site.columns and g_site["City"].notna().any())
            else None
        )
        g_site = g_site.sort_values(time_col)
        t_all = pd.to_datetime(g_site[time_col], errors="coerce")
        baseline_mask_all = (t_all >= baseline_start_dt) & (t_all <= baseline_end_dt)
        recent_mask_all = (t_all >= recent_start_dt) & (t_all <= recent_end_dt)

        for kpi in kpis:
            rule = rules_by_kpi.get(str(kpi), {})
            direction = str(rule.get("direction", "higher_is_better"))
            policy = str(rule.get("policy", "enforce") or "enforce").strip().lower()
            sla = rule.get("sla", np.nan)
            try:
                sla_val = float(sla) if pd.notna(sla) else None
            except Exception:
                sla_val = None

            # "notify" rules report on SLA breaches but never enforce the
            # SLA in the bad/degraded computation below.
            sla_eval = None if policy == "notify" else sla_val

            vals = pd.to_numeric(g_site[kpi], errors="coerce")
            has_any = bool(vals.notna().any())
            if not has_any:
                rows.append(
                    {
                        "RAT": rat,
                        "site_code": int(site_code),
                        "City": city,
                        "KPI": kpi,
                        "status": "NO_DATA",
                    }
                )
                continue

            baseline_vals = vals.loc[baseline_mask_all]
            recent_vals = vals.loc[recent_mask_all]
            t_recent = t_all.loc[recent_vals.index]

            baseline = (
                baseline_vals.median(skipna=True) if baseline_mask_all.any() else np.nan
            )
            recent = (
                recent_vals.median(skipna=True) if recent_mask_all.any() else np.nan
            )

            # Per-period breach flags over the recent window (relative to
            # baseline and/or SLA), used below for streak detection.
            bad_dates: list = []
            if recent_mask_all.any() and recent_vals.notna().any():
                thr = float(rel_threshold_pct) / 100.0
                b = float(baseline) if pd.notna(baseline) else None
                bad_series = pd.Series(False, index=recent_vals.index)

                if b is not None:
                    if direction == "higher_is_better":
                        bad_series = bad_series | (recent_vals < (b - abs(b) * thr))
                    else:
                        bad_series = bad_series | (recent_vals > (b + abs(b) * thr))

                if sla_eval is not None and pd.notna(sla_eval):
                    if direction == "higher_is_better":
                        bad_series = bad_series | (recent_vals < float(sla_eval))
                    else:
                        bad_series = bad_series | (recent_vals > float(sla_eval))

                # NaN values and unparseable timestamps never count as bad.
                bad_series = bad_series & recent_vals.notna() & t_recent.notna()
                if bool(bad_series.any()):
                    bad_dates = t_recent.loc[bad_series].tolist()

            max_streak = max_consecutive_periods(bad_dates, step=step)
            persistent = max_streak >= int(min_periods)

            # Health of the recent-window median vs. baseline/SLA.
            is_bad_recent = is_bad(
                float(recent) if pd.notna(recent) else None,
                float(baseline) if pd.notna(baseline) else None,
                direction,
                rel_threshold_pct,
                sla_eval,
            )

            # Prefer the very latest observation for the "current" state;
            # best-effort: fall back to the recent-median verdict on error.
            is_bad_current = is_bad_recent
            try:
                last_mask = recent_mask_all & vals.notna() & t_all.notna()
                if bool(last_mask.any()):
                    idx_last = t_all.loc[last_mask].idxmax()
                    last_val = vals.loc[idx_last]
                    is_bad_current = is_bad(
                        float(last_val) if pd.notna(last_val) else None,
                        float(baseline) if pd.notna(baseline) else None,
                        direction,
                        rel_threshold_pct,
                        sla_eval,
                    )
            except Exception:
                pass

            had_bad_recent = (len(bad_dates) > 0) or bool(is_bad_recent)

            if policy == "notify":
                if is_bad_current:
                    status = "NOTIFY"
                elif had_bad_recent:
                    status = "NOTIFY_RESOLVED"
                else:
                    status = "OK"
            else:
                if is_bad_current and persistent:
                    status = "PERSISTENT_DEGRADED"
                elif is_bad_current:
                    status = "DEGRADED"
                elif had_bad_recent:
                    status = "RESOLVED"
                else:
                    status = "OK"

            rows.append(
                {
                    "RAT": rat,
                    "site_code": int(site_code),
                    "City": city,
                    "KPI": kpi,
                    "direction": direction,
                    "sla": sla_val,
                    "policy": policy,
                    "baseline_median": baseline,
                    "recent_median": recent,
                    # Counts bad *periods* (hours when hourly).
                    "bad_days_recent": len(bad_dates),
                    "max_streak_recent": int(max_streak),
                    "status": status,
                }
            )

    status_df = pd.DataFrame(rows)

    # FIX: an empty result frame has no "site_code" column, so the groupby
    # and sort_values below would raise KeyError (e.g. when no rule matches
    # this RAT or no KPI column exists). Bail out early instead.
    if status_df.empty:
        return status_df, pd.DataFrame()

    # Per-site rollup of KPI statuses.
    summary_rows = []
    for site_code, g in status_df.groupby("site_code"):
        city = (
            g["City"].dropna().iloc[0]
            if ("City" in g.columns and g["City"].notna().any())
            else None
        )
        degraded_cnt = int(g["status"].isin(["DEGRADED", "PERSISTENT_DEGRADED"]).sum())
        persistent_cnt = int((g["status"] == "PERSISTENT_DEGRADED").sum())
        resolved_cnt = int((g["status"] == "RESOLVED").sum())
        summary_rows.append(
            {
                "RAT": rat,
                "site_code": int(site_code),
                "City": city,
                "degraded_kpis": degraded_cnt,
                "persistent_kpis": persistent_cnt,
                "resolved_kpis": resolved_cnt,
            }
        )

    # Worst sites first.
    summary_df = pd.DataFrame(summary_rows).sort_values(
        by=["degraded_kpis", "persistent_kpis", "resolved_kpis"],
        ascending=[False, False, False],
    )

    return status_df, summary_df
|
|