#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ سیستم ناشناس‌سازی اصلاح شده - حل مشکلات کدهای 11 رقمی، بانک‌ها و سازمان‌های دولتی """ import gradio as gr import re import os import requests import time import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ImprovedDataAnonymizer: def __init__(self): self.mapping_table = {} self.pattern_categories = { 'personal_identity': { 'name_fa': 'اطلاعات شخصی و هویتی', 'name_en': 'Personal & Identity Information', 'patterns': ['PERSON', 'ID_NUMBER', 'MIXED_NAMES'], 'icon': '👤' }, 'financial': { 'name_fa': 'اطلاعات مالی', 'name_en': 'Financial Information', 'patterns': ['AMOUNT', 'ACCOUNT', 'CARD_NUMBER'], 'icon': '💰' }, 'temporal': { 'name_fa': 'اطلاعات زمانی', 'name_en': 'Temporal Information', 'patterns': ['DATE'], 'icon': '📅' }, 'location': { 'name_fa': 'اطلاعات مکانی', 'name_en': 'Location Information', 'patterns': ['LOCATION', 'FULL_ADDRESS'], 'icon': '📍' }, 'business': { 'name_fa': 'اطلاعات کسب‌وکار', 'name_en': 'Business Information', 'patterns': ['COMPANY', 'BRANCH'], 'icon': '🏢' }, 'government': { 'name_fa': 'سازمان‌های دولتی', 'name_en': 'Government Organizations', 'patterns': ['GOVERNMENT'], 'icon': '🏛️' }, 'communication': { 'name_fa': 'اطلاعات ارتباطی', 'name_en': 'Communication Information', 'patterns': ['PHONE', 'EMAIL'], 'icon': '📞' } } self.counters = { 'PERSON': 0, 'ID_NUMBER': 0, 'MIXED_NAMES': 0, 'AMOUNT': 0, 'ACCOUNT': 0, 'CARD_NUMBER': 0, 'DATE': 0, 'LOCATION': 0, 'FULL_ADDRESS': 0, 'COMPANY': 0, 'BRANCH': 0, 'GOVERNMENT': 0, 'PHONE': 0, 'EMAIL': 0 } self.api_key = os.getenv("OPENAI_API_KEY", "") def get_improved_patterns(self): """الگوهای بهبود یافته با حل مشکلات شناسایی آدرس کامل""" return { # آدرس‌های کامل - اولویت بالا برای گرفتن آدرس کامل قبل از قطعات 'FULL_ADDRESS': [ # الگوی آدرس کامل: شهر + خیابان + کوچه + پلاک + طبقه r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرم‌آباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجف‌آباد|شاهین‌شهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نی‌ریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلام‌شهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*(?:خیابان|کوچه|شهرک|بلوار|میدان|کوی|محله)\s+[آ-یَ-ّ۰-۹\s]+(?:،\s*(?:خیابان|کوچه|بلوار|کوی)\s+[آ-یَ-ّ۰-۹\s]+)?(?:،\s*پلاک\s+\d+)?(?:،\s*(?:طبقه|واحد)\s+[آ-یَ-ّ۰-۹\d\s]+)?', # الگوی آدرس با شهرک r'(?:تهران|اصفهان|مشهد|شیراز|کرج|اهواز|قم|رشت|کرمان|یزد|بوشهر|ارومیه|همدان|بندر عباس|ساری|اردبیل|خرم‌آباد|ایلام|بیرجند|گرگان|زنجان|سنندج|شهرکرد|سبزوار|قزوین|زاهدان|خوی|مراغه|کاشان|نجف‌آباد|شاهین‌شهر|ملایر|آبادان|دزفول|بابل|آمل|شاهرود|گنبد کاووس|خرمشهر|جهرم|فسا|مرودشت|لار|داراب|فیروزآباد|کازرون|سپیدان|نی‌ریز|استهبان|فارسان|میانه|ورامین|قرچک|ری|پاکدشت|دماوند|فیروزکوه|شهریار|اسلام‌شهر|ملارد|قدس|بهارستان|چهاردانگه)،\s*شهرک\s+[آ-یَ-ّ۰-۹\s]+،\s*(?:خیابان|کوچه|بلوار)\s+[آ-یَ-ّ۰-۹\s]+(?:،\s*پلاک\s+\d+)?', # الگوی ساده‌تر برای آدرس‌های کوتاه‌تر r'خیابان\s+[آ-یَ-ّ۰-۹\s]+،\s*کوچه\s+[آ-یَ-ّ۰-۹\s]+،\s*پلاک\s+\d+(?:،\s*(?:طبقه|واحد)\s+[آ-یَ-ّ۰-۹\d\s]+)?', ], # اسامی اشخاص - الگوهای دقیق‌تر 'PERSON': [ r'آقای\s+[آ-یَ-ّ۰-۹]+\s+[آ-یَ-ّ۰-۹]+(?=\s+با\s+کد|\s+مدیر|$|،|\.)', r'خانم\s+[آ-یَ-ّ۰-۹]+\s+[آ-یَ-ّ۰-۹]+(?=\s+با\s+کد|\s+همسر|$|،|\.)', r'مهندس\s+[آ-یَ-ّ۰-۹]+\s+[آ-یَ-ّ۰-۹]+(?=\s+با\s+کد|$|،|\.)', r'دکتر\s+[آ-یَ-ّ۰-۹]+\s+[آ-یَ-ّ۰-۹]+(?=\s+با\s+کد|$|،|\.)', r'Mr\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', r'Ms\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', r'Dr\.\s+[A-Z][a-z]+\s+[A-Z][a-z]+(?=\s|,|\.|$)', ], # کدهای ملی و شناسه‌ها - جداسازی از شماره تلفن + کدهای 11 رقمی شرکت‌ها 'ID_NUMBER': [ r'کد\s+ملی\s+\d{10}', r'شناسه\s+ملی\s+\d{11}', r'(?= proc_end): overlaps = True break # شرایط قبولی - با فیلتر کلمات عمومی if (not overlaps and full_match not in found_entities and full_match not in self.mapping_table and len(full_match) >= 3 and not full_match.isspace() and not self._is_generic_word(full_match)): # فیلتر کلمات عمومی self.counters[category] += 1 code = f"{category.lower()}_{self.counters[category]:03d}" self.mapping_table[full_match] = code found_entities.add(full_match) processed_entities.add((match_start, match_end)) except re.error as e: logger.error(f"Regex error in pattern {pattern}: {e}") continue # جایگزینی در متن - طولانی‌ترین‌ها اول sorted_items = sorted(self.mapping_table.items(), key=lambda x: len(x[0]), reverse=True) for original_item, code in sorted_items: anonymized = anonymized.replace(original_item, code) logger.info(f"✅ Improved anonymization completed. Found {len(self.mapping_table)} entities.") return anonymized except Exception as e: logger.error(f"Anonymization error: {e}") return f"⚠ Error in anonymization: {str(e)}" if lang == 'en' else f"⚠ خطا در ناشناس‌سازی: {str(e)}" def _is_generic_word(self, text): """بررسی کلمات عمومی که نباید entity محسوب شوند""" generic_words = { 'همین بانک', 'این بانک', 'آن بانک', 'بانک مذکور', 'همین شرکت', 'این شرکت', 'آن شرکت', 'شرکت مذکور', 'همین شعبه', 'این شعبه', 'آن شعبه', 'شعبه مذکور', 'همین شهر', 'این شهر', 'آن شهر', 'همین سازمان', 'این سازمان', 'آن سازمان', 'سازمان مذکور', 'همین وزارت', 'این وزارت', 'آن وزارت', 'وزارت مذکور', 'متقاضی', 'ایشان', 'وی', 'مشتری', 'بانک', 'شرکت', 'شعبه', 'سازمان', 'وزارت' # کلمات تنها } return text.strip() in generic_words or len(text.strip()) < 3 def send_to_chatgpt(self, anonymized_text, lang='fa'): """ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "⚠ Anonymized text is empty!" if lang == 'en' else "⚠ متن ناشناس‌شده خالی است!" if not self.api_key: return "⚠ API Key not configured!" if lang == 'en' else "⚠ کلید API تنظیم نشده است!" system_msg = "You are a professional analyst. Answer questions accurately." if lang == 'en' else "شما یک تحلیل‌گر حرفه‌ای هستید. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=30 ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) return f"⚠ API Error: {error_message}" except Exception as e: return f"⚠ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"⚠ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "⚠ ChatGPT response is empty!" if lang == 'en' else "⚠ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "⚠ Mapping table is empty!" if lang == 'en' else "⚠ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"⚠ Deanonymization error: {str(e)}" if lang == 'en' else f"⚠ خطا در بازگردانی: {str(e)}" def get_model_status(self): """وضعیت سیستم""" status = "🚀 **سیستم ناشناس‌سازی بهبود یافته - نسخه اصلاح شده:**\n\n" status += "• **مشکلات حل شده:**\n" status += " ✅ آدرس کامل به عنوان یک entity واحد\n" status += " ✅ حذف کلمات عمومی مثل 'همین بانک'\n" status += " ✅ اولویت‌بندی بهتر برای آدرس کامل\n" status += " ✅ جداسازی دقیق کد ملی از شماره تلفن\n" status += " ✅ جداسازی مبالغ مالی از شماره تلفن\n" status += " ✅ شماره حساب و کارت بانکی جداگانه\n" status += " ✅ شناسایی کدهای 11 رقمی شرکت‌ها\n" status += " ✅ تشخیص بانک‌های معروف ایران\n" status += " ✅ شناسایی سازمان‌های دولتی و نهادهای رسمی\n\n" status += "• **بهبودهای الگو:**\n" status += " 🎯 الگوی جامع برای آدرس کامل فارسی\n" status += " 🎯 فیلتر کلمات عمومی\n" status += " 🎯 اولویت‌بندی بهتر پردازش\n" status += " 🎯 حذف تداخل‌های غلط\n" status += " 🎯 الگوهای دقیق برای کدهای 11 رقمی\n" status += " 🎯 شناسایی بانک‌های ایرانی مشهور\n" status += " 🎯 تشخیص نهادهای دولتی و رسمی\n\n" status += f"📊 **دسته‌بندی‌های موجود:**\n" for cat_key, cat_info in self.pattern_categories.items(): icon = cat_info['icon'] name_fa = cat_info['name_fa'] pattern_count = len(cat_info['patterns']) status += f" {icon} {name_fa}: {pattern_count} الگو\n" status += "\n🔧 **اصلاحات اخیر:**\n" status += " ✅ آدرس کامل: تهران، خیابان ولیعصر، کوچه نیلوفر، پلاک 128، طبقه سوم → full_address_001\n" status += " ✅ آدرس کامل: کرج، شهرک اندیشه، خیابان گلستان، پلاک 45 → full_address_002\n" status += " ✅ فیلتر: 'همین بانک' حذف شد\n" status += " ✅ کد 11 رقمی: 10987654321 → id_number_001\n" status += " ✅ بانک ملی → company_001\n" status += " ✅ بانک مرکزی جمهوری اسلامی ایران → government_001\n" return status # ایجاد instance anonymizer = ImprovedDataAnonymizer() def process_all_steps(input_text, language, selected_categories): """پردازش خودکار تمام مراحل - نسخه اصلاح شده""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "⚠ Please enter input text!" if lang == 'en' else "⚠ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() anonymized_text = anonymizer.anonymize_text(input_text, lang, selected_categories) if anonymized_text.startswith("⚠"): return anonymized_text, "", "", "" gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("⚠"): entities_found = len(anonymizer.mapping_table) selected_count = len(selected_categories) if selected_categories else 0 success_msg = (f"✅ ناشناس‌سازی اصلاح شده انجام شد!\n" f"📋 دسته‌های انتخابی: {selected_count} | 🔍 پردازش کدهای 11 رقمی و بانک‌ها\n" f"📊 کل entities محافظت شده: {entities_found} | 🎯 مشکلات کدهای ملی، بانک‌ها و سازمان‌ها حل شده!") return success_msg, anonymized_text, gpt_response, "" final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) selected_count = len(selected_categories) if selected_categories else 7 success_msg = (f"🎉 ناشناس‌سازی کامل و بازگردانی موفق!\n" f"🔧 روش: پردازش اصلاح شده + کدهای 11 رقمی + بانک‌ها | 📋 دسته‌ها: {selected_count}/7\n" f"📊 کل: {entities_found} entities | ⏱️ زمان: {total_time:.2f}s\n" f"⚡ مشکلات کدهای 11 رقمی، بانک‌ها و سازمان‌های دولتی حل شدند!") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"⚠ Processing error: {str(e)}" if lang == 'en' else f"⚠ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table(language): """نمایش جدول نگاشت""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "⚠ Mapping table is empty!" if lang == 'en' else "⚠ جدول نگاشت خالی است!" result = "📋 **جدول نگاشت اصلاح شده:**\n\n" # نمایش آمار کلی result += f"📊 **آمار**: {len(anonymizer.mapping_table)} entity\n" result += f"🔍 **روش**: پردازش آدرس کامل اصلاح شده\n" result += f"⚡ **حالت**: شناسایی دقیق entities و فیلتر کلمات عمومی\n\n" # دسته‌بندی نتایج category_stats = {} for original, code in anonymizer.mapping_table.items(): category = code.split('_')[0].upper() if category not in category_stats: category_stats[category] = [] category_stats[category].append((original, code)) # نمایش نتایج بر اساس دسته‌بندی for category, items in category_stats.items(): if len(items) > 0: result += f"🔍 **{category}** ({len(items)} مورد):\n" for original, code in items: result += f" • `{original}` → `{code}`\n" result += "\n" result += "✨ **سیستم اصلاح شده**: تشخیص آدرس کامل و حذف کلمات عمومی!" return result def clear_all(): """پاک کردن همه""" anonymizer.mapping_table = {} anonymizer.counters = {key: 0 for key in anonymizer.counters.keys()} return "", "", "", "", "" # رابط کاربری Gradio custom_css = """ body, .gradio-container { font-family: 'Segoe UI', Tahoma, Arial, sans-serif !important; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; min-height: 100vh !important; padding: 20px !important; } .gradio-textbox { border-radius: 10px !important; box-shadow: 0 4px 15px rgba(0,0,0,0.1) !important; min-height: 300px !important; } .status-box { background: linear-gradient(135deg, #4CAF50, #45a049) !important; border: 3px solid #2E7D32 !important; border-radius: 15px !important; padding: 15px !important; margin: 10px 0 !important; box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3) !important; } .gradio-button { border-radius: 25px !important; font-weight: bold !important; transition: all 0.3s ease !important; margin: 5px 0 !important; min-height: 50px !important; } h1 { background: linear-gradient(45deg, #FFD700, #FFA500) !important; -webkit-background-clip: text !important; -webkit-text-fill-color: transparent !important; background-clip: text !important; text-align: center !important; } """ with gr.Blocks(title="🔧 سیستم ناشناس‌سازی اصلاح شده - کدهای 11 رقمی + بانک‌ها", theme=gr.themes.Soft(), css=custom_css) as app: with gr.Column(): gr.HTML("

