| | |
| | """ |
| | test_quickstart.py |
| | |
| | 功能: |
| | 1. 构造 1 小时窗口,演示实时异常检测(正常 / 异常两种场景) |
| | 2. 构造 7 天数据,演示异常模式聚合 |
| | 3. 输出格式化的LLM文案,方便直接接入大模型 |
| | |
| | 运行方式: |
| | python test_quickstart.py |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import sys |
| | from pathlib import Path |
| | from datetime import datetime, timedelta |
| | import json |
| | import numpy as np |
| | import random |
| | import random |
| |
|
# Put this script's directory at the front of sys.path so the project-local
# module imported below resolves regardless of the current working directory.
ROOT_DIR = Path(__file__).parent.resolve()
sys.path.insert(0, str(ROOT_DIR))

from wearable_anomaly_detector import WearableAnomalyDetector
import importlib.util

# Load utils/formatter.py directly from its file path instead of going
# through a package import.
_FORMATTER_PATH = ROOT_DIR / "utils" / "formatter.py"
formatter_spec = importlib.util.spec_from_file_location("formatter", _FORMATTER_PATH)
# spec_from_file_location() may return None (and a spec may carry no loader);
# fail with a clear ImportError instead of an AttributeError on None.
if formatter_spec is None or formatter_spec.loader is None:
    raise ImportError(f"Cannot load formatter module from {_FORMATTER_PATH}")
formatter_module = importlib.util.module_from_spec(formatter_spec)
formatter_spec.loader.exec_module(formatter_module)
AnomalyFormatter = formatter_module.AnomalyFormatter
FORMATTER = AnomalyFormatter()
TEST_WINDOW_FILE = ROOT_DIR / "test_data" / "example_window.json"

# Samples per generated window and the spacing between consecutive samples:
# 12 samples x 5 minutes = one hour of data.
WINDOW_SIZE = 12
INTERVAL_MINUTES = 5
| |
|
| |
|
def make_point(ts: datetime, device_id: str, hrv: float, hr: float, include_static: bool = True) -> dict:
    """Build a single sample dict.

    Args:
        ts: Sample time; stored as an ISO-8601 string under ``timestamp``.
        device_id: Stored under ``deviceId``.
        hrv: Stored as ``hrv_rmssd``; ``hrv_sdnn`` is derived as ``hrv * 1.2``.
        hr: Stored as ``hr``.
        include_static: When falsy, ``static_features`` is an empty dict.

    Returns:
        A dict with the keys ``timestamp``, ``deviceId``, ``features`` and
        ``static_features``.
    """
    static_part: dict = {}
    if include_static:
        static_part = dict(
            age_group=2,
            sex=0,
            exercise=1,
            coffee=1,
            drinking=0,
            MEQ=50.0,
        )

    # Only hr / hrv_rmssd / hrv_sdnn vary per call; the rest are fixed demo values.
    feature_part = {
        "hr": float(hr),
        "hr_resting": 65.0,
        "hrv_rmssd": float(hrv),
        "hrv_sdnn": float(hrv * 1.2),
        "time_period_primary": "day",
        "time_period_secondary": "workday",
        "is_weekend": 0.0,
        "data_quality": "high",
        "baseline_hrv_mean": 75.0,
        "baseline_hrv_std": 5.0,
    }

    sample = {"timestamp": ts.isoformat(), "deviceId": device_id}
    sample["features"] = feature_part
    sample["static_features"] = static_part
    return sample
| |
|
| |
|
def generate_window(
    device_id: str,
    start: datetime,
    base_hrv: float,
    base_hr: float,
    anomaly_level: float = 0.0,
    include_static: bool = True,
    missing_ratio: float = 0.0,
) -> list:
    """Generate one window of ``WINDOW_SIZE`` synthetic samples.

    Args:
        device_id: Device identifier passed to ``make_point``.
        start: Timestamp of the first sample; later samples are spaced
            ``INTERVAL_MINUTES`` apart.
        base_hrv: Starting HRV level before the anomaly shift is applied.
        base_hr: Starting heart-rate level before the anomaly shift is applied.
        anomaly_level: Scales both the constant offset and the within-window
            drift (HRV down, HR up). ``0.0`` leaves only Gaussian noise.
        include_static: Forwarded to ``make_point``.
        missing_ratio: Per-sample probability of dropping the optional
            baseline fields (and, half of those times, the static features).

    Returns:
        A list of sample dicts in chronological order.
    """
    # Window-wide offsets, clamped: HRV floor of 30, HR ceiling of 125.
    hrv_level = max(30, base_hrv - 18 * anomaly_level)
    hr_level = min(125, base_hr + 10 * anomaly_level)
    optional_keys = ("hr_resting", "baseline_hrv_mean", "baseline_hrv_std")

    samples = []
    for step in range(WINDOW_SIZE):
        # Draw order matters for reproducibility: HRV noise first, then HR noise.
        hrv_jitter = np.random.normal(0, 3)
        hr_jitter = np.random.normal(0, 1.5)

        # Linear drift across the window, proportional to the anomaly level.
        hrv_drift = -15 * anomaly_level * (step / WINDOW_SIZE)
        hr_drift = 8 * anomaly_level * (step / WINDOW_SIZE)

        hrv_value = max(25, hrv_level + hrv_jitter + hrv_drift)
        hr_value = min(125, hr_level + hr_jitter + hr_drift)

        sample = make_point(
            start + timedelta(minutes=INTERVAL_MINUTES * step),
            device_id,
            hrv_value,
            hr_value,
            include_static=include_static,
        )

        # Short-circuit keeps random.random() from being consumed when no
        # missing data was requested.
        simulate_missing = missing_ratio > 0 and random.random() < missing_ratio
        if simulate_missing:
            for key in optional_keys:
                sample["features"].pop(key, None)
            if random.random() < 0.5:
                sample["static_features"] = {}

        samples.append(sample)

    return samples
| |
|
| |
|
def load_window_from_file(path: Path) -> list | None:
    """Load a data window from a UTF-8 JSON file.

    Args:
        path: File expected to contain a non-empty JSON list.

    Returns:
        The parsed list, or ``None`` (after printing a warning) when the file
        cannot be read or decoded, is not valid JSON, or does not hold a
        non-empty list.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Explicit check rather than ``assert``: assertions are stripped under
        # ``python -O``, which would let empty or non-list payloads through.
        if not isinstance(data, list) or not data:
            raise ValueError("JSON needs to be a non-empty list")
    except (OSError, ValueError) as exc:
        # OSError: missing/unreadable file. ValueError also covers
        # json.JSONDecodeError and UnicodeDecodeError (both subclasses).
        print(f" ⚠️ 读取 {path.name} 失败: {exc}")
        return None
    return data
