|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import re |
|
|
from typing import Dict, List, Tuple, Set |
|
|
import gradio as gr |
|
|
from datetime import datetime |
|
|
import io |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
class AnonymizationEvaluator: |
|
|
"""ابزار ارزیابی ناشناسسازی با استفاده از متن مرجع""" |
|
|
|
|
|
def __init__(self): |
|
|
self.results_df = None |
|
|
|
|
|
def extract_entity_types_from_text(self, text: str) -> Dict[str, int]: |
|
|
"""استخراج انواع موجودیتها بدون در نظر گرفتن شناسه عددی""" |
|
|
if pd.isna(text) or not isinstance(text, str): |
|
|
return {'companies': 0, 'persons': 0, 'amounts': 0, 'percents': 0, 'groups': 0} |
|
|
|
|
|
|
|
|
text = str(text).strip() |
|
|
|
|
|
|
|
|
patterns = { |
|
|
'companies': [r'company-\d+', r'Company-\d+', r'COMPANY-\d+'], |
|
|
'persons': [r'person-\d+', r'Person-\d+', r'PERSON-\d+'], |
|
|
'amounts': [r'amount-\d+', r'Amount-\d+', r'AMOUNT-\d+'], |
|
|
'percents': [r'percent-\d+', r'Percent-\d+', r'PERCENT-\d+'], |
|
|
'groups': [r'group-\d+', r'Group-\d+', r'GROUP-\d+'] |
|
|
} |
|
|
|
|
|
entity_counts = {} |
|
|
for entity_type, pattern_list in patterns.items(): |
|
|
count = 0 |
|
|
for pattern in pattern_list: |
|
|
matches = re.findall(pattern, text) |
|
|
count += len(matches) |
|
|
entity_counts[entity_type] = count |
|
|
|
|
|
return entity_counts |
|
|
|
|
|
def calculate_precision_recall_f1_by_type(self, reference_counts: Dict[str, int], |
|
|
predicted_counts: Dict[str, int]) -> Tuple[float, float, float]: |
|
|
"""محاسبه Precision, Recall و F1-Score بر اساس نوع موجودیتها""" |
|
|
|
|
|
|
|
|
total_reference = sum(reference_counts.values()) |
|
|
total_predicted = sum(predicted_counts.values()) |
|
|
|
|
|
if total_predicted == 0 and total_reference == 0: |
|
|
return 1.0, 1.0, 1.0 |
|
|
elif total_predicted == 0: |
|
|
return 0.0, 0.0, 0.0 |
|
|
elif total_reference == 0: |
|
|
return 0.0, 1.0 if total_predicted > 0 else 1.0, 0.0 |
|
|
|
|
|
|
|
|
true_positives = 0 |
|
|
for entity_type in reference_counts.keys(): |
|
|
ref_count = reference_counts[entity_type] |
|
|
pred_count = predicted_counts[entity_type] |
|
|
|
|
|
true_positives += min(ref_count, pred_count) |
|
|
|
|
|
|
|
|
precision = true_positives / total_predicted if total_predicted > 0 else 0.0 |
|
|
recall = true_positives / total_reference if total_reference > 0 else 0.0 |
|
|
|
|
|
|
|
|
if precision + recall == 0: |
|
|
f1 = 0.0 |
|
|
else: |
|
|
f1 = 2 * (precision * recall) / (precision + recall) |
|
|
|
|
|
return precision, recall, f1 |
|
|
|
|
|
def evaluate_single_row(self, reference_text: str, predicted_text: str) -> Tuple[float, float, float]: |
|
|
"""ارزیابی یک سطر بر اساس نوع موجودیتها""" |
|
|
try: |
|
|
|
|
|
ref_counts = self.extract_entity_types_from_text(reference_text) |
|
|
pred_counts = self.extract_entity_types_from_text(predicted_text) |
|
|
|
|
|
|
|
|
precision, recall, f1 = self.calculate_precision_recall_f1_by_type(ref_counts, pred_counts) |
|
|
|
|
|
return precision, recall, f1 |
|
|
|
|
|
except Exception as e: |
|
|
print(f"خطا در ارزیابی: {str(e)}") |
|
|
return 0.0, 0.0, 0.0 |
|
|
|
|
|
def debug_text_analysis(self, reference_text: str, predicted_text: str, row_num: int = 0) -> str: |
|
|
"""تابع debugging برای تحلیل متنها بر اساس نوع موجودیتها""" |
|
|
debug_info = f"\n--- Debug Row {row_num + 1} ---\n" |
|
|
debug_info += f"Reference: '{reference_text[:100]}...'\n" |
|
|
debug_info += f"Predicted: '{predicted_text[:100]}...'\n" |
|
|
|
|
|
ref_counts = self.extract_entity_types_from_text(reference_text) |
|
|
pred_counts = self.extract_entity_types_from_text(predicted_text) |
|
|
|
|
|
debug_info += f"Reference entity counts: {ref_counts}\n" |
|
|
debug_info += f"Predicted entity counts: {pred_counts}\n" |
|
|
|
|
|
|
|
|
precision, recall, f1 = self.calculate_precision_recall_f1_by_type(ref_counts, pred_counts) |
|
|
debug_info += f"Metrics: P={precision:.3f}, R={recall:.3f}, F1={f1:.3f}\n" |
|
|
|
|
|
return debug_info |
|
|
|
|
|
def evaluate_dataset(self, file_path: str) -> Tuple[bool, str, pd.DataFrame]: |
|
|
"""ارزیابی کل دیتاست بر اساس نوع موجودیتها""" |
|
|
try: |
|
|
|
|
|
df = pd.read_csv(file_path) |
|
|
|
|
|
|
|
|
required_columns = ['original_text', 'Reference_text', 'anonymized_text'] |
|
|
missing_columns = [col for col in required_columns if col not in df.columns] |
|
|
|
|
|
if missing_columns: |
|
|
return False, f"ستونهای مفقود: {', '.join(missing_columns)}", pd.DataFrame() |
|
|
|
|
|
|
|
|
debug_info = "\n=== روش جدید: ارزیابی بر اساس نوع موجودیتها ===\n" |
|
|
debug_info += f"تعداد سطرها: {len(df)}\n" |
|
|
debug_info += f"ستونها: {list(df.columns)}\n\n" |
|
|
|
|
|
|
|
|
for i in range(min(3, len(df))): |
|
|
ref_text = str(df.iloc[i]['Reference_text']) |
|
|
anon_text = str(df.iloc[i]['anonymized_text']) |
|
|
|
|
|
debug_info += self.debug_text_analysis(ref_text, anon_text, i) |
|
|
|
|
|
print(debug_info) |
|
|
|
|
|
|
|
|
precisions = [] |
|
|
recalls = [] |
|
|
f1_scores = [] |
|
|
|
|
|
total_entity_types_found = 0 |
|
|
|
|
|
for index, row in df.iterrows(): |
|
|
precision, recall, f1 = self.evaluate_single_row( |
|
|
row['Reference_text'], |
|
|
row['anonymized_text'] |
|
|
) |
|
|
|
|
|
precisions.append(round(precision, 4)) |
|
|
recalls.append(round(recall, 4)) |
|
|
f1_scores.append(round(f1, 4)) |
|
|
|
|
|
|
|
|
ref_counts = self.extract_entity_types_from_text(str(row['Reference_text'])) |
|
|
pred_counts = self.extract_entity_types_from_text(str(row['anonymized_text'])) |
|
|
total_entity_types_found += sum(ref_counts.values()) + sum(pred_counts.values()) |
|
|
|
|
|
|
|
|
df['Precision'] = precisions |
|
|
df['Recall'] = recalls |
|
|
df['F1_Score'] = f1_scores |
|
|
|
|
|
|
|
|
self.results_df = df |
|
|
|
|
|
|
|
|
avg_precision = np.mean(precisions) |
|
|
avg_recall = np.mean(recalls) |
|
|
avg_f1 = np.mean(f1_scores) |
|
|
|
|
|
status_message = f"""✅ ارزیابی بر اساس نوع موجودیتها انجام شد: |
|
|
• میانگین Precision: {avg_precision:.3f} |
|
|
• میانگین Recall: {avg_recall:.3f} |
|
|
• میانگین F1-Score: {avg_f1:.3f} |
|
|
• کل موجودیتهای یافت شده: {total_entity_types_found}""" |
|
|
|
|
|
if total_entity_types_found == 0: |
|
|
status_message += "\n⚠️ هیچ موجودیتی تشخیص داده نشد! لطفاً فرمت دادهها را بررسی کنید." |
|
|
|
|
|
return True, status_message, df |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"خطا در پردازش فایل: {str(e)}", pd.DataFrame() |
|
|
|
|
|
def generate_summary_report(self, df: pd.DataFrame) -> str: |
|
|
"""تولید گزارش خلاصه""" |
|
|
if df.empty: |
|
|
return "هیچ دادهای برای گزارش یافت نشد" |
|
|
|
|
|
|
|
|
avg_precision = df['Precision'].mean() |
|
|
avg_recall = df['Recall'].mean() |
|
|
avg_f1 = df['F1_Score'].mean() |
|
|
|
|
|
|
|
|
total_rows = len(df) |
|
|
high_precision_count = len(df[df['Precision'] >= 0.8]) |
|
|
high_recall_count = len(df[df['Recall'] >= 0.8]) |
|
|
high_f1_count = len(df[df['F1_Score'] >= 0.8]) |
|
|
|
|
|
|
|
|
best_f1_idx = df['F1_Score'].idxmax() |
|
|
worst_f1_idx = df['F1_Score'].idxmin() |
|
|
|
|
|
report = f""" |
|
|
## 📊 گزارش جامع ارزیابی |
|
|
|
|
|
### آمار کلی: |
|
|
- **تعداد کل سطرها:** {total_rows} |
|
|
- **میانگین Precision:** {avg_precision:.4f} |
|
|
- **میانگین Recall:** {avg_recall:.4f} |
|
|
- **میانگین F1-Score:** {avg_f1:.4f} |
|
|
|
|
|
### توزیع عملکرد (امتیاز ≥ 0.8): |
|
|
- **Precision بالا:** {high_precision_count} سطر ({high_precision_count/total_rows*100:.1f}%) |
|
|
- **Recall بالا:** {high_recall_count} سطر ({high_recall_count/total_rows*100:.1f}%) |
|
|
- **F1-Score بالا:** {high_f1_count} سطر ({high_f1_count/total_rows*100:.1f}%) |
|
|
|
|
|
### نمونههای برتر و ضعیف: |
|
|
- **بهترین F1-Score:** {df.loc[best_f1_idx, 'F1_Score']:.4f} (سطر {best_f1_idx + 1}) |
|
|
- **ضعیفترین F1-Score:** {df.loc[worst_f1_idx, 'F1_Score']:.4f} (سطر {worst_f1_idx + 1}) |
|
|
""" |
|
|
|
|
|
return report |
|
|
|
|
|
def create_downloadable_csv(self) -> bytes: |
|
|
"""ایجاد محتوای CSV برای دانلود مستقیم""" |
|
|
if self.results_df is None or self.results_df.empty: |
|
|
return None |
|
|
|
|
|
try: |
|
|
|
|
|
csv_buffer = io.StringIO() |
|
|
self.results_df.to_csv(csv_buffer, index=False, encoding='utf-8') |
|
|
csv_content = csv_buffer.getvalue() |
|
|
csv_buffer.close() |
|
|
|
|
|
|
|
|
return csv_content.encode('utf-8-sig') |
|
|
|
|
|
except Exception as e: |
|
|
print(f"خطا در ایجاد محتوای CSV: {str(e)}") |
|
|
return None |
|
|
|
|
|
def create_evaluation_interface(): |
|
|
"""ایجاد رابط کاربری ارزیابی""" |
|
|
evaluator = AnonymizationEvaluator() |
|
|
|
|
|
with gr.Blocks( |
|
|
title="ارزیابی ناشناسسازی", |
|
|
theme=gr.themes.Soft(), |
|
|
css=""" |
|
|
.gradio-container { |
|
|
font-family: 'Tahoma', 'Arial', sans-serif !important; |
|
|
direction: rtl; |
|
|
max-width: 1200px; |
|
|
margin: 0 auto; |
|
|
} |
|
|
.upload-area { |
|
|
border: 2px dashed #4CAF50; |
|
|
border-radius: 15px; |
|
|
padding: 30px; |
|
|
text-align: center; |
|
|
background: linear-gradient(145deg, #f8f9fa, #e9ecef); |
|
|
margin: 20px 0; |
|
|
} |
|
|
.results-table { |
|
|
direction: ltr; |
|
|
font-family: monospace; |
|
|
font-size: 12px; |
|
|
} |
|
|
.summary-box { |
|
|
background-color: #e3f2fd; |
|
|
border: 1px solid #2196F3; |
|
|
border-radius: 10px; |
|
|
padding: 20px; |
|
|
margin: 15px 0; |
|
|
} |
|
|
""" |
|
|
) as interface: |
|
|
|
|
|
gr.Markdown(""" |
|
|
# 📊 ابزار ارزیابی ناشناسسازی با متن مرجع |
|
|
### آپلود فایل CSV شامل ستونهای: original_text, Reference_text, anonymized_text |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### 📁 بارگذاری فایل") |
|
|
|
|
|
file_input = gr.File( |
|
|
label="انتخاب فایل CSV", |
|
|
file_types=[".csv"], |
|
|
elem_classes=["upload-area"] |
|
|
) |
|
|
|
|
|
evaluate_btn = gr.Button( |
|
|
"🚀 شروع ارزیابی", |
|
|
variant="primary", |
|
|
size="lg", |
|
|
interactive=False |
|
|
) |
|
|
|
|
|
download_btn = gr.Button( |
|
|
"💾 دانلود نتایج CSV", |
|
|
variant="secondary", |
|
|
visible=False |
|
|
) |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
status_output = gr.Markdown("وضعیت: آماده بارگذاری فایل...") |
|
|
|
|
|
summary_output = gr.Markdown( |
|
|
visible=False, |
|
|
elem_classes=["summary-box"] |
|
|
) |
|
|
|
|
|
|
|
|
results_table = gr.Dataframe( |
|
|
label="نتایج تفصیلی (نمایش 10 سطر اول)", |
|
|
visible=False, |
|
|
elem_classes=["results-table"], |
|
|
wrap=True |
|
|
) |
|
|
|
|
|
|
|
|
download_file = gr.File( |
|
|
visible=False, |
|
|
label="فایل نتایج" |
|
|
) |
|
|
|
|
|
def on_file_upload(file): |
|
|
if file is None: |
|
|
return "❌ لطفاً فایل را انتخاب کنید", gr.Button(interactive=False) |
|
|
|
|
|
return "✅ فایل بارگذاری شد، آماده ارزیابی", gr.Button(interactive=True) |
|
|
|
|
|
def evaluate_file(file): |
|
|
if file is None: |
|
|
return ( |
|
|
"❌ هیچ فایلی انتخاب نشده", |
|
|
gr.Markdown(visible=False), |
|
|
gr.Dataframe(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
try: |
|
|
success, message, df = evaluator.evaluate_dataset(file.name) |
|
|
|
|
|
if not success: |
|
|
return ( |
|
|
f"❌ {message}", |
|
|
gr.Markdown(visible=False), |
|
|
gr.Dataframe(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
|
|
|
summary = evaluator.generate_summary_report(df) |
|
|
|
|
|
|
|
|
display_df = df.head(10) |
|
|
|
|
|
|
|
|
status_message = f"✅ {message} - {len(df)} سطر پردازش شد. نمایش: 10 سطر اول، دانلود: همه سطرها" |
|
|
|
|
|
return ( |
|
|
status_message, |
|
|
gr.Markdown(value=summary, visible=True), |
|
|
gr.Dataframe(value=display_df, visible=True), |
|
|
gr.Button(visible=True), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
return ( |
|
|
f"❌ خطای غیرمنتظره: {str(e)}", |
|
|
gr.Markdown(visible=False), |
|
|
gr.Dataframe(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
def download_results(): |
|
|
try: |
|
|
if evaluator.results_df is None or evaluator.results_df.empty: |
|
|
return ( |
|
|
"❌ هیچ دادهای برای دانلود وجود ندارد. ابتدا ارزیابی را انجام دهید.", |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
|
|
|
csv_content = evaluator.create_downloadable_csv() |
|
|
if csv_content: |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"evaluation_results_{timestamp}.csv" |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode='wb', delete=False, |
|
|
suffix='.csv', prefix='eval_') as temp_file: |
|
|
temp_file.write(csv_content) |
|
|
temp_filename = temp_file.name |
|
|
|
|
|
return ( |
|
|
f"✅ فایل نتایج آماده شد: {filename} ({len(evaluator.results_df)} سطر)", |
|
|
gr.File(value=temp_filename, visible=True) |
|
|
) |
|
|
else: |
|
|
return ( |
|
|
"❌ خطا در ایجاد محتوای CSV", |
|
|
gr.File(visible=False) |
|
|
) |
|
|
except Exception as e: |
|
|
return ( |
|
|
f"❌ خطا در دانلود: {str(e)}", |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
|
|
|
file_input.change( |
|
|
fn=on_file_upload, |
|
|
inputs=[file_input], |
|
|
outputs=[status_output, evaluate_btn] |
|
|
) |
|
|
|
|
|
evaluate_btn.click( |
|
|
fn=evaluate_file, |
|
|
inputs=[file_input], |
|
|
outputs=[status_output, summary_output, results_table, download_btn, download_file] |
|
|
) |
|
|
|
|
|
download_btn.click( |
|
|
fn=download_results, |
|
|
outputs=[status_output, download_file] |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Accordion("📖 راهنمای استفاده", open=False): |
|
|
gr.Markdown(""" |
|
|
### فرمت فایل CSV مورد نیاز: |
|
|
|
|
|
فایل شما باید حاوی دقیقاً این سه ستون باشد: |
|
|
- **original_text**: متن اصلی |
|
|
- **Reference_text**: متن ناشناسشده مرجع (Ground Truth) |
|
|
- **anonymized_text**: متن ناشناسشده مورد ارزیابی |
|
|
|
|
|
### 🆕 روش جدید ارزیابی: بر اساس نوع موجودیتها |
|
|
|
|
|
برنامه حالا **بدون در نظر گیری شناسههای عددی** ارزیابی میکند: |
|
|
|
|
|
#### مثال: |
|
|
- **Reference**: `company-01 amount-02 person-03` |
|
|
- **Predicted**: `company-05 amount-08 person-01` |
|
|
- **نتیجه**: تطبیق کامل! (چون هر دو دارای 1 company + 1 amount + 1 person هستند) |
|
|
|
|
|
#### مزایای این روش: |
|
|
- ✅ نادیده گرفتن شناسههای عددی نامنظم |
|
|
- ✅ تمرکز روی نوع و تعداد موجودیتها |
|
|
- ✅ عملیتر برای ارزیابی کیفیت ناشناسسازی |
|
|
|
|
|
### متریکهای محاسبه شده: |
|
|
|
|
|
- **Precision**: (تعداد موجودیتهای صحیح) / (کل موجودیتهای شناسایی شده) |
|
|
- **Recall**: (تعداد موجودیتهای صحیح) / (کل موجودیتهای مرجع) |
|
|
- **F1-Score**: میانگین هارمونیک Precision و Recall |
|
|
|
|
|
### انواع موجودیتهای پشتیبانی شده: |
|
|
|
|
|
- `company-XX`, `Company-XX`, `COMPANY-XX` → شرکتها |
|
|
- `person-XX`, `Person-XX`, `PERSON-XX` → اشخاص |
|
|
- `amount-XX`, `Amount-XX`, `AMOUNT-XX` → مبالغ و اعداد |
|
|
- `percent-XX`, `Percent-XX`, `PERCENT-XX` → درصدها |
|
|
- `group-XX`, `Group-XX`, `GROUP-XX` → گروهها |
|
|
|
|
|
### مراحل کار: |
|
|
|
|
|
1. فایل CSV را آپلود کنید |
|
|
2. روی "شروع ارزیابی" کلیک کنید |
|
|
3. گزارش خلاصه و جدول نمونه (10 سطر اول) را مشاهده کنید |
|
|
4. **فایل نتایج کامل (همه سطرها) را دانلود کنید** |
|
|
|
|
|
### نکات مهم: |
|
|
|
|
|
- **نمایش رابط**: فقط 10 سطر اول نمایش داده میشود |
|
|
- **فایل دانلود**: شامل تمام سطرهای پردازش شده + متریکها |
|
|
- فایل خروجی شامل ستونهای اصلی + سه ستون متریک خواهد بود |
|
|
- متریکها برای هر سطر جداگانه محاسبه میشوند |
|
|
- آمار کلی در گزارش خلاصه نمایش داده میشود |
|
|
|
|
|
### مشکل در دانلود؟ |
|
|
|
|
|
اگر فایل دانلود نمیشود: |
|
|
1. مرورگر خود را رفرش کنید |
|
|
2. مجدداً ارزیابی را انجام دهید |
|
|
3. اطمینان حاصل کنید که popup blocker غیرفعال است |
|
|
4. از تست دانلود در پایین استفاده کنید |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Accordion("💡 مثال عملی روش جدید", open=False): |
|
|
gr.Markdown(""" |
|
|
### روش قدیم vs جدید: |
|
|
|
|
|
**متن مرجع**: `شرکت company-01 با سرمایه amount-02 میلیارد تومان و سهم percent-03 از بازار` |
|
|
|
|
|
**متن پیشبینی شده**: `شرکت company-10 با سرمایه amount-50 میلیارد تومان و سهم percent-25 از بازار` |
|
|
|
|
|
#### روش قدیم (تطبیق دقیق شناسه): |
|
|
- کل موجودیتهای مرجع: company-01, amount-02, percent-03 |
|
|
- کل موجودیتهای پیشبینی: company-10, amount-50, percent-25 |
|
|
- تطبیق: صفر! (چون شناسهها متفاوت است) |
|
|
- نتیجه: Precision=0, Recall=0, F1=0 |
|
|
|
|
|
#### روش جدید (بر اساس نوع): |
|
|
- انواع موجودیتهای مرجع: 1 company, 1 amount, 1 percent |
|
|
- انواع موجودیتهای پیشبینی: 1 company, 1 amount, 1 percent |
|
|
- تطبیق: کامل! (تعداد و نوع یکسان است) |
|
|
- نتیجه: Precision=1.0, Recall=1.0, F1=1.0 |
|
|
|
|
|
این روش جدید خیلی عادلانهتر است! 🎯 |
|
|
""") |
|
|
|
|
|
|
|
|
return interface |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
interface = create_evaluation_interface() |
|
|
interface.launch() |