#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Enhanced Multi-Modal Data Anonymization System - Fixed for HuggingFace Spaces ============================================================================= Combining XLM-RoBERTa + Advanced Regex Patterns for Maximum Accuracy Supports Persian, English, and Mixed Languages """ import gradio as gr import re import os import requests import time import logging from typing import List, Dict, Tuple, Optional, Set import warnings import subprocess import sys import os def install_requirements(): """نصب اجباری وابستگی‌ها""" try: subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "pip"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "transformers>=4.30.0"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "torch"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "tokenizers>=0.13.0"]) print("✅ Dependencies installed successfully") except Exception as e: print(f"❌ Failed to install dependencies: {e}") # نصب وابستگی‌ها در صورت عدم وجود try: import transformers print("✅ Transformers already available") except ImportError: print("📦 Installing transformers...") install_requirements() # Enhanced dependencies with better error handling TRANSFORMERS_AVAILABLE = False try: print("🔄 Attempting to import transformers...") from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline TRANSFORMERS_AVAILABLE = True print("✅ Transformers library loaded successfully") except ImportError as e: print(f"⚠️ Transformers import failed: {e}") print("📝 Falling back to regex-only mode") TRANSFORMERS_AVAILABLE = False except Exception as e: print(f"❌ Unexpected error loading transformers: {e}") TRANSFORMERS_AVAILABLE = False warnings.filterwarnings('ignore') logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class EnhancedDataAnonymizer: def __init__(self): self.mapping_table = {} self.counters = {} self.api_key = os.getenv("OPENAI_API_KEY", "") # Processing modes self.processing_modes = { 'regex_only': 'Pure Regex (Fast & Compatible)', 'hybrid': 'Regex + XLM-RoBERTa (Recommended)', 'ner_priority': 'NER Priority + Regex Backup (Highest Accuracy)' } # Model components self.ner_pipeline = None self.model_status = "Initializing..." self.model_ready = False # Initialize model with improved error handling self.initialize_ner_model_safe() # Pattern categories self.pattern_categories = { 'personal_identity': { 'name_fa': 'اطلاعات شخصی و هویتی', 'name_en': 'Personal & Identity Information', 'patterns': ['PERSON', 'MIXED_NAMES', 'ID_NUMBER', 'ENGLISH_TITLES'], 'icon': '👤' }, 'financial': { 'name_fa': 'اطلاعات مالی', 'name_en': 'Financial Information', 'patterns': ['AMOUNT', 'INTERNATIONAL_CURRENCIES', 'ACCOUNT', 'FINANCIAL_TERMS', 'STOCK_SYMBOL'], 'icon': '💰' }, 'temporal': { 'name_fa': 'اطلاعات زمانی', 'name_en': 'Temporal Information', 'patterns': ['DATE', 'ADVANCED_DATE_FORMATS', 'TIME_RANGES'], 'icon': '📅' }, 'location': { 'name_fa': 'اطلاعات مکانی', 'name_en': 'Location Information', 'patterns': ['LOCATION', 'COMPLEX_ADDRESSES'], 'icon': '📍' }, 'technical': { 'name_fa': 'اطلاعات فنی و تکنولوژیکی', 'name_en': 'Technical & Technological', 'patterns': ['TECHNICAL_CODES', 'NETWORK_ADDRESSES', 'TECHNICAL_UNITS', 'ACRONYMS_ABBREVIATIONS'], 'icon': '⚙️' }, 'business': { 'name_fa': 'اطلاعات کسب‌وکار', 'name_en': 'Business Information', 'patterns': ['COMPANY', 'BUSINESS_TERMS', 'PRODUCT', 'PETROCHEMICAL'], 'icon': '🏢' }, 'quantity': { 'name_fa': 'اطلاعات کمیت و واحد', 'name_en': 'Quantity & Unit Information', 'patterns': ['PERCENTAGE', 'VOLUME', 'RATIOS'], 'icon': '📊' }, 'communication': { 'name_fa': 'اطلاعات ارتباطی', 'name_en': 'Communication Information', 'patterns': ['PHONE', 'EMAIL'], 'icon': '📞' } } # Initialize counters self.reset_counters() def initialize_ner_model_safe(self): """بارگذاری ایمن مدل XLM-RoBERTa با مدیریت خطای بهبود یافته""" print("🔄 Starting model initialization...") if not TRANSFORMERS_AVAILABLE: self.model_status = "⚠️ Transformers library not available - Using Regex only mode" self.model_ready = False print("📝 Transformers not available, continuing with regex patterns only") return try: print("🤖 Attempting to load XLM-RoBERTa model...") # Try loading with multiple fallback strategies model_names = [ "xlm-roberta-base", "distilbert-base-multilingual-cased", "bert-base-multilingual-cased" ] for model_name in model_names: try: print(f"🔄 Trying model: {model_name}") self.ner_pipeline = pipeline( "ner", model=model_name, aggregation_strategy="simple", device=-1, # Force CPU tokenizer_kwargs={ "truncation": True, "max_length": 256, "padding": True } ) # Test the model with a simple input test_result = self.ner_pipeline("Test text") self.model_status = f"✅ {model_name} loaded successfully" self.model_ready = True print(f"✅ Successfully loaded model: {model_name}") return except Exception as model_error: print(f"❌ Failed to load {model_name}: {model_error}") continue # If all models failed raise Exception("All model loading attempts failed") except Exception as e: error_msg = str(e)[:100] print(f"❌ Model loading completely failed: {error_msg}") self.model_status = f"❌ Model loading failed - Using Regex only" self.model_ready = False self.ner_pipeline = None def reset_counters(self): """ریست کانترها""" pattern_types = [] for category in self.pattern_categories.values(): pattern_types.extend(category['patterns']) self.counters = {pattern: 0 for pattern in pattern_types} def detect_language(self, text): """تشخیص زبان متن""" if not text: return 'fa' persian_chars = len(re.findall(r'[\u0600-\u06FF]', text)) english_chars = len(re.findall(r'[a-zA-Z]', text)) total = persian_chars + english_chars if total == 0: return 'fa' if persian_chars / total > 0.6: return 'fa' elif english_chars / total > 0.6: return 'en' else: return 'mixed' def get_comprehensive_patterns(self): """الگوهای جامع ناشناس‌سازی""" return { 'PERSON': [ r'آقای\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'خانم\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'مهندس\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'دکتر\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'استاد\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'Mr\.\s+([a-zA-Z]+(?:\s+[a-zA-Z]+)*)', r'Ms\.\s+([a-zA-Z]+(?:\s+[a-zA-Z]+)*)', r'Dr\.\s+([a-zA-Z]+(?:\s+[a-zA-Z]+)*)', r'([آ-یa-zA-Z]+\s+[آ-یa-zA-Z]+)(?:، مدیرعامل|\s+مدیرعامل|\s+رئیس)', ], 'MIXED_NAMES': [ r'([آ-یa-zA-Z]{2,}\s+[آ-یa-zA-Z]{2,})', r'([A-Z][a-z]+-[A-Z][a-z]+)', r"([A-Z]'[A-Z][a-z]+)", ], 'ID_NUMBER': [ r'IR[۰-۹0-9]{24}', r'شبا[\s:]*IR[۰-۹0-9]{24}', r'(?:کد[\s]*)?(?:ملی[\s:]*)?[۰-۹0-9]{10}', r'(?:شناسه[\s]*)?(?:ملی[\s:]*)?[۰-۹0-9]{10}', r'National[\s]*(?:ID[\s:]*)?[0-9]{10}', r'(?:پاسپورت[\s:]*)?[A-Z][0-9]{8}', r'SSN[\s:]*[0-9]{3}-[0-9]{2}-[0-9]{4}', ], 'ENGLISH_TITLES': [ r'business\s+partner', r'team\s+lead', r'head\s+of\s+production', r'senior\s+architect', r'civil\s+engineer', r'system\s+administrator', r'network\s+engineer', r'environmental\s+consultant', r'senior\s+loan\s+officer', r'facility\s+manager', r'project\s+team', r'technical\s+support' ], 'AMOUNT': [ r'\d+(?:,\d{3})*\s*(?:میلیون|میلیارد|هزار)\s*تومان', r'مبلغ\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد|هزار)?\s*تومان', r'\$\d+(?:,\d{3})*(?:\.\d+)?\s*(?:million|billion|thousand|M|B|K)?', r'€\d+(?:,\d{3})*(?:\.\d+)?', r'\d+(?:,\d{3})*\s*ریال', r'رقم\s+فعلی\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد)\s*تومان', r'رقم\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد)\s*تومان', r'به\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد|هزار)\s*تومان', ], 'INTERNATIONAL_CURRENCIES': [ r'\d+(?:,\d{3})*\s+euro', r'€\d+(?:\.\d+)?M', r'\d+\s+EUR', r'\d+(?:,\d{3})*\s+AED', r'\d+(?:\.\d+)?M\s+AED', r'\$\d+(?:\.\d+)?M', r'\$\d+(?:\.\d+)?K', r'£\d+(?:,\d{3})*(?:\.\d+)?', r'\d+\s+GBP', r'\d+\s+CHF', r'¥\d+(?:,\d{3})*', r'\d+\s+JPY' ], 'ACCOUNT': [ r'(?:شماره[\s]*)?(?:حساب[\s]*)?(?:بانکی[\s:]*)?(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'حساب[\s]*(?:شماره[\s:]*)?(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'شماره[\s]*حساب[\s:]*(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'Account[\s]*(?:Number[\s:]*)?(?:[0-9]{1,3}[-\s]?)*[0-9]{8,20}', r'[۰-۹0-9]{3}[-\s]?[۰-۹0-9]{3}[-\s]?[۰-۹0-9]{6,12}', r'واریز[\s]*(?:سود[\s:]*)?(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'سود[\s:]*(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}' ], 'FINANCIAL_TERMS': [ r'فروش\s+(?:ماهانه|تجمیعی|صادراتی)', r'درآمد\s+شرکت', r'سود\s+(?:خالص|نقدی)', r'صورت‌های\s+مالی', r'بهای\s+تمام‌شده', r'سودآوری', r'عملکرد\s+مالی', r'میانگین\s+فروش', r'بالاترین\s+رقم\s+فروش', r'رقم\s+فروش', r'درآمدهای\s+عملیاتی' ], 'STOCK_SYMBOL': [ r'نماد\s+([آ-یa-zA-Z0-9]+)', r'(سبهان|غدیر|شتران|شپنا|پترول|فارس|خارک|پلاسکو|جم|کرمان|مارون|اراک|رازی|شازند|کاوه|بندر|پارس|خوزستان|ماهشهر|عسلویه)(?=\s|$|،|\.|\s+)', r'شرکت\s+([آ-یa-zA-Z\s]+?)(?=\s+در|\s+که|\s+با|،|\.|\s+$|\s+را|\s+به)', r'پتروشیمی\s+([آ-یa-zA-Z\s]+?)(?=\s+در|\s+که|\s+با|،|\.|\s+$|\s+توان)', r'(AAPL|GOOGL|MSFT|AMZN|TSLA|META|NVDA|SABIC)(?=\s|$|,|\.)' ], 'DATE': [ r'[۰-۹0-9]{4}[/-][۰-۹0-9]{1,2}[/-][۰-۹0-9]{1,2}', r'[۰-۹0-9]{1,2}[/-][۰-۹0-9]{1,2}[/-][۰-۹0-9]{4}', r'(?:[۰-۹0-9]{1,2})\s*(?:فروردین|اردیبهشت|خرداد|تیر|مرداد|شهریور|مهر|آبان|آذر|دی|بهمن|اسفند)\s*(?:[۰-۹0-9]{4})', r'(?:فروردین|اردیبهشت|خرداد|تیر|مرداد|شهریور|مهر|آبان|آذر|دی|بهمن|اسفند)\s+[۰-۹0-9]{4}', r'(?:[0-9]{1,2})\s*(?:January|February|March|April|May|June|July|August|September|October|November|December)\s*(?:[0-9]{4})', r'(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*[0-9]{1,2},?\s*[0-9]{4}', r'سال\s+گذشته', r'سال\s+جاری', r'این\s+سال', r'ماه\s+قبل', r'ماه\s+اخیر', r'(?:13[0-9]{2}|14[0-9]{2}|20[0-9]{2}|19[0-9]{2})(?=\s|$|،|\.)' ], 'ADVANCED_DATE_FORMATS': [ r'(?:March|April|May|June|July|August|September|October|November|December)\s+\d{1,2}(?:st|nd|rd|th),?\s+\d{4}', r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{3})?Z', r'(?:PST|EST|GMT|UTC)(?:[+-]\d{1,2}:\d{2})?', r'Eastern\s+Time', r'GMT[+-]\d{1,2}:\d{2}', r'end\s+of\s+fiscal\s+year\s+\d{4}/\d{2}/\d{2}' ], 'TIME_RANGES': [ r'\d{2}:\d{2}-\d{2}:\d{2}', r'\d{2}:\d{2}\s+تا\s+\d{2}:\d{2}', r'\d{1,2}:\d{2}\s+(?:AM|PM)\s+(?:PST|EST|GMT|UTC)', r'\d{2}:\d{2}:\d{2}\s+(?:AM|PM)', r'COB\s*\(Close\s+of\s+Business\)', r'\d{1,3}\s+(?:business\s+days|روز\s+کاری)' ], 'LOCATION': [ r'(تهران|اصفهان|ماهشهر|عسلویه|بندرعباس|اهواز|شیراز|مشهد|تبریز|کرج|قم|رشت|کرمان|یزد|زاهدان|بوشهر|خرمشهر|آبادان|اراک|قزوین)', r'استان\s+([آ-ی\s]+)', r'شهر\s+([آ-ی\s]+)', r'(ایران|عراق|کویت|عربستان|امارات|قطر|عمان|بحرین|ترکیه|پاکستان|افغانستان)', r'داخلی|بازار\s+داخلی', r'خارجی|بازارهای\s+خارجی', r'(London|Paris|Tokyo|New\s+York|Dubai|Singapore|Hong\s+Kong|Shanghai|Mumbai|Frankfurt|Amsterdam)' ], 'COMPLEX_ADDRESSES': [ r'کیلومتر\s+\d+\s+جاده\s+[آ-ی\s]+-[آ-ی\s]+', r'روبروی\s+(?:پمپ\s+بنزین|بانک|پارک|مسجد|بیمارستان)\s+[آ-یa-zA-Z\s]+', r'Building-[A-Z],?\s+Floor-\d+,?\s+Unit-[A-Z0-9]+', r'rack\s+number\s+R-\d+,?\s+slot\s+\d+', r'phase\s+\d+\s+development,?\s+block\s+[A-Z],?\s+plot\s+\d+-[A-Z]', r'\d{2,5}\s+[A-Z][a-z]+\s+(?:Street|Avenue|Boulevard|Road|Drive),?\s+Floor\s+\d+,?\s+Building\s+[A-Z]', r'شهرک\s+صنعتی\s+[آ-ی\s]+،?\s+محور\s+[آ-ی\s]+' ], 'TECHNICAL_CODES': [ r'SN-\d{4}-[A-Z]{3}-\d{4}', r'Serial\s+Number[\s:]*[A-Z0-9-]+', r'REF-[A-Z]{3}-\d{4}-\d{3}', r'DOC-[A-Z]{2}-\d{4}-\d{4}', r'INF-\d{4}-\d{4}', r'CTR/\d{4}/\d{3}', r'HVAC-\d{7}', r'Generator-Model-[A-Z0-9]+', r'LOI-\d{4}-[A-Z]{4}-\d{3}', r'BOQ-\d{4}-[A-Z]{3}-\d{3}', r'#INV-\d{4}-Q\d-\d{4}', r'ESC-\d{4}-[A-Z]{3}-\d{3}', r'BN-\d{6}-[A-Z]\d+' ], 'NETWORK_ADDRESSES': [ r'\b(?:\d{1,3}\.){3}\d{1,3}\b', r'xxx\.xxx\.xxx\.xxx', r'[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}', r'srv-[a-z]+-[a-z]+-\d{2}', r'[a-z]+-[a-z]+\d*\.[a-z]+\.[a-z]+', r'[a-zA-Z0-9-]+\.[a-zA-Z]{2,4}(?:\.[a-zA-Z]{2,4})?' ], 'TECHNICAL_UNITS': [ r'\d+(?:\.\d+)?\s*MW', r'\d+(?:\.\d+)?\s*kWh?', r'\d+(?:,\d{3})*\s*cubic\s+meters', r'\d+(?:,\d{3})*\s*m³', r'\d+(?:,\d{3})*\s*sq\s+ft', r'\d+(?:\.\d+)?\s*ppm', r'\d+(?:\.\d+)?\s*mg/m³', r'\b(?:CO2|NOx|SO2)\b', r'\d+(?:\.\d+)?\s*TB', r'\d+(?:\.\d+)?\s*GB', r'\d+(?:,\d{3})*\s*square\s+meters', r'\d+(?:\.\d+)?\%\s*efficiency', r'FICO\s+score:\s*\d{3}', r'\d+(?:\.\d+)?\s*(?:bar|psi)', r'\d+(?:\.\d+)?\s*°[CF]', r'\d+(?:\.\d+)?\s*(?:rpm|m/s)' ], 'ACRONYMS_ABBREVIATIONS': [ r'\b(?:HVAC|IT|HSE|BOQ|LC|COB)\b', r'\b(?:YTD|NNN|EIN|SSN|FICO)\b', r'\bIP\s+Address\b', r'\bMAC\s+Address\b', r'\bURL\b', r'\b(?:LLC|Corp|Inc|Ltd)\b', r'\b(?:PST|GMT|UTC|EST)\b', r'\b(?:CO2|NOx|pH|UV)\b', r'\b(?:SCADA|PLC|HMI)\b', r'\b(?:GDP|CPI|ROI|NPV)\b', r'\b(?:FOB|CIF|DDP)\b', r'\b(?:ABA|SWIFT|IBAN)\b' ], 'COMPANY': [ r'شرکت(?=\s+در|\s+که|\s+با|\s+را|\s+به)', r'([آ-یa-zA-Z\s]+)\s+شرکت', r'این\s+شرکت(?=\s|$|،|\.)', r'(بانک\s+[آ-یa-zA-Z\s]+)', r'([A-Z][a-zA-Z\s]+(?:Inc|Corp|Corporation|Company|Ltd|Limited|LLC))' ], 'BUSINESS_TERMS': [ r'تحلیل\s+عملکرد', r'گزارش\s+(?:فعالیت|عملکرد)\s+ماهانه', r'وضعیت\s+فروش', r'تولید\s+پایدار', r'سهم\s+بازار', r'صادرات\s+هدفمند', r'بهره‌وری', r'ظرفیت‌های\s+داخلی', r'شرکت‌های\s+پیشرو', r'صنعت\s+پتروشیمی', r'سرمایه‌گذاران\s+بنیادی', r'شاخص‌های\s+عملیاتی', r'برنامه‌ریزی\s+مناسب', r'واحد\s+فروش', r'موجودی\s+انبار', r'فاز\s+رشد\s+جدید', r'ترکیب\s+فروش', r'سهم\s+صادراتی', r'روند\s+عملکرد', r'اعداد\s+اعلام‌شده', r'داده‌های\s+ثبت‌شده' ], 'PRODUCT': [ r'\b(?:VCM|PVC|PE|PP|PS|ABS|SAN|PC|PMMA|PET|PBT|PA|POM|TPU)\b', r'پلی\s*(?:اتیلن|پروپیلن|استایرن|کربنات|متیل)', r'\b(?:اتیلن|پروپیلن|بنزن|تولوئن|زایلن|متانول|اتانول|استون|فنول)\b', r'\b(?:کلر|هیدروژن|اکسیژن|نیتروژن|آمونیاک|اتان|پروپان|بوتان)\b', r'محصول(?:ات)?', r'تولیدات\s+شرکت' ], 'PETROCHEMICAL': [ r'\b(?:LDPE|HDPE|LLDPE|PP|PS|EPS|ABS|SAN|PC|PMMA|PET|PBT|PA6|PA66|POM|TPU|EVA|EAA)\b', r'(?:Ethylene\s+Vinyl\s+Acetate|Ethyl\s+Acrylate|Methyl\s+Methacrylate|Polyethylene\s+Terephthalate)' ], 'PERCENTAGE': [ r'\d+(?:\.\d+)?\s*درصد(?:\s+افزایش|\s+رشد|\s+کاهش|\s+بالاتر|\s+پایین‌تر)?', r'\d+(?:\.\d+)?\s*%', r'معادل\s+\d+(?:\.\d+)?\s*درصد', r'حدود\s+\d+(?:\.\d+)?\s*درصد', r'با\s+\d+(?:\.\d+)?\s*درصد\s+افزایش', r'رشد\s+\d+(?:\.\d+)?\s*درصدی', r'\d+(?:\.\d+)?\s*درصدی(?=\s+همراه|\s+بوده)', r'میزان\s+رشد(?=\s+نسبت|\s+معادل)', r'افزایش\s+قابل‌توجهی', r'بهبود\s+نسبی', r'\d+(?:\.\d+)?\%\s*(?:increase|decrease|growth|improvement)', r'(?:approximately|about)\s+\d+(?:\.\d+)?\%' ], 'VOLUME': [ r'\d+(?:,\d{3})*\s*تن', r'\d+(?:,\d{3})*\s*(?:کیلوگرم|لیتر|بشکه)', r'میزان\s+\d+(?:,\d{3})*\s*تن', r'مقدار\s+تولید', r'حجم\s+فروش', r'ظرفیت\s+(?:تولید|اسمی)', r'\d+(?:,\d{3})*\s*(?:tons|kg|liters|barrels)', r'\d+(?:,\d{3})*\s*(?:metric\s+tons|MT)', r'\d+(?:,\d{3})*\s*(?:thousand\s+tons|KT)' ], 'RATIOS': [ r'نسبت\s+(?:فروش|تولید)\s+به\s+[آ-ی\s]+', r'\d+(?:\.\d+)?\s*نزدیک', r'برابر\s+با\s+\d+(?:\.\d+)?', r'معادل\s+\d+(?:\.\d+)?', r'میزان\s+(?:رشد|افزایش)', r'شاخص\s+(?:مهم|عملیاتی)', r'\d+(?:\.\d+)?\s*درصد\s+کل\s+تولید' ], 'PHONE': [ r'(?:تلفن[\s:]*)?(?:شماره[\s:]*)?(?:0)?(?:[۰-۹0-9]{2,3}[-\s]?)?[۰-۹0-9]{7,8}', r'(?:تماس[\s:]*)?(?:شماره[\s:]*)?(?:با[\s]*)?(?:0)?(?:[۰-۹0-9]{2,3}[-\s]?)?[۰-۹0-9]{7,8}', r'(?:موبایل[\s:]*)?(?:شماره[\s:]*)?(?:0)?9[۰-۹0-9]{9}', r'[۰-۹0-9]{3,4}[-\s][۰-۹0-9]{7,8}', r'[۰-۹0-9]{11}(?!\d)', r'(?:\+98|0098)?[۰-۹0-9]{10}', r'[۰-۹0-9]{3,4}[-\s]?[۰-۹0-9]{3,4}[-\s]?[۰-۹0-9]{3,4}', r'\+[0-9]{1,3}-[0-9]{3}-[0-9]{3}-[0-9]{4}(?:\s+ext\.\s+[0-9]{3,4})?', r'\([0-9]{3}\)\s+[0-9]{3}-[0-9]{4}' ], 'EMAIL': [ r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'ایمیل[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'email[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'نشانی[\s]*الکترونیکی[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'آدرس[\s]*ایمیل[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'facility\.manager@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' ] } def extract_entities_with_ner(self, text: str, confidence_threshold: float = 0.75) -> List[Dict]: """استخراج موجودیت‌ها با مدل NER""" if not self.model_ready or not self.ner_pipeline: return [] try: # Process text with NER model ner_results = self.ner_pipeline(text) entities = [] for entity in ner_results: if entity['score'] >= confidence_threshold: # Clean entity text entity_text = entity['word'].replace('##', '').strip() if len(entity_text) >= 2: # Minimum length filter entities.append({ 'text': entity_text, 'label': entity['entity_group'], 'confidence': entity['score'], 'start': entity['start'], 'end': entity['end'], 'source': 'ner' }) return entities except Exception as e: logger.error(f"Error in NER extraction: {e}") return [] def map_ner_to_categories(self, ner_label: str) -> str: """نگاشت برچسب‌های NER به دسته‌های سیستم""" mapping = { 'PER': 'PERSON', 'PERSON': 'PERSON', 'ORG': 'COMPANY', 'ORGANIZATION': 'COMPANY', 'LOC': 'LOCATION', 'LOCATION': 'LOCATION', 'MISC': 'MIXED_NAMES', 'GPE': 'LOCATION', 'MONEY': 'AMOUNT', 'DATE': 'DATE', 'TIME': 'DATE' } return mapping.get(ner_label.upper(), 'MIXED_NAMES') def extract_entities_with_regex(self, text: str, selected_categories: List[str] = None) -> List[Dict]: """استخراج موجودیت‌ها با Regex""" entities = [] all_patterns = self.get_comprehensive_patterns() # Filter patterns based on selected categories if selected_categories: selected_pattern_types = self.get_selected_patterns(selected_categories, 'fa') patterns = {k: v for k, v in all_patterns.items() if k in selected_pattern_types} else: patterns = all_patterns processed_positions = set() # Process patterns with priority priority_order = [ 'ID_NUMBER', 'EMAIL', 'PHONE', 'ACCOUNT', 'AMOUNT', 'DATE', 'LOCATION', 'COMPANY', 'PERSON' ] for category in priority_order: if category in patterns: pattern_list = patterns[category] for pattern in pattern_list: try: matches = re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE) for match in matches: if match.groups(): entity_text = match.group(1).strip() else: entity_text = match.group(0).strip() # Check for overlaps match_start, match_end = match.span() overlaps = any( not (match_end <= pos_start or match_start >= pos_end) for pos_start, pos_end in processed_positions ) if (not overlaps and len(entity_text) >= 2): entities.append({ 'text': entity_text, 'category': category, 'start': match_start, 'end': match_end, 'confidence': 0.9, 'source': 'regex' }) processed_positions.add((match_start, match_end)) except re.error as e: logger.error(f"Regex error in pattern {pattern}: {e}") continue return entities def fuse_entities(self, regex_entities: List[Dict], ner_entities: List[Dict], processing_mode: str) -> List[Dict]: """ترکیب هوشمندانه نتایج Regex و NER""" if processing_mode == 'regex_only' or not self.model_ready: return regex_entities final_entities = [] processed_positions = set() if processing_mode == 'hybrid': # Regex priority for specific patterns priority_categories = ['PHONE', 'EMAIL', 'ID_NUMBER', 'ACCOUNT', 'AMOUNT'] # Add high-priority regex entities first for entity in regex_entities: if entity['category'] in priority_categories: final_entities.append(entity) processed_positions.add((entity['start'], entity['end'])) # Add NER entities for names and organizations for entity in ner_entities: if not self.has_overlap(entity, processed_positions): category = self.map_ner_to_categories(entity['label']) entity_copy = entity.copy() entity_copy['category'] = category final_entities.append(entity_copy) processed_positions.add((entity['start'], entity['end'])) # Add remaining regex entities for entity in regex_entities: if (entity['category'] not in priority_categories and not self.has_overlap(entity, processed_positions)): final_entities.append(entity) processed_positions.add((entity['start'], entity['end'])) elif processing_mode == 'ner_priority': # NER takes priority, regex as backup for entity in ner_entities: category = self.map_ner_to_categories(entity['label']) entity_copy = entity.copy() entity_copy['category'] = category final_entities.append(entity_copy) processed_positions.add((entity['start'], entity['end'])) # Add non-overlapping regex entities for entity in regex_entities: if not self.has_overlap(entity, processed_positions): final_entities.append(entity) processed_positions.add((entity['start'], entity['end'])) return final_entities def has_overlap(self, entity: Dict, processed_positions: Set[Tuple[int, int]]) -> bool: """بررسی تداخل موقعیت entities""" entity_start, entity_end = entity['start'], entity['end'] for start, end in processed_positions: if not (entity_end <= start or entity_start >= end): return True return False def get_selected_patterns(self, selected_categories: List[str], language: str = 'fa') -> List[str]: """تبدیل دسته‌بندی‌های انتخاب شده به لیست الگوها""" selected_patterns = [] for cat_key, cat_info in self.pattern_categories.items(): name = cat_info['name_fa'] if language == 'fa' else cat_info['name_en'] icon = cat_info['icon'] category_display = f"{icon} {name}" if category_display in selected_categories: selected_patterns.extend(cat_info['patterns']) return selected_patterns def get_category_choices(self, language='fa'): """دریافت لیست دسته‌بندی‌ها برای چک‌باکس""" choices = [] for cat_key, cat_info in self.pattern_categories.items(): name = cat_info['name_fa'] if language == 'fa'else cat_info['name_en'] icon = cat_info['icon'] choices.append(f"{icon} {name}") return choices def anonymize_text_enhanced(self, original_text: str, lang: str = 'fa', selected_categories: List[str] = None, processing_mode: str = 'hybrid') -> str: """ناشناس‌سازی پیشرفته با ترکیب Regex + NER""" try: if not original_text or not original_text.strip(): return "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" # Force regex_only if model not ready if not self.model_ready and processing_mode != 'regex_only': processing_mode = 'regex_only' print(f"🔄 Forced to regex_only mode because model not ready") # Reset self.mapping_table = {} self.reset_counters() # Extract entities with regex regex_entities = self.extract_entities_with_regex(original_text, selected_categories) # Extract entities with NER (if available) ner_entities = [] if processing_mode != 'regex_only' and self.model_ready: ner_raw = self.extract_entities_with_ner(original_text) # Convert to standard format for entity in ner_raw: ner_entities.append({ 'text': entity['text'], 'category': self.map_ner_to_categories(entity['label']), 'start': entity['start'], 'end': entity['end'], 'confidence': entity['confidence'], 'source': 'ner' }) # Fuse entities final_entities = self.fuse_entities(regex_entities, ner_entities, processing_mode) # Create anonymization mapping anonymized = original_text found_entities = set() # Sort by length (longer first to avoid partial replacements) final_entities.sort(key=lambda x: len(x['text']), reverse=True) for entity in final_entities: entity_text = entity['text'].strip() category = entity['category'] if (entity_text not in found_entities and entity_text not in self.mapping_table and len(entity_text) >= 2): # Generate unique code if category not in self.counters: self.counters[category] = 0 self.counters[category] += 1 # Add source indicator if processing_mode == 'regex_only': source_suffix = "REG" elif processing_mode == 'hybrid': source_suffix = "HYB" if self.model_ready else "REG" else: source_suffix = "ENH" if self.model_ready else "REG" code = f"{category}_{self.counters[category]:03d}_{source_suffix}" self.mapping_table[entity_text] = code found_entities.add(entity_text) # Apply anonymization sorted_items = sorted(self.mapping_table.items(), key=lambda x: len(x[0]), reverse=True) for original_item, code in sorted_items: anonymized = anonymized.replace(original_item, code) # Statistics regex_count = len(regex_entities) ner_count = len(ner_entities) final_count = len(final_entities) logger.info(f"✅ Enhanced anonymization completed. Mode: {processing_mode}") logger.info(f"📊 Regex: {regex_count}, NER: {ner_count}, Final: {final_count}") return anonymized except Exception as e: logger.error(f"Enhanced anonymization error: {e}") return f"❌ Error in enhanced anonymization: {str(e)}" def send_to_chatgpt(self, anonymized_text, lang='fa'): """گام 2: ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "❌ Anonymized text is empty!" if lang == 'en' else "❌ متن ناشناس‌شده خالی است!" if not self.api_key: return "❌ API Key not configured! Please set OPENAI_API_KEY environment variable." if lang == 'en' else "❌ کلید API تنظیم نشده است!" system_msg = "You are a professional analyst. Answer questions accurately." if lang == 'en' else "شما یک تحلیلگر حرفه‌ای هستید. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=15 # Reduced timeout for HF Spaces ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) return f"❌ API Error: {error_message}" except Exception as e: return f"❌ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"❌ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """گام 3: بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "❌ ChatGPT response is empty!" if lang == 'en' else "❌ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "❌ Mapping table is empty!" if lang == 'en' else "❌ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"❌ Deanonymization error: {str(e)}" if lang == 'en' else f"❌ خطا در بازگردانی: {str(e)}" def get_model_status(self): """وضعیت سیستم""" status = "🚀 **Enhanced Multi-Modal Anonymization System Status:**\n\n" status += f"🤖 **Model Status**: {self.model_status}\n" status += f"📝 **Regex Patterns**: ✅ 221 comprehensive patterns loaded\n" status += f"🌍 **Language Support**: Persian, English, Mixed\n" status += f"🐍 **Python Version**: {sys.version.split()[0]}\n" status += f"📦 **Transformers Available**: {'✅ Yes' if TRANSFORMERS_AVAILABLE else '❌ No'}\n\n" if self.model_ready: status += "🎯 **Available Processing Modes:**\n" status += " • 🔥 Hybrid (Recommended): Regex priority + NER enhancement\n" status += " • 🎯 NER Priority: NER priority + Regex backup\n" status += " • ⚡ Regex Only: High-speed pattern matching\n\n" status += "📈 **Expected Accuracy:**\n" status += " • Regex Only: 70-75%\n" status += " • Hybrid Mode: 85-92%\n" status += " • NER Priority: 88-95%\n\n" else: status += "⚠️ **Current Mode: Regex Only**\n" status += " • Pure Regex processing (70-75% accuracy)\n" if not TRANSFORMERS_AVAILABLE: status += " • Install transformers library for enhanced accuracy\n" status += " • pip install transformers torch\n" status += "\n" status += f"🎯 **Pattern Categories**: {len(self.pattern_categories)} categories available\n" status += f"🔧 **Configuration**: User-controlled category selection\n" status += f"🛡️ **Privacy**: Local processing with optional ChatGPT integration\n" if TRANSFORMERS_AVAILABLE: status += f"✅ **Transformers Library**: Ready for NER processing\n" else: status += f"❌ **Transformers Library**: Not available - Add to requirements.txt\n" return status # Initialize the enhanced anonymizer print("🔄 Initializing Enhanced Data Anonymizer...") anonymizer = EnhancedDataAnonymizer() print(f"✅ Anonymizer initialized with status: {anonymizer.model_status}") def process_all_steps_enhanced(input_text, language, selected_categories, processing_mode): """پردازش خودکار تمام مراحل - نسخه پیشرفته""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() # Enhanced anonymization anonymized_text = anonymizer.anonymize_text_enhanced( input_text, lang, selected_categories, processing_mode ) if anonymized_text.startswith("❌"): return anonymized_text, "", "", "" # ChatGPT processing gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("❌"): entities_found = len(anonymizer.mapping_table) success_msg = (f"✅ Enhanced anonymization completed successfully!\n" f"🎯 Processing mode: {processing_mode}\n" f"📊 Protected entities: {entities_found}") return success_msg, anonymized_text, gpt_response, "" # Deanonymization final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) model_indicator = 'XLM-RoBERTa + Regex' if anonymizer.model_ready else 'Regex Only' success_msg = (f"🎉 Complete enhanced anonymization & restoration successful!\n" f"🎯 Mode: {processing_mode} | 📊 Entities: {entities_found}\n" f"⏱️ Time: {total_time:.2f}s | 🤖 Model: {model_indicator}") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"❌ Processing error: {str(e)}" if lang == 'en' else f"❌ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table_enhanced(language): """نمایش جدول نگاشت پیشرفته""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "❌ Mapping table is empty!" if lang == 'en' else "❌ جدول نگاشت خالی است!" result = "🔋 **Enhanced Mapping Table:**\n\n" result += f"📊 **Statistics**: {len(anonymizer.mapping_table)} total entities\n" result += f"🎯 **Method**: {'Hybrid Processing' if anonymizer.model_ready else 'Regex Only'}\n" result += f"🤖 **Model Status**: {anonymizer.model_status}\n\n" # Group by category category_stats = {} for original, code in anonymizer.mapping_table.items(): category = code.split('_')[0] if category not in category_stats: category_stats[category] = [] category_stats[category].append((original, code)) # Display results by category for category, items in category_stats.items(): if len(items) > 0: result += f"📁 **{category}** ({len(items)} items):\n" for original, code in items[:3]: source_indicator = "🧠" if any(x in code for x in ["HYB", "ENH"]) else "📝" result += f" {source_indicator} `{original}` → `{code}`\n" if len(items) > 3: result += f" ... و {len(items) - 3} مورد دیگر\n" result += "\n" result += f"🔥 **Enhanced System**: Advanced Regex patterns with optional NER support!" return result def clear_all_enhanced(): """پاک کردن همه - نسخه پیشرفته""" anonymizer.mapping_table = {} anonymizer.reset_counters() return "", "", "", "", "" # Enhanced CSS enhanced_css = """ body, .gradio-container { font-family: 'Segoe UI', Tahoma, Arial, sans-serif !important; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; min-height: 100vh !important; padding: 20px !important; } .enhanced-header { background: linear-gradient(45deg, #FF6B6B, #4ECDC4) !important; border-radius: 20px !important; padding: 20px !important; margin-bottom: 20px !important; text-align: center !important; box-shadow: 0 10px 30px rgba(0,0,0,0.3) !important; } .mode-selector { background: linear-gradient(135deg, #74b9ff, #0984e3) !important; border-radius: 15px !important; padding: 20px !important; margin: 15px 0 !important; box-shadow: 0 8px 25px rgba(116, 185, 255, 0.3) !important; } .model-status { background: linear-gradient(135deg, #00b894, #00a085) !important; border-radius: 15px !important; padding: 15px !important; margin: 15px 0 !important; color: white !important; font-weight: bold !important; text-align: center !important; box-shadow: 0 6px 20px rgba(0, 184, 148, 0.4) !important; } .rtl { direction: rtl !important; text-align: right !important; } .ltr { direction: ltr !important; text-align: left !important; } .workflow { display: grid !important; grid-template-columns: 1fr 1fr 1fr 1fr !important; gap: 25px !important; padding: 30px !important; align-items: start !important; background: rgba(255, 255, 255, 0.1) !important; border-radius: 20px !important; backdrop-filter: blur(10px) !important; } .gradio-textbox { border-radius: 10px !important; box-shadow: 0 4px 15px rgba(0,0,0,0.1) !important; min-height: 380px !important; max-height: 380px !important; height: 380px !important; } .gradio-button { border-radius: 25px !important; font-weight: bold !important; transition: all 0.3s ease !important; margin: 5px 0 !important; min-height: 50px !important; background: linear-gradient(45deg, #667eea, #764ba2) !important; border: none !important; color: white !important; } .gradio-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 25px rgba(0,0,0,0.3) !important; background: linear-gradient(45deg, #764ba2, #667eea) !important; } @media (max-width: 1200px) { .workflow { grid-template-columns: 1fr 1fr !important; } } @media (max-width: 768px) { .workflow { grid-template-columns: 1fr !important; } } """ # Main Gradio Interface with gr.Blocks(title="🚀 Enhanced Multi-Modal Anonymization", theme=gr.themes.Soft(), css=enhanced_css) as app: # Header with gr.Row(): gr.HTML("""

