Data-anonymization / app_final_unified.py
leilaghomashchi's picture
Upload 2 files
1901c07 verified
raw
history blame
18.6 kB
import gradio as gr
import re
import os
import requests
import json
import logging
from typing import Dict, Tuple
from unified_llm_sender import UnifiedLLMSender, get_available_models, get_model_display_names
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AnonymizerAdvanced:
"""ناشناس‌ساز پیشرفته با Cerebras"""
def __init__(self, cerebras_key: str = None):
self.cerebras_key = cerebras_key or os.getenv("CEREBRAS_API_KEY") or os.getenv("GR00_API_KEY")
self.mapping_table = {}
self.reverse_mapping = {}
logger.info("✅ Anonymizer Advanced مقداردهی شد")
def anonymize_with_cerebras(self, text: str) -> Tuple[str, Dict]:
"""ناشناس‌سازی با Cerebras - دریافت mapping از مدل"""
logger.info("🧠 روش Cerebras...")
if not self.cerebras_key:
logger.error("❌ Cerebras API Key موجود نیست")
raise ValueError("Cerebras API Key مورد نیاز است (CEREBRAS_API_KEY یا GR00_API_KEY)")
try:
# مرحله 1: ناشناس‌سازی متن
prompt1 = f"""متن زیر را ناشناس کنید. قوانین:
1. اسامی اشخاص → person-01, person-02, ...
2. نام شرکت‌ها/سازمان‌ها → company-01, company-02, ...
3. مقادیر پولی → amount-01, amount-02, ...
4. درصدها → percent-01, percent-02, ...
5. فقط این توکن‌ها استفاده کنید
6. شماره‌های نسخه را درست حفظ کنید
7. اگر موجودیت تکرار شود از شماره قدیمی استفاده کنید
متن:
{text}
خروجی: فقط متن ناشناس شده"""
response1 = requests.post(
"https://api.cerebras.ai/v1/chat/completions",
headers={
"Authorization": f"Bearer {self.cerebras_key}",
"Content-Type": "application/json"
},
json={
"model": "llama-3.3-70b",
"messages": [{"role": "user", "content": prompt1}],
"max_tokens": 4096,
"temperature": 0.1
},
timeout=60
)
if response1.status_code != 200:
logger.error(f"❌ Cerebras Error: {response1.status_code}")
raise Exception(f"Cerebras API Error: {response1.status_code}")
anonymized_text = response1.json()['choices'][0]['message']['content'].strip()
logger.info("✅ Cerebras: ناشناس‌سازی موفق")
# مرحله 2: استخراج mapping از مدل
prompt2 = f"""متن اصلی:
{text}
متن ناشناس شده:
{anonymized_text}
لطفاً یک جدول mapping برای همه توکن‌های ناشناس ایجاد کن.
برای هر توکن، متن اصلی کامل آن را مشخص کن.
**مهم:**
- برای person-XX: نام کامل شخص (مثلاً "علی احمدی")
- برای company-XX: نام کامل شرکت/سازمان (مثلاً "شرکت پتروشیمی")
- برای amount-XX: عدد + واحد (مثلاً "80 هزار تومان" یا "50 میلیارد ریال")
- برای percent-XX: عدد + کلمه "درصد" (مثلاً "40 درصد" نه فقط "40")
خروجی را به این فرمت JSON بده (فقط JSON، بدون توضیح اضافی):
{{
"person-01": "متن اصلی کامل",
"company-01": "متن اصلی کامل",
"amount-01": "متن اصلی کامل با واحد",
"percent-01": "عدد + درصد",
...
}}"""
response2 = requests.post(
"https://api.cerebras.ai/v1/chat/completions",
headers={
"Authorization": f"Bearer {self.cerebras_key}",
"Content-Type": "application/json"
},
json={
"model": "llama-3.3-70b",
"messages": [{"role": "user", "content": prompt2}],
"max_tokens": 2048,
"temperature": 0.1
},
timeout=60
)
if response2.status_code == 200:
mapping_text = response2.json()['choices'][0]['message']['content'].strip()
mapping_text = mapping_text.replace('```json', '').replace('```', '').strip()
try:
self.mapping_table = json.loads(mapping_text)
self._fix_percent_mapping()
self.reverse_mapping = {v: k for k, v in self.mapping_table.items()}
logger.info(f"✅ Mapping استخراج شد: {len(self.mapping_table)} موجودیت")
except json.JSONDecodeError:
logger.warning("⚠️ خطا در parse کردن JSON mapping - استفاده از روش fallback")
self._extract_mapping_from_text(text, anonymized_text)
else:
logger.warning("⚠️ خطا در دریافت mapping - استفاده از روش fallback")
self._extract_mapping_from_text(text, anonymized_text)
return anonymized_text, self.mapping_table
except Exception as e:
logger.error(f"❌ Cerebras Exception: {e}")
raise
def _fix_percent_mapping(self):
"""اصلاح mapping برای درصدها و مقادیر"""
for token, value in self.mapping_table.items():
value_str = str(value).strip()
if token.startswith('percent-'):
if not re.search(r'(درصد|%|درصدی)', value_str):
self.mapping_table[token] = f"{value_str} درصد"
logger.info(f"✅ اصلاح {token}: '{value_str}' → '{value_str} درصد'")
elif token.startswith('amount-'):
if not re.search(r'(میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)', value_str):
logger.warning(f"⚠️ {token}: فقط عدد '{value_str}' - واحد مشخص نیست")
def _extract_mapping_from_text(self, original: str, anonymized: str):
"""استخراج mapping از متن‌های اصلی و ناشناس شده"""
all_tokens = []
for entity_type in ['person', 'company', 'amount', 'percent']:
tokens = re.findall(f'{entity_type}-\\d+', anonymized)
all_tokens.extend([(t, entity_type) for t in tokens])
all_tokens = sorted(set(all_tokens), key=lambda x: (x[1], int(x[0].split('-')[1])))
patterns = {
'person': r'\b[ء-ي]+\s+[ء-ي]+(?:\s+[ء-ي]+)*\b',
'company': r'(?:شرکت|بانک|سازمان|گروه|هلدینگ)\s+[ء-ي]+(?:\s+[ء-ي]+)*',
'amount': r'\d+(?:\.\d+)?\s*(?:میلیارد|میلیون|هزار|تومان|ریال|دلار|یورو|تن)',
'percent': r'\d+(?:\.\d+)?\s*(?:درصد|%|درصدی)',
}
original_entities = {}
for entity_type, pattern in patterns.items():
matches = list(re.finditer(pattern, original))
original_entities[entity_type] = [m.group().strip() for m in matches]
for token, entity_type in all_tokens:
if entity_type in original_entities and original_entities[entity_type]:
token_num = int(token.split('-')[1]) - 1
if token_num < len(original_entities[entity_type]):
original_text = original_entities[entity_type][token_num]
self.mapping_table[token] = original_text
self.reverse_mapping[original_text] = token
else:
original_text = original_entities[entity_type][-1]
if token not in self.mapping_table:
self.mapping_table[token] = original_text
self.reverse_mapping[original_text] = token
def analyze_with_model(self, anonymized_text: str, analysis_prompt: str, model_name: str) -> str:
"""
اجرای پرامپت‌ها با مدل انتخابی
"""
logger.info(f"🤖 {model_name} اجرای پرامپت...")
if not analysis_prompt or not analysis_prompt.strip():
logger.info("⚠️ پرامپتی وارد نشده - متن ناشناس‌سازی شده برگردانده می‌شود")
return anonymized_text
try:
# ساخت system message
system_msg = """شما یک تحلیلگر مالی حرفه‌ای هستید. متن حاوی کدهای ناشناس است (person-XX، company-XX، amount-XX، percent-XX).
به سوالات و درخواست‌ها با دقت پاسخ دهید و این کدها را در پاسخ خود حفظ کنید."""
# ساخت پیام کامل
full_text = f"""{analysis_prompt}
متن برای تحلیل:
{anonymized_text}"""
# استفاده از UnifiedLLMSender
sender = UnifiedLLMSender(model=model_name)
response = sender.send(
text=full_text,
system_msg=system_msg,
max_tokens=4096,
temperature=0.1,
lang='fa'
)
logger.info(f"✅ {model_name} پاسخ داد: {len(response)} کاراکتر")
return response
except Exception as e:
logger.error(f"❌ {model_name} Exception: {e}")
return f"❌ خطا در {model_name}: {str(e)}"
def restore_text(self, anonymized_text: str) -> str:
"""بازگردانی متن ناشناس‌سازی شده به متن اصلی"""
logger.info("🔄 بازگردانی متن...")
if not self.mapping_table:
logger.warning("⚠️ جدول نگاشت خالی است")
return anonymized_text
restored = anonymized_text
for placeholder, original in sorted(self.mapping_table.items()):
restored = restored.replace(placeholder, original)
logger.info("✅ بازگردانی کامل")
return restored
def get_mapping_table_md(self) -> str:
"""تبدیل جدول نگاشت به Markdown"""
if not self.mapping_table:
return "### 📋 جدول نگاشت\n\nهیچ موجودیتی شناسایی نشد"
table = "### 📋 جدول نگاشت\n\n"
table += "| شناسه | متن اصلی |\n"
table += "|-------|----------|\n"
for token, original in sorted(self.mapping_table.items()):
table += f"| **{token}** | {original} |\n"
return table
# متغیر سراسری
anonymizer = None
def get_available_model_choices():
"""دریافت لیست مدل‌های موجود برای Dropdown"""
available = get_available_models()
display_names = get_model_display_names()
choices = []
for model_name, info in available.items():
if info['has_key']:
choices.append(display_names.get(model_name, model_name))
# اگر هیچ مدلی موجود نیست، یک پیام نمایش بده
if not choices:
choices = ["❌ هیچ API Key موجود نیست"]
return choices
def get_model_name_from_display(display_name: str) -> str:
"""تبدیل نام نمایشی به نام مدل"""
display_names = get_model_display_names()
reverse_map = {v: k for k, v in display_names.items()}
return reverse_map.get(display_name, display_name)
def process(input_text: str, analysis_prompt: str, model_choice: str):
"""پردازش متن - 4 مرحله"""
global anonymizer
if not input_text.strip():
return "", "", "", ""
# دریافت نام واقعی مدل
model_name = get_model_name_from_display(model_choice)
if not anonymizer:
anonymizer = AnonymizerAdvanced()
else:
anonymizer.mapping_table = {}
anonymizer.reverse_mapping = {}
try:
logger.info("=" * 70)
logger.info(f"🚀 شروع پردازش - مدل تحلیل: {model_name}")
logger.info("=" * 70)
# مرحله 1: ناشناس‌سازی
logger.info("📝 مرحله 1: ناشناس‌سازی...")
anonymized_text, _ = anonymizer.anonymize_with_cerebras(input_text)
logger.info(f"✅ ناشناس‌سازی: {len(anonymized_text)} کاراکتر")
# مرحله 2: مدل انتخابی
logger.info(f"🤖 مرحله 2: {model_name}...")
model_response = anonymizer.analyze_with_model(anonymized_text, analysis_prompt, model_name)
logger.info(f"✅ {model_name}: {len(model_response)} کاراکتر")
# مرحله 3: بازگردانی
logger.info("🔄 مرحله 3: بازگردانی...")
restored_text = anonymizer.restore_text(model_response)
logger.info("✅ بازگردانی کامل")
# مرحله 4: جدول نگاشت
logger.info("📋 مرحله 4: جدول نگاشت...")
mapping_str = anonymizer.get_mapping_table_md()
logger.info(f"✅ {len(anonymizer.mapping_table)} موجودیت")
logger.info("=" * 70)
logger.info("✅ تمام مراحل کامل!")
logger.info("=" * 70)
return restored_text, model_response, anonymized_text, mapping_str
except Exception as e:
logger.error(f"❌ خطا: {str(e)}", exc_info=True)
return "", f"❌ خطا: {str(e)}", "", ""
def clear_all():
"""پاک کردن همه"""
return "", "", "", "", "", ""
# Gradio Interface
css_rtl = """
.input-box { direction: rtl; text-align: right; }
.textbox textarea { direction: rtl; text-align: right; font-family: 'Tahoma', serif; }
"""
with gr.Blocks(title="سیستم ناشناس‌سازی متون", theme=gr.themes.Soft(), css=css_rtl) as app:
gr.Markdown("# 🔐 سیستم ناشناس‌سازی متون مالی فارسی", elem_classes="input-box")
gr.Markdown("### 🌟 با پشتیبانی از مدل‌های پیشرفته AI", elem_classes="input-box")
with gr.Row():
with gr.Column(scale=1):
# منوی انتخاب مدل - بارگذاری دینامیک
model_dropdown = gr.Dropdown(
choices=get_available_model_choices(),
value=get_available_model_choices()[0] if get_available_model_choices() else None,
label="🤖 انتخاب مدل تحلیل",
info="فقط مدل‌هایی که API Key دارند نمایش داده می‌شوند",
interactive=True
)
analysis_prompt = gr.Textbox(
lines=8,
placeholder="مثال: این متن را خلاصه کن\nمثال: نقاط قوت و ضعف را استخراج کن",
label="📋 دستورات تحلیل (اختیاری)",
elem_classes="textbox"
)
gr.Markdown("---")
with gr.Column():
process_btn = gr.Button(
"▶️ پردازش",
variant="primary",
size="lg"
)
clear_btn = gr.Button(
"🗑️ پاک کردن",
variant="stop",
size="lg"
)
with gr.Column(scale=3):
input_text = gr.Textbox(
lines=14,
placeholder="متن مالی/خبری فارسی را وارد کنید...",
label="📝 متن ورودی",
elem_classes="textbox"
)
gr.Markdown("---")
gr.Markdown("## 📊 نتایج پردازش", elem_classes="input-box")
with gr.Row():
with gr.Column(scale=1):
restored_text = gr.Textbox(
lines=12,
label="✅ متن بازگردانی شده",
interactive=False,
elem_classes="textbox"
)
with gr.Column(scale=1):
model_analysis = gr.Textbox(
lines=12,
label="🤖 تحلیل مدل (ناشناس)",
interactive=False,
elem_classes="textbox"
)
with gr.Column(scale=1):
anonymized_text = gr.Textbox(
lines=12,
label="🔒 متن ناشناس‌شده",
interactive=False,
elem_classes="textbox"
)
gr.Markdown("---")
mapping_table = gr.Markdown(
value="### 📋 جدول نگاشت\n\nهنوز پردازشی انجام نشده",
label="📋 جدول نگاشت",
elem_classes="input-box"
)
# Event Handlers
process_btn.click(
fn=process,
inputs=[input_text, analysis_prompt, model_dropdown],
outputs=[restored_text, model_analysis, anonymized_text, mapping_table]
)
clear_btn.click(
fn=clear_all,
outputs=[input_text, analysis_prompt, restored_text, model_analysis, anonymized_text, mapping_table]
)
if __name__ == "__main__":
print("=" * 70)
print("🚀 سیستم ناشناس‌سازی متون در حال راه‌اندازی...")
print("=" * 70)
# نمایش مدل‌های موجود
available = get_available_models()
display_names = get_model_display_names()
print("\n📋 مدل‌های موجود:\n")
for model_name, info in available.items():
status = "✅" if info['has_key'] else "❌"
display = display_names.get(model_name, model_name)
print(f" {status} {display} ({info['env_key']})")
print("\n" + "=" * 70 + "\n")
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)