# Bloom_Ware / services/batch_scheduler.py (author: XiaoBai1221, commit 69fb140)
"""
批次任務排程器(每日凌晨執行)
使用 Batch API 處理非即時任務,成本降低 50%
定時任務:
- 每日凌晨 3:00 執行記憶摘要
- 每週一凌晨 4:00 執行健康週報
"""
import asyncio
import logging
from datetime import datetime, time as dt_time, timedelta
from typing import Any, Dict, List, Optional, Tuple

from services.batch_processor import batch_processor
from core.database import firestore_db
# Module-level logger named "batch_scheduler"; every BatchScheduler method logs through it.
logger = logging.getLogger("batch_scheduler")
class BatchScheduler:
    """Batch job scheduler.

    While ``running`` is True, two long-lived asyncio loops are active:

    * a daily memory-summary job at 03:00 local time, and
    * a weekly health-report job on Mondays at 04:00 local time.

    Each job collects per-user data from Firestore, submits it as one batch
    through ``batch_processor`` and waits for the batch to complete.
    """

    def __init__(self):
        self.running = False
        # Background loops created by start(); cancelled and awaited by stop().
        self._tasks: List[asyncio.Task] = []

    async def start(self):
        """Start both scheduler loops; logs a warning and does nothing if already running."""
        if self.running:
            logger.warning("⚠️ 排程器已在運行中")
            return

        self.running = True
        logger.info("🕐 批次任務排程器已啟動")

        # Launch the scheduled loops as background tasks.
        self._tasks.append(asyncio.create_task(self._daily_memory_summary()))
        self._tasks.append(asyncio.create_task(self._weekly_health_report()))

    async def stop(self):
        """Stop the scheduler: cancel the loops and wait until they have exited.

        The task list is cleared so a later start() begins from a clean state
        instead of accumulating references to already-cancelled tasks.
        """
        self.running = False
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        # Let the cancellations be processed; return_exceptions=True keeps each
        # task's CancelledError from propagating out of stop().
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("🛑 批次任務排程器已停止")

    async def _daily_memory_summary(self):
        """Daily memory-summary loop, firing at 03:00 local time.

        Each cycle:
        1. fetch every user's memories from the previous calendar day;
        2. submit them as one batch through ``batch_processor``;
        3. wait for the batch to finish (logged as taking up to 24 hours);
        4. store the returned summaries in Firestore, labelled with the day
           the memories belong to.

        Runs until ``running`` becomes False or the task is cancelled.
        """
        while self.running:
            try:
                await self._wait_until(dt_time(3, 0, 0))
                if not self.running:
                    break

                logger.info("📚 開始執行每日記憶摘要任務...")

                # The day being summarised. Captured now because the batch may
                # complete on a later calendar day than the one it describes.
                summary_date = (datetime.now() - timedelta(days=1)).date().isoformat()

                # 1. Previous day's memories, keyed by user id.
                user_memories = await self._fetch_yesterday_memories()
                if not user_memories:
                    logger.info("📭 沒有需要摘要的記憶,跳過")
                    continue

                logger.info(f"📊 找到 {len(user_memories)} 位用戶的記憶需要摘要")

                # 2. Submit the batch.
                batch_id = await batch_processor.create_memory_summary_batch(user_memories)
                logger.info(f"🚀 批次任務已提交: {batch_id}")
                logger.info("⏳ 等待批次完成(最多 24 小時)...")

                # 3. Wait for the batch result.
                results = await batch_processor.wait_for_completion(batch_id)

                if results["success"]:
                    logger.info(f"✅ 批次任務完成,收到 {results['total_requests']} 個摘要")
                    # 4. Persist the summaries.
                    await self._save_memory_summaries(results["results"], summary_date)
                    logger.info("💾 記憶摘要已儲存到數據庫")
                else:
                    logger.error(f"❌ 批次任務失敗: {results.get('error')}")

            except Exception as e:
                logger.exception(f"❌ 每日記憶摘要任務發生錯誤: {e}")
                # Back off for an hour. The loop then waits for the next 03:00
                # slot, so a failed run is not repeated on the same day.
                await asyncio.sleep(3600)

    async def _weekly_health_report(self):
        """Weekly health-report loop, firing on Mondays at 04:00 local time.

        Each cycle:
        1. fetch every user's health records for the last complete week;
        2. submit them as one batch through ``batch_processor``;
        3. wait for the batch to finish;
        4. (TODO) notify users with their reports.

        The loop wakes every day at 04:00 and only runs the job on Mondays.
        """
        while self.running:
            try:
                await self._wait_until(dt_time(4, 0, 0))
                if not self.running:
                    break

                # Check the weekday *after* the wait. The wait normally crosses
                # midnight, so a timestamp taken before it names the previous day.
                if datetime.now().weekday() != 0:  # 0 = Monday
                    continue

                logger.info("❤️ 開始執行每週健康報告任務...")

                # 1. Health records for the last complete week.
                user_health_data = await self._fetch_week_health_data()
                if not user_health_data:
                    logger.info("📭 沒有健康數據,跳過")
                    continue

                logger.info(f"📊 找到 {len(user_health_data)} 位用戶的健康數據")

                # 2. Submit the batch.
                batch_id = await batch_processor.create_health_report_batch(user_health_data)
                logger.info(f"🚀 批次任務已提交: {batch_id}")

                # 3. Wait for the batch result.
                results = await batch_processor.wait_for_completion(batch_id)

                if results["success"]:
                    logger.info(f"✅ 批次任務完成,收到 {results['total_requests']} 個報告")
                    # 4. TODO: send report notifications to users
                    # await self._send_health_reports(results["results"])
                    logger.info("📧 健康報告已準備就緒")
                else:
                    logger.error(f"❌ 批次任務失敗: {results.get('error')}")

            except Exception as e:
                logger.exception(f"❌ 每週健康報告任務發生錯誤: {e}")
                await asyncio.sleep(3600)

    @staticmethod
    def _next_run_time(now: datetime, target_time: dt_time) -> datetime:
        """Return the first occurrence of ``target_time`` strictly after ``now``.

        If today's ``target_time`` is at or before ``now``, tomorrow's is returned.
        """
        target = datetime.combine(now.date(), target_time)
        if target <= now:
            target += timedelta(days=1)
        return target

    @staticmethod
    def _previous_week_range(now: datetime) -> Tuple[datetime, datetime]:
        """Return ``(start, end)`` of the last complete Monday-to-Sunday week before ``now``.

        ``start`` is the previous week's Monday 00:00 (inclusive). ``end`` is the
        current week's Monday 00:00 (exclusive).
        """
        this_monday = (now - timedelta(days=now.weekday())).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        return this_monday - timedelta(days=7), this_monday

    async def _wait_until(self, target_time: dt_time):
        """Sleep until the next occurrence of ``target_time`` (local wall-clock time).

        asyncio.sleep measures time on the event loop's monotonic clock, which
        can drift from the wall clock. The wall clock is therefore re-checked
        after each sleep, so this never returns before the target. That
        guarantees the caller's next call schedules the following day rather
        than the same slot again.
        """
        now = datetime.now()
        target = self._next_run_time(now, target_time)
        wait_seconds = (target - now).total_seconds()
        logger.debug(f"⏰ 等待 {wait_seconds:.0f} 秒後執行(目標時間: {target})")
        while wait_seconds > 0:
            await asyncio.sleep(wait_seconds)
            wait_seconds = (target - datetime.now()).total_seconds()

    async def _fetch_yesterday_memories(self) -> Dict[str, List[str]]:
        """Collect every user's memories created during the previous calendar day.

        For each document in ``users``, reads ``users/{uid}/memories`` where
        ``created_at`` is in [yesterday 00:00, today 00:00). Only non-empty
        ``content`` values are kept.

        Returns:
            ``{user_id: [memory_content, ...]}``. Users with no matching content
            are omitted. Returns ``{}`` if Firestore is unavailable or any error
            occurs (the error is logged).
        """
        try:
            if not firestore_db:
                logger.warning("⚠️ Firestore 未連接,無法獲取記憶")
                return {}

            from google.cloud.firestore import FieldFilter

            # NOTE(review): these bounds are naive local-time datetimes. Confirm how
            # the Firestore client interprets them against stored `created_at`
            # values (timezone).
            yesterday_start = (datetime.now() - timedelta(days=1)).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            yesterday_end = yesterday_start + timedelta(days=1)

            users_collection = firestore_db.collection("users")

            # The Firestore client calls below block; they run in worker threads
            # via asyncio.to_thread so the event loop stays responsive.
            def _fetch_users():
                return list(users_collection.stream())

            def _fetch_user_memories(uid: str):
                memories_ref = users_collection.document(uid).collection("memories")
                query = memories_ref.where(
                    filter=FieldFilter("created_at", ">=", yesterday_start)
                ).where(
                    filter=FieldFilter("created_at", "<", yesterday_end)
                )
                return list(query.stream())

            users = await asyncio.to_thread(_fetch_users)

            result: Dict[str, List[str]] = {}
            for user_doc in users:
                memories = await asyncio.to_thread(_fetch_user_memories, user_doc.id)
                contents = [
                    content
                    for content in ((mem.to_dict() or {}).get("content", "") for mem in memories)
                    if content
                ]
                if contents:
                    result[user_doc.id] = contents

            logger.info(f"📚 獲取到 {len(result)} 位用戶的昨日記憶")
            return result

        except Exception as e:
            logger.exception(f"❌ 獲取昨日記憶失敗: {e}")
            return {}

    async def _save_memory_summaries(
        self, results: List[Dict[str, Any]], summary_date: Optional[str] = None
    ):
        """Store batch summaries under ``users/{uid}/memory_summaries``.

        Args:
            results: Batch result entries. From each entry, ``custom_id`` is read
                as the user id and ``response.body.choices[0].message.content``
                as the summary text. Entries missing either are skipped. A
                failure on one entry is logged and does not stop the others.
            summary_date: ISO date (``YYYY-MM-DD``) written to the ``date`` field,
                i.e. the day the summarised memories belong to. Defaults to the
                date at which the record is saved.
        """
        if not firestore_db:
            logger.warning("⚠️ Firestore 未連接,無法儲存摘要")
            return

        logger.info(f"💾 準備儲存 {len(results)} 條記憶摘要")

        def _save_summary(uid: str, summary_text: str):
            # One timestamp per record, so `date` and `created_at` cannot disagree
            # around midnight.
            created_at = datetime.now()
            summaries_ref = firestore_db.collection("users").document(uid).collection("memory_summaries")
            summaries_ref.add({
                "summary": summary_text,
                "date": summary_date or created_at.date().isoformat(),
                "created_at": created_at,
                "type": "daily",
            })

        saved_count = 0
        for result in results:
            # Reset per item so the error log never shows an unbound or stale id.
            custom_id = None
            try:
                custom_id = result.get("custom_id")  # user_id
                body = result.get("response", {}).get("body", {})
                summary = body.get("choices", [{}])[0].get("message", {}).get("content", "")

                if not custom_id or not summary:
                    continue

                logger.debug(f"📝 用戶 {custom_id} 的摘要: {summary[:50]}...")

                await asyncio.to_thread(_save_summary, custom_id, summary)
                saved_count += 1

            except Exception as e:
                logger.error(f"❌ 儲存用戶 {custom_id} 的摘要失敗: {e}")

        logger.info(f"✅ 成功儲存 {saved_count}/{len(results)} 條記憶摘要")

    async def _fetch_week_health_data(self) -> Dict[str, Dict[str, Any]]:
        """Collect each user's health records for the last complete week.

        The report job runs on Monday 04:00, so "Monday to today" would cover
        only a few hours. The range used instead is the previous week:
        [previous Monday 00:00, this Monday 00:00), matched on the ``timestamp``
        field of ``health_data`` documents filtered by ``user_id``.

        Returns:
            ``{user_id: {"heart_rate": [...], "steps": [...], "sleep": [...],
            "active_calories": [...]}}``. Each list item is
            ``{"value": ..., "timestamp": ...}``. Records of other types or with
            a ``None`` value are ignored, and users with no kept records are
            omitted. Returns ``{}`` if Firestore is unavailable or any error
            occurs (the error is logged).
        """
        try:
            if not firestore_db:
                logger.warning("⚠️ Firestore 未連接,無法獲取健康數據")
                return {}

            from google.cloud.firestore import FieldFilter

            # NOTE(review): naive local-time bounds, as in _fetch_yesterday_memories;
            # confirm the timezone of stored `timestamp` values.
            week_start, week_end = self._previous_week_range(datetime.now())

            users_collection = firestore_db.collection("users")
            health_ref = firestore_db.collection("health_data")

            def _fetch_users():
                return list(users_collection.stream())

            def _fetch_user_health(uid: str):
                query = (
                    health_ref.where(filter=FieldFilter("user_id", "==", uid))
                    .where(filter=FieldFilter("timestamp", ">=", week_start))
                    .where(filter=FieldFilter("timestamp", "<", week_end))
                )
                return list(query.stream())

            users = await asyncio.to_thread(_fetch_users)

            result: Dict[str, Dict[str, Any]] = {}
            for user_doc in users:
                health_docs = await asyncio.to_thread(_fetch_user_health, user_doc.id)
                if not health_docs:
                    continue

                user_health: Dict[str, List[Any]] = {
                    "heart_rate": [],
                    "steps": [],
                    "sleep": [],
                    "active_calories": [],
                }
                for doc in health_docs:
                    data = doc.to_dict() or {}
                    data_type = data.get("type")
                    value = data.get("value")
                    if data_type in user_health and value is not None:
                        user_health[data_type].append({
                            "value": value,
                            "timestamp": data.get("timestamp"),
                        })

                # Keep only users that have at least one usable record.
                if any(user_health.values()):
                    result[user_doc.id] = user_health

            logger.info(f"❤️ 獲取到 {len(result)} 位用戶的本週健康數據")
            return result

        except Exception as e:
            logger.exception(f"❌ 獲取本週健康數據失敗: {e}")
            return {}
# Global singleton: the module-level BatchScheduler instance.
batch_scheduler = BatchScheduler()