🔧 سیستم ناشناس‌سازی اصلاح شده - حل مشکلات کدهای 11 رقمی و بانک‌ها

") with gr.Row(): language_selector = gr.Radio( choices=["فارسی", "English"], value="فارسی", label="Language / زبان", interactive=True ) # بخش انتخاب دسته‌بندی‌ها with gr.Row(): with gr.Column(): gr.HTML("

🎯 انتخاب دسته‌بندی‌های الگو

") pattern_categories = gr.CheckboxGroup( choices=anonymizer.get_category_choices('fa'), value=anonymizer.get_category_choices('fa'), label="انتخاب دسته‌بندی‌های الگو:", interactive=True ) gr.HTML("""

🔧 اصلاحات: کدهای 11 رقمی شرکت‌ها، بانک‌های ایرانی، سازمان‌های دولتی، آدرس کامل، حذف کلمات عمومی

""") with gr.Row(): with gr.Column(scale=1): gr.HTML('

🔍 متن ورودی

') input_text = gr.Textbox( lines=15, placeholder="متن اصلی خود را اینجا وارد کنید...\nمثال متن مشکل‌دار شما:\nآقای علی احمدی با کد ملی 0123456789 در تاریخ 1402/09/15 درخواست تسهیلات بانکی به مبلغ 500000000 تومان از شعبه مرکزی بانک ملی ارائه نموده است. ایشان ساکن تهران، خیابان ولیعصر، کوچه نیلوفر، پلاک 128، طبقه سوم بوده و شماره تماس 09123456789 را اعلام نموده‌اند...\n\n🆕 نمونه‌های جدید:\nشرکت پتروشیمی تبریز با شناسه اقتصادی 10987654321 قرارداد با بانک صادرات امضا کرد. دفتر اسناد رسمی شماره 45 تهران مدارک را تایید نمود. همچنین وزارت صنعت و سازمان ثبت اسناد نیز موافقت کردند.\n\n🔧 حالا مشکلات حل شده‌اند!", label="", rtl=True ) with gr.Row(): process_btn = gr.Button("🚀 پردازش با الگوهای اصلاح شده", variant="primary") clear_btn = gr.Button("🗑️ پاک کردن همه", variant="stop") status = gr.Textbox( label="وضعیت", lines=4, interactive=False, rtl=True, elem_classes=["status-box"] ) with gr.Column(scale=1): gr.HTML('

🎭 متن ناشناس‌شده

') anonymized_output = gr.Textbox( lines=15, placeholder="متن ناشناس‌شده اینجا نمایش داده می‌شود...\nانتظار می‌رود:\n- person_001 به جای آقای علی احمدی\n- full_address_001 به جای تهران، خیابان ولیعصر، کوچه نیلوفر، پلاک 128، طبقه سوم\n- amount_001 به جای 500000000 تومان\n- company_001 به جای بانک ملی\n- id_number_001 به جای کدهای 11 رقمی شرکت‌ها\n- government_001 به جای دفتر اسناد رسمی\n- و سایر موارد...", label="", interactive=False, rtl=True ) with gr.Row(): with gr.Column(scale=1): gr.HTML('

🤖 پاسخ خام ChatGPT

') gpt_output = gr.Textbox( lines=10, placeholder="پاسخ خام ChatGPT اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Column(scale=1): gr.HTML('

✅ پاسخ نهایی بازگردانده شده

') final_output = gr.Textbox( lines=10, placeholder="پاسخ نهایی اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Row(): with gr.Column(): gr.HTML('

📋 جدول نگاشت

') mapping_btn = gr.Button("📋 نمایش جدول نگاشت اصلاح شده") mapping_output = gr.Textbox( lines=15, label="جدول نگاشت اطلاعات", interactive=False, visible=False, rtl=True ) with gr.Row(): with gr.Column(): gr.HTML('

⚙️ وضعیت سیستم

') system_status_btn = gr.Button("📊 نمایش وضعیت سیستم اصلاح شده") system_status_output = gr.Textbox( lines=20, label="وضعیت سیستم", interactive=False, visible=False, rtl=True ) # Event handlers process_btn.click( fn=process_all_steps, inputs=[input_text, language_selector, pattern_categories], outputs=[status, anonymized_output, gpt_output, final_output] ) clear_btn.click( fn=clear_all, outputs=[input_text, anonymized_output, gpt_output, final_output, status] ) mapping_btn.click( fn=get_mapping_table, inputs=[language_selector], outputs=[mapping_output] ) mapping_btn.click( fn=lambda: gr.update(visible=True), outputs=[mapping_output] ) system_status_btn.click( fn=lambda: anonymizer.get_model_status(), outputs=[system_status_output] ) system_status_btn.click( fn=lambda: gr.update(visible=True), outputs=[system_status_output] ) if __name__ == "__main__": logger.info("🎯 Starting Smart Anonymization System...") logger.info("✅ Focus: فقط موجودیت‌های منحصربه‌فرد - نام‌های اصلی شرکت‌ها، تاریخ‌ها، مکان‌ها") logger.info("🎯 Ready for selective anonymization of unique entities only!") app.launch( share=False, server_name="0.0.0.0", server_port=7860, show_error=True )