import gradio as gr import re import os import requests import json import logging from typing import Dict, Tuple from unified_llm_sender import UnifiedLLMSender, get_available_models, get_model_display_names logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AnonymizerAdvanced: """ناشناس‌ساز پیشرفته با Cerebras""" def __init__(self, cerebras_key: str = None): self.cerebras_key = cerebras_key or os.getenv("CEREBRAS_API_KEY") or os.getenv("GR00_API_KEY") self.mapping_table = {} self.reverse_mapping = {} logger.info("✅ Anonymizer Advanced مقداردهی شد") def anonymize_with_cerebras(self, text: str) -> Tuple[str, Dict]: """ناشناس‌سازی با Cerebras - دریافت mapping از مدل""" logger.info("🧠 روش Cerebras...") if not self.cerebras_key: logger.error("❌ Cerebras API Key موجود نیست") raise ValueError("Cerebras API Key مورد نیاز است (CEREBRAS_API_KEY یا GR00_API_KEY)") try: # مرحله 1: ناشناس‌سازی متن prompt1 = f"""متن زیر را ناشناس کنید. قوانین: 1. اسامی اشخاص → person-01, person-02, ... 2. نام شرکت‌ها/سازمان‌ها → company-01, company-02, ... 3. مقادیر پولی → amount-01, amount-02, ... 4. درصدها → percent-01, percent-02, ... 5. فقط این توکن‌ها استفاده کنید 6. شماره‌های نسخه را درست حفظ کنید 7. اگر موجودیت تکرار شود از شماره قدیمی استفاده کنید متن: {text} خروجی: فقط متن ناشناس شده""" response1 = requests.post( "https://api.cerebras.ai/v1/chat/completions", headers={ "Authorization": f"Bearer {self.cerebras_key}", "Content-Type": "application/json" }, json={ "model": "llama-3.3-70b", "messages": [{"role": "user", "content": prompt1}], "max_tokens": 4096, "temperature": 0.1 }, timeout=60 ) if response1.status_code != 200: logger.error(f"❌ Cerebras Error: {response1.status_code}") raise Exception(f"Cerebras API Error: {response1.status_code}") anonymized_text = response1.json()['choices'][0]['message']['content'].strip() logger.info("✅ Cerebras: ناشناس‌سازی موفق") # مرحله 2: استخراج mapping از مدل prompt2 = f"""متن اصلی: {text} متن ناشناس شده: {anonymized_text} لطفاً یک جدول mapping برای همه توکن‌های ناشناس ایجاد کن. برای هر توکن، متن اصلی کامل آن را مشخص کن. **مهم:** - برای person-XX: نام کامل شخص (مثلاً "علی احمدی") - برای company-XX: نام کامل شرکت/سازمان (مثلاً "شرکت پتروشیمی") - برای amount-XX: عدد + واحد (مثلاً "80 هزار تومان" یا "50 میلیارد ریال") - برای percent-XX: عدد + کلمه "درصد" (مثلاً "40 درصد" نه فقط "40") خروجی را به این فرمت JSON بده (فقط JSON، بدون توضیح اضافی): {{ "person-01": "متن اصلی کامل", "company-01": "متن اصلی کامل", "amount-01": "متن اصلی کامل با واحد", "percent-01": "عدد + درصد", ... }}""" response2 = requests.post( "https://api.cerebras.ai/v1/chat/completions", headers={ "Authorization": f"Bearer {self.cerebras_key}", "Content-Type": "application/json" }, json={ "model": "llama-3.3-70b", "messages": [{"role": "user", "content": prompt2}], "max_tokens": 2048, "temperature": 0.1 }, timeout=60 ) if response2.status_code == 200: mapping_text = response2.json()['choices'][0]['message']['content'].strip() mapping_text = mapping_text.replace('```json', '').replace('```', '').strip() try: self.mapping_table = json.loads(mapping_text) self._fix_percent_mapping() self.reverse_mapping = {v: k for k, v in self.mapping_table.items()} logger.info(f"✅ Mapping استخراج شد: {len(self.mapping_table)} موجودیت") except json.JSONDecodeError: logger.warning("⚠️ خطا در parse کردن JSON mapping - استفاده از روش fallback") self._extract_mapping_from_text(text, anonymized_text) else: logger.warning("⚠️ خطا در دریافت mapping - استفاده از روش fallback") self._extract_mapping_from_text(text, anonymized_text) return anonymized_text, self.mapping_table except Exception as e: logger.error(f"❌ Cerebras Exception: {e}") raise def _fix_percent_mapping(self): """اصلاح mapping برای درصدها و مقادیر""" for token, value in self.mapping_table.items(): value_str = str(value).strip() if token.startswith('percent-'): if not re.search(r'(درصد|%|درصدی)', value_str): self.mapping_table[token] = f"{value_str} درصد" logger.info(f"✅ اصلاح {token}: '{value_str}' → '{value_str} درصد'") elif token.startswith('amount-'): if not re.search(r'(میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', value_str): logger.warning(f"⚠️ {token}: فقط عدد '{value_str}' - واحد مشخص نیست") def _extract_mapping_from_text(self, original: str, anonymized: str): """استخراج mapping از متن‌های اصلی و ناشناس شده""" all_tokens = [] for entity_type in ['person', 'company', 'amount', 'percent']: tokens = re.findall(f'{entity_type}-\\d+', anonymized) all_tokens.extend([(t, entity_type) for t in tokens]) all_tokens = sorted(set(all_tokens), key=lambda x: (x[1], int(x[0].split('-')[1]))) patterns = { 'person': r'\b[ء-ي]+\s+[ء-ي]+(?:\s+[ء-ي]+)*\b', 'company': r'(?:شرکت|بانک|سازمان|گروه|هلدینگ)\s+[ء-ي]+(?:\s+[ء-ي]+)*', 'amount': r'\d+(?:\.\d+)?\s*(?:میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', 'percent': r'\d+(?:\.\d+)?\s*(?:درصد|%|درصدی)', } original_entities = {} for entity_type, pattern in patterns.items(): matches = list(re.finditer(pattern, original)) original_entities[entity_type] = [m.group().strip() for m in matches] for token, entity_type in all_tokens: if entity_type in original_entities and original_entities[entity_type]: token_num = int(token.split('-')[1]) - 1 if token_num < len(original_entities[entity_type]): original_text = original_entities[entity_type][token_num] self.mapping_table[token] = original_text self.reverse_mapping[original_text] = token else: original_text = original_entities[entity_type][-1] if token not in self.mapping_table: self.mapping_table[token] = original_text self.reverse_mapping[original_text] = token def analyze_with_model(self, anonymized_text: str, analysis_prompt: str, model_name: str) -> str: """ اجرای پرامپت‌ها با مدل انتخابی """ logger.info(f"🤖 {model_name} اجرای پرامپت...") if not analysis_prompt or not analysis_prompt.strip(): logger.info("⚠️ پرامپتی وارد نشده - متن ناشناس‌سازی شده برگردانده می‌شود") return anonymized_text try: # ساخت system message system_msg = """شما یک تحلیلگر مالی حرفه‌ای هستید. متن حاوی کدهای ناشناس است (person-XX، company-XX، amount-XX، percent-XX). به سوالات و درخواست‌ها با دقت پاسخ دهید و این کدها را در پاسخ خود حفظ کنید.""" # ساخت پیام کامل full_text = f"""{analysis_prompt} متن برای تحلیل: {anonymized_text}""" # استفاده از UnifiedLLMSender sender = UnifiedLLMSender(model=model_name) response = sender.send( text=full_text, system_msg=system_msg, max_tokens=4096, temperature=0.1, lang='fa' ) logger.info(f"✅ {model_name} پاسخ داد: {len(response)} کاراکتر") return response except Exception as e: logger.error(f"❌ {model_name} Exception: {e}") return f"❌ خطا در {model_name}: {str(e)}" def restore_text(self, anonymized_text: str) -> str: """بازگردانی متن ناشناس‌سازی شده به متن اصلی""" logger.info("🔄 بازگردانی متن...") if not self.mapping_table: logger.warning("⚠️ جدول نگاشت خالی است") return anonymized_text restored = anonymized_text for placeholder, original in sorted(self.mapping_table.items()): restored = restored.replace(placeholder, original) logger.info("✅ بازگردانی کامل") return restored def get_mapping_table_md(self) -> str: """تبدیل جدول نگاشت به Markdown""" if not self.mapping_table: return "### 📋 جدول نگاشت\n\nهیچ موجودیتی شناسایی نشد" table = "### 📋 جدول نگاشت\n\n" table += "| شناسه | متن اصلی |\n" table += "|-------|----------|\n" for token, original in sorted(self.mapping_table.items()): table += f"| **{token}** | {original} |\n" return table # متغیر سراسری anonymizer = None def get_available_model_choices(): """دریافت لیست مدل‌های موجود برای Dropdown""" available = get_available_models() display_names = get_model_display_names() choices = [] for model_name, info in available.items(): if info['has_key']: choices.append(display_names.get(model_name, model_name)) # اگر هیچ مدلی موجود نیست، یک پیام نمایش بده if not choices: choices = ["❌ هیچ API Key موجود نیست"] return choices def get_model_name_from_display(display_name: str) -> str: """تبدیل نام نمایشی به نام مدل""" display_names = get_model_display_names() reverse_map = {v: k for k, v in display_names.items()} return reverse_map.get(display_name, display_name) def process(input_text: str, analysis_prompt: str, model_choice: str): """پردازش متن - 4 مرحله""" global anonymizer if not input_text.strip(): return "", "", "", "" # دریافت نام واقعی مدل model_name = get_model_name_from_display(model_choice) if not anonymizer: anonymizer = AnonymizerAdvanced() else: anonymizer.mapping_table = {} anonymizer.reverse_mapping = {} try: logger.info("=" * 70) logger.info(f"🚀 شروع پردازش - مدل تحلیل: {model_name}") logger.info("=" * 70) # مرحله 1: ناشناس‌سازی logger.info("📝 مرحله 1: ناشناس‌سازی...") anonymized_text, _ = anonymizer.anonymize_with_cerebras(input_text) logger.info(f"✅ ناشناس‌سازی: {len(anonymized_text)} کاراکتر") # مرحله 2: مدل انتخابی logger.info(f"🤖 مرحله 2: {model_name}...") model_response = anonymizer.analyze_with_model(anonymized_text, analysis_prompt, model_name) logger.info(f"✅ {model_name}: {len(model_response)} کاراکتر") # مرحله 3: بازگردانی logger.info("🔄 مرحله 3: بازگردانی...") restored_text = anonymizer.restore_text(model_response) logger.info("✅ بازگردانی کامل") # مرحله 4: جدول نگاشت logger.info("📋 مرحله 4: جدول نگاشت...") mapping_str = anonymizer.get_mapping_table_md() logger.info(f"✅ {len(anonymizer.mapping_table)} موجودیت") logger.info("=" * 70) logger.info("✅ تمام مراحل کامل!") logger.info("=" * 70) return restored_text, model_response, anonymized_text, mapping_str except Exception as e: logger.error(f"❌ خطا: {str(e)}", exc_info=True) return "", f"❌ خطا: {str(e)}", "", "" def clear_all(): """پاک کردن همه""" return "", "", "", "", "", "" # Gradio Interface css_rtl = """ .input-box { direction: rtl; text-align: right; } .textbox textarea { direction: rtl; text-align: right; font-family: 'Tahoma', serif; } """ with gr.Blocks(title="سیستم ناشناس‌سازی متون", theme=gr.themes.Soft(), css=css_rtl) as app: gr.Markdown("# 🔐 سیستم ناشناس‌سازی متون مالی فارسی", elem_classes="input-box") gr.Markdown("### 🌟 با پشتیبانی از مدل‌های پیشرفته AI", elem_classes="input-box") with gr.Row(): with gr.Column(scale=1): # منوی انتخاب مدل - بارگذاری دینامیک model_dropdown = gr.Dropdown( choices=get_available_model_choices(), value=get_available_model_choices()[0] if get_available_model_choices() else None, label="🤖 انتخاب مدل تحلیل", info="فقط مدل‌هایی که API Key دارند نمایش داده می‌شوند", interactive=True ) analysis_prompt = gr.Textbox( lines=8, placeholder="مثال: این متن را خلاصه کن\nمثال: نقاط قوت و ضعف را استخراج کن", label="📋 دستورات تحلیل (اختیاری)", elem_classes="textbox" ) gr.Markdown("---") with gr.Column(): process_btn = gr.Button( "▶️ پردازش", variant="primary", size="lg" ) clear_btn = gr.Button( "🗑️ پاک کردن", variant="stop", size="lg" ) with gr.Column(scale=3): input_text = gr.Textbox( lines=14, placeholder="متن مالی/خبری فارسی را وارد کنید...", label="📝 متن ورودی", elem_classes="textbox" ) gr.Markdown("---") gr.Markdown("## 📊 نتایج پردازش", elem_classes="input-box") with gr.Row(): with gr.Column(scale=1): restored_text = gr.Textbox( lines=12, label="✅ متن بازگردانی شده", interactive=False, elem_classes="textbox" ) with gr.Column(scale=1): model_analysis = gr.Textbox( lines=12, label="🤖 تحلیل مدل (ناشناس)", interactive=False, elem_classes="textbox" ) with gr.Column(scale=1): anonymized_text = gr.Textbox( lines=12, label="🔒 متن ناشناس‌شده", interactive=False, elem_classes="textbox" ) gr.Markdown("---") mapping_table = gr.Markdown( value="### 📋 جدول نگاشت\n\nهنوز پردازشی انجام نشده", label="📋 جدول نگاشت", elem_classes="input-box" ) # Event Handlers process_btn.click( fn=process, inputs=[input_text, analysis_prompt, model_dropdown], outputs=[restored_text, model_analysis, anonymized_text, mapping_table] ) clear_btn.click( fn=clear_all, outputs=[input_text, analysis_prompt, restored_text, model_analysis, anonymized_text, mapping_table] ) if __name__ == "__main__": print("=" * 70) print("🚀 سیستم ناشناس‌سازی متون در حال راه‌اندازی...") print("=" * 70) # نمایش مدل‌های موجود available = get_available_models() display_names = get_model_display_names() print("\n📋 مدل‌های موجود:\n") for model_name, info in available.items(): status = "✅" if info['has_key'] else "❌" display = display_names.get(model_name, model_name) print(f" {status} {display} ({info['env_key']})") print("\n" + "=" * 70 + "\n") app.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )