#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ سیستم benchmark ناشناس‌سازی - ورژن آپدیت شده با الگوهای جامع و دقیق """ import pandas as pd import re import json import logging import os import gradio as gr from typing import Dict, List, Tuple, Set from collections import defaultdict import numpy as np # تنظیم logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ===== تابع کمکی برای تبدیل numpy/pandas types ===== def convert_to_serializable(obj): if isinstance(obj, (np.integer, np.int64, np.int32)): return int(obj) elif isinstance(obj, (np.floating, np.float64, np.float32)): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, dict): return {key: convert_to_serializable(value) for key, value in obj.items()} elif isinstance(obj, list): return [convert_to_serializable(item) for item in obj] else: return obj # ===== کلاس پردازش entities با الگوهای آپدیت شده ===== class UpdatedEntityExtractor: def __init__(self): # الگوهای آپدیت شده براساس سیستم ناشناس‌سازی بهبود یافته self.patterns = { # آدرس‌های کامل - اولویت بالا با پوشش میدان و برج 'FULL_ADDRESS': [ # الگوی آدرس کامل: شهر + میدان + برج + طبقه + واحد r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرم‌آباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجف‌آباد|شاهین‌شهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نی‌ریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلام‌شهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*(?:میدان|خیابان|کوچه|شهرک|بلوار|کوی|محله)\s+[آ-ی‌ّٰ-ٹ\s]+(?:،\s*(?:برج|ساختمان|مجتمع)\s+[آ-ی‌ّٰ-ٹ\s]+)?(?:،\s*(?:طبقه|واحد)\s+[آ-ی‌ّٰ-ٹ\d\s]+)?(?:،\s*واحد\s+[آ-ی‌ّٰ-ٹ\d\s]+)?', # الگوی آدرس کامل: شهر + خیابان + کوچه + پلاک + طبقه r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرم‌آباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجف‌آباد|شاهین‌شهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نی‌ریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلام‌شهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*(?:خیابان|کوچه|شهرک|بلوار|میدان|کوی|محله)\s+[آ-ی‌ّٰ-ٹ\s]+(?:،\s*(?:خیابان|کوچه|بلوار|کوی)\s+[آ-ی‌ّٰ-ٹ\s]+)?(?:،\s*پلاک\s+\d+)?(?:،\s*(?:طبقه|واحد)\s+[آ-ی‌ّٰ-ٹ\d\s]+)?', # الگوی آدرس با شهرک r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرم‌آباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجف‌آباد|شاهین‌شهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نی‌ریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلام‌شهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*شهرک\s+[آ-ی‌ّٰ-ٹ\s]+،\s*(?:خیابان|کوچه|بلوار)\s+[آ-ی‌ّٰ-ٹ\s]+(?:،\s*پلاک\s+\d+)?', # الگوی ساده‌تر برای آدرس‌های کوتاه‌تر r'خیابان\s+[آ-ی‌ّٰ-ٹ\s]+،\s*کوچه\s+[آ-ی‌ّٰ-ٹ\s]+،\s*پلاک\s+\d+(?:،\s*(?:طبقه|واحد)\s+[آ-ی‌ّٰ-ٹ\d\s]+)?', ], # اسامی اشخاص - الگوهای دقیق‌تر شامل خانم 'PERSON': [ r'آقای\s+[آ-ی‌ّٰ-ٹ]+\s+[آ-ی‌ّٰ-ٹ]+(?=\s+با\s+کد|\s+مدیر|$|،|\.)', r'خانم\s+[آ-ی‌ّٰ-ٹ]+\s+[آ-ی‌ّٰ-ٹ]+(?=\s+با\s+کد|\s+با\s+موبایل|$|،|\.)', r'مهندس\s+[آ-ی‌ّٰ-ٹ]+\s+[آ-ی‌ّٰ-ٹ]+(?=\s+با\s+کد|$|،|\.)', r'دکتر\s+[آ-ی‌ّٰ-ٹ]+\s+[آ-ی‌ّٰ-ٹ]+(?=\s+با\s+کد|$|،|\.)', r'مدیر\s+مالی\s+خانم\s+[آ-ی‌ّٰ-ٹ]+\s+[آ-ی‌ّٰ-ٹ]+', r'مدیرعامل\s+[آ-ی‌ّٰ-ٹ]+\s+[آ-ی‌ّٰ-ٹ]+', r'Mr\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', r'Ms\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', r'Dr\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', ], # کدهای ملی و شناسه‌ها - جداسازی از شماره تلفن 'ID_NUMBER': [ r'کد\s+ملی\s+\d{10}', r'شناسه\s+ملی\s+\d{11}', r'(? 100: return False # بررسی کلمات عمومی if self.is_generic_word(text): return False # کلمات ممنوع که نباید entity باشند forbidden_words = [ 'شد', 'کرد', 'است', 'بود', 'در', 'که', 'با', 'از', 'به', 'را', 'و', 'یا', 'شده', 'نموده', 'صادر', 'ارائه', 'معرفی', 'برگزار', 'مطرح', 'واقع' ] if text.lower().strip() in forbidden_words: return False # بررسی‌های خاص برای هر category if category == 'COMPANY': # نباید شامل فعل یا کلمات اضافی باشد if any(word in text.lower() for word in ['برگزار', 'مطرح', 'شد', 'است', 'نموده']): return False # باید حداقل یک اسم خاص داشته باشد if text.strip() in ['شرکت', 'بانک', 'شرکت در', 'بانک در']: return False elif category == 'LOCATION': # نباید شامل فعل باشد if any(word in text.lower() for word in ['برگزار', 'شد', 'است', 'واقع']): return False # باید نام مکان واقعی باشد if text.strip() in ['شهر', 'بندر', 'استان']: return False elif category == 'DATE': # نباید عبارات طولانی باشد if 'سال مالی' in text: return False elif category == 'PERSON': # نباید فقط عنوان باشد if text.strip() in ['آقای', 'خانم', 'مهندس', 'دکتر']: return False return True def extract_entities(self, text): """استخراج entities با دقت بالا و اولویت‌بندی""" if not text or text.strip() == '': return {} entities = {} processed_positions = set() # پردازش براساس اولویت for category in self.priority_order: if category not in self.patterns: continue pattern_list = self.patterns[category] found_entities = [] for pattern_str in pattern_list: try: pattern = re.compile(pattern_str, re.IGNORECASE | re.MULTILINE) matches = pattern.finditer(text) for match in matches: # بررسی تداخل با entities قبلی match_start, match_end = match.span() overlaps = False for proc_start, proc_end in processed_positions: if not (match_end <= proc_start or match_start >= proc_end): overlaps = True break if not overlaps: entity = self.clean_entity(match.group(0)) if self.is_valid_entity(entity, category): found_entities.append(entity) processed_positions.add((match_start, match_end)) except re.error as e: logger.error(f"Regex error in pattern {pattern_str}: {e}") continue # حذف تکراری‌ها و مرتب‌سازی if found_entities: # حذف entities که زیرمجموعه entities دیگر هستند unique_entities = [] for entity in found_entities: is_subset = False for other in found_entities: if entity != other and entity in other: is_subset = True break if not is_subset: unique_entities.append(entity) entities[category] = sorted(list(set(unique_entities))) return entities def extract_anonymized_codes(self, text): """استخراج کدهای ناشناس‌سازی""" if not text or text.strip() == '': return {} codes = {} # الگو براساس سیستم ناشناس‌سازی: category_number pattern = r'([a-zA-Z_]+)_(\d{3})' try: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: category = match.group(1).upper() code = match.group(0) if category not in codes: codes[category] = [] codes[category].append(code) except Exception as e: logger.error(f"Error extracting codes: {e}") # حذف تکراری‌ها for category in codes: codes[category] = sorted(list(set(codes[category]))) return codes # ===== کلاس Benchmark آپدیت شده ===== class UpdatedAnonymizationBenchmark: def __init__(self): self.extractor = UpdatedEntityExtractor() def analyze_single_row(self, original_text, anonymized_text, row_number): """تحلیل دقیق یک ردیف""" print(f"\n{'='*80}") print(f"تحلیل دقیق ردیف {row_number} (ورژن آپدیت شده)") print(f"{'='*80}") print(f"\n📝 متن اصلی ({len(original_text)} کاراکتر):") print(f"'{original_text[:150]}{'...' if len(original_text) > 150 else ''}'") print(f"\n🔒 متن ناشناس‌سازی شده ({len(anonymized_text)} کاراکتر):") print(f"'{anonymized_text[:150]}{'...' if len(anonymized_text) > 150 else ''}'") # استخراج entities از متن اصلی print(f"\n🔍 Entities دقیق استخراج شده از متن اصلی (الگوهای آپدیت شده):") original_entities = self.extractor.extract_entities(original_text) total_original_entities = 0 for category, entities in original_entities.items(): print(f"\n 📊 {category} ({len(entities)} عدد):") for i, entity in enumerate(entities, 1): print(f" {i}. '{entity}'") total_original_entities += len(entities) if not original_entities: print(" ❌ هیچ entity ای یافت نشد!") else: print(f"\n✅ مجموع entities یافت شده: {total_original_entities}") # استخراج کدهای ناشناس‌سازی print(f"\n🔒 کدهای ناشناس‌سازی:") anonymized_codes = self.extractor.extract_anonymized_codes(anonymized_text) total_anonymized_codes = 0 for category, codes in anonymized_codes.items(): print(f"\n 🔑 {category} ({len(codes)} عدد):") for i, code in enumerate(codes, 1): print(f" {i}. '{code}'") total_anonymized_codes += len(codes) if not anonymized_codes: print(" ❌ هیچ کد ناشناس‌سازی یافت نشد!") else: print(f"\n✅ مجموع کدهای ناشناس‌سازی: {total_anonymized_codes}") # تطبیق دقیق entities و codes print(f"\n📄 تطبیق Entities با کدهای ناشناس‌سازی:") all_categories = set(original_entities.keys()) | set(anonymized_codes.keys()) for category in sorted(all_categories): orig_count = len(original_entities.get(category, [])) anon_count = len(anonymized_codes.get(category, [])) print(f"\n 📈 {category}:") print(f" Entities اصلی: {orig_count}") print(f" کدهای ناشناس‌سازی: {anon_count}") if orig_count > 0: print(f" لیست اصلی: {original_entities[category]}") if anon_count > 0: print(f" لیست کدها: {anonymized_codes[category]}") # وضعیت تطبیق if orig_count == anon_count: print(f" وضعیت: ✅ تطبیق کامل") elif orig_count > anon_count: print(f" وضعیت: ⚠️ {orig_count - anon_count} entity از دست رفته") else: print(f" وضعیت: ⚠️ {anon_count - orig_count} کد اضافی") # محاسبه متریک‌ها category_metrics = {} total_tp, total_fp, total_fn = 0, 0, 0 for category in all_categories: original_count = len(original_entities.get(category, [])) anonymized_count = len(anonymized_codes.get(category, [])) tp = min(original_count, anonymized_count) fp = max(0, anonymized_count - original_count) fn = max(0, original_count - anonymized_count) precision = tp / (tp + fp) if (tp + fp) > 0 else 0 recall = tp / (tp + fn) if (tp + fn) > 0 else 0 f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 category_metrics[category] = { 'original_count': original_count, 'anonymized_count': anonymized_count, 'tp': tp, 'fp': fp, 'fn': fn, 'precision': precision, 'recall': recall, 'f1_score': f1_score } total_tp += tp total_fp += fp total_fn += fn # متریک‌های کلی overall_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0 overall_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0 overall_f1 = 2 * (overall_precision * overall_recall) / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0 accuracy = total_tp / total_original_entities if total_original_entities > 0 else 0 print(f"\n🎯 متریک‌های نهایی ردیف {row_number} (الگوهای آپدیت شده):") print(f" TP: {total_tp}, FP: {total_fp}, FN: {total_fn}") print(f" Precision: {overall_precision:.4f}") print(f" Recall: {overall_recall:.4f}") print(f" F1-Score: {overall_f1:.4f}") print(f" Accuracy: {accuracy:.4f}") return { 'original_entities': original_entities, 'anonymized_codes': anonymized_codes, 'category_metrics': category_metrics, 'overall_metrics': { 'total_original_entities': total_original_entities, 'total_anonymized_entities': total_anonymized_codes, 'total_tp': total_tp, 'total_fp': total_fp, 'total_fn': total_fn, 'precision': overall_precision, 'recall': overall_recall, 'f1_score': overall_f1, 'accuracy': accuracy } } def process_csv(self, csv_file_path): """پردازش فایل CSV""" try: # خواندن فایل df = None for encoding in ['utf-8', 'utf-8-sig', 'cp1256', 'windows-1256']: try: df = pd.read_csv(csv_file_path, encoding=encoding) print(f"✅ فایل با encoding {encoding} خوانده شد") break except UnicodeDecodeError: continue if df is None: return "❌ خطا: نمی‌توان فایل را خواند" print(f"\n📋 اطلاعات فایل CSV:") print(f" تعداد ردیف‌ها: {len(df)}") print(f" ستون‌ها: {df.columns.tolist()}") # بررسی ستون‌ها if 'original_text' not in df.columns or 'anonymized_text' not in df.columns: return f"❌ خطا: فایل باید شامل ستون‌های 'original_text' و 'anonymized_text' باشد" if len(df) == 0: return "❌ خطا: فایل خالی است" # پردازش هر ردیف results = [] all_analysis = [] for index, row in df.iterrows(): print(f"\n📄 پردازش ردیف {index + 1} از {len(df)}") original_text = str(row['original_text']) if pd.notna(row['original_text']) else "" anonymized_text = str(row['anonymized_text']) if pd.notna(row['anonymized_text']) else "" if original_text.strip() == "" and anonymized_text.strip() == "": print("⚠️ ردیف خالی است، رد می‌شود") continue # تحلیل دقیق analysis = self.analyze_single_row(original_text, anonymized_text, index + 1) all_analysis.append(analysis) # ذخیره نتیجه result = { 'row_id': int(index), 'original_text': original_text, 'anonymized_text': anonymized_text, **{k: convert_to_serializable(v) for k, v in analysis['overall_metrics'].items()} } # اضافه کردن متریک‌های category for category, metrics in analysis['category_metrics'].items(): for metric_name, value in metrics.items(): result[f'{category.lower()}_{metric_name}'] = convert_to_serializable(value) results.append(result) if not results: return "❌ خطا: هیچ ردیف معتبری برای پردازش یافت نشد" return pd.DataFrame(results), all_analysis except Exception as e: return f"❌ خطا در پردازش: {str(e)}" # ===== رابط Gradio آپدیت شده ===== def process_uploaded_file(file): """پردازش فایل آپلود شده""" if file is None: return "❌ لطفاً ابتدا فایل CSV را آپلود کنید.", None print(f"\n🚀 شروع تحلیل دقیق فایل (ورژن آپدیت شده): {file.name}") benchmark = UpdatedAnonymizationBenchmark() result = benchmark.process_csv(file.name) if isinstance(result, str): return result, None results_df, all_analysis = result # تولید گزارش نهایی total_rows = len(results_df) # محاسبه آمار کلی avg_precision = results_df['precision'].mean() if 'precision' in results_df.columns else 0 avg_recall = results_df['recall'].mean() if 'recall' in results_df.columns else 0 avg_f1 = results_df['f1_score'].mean() if 'f1_score' in results_df.columns else 0 avg_accuracy = results_df['accuracy'].mean() if 'accuracy' in results_df.columns else 0 total_original = results_df['total_original_entities'].sum() if 'total_original_entities' in results_df.columns else 0 total_anonymized = results_df['total_anonymized_entities'].sum() if 'total_anonymized_entities' in results_df.columns else 0 total_tp = results_df['total_tp'].sum() if 'total_tp' in results_df.columns else 0 total_fp = results_df['total_fp'].sum() if 'total_fp' in results_df.columns else 0 total_fn = results_df['total_fn'].sum() if 'total_fn' in results_df.columns else 0 # گزارش نهایی report = f""" {'='*80} 🎯 گزارش نهایی Benchmark آپدیت شده - براساس سیستم ناشناس‌سازی بهبود یافته {'='*80} 📈 آمار کلی (الگوهای آپدیت شده براساس سیستم جامع): • تعداد ردیف‌های پردازش شده: {total_rows} • مجموع Entities دقیق در همه ردیف‌ها: {total_original} • مجموع کدهای ناشناس‌سازی: {total_anonymized} • True Positives (درست شناسایی شده): {total_tp} • False Positives (اشتباه شناسایی شده): {total_fp} • False Negatives (از دست رفته): {total_fn} 🎯 متریک‌های میانگین: • Precision: {avg_precision:.4f} • Recall: {avg_recall:.4f} • F1-Score: {avg_f1:.4f} • Accuracy: {avg_accuracy:.4f} 🆕 بهبودهای آپدیت شده: • آدرس کامل شامل میدان، برج، طبقه و واحد • نام اشخاص با عنوان خانم و مدیر مالی • شرکت‌های پیچیده (پردازش داده‌های ایرانیان) • شماره فاکتور و اسناد رسمی • تلفن ثابت شهری (021-) • فیلتر کلمات عمومی ("همین بانک", "شرکت متقاضی") • اولویت‌بندی بهتر پردازش entities • جداسازی دقیق کد ملی از شماره تلفن ✅ الگوهای regex دقیق‌تر و جامع‌تر شده‌اند! ✅ entities اضافی و غلط حذف شده‌اند ✅ فقط entities واقعی و معنادار شناسایی می‌شوند ✅ تطبیق بهتر با سیستم ناشناس‌سازی پیشرفته """ # ذخیره نتایج try: results_df.to_csv("updated_benchmark_results.csv", index=False, encoding='utf-8-sig') print("✅ نتایج آپدیت شده در فایل updated_benchmark_results.csv ذخیره شد") except Exception as e: print(f"⚠️ خطا در ذخیره فایل: {e}") # ستون‌های مهم برای نمایش display_columns = ['row_id', 'total_original_entities', 'total_anonymized_entities', 'total_tp', 'total_fp', 'total_fn', 'precision', 'recall', 'f1_score', 'accuracy'] display_df = results_df[[col for col in display_columns if col in results_df.columns]] return report, display_df def download_results(): """دانلود نتایج""" if os.path.exists("updated_benchmark_results.csv"): return "updated_benchmark_results.csv" return None # ===== رابط اصلی ===== def main(): with gr.Blocks(title="آپدیت شده: Ultra Precise Benchmark", theme=gr.themes.Soft()) as demo: gr.HTML("""

