import requests
import json
import gradio as gr
from typing import Dict, Any, List, Generator, Optional
import os
from dataclasses import dataclass
import re
import pandas as pd
import time
from datetime import datetime
import threading
from queue import Queue
import io
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from enum import Enum
class APIProvider(Enum):
    """Supported chat-completion API providers.

    Each member's value is the lowercase identifier used to select the
    provider (for example in the UI dropdowns).
    """

    CEREBRAS = "cerebras"
    GROQ = "groq"
    OPENAI = "openai"
@dataclass
class APIConfig:
    """Connection and sampling settings for a single API provider."""

    # Which provider these settings belong to.
    provider: APIProvider
    # Secret key for the provider.
    api_key: str
    # Root URL of the provider's API.
    base_url: str
    # Model identifier to request.
    model: str
    # Upper bound on generated tokens.
    max_tokens: int = 2000
    # Sampling temperature.
    temperature: float = 0.1
# Default settings for each provider
# Per-provider catalogue: API root URL, selectable models, and the model
# used when the caller does not pick one.
API_CONFIGS = {
    APIProvider.CEREBRAS: {
        "base_url": "https://api.cerebras.ai/v1",
        "models": [
            "qwen-3-32b",
            "llama-4-scout-17b-16e-instruct",
            "llama3.1-8b",
        ],
        "default_model": "qwen-3-32b",
    },
    APIProvider.GROQ: {
        "base_url": "https://api.groq.com/openai/v1",
        "models": [
            "llama-3.3-70b-versatile",
            "llama-3.1-8b-instant",
            "mixtral-8x7b-32768",
        ],
        "default_model": "llama-3.3-70b-versatile",
    },
    APIProvider.OPENAI: {
        "base_url": "https://api.openai.com/v1",
        "models": [
            "gpt-4o-mini",
            "gpt-4o",
            "gpt-3.5-turbo",
        ],
        "default_model": "gpt-4o-mini",
    },
}
@dataclass
class RateLimitConfig:
    """Tunable parameters for request throttling and retry backoff."""

    # Maximum number of requests allowed inside any 60-second window.
    requests_per_minute: int = 30
    # Minimum spacing, in seconds, between two consecutive requests.
    min_delay_between_requests: float = 2.0
    # Number of attempts made for a single API request before giving up.
    max_retries: int = 5
    # Base wait, in seconds, for the exponential backoff.
    initial_backoff: float = 5.0
    # Upper bound, in seconds, on the failure-driven backoff wait.
    max_backoff: float = 60.0
    # Growth factor applied per failure when computing the backoff wait.
    backoff_multiplier: float = 2.0
    # Seconds without a new failure after which the failure score decays.
    recovery_window: float = 60.0
