Spaces:
Sleeping
Sleeping
File size: 7,024 Bytes
c172f37 6611563 c172f37 6611563 c172f37 6611563 c172f37 6611563 c172f37 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
"""
RLHF Feedback Management System for FinRyver
Handles collection, storage, and management of human feedback on financial statements
"""
import json
import os
import time
import uuid
from typing import Dict, Any, List, Optional
import logging
logger = logging.getLogger(__name__)
class FeedbackManager:
"""Manages human feedback collection for RLHF training"""
def __init__(self, feedback_dir: str = "data/feedback"):
self.feedback_dir = feedback_dir
self.feedback_db = os.path.join(feedback_dir, "human_feedback.json")
self.statements_db = os.path.join(feedback_dir, "generated_statements.json")
os.makedirs(feedback_dir, exist_ok=True)
def store_generated_statement(self, statement_data: Dict[str, Any]) -> str:
"""Store generated statement for later feedback collection"""
statement_id = str(uuid.uuid4())
statement_record = {
"statement_id": statement_id,
"timestamp": time.time(),
"statement_type": statement_data.get("type", "unknown"),
"file_path": statement_data.get("file_path"),
"output_path": statement_data.get("output_path"),
"generation_time": statement_data.get("generation_time", 0),
"metadata": statement_data.get("metadata", {})
}
# Load existing statements
statements = self._load_statements()
statements.append(statement_record)
# Save updated statements
with open(self.statements_db, "w") as f:
json.dump(statements, f, indent=2)
logger.info(f"Stored statement {statement_id} for feedback collection")
return statement_id
def store_feedback(self, feedback: Dict[str, Any]) -> str:
"""Store human feedback for RLHF training"""
feedback_id = str(uuid.uuid4())
feedback_record = {
"feedback_id": feedback_id,
"statement_id": feedback.get("statement_id"),
"timestamp": time.time(),
"reviewer_id": feedback.get("reviewer_id", "anonymous"),
# Qualitative feedback
"specific_errors": feedback.get("specific_errors", ""),
"missing_items": feedback.get("missing_items", ""),
"improvement_suggestions": feedback.get("improvement_suggestions", ""),
"would_accept_for_audit": feedback.get("would_accept_for_audit", False),
# Additional context
"statement_type": feedback.get("statement_type"),
"complexity_level": feedback.get("complexity_level", "medium")
}
# Load existing feedback
all_feedback = self._load_feedback()
all_feedback.append(feedback_record)
# Save updated feedback
with open(self.feedback_db, "w") as f:
json.dump(all_feedback, f, indent=2)
logger.info(f"Stored feedback {feedback_id} for statement {feedback.get('statement_id')}")
return feedback_id
def get_training_data(self, min_feedback_count: int = 2) -> List[Dict[str, Any]]:
"""Get feedback data suitable for RLHF training"""
feedback_data = self._load_feedback()
if len(feedback_data) < min_feedback_count:
logger.warning(f"Only {len(feedback_data)} feedback samples available, need at least {min_feedback_count}")
return []
# Filter and prepare training data
training_data = []
for feedback in feedback_data:
training_sample = {
"statement_id": feedback["statement_id"],
"statement_type": feedback["statement_type"],
"binary_approval": feedback["would_accept_for_audit"],
"feedback_text": {
"errors": feedback.get("specific_errors", ""),
"missing": feedback.get("missing_items", ""),
"suggestions": feedback.get("improvement_suggestions", "")
}
}
training_data.append(training_sample)
return training_data
def get_statement_for_review(self, statement_id: str) -> Optional[Dict[str, Any]]:
"""Get statement data for human review"""
statements = self._load_statements()
for statement in statements:
if statement["statement_id"] == statement_id:
return statement
return None
def get_pending_reviews(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get statements that need human review"""
statements = self._load_statements()
feedback_data = self._load_feedback()
# Get statement IDs that already have feedback
reviewed_ids = {fb["statement_id"] for fb in feedback_data}
# Return statements without feedback
pending = [s for s in statements if s["statement_id"] not in reviewed_ids]
return pending[-limit:] # Return most recent
def get_feedback_stats(self) -> Dict[str, Any]:
"""Get statistics about collected feedback"""
feedback_data = self._load_feedback()
statements = self._load_statements()
if not feedback_data:
return {"total_feedback": 0, "total_statements": len(statements)}
# Calculate statistics
audit_approvals = [fb["would_accept_for_audit"] for fb in feedback_data]
stats = {
"total_feedback": len(feedback_data),
"total_statements": len(statements),
"audit_approval_rate": sum(audit_approvals) / len(audit_approvals) if audit_approvals else 0,
"feedback_by_type": {}
}
# Group by statement type
for fb in feedback_data:
stmt_type = fb.get("statement_type", "unknown")
if stmt_type not in stats["feedback_by_type"]:
stats["feedback_by_type"][stmt_type] = {"count": 0}
stats["feedback_by_type"][stmt_type]["count"] += 1
return stats
def _load_feedback(self) -> List[Dict[str, Any]]:
"""Load feedback from storage"""
if os.path.exists(self.feedback_db):
try:
with open(self.feedback_db, "r") as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
logger.warning("Could not load feedback database, starting fresh")
return []
def _load_statements(self) -> List[Dict[str, Any]]:
"""Load statements from storage"""
if os.path.exists(self.statements_db):
try:
with open(self.statements_db, "r") as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
logger.warning("Could not load statements database, starting fresh")
return []
|