Spaces:
Paused
Paused
| # feedback_agent.py | |
| """ | |
| Feedback + light RL loop. | |
| Provides: | |
| - run_feedback_agent(state): consumes user feedback (rating + optional comment + milestone tag) | |
| - FeedbackStore: persistent local small store (JSON) of rewards/metadata | |
| - Lightweight updater that adjusts the pragmatist threshold and governance strictness based on the running average of rewards | |
| """ | |
import copy
import json
import os
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, Dict, Optional
log = getLogger(__name__)

# Path of the JSON file backing the store; override via the FEEDBACK_STORE_FILE env var.
FEEDBACK_STORE_FILE = os.environ.get("FEEDBACK_STORE_FILE", "feedback_store.json")

# Template for a brand-new store. Treat it as read-only: code that needs a
# mutable store must take a deep copy, because the nested list/dict would
# otherwise be shared with (and mutated through) every store built from it.
DEFAULT_STORE = {
    "runs": [],  # list of feedback entries
    "stats": {
        "count": 0,
        "avg_reward": 0.0,
        "pragmatist_threshold": 200.0,  # default threshold in USD (tunable)
        "governance_strictness": 1.0,  # multiplier: >1 stricter, <1 laxer
    },
}


class FeedbackStore:
    """Small JSON-file-backed store of feedback entries and adaptive stats.

    ``self.data`` mirrors the file content: a ``"runs"`` list of feedback
    entries and a ``"stats"`` dict holding the rating count, the running
    average rating and the two adaptive knobs (``pragmatist_threshold`` and
    ``governance_strictness``). Every ``add_feedback`` call updates the stats
    and rewrites the file.
    """

    # Tuning constants (safe defaults); override on a subclass or instance.
    DROP_THRESHOLD = 3.5  # if avg rating < 3.5 we become stricter
    INCREASE_STEP = 0.10  # 10% change step when tightening
    DECREASE_STEP = 0.05  # 5% relaxation step
    PRAGMATIST_FLOOR = 50.0  # tightening never pushes the threshold below this
    GOVERNANCE_MIN = 0.5
    GOVERNANCE_MAX = 2.0

    def __init__(self, path: str = FEEDBACK_STORE_FILE):
        """Open the store at *path*, creating it with defaults if it is missing."""
        self.path = path
        if not os.path.exists(self.path):
            self._write(DEFAULT_STORE)
        self._load()

    def _load(self):
        """Read the JSON file into ``self.data``; reset to defaults if unreadable."""
        try:
            with open(self.path, "r", encoding="utf-8") as fh:
                loaded = json.load(fh)
            if not isinstance(loaded, dict):
                raise ValueError("feedback store root must be a JSON object")
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            log.warning(
                "Could not load feedback store %s (%s); resetting to defaults",
                self.path,
                exc,
            )
            # Deep copy: a shallow copy would alias DEFAULT_STORE's nested
            # "runs" list and "stats" dict and corrupt the template on the
            # first add_feedback() call.
            loaded = copy.deepcopy(DEFAULT_STORE)
            self._write(loaded)
        self.data = loaded

    def _write(self, obj):
        """Serialize *obj* as JSON to ``self.path``.

        The data is written to a sibling temp file and then moved into place
        with ``os.replace`` so an interrupted write cannot leave a truncated
        store behind (which the next ``_load`` would discard).
        """
        tmp_path = f"{self.path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(obj, fh, indent=2, default=str)
        os.replace(tmp_path, self.path)

    def add_feedback(self, rating: int, comment: str, run_meta: Dict[str, Any], milestone: str = "final"):
        """Append a feedback entry, update the stats, persist, and return the entry.

        Args:
            rating: Numeric rating; coerced with ``int()``.
            comment: Free-text comment; falsy values are stored as ``""``.
            run_meta: Arbitrary metadata about the run; falsy values become ``{}``.
            milestone: Tag stored with the entry.

        Returns:
            The entry dict that was appended to ``self.data["runs"]``.
        """
        entry = {
            # Naive UTC ISO timestamp (same format datetime.utcnow() produced,
            # without relying on that deprecated call).
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "rating": int(rating),
            "comment": comment or "",
            "milestone": milestone,
            "meta": run_meta or {},
        }
        self.data.setdefault("runs", []).append(entry)
        self._update_stats(entry)
        self._write(self.data)
        return entry

    def _update_stats(self, entry):
        """Fold *entry*'s rating into the running average and adapt the thresholds."""
        s = self.data.setdefault("stats", dict(DEFAULT_STORE["stats"]))
        count = s.get("count", 0)
        avg = s.get("avg_reward", 0.0)
        r = float(entry["rating"])
        # Incremental update of the cumulative mean over all ratings so far.
        new_count = count + 1
        new_avg = (avg * count + r) / new_count
        s["count"] = new_count
        s["avg_reward"] = new_avg

        # Simple adaptive rule: if avg_reward drops below DROP_THRESHOLD, lower
        # pragmatist_threshold and increase governance strictness slightly;
        # otherwise relax both by a smaller step. Intentionally conservative.
        prag = s.get("pragmatist_threshold", 200.0)
        gov = s.get("governance_strictness", 1.0)
        if new_avg < self.DROP_THRESHOLD:
            # become stricter: reduce pragmatist threshold (means we block more heavy experiments)
            prag = max(self.PRAGMATIST_FLOOR, prag * (1.0 - self.INCREASE_STEP))
            gov = min(self.GOVERNANCE_MAX, gov * (1.0 + self.INCREASE_STEP))
            s["notes"] = f"Adapted stricter due to avg_reward {new_avg:.2f}"
        else:
            # relax slightly if good feedback
            # NOTE(review): unlike governance_strictness, the pragmatist
            # threshold has no upper bound here and grows 5% per good rating —
            # confirm whether a cap is wanted.
            prag = prag * (1.0 + self.DECREASE_STEP)
            gov = max(self.GOVERNANCE_MIN, gov * (1.0 - self.DECREASE_STEP))
            s["notes"] = f"Relaxed thresholds (avg_reward {new_avg:.2f})"
        s["pragmatist_threshold"] = round(prag, 2)
        s["governance_strictness"] = round(gov, 3)

    def get_stats(self):
        """Return the stats dict (a copy of the default stats if none are stored)."""
        return self.data.get("stats", dict(DEFAULT_STORE["stats"]))

    def get_all(self):
        """Return the whole in-memory store (the live dict, not a copy)."""
        return self.data
