import os
import json
from datetime import datetime
from pathlib import Path
from typing import Optional

import pandas as pd

# huggingface_hub is optional: without it (or without a configured repo/token)
# the manager degrades gracefully to local-only storage.
try:
    from huggingface_hub import HfApi, upload_file, hf_hub_download
except ImportError:
    HfApi = upload_file = hf_hub_download = None


class FeedbackManager:
    """Manage user feedback, with optional persistence to a private Hugging Face dataset."""

    def __init__(
        self,
        dataset_repo_id: Optional[str] = None,
        hf_token: Optional[str] = None,
        local_backup: bool = True
    ):
        """
        Initialize the FeedbackManager.

        Args:
            dataset_repo_id: Hugging Face dataset repo ID (format: username/dataset-name)
            hf_token: Hugging Face API token (needed for private datasets)
            local_backup: whether to keep a local backup copy
        """
        self.dataset_repo_id = dataset_repo_id
        self.hf_token = hf_token or os.environ.get('HF_TOKEN')
        self.local_backup = local_backup

        # Set up local storage paths FIRST: _ensure_dataset_exists() below
        # writes a README into self.local_dir, so the paths must exist before
        # the HF client is initialized.  (HF Spaces only allow writes under /tmp.)
        if os.environ.get('SPACE_ID'):
            self.local_dir = Path('/tmp/feedback_data')
        else:
            self.local_dir = Path(__file__).parent / 'feedback_data'

        self.local_dir.mkdir(exist_ok=True, parents=True)
        self.local_file = self.local_dir / 'user_feedback.json'

        # Initialize the HF API client only when a repo, a token, and the
        # huggingface_hub package are all available.
        if self.dataset_repo_id and self.hf_token and HfApi is not None:
            self.api = HfApi(token=self.hf_token)
            # Make sure the dataset repo exists (create it if needed).
            self._ensure_dataset_exists()
        else:
            self.api = None
            print("⚠️ No HF dataset configured. Will only save locally.")

    def _ensure_dataset_exists(self):
        """Ensure the HF dataset exists; create it (with a schema README) if missing."""
        try:
            from huggingface_hub import create_repo

            # Try to create the dataset repo (raises if it already exists).
            try:
                create_repo(
                    repo_id=self.dataset_repo_id,
                    token=self.hf_token,
                    private=True,
                    repo_type="dataset"
                )
                print(f"✅ Created new private dataset: {self.dataset_repo_id}")

                # Seed the new dataset with a README describing the data format.
                readme_content = """---
license: mit
---

# AdaDetectGPT User Feedback Dataset

This dataset contains user feedback from the AdaDetectGPT detection system.

## Data Format

Each entry contains:
- `timestamp`: When the feedback was submitted
- `text`: The text that was analyzed
- `domain`: The domain selected for analysis
- `statistics`: The computed statistics value
- `p_value`: The p-value from the detection
- `feedback`: User feedback (expected/unexpected)
"""
                readme_file = self.local_dir / 'README.md'
                readme_file.write_text(readme_content)

                upload_file(
                    path_or_fileobj=str(readme_file),
                    path_in_repo="README.md",
                    repo_id=self.dataset_repo_id,
                    repo_type="dataset",
                    token=self.hf_token
                )
            except Exception as e:
                # "already exists" is the expected outcome on subsequent runs.
                if "already exists" not in str(e):
                    print(f"⚠️ Dataset check: {e}")
        except Exception as e:
            print(f"⚠️ Could not verify dataset: {e}")

    def _load_existing_data(self) -> list:
        """Load existing feedback entries, preferring the HF dataset over the local file."""
        existing_data = []

        # First try to load from the HF dataset.
        if self.api and self.dataset_repo_id:
            try:
                # Download the current feedback file, caching it locally.
                local_path = hf_hub_download(
                    repo_id=self.dataset_repo_id,
                    filename="feedback_data.json",
                    repo_type="dataset",
                    token=self.hf_token,
                    cache_dir=str(self.local_dir)
                )
                with open(local_path, 'r', encoding='utf-8') as f:
                    existing_data = json.load(f)
                print(f"📥 Loaded {len(existing_data)} existing feedback entries from HF")
            except Exception as e:
                # A 404 just means no feedback has been uploaded yet.
                if "404" not in str(e):
                    print(f"⚠️ Could not load from HF dataset: {e}")

        # Fall back to the local backup when HF yielded nothing.
        if not existing_data and self.local_file.exists():
            try:
                with open(self.local_file, 'r', encoding='utf-8') as f:
                    existing_data = json.load(f)
                print(f"📥 Loaded {len(existing_data)} existing feedback entries from local")
            except Exception as e:
                print(f"⚠️ Could not load local data: {e}")

        return existing_data

    def save_feedback(
        self,
        text: str,
        domain: str,
        statistics: float,
        p_value: float,
        feedback_type: str
    ) -> tuple[bool, str]:
        """
        Save user feedback to the HF dataset and/or the local file.

        Args:
            text: the analyzed text
            domain: the selected domain
            statistics: the computed statistic value
            p_value: the detection p-value
            feedback_type: 'expected' or 'unexpected'

        Returns:
            (success, message): whether at least one sink succeeded, plus details.
        """
        # Build the new feedback record.
        feedback_entry = {
            'timestamp': datetime.now().isoformat(),
            'text': text,
            'domain': domain,
            'statistics': float(statistics),
            'p_value': float(p_value),
            'feedback': feedback_type
        }

        # Load existing data and append the new entry.
        feedback_data = self._load_existing_data()
        feedback_data.append(feedback_entry)

        success = False
        messages = []
        local_written = False

        # Always write the JSON file: it is both the local backup and the
        # upload payload for the HF dataset.  (Previously, local_backup=False
        # skipped this write and then uploaded a stale or missing file.)
        try:
            with open(self.local_file, 'w', encoding='utf-8') as f:
                json.dump(feedback_data, f, ensure_ascii=False, indent=2)
            local_written = True
            if self.local_backup:
                messages.append(f"💾 Local backup saved")
                success = True
        except Exception as e:
            messages.append(f"❌ Local save failed: {e}")

        # Upload to the HF dataset (only if the payload file was written).
        if self.api and self.dataset_repo_id and local_written:
            try:
                # Upload the JSON file.
                upload_file(
                    path_or_fileobj=str(self.local_file),
                    path_in_repo="feedback_data.json",
                    repo_id=self.dataset_repo_id,
                    repo_type="dataset",
                    token=self.hf_token,
                    commit_message=f"Add feedback: {feedback_type} at {feedback_entry['timestamp']}"
                )

                # Also maintain a CSV mirror for easy inspection in the dataset UI.
                df = pd.DataFrame(feedback_data)
                csv_file = self.local_dir / 'feedback_data.csv'
                df.to_csv(csv_file, index=False)

                upload_file(
                    path_or_fileobj=str(csv_file),
                    path_in_repo="feedback_data.csv",
                    repo_id=self.dataset_repo_id,
                    repo_type="dataset",
                    token=self.hf_token,
                    commit_message=f"Update CSV: {len(feedback_data)} total entries"
                )

                messages.append(f"☁️ Uploaded to HF dataset: {self.dataset_repo_id}")
                success = True
            except Exception as e:
                messages.append(f"⚠️ HF upload failed: {e}")

        # NOTE: success now reflects actual write outcomes — the old code
        # forced it True whenever local_backup was merely *enabled*, even
        # when every save had failed.
        return success, " | ".join(messages)

    def get_feedback_stats(self) -> dict:
        """Return summary statistics over all stored feedback entries."""
        feedback_data = self._load_existing_data()

        if not feedback_data:
            return {
                'total_count': 0,
                'expected_count': 0,
                'unexpected_count': 0,
                'domains': {}
            }

        df = pd.DataFrame(feedback_data)
        stats = {
            'total_count': len(df),
            'expected_count': len(df[df['feedback'] == 'expected']),
            'unexpected_count': len(df[df['feedback'] == 'unexpected']),
            'domains': df['domain'].value_counts().to_dict() if 'domain' in df.columns else {}
        }
        return stats


# Convenience functions (backward compatible)
_default_manager: Optional[FeedbackManager] = None


def init_feedback_manager(dataset_repo_id: Optional[str] = None, hf_token: Optional[str] = None):
    """Initialize the global feedback manager."""
    global _default_manager
    _default_manager = FeedbackManager(
        dataset_repo_id=dataset_repo_id,
        hf_token=hf_token
    )
    return _default_manager


def save_feedback(text: str, domain: str, statistics: float, p_value: float, feedback_type: str):
    """
    Save feedback using the default manager (backward compatible).

    Raises:
        Exception: if no sink (local or HF) accepted the feedback.
    """
    global _default_manager
    if _default_manager is None:
        # Lazily configure from the environment on first use.
        dataset_repo_id = os.environ.get('FEEDBACK_DATASET_ID')
        _default_manager = FeedbackManager(dataset_repo_id=dataset_repo_id)

    success, message = _default_manager.save_feedback(
        text, domain, statistics, p_value, feedback_type
    )

    if not success:
        raise Exception(f"Failed to save feedback: {message}")

    return message