import pandas as pd import numpy as np import re from typing import Dict, List, Tuple, Set import gradio as gr from datetime import datetime import io import tempfile import os class AnonymizationEvaluator: """ابزار ارزیابی ناشناس‌سازی با استفاده از متن مرجع""" def __init__(self): self.results_df = None def extract_entities_from_text(self, text: str) -> Dict[str, Set[str]]: """استخراج موجودیت‌ها از متن با debugging""" if pd.isna(text) or not isinstance(text, str): return {'companies': set(), 'persons': set(), 'amounts': set(), 'percents': set(), 'groups': set()} # تمیز کردن متن text = str(text).strip() # الگوهای مختلف برای موجودیت‌ها patterns = { 'companies': [r'company-(\d+)', r'Company-(\d+)', r'COMPANY-(\d+)'], 'persons': [r'person-(\d+)', r'Person-(\d+)', r'PERSON-(\d+)'], 'amounts': [r'amount-(\d+)', r'Amount-(\d+)', r'AMOUNT-(\d+)'], 'percents': [r'percent-(\d+)', r'Percent-(\d+)', r'PERCENT-(\d+)'], 'groups': [r'group-(\d+)', r'Group-(\d+)', r'GROUP-(\d+)'] } entities = {} for entity_type, pattern_list in patterns.items(): found = set() for pattern in pattern_list: matches = re.findall(pattern, text) found.update(matches) entities[entity_type] = found return entities def debug_text_analysis(self, reference_text: str, predicted_text: str, row_num: int = 0) -> str: """تابع debugging برای تحلیل متن‌ها""" debug_info = f"\n--- Debug Row {row_num + 1} ---\n" debug_info += f"Reference: '{reference_text[:100]}...'\n" debug_info += f"Predicted: '{predicted_text[:100]}...'\n" ref_entities = self.extract_entities_from_text(reference_text) pred_entities = self.extract_entities_from_text(predicted_text) debug_info += f"Reference entities: {dict(ref_entities)}\n" debug_info += f"Predicted entities: {dict(pred_entities)}\n" return debug_info def calculate_precision_recall_f1(self, reference_entities: Dict[str, Set[str]], predicted_entities: Dict[str, Set[str]]) -> Tuple[float, float, float]: """محاسبه Precision, Recall و F1-Score""" # ترکیب همه موجودیت‌ها ref_all = set() pred_all = set() for entity_type in ['companies', 'persons', 'amounts', 'percents', 'groups']: # اضافه کردن prefix برای جلوگیری از تداخل ref_entities = {f"{entity_type}:{e}" for e in reference_entities.get(entity_type, set())} pred_entities = {f"{entity_type}:{e}" for e in predicted_entities.get(entity_type, set())} ref_all.update(ref_entities) pred_all.update(pred_entities) if len(pred_all) == 0 and len(ref_all) == 0: return 1.0, 1.0, 1.0 # هر دو خالی هستند elif len(pred_all) == 0: return 0.0, 0.0, 0.0 # predicted خالی ولی reference دارد elif len(ref_all) == 0: return 0.0, 1.0, 0.0 # reference خالی ولی predicted دارد # محاسبه True Positive true_positive = len(ref_all.intersection(pred_all)) # محاسبه Precision, Recall precision = true_positive / len(pred_all) if len(pred_all) > 0 else 0.0 recall = true_positive / len(ref_all) if len(ref_all) > 0 else 0.0 # محاسبه F1-Score if precision + recall == 0: f1 = 0.0 else: f1 = 2 * (precision * recall) / (precision + recall) return precision, recall, f1 def calculate_accuracy(self, reference_text: str, predicted_text: str) -> float: """محاسبه Accuracy بر اساس تطابق کامل موجودیت‌ها""" ref_entities = self.extract_entities_from_text(reference_text) pred_entities = self.extract_entities_from_text(predicted_text) # شمارش کل موجودیت‌ها ref_total = sum(len(entities) for entities in ref_entities.values()) if ref_total == 0: return 1.0 if sum(len(entities) for entities in pred_entities.values()) == 0 else 0.0 # شمارش موجودیت‌های صحیح correct = 0 for entity_type in ref_entities.keys(): correct += len(ref_entities[entity_type].intersection(pred_entities[entity_type])) return correct / ref_total def evaluate_single_row(self, reference_text: str, predicted_text: str) -> Tuple[float, float, float]: """ارزیابی یک سطر""" try: # استخراج موجودیت‌ها ref_entities = self.extract_entities_from_text(reference_text) pred_entities = self.extract_entities_from_text(predicted_text) # محاسبه متریک‌ها precision, recall, f1 = self.calculate_precision_recall_f1(ref_entities, pred_entities) return precision, recall, f1 except Exception as e: print(f"خطا در ارزیابی: {str(e)}") return 0.0, 0.0, 0.0 def evaluate_dataset(self, file_path: str) -> Tuple[bool, str, pd.DataFrame]: """ارزیابی کل دیتاست با debugging""" try: # بارگذاری فایل df = pd.read_csv(file_path) # بررسی ستون‌های مورد نیاز required_columns = ['original_text', 'Reference_text', 'anonymized_text'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: return False, f"ستون‌های مفقود: {', '.join(missing_columns)}", pd.DataFrame() # تشخیص مشکل - بررسی نمونه‌ای از داده‌ها debug_info = "\n=== Debug Information ===\n" debug_info += f"تعداد سطرها: {len(df)}\n" debug_info += f"ستون‌ها: {list(df.columns)}\n\n" # بررسی چند سطر اول for i in range(min(3, len(df))): ref_text = str(df.iloc[i]['Reference_text']) anon_text = str(df.iloc[i]['anonymized_text']) debug_info += self.debug_text_analysis(ref_text, anon_text, i) print(debug_info) # نمایش در console # محاسبه متریک‌ها برای هر سطر precisions = [] recalls = [] f1_scores = [] total_entities_found = 0 # شمارنده کل موجودیت‌های یافت شده for index, row in df.iterrows(): precision, recall, f1 = self.evaluate_single_row( row['Reference_text'], row['anonymized_text'] ) precisions.append(round(precision, 4)) recalls.append(round(recall, 4)) f1_scores.append(round(f1, 4)) # شمارش موجودیت‌ها برای debugging ref_entities = self.extract_entities_from_text(str(row['Reference_text'])) pred_entities = self.extract_entities_from_text(str(row['anonymized_text'])) total_entities_found += sum(len(entities) for entities in ref_entities.values()) total_entities_found += sum(len(entities) for entities in pred_entities.values()) # اضافه کردن ستون‌های جدید df['Precision'] = precisions df['Recall'] = recalls df['F1_Score'] = f1_scores # ذخیره نتایج self.results_df = df # پیام وضعیت شامل اطلاعات debugging status_message = f"ارزیابی انجام شد. کل موجودیت‌های یافت شده: {total_entities_found}" if total_entities_found == 0: status_message += "\n⚠️ هیچ موجودیتی تشخیص داده نشد! لطفاً فرمت داده‌ها را بررسی کنید." return True, status_message, df except Exception as e: return False, f"خطا در پردازش فایل: {str(e)}", pd.DataFrame() def generate_summary_report(self, df: pd.DataFrame) -> str: """تولید گزارش خلاصه""" if df.empty: return "هیچ داده‌ای برای گزارش یافت نشد" # محاسبه آمار کلی avg_precision = df['Precision'].mean() avg_recall = df['Recall'].mean() avg_f1 = df['F1_Score'].mean() # محاسبه آمار تفصیلی total_rows = len(df) high_precision_count = len(df[df['Precision'] >= 0.8]) high_recall_count = len(df[df['Recall'] >= 0.8]) high_f1_count = len(df[df['F1_Score'] >= 0.8]) # بهترین و بدترین نتایج best_f1_idx = df['F1_Score'].idxmax() worst_f1_idx = df['F1_Score'].idxmin() report = f""" ## 📊 گزارش جامع ارزیابی ### آمار کلی: - **تعداد کل سطرها:** {total_rows} - **میانگین Precision:** {avg_precision:.4f} - **میانگین Recall:** {avg_recall:.4f} - **میانگین F1-Score:** {avg_f1:.4f} ### توزیع عملکرد (امتیاز ≥ 0.8): - **Precision بالا:** {high_precision_count} سطر ({high_precision_count/total_rows*100:.1f}%) - **Recall بالا:** {high_recall_count} سطر ({high_recall_count/total_rows*100:.1f}%) - **F1-Score بالا:** {high_f1_count} سطر ({high_f1_count/total_rows*100:.1f}%) ### نمونه‌های برتر و ضعیف: - **بهترین F1-Score:** {df.loc[best_f1_idx, 'F1_Score']:.4f} (سطر {best_f1_idx + 1}) - **ضعیف‌ترین F1-Score:** {df.loc[worst_f1_idx, 'F1_Score']:.4f} (سطر {worst_f1_idx + 1}) """ return report def create_downloadable_csv(self) -> bytes: """ایجاد محتوای CSV برای دانلود مستقیم""" if self.results_df is None or self.results_df.empty: return None try: # تولید محتوای CSV در حافظه csv_buffer = io.StringIO() self.results_df.to_csv(csv_buffer, index=False, encoding='utf-8') csv_content = csv_buffer.getvalue() csv_buffer.close() # تبدیل به bytes برای دانلود return csv_content.encode('utf-8-sig') except Exception as e: print(f"خطا در ایجاد محتوای CSV: {str(e)}") return None def create_evaluation_interface(): """ایجاد رابط کاربری ارزیابی""" evaluator = AnonymizationEvaluator() with gr.Blocks( title="ارزیابی ناشناس‌سازی", theme=gr.themes.Soft(), css=""" .gradio-container { font-family: 'Tahoma', 'Arial', sans-serif !important; direction: rtl; max-width: 1200px; margin: 0 auto; } .upload-area { border: 2px dashed #4CAF50; border-radius: 15px; padding: 30px; text-align: center; background: linear-gradient(145deg, #f8f9fa, #e9ecef); margin: 20px 0; } .results-table { direction: ltr; font-family: monospace; font-size: 12px; } .summary-box { background-color: #e3f2fd; border: 1px solid #2196F3; border-radius: 10px; padding: 20px; margin: 15px 0; } """ ) as interface: gr.Markdown(""" # 📊 ابزار ارزیابی ناشناس‌سازی با متن مرجع ### آپلود فایل CSV شامل ستون‌های: original_text, Reference_text, anonymized_text """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📁 بارگذاری فایل") file_input = gr.File( label="انتخاب فایل CSV", file_types=[".csv"], elem_classes=["upload-area"] ) evaluate_btn = gr.Button( "🚀 شروع ارزیابی", variant="primary", size="lg", interactive=False ) download_btn = gr.Button( "💾 دانلود نتایج CSV", variant="secondary", visible=False ) with gr.Column(scale=2): status_output = gr.Markdown("وضعیت: آماده بارگذاری فایل...") summary_output = gr.Markdown( visible=False, elem_classes=["summary-box"] ) # جدول نتایج results_table = gr.Dataframe( label="نتایج تفصیلی (نمایش 10 سطر اول)", visible=False, elem_classes=["results-table"], wrap=True ) # فایل دانلود download_file = gr.File( visible=False, label="فایل نتایج" ) def on_file_upload(file): if file is None: return "❌ لطفاً فایل را انتخاب کنید", gr.Button(interactive=False) return "✅ فایل بارگذاری شد، آماده ارزیابی", gr.Button(interactive=True) def evaluate_file(file): if file is None: return ( "❌ هیچ فایلی انتخاب نشده", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) try: success, message, df = evaluator.evaluate_dataset(file.name) if not success: return ( f"❌ {message}", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) # تولید گزارش خلاصه summary = evaluator.generate_summary_report(df) # نمایش 10 سطر اول برای نمونه در رابط display_df = df.head(10) # پیام اطلاع‌رسانی status_message = f"✅ {message} - {len(df)} سطر پردازش شد. نمایش: 10 سطر اول، دانلود: همه سطرها" return ( status_message, gr.Markdown(value=summary, visible=True), gr.Dataframe(value=display_df, visible=True), gr.Button(visible=True), gr.File(visible=False) ) except Exception as e: return ( f"❌ خطای غیرمنتظره: {str(e)}", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) def download_results(): try: if evaluator.results_df is None or evaluator.results_df.empty: return ( "❌ هیچ داده‌ای برای دانلود وجود ندارد. ابتدا ارزیابی را انجام دهید.", gr.File(visible=False) ) # ایجاد محتوای CSV csv_content = evaluator.create_downloadable_csv() if csv_content: # تولید نام فایل timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"evaluation_results_{timestamp}.csv" # ذخیره در فایل موقت برای دانلود with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv', prefix='eval_') as temp_file: temp_file.write(csv_content) temp_filename = temp_file.name return ( f"✅ فایل نتایج آماده شد: {filename} ({len(evaluator.results_df)} سطر)", gr.File(value=temp_filename, visible=True) ) else: return ( "❌ خطا در ایجاد محتوای CSV", gr.File(visible=False) ) except Exception as e: return ( f"❌ خطا در دانلود: {str(e)}", gr.File(visible=False) ) # اتصال رویدادها file_input.change( fn=on_file_upload, inputs=[file_input], outputs=[status_output, evaluate_btn] ) evaluate_btn.click( fn=evaluate_file, inputs=[file_input], outputs=[status_output, summary_output, results_table, download_btn, download_file] ) download_btn.click( fn=download_results, outputs=[status_output, download_file] ) # راهنمای استفاده with gr.Accordion("📖 راهنمای استفاده", open=False): gr.Markdown(""" ### فرمت فایل CSV مورد نیاز: فایل شما باید حاوی دقیقاً این سه ستون باشد: - **original_text**: متن اصلی - **Reference_text**: متن ناشناس‌شده مرجع (Ground Truth) - **anonymized_text**: متن ناشناس‌شده مورد ارزیابی ### متریک‌های محاسبه شده: - **Precision**: دقت = (تعداد موجودیت‌های صحیح شناسایی شده) / (کل موجودیت‌های شناسایی شده) - **Recall**: بازیابی = (تعداد موجودیت‌های صحیح شناسایی شده) / (کل موجودیت‌های مرجع) - **F1-Score**: میانگین هارمونیک Precision و Recall ### مراحل کار: 1. فایل CSV را آپلود کنید 2. روی "شروع ارزیابی" کلیک کنید 3. گزارش خلاصه و جدول نمونه (10 سطر اول) را مشاهده کنید 4. **فایل نتایج کامل (همه سطرها) را دانلود کنید** ### نکات مهم: - **نمایش رابط**: فقط 10 سطر اول نمایش داده می‌شود - **فایل دانلود**: شامل تمام سطرهای پردازش شده + متریک‌ها - فایل خروجی شامل ستون‌های اصلی + سه ستون متریک خواهد بود - متریک‌ها برای هر سطر جداگانه محاسبه می‌شوند - آمار کلی در گزارش خلاصه نمایش داده می‌شود ### مشکل در دانلود؟ اگر فایل دانلود نمی‌شود: 1. مرورگر خود را رفرش کنید 2. مجدداً ارزیابی را انجام دهید 3. اطمینان حاصل کنید که popup blocker غیرفعال است """) # تست دانلود with gr.Accordion("🧪 تست دانلود", open=False): gr.Markdown("برای تست عملکرد دانلود:") test_download_btn = gr.Button("تست دانلود فایل نمونه") test_file_output = gr.File(label="فایل تست", visible=False) def create_test_file(): """ایجاد فایل تست برای بررسی دانلود""" try: test_data = { 'original_text': ['متن تست 1', 'متن تست 2'], 'Reference_text': ['company-01 amount-01', 'person-01 amount-02'], 'anonymized_text': ['company-01 amount-01', 'person-01 amount-02'], 'Precision': [1.0, 1.0], 'Recall': [1.0, 1.0], 'F1_Score': [1.0, 1.0] } test_df = pd.DataFrame(test_data) # ایجاد محتوای CSV در حافظه csv_buffer = io.StringIO() test_df.to_csv(csv_buffer, index=False) csv_content = csv_buffer.getvalue() csv_buffer.close() # تبدیل به bytes و ذخیره در فایل موقت csv_bytes = csv_content.encode('utf-8-sig') with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv', prefix='test_') as temp_file: temp_file.write(csv_bytes) temp_filename = temp_file.name return gr.File(value=temp_filename, visible=True) except Exception as e: print(f"خطا در ایجاد فایل تست: {str(e)}") return gr.File(visible=False) test_download_btn.click( fn=create_test_file, outputs=[test_file_output] ) return interface # اجرای برنامه if __name__ == "__main__": interface = create_evaluation_interface() interface.launch()