import json
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
|
|
|
|
class FeatureCalculator:
    """Load feature definitions from one config file and build model windows.

    The config declares ``time_series``, ``static`` and ``known_future``
    feature lists plus an optional ``factor_features`` section; entries with
    ``"enabled": false`` are dropped. ``build_window`` turns a list of raw
    data points into the window dict used for inference/training.
    """

    # Maximum number of points kept per user in ``user_histories``.
    max_history_points: int = 5000

    # Which normalized time-series features feed each named factor group.
    # Factor names not listed here always produce an all-zero vector.
    _FACTOR_MEMBERSHIP = {
        "physio": lambda name: name.startswith("hrv"),
        "activity": lambda name: name in {"steps", "distance", "calories"},
        "context": lambda name: name in {"time_period_primary", "time_period_secondary", "is_weekend"},
    }

    def __init__(
        self,
        config_path: Optional[Path] = None,
        norm_params_path: Optional[Path] = None,
        static_features_path: Optional[Path] = None,
        storage_dir: Optional[Path] = None,
    ):
        """Resolve file locations and load config, norm params and static data.

        Every path defaults to a location relative to this module's directory.
        Missing config / norm-params / static-feature files yield empty
        mappings. ``storage_dir`` is created if it does not exist.
        """
        base_dir = Path(__file__).parent
        self.config_path = Path(config_path or base_dir / "configs" / "features_config.json")
        self.norm_params_path = Path(norm_params_path or base_dir / "processed_data" / "stage3" / "norm_params.json")
        self.static_features_path = Path(static_features_path or base_dir / "processed_data" / "stage2" / "static_features.csv")
        self.storage_dir = Path(storage_dir or base_dir / "data_storage")
        self.storage_dir.mkdir(parents=True, exist_ok=True)

        self.features_config = self._load_json(self.config_path)
        # _load_json already returns {} when the file is absent.
        self.norm_params = self._load_json(self.norm_params_path)
        self.static_features_dict = self._load_static_features(self.static_features_path)

        cfg = self.features_config
        self.time_series_features = self._enabled(cfg.get("time_series", []))
        self.static_feature_defs = self._enabled(cfg.get("static", []))
        self.known_future_defs = self._enabled(cfg.get("known_future", []))

        factor_cfg = cfg.get("factor_features", {})
        self.factor_enabled = factor_cfg.get("enabled", False)
        self.factor_names = factor_cfg.get("factor_names", [])
        self.factor_dim = factor_cfg.get("factor_dim", 0)

        # In-memory per-user cache, trimmed to max_history_points entries.
        self.user_histories: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    @staticmethod
    def _enabled(feature_defs: List[Dict]) -> List[Dict]:
        """Keep only feature definitions whose ``enabled`` flag is not false."""
        return [f for f in feature_defs if f.get("enabled", True)]

    @staticmethod
    def _load_json(path: Path) -> Dict:
        """Return the parsed JSON at ``path``, or ``{}`` if the file is absent."""
        if not path.exists():
            return {}
        # Explicit UTF-8: configs may hold non-ASCII text and the platform
        # default encoding is not guaranteed to be UTF-8.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _load_static_features(static_file: Path) -> Dict[str, Dict]:
        """Load per-device static features from a CSV keyed by ``deviceId``.

        Returns ``{device_id: {column: value, ...}}`` (``deviceId`` itself is
        excluded from the inner dicts). Rows with an empty ``deviceId`` are
        skipped; a missing file or a file without a ``deviceId`` column
        yields ``{}``.
        """
        if not static_file.exists():
            return {}
        # Read ids as text so leading zeros survive and a column with gaps
        # is not upcast to float ("123" -> "123.0").
        df = pd.read_csv(static_file, dtype={"deviceId": str})
        if "deviceId" not in df.columns:
            return {}
        static_dict: Dict[str, Dict] = {}
        # to_dict(records) keeps per-column types, unlike iterrows which
        # upcasts each row to a single dtype.
        for record in df.to_dict(orient="records"):
            device_id = record.pop("deviceId")
            if FeatureCalculator._is_missing(device_id):
                continue
            device_id = str(device_id)
            if device_id:
                static_dict[device_id] = record
        return static_dict

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for ``None`` and scalar NA values (NaN, NaT, pd.NA).

        Non-scalars (lists, arrays) are never treated as missing; passing them
        to ``pd.isna`` directly would return an array with an ambiguous truth
        value.
        """
        return value is None or (pd.api.types.is_scalar(value) and bool(pd.isna(value)))

    @staticmethod
    def _to_serializable(value: Any) -> Any:
        """``json.dumps`` ``default=`` hook for numpy, pandas and date values.

        Raises:
            TypeError: for any other type, as the ``json`` module expects.
        """
        from datetime import date

        if isinstance(value, np.bool_):
            return bool(value)
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            return float(value)
        # date also covers datetime (and pd.Timestamp, a datetime subclass).
        if isinstance(value, (pd.Timestamp, date)):
            return value.isoformat()
        if isinstance(value, np.ndarray):
            return value.tolist()
        raise TypeError(f"Object of type {type(value)} is not JSON serializable")

    def register_data_points(self, user_id: str, data_points: List[Dict]):
        """Cache a user's data points in memory and append them to disk.

        Each point is written as one JSON line to
        ``<storage_dir>/users/{user_id}.jsonl``. The in-memory history keeps
        only the newest ``max_history_points`` points. A falsy ``user_id``
        is a no-op.
        """
        if not user_id:
            return
        user_dir = self.storage_dir / "users"
        user_dir.mkdir(exist_ok=True, parents=True)
        # NOTE(review): user_id is used verbatim as a file name; if it can come
        # from untrusted callers, reject "/" and ".." upstream.
        history_file = user_dir / f"{user_id}.jsonl"

        with history_file.open("a", encoding="utf-8") as f:
            for point in data_points:
                record = dict(point)
                ts = record.get("timestamp")
                # pd.Timestamp and datetime both expose isoformat().
                if hasattr(ts, "isoformat"):
                    record["timestamp"] = ts.isoformat()
                f.write(json.dumps(record, ensure_ascii=False, default=self._to_serializable) + "\n")

        history = self.user_histories[user_id]
        history.extend(data_points)
        excess = len(history) - self.max_history_points
        if excess > 0:
            del history[:excess]

    def normalize_series(self, values: List[float], feature_name: str, cfg: Dict) -> List[float]:
        """Normalize one feature series according to ``cfg["normalization"]``.

        Supported ``type`` values:
          * ``"zscore"``: mean/std from ``norm_params`` (when
            ``use_norm_params`` is set) or the config; std 0 is treated as 1.
          * ``"minmax"``: ``min``/``max`` from the config; range floored at 1e-6.
          * anything else: values are passed through unchanged.
        NaN and +/-inf in the result are replaced with 0.0.
        """
        arr = np.array(values, dtype=np.float32)
        norm_cfg = cfg.get("normalization", {"type": "none"})
        norm_type = norm_cfg.get("type", "none")

        if norm_type == "zscore":
            mean, std = self._get_norm_stats(feature_name, norm_cfg)
            if std == 0:
                std = 1.0
            arr = (arr - mean) / std
        elif norm_type == "minmax":
            min_v = norm_cfg.get("min", 0.0)
            max_v = norm_cfg.get("max", 1.0)
            scale = max(max_v - min_v, 1e-6)
            arr = (arr - min_v) / scale

        arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
        return arr.tolist()

    @staticmethod
    def _coerce_value(value: Any, feat_cfg: Dict) -> Any:
        """Convert a raw feature value to a number.

        Missing values (None / NaN) become ``feat_cfg["default"]`` (0.0 if
        unset). Strings go through ``category_mapping`` when one is
        configured, otherwise they are parsed as floats. Anything that cannot
        be converted falls back to the default.
        """
        default = feat_cfg.get("default", 0.0)
        if FeatureCalculator._is_missing(value):
            return default
        if isinstance(value, str):
            category_mapping = feat_cfg.get("category_mapping")
            if category_mapping:
                return category_mapping.get(value, default)
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _get_norm_stats(self, feature_name: str, norm_cfg: Dict) -> Tuple[float, float]:
        """Return ``(mean, std)`` for z-score normalization.

        Prefers the loaded ``norm_params`` entry when ``use_norm_params`` is
        set and present; otherwise uses the config's ``mean``/``std``
        (defaults 0.0 / 1.0).
        """
        if norm_cfg.get("use_norm_params") and feature_name in self.norm_params:
            stats = self.norm_params[feature_name]
            return stats.get("mean", 0.0), stats.get("std", 1.0)
        return norm_cfg.get("mean", 0.0), norm_cfg.get("std", 1.0)

    @staticmethod
    def _compute_delta_t(timestamps: List[Any]) -> List[float]:
        """Seconds between consecutive timestamps; the first entry is 0.0.

        A gap touching a missing timestamp (None / NaT) is reported as 0.0
        instead of raising or producing NaN.
        """
        if not timestamps:
            return []
        deltas = [0.0]
        for prev, cur in zip(timestamps, timestamps[1:]):
            if FeatureCalculator._is_missing(prev) or FeatureCalculator._is_missing(cur):
                deltas.append(0.0)
            else:
                deltas.append(float((cur - prev).total_seconds()))
        return deltas

    def build_window(
        self,
        data_points: List[Dict],
        user_id: Optional[str] = None,
        *,
        input_len: int = 12,
        known_future_len: int = 6,
    ) -> Dict:
        """Build the model window from raw data points.

        Args:
            data_points: dicts with ``timestamp`` (str or datetime-like),
                optional ``features`` / ``static_features`` dicts and an
                optional ``deviceId``.
            user_id: when truthy, the points are also persisted through
                ``register_data_points`` before processing.
            input_len: number of leading points forming the input part.
            known_future_len: number of trailing timestamps (of the points
                given) used for ``known_future_features``.

        Returns:
            Dict with ``input_timestamp``/``input_delta_t`` (first
            ``input_len`` points), ``target_timestamp``/``target_delta_t``
            (the remaining points), ``input_features`` (normalized series
            over ALL points, not only the input part), ``static_features``,
            ``known_future_features`` and ``factor_features`` (None when
            disabled).

        Raises:
            ValueError: if ``input_len < 1`` or fewer than ``input_len``
                points are given.
        """
        if input_len < 1:
            raise ValueError("input_len must be >= 1")
        if len(data_points) < input_len:
            raise ValueError(f"数据点不足,需要至少{input_len}个点构建短期窗口")

        if user_id:
            self.register_data_points(user_id, data_points)

        timestamps = []
        raw_features = {feat["name"]: [] for feat in self.time_series_features}
        for point in data_points:
            ts = point.get("timestamp")
            if isinstance(ts, str):
                ts = pd.to_datetime(ts)
            timestamps.append(ts)

            # "features" may be absent or explicitly None.
            payload = point.get("features") or {}
            for feat_cfg in self.time_series_features:
                name = feat_cfg["name"]
                raw_features[name].append(self._coerce_value(payload.get(name), feat_cfg))

        delta_t = self._compute_delta_t(timestamps)

        normalized_features = {}
        for feat_cfg in self.time_series_features:
            name = feat_cfg["name"]
            normalized_features[name] = self.normalize_series(raw_features[name], name, feat_cfg)

        static_features = self._build_static_features(data_points[0], user_id)
        factor_features = self._build_factor_features(normalized_features)
        # Explicit start index: timestamps[-0:] would select the whole list.
        # NOTE(review): these are the last timestamps of the *given* points,
        # not timestamps beyond them — confirm this is the intended semantics.
        future_start = max(len(timestamps) - max(known_future_len, 0), 0)
        known_future = self._build_known_future(timestamps[future_start:])

        return {
            "input_timestamp": timestamps[:input_len],
            "input_delta_t": delta_t[:input_len],
            "input_features": normalized_features,
            "target_timestamp": timestamps[input_len:],
            "target_delta_t": delta_t[input_len:],
            "static_features": static_features,
            "known_future_features": known_future,
            "factor_features": factor_features,
        }

    def _build_static_features(self, first_point: Dict, user_id: Optional[str]) -> Dict:
        """Assemble configured static features for one window.

        Values from ``first_point["static_features"]`` take priority; gaps are
        filled from the CSV-loaded table looked up by ``deviceId`` (or
        ``user_id``). Missing or NA values fall back to each feature's
        ``default`` (0.0 if unset).
        """
        static_payload = dict(first_point.get("static_features") or {})
        device_id = first_point.get("deviceId") or user_id

        if device_id:
            for key, value in self.static_features_dict.get(str(device_id), {}).items():
                static_payload.setdefault(key, value)

        result = {}
        for feat_cfg in self.static_feature_defs:
            name = feat_cfg["name"]
            default = feat_cfg.get("default", 0.0)
            value = static_payload.get(name, default)
            # Empty CSV cells arrive as NaN; never hand NaN/None to the model.
            result[name] = default if self._is_missing(value) else value
        return result

    def _build_factor_features(self, normalized_features: Dict[str, List[float]]) -> Optional[Dict[str, List[float]]]:
        """Summarize feature groups into fixed-length factor vectors.

        For each configured factor, the member series (see
        ``_FACTOR_MEMBERSHIP``) are concatenated and reduced to
        ``[mean, std, max, min]``, truncated or zero-padded to ``factor_dim``.
        Returns None when factor features are disabled or no names are set.
        """
        if not self.factor_enabled or not self.factor_names:
            return None

        dim = self.factor_dim
        factor_vectors = {}
        for factor_name in self.factor_names:
            belongs = self._FACTOR_MEMBERSHIP.get(factor_name)
            merged: List[float] = []
            if belongs is not None:
                for feat_name, values in normalized_features.items():
                    if belongs(feat_name):
                        merged.extend(values)

            if not merged:
                factor_vectors[factor_name] = [0.0] * dim
                continue

            arr = np.asarray(merged, dtype=np.float32)
            stats = [float(arr.mean()), float(arr.std()), float(arr.max()), float(arr.min())]
            # Truncate or zero-pad the summary statistics to exactly `dim` values.
            factor_vectors[factor_name] = (stats + [0.0] * dim)[:dim]
        return factor_vectors

    def _build_known_future(self, timestamps: List[pd.Timestamp]) -> Dict[str, List[float]]:
        """Derive calendar features (hour, weekday, weekend flag) per timestamp.

        Missing timestamps get fixed placeholder values (hour 12, weekday 3,
        not weekend). Only the features listed in ``known_future_defs`` are
        returned.
        """
        hours, days, weekends = [], [], []
        for ts in timestamps:
            if self._is_missing(ts):
                hours.append(12.0)
                days.append(3.0)
                weekends.append(0.0)
            else:
                weekday = ts.weekday()
                hours.append(float(ts.hour))
                days.append(float(weekday))
                weekends.append(1.0 if weekday >= 5 else 0.0)

        columns = {"hour_of_day": hours, "day_of_week": days, "is_weekend": weekends}
        return {cfg["name"]: columns[cfg["name"]] for cfg in self.known_future_defs if cfg["name"] in columns}

    def get_enabled_feature_names(self) -> List[str]:
        """Names of the enabled time-series features, in config order."""
        return [feat["name"] for feat in self.time_series_features]


__all__ = ["FeatureCalculator"]
|
|
|
|