import gradio as gr import re import os import requests import time import logging from packaging import version # تنظیم logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def auto_setup_models(): """راه‌اندازی خودکار مدل‌ها در صورت عدم وجود""" models_dir = "./models" required_models = { 'bert-fa-ner': 'HooshvareLab/bert-fa-zwnj-base-ner', 'bert-base-NER': 'dslim/bert-base-NER', } missing_models = [] for model_name in required_models.keys(): model_path = os.path.join(models_dir, model_name) if not os.path.exists(model_path) or not os.listdir(model_path): missing_models.append(model_name) if not missing_models: logger.info("✅ All models are already available") return True logger.info(f"📥 Auto-downloading missing models: {missing_models}") try: from transformers import AutoTokenizer, AutoModelForTokenClassification os.makedirs(models_dir, exist_ok=True) for model_name in missing_models: hf_repo = required_models[model_name] model_path = os.path.join(models_dir, model_name) logger.info(f"📥 Downloading {model_name} from {hf_repo}...") try: tokenizer = AutoTokenizer.from_pretrained(hf_repo) model = AutoModelForTokenClassification.from_pretrained(hf_repo) tokenizer.save_pretrained(model_path) model.save_pretrained(model_path) logger.info(f"✅ {model_name} downloaded successfully") del tokenizer, model except Exception as e: logger.error(f"❌ Failed to download {model_name}: {e}") if os.path.exists(model_path): import shutil shutil.rmtree(model_path) logger.info("🎉 Auto-setup completed!") return True except ImportError: logger.error("❌ transformers library not available for auto-download") return False except Exception as e: logger.error(f"❌ Auto-setup failed: {e}") return False # اجرای auto-setup در startup try: auto_setup_models() except Exception as e: logger.warning(f"⚠️ Auto-setup encountered an issue: {e}") logger.info("ℹ️ Continuing with manual setup...") class SimpleAnonymizer: def __init__(self): self.mapping_table = {} self.counters = { 'company': 0, 'person': 0, 'amount': 0, 'percent': 0 } self.api_key = os.getenv("OPENAI_API_KEY", "") def anonymize_text(self, original_text, lang='fa'): """ناشناس‌سازی ساده و دقیق""" try: if not original_text or not original_text.strip(): return "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" # ریست متغیرها self.mapping_table = {} self.counters = {key: 0 for key in self.counters.keys()} anonymized = original_text # الگوهای ساده و دقیق patterns = [ # شرکت‌ها - فقط نام‌های کامل شرکت‌ها (r'ایران\s+خودرو', 'company'), (r'سایپا', 'company'), (r'بانک\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'company'), (r'شرکت\s+[آ-ی]+(?:\s+[آ-ی]+)*', 'company'), (r'گروه\s+[آ-ی]+(?:\s+[آ-ی]+)*', 'company'), (r'موسسه\s+[آ-ی]+(?:\s+[آ-ی]+)*', 'company'), # مبالغ مالی - فقط مبالغ کامل (r'\d+\s*هزار\s*(?:و\s*)?\d*\s*(?:میلیارد|میلیون)\s*(?:ریال|تومان)', 'amount'), (r'\d+(?:,\d{3})*\s*(?:میلیارد|میلیون|هزار)\s*(?:ریال|تومان)', 'amount'), (r'\d+(?:\.\d+)?\s*(?:میلیارد|میلیون|هزار)\s*(?:ریال|تومان|همت)', 'amount'), (r'\d+\s*همت', 'amount'), (r'\d+\s*میلیون\s*تومان', 'amount'), (r'بیش\s+از\s+\d+\s*همت', 'amount'), (r'حدود\s+\d+\s*میلیون\s*تومان', 'amount'), # درصدها - فقط درصدهای کامل (r'\d+(?:\.\d+)?\s*درصد', 'percent'), (r'\d+\s*٪', 'percent'), # نام اشخاص - فقط با القاب یا عناوین مشخص (r'(?:آقای|خانم|مهندس|دکتر)\s+[آ-ی]+(?:\s+[آ-ی]+)+', 'person'), (r'[آ-ی]+\s+[آ-ی]+\s+مدیرعامل', 'person'), (r'مدیرعامل\s+[آ-ی]+(?:\s+[آ-ی]+)+', 'person'), ] # پردازش الگوها به ترتیب از طولانی‌ترین به کوتاه‌ترین for pattern, category in patterns: matches = list(re.finditer(pattern, anonymized, re.IGNORECASE)) # مرتب‌سازی matches بر اساس طول (طولانی‌ترین اول) matches.sort(key=lambda x: len(x.group(0)), reverse=True) for match in matches: matched_text = match.group(0) # بررسی که قبلاً جایگزین نشده باشد if matched_text in anonymized and matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]}" self.mapping_table[matched_text] = code anonymized = anonymized.replace(matched_text, code) logger.info(f"Replaced: {matched_text} -> {code}") logger.info(f"✅ Anonymization completed. Found {len(self.mapping_table)} entities.") return anonymized except Exception as e: return f"❌ Error in anonymization: {str(e)}" if lang == 'en' else f"❌ خطا در ناشناس‌سازی: {str(e)}" def send_to_chatgpt(self, anonymized_text, lang='fa'): """ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "❌ Anonymized text is empty!" if lang == 'en' else "❌ متن ناشناس‌شده خالی است!" if not self.api_key: return "❌ API Key not configured! Please set OPENAI_API_KEY environment variable." if lang == 'en' else "❌ کلید API تنظیم نشده است! لطفاً OPENAI_API_KEY را در متغیرهای محیطی تنظیم کنید." system_msg = "You are a professional financial analyst. The text contains anonymous codes. Answer questions accurately." if lang == 'en' else "شما یک تحلیلگر مالی حرفه‌ای هستید. متن حاوی کدهای ناشناس است. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=30 ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) if 'Incorrect API key' in error_message: return "❌ Invalid API key." if lang == 'en' else "❌ کلید API نامعتبر است." elif 'quota' in error_message: return "❌ API quota exceeded." if lang == 'en' else "❌ سهمیه API تمام شده است." else: return f"❌ API Error: {error_message}" except Exception as e: return f"❌ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"❌ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "❌ ChatGPT response is empty!" if lang == 'en' else "❌ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "❌ Mapping table is empty!" if lang == 'en' else "❌ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} # جایگزینی از طولانی‌ترین کد اول sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"❌ Deanonymization error: {str(e)}" if lang == 'en' else f"❌ خطا در بازگردانی: {str(e)}" def process_all_steps(input_text, language): """پردازش خودکار تمام مراحل""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() anonymized_text = anonymizer.anonymize_text(input_text, lang) if anonymized_text.startswith("❌"): return anonymized_text, "", "", "" gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("❌"): entities_found = len(anonymizer.mapping_table) success_msg = (f"✅ Anonymization completed!\n" f"📊 Total: {entities_found} entities protected") return success_msg, anonymized_text, gpt_response, "" final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) # آمار تفصیلی company_count = anonymizer.counters['company'] amount_count = anonymizer.counters['amount'] percent_count = anonymizer.counters['percent'] person_count = anonymizer.counters['person'] success_msg = (f"🎉 Complete anonymization & restoration successful!\n" f"🏢 Companies: {company_count} | 💰 Amounts: {amount_count} | 📊 Percentages: {percent_count} | 👤 Persons: {person_count}\n" f"📊 Total: {entities_found} entities | ⏱️ Time: {total_time:.2f}s") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"❌ Processing error: {str(e)}" if lang == 'en' else f"❌ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table(language): """نمایش جدول نگاشت""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "❌ Mapping table is empty! Please process some text first." if lang == 'en' else "❌ جدول نگاشت خالی است! ابتدا متنی را پردازش کنید." result = "📋 **Simple Mapping Table:**\n\n" if lang == 'en' else "📋 **جدول نگاشت ساده:**\n\n" # گروه‌بندی بر اساس نوع categories = { 'company': '🏢 **Companies**', 'amount': '💰 **Amounts**', 'percent': '📊 **Percentages**', 'person': '👤 **Persons**' } for category, title in categories.items(): category_items = {k: v for k, v in anonymizer.mapping_table.items() if v.startswith(category)} if category_items: result += f"{title}:\n" for original, code in category_items.items(): result += f" • `{original}` → `{code}`\n" result += "\n" # آمار کلی result += f"📊 **Summary**: {len(anonymizer.mapping_table)} total entities anonymized\n" return result def clear_all(): """پاک کردن همه""" anonymizer.mapping_table = {} anonymizer.counters = {key: 0 for key in anonymizer.counters.keys()} return "", "", "", "", "" def update_ui_text(language): """به‌روزرسانی متن‌های رابط کاربری""" if language == 'English': return { 'title': 'Simple Business Data Anonymization System', 'step1': 'Input Text & Settings', 'step2': 'Anonymized Text', 'step3': 'Raw ChatGPT Response', 'step4': 'Final Restored Response', 'input_placeholder': 'Enter your business text here...\nExample: Company names, financial amounts, percentages, executive names...', 'process_btn': 'Process with Simple Detection', 'clear_btn': 'Clear All', 'mapping_btn': 'Show Simple Mapping Table', 'direction': 'ltr' } else: return { 'title': 'سیستم ناشناس‌سازی ساده اطلاعات تجاری', 'step1': 'متن ورودی و تنظیمات', 'step2': 'متن ناشناس‌شده', 'step3': 'پاسخ خام ChatGPT', 'step4': 'پاسخ نهایی بازگردانده شده', 'input_placeholder': 'متن تجاری خود را اینجا وارد کنید...\nمثال: نام شرکت‌ها، مبالغ مالی، درصدها، نام مدیران...', 'process_btn': 'پردازش با تشخیص ساده', 'clear_btn': 'پاک کردن همه', 'mapping_btn': 'نمایش جدول نگاشت ساده', 'direction': 'rtl' } def update_interface(language): """تغییر رابط کاربری بر اساس زبان""" ui_text = update_ui_text(language) is_english = (language == 'English') workflow_css = "workflow ltr" if is_english else "workflow rtl" return [ gr.update(value=f"

