import gradio as gr import re import os import requests import time import logging # تنظیم logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class UniversalAnonymizer: def __init__(self): self.mapping_table = {} self.counters = { 'company': 0, 'person': 0, 'amount': 0, 'percent': 0 } self.api_key = os.getenv("OPENAI_API_KEY", "") def anonymize_text(self, original_text, lang='fa'): """ناشناس‌سازی جامع با تشخیص خودکار الگوها""" try: if not original_text or not original_text.strip(): return "⚠ Please enter input text!" if lang == 'en' else "⚠ لطفاً متن ورودی را وارد کنید!" # ریست متغیرها self.mapping_table = {} self.counters = {key: 0 for key in self.counters.keys()} anonymized = original_text # ترتیب مهم است: از خاص به عام # مرحله 1: نام‌های خاص اشخاص (اول از همه) anonymized = self._anonymize_specific_persons(anonymized) # مرحله 2: نام‌های خاص شرکت‌ها anonymized = self._anonymize_specific_companies(anonymized) # مرحله 3: مبالغ مالی anonymized = self._anonymize_amounts(anonymized) # مرحله 4: درصدها anonymized = self._anonymize_percentages(anonymized) # مرحله 5: نام‌های عمومی اشخاص anonymized = self._anonymize_general_persons(anonymized) # مرحله 6: نام‌های عمومی شرکت‌ها anonymized = self._anonymize_general_companies(anonymized) logger.info(f"✅ Anonymization completed. Found {len(self.mapping_table)} entities.") return anonymized except Exception as e: return f"⚠ Error in anonymization: {str(e)}" if lang == 'en' else f"⚠ خطا در ناشناس‌سازی: {str(e)}" def _anonymize_specific_persons(self, text): """ناشناس‌سازی نام‌های خاص اشخاص""" # نام‌های خاص که حتماً باید ناشناس شوند specific_names = [ 'مهدی اخوان بهابادی', # می‌توانید نام‌های خاص دیگر را اینجا اضافه کنید ] for name in specific_names: if name in text: if name not in self.mapping_table: self.counters['person'] += 1 code = f"person-{self.counters['person']:02d}" self.mapping_table[name] = code text = text.replace(name, code) logger.info(f"Person replaced: {name} -> {code}") return text def _anonymize_specific_companies(self, text): """ناشناس‌سازی نام‌های خاص شرکت‌ها""" # نام‌های خاص شرکت‌ها specific_companies = [ 'شرکت سرمایه گذاری پارسیان', 'شرکت سرمایه‌گذاری پارسیان', 'بانک پارسیان', 'گروه مالی پارسیان', # می‌توانید نام‌های خاص دیگر را اینجا اضافه کنید ] for company in specific_companies: if company in text: if company not in self.mapping_table: self.counters['company'] += 1 code = f"company-{self.counters['company']:02d}" self.mapping_table[company] = code text = text.replace(company, code) logger.info(f"Company replaced: {company} -> {code}") return text def _anonymize_amounts(self, text): """تشخیص و ناشناس‌سازی مبالغ مالی""" # الگوهای مبالغ - ترتیب از خاص به عام amount_patterns = [ # مبالغ با "تومانی" در انتها (r'(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+تومانی', 'amount'), # مبالغ با همت (r'(\d+(?:\.\d+)?)\s+همت', 'amount'), # مبالغ با هزار تن (r'(\d+(?:\.\d+)?)\s+هزار\s+تن', 'amount'), # مبالغ عادی با میلیارد/میلیون (r'(\d+(?:\.\d+)?)\s+(هزار\s+)?میلیارد\s+(تومان|ریال)', 'amount'), (r'(\d+(?:\.\d+)?)\s+(هزار\s+)?میلیون\s+(تومان|ریال)', 'amount'), (r'(\d+(?:\.\d+)?)\s+هزار\s+(تومان|ریال)', 'amount'), # مبالغ با عبارات اضافی (r'بیش از\s+(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+(تومان|ریال)', 'amount'), (r'حدود\s+(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+(تومان|ریال)', 'amount'), (r'نزدیک به\s+(\d+(?:\.\d+)?)\s+(میلیارد|میلیون|هزار)\s+(تومان|ریال)', 'amount'), # واحدهای دیگر (r'(\d+(?:\.\d+)?)\s+(تن|کیلوگرم|متر|لیتر|دستگاه|واحد|نفر)', 'amount'), # مبالغ ساده (r'(\d+(?:\.\d+)?)\s+(تومان|ریال)(?!\w)', 'amount'), ] for pattern, category in amount_patterns: matches = list(re.finditer(pattern, text)) # از آخر به اول جایگزین می‌کنیم تا موقعیت‌ها تغییر نکنند for match in reversed(matches): matched_text = match.group(0) if matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]:02d}" self.mapping_table[matched_text] = code # جایگزینی دقیق با استفاده از موقعیت start, end = match.span() text = text[:start] + code + text[end:] logger.info(f"Amount replaced: {matched_text} -> {code}") return text def _anonymize_percentages(self, text): """تشخیص و ناشناس‌سازی درصدها""" percent_patterns = [ (r'(\d+(?:\.\d+)?)\s+درصدی', 'percent'), (r'(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'(\d+(?:\.\d+)?)\s*%', 'percent'), (r'(\d+(?:\.\d+)?)\s*٪', 'percent'), (r'منفی\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'بیش از\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'حدود\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), (r'کمتر از\s+(\d+(?:\.\d+)?)\s+درصد', 'percent'), ] for pattern, category in percent_patterns: matches = list(re.finditer(pattern, text)) for match in reversed(matches): matched_text = match.group(0) if matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]:02d}" self.mapping_table[matched_text] = code start, end = match.span() text = text[:start] + code + text[end:] logger.info(f"Percent replaced: {matched_text} -> {code}") return text def _anonymize_general_persons(self, text): """ناشناس‌سازی نام‌های عمومی اشخاص""" person_patterns = [ # نام با عنوان (r'دکتر\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), (r'مهندس\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), (r'آقای\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), (r'خانم\s+[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), # نام با سید (r'سید\s*[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'person'), # نام‌هایی که با مدیرعامل همراه هستند (r'[آ-ی]+\s+[آ-ی]+(?:\s+[آ-ی]+)?\s*،?\s*مدیرعامل', 'person'), # نام و نام خانوادگی - حداقل 3 حرف (r'(? {code}") return text def _anonymize_general_companies(self, text): """ناشناس‌سازی نام‌های عمومی شرکت‌ها""" company_patterns = [ # شرکت‌ها با پرانتز (r'شرکت\s+[آ-ی][آ-ی\s]+\([آ-ی\s]+\)', 'company'), (r'بانک\s+[آ-ی][آ-ی\s]+\([آ-ی\s]+\)', 'company'), # شرکت‌ها با انواع مختلف (r'شرکت\s+[آ-ی][آ-ی\s]{4,}', 'company'), (r'بانک\s+[آ-ی][آ-ی\s]{2,}', 'company'), (r'گروه\s+[آ-ی][آ-ی\s]{2,}', 'company'), (r'موسسه\s+[آ-ی][آ-ی\s]{2,}', 'company'), (r'سازمان\s+[آ-ی][آ-ی\s]{2,}', 'company'), # شرکت‌های خاص (r'[آ-ی]+\s+خودرو', 'company'), (r'[آ-ی]+\s+فولاد', 'company'), (r'بیمه\s+[آ-ی]+', 'company'), ] # عباراتی که نباید به عنوان شرکت تشخیص داده شوند exclude_company_phrases = ['شرکت اصلی'] for pattern, category in company_patterns: matches = list(re.finditer(pattern, text)) for match in reversed(matches): matched_text = match.group(0) # بررسی که جزو عبارات مستثنی نباشد is_excluded = any(phrase in matched_text for phrase in exclude_company_phrases) if not is_excluded and matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]:02d}" self.mapping_table[matched_text] = code start, end = match.span() text = text[:start] + code + text[end:] logger.info(f"Company replaced: {matched_text} -> {code}") return text def send_to_chatgpt(self, anonymized_text, lang='fa'): """ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "⚠ Anonymized text is empty!" if lang == 'en' else "⚠ متن ناشناس‌شده خالی است!" if not self.api_key: return "⚠ API Key not configured! Please set OPENAI_API_KEY environment variable." if lang == 'en' else "⚠ کلید API تنظیم نشده است! لطفاً OPENAI_API_KEY را در متغیرهای محیطی تنظیم کنید." system_msg = "You are a professional financial analyst. The text contains anonymous codes. Answer questions accurately." if lang == 'en' else "شما یک تحلیلگر مالی حرفه‌ای هستید. متن حاوی کدهای ناشناس است. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=30 ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) if 'Incorrect API key' in error_message: return "⚠ Invalid API key." if lang == 'en' else "⚠ کلید API نامعتبر است." elif 'quota' in error_message: return "⚠ API quota exceeded." if lang == 'en' else "⚠ سهمیه API تمام شده است." else: return f"⚠ API Error: {error_message}" except Exception as e: return f"⚠ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"⚠ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "⚠ ChatGPT response is empty!" if lang == 'en' else "⚠ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "⚠ Mapping table is empty!" if lang == 'en' else "⚠ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} # جایگزینی از طولانی‌ترین کد اول sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"⚠ Deanonymization error: {str(e)}" if lang == 'en' else f"⚠ خطا در بازگردانی: {str(e)}" def process_all_steps(input_text, language): """پردازش خودکار تمام مراحل""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "⚠ Please enter input text!" if lang == 'en' else "⚠ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() anonymized_text = anonymizer.anonymize_text(input_text, lang) if anonymized_text.startswith("⚠"): return anonymized_text, "", "", "" gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("⚠"): entities_found = len(anonymizer.mapping_table) success_msg = (f"✅ Anonymization completed!\n" f"📊 Total: {entities_found} entities protected") return success_msg, anonymized_text, gpt_response, "" final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) # آمار تفصیلی company_count = anonymizer.counters['company'] amount_count = anonymizer.counters['amount'] percent_count = anonymizer.counters['percent'] person_count = anonymizer.counters['person'] success_msg = (f"🎉 Universal anonymization & restoration successful!\n" f"🏢 Companies: {company_count} | 💰 Amounts: {amount_count} | 📊 Percentages: {percent_count} | 👤 Persons: {person_count}\n" f"📊 Total: {entities_found} entities | ⏱️ Time: {total_time:.2f}s") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"⚠ Processing error: {str(e)}" if lang == 'en' else f"⚠ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table(language): """نمایش جدول نگاشت""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "⚠ Mapping table is empty! Please process some text first." if lang == 'en' else "⚠ جدول نگاشت خالی است! ابتدا متنی را پردازش کنید." result = "📋 **Universal Mapping Table:**\n\n" if lang == 'en' else "📋 **جدول نگاشت جامع:**\n\n" # گروه‌بندی بر اساس نوع categories = { 'company': '🏢 **Companies**', 'amount': '💰 **Amounts**', 'percent': '📊 **Percentages**', 'person': '👤 **Persons**' } for category, title in categories.items(): category_items = {k: v for k, v in anonymizer.mapping_table.items() if v.startswith(category)} if category_items: result += f"{title}:\n" for original, code in category_items.items(): result += f" • `{original}` → `{code}`\n" result += "\n" # آمار کلی result += f"📊 **Summary**: {len(anonymizer.mapping_table)} total entities anonymized\n" return result def clear_all(): """پاک کردن همه""" anonymizer.mapping_table = {} anonymizer.counters = {key: 0 for key in anonymizer.counters.keys()} return "", "", "", "", "" def update_ui_text(language): """به‌روزرسانی متن‌های رابط کاربری""" if language == 'English': return { 'title': 'Universal Business Data Anonymization System', 'step1': 'Input Text & Settings', 'step2': 'Anonymized Text', 'step3': 'Raw ChatGPT Response', 'step4': 'Final Restored Response', 'input_placeholder': 'Enter your business text here...\nThe system will automatically detect and anonymize all types of company names, financial amounts, percentages, and personal names using advanced pattern recognition...', 'process_btn': 'Process with Universal Detection', 'clear_btn': 'Clear All', 'mapping_btn': 'Show Universal Mapping Table', 'direction': 'ltr' } else: return { 'title': 'سیستم ناشناس‌سازی جامع اطلاعات تجاری', 'step1': 'متن ورودی و تنظیمات', 'step2': 'متن ناشناس‌شده', 'step3': 'پاسخ خام ChatGPT', 'step4': 'پاسخ نهایی بازگردانده شده', 'input_placeholder': 'متن تجاری خود را اینجا وارد کنید...\nسیستم به صورت خودکار تمام انواع نام شرکت‌ها، مبالغ مالی، درصدها و نام‌های اشخاص را با تشخیص الگوی پیشرفته شناسایی و ناشناس می‌کند...', 'process_btn': 'پردازش با تشخیص جامع', 'clear_btn': 'پاک کردن همه', 'mapping_btn': 'نمایش جدول نگاشت جامع', 'direction': 'rtl' } def update_interface(language): """تغییر رابط کاربری بر اساس زبان""" ui_text = update_ui_text(language) is_english = (language == 'English') workflow_css = "workflow ltr" if is_english else "workflow rtl" return [ gr.update(value=f"

