# finryver-dev / agents /feedback_manager.py
# Sahil Garg
# udf added to /notes-llm along with RLHF
# 6611563
"""
RLHF Feedback Management System for FinRyver
Handles collection, storage, and management of human feedback on financial statements
"""
import json
import os
import time
import uuid
from typing import Dict, Any, List, Optional
import logging
logger = logging.getLogger(__name__)


class FeedbackManager:
    """Manage human feedback collection for RLHF training.

    Records are kept in two JSON files inside ``feedback_dir``:
    ``generated_statements.json`` (one record per generated statement) and
    ``human_feedback.json`` (one record per submitted review). Each file
    holds a single JSON list that is read and rewritten in full on every
    store operation.
    """

    def __init__(self, feedback_dir: str = "data/feedback"):
        """Set up the storage paths and create ``feedback_dir`` if missing.

        Args:
            feedback_dir: Directory that holds both JSON store files.
        """
        self.feedback_dir = feedback_dir
        self.feedback_db = os.path.join(feedback_dir, "human_feedback.json")
        self.statements_db = os.path.join(
            feedback_dir, "generated_statements.json"
        )
        os.makedirs(feedback_dir, exist_ok=True)

    def store_generated_statement(self, statement_data: Dict[str, Any]) -> str:
        """Store a generated statement for later feedback collection.

        Args:
            statement_data: Mapping read for the optional keys ``type``,
                ``file_path``, ``output_path``, ``generation_time`` and
                ``metadata``. Values must be JSON-serialisable.

        Returns:
            The freshly generated UUID4 string identifying the statement.

        Raises:
            TypeError: If a value cannot be serialised to JSON. The
                existing store file is left untouched in that case.
        """
        statement_id = str(uuid.uuid4())
        statement_record = {
            "statement_id": statement_id,
            "timestamp": time.time(),
            "statement_type": statement_data.get("type", "unknown"),
            "file_path": statement_data.get("file_path"),
            "output_path": statement_data.get("output_path"),
            "generation_time": statement_data.get("generation_time", 0),
            "metadata": statement_data.get("metadata", {}),
        }

        statements = self._load_statements()
        statements.append(statement_record)
        self._save_json_list(self.statements_db, statements)

        logger.info("Stored statement %s for feedback collection", statement_id)
        return statement_id

    def store_feedback(self, feedback: Dict[str, Any]) -> str:
        """Store one human feedback record for RLHF training.

        Args:
            feedback: Mapping read for the optional keys ``statement_id``,
                ``reviewer_id``, ``specific_errors``, ``missing_items``,
                ``improvement_suggestions``, ``would_accept_for_audit``,
                ``statement_type`` and ``complexity_level``. Values must be
                JSON-serialisable.

        Returns:
            The freshly generated UUID4 string identifying the feedback.

        Raises:
            TypeError: If a value cannot be serialised to JSON. The
                existing store file is left untouched in that case.
        """
        feedback_id = str(uuid.uuid4())
        feedback_record = {
            "feedback_id": feedback_id,
            "statement_id": feedback.get("statement_id"),
            "timestamp": time.time(),
            "reviewer_id": feedback.get("reviewer_id", "anonymous"),
            # Qualitative feedback
            "specific_errors": feedback.get("specific_errors", ""),
            "missing_items": feedback.get("missing_items", ""),
            "improvement_suggestions": feedback.get(
                "improvement_suggestions", ""
            ),
            "would_accept_for_audit": feedback.get(
                "would_accept_for_audit", False
            ),
            # Additional context
            "statement_type": feedback.get("statement_type"),
            "complexity_level": feedback.get("complexity_level", "medium"),
        }

        all_feedback = self._load_feedback()
        all_feedback.append(feedback_record)
        self._save_json_list(self.feedback_db, all_feedback)

        logger.info(
            "Stored feedback %s for statement %s",
            feedback_id,
            feedback.get("statement_id"),
        )
        return feedback_id

    def get_training_data(
        self, min_feedback_count: int = 2
    ) -> List[Dict[str, Any]]:
        """Return stored feedback reshaped into RLHF training samples.

        Args:
            min_feedback_count: Minimum number of stored feedback records
                required; below this an empty list is returned and a
                warning is logged.

        Returns:
            One dict per feedback record with the keys ``statement_id``,
            ``statement_type``, ``binary_approval`` and ``feedback_text``
            (itself a dict with ``errors``, ``missing``, ``suggestions``).
        """
        feedback_data = self._load_feedback()
        if len(feedback_data) < min_feedback_count:
            logger.warning(
                "Only %s feedback samples available, need at least %s",
                len(feedback_data),
                min_feedback_count,
            )
            return []

        # .get() with the same defaults store_feedback() applies, so a
        # record missing a field does not abort the whole export.
        return [
            {
                "statement_id": fb.get("statement_id"),
                "statement_type": fb.get("statement_type"),
                "binary_approval": fb.get("would_accept_for_audit", False),
                "feedback_text": {
                    "errors": fb.get("specific_errors", ""),
                    "missing": fb.get("missing_items", ""),
                    "suggestions": fb.get("improvement_suggestions", ""),
                },
            }
            for fb in feedback_data
        ]

    def get_statement_for_review(
        self, statement_id: str
    ) -> Optional[Dict[str, Any]]:
        """Return the stored statement with ``statement_id``, or ``None``."""
        return next(
            (
                statement
                for statement in self._load_statements()
                if statement.get("statement_id") == statement_id
            ),
            None,
        )

    def get_pending_reviews(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the most recently stored statements that have no feedback.

        Args:
            limit: Maximum number of statements to return. A value of zero
                or less yields an empty list.

        Returns:
            Up to ``limit`` statement records, in storage order (oldest
            first), taken from the end of the pending list.
        """
        # Guard explicitly: ``pending[-0:]`` would be the *whole* list and a
        # negative limit would slice from the front instead of the back.
        if limit <= 0:
            return []

        statements = self._load_statements()
        reviewed_ids = {
            fb.get("statement_id") for fb in self._load_feedback()
        }
        pending = [
            statement
            for statement in statements
            if statement.get("statement_id") not in reviewed_ids
        ]
        return pending[-limit:]

    def get_feedback_stats(self) -> Dict[str, Any]:
        """Return summary statistics about the collected feedback.

        Returns:
            With no feedback stored: ``total_feedback`` (0) and
            ``total_statements``. Otherwise additionally
            ``audit_approval_rate`` (fraction of feedback records whose
            ``would_accept_for_audit`` is truthy) and ``feedback_by_type``
            mapping each statement type to ``{"count": n}``.
        """
        feedback_data = self._load_feedback()
        statements = self._load_statements()
        if not feedback_data:
            return {"total_feedback": 0, "total_statements": len(statements)}

        approved = sum(
            1 for fb in feedback_data if fb.get("would_accept_for_audit")
        )

        feedback_by_type: Dict[str, Dict[str, int]] = {}
        for fb in feedback_data:
            # store_feedback() writes an explicit None when no type was
            # supplied, so a .get() default alone would never apply.
            stmt_type = fb.get("statement_type") or "unknown"
            feedback_by_type.setdefault(stmt_type, {"count": 0})["count"] += 1

        return {
            "total_feedback": len(feedback_data),
            "total_statements": len(statements),
            "audit_approval_rate": approved / len(feedback_data),
            "feedback_by_type": feedback_by_type,
        }

    def _load_feedback(self) -> List[Dict[str, Any]]:
        """Load all feedback records; empty list if none are readable."""
        return self._load_json_list(self.feedback_db, "feedback")

    def _load_statements(self) -> List[Dict[str, Any]]:
        """Load all statement records; empty list if none are readable."""
        return self._load_json_list(self.statements_db, "statements")

    @staticmethod
    def _load_json_list(path: str, label: str) -> List[Dict[str, Any]]:
        """Read a JSON list from ``path``.

        A missing file yields an empty list silently. An undecodable file
        or one whose top-level value is not a list yields an empty list
        and logs a warning naming ``label``.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.warning("Could not load %s database, starting fresh", label)
            return []

        if not isinstance(data, list):
            logger.warning(
                "%s database does not contain a JSON list, starting fresh",
                label,
            )
            return []
        return data

    @staticmethod
    def _save_json_list(path: str, records: List[Dict[str, Any]]) -> None:
        """Replace the file at ``path`` with ``records`` serialised as JSON.

        Serialisation happens before any file is opened, and the text is
        written to a sibling ``.tmp`` file that is then moved over
        ``path``, so a failure never leaves ``path`` half-written.
        """
        payload = json.dumps(records, indent=2)
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, path)