import pandas as pd import numpy as np import re from typing import Dict, List, Tuple, Set import gradio as gr from datetime import datetime import io import tempfile import os class AnonymizationEvaluator: """ابزار ارزیابی ناشناس‌سازی با استفاده از متن مرجع""" def __init__(self): self.results_df = None def extract_entity_types_from_text(self, text: str) -> Dict[str, int]: """استخراج انواع موجودیت‌ها بدون در نظر گرفتن شناسه عددی""" if pd.isna(text) or not isinstance(text, str): return {'companies': 0, 'persons': 0, 'amounts': 0, 'percents': 0, 'groups': 0} # تمیز کردن متن text = str(text).strip() # الگوهای برای تشخیص انواع موجودیت‌ها (بدون در نظر گیری شناسه عددی) patterns = { 'companies': [r'company-\d+', r'Company-\d+', r'COMPANY-\d+'], 'persons': [r'person-\d+', r'Person-\d+', r'PERSON-\d+'], 'amounts': [r'amount-\d+', r'Amount-\d+', r'AMOUNT-\d+'], 'percents': [r'percent-\d+', r'Percent-\d+', r'PERCENT-\d+'], 'groups': [r'group-\d+', r'Group-\d+', r'GROUP-\d+'] } entity_counts = {} for entity_type, pattern_list in patterns.items(): count = 0 for pattern in pattern_list: matches = re.findall(pattern, text) count += len(matches) entity_counts[entity_type] = count return entity_counts def calculate_precision_recall_f1_by_type(self, reference_counts: Dict[str, int], predicted_counts: Dict[str, int]) -> Tuple[float, float, float]: """محاسبه Precision, Recall و F1-Score بر اساس نوع موجودیت‌ها""" # مجموع کل موجودیت‌ها total_reference = sum(reference_counts.values()) total_predicted = sum(predicted_counts.values()) if total_predicted == 0 and total_reference == 0: return 1.0, 1.0, 1.0 # هر دو خالی هستند - تطبیق کامل elif total_predicted == 0: return 0.0, 0.0, 0.0 # predicted خالی ولی reference دارد elif total_reference == 0: return 0.0, 1.0 if total_predicted > 0 else 1.0, 0.0 # reference خالی # محاسبه True Positive برای هر نوع موجودیت true_positives = 0 for entity_type in reference_counts.keys(): ref_count = reference_counts[entity_type] pred_count = predicted_counts[entity_type] # True Positive = کمترین تعداد بین reference و predicted برای هر نوع true_positives += min(ref_count, pred_count) # محاسبه Precision و Recall precision = true_positives / total_predicted if total_predicted > 0 else 0.0 recall = true_positives / total_reference if total_reference > 0 else 0.0 # محاسبه F1-Score if precision + recall == 0: f1 = 0.0 else: f1 = 2 * (precision * recall) / (precision + recall) return precision, recall, f1 def evaluate_single_row(self, reference_text: str, predicted_text: str) -> Tuple[float, float, float]: """ارزیابی یک سطر بر اساس نوع موجودیت‌ها""" try: # استخراج تعداد موجودیت‌ها بر اساس نوع ref_counts = self.extract_entity_types_from_text(reference_text) pred_counts = self.extract_entity_types_from_text(predicted_text) # محاسبه متریک‌ها precision, recall, f1 = self.calculate_precision_recall_f1_by_type(ref_counts, pred_counts) return precision, recall, f1 except Exception as e: print(f"خطا در ارزیابی: {str(e)}") return 0.0, 0.0, 0.0 def debug_text_analysis(self, reference_text: str, predicted_text: str, row_num: int = 0) -> str: """تابع debugging برای تحلیل متن‌ها بر اساس نوع موجودیت‌ها""" debug_info = f"\n--- Debug Row {row_num + 1} ---\n" debug_info += f"Reference: '{reference_text[:100]}...'\n" debug_info += f"Predicted: '{predicted_text[:100]}...'\n" ref_counts = self.extract_entity_types_from_text(reference_text) pred_counts = self.extract_entity_types_from_text(predicted_text) debug_info += f"Reference entity counts: {ref_counts}\n" debug_info += f"Predicted entity counts: {pred_counts}\n" # محاسبه متریک‌ها برای این سطر precision, recall, f1 = self.calculate_precision_recall_f1_by_type(ref_counts, pred_counts) debug_info += f"Metrics: P={precision:.3f}, R={recall:.3f}, F1={f1:.3f}\n" return debug_info def evaluate_dataset(self, file_path: str) -> Tuple[bool, str, pd.DataFrame]: """ارزیابی کل دیتاست بر اساس نوع موجودیت‌ها""" try: # بارگذاری فایل df = pd.read_csv(file_path) # بررسی ستون‌های مورد نیاز required_columns = ['original_text', 'Reference_text', 'anonymized_text'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: return False, f"ستون‌های مفقود: {', '.join(missing_columns)}", pd.DataFrame() # تشخیص مشکل - بررسی نمونه‌ای از داده‌ها debug_info = "\n=== روش جدید: ارزیابی بر اساس نوع موجودیت‌ها ===\n" debug_info += f"تعداد سطرها: {len(df)}\n" debug_info += f"ستون‌ها: {list(df.columns)}\n\n" # بررسی چند سطر اول for i in range(min(3, len(df))): ref_text = str(df.iloc[i]['Reference_text']) anon_text = str(df.iloc[i]['anonymized_text']) debug_info += self.debug_text_analysis(ref_text, anon_text, i) print(debug_info) # نمایش در console # محاسبه متریک‌ها برای هر سطر precisions = [] recalls = [] f1_scores = [] total_entity_types_found = 0 # شمارنده کل انواع موجودیت‌های یافت شده for index, row in df.iterrows(): precision, recall, f1 = self.evaluate_single_row( row['Reference_text'], row['anonymized_text'] ) precisions.append(round(precision, 4)) recalls.append(round(recall, 4)) f1_scores.append(round(f1, 4)) # شمارش انواع موجودیت‌ها برای debugging ref_counts = self.extract_entity_types_from_text(str(row['Reference_text'])) pred_counts = self.extract_entity_types_from_text(str(row['anonymized_text'])) total_entity_types_found += sum(ref_counts.values()) + sum(pred_counts.values()) # اضافه کردن ستون‌های جدید df['Precision'] = precisions df['Recall'] = recalls df['F1_Score'] = f1_scores # ذخیره نتایج self.results_df = df # پیام وضعیت شامل اطلاعات debugging avg_precision = np.mean(precisions) avg_recall = np.mean(recalls) avg_f1 = np.mean(f1_scores) status_message = f"""✅ ارزیابی بر اساس نوع موجودیت‌ها انجام شد: • میانگین Precision: {avg_precision:.3f} • میانگین Recall: {avg_recall:.3f} • میانگین F1-Score: {avg_f1:.3f} • کل موجودیت‌های یافت شده: {total_entity_types_found}""" if total_entity_types_found == 0: status_message += "\n⚠️ هیچ موجودیتی تشخیص داده نشد! لطفاً فرمت داده‌ها را بررسی کنید." return True, status_message, df except Exception as e: return False, f"خطا در پردازش فایل: {str(e)}", pd.DataFrame() def generate_summary_report(self, df: pd.DataFrame) -> str: """تولید گزارش خلاصه""" if df.empty: return "هیچ داده‌ای برای گزارش یافت نشد" # محاسبه آمار کلی avg_precision = df['Precision'].mean() avg_recall = df['Recall'].mean() avg_f1 = df['F1_Score'].mean() # محاسبه آمار تفصیلی total_rows = len(df) high_precision_count = len(df[df['Precision'] >= 0.8]) high_recall_count = len(df[df['Recall'] >= 0.8]) high_f1_count = len(df[df['F1_Score'] >= 0.8]) # بهترین و بدترین نتایج best_f1_idx = df['F1_Score'].idxmax() worst_f1_idx = df['F1_Score'].idxmin() report = f""" ## 📊 گزارش جامع ارزیابی ### آمار کلی: - **تعداد کل سطرها:** {total_rows} - **میانگین Precision:** {avg_precision:.4f} - **میانگین Recall:** {avg_recall:.4f} - **میانگین F1-Score:** {avg_f1:.4f} ### توزیع عملکرد (امتیاز ≥ 0.8): - **Precision بالا:** {high_precision_count} سطر ({high_precision_count/total_rows*100:.1f}%) - **Recall بالا:** {high_recall_count} سطر ({high_recall_count/total_rows*100:.1f}%) - **F1-Score بالا:** {high_f1_count} سطر ({high_f1_count/total_rows*100:.1f}%) ### نمونه‌های برتر و ضعیف: - **بهترین F1-Score:** {df.loc[best_f1_idx, 'F1_Score']:.4f} (سطر {best_f1_idx + 1}) - **ضعیف‌ترین F1-Score:** {df.loc[worst_f1_idx, 'F1_Score']:.4f} (سطر {worst_f1_idx + 1}) """ return report def create_downloadable_csv(self) -> bytes: """ایجاد محتوای CSV برای دانلود مستقیم""" if self.results_df is None or self.results_df.empty: return None try: # تولید محتوای CSV در حافظه csv_buffer = io.StringIO() self.results_df.to_csv(csv_buffer, index=False, encoding='utf-8') csv_content = csv_buffer.getvalue() csv_buffer.close() # تبدیل به bytes برای دانلود return csv_content.encode('utf-8-sig') except Exception as e: print(f"خطا در ایجاد محتوای CSV: {str(e)}") return None def create_evaluation_interface(): """ایجاد رابط کاربری ارزیابی""" evaluator = AnonymizationEvaluator() with gr.Blocks( title="ارزیابی ناشناس‌سازی", theme=gr.themes.Soft(), css=""" .gradio-container { font-family: 'Tahoma', 'Arial', sans-serif !important; direction: rtl; max-width: 1200px; margin: 0 auto; } .upload-area { border: 2px dashed #4CAF50; border-radius: 15px; padding: 30px; text-align: center; background: linear-gradient(145deg, #f8f9fa, #e9ecef); margin: 20px 0; } .results-table { direction: ltr; font-family: monospace; font-size: 12px; } .summary-box { background-color: #e3f2fd; border: 1px solid #2196F3; border-radius: 10px; padding: 20px; margin: 15px 0; } """ ) as interface: gr.Markdown(""" # 📊 ابزار ارزیابی ناشناس‌سازی با متن مرجع ### آپلود فایل CSV شامل ستون‌های: original_text, Reference_text, anonymized_text """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📁 بارگذاری فایل") file_input = gr.File( label="انتخاب فایل CSV", file_types=[".csv"], elem_classes=["upload-area"] ) evaluate_btn = gr.Button( "🚀 شروع ارزیابی", variant="primary", size="lg", interactive=False ) download_btn = gr.Button( "💾 دانلود نتایج CSV", variant="secondary", visible=False ) with gr.Column(scale=2): status_output = gr.Markdown("وضعیت: آماده بارگذاری فایل...") summary_output = gr.Markdown( visible=False, elem_classes=["summary-box"] ) # جدول نتایج results_table = gr.Dataframe( label="نتایج تفصیلی (نمایش 10 سطر اول)", visible=False, elem_classes=["results-table"], wrap=True ) # فایل دانلود download_file = gr.File( visible=False, label="فایل نتایج" ) def on_file_upload(file): if file is None: return "❌ لطفاً فایل را انتخاب کنید", gr.Button(interactive=False) return "✅ فایل بارگذاری شد، آماده ارزیابی", gr.Button(interactive=True) def evaluate_file(file): if file is None: return ( "❌ هیچ فایلی انتخاب نشده", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) try: success, message, df = evaluator.evaluate_dataset(file.name) if not success: return ( f"❌ {message}", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) # تولید گزارش خلاصه summary = evaluator.generate_summary_report(df) # نمایش 10 سطر اول برای نمونه در رابط display_df = df.head(10) # پیام اطلاع‌رسانی status_message = f"✅ {message} - {len(df)} سطر پردازش شد. نمایش: 10 سطر اول، دانلود: همه سطرها" return ( status_message, gr.Markdown(value=summary, visible=True), gr.Dataframe(value=display_df, visible=True), gr.Button(visible=True), gr.File(visible=False) ) except Exception as e: return ( f"❌ خطای غیرمنتظره: {str(e)}", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) def download_results(): try: if evaluator.results_df is None or evaluator.results_df.empty: return ( "❌ هیچ داده‌ای برای دانلود وجود ندارد. ابتدا ارزیابی را انجام دهید.", gr.File(visible=False) ) # ایجاد محتوای CSV csv_content = evaluator.create_downloadable_csv() if csv_content: # تولید نام فایل timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"evaluation_results_{timestamp}.csv" # ذخیره در فایل موقت برای دانلود with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv', prefix='eval_') as temp_file: temp_file.write(csv_content) temp_filename = temp_file.name return ( f"✅ فایل نتایج آماده شد: {filename} ({len(evaluator.results_df)} سطر)", gr.File(value=temp_filename, visible=True) ) else: return ( "❌ خطا در ایجاد محتوای CSV", gr.File(visible=False) ) except Exception as e: return ( f"❌ خطا در دانلود: {str(e)}", gr.File(visible=False) ) # اتصال رویدادها file_input.change( fn=on_file_upload, inputs=[file_input], outputs=[status_output, evaluate_btn] ) evaluate_btn.click( fn=evaluate_file, inputs=[file_input], outputs=[status_output, summary_output, results_table, download_btn, download_file] ) download_btn.click( fn=download_results, outputs=[status_output, download_file] ) # راهنمای استفاده with gr.Accordion("📖 راهنمای استفاده", open=False): gr.Markdown(""" ### فرمت فایل CSV مورد نیاز: فایل شما باید حاوی دقیقاً این سه ستون باشد: - **original_text**: متن اصلی - **Reference_text**: متن ناشناس‌شده مرجع (Ground Truth) - **anonymized_text**: متن ناشناس‌شده مورد ارزیابی ### 🆕 روش جدید ارزیابی: بر اساس نوع موجودیت‌ها برنامه حالا **بدون در نظر گیری شناسه‌های عددی** ارزیابی می‌کند: #### مثال: - **Reference**: `company-01 amount-02 person-03` - **Predicted**: `company-05 amount-08 person-01` - **نتیجه**: تطبیق کامل! (چون هر دو دارای 1 company + 1 amount + 1 person هستند) #### مزایای این روش: - ✅ نادیده گرفتن شناسه‌های عددی نامنظم - ✅ تمرکز روی نوع و تعداد موجودیت‌ها - ✅ عملی‌تر برای ارزیابی کیفیت ناشناس‌سازی ### متریک‌های محاسبه شده: - **Precision**: (تعداد موجودیت‌های صحیح) / (کل موجودیت‌های شناسایی شده) - **Recall**: (تعداد موجودیت‌های صحیح) / (کل موجودیت‌های مرجع) - **F1-Score**: میانگین هارمونیک Precision و Recall ### انواع موجودیت‌های پشتیبانی شده: - `company-XX`, `Company-XX`, `COMPANY-XX` → شرکت‌ها - `person-XX`, `Person-XX`, `PERSON-XX` → اشخاص - `amount-XX`, `Amount-XX`, `AMOUNT-XX` → مبالغ و اعداد - `percent-XX`, `Percent-XX`, `PERCENT-XX` → درصدها - `group-XX`, `Group-XX`, `GROUP-XX` → گروه‌ها ### مراحل کار: 1. فایل CSV را آپلود کنید 2. روی "شروع ارزیابی" کلیک کنید 3. گزارش خلاصه و جدول نمونه (10 سطر اول) را مشاهده کنید 4. **فایل نتایج کامل (همه سطرها) را دانلود کنید** ### نکات مهم: - **نمایش رابط**: فقط 10 سطر اول نمایش داده می‌شود - **فایل دانلود**: شامل تمام سطرهای پردازش شده + متریک‌ها - فایل خروجی شامل ستون‌های اصلی + سه ستون متریک خواهد بود - متریک‌ها برای هر سطر جداگانه محاسبه می‌شوند - آمار کلی در گزارش خلاصه نمایش داده می‌شود ### مشکل در دانلود؟ اگر فایل دانلود نمی‌شود: 1. مرورگر خود را رفرش کنید 2. مجدداً ارزیابی را انجام دهید 3. اطمینان حاصل کنید که popup blocker غیرفعال است 4. از تست دانلود در پایین استفاده کنید """) # نمایش مثال عملی with gr.Accordion("💡 مثال عملی روش جدید", open=False): gr.Markdown(""" ### روش قدیم vs جدید: **متن مرجع**: `شرکت company-01 با سرمایه amount-02 میلیارد تومان و سهم percent-03 از بازار` **متن پیش‌بینی شده**: `شرکت company-10 با سرمایه amount-50 میلیارد تومان و سهم percent-25 از بازار` #### روش قدیم (تطبیق دقیق شناسه): - کل موجودیت‌های مرجع: company-01, amount-02, percent-03 - کل موجودیت‌های پیش‌بینی: company-10, amount-50, percent-25 - تطبیق: صفر! (چون شناسه‌ها متفاوت است) - نتیجه: Precision=0, Recall=0, F1=0 #### روش جدید (بر اساس نوع): - انواع موجودیت‌های مرجع: 1 company, 1 amount, 1 percent - انواع موجودیت‌های پیش‌بینی: 1 company, 1 amount, 1 percent - تطبیق: کامل! (تعداد و نوع یکسان است) - نتیجه: Precision=1.0, Recall=1.0, F1=1.0 این روش جدید خیلی عادلانه‌تر است! 🎯 """) return interface # اجرای برنامه if __name__ == "__main__": interface = create_evaluation_interface() interface.launch()