JatinAutonomousLabs commited on
Commit
c86457d
·
verified ·
1 Parent(s): 9049d91

Create feedback_agent.py

Browse files
Files changed (1) hide show
  1. feedback_agent.py +141 -0
feedback_agent.py ADDED
@@ -0,0 +1,141 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # feedback_agent.py
2
+ """
3
+ Feedback + light RL loop.
4
+
5
+ Provides:
6
+ - run_feedback_agent(state): consumes user feedback (rating + optional comment + milestone tag)
7
+ - FeedbackStore: persistent local small store (JSON) of rewards/metadata
8
+ - Lightweight updater that adjusts pragmatic/governance thresholds based on moving-average rewards
9
+ """
10
+
11
+ import os
12
+ import json
13
+ from datetime import datetime
14
+ from typing import Dict, Any, Optional
15
+ from logging import getLogger
16
+
17
+ log = getLogger(__name__)
18
+
19
+ FEEDBACK_STORE_FILE = os.environ.get("FEEDBACK_STORE_FILE", "feedback_store.json")
20
+ DEFAULT_STORE = {
21
+ "runs": [], # list of feedback entries
22
+ "stats": {
23
+ "count": 0,
24
+ "avg_reward": 0.0,
25
+ "pragmatist_threshold": 200.0, # default threshold in USD (tunable)
26
+ "governance_strictness": 1.0 # multiplier: >1 stricter, <1 laxer
27
+ }
28
+ }
29
+
30
+ class FeedbackStore:
31
+ def __init__(self, path: str = FEEDBACK_STORE_FILE):
32
+ self.path = path
33
+ if not os.path.exists(self.path):
34
+ self._write(DEFAULT_STORE)
35
+ self._load()
36
+
37
+ def _load(self):
38
+ try:
39
+ with open(self.path, "r", encoding="utf-8") as fh:
40
+ self.data = json.load(fh)
41
+ except Exception:
42
+ self.data = DEFAULT_STORE.copy()
43
+ self._write(self.data)
44
+
45
+ def _write(self, obj):
46
+ with open(self.path, "w", encoding="utf-8") as fh:
47
+ json.dump(obj, fh, indent=2, default=str)
48
+
49
+ def add_feedback(self, rating: int, comment: str, run_meta: Dict[str, Any], milestone: str = "final"):
50
+ entry = {
51
+ "timestamp": datetime.utcnow().isoformat(),
52
+ "rating": int(rating),
53
+ "comment": comment or "",
54
+ "milestone": milestone,
55
+ "meta": run_meta or {}
56
+ }
57
+ self.data.setdefault("runs", []).append(entry)
58
+ self._update_stats(entry)
59
+ self._write(self.data)
60
+ return entry
61
+
62
+ def _update_stats(self, entry):
63
+ s = self.data.setdefault("stats", DEFAULT_STORE["stats"].copy())
64
+ count = s.get("count", 0)
65
+ avg = s.get("avg_reward", 0.0)
66
+ r = float(entry["rating"])
67
+ # incremental moving average
68
+ new_count = count + 1
69
+ new_avg = (avg * count + r) / new_count
70
+ s["count"] = new_count
71
+ s["avg_reward"] = new_avg
72
+
73
+ # Simple adaptive rule: if avg_reward drops below threshold, lower pragmatist_threshold
74
+ # and increase governance strictness slightly. This is intentionally conservative.
75
+ # You can change the step sizes via env variables later.
76
+ prag = s.get("pragmatist_threshold", 200.0)
77
+ gov = s.get("governance_strictness", 1.0)
78
+
79
+ # Tuning constants (safe defaults)
80
+ DROP_THRESHOLD = 3.5 # if avg rating < 3.5 we become stricter
81
+ INCREASE_STEP = 0.10 # 10% change step
82
+ DECREASE_STEP = 0.05 # 5% relaxation step
83
+
84
+ if new_avg < DROP_THRESHOLD:
85
+ # become stricter: reduce pragmatist threshold (means we block more heavy experiments)
86
+ prag = max(50.0, prag * (1.0 - INCREASE_STEP))
87
+ gov = min(2.0, gov * (1.0 + INCREASE_STEP))
88
+ s["notes"] = f"Adapted stricter due to avg_reward {new_avg:.2f}"
89
+ else:
90
+ # relax slightly if good feedback
91
+ prag = prag * (1.0 + DECREASE_STEP)
92
+ gov = max(0.5, gov * (1.0 - DECREASE_STEP))
93
+ s["notes"] = f"Relaxed thresholds (avg_reward {new_avg:.2f})"
94
+
95
+ s["pragmatist_threshold"] = round(prag, 2)
96
+ s["governance_strictness"] = round(gov, 3)
97
+
98
+ def get_stats(self):
99
+ return self.data.get("stats", DEFAULT_STORE["stats"].copy())
100
+
101
+ def get_all(self):
102
+ return self.data
103
+
104
+ # Convenience single global store
105
+ _feedback_store = None
106
+ def get_feedback_store():
107
+ global _feedback_store
108
+ if _feedback_store is None:
109
+ _feedback_store = FeedbackStore()
110
+ return _feedback_store
111
+
112
+ # Agent function to be called by LangGraph workflow
113
+ def run_feedback_agent(state: Dict[str, Any]) -> Dict[str, Any]:
114
+ """
115
+ Expects state to contain:
116
+ - feedback_input: { 'rating': int [1-5], 'comment': str, 'milestone': 'synthesis'|'archivist'|'final' }
117
+ - run_meta: optional metadata about the run (cost, execution_path, plan summary)
118
+ If no feedback_input present, it returns the current feedback stats (useful for UI).
119
+ """
120
+ fs = get_feedback_store()
121
+ feedback = state.get("feedback_input")
122
+ path = (state.get("execution_path") or []) + ["Feedback"]
123
+ if not feedback:
124
+ # return stats only
125
+ return {"feedbackStats": fs.get_stats(), "execution_path": path, "status_update": "Feedback stats returned"}
126
+
127
+ rating = int(feedback.get("rating", 5))
128
+ comment = feedback.get("comment", "")
129
+ milestone = feedback.get("milestone", "final")
130
+ run_meta = feedback.get("run_meta", {})
131
+
132
+ entry = fs.add_feedback(rating, comment, run_meta, milestone=milestone)
133
+ stats = fs.get_stats()
134
+
135
+ # Return a short action suggestion: we will expose stats and small guidance to adjust thresholds
136
+ return {
137
+ "feedbackEntry": entry,
138
+ "feedbackStats": stats,
139
+ "execution_path": path,
140
+ "status_update": f"Feedback recorded (rating={rating})"
141
+ }