"""
PregoPal - core loop engine
===========================
State-machine-driven management of the daily lifecycle.

State flow:
    LAUNCH → FAMILY_QUIZ → SUMMARIZE → ANALYZE → BRIEF → INTERACT
    → CONSOLIDATE → (THREE_DAY) → DONE

Status flags:
    data/presets/.daily_status.json records each day's completion state.
"""

import json
import datetime
import asyncio
from enum import Enum
from pathlib import Path
from dataclasses import dataclass, field

from plugins.base import LoopPlugin, PluginResult, PluginRegistry, LoopStage, LoopContext

PRESETS_DIR = Path("data/presets")
STATUS_FILE = PRESETS_DIR / ".daily_status.json"


# ============================================================
# State enumeration
# ============================================================

class LoopState(Enum):
    """States of the daily loop; each value names its `_state_<value>` handler."""

    LAUNCH = "launch"            # Start-up check: inspect today's status flags
    FAMILY_QUIZ = "family_quiz"  # Family quiz: ask about recipes / weight if needed
    SUMMARIZE = "summarize"      # Yesterday's summary: diet / weight / family memory
    ANALYZE = "analyze"          # Nutrition analysis: compare against DRIs
    BRIEF = "brief"              # Generate today's briefing
    INTERACT = "interact"        # Daytime interaction mode (waiting for the user)
    THREE_DAY = "three_day"      # Automatic summary every three days
    CONSOLIDATE = "consolidate"  # Evening consolidation
    DONE = "done"                # Mark today as finished


# ============================================================
# State transition table
# ============================================================

# Maps (current state, event returned by its handler) -> next state.
# Any pair missing from this table makes PregoPalLoop.run() raise RuntimeError.
_TRANSITIONS: dict[tuple[LoopState, str], LoopState] = {
    (LoopState.LAUNCH, "need_summary"): LoopState.FAMILY_QUIZ,
    (LoopState.LAUNCH, "already_done"): LoopState.INTERACT,
    (LoopState.FAMILY_QUIZ, "ok"): LoopState.SUMMARIZE,
    (LoopState.SUMMARIZE, "ok"): LoopState.ANALYZE,
    (LoopState.ANALYZE, "ok"): LoopState.BRIEF,
    (LoopState.BRIEF, "ok"): LoopState.INTERACT,
    (LoopState.INTERACT, "day_ended"): LoopState.CONSOLIDATE,
    (LoopState.CONSOLIDATE, "need_3day"): LoopState.THREE_DAY,
    (LoopState.CONSOLIDATE, "ok"): LoopState.DONE,
    (LoopState.THREE_DAY, "ok"): LoopState.DONE,
}


# ============================================================
# Status flag management
# ============================================================

