# Wearable_TimeSeries_Health_Monitor / feature_calculator.py
# Uploaded by oscarzhang via huggingface_hub (commit fc66e35, verified).
import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
class FeatureCalculator:
    """Build model-ready input windows from raw wearable data points.

    Feature definitions (time-series, static, known-future and factor
    features) are loaded from a JSON config file; normalization statistics
    and per-device static features come from the preprocessing outputs.
    Incoming data points are also appended to a per-user JSONL history
    under ``data_storage/users/``.
    """

    def __init__(
        self,
        config_path: Optional[Path] = None,
        norm_params_path: Optional[Path] = None,
        static_features_path: Optional[Path] = None,
        storage_dir: Optional[Path] = None,
    ):
        """Resolve file locations (defaulting to paths relative to this module),
        load all configuration, and prepare the storage directory.
        """
        base_dir = Path(__file__).parent
        self.config_path = Path(config_path or base_dir / "configs" / "features_config.json")
        self.norm_params_path = Path(norm_params_path or base_dir / "processed_data" / "stage3" / "norm_params.json")
        self.static_features_path = Path(static_features_path or base_dir / "processed_data" / "stage2" / "static_features.csv")
        self.storage_dir = Path(storage_dir or base_dir / "data_storage")
        self.storage_dir.mkdir(parents=True, exist_ok=True)

        self.features_config = self._load_json(self.config_path)
        self.norm_params = self._load_json(self.norm_params_path) if self.norm_params_path.exists() else {}
        self.static_features_dict = self._load_static_features(self.static_features_path)

        # Only features explicitly enabled (or lacking an "enabled" flag) are used.
        self.time_series_features = [f for f in self.features_config.get("time_series", []) if f.get("enabled", True)]
        self.static_feature_defs = [f for f in self.features_config.get("static", []) if f.get("enabled", True)]
        self.known_future_defs = [f for f in self.features_config.get("known_future", []) if f.get("enabled", True)]

        factor_cfg = self.features_config.get("factor_features", {})
        self.factor_enabled = factor_cfg.get("enabled", False)
        self.factor_names = factor_cfg.get("factor_names", [])
        self.factor_dim = factor_cfg.get("factor_dim", 0)

        # Lightweight in-memory history cache, kept for future personalized features.
        self.user_histories: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    @staticmethod
    def _load_json(path: Path) -> Dict:
        """Load a JSON file, returning an empty dict when it does not exist."""
        if not path.exists():
            return {}
        # Explicit encoding so configs parse identically on every platform.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _load_static_features(static_file: Path) -> Dict[str, Dict]:
        """Read per-device static features from CSV into {deviceId: {col: value}}."""
        if not static_file.exists():
            return {}
        df = pd.read_csv(static_file)
        static_dict = {}
        for _, row in df.iterrows():
            raw_id = row.get("deviceId")
            # Skip missing ids: str() of NaN/None would otherwise produce the
            # truthy strings "nan"/"None" and pollute the lookup table.
            if raw_id is None or pd.isna(raw_id):
                continue
            device_id = str(raw_id)
            if device_id:
                static_dict[device_id] = {
                    col: row[col]
                    for col in df.columns
                    if col != "deviceId"
                }
        return static_dict

    @staticmethod
    def _to_serializable(value):
        """``json.dumps`` default= hook: convert numpy/pandas scalars for JSON.

        Raises TypeError for anything it does not recognize, matching the
        json module's contract for the ``default`` callback.
        """
        # np.bool_ is not a np.integer subclass, so it needs its own branch.
        if isinstance(value, np.bool_):
            return bool(value)
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            return float(value)
        if isinstance(value, (pd.Timestamp, datetime)):
            return value.isoformat()
        if isinstance(value, np.ndarray):
            return value.tolist()
        raise TypeError(f"Object of type {type(value)} is not JSON serializable")

    def register_data_points(self, user_id: str, data_points: List[Dict]):
        """Append data points to the user's on-disk JSONL history and to the
        bounded in-memory cache (data_storage/users/{user_id}.jsonl).

        Timestamps are converted to ISO-8601 strings before writing; other
        non-JSON scalars are handled by :meth:`_to_serializable`.
        """
        if not user_id:
            return
        user_dir = self.storage_dir / "users"
        user_dir.mkdir(exist_ok=True, parents=True)
        history_file = user_dir / f"{user_id}.jsonl"
        with history_file.open("a", encoding="utf-8") as f:
            for point in data_points:
                serializable = dict(point)  # shallow copy; never mutate the caller's dict
                ts = serializable.get('timestamp')
                # pd.Timestamp and datetime both expose isoformat(), so one
                # duck-typed check covers both (the original had a redundant
                # isinstance branch for pd.Timestamp).
                if hasattr(ts, "isoformat"):
                    serializable['timestamp'] = ts.isoformat()
                f.write(json.dumps(serializable, ensure_ascii=False, default=self._to_serializable) + "\n")
        self.user_histories[user_id].extend(data_points)
        # Keep only the most recent 5,000 points in memory to bound usage.
        if len(self.user_histories[user_id]) > 5000:
            self.user_histories[user_id] = self.user_histories[user_id][-5000:]

    def normalize_series(self, values: List[float], feature_name: str, cfg: Dict) -> List[float]:
        """Normalize a value series per the feature's ``normalization`` config.

        Supports ``zscore`` (stats from norm_params or the config itself),
        ``minmax`` and ``none``.  NaN/inf values become 0 after scaling.
        """
        arr = np.array(values, dtype=np.float32)
        norm_cfg = cfg.get("normalization", {"type": "none"})
        norm_type = norm_cfg.get("type", "none")
        if norm_type == "zscore":
            mean, std = self._get_norm_stats(feature_name, norm_cfg)
            if std == 0:
                std = 1.0  # avoid division by zero for constant features
            arr = (arr - mean) / std
        elif norm_type == "minmax":
            min_v = norm_cfg.get("min", 0.0)
            max_v = norm_cfg.get("max", 1.0)
            scale = max(max_v - min_v, 1e-6)  # guard degenerate ranges
            arr = (arr - min_v) / scale
        # Any other type (including "none") passes values through unchanged.
        arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
        return arr.tolist()

    @staticmethod
    def _coerce_value(value, feat_cfg):
        """Coerce a raw feature value to float.

        Missing/NaN values fall back to the configured default; categorical
        strings go through ``category_mapping`` when one is configured.
        """
        default = feat_cfg.get("default", 0.0)
        if value is None or pd.isna(value):
            return default
        category_mapping = feat_cfg.get("category_mapping")
        if isinstance(value, str):
            if category_mapping:
                return category_mapping.get(value, default)
            try:
                return float(value)
            except ValueError:
                return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _get_norm_stats(self, feature_name: str, norm_cfg: Dict) -> Tuple[float, float]:
        """Return (mean, std), preferring trained norm_params when enabled.

        Note: the original annotation ``-> (float, float)`` was not a valid
        type expression; ``Tuple[float, float]`` is the correct spelling.
        """
        if norm_cfg.get("use_norm_params") and feature_name in self.norm_params:
            stats = self.norm_params[feature_name]
            return stats.get("mean", 0.0), stats.get("std", 1.0)
        return norm_cfg.get("mean", 0.0), norm_cfg.get("std", 1.0)

    def build_window(self, data_points: List[Dict], user_id: Optional[str] = None) -> Dict:
        """Assemble a model input window from chronologically ordered points.

        The first 12 points form the input window; anything beyond becomes
        the target section.  When ``user_id`` is given, the points are also
        registered in the user's history.

        Raises:
            ValueError: when fewer than 12 points are supplied.
        """
        if len(data_points) < 12:
            raise ValueError("数据点不足,需要至少12个点构建短期窗口")
        if user_id:
            self.register_data_points(user_id, data_points)

        timestamps = []
        input_features = {feat["name"]: [] for feat in self.time_series_features}
        for point in data_points:
            ts = point.get("timestamp")
            if isinstance(ts, str):
                ts = pd.to_datetime(ts)
            timestamps.append(ts)
            feature_payload = point.get("features", {})
            for feat_cfg in self.time_series_features:
                name = feat_cfg["name"]
                value = feature_payload.get(name)
                value = self._coerce_value(value, feat_cfg)
                input_features[name].append(value)

        # Seconds elapsed between consecutive points (0 for the first one).
        delta_t = [0.0]
        for i in range(1, len(timestamps)):
            diff = (timestamps[i] - timestamps[i - 1]).total_seconds()
            delta_t.append(float(diff))

        # Per-feature normalization as configured.
        normalized_features = {}
        for feat_cfg in self.time_series_features:
            name = feat_cfg["name"]
            normalized_features[name] = self.normalize_series(input_features[name], name, feat_cfg)

        static_features = self._build_static_features(data_points[0], user_id)
        factor_features = self._build_factor_features(normalized_features)
        # Known-future covariates are derived from the last (up to) 6 timestamps.
        known_future = self._build_known_future(timestamps[-6:] if len(timestamps) >= 6 else timestamps)
        return {
            "input_timestamp": timestamps[:12],
            "input_delta_t": delta_t[:12],
            "input_features": normalized_features,
            "target_timestamp": timestamps[12:] if len(timestamps) > 12 else [],
            "target_delta_t": delta_t[12:] if len(delta_t) > 12 else [],
            "static_features": static_features,
            "known_future_features": known_future,
            "factor_features": factor_features,
        }

    def _build_static_features(self, first_point: Dict, user_id: Optional[str]) -> Dict:
        """Merge static features; the per-point payload wins over the CSV lookup."""
        static_payload = dict(first_point.get("static_features", {}))
        device_id = first_point.get("deviceId") or user_id
        if device_id and str(device_id) in self.static_features_dict:
            for key, value in self.static_features_dict[str(device_id)].items():
                static_payload.setdefault(key, value)
        result = {}
        for feat_cfg in self.static_feature_defs:
            name = feat_cfg["name"]
            result[name] = static_payload.get(name, feat_cfg.get("default", 0.0))
        return result

    def _build_factor_features(self, normalized_features: Dict[str, List[float]]) -> Optional[Dict[str, List[float]]]:
        """Build fixed-size factor vectors from grouped normalized series.

        Each factor vector holds [mean, std, max, min] of the merged series,
        trimmed or zero-padded to ``factor_dim``.  Returns None when factor
        features are disabled.
        """
        if not self.factor_enabled or not self.factor_names:
            return None
        factor_vectors = {}
        for factor_name in self.factor_names:
            # Simple summary statistics for now; easy to swap for a learned encoder.
            merged = []
            for feat_name, values in normalized_features.items():
                if factor_name == "physio" and feat_name.startswith("hrv"):
                    merged.extend(values)
                elif factor_name == "activity" and feat_name in {"steps", "distance", "calories"}:
                    merged.extend(values)
                elif factor_name == "context" and feat_name in {"time_period_primary", "time_period_secondary", "is_weekend"}:
                    merged.extend(values)
            if not merged:
                factor_vectors[factor_name] = [0.0] * self.factor_dim
            else:
                arr = np.array(merged, dtype=np.float32)
                stats = [
                    float(arr.mean()),
                    float(arr.std()),
                    float(arr.max()),
                    float(arr.min())
                ]
                factor_vectors[factor_name] = stats[: self.factor_dim] if len(stats) >= self.factor_dim else stats + [0.0] * (self.factor_dim - len(stats))
        return factor_vectors

    def _build_known_future(self, timestamps: List[pd.Timestamp]) -> Dict[str, List[float]]:
        """Derive calendar covariates (hour, weekday, weekend flag) per timestamp.

        NaT timestamps fall back to neutral midpoints: noon, Thursday, weekday.
        """
        hours, days, weekends = [], [], []
        for ts in timestamps:
            if pd.isna(ts):
                hours.append(12.0)
                days.append(3.0)
                weekends.append(0.0)
            else:
                hours.append(float(ts.hour))
                days.append(float(ts.weekday()))
                weekends.append(float(1 if ts.weekday() >= 5 else 0))
        result = {}
        for cfg in self.known_future_defs:
            name = cfg["name"]
            if name == "hour_of_day":
                result[name] = hours
            elif name == "day_of_week":
                result[name] = days
            elif name == "is_weekend":
                result[name] = weekends
        return result

    def get_enabled_feature_names(self) -> List[str]:
        """Names of the enabled time-series features, in config order."""
        return [feat["name"] for feat in self.time_series_features]
# Explicit public API: FeatureCalculator is the only name this module exports.
__all__ = ["FeatureCalculator"]