📊 {ui_text['title']}

"), gr.update(value=f"

📝 {ui_text['step1']}

"), gr.update(placeholder=ui_text['input_placeholder'], rtl=not is_english), gr.update(value=f"🚀 {ui_text['process_btn']}"), gr.update(value=f"🗑️ {ui_text['clear_btn']}"), gr.update(rtl=not is_english), gr.update(value=f"

🎭 {ui_text['step2']}

"), gr.update(rtl=not is_english), gr.update(value=f"

🤖 {ui_text['step3']}

"), gr.update(rtl=not is_english), gr.update(value=f"

✅ {ui_text['step4']}

"), gr.update(rtl=not is_english), gr.update(value=f"📋 {ui_text['mapping_btn']}"), gr.update(rtl=not is_english), gr.update(elem_classes=workflow_css) ] # ایجاد instance anonymizer = UniversalAnonymizer() # CSS custom_css = """ body, .gradio-container { font-family: 'Segoe UI', Tahoma, Arial, sans-serif !important; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; min-height: 100vh !important; padding: 20px !important; } .rtl { direction: rtl !important; text-align: right !important; } .ltr { direction: ltr !important; text-align: left !important; } .workflow { display: grid !important; grid-template-columns: 1fr 1fr 1fr 1fr !important; gap: 25px !important; padding: 30px !important; align-items: start !important; align-content: start !important; grid-auto-rows: auto !important; } .gradio-textbox { border-radius: 10px !important; box-shadow: 0 4px 15px rgba(0,0,0,0.1) !important; min-height: 380px !important; max-height: 380px !important; height: 380px !important; } .gradio-textbox textarea { min-height: 350px !important; max-height: 350px !important; height: 350px !important; resize: vertical !important; } .status-box { background: linear-gradient(135deg, #4CAF50, #45a049) !important; border: 3px solid #2E7D32 !important; border-radius: 15px !important; padding: 15px !important; margin: 10px 0 !important; box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3) !important; animation: pulse 2s infinite !important; min-height: 120px !important; max-height: 120px !important; } @keyframes pulse { 0% { box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3); } 50% { box-shadow: 0 8px 40px rgba(76, 175, 80, 0.6); } 100% { box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3); } } .gradio-button { border-radius: 25px !important; font-weight: bold !important; transition: all 0.3s ease !important; margin: 5px 0 !important; min-height: 50px !important; max-height: 50px !important; } h1 { background: linear-gradient(45deg, #FFD700, #FFA500) !important; -webkit-background-clip: text !important; -webkit-text-fill-color: transparent !important; background-clip: text !important; min-height: 80px !important; } """ # رابط کاربری Gradio with gr.Blocks(title="📊 Universal Anonymization System", theme=gr.themes.Soft(), css=custom_css) as app: with gr.Row(): language_selector = gr.Radio( choices=["فارسی", "English"], value="فارسی", label="Language / زبان", interactive=True ) with gr.Column(): title = gr.HTML("

📊 سیستم ناشناس‌سازی جامع اطلاعات تجاری

") with gr.Row(elem_classes="workflow rtl") as workflow_row: with gr.Column(): step1_title = gr.HTML('

📝 متن ورودی و تنظیمات

') input_text = gr.Textbox( lines=15, placeholder="متن تجاری خود را اینجا وارد کنید...\nسیستم به صورت خودکار تمام انواع نام شرکت‌ها، مبالغ مالی، درصدها و نام‌های اشخاص را با تشخیص الگوی پیشرفته شناسایی و ناشناس می‌کند...", label="", rtl=True ) process_btn = gr.Button("🚀 پردازش با تشخیص جامع", variant="primary") clear_btn = gr.Button("🗑️ پاک کردن همه", variant="stop") status = gr.Textbox( label="وضعیت", lines=4, interactive=False, rtl=True, elem_classes=["status-box"] ) with gr.Column(): step2_title = gr.HTML('

🎭 متن ناشناس‌شده

') anonymized_output = gr.Textbox( lines=15, placeholder="متن ناشناس‌شده اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Column(): step3_title = gr.HTML('

🤖 پاسخ خام ChatGPT

') gpt_output = gr.Textbox( lines=15, placeholder="پاسخ خام ChatGPT اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Column(): step4_title = gr.HTML('

✅ پاسخ نهایی بازگردانده شده

') final_output = gr.Textbox( lines=15, placeholder="پاسخ نهایی اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Row(): with gr.Column(): mapping_title = gr.HTML('

🗂️ جدول نگاشت جامع

') mapping_btn = gr.Button("📋 نمایش جدول نگاشت جامع") mapping_output = gr.Textbox( lines=10, label="جدول نگاشت اطلاعات", interactive=False, visible=False, rtl=True ) # Event handlers language_selector.change( fn=update_interface, inputs=[language_selector], outputs=[title, step1_title, input_text, process_btn, clear_btn, status, step2_title, anonymized_output, step3_title, gpt_output, step4_title, final_output, mapping_btn, mapping_output, workflow_row] ) process_btn.click( fn=process_all_steps, inputs=[input_text, language_selector], outputs=[status, anonymized_output, gpt_output, final_output] ) clear_btn.click( fn=clear_all, outputs=[input_text, anonymized_output, gpt_output, final_output, status] ) mapping_btn.click( fn=get_mapping_table, inputs=[language_selector], outputs=[mapping_output] ) mapping_btn.click( fn=lambda: gr.update(visible=True), outputs=[mapping_output] ) if __name__ == "__main__": print("=" * 80) print("تست سیستم ناشناس‌سازی جامع با نمونه‌های ذکر شده:") print("=" * 80) # نمونه‌های تست test_samples = [ "مهدی اخوان بهابادی باید یک اسم حساب شود.", "در مجمع عمومی عادی سالیانه اعلام کرد درآمد عملیاتی شرکت اصلی", "به معنای درآمد روزانه 178 میلیارد تومانی این اپراتور بوده", "در خودروسازان حالا از مرز 305 همت عبور کرده و به 305 همت رسیده است.", "زیان انباشته این شرکت 7.6 همت زیاد شده است.", "تولید محصولات گرم این شرکت به 1000 هزار تن و محصولات سرد به 1378 هزار تن رسید", "شرکت سرمایه گذاری پارسیان را اعلام کرد", "بانک پارسیان و گروه مالی پارسیان" ] # تست کامل full_test = """مهدی اخوان بهابادی در مجمع عمومی عادی سالیانه اعلام کرد درآمد عملیاتی شرکت اصلی به 178 میلیارد تومانی رسیده است. در خودروسازان حالا از مرز 305 همت عبور کرده و سود عملیاتی داشته اما زیان انباشته این شرکت 7.6 همت زیاد شده است. تولید محصولات گرم این شرکت به 1000 هزار تن و محصولات سرد به 1378 هزار تن رسید. شرکت سرمایه گذاری پارسیان سود خوبی را نشان داد. بانک پارسیان و گروه مالی پارسیان هم عملکرد مثبتی داشتند.""" anonymizer_test = UniversalAnonymizer() # تست نمونه‌های جداگانه print("\n📌 تست نمونه‌های جداگانه:") print("-" * 40) for i, sample in enumerate(test_samples, 1): anonymizer_test = UniversalAnonymizer() # ریست برای هر تست result = anonymizer_test.anonymize_text(sample) print(f"{i}. اصلی: {sample}") print(f" ناشناس: {result}") print() # تست کامل print("\n📌 تست کامل:") print("-" * 40) anonymizer_test = UniversalAnonymizer() result = anonymizer_test.anonymize_text(full_test) print("متن اصلی:") print(full_test) print("\nمتن ناشناس‌شده:") print(result) print("\n📊 جدول نگاشت:") print("-" * 40) for original, code in anonymizer_test.mapping_table.items(): print(f"{code} ← {original}") print("\n" + "=" * 80) print("✅ برنامه آماده اجراست!") print("=" * 80) app.launch()