Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| import requests | |
| import json | |
| import gradio as gr | |
| import logging | |
| from typing import Dict, Any, Tuple | |
| import os | |
| from dataclasses import dataclass | |
| import re | |
| from difflib import SequenceMatcher | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| class CerebrasConfig: | |
| api_key: str | |
| base_url: str = "https://api.cerebras.ai/v1" | |
| model: str = "llama-3.3-70b" | |
| max_tokens: int = 2000 | |
| temperature: float = 0.1 | |
| class AdvancedCerebrasAnonymizer: | |
| def __init__(self, api_key: str = None, openai_api_key: str = None): | |
| if api_key is None: | |
| api_key = os.getenv("CEREBRAS_API_KEY") | |
| if not api_key: | |
| raise ValueError("❌ کلید API Cerebras یافت نشد") | |
| self.config = CerebrasConfig(api_key=api_key) | |
| self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY", "") | |
| self.system_prompt = self._create_system_prompt() | |
| self.mapping_table = {} | |
| logger.info("✅ سیستم آماده شد") | |
| def _create_system_prompt(self) -> str: | |
| return """شما یک «ناشناسساز متون» هستید. وظیفهتان جایگزینی اسامی خاص و مقادیر عددی با شناسههای استاندارد است. | |
| قوانین: | |
| - شرکتها: company-01, company-02, ... | |
| - اشخاص: person-01, person-02, ... | |
| - اعداد/مقادیر: amount-01, amount-02, ... | |
| - درصدها: percent-01, percent-02, ... | |
| مثال: | |
| - "ایرانخودرو" → "company-01" | |
| - "۴.۵۸" → "percent-01" | |
| - "۳۷ میلیارد" → "amount-01" | |
| فقط متن ناشناسشده را برگردانید. هیچ توضیح اضافی نیاید.""" | |
| def anonymize_text(self, text: str) -> Dict[str, Any]: | |
| try: | |
| if not text or not text.strip(): | |
| return {"success": False, "error": "متن ورودی خالی است"} | |
| logger.info(f"🔄 ناشناسسازی... ({len(text)} کاراکتر)") | |
| headers = { | |
| "Authorization": f"Bearer {self.config.api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model": self.config.model, | |
| "messages": [ | |
| {"role": "system", "content": self.system_prompt}, | |
| {"role": "user", "content": f"لطفاً این متن را ناشناسسازی کنید:\n\n{text}"} | |
| ], | |
| "max_tokens": self.config.max_tokens, | |
| "temperature": self.config.temperature | |
| } | |
| response = requests.post( | |
| f"{self.config.base_url}/chat/completions", | |
| headers=headers, | |
| json=payload, | |
| timeout=60 | |
| ) | |
| if response.status_code != 200: | |
| return {"success": False, "error": f"خطای API: {response.status_code}"} | |
| result = response.json() | |
| anonymized_text = result["choices"][0]["message"]["content"] | |
| usage = result.get("usage", {}) | |
| self._extract_mapping_advanced(text, anonymized_text) | |
| logger.info(f"✅ ناشناسسازی موفق ({len(self.mapping_table)} موجودیت)") | |
| return { | |
| "success": True, | |
| "anonymized_text": anonymized_text, | |
| "usage": usage | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ خطا: {str(e)}") | |
| return {"success": False, "error": str(e)} | |
| def _extract_mapping_advanced(self, original: str, anonymized: str): | |
| """استخراج mapping دقیق با مقایسه متنها""" | |
| self.mapping_table = {} | |
| try: | |
| # تقسیم به جملات | |
| orig_lines = re.split(r'([.!?])', original) | |
| anon_lines = re.split(r'([.!?])', anonymized) | |
| for orig_line, anon_line in zip(orig_lines, anon_lines): | |
| if not orig_line.strip(): | |
| continue | |
| # تقسیم به کلمات | |
| orig_words = orig_line.split() | |
| anon_words = anon_line.split() | |
| # یافتن تطابقات | |
| i = 0 | |
| j = 0 | |
| while i < len(orig_words) and j < len(anon_words): | |
| orig_word = orig_words[i] | |
| anon_word = anon_words[j] | |
| # اگر کدی است | |
| if re.match(r'(company|person|amount|percent)-\d+', anon_word): | |
| # سعی کن تا 3 کلمه قبل را پیدا کن | |
| found = False | |
| for phrase_len in range(min(4, len(orig_words) - i), 0, -1): | |
| phrase = ' '.join(orig_words[i:i+phrase_len]) | |
| # بررسی اینکه آیا این عبارت منطقی است | |
| if self._is_valid_entity(phrase, anon_word): | |
| if phrase not in self.mapping_table: | |
| self.mapping_table[phrase] = anon_word | |
| logger.info(f" {phrase} → {anon_word}") | |
| i += phrase_len | |
| found = True | |
| break | |
| if not found: | |
| i += 1 | |
| j += 1 | |
| else: | |
| i += 1 | |
| j += 1 | |
| except Exception as e: | |
| logger.warning(f"⚠️ خطا در mapping: {str(e)}") | |
| def _is_valid_entity(self, phrase: str, code: str) -> bool: | |
| """بررسی صحت موجودیت""" | |
| phrase_lower = phrase.lower() | |
| if code.startswith('company'): | |
| return any(word in phrase_lower for word in ['شرکت', 'بانک', 'سازمان', 'وزارت', 'گروه']) | |
| elif code.startswith('person'): | |
| return len(phrase) > 2 | |
| elif code.startswith('amount'): | |
| return bool(re.search(r'[\d]+', phrase)) | |
| elif code.startswith('percent'): | |
| return 'درصد' in phrase_lower or '%' in phrase | |
| return False | |
| def send_to_chatgpt(self, anonymized_text: str) -> Dict[str, Any]: | |
| try: | |
| if not anonymized_text or not anonymized_text.strip(): | |
| return {"success": False, "error": "متن ناشناسشده خالی است"} | |
| if not self.openai_api_key: | |
| return {"success": False, "error": "کلید OpenAI تنظیم نشده"} | |
| logger.info("🤖 ارسال به ChatGPT...") | |
| headers = { | |
| "Authorization": f"Bearer {self.openai_api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "model": "gpt-4o-mini", | |
| "messages": [ | |
| {"role": "system", "content": "شما یک تحلیلگر مالی حرفهای هستید."}, | |
| {"role": "user", "content": anonymized_text} | |
| ], | |
| "max_tokens": 2000, | |
| "temperature": 0.7 | |
| } | |
| response = requests.post( | |
| "https://api.openai.com/v1/chat/completions", | |
| headers=headers, | |
| json=data, | |
| timeout=30 | |
| ) | |
| if response.status_code == 200: | |
| result = response.json() | |
| return {"success": True, "response": result['choices'][0]['message']['content']} | |
| else: | |
| error_data = response.json() if response.content else {} | |
| error_message = error_data.get('error', {}).get('message', response.text) | |
| return {"success": False, "error": error_message} | |
| except Exception as e: | |
| logger.error(f"❌ خطا: {str(e)}") | |
| return {"success": False, "error": str(e)} | |
| def _create_reverse_mapping(self, mapping_table: Dict[str, str]) -> Dict[str, str]: | |
| """ایجاد reverse mapping: code -> original""" | |
| reverse_map = {} | |
| for original, code in mapping_table.items(): | |
| reverse_map[code] = original | |
| return reverse_map | |
| def deanonymize_response(self, gpt_response: str, mapping_table: Dict[str, str]) -> str: | |
| """بازگردانی تمام موجودیتها (placeholderها و کدهای ساده را پشتیبانی میکند)""" | |
| try: | |
| if not mapping_table: | |
| return gpt_response | |
| result = gpt_response | |
| # ایجاد reverse mapping برای تبدیل code به original | |
| reverse_map = self._create_reverse_mapping(mapping_table) | |
| # مرتبسازی بر اساس طول (طولانیترین اول) | |
| sorted_reverse = sorted(reverse_map.items(), key=lambda x: len(x[0]), reverse=True) | |
| logger.info(f"🔄 شروع بازگردانی {len(sorted_reverse)} موجودیت") | |
| for code, original in sorted_reverse: | |
| # الگوهای مختلف برای جستجو: | |
| # 1. [code] (داخل براکت) | |
| pattern_bracket = f"[{code}]" | |
| if pattern_bracket in result: | |
| result = result.replace(pattern_bracket, original) | |
| logger.info(f" [{code}] → {original} ✓") | |
| # 2. code بدون براکت | |
| if code in result: | |
| result = result.replace(code, original) | |
| logger.info(f" {code} → {original} ✓") | |
| # جایگزینی نسخههای دارای درصد و هزار جدا شده | |
| result = self._replace_special_patterns(result, reverse_map) | |
| logger.info(f"✅ بازگردانی موفق ({len(mapping_table)} موجودیت)") | |
| return result | |
| except Exception as e: | |
| logger.error(f"❌ خطا در بازگردانی: {str(e)}") | |
| return gpt_response | |
| def _replace_special_patterns(self, text: str, reverse_map: Dict[str, str]) -> str: | |
| """جایگزینی الگوهای خاص (درصد-XX، amount-XX و غیره) - شامل placeholderهای بدون براکت""" | |
| try: | |
| result = text | |
| # الگوی 1: Placeholderهای با براکت [code] | |
| pattern_bracket = r'\[((?:company|person|amount|percent|درصد|شرکت|مبلغ)-\d+)\]' | |
| def replacer_bracket(match): | |
| code = match.group(1) | |
| # تبدیل الگوهای فارسی به انگلیسی | |
| code_normalized = code.replace('درصد', 'percent').replace('شرکت', 'company').replace('مبلغ', 'amount') | |
| return reverse_map.get(code_normalized, reverse_map.get(code, match.group(0))) | |
| result = re.sub(pattern_bracket, replacer_bracket, result) | |
| logger.info(f" الگوی 1 (با براکت): جایگزین شد") | |
| # الگوی 2: Placeholderهای داخل پرانتز (code) | |
| pattern_paren = r'\(((?:company|person|amount|percent|درصد|شرکت|مبلغ)-\d+)\)' | |
| def replacer_paren(match): | |
| code = match.group(1) | |
| code_normalized = code.replace('درصد', 'percent').replace('شرکت', 'company').replace('مبلغ', 'amount') | |
| return f"({reverse_map.get(code_normalized, reverse_map.get(code, code))})" | |
| result = re.sub(pattern_paren, replacer_paren, result) | |
| logger.info(f" الگوی 2 (پرانتز): جایگزین شد") | |
| # الگوی 3: Placeholderهای بدون براکت (بسیار مهم!) | |
| # جایگزینی تمام code-XX که با word boundary احاطه شدهاند | |
| pattern_simple = r'\b((?:company|person|amount|percent|درصد|شرکت|مبلغ)-\d+)\b' | |
| def replacer_simple(match): | |
| code = match.group(1) | |
| code_normalized = code.replace('درصد', 'percent').replace('شرکت', 'company').replace('مبلغ', 'amount') | |
| replacement = reverse_map.get(code_normalized, reverse_map.get(code, None)) | |
| if replacement: | |
| logger.info(f" {code} → {replacement}") | |
| return replacement | |
| return match.group(0) | |
| result = re.sub(pattern_simple, replacer_simple, result) | |
| logger.info(f" الگوی 3 (بدون براکت): جایگزین شد") | |
| return result | |
| except Exception as e: | |
| logger.warning(f"⚠️ خطا در جایگزینی الگوهای خاص: {str(e)}") | |
| return text | |
| def create_interface(): | |
| try: | |
| anonymizer = AdvancedCerebrasAnonymizer() | |
| except ValueError as e: | |
| return gr.Interface(fn=lambda x: str(e), inputs="textbox", outputs="textbox", title="❌ خطا") | |
| def process_text(input_text: str) -> Tuple[str, str, str, str]: | |
| logger.info("=" * 60) | |
| logger.info("شروع پردازش") | |
| logger.info("=" * 60) | |
| if not input_text.strip(): | |
| return "❌ متن ورودی خالی است", "", "", "" | |
| try: | |
| # مرحله 1: ناشناسسازی | |
| logger.info("1️⃣ ناشناسسازی...") | |
| anon_result = anonymizer.anonymize_text(input_text) | |
| if not anon_result["success"]: | |
| return f"❌ {anon_result['error']}", "", "", "" | |
| anonymized_text = anon_result["anonymized_text"] | |
| usage_info = anon_result.get("usage", {}) | |
| # مرحله 2: ارسال به ChatGPT | |
| logger.info("2️⃣ ارسال به ChatGPT...") | |
| gpt_result = anonymizer.send_to_chatgpt(anonymized_text) | |
| # تعریف اولیه | |
| gpt_response = "" | |
| gpt_response_deanon = "" | |
| if not gpt_result["success"]: | |
| gpt_response = f"❌ {gpt_result['error']}" | |
| gpt_response_deanon = "" | |
| else: | |
| # پاسخ ChatGPT (ناشناس) | |
| gpt_response = gpt_result["response"] | |
| # مرحله 3: بازگردانی | |
| logger.info("3️⃣ بازگردانی...") | |
| gpt_response_deanon = anonymizer.deanonymize_response(gpt_response, anonymizer.mapping_table) | |
| # آمار | |
| stats = f"""Token: {usage_info.get('total_tokens', '?')} | Mapping: {len(anonymizer.mapping_table)}""" | |
| logger.info("=" * 60) | |
| logger.info("✅ پردازش کامل") | |
| logger.info("=" * 60) | |
| # ✅ ترتیب صحیح: | |
| # 1. آمار | |
| # 2. متن ناشناسشده (ورودی) | |
| # 3. پاسخ ChatGPT (ناشناس) ← باکس "🤖 نتایج ChatGPT" | |
| # 4. نتیجه نهایی (بازگردانی شده) ← باکس "✅ نتیجه نهایی" | |
| return stats, anonymized_text, gpt_response, gpt_response_deanon | |
| except Exception as e: | |
| return f"❌ خطا: {str(e)}", "", "", "" | |
| def copy_text(text: str): | |
| if not text or not text.strip(): | |
| return gr.update(visible=False), "⚠️ متنی وجود ندارد" | |
| return gr.update(value=text, visible=True), "✅ آماده برای کپی" | |
| def clear_all(): | |
| anonymizer.mapping_table = {} | |
| return "", "", "", "", gr.update(visible=False) | |
| with gr.Blocks(title="سیستم ناشناسسازی", theme=gr.themes.Soft()) as interface: | |
| gr.HTML("<h1 style='text-align: center; color: #FFD700;'>🔐 سیستم ناشناسسازی</h1>") | |
| with gr.Row(): | |
| # ستون 1 | |
| with gr.Column(scale=1): | |
| gr.HTML("<h2>📥 ورودی</h2>") | |
| input_text = gr.Textbox(lines=20, placeholder="متن را وارد کنید...", label="", rtl=True) | |
| process_btn = gr.Button("🚀 پردازش", variant="primary", size="lg") | |
| with gr.Row(): | |
| copy_btn = gr.Button("📋 کپی", scale=1) | |
| clear_btn = gr.Button("🗑️ پاک", variant="stop", scale=1) | |
| copy_output = gr.Textbox(visible=False) | |
| # ستون 2 | |
| with gr.Column(scale=1): | |
| gr.HTML("<h2>🎭 متن ناشناسشده</h2>") | |
| anonymized_output = gr.Textbox(lines=20, placeholder="", label="", interactive=False, rtl=True) | |
| # ستون 3 | |
| with gr.Column(scale=1): | |
| gr.HTML("<h2>🤖 نتایج ChatGPT</h2>") | |
| gpt_output = gr.Textbox(lines=10, placeholder="", label="📤 پاسخ", interactive=False, rtl=True) | |
| final_output = gr.Textbox(lines=10, placeholder="", label="✅ نتیجه نهایی", interactive=False, rtl=True) | |
| statistics_output = gr.Textbox(lines=1, label="📊 آمار", interactive=False) | |
| process_btn.click(fn=process_text, inputs=[input_text], outputs=[statistics_output, anonymized_output, gpt_output, final_output]) | |
| copy_btn.click(fn=copy_text, inputs=[final_output], outputs=[copy_output, statistics_output]) | |
| clear_btn.click(fn=clear_all, outputs=[input_text, anonymized_output, gpt_output, final_output, copy_output]) | |
| return interface | |
| def main(): | |
| print("\n🔐 سیستم ناشناسسازی - نسخه 1.0.0\n") | |
| interface = create_interface() | |
| interface.launch(server_name="0.0.0.0", server_port=7860, share=False, show_error=True) | |
| if __name__ == "__main__": | |
| main() | |