# Process-wide store instance; stays None until the first request for it
_feedback_store = None


def get_feedback_store():
    """Return the shared FeedbackStore, constructing it on the first call."""
    global _feedback_store
    if _feedback_store is not None:
        return _feedback_store
    _feedback_store = FeedbackStore()
    return _feedback_store
# Agent function to be called by LangGraph workflow
def run_feedback_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """Record user feedback found in *state*, or report the current stats.

    Expects state to contain:
    - feedback_input: { 'rating': int [1-5], 'comment': str,
      'milestone': 'synthesis'|'archivist'|'final' }; may also carry its own
      'run_meta', which takes precedence over the state-level one.
    - run_meta: optional metadata about the run (cost, execution_path, plan summary)
    - execution_path: optional list; the returned path is a copy with
      "Feedback" appended.

    If no feedback_input is present, it returns the current feedback stats
    (useful for UI). A rating that cannot be converted to an int is not
    recorded; the stats are returned with an explanatory status instead.
    Ratings outside 1-5 are clamped into that range before being stored.

    Returns:
        A dict with "feedbackStats", "execution_path" and "status_update",
        plus "feedbackEntry" when a feedback entry was recorded.
    """
    fs = get_feedback_store()
    feedback = state.get("feedback_input")
    path = (state.get("execution_path") or []) + ["Feedback"]
    if not feedback:
        # return stats only
        return {
            "feedbackStats": fs.get_stats(),
            "execution_path": path,
            "status_update": "Feedback stats returned",
        }

    raw_rating = feedback.get("rating", 5)
    try:
        rating = int(raw_rating)
    except (TypeError, ValueError):
        # A malformed rating must not crash the workflow node, and must not
        # be counted: the running average drives the adaptive thresholds.
        log.warning("Ignoring feedback with invalid rating %r", raw_rating)
        return {
            "feedbackStats": fs.get_stats(),
            "execution_path": path,
            "status_update": "Feedback ignored (invalid rating)",
        }
    # Keep the rating on the documented 1-5 scale the adaptive rule is tuned for.
    rating = min(5, max(1, rating))

    comment = feedback.get("comment", "")
    milestone = feedback.get("milestone") or "final"
    # Prefer metadata nested in feedback_input (previous behaviour), then fall
    # back to the state-level run_meta described in the contract above.
    run_meta = feedback.get("run_meta") or state.get("run_meta") or {}

    entry = fs.add_feedback(rating, comment, run_meta, milestone=milestone)
    stats = fs.get_stats()
    # Expose the recorded entry and the updated stats so callers can act on
    # the adjusted thresholds.
    return {
        "feedbackEntry": entry,
        "feedbackStats": stats,
        "execution_path": path,
        "status_update": f"Feedback recorded (rating={rating})",
    }