class RateLimiter:
    """Client-side request throttle with failure-driven backoff.

    Keeps the timestamps of recent requests and a running failure score, and
    uses both to decide how long the caller must sleep before sending the
    next request. All mutable state is accessed under ``self.lock``.
    """

    # Length, in seconds, of the sliding window used for per-minute counting.
    _WINDOW_SECONDS = 60
    # Non-rate-limit failures never raise the failure score above this value.
    _SOFT_FAILURE_CAP = 3
    # The failure score is clamped to this exponent when computing backoff.
    _MAX_BACKOFF_EXPONENT = 4

    def __init__(self, config: RateLimitConfig):
        """Create a limiter governed by *config*."""
        self.config = config
        self.request_times: List[float] = []
        self.lock = threading.Lock()
        # Failure score: +1 per rate-limit failure, +0.5 per other failure.
        self.consecutive_failures = 0
        self.last_failure_time = 0
        self.last_success_time = time.time()
        self.total_429_errors = 0

    def wait_if_needed(self) -> float:
        """Sleep until the next request is permitted, then record it.

        The wait is the largest of three constraints: the per-minute quota,
        the minimum spacing between requests, and the failure backoff.

        Returns:
            The number of seconds slept (``0.0`` if no wait was required).
        """
        cfg = self.config
        with self.lock:
            now = time.time()
            # Forget requests that have left the sliding window.
            self.request_times = [
                t for t in self.request_times if now - t < self._WINDOW_SECONDS
            ]

            # Decay the failure score by one step once the recovery window
            # has elapsed since the most recent failure.
            if self.consecutive_failures > 0 and (now - self.last_failure_time) > cfg.recovery_window:
                self.consecutive_failures = max(0, self.consecutive_failures - 1)

            wait_time = 0.0

            # Per-minute quota: wait until the oldest request leaves the
            # window (plus a one-second margin).
            if len(self.request_times) >= cfg.requests_per_minute:
                oldest_request = min(self.request_times)
                wait_time = max(wait_time, self._WINDOW_SECONDS - (now - oldest_request) + 1)

            # Minimum spacing between consecutive requests.
            if self.request_times:
                time_since_last = now - max(self.request_times)
                if time_since_last < cfg.min_delay_between_requests:
                    wait_time = max(wait_time, cfg.min_delay_between_requests - time_since_last)

            # Exponential backoff driven by the failure score.
            if self.consecutive_failures > 0:
                exponent = min(self.consecutive_failures, self._MAX_BACKOFF_EXPONENT)
                failure_wait = min(
                    cfg.initial_backoff * (cfg.backoff_multiplier ** exponent),
                    cfg.max_backoff,
                )
                wait_time = max(wait_time, failure_wait)

            if wait_time > 0:
                time.sleep(wait_time)
            # Record the request at the moment it is actually released.
            self.request_times.append(time.time())
        return wait_time

    def report_success(self):
        """Record a successful request and clear the failure score."""
        with self.lock:
            self.last_success_time = time.time()
            self.consecutive_failures = 0

    def report_failure(self, is_rate_limit: bool = False):
        """Record a failed request.

        Args:
            is_rate_limit: ``True`` for a rate-limit rejection, which adds a
                full point to the failure score and is counted in
                ``total_429_errors``. Other failures add half a point, and
                only while the score is below ``_SOFT_FAILURE_CAP``.
        """
        with self.lock:
            self.last_failure_time = time.time()
            if is_rate_limit:
                self.consecutive_failures += 1
                self.total_429_errors += 1
            else:
                # A failure must never *lower* the score: when rate-limit
                # failures have already pushed it past the soft cap, leave it
                # untouched instead of clamping it back down to the cap.
                bumped = min(self.consecutive_failures + 0.5, self._SOFT_FAILURE_CAP)
                self.consecutive_failures = max(self.consecutive_failures, bumped)

    def get_estimated_wait_time(self) -> float:
        """Estimate the wait before the next request, ignoring backoff.

        Returns:
            Seconds until the quota frees a slot when the window is full,
            otherwise the configured minimum delay between requests.
        """
        with self.lock:
            now = time.time()
            self.request_times = [
                t for t in self.request_times if now - t < self._WINDOW_SECONDS
            ]
            if len(self.request_times) >= self.config.requests_per_minute:
                return max(0, self._WINDOW_SECONDS - (now - min(self.request_times)) + 1)
            return self.config.min_delay_between_requests

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the limiter's counters."""
        with self.lock:
            return {
                "consecutive_failures": self.consecutive_failures,
                "total_429_errors": self.total_429_errors,
                "requests_in_last_minute": len(self.request_times),
            }
