|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import re |
|
|
from typing import Dict, List, Tuple, Set |
|
|
import gradio as gr |
|
|
from datetime import datetime |
|
|
import io |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
class AnonymizationEvaluator: |
|
|
"""ابزار ارزیابی ناشناسسازی با استفاده از متن مرجع""" |
|
|
|
|
|
def __init__(self): |
|
|
self.results_df = None |
|
|
|
|
|
def extract_entities_from_text(self, text: str) -> Dict[str, Set[str]]: |
|
|
"""استخراج موجودیتها از متن با debugging""" |
|
|
if pd.isna(text) or not isinstance(text, str): |
|
|
return {'companies': set(), 'persons': set(), 'amounts': set(), 'percents': set(), 'groups': set()} |
|
|
|
|
|
|
|
|
text = str(text).strip() |
|
|
|
|
|
|
|
|
patterns = { |
|
|
'companies': [r'company-(\d+)', r'Company-(\d+)', r'COMPANY-(\d+)'], |
|
|
'persons': [r'person-(\d+)', r'Person-(\d+)', r'PERSON-(\d+)'], |
|
|
'amounts': [r'amount-(\d+)', r'Amount-(\d+)', r'AMOUNT-(\d+)'], |
|
|
'percents': [r'percent-(\d+)', r'Percent-(\d+)', r'PERCENT-(\d+)'], |
|
|
'groups': [r'group-(\d+)', r'Group-(\d+)', r'GROUP-(\d+)'] |
|
|
} |
|
|
|
|
|
entities = {} |
|
|
for entity_type, pattern_list in patterns.items(): |
|
|
found = set() |
|
|
for pattern in pattern_list: |
|
|
matches = re.findall(pattern, text) |
|
|
found.update(matches) |
|
|
entities[entity_type] = found |
|
|
|
|
|
return entities |
|
|
|
|
|
def debug_text_analysis(self, reference_text: str, predicted_text: str, row_num: int = 0) -> str: |
|
|
"""تابع debugging برای تحلیل متنها""" |
|
|
debug_info = f"\n--- Debug Row {row_num + 1} ---\n" |
|
|
debug_info += f"Reference: '{reference_text[:100]}...'\n" |
|
|
debug_info += f"Predicted: '{predicted_text[:100]}...'\n" |
|
|
|
|
|
ref_entities = self.extract_entities_from_text(reference_text) |
|
|
pred_entities = self.extract_entities_from_text(predicted_text) |
|
|
|
|
|
debug_info += f"Reference entities: {dict(ref_entities)}\n" |
|
|
debug_info += f"Predicted entities: {dict(pred_entities)}\n" |
|
|
|
|
|
return debug_info |
|
|
|
|
|
def calculate_precision_recall_f1(self, reference_entities: Dict[str, Set[str]], |
|
|
predicted_entities: Dict[str, Set[str]]) -> Tuple[float, float, float]: |
|
|
"""محاسبه Precision, Recall و F1-Score""" |
|
|
|
|
|
|
|
|
ref_all = set() |
|
|
pred_all = set() |
|
|
|
|
|
for entity_type in ['companies', 'persons', 'amounts', 'percents', 'groups']: |
|
|
|
|
|
ref_entities = {f"{entity_type}:{e}" for e in reference_entities.get(entity_type, set())} |
|
|
pred_entities = {f"{entity_type}:{e}" for e in predicted_entities.get(entity_type, set())} |
|
|
|
|
|
ref_all.update(ref_entities) |
|
|
pred_all.update(pred_entities) |
|
|
|
|
|
if len(pred_all) == 0 and len(ref_all) == 0: |
|
|
return 1.0, 1.0, 1.0 |
|
|
elif len(pred_all) == 0: |
|
|
return 0.0, 0.0, 0.0 |
|
|
elif len(ref_all) == 0: |
|
|
return 0.0, 1.0, 0.0 |
|
|
|
|
|
|
|
|
true_positive = len(ref_all.intersection(pred_all)) |
|
|
|
|
|
|
|
|
precision = true_positive / len(pred_all) if len(pred_all) > 0 else 0.0 |
|
|
recall = true_positive / len(ref_all) if len(ref_all) > 0 else 0.0 |
|
|
|
|
|
|
|
|
if precision + recall == 0: |
|
|
f1 = 0.0 |
|
|
else: |
|
|
f1 = 2 * (precision * recall) / (precision + recall) |
|
|
|
|
|
return precision, recall, f1 |
|
|
|
|
|
def calculate_accuracy(self, reference_text: str, predicted_text: str) -> float: |
|
|
"""محاسبه Accuracy بر اساس تطابق کامل موجودیتها""" |
|
|
ref_entities = self.extract_entities_from_text(reference_text) |
|
|
pred_entities = self.extract_entities_from_text(predicted_text) |
|
|
|
|
|
|
|
|
ref_total = sum(len(entities) for entities in ref_entities.values()) |
|
|
|
|
|
if ref_total == 0: |
|
|
return 1.0 if sum(len(entities) for entities in pred_entities.values()) == 0 else 0.0 |
|
|
|
|
|
|
|
|
correct = 0 |
|
|
for entity_type in ref_entities.keys(): |
|
|
correct += len(ref_entities[entity_type].intersection(pred_entities[entity_type])) |
|
|
|
|
|
return correct / ref_total |
|
|
|
|
|
def evaluate_single_row(self, reference_text: str, predicted_text: str) -> Tuple[float, float, float]: |
|
|
"""ارزیابی یک سطر""" |
|
|
try: |
|
|
|
|
|
ref_entities = self.extract_entities_from_text(reference_text) |
|
|
pred_entities = self.extract_entities_from_text(predicted_text) |
|
|
|
|
|
|
|
|
precision, recall, f1 = self.calculate_precision_recall_f1(ref_entities, pred_entities) |
|
|
|
|
|
return precision, recall, f1 |
|
|
|
|
|
except Exception as e: |
|
|
print(f"خطا در ارزیابی: {str(e)}") |
|
|
return 0.0, 0.0, 0.0 |
|
|
|
|
|
def evaluate_dataset(self, file_path: str) -> Tuple[bool, str, pd.DataFrame]: |
|
|
"""ارزیابی کل دیتاست با debugging""" |
|
|
try: |
|
|
|
|
|
df = pd.read_csv(file_path) |
|
|
|
|
|
|
|
|
required_columns = ['original_text', 'Reference_text', 'anonymized_text'] |
|
|
missing_columns = [col for col in required_columns if col not in df.columns] |
|
|
|
|
|
if missing_columns: |
|
|
return False, f"ستونهای مفقود: {', '.join(missing_columns)}", pd.DataFrame() |
|
|
|
|
|
|
|
|
debug_info = "\n=== Debug Information ===\n" |
|
|
debug_info += f"تعداد سطرها: {len(df)}\n" |
|
|
debug_info += f"ستونها: {list(df.columns)}\n\n" |
|
|
|
|
|
|
|
|
for i in range(min(3, len(df))): |
|
|
ref_text = str(df.iloc[i]['Reference_text']) |
|
|
anon_text = str(df.iloc[i]['anonymized_text']) |
|
|
|
|
|
debug_info += self.debug_text_analysis(ref_text, anon_text, i) |
|
|
|
|
|
print(debug_info) |
|
|
|
|
|
|
|
|
precisions = [] |
|
|
recalls = [] |
|
|
f1_scores = [] |
|
|
|
|
|
total_entities_found = 0 |
|
|
|
|
|
for index, row in df.iterrows(): |
|
|
precision, recall, f1 = self.evaluate_single_row( |
|
|
row['Reference_text'], |
|
|
row['anonymized_text'] |
|
|
) |
|
|
|
|
|
precisions.append(round(precision, 4)) |
|
|
recalls.append(round(recall, 4)) |
|
|
f1_scores.append(round(f1, 4)) |
|
|
|
|
|
|
|
|
ref_entities = self.extract_entities_from_text(str(row['Reference_text'])) |
|
|
pred_entities = self.extract_entities_from_text(str(row['anonymized_text'])) |
|
|
total_entities_found += sum(len(entities) for entities in ref_entities.values()) |
|
|
total_entities_found += sum(len(entities) for entities in pred_entities.values()) |
|
|
|
|
|
|
|
|
df['Precision'] = precisions |
|
|
df['Recall'] = recalls |
|
|
df['F1_Score'] = f1_scores |
|
|
|
|
|
|
|
|
self.results_df = df |
|
|
|
|
|
|
|
|
status_message = f"ارزیابی انجام شد. کل موجودیتهای یافت شده: {total_entities_found}" |
|
|
if total_entities_found == 0: |
|
|
status_message += "\n⚠️ هیچ موجودیتی تشخیص داده نشد! لطفاً فرمت دادهها را بررسی کنید." |
|
|
|
|
|
return True, status_message, df |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"خطا در پردازش فایل: {str(e)}", pd.DataFrame() |
|
|
|
|
|
def generate_summary_report(self, df: pd.DataFrame) -> str: |
|
|
"""تولید گزارش خلاصه""" |
|
|
if df.empty: |
|
|
return "هیچ دادهای برای گزارش یافت نشد" |
|
|
|
|
|
|
|
|
avg_precision = df['Precision'].mean() |
|
|
avg_recall = df['Recall'].mean() |
|
|
avg_f1 = df['F1_Score'].mean() |
|
|
|
|
|
|
|
|
total_rows = len(df) |
|
|
high_precision_count = len(df[df['Precision'] >= 0.8]) |
|
|
high_recall_count = len(df[df['Recall'] >= 0.8]) |
|
|
high_f1_count = len(df[df['F1_Score'] >= 0.8]) |
|
|
|
|
|
|
|
|
best_f1_idx = df['F1_Score'].idxmax() |
|
|
worst_f1_idx = df['F1_Score'].idxmin() |
|
|
|
|
|
report = f""" |
|
|
## 📊 گزارش جامع ارزیابی |
|
|
|
|
|
### آمار کلی: |
|
|
- **تعداد کل سطرها:** {total_rows} |
|
|
- **میانگین Precision:** {avg_precision:.4f} |
|
|
- **میانگین Recall:** {avg_recall:.4f} |
|
|
- **میانگین F1-Score:** {avg_f1:.4f} |
|
|
|
|
|
### توزیع عملکرد (امتیاز ≥ 0.8): |
|
|
- **Precision بالا:** {high_precision_count} سطر ({high_precision_count/total_rows*100:.1f}%) |
|
|
- **Recall بالا:** {high_recall_count} سطر ({high_recall_count/total_rows*100:.1f}%) |
|
|
- **F1-Score بالا:** {high_f1_count} سطر ({high_f1_count/total_rows*100:.1f}%) |
|
|
|
|
|
### نمونههای برتر و ضعیف: |
|
|
- **بهترین F1-Score:** {df.loc[best_f1_idx, 'F1_Score']:.4f} (سطر {best_f1_idx + 1}) |
|
|
- **ضعیفترین F1-Score:** {df.loc[worst_f1_idx, 'F1_Score']:.4f} (سطر {worst_f1_idx + 1}) |
|
|
""" |
|
|
|
|
|
return report |
|
|
|
|
|
def create_downloadable_csv(self) -> bytes: |
|
|
"""ایجاد محتوای CSV برای دانلود مستقیم""" |
|
|
if self.results_df is None or self.results_df.empty: |
|
|
return None |
|
|
|
|
|
try: |
|
|
|
|
|
csv_buffer = io.StringIO() |
|
|
self.results_df.to_csv(csv_buffer, index=False, encoding='utf-8') |
|
|
csv_content = csv_buffer.getvalue() |
|
|
csv_buffer.close() |
|
|
|
|
|
|
|
|
return csv_content.encode('utf-8-sig') |
|
|
|
|
|
except Exception as e: |
|
|
print(f"خطا در ایجاد محتوای CSV: {str(e)}") |
|
|
return None |
|
|
|
|
|
def create_evaluation_interface(): |
|
|
"""ایجاد رابط کاربری ارزیابی""" |
|
|
evaluator = AnonymizationEvaluator() |
|
|
|
|
|
with gr.Blocks( |
|
|
title="ارزیابی ناشناسسازی", |
|
|
theme=gr.themes.Soft(), |
|
|
css=""" |
|
|
.gradio-container { |
|
|
font-family: 'Tahoma', 'Arial', sans-serif !important; |
|
|
direction: rtl; |
|
|
max-width: 1200px; |
|
|
margin: 0 auto; |
|
|
} |
|
|
.upload-area { |
|
|
border: 2px dashed #4CAF50; |
|
|
border-radius: 15px; |
|
|
padding: 30px; |
|
|
text-align: center; |
|
|
background: linear-gradient(145deg, #f8f9fa, #e9ecef); |
|
|
margin: 20px 0; |
|
|
} |
|
|
.results-table { |
|
|
direction: ltr; |
|
|
font-family: monospace; |
|
|
font-size: 12px; |
|
|
} |
|
|
.summary-box { |
|
|
background-color: #e3f2fd; |
|
|
border: 1px solid #2196F3; |
|
|
border-radius: 10px; |
|
|
padding: 20px; |
|
|
margin: 15px 0; |
|
|
} |
|
|
""" |
|
|
) as interface: |
|
|
|
|
|
gr.Markdown(""" |
|
|
# 📊 ابزار ارزیابی ناشناسسازی با متن مرجع |
|
|
### آپلود فایل CSV شامل ستونهای: original_text, Reference_text, anonymized_text |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### 📁 بارگذاری فایل") |
|
|
|
|
|
file_input = gr.File( |
|
|
label="انتخاب فایل CSV", |
|
|
file_types=[".csv"], |
|
|
elem_classes=["upload-area"] |
|
|
) |
|
|
|
|
|
evaluate_btn = gr.Button( |
|
|
"🚀 شروع ارزیابی", |
|
|
variant="primary", |
|
|
size="lg", |
|
|
interactive=False |
|
|
) |
|
|
|
|
|
download_btn = gr.Button( |
|
|
"💾 دانلود نتایج CSV", |
|
|
variant="secondary", |
|
|
visible=False |
|
|
) |
|
|
|
|
|
with gr.Column(scale=2): |
|
|
status_output = gr.Markdown("وضعیت: آماده بارگذاری فایل...") |
|
|
|
|
|
summary_output = gr.Markdown( |
|
|
visible=False, |
|
|
elem_classes=["summary-box"] |
|
|
) |
|
|
|
|
|
|
|
|
results_table = gr.Dataframe( |
|
|
label="نتایج تفصیلی (نمایش 10 سطر اول)", |
|
|
visible=False, |
|
|
elem_classes=["results-table"], |
|
|
wrap=True |
|
|
) |
|
|
|
|
|
|
|
|
download_file = gr.File( |
|
|
visible=False, |
|
|
label="فایل نتایج" |
|
|
) |
|
|
|
|
|
def on_file_upload(file): |
|
|
if file is None: |
|
|
return "❌ لطفاً فایل را انتخاب کنید", gr.Button(interactive=False) |
|
|
|
|
|
return "✅ فایل بارگذاری شد، آماده ارزیابی", gr.Button(interactive=True) |
|
|
|
|
|
def evaluate_file(file): |
|
|
if file is None: |
|
|
return ( |
|
|
"❌ هیچ فایلی انتخاب نشده", |
|
|
gr.Markdown(visible=False), |
|
|
gr.Dataframe(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
try: |
|
|
success, message, df = evaluator.evaluate_dataset(file.name) |
|
|
|
|
|
if not success: |
|
|
return ( |
|
|
f"❌ {message}", |
|
|
gr.Markdown(visible=False), |
|
|
gr.Dataframe(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
|
|
|
summary = evaluator.generate_summary_report(df) |
|
|
|
|
|
|
|
|
display_df = df.head(10) |
|
|
|
|
|
|
|
|
status_message = f"✅ {message} - {len(df)} سطر پردازش شد. نمایش: 10 سطر اول، دانلود: همه سطرها" |
|
|
|
|
|
return ( |
|
|
status_message, |
|
|
gr.Markdown(value=summary, visible=True), |
|
|
gr.Dataframe(value=display_df, visible=True), |
|
|
gr.Button(visible=True), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
return ( |
|
|
f"❌ خطای غیرمنتظره: {str(e)}", |
|
|
gr.Markdown(visible=False), |
|
|
gr.Dataframe(visible=False), |
|
|
gr.Button(visible=False), |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
def download_results(): |
|
|
try: |
|
|
if evaluator.results_df is None or evaluator.results_df.empty: |
|
|
return ( |
|
|
"❌ هیچ دادهای برای دانلود وجود ندارد. ابتدا ارزیابی را انجام دهید.", |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
|
|
|
csv_content = evaluator.create_downloadable_csv() |
|
|
if csv_content: |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"evaluation_results_{timestamp}.csv" |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode='wb', delete=False, |
|
|
suffix='.csv', prefix='eval_') as temp_file: |
|
|
temp_file.write(csv_content) |
|
|
temp_filename = temp_file.name |
|
|
|
|
|
return ( |
|
|
f"✅ فایل نتایج آماده شد: {filename} ({len(evaluator.results_df)} سطر)", |
|
|
gr.File(value=temp_filename, visible=True) |
|
|
) |
|
|
else: |
|
|
return ( |
|
|
"❌ خطا در ایجاد محتوای CSV", |
|
|
gr.File(visible=False) |
|
|
) |
|
|
except Exception as e: |
|
|
return ( |
|
|
f"❌ خطا در دانلود: {str(e)}", |
|
|
gr.File(visible=False) |
|
|
) |
|
|
|
|
|
|
|
|
file_input.change( |
|
|
fn=on_file_upload, |
|
|
inputs=[file_input], |
|
|
outputs=[status_output, evaluate_btn] |
|
|
) |
|
|
|
|
|
evaluate_btn.click( |
|
|
fn=evaluate_file, |
|
|
inputs=[file_input], |
|
|
outputs=[status_output, summary_output, results_table, download_btn, download_file] |
|
|
) |
|
|
|
|
|
download_btn.click( |
|
|
fn=download_results, |
|
|
outputs=[status_output, download_file] |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Accordion("📖 راهنمای استفاده", open=False): |
|
|
gr.Markdown(""" |
|
|
### فرمت فایل CSV مورد نیاز: |
|
|
|
|
|
فایل شما باید حاوی دقیقاً این سه ستون باشد: |
|
|
- **original_text**: متن اصلی |
|
|
- **Reference_text**: متن ناشناسشده مرجع (Ground Truth) |
|
|
- **anonymized_text**: متن ناشناسشده مورد ارزیابی |
|
|
|
|
|
### متریکهای محاسبه شده: |
|
|
|
|
|
- **Precision**: دقت = (تعداد موجودیتهای صحیح شناسایی شده) / (کل موجودیتهای شناسایی شده) |
|
|
- **Recall**: بازیابی = (تعداد موجودیتهای صحیح شناسایی شده) / (کل موجودیتهای مرجع) |
|
|
- **F1-Score**: میانگین هارمونیک Precision و Recall |
|
|
|
|
|
### مراحل کار: |
|
|
|
|
|
1. فایل CSV را آپلود کنید |
|
|
2. روی "شروع ارزیابی" کلیک کنید |
|
|
3. گزارش خلاصه و جدول نمونه (10 سطر اول) را مشاهده کنید |
|
|
4. **فایل نتایج کامل (همه سطرها) را دانلود کنید** |
|
|
|
|
|
### نکات مهم: |
|
|
|
|
|
- **نمایش رابط**: فقط 10 سطر اول نمایش داده میشود |
|
|
- **فایل دانلود**: شامل تمام سطرهای پردازش شده + متریکها |
|
|
- فایل خروجی شامل ستونهای اصلی + سه ستون متریک خواهد بود |
|
|
- متریکها برای هر سطر جداگانه محاسبه میشوند |
|
|
- آمار کلی در گزارش خلاصه نمایش داده میشود |
|
|
|
|
|
### مشکل در دانلود؟ |
|
|
|
|
|
اگر فایل دانلود نمیشود: |
|
|
1. مرورگر خود را رفرش کنید |
|
|
2. مجدداً ارزیابی را انجام دهید |
|
|
3. اطمینان حاصل کنید که popup blocker غیرفعال است |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Accordion("🧪 تست دانلود", open=False): |
|
|
gr.Markdown("برای تست عملکرد دانلود:") |
|
|
test_download_btn = gr.Button("تست دانلود فایل نمونه") |
|
|
test_file_output = gr.File(label="فایل تست", visible=False) |
|
|
|
|
|
def create_test_file(): |
|
|
"""ایجاد فایل تست برای بررسی دانلود""" |
|
|
try: |
|
|
test_data = { |
|
|
'original_text': ['متن تست 1', 'متن تست 2'], |
|
|
'Reference_text': ['company-01 amount-01', 'person-01 amount-02'], |
|
|
'anonymized_text': ['company-01 amount-01', 'person-01 amount-02'], |
|
|
'Precision': [1.0, 1.0], |
|
|
'Recall': [1.0, 1.0], |
|
|
'F1_Score': [1.0, 1.0] |
|
|
} |
|
|
test_df = pd.DataFrame(test_data) |
|
|
|
|
|
|
|
|
csv_buffer = io.StringIO() |
|
|
test_df.to_csv(csv_buffer, index=False) |
|
|
csv_content = csv_buffer.getvalue() |
|
|
csv_buffer.close() |
|
|
|
|
|
|
|
|
csv_bytes = csv_content.encode('utf-8-sig') |
|
|
with tempfile.NamedTemporaryFile(mode='wb', delete=False, |
|
|
suffix='.csv', prefix='test_') as temp_file: |
|
|
temp_file.write(csv_bytes) |
|
|
temp_filename = temp_file.name |
|
|
|
|
|
return gr.File(value=temp_filename, visible=True) |
|
|
except Exception as e: |
|
|
print(f"خطا در ایجاد فایل تست: {str(e)}") |
|
|
return gr.File(visible=False) |
|
|
|
|
|
test_download_btn.click( |
|
|
fn=create_test_file, |
|
|
outputs=[test_file_output] |
|
|
) |
|
|
|
|
|
return interface |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
interface = create_evaluation_interface() |
|
|
interface.launch() |