import gradio as gr import re import os import requests import json import logging import time from typing import Dict, List, Tuple, Optional from chatgpt_sender import ChatGPTSender logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class MultiModelSender: """کلاس واحد برای ارسال به مدل‌های مختلف با retry mechanism""" def __init__(self): # دریافت API Keys از environment و trim کردن آنها self.openai_key = os.getenv("OPENAI_API_KEY", "").strip() if os.getenv("OPENAI_API_KEY") else None self.grok_key = os.getenv("XAI_API_KEY", "").strip() if os.getenv("XAI_API_KEY") else None self.gemini_key = os.getenv("GEMINI_API_KEY", "").strip() if os.getenv("GEMINI_API_KEY") else None self.deepseek_key = os.getenv("DEEPSEEK_API_KEY", "").strip() if os.getenv("DEEPSEEK_API_KEY") else None # ایجاد instance از ChatGPTSender self.chatgpt_sender = ChatGPTSender(api_key=self.openai_key, model="gpt-4o-mini") logger.info("✅ MultiModelSender مقداردهی شد") def send_to_chatgpt(self, text: str, system_msg: Optional[str] = None) -> str: """ ارسال به ChatGPT با استفاده از ماژول پیشرفته """ try: if system_msg: return self.chatgpt_sender.send( text=text, system_msg=system_msg, max_tokens=4096, temperature=0.1, lang='fa', retry_count=3 ) else: return self.chatgpt_sender.send_simple(text, lang='fa') except Exception as e: logger.error(f"❌ ChatGPT Error: {e}") return f"❌ خطا در ChatGPT: {str(e)}" def send_to_grok(self, text: str, system_msg: Optional[str] = None, retry_count: int = 3) -> str: """ارسال به Grok (xAI) با retry mechanism""" if not self.grok_key: return "❌ XAI_API_KEY (Grok) موجود نیست" messages = [] if system_msg: messages.append({"role": "system", "content": system_msg}) messages.append({"role": "user", "content": text}) for attempt in range(retry_count): try: logger.info(f"📤 ارسال به Grok (تلاش {attempt + 1}/{retry_count})...") response = requests.post( "https://api.x.ai/v1/chat/completions", headers={ "Authorization": f"Bearer {self.grok_key}", "Content-Type": "application/json" }, json={ "model": "grok-beta", "messages": messages, "temperature": 0.1, "max_tokens": 4096 }, timeout=60 ) if response.status_code == 200: logger.info("✅ Grok: پاسخ دریافت شد") return response.json()['choices'][0]['message']['content'].strip() elif response.status_code == 429: wait_time = 5 * (attempt + 1) logger.warning(f"⚠️ Grok Rate limit | صبر: {wait_time} ثانیه") if attempt < retry_count - 1: time.sleep(wait_time) continue else: return "❌ سهمیه Grok تمام شده است" elif response.status_code == 401: return "❌ کلید API Grok نامعتبر است!" elif response.status_code in [502, 503, 504]: wait_time = 2 * (attempt + 1) logger.warning(f"⚠️ Grok Server error | صبر: {wait_time} ثانیه") if attempt < retry_count - 1: time.sleep(wait_time) continue else: return f"❌ خطای سرور Grok: {response.status_code}" else: error_msg = response.text logger.error(f"❌ Grok API Error: {error_msg}") return f"❌ Grok API Error: {response.status_code}" except requests.exceptions.Timeout: logger.warning("⚠️ Grok Timeout | تلاش مجدد...") if attempt < retry_count - 1: time.sleep(3) continue else: return "❌ خطای اتصال به Grok: timeout" except Exception as e: logger.error(f"❌ Grok Exception: {e}") if attempt < retry_count - 1: time.sleep(2) continue else: return f"❌ خطا در Grok: {str(e)}" return "❌ خطای ناشناخته در Grok" def send_to_gemini(self, text: str, system_msg: Optional[str] = None, retry_count: int = 3) -> str: """ارسال به Google Gemini با retry mechanism""" if not self.gemini_key: return "❌ GEMINI_API_KEY موجود نیست" # ترکیب system message و text full_prompt = f"{system_msg}\n\n{text}" if system_msg else text for attempt in range(retry_count): try: logger.info(f"📤 ارسال به Gemini (تلاش {attempt + 1}/{retry_count})...") response = requests.post( f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={self.gemini_key}", headers={ "Content-Type": "application/json" }, json={ "contents": [{ "parts": [{"text": full_prompt}] }], "generationConfig": { "temperature": 0.1, "maxOutputTokens": 4096 } }, timeout=60 ) if response.status_code == 200: result = response.json() logger.info("✅ Gemini: پاسخ دریافت شد") return result['candidates'][0]['content']['parts'][0]['text'].strip() elif response.status_code == 429: wait_time = 5 * (attempt + 1) logger.warning(f"⚠️ Gemini Rate limit | صبر: {wait_time} ثانیه") if attempt < retry_count - 1: time.sleep(wait_time) continue else: return "❌ سهمیه Gemini تمام شده است" elif response.status_code == 400: error_data = response.json() error_msg = error_data.get('error', {}).get('message', 'خطای نامعتبر') return f"❌ خطای Gemini: {error_msg}" elif response.status_code in [502, 503, 504]: wait_time = 2 * (attempt + 1) logger.warning(f"⚠️ Gemini Server error | صبر: {wait_time} ثانیه") if attempt < retry_count - 1: time.sleep(wait_time) continue else: return f"❌ خطای سرور Gemini: {response.status_code}" else: error_msg = response.text logger.error(f"❌ Gemini API Error: {error_msg}") return f"❌ Gemini API Error: {response.status_code}" except requests.exceptions.Timeout: logger.warning("⚠️ Gemini Timeout | تلاش مجدد...") if attempt < retry_count - 1: time.sleep(3) continue else: return "❌ خطای اتصال به Gemini: timeout" except Exception as e: logger.error(f"❌ Gemini Exception: {e}") if attempt < retry_count - 1: time.sleep(2) continue else: return f"❌ خطا در Gemini: {str(e)}" return "❌ خطای ناشناخته در Gemini" def send_to_deepseek(self, text: str, system_msg: Optional[str] = None, retry_count: int = 3) -> str: """ارسال به DeepSeek با retry mechanism""" if not self.deepseek_key: return "❌ DEEPSEEK_API_KEY موجود نیست" messages = [] if system_msg: messages.append({"role": "system", "content": system_msg}) messages.append({"role": "user", "content": text}) for attempt in range(retry_count): try: logger.info(f"📤 ارسال به DeepSeek (تلاش {attempt + 1}/{retry_count})...") response = requests.post( "https://api.deepseek.com/v1/chat/completions", headers={ "Authorization": f"Bearer {self.deepseek_key}", "Content-Type": "application/json" }, json={ "model": "deepseek-chat", "messages": messages, "temperature": 0.1, "max_tokens": 4096 }, timeout=60 ) if response.status_code == 200: logger.info("✅ DeepSeek: پاسخ دریافت شد") return response.json()['choices'][0]['message']['content'].strip() elif response.status_code == 429: wait_time = 5 * (attempt + 1) logger.warning(f"⚠️ DeepSeek Rate limit | صبر: {wait_time} ثانیه") if attempt < retry_count - 1: time.sleep(wait_time) continue else: return "❌ سهمیه DeepSeek تمام شده است" elif response.status_code == 401: return "❌ کلید API DeepSeek نامعتبر است!" elif response.status_code in [502, 503, 504]: wait_time = 2 * (attempt + 1) logger.warning(f"⚠️ DeepSeek Server error | صبر: {wait_time} ثانیه") if attempt < retry_count - 1: time.sleep(wait_time) continue else: return f"❌ خطای سرور DeepSeek: {response.status_code}" else: error_msg = response.text logger.error(f"❌ DeepSeek API Error: {error_msg}") return f"❌ DeepSeek API Error: {response.status_code}" except requests.exceptions.Timeout: logger.warning("⚠️ DeepSeek Timeout | تلاش مجدد...") if attempt < retry_count - 1: time.sleep(3) continue else: return "❌ خطای اتصال به DeepSeek: timeout" except Exception as e: logger.error(f"❌ DeepSeek Exception: {e}") if attempt < retry_count - 1: time.sleep(2) continue else: return f"❌ خطا در DeepSeek: {str(e)}" return "❌ خطای ناشناخته در DeepSeek" def send_message(self, text: str, model_name: str, system_msg: Optional[str] = None) -> str: """ ارسال پیام به مدل انتخاب شده Args: text: متن ارسالی model_name: نام مدل system_msg: پیام سیستم (اختیاری) Returns: پاسخ مدل """ logger.info(f"🤖 ارسال به {model_name}...") if model_name == "ChatGPT-4o-mini": return self.send_to_chatgpt(text, system_msg) elif model_name == "Grok": return self.send_to_grok(text, system_msg) elif model_name == "Gemini": return self.send_to_gemini(text, system_msg) elif model_name == "DeepSeek": return self.send_to_deepseek(text, system_msg) else: return f"❌ مدل نامعتبر: {model_name}" class AnonymizerAdvanced: """ناشناس‌ساز پیشرفته با Cerebras""" def __init__(self, cerebras_key: str = None): self.cerebras_key = cerebras_key or os.getenv("CEREBRAS_API_KEY") self.mapping_table = {} self.reverse_mapping = {} # ایجاد instance از MultiModelSender self.model_sender = MultiModelSender() logger.info("✅ Anonymizer Advanced مقداردهی شد") def anonymize_with_cerebras(self, text: str) -> Tuple[str, Dict]: """ناشناس‌سازی با Cerebras - دریافت mapping از مدل""" logger.info("🧠 روش Cerebras...") if not self.cerebras_key: logger.error("❌ Cerebras API Key موجود نیست") raise ValueError("Cerebras API Key مورد نیاز است") try: # مرحله 1: ناشناس‌سازی متن prompt1 = f"""متن زیر را ناشناس کنید. قوانین: 1. اسامی اشخاص → person-01, person-02, ... 2. نام شرکت‌ها/سازمان‌ها → company-01, company-02, ... 3. مقادیر پولی → amount-01, amount-02, ... 4. درصدها → percent-01, percent-02, ... 5. فقط این توکن‌ها استفاده کنید 6. شماره‌های نسخه را درست حفظ کنید 7. اگر موجودیت تکرار شود از شماره قدیمی استفاده کنید متن: {text} خروجی: فقط متن ناشناس شده""" response1 = requests.post( "https://api.cerebras.ai/v1/chat/completions", headers={ "Authorization": f"Bearer {self.cerebras_key}", "Content-Type": "application/json" }, json={ "model": "llama-3.3-70b", "messages": [{"role": "user", "content": prompt1}], "max_tokens": 4096, "temperature": 0.1 }, timeout=60 ) if response1.status_code != 200: logger.error(f"❌ Cerebras Error: {response1.status_code}") raise Exception(f"Cerebras API Error: {response1.status_code}") anonymized_text = response1.json()['choices'][0]['message']['content'].strip() logger.info("✅ Cerebras: ناشناس‌سازی موفق") # مرحله 2: استخراج mapping از مدل prompt2 = f"""متن اصلی: {text} متن ناشناس شده: {anonymized_text} لطفاً یک جدول mapping برای همه توکن‌های ناشناس ایجاد کن. برای هر توکن، متن اصلی کامل آن را مشخص کن. **مهم:** - برای person-XX: نام کامل شخص (مثلاً "علی احمدی") - برای company-XX: نام کامل شرکت/سازمان (مثلاً "شرکت پتروشیمی") - برای amount-XX: عدد + واحد (مثلاً "80 هزار تومان" یا "50 میلیارد ریال") - برای percent-XX: عدد + کلمه "درصد" (مثلاً "40 درصد" نه فقط "40") خروجی را به این فرمت JSON بده (فقط JSON، بدون توضیح اضافی): {{ "person-01": "متن اصلی کامل", "company-01": "متن اصلی کامل", "amount-01": "متن اصلی کامل با واحد", "percent-01": "عدد + درصد", ... }}""" response2 = requests.post( "https://api.cerebras.ai/v1/chat/completions", headers={ "Authorization": f"Bearer {self.cerebras_key}", "Content-Type": "application/json" }, json={ "model": "llama-3.3-70b", "messages": [{"role": "user", "content": prompt2}], "max_tokens": 2048, "temperature": 0.1 }, timeout=60 ) if response2.status_code == 200: mapping_text = response2.json()['choices'][0]['message']['content'].strip() mapping_text = mapping_text.replace('```json', '').replace('```', '').strip() try: self.mapping_table = json.loads(mapping_text) self._fix_percent_mapping() self.reverse_mapping = {v: k for k, v in self.mapping_table.items()} logger.info(f"✅ Mapping استخراج شد: {len(self.mapping_table)} موجودیت") except json.JSONDecodeError: logger.warning("⚠️ خطا در parse کردن JSON mapping - استفاده از روش fallback") self._extract_mapping_from_text(text, anonymized_text) else: logger.warning("⚠️ خطا در دریافت mapping - استفاده از روش fallback") self._extract_mapping_from_text(text, anonymized_text) return anonymized_text, self.mapping_table except Exception as e: logger.error(f"❌ Cerebras Exception: {e}") raise def _fix_percent_mapping(self): """اصلاح mapping برای درصدها و مقادیر""" for token, value in self.mapping_table.items(): value_str = str(value).strip() if token.startswith('percent-'): if not re.search(r'(درصد|%|درصدی)', value_str): self.mapping_table[token] = f"{value_str} درصد" logger.info(f"✅ اصلاح {token}: '{value_str}' → '{value_str} درصد'") elif token.startswith('amount-'): if not re.search(r'(میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', value_str): logger.warning(f"⚠️ {token}: فقط عدد '{value_str}' - واحد مشخص نیست") def _extract_mapping_from_text(self, original: str, anonymized: str): """استخراج mapping از متن‌های اصلی و ناشناس شده""" all_tokens = [] for entity_type in ['person', 'company', 'amount', 'percent']: tokens = re.findall(f'{entity_type}-\\d+', anonymized) all_tokens.extend([(t, entity_type) for t in tokens]) all_tokens = sorted(set(all_tokens), key=lambda x: (x[1], int(x[0].split('-')[1]))) patterns = { 'person': r'\b[ء-ي]+\s+[ء-ي]+(?:\s+[ء-ي]+)*\b', 'company': r'(?:شرکت|بانک|سازمان|گروه|هلدینگ)\s+[ء-ي]+(?:\s+[ء-ي]+)*', 'amount': r'\d+(?:\.\d+)?\s*(?:میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', 'percent': r'\d+(?:\.\d+)?\s*(?:درصد|%|درصدی)', } original_entities = {} for entity_type, pattern in patterns.items(): matches = list(re.finditer(pattern, original)) original_entities[entity_type] = [m.group().strip() for m in matches] for token, entity_type in all_tokens: if entity_type in original_entities and original_entities[entity_type]: token_num = int(token.split('-')[1]) - 1 if token_num < len(original_entities[entity_type]): original_text = original_entities[entity_type][token_num] self.mapping_table[token] = original_text self.reverse_mapping[original_text] = token else: original_text = original_entities[entity_type][-1] if token not in self.mapping_table: self.mapping_table[token] = original_text self.reverse_mapping[original_text] = token def analyze_with_model(self, anonymized_text: str, analysis_prompt: str, model_name: str) -> str: """ اجرای پرامپت‌های درون متن ناشناس‌سازی شده با مدل انتخابی """ logger.info(f"🤖 {model_name} اجرای پرامپت...") if not analysis_prompt or not analysis_prompt.strip(): logger.info("⚠️ پرامپتی وارد نشده - متن ناشناس‌سازی شده برگردانده می‌شود") return anonymized_text try: # ساخت system message system_msg = """شما یک تحلیلگر مالی حرفه‌ای هستید. متن حاوی کدهای ناشناس است (person-XX، company-XX، amount-XX، percent-XX). به سوالات و درخواست‌ها با دقت پاسخ دهید و این کدها را در پاسخ خود حفظ کنید.""" # ساخت پیام کامل full_text = f"""{analysis_prompt} متن برای تحلیل: {anonymized_text}""" # ارسال به مدل انتخابی response = self.model_sender.send_message(full_text, model_name, system_msg) logger.info(f"✅ {model_name} پاسخ داد: {len(response)} کاراکتر") return response except Exception as e: logger.error(f"❌ {model_name} Exception: {e}") return f"❌ خطا در {model_name}: {str(e)}" def restore_text(self, anonymized_text: str) -> str: """بازگردانی متن ناشناس‌سازی شده به متن اصلی""" logger.info("🔄 بازگردانی متن...") if not self.mapping_table: logger.warning("⚠️ جدول نگاشت خالی است") return anonymized_text restored = anonymized_text for placeholder, original in sorted(self.mapping_table.items()): restored = restored.replace(placeholder, original) logger.info("✅ بازگردانی کامل") return restored def get_mapping_table_md(self) -> str: """تبدیل جدول نگاشت به Markdown""" if not self.mapping_table: return "### 📋 جدول نگاشت\n\nهیچ موجودیتی شناسایی نشد" table = "### 📋 جدول نگاشت\n\n" table += "| شناسه | متن اصلی |\n" table += "|-------|----------|\n" for token, original in sorted(self.mapping_table.items()): table += f"| **{token}** | {original} |\n" return table # متغیر سراسری anonymizer = None def process(input_text: str, analysis_prompt: str, model_choice: str): """پردازش متن - 4 مرحله""" global anonymizer if not input_text.strip(): return "", "", "", "" cerebras_key = os.getenv("CEREBRAS_API_KEY") if not anonymizer: anonymizer = AnonymizerAdvanced(cerebras_key) else: anonymizer.mapping_table = {} anonymizer.reverse_mapping = {} try: logger.info("=" * 70) logger.info(f"🚀 شروع پردازش - مدل تحلیل: {model_choice}") logger.info("=" * 70) # مرحله 1: ناشناس‌سازی logger.info("📝 مرحله 1: ناشناس‌سازی...") anonymized_text, _ = anonymizer.anonymize_with_cerebras(input_text) logger.info(f"✅ ناشناس‌سازی: {len(anonymized_text)} کاراکتر") # مرحله 2: مدل انتخابی logger.info(f"🤖 مرحله 2: {model_choice}...") model_response = anonymizer.analyze_with_model(anonymized_text, analysis_prompt, model_choice) logger.info(f"✅ {model_choice}: {len(model_response)} کاراکتر") # مرحله 3: بازگردانی logger.info("🔄 مرحله 3: بازگردانی...") restored_text = anonymizer.restore_text(model_response) logger.info("✅ بازگردانی کامل") # مرحله 4: جدول نگاشت logger.info("📋 مرحله 4: جدول نگاشت...") mapping_str = anonymizer.get_mapping_table_md() logger.info(f"✅ {len(anonymizer.mapping_table)} موجودیت") logger.info("=" * 70) logger.info("✅ تمام مراحل کامل!") logger.info("=" * 70) return restored_text, model_response, anonymized_text, mapping_str except Exception as e: logger.error(f"❌ خطا: {str(e)}", exc_info=True) return "", f"❌ خطا: {str(e)}", "", "" def clear_all(): """پاک کردن همه""" return "", "", "", "", "", "" # Gradio Interface css_rtl = """ .input-box { direction: rtl; text-align: right; } .textbox textarea { direction: rtl; text-align: right; font-family: 'Tahoma', serif; } """ with gr.Blocks(title="سیستم ناشناس‌سازی متون", theme=gr.themes.Soft(), css=css_rtl) as app: gr.Markdown("# 🔐 سیستم ناشناس‌سازی متون مالی فارسی (چند مدل)", elem_classes="input-box") gr.Markdown("### با قابلیت Retry و مدیریت خطای پیشرفته", elem_classes="input-box") with gr.Row(): with gr.Column(scale=1): # منوی انتخاب مدل model_dropdown = gr.Dropdown( choices=["ChatGPT-4o-mini", "Grok", "Gemini", "DeepSeek"], value="ChatGPT-4o-mini", label="🤖 انتخاب مدل تحلیل", info="هر مدل دارای Retry و Error Handling است", interactive=True ) analysis_prompt = gr.Textbox( lines=8, placeholder="مثال: این متن را خلاصه کن\nمثال: نقاط قوت و ضعف را استخراج کن", label="📋 دستورات تحلیل (اختیاری)", elem_classes="textbox" ) gr.Markdown("---") with gr.Column(): process_btn = gr.Button( "▶️ پردازش", variant="primary", size="lg" ) clear_btn = gr.Button( "🗑️ پاک کردن", variant="stop", size="lg" ) with gr.Column(scale=3): input_text = gr.Textbox( lines=14, placeholder="متن مالی/خبری فارسی را وارد کنید...", label="📝 متن ورودی", elem_classes="textbox" ) gr.Markdown("---") gr.Markdown("## 📊 نتایج پردازش", elem_classes="input-box") with gr.Row(): with gr.Column(scale=1): restored_text = gr.Textbox( lines=12, label="✅ متن بازگردانی شده", interactive=False, elem_classes="textbox" ) with gr.Column(scale=1): model_analysis = gr.Textbox( lines=12, label="🤖 تحلیل مدل (ناشناس)", interactive=False, elem_classes="textbox" ) with gr.Column(scale=1): anonymized_text = gr.Textbox( lines=12, label="🔒 متن ناشناس‌شده", interactive=False, elem_classes="textbox" ) gr.Markdown("---") mapping_table = gr.Markdown( value="### 📋 جدول نگاشت\n\nهنوز پردازشی انجام نشده", label="📋 جدول نگاشت", elem_classes="input-box" ) # Event Handlers process_btn.click( fn=process, inputs=[input_text, analysis_prompt, model_dropdown], outputs=[restored_text, model_analysis, anonymized_text, mapping_table] ) clear_btn.click( fn=clear_all, outputs=[input_text, analysis_prompt, restored_text, model_analysis, anonymized_text, mapping_table] ) if __name__ == "__main__": print("=" * 70) print("🚀 سیستم ناشناس‌سازی متون در حال راه‌اندازی...") print("=" * 70) print("\n📋 API Keys مورد نیاز:\n") print(" ✅ CEREBRAS_API_KEY (برای ناشناس‌سازی)") print(" ✅ OPENAI_API_KEY (برای ChatGPT)") print(" ✅ XAI_API_KEY (برای Grok)") print(" ✅ GEMINI_API_KEY (برای Gemini)") print(" ✅ DEEPSEEK_API_KEY (برای DeepSeek)") print("\n⚡ ویژگی‌های جدید:\n") print(" 🔄 Retry mechanism برای همه مدل‌ها") print(" ⚠️ مدیریت خطاهای Rate limiting") print(" 🛡️ Error handling پیشرفته") print(" ⏱️ Timeout handling با تلاش مجدد") print("\n🎯 روش ناشناس‌سازی: Cerebras (Llama 3.3-70B)") print("🤖 مدل‌های تحلیل: ChatGPT, Grok, Gemini, DeepSeek") print("=" * 70 + "\n") app.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )