Spaces:
Paused
Paused
File size: 5,247 Bytes
c86457d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# feedback_agent.py
"""
Feedback + light RL loop.
Provides:
- run_feedback_agent(state): consumes user feedback (rating + optional comment + milestone tag)
- FeedbackStore: persistent local small store (JSON) of rewards/metadata
- Lightweight updater that adjusts the pragmatist threshold and governance strictness based on the running-average reward
"""
import copy
import json
import os
from datetime import datetime
from logging import getLogger
from typing import Dict, Any, Optional
log = getLogger(__name__)

FEEDBACK_STORE_FILE = os.environ.get("FEEDBACK_STORE_FILE", "feedback_store.json")

# Template for a brand-new store. Treat it as read-only: FeedbackStore works on
# a deep copy so that recorded feedback never leaks into this module-level dict.
DEFAULT_STORE = {
    "runs": [],  # list of feedback entries
    "stats": {
        "count": 0,
        "avg_reward": 0.0,
        "pragmatist_threshold": 200.0,  # default threshold in USD (tunable)
        "governance_strictness": 1.0,  # multiplier: >1 stricter, <1 laxer
    },
}


class FeedbackStore:
    """Small JSON-file-backed store of feedback entries and aggregate stats.

    Attributes:
        path: Location of the JSON file backing the store.
        data: The in-memory store: a dict with a "runs" list and a "stats" dict.
    """

    # Tuning constants (safe defaults)
    DROP_THRESHOLD = 3.5  # if avg rating < 3.5 we become stricter
    INCREASE_STEP = 0.10  # 10% change step when tightening
    DECREASE_STEP = 0.05  # 5% relaxation step
    MIN_PRAGMATIST_THRESHOLD = 50.0  # floor applied when tightening
    MAX_GOVERNANCE_STRICTNESS = 2.0  # ceiling applied when tightening
    MIN_GOVERNANCE_STRICTNESS = 0.5  # floor applied when relaxing

    def __init__(self, path: str = FEEDBACK_STORE_FILE):
        """Open the store at *path*, creating the file with defaults if missing."""
        self.path = path
        if not os.path.exists(self.path):
            self._write(DEFAULT_STORE)
        self._load()

    def _load(self):
        """Read the store from disk into ``self.data``.

        If the file cannot be read, is not valid JSON, or does not hold a JSON
        object at the top level, the store is reset to a fresh copy of
        DEFAULT_STORE and that copy is written back to disk.
        """
        reason = None
        loaded = None
        try:
            with open(self.path, "r", encoding="utf-8") as fh:
                loaded = json.load(fh)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            reason = str(exc)
        else:
            if not isinstance(loaded, dict):
                # Every other method relies on dict methods of self.data.
                reason = f"top-level JSON value is {type(loaded).__name__}, expected an object"

        if reason is None:
            self.data = loaded
            return

        log.warning("Resetting feedback store %s to defaults: %s", self.path, reason)
        # Deep copy: a shallow copy would share the nested "runs" list and
        # "stats" dict with DEFAULT_STORE, so later updates would corrupt it.
        self.data = copy.deepcopy(DEFAULT_STORE)
        self._write(self.data)

    def _write(self, obj):
        """Serialize *obj* as indented JSON to ``self.path`` (overwrites the file)."""
        with open(self.path, "w", encoding="utf-8") as fh:
            json.dump(obj, fh, indent=2, default=str)

    def add_feedback(self, rating: int, comment: str, run_meta: Dict[str, Any], milestone: str = "final"):
        """Record one feedback entry, update the stats and persist the store.

        Args:
            rating: Feedback score; coerced with ``int()``.
            comment: Free-text comment; a falsy value is stored as "".
            run_meta: Metadata about the run; a falsy value is stored as {}.
            milestone: Tag stored with the entry.

        Returns:
            The entry dict that was appended to ``data["runs"]``.
        """
        entry = {
            # Naive UTC timestamp in ISO-8601 form (no offset suffix).
            "timestamp": datetime.utcnow().isoformat(),
            "rating": int(rating),
            "comment": comment or "",
            "milestone": milestone,
            "meta": run_meta or {},
        }
        self.data.setdefault("runs", []).append(entry)
        self._update_stats(entry)
        self._write(self.data)
        return entry

    def _update_stats(self, entry):
        """Fold *entry*'s rating into the stats and adapt the thresholds.

        Simple adaptive rule: while the average rating is below DROP_THRESHOLD
        the pragmatist threshold is lowered and governance strictness raised;
        otherwise both are relaxed slightly. This is intentionally conservative.
        """
        defaults = DEFAULT_STORE["stats"]
        s = self.data.setdefault("stats", defaults.copy())
        count = s.get("count", 0)
        avg = s.get("avg_reward", 0.0)
        r = float(entry["rating"])

        # Cumulative (running) mean over all ratings recorded so far.
        new_count = count + 1
        new_avg = (avg * count + r) / new_count
        s["count"] = new_count
        s["avg_reward"] = new_avg

        prag = s.get("pragmatist_threshold", defaults["pragmatist_threshold"])
        gov = s.get("governance_strictness", defaults["governance_strictness"])

        if new_avg < self.DROP_THRESHOLD:
            # become stricter: reduce pragmatist threshold (means we block more heavy experiments)
            prag = max(self.MIN_PRAGMATIST_THRESHOLD, prag * (1.0 - self.INCREASE_STEP))
            gov = min(self.MAX_GOVERNANCE_STRICTNESS, gov * (1.0 + self.INCREASE_STEP))
            s["notes"] = f"Adapted stricter due to avg_reward {new_avg:.2f}"
        else:
            # relax slightly if good feedback
            # NOTE(review): unlike every other adjustment here, the pragmatist
            # threshold has no upper bound and compounds by 5% per good entry.
            # Confirm whether a ceiling is wanted.
            prag = prag * (1.0 + self.DECREASE_STEP)
            gov = max(self.MIN_GOVERNANCE_STRICTNESS, gov * (1.0 - self.DECREASE_STEP))
            s["notes"] = f"Relaxed thresholds (avg_reward {new_avg:.2f})"

        s["pragmatist_threshold"] = round(prag, 2)
        s["governance_strictness"] = round(gov, 3)

    def get_stats(self):
        """Return the stats dict, or a copy of the default stats if none is stored."""
        return self.data.get("stats", DEFAULT_STORE["stats"].copy())

    def get_all(self):
        """Return the whole in-memory store (the live dict, not a copy)."""
        return self.data
# Module-wide shared store instance; stays None until first requested
_feedback_store = None


def get_feedback_store():
    """Return the shared FeedbackStore, constructing it on the first call."""
    global _feedback_store
    if _feedback_store is not None:
        return _feedback_store
    _feedback_store = FeedbackStore()
    return _feedback_store
# Agent function to be called by LangGraph workflow
def run_feedback_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Record user feedback from the workflow state, or report the current stats.

    Expects state to contain:
    - feedback_input: { 'rating': int [1-5], 'comment': str, 'milestone': 'synthesis'|'archivist'|'final' }
    - run_meta: optional metadata about the run (cost, execution_path, plan summary).
      A 'run_meta' key inside feedback_input is also accepted and takes precedence.
    - execution_path: optional list of step names; "Feedback" is appended to a copy.

    If no feedback_input present, it returns the current feedback stats (useful for UI).

    Returns a dict with 'feedbackStats', 'execution_path' and 'status_update',
    plus 'feedbackEntry' when feedback was recorded.
    """
    fs = get_feedback_store()
    feedback = state.get("feedback_input")
    path = (state.get("execution_path") or []) + ["Feedback"]

    if not feedback:
        # return stats only
        return {"feedbackStats": fs.get_stats(), "execution_path": path, "status_update": "Feedback stats returned"}

    # A missing or explicitly-None rating falls back to 5. `is None` is used
    # (not `or`) so that a rating of 0 is kept as given.
    raw_rating = feedback.get("rating")
    rating = 5 if raw_rating is None else int(raw_rating)
    comment = feedback.get("comment", "")
    milestone = feedback.get("milestone") or "final"
    # The documented location of run_meta is the state itself; the nested key
    # is still honoured first so existing callers keep working.
    run_meta = feedback.get("run_meta") or state.get("run_meta") or {}

    entry = fs.add_feedback(rating, comment, run_meta, milestone=milestone)
    stats = fs.get_stats()

    # Return a short action suggestion: we will expose stats and small guidance to adjust thresholds
    return {
        "feedbackEntry": entry,
        "feedbackStats": stats,
        "execution_path": path,
        "status_update": f"Feedback recorded (rating={rating})",
    }
|