DavMelchi's picture
Add export raw data toggle with optimized health check evaluation using vectorized operations and improved timestamp handling for baseline/recent period masking with persistent bad date detection
440ac25
from datetime import date, datetime, timedelta
import numpy as np
import pandas as pd
def _to_timestamp(value) -> pd.Timestamp | None:
if value is None:
return None
if isinstance(value, pd.Timestamp):
return value
if isinstance(value, datetime):
return pd.Timestamp(value)
if isinstance(value, date):
return pd.Timestamp(value)
try:
v = pd.to_datetime(value, errors="coerce")
return v if pd.notna(v) else None
except Exception: # noqa: BLE001
return None
def window_bounds_period(
    end_dt: pd.Timestamp,
    periods: int,
    step: timedelta,
) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Return the ``(start, end)`` bounds of a window that ends at *end_dt*.

    The start lies ``periods - 1`` steps before *end_dt*, so the window holds
    ``periods`` evenly spaced points when both bounds are counted.

    Args:
        end_dt: Last timestamp of the window; returned unchanged as the end.
        periods: Number of steps in the window (coerced with ``int``).
        step: Distance between two neighbouring points.
    """
    steps_back = int(periods) - 1
    return end_dt - step * steps_back, end_dt
def window_bounds(end_date: date, days: int) -> tuple[date, date]:
    """Return ``(start, end)`` for a window of *days* days ending on *end_date*.

    The start is ``days - 1`` days before *end_date*, so a one-day window
    starts and ends on the same date.
    """
    lookback = timedelta(days=days - 1)
    return end_date - lookback, end_date
def is_bad(
value: float | None,
baseline: float | None,
direction: str,
rel_threshold_pct: float,
sla: float | None,
) -> bool:
if value is None or (isinstance(value, float) and np.isnan(value)):
return False
bad = False
if sla is not None and not (isinstance(sla, float) and np.isnan(sla)):
if direction == "higher_is_better":
bad = bad or (value < float(sla))
else:
bad = bad or (value > float(sla))
if baseline is None or (isinstance(baseline, float) and np.isnan(baseline)):
return bad
thr = float(rel_threshold_pct) / 100.0
if direction == "higher_is_better":
return bad or (value < baseline - abs(baseline) * thr)
return bad or (value > baseline + abs(baseline) * thr)
def max_consecutive_periods(values: list, step: timedelta) -> int:
    """Return the length of the longest run of timestamps spaced exactly *step* apart.

    Args:
        values: Date-like items; each one is converted with ``_to_timestamp``.
            Items that cannot be converted, and duplicates, are ignored.
        step: Expected distance between two consecutive timestamps of a run.

    Returns:
        ``0`` when no usable timestamp is present, otherwise the longest
        streak length (at least ``1``).
    """
    if not values:
        return 0

    # Drop both None and NaT explicitly: every ordering comparison against NaT
    # is False, so letting one into sorted() would make the order (and hence
    # the streak count) unreliable.
    stamps = sorted(
        {
            ts
            for ts in (_to_timestamp(v) for v in values)
            if ts is not None and not pd.isna(ts)
        }
    )
    if not stamps:
        return 0

    best = streak = 1
    for prev, cur in zip(stamps, stamps[1:]):
        # A gap of anything other than exactly one step restarts the streak.
        streak = streak + 1 if cur == prev + step else 1
        best = max(best, streak)
    return best
def max_consecutive_days(dates: list[date]) -> int:
    """Return the longest streak in *dates* using a one-day step.

    Thin wrapper around ``max_consecutive_periods``.
    """
    one_day = timedelta(days=1)
    return max_consecutive_periods(dates, step=one_day)
def evaluate_health_check(
    daily: pd.DataFrame,
    rat: str,
    rules_df: pd.DataFrame,
    baseline_days_n: int,
    recent_days_n: int,
    rel_threshold_pct: float,
    min_consecutive_days: int,
    granularity: str = "Daily",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Classify every (site, KPI) pair of one RAT as healthy or degraded.

    The latest timestamp found in ``daily`` anchors two back-to-back windows:
    a *recent* window ending at that timestamp and a *baseline* window ending
    one step before the recent window starts. For each site and KPI the
    baseline median, the recent median, the individual bad recent periods and
    the last valid recent value are compared against the rule's SLA and the
    relative threshold to derive a status.

    Args:
        daily: KPI observations. Must contain ``"site_code"``, the time column
            (``"period_start"`` when the granularity is hourly and that column
            exists, otherwise ``"date_only"``) and one column per KPI.
            ``"City"`` is optional.
        rat: Value matched against ``rules_df["RAT"]``; also written to the
            ``"RAT"`` column of both outputs.
        rules_df: Rules with ``"RAT"`` and ``"KPI"`` columns. Each rule may
            also provide ``"direction"`` (default ``"higher_is_better"``; any
            other value is treated as lower-is-better), ``"sla"`` and
            ``"policy"`` (default ``"enforce"``; ``"notify"`` ignores the SLA
            and produces NOTIFY statuses).
        baseline_days_n: Baseline window length in days.
        recent_days_n: Recent window length in days.
        rel_threshold_pct: Tolerated deviation from the baseline, in percent.
        min_consecutive_days: Streak length (in days) from which a degradation
            counts as persistent.
        granularity: A value starting with ``"h"`` (case-insensitive) selects
            hourly evaluation: the step becomes one hour and all day counts
            are multiplied by 24. Anything else means daily.

    Returns:
        ``(status_df, summary_df)``. ``status_df`` has one row per site/KPI
        with a ``"status"`` among ``NO_DATA``, ``OK``, ``DEGRADED``,
        ``PERSISTENT_DEGRADED``, ``RESOLVED``, ``NOTIFY`` and
        ``NOTIFY_RESOLVED``. ``summary_df`` has one row per site with counts of
        degraded, persistent and resolved KPIs, sorted by those counts in
        descending order. Both frames are empty when ``daily`` is empty, when
        no rule KPI of the RAT is a column of ``daily``, when no valid
        timestamp exists, or when no site row could be produced.
    """
    if daily.empty:
        return pd.DataFrame(), pd.DataFrame()

    rat_rules = rules_df[rules_df["RAT"] == rat].copy()
    kpis = [k for k in rat_rules["KPI"].tolist() if k in daily.columns]
    if not kpis:
        # Nothing to evaluate. Building the summary from an empty status frame
        # would raise KeyError on the missing "site_code" column.
        return pd.DataFrame(), pd.DataFrame()

    gran = str(granularity or "Daily").strip().lower()
    is_hourly = gran.startswith("h")
    # NOTE(review): when hourly is requested but "period_start" is missing the
    # time column falls back to "date_only" while the step stays one hour -
    # confirm this combination is intended.
    time_col = (
        "period_start"
        if (is_hourly and "period_start" in daily.columns)
        else "date_only"
    )
    step = timedelta(hours=1) if is_hourly else timedelta(days=1)
    per_day = 24 if is_hourly else 1
    baseline_periods = int(baseline_days_n) * per_day
    recent_periods = int(recent_days_n) * per_day
    min_periods = int(min_consecutive_days) * per_day

    end_dt = _to_timestamp(pd.to_datetime(daily[time_col], errors="coerce").max())
    # max() yields NaT when no timestamp is parseable; treat it like None.
    if end_dt is None or pd.isna(end_dt):
        return pd.DataFrame(), pd.DataFrame()

    recent_start_dt, recent_end_dt = window_bounds_period(end_dt, recent_periods, step)
    baseline_end_dt = recent_start_dt - step
    baseline_start_dt, _ = window_bounds_period(baseline_end_dt, baseline_periods, step)

    rules_by_kpi = {
        str(r["KPI"]): r
        for r in rat_rules.to_dict(orient="records")
        if str(r.get("KPI", ""))
    }

    # Rule parsing does not depend on the site, so do it once per KPI.
    kpi_settings = []
    for kpi in kpis:
        rule = rules_by_kpi.get(str(kpi), {})
        direction = str(rule.get("direction", "higher_is_better"))
        policy = str(rule.get("policy", "enforce") or "enforce").strip().lower()
        sla = rule.get("sla", np.nan)
        try:
            sla_val = float(sla) if pd.notna(sla) else None
        except (TypeError, ValueError, OverflowError):
            sla_val = None
        # "notify" rules are judged against the baseline only.
        sla_eval = None if policy == "notify" else sla_val
        kpi_settings.append((kpi, direction, policy, sla_val, sla_eval))

    has_city = "City" in daily.columns
    rows = []
    for site_code, site_df in daily.groupby("site_code"):
        site_id = int(site_code)
        city = (
            site_df["City"].dropna().iloc[0]
            if (has_city and site_df["City"].notna().any())
            else None
        )

        site_df = site_df.sort_values(time_col)
        t_all = pd.to_datetime(site_df[time_col], errors="coerce")
        baseline_mask = (t_all >= baseline_start_dt) & (t_all <= baseline_end_dt)
        recent_mask = (t_all >= recent_start_dt) & (t_all <= recent_end_dt)
        has_baseline_window = bool(baseline_mask.any())
        has_recent_window = bool(recent_mask.any())
        # Select by mask (not by label) so duplicate index labels cannot
        # misalign timestamps and values.
        t_recent = t_all.loc[recent_mask]

        for kpi, direction, policy, sla_val, sla_eval in kpi_settings:
            higher_is_better = direction == "higher_is_better"
            vals = pd.to_numeric(site_df[kpi], errors="coerce")
            if not vals.notna().any():
                rows.append(
                    {
                        "RAT": rat,
                        "site_code": site_id,
                        "City": city,
                        "KPI": kpi,
                        "status": "NO_DATA",
                    }
                )
                continue

            baseline_vals = vals.loc[baseline_mask]
            recent_vals = vals.loc[recent_mask]
            baseline = (
                baseline_vals.median(skipna=True) if has_baseline_window else np.nan
            )
            recent = recent_vals.median(skipna=True) if has_recent_window else np.nan
            baseline_f = float(baseline) if pd.notna(baseline) else None
            recent_f = float(recent) if pd.notna(recent) else None

            # Flag each recent period individually (vectorised is_bad logic).
            bad_dates: list = []
            if recent_vals.notna().any():
                thr = float(rel_threshold_pct) / 100.0
                bad_series = pd.Series(False, index=recent_vals.index)
                if baseline_f is not None:
                    margin = abs(baseline_f) * thr
                    if higher_is_better:
                        bad_series = bad_series | (recent_vals < (baseline_f - margin))
                    else:
                        bad_series = bad_series | (recent_vals > (baseline_f + margin))
                if sla_eval is not None:
                    if higher_is_better:
                        bad_series = bad_series | (recent_vals < sla_eval)
                    else:
                        bad_series = bad_series | (recent_vals > sla_eval)
                bad_series = bad_series & recent_vals.notna() & t_recent.notna()
                if bad_series.any():
                    bad_dates = t_recent.loc[bad_series].tolist()

            max_streak = max_consecutive_periods(bad_dates, step=step)
            persistent = max_streak >= min_periods

            is_bad_recent = is_bad(
                recent_f, baseline_f, direction, rel_threshold_pct, sla_eval
            )

            # The "current" verdict uses the latest valid recent observation
            # and falls back to the recent-median verdict when none exists.
            is_bad_current = is_bad_recent
            try:
                last_mask = recent_mask & vals.notna() & t_all.notna()
                if last_mask.any():
                    # Positional lookup keeps this correct with duplicate
                    # index labels; idxmax picks the first latest timestamp.
                    pos_last = t_all.loc[last_mask].reset_index(drop=True).idxmax()
                    last_val = vals.loc[last_mask].iloc[pos_last]
                    is_bad_current = is_bad(
                        float(last_val),
                        baseline_f,
                        direction,
                        rel_threshold_pct,
                        sla_eval,
                    )
            except Exception:  # noqa: BLE001
                # Deliberate best effort: keep the median-based verdict.
                pass

            had_bad_recent = bool(bad_dates) or bool(is_bad_recent)

            if policy == "notify":
                if is_bad_current:
                    status = "NOTIFY"
                elif had_bad_recent:
                    status = "NOTIFY_RESOLVED"
                else:
                    status = "OK"
            elif is_bad_current and persistent:
                status = "PERSISTENT_DEGRADED"
            elif is_bad_current:
                status = "DEGRADED"
            elif had_bad_recent:
                status = "RESOLVED"
            else:
                status = "OK"

            rows.append(
                {
                    "RAT": rat,
                    "site_code": site_id,
                    "City": city,
                    "KPI": kpi,
                    "direction": direction,
                    "sla": sla_val,
                    "policy": policy,
                    "baseline_median": baseline,
                    "recent_median": recent,
                    "bad_days_recent": len(bad_dates),
                    "max_streak_recent": int(max_streak),
                    "status": status,
                }
            )

    if not rows:
        # No site group was produced (e.g. every site_code is missing).
        return pd.DataFrame(), pd.DataFrame()

    status_df = pd.DataFrame(rows)

    summary_rows = []
    for site_code, site_status in status_df.groupby("site_code"):
        city = (
            site_status["City"].dropna().iloc[0]
            if ("City" in site_status.columns and site_status["City"].notna().any())
            else None
        )
        statuses = site_status["status"]
        summary_rows.append(
            {
                "RAT": rat,
                "site_code": int(site_code),
                "City": city,
                "degraded_kpis": int(
                    statuses.isin(["DEGRADED", "PERSISTENT_DEGRADED"]).sum()
                ),
                "persistent_kpis": int((statuses == "PERSISTENT_DEGRADED").sum()),
                "resolved_kpis": int((statuses == "RESOLVED").sum()),
            }
        )

    summary_df = pd.DataFrame(summary_rows).sort_values(
        by=["degraded_kpis", "persistent_kpis", "resolved_kpis"],
        ascending=[False, False, False],
    )
    return status_df, summary_df