Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import re | |
| import os | |
| import requests | |
| import json | |
| import logging | |
| from typing import Dict, Tuple | |
| from unified_llm_sender import UnifiedLLMSender, get_available_models, get_model_display_names | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class AnonymizerAdvanced: | |
| """ناشناسساز پیشرفته با Cerebras""" | |
| def __init__(self, cerebras_key: str = None): | |
| self.cerebras_key = cerebras_key or os.getenv("CEREBRAS_API_KEY") or os.getenv("GR00_API_KEY") | |
| self.mapping_table = {} | |
| self.reverse_mapping = {} | |
| logger.info("✅ Anonymizer Advanced مقداردهی شد") | |
| def anonymize_with_cerebras(self, text: str) -> Tuple[str, Dict]: | |
| """ناشناسسازی با Cerebras - دریافت mapping از مدل""" | |
| logger.info("🧠 روش Cerebras...") | |
| if not self.cerebras_key: | |
| logger.error("❌ Cerebras API Key موجود نیست") | |
| raise ValueError("Cerebras API Key مورد نیاز است (CEREBRAS_API_KEY یا GR00_API_KEY)") | |
| try: | |
| # مرحله 1: ناشناسسازی متن | |
| prompt1 = f"""متن زیر را ناشناس کنید. قوانین: | |
| 1. اسامی اشخاص → person-01, person-02, ... | |
| 2. نام شرکتها/سازمانها → company-01, company-02, ... | |
| 3. مقادیر پولی → amount-01, amount-02, ... | |
| 4. درصدها → percent-01, percent-02, ... | |
| 5. فقط این توکنها استفاده کنید | |
| 6. شمارههای نسخه را درست حفظ کنید | |
| 7. اگر موجودیت تکرار شود از شماره قدیمی استفاده کنید | |
| متن: | |
| {text} | |
| خروجی: فقط متن ناشناس شده""" | |
| response1 = requests.post( | |
| "https://api.cerebras.ai/v1/chat/completions", | |
| headers={ | |
| "Authorization": f"Bearer {self.cerebras_key}", | |
| "Content-Type": "application/json" | |
| }, | |
| json={ | |
| "model": "llama-3.3-70b", | |
| "messages": [{"role": "user", "content": prompt1}], | |
| "max_tokens": 4096, | |
| "temperature": 0.1 | |
| }, | |
| timeout=60 | |
| ) | |
| if response1.status_code != 200: | |
| logger.error(f"❌ Cerebras Error: {response1.status_code}") | |
| raise Exception(f"Cerebras API Error: {response1.status_code}") | |
| anonymized_text = response1.json()['choices'][0]['message']['content'].strip() | |
| logger.info("✅ Cerebras: ناشناسسازی موفق") | |
| # مرحله 2: استخراج mapping از مدل | |
| prompt2 = f"""متن اصلی: | |
| {text} | |
| متن ناشناس شده: | |
| {anonymized_text} | |
| لطفاً یک جدول mapping برای همه توکنهای ناشناس ایجاد کن. | |
| برای هر توکن، متن اصلی کامل آن را مشخص کن. | |
| **مهم:** | |
| - برای person-XX: نام کامل شخص (مثلاً "علی احمدی") | |
| - برای company-XX: نام کامل شرکت/سازمان (مثلاً "شرکت پتروشیمی") | |
| - برای amount-XX: عدد + واحد (مثلاً "80 هزار تومان" یا "50 میلیارد ریال") | |
| - برای percent-XX: عدد + کلمه "درصد" (مثلاً "40 درصد" نه فقط "40") | |
| خروجی را به این فرمت JSON بده (فقط JSON، بدون توضیح اضافی): | |
| {{ | |
| "person-01": "متن اصلی کامل", | |
| "company-01": "متن اصلی کامل", | |
| "amount-01": "متن اصلی کامل با واحد", | |
| "percent-01": "عدد + درصد", | |
| ... | |
| }}""" | |
| response2 = requests.post( | |
| "https://api.cerebras.ai/v1/chat/completions", | |
| headers={ | |
| "Authorization": f"Bearer {self.cerebras_key}", | |
| "Content-Type": "application/json" | |
| }, | |
| json={ | |
| "model": "llama-3.3-70b", | |
| "messages": [{"role": "user", "content": prompt2}], | |
| "max_tokens": 2048, | |
| "temperature": 0.1 | |
| }, | |
| timeout=60 | |
| ) | |
| if response2.status_code == 200: | |
| mapping_text = response2.json()['choices'][0]['message']['content'].strip() | |
| mapping_text = mapping_text.replace('```json', '').replace('```', '').strip() | |
| try: | |
| self.mapping_table = json.loads(mapping_text) | |
| self._fix_percent_mapping() | |
| self.reverse_mapping = {v: k for k, v in self.mapping_table.items()} | |
| logger.info(f"✅ Mapping استخراج شد: {len(self.mapping_table)} موجودیت") | |
| except json.JSONDecodeError: | |
| logger.warning("⚠️ خطا در parse کردن JSON mapping - استفاده از روش fallback") | |
| self._extract_mapping_from_text(text, anonymized_text) | |
| else: | |
| logger.warning("⚠️ خطا در دریافت mapping - استفاده از روش fallback") | |
| self._extract_mapping_from_text(text, anonymized_text) | |
| return anonymized_text, self.mapping_table | |
| except Exception as e: | |
| logger.error(f"❌ Cerebras Exception: {e}") | |
| raise | |
| def _fix_percent_mapping(self): | |
| """اصلاح mapping برای درصدها و مقادیر""" | |
| for token, value in self.mapping_table.items(): | |
| value_str = str(value).strip() | |
| if token.startswith('percent-'): | |
| if not re.search(r'(درصد|%|درصدی)', value_str): | |
| self.mapping_table[token] = f"{value_str} درصد" | |
| logger.info(f"✅ اصلاح {token}: '{value_str}' → '{value_str} درصد'") | |
| elif token.startswith('amount-'): | |
| if not re.search(r'(میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', value_str): | |
| logger.warning(f"⚠️ {token}: فقط عدد '{value_str}' - واحد مشخص نیست") | |
| def _extract_mapping_from_text(self, original: str, anonymized: str): | |
| """استخراج mapping از متنهای اصلی و ناشناس شده""" | |
| all_tokens = [] | |
| for entity_type in ['person', 'company', 'amount', 'percent']: | |
| tokens = re.findall(f'{entity_type}-\\d+', anonymized) | |
| all_tokens.extend([(t, entity_type) for t in tokens]) | |
| all_tokens = sorted(set(all_tokens), key=lambda x: (x[1], int(x[0].split('-')[1]))) | |
| patterns = { | |
| 'person': r'\b[ء-ي]+\s+[ء-ي]+(?:\s+[ء-ي]+)*\b', | |
| 'company': r'(?:شرکت|بانک|سازمان|گروه|هلدینگ)\s+[ء-ي]+(?:\s+[ء-ي]+)*', | |
| 'amount': r'\d+(?:\.\d+)?\s*(?:میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', | |
| 'percent': r'\d+(?:\.\d+)?\s*(?:درصد|%|درصدی)', | |
| } | |
| original_entities = {} | |
| for entity_type, pattern in patterns.items(): | |
| matches = list(re.finditer(pattern, original)) | |
| original_entities[entity_type] = [m.group().strip() for m in matches] | |
| for token, entity_type in all_tokens: | |
| if entity_type in original_entities and original_entities[entity_type]: | |
| token_num = int(token.split('-')[1]) - 1 | |
| if token_num < len(original_entities[entity_type]): | |
| original_text = original_entities[entity_type][token_num] | |
| self.mapping_table[token] = original_text | |
| self.reverse_mapping[original_text] = token | |
| else: | |
| original_text = original_entities[entity_type][-1] | |
| if token not in self.mapping_table: | |
| self.mapping_table[token] = original_text | |
| self.reverse_mapping[original_text] = token | |
| def analyze_with_model(self, anonymized_text: str, analysis_prompt: str, model_name: str) -> str: | |
| """ | |
| اجرای پرامپتها با مدل انتخابی | |
| """ | |
| logger.info(f"🤖 {model_name} اجرای پرامپت...") | |
| if not analysis_prompt or not analysis_prompt.strip(): | |
| logger.info("⚠️ پرامپتی وارد نشده - متن ناشناسسازی شده برگردانده میشود") | |
| return anonymized_text | |
| try: | |
| # ساخت system message | |
| system_msg = """شما یک تحلیلگر مالی حرفهای هستید. متن حاوی کدهای ناشناس است (person-XX، company-XX، amount-XX، percent-XX). | |
| به سوالات و درخواستها با دقت پاسخ دهید و این کدها را در پاسخ خود حفظ کنید.""" | |
| # ساخت پیام کامل | |
| full_text = f"""{analysis_prompt} | |
| متن برای تحلیل: | |
| {anonymized_text}""" | |
| # استفاده از UnifiedLLMSender | |
| sender = UnifiedLLMSender(model=model_name) | |
| response = sender.send( | |
| text=full_text, | |
| system_msg=system_msg, | |
| max_tokens=4096, | |
| temperature=0.1, | |
| lang='fa' | |
| ) | |
| logger.info(f"✅ {model_name} پاسخ داد: {len(response)} کاراکتر") | |
| return response | |
| except Exception as e: | |
| logger.error(f"❌ {model_name} Exception: {e}") | |
| return f"❌ خطا در {model_name}: {str(e)}" | |
| def restore_text(self, anonymized_text: str) -> str: | |
| """بازگردانی متن ناشناسسازی شده به متن اصلی""" | |
| logger.info("🔄 بازگردانی متن...") | |
| if not self.mapping_table: | |
| logger.warning("⚠️ جدول نگاشت خالی است") | |
| return anonymized_text | |
| restored = anonymized_text | |
| for placeholder, original in sorted(self.mapping_table.items()): | |
| restored = restored.replace(placeholder, original) | |
| logger.info("✅ بازگردانی کامل") | |
| return restored | |
| def get_mapping_table_md(self) -> str: | |
| """تبدیل جدول نگاشت به Markdown""" | |
| if not self.mapping_table: | |
| return "### 📋 جدول نگاشت\n\nهیچ موجودیتی شناسایی نشد" | |
| table = "### 📋 جدول نگاشت\n\n" | |
| table += "| شناسه | متن اصلی |\n" | |
| table += "|-------|----------|\n" | |
| for token, original in sorted(self.mapping_table.items()): | |
| table += f"| **{token}** | {original} |\n" | |
| return table | |
| # متغیر سراسری | |
| anonymizer = None | |
| def get_available_model_choices(): | |
| """دریافت لیست مدلهای موجود برای Dropdown""" | |
| available = get_available_models() | |
| display_names = get_model_display_names() | |
| choices = [] | |
| for model_name, info in available.items(): | |
| if info['has_key']: | |
| choices.append(display_names.get(model_name, model_name)) | |
| # اگر هیچ مدلی موجود نیست، یک پیام نمایش بده | |
| if not choices: | |
| choices = ["❌ هیچ API Key موجود نیست"] | |
| return choices | |
| def get_model_name_from_display(display_name: str) -> str: | |
| """تبدیل نام نمایشی به نام مدل""" | |
| display_names = get_model_display_names() | |
| reverse_map = {v: k for k, v in display_names.items()} | |
| return reverse_map.get(display_name, display_name) | |
| def process(input_text: str, analysis_prompt: str, model_choice: str): | |
| """پردازش متن - 4 مرحله""" | |
| global anonymizer | |
| if not input_text.strip(): | |
| return "", "", "", "" | |
| # دریافت نام واقعی مدل | |
| model_name = get_model_name_from_display(model_choice) | |
| if not anonymizer: | |
| anonymizer = AnonymizerAdvanced() | |
| else: | |
| anonymizer.mapping_table = {} | |
| anonymizer.reverse_mapping = {} | |
| try: | |
| logger.info("=" * 70) | |
| logger.info(f"🚀 شروع پردازش - مدل تحلیل: {model_name}") | |
| logger.info("=" * 70) | |
| # مرحله 1: ناشناسسازی | |
| logger.info("📝 مرحله 1: ناشناسسازی...") | |
| anonymized_text, _ = anonymizer.anonymize_with_cerebras(input_text) | |
| logger.info(f"✅ ناشناسسازی: {len(anonymized_text)} کاراکتر") | |
| # مرحله 2: مدل انتخابی | |
| logger.info(f"🤖 مرحله 2: {model_name}...") | |
| model_response = anonymizer.analyze_with_model(anonymized_text, analysis_prompt, model_name) | |
| logger.info(f"✅ {model_name}: {len(model_response)} کاراکتر") | |
| # مرحله 3: بازگردانی | |
| logger.info("🔄 مرحله 3: بازگردانی...") | |
| restored_text = anonymizer.restore_text(model_response) | |
| logger.info("✅ بازگردانی کامل") | |
| # مرحله 4: جدول نگاشت | |
| logger.info("📋 مرحله 4: جدول نگاشت...") | |
| mapping_str = anonymizer.get_mapping_table_md() | |
| logger.info(f"✅ {len(anonymizer.mapping_table)} موجودیت") | |
| logger.info("=" * 70) | |
| logger.info("✅ تمام مراحل کامل!") | |
| logger.info("=" * 70) | |
| return restored_text, model_response, anonymized_text, mapping_str | |
| except Exception as e: | |
| logger.error(f"❌ خطا: {str(e)}", exc_info=True) | |
| return "", f"❌ خطا: {str(e)}", "", "" | |
| def clear_all(): | |
| """پاک کردن همه""" | |
| return "", "", "", "", "", "" | |
| # Gradio Interface | |
| css_rtl = """ | |
| .input-box { direction: rtl; text-align: right; } | |
| .textbox textarea { direction: rtl; text-align: right; font-family: 'Tahoma', serif; } | |
| """ | |
| with gr.Blocks(title="سیستم ناشناسسازی متون", theme=gr.themes.Soft(), css=css_rtl) as app: | |
| gr.Markdown("# 🔐 سیستم ناشناسسازی متون مالی فارسی", elem_classes="input-box") | |
| gr.Markdown("### 🌟 با پشتیبانی از مدلهای پیشرفته AI", elem_classes="input-box") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # منوی انتخاب مدل - بارگذاری دینامیک | |
| model_dropdown = gr.Dropdown( | |
| choices=get_available_model_choices(), | |
| value=get_available_model_choices()[0] if get_available_model_choices() else None, | |
| label="🤖 انتخاب مدل تحلیل", | |
| info="فقط مدلهایی که API Key دارند نمایش داده میشوند", | |
| interactive=True | |
| ) | |
| analysis_prompt = gr.Textbox( | |
| lines=8, | |
| placeholder="مثال: این متن را خلاصه کن\nمثال: نقاط قوت و ضعف را استخراج کن", | |
| label="📋 دستورات تحلیل (اختیاری)", | |
| elem_classes="textbox" | |
| ) | |
| gr.Markdown("---") | |
| with gr.Column(): | |
| process_btn = gr.Button( | |
| "▶️ پردازش", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| clear_btn = gr.Button( | |
| "🗑️ پاک کردن", | |
| variant="stop", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=3): | |
| input_text = gr.Textbox( | |
| lines=14, | |
| placeholder="متن مالی/خبری فارسی را وارد کنید...", | |
| label="📝 متن ورودی", | |
| elem_classes="textbox" | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("## 📊 نتایج پردازش", elem_classes="input-box") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| restored_text = gr.Textbox( | |
| lines=12, | |
| label="✅ متن بازگردانی شده", | |
| interactive=False, | |
| elem_classes="textbox" | |
| ) | |
| with gr.Column(scale=1): | |
| model_analysis = gr.Textbox( | |
| lines=12, | |
| label="🤖 تحلیل مدل (ناشناس)", | |
| interactive=False, | |
| elem_classes="textbox" | |
| ) | |
| with gr.Column(scale=1): | |
| anonymized_text = gr.Textbox( | |
| lines=12, | |
| label="🔒 متن ناشناسشده", | |
| interactive=False, | |
| elem_classes="textbox" | |
| ) | |
| gr.Markdown("---") | |
| mapping_table = gr.Markdown( | |
| value="### 📋 جدول نگاشت\n\nهنوز پردازشی انجام نشده", | |
| label="📋 جدول نگاشت", | |
| elem_classes="input-box" | |
| ) | |
| # Event Handlers | |
| process_btn.click( | |
| fn=process, | |
| inputs=[input_text, analysis_prompt, model_dropdown], | |
| outputs=[restored_text, model_analysis, anonymized_text, mapping_table] | |
| ) | |
| clear_btn.click( | |
| fn=clear_all, | |
| outputs=[input_text, analysis_prompt, restored_text, model_analysis, anonymized_text, mapping_table] | |
| ) | |
| if __name__ == "__main__": | |
| print("=" * 70) | |
| print("🚀 سیستم ناشناسسازی متون در حال راهاندازی...") | |
| print("=" * 70) | |
| # نمایش مدلهای موجود | |
| available = get_available_models() | |
| display_names = get_model_display_names() | |
| print("\n📋 مدلهای موجود:\n") | |
| for model_name, info in available.items(): | |
| status = "✅" if info['has_key'] else "❌" | |
| display = display_names.get(model_name, model_name) | |
| print(f" {status} {display} ({info['env_key']})") | |
| print("\n" + "=" * 70 + "\n") | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True | |
| ) | |