🚀 Enhanced Multi-Modal Anonymization System

🤖 Advanced Regex + Optional NER = Maximum Accuracy

""") # Language and Mode Selection with gr.Row(): with gr.Column(scale=1): language_selector = gr.Radio( choices=["فارسی", "English"], value="فارسی", label="Language / زبان", interactive=True ) with gr.Column(scale=2, elem_classes="mode-selector"): processing_mode = gr.Radio( choices=[ ("⚡ Regex Only (Fast & Compatible)", "regex_only"), ("🎯 Hybrid Mode (Recommended)", "hybrid"), ("🔬 NER Priority (Highest Accuracy)", "ner_priority") ], value="regex_only" if not anonymizer.model_ready else "hybrid", label="🎚️ Processing Mode", info="Choose processing complexity vs accuracy trade-off" ) # Model Status Display with gr.Row(): model_status_display = gr.HTML( f'
🤖 Model Status: {anonymizer.model_status}
' ) # Category Selection with gr.Row(): with gr.Column(): pattern_categories = gr.CheckboxGroup( choices=anonymizer.get_category_choices('fa'), value=anonymizer.get_category_choices('fa'), label="🎯 انتخاب دسته‌بندی‌های الگوی ناشناس‌سازی:", interactive=True ) # Main Workflow with gr.Row(elem_classes="workflow rtl") as workflow_row: with gr.Column(): step1_title = gr.HTML('

📝 متن ورودی

') input_text = gr.Textbox( lines=15, placeholder="متن اصلی خود را اینجا وارد کنید...\n\n🚀 سیستم پیشرفته با الگوهای regex جامع\n✅ دقت بالا برای نام اشخاص، شرکت‌ها، مکان‌ها\n📱 شناسایی دقیق تلفن، ایمیل، حساب بانکی\n💰 تشخیص مبالغ مالی و درصدها\n🗓️ استخراج تاریخ‌ها و زمان‌ها", label="", rtl=True ) process_btn = gr.Button("🚀 پردازش پیشرفته", variant="primary") clear_btn = gr.Button("🗑️ پاک کردن همه", variant="stop") status = gr.Textbox( label="وضعیت پردازش", lines=4, interactive=False, rtl=True ) with gr.Column(): step2_title = gr.HTML('

🎭 متن ناشناس‌شده

') anonymized_output = gr.Textbox( lines=15, placeholder="متن ناشناس‌شده با کدهای محافظتی...", label="", interactive=False, rtl=True ) with gr.Column(): step3_title = gr.HTML('

🤖 پاسخ ChatGPT

') gpt_output = gr.Textbox( lines=15, placeholder="پاسخ ChatGPT به متن ناشناس‌شده...", label="", interactive=False, rtl=True ) with gr.Column(): step4_title = gr.HTML('

✅ پاسخ نهایی

') final_output = gr.Textbox( lines=15, placeholder="پاسخ نهایی با بازگردانی اطلاعات اصلی...", label="", interactive=False, rtl=True ) # Additional Tools with gr.Row(): with gr.Column(): mapping_btn = gr.Button("📋 نمایش جدول نگاشت پیشرفته") mapping_output = gr.Textbox( lines=15, label="جدول نگاشت اطلاعات", interactive=False, visible=False, rtl=True ) with gr.Column(): system_status_btn = gr.Button("📊 نمایش وضعیت سیستم پیشرفته") system_status_output = gr.Textbox( lines=20, label="وضعیت سیستم", interactive=False, visible=False, rtl=True ) # Event Handlers process_btn.click( fn=process_all_steps_enhanced, inputs=[input_text, language_selector, pattern_categories, processing_mode], outputs=[status, anonymized_output, gpt_output, final_output] ) clear_btn.click( fn=clear_all_enhanced, outputs=[input_text, anonymized_output, gpt_output, final_output, status] ) mapping_btn.click( fn=get_mapping_table_enhanced, inputs=[language_selector], outputs=[mapping_output] ) mapping_btn.click( fn=lambda: gr.update(visible=True), outputs=[mapping_output] ) system_status_btn.click( fn=lambda: anonymizer.get_model_status(), outputs=[system_status_output] ) system_status_btn.click( fn=lambda: gr.update(visible=True), outputs=[system_status_output] ) if __name__ == "__main__": logger.info("🚀 Starting Enhanced Multi-Modal Anonymization System...") logger.info(f"🤖 XLM-RoBERTa Status: {anonymizer.model_status}") logger.info("✅ Ready for high-accuracy bilingual processing!") app.launch( share=False, server_name="0.0.0.0", server_port=7860, show_error=True )