"""Client for the historical data platform API.

Minimal implementation that contains only the functionality that is needed.
"""

import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import quote

import requests


class HistoricalDataPlatformClient:
    """HTTP client for the historical data platform API."""

    def __init__(
        self,
        base_url: str = "",
        api_key: Optional[str] = None,
        timeout: int = 30,
        retry_times: int = 3
    ):
        """Initialise the API client.

        Args:
            base_url: Base URL of the API. Trailing slashes are removed. When
                empty, the settings are loaded from the JSON config file.
            api_key: Optional API key, sent as a Bearer token.
            timeout: Per-request timeout in seconds.
            retry_times: Number of attempts made for each request.
        """
        # ``or ""`` tolerates an explicit ``None`` instead of raising
        # AttributeError on ``None.rstrip``.
        self.base_url = (base_url or "").rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.retry_times = retry_times

        # Fall back to the config file only when no base URL was supplied.
        if not self.base_url:
            self._load_config()

    def _load_config(self):
        """Load the API settings from ``configs/api_config.json``.

        The file is looked up relative to the parent directory of the
        directory containing this module, and the settings are read from its
        ``historical_data_platform`` section. Values present in the file take
        precedence; for ``api_key``, ``timeout`` and ``retry_times`` the value
        passed to the constructor is kept when the file does not define one.

        Loading is best-effort: any failure is printed and otherwise ignored,
        leaving the attributes assigned so far in place.
        """
        try:
            config_path = Path(__file__).parent.parent / "configs" / "api_config.json"
            if not config_path.exists():
                return

            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)

            api_config = config.get('historical_data_platform', {})
            # Normalise like __init__ does so URLs never contain "//", and
            # treat a JSON null the same as a missing value.
            self.base_url = (api_config.get('base_url') or '').rstrip('/')
            self.api_key = api_config.get('api_key') or self.api_key
            # Keep the constructor arguments when the file omits these keys
            # rather than silently resetting them to hard-coded defaults.
            self.timeout = api_config.get('timeout', self.timeout)
            self.retry_times = api_config.get('retry_times', self.retry_times)
        except Exception as e:
            # Deliberately broad: a broken config file must not prevent the
            # client from being constructed.
            print(f"⚠️ 加载API配置失败: {e}")

    def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None
    ) -> Optional[Dict]:
        """Send an HTTP request and return the decoded JSON body.

        Args:
            method: HTTP method. ``GET`` (case-insensitive) is sent as a GET
                request; any other value is sent as a POST request.
            endpoint: Path appended verbatim to ``base_url``.
            params: Optional query-string parameters.
            data: Optional JSON body, used for non-GET requests only.

        Returns:
            The decoded JSON response, or ``None`` when no base URL is
            configured or when every attempt failed.
        """
        if not self.base_url:
            print("⚠️ API base_url未配置")
            return None

        url = f"{self.base_url}{endpoint}"
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'

        # Always try at least once, even if retry_times was configured as 0
        # or a negative number; otherwise no request would ever be sent.
        attempts = max(1, self.retry_times)
        use_get = method.upper() == 'GET'

        for attempt in range(attempts):
            try:
                if use_get:
                    response = requests.get(
                        url, params=params, headers=headers, timeout=self.timeout
                    )
                else:
                    response = requests.post(
                        url, json=data, params=params, headers=headers,
                        timeout=self.timeout
                    )
                response.raise_for_status()
                return response.json()
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a non-JSON body on requests versions whose
                # JSON decoding error is not a RequestException subclass.
                if attempt == attempts - 1:
                    print(f"⚠️ API请求失败: {e}")
                    return None

        return None

    @staticmethod
    def _device_endpoint(resource: str, device_id: str) -> str:
        """Build ``/api/<resource>/<device_id>`` with the id percent-encoded."""
        # safe='' also encodes "/", so the id always stays one path segment.
        return f"/api/{resource}/{quote(str(device_id), safe='')}"

    def get_raw_data(
        self,
        device_id: str,
        days: int = 7,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Optional[Dict]:
        """Fetch the raw data of a user.

        Args:
            device_id: User ID.
            days: Look back this many days. Used unless both ``start_date``
                and ``end_date`` are given.
            start_date: Start date in ``YYYY-MM-DD`` format.
            end_date: End date in ``YYYY-MM-DD`` format.

        Returns:
            The decoded response, or ``None`` on failure. Expected shape::

                {
                    "deviceId": "user123",
                    "data_points": [...],
                    "total_count": 100
                }
        """
        endpoint = self._device_endpoint("raw-data", device_id)

        # The date range is used only when both ends are supplied; a single
        # date on its own is ignored in favour of ``days``.
        if start_date and end_date:
            params = {'start_date': start_date, 'end_date': end_date}
        else:
            params = {'days': days}

        return self._request('GET', endpoint, params=params)

    def get_user_profile(self, device_id: str) -> Optional[Dict]:
        """Fetch the personalised profile of a user.

        Args:
            device_id: User ID.

        Returns:
            The decoded response, or ``None`` on failure. Expected shape::

                {
                    "deviceId": "user123",
                    "age_group": "30-35岁",
                    "sex": "男性",
                    ...
                }
        """
        return self._request('GET', self._device_endpoint("user-profile", device_id))

    def get_historical_results(
        self,
        device_id: str,
        days: int = 7
    ) -> Optional[Dict]:
        """Fetch the historical detection results of a user.

        Args:
            device_id: User ID.
            days: Number of days sent as the ``days`` query parameter.

        Returns:
            The decoded response, or ``None`` on failure. Expected shape::

                {
                    "deviceId": "user123",
                    "daily_results": [...]
                }
        """
        endpoint = self._device_endpoint("historical-results", device_id)
        return self._request('GET', endpoint, params={'days': days})