Spaces:
Running
Running
File size: 1,819 Bytes
557ee65 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | # src/tools/stores.py
from typing import List, Dict, Any, Optional
from .interfaces import IFeatureStore, IPlanStore
class InMemoryFeatureStore(IFeatureStore):
def __init__(self):
self._features: Dict[str, Dict[str, Any]] = {}
self._weekly_summaries: Dict[str, Dict[str, Any]] = {}
self._all_features_list: List[Dict[str, Any]] = []
def put_features(self, run_id: str, features: Dict[str, Any]):
self._features[run_id] = features
# For simplicity in this PoC, we also maintain an ordered list
self._all_features_list.append(features)
def get_features(self, run_id: str) -> Optional[Dict[str, Any]]:
return self._features.get(run_id)
def put_weekly_summary(self, week_key: str, summary: Dict[str, Any]):
self._weekly_summaries[week_key] = summary
def get_weekly_summary(self, week_key: str) -> Optional[Dict[str, Any]]:
return self._weekly_summaries.get(week_key)
def get_all_features(self) -> List[Dict[str, Any]]:
return self._all_features_list
def clear(self):
self._features.clear()
self._weekly_summaries.clear()
self._all_features_list.clear()
class InMemoryPlanStore(IPlanStore):
def __init__(self):
self._plans: Dict[str, str] = {}
self._latest_plan_id: Optional[str] = None
def save_plan(self, plan_id: str, plan_obj: str):
self._plans[plan_id] = plan_obj
self._latest_plan_id = plan_id
def get_plan(self, plan_id: str) -> Optional[str]:
return self._plans.get(plan_id)
def get_latest_plan(self) -> Optional[str]:
if self._latest_plan_id:
return self._plans.get(self._latest_plan_id)
return None
def clear(self):
self._plans.clear()
self._latest_plan_id = None
|