class MultiProviderAnonymizer:
    """Anonymizes text through an OpenAI-compatible chat-completions API.

    The provider determines the base URL and default model (looked up in
    ``API_CONFIGS``); requests are throttled by a ``RateLimiter``.
    """

    # Reasoning blocks that some models (e.g. Qwen) emit before the answer.
    # They are removed so only the anonymized text remains.
    _REASONING_BLOCK_PATTERNS = (
        re.compile(r'<think>.*?</think>', re.DOTALL),
        re.compile(r'<thinking>.*?</thinking>', re.DOTALL),
    )

    def __init__(
        self,
        provider: APIProvider,
        api_key: str,
        model: str = None,
        rate_limit_config: RateLimitConfig = None
    ):
        """Configure the client.

        Args:
            provider: Which API to call.
            api_key: Bearer token sent with every request.
            model: Model identifier; falls back to the provider's default.
            rate_limit_config: Throttling settings; defaults to
                ``RateLimitConfig()``.
        """
        self.provider = provider
        self.api_key = api_key
        provider_config = API_CONFIGS[provider]
        self.base_url = provider_config["base_url"]
        self.model = model or provider_config["default_model"]
        self.rate_limit_config = rate_limit_config or RateLimitConfig()
        self.rate_limiter = RateLimiter(self.rate_limit_config)
        self.system_prompt = self._create_system_prompt()
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Build an HTTP session that retries POSTs on 5xx responses."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=10, pool_maxsize=10)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _create_system_prompt(self) -> str:
        """Return the system instruction sent with every request."""
        return """شما یک «ناشناسساز متون مالی/خبری فارسی» هستید. وظیفهتان جایگزینی اسامی خاص و مقادیر عددی با شناسههای بیمعناست.
## **قوانین اندیسگذاری**
- شرکتها: company-01, company-02, ... (پیوسته)
- اشخاص: person-01, person-02, ... (پیوسته)
- اعداد: amount-01, amount-02, ... (پیوسته)
- درصدها: percent-01, percent-02, ... (پیوسته)
## **ثبات شناسهها:**
- اگر "همراه اول" اولبار company-01 شد، در تمام متن همان باشد
## **تشخیص صحیح:**
- **شرکت/سازمان:** همراه اول، بانک ملی، ایرانخودرو، سایپا، بانک مرکزی
- **گروهها:** "گروه همراه اول" → company-XX (نه group-XX)
- **کلمات عمومی:** "سه شرکت"، "چند بانک" → حفظ شوند
- **شخص:** مهدی اخوان بهابادی، محمدرضا فرزین
- **عدد:** 37، 70، 677 (هر عددی)
- **درصد:** 37 درصدی، 15%
## **مثال:**
**ورودی:** مهدی اخوان، مدیرعامل همراه اول، اعلام کرد درآمد با رشد 37 درصدی به 70 میلیارد رسید.
**خروجی:** person-01، مدیرعامل company-01، اعلام کرد درآمد با رشد percent-01 به amount-01 رسید.
## **حفظ شود:**
- تاریخها: 1404/04/23
- دورههای زمانی: ۹ ماهه، ۵ ماهه سال
- واحدها: میلیارد تومان، همت
- مکانها: تهران، اصفهان
**فقط متن ناشناسشده را برگردان - بدون توضیح اضافی.**
"""

    @staticmethod
    def _parse_retry_after(header_value: Optional[str], fallback: float) -> float:
        """Convert a ``Retry-After`` header into a wait in seconds.

        Args:
            header_value: Raw header value, or ``None`` when absent.
            fallback: Wait to use when the header is missing or unusable.

        Returns:
            The header as a non-negative, finite number of seconds.
            *fallback* is returned when the header is absent, is not a plain
            number (for example an HTTP-date), or is negative/non-finite.
        """
        if not header_value:
            return fallback
        try:
            seconds = float(header_value)
        except (TypeError, ValueError):
            return fallback
        # The chained comparison is False for NaN, infinity and negatives.
        if not (0 <= seconds < float("inf")):
            return fallback
        return seconds

    def _make_api_request(self, text: str) -> Dict[str, Any]:
        """Send *text* to the chat-completions endpoint, retrying on failure.

        Args:
            text: The user message to anonymize.

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            Exception: On HTTP 401 (raised immediately), or once
                ``max_retries`` attempts have all failed.
        """
        limits = self.rate_limit_config
        endpoint = f"{self.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "messages": [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": text}
            ],
            "model": self.model,
            "temperature": 0.1,
            "max_tokens": 2000
        }

        last_error = None
        for attempt in range(limits.max_retries):
            self.rate_limiter.wait_if_needed()
            try:
                response = self.session.post(
                    endpoint,
                    headers=headers,
                    json=payload,
                    timeout=90
                )

                if response.status_code == 429:
                    self.rate_limiter.report_failure(is_rate_limit=True)
                    # Prefer the server's hint; otherwise back off
                    # exponentially, capped at 60 seconds.
                    computed_backoff = min(
                        limits.initial_backoff * (limits.backoff_multiplier ** attempt),
                        60.0
                    )
                    wait_seconds = self._parse_retry_after(
                        response.headers.get('Retry-After'), computed_backoff
                    )
                    last_error = f"Rate limit (429). تلاش {attempt + 1}/{limits.max_retries}"
                    print(f"🚫 {last_error} - انتظار {wait_seconds}s")
                    time.sleep(wait_seconds)
                    continue

                if response.status_code == 401:
                    # An invalid key cannot succeed on retry; this exception
                    # is not a RequestException, so it leaves the loop.
                    raise Exception(f"❌ کلید API نامعتبر است برای {self.provider.value}")

                if response.status_code == 503:
                    self.rate_limiter.report_failure(is_rate_limit=False)
                    last_error = "سرویس در دسترس نیست (503)"
                    time.sleep(10)
                    continue

                response.raise_for_status()
                self.rate_limiter.report_success()
                return response.json()

            except requests.exceptions.Timeout:
                self.rate_limiter.report_failure(is_rate_limit=False)
                last_error = "Timeout"
                time.sleep(5)
            except requests.exceptions.ConnectionError as e:
                self.rate_limiter.report_failure(is_rate_limit=False)
                last_error = f"خطای اتصال: {str(e)[:50]}"
                time.sleep(10)
            except requests.exceptions.RequestException as e:
                self.rate_limiter.report_failure(is_rate_limit=False)
                last_error = f"خطای شبکه: {str(e)[:50]}"
                time.sleep(5)

        raise Exception(f"ناموفق پس از {limits.max_retries} تلاش: {last_error}")

    @classmethod
    def _clean_model_output(cls, content: str) -> str:
        """Strip Markdown fences and reasoning blocks from a model reply.

        Only the fence marker lines are removed; the text between them is
        kept, because a model may wrap its whole answer in a code fence.
        """
        if "```" in content:
            content = '\n'.join(
                line for line in content.split('\n')
                if not line.strip().startswith('```')
            )
        content = content.strip()
        for pattern in cls._REASONING_BLOCK_PATTERNS:
            content = pattern.sub('', content)
        return content.strip()

    @staticmethod
    def _entity_statistics(content: str) -> Dict[str, int]:
        """Count the distinct placeholder indices of each kind in *content*."""
        return {
            kind: len(set(re.findall(kind + r'-(\d+)', content)))
            for kind in ("company", "person", "amount", "percent")
        }

    def anonymize_text(self, text: str) -> Dict[str, Any]:
        """Anonymize *text* and summarize the placeholders produced.

        Args:
            text: Input text; empty or whitespace-only input is rejected.

        Returns:
            A dict that always has ``success`` and ``anonymized_text``. On
            success it also has ``statistics`` (distinct placeholder counts
            per kind) and ``usage`` (copied from the API response, ``{}`` if
            absent); on failure it has ``error``. No exception escapes this
            method.
        """
        if not text or not text.strip():
            return {"success": False, "error": "متن ورودی خالی است", "anonymized_text": ""}

        try:
            response = self._make_api_request(text)
            if "choices" not in response or not response["choices"]:
                return {"success": False, "error": "پاسخ نامعتبر", "anonymized_text": ""}

            content = self._clean_model_output(response["choices"][0]["message"]["content"])
            return {
                "success": True,
                "anonymized_text": content,
                "statistics": self._entity_statistics(content),
                "usage": response.get("usage", {})
            }
        except Exception as e:
            return {"success": False, "error": f"خطا: {str(e)}", "anonymized_text": ""}
