Spaces:
Running
Running
| # src/tools/stores.py | |
| from typing import List, Dict, Any, Optional | |
| from .interfaces import IFeatureStore, IPlanStore | |
class InMemoryFeatureStore(IFeatureStore):
    """Feature store that keeps everything in process-local dictionaries.

    Per-run feature dicts are keyed by run id and weekly summaries by week
    key. Nothing is persisted; ``clear()`` drops all stored data.
    """

    def __init__(self) -> None:
        # Single source of truth for per-run features. Dicts preserve
        # insertion order, so this also provides the ordered "all features"
        # view without a second list that could drift out of sync.
        self._features: Dict[str, Dict[str, Any]] = {}
        self._weekly_summaries: Dict[str, Dict[str, Any]] = {}

    def put_features(self, run_id: str, features: Dict[str, Any]) -> None:
        """Store ``features`` under ``run_id``, replacing any earlier entry.

        Re-saving an existing ``run_id`` overwrites the old features and
        keeps the run's original position in ``get_all_features()``; it does
        not add a second entry.
        """
        self._features[run_id] = features

    def get_features(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Return the features stored for ``run_id``, or ``None`` if absent."""
        return self._features.get(run_id)

    def put_weekly_summary(self, week_key: str, summary: Dict[str, Any]) -> None:
        """Store ``summary`` under ``week_key``, replacing any earlier entry."""
        self._weekly_summaries[week_key] = summary

    def get_weekly_summary(self, week_key: str) -> Optional[Dict[str, Any]]:
        """Return the summary stored for ``week_key``, or ``None`` if absent."""
        return self._weekly_summaries.get(week_key)

    def get_all_features(self) -> List[Dict[str, Any]]:
        """Return the stored feature dicts, one per run id, in insertion order.

        A new list is built on every call, so callers may reorder or trim
        the result without touching the store, and a later ``clear()`` does
        not empty a list the caller is still holding. The feature dicts
        themselves are not copied.
        """
        return list(self._features.values())

    def clear(self) -> None:
        """Remove all stored features and weekly summaries."""
        self._features.clear()
        self._weekly_summaries.clear()
class InMemoryPlanStore(IPlanStore):
    """Plan store that keeps plans in a process-local dictionary.

    Plans are stored by id, and the id passed to the most recent
    ``save_plan`` call is remembered as the "latest" plan.
    """

    def __init__(self) -> None:
        self._plans: Dict[str, str] = {}
        # ``None`` means no plan has been saved since construction/clear().
        self._latest_plan_id: Optional[str] = None

    def save_plan(self, plan_id: str, plan_obj: str) -> None:
        """Store ``plan_obj`` under ``plan_id`` and mark it as the latest plan."""
        self._plans[plan_id] = plan_obj
        self._latest_plan_id = plan_id

    def get_plan(self, plan_id: str) -> Optional[str]:
        """Return the plan stored under ``plan_id``, or ``None`` if absent."""
        return self._plans.get(plan_id)

    def get_latest_plan(self) -> Optional[str]:
        """Return the most recently saved plan, or ``None`` if none was saved."""
        # Compare against the None sentinel rather than relying on
        # truthiness, so a plan saved under a falsy id (e.g. "") is still
        # reported as the latest one.
        if self._latest_plan_id is None:
            return None
        return self._plans.get(self._latest_plan_id)

    def clear(self) -> None:
        """Remove all stored plans and forget which one was latest."""
        self._plans.clear()
        self._latest_plan_id = None