File size: 13,306 Bytes
3f0377e
 
 
 
 
 
 
 
 
 
 
417b750
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
417b750
3f0377e
 
 
417b750
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
69fb140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f0377e
 
 
 
 
 
 
 
69fb140
 
 
 
 
 
3f0377e
69fb140
 
3f0377e
69fb140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f0377e
69fb140
3f0377e
 
 
 
 
 
69fb140
3f0377e
69fb140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f0377e
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
批次任務排程器(每日凌晨執行)
使用 Batch API 處理非即時任務,成本降低 50%

定時任務:
- 每日凌晨 3:00 執行記憶摘要
- 每週一凌晨 4:00 執行健康週報
"""

import asyncio
import logging
from datetime import datetime, time as dt_time, timedelta
from typing import Dict, List, Any
from services.batch_processor import batch_processor
from core.database import firestore_db

# Logger for this module, registered under the fixed name "batch_scheduler".
logger = logging.getLogger("batch_scheduler")


class BatchScheduler:
    """Scheduler for recurring, non-real-time batch jobs.

    :meth:`start` launches two background loops:

    * ``_daily_memory_summary`` -- every day at 03:00 local time, summarises
      each user's memories from the previous calendar day.
    * ``_weekly_health_report`` -- every Monday at 04:00 local time, builds
      health reports from the Monday-to-Sunday week that just ended.

    Both loops submit their work through ``batch_processor`` and wait for the
    batch to finish. All times are naive ``datetime.now()`` values.
    """

    def __init__(self):
        # True while the background loops should keep scheduling runs.
        self.running = False
        # Handles of the loops created by start(); cleared by stop().
        self._tasks: List[asyncio.Task] = []

    async def start(self):
        """Start both scheduling loops as background tasks.

        Calling this while the scheduler is already running only logs a warning.
        """
        if self.running:
            logger.warning("⚠️ 排程器已在運行中")
            return

        self.running = True
        logger.info("🕐 批次任務排程器已啟動")

        self._tasks.append(
            asyncio.create_task(
                self._daily_memory_summary(), name="batch_daily_memory_summary"
            )
        )
        self._tasks.append(
            asyncio.create_task(
                self._weekly_health_report(), name="batch_weekly_health_report"
            )
        )

    async def stop(self):
        """Stop the scheduler and wait for its background loops to exit.

        Cancels every task created by :meth:`start`, awaits their termination
        (the resulting ``CancelledError``s are collected, not raised), and
        clears the task list so a later :meth:`start` begins from a clean state.
        """
        self.running = False

        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        # Await the cancelled tasks so the loops have actually finished when
        # stop() returns, instead of leaving them to unwind in the background.
        await asyncio.gather(*tasks, return_exceptions=True)

        logger.info("🛑 批次任務排程器已停止")

    async def _daily_memory_summary(self):
        """Daily memory-summary loop; fires at 03:00 local time.

        Each run:
        1. fetches every user's memories from the previous calendar day;
        2. submits them as one batch via ``batch_processor``;
        3. waits for the batch to finish (the log message says up to 24 hours;
           the actual limit is whatever ``batch_processor`` enforces);
        4. stores the returned summaries in Firestore.
        """
        while self.running:
            try:
                await self._wait_until(dt_time(3, 0, 0))

                if not self.running:
                    break

                logger.info("📚 開始執行每日記憶摘要任務...")

                # 1. Every user's memories from yesterday.
                user_memories = await self._fetch_yesterday_memories()

                if not user_memories:
                    logger.info("📭 沒有需要摘要的記憶,跳過")
                    continue

                logger.info(f"📊 找到 {len(user_memories)} 位用戶的記憶需要摘要")

                # 2. Submit the batch.
                batch_id = await batch_processor.create_memory_summary_batch(user_memories)

                logger.info(f"🚀 批次任務已提交: {batch_id}")
                logger.info("⏳ 等待批次完成(最多 24 小時)...")

                # 3. Wait for it to complete.
                results = await batch_processor.wait_for_completion(batch_id)

                if results["success"]:
                    logger.info(f"✅ 批次任務完成,收到 {results['total_requests']} 個摘要")

                    # 4. Persist the summaries.
                    await self._save_memory_summaries(results["results"])

                    logger.info("💾 記憶摘要已儲存到數據庫")
                else:
                    logger.error(f"❌ 批次任務失敗: {results.get('error')}")

            except Exception as e:
                logger.exception(f"❌ 每日記憶摘要任務發生錯誤: {e}")
                # Back off for an hour so a persistent failure cannot spin the
                # loop. This does not retry the failed run: once the hour has
                # passed, _wait_until normally schedules the next 03:00 slot.
                await asyncio.sleep(3600)

    async def _weekly_health_report(self):
        """Weekly health-report loop; fires on Mondays at 04:00 local time.

        The loop wakes at every 04:00 slot and proceeds only when that slot
        falls on a Monday. Each run:
        1. fetches every user's health data for the week that just ended;
        2. submits it as one batch via ``batch_processor``;
        3. waits for the batch to finish;
        4. (TODO) notifies users -- currently it only logs.
        """
        while self.running:
            try:
                # Use the weekday of the slot we actually woke up for. Sampling
                # datetime.now() *before* waiting yields the previous day, which
                # made the report fire on Tuesdays instead of Mondays.
                fired_at = await self._wait_until(dt_time(4, 0, 0))

                if not self.running:
                    break

                if fired_at.weekday() != 0:  # 0 = Monday
                    # Not Monday: go straight back to waiting for the next 04:00.
                    continue

                logger.info("❤️ 開始執行每週健康報告任務...")

                # 1. Every user's health data for last week.
                user_health_data = await self._fetch_week_health_data()

                if not user_health_data:
                    logger.info("📭 沒有健康數據,跳過")
                    continue

                logger.info(f"📊 找到 {len(user_health_data)} 位用戶的健康數據")

                # 2. Submit the batch.
                batch_id = await batch_processor.create_health_report_batch(user_health_data)

                logger.info(f"🚀 批次任務已提交: {batch_id}")

                # 3. Wait for it to complete.
                results = await batch_processor.wait_for_completion(batch_id)

                if results["success"]:
                    logger.info(f"✅ 批次任務完成,收到 {results['total_requests']} 個報告")

                    # 4. TODO: send report notifications to users (a
                    #    _send_health_reports(results["results"]) helper does
                    #    not exist yet).

                    logger.info("📧 健康報告已準備就緒")
                else:
                    logger.error(f"❌ 批次任務失敗: {results.get('error')}")

            except Exception as e:
                logger.exception(f"❌ 每週健康報告任務發生錯誤: {e}")
                # Back off for an hour so a persistent failure cannot spin the loop.
                await asyncio.sleep(3600)

    async def _wait_until(self, target_time: dt_time) -> datetime:
        """Sleep until the next occurrence of ``target_time`` (local wall clock).

        If today's ``target_time`` has already passed (or is exactly now),
        tomorrow's is used.

        Returns:
            The wall-clock datetime of the slot that was waited for. The
            method only returns once ``datetime.now()`` has reached it.
        """
        now = datetime.now()
        target = datetime.combine(now.date(), target_time)

        # Target already passed today: use tomorrow.
        if target <= now:
            target += timedelta(days=1)

        wait_seconds = (target - now).total_seconds()
        logger.debug(f"⏰ 等待 {wait_seconds:.0f} 秒後執行(目標時間: {target})")

        # asyncio.sleep runs on the event loop's monotonic clock, while target is
        # wall-clock time, so a single sleep can end slightly early. Re-check
        # after every wake-up. Returning early would let the caller's next
        # _wait_until select this same slot again and run the job twice.
        while True:
            remaining = (target - datetime.now()).total_seconds()
            if remaining <= 0:
                return target
            await asyncio.sleep(remaining)

    @staticmethod
    def _previous_day_range(now: datetime) -> tuple[datetime, datetime]:
        """Return ``[start, end)`` of the calendar day before ``now``.

        ``start`` is yesterday at 00:00 and ``end`` is exactly one day later.
        """
        start = (now - timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        return start, start + timedelta(days=1)

    @staticmethod
    def _previous_week_range(now: datetime) -> tuple[datetime, datetime]:
        """Return ``[start, end)`` of the last complete Monday-to-Sunday week.

        ``end`` is the Monday 00:00 of the week containing ``now``. ``start`` is
        the Monday 00:00 seven days earlier.
        """
        this_monday = (now - timedelta(days=now.weekday())).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        return this_monday - timedelta(days=7), this_monday

    @staticmethod
    def _extract_summary(result: Dict[str, Any]) -> str:
        """Pull the assistant message text out of one batch result entry.

        Reads ``result["response"]["body"]["choices"][0]["message"]["content"]``.
        Returns ``""`` when any level is missing or null, e.g. a failed request
        whose ``response`` is ``None``, or an empty ``choices`` list.
        """
        response = result.get("response") or {}
        body = response.get("body") or {}
        choices = body.get("choices") or []
        if not choices:
            return ""
        message = choices[0].get("message") or {}
        return message.get("content") or ""

    async def _fetch_yesterday_memories(self) -> Dict[str, List[str]]:
        """Collect every user's memories created during the previous calendar day.

        Reads ``users/{uid}/memories`` documents whose ``created_at`` falls in
        ``[yesterday 00:00, today 00:00)`` and keeps their non-empty
        ``content`` values. Users without such content are omitted.

        Returns:
            ``{user_id: [memory_1, memory_2, ...]}``, or ``{}`` when Firestore
            is unavailable or any query fails (the error is logged).
        """
        try:
            if not firestore_db:
                logger.warning("⚠️ Firestore 未連接,無法獲取記憶")
                return {}

            from google.cloud.firestore import FieldFilter

            yesterday_start, yesterday_end = self._previous_day_range(datetime.now())

            users_collection = firestore_db.collection("users")

            # Blocking Firestore calls; each one runs in a worker thread via
            # asyncio.to_thread so the event loop is not blocked.
            def _fetch_users():
                return list(users_collection.stream())

            def _fetch_user_memories(uid: str):
                memories_ref = users_collection.document(uid).collection("memories")
                query = memories_ref.where(
                    filter=FieldFilter("created_at", ">=", yesterday_start)
                ).where(
                    filter=FieldFilter("created_at", "<", yesterday_end)
                )
                return list(query.stream())

            users = await asyncio.to_thread(_fetch_users)

            result: Dict[str, List[str]] = {}
            for user_doc in users:
                user_id = user_doc.id
                memories = await asyncio.to_thread(_fetch_user_memories, user_id)

                memory_contents = [
                    content
                    for content in (
                        (mem_doc.to_dict() or {}).get("content", "")
                        for mem_doc in memories
                    )
                    if content
                ]
                if memory_contents:
                    result[user_id] = memory_contents

            logger.info(f"📚 獲取到 {len(result)} 位用戶的昨日記憶")
            return result

        except Exception as e:
            logger.exception(f"❌ 獲取昨日記憶失敗: {e}")
            return {}

    async def _save_memory_summaries(self, results: List[Dict[str, Any]]):
        """Persist daily summaries returned by the memory-summary batch.

        Each entry's ``custom_id`` is used as the user id, and the summary text
        is taken from ``response.body.choices[0].message.content``. This shape
        is inferred from the keys read here; confirm it against batch_processor.
        Entries without a user id or summary text are skipped. A failure on
        one entry is logged and does not stop the others.

        Args:
            results: Batch result entries.
        """
        if not firestore_db:
            logger.warning("⚠️ Firestore 未連接,無法儲存摘要")
            return

        logger.info(f"💾 準備儲存 {len(results)} 條記憶摘要")

        def _save_summary(uid: str, summary_text: str) -> None:
            # Blocking Firestore write; called via asyncio.to_thread.
            saved_at = datetime.now()
            summaries_ref = (
                firestore_db.collection("users").document(uid).collection("memory_summaries")
            )
            # NOTE(review): "date" is the day the summary is saved, which is not
            # necessarily the day the summarised memories were created; confirm
            # that readers of memory_summaries expect this.
            summaries_ref.add({
                "summary": summary_text,
                "date": saved_at.date().isoformat(),
                "created_at": saved_at,
                "type": "daily",
            })

        saved_count = 0
        for result in results:
            # Bind before the try so the error log never reports the previous
            # iteration's id (or hits an unbound name on the first entry).
            custom_id = None
            try:
                custom_id = result.get("custom_id")  # user_id
                summary = self._extract_summary(result)

                if not custom_id or not summary:
                    continue

                logger.debug(f"📝 用戶 {custom_id} 的摘要: {summary[:50]}...")

                await asyncio.to_thread(_save_summary, custom_id, summary)
                saved_count += 1

            except Exception as e:
                logger.error(f"❌ 儲存用戶 {custom_id} 的摘要失敗: {e}")

        logger.info(f"✅ 成功儲存 {saved_count}/{len(results)} 條記憶摘要")

    async def _fetch_week_health_data(self) -> Dict[str, Dict[str, Any]]:
        """Collect every user's health samples from the last complete week.

        The range is ``[last Monday 00:00, this Monday 00:00)``. The weekly job
        fires on Monday 04:00, so a "this Monday until now" range would cover
        only the first four hours of the new week.

        Only ``heart_rate``, ``steps``, ``sleep`` and ``active_calories``
        samples with a non-None ``value`` are kept. Users with none are omitted.

        Returns:
            ``{user_id: {"heart_rate": [{"value": ..., "timestamp": ...}, ...],
            ...}}``, or ``{}`` when Firestore is unavailable or any query fails
            (the error is logged).
        """
        try:
            if not firestore_db:
                logger.warning("⚠️ Firestore 未連接,無法獲取健康數據")
                return {}

            from google.cloud.firestore import FieldFilter

            week_start, week_end = self._previous_week_range(datetime.now())

            users_collection = firestore_db.collection("users")
            health_ref = firestore_db.collection("health_data")

            # Blocking Firestore calls; each one runs in a worker thread via
            # asyncio.to_thread so the event loop is not blocked.
            def _fetch_users():
                return list(users_collection.stream())

            def _fetch_user_health(uid: str):
                query = (
                    health_ref.where(filter=FieldFilter("user_id", "==", uid))
                    .where(filter=FieldFilter("timestamp", ">=", week_start))
                    .where(filter=FieldFilter("timestamp", "<", week_end))
                )
                return list(query.stream())

            users = await asyncio.to_thread(_fetch_users)

            result: Dict[str, Dict[str, Any]] = {}
            for user_doc in users:
                user_id = user_doc.id
                health_docs = await asyncio.to_thread(_fetch_user_health, user_id)
                if not health_docs:
                    continue

                user_health: Dict[str, List[Any]] = {
                    "heart_rate": [],
                    "steps": [],
                    "sleep": [],
                    "active_calories": [],
                }

                for doc in health_docs:
                    data = doc.to_dict() or {}
                    data_type = data.get("type")
                    value = data.get("value")

                    if data_type in user_health and value is not None:
                        user_health[data_type].append({
                            "value": value,
                            "timestamp": data.get("timestamp"),
                        })

                # Keep only users that ended up with at least one sample.
                if any(user_health.values()):
                    result[user_id] = user_health

            logger.info(f"❤️ 獲取到 {len(result)} 位用戶的本週健康數據")
            return result

        except Exception as e:
            logger.exception(f"❌ 獲取本週健康數據失敗: {e}")
            return {}


# Global singleton. Constructing it only initialises state; no background
# tasks run until `await batch_scheduler.start()` is called.
batch_scheduler = BatchScheduler()