class BatchProcessor:
    """Anonymizes one column of a CSV file row by row, reporting progress.

    ``process_csv`` is a generator of status dictionaries; ``cancel`` can be
    called while it is being consumed to stop it before the next row.
    """

    def __init__(
        self,
        provider: APIProvider,
        api_key: str,
        model: str = None,
        rate_limit_config: RateLimitConfig = None
    ):
        """Store the API settings; the anonymizer itself is created lazily.

        Args:
            provider: Which API to call.
            api_key: Bearer token for the provider.
            model: Model identifier, or ``None`` for the provider default.
            rate_limit_config: Throttling settings; defaults to
                ``RateLimitConfig()``.
        """
        self.provider = provider
        self.api_key = api_key
        self.model = model
        self.rate_limit_config = rate_limit_config or RateLimitConfig()
        self.anonymizer = None
        self.is_cancelled = False
        self.current_progress = 0
        self.total_rows = 0
        self.processed_rows = 0
        self.failed_rows = 0
        self.start_time = None
        self.consecutive_api_failures = 0
        # The run is aborted after this many consecutive rate-limit failures.
        self.max_consecutive_failures = 10

    def cancel(self):
        """Request that the running ``process_csv`` stop at the next row."""
        self.is_cancelled = True

    def reset(self):
        """Clear the cancellation flag and all per-run counters."""
        self.is_cancelled = False
        self.current_progress = 0
        self.total_rows = 0
        self.processed_rows = 0
        self.failed_rows = 0
        self.start_time = None
        self.consecutive_api_failures = 0

    @staticmethod
    def _read_csv(file_path: str) -> pd.DataFrame:
        """Read a CSV, trying UTF-8, then UTF-8 with BOM, then cp1256."""
        for encoding in ('utf-8', 'utf-8-sig'):
            try:
                return pd.read_csv(file_path, encoding=encoding)
            except UnicodeDecodeError:
                continue
        return pd.read_csv(file_path, encoding='cp1256')

    @staticmethod
    def _derive_output_path(file_path: str, suffix: str) -> str:
        """Build a sibling ``.csv`` path that can never equal *file_path*.

        A trailing ``.csv`` extension (any letter case) is replaced by
        ``<suffix>.csv``; for any other name ``<suffix>.csv`` is appended.
        Only the final extension is touched, so ``.csv`` occurring elsewhere
        in the path (such as in a directory name) is left alone.
        """
        root, ext = os.path.splitext(file_path)
        if ext.lower() == '.csv':
            return f"{root}{suffix}.csv"
        return f"{file_path}{suffix}.csv"

    def process_csv(
        self,
        file_path: str,
        text_column: str,
        output_column: str = "anonymized_text"
    ) -> Generator[Dict[str, Any], None, None]:
        """Anonymize *text_column* of the CSV at *file_path*.

        Adds *output_column*, ``anonymization_status`` and ``entities_found``
        columns. A checkpoint copy is written when the 1-based number of a
        row that was sent to the API is a multiple of 100. The full result
        is written next to the input with an ``_anonymized`` suffix.

        Args:
            file_path: Path of the input CSV.
            text_column: Name of the column holding the text to anonymize.
            output_column: Name of the column receiving the result.

        Yields:
            Dicts whose ``type`` is one of ``"info"``, ``"progress"``,
            ``"error"``, ``"cancelled"`` or ``"complete"``.
        """
        self.reset()
        self.start_time = time.time()

        df = self._read_csv(file_path)

        if text_column not in df.columns:
            yield {"type": "error", "message": f"ستون '{text_column}' یافت نشد"}
            return

        self.total_rows = len(df)

        self.anonymizer = MultiProviderAnonymizer(
            provider=self.provider,
            api_key=self.api_key,
            model=self.model,
            rate_limit_config=self.rate_limit_config
        )

        df[output_column] = ""
        df["anonymization_status"] = ""
        df["entities_found"] = ""

        yield {
            "type": "info",
            "message": f"🚀 شروع پردازش {self.total_rows} ردیف با {self.provider.value}..."
        }

        for idx, row in df.iterrows():
            if self.is_cancelled:
                yield {
                    "type": "cancelled",
                    "message": "لغو شد",
                    "processed": self.processed_rows,
                    "failed": self.failed_rows
                }
                break

            # Too many rate-limit failures in a row: save what exists and stop.
            if self.consecutive_api_failures >= self.max_consecutive_failures:
                partial_path = self._derive_output_path(file_path, f'_partial_{idx}')
                df.to_csv(partial_path, index=False, encoding='utf-8-sig')
                yield {
                    "type": "error",
                    "message": f"❌ توقف - خطاهای متوالی زیاد. ذخیره شد: {partial_path}"
                }
                return

            text = str(row[text_column]) if pd.notna(row[text_column]) else ""

            # Blank cells are marked and skipped without calling the API.
            if not text.strip():
                df.at[idx, output_column] = ""
                df.at[idx, "anonymization_status"] = "خالی"
                self.processed_rows += 1
                continue

            result = self.anonymizer.anonymize_text(text)

            if result["success"]:
                df.at[idx, output_column] = result["anonymized_text"]
                df.at[idx, "anonymization_status"] = "موفق"
                stats = result.get("statistics", {})
                df.at[idx, "entities_found"] = f"C:{stats.get('company',0)}|P:{stats.get('person',0)}|A:{stats.get('amount',0)}|%:{stats.get('percent',0)}"
                self.processed_rows += 1
                self.consecutive_api_failures = 0
            else:
                error_text = result.get('error', '')
                df.at[idx, output_column] = f"خطا: {error_text}"
                df.at[idx, "anonymization_status"] = "ناموفق"
                self.failed_rows += 1
                # Only rate-limit failures count toward the abort threshold.
                if "rate limit" in error_text.lower() or "429" in error_text:
                    self.consecutive_api_failures += 1

            self.current_progress = (idx + 1) / self.total_rows * 100
            elapsed = time.time() - self.start_time

            yield {
                "type": "progress",
                "current": idx + 1,
                "total": self.total_rows,
                "progress": self.current_progress,
                "processed": self.processed_rows,
                "failed": self.failed_rows,
                "elapsed": elapsed,
                "estimated_remaining": (elapsed / (idx + 1)) * (self.total_rows - idx - 1),
                "next_wait": self.anonymizer.rate_limiter.get_estimated_wait_time(),
                "last_result": result,
                "rate_status": self.anonymizer.rate_limiter.get_status()
            }

            # Checkpoint when the 1-based row number is a multiple of 100.
            if (idx + 1) % 100 == 0:
                checkpoint_path = self._derive_output_path(file_path, f'_checkpoint_{idx+1}')
                df.to_csv(checkpoint_path, index=False, encoding='utf-8-sig')
                yield {"type": "info", "message": f"💾 Checkpoint: {checkpoint_path}"}

        # Final save (skipped when the run was cancelled).
        if not self.is_cancelled:
            output_path = self._derive_output_path(file_path, '_anonymized')
            df.to_csv(output_path, index=False, encoding='utf-8-sig')

            yield {
                "type": "complete",
                "message": "✅ تکمیل شد!",
                "output_path": output_path,
                "total": self.total_rows,
                "processed": self.processed_rows,
                "failed": self.failed_rows,
                "total_time": time.time() - self.start_time
            }
