import gradio as gr import re import os import requests import time import logging # تنظیم logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class UniversalAnonymizer: def __init__(self): self.mapping_table = {} self.counters = { 'company': 0, 'person': 0, 'amount': 0, 'percent': 0 } self.api_key = os.getenv("OPENAI_API_KEY", "") def anonymize_text(self, original_text, lang='fa'): """ناشناسسازی جامع با تشخیص خودکار الگوها""" try: if not original_text or not original_text.strip(): return "⚠ Please enter input text!" if lang == 'en' else "⚠ لطفاً متن ورودی را وارد کنید!" # ریست متغیرها self.mapping_table = {} self.counters = {key: 0 for key in self.counters.keys()} anonymized = original_text # ترتیب مهم است: از خاص به عام # مرحله 1: نامهای خاص اشخاص (اول از همه) anonymized = self._anonymize_specific_persons(anonymized) # مرحله 2: نامهای خاص شرکتها anonymized = self._anonymize_specific_companies(anonymized) # مرحله 3: مبالغ مالی anonymized = self._anonymize_amounts(anonymized) # مرحله 4: درصدها anonymized = self._anonymize_percentages(anonymized) # مرحله 5: نامهای عمومی اشخاص anonymized = self._anonymize_general_persons(anonymized) # مرحله 6: نامهای عمومی شرکتها anonymized = self._anonymize_general_companies(anonymized) logger.info(f"✅ Anonymization completed. Found {len(self.mapping_table)} entities.") return anonymized except Exception as e: return f"⚠ Error in anonymization: {str(e)}" if lang == 'en' else f"⚠ خطا در ناشناسسازی: {str(e)}" def _anonymize_specific_persons(self, text): """ناشناسسازی نامهای خاص اشخاص""" # نامهای خاص که حتماً باید ناشناس شوند specific_names = [ 'مهدی اخوان بهابادی', # میتوانید نامهای خاص دیگر را اینجا اضافه کنید ] for name in specific_names: if name in text: if name not in self.mapping_table: self.counters['person'] += 1 code = f"person-{self.counters['person']:02d}" self.mapping_table[name] = code text = text.replace(name, code) logger.info(f"Person replaced: {name} -> {code}") return text def _anonymize_specific_companies(self, text): """ناشناسسازی نامهای خاص شرکتها""" # نامهای خاص شرکتها specific_companies = [ 'شرکت سرمایه گذاری پارسیان', 'شرکت سرمایهگذاری پارسیان', 'بانک پارسیان', 'گروه مالی پارسیان', # میتوانید نامهای خاص دیگر را اینجا اضافه کنید ] for company in specific_companies: if company in text: if company not in self.mapping_table: self.counters['company'] += 1 code = f"company-{self.counters['company']:02d}" self.mapping_table[company] = code text = text.replace(company, code) logger.info(f"Company replaced: {company} -> {code}") return text def _anonymize_amounts(self, text): """تشخیص و ناشناسسازی مبالغ مالی""" # الگوهای مبالغ - ترتیب از خاص به عام amount_patterns = [ # مبالغ با "تومانی" در انتها (r'(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+تومانی', 'amount'), # مبالغ با همت (r'(\d+(?:\.\d+)?)\s+همت', 'amount'), # مبالغ با هزار تن (r'(\d+(?:\.\d+)?)\s+هزار\s+تن', 'amount'), # مبالغ عادی با میلیارد/میلیون (r'(\d+(?:\.\d+)?)\s+(هزار\s+)?میلیارد\s+(تومان|ریال)', 'amount'), (r'(\d+(?:\.\d+)?)\s+(هزار\s+)?میلیون\s+(تومان|ریال)', 'amount'), (r'(\d+(?:\.\d+)?)\s+هزار\s+(تومان|ریال)', 'amount'), # مبالغ با عبارات اضافی (r'بیش از\s+(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+(تومان|ریال)', 'amount'), (r'حدود\s+(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+(تومان|ریال)', 'amount'), (r'نزدیک به\s+(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+(تومان|ریال)', 'amount'), # واحدهای دیگر (r'(\d+(?:\.\d+)?)\s+(تن|کیلوگرم|متر|لیتر|دستگاه|واحد|نفر)', 'amount'), # مبالغ ساده (r'(\d+(?:\.\d+)?)\s+(تومان|ریال)(?!\w)', 'amount'), ] for pattern, category in amount_patterns: matches = list(re.finditer(pattern, text)) # از آخر به اول جایگزین میکنیم تا موقعیتها تغییر نکنند for match in reversed(matches): matched_text = match.group(0) if matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]:02d}" self.mapping_table[matched_text] = code # جایگزینی دقیق با استفاده از موقعیت start, end = match.span() text = text[:start] + code + text[end:] logger.info(f"Amount replaced: {matched_text} -> {code}") return text def _anonymize_percentages(self, text): """تشخیص و ناشناسسازی درصدها""" percent_patterns = [ (r'(\d+(?:\.\d+)?)\s+درصدی', 'percent'), (r'(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'(\d+(?:\.\d+)?)\s*%', 'percent'), (r'(\d+(?:\.\d+)?)\s*٪', 'percent'), (r'منفی\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'بیش از\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'حدود\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'کمتر از\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), ] for pattern, category in percent_patterns: matches = list(re.finditer(pattern, text)) for match in reversed(matches): matched_text = match.group(0) if matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]:02d}" self.mapping_table[matched_text] = code start, end = match.span() text = text[:start] + code + text[end:] logger.info(f"Percent replaced: {matched_text} -> {code}") return text def _anonymize_general_persons(self, text): """ناشناسسازی نامهای عمومی اشخاص""" person_patterns = [ # نام با عنوان (r'دکتر\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), (r'مهندس\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), (r'آقای\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), (r'خانم\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), # نام با سید (r'سید\s*[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), # نامهایی که با مدیرعامل همراه هستند (r'[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?\s*،?\s*مدیرعامل', 'person'), # نام و نام خانوادگی - حداقل 3 حرف (r'(? {code}") return text def _anonymize_general_companies(self, text): """ناشناسسازی نامهای عمومی شرکتها""" company_patterns = [ # شرکتها با پرانتز (r'شرکت\s+[آ-ی][آ-ی\s]+\([آ-ی\s]+\)', 'company'), (r'بانک\s+[آ-ی][آ-ی\s]+\([آ-ی\s]+\)', 'company'), # شرکتها با انواع مختلف (r'شرکت\s+[آ-ی][آ-ی\s]{4,}', 'company'), (r'بانک\s+[آ-ی][آ-ی\s]{2,}', 'company'), (r'گروه\s+[آ-ی][آ-ی\s]{2,}', 'company'), (r'موسسه\s+[آ-ی][آ-ی\s]{2,}', 'company'), (r'سازمان\s+[آ-ی][آ-ی\s]{2,}', 'company'), # شرکتهای خاص (r'[آ-ی]+\s+خودرو', 'company'), (r'[آ-ی]+\s+فولاد', 'company'), (r'بیمه\s+[آ-ی]+', 'company'), ] # عباراتی که نباید به عنوان شرکت تشخیص داده شوند exclude_company_phrases = ['شرکت اصلی'] for pattern, category in company_patterns: matches = list(re.finditer(pattern, text)) for match in reversed(matches): matched_text = match.group(0) # بررسی که جزو عبارات مستثنی نباشد is_excluded = any(phrase in matched_text for phrase in exclude_company_phrases) if not is_excluded and matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]:02d}" self.mapping_table[matched_text] = code start, end = match.span() text = text[:start] + code + text[end:] logger.info(f"Company replaced: {matched_text} -> {code}") return text def send_to_chatgpt(self, anonymized_text, lang='fa'): """ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "⚠ Anonymized text is empty!" if lang == 'en' else "⚠ متن ناشناسشده خالی است!" if not self.api_key: return "⚠ API Key not configured! Please set OPENAI_API_KEY environment variable." if lang == 'en' else "⚠ کلید API تنظیم نشده است! لطفاً OPENAI_API_KEY را در متغیرهای محیطی تنظیم کنید." system_msg = "You are a professional financial analyst. The text contains anonymous codes. Answer questions accurately." if lang == 'en' else "شما یک تحلیلگر مالی حرفهای هستید. متن حاوی کدهای ناشناس است. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=30 ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) if 'Incorrect API key' in error_message: return "⚠ Invalid API key." if lang == 'en' else "⚠ کلید API نامعتبر است." elif 'quota' in error_message: return "⚠ API quota exceeded." if lang == 'en' else "⚠ سهمیه API تمام شده است." else: return f"⚠ API Error: {error_message}" except Exception as e: return f"⚠ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"⚠ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "⚠ ChatGPT response is empty!" if lang == 'en' else "⚠ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "⚠ Mapping table is empty!" if lang == 'en' else "⚠ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} # جایگزینی از طولانیترین کد اول sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"⚠ Deanonymization error: {str(e)}" if lang == 'en' else f"⚠ خطا در بازگردانی: {str(e)}" def process_all_steps(input_text, language): """پردازش خودکار تمام مراحل""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "⚠ Please enter input text!" if lang == 'en' else "⚠ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() anonymized_text = anonymizer.anonymize_text(input_text, lang) if anonymized_text.startswith("⚠"): return anonymized_text, "", "", "" gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("⚠"): entities_found = len(anonymizer.mapping_table) success_msg = (f"✅ Anonymization completed!\n" f"📊 Total: {entities_found} entities protected") return success_msg, anonymized_text, gpt_response, "" final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) # آمار تفصیلی company_count = anonymizer.counters['company'] amount_count = anonymizer.counters['amount'] percent_count = anonymizer.counters['percent'] person_count = anonymizer.counters['person'] success_msg = (f"🎉 Universal anonymization & restoration successful!\n" f"🏢 Companies: {company_count} | 💰 Amounts: {amount_count} | 📊 Percentages: {percent_count} | 👤 Persons: {person_count}\n" f"📊 Total: {entities_found} entities | ⏱️ Time: {total_time:.2f}s") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"⚠ Processing error: {str(e)}" if lang == 'en' else f"⚠ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table(language): """نمایش جدول نگاشت""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "⚠ Mapping table is empty! Please process some text first." if lang == 'en' else "⚠ جدول نگاشت خالی است! ابتدا متنی را پردازش کنید." result = "📋 **Universal Mapping Table:**\n\n" if lang == 'en' else "📋 **جدول نگاشت جامع:**\n\n" # گروهبندی بر اساس نوع categories = { 'company': '🏢 **Companies**', 'amount': '💰 **Amounts**', 'percent': '📊 **Percentages**', 'person': '👤 **Persons**' } for category, title in categories.items(): category_items = {k: v for k, v in anonymizer.mapping_table.items() if v.startswith(category)} if category_items: result += f"{title}:\n" for original, code in category_items.items(): result += f" • `{original}` → `{code}`\n" result += "\n" # آمار کلی result += f"📊 **Summary**: {len(anonymizer.mapping_table)} total entities anonymized\n" return result def clear_all(): """پاک کردن همه""" anonymizer.mapping_table = {} anonymizer.counters = {key: 0 for key in anonymizer.counters.keys()} return "", "", "", "", "" def update_ui_text(language): """بهروزرسانی متنهای رابط کاربری""" if language == 'English': return { 'title': 'Universal Business Data Anonymization System', 'step1': 'Input Text & Settings', 'step2': 'Anonymized Text', 'step3': 'Raw ChatGPT Response', 'step4': 'Final Restored Response', 'input_placeholder': 'Enter your business text here...\nThe system will automatically detect and anonymize all types of company names, financial amounts, percentages, and personal names using advanced pattern recognition...', 'process_btn': 'Process with Universal Detection', 'clear_btn': 'Clear All', 'mapping_btn': 'Show Universal Mapping Table', 'direction': 'ltr' } else: return { 'title': 'سیستم ناشناسسازی جامع اطلاعات تجاری', 'step1': 'متن ورودی و تنظیمات', 'step2': 'متن ناشناسشده', 'step3': 'پاسخ خام ChatGPT', 'step4': 'پاسخ نهایی بازگردانده شده', 'input_placeholder': 'متن تجاری خود را اینجا وارد کنید...\nسیستم به صورت خودکار تمام انواع نام شرکتها، مبالغ مالی، درصدها و نامهای اشخاص را با تشخیص الگوی پیشرفته شناسایی و ناشناس میکند...', 'process_btn': 'پردازش با تشخیص جامع', 'clear_btn': 'پاک کردن همه', 'mapping_btn': 'نمایش جدول نگاشت جامع', 'direction': 'rtl' } def update_interface(language): """تغییر رابط کاربری بر اساس زبان""" ui_text = update_ui_text(language) is_english = (language == 'English') workflow_css = "workflow ltr" if is_english else "workflow rtl" return [ gr.update(value=f"