| |
|
| |
|
def demo_from_file(detector: WearableAnomalyDetector) -> None:
    """Run real-time detection on the example window file and print the result.

    Loads ``TEST_WINDOW_FILE``, prints the point count and the NaN-ignoring
    mean HRV / heart rate, calls ``detector.detect_realtime`` with
    ``update_baseline=False``, and prints the first 350 characters of the text
    produced by ``FORMATTER.format_for_llm``. Returns early (after a warning)
    when the file is missing or cannot be loaded.

    Args:
        detector: Detector used for inference.
    """
    print("\n" + "=" * 80)
    print("示例文件推理(test_data/example_window.json)")
    print("=" * 80)

    if not TEST_WINDOW_FILE.exists():
        print(f" ⚠️ 未找到 {TEST_WINDOW_FILE}, 请确认仓库中存在该文件")
        return

    window = load_window_from_file(TEST_WINDOW_FILE)
    if not window:
        return

    # The file is external input: fall back to NaN for absent features so that
    # nanmean skips them instead of the lookup raising KeyError.
    avg_hrv = np.nanmean([pt.get("features", {}).get("hrv_rmssd", np.nan) for pt in window])
    avg_hr = np.nanmean([pt.get("features", {}).get("hr", np.nan) for pt in window])
    print(f" - 数据点数: {len(window)}")
    print(f" - 平均 HRV: {avg_hrv:.2f} ms, 平均心率: {avg_hr:.1f} bpm")

    result = detector.detect_realtime(window, update_baseline=False)
    print(
        f" -> 是否异常: {'是 ⚠️' if result.get('is_anomaly') else '否'} | "
        f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}"
    )

    # Fixed demo baseline; named once so the deviation uses the same value.
    baseline_mean = 76.0
    baseline_info = {
        "baseline_mean": baseline_mean,
        "baseline_std": 5.0,
        "current_value": avg_hrv,
        "deviation_pct": (avg_hrv - baseline_mean) / baseline_mean * 100,
    }
    llm_text = FORMATTER.format_for_llm(result, baseline_info=baseline_info)
    print("\n LLM 文本片段(前 350 字符):")
    print("-" * 60)
    print(llm_text[:350])
    print("...")
    print("-" * 60)
| |
|
| |
|
def demo_realtime(detector: WearableAnomalyDetector) -> None:
    """Run real-time detection on one normal and one anomalous synthetic window.

    Both windows start one hour before the current time. For each, the mean
    HRV / heart rate and the detector's verdict, score and threshold are
    printed.

    Args:
        detector: Detector used for inference.
    """
    banner = "=" * 80
    print("\n" + banner)
    print("实时检测示例")
    print(banner)

    window_start = datetime.now() - timedelta(hours=1)
    # Dict literals evaluate in order, so the normal window is generated first.
    scenarios = {
        "正常窗口": generate_window("demo_normal", window_start, base_hrv=76, base_hr=68, anomaly_level=0.0),
        "异常窗口": generate_window("demo_anomaly", window_start, base_hrv=74, base_hr=70, anomaly_level=0.7),
    }

    for title, window in scenarios.items():
        hrv_values = [pt["features"]["hrv_rmssd"] for pt in window]
        hr_values = [pt["features"]["hr"] for pt in window]
        avg_hrv = np.mean(hrv_values)
        avg_hr = np.mean(hr_values)
        print(f"\n[{title}] HRV≈{avg_hrv:.2f} ms, HR≈{avg_hr:.1f} bpm")

        result = detector.detect_realtime(window, update_baseline=False)
        print(
            f" -> 是否异常: {'是 ⚠️' if result.get('is_anomaly') else '否'} | "
            f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}"
        )
