import gradio as gr import re import os import requests import time import logging from packaging import version # تنظیم logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def auto_setup_models(): """راهاندازی خودکار مدلها در صورت عدم وجود""" models_dir = "./models" required_models = { 'bert-fa-ner': 'HooshvareLab/bert-fa-zwnj-base-ner', 'bert-base-NER': 'dslim/bert-base-NER', } missing_models = [] for model_name in required_models.keys(): model_path = os.path.join(models_dir, model_name) if not os.path.exists(model_path) or not os.listdir(model_path): missing_models.append(model_name) if not missing_models: logger.info("✅ All models are already available") return True logger.info(f"📥 Auto-downloading missing models: {missing_models}") try: from transformers import AutoTokenizer, AutoModelForTokenClassification os.makedirs(models_dir, exist_ok=True) for model_name in missing_models: hf_repo = required_models[model_name] model_path = os.path.join(models_dir, model_name) logger.info(f"📥 Downloading {model_name} from {hf_repo}...") try: tokenizer = AutoTokenizer.from_pretrained(hf_repo) model = AutoModelForTokenClassification.from_pretrained(hf_repo) tokenizer.save_pretrained(model_path) model.save_pretrained(model_path) logger.info(f"✅ {model_name} downloaded successfully") del tokenizer, model except Exception as e: logger.error(f"❌ Failed to download {model_name}: {e}") if os.path.exists(model_path): import shutil shutil.rmtree(model_path) logger.info("🎉 Auto-setup completed!") return True except ImportError: logger.error("❌ transformers library not available for auto-download") return False except Exception as e: logger.error(f"❌ Auto-setup failed: {e}") return False # اجرای auto-setup در startup try: auto_setup_models() except Exception as e: logger.warning(f"⚠️ Auto-setup encountered an issue: {e}") logger.info("ℹ️ Continuing with manual setup...") class SimpleAnonymizer: def __init__(self): self.mapping_table = {} self.counters = { 'company': 0, 'person': 0, 'amount': 0, 'percent': 0 } self.api_key = os.getenv("OPENAI_API_KEY", "") def anonymize_text(self, original_text, lang='fa'): """ناشناسسازی ساده و دقیق""" try: if not original_text or not original_text.strip(): return "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" # ریست متغیرها self.mapping_table = {} self.counters = {key: 0 for key in self.counters.keys()} anonymized = original_text # الگوهای ساده و دقیق patterns = [ # شرکتها - فقط نامهای کامل شرکتها (r'ایران\s+خودرو', 'company'), (r'سایپا', 'company'), (r'بانک\s+[آ-ی]+(?:\s+[آ-ی]+)?', 'company'), (r'شرکت\s+[آ-ی]+(?:\s+[آ-ی]+)*', 'company'), (r'گروه\s+[آ-ی]+(?:\s+[آ-ی]+)*', 'company'), (r'موسسه\s+[آ-ی]+(?:\s+[آ-ی]+)*', 'company'), # مبالغ مالی - فقط مبالغ کامل (r'\d+\s*هزار\s*(?:و\s*)?\d*\s*(?:میلیارد|میلیون)\s*(?:ریال|تومان)', 'amount'), (r'\d+(?:,\d{3})*\s*(?:میلیارد|میلیون|هزار)\s*(?:ریال|تومان)', 'amount'), (r'\d+(?:\.\d+)?\s*(?:میلیارد|میلیون|هزار)\s*(?:ریال|تومان|همت)', 'amount'), (r'\d+\s*همت', 'amount'), (r'\d+\s*میلیون\s*تومان', 'amount'), (r'بیش\s+از\s+\d+\s*همت', 'amount'), (r'حدود\s+\d+\s*میلیون\s*تومان', 'amount'), # درصدها - فقط درصدهای کامل (r'\d+(?:\.\d+)?\s*درصد', 'percent'), (r'\d+\s*٪', 'percent'), # نام اشخاص - فقط با القاب یا عناوین مشخص (r'(?:آقای|خانم|مهندس|دکتر)\s+[آ-ی]+(?:\s+[آ-ی]+)+', 'person'), (r'[آ-ی]+\s+[آ-ی]+\s+مدیرعامل', 'person'), (r'مدیرعامل\s+[آ-ی]+(?:\s+[آ-ی]+)+', 'person'), ] # پردازش الگوها به ترتیب از طولانیترین به کوتاهترین for pattern, category in patterns: matches = list(re.finditer(pattern, anonymized, re.IGNORECASE)) # مرتبسازی matches بر اساس طول (طولانیترین اول) matches.sort(key=lambda x: len(x.group(0)), reverse=True) for match in matches: matched_text = match.group(0) # بررسی که قبلاً جایگزین نشده باشد if matched_text in anonymized and matched_text not in self.mapping_table: self.counters[category] += 1 code = f"{category}-{self.counters[category]}" self.mapping_table[matched_text] = code anonymized = anonymized.replace(matched_text, code) logger.info(f"Replaced: {matched_text} -> {code}") logger.info(f"✅ Anonymization completed. Found {len(self.mapping_table)} entities.") return anonymized except Exception as e: return f"❌ Error in anonymization: {str(e)}" if lang == 'en' else f"❌ خطا در ناشناسسازی: {str(e)}" def send_to_chatgpt(self, anonymized_text, lang='fa'): """ارسال به ChatGPT""" try: if not anonymized_text or not anonymized_text.strip(): return "❌ Anonymized text is empty!" if lang == 'en' else "❌ متن ناشناسشده خالی است!" if not self.api_key: return "❌ API Key not configured! Please set OPENAI_API_KEY environment variable." if lang == 'en' else "❌ کلید API تنظیم نشده است! لطفاً OPENAI_API_KEY را در متغیرهای محیطی تنظیم کنید." system_msg = "You are a professional financial analyst. The text contains anonymous codes. Answer questions accurately." if lang == 'en' else "شما یک تحلیلگر مالی حرفهای هستید. متن حاوی کدهای ناشناس است. به سوالات با دقت پاسخ دهید." headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": "gpt-4o-mini", "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": anonymized_text} ], "max_tokens": 2000, "temperature": 0.7 } response = requests.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=data, timeout=30 ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: error_data = response.json() if response.content else {} error_message = error_data.get('error', {}).get('message', response.text) if 'Incorrect API key' in error_message: return "❌ Invalid API key." if lang == 'en' else "❌ کلید API نامعتبر است." elif 'quota' in error_message: return "❌ API quota exceeded." if lang == 'en' else "❌ سهمیه API تمام شده است." else: return f"❌ API Error: {error_message}" except Exception as e: return f"❌ Error connecting to ChatGPT: {str(e)}" if lang == 'en' else f"❌ خطا در ارتباط با ChatGPT: {str(e)}" def deanonymize_response(self, gpt_response, lang='fa'): """بازگردانی""" try: if not gpt_response or not gpt_response.strip(): return "❌ ChatGPT response is empty!" if lang == 'en' else "❌ پاسخ ChatGPT خالی است!" if not self.mapping_table: return "❌ Mapping table is empty!" if lang == 'en' else "❌ جدول نگاشت خالی است!" final_result = gpt_response reverse_mapping = {code: original for original, code in self.mapping_table.items()} # جایگزینی از طولانیترین کد اول sorted_codes = sorted(reverse_mapping.items(), key=lambda x: len(x[0]), reverse=True) for code, original in sorted_codes: final_result = final_result.replace(code, original) return final_result except Exception as e: return f"❌ Deanonymization error: {str(e)}" if lang == 'en' else f"❌ خطا در بازگردانی: {str(e)}" def process_all_steps(input_text, language): """پردازش خودکار تمام مراحل""" lang = 'en' if language == 'English' else 'fa' if not input_text.strip(): error_msg = "❌ Please enter input text!" if lang == 'en' else "❌ لطفاً متن ورودی را وارد کنید!" return error_msg, "", "", "" try: start_time = time.time() anonymized_text = anonymizer.anonymize_text(input_text, lang) if anonymized_text.startswith("❌"): return anonymized_text, "", "", "" gpt_response = anonymizer.send_to_chatgpt(anonymized_text, lang) if gpt_response.startswith("❌"): entities_found = len(anonymizer.mapping_table) success_msg = (f"✅ Anonymization completed!\n" f"📊 Total: {entities_found} entities protected") return success_msg, anonymized_text, gpt_response, "" final_result = anonymizer.deanonymize_response(gpt_response, lang) total_time = time.time() - start_time entities_found = len(anonymizer.mapping_table) # آمار تفصیلی company_count = anonymizer.counters['company'] amount_count = anonymizer.counters['amount'] percent_count = anonymizer.counters['percent'] person_count = anonymizer.counters['person'] success_msg = (f"🎉 Complete anonymization & restoration successful!\n" f"🏢 Companies: {company_count} | 💰 Amounts: {amount_count} | 📊 Percentages: {percent_count} | 👤 Persons: {person_count}\n" f"📊 Total: {entities_found} entities | ⏱️ Time: {total_time:.2f}s") return success_msg, anonymized_text, gpt_response, final_result except Exception as e: error_msg = f"❌ Processing error: {str(e)}" if lang == 'en' else f"❌ خطا در پردازش: {str(e)}" return error_msg, "", "", "" def get_mapping_table(language): """نمایش جدول نگاشت""" lang = 'en' if language == 'English' else 'fa' if not anonymizer.mapping_table: return "❌ Mapping table is empty! Please process some text first." if lang == 'en' else "❌ جدول نگاشت خالی است! ابتدا متنی را پردازش کنید." result = "📋 **Simple Mapping Table:**\n\n" if lang == 'en' else "📋 **جدول نگاشت ساده:**\n\n" # گروهبندی بر اساس نوع categories = { 'company': '🏢 **Companies**', 'amount': '💰 **Amounts**', 'percent': '📊 **Percentages**', 'person': '👤 **Persons**' } for category, title in categories.items(): category_items = {k: v for k, v in anonymizer.mapping_table.items() if v.startswith(category)} if category_items: result += f"{title}:\n" for original, code in category_items.items(): result += f" • `{original}` → `{code}`\n" result += "\n" # آمار کلی result += f"📊 **Summary**: {len(anonymizer.mapping_table)} total entities anonymized\n" return result def clear_all(): """پاک کردن همه""" anonymizer.mapping_table = {} anonymizer.counters = {key: 0 for key in anonymizer.counters.keys()} return "", "", "", "", "" def update_ui_text(language): """بهروزرسانی متنهای رابط کاربری""" if language == 'English': return { 'title': 'Simple Business Data Anonymization System', 'step1': 'Input Text & Settings', 'step2': 'Anonymized Text', 'step3': 'Raw ChatGPT Response', 'step4': 'Final Restored Response', 'input_placeholder': 'Enter your business text here...\nExample: Company names, financial amounts, percentages, executive names...', 'process_btn': 'Process with Simple Detection', 'clear_btn': 'Clear All', 'mapping_btn': 'Show Simple Mapping Table', 'direction': 'ltr' } else: return { 'title': 'سیستم ناشناسسازی ساده اطلاعات تجاری', 'step1': 'متن ورودی و تنظیمات', 'step2': 'متن ناشناسشده', 'step3': 'پاسخ خام ChatGPT', 'step4': 'پاسخ نهایی بازگردانده شده', 'input_placeholder': 'متن تجاری خود را اینجا وارد کنید...\nمثال: نام شرکتها، مبالغ مالی، درصدها، نام مدیران...', 'process_btn': 'پردازش با تشخیص ساده', 'clear_btn': 'پاک کردن همه', 'mapping_btn': 'نمایش جدول نگاشت ساده', 'direction': 'rtl' } def update_interface(language): """تغییر رابط کاربری بر اساس زبان""" ui_text = update_ui_text(language) is_english = (language == 'English') workflow_css = "workflow ltr" if is_english else "workflow rtl" return [ gr.update(value=f"