|
|
|
|
|
|
|
|
|
|
|
import json
import logging
import os
import re
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from itertools import combinations
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import requests
|
|
|
|
|
# Root logging configuration: INFO and above, prefixed with timestamp and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
@dataclass
class CerebrasConfig:
    """Connection and sampling settings for the Cerebras chat-completions API.

    Only ``api_key`` is required; every other field has a default.
    """

    # Bearer token sent in the Authorization header.  Excluded from the
    # generated repr so printing or logging the config cannot leak the secret.
    api_key: str = field(repr=False)
    # Base URL; "/chat/completions" is appended by the caller.
    base_url: str = "https://api.cerebras.ai/v1"
    # Model identifier sent in the request payload.
    model: str = "llama-3.3-70b"
    # Upper bound on generated tokens per request.
    max_tokens: int = 2000
    # Sampling temperature sent with each request.
    temperature: float = 0.1
|
|
|
|
|
|
|
|
class AdvancedCerebrasAnonymizer:
    """Anonymize text via Cerebras, query ChatGPT, and restore the entities.

    The workflow is: ``anonymize_text`` replaces entities with placeholder
    codes (``company-01``, ``person-01``, ``amount-01``, ``percent-01``) and
    records ``original phrase -> code`` in ``mapping_table``;
    ``send_to_chatgpt`` forwards the anonymized text; ``deanonymize_response``
    substitutes the original phrases back into the answer.
    """

    # Placeholder codes the system prompt instructs the model to emit.
    _CODE_RE = re.compile(r'(?:company|person|amount|percent)-\d+')
    # Persian labels accepted as synonyms of the English ones when restoring.
    _LABEL_ALIASES = {'درصد': 'percent', 'شرکت': 'company', 'مبلغ': 'amount'}
    # Any code-shaped token, including the Persian-labelled variants.
    _GENERIC_CODE = r'(?:company|person|amount|percent|درصد|شرکت|مبلغ)-\d+'
    # A replaced span longer than this (per placeholder) is treated as a
    # paraphrase by the model rather than an entity, and is not mapped.
    _MAX_ENTITY_WORDS = 8
    # Cap on the span size for which adjacent placeholders are disambiguated;
    # keeps the number of candidate splits small.
    _MAX_SPLIT_WORDS = 12

    def __init__(self, api_key: Optional[str] = None, openai_api_key: Optional[str] = None):
        """Set up API credentials and the system prompt.

        Args:
            api_key: Cerebras API key; falls back to ``CEREBRAS_API_KEY``.
            openai_api_key: OpenAI API key; falls back to ``OPENAI_API_KEY``
                (may stay empty, in which case ``send_to_chatgpt`` reports it).

        Raises:
            ValueError: if no Cerebras key is supplied or found in the
                environment.
        """
        # An empty key is treated like a missing one, mirroring how the
        # OpenAI key is resolved below.
        api_key = api_key or os.getenv("CEREBRAS_API_KEY")
        if not api_key:
            raise ValueError("❌ کلید API Cerebras یافت نشد")

        self.config = CerebrasConfig(api_key=api_key)
        self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY", "")
        self.system_prompt = self._create_system_prompt()
        # original phrase -> placeholder code, rebuilt on every anonymization.
        self.mapping_table: Dict[str, str] = {}
        logger.info("✅ سیستم آماده شد")

    def _create_system_prompt(self) -> str:
        """Return the (Persian) system prompt that defines the placeholder scheme."""
        return """شما یک «ناشناسساز متون» هستید. وظیفهتان جایگزینی اسامی خاص و مقادیر عددی با شناسههای استاندارد است.

قوانین:
- شرکتها: company-01, company-02, ...
- اشخاص: person-01, person-02, ...
- اعداد/مقادیر: amount-01, amount-02, ...
- درصدها: percent-01, percent-02, ...

مثال:
- "ایرانخودرو" → "company-01"
- "۴.۵۸" → "percent-01"
- "۳۷ میلیارد" → "amount-01"

فقط متن ناشناسشده را برگردانید. هیچ توضیح اضافی نیاید."""

    def anonymize_text(self, text: str) -> Dict[str, Any]:
        """Send ``text`` to Cerebras for anonymization and rebuild the mapping.

        Returns:
            ``{"success": True, "anonymized_text": str, "usage": dict}`` on
            success, otherwise ``{"success": False, "error": str}``.  Never
            raises; any exception is logged and reported in ``error``.
        """
        try:
            if not text or not text.strip():
                return {"success": False, "error": "متن ورودی خالی است"}

            logger.info(f"🔄 ناشناسسازی... ({len(text)} کاراکتر)")

            headers = {
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json",
            }
            payload = {
                "model": self.config.model,
                "messages": [
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": f"لطفاً این متن را ناشناسسازی کنید:\n\n{text}"},
                ],
                "max_tokens": self.config.max_tokens,
                "temperature": self.config.temperature,
            }

            response = requests.post(
                f"{self.config.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=60,
            )
            if response.status_code != 200:
                return {"success": False, "error": f"خطای API: {response.status_code}"}

            result = response.json()
            anonymized_text = result["choices"][0]["message"]["content"]
            usage = result.get("usage", {})

            self._extract_mapping_advanced(text, anonymized_text)
            logger.info(f"✅ ناشناسسازی موفق ({len(self.mapping_table)} موجودیت)")

            return {
                "success": True,
                "anonymized_text": anonymized_text,
                "usage": usage,
            }
        except Exception as e:
            # Boundary handler: the UI expects a result dict, not an exception.
            logger.error(f"❌ خطا: {str(e)}")
            return {"success": False, "error": str(e)}

    def _extract_mapping_advanced(self, original: str, anonymized: str):
        """Rebuild ``mapping_table`` by comparing the original and anonymized texts.

        Best effort: on any failure the table is left empty and a warning is
        logged, so anonymization itself still succeeds.
        """
        self.mapping_table = {}
        try:
            self.mapping_table = self._align_entities(original, anonymized)
            for phrase, code in self.mapping_table.items():
                logger.info(f" {phrase} → {code}")
        except Exception as e:
            logger.warning(f"⚠️ خطا در mapping: {str(e)}")

    def _align_entities(self, original: str, anonymized: str) -> Dict[str, str]:
        """Return ``{original phrase: code}`` derived from a token-level diff.

        Both texts are split on whitespace and aligned with
        ``SequenceMatcher``.  Unchanged words anchor the alignment, so each
        ``replace`` region pairs the words the model removed with the
        placeholder(s) it inserted.  Regions whose anonymized side contains
        anything other than placeholders are skipped as ambiguous.
        """
        source_tokens = original.split()
        masked_tokens = anonymized.split()
        mapping: Dict[str, str] = {}

        # autojunk=False: frequent words must still be usable as anchors.
        matcher = SequenceMatcher(None, source_tokens, masked_tokens, autojunk=False)
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag != 'replace':
                continue

            slots = [self._split_code_token(token) for token in masked_tokens[j1:j2]]
            if any(slot is None for slot in slots):
                continue

            groups = self._partition_span(source_tokens[i1:i2], [slot[1] for slot in slots])
            if groups is None:
                continue

            for words, (prefix, code, suffix) in zip(groups, slots):
                phrase = ' '.join(words)
                # Punctuation glued to the placeholder ("company-01،") was
                # glued to the entity in the original as well; drop it so the
                # stored phrase is the bare entity.
                if prefix and phrase.startswith(prefix):
                    phrase = phrase[len(prefix):]
                if suffix and phrase.endswith(suffix):
                    phrase = phrase[:-len(suffix)]
                phrase = phrase.strip()
                if phrase:
                    # First code seen for a phrase wins.
                    mapping.setdefault(phrase, code)
        return mapping

    def _split_code_token(self, token: str) -> Optional[Tuple[str, str, str]]:
        """Split a token into ``(prefix, code, suffix)``.

        Returns ``None`` unless the token contains exactly one placeholder code.
        """
        found = list(self._CODE_RE.finditer(token))
        if len(found) != 1:
            return None
        match = found[0]
        return token[:match.start()], match.group(0), token[match.end():]

    def _partition_span(self, words: List[str], codes: List[str]) -> Optional[List[List[str]]]:
        """Distribute ``words`` over ``codes`` as contiguous, non-empty groups.

        A single placeholder takes the whole span.  For several adjacent
        placeholders every split is checked with ``_is_valid_entity`` and the
        split is accepted only when exactly one candidate is plausible.

        Returns:
            One word list per code, or ``None`` when the span is too long or
            the split is ambiguous.
        """
        slots = len(codes)
        if slots == 1:
            return [words] if len(words) <= self._MAX_ENTITY_WORDS else None
        if len(words) < slots or len(words) > self._MAX_SPLIT_WORDS:
            return None

        chosen = None
        for cuts in combinations(range(1, len(words)), slots - 1):
            bounds = (0, *cuts, len(words))
            groups = [words[start:end] for start, end in zip(bounds, bounds[1:])]
            if all(self._is_valid_entity(' '.join(group), code)
                   for group, code in zip(groups, codes)):
                if chosen is not None:
                    return None  # more than one plausible split
                chosen = groups
        return chosen

    def _is_valid_entity(self, phrase: str, code: str) -> bool:
        """Return whether ``phrase`` is a plausible original for ``code``.

        Companies and persons need at least one letter, amounts need a digit,
        and percents need a digit or a percent marker.  ``\\d`` matches
        Persian and Arabic-Indic digits as well as ASCII ones.
        """
        has_digit = re.search(r'\d', phrase) is not None
        if code.startswith(('company', 'person')):
            return any(ch.isalpha() for ch in phrase)
        if code.startswith('amount'):
            return has_digit
        if code.startswith('percent'):
            return has_digit or any(marker in phrase for marker in ('درصد', '%', '٪'))
        return False

    def send_to_chatgpt(self, anonymized_text: str) -> Dict[str, Any]:
        """Send the anonymized text to OpenAI's chat-completions endpoint.

        Returns:
            ``{"success": True, "response": str}`` on HTTP 200, otherwise
            ``{"success": False, "error": str}``.  Never raises.
        """
        try:
            if not anonymized_text or not anonymized_text.strip():
                return {"success": False, "error": "متن ناشناسشده خالی است"}
            if not self.openai_api_key:
                return {"success": False, "error": "کلید OpenAI تنظیم نشده"}

            logger.info("🤖 ارسال به ChatGPT...")

            headers = {
                "Authorization": f"Bearer {self.openai_api_key}",
                "Content-Type": "application/json",
            }
            data = {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": "شما یک تحلیلگر مالی حرفهای هستید."},
                    {"role": "user", "content": anonymized_text},
                ],
                "max_tokens": 2000,
                "temperature": 0.7,
            }

            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=30,
            )
            if response.status_code == 200:
                result = response.json()
                return {"success": True, "response": result['choices'][0]['message']['content']}
            return {"success": False, "error": self._error_message_from(response)}
        except Exception as e:
            # Boundary handler: the UI expects a result dict, not an exception.
            logger.error(f"❌ خطا: {str(e)}")
            return {"success": False, "error": str(e)}

    @staticmethod
    def _error_message_from(response) -> str:
        """Extract a readable error message from a failed HTTP response.

        Prefers the JSON ``error.message`` field, then the raw body, then the
        status code, so a non-JSON error page cannot mask the real failure.
        """
        try:
            body = response.json()
        except ValueError:
            # The body is not JSON (for example an HTML error page).
            body = None
        if isinstance(body, dict):
            error = body.get('error')
            if isinstance(error, dict) and error.get('message'):
                return str(error['message'])
            if isinstance(error, str) and error:
                return error
        return response.text or f"HTTP {response.status_code}"

    def _create_reverse_mapping(self, mapping_table: Dict[str, str]) -> Dict[str, str]:
        """Invert ``original -> code`` into ``code -> original``.

        When several phrases share a code, the last one in iteration order wins.
        """
        return {code: original for original, code in mapping_table.items()}

    def deanonymize_response(self, gpt_response: str, mapping_table: Dict[str, str]) -> str:
        """Replace every known placeholder in ``gpt_response`` with its original.

        Bare (``company-01``), bracketed (``[company-01]``) and parenthesised
        (``(company-01)``) codes are handled; see ``_replace_special_patterns``.

        Returns:
            The restored text, or ``gpt_response`` unchanged when the mapping
            is empty or restoration fails.
        """
        try:
            if not mapping_table:
                return gpt_response

            reverse_map = self._create_reverse_mapping(mapping_table)
            logger.info(f"🔄 شروع بازگردانی {len(reverse_map)} موجودیت")

            result = self._replace_special_patterns(gpt_response, reverse_map)

            logger.info(f"✅ بازگردانی موفق ({len(mapping_table)} موجودیت)")
            return result
        except Exception as e:
            logger.error(f"❌ خطا در بازگردانی: {str(e)}")
            return gpt_response

    def _replace_special_patterns(self, text: str, reverse_map: Dict[str, str]) -> str:
        """Restore placeholder codes in ``text`` in a single regex pass.

        One pass means restored text is never rescanned.  Matching is
        case-insensitive, and Persian labels and non-ASCII digits are
        normalized before lookup.  ``[code]`` loses its brackets when
        restored, ``(code)`` keeps its parentheses, and unknown codes are
        left untouched.  A code followed by another digit is not matched, so
        ``company-01`` never rewrites part of ``company-011``.

        Returns:
            The restored text, or ``text`` unchanged if anything goes wrong.
        """
        try:
            lookup: Dict[str, str] = {}
            for code, original in reverse_map.items():
                lookup.setdefault(self._normalize_code(code), original)

            # Known codes first (longest first), so codes that do not follow
            # the standard shape are still restored; empty codes are dropped
            # because they would match everywhere.
            known = sorted({re.escape(code) for code in reverse_map if code},
                           key=len, reverse=True)
            core = '|'.join(known + [self._GENERIC_CODE])
            pattern = re.compile(rf'\[({core})\]|({core})(?!\d)', re.IGNORECASE)

            def restore(match):
                raw = match.group(1) or match.group(2)
                original = reverse_map.get(raw)
                if original is None:
                    original = lookup.get(self._normalize_code(raw))
                return match.group(0) if original is None else original

            return pattern.sub(restore, text)
        except Exception as e:
            logger.warning(f"⚠️ خطا در جایگزینی الگوهای خاص: {str(e)}")
            return text

    @classmethod
    def _normalize_code(cls, code: str) -> str:
        """Return the canonical form of a code.

        Lower-cases the label, maps Persian labels to English, and converts
        the number to ASCII digits.  Codes without a ``label-number`` shape
        are only lower-cased.
        """
        label, separator, number = code.rpartition('-')
        if not separator or not number.isdecimal():
            return code.lower()
        label = label.lower()
        label = cls._LABEL_ALIASES.get(label, label)
        ascii_number = ''.join(str(int(ch)) for ch in number)
        return f"{label}-{ascii_number}"
|
|
|
|
|
|
|
|
def create_interface():
    """Build the Gradio UI for the anonymize -> ChatGPT -> restore pipeline.

    Returns:
        The ``gr.Blocks`` app, or a minimal ``gr.Interface`` that only shows
        the startup error when the anonymizer cannot be constructed (for
        example when the Cerebras API key is missing).
    """
    try:
        anonymizer = AdvancedCerebrasAnonymizer()
    except ValueError as e:
        # Python unbinds `e` when this handler exits, so a lambda closing over
        # it would raise NameError once Gradio calls it.  Capture the text now.
        startup_error = str(e)
        return gr.Interface(fn=lambda x: startup_error, inputs="textbox", outputs="textbox", title="❌ خطا")

    def process_text(input_text: str) -> Tuple[str, str, str, str]:
        """Run the full pipeline for one input.

        Returns:
            ``(stats, anonymized_text, gpt_response, restored_response)``.
            On failure the first element carries the error message and the
            remaining ones are empty.
        """
        logger.info("=" * 60)
        logger.info("شروع پردازش")
        logger.info("=" * 60)

        # Guard against None as well as blank input.
        if not input_text or not input_text.strip():
            return "❌ متن ورودی خالی است", "", "", ""

        try:
            logger.info("1️⃣ ناشناسسازی...")
            anon_result = anonymizer.anonymize_text(input_text)
            if not anon_result["success"]:
                return f"❌ {anon_result['error']}", "", "", ""

            anonymized_text = anon_result["anonymized_text"]
            usage_info = anon_result.get("usage", {})

            logger.info("2️⃣ ارسال به ChatGPT...")
            gpt_result = anonymizer.send_to_chatgpt(anonymized_text)

            if not gpt_result["success"]:
                # Still show the anonymized text; only the GPT columns fail.
                gpt_response = f"❌ {gpt_result['error']}"
                gpt_response_deanon = ""
            else:
                gpt_response = gpt_result["response"]
                logger.info("3️⃣ بازگردانی...")
                gpt_response_deanon = anonymizer.deanonymize_response(gpt_response, anonymizer.mapping_table)

            stats = f"""Token: {usage_info.get('total_tokens', '?')} | Mapping: {len(anonymizer.mapping_table)}"""

            logger.info("=" * 60)
            logger.info("✅ پردازش کامل")
            logger.info("=" * 60)

            return stats, anonymized_text, gpt_response, gpt_response_deanon
        except Exception as e:
            # UI boundary: report the failure in the stats box instead of crashing.
            return f"❌ خطا: {str(e)}", "", "", ""

    def copy_text(text: str):
        """Reveal the hidden textbox holding ``text`` so the user can copy it."""
        if not text or not text.strip():
            return gr.update(visible=False), "⚠️ متنی وجود ندارد"
        return gr.update(value=text, visible=True), "✅ آماده برای کپی"

    def clear_all():
        """Reset the mapping and blank the text boxes; hide the copy box."""
        anonymizer.mapping_table = {}
        return "", "", "", "", gr.update(visible=False)

    with gr.Blocks(title="سیستم ناشناسسازی", theme=gr.themes.Soft()) as interface:
        gr.HTML("<h1 style='text-align: center; color: #FFD700;'>🔐 سیستم ناشناسسازی</h1>")

        with gr.Row():
            # Column 1: input and actions.
            with gr.Column(scale=1):
                gr.HTML("<h2>📥 ورودی</h2>")
                input_text = gr.Textbox(lines=20, placeholder="متن را وارد کنید...", label="", rtl=True)

                process_btn = gr.Button("🚀 پردازش", variant="primary", size="lg")
                with gr.Row():
                    copy_btn = gr.Button("📋 کپی", scale=1)
                    clear_btn = gr.Button("🗑️ پاک", variant="stop", scale=1)

                copy_output = gr.Textbox(visible=False)

            # Column 2: anonymized text.
            with gr.Column(scale=1):
                gr.HTML("<h2>🎭 متن ناشناسشده</h2>")
                anonymized_output = gr.Textbox(lines=20, placeholder="", label="", interactive=False, rtl=True)

            # Column 3: ChatGPT answer, restored answer, and stats.
            with gr.Column(scale=1):
                gr.HTML("<h2>🤖 نتایج ChatGPT</h2>")
                gpt_output = gr.Textbox(lines=10, placeholder="", label="📤 پاسخ", interactive=False, rtl=True)
                final_output = gr.Textbox(lines=10, placeholder="", label="✅ نتیجه نهایی", interactive=False, rtl=True)
                statistics_output = gr.Textbox(lines=1, label="📊 آمار", interactive=False)

        process_btn.click(fn=process_text, inputs=[input_text], outputs=[statistics_output, anonymized_output, gpt_output, final_output])
        copy_btn.click(fn=copy_text, inputs=[final_output], outputs=[copy_output, statistics_output])
        clear_btn.click(fn=clear_all, outputs=[input_text, anonymized_output, gpt_output, final_output, copy_output])

    return interface
|
|
|
|
|
|
|
|
def main():
    """Print the banner, build the UI, and serve it on port 7860 on all interfaces."""
    print("\n🔐 سیستم ناشناسسازی - نسخه 1.0.0\n")

    app = create_interface()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
    }
    app.launch(**launch_options)
|
|
|
|
|
|
|
|
# Start the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()
|
|
|