def create_interface():
    """Build the Gradio application.

    The UI has two tabs: one anonymizes a single text, the other runs a CSV
    batch job with live progress. Provider and model choices are taken from
    ``APIProvider`` and ``API_CONFIGS`` so the dropdowns always match the
    provider catalogue.

    Returns:
        The assembled ``gr.Blocks`` application (not launched).
    """
    custom_css = """
.rtl-text { direction: rtl; text-align: right; font-family: 'Vazirmatn', 'Tahoma', sans-serif; }
"""
    provider_choices = [p.value for p in APIProvider]
    default_provider = APIProvider.GROQ
    default_models = list(API_CONFIGS[default_provider]["models"])
    default_model = API_CONFIGS[default_provider]["default_model"]

    with gr.Blocks(css=custom_css, title="ناشناسساز چند-API", theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
# 🔒 سیستم ناشناسسازی متون فارسی
### پشتیبانی از Cerebras، Groq و OpenAI
""", elem_classes=["rtl-text"])

        # Holds the running BatchProcessor so the cancel button can reach it.
        batch_processor = {"instance": None}

        with gr.Tabs():
            # Single-text tab
            with gr.Tab("پردازش تکی"):
                with gr.Row():
                    with gr.Column():
                        single_provider = gr.Dropdown(
                            label="🌐 پرووایدر API",
                            choices=provider_choices,
                            value=default_provider.value,
                            elem_classes=["rtl-text"]
                        )
                        single_api_key = gr.Textbox(
                            label="🔑 کلید API",
                            placeholder="کلید API خود را وارد کنید",
                            type="password"
                        )
                        single_model = gr.Dropdown(
                            label="🤖 مدل",
                            choices=default_models,
                            value=default_model
                        )
                        single_input = gr.Textbox(
                            label="متن ورودی",
                            lines=5,
                            elem_classes=["rtl-text"]
                        )
                        single_btn = gr.Button("🔄 ناشناسسازی", variant="primary")
                    with gr.Column():
                        single_output = gr.Textbox(label="متن ناشناسشده", lines=5, elem_classes=["rtl-text"])
                        single_stats = gr.Textbox(label="آمار", lines=2)

            # Batch tab
            with gr.Tab("پردازش دستهای"):
                with gr.Row():
                    with gr.Column():
                        batch_provider = gr.Dropdown(
                            label="🌐 پرووایدر API",
                            choices=provider_choices,
                            value=default_provider.value
                        )
                        batch_api_key = gr.Textbox(
                            label="🔑 کلید API",
                            type="password"
                        )
                        batch_model = gr.Dropdown(
                            label="🤖 مدل",
                            choices=default_models,
                            value=default_model
                        )
                        csv_file = gr.File(label="📁 فایل CSV", file_types=[".csv"])
                        text_column = gr.Dropdown(label="ستون متن", choices=[], allow_custom_value=True)
                        output_column = gr.Textbox(label="نام ستون خروجی", value="anonymized_text")
                    with gr.Column():
                        delay_slider = gr.Slider(1, 30, 3, step=0.5, label="تأخیر (ثانیه)")
                        rpm_slider = gr.Slider(5, 30, 20, step=1, label="حداکثر درخواست/دقیقه")
                        retries_slider = gr.Slider(1, 10, 5, step=1, label="تلاش مجدد")
                with gr.Row():
                    start_btn = gr.Button("🚀 شروع", variant="primary", size="lg")
                    cancel_btn = gr.Button("⏹️ لغو", variant="stop", size="lg")
                progress_bar = gr.Slider(0, 100, 0, label="پیشرفت", interactive=False)
                progress_text = gr.Markdown("در انتظار...")
                time_stats = gr.Markdown("")
                process_log = gr.Textbox(lines=10, label="لاگ")
                preview_table = gr.Dataframe(headers=["متن", "ناشناسشده", "وضعیت"])
                output_file = gr.File(label="دانلود", visible=False)

        # Callbacks
        def update_models(provider):
            """Show the selected provider's models and pick its default."""
            catalogue = API_CONFIGS[APIProvider(provider)]
            return gr.update(choices=list(catalogue["models"]), value=catalogue["default_model"])

        def update_columns(file):
            """Fill the column dropdown from the uploaded CSV's header."""
            if file is None:
                return gr.update(choices=[], value=None)
            df = None
            for encoding in ('utf-8', 'utf-8-sig'):
                try:
                    df = pd.read_csv(file.name, nrows=0, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            if df is None:
                df = pd.read_csv(file.name, nrows=0, encoding='cp1256')
            columns = list(df.columns)
            return gr.update(choices=columns, value=columns[0] if columns else None)

        def process_single(provider, api_key, model, text):
            """Anonymize one text; returns (anonymized text, status line)."""
            if not text or not text.strip():
                return "", "❌ متن خالی"
            if not api_key:
                return "", "❌ کلید API وارد نشده"
            try:
                prov = APIProvider(provider)
                anonymizer = MultiProviderAnonymizer(provider=prov, api_key=api_key, model=model)
                result = anonymizer.anonymize_text(text)
                if result["success"]:
                    stats = result.get("statistics", {})
                    return result["anonymized_text"], f"✅ C:{stats.get('company',0)} | P:{stats.get('person',0)} | A:{stats.get('amount',0)} | %:{stats.get('percent',0)}"
                else:
                    return "", f"❌ {result.get('error', '')}"
            except Exception as e:
                return "", f"❌ {str(e)}"

        def start_batch(file, text_col, output_col, delay, rpm, retries, provider, api_key, model):
            """Run a batch job, yielding one UI update tuple per event.

            Each tuple is (progress value, progress text, time text, log
            text, preview rows, output-file update).
            """
            if file is None:
                yield (0, "❌ فایل انتخاب نشده", "", "", None, gr.update(visible=False))
                return
            if not api_key:
                yield (0, "❌ کلید API وارد نشده", "", "", None, gr.update(visible=False))
                return

            prov = APIProvider(provider)
            rate_config = RateLimitConfig(
                requests_per_minute=int(rpm),
                min_delay_between_requests=float(delay),
                max_retries=int(retries)
            )
            processor = BatchProcessor(provider=prov, api_key=api_key, model=model, rate_limit_config=rate_config)
            batch_processor["instance"] = processor

            log_lines = []
            preview_data = []

            for update in processor.process_csv(file.name, text_col, output_col):
                update_type = update.get("type")

                if update_type == "error":
                    log_lines.append(f"❌ {update['message']}")
                    yield (0, f"❌ {update['message']}", "", "\n".join(log_lines), None, gr.update(visible=False))
                    return

                elif update_type == "info":
                    log_lines.append(f"ℹ️ {update['message']}")

                elif update_type == "progress":
                    progress = update["progress"]
                    current = update["current"]
                    total = update["total"]
                    elapsed = update["elapsed"]
                    remaining = update["estimated_remaining"]

                    progress_md = f"**{current}/{total}** ({progress:.1f}%) | ✅ {update['processed']} | ❌ {update['failed']}"
                    time_md = f"⏱️ {elapsed/60:.1f}m | باقیمانده: {remaining/60:.1f}m"

                    # Log a heartbeat every tenth row.
                    if current % 10 == 0:
                        log_lines.append(f"📊 {current}/{total}")

                    # Keep only the five most recent successful results.
                    last_result = update.get("last_result", {})
                    if last_result.get("success"):
                        preview_data.append(["...", last_result.get("anonymized_text", "")[:80] + "...", "✅"])
                        if len(preview_data) > 5:
                            preview_data = preview_data[-5:]

                    yield (progress, progress_md, time_md, "\n".join(log_lines[-15:]), preview_data if preview_data else None, gr.update(visible=False))

                elif update_type == "cancelled":
                    log_lines.append("⏹️ لغو شد")
                    yield (0, "⏹️ لغو شد", "", "\n".join(log_lines), preview_data if preview_data else None, gr.update(visible=False))
                    return

                elif update_type == "complete":
                    log_lines.append(f"✅ {update['message']}")
                    progress_md = f"✅ تکمیل! {update['processed']}/{update['total']} موفق | {update['total_time']/60:.1f} دقیقه"
                    yield (100, progress_md, "", "\n".join(log_lines), preview_data if preview_data else None, gr.update(value=update['output_path'], visible=True))

        def cancel():
            """Ask the running batch job (if any) to stop."""
            if batch_processor["instance"]:
                batch_processor["instance"].cancel()
            return "⏹️ لغو شد..."

        # Event wiring
        single_provider.change(update_models, [single_provider], [single_model])
        batch_provider.change(update_models, [batch_provider], [batch_model])
        csv_file.change(update_columns, [csv_file], [text_column])
        single_btn.click(process_single, [single_provider, single_api_key, single_model, single_input], [single_output, single_stats])
        start_btn.click(
            start_batch,
            [csv_file, text_column, output_column, delay_slider, rpm_slider, retries_slider, batch_provider, batch_api_key, batch_model],
            [progress_bar, progress_text, time_stats, process_log, preview_table, output_file]
        )
        cancel_btn.click(cancel, outputs=[process_log])

    return interface
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces,
    # port 7860, with a share link and in-UI error display enabled.
    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        show_error=True,
    )