| |
|
| |
|
def demo_pattern(detector: WearableAnomalyDetector) -> None:
    """Build seven daily windows with rising anomaly levels and run pattern detection.

    One window per day is generated starting at 08:00, beginning seven days
    before the current time. The per-day mean HRV, the detected pattern
    summary and, when present, the first 400 characters of the LLM-formatted
    output are printed.

    Args:
        detector: Detector used for pattern aggregation.
    """
    print("\n" + "=" * 80)
    print("7 天异常模式聚合示例")
    print("=" * 80)

    first_day = datetime.now() - timedelta(days=7)
    anomaly_plan = [0.0, 0.1, 0.3, 1.0, 1.4, 1.8, 1.8]

    # One window per planned day, each anchored at 08:00 of that day.
    daily_data = [
        generate_window(
            device_id="demo_pattern",
            start=(first_day + timedelta(days=offset)).replace(hour=8, minute=0, second=0, microsecond=0),
            base_hrv=75,
            base_hr=69,
            anomaly_level=level,
        )
        for offset, level in enumerate(anomaly_plan)
    ]
    avg_hrv_per_day = [
        np.mean([pt["features"]["hrv_rmssd"] for pt in day_window])
        for day_window in daily_data
    ]

    print(" 日均HRV轨迹: " + ", ".join(f"{val:.1f}" for val in avg_hrv_per_day))

    result = detector.detect_pattern(
        daily_data,
        days=len(daily_data),
        min_duration_days=2,
        format_for_llm=True,
    )
    pattern = result.get("anomaly_pattern", {})
    print(
        f" -> 是否有模式: {'是' if pattern.get('has_pattern') else '否'} | "
        f"持续天数: {pattern.get('duration_days', 0)} | 趋势: {pattern.get('trend', '未知')}"
    )

    if "formatted_for_llm" not in result:
        return

    divider = "-" * 60
    print("\n格式化输出(前 400 字符):")
    print(divider)
    print(result["formatted_for_llm"][:400])
    print("...")
    print(divider)
| |
|
| |
|
def demo_missing_data(detector: WearableAnomalyDetector) -> None:
    """Run real-time detection on a window with missing and low-quality data.

    A window is generated with ``missing_ratio=0.4``; samples 3 and 7 are then
    marked low quality with a NaN heart rate. The number of samples that still
    carry static features, the NaN-ignoring mean HRV and the detector's
    verdict are printed.

    Args:
        detector: Detector used for inference.
    """
    print("\n" + "=" * 80)
    print("数据缺失 / 质量下降示例")
    print("=" * 80)

    incomplete_window = generate_window(
        device_id="demo_missing",
        start=datetime.now() - timedelta(hours=1),
        base_hrv=74,
        base_hr=71,
        anomaly_level=0.5,
        include_static=True,
        missing_ratio=0.4,
    )

    # Degrade two fixed samples: low quality flag plus an unusable heart rate.
    for idx in (3, 7):
        degraded = incomplete_window[idx]["features"]
        degraded["data_quality"] = "low"
        degraded["hr"] = float("nan")

    hrv_series = [pt["features"].get("hrv_rmssd", np.nan) for pt in incomplete_window]
    avg_hrv = np.nanmean(hrv_series)
    available_static = sum(bool(pt["static_features"]) for pt in incomplete_window)
    print(f" - 有效静态特征点数: {available_static}/{len(incomplete_window)}")
    print(f" - 平均 HRV(忽略缺失): {avg_hrv:.2f} ms")

    result = detector.detect_realtime(incomplete_window, update_baseline=False)
    print(
        f" -> 是否异常: {'是' if result.get('is_anomaly') else '否'} | "
        f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}"
    )
| |
|
| |
|
def main() -> None:
    """Create the detector on CPU, set its threshold to 0.50 and run every demo.

    The model is loaded from ``checkpoints/phase2/exp_factor_balanced`` under
    ``ROOT_DIR``. Demos run in order: file-based, real-time, 7-day pattern,
    missing data.
    """
    checkpoint_dir = ROOT_DIR.joinpath("checkpoints", "phase2", "exp_factor_balanced")
    detector = WearableAnomalyDetector(model_dir=checkpoint_dir, device="cpu")
    detector.update_threshold(0.50)

    for run_demo in (demo_from_file, demo_realtime, demo_pattern, demo_missing_data):
        run_demo(detector)


if __name__ == "__main__":
    main()
| |
|
| |
|