Spaces:
Runtime error
Runtime error
| """ | |
| PregoPal - 核心循环引擎 | |
| ======================= | |
| 状态机驱动的每日生命周期管理。 | |
| 状态流转: | |
| LAUNCH → FAMILY_QUIZ → SUMMARIZE → ANALYZE → BRIEF → INTERACT | |
| → CONSOLIDATE → (THREE_DAY) → DONE | |
| 状态位机制: | |
| data/presets/.daily_status.json 记录每日完成状态 | |
| """ | |
| import json | |
| import datetime | |
| import asyncio | |
| from enum import Enum | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from plugins.base import LoopPlugin, PluginResult, PluginRegistry, LoopStage, LoopContext | |
# Directory that holds preset data, relative to the working directory.
PRESETS_DIR = Path("data") / "presets"
# Hidden JSON file inside PRESETS_DIR that records the per-day status flags.
STATUS_FILE = PRESETS_DIR.joinpath(".daily_status.json")
| # ============================================================ | |
| # 状态枚举 | |
| # ============================================================ | |
class LoopState(Enum):
    """States of the daily lifecycle state machine.

    Each member's value is the suffix of the ``_state_<value>`` handler
    method that the loop engine looks up for that state.
    """

    # Startup check: inspect today's status flags.
    LAUNCH = "launch"
    # Family questionnaire: check whether recipes / weight need to be asked.
    FAMILY_QUIZ = "family_quiz"
    # Yesterday's summary: diet / weight / family memory.
    SUMMARIZE = "summarize"
    # Nutrition analysis: compare against DRIs.
    ANALYZE = "analyze"
    # Generate today's briefing.
    BRIEF = "brief"
    # Daytime interaction mode (waiting for user actions).
    INTERACT = "interact"
    # Automatic summary every three days.
    THREE_DAY = "three_day"
    # Evening consolidation.
    CONSOLIDATE = "consolidate"
    # Mark today as finished.
    DONE = "done"
| # ============================================================ | |
| # 状态转移表 | |
| # ============================================================ | |
# Transition table: (current state, event returned by that state's handler)
# maps to the next state. A pair that is missing here has no legal transition.
_TRANSITIONS: dict[tuple[LoopState, str], LoopState] = {
    (source, event): target
    for source, event, target in (
        # Startup: either run the morning pipeline or skip straight to the day.
        (LoopState.LAUNCH, "need_summary", LoopState.FAMILY_QUIZ),
        (LoopState.LAUNCH, "already_done", LoopState.INTERACT),
        # Morning pipeline.
        (LoopState.FAMILY_QUIZ, "ok", LoopState.SUMMARIZE),
        (LoopState.SUMMARIZE, "ok", LoopState.ANALYZE),
        (LoopState.ANALYZE, "ok", LoopState.BRIEF),
        (LoopState.BRIEF, "ok", LoopState.INTERACT),
        # End of day, with the optional three-day summary detour.
        (LoopState.INTERACT, "day_ended", LoopState.CONSOLIDATE),
        (LoopState.CONSOLIDATE, "need_3day", LoopState.THREE_DAY),
        (LoopState.CONSOLIDATE, "ok", LoopState.DONE),
        (LoopState.THREE_DAY, "ok", LoopState.DONE),
    )
}
| # ============================================================ | |
| # 状态位管理 | |
| # ============================================================ | |
class DailyStatus:
    """Persist and query the per-day status flags of the daily loop.

    The flags are stored in the JSON file ``STATUS_FILE`` as a mapping from
    an ISO date string to a dict of flags and timestamps, for example
    ``{"2024-05-01": {"summary_done": true, "completed_at": "..."}}``.

    The class is a pure namespace: every method is a static method, so it
    can be called on the class or on an instance.
    """

    STATUS_FILE = STATUS_FILE

    @staticmethod
    def load() -> dict:
        """Load the status mapping from disk.

        Returns:
            The parsed JSON content, or an empty dict when the status file
            does not exist yet.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        # EAFP: open directly instead of exists()-then-open, which can race
        # with another process creating or removing the file.
        try:
            with open(STATUS_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    @staticmethod
    def save(status: dict):
        """Write the status mapping to disk, creating the directory if needed.

        The data is first written to a sibling temporary file and then
        renamed over the real file, so an interrupted write cannot leave a
        truncated status file behind.
        """
        PRESETS_DIR.mkdir(parents=True, exist_ok=True)
        tmp_path = STATUS_FILE.with_name(STATUS_FILE.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(status, f, ensure_ascii=False, indent=2)
        tmp_path.replace(STATUS_FILE)

    @staticmethod
    def _mark(flag: str, timestamp_key: str) -> None:
        """Set ``flag`` to True for today and record the time under ``timestamp_key``."""
        # Read the clock once so the date key and the timestamp always agree,
        # even when the call happens right at midnight.
        now = datetime.datetime.now()
        status = DailyStatus.load()
        entry = status.setdefault(now.date().isoformat(), {})
        entry[flag] = True
        entry[timestamp_key] = now.isoformat()
        DailyStatus.save(status)

    @staticmethod
    def is_today_done() -> bool:
        """Return True when today's entry has ``summary_done`` set."""
        today = datetime.date.today().isoformat()
        return DailyStatus.load().get(today, {}).get("summary_done", False)

    @staticmethod
    def mark_summary_done():
        """Flag today's summary as done and record ``completed_at``."""
        DailyStatus._mark("summary_done", "completed_at")

    @staticmethod
    def mark_day_ended():
        """Flag today as ended and record ``ended_at``."""
        DailyStatus._mark("day_ended", "ended_at")

    @staticmethod
    def get_last_summary_date() -> str | None:
        """Return the latest date key whose summary is done, or None if there is none."""
        status = DailyStatus.load()
        # ISO date strings sort chronologically, so a plain max() is enough.
        return max(
            (day for day, flags in status.items() if flags.get("summary_done")),
            default=None,
        )

    @staticmethod
    def days_since_last_summary() -> int:
        """Return the number of days since the last completed summary.

        Returns 999 when no summary has ever been recorded.
        """
        last = DailyStatus.get_last_summary_date()
        if last is None:
            return 999
        return (datetime.date.today() - datetime.date.fromisoformat(last)).days

    @staticmethod
    def should_three_day_summary() -> bool:
        """Return True when at least three days have passed since the last summary."""
        return DailyStatus.days_since_last_summary() >= 3