class DailyStatus:
    """Read and write the per-day status flags stored in STATUS_FILE.

    The file holds one JSON object keyed by ISO date; each value is a dict
    of flags ("summary_done", "day_ended") and their timestamps.
    """

    # Alias of the module-level constant. The methods below read the
    # module-level STATUS_FILE, so rebinding this attribute alone does not
    # redirect I/O.
    STATUS_FILE = STATUS_FILE

    @staticmethod
    def load() -> dict:
        """Return the stored status mapping, or {} when the file is missing.

        Raises:
            json.JSONDecodeError: if the file exists but is not valid JSON.
        """
        # EAFP: open directly instead of exists()+open, which leaves a
        # window where the file can disappear between the two calls.
        try:
            with open(STATUS_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    @staticmethod
    def save(status: dict):
        """Persist *status* to STATUS_FILE, creating PRESETS_DIR if needed.

        The data is written to a sibling temp file and then renamed over
        the target, so an interrupted or failed write cannot leave a
        truncated status file that would make every later load() fail.
        """
        PRESETS_DIR.mkdir(parents=True, exist_ok=True)
        tmp_path = STATUS_FILE.with_name(STATUS_FILE.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(status, f, ensure_ascii=False, indent=2)
        tmp_path.replace(STATUS_FILE)

    @staticmethod
    def _set_today_flag(flag: str, timestamp_key: str) -> None:
        """Set *flag* to True for today and record now() under *timestamp_key*."""
        today = datetime.date.today().isoformat()
        status = DailyStatus.load()
        entry = status.setdefault(today, {})
        entry[flag] = True
        entry[timestamp_key] = datetime.datetime.now().isoformat()
        DailyStatus.save(status)

    @staticmethod
    def is_today_done() -> bool:
        """Return True when today's entry has "summary_done" set."""
        today = datetime.date.today().isoformat()
        status = DailyStatus.load()
        return status.get(today, {}).get("summary_done", False)

    @staticmethod
    def mark_summary_done():
        """Mark today's summary as completed and stamp "completed_at"."""
        # NOTE(review): nothing in this file calls this method; presumably a
        # plugin or the UI does - confirm, otherwise is_today_done() can
        # never become True.
        DailyStatus._set_today_flag("summary_done", "completed_at")

    @staticmethod
    def mark_day_ended():
        """Mark today as ended and stamp "ended_at"."""
        DailyStatus._set_today_flag("day_ended", "ended_at")

    @staticmethod
    def get_last_summary_date() -> str | None:
        """Return the latest date key whose entry has "summary_done", or None."""
        status = DailyStatus.load()
        done_dates = [d for d, v in status.items() if v.get("summary_done")]
        # Keys are ISO dates, so string ordering matches chronological order.
        return max(done_dates) if done_dates else None

    @staticmethod
    def days_since_last_summary() -> int:
        """Return the days elapsed since the last summary (999 if none exists)."""
        last = DailyStatus.get_last_summary_date()
        if last is None:
            # Sentinel for "never summarised"; large enough to satisfy the
            # >= 3 check in should_three_day_summary().
            return 999
        last_date = datetime.date.fromisoformat(last)
        return (datetime.date.today() - last_date).days

    @staticmethod
    def should_three_day_summary() -> bool:
        """Return True when at least three days have passed since the last summary."""
        return DailyStatus.days_since_last_summary() >= 3


# ============================================================
# Main loop engine
# ============================================================

class PregoPalLoop:
    """Core loop engine: drives LoopState transitions and runs stage plugins."""

    def __init__(self):
        self.plugins = PluginRegistry()
        self.state = LoopState.LAUNCH
        self.context = LoopContext()
        self._register_default_plugins()

    def _register_default_plugins(self):
        """Register the default plugin set, in the order listed below."""
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid import cycles or start-up cost - confirm.
        from plugins.family_quiz import FamilyRecipeQuizPlugin, WeightQuizPlugin
        from plugins.diet_summary import DietSummaryPlugin
        from plugins.weight_check import WeightCheckPlugin
        from plugins.family_memory import FamilyMemoryPlugin
        from plugins.dri_analysis import DRIAnalysisPlugin
        from plugins.briefing_generator import BriefingGeneratorPlugin
        from plugins.three_day_summary import ThreeDaySummaryPlugin
        from plugins.preset_writer import PresetWriterPlugin

        self.plugins.register(FamilyRecipeQuizPlugin())
        self.plugins.register(WeightQuizPlugin())
        self.plugins.register(DietSummaryPlugin())
        self.plugins.register(WeightCheckPlugin())
        self.plugins.register(FamilyMemoryPlugin())
        self.plugins.register(DRIAnalysisPlugin())
        self.plugins.register(BriefingGeneratorPlugin())
        self.plugins.register(ThreeDaySummaryPlugin())
        self.plugins.register(PresetWriterPlugin())

    async def run(self) -> None:
        """Drive the state machine from the current state until DONE.

        Each iteration calls the `_state_<value>` handler for the current
        state and looks up the next state from the event it returns.

        Raises:
            RuntimeError: if _TRANSITIONS has no entry for (state, event).
        """
        # NOTE(review): the loop stops as soon as the state becomes DONE, so
        # `_state_done` (and therefore DailyStatus.mark_day_ended) is never
        # invoked from here. Left unchanged because running it would add a
        # persistent write whose consumers are not visible in this file.
        while self.state is not LoopState.DONE:
            handler_name = f"_state_{self.state.value}"
            handler = getattr(self, handler_name)
            event = await handler()
            next_state = _TRANSITIONS.get((self.state, event))
            if next_state is None:
                raise RuntimeError(
                    f"No transition from {self.state} on '{event}'"
                )
            self.state = next_state

    async def _run_stage(self, stage: LoopStage) -> None:
        """Run every plugin registered for *stage*, recording failure messages.

        A failing plugin does not stop the stage: its message is appended to
        `self.context.errors` and the remaining plugins still run.
        """
        for plugin in self.plugins.get_plugins(stage):
            result = await plugin.run(self.context)
            if not result.success:
                self.context.errors.append(result.message)

    # ============================================================
    # State handlers
    # ============================================================

    async def _state_launch(self) -> str:
        """Start-up check: return "already_done" if today is summarised, else "need_summary"."""
        if DailyStatus.is_today_done():
            return "already_done"
        return "need_summary"

    async def _state_family_quiz(self) -> str:
        """Family quiz: run all FAMILY_QUIZ stage plugins."""
        await self._run_stage(LoopStage.FAMILY_QUIZ)
        return "ok"

    async def _state_summarize(self) -> str:
        """Yesterday's summary: run all SUMMARIZE stage plugins."""
        await self._run_stage(LoopStage.SUMMARIZE)
        return "ok"

    async def _state_analyze(self) -> str:
        """Nutrition analysis: run all ANALYZE stage plugins."""
        await self._run_stage(LoopStage.ANALYZE)
        return "ok"

    async def _state_brief(self) -> str:
        """Today's briefing: run all BRIEF stage plugins."""
        await self._run_stage(LoopStage.BRIEF)
        return "ok"

    async def _state_interact(self) -> str:
        """Daytime interaction mode.

        Meant to be driven by external events (the Gradio UI). As written it
        does not wait for anything and returns "day_ended" immediately.
        """
        return "day_ended"

    async def _state_three_day(self) -> str:
        """Three-day summary: run all THREE_DAY stage plugins."""
        await self._run_stage(LoopStage.THREE_DAY)
        return "ok"

    async def _state_consolidate(self) -> str:
        """Evening consolidation: run all CONSOLIDATE stage plugins.

        Returns "need_3day" when a three-day summary is due, otherwise "ok".
        """
        await self._run_stage(LoopStage.CONSOLIDATE)
        if DailyStatus.should_three_day_summary():
            return "need_3day"
        return "ok"

    async def _state_done(self) -> str:
        """Mark today as ended.

        NOTE(review): "done_complete" has no entry in _TRANSITIONS and run()
        never dispatches the DONE state, so this handler is currently
        unreachable through run().
        """
        DailyStatus.mark_day_ended()
        return "done_complete"

    # ============================================================
    # External interface
    # ============================================================

    def get_briefing(self) -> dict:
        """Return today's briefing from the loop context."""
        return self.context.briefing

    def get_thinking_keywords(self) -> str:
        """Return the briefing's "thinking_keywords" (for UI display), or ""."""
        return self.context.briefing.get("thinking_keywords", "")

    def get_errors(self) -> list[str]:
        """Return the error messages collected from failed plugins."""
        return self.context.errors

    def run_sync(self):
        """Run the loop to completion from synchronous code.

        Uses asyncio.run(), which cannot be called while another event loop
        is already running in the same thread.
        """
        asyncio.run(self.run())


# ============================================================
# Convenience functions
# ============================================================

def run_daily_loop() -> PregoPalLoop:
    """Create a loop, run it synchronously, and return it."""
    loop = PregoPalLoop()
    loop.run_sync()
    return loop


def check_and_run_loop() -> PregoPalLoop:
    """Create a loop and run it only if today is not yet summarised.

    Intended to be called when Gradio starts. If today's summary is already
    done, the loop instance is returned without running the flow; otherwise
    the full flow is executed first.
    """
    loop = PregoPalLoop()
    if not DailyStatus.is_today_done():
        loop.run_sync()
    return loop