🔄 سیستم Benchmark آپدیت شده - براساس الگوهای ناشناس‌سازی پیشرفته

""") with gr.Row(): with gr.Column(): gr.HTML("""

🆕 ویژگی‌های آپدیت شده:

""") file_input = gr.File( label="📁 فایل CSV خود را آپلود کنید", file_types=[".csv"], file_count="single" ) process_btn = gr.Button("🔄 تحلیل با الگوهای آپدیت شده", variant="primary", size="lg") with gr.Row(): with gr.Column(): gr.HTML("

📊 گزارش آپدیت شده + لیست Entities صحیح

") results_output = gr.Textbox( label="گزارش کامل تحلیل خطاها و مشکلات", lines=30, max_lines=35, interactive=False ) with gr.Row(): with gr.Column(): gr.HTML("

📋 جدول نتایج آپدیت شده

") results_table = gr.Dataframe( label="متریک‌های دقیق هر ردیف (ورژن جدید)", interactive=False, wrap=True ) with gr.Row(): with gr.Column(): download_btn = gr.Button("💾 دانلود نتایج آپدیت شده", variant="secondary") download_file = gr.File(label="فایل نتایج آپدیت شده", visible=False) # Event handlers process_btn.click( fn=process_uploaded_file, inputs=[file_input], outputs=[results_output, results_table] ) download_btn.click( fn=download_results, outputs=[download_file] ) download_btn.click( fn=lambda: gr.update(visible=True), outputs=[download_file] ) return demo demo = main() if __name__ == "__main__": port = int(os.getenv("PORT", "7860")) demo.launch( share=False, server_name="0.0.0.0", server_port=port, show_error=True )