📊 {ui_text['title']}

"), gr.update(value=f"

📁 {ui_text['step1']}

"), gr.update(placeholder=ui_text['input_placeholder'], rtl=not is_english), gr.update(value=f"🚀 {ui_text['process_btn']}"), gr.update(value=f"🗑️ {ui_text['clear_btn']}"), gr.update(rtl=not is_english), gr.update(value=f"

🎭 {ui_text['step2']}

"), gr.update(rtl=not is_english), gr.update(value=f"

🤖 {ui_text['step3']}

"), gr.update(rtl=not is_english), gr.update(value=f"

✅ {ui_text['step4']}

"), gr.update(rtl=not is_english), gr.update(value=f"📋 {ui_text['mapping_btn']}"), gr.update(rtl=not is_english), gr.update(elem_classes=workflow_css) ] # ایجاد instance anonymizer = SimpleAnonymizer() # CSS اصلاح شده custom_css = """ body, .gradio-container { font-family: 'Segoe UI', Tahoma, Arial, sans-serif !important; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; min-height: 100vh !important; padding: 20px !important; } .rtl { direction: rtl !important; text-align: right !important; } .ltr { direction: ltr !important; text-align: left !important; } .workflow { display: grid !important; grid-template-columns: 1fr 1fr 1fr 1fr !important; gap: 25px !important; padding: 30px !important; align-items: start !important; align-content: start !important; grid-auto-rows: auto !important; } .workflow > * { align-self: start !important; vertical-align: top !important; margin-top: 0 !important; } .workflow .gradio-column, .workflow-column { display: flex !important; flex-direction: column !important; align-items: stretch !important; justify-content: flex-start !important; height: auto !important; min-height: 0 !important; margin-top: 0 !important; padding-top: 0 !important; } .gradio-textbox { border-radius: 10px !important; box-shadow: 0 4px 15px rgba(0,0,0,0.1) !important; flex-grow: 1 !important; min-height: 380px !important; max-height: 380px !important; height: 380px !important; } .gradio-textbox textarea { min-height: 350px !important; max-height: 350px !important; height: 350px !important; resize: vertical !important; } .workflow.rtl { direction: rtl !important; } .workflow.ltr { direction: ltr !important; } h1, h2, h3 { text-shadow: 2px 2px 4px rgba(0,0,0,0.3) !important; margin-top: 0 !important; margin-bottom: 10px !important; padding-top: 0 !important; line-height: 1.2 !important; } h2 { min-height: 40px !important; max-height: 40px !important; display: flex !important; align-items: center !important; margin-bottom: 15px !important; } .status-box { background: linear-gradient(135deg, #4CAF50, #45a049) !important; border: 3px solid #2E7D32 !important; border-radius: 15px !important; padding: 15px !important; margin: 10px 0 !important; box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3) !important; animation: pulse 2s infinite !important; min-height: 120px !important; max-height: 120px !important; } .status-box textarea { background: rgba(255, 255, 255, 0.95) !important; border: none !important; border-radius: 10px !important; font-weight: bold !important; font-size: 1.1em !important; color: #1B5E20 !important; text-shadow: 1px 1px 2px rgba(255, 255, 255, 0.8) !important; min-height: 80px !important; max-height: 80px !important; } @keyframes pulse { 0% { box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3); } 50% { box-shadow: 0 8px 40px rgba(76, 175, 80, 0.6); } 100% { box-shadow: 0 8px 32px rgba(76, 175, 80, 0.3); } } .gradio-button { border-radius: 25px !important; font-weight: bold !important; transition: all 0.3s ease !important; margin: 5px 0 !important; min-height: 50px !important; max-height: 50px !important; } .gradio-button:hover { transform: translateY(-2px) !important; box-shadow: 0 6px 20px rgba(0,0,0,0.2) !important; } h1 { background: linear-gradient(45deg, #FFD700, #FFA500) !important; -webkit-background-clip: text !important; -webkit-text-fill-color: transparent !important; background-clip: text !important; min-height: 80px !important; } @media (max-width: 1200px) { .workflow { grid-template-columns: 1fr 1fr !important; gap: 20px !important; } } @media (max-width: 768px) { .workflow { grid-template-columns: 1fr !important; gap: 15px !important; } .gradio-textbox { min-height: 300px !important; max-height: 300px !important; height: 300px !important; } } """ # رابط کاربری Gradio with gr.Blocks(title="📊 Simple Anonymization System", theme=gr.themes.Soft(), css=custom_css) as app: with gr.Row(): language_selector = gr.Radio( choices=["فارسی", "English"], value="فارسی", label="Language / زبان", interactive=True ) with gr.Column(): title = gr.HTML("

📊 سیستم ناشناس‌سازی ساده اطلاعات تجاری

") with gr.Row(elem_classes="workflow rtl") as workflow_row: with gr.Column(elem_classes="workflow-column"): step1_title = gr.HTML('

📁 متن ورودی و تنظیمات

') input_text = gr.Textbox( lines=15, placeholder="متن تجاری خود را اینجا وارد کنید...\nمثال: نام شرکت‌ها، مبالغ مالی، درصدها، نام مدیران...", label="", rtl=True ) process_btn = gr.Button("🚀 پردازش با تشخیص ساده", variant="primary") clear_btn = gr.Button("🗑️ پاک کردن همه", variant="stop") status = gr.Textbox( label="وضعیت", lines=4, interactive=False, rtl=True, elem_classes=["status-box"] ) with gr.Column(elem_classes="workflow-column"): step2_title = gr.HTML('

🎭 متن ناشناس‌شده

') anonymized_output = gr.Textbox( lines=15, placeholder="متن ناشناس‌شده اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Column(elem_classes="workflow-column"): step3_title = gr.HTML('

🤖 پاسخ خام ChatGPT

') gpt_output = gr.Textbox( lines=15, placeholder="پاسخ خام ChatGPT اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Column(elem_classes="workflow-column"): step4_title = gr.HTML('

✅ پاسخ نهایی بازگردانده شده

') final_output = gr.Textbox( lines=15, placeholder="پاسخ نهایی اینجا نمایش داده می‌شود...", label="", interactive=False, rtl=True ) with gr.Row(): with gr.Column(): mapping_title = gr.HTML('

🗂️ جدول نگاشت ساده

') mapping_btn = gr.Button("📋 نمایش جدول نگاشت ساده") mapping_output = gr.Textbox( lines=10, label="جدول نگاشت اطلاعات", interactive=False, visible=False, rtl=True ) # Event handlers language_selector.change( fn=update_interface, inputs=[language_selector], outputs=[title, step1_title, input_text, process_btn, clear_btn, status, step2_title, anonymized_output, step3_title, gpt_output, step4_title, final_output, mapping_btn, mapping_output, workflow_row] ) process_btn.click( fn=process_all_steps, inputs=[input_text, language_selector], outputs=[status, anonymized_output, gpt_output, final_output] ) clear_btn.click( fn=clear_all, outputs=[input_text, anonymized_output, gpt_output, final_output, status] ) mapping_btn.click( fn=get_mapping_table, inputs=[language_selector], outputs=[mapping_output] ) mapping_btn.click( fn=lambda: gr.update(visible=True), outputs=[mapping_output] ) if __name__ == "__main__": app.launch()