import pandas as pd import numpy as np import re from typing import Dict, List, Tuple, Set import gradio as gr from datetime import datetime import io import tempfile import os class AnonymizationEvaluator: """ابزار ارزیابی ناشناس‌سازی با استفاده از متن مرجع""" def __init__(self): self.results_df = None def extract_entity_types_from_text(self, text: str) -> Dict[str, int]: """استخراج انواع موجودیت‌ها با پشتیبانی از فرمت‌های مختلف""" if pd.isna(text) or not isinstance(text, str): return {'companies': 0, 'persons': 0, 'amounts': 0, 'percents': 0, 'groups': 0, 'stocks': 0} # تمیز کردن متن text = str(text).strip() # الگوهای برای فرمت‌های مختلف patterns = { 'companies': [ r'company-\d+', r'Company-\d+', r'COMPANY-\d+', # فرمت استاندارد r'COMPANY_\d+(?:_[A-Z]+)?', r'company_\d+(?:_[a-z]+)?' # فرمت regex ], 'persons': [ r'person-\d+', r'Person-\d+', r'PERSON-\d+', r'PERSON_\d+(?:_[A-Z]+)?', r'person_\d+(?:_[a-z]+)?' ], 'amounts': [ r'amount-\d+', r'Amount-\d+', r'AMOUNT-\d+', r'AMOUNT_\d+(?:_[A-Z]+)?', r'amount_\d+(?:_[a-z]+)?' ], 'percents': [ r'percent-\d+', r'Percent-\d+', r'PERCENT-\d+', r'PERCENT_\d+(?:_[A-Z]+)?', r'percent_\d+(?:_[a-z]+)?' ], 'groups': [ r'group-\d+', r'Group-\d+', r'GROUP-\d+', r'GROUP_\d+(?:_[A-Z]+)?', r'group_\d+(?:_[a-z]+)?' ], 'stocks': [ r'stock-\d+', r'Stock-\d+', r'STOCK-\d+', r'STOCK_SYMBOL_\d+(?:_[A-Z]+)?', r'stock_symbol_\d+(?:_[a-z]+)?' ] } entity_counts = {} for entity_type, pattern_list in patterns.items(): count = 0 for pattern in pattern_list: matches = re.findall(pattern, text) count += len(matches) entity_counts[entity_type] = count return entity_counts def normalize_entity_format(self, text: str) -> str: """نرمال‌سازی فرمت‌های مختلف به یک فرمت استاندارد""" if pd.isna(text) or not isinstance(text, str): return "" # نگاشت انواع مختلف به نام استاندارد replacements = [ # فرمت regex به استاندارد (r'COMPANY_(\d+)(?:_[A-Z]+)?', r'company-\1'), (r'PERSON_(\d+)(?:_[A-Z]+)?', r'person-\1'), (r'AMOUNT_(\d+)(?:_[A-Z]+)?', r'amount-\1'), (r'PERCENT_(\d+)(?:_[A-Z]+)?', r'percent-\1'), (r'GROUP_(\d+)(?:_[A-Z]+)?', r'group-\1'), (r'STOCK_SYMBOL_(\d+)(?:_[A-Z]+)?', r'stock-\1'), # تبدیل حروف بزرگ به کوچک (r'Company-(\d+)', r'company-\1'), (r'Person-(\d+)', r'person-\1'), (r'Amount-(\d+)', r'amount-\1'), (r'Percent-(\d+)', r'percent-\1'), (r'Group-(\d+)', r'group-\1'), ] normalized_text = text for pattern, replacement in replacements: normalized_text = re.sub(pattern, replacement, normalized_text) return normalized_text def calculate_precision_recall_f1_by_type(self, reference_counts: Dict[str, int], predicted_counts: Dict[str, int]) -> Tuple[float, float, float]: """محاسبه Precision, Recall و F1-Score بر اساس نوع موجودیت‌ها""" # مجموع کل موجودیت‌ها total_reference = sum(reference_counts.values()) total_predicted = sum(predicted_counts.values()) if total_predicted == 0 and total_reference == 0: return 1.0, 1.0, 1.0 # هر دو خالی هستند - تطبیق کامل elif total_predicted == 0: return 0.0, 0.0, 0.0 # predicted خالی ولی reference دارد elif total_reference == 0: return 0.0, 1.0 if total_predicted > 0 else 1.0, 0.0 # reference خالی # محاسبه True Positive برای هر نوع موجودیت true_positives = 0 for entity_type in reference_counts.keys(): ref_count = reference_counts[entity_type] pred_count = predicted_counts[entity_type] # True Positive = کمترین تعداد بین reference و predicted برای هر نوع true_positives += min(ref_count, pred_count) # محاسبه Precision و Recall precision = true_positives / total_predicted if total_predicted > 0 else 0.0 recall = true_positives / total_reference if total_reference > 0 else 0.0 # محاسبه F1-Score if precision + recall == 0: f1 = 0.0 else: f1 = 2 * (precision * recall) / (precision + recall) return precision, recall, f1 def evaluate_single_row(self, reference_text: str, predicted_text: str) -> Tuple[float, float, float]: """ارزیابی یک سطر بر اساس نوع موجودیت‌ها""" try: # استخراج تعداد موجودیت‌ها بر اساس نوع ref_counts = self.extract_entity_types_from_text(reference_text) pred_counts = self.extract_entity_types_from_text(predicted_text) # محاسبه متریک‌ها precision, recall, f1 = self.calculate_precision_recall_f1_by_type(ref_counts, pred_counts) return precision, recall, f1 except Exception as e: print(f"خطا در ارزیابی: {str(e)}") return 0.0, 0.0, 0.0 def debug_text_analysis(self, reference_text: str, predicted_text: str, row_num: int = 0) -> str: """تابع debugging برای تحلیل متن‌ها بر اساس نوع موجودیت‌ها""" debug_info = f"\n--- Debug Row {row_num + 1} ---\n" debug_info += f"Reference: '{reference_text[:100]}...'\n" debug_info += f"Predicted: '{predicted_text[:100]}...'\n" ref_counts = self.extract_entity_types_from_text(reference_text) pred_counts = self.extract_entity_types_from_text(predicted_text) debug_info += f"Reference entity counts: {ref_counts}\n" debug_info += f"Predicted entity counts: {pred_counts}\n" # محاسبه متریک‌ها برای این سطر precision, recall, f1 = self.calculate_precision_recall_f1_by_type(ref_counts, pred_counts) debug_info += f"Metrics: P={precision:.3f}, R={recall:.3f}, F1={f1:.3f}\n" return debug_info def evaluate_dataset(self, file_path: str) -> Tuple[bool, str, pd.DataFrame]: """ارزیابی کل دیتاست با پشتیبانی از فرمت‌های مختلف""" try: # بارگذاری فایل df = pd.read_csv(file_path) # بررسی ستون‌ها - پشتیبانی از دو حالت if 'Reference_text' in df.columns and 'anonymized_text' in df.columns: # حالت سه ستونه (فرمت قبلی) required_columns = ['original_text', 'Reference_text', 'anonymized_text'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: return False, f"ستون‌های مفقود: {', '.join(missing_columns)}", pd.DataFrame() reference_col = 'Reference_text' predicted_col = 'anonymized_text' elif 'original_text' in df.columns and 'anonymized_text' in df.columns: # حالت دو ستونه جدید - فرض: original_text مرجع است reference_col = 'original_text' # متن اصلی predicted_col = 'anonymized_text' # متن ناشناس‌شده با regex else: return False, "فایل باید شامل ستون‌های 'original_text' و 'anonymized_text' باشد", pd.DataFrame() # تشخیص مشکل - بررسی نمونه‌ای از داده‌ها debug_info = "\n=== ارزیابی با پشتیبانی فرمت‌های مختلف ===\n" debug_info += f"تعداد سطرها: {len(df)}\n" debug_info += f"ستون‌ها: {list(df.columns)}\n" debug_info += f"مرجع: {reference_col}, پیش‌بینی: {predicted_col}\n\n" # بررسی چند سطر اول for i in range(min(3, len(df))): ref_text = str(df.iloc[i][reference_col]) pred_text = str(df.iloc[i][predicted_col]) debug_info += self.debug_text_analysis(ref_text, pred_text, i) # نمایش نرمال‌سازی normalized_ref = self.normalize_entity_format(ref_text) normalized_pred = self.normalize_entity_format(pred_text) if normalized_ref != ref_text or normalized_pred != pred_text: debug_info += f" نرمال‌سازی مرجع: '{normalized_ref[:50]}...'\n" debug_info += f" نرمال‌سازی پیش‌بینی: '{normalized_pred[:50]}...'\n" print(debug_info) # نمایش در console # محاسبه متریک‌ها برای هر سطر precisions = [] recalls = [] f1_scores = [] total_entity_types_found = 0 for index, row in df.iterrows(): precision, recall, f1 = self.evaluate_single_row( row[reference_col], row[predicted_col] ) precisions.append(round(precision, 4)) recalls.append(round(recall, 4)) f1_scores.append(round(f1, 4)) # شمارش انواع موجودیت‌ها برای debugging ref_counts = self.extract_entity_types_from_text(str(row[reference_col])) pred_counts = self.extract_entity_types_from_text(str(row[predicted_col])) total_entity_types_found += sum(ref_counts.values()) + sum(pred_counts.values()) # اضافه کردن ستون‌های جدید df['Precision'] = precisions df['Recall'] = recalls df['F1_Score'] = f1_scores # ذخیره نتایج self.results_df = df # پیام وضعیت شامل اطلاعات debugging avg_precision = np.mean(precisions) avg_recall = np.mean(recalls) avg_f1 = np.mean(f1_scores) status_message = f"""ارزیابی انجام شد (مرجع: {reference_col}): • میانگین Precision: {avg_precision:.3f} • میانگین Recall: {avg_recall:.3f} • میانگین F1-Score: {avg_f1:.3f} • کل موجودیت‌های یافت شده: {total_entity_types_found} • پشتیبانی فرمت REGEX اضافه شد""" if total_entity_types_found == 0: status_message += "\n⚠️ هیچ موجودیتی تشخیص داده نشد!" return True, status_message, df except Exception as e: return False, f"خطا در پردازش فایل: {str(e)}", pd.DataFrame() def generate_summary_report(self, df: pd.DataFrame) -> str: """تولید گزارش خلاصه""" if df.empty: return "هیچ داده‌ای برای گزارش یافت نشد" # محاسبه آمار کلی avg_precision = df['Precision'].mean() avg_recall = df['Recall'].mean() avg_f1 = df['F1_Score'].mean() # محاسبه آمار تفصیلی total_rows = len(df) high_precision_count = len(df[df['Precision'] >= 0.8]) high_recall_count = len(df[df['Recall'] >= 0.8]) high_f1_count = len(df[df['F1_Score'] >= 0.8]) # بهترین و بدترین نتایج best_f1_idx = df['F1_Score'].idxmax() worst_f1_idx = df['F1_Score'].idxmin() report = f""" ## 📊 گزارش جامع ارزیابی ### آمار کلی: - **تعداد کل سطرها:** {total_rows} - **میانگین Precision:** {avg_precision:.4f} - **میانگین Recall:** {avg_recall:.4f} - **میانگین F1-Score:** {avg_f1:.4f} ### توزیع عملکرد (امتیاز ≥ 0.8): - **Precision بالا:** {high_precision_count} سطر ({high_precision_count/total_rows*100:.1f}%) - **Recall بالا:** {high_recall_count} سطر ({high_recall_count/total_rows*100:.1f}%) - **F1-Score بالا:** {high_f1_count} سطر ({high_f1_count/total_rows*100:.1f}%) ### نمونه‌های برتر و ضعیف: - **بهترین F1-Score:** {df.loc[best_f1_idx, 'F1_Score']:.4f} (سطر {best_f1_idx + 1}) - **ضعیف‌ترین F1-Score:** {df.loc[worst_f1_idx, 'F1_Score']:.4f} (سطر {worst_f1_idx + 1}) """ return report def create_downloadable_csv(self) -> bytes: """ایجاد محتوای CSV برای دانلود مستقیم""" if self.results_df is None or self.results_df.empty: return None try: # تولید محتوای CSV در حافظه csv_buffer = io.StringIO() self.results_df.to_csv(csv_buffer, index=False, encoding='utf-8') csv_content = csv_buffer.getvalue() csv_buffer.close() # تبدیل به bytes برای دانلود return csv_content.encode('utf-8-sig') except Exception as e: print(f"خطا در ایجاد محتوای CSV: {str(e)}") return None def create_evaluation_interface(): """ایجاد رابط کاربری ارزیابی""" evaluator = AnonymizationEvaluator() with gr.Blocks( title="ارزیابی ناشناس‌سازی", theme=gr.themes.Soft(), css=""" .gradio-container { font-family: 'Tahoma', 'Arial', sans-serif !important; direction: rtl; max-width: 1200px; margin: 0 auto; } .upload-area { border: 2px dashed #4CAF50; border-radius: 15px; padding: 30px; text-align: center; background: linear-gradient(145deg, #f8f9fa, #e9ecef); margin: 20px 0; } .results-table { direction: ltr; font-family: monospace; font-size: 12px; } .summary-box { background-color: #e3f2fd; border: 1px solid #2196F3; border-radius: 10px; padding: 20px; margin: 15px 0; } """ ) as interface: gr.Markdown(""" # 📊 ابزار ارزیابی ناشناس‌سازی با پشتیبانی فرمت‌های مختلف ### پشتیبانی از فرمت‌های استاندارد و REGEX """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📁 بارگذاری فایل") file_input = gr.File( label="انتخاب فایل CSV (2 یا 3 ستون)", file_types=[".csv"], elem_classes=["upload-area"] ) evaluate_btn = gr.Button( "🚀 شروع ارزیابی", variant="primary", size="lg", interactive=False ) download_btn = gr.Button( "💾 دانلود نتایج CSV", variant="secondary", visible=False ) with gr.Column(scale=2): status_output = gr.Markdown("وضعیت: آماده بارگذاری فایل...") summary_output = gr.Markdown( visible=False, elem_classes=["summary-box"] ) # جدول نتایج results_table = gr.Dataframe( label="نتایج تفصیلی (نمایش 10 سطر اول)", visible=False, elem_classes=["results-table"], wrap=True ) # فایل دانلود download_file = gr.File( visible=False, label="فایل نتایج" ) def on_file_upload(file): if file is None: return "❌ لطفاً فایل را انتخاب کنید", gr.Button(interactive=False) return "✅ فایل بارگذاری شد، آماده ارزیابی", gr.Button(interactive=True) def evaluate_file(file): if file is None: return ( "❌ هیچ فایلی انتخاب نشده", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) try: success, message, df = evaluator.evaluate_dataset(file.name) if not success: return ( f"❌ {message}", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) # تولید گزارش خلاصه summary = evaluator.generate_summary_report(df) # نمایش 10 سطر اول برای نمونه در رابط display_df = df.head(10) # پیام اطلاع‌رسانی status_message = f"✅ {message} - {len(df)} سطر پردازش شد. نمایش: 10 سطر اول، دانلود: همه سطرها" return ( status_message, gr.Markdown(value=summary, visible=True), gr.Dataframe(value=display_df, visible=True), gr.Button(visible=True), gr.File(visible=False) ) except Exception as e: return ( f"❌ خطای غیرمنتظره: {str(e)}", gr.Markdown(visible=False), gr.Dataframe(visible=False), gr.Button(visible=False), gr.File(visible=False) ) def download_results(): try: if evaluator.results_df is None or evaluator.results_df.empty: return ( "❌ هیچ داده‌ای برای دانلود وجود ندارد. ابتدا ارزیابی را انجام دهید.", gr.File(visible=False) ) # ایجاد محتوای CSV csv_content = evaluator.create_downloadable_csv() if csv_content: # تولید نام فایل timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"evaluation_results_{timestamp}.csv" # ذخیره در فایل موقت برای دانلود with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv', prefix='eval_') as temp_file: temp_file.write(csv_content) temp_filename = temp_file.name return ( f"✅ فایل نتایج آماده شد: {filename} ({len(evaluator.results_df)} سطر)", gr.File(value=temp_filename, visible=True) ) else: return ( "❌ خطا در ایجاد محتوای CSV", gr.File(visible=False) ) except Exception as e: return ( f"❌ خطا در دانلود: {str(e)}", gr.File(visible=False) ) # اتصال رویدادها file_input.change( fn=on_file_upload, inputs=[file_input], outputs=[status_output, evaluate_btn] ) evaluate_btn.click( fn=evaluate_file, inputs=[file_input], outputs=[status_output, summary_output, results_table, download_btn, download_file] ) download_btn.click( fn=download_results, outputs=[status_output, download_file] ) # راهنمای استفاده with gr.Accordion("📖 راهنمای استفاده", open=False): gr.Markdown(""" ### فرمت‌های پشتیبانی شده: **حالت 1: فایل 3 ستونه (مرجع و پیش‌بینی جداگانه)** - original_text: متن اصلی - Reference_text: متن مرجع ناشناس‌شده - anonymized_text: متن پیش‌بینی شده **حالت 2: فایل 2 ستونه (مقایسه با اصل)** - original_text: متن اصلی (مرجع) - anonymized_text: متن ناشناس‌شده (پیش‌بینی) ### انواع فرمت‌های موجودیت پشتیبانی شده: **فرمت استاندارد:** - company-01, person-02, amount-03 - Company-01, Person-02, Amount-03 - COMPANY-01, PERSON-02, AMOUNT-03 **فرمت REGEX:** - COMPANY_001_REGEX, PERSON_002_REGEX - AMOUNT_012, PERCENT_025 - STOCK_SYMBOL_001_REGEX ### نرمال‌سازی خودکار: برنامه خودکار انواع مختلف را تشخیص داده و نرمال‌سازی می‌کند: - COMPANY_001_REGEX → company-001 - AMOUNT_012 → amount-012 - STOCK_SYMBOL_005_REGEX → stock-005 ### متریک‌های محاسبه شده: - **Precision**: موجودیت‌های درست / کل شناسایی‌شده - **Recall**: موجودیت‌های درست / کل مرجع - **F1-Score**: میانگین هارمونیک Precision و Recall ### مراحل کار: 1. فایل CSV را آپلود کنید (2 یا 3 ستون) 2. روی "شروع ارزیابی" کلیک کنید 3. گزارش و نتایج را مشاهده کنید 4. فایل نتایج کامل را دانلود کنید ### نکات: - برنامه خودکار تشخیص می‌دهد فایل 2 ستونه است یا 3 ستونه - فرمت‌های مختلف خودکار نرمال‌سازی می‌شوند - فقط نوع و تعداد موجودیت‌ها مهم است، شناسه عددی نه """) # نمایش مثال فرمت‌های مختلف with gr.Accordion("💡 مثال فرمت‌های مختلف", open=False): gr.Markdown(""" ### مثال 1: فرمت استاندارد vs فرمت REGEX **متن اصلی**: `شرکت فولاد مبارکه با درآمد 127 میلیارد تومان` **فرمت استاندارد**: `شرکت company-01 با درآمد amount-02 میلیارد تومان` **فرمت REGEX**: `شرکت COMPANY_001_REGEX با درآمد AMOUNT_012 میلیارد تومان` **نتیجه ارزیابی**: - هر دو: 1 company + 1 amount - Precision = 1.0, Recall = 1.0, F1 = 1.0 ### مثال 2: تعداد موجودیت‌های متفاوت **مرجع**: `company-01 amount-02 person-03` (1+1+1=3) **پیش‌بینی**: `COMPANY_001_REGEX AMOUNT_012` (1+1=2) **نتیجه**: - True Positive = min(1,1) + min(1,1) + min(1,0) = 2 - Precision = 2/2 = 1.0 - Recall = 2/3 = 0.67 - F1 = 0.80 """) return interface # اجرای برنامه if __name__ == "__main__": interface = create_evaluation_interface() interface.launch()