#!/usr/bin/env python3 """ test_quickstart.py 功能: 1. 构造 1 小时窗口,演示实时异常检测(正常 / 异常两种场景) 2. 构造 7 天数据,演示异常模式聚合 3. 输出格式化的LLM文案,方便直接接入大模型 运行方式: python test_quickstart.py """ from __future__ import annotations import sys from pathlib import Path from datetime import datetime, timedelta import json import numpy as np import random import random ROOT_DIR = Path(__file__).parent.resolve() sys.path.insert(0, str(ROOT_DIR)) from wearable_anomaly_detector import WearableAnomalyDetector import importlib.util # 动态导入 utils.formatter,避免相对路径问题 formatter_spec = importlib.util.spec_from_file_location( "formatter", ROOT_DIR / "utils" / "formatter.py" ) formatter_module = importlib.util.module_from_spec(formatter_spec) formatter_spec.loader.exec_module(formatter_module) AnomalyFormatter = formatter_module.AnomalyFormatter FORMATTER = AnomalyFormatter() TEST_WINDOW_FILE = ROOT_DIR / "test_data" / "example_window.json" WINDOW_SIZE = 12 # 12 * 5 分钟 = 1 小时 INTERVAL_MINUTES = 5 def make_point(ts: datetime, device_id: str, hrv: float, hr: float, include_static: bool = True) -> dict: """构造单个数据点""" return { "timestamp": ts.isoformat(), "deviceId": device_id, "features": { "hr": float(hr), "hr_resting": 65.0, "hrv_rmssd": float(hrv), "hrv_sdnn": float(hrv * 1.2), "time_period_primary": "day", "time_period_secondary": "workday", "is_weekend": 0.0, "data_quality": "high", "baseline_hrv_mean": 75.0, "baseline_hrv_std": 5.0, }, "static_features": { "age_group": 2, "sex": 0, "exercise": 1, "coffee": 1, "drinking": 0, "MEQ": 50.0, } if include_static else {}, } def generate_window( device_id: str, start: datetime, base_hrv: float, base_hr: float, anomaly_level: float = 0.0, include_static: bool = True, missing_ratio: float = 0.0, ) -> list: """生成 1 小时窗口数据""" data = [] base_hrv_for_day = max(30, base_hrv - 18 * anomaly_level) base_hr_for_day = min(125, base_hr + 10 * anomaly_level) for i in range(WINDOW_SIZE): noise_hrv = np.random.normal(0, 3) noise_hr = np.random.normal(0, 1.5) decline = -15 * anomaly_level * (i / WINDOW_SIZE) increase = 8 * anomaly_level * (i / WINDOW_SIZE) hrv = max(25, base_hrv_for_day + noise_hrv + decline) hr = min(125, base_hr_for_day + noise_hr + increase) ts = start + timedelta(minutes=INTERVAL_MINUTES * i) point = make_point(ts, device_id, hrv, hr, include_static=include_static) if missing_ratio > 0 and random.random() < missing_ratio: point["features"].pop("hr_resting", None) point["features"].pop("baseline_hrv_mean", None) point["features"].pop("baseline_hrv_std", None) if random.random() < 0.5: point["static_features"] = {} data.append(point) return data def load_window_from_file(path: Path) -> list | None: try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) assert isinstance(data, list) and data, "JSON needs to be a non-empty list" return data except Exception as exc: print(f" ⚠️ 读取 {path.name} 失败: {exc}") return None def demo_from_file(detector: WearableAnomalyDetector) -> None: print("\n" + "=" * 80) print("示例文件推理(test_data/example_window.json)") print("=" * 80) if not TEST_WINDOW_FILE.exists(): print(f" ⚠️ 未找到 {TEST_WINDOW_FILE}, 请确认仓库中存在该文件") return window = load_window_from_file(TEST_WINDOW_FILE) if not window: return avg_hrv = np.nanmean([pt["features"]["hrv_rmssd"] for pt in window]) avg_hr = np.nanmean([pt["features"]["hr"] for pt in window]) print(f" - 数据点数: {len(window)}") print(f" - 平均 HRV: {avg_hrv:.2f} ms, 平均心率: {avg_hr:.1f} bpm") result = detector.detect_realtime(window, update_baseline=False) print( f" -> 是否异常: {'是 ⚠️' if result.get('is_anomaly') else '否'} | " f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}" ) baseline_info = { "baseline_mean": 76.0, "baseline_std": 5.0, "current_value": avg_hrv, "deviation_pct": (avg_hrv - 76.0) / 76.0 * 100, } llm_text = FORMATTER.format_for_llm(result, baseline_info=baseline_info) print("\n LLM 文本片段(前 350 字符):") print("-" * 60) print(llm_text[:350]) print("...") print("-" * 60) def demo_realtime(detector: WearableAnomalyDetector) -> None: print("\n" + "=" * 80) print("实时检测示例") print("=" * 80) start = datetime.now() - timedelta(hours=1) normal_window = generate_window("demo_normal", start, base_hrv=76, base_hr=68, anomaly_level=0.0) anomaly_window = generate_window("demo_anomaly", start, base_hrv=74, base_hr=70, anomaly_level=0.7) for title, window in [("正常窗口", normal_window), ("异常窗口", anomaly_window)]: avg_hrv = np.mean([pt["features"]["hrv_rmssd"] for pt in window]) avg_hr = np.mean([pt["features"]["hr"] for pt in window]) print(f"\n[{title}] HRV≈{avg_hrv:.2f} ms, HR≈{avg_hr:.1f} bpm") result = detector.detect_realtime(window, update_baseline=False) print( f" -> 是否异常: {'是 ⚠️' if result.get('is_anomaly') else '否'} | " f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}" ) def demo_pattern(detector: WearableAnomalyDetector) -> None: print("\n" + "=" * 80) print("7 天异常模式聚合示例") print("=" * 80) base_date = datetime.now() - timedelta(days=7) daily_data = [] anomaly_plan = [0.0, 0.1, 0.3, 1.0, 1.4, 1.8, 1.8] avg_hrv_per_day = [] for day, anomaly_level in enumerate(anomaly_plan): day_start = base_date + timedelta(days=day) window = generate_window( device_id="demo_pattern", start=day_start.replace(hour=8, minute=0, second=0, microsecond=0), base_hrv=75, base_hr=69, anomaly_level=anomaly_level, ) daily_data.append(window) avg_hrv_per_day.append(np.mean([pt["features"]["hrv_rmssd"] for pt in window])) print(" 日均HRV轨迹: " + ", ".join(f"{val:.1f}" for val in avg_hrv_per_day)) result = detector.detect_pattern( daily_data, days=len(daily_data), min_duration_days=2, format_for_llm=True ) pattern = result.get("anomaly_pattern", {}) print( f" -> 是否有模式: {'是' if pattern.get('has_pattern') else '否'} | " f"持续天数: {pattern.get('duration_days', 0)} | 趋势: {pattern.get('trend', '未知')}" ) if "formatted_for_llm" in result: print("\n格式化输出(前 400 字符):") print("-" * 60) print(result["formatted_for_llm"][:400]) print("...") print("-" * 60) def demo_missing_data(detector: WearableAnomalyDetector) -> None: print("\n" + "=" * 80) print("数据缺失 / 质量下降示例") print("=" * 80) start = datetime.now() - timedelta(hours=1) incomplete_window = generate_window( device_id="demo_missing", start=start, base_hrv=74, base_hr=71, anomaly_level=0.5, include_static=True, missing_ratio=0.4, ) # 模拟传感器丢包:移除 2 个时间点 & 降低数据质量 for idx in (3, 7): incomplete_window[idx]["features"]["data_quality"] = "low" incomplete_window[idx]["features"]["hr"] = float("nan") avg_hrv = np.nanmean([pt["features"].get("hrv_rmssd", np.nan) for pt in incomplete_window]) available_static = sum(bool(pt["static_features"]) for pt in incomplete_window) print(f" - 有效静态特征点数: {available_static}/{len(incomplete_window)}") print(f" - 平均 HRV(忽略缺失): {avg_hrv:.2f} ms") result = detector.detect_realtime(incomplete_window, update_baseline=False) print( f" -> 是否异常: {'是' if result.get('is_anomaly') else '否'} | " f"分数: {result.get('anomaly_score', 0):.4f} | 阈值: {result.get('threshold', 0):.4f}" ) def main() -> None: model_dir = ROOT_DIR / "checkpoints" / "phase2" / "exp_factor_balanced" detector = WearableAnomalyDetector(model_dir=model_dir, device="cpu") detector.update_threshold(0.50) demo_from_file(detector) demo_realtime(detector) demo_pattern(detector) demo_missing_data(detector) if __name__ == "__main__": main()