| # ============================================================ | |
| # 主循环引擎 | |
| # ============================================================ | |
class PregoPalLoop:
    """Core loop engine that drives one day's lifecycle through ``LoopState``.

    Each state has a ``_state_<value>`` coroutine that does the work for that
    state and returns an event name; ``_TRANSITIONS`` maps the
    (state, event) pair to the next state.

    NOTE(review): nothing in the visible code calls
    ``DailyStatus.mark_summary_done()``; presumably a plugin or the UI layer
    does. Verify, otherwise ``DailyStatus.is_today_done()`` never becomes True.
    """

    def __init__(self):
        self.plugins = PluginRegistry()
        self.state = LoopState.LAUNCH
        self.context = LoopContext()
        self._register_default_plugins()

    def _register_default_plugins(self):
        """Instantiate and register the default plugins, in a fixed order."""
        # Imported here rather than at module top, as in the original code.
        from plugins.family_quiz import FamilyRecipeQuizPlugin, WeightQuizPlugin
        from plugins.diet_summary import DietSummaryPlugin
        from plugins.weight_check import WeightCheckPlugin
        from plugins.family_memory import FamilyMemoryPlugin
        from plugins.dri_analysis import DRIAnalysisPlugin
        from plugins.briefing_generator import BriefingGeneratorPlugin
        from plugins.three_day_summary import ThreeDaySummaryPlugin
        from plugins.preset_writer import PresetWriterPlugin

        default_plugins = (
            FamilyRecipeQuizPlugin,
            WeightQuizPlugin,
            DietSummaryPlugin,
            WeightCheckPlugin,
            FamilyMemoryPlugin,
            DRIAnalysisPlugin,
            BriefingGeneratorPlugin,
            ThreeDaySummaryPlugin,
            PresetWriterPlugin,
        )
        for plugin_cls in default_plugins:
            self.plugins.register(plugin_cls())

    async def run(self) -> None:
        """Run the state machine from the current state until DONE.

        Calling ``run`` on an engine that is already in DONE does nothing.

        Raises:
            RuntimeError: If a handler returns an event that has no entry in
                the transition table for the current state.
        """
        if self.state is LoopState.DONE:
            return
        while self.state is not LoopState.DONE:
            handler = getattr(self, f"_state_{self.state.value}")
            event = await handler()
            next_state = _TRANSITIONS.get((self.state, event))
            if next_state is None:
                raise RuntimeError(
                    f"No transition from {self.state} on '{event}'"
                )
            self.state = next_state
        # DONE is terminal, so the loop above never dispatches its handler.
        # Run it once here; otherwise the day is never marked as ended.
        await self._state_done()

    async def _run_stage(self, stage) -> None:
        """Run every plugin registered for ``stage``, collecting failure messages.

        A plugin whose result is not successful does not stop the stage; its
        message is appended to ``self.context.errors``.
        """
        for plugin in self.plugins.get_plugins(stage):
            result = await plugin.run(self.context)
            if not result.success:
                self.context.errors.append(result.message)

    # ============================================================
    # State handlers
    # ============================================================
    async def _state_launch(self) -> str:
        """Startup check: decide from today's status flags whether to summarize."""
        return "already_done" if DailyStatus.is_today_done() else "need_summary"

    async def _state_family_quiz(self) -> str:
        """Family questionnaire: run all FAMILY_QUIZ stage plugins."""
        await self._run_stage(LoopStage.FAMILY_QUIZ)
        return "ok"

    async def _state_summarize(self) -> str:
        """Yesterday's summary: run all SUMMARIZE stage plugins."""
        await self._run_stage(LoopStage.SUMMARIZE)
        return "ok"

    async def _state_analyze(self) -> str:
        """Nutrition analysis: run all ANALYZE stage plugins."""
        await self._run_stage(LoopStage.ANALYZE)
        return "ok"

    async def _state_brief(self) -> str:
        """Today's briefing: run all BRIEF stage plugins."""
        await self._run_stage(LoopStage.BRIEF)
        return "ok"

    async def _state_interact(self) -> str:
        """Daytime interaction mode.

        The original note says this state is driven by external events
        (Gradio UI interaction). As written it does not wait for anything
        and reports "day_ended" immediately.
        """
        return "day_ended"

    async def _state_three_day(self) -> str:
        """Three-day summary: run all THREE_DAY stage plugins."""
        await self._run_stage(LoopStage.THREE_DAY)
        return "ok"

    async def _state_consolidate(self) -> str:
        """Evening consolidation: run all CONSOLIDATE stage plugins.

        Returns "need_3day" when a three-day summary is due, else "ok".
        """
        await self._run_stage(LoopStage.CONSOLIDATE)
        if DailyStatus.should_three_day_summary():
            return "need_3day"
        return "ok"

    async def _state_done(self) -> str:
        """Terminal handler: record in the status file that today has ended."""
        DailyStatus.mark_day_ended()
        return "done_complete"

    # ============================================================
    # External interface
    # ============================================================
    def get_briefing(self) -> dict:
        """Return today's briefing from the loop context."""
        return self.context.briefing

    def get_thinking_keywords(self) -> str:
        """Return the briefing's "thinking_keywords" (for UI display), or ""."""
        return self.context.briefing.get("thinking_keywords", "")

    def get_errors(self) -> list[str]:
        """Return the failure messages collected from plugins so far."""
        return self.context.errors

    def run_sync(self):
        """Run the loop to completion from synchronous code.

        Uses ``asyncio.run``, which raises ``RuntimeError`` when called from
        a thread that already has a running event loop.
        """
        asyncio.run(self.run())
| # ============================================================ | |
| # 便捷函数 | |
| # ============================================================ | |
def run_daily_loop():
    """Build a loop engine, run it synchronously, and return it."""
    engine = PregoPalLoop()
    engine.run_sync()
    return engine
def check_and_run_loop() -> PregoPalLoop:
    """Create a loop engine and run it only if today is not summarized yet.

    Intended to be called when Gradio starts up. The engine is always
    created and returned; the full flow runs only when today's summary has
    not been marked as done.
    """
    engine = PregoPalLoop()
    if DailyStatus.is_today_done():
        # Already summarized today: hand back the fresh engine untouched.
        return engine
    engine.run_sync()
    return engine