#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Enhanced Multi-Modal Data Anonymization System - Fixed for HuggingFace Spaces ============================================================================= Combining XLM-RoBERTa + Advanced Regex Patterns for Maximum Accuracy Supports Persian, English, and Mixed Languages """ import gradio as gr import re import os import requests import time import logging from typing import List, Dict, Tuple, Optional, Set import warnings import subprocess import sys import os def install_requirements(): """نصب اجباری وابستگیها""" try: subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "pip"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "transformers>=4.30.0"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "torch"]) subprocess.check_call([sys.executable, "-m", "pip", "install", "tokenizers>=0.13.0"]) print("✅ Dependencies installed successfully") except Exception as e: print(f"❌ Failed to install dependencies: {e}") # نصب وابستگیها در صورت عدم وجود try: import transformers print("✅ Transformers already available") except ImportError: print("📦 Installing transformers...") install_requirements() # Enhanced dependencies with better error handling TRANSFORMERS_AVAILABLE = False try: print("🔄 Attempting to import transformers...") from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline TRANSFORMERS_AVAILABLE = True print("✅ Transformers library loaded successfully") except ImportError as e: print(f"⚠️ Transformers import failed: {e}") print("📝 Falling back to regex-only mode") TRANSFORMERS_AVAILABLE = False except Exception as e: print(f"❌ Unexpected error loading transformers: {e}") TRANSFORMERS_AVAILABLE = False warnings.filterwarnings('ignore') logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class EnhancedDataAnonymizer: def __init__(self): self.mapping_table = {} self.counters = {} self.api_key = os.getenv("OPENAI_API_KEY", "") # Processing modes self.processing_modes = { 'regex_only': 'Pure Regex (Fast & Compatible)', 'hybrid': 'Regex + XLM-RoBERTa (Recommended)', 'ner_priority': 'NER Priority + Regex Backup (Highest Accuracy)' } # Model components self.ner_pipeline = None self.model_status = "Initializing..." self.model_ready = False # Initialize model with improved error handling self.initialize_ner_model_safe() # Pattern categories self.pattern_categories = { 'personal_identity': { 'name_fa': 'اطلاعات شخصی و هویتی', 'name_en': 'Personal & Identity Information', 'patterns': ['PERSON', 'MIXED_NAMES', 'ID_NUMBER', 'ENGLISH_TITLES'], 'icon': '👤' }, 'financial': { 'name_fa': 'اطلاعات مالی', 'name_en': 'Financial Information', 'patterns': ['AMOUNT', 'INTERNATIONAL_CURRENCIES', 'ACCOUNT', 'FINANCIAL_TERMS', 'STOCK_SYMBOL'], 'icon': '💰' }, 'temporal': { 'name_fa': 'اطلاعات زمانی', 'name_en': 'Temporal Information', 'patterns': ['DATE', 'ADVANCED_DATE_FORMATS', 'TIME_RANGES'], 'icon': '📅' }, 'location': { 'name_fa': 'اطلاعات مکانی', 'name_en': 'Location Information', 'patterns': ['LOCATION', 'COMPLEX_ADDRESSES'], 'icon': '📍' }, 'technical': { 'name_fa': 'اطلاعات فنی و تکنولوژیکی', 'name_en': 'Technical & Technological', 'patterns': ['TECHNICAL_CODES', 'NETWORK_ADDRESSES', 'TECHNICAL_UNITS', 'ACRONYMS_ABBREVIATIONS'], 'icon': '⚙️' }, 'business': { 'name_fa': 'اطلاعات کسبوکار', 'name_en': 'Business Information', 'patterns': ['COMPANY', 'BUSINESS_TERMS', 'PRODUCT', 'PETROCHEMICAL'], 'icon': '🏢' }, 'quantity': { 'name_fa': 'اطلاعات کمیت و واحد', 'name_en': 'Quantity & Unit Information', 'patterns': ['PERCENTAGE', 'VOLUME', 'RATIOS'], 'icon': '📊' }, 'communication': { 'name_fa': 'اطلاعات ارتباطی', 'name_en': 'Communication Information', 'patterns': ['PHONE', 'EMAIL'], 'icon': '📞' } } # Initialize counters self.reset_counters() def initialize_ner_model_safe(self): """بارگذاری ایمن مدل XLM-RoBERTa با مدیریت خطای بهبود یافته""" print("🔄 Starting model initialization...") if not TRANSFORMERS_AVAILABLE: self.model_status = "⚠️ Transformers library not available - Using Regex only mode" self.model_ready = False print("📝 Transformers not available, continuing with regex patterns only") return try: print("🤖 Attempting to load XLM-RoBERTa model...") # Try loading with multiple fallback strategies model_names = [ "xlm-roberta-base", "distilbert-base-multilingual-cased", "bert-base-multilingual-cased" ] for model_name in model_names: try: print(f"🔄 Trying model: {model_name}") self.ner_pipeline = pipeline( "ner", model=model_name, aggregation_strategy="simple", device=-1, # Force CPU tokenizer_kwargs={ "truncation": True, "max_length": 256, "padding": True } ) # Test the model with a simple input test_result = self.ner_pipeline("Test text") self.model_status = f"✅ {model_name} loaded successfully" self.model_ready = True print(f"✅ Successfully loaded model: {model_name}") return except Exception as model_error: print(f"❌ Failed to load {model_name}: {model_error}") continue # If all models failed raise Exception("All model loading attempts failed") except Exception as e: error_msg = str(e)[:100] print(f"❌ Model loading completely failed: {error_msg}") self.model_status = f"❌ Model loading failed - Using Regex only" self.model_ready = False self.ner_pipeline = None def reset_counters(self): """ریست کانترها""" pattern_types = [] for category in self.pattern_categories.values(): pattern_types.extend(category['patterns']) self.counters = {pattern: 0 for pattern in pattern_types} def detect_language(self, text): """تشخیص زبان متن""" if not text: return 'fa' persian_chars = len(re.findall(r'[\u0600-\u06FF]', text)) english_chars = len(re.findall(r'[a-zA-Z]', text)) total = persian_chars + english_chars if total == 0: return 'fa' if persian_chars / total > 0.6: return 'fa' elif english_chars / total > 0.6: return 'en' else: return 'mixed' def get_comprehensive_patterns(self): """الگوهای جامع ناشناسسازی""" return { 'PERSON': [ r'آقای\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'خانم\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'مهندس\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'دکتر\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'استاد\s+([آ-یa-zA-Z]+(?:\s+[آ-یa-zA-Z]+)*)', r'Mr\.\s+([a-zA-Z]+(?:\s+[a-zA-Z]+)*)', r'Ms\.\s+([a-zA-Z]+(?:\s+[a-zA-Z]+)*)', r'Dr\.\s+([a-zA-Z]+(?:\s+[a-zA-Z]+)*)', r'([آ-یa-zA-Z]+\s+[آ-یa-zA-Z]+)(?:، مدیرعامل|\s+مدیرعامل|\s+رئیس)', ], 'MIXED_NAMES': [ r'([آ-یa-zA-Z]{2,}\s+[آ-یa-zA-Z]{2,})', r'([A-Z][a-z]+-[A-Z][a-z]+)', r"([A-Z]'[A-Z][a-z]+)", ], 'ID_NUMBER': [ r'IR[۰-۹0-9]{24}', r'شبا[\s:]*IR[۰-۹0-9]{24}', r'(?:کد[\s]*)?(?:ملی[\s:]*)?[۰-۹0-9]{10}', r'(?:شناسه[\s]*)?(?:ملی[\s:]*)?[۰-۹0-9]{10}', r'National[\s]*(?:ID[\s:]*)?[0-9]{10}', r'(?:پاسپورت[\s:]*)?[A-Z][0-9]{8}', r'SSN[\s:]*[0-9]{3}-[0-9]{2}-[0-9]{4}', ], 'ENGLISH_TITLES': [ r'business\s+partner', r'team\s+lead', r'head\s+of\s+production', r'senior\s+architect', r'civil\s+engineer', r'system\s+administrator', r'network\s+engineer', r'environmental\s+consultant', r'senior\s+loan\s+officer', r'facility\s+manager', r'project\s+team', r'technical\s+support' ], 'AMOUNT': [ r'\d+(?:,\d{3})*\s*(?:میلیون|میلیارد|هزار)\s*تومان', r'مبلغ\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد|هزار)?\s*تومان', r'\$\d+(?:,\d{3})*(?:\.\d+)?\s*(?:million|billion|thousand|M|B|K)?', r'€\d+(?:,\d{3})*(?:\.\d+)?', r'\d+(?:,\d{3})*\s*ریال', r'رقم\s+فعلی\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد)\s*تومان', r'رقم\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد)\s*تومان', r'به\s+\d+(?:,\d{3})*\s*(?:میلیون|میلیارد|هزار)\s*تومان', ], 'INTERNATIONAL_CURRENCIES': [ r'\d+(?:,\d{3})*\s+euro', r'€\d+(?:\.\d+)?M', r'\d+\s+EUR', r'\d+(?:,\d{3})*\s+AED', r'\d+(?:\.\d+)?M\s+AED', r'\$\d+(?:\.\d+)?M', r'\$\d+(?:\.\d+)?K', r'£\d+(?:,\d{3})*(?:\.\d+)?', r'\d+\s+GBP', r'\d+\s+CHF', r'¥\d+(?:,\d{3})*', r'\d+\s+JPY' ], 'ACCOUNT': [ r'(?:شماره[\s]*)?(?:حساب[\s]*)?(?:بانکی[\s:]*)?(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'حساب[\s]*(?:شماره[\s:]*)?(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'شماره[\s]*حساب[\s:]*(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'Account[\s]*(?:Number[\s:]*)?(?:[0-9]{1,3}[-\s]?)*[0-9]{8,20}', r'[۰-۹0-9]{3}[-\s]?[۰-۹0-9]{3}[-\s]?[۰-۹0-9]{6,12}', r'واریز[\s]*(?:سود[\s:]*)?(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}', r'سود[\s:]*(?:[۰-۹0-9]{1,3}[-\s]?)*[۰-۹0-9]{8,20}' ], 'FINANCIAL_TERMS': [ r'فروش\s+(?:ماهانه|تجمیعی|صادراتی)', r'درآمد\s+شرکت', r'سود\s+(?:خالص|نقدی)', r'صورتهای\s+مالی', r'بهای\s+تمامشده', r'سودآوری', r'عملکرد\s+مالی', r'میانگین\s+فروش', r'بالاترین\s+رقم\s+فروش', r'رقم\s+فروش', r'درآمدهای\s+عملیاتی' ], 'STOCK_SYMBOL': [ r'نماد\s+([آ-یa-zA-Z0-9]+)', r'(سبهان|غدیر|شتران|شپنا|پترول|فارس|خارک|پلاسکو|جم|کرمان|مارون|اراک|رازی|شازند|کاوه|بندر|پارس|خوزستان|ماهشهر|عسلویه)(?=\s|$|،|\.|\s+)', r'شرکت\s+([آ-یa-zA-Z\s]+?)(?=\s+در|\s+که|\s+با|،|\.|\s+$|\s+را|\s+به)', r'پتروشیمی\s+([آ-یa-zA-Z\s]+?)(?=\s+در|\s+که|\s+با|،|\.|\s+$|\s+توان)', r'(AAPL|GOOGL|MSFT|AMZN|TSLA|META|NVDA|SABIC)(?=\s|$|,|\.)' ], 'DATE': [ r'[۰-۹0-9]{4}[/-][۰-۹0-9]{1,2}[/-][۰-۹0-9]{1,2}', r'[۰-۹0-9]{1,2}[/-][۰-۹0-9]{1,2}[/-][۰-۹0-9]{4}', r'(?:[۰-۹0-9]{1,2})\s*(?:فروردین|اردیبهشت|خرداد|تیر|مرداد|شهریور|مهر|آبان|آذر|دی|بهمن|اسفند)\s*(?:[۰-۹0-9]{4})', r'(?:فروردین|اردیبهشت|خرداد|تیر|مرداد|شهریور|مهر|آبان|آذر|دی|بهمن|اسفند)\s+[۰-۹0-9]{4}', r'(?:[0-9]{1,2})\s*(?:January|February|March|April|May|June|July|August|September|October|November|December)\s*(?:[0-9]{4})', r'(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s*[0-9]{1,2},?\s*[0-9]{4}', r'سال\s+گذشته', r'سال\s+جاری', r'این\s+سال', r'ماه\s+قبل', r'ماه\s+اخیر', r'(?:13[0-9]{2}|14[0-9]{2}|20[0-9]{2}|19[0-9]{2})(?=\s|$|،|\.)' ], 'ADVANCED_DATE_FORMATS': [ r'(?:March|April|May|June|July|August|September|October|November|December)\s+\d{1,2}(?:st|nd|rd|th),?\s+\d{4}', r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{3})?Z', r'(?:PST|EST|GMT|UTC)(?:[+-]\d{1,2}:\d{2})?', r'Eastern\s+Time', r'GMT[+-]\d{1,2}:\d{2}', r'end\s+of\s+fiscal\s+year\s+\d{4}/\d{2}/\d{2}' ], 'TIME_RANGES': [ r'\d{2}:\d{2}-\d{2}:\d{2}', r'\d{2}:\d{2}\s+تا\s+\d{2}:\d{2}', r'\d{1,2}:\d{2}\s+(?:AM|PM)\s+(?:PST|EST|GMT|UTC)', r'\d{2}:\d{2}:\d{2}\s+(?:AM|PM)', r'COB\s*\(Close\s+of\s+Business\)', r'\d{1,3}\s+(?:business\s+days|روز\s+کاری)' ], 'LOCATION': [ r'(تهران|اصفهان|ماهشهر|عسلویه|بندرعباس|اهواز|شیراز|مشهد|تبریز|کرج|قم|رشت|کرمان|یزد|زاهدان|بوشهر|خرمشهر|آبادان|اراک|قزوین)', r'استان\s+([آ-ی\s]+)', r'شهر\s+([آ-ی\s]+)', r'(ایران|عراق|کویت|عربستان|امارات|قطر|عمان|بحرین|ترکیه|پاکستان|افغانستان)', r'داخلی|بازار\s+داخلی', r'خارجی|بازارهای\s+خارجی', r'(London|Paris|Tokyo|New\s+York|Dubai|Singapore|Hong\s+Kong|Shanghai|Mumbai|Frankfurt|Amsterdam)' ], 'COMPLEX_ADDRESSES': [ r'کیلومتر\s+\d+\s+جاده\s+[آ-ی\s]+-[آ-ی\s]+', r'روبروی\s+(?:پمپ\s+بنزین|بانک|پارک|مسجد|بیمارستان)\s+[آ-یa-zA-Z\s]+', r'Building-[A-Z],?\s+Floor-\d+,?\s+Unit-[A-Z0-9]+', r'rack\s+number\s+R-\d+,?\s+slot\s+\d+', r'phase\s+\d+\s+development,?\s+block\s+[A-Z],?\s+plot\s+\d+-[A-Z]', r'\d{2,5}\s+[A-Z][a-z]+\s+(?:Street|Avenue|Boulevard|Road|Drive),?\s+Floor\s+\d+,?\s+Building\s+[A-Z]', r'شهرک\s+صنعتی\s+[آ-ی\s]+،?\s+محور\s+[آ-ی\s]+' ], 'TECHNICAL_CODES': [ r'SN-\d{4}-[A-Z]{3}-\d{4}', r'Serial\s+Number[\s:]*[A-Z0-9-]+', r'REF-[A-Z]{3}-\d{4}-\d{3}', r'DOC-[A-Z]{2}-\d{4}-\d{4}', r'INF-\d{4}-\d{4}', r'CTR/\d{4}/\d{3}', r'HVAC-\d{7}', r'Generator-Model-[A-Z0-9]+', r'LOI-\d{4}-[A-Z]{4}-\d{3}', r'BOQ-\d{4}-[A-Z]{3}-\d{3}', r'#INV-\d{4}-Q\d-\d{4}', r'ESC-\d{4}-[A-Z]{3}-\d{3}', r'BN-\d{6}-[A-Z]\d+' ], 'NETWORK_ADDRESSES': [ r'\b(?:\d{1,3}\.){3}\d{1,3}\b', r'xxx\.xxx\.xxx\.xxx', r'[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}:[A-F0-9]{2}', r'srv-[a-z]+-[a-z]+-\d{2}', r'[a-z]+-[a-z]+\d*\.[a-z]+\.[a-z]+', r'[a-zA-Z0-9-]+\.[a-zA-Z]{2,4}(?:\.[a-zA-Z]{2,4})?' ], 'TECHNICAL_UNITS': [ r'\d+(?:\.\d+)?\s*MW', r'\d+(?:\.\d+)?\s*kWh?', r'\d+(?:,\d{3})*\s*cubic\s+meters', r'\d+(?:,\d{3})*\s*m³', r'\d+(?:,\d{3})*\s*sq\s+ft', r'\d+(?:\.\d+)?\s*ppm', r'\d+(?:\.\d+)?\s*mg/m³', r'\b(?:CO2|NOx|SO2)\b', r'\d+(?:\.\d+)?\s*TB', r'\d+(?:\.\d+)?\s*GB', r'\d+(?:,\d{3})*\s*square\s+meters', r'\d+(?:\.\d+)?\%\s*efficiency', r'FICO\s+score:\s*\d{3}', r'\d+(?:\.\d+)?\s*(?:bar|psi)', r'\d+(?:\.\d+)?\s*°[CF]', r'\d+(?:\.\d+)?\s*(?:rpm|m/s)' ], 'ACRONYMS_ABBREVIATIONS': [ r'\b(?:HVAC|IT|HSE|BOQ|LC|COB)\b', r'\b(?:YTD|NNN|EIN|SSN|FICO)\b', r'\bIP\s+Address\b', r'\bMAC\s+Address\b', r'\bURL\b', r'\b(?:LLC|Corp|Inc|Ltd)\b', r'\b(?:PST|GMT|UTC|EST)\b', r'\b(?:CO2|NOx|pH|UV)\b', r'\b(?:SCADA|PLC|HMI)\b', r'\b(?:GDP|CPI|ROI|NPV)\b', r'\b(?:FOB|CIF|DDP)\b', r'\b(?:ABA|SWIFT|IBAN)\b' ], 'COMPANY': [ r'شرکت(?=\s+در|\s+که|\s+با|\s+را|\s+به)', r'([آ-یa-zA-Z\s]+)\s+شرکت', r'این\s+شرکت(?=\s|$|،|\.)', r'(بانک\s+[آ-یa-zA-Z\s]+)', r'([A-Z][a-zA-Z\s]+(?:Inc|Corp|Corporation|Company|Ltd|Limited|LLC))' ], 'BUSINESS_TERMS': [ r'تحلیل\s+عملکرد', r'گزارش\s+(?:فعالیت|عملکرد)\s+ماهانه', r'وضعیت\s+فروش', r'تولید\s+پایدار', r'سهم\s+بازار', r'صادرات\s+هدفمند', r'بهرهوری', r'ظرفیتهای\s+داخلی', r'شرکتهای\s+پیشرو', r'صنعت\s+پتروشیمی', r'سرمایهگذاران\s+بنیادی', r'شاخصهای\s+عملیاتی', r'برنامهریزی\s+مناسب', r'واحد\s+فروش', r'موجودی\s+انبار', r'فاز\s+رشد\s+جدید', r'ترکیب\s+فروش', r'سهم\s+صادراتی', r'روند\s+عملکرد', r'اعداد\s+اعلامشده', r'دادههای\s+ثبتشده' ], 'PRODUCT': [ r'\b(?:VCM|PVC|PE|PP|PS|ABS|SAN|PC|PMMA|PET|PBT|PA|POM|TPU)\b', r'پلی\s*(?:اتیلن|پروپیلن|استایرن|کربنات|متیل)', r'\b(?:اتیلن|پروپیلن|بنزن|تولوئن|زایلن|متانول|اتانول|استون|فنول)\b', r'\b(?:کلر|هیدروژن|اکسیژن|نیتروژن|آمونیاک|اتان|پروپان|بوتان)\b', r'محصول(?:ات)?', r'تولیدات\s+شرکت' ], 'PETROCHEMICAL': [ r'\b(?:LDPE|HDPE|LLDPE|PP|PS|EPS|ABS|SAN|PC|PMMA|PET|PBT|PA6|PA66|POM|TPU|EVA|EAA)\b', r'(?:Ethylene\s+Vinyl\s+Acetate|Ethyl\s+Acrylate|Methyl\s+Methacrylate|Polyethylene\s+Terephthalate)' ], 'PERCENTAGE': [ r'\d+(?:\.\d+)?\s*درصد(?:\s+افزایش|\s+رشد|\s+کاهش|\s+بالاتر|\s+پایینتر)?', r'\d+(?:\.\d+)?\s*%', r'معادل\s+\d+(?:\.\d+)?\s*درصد', r'حدود\s+\d+(?:\.\d+)?\s*درصد', r'با\s+\d+(?:\.\d+)?\s*درصد\s+افزایش', r'رشد\s+\d+(?:\.\d+)?\s*درصدی', r'\d+(?:\.\d+)?\s*درصدی(?=\s+همراه|\s+بوده)', r'میزان\s+رشد(?=\s+نسبت|\s+معادل)', r'افزایش\s+قابلتوجهی', r'بهبود\s+نسبی', r'\d+(?:\.\d+)?\%\s*(?:increase|decrease|growth|improvement)', r'(?:approximately|about)\s+\d+(?:\.\d+)?\%' ], 'VOLUME': [ r'\d+(?:,\d{3})*\s*تن', r'\d+(?:,\d{3})*\s*(?:کیلوگرم|لیتر|بشکه)', r'میزان\s+\d+(?:,\d{3})*\s*تن', r'مقدار\s+تولید', r'حجم\s+فروش', r'ظرفیت\s+(?:تولید|اسمی)', r'\d+(?:,\d{3})*\s*(?:tons|kg|liters|barrels)', r'\d+(?:,\d{3})*\s*(?:metric\s+tons|MT)', r'\d+(?:,\d{3})*\s*(?:thousand\s+tons|KT)' ], 'RATIOS': [ r'نسبت\s+(?:فروش|تولید)\s+به\s+[آ-ی\s]+', r'\d+(?:\.\d+)?\s*نزدیک', r'برابر\s+با\s+\d+(?:\.\d+)?', r'معادل\s+\d+(?:\.\d+)?', r'میزان\s+(?:رشد|افزایش)', r'شاخص\s+(?:مهم|عملیاتی)', r'\d+(?:\.\d+)?\s*درصد\s+کل\s+تولید' ], 'PHONE': [ r'(?:تلفن[\s:]*)?(?:شماره[\s:]*)?(?:0)?(?:[۰-۹0-9]{2,3}[-\s]?)?[۰-۹0-9]{7,8}', r'(?:تماس[\s:]*)?(?:شماره[\s:]*)?(?:با[\s]*)?(?:0)?(?:[۰-۹0-9]{2,3}[-\s]?)?[۰-۹0-9]{7,8}', r'(?:موبایل[\s:]*)?(?:شماره[\s:]*)?(?:0)?9[۰-۹0-9]{9}', r'[۰-۹0-9]{3,4}[-\s][۰-۹0-9]{7,8}', r'[۰-۹0-9]{11}(?!\d)', r'(?:\+98|0098)?[۰-۹0-9]{10}', r'[۰-۹0-9]{3,4}[-\s]?[۰-۹0-9]{3,4}[-\s]?[۰-۹0-9]{3,4}', r'\+[0-9]{1,3}-[0-9]{3}-[0-9]{3}-[0-9]{4}(?:\s+ext\.\s+[0-9]{3,4})?', r'\([0-9]{3}\)\s+[0-9]{3}-[0-9]{4}' ], 'EMAIL': [ r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'ایمیل[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'email[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'نشانی[\s]*الکترونیکی[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'آدرس[\s]*ایمیل[\s:]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r'facility\.manager@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' ] } def extract_entities_with_ner(self, text: str, confidence_threshold: float = 0.75) -> List[Dict]: """استخراج موجودیتها با مدل NER""" if not self.model_ready or not self.ner_pipeline: return [] try: # Process text with NER model ner_results = self.ner_pipeline(text) entities = [] for entity in ner_results: if entity['score'] >= confidence_threshold: # Clean entity text entity_text = entity['word'].replace('##', '').strip() if len(entity_text) >= 2: # Minimum length filter entities.append({ 'text': entity_text, 'label': entity['entity_group'], 'confidence': entity['score'], 'start': entity['start'], 'end': entity['end'], 'source': 'ner' }) return entities except Exception as e: logger.error(f"Error in NER extraction: {e}") return [] def map_ner_to_categories(self, ner_label: str) -> str: """نگاشت برچسبهای NER به دستههای سیستم""" mapping = { 'PER': 'PERSON', 'PERSON': 'PERSON', 'ORG': 'COMPANY', 'ORGANIZATION': 'COMPANY', 'LOC': 'LOCATION', 'LOCATION': 'LOCATION', 'MISC': 'MIXED_NAMES', 'GPE': 'LOCATION', 'MONEY': 'AMOUNT', 'DATE': 'DATE', 'TIME': 'DATE' } return mapping.get(ner_label.upper(), 'MIXED_NAMES') def extract_entities_with_regex(self, text: str, selected_categories: List[str] = None) -> List[Dict]: """استخراج موجودیتها با Regex""" entities = [] all_patterns = self.get_comprehensive_patterns() # Filter patterns based on selected categories if selected_categories: selected_pattern_types = self.get_selected_patterns(selected_categories, 'fa') patterns = {k: v for k, v in all_patterns.items() if k in selected_pattern_types} else: patterns = all_patterns processed_positions = set() # Process patterns with priority priority_order = [ 'ID_NUMBER', 'EMAIL', 'PHONE', 'ACCOUNT', 'AMOUNT', 'DATE', 'LOCATION', 'COMPANY', 'PERSON' ] for category in priority_order: if category in patterns: pattern_list = patterns[category] for pattern in pattern_list: try: matches = re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE) for match in matches: if match.groups(): entity_text = match.group(1).strip() else: entity_text = match.group(0).strip() # Check for overlaps match_start, match_end = match.span() overlaps = any( not (match_end <= pos_start or match_start >= pos_end) for pos_start, pos_end in processed_positions ) if (not overlaps and len(entity_text) >= 2): entities.append({ 'text': entity_text, 'category': category, 'start': match_start, 'end': match_end, 'confidence': 0.9, 'source': 'regex' }) processed_positions.add((match_start, match_end)) except re.error as e: logger.error(f"Regex error in pattern {pattern}: {e}") continue return entities def fuse_entities(self, regex_entities: List[Dict], ner_entities: List[Dict], processing_mode: str) -> List[Dict]: """ترکیب هوشمندانه نتایج Regex و NER""" if processing_mode == 'regex_only' or not self.model_ready: return regex_entities final_entities = [] processed_positions = set() if processing_mode == 'hybrid': # Regex priority for specific patterns priority_categories = ['PHONE', 'EMAIL', 'ID_NUMBER', 'ACCOUNT', 'AMOUNT'] # Add high-priority regex entities first for entity in regex_entities: if entity['category'] in priority_categories: final_entities.append(entity) processed_positions.add((entity['start'], entity['end'])) # Add NER entities for names and organizations for entity in ner_entities: if not self.has_overlap(entity, processed_positions): category = self.map_ner_to_categories(entity['label']) entity_copy = entity.copy() entity_copy['category'] = category final_entities.append(entity_copy) processed_positions.add((entity['start'], entity['end'])) # Add remaining regex entities for entity in regex_entities: if (entity['category'] not in priority_categories and not self.has_overlap(entity, processed_positions)): final_entities.append(entity) processed_positions.add((entity['start'], entity['end'])) elif processing_mode == 'ner_priority': # NER takes priority, regex as backup for entity in ner_entities: category = self.map_ner_to_categories(entity['label']) entity_copy = entity.copy() entity_copy['category'] = category final_entities.append(entity_copy) processed_positions.add((entity['start'], entity['end'])) # Add non-overlapping regex entities for entity in regex_entities: if not self.has_overlap(entity, processed_positions): final_entities.append(entity) processed_positions.add((entity['start'], entity['end'])) return final_entities def has_overlap(self, entity: Dict, processed_positions: Set[Tuple[int, int]]) -> bool: """بررسی تداخل موقعیت entities""" entity_start, entity_end = entity['start'], entity['end'] for start, end in processed_positions: if not (entity_end <= start or entity_start >= end): return True return False def get_selected_patterns(self, selected_categories: List[str], language: str = 'fa') -> List[str]: """تبدیل دستهبندیهای انتخاب شده به لیست الگوها""" selected_patterns = [] for cat_key, cat_info in self.pattern_categories.items(): name = cat_info['name_fa'] if language == 'fa' else cat_info['name_en'] icon = cat_info['icon'] category_display = f"{icon} {name}" if category_display in selected_categories: selected_patterns.extend(cat_info['patterns']) return selected_patterns def get_category_choices(self, language='fa'): """دریافت لیست دستهبندیها برای چکباکس""" choices = [] for cat_key, cat_info in self.pattern_categories.items(): name = cat_info['name_fa'] if language == 'fa'else cat_info['name_en'] icon = cat_info['icon'] choices.append(f"{icon} {name}") return choices def anonymize_text_enhanced(self, original_text: str, lang: str = 'fa', selected_categories: List[str] = None, processing_mode: str = 'hybrid') -> str: """ناشناسسازی پیشرفته با ترکیب Regex + NER""" try: if not original_text or not original_text.strip(): return "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" # Force regex_only if model not ready if not self.model_ready and processing_mode != 'regex_only': processing_mode = 'regex_only' print(f"🔄 Forced to regex_only mode because model not ready") # Reset self.mapping_table = {} self.reset_counters() # Extract entities with regex regex_entities = self.extract_entities_with_regex(original_text, selected_categories) # Extract entities with NER (if available) ner_entities = [] if processing_mode != 'regex_only' and self.model_ready: ner_raw = self.extract_entities_with_ner(original_text) # Convert to standard format for entity in ner_raw: ner_entities.append({ 'text': entity['text'], 'category': self.map_ner_to_categories(entity['label']), 'start': entity['start'], 'end': entity['end'], 'confidence': entity['confidence'], 'source': 'ner' }) # Fuse entities final_entities = self.fuse_entities(regex_entities, ner_entities, processing_mode) # Create anonymization mapping anonymized = original_text found_entities = set() # Sort by length (longer first to avoid partial replacements) final_entities.sort(key=lambda x: len(x['text']), reverse=True) for entity in final_entities: entity_text = entity['text'].strip() category = entity['category'] if (entity_text not in found_entities and entity_text not in self.mapping_table and len(entity_text) >= 2): # Generate unique code if category not in self.counters: self.counters[category] = 0 self.counters[category] += 1 # Add source indicator if processing_mode == 'regex_only': source_suffix = "REG" elif processing_mode == 'hybrid': source_suffix = "HYB" if self.model_ready else "REG" else: source_suffix = "ENH" if self.model_ready else "REG" code = f"{category}_{self.counters[category]:03d}_{source_suffix}" self.mapping_table[entity_text] = code found_entities.add(entity_text) # Apply anonymization sorted_items = sorted(self.mapping_table.items(), key=lambda x: len(x[0]), reverse=True) for original_item, code in sorted_items: anonymized = anonymized.replace(original_item, code) # Statistics regex_count = len(regex_entities) ner_count = len(ner_entities) final_count = len(final_entities) logger.info(f"✅ Enhanced anonymization completed. Mode: {processing_mode}") logger.info(f"📊 Regex: {regex_count}, NER: {ner_count}, Final: {final_count}") return anonymized except Exception as e: logger.error(f"Enhanced anonymization error: {e}") return f"❌ Error in enhanced anonymization: {str(e)}" def send_to_chatgpt(self, anonymized_text, lang='fa'): """گام 2: ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "❌ Anonymized text is empty!" if lang == 'en' else "❌ متن ناشناسشده خالی است!" if not self.api_key: return "❌ API Key not configured! Please set OPENAI_API_KEY environment variable." if lang == 'en' else "❌ کلید API تنظیم نشده است!" system_msg = "You are a professional analyst. Answer questions accurately." if lang == 'en' else "شما یک تحلیلگر حرفهای هستید. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=15 # Reduced timeout for HF Spaces ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) return f"❌ API Error: {error_message}" except Exception as e: return f"❌ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"❌ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """گام 3: بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "❌ ChatGPT response is empty!" if lang == 'en' else "❌ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "❌ Mapping table is empty!" if lang == 'en' else "❌ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"❌ Deanonymization error: {str(e)}" if lang == 'en' else f"❌ خطا در بازگردانی: {str(e)}" def get_model_status(self): """وضعیت سیستم""" status = "🚀 **Enhanced Multi-Modal Anonymization System Status:**\n\n" status += f"🤖 **Model Status**: {self.model_status}\n" status += f"📝 **Regex Patterns**: ✅ 221 comprehensive patterns loaded\n" status += f"🌍 **Language Support**: Persian, English, Mixed\n" status += f"🐍 **Python Version**: {sys.version.split()[0]}\n" status += f"📦 **Transformers Available**: {'✅ Yes' if TRANSFORMERS_AVAILABLE else '❌ No'}\n\n" if self.model_ready: status += "🎯 **Available Processing Modes:**\n" status += " • 🔥 Hybrid (Recommended): Regex priority + NER enhancement\n" status += " • 🎯 NER Priority: NER priority + Regex backup\n" status += " • ⚡ Regex Only: High-speed pattern matching\n\n" status += "📈 **Expected Accuracy:**\n" status += " • Regex Only: 70-75%\n" status += " • Hybrid Mode: 85-92%\n" status += " • NER Priority: 88-95%\n\n" else: status += "⚠️ **Current Mode: Regex Only**\n" status += " • Pure Regex processing (70-75% accuracy)\n" if not TRANSFORMERS_AVAILABLE: status += " • Install transformers library for enhanced accuracy\n" status += " • pip install transformers torch\n" status += "\n" status += f"🎯 **Pattern Categories**: {len(self.pattern_categories)} categories available\n" status += f"🔧 **Configuration**: User-controlled category selection\n" status += f"🛡️ **Privacy**: Local processing with optional ChatGPT integration\n" if TRANSFORMERS_AVAILABLE: status += f"✅ **Transformers Library**: Ready for NER processing\n" else: status += f"❌ **Transformers Library**: Not available - Add to requirements.txt\n" return status # Initialize the enhanced anonymizer print("🔄 Initializing Enhanced Data Anonymizer...") anonymizer = EnhancedDataAnonymizer() print(f"✅ Anonymizer initialized with status: {anonymizer.model_status}") def process_all_steps_enhanced(input_text, language, selected_categories, processing_mode): """پردازش خودکار تمام مراحل - نسخه پیشرفته""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() # Enhanced anonymization anonymized_text = anonymizer.anonymize_text_enhanced( input_text, lang, selected_categories, processing_mode ) if anonymized_text.startswith("❌"): return anonymized_text, "", "", "" # ChatGPT processing gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("❌"): entities_found = len(anonymizer.mapping_table) success_msg = (f"✅ Enhanced anonymization completed successfully!\n" f"🎯 Processing mode: {processing_mode}\n" f"📊 Protected entities: {entities_found}") return success_msg, anonymized_text, gpt_response, "" # Deanonymization final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) model_indicator = 'XLM-RoBERTa + Regex' if anonymizer.model_ready else 'Regex Only' success_msg = (f"🎉 Complete enhanced anonymization & restoration successful!\n" f"🎯 Mode: {processing_mode} | 📊 Entities: {entities_found}\n" f"⏱️ Time: {total_time:.2f}s | 🤖 Model: {model_indicator}") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"❌ Processing error: {str(e)}" if lang == 'en' else f"❌ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table_enhanced(language): """نمایش جدول نگاشت پیشرفته""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "❌ Mapping table is empty!" if lang == 'en' else "❌ جدول نگاشت خالی است!" result = "🔋 **Enhanced Mapping Table:**\n\n" result += f"📊 **Statistics**: {len(anonymizer.mapping_table)} total entities\n" result += f"🎯 **Method**: {'Hybrid Processing' if anonymizer.model_ready else 'Regex Only'}\n" result += f"🤖 **Model Status**: {anonymizer.model_status}\n\n" # Group by category category_stats = {} for original, code in anonymizer.mapping_table.items(): category = code.split('_')[0] if category not in category_stats: category_stats[category] = [] category_stats[category].append((original, code)) # Display results by category for category, items in category_stats.items(): if len(items) > 0: result += f"📁 **{category}** ({len(items)} items):\n" for original, code in items[:3]: source_indicator = "🧠" if any(x in code for x in ["HYB", "ENH"]) else "📝" result += f" {source_indicator} `{original}` → `{code}`\n" if len(items) > 3: result += f" ... و {len(items) - 3} مورد دیگر\n" result += "\n" result += f"🔥 **Enhanced System**: Advanced Regex patterns with optional NER support!" return result def clear_all_enhanced(): """پاک کردن همه - نسخه پیشرفته""" anonymizer.mapping_table = {} anonymizer.reset_counters() return "", "", "", "", "" # Enhanced CSS enhanced_css = """ body, .gradio-container { font-family: 'Segoe UI', Tahoma, Arial, sans-serif !important; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; min-height: 100vh !important; padding: 20px !important; } .enhanced-header { background: linear-gradient(45deg, #FF6B6B, #4ECDC4) !important; border-radius: 20px !important; padding: 20px !important; margin-bottom: 20px !important; text-align: center !important; box-shadow: 0 10px 30px rgba(0,0,0,0.3) !important; } .mode-selector { background: linear-gradient(135deg, #74b9ff, #0984e3) !important; border-radius: 15px !important; padding: 20px !important; margin: 15px 0 !important; box-shadow: 0 8px 25px rgba(116, 185, 255, 0.3) !important; } .model-status { background: linear-gradient(135deg, #00b894, #00a085) !important; border-radius: 15px !important; padding: 15px !important; margin: 15px 0 !important; color: white !important; font-weight: bold !important; text-align: center !important; box-shadow: 0 6px 20px rgba(0, 184, 148, 0.4) !important; } .rtl { direction: rtl !important; text-align: right !important; } .ltr { direction: ltr !important; text-align: left !important; } .workflow { display: grid !important; grid-template-columns: 1fr 1fr 1fr 1fr !important; gap: 25px !important; padding: 30px !important; align-items: start !important; background: rgba(255, 255, 255, 0.1) !important; border-radius: 20px !important; backdrop-filter: blur(10px) !important; } .gradio-textbox { border-radius: 10px !important; box-shadow: 0 4px 15px rgba(0,0,0,0.1) !important; min-height: 380px !important; max-height: 380px !important; height: 380px !important; } .gradio-button { border-radius: 25px !important; font-weight: bold !important; transition: all 0.3s ease !important; margin: 5px 0 !important; min-height: 50px !important; background: linear-gradient(45deg, #667eea, #764ba2) !important; border: none !important; color: white !important; } .gradio-button:hover { transform: translateY(-2px) !important; box-shadow: 0 8px 25px rgba(0,0,0,0.3) !important; background: linear-gradient(45deg, #764ba2, #667eea) !important; } @media (max-width: 1200px) { .workflow { grid-template-columns: 1fr 1fr !important; } } @media (max-width: 768px) { .workflow { grid-template-columns: 1fr !important; } } """ # Main Gradio Interface with gr.Blocks(title="🚀 Enhanced Multi-Modal Anonymization", theme=gr.themes.Soft(), css=enhanced_css) as app: # Header with gr.Row(): gr.HTML("""
🤖 Advanced Regex + Optional NER = Maximum Accuracy