#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ سیستم benchmark ناشناسسازی - ورژن آپدیت شده با الگوهای جامع و دقیق """ import pandas as pd import re import json import logging import os import gradio as gr from typing import Dict, List, Tuple, Set from collections import defaultdict import numpy as np # تنظیم logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ===== تابع کمکی برای تبدیل numpy/pandas types ===== def convert_to_serializable(obj): if isinstance(obj, (np.integer, np.int64, np.int32)): return int(obj) elif isinstance(obj, (np.floating, np.float64, np.float32)): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, dict): return {key: convert_to_serializable(value) for key, value in obj.items()} elif isinstance(obj, list): return [convert_to_serializable(item) for item in obj] else: return obj # ===== کلاس پردازش entities با الگوهای آپدیت شده ===== class UpdatedEntityExtractor: def __init__(self): # الگوهای آپدیت شده براساس سیستم ناشناسسازی بهبود یافته self.patterns = { # آدرسهای کامل - اولویت بالا با پوشش میدان و برج 'FULL_ADDRESS': [ # الگوی آدرس کامل: شهر + میدان + برج + طبقه + واحد r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرمآباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجفآباد|شاهینشهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نیریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلامشهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*(?:میدان|خیابان|کوچه|شهرک|بلوار|کوی|محله)\s+[آ-یّٰ-ٹ\s]+(?:،\s*(?:برج|ساختمان|مجتمع)\s+[آ-یّٰ-ٹ\s]+)?(?:،\s*(?:طبقه|واحد)\s+[آ-یّٰ-ٹ\d\s]+)?(?:،\s*واحد\s+[آ-یّٰ-ٹ\d\s]+)?', # الگوی آدرس کامل: شهر + خیابان + کوچه + پلاک + طبقه r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرمآباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجفآباد|شاهینشهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نیریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلامشهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*(?:خیابان|کوچه|شهرک|بلوار|میدان|کوی|محله)\s+[آ-یّٰ-ٹ\s]+(?:،\s*(?:خیابان|کوچه|بلوار|کوی)\s+[آ-یّٰ-ٹ\s]+)?(?:،\s*پلاک\s+\d+)?(?:،\s*(?:طبقه|واحد)\s+[آ-یّٰ-ٹ\d\s]+)?', # الگوی آدرس با شهرک r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرمآباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجفآباد|شاهینشهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نیریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلامشهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*شهرک\s+[آ-یّٰ-ٹ\s]+،\s*(?:خیابان|کوچه|بلوار)\s+[آ-یّٰ-ٹ\s]+(?:،\s*پلاک\s+\d+)?', # الگوی سادهتر برای آدرسهای کوتاهتر r'خیابان\s+[آ-یّٰ-ٹ\s]+،\s*کوچه\s+[آ-یّٰ-ٹ\s]+،\s*پلاک\s+\d+(?:،\s*(?:طبقه|واحد)\s+[آ-یّٰ-ٹ\d\s]+)?', ], # اسامی اشخاص - الگوهای دقیقتر شامل خانم 'PERSON': [ r'آقای\s+[آ-یّٰ-ٹ]+\s+[آ-یّٰ-ٹ]+(?=\s+با\s+کد|\s+مدیر|$|،|\.)', r'خانم\s+[آ-یّٰ-ٹ]+\s+[آ-یّٰ-ٹ]+(?=\s+با\s+کد|\s+با\s+موبایل|$|،|\.)', r'مهندس\s+[آ-یّٰ-ٹ]+\s+[آ-یّٰ-ٹ]+(?=\s+با\s+کد|$|،|\.)', r'دکتر\s+[آ-یّٰ-ٹ]+\s+[آ-یّٰ-ٹ]+(?=\s+با\s+کد|$|،|\.)', r'مدیر\s+مالی\s+خانم\s+[آ-یّٰ-ٹ]+\s+[آ-یّٰ-ٹ]+', r'مدیرعامل\s+[آ-یّٰ-ٹ]+\s+[آ-یّٰ-ٹ]+', r'Mr\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', r'Ms\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', r'Dr\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', ], # کدهای ملی و شناسهها - جداسازی از شماره تلفن 'ID_NUMBER': [ r'کد\s+ملی\s+\d{10}', r'شناسه\s+ملی\s+\d{11}', r'(? 100: return False # بررسی کلمات عمومی if self.is_generic_word(text): return False # کلمات ممنوع که نباید entity باشند forbidden_words = [ 'شد', 'کرد', 'است', 'بود', 'در', 'که', 'با', 'از', 'به', 'را', 'و', 'یا', 'شده', 'نموده', 'صادر', 'ارائه', 'معرفی', 'برگزار', 'مطرح', 'واقع' ] if text.lower().strip() in forbidden_words: return False # بررسیهای خاص برای هر category if category == 'COMPANY': # نباید شامل فعل یا کلمات اضافی باشد if any(word in text.lower() for word in ['برگزار', 'مطرح', 'شد', 'است', 'نموده']): return False # باید حداقل یک اسم خاص داشته باشد if text.strip() in ['شرکت', 'بانک', 'شرکت در', 'بانک در']: return False elif category == 'LOCATION': # نباید شامل فعل باشد if any(word in text.lower() for word in ['برگزار', 'شد', 'است', 'واقع']): return False # باید نام مکان واقعی باشد if text.strip() in ['شهر', 'بندر', 'استان']: return False elif category == 'DATE': # نباید عبارات طولانی باشد if 'سال مالی' in text: return False elif category == 'PERSON': # نباید فقط عنوان باشد if text.strip() in ['آقای', 'خانم', 'مهندس', 'دکتر']: return False return True def extract_entities(self, text): """استخراج entities با دقت بالا و اولویتبندی""" if not text or text.strip() == '': return {} entities = {} processed_positions = set() # پردازش براساس اولویت for category in self.priority_order: if category not in self.patterns: continue pattern_list = self.patterns[category] found_entities = [] for pattern_str in pattern_list: try: pattern = re.compile(pattern_str, re.IGNORECASE | re.MULTILINE) matches = pattern.finditer(text) for match in matches: # بررسی تداخل با entities قبلی match_start, match_end = match.span() overlaps = False for proc_start, proc_end in processed_positions: if not (match_end <= proc_start or match_start >= proc_end): overlaps = True break if not overlaps: entity = self.clean_entity(match.group(0)) if self.is_valid_entity(entity, category): found_entities.append(entity) processed_positions.add((match_start, match_end)) except re.error as e: logger.error(f"Regex error in pattern {pattern_str}: {e}") continue # حذف تکراریها و مرتبسازی if found_entities: # حذف entities که زیرمجموعه entities دیگر هستند unique_entities = [] for entity in found_entities: is_subset = False for other in found_entities: if entity != other and entity in other: is_subset = True break if not is_subset: unique_entities.append(entity) entities[category] = sorted(list(set(unique_entities))) return entities def extract_anonymized_codes(self, text): """استخراج کدهای ناشناسسازی""" if not text or text.strip() == '': return {} codes = {} # الگو براساس سیستم ناشناسسازی: category_number pattern = r'([a-zA-Z_]+)_(\d{3})' try: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: category = match.group(1).upper() code = match.group(0) if category not in codes: codes[category] = [] codes[category].append(code) except Exception as e: logger.error(f"Error extracting codes: {e}") # حذف تکراریها for category in codes: codes[category] = sorted(list(set(codes[category]))) return codes # ===== کلاس Benchmark آپدیت شده ===== class UpdatedAnonymizationBenchmark: def __init__(self): self.extractor = UpdatedEntityExtractor() def analyze_single_row(self, original_text, anonymized_text, row_number): """تحلیل دقیق یک ردیف""" print(f"\n{'='*80}") print(f"تحلیل دقیق ردیف {row_number} (ورژن آپدیت شده)") print(f"{'='*80}") print(f"\n📝 متن اصلی ({len(original_text)} کاراکتر):") print(f"'{original_text[:150]}{'...' if len(original_text) > 150 else ''}'") print(f"\n🔒 متن ناشناسسازی شده ({len(anonymized_text)} کاراکتر):") print(f"'{anonymized_text[:150]}{'...' if len(anonymized_text) > 150 else ''}'") # استخراج entities از متن اصلی print(f"\n🔍 Entities دقیق استخراج شده از متن اصلی (الگوهای آپدیت شده):") original_entities = self.extractor.extract_entities(original_text) total_original_entities = 0 for category, entities in original_entities.items(): print(f"\n 📊 {category} ({len(entities)} عدد):") for i, entity in enumerate(entities, 1): print(f" {i}. '{entity}'") total_original_entities += len(entities) if not original_entities: print(" ❌ هیچ entity ای یافت نشد!") else: print(f"\n✅ مجموع entities یافت شده: {total_original_entities}") # استخراج کدهای ناشناسسازی print(f"\n🔒 کدهای ناشناسسازی:") anonymized_codes = self.extractor.extract_anonymized_codes(anonymized_text) total_anonymized_codes = 0 for category, codes in anonymized_codes.items(): print(f"\n 🔑 {category} ({len(codes)} عدد):") for i, code in enumerate(codes, 1): print(f" {i}. '{code}'") total_anonymized_codes += len(codes) if not anonymized_codes: print(" ❌ هیچ کد ناشناسسازی یافت نشد!") else: print(f"\n✅ مجموع کدهای ناشناسسازی: {total_anonymized_codes}") # تطبیق دقیق entities و codes print(f"\n📄 تطبیق Entities با کدهای ناشناسسازی:") all_categories = set(original_entities.keys()) | set(anonymized_codes.keys()) for category in sorted(all_categories): orig_count = len(original_entities.get(category, [])) anon_count = len(anonymized_codes.get(category, [])) print(f"\n 📈 {category}:") print(f" Entities اصلی: {orig_count}") print(f" کدهای ناشناسسازی: {anon_count}") if orig_count > 0: print(f" لیست اصلی: {original_entities[category]}") if anon_count > 0: print(f" لیست کدها: {anonymized_codes[category]}") # وضعیت تطبیق if orig_count == anon_count: print(f" وضعیت: ✅ تطبیق کامل") elif orig_count > anon_count: print(f" وضعیت: ⚠️ {orig_count - anon_count} entity از دست رفته") else: print(f" وضعیت: ⚠️ {anon_count - orig_count} کد اضافی") # محاسبه متریکها category_metrics = {} total_tp, total_fp, total_fn = 0, 0, 0 for category in all_categories: original_count = len(original_entities.get(category, [])) anonymized_count = len(anonymized_codes.get(category, [])) tp = min(original_count, anonymized_count) fp = max(0, anonymized_count - original_count) fn = max(0, original_count - anonymized_count) precision = tp / (tp + fp) if (tp + fp) > 0 else 0 recall = tp / (tp + fn) if (tp + fn) > 0 else 0 f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 category_metrics[category] = { 'original_count': original_count, 'anonymized_count': anonymized_count, 'tp': tp, 'fp': fp, 'fn': fn, 'precision': precision, 'recall': recall, 'f1_score': f1_score } total_tp += tp total_fp += fp total_fn += fn # متریکهای کلی overall_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0 overall_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0 overall_f1 = 2 * (overall_precision * overall_recall) / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0 accuracy = total_tp / total_original_entities if total_original_entities > 0 else 0 print(f"\n🎯 متریکهای نهایی ردیف {row_number} (الگوهای آپدیت شده):") print(f" TP: {total_tp}, FP: {total_fp}, FN: {total_fn}") print(f" Precision: {overall_precision:.4f}") print(f" Recall: {overall_recall:.4f}") print(f" F1-Score: {overall_f1:.4f}") print(f" Accuracy: {accuracy:.4f}") return { 'original_entities': original_entities, 'anonymized_codes': anonymized_codes, 'category_metrics': category_metrics, 'overall_metrics': { 'total_original_entities': total_original_entities, 'total_anonymized_entities': total_anonymized_codes, 'total_tp': total_tp, 'total_fp': total_fp, 'total_fn': total_fn, 'precision': overall_precision, 'recall': overall_recall, 'f1_score': overall_f1, 'accuracy': accuracy } } def process_csv(self, csv_file_path): """پردازش فایل CSV""" try: # خواندن فایل df = None for encoding in ['utf-8', 'utf-8-sig', 'cp1256', 'windows-1256']: try: df = pd.read_csv(csv_file_path, encoding=encoding) print(f"✅ فایل با encoding {encoding} خوانده شد") break except UnicodeDecodeError: continue if df is None: return "❌ خطا: نمیتوان فایل را خواند" print(f"\n📋 اطلاعات فایل CSV:") print(f" تعداد ردیفها: {len(df)}") print(f" ستونها: {df.columns.tolist()}") # بررسی ستونها if 'original_text' not in df.columns or 'anonymized_text' not in df.columns: return f"❌ خطا: فایل باید شامل ستونهای 'original_text' و 'anonymized_text' باشد" if len(df) == 0: return "❌ خطا: فایل خالی است" # پردازش هر ردیف results = [] all_analysis = [] for index, row in df.iterrows(): print(f"\n📄 پردازش ردیف {index + 1} از {len(df)}") original_text = str(row['original_text']) if pd.notna(row['original_text']) else "" anonymized_text = str(row['anonymized_text']) if pd.notna(row['anonymized_text']) else "" if original_text.strip() == "" and anonymized_text.strip() == "": print("⚠️ ردیف خالی است، رد میشود") continue # تحلیل دقیق analysis = self.analyze_single_row(original_text, anonymized_text, index + 1) all_analysis.append(analysis) # ذخیره نتیجه result = { 'row_id': int(index), 'original_text': original_text, 'anonymized_text': anonymized_text, **{k: convert_to_serializable(v) for k, v in analysis['overall_metrics'].items()} } # اضافه کردن متریکهای category for category, metrics in analysis['category_metrics'].items(): for metric_name, value in metrics.items(): result[f'{category.lower()}_{metric_name}'] = convert_to_serializable(value) results.append(result) if not results: return "❌ خطا: هیچ ردیف معتبری برای پردازش یافت نشد" return pd.DataFrame(results), all_analysis except Exception as e: return f"❌ خطا در پردازش: {str(e)}" # ===== رابط Gradio آپدیت شده ===== def process_uploaded_file(file): """پردازش فایل آپلود شده""" if file is None: return "❌ لطفاً ابتدا فایل CSV را آپلود کنید.", None print(f"\n🚀 شروع تحلیل دقیق فایل (ورژن آپدیت شده): {file.name}") benchmark = UpdatedAnonymizationBenchmark() result = benchmark.process_csv(file.name) if isinstance(result, str): return result, None results_df, all_analysis = result # تولید گزارش نهایی total_rows = len(results_df) # محاسبه آمار کلی avg_precision = results_df['precision'].mean() if 'precision' in results_df.columns else 0 avg_recall = results_df['recall'].mean() if 'recall' in results_df.columns else 0 avg_f1 = results_df['f1_score'].mean() if 'f1_score' in results_df.columns else 0 avg_accuracy = results_df['accuracy'].mean() if 'accuracy' in results_df.columns else 0 total_original = results_df['total_original_entities'].sum() if 'total_original_entities' in results_df.columns else 0 total_anonymized = results_df['total_anonymized_entities'].sum() if 'total_anonymized_entities' in results_df.columns else 0 total_tp = results_df['total_tp'].sum() if 'total_tp' in results_df.columns else 0 total_fp = results_df['total_fp'].sum() if 'total_fp' in results_df.columns else 0 total_fn = results_df['total_fn'].sum() if 'total_fn' in results_df.columns else 0 # گزارش نهایی report = f""" {'='*80} 🎯 گزارش نهایی Benchmark آپدیت شده - براساس سیستم ناشناسسازی بهبود یافته {'='*80} 📈 آمار کلی (الگوهای آپدیت شده براساس سیستم جامع): • تعداد ردیفهای پردازش شده: {total_rows} • مجموع Entities دقیق در همه ردیفها: {total_original} • مجموع کدهای ناشناسسازی: {total_anonymized} • True Positives (درست شناسایی شده): {total_tp} • False Positives (اشتباه شناسایی شده): {total_fp} • False Negatives (از دست رفته): {total_fn} 🎯 متریکهای میانگین: • Precision: {avg_precision:.4f} • Recall: {avg_recall:.4f} • F1-Score: {avg_f1:.4f} • Accuracy: {avg_accuracy:.4f} 🆕 بهبودهای آپدیت شده: • آدرس کامل شامل میدان، برج، طبقه و واحد • نام اشخاص با عنوان خانم و مدیر مالی • شرکتهای پیچیده (پردازش دادههای ایرانیان) • شماره فاکتور و اسناد رسمی • تلفن ثابت شهری (021-) • فیلتر کلمات عمومی ("همین بانک", "شرکت متقاضی") • اولویتبندی بهتر پردازش entities • جداسازی دقیق کد ملی از شماره تلفن ✅ الگوهای regex دقیقتر و جامعتر شدهاند! ✅ entities اضافی و غلط حذف شدهاند ✅ فقط entities واقعی و معنادار شناسایی میشوند ✅ تطبیق بهتر با سیستم ناشناسسازی پیشرفته """ # ذخیره نتایج try: results_df.to_csv("updated_benchmark_results.csv", index=False, encoding='utf-8-sig') print("✅ نتایج آپدیت شده در فایل updated_benchmark_results.csv ذخیره شد") except Exception as e: print(f"⚠️ خطا در ذخیره فایل: {e}") # ستونهای مهم برای نمایش display_columns = ['row_id', 'total_original_entities', 'total_anonymized_entities', 'total_tp', 'total_fp', 'total_fn', 'precision', 'recall', 'f1_score', 'accuracy'] display_df = results_df[[col for col in display_columns if col in results_df.columns]] return report, display_df def download_results(): """دانلود نتایج""" if os.path.exists("updated_benchmark_results.csv"): return "updated_benchmark_results.csv" return None # ===== رابط اصلی ===== def main(): with gr.Blocks(title="آپدیت شده: Ultra Precise Benchmark", theme=gr.themes.Soft()) as demo: gr.HTML("""