"""Persian financial/news text anonymizer with a Gradio front end.

The module sends text to an OpenAI-compatible chat-completions endpoint
(Cerebras, Groq or OpenAI), asks the model to replace named entities and
numbers with opaque identifiers, and exposes both single-text and CSV batch
processing through a Gradio interface.
"""

import io
import json
import os
import re
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from queue import Queue
from typing import Any, Dict, Generator, List, Optional

import gradio as gr
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class APIProvider(Enum):
    """Supported API providers."""

    CEREBRAS = "cerebras"
    GROQ = "groq"
    OPENAI = "openai"


@dataclass
class APIConfig:
    """API settings."""

    provider: APIProvider
    api_key: str
    base_url: str
    model: str
    max_tokens: int = 2000
    temperature: float = 0.1


# Default settings for each provider.
API_CONFIGS = {
    APIProvider.CEREBRAS: {
        "base_url": "https://api.cerebras.ai/v1",
        "models": ["qwen-3-32b", "llama-4-scout-17b-16e-instruct", "llama3.1-8b"],
        "default_model": "qwen-3-32b",
    },
    APIProvider.GROQ: {
        "base_url": "https://api.groq.com/openai/v1",
        "models": ["llama-3.3-70b-versatile", "llama-3.1-8b-instant", "mixtral-8x7b-32768"],
        "default_model": "llama-3.3-70b-versatile",
    },
    APIProvider.OPENAI: {
        "base_url": "https://api.openai.com/v1",
        "models": ["gpt-4o-mini", "gpt-4o", "gpt-3.5-turbo"],
        "default_model": "gpt-4o-mini",
    },
}

# Encodings tried, in order, when loading a user-supplied CSV file.
_CSV_ENCODINGS = ("utf-8", "utf-8-sig", "cp1256")

# Reasoning blocks emitted by some models (e.g. Qwen).
# NOTE(review): the tag names were missing from the patterns in the source
# (they were plain r'.*?', which removes nothing); "<think>" / "<thinking>"
# are reconstructed from the accompanying comment - confirm against the
# models actually in use.
_THINK_RE = re.compile(r'<think>.*?</think>', re.DOTALL)
_THINKING_RE = re.compile(r'<thinking>.*?</thinking>', re.DOTALL)


def _read_csv_with_fallback(path: str, **kwargs: Any) -> pd.DataFrame:
    """Read a CSV file, trying each encoding in ``_CSV_ENCODINGS`` in turn.

    Only decoding failures trigger the next encoding; any other error
    (missing file, malformed CSV, ...) propagates immediately.

    Raises:
        UnicodeDecodeError: if no encoding can decode the file.
    """
    last_error: Optional[UnicodeDecodeError] = None
    for encoding in _CSV_ENCODINGS:
        try:
            return pd.read_csv(path, encoding=encoding, **kwargs)
        except UnicodeDecodeError as exc:
            last_error = exc
    raise last_error


def _derive_sibling_path(file_path: str, suffix: str) -> str:
    """Return ``file_path`` with ``suffix`` inserted before a ``.csv`` extension.

    Only the final extension is touched, so a ``.csv`` fragment elsewhere in
    the path is left alone. When the file has no ``.csv`` extension the suffix
    and ``.csv`` are appended, so the result never equals the input path.
    """
    root, ext = os.path.splitext(file_path)
    if ext.lower() == ".csv":
        return f"{root}{suffix}.csv"
    return f"{file_path}{suffix}.csv"


@dataclass
class RateLimitConfig:
    """Request rate-limit settings."""

    requests_per_minute: int = 30
    min_delay_between_requests: float = 2.0
    max_retries: int = 5
    initial_backoff: float = 5.0
    max_backoff: float = 60.0
    backoff_multiplier: float = 2.0
    recovery_window: float = 60.0


class RateLimiter:
    """Client-side request throttle with failure back-off."""

    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_times: List[float] = []
        self.lock = threading.Lock()
        # May hold fractional values: non-rate-limit failures add 0.5.
        self.consecutive_failures: float = 0
        self.last_failure_time: float = 0
        self.last_success_time = time.time()
        self.total_429_errors = 0

    def wait_if_needed(self) -> float:
        """Block until the next request is allowed and record its timestamp.

        The sleep happens while the lock is held, so concurrent callers are
        serialized.

        Returns:
            The number of seconds that were waited (0.0 if none).
        """
        with self.lock:
            now = time.time()

            # Drop requests older than the 60-second window.
            self.request_times = [t for t in self.request_times if now - t < 60]

            # Decay the failure counter once the recovery window has passed.
            if (
                self.consecutive_failures > 0
                and (now - self.last_failure_time) > self.config.recovery_window
            ):
                self.consecutive_failures = max(0, self.consecutive_failures - 1)

            wait_time = 0.0

            # Requests-per-minute cap.
            if len(self.request_times) >= self.config.requests_per_minute:
                oldest_request = min(self.request_times)
                wait_time = max(wait_time, 60 - (now - oldest_request) + 1)

            # Minimum spacing between consecutive requests.
            if self.request_times:
                time_since_last = now - max(self.request_times)
                if time_since_last < self.config.min_delay_between_requests:
                    wait_time = max(
                        wait_time,
                        self.config.min_delay_between_requests - time_since_last,
                    )

            # Exponential back-off after failures (exponent capped at 4).
            if self.consecutive_failures > 0:
                failure_wait = min(
                    self.config.initial_backoff
                    * (self.config.backoff_multiplier ** min(self.consecutive_failures, 4)),
                    self.config.max_backoff,
                )
                wait_time = max(wait_time, failure_wait)

            if wait_time > 0:
                time.sleep(wait_time)

            self.request_times.append(time.time())
            return wait_time

    def report_success(self):
        """Record a successful request and clear the failure counter."""
        with self.lock:
            self.last_success_time = time.time()
            self.consecutive_failures = 0

    def report_failure(self, is_rate_limit: bool = False):
        """Record a failed request.

        Rate-limit failures count fully and bump the 429 total; other
        failures add half a step and saturate at 3.
        """
        with self.lock:
            self.last_failure_time = time.time()
            if is_rate_limit:
                self.consecutive_failures += 1
                self.total_429_errors += 1
            else:
                self.consecutive_failures = min(self.consecutive_failures + 0.5, 3)

    def get_estimated_wait_time(self) -> float:
        """Estimate the wait before the next request (ignores failure back-off)."""
        with self.lock:
            now = time.time()
            self.request_times = [t for t in self.request_times if now - t < 60]
            if len(self.request_times) >= self.config.requests_per_minute:
                return max(0, 60 - (now - min(self.request_times)) + 1)
            return self.config.min_delay_between_requests

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the limiter counters."""
        with self.lock:
            return {
                "consecutive_failures": self.consecutive_failures,
                "total_429_errors": self.total_429_errors,
                "requests_in_last_minute": len(self.request_times),
            }


class MultiProviderAnonymizer:
    """Anonymization system with support for several APIs."""

    def __init__(
        self,
        provider: APIProvider,
        api_key: str,
        model: Optional[str] = None,
        rate_limit_config: Optional[RateLimitConfig] = None,
    ):
        self.provider = provider
        self.api_key = api_key

        provider_config = API_CONFIGS[provider]
        self.base_url = provider_config["base_url"]
        self.model = model or provider_config["default_model"]

        self.rate_limit_config = rate_limit_config or RateLimitConfig()
        self.rate_limiter = RateLimiter(self.rate_limit_config)
        self.system_prompt = self._create_system_prompt()
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create a session whose adapter retries POSTs on 5xx responses."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["POST"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=10, pool_maxsize=10)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _create_system_prompt(self) -> str:
        """Return the system instruction sent with every request."""
        return """شما یک «ناشناس‌ساز متون مالی/خبری فارسی» هستید. وظیفه‌تان جایگزینی اسامی خاص و مقادیر عددی با شناسه‌های بی‌معناست.

## **قوانین اندیس‌گذاری**
- شرکت‌ها: company-01, company-02, ... (پیوسته)
- اشخاص: person-01, person-02, ... (پیوسته)
- اعداد: amount-01, amount-02, ... (پیوسته)
- درصدها: percent-01, percent-02, ... (پیوسته)

## **ثبات شناسه‌ها:**
- اگر "همراه اول" اول‌بار company-01 شد، در تمام متن همان باشد

## **تشخیص صحیح:**
- **شرکت/سازمان:** همراه اول، بانک ملی، ایران‌خودرو، سایپا، بانک مرکزی
- **گروه‌ها:** "گروه همراه اول" → company-XX (نه group-XX)
- **کلمات عمومی:** "سه شرکت"، "چند بانک" → حفظ شوند
- **شخص:** مهدی اخوان بهابادی، محمدرضا فرزین
- **عدد:** 37، 70، 677 (هر عددی)
- **درصد:** 37 درصدی، 15%

## **مثال:**
**ورودی:** مهدی اخوان، مدیرعامل همراه اول، اعلام کرد درآمد با رشد 37 درصدی به 70 میلیارد رسید.
**خروجی:** person-01، مدیرعامل company-01، اعلام کرد درآمد با رشد percent-01 به amount-01 رسید.

## **حفظ شود:**
- تاریخ‌ها: 1404/04/23
- دوره‌های زمانی: ۹ ماهه، ۵ ماهه سال
- واحدها: میلیارد تومان، همت
- مکان‌ها: تهران، اصفهان

**فقط متن ناشناس‌شده را برگردان - بدون توضیح اضافی.**
"""

    @staticmethod
    def _parse_retry_after(value: Optional[str], fallback: float) -> float:
        """Parse a ``Retry-After`` header given in seconds.

        Returns ``fallback`` when the header is absent or is not a plain
        number (for example the HTTP-date form). Negative values are clamped
        to zero because ``time.sleep`` rejects them.
        """
        if not value:
            return fallback
        try:
            return max(0.0, float(value))
        except (TypeError, ValueError):
            return fallback

    @staticmethod
    def _strip_code_fences(content: str) -> str:
        """Remove markdown code fences from a model reply.

        Fence lines and the lines between them are dropped. If that would
        leave nothing (the model wrapped its whole answer in a fence), the
        fenced lines are kept instead and only the fence markers are removed.
        """
        if "```" not in content:
            return content

        outside: List[str] = []
        inside: List[str] = []
        in_fence = False
        for line in content.split('\n'):
            if line.strip().startswith('```'):
                in_fence = not in_fence
                continue
            (inside if in_fence else outside).append(line)

        stripped = '\n'.join(outside)
        if stripped.strip():
            return stripped
        return '\n'.join(inside)

    def _make_api_request(self, text: str) -> Dict[str, Any]:
        """Send ``text`` to the chat-completions endpoint with retries.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: on HTTP 401, or after ``max_retries`` failed attempts.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "messages": [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": text},
            ],
            "model": self.model,
            "temperature": 0.1,
            "max_tokens": 2000,
        }

        max_retries = self.rate_limit_config.max_retries
        last_error = None

        for attempt in range(max_retries):
            self.rate_limiter.wait_if_needed()

            try:
                response = self.session.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=90,
                )

                if response.status_code == 429:
                    self.rate_limiter.report_failure(is_rate_limit=True)
                    fallback_wait = min(
                        self.rate_limit_config.initial_backoff
                        * (self.rate_limit_config.backoff_multiplier ** attempt),
                        60.0,
                    )
                    # Honour the server's hint when it is a usable number.
                    wait_seconds = self._parse_retry_after(
                        response.headers.get('Retry-After'), fallback_wait
                    )
                    last_error = f"Rate limit (429). تلاش {attempt + 1}/{max_retries}"
                    print(f"🚫 {last_error} - انتظار {wait_seconds}s")
                    time.sleep(wait_seconds)
                    continue

                if response.status_code == 401:
                    # Not a requests exception, so it escapes the retry loop.
                    raise Exception(f"❌ کلید API نامعتبر است برای {self.provider.value}")

                if response.status_code == 503:
                    self.rate_limiter.report_failure(is_rate_limit=False)
                    last_error = f"سرویس در دسترس نیست (503)"
                    time.sleep(10)
                    continue

                response.raise_for_status()
                self.rate_limiter.report_success()
                return response.json()

            except requests.exceptions.Timeout:
                self.rate_limiter.report_failure(is_rate_limit=False)
                last_error = f"Timeout"
                time.sleep(5)
            except requests.exceptions.ConnectionError as e:
                self.rate_limiter.report_failure(is_rate_limit=False)
                last_error = f"خطای اتصال: {str(e)[:50]}"
                time.sleep(10)
            except requests.exceptions.RequestException as e:
                self.rate_limiter.report_failure(is_rate_limit=False)
                last_error = f"خطای شبکه: {str(e)[:50]}"
                time.sleep(5)

        raise Exception(f"ناموفق پس از {max_retries} تلاش: {last_error}")

    def anonymize_text(self, text: str) -> Dict[str, Any]:
        """Anonymize ``text``.

        Returns:
            A dict with ``success`` and ``anonymized_text``; on success also
            ``statistics`` (distinct identifier counts per category) and
            ``usage``; on failure an ``error`` message. Never raises.
        """
        if not text or not text.strip():
            return {"success": False, "error": "متن ورودی خالی است", "anonymized_text": ""}

        try:
            response = self._make_api_request(text)

            if "choices" not in response or not response["choices"]:
                return {"success": False, "error": "پاسخ نامعتبر", "anonymized_text": ""}

            content = response["choices"][0]["message"]["content"]

            # Remove markdown code fences.
            content = self._strip_code_fences(content).strip()

            # Remove reasoning blocks (for Qwen and similar models).
            content = _THINK_RE.sub('', content)
            content = _THINKING_RE.sub('', content)
            content = content.strip()

            # Count the distinct identifiers of each category.
            companies = re.findall(r'company-(\d+)', content)
            persons = re.findall(r'person-(\d+)', content)
            amounts = re.findall(r'amount-(\d+)', content)
            percents = re.findall(r'percent-(\d+)', content)

            return {
                "success": True,
                "anonymized_text": content,
                "statistics": {
                    "company": len(set(companies)),
                    "person": len(set(persons)),
                    "amount": len(set(amounts)),
                    "percent": len(set(percents)),
                },
                "usage": response.get("usage", {}),
            }

        except Exception as e:
            # Boundary of the public API: report the failure as data.
            return {"success": False, "error": f"خطا: {str(e)}", "anonymized_text": ""}


class BatchProcessor:
    """Batch (CSV) processor."""

    def __init__(
        self,
        provider: APIProvider,
        api_key: str,
        model: Optional[str] = None,
        rate_limit_config: Optional[RateLimitConfig] = None,
    ):
        self.provider = provider
        self.api_key = api_key
        self.model = model
        self.rate_limit_config = rate_limit_config or RateLimitConfig()
        self.anonymizer = None
        self.is_cancelled = False
        self.current_progress = 0
        self.total_rows = 0
        self.processed_rows = 0
        self.failed_rows = 0
        self.start_time = None
        self.consecutive_api_failures = 0
        self.max_consecutive_failures = 10

    def cancel(self):
        """Request cancellation; checked before each row is processed."""
        self.is_cancelled = True

    def reset(self):
        """Reset all counters and the cancellation flag."""
        self.is_cancelled = False
        self.current_progress = 0
        self.total_rows = 0
        self.processed_rows = 0
        self.failed_rows = 0
        self.start_time = None
        self.consecutive_api_failures = 0

    def process_csv(
        self,
        file_path: str,
        text_column: str,
        output_column: str = "anonymized_text",
    ) -> Generator[Dict[str, Any], None, None]:
        """Anonymize one column of a CSV file row by row.

        Yields dict events whose ``type`` is one of ``info``, ``progress``,
        ``error``, ``cancelled`` or ``complete``. A checkpoint file is
        written every 100 rows and the final result is written next to the
        input as ``*_anonymized.csv`` unless the run was cancelled.
        """
        self.reset()
        self.start_time = time.time()

        # Load the file; report unreadable input as an event, not a crash.
        try:
            df = _read_csv_with_fallback(file_path)
        except (
            OSError,
            UnicodeDecodeError,
            pd.errors.EmptyDataError,
            pd.errors.ParserError,
        ) as exc:
            yield {"type": "error", "message": f"خطا در خواندن فایل: {exc}"}
            return

        if text_column not in df.columns:
            yield {"type": "error", "message": f"ستون '{text_column}' یافت نشد"}
            return

        self.total_rows = len(df)

        # Build the anonymizer.
        self.anonymizer = MultiProviderAnonymizer(
            provider=self.provider,
            api_key=self.api_key,
            model=self.model,
            rate_limit_config=self.rate_limit_config,
        )

        df[output_column] = ""
        df["anonymization_status"] = ""
        df["entities_found"] = ""

        yield {
            "type": "info",
            "message": f"🚀 شروع پردازش {self.total_rows} ردیف با {self.provider.value}...",
        }

        # ``position`` is the 1-based row count; ``idx`` is the index label
        # used for writes, so progress does not depend on the index type.
        for position, (idx, row) in enumerate(df.iterrows(), start=1):
            if self.is_cancelled:
                yield {
                    "type": "cancelled",
                    "message": "لغو شد",
                    "processed": self.processed_rows,
                    "failed": self.failed_rows,
                }
                break

            if self.consecutive_api_failures >= self.max_consecutive_failures:
                partial_path = _derive_sibling_path(file_path, f'_partial_{position - 1}')
                df.to_csv(partial_path, index=False, encoding='utf-8-sig')
                yield {
                    "type": "error",
                    "message": f"❌ توقف - خطاهای متوالی زیاد. ذخیره شد: {partial_path}",
                }
                return

            text = str(row[text_column]) if pd.notna(row[text_column]) else ""

            if not text.strip():
                # Blank rows are counted as processed without an API call
                # (and without a progress event).
                df.at[idx, output_column] = ""
                df.at[idx, "anonymization_status"] = "خالی"
                self.processed_rows += 1
                continue

            result = self.anonymizer.anonymize_text(text)

            if result["success"]:
                df.at[idx, output_column] = result["anonymized_text"]
                df.at[idx, "anonymization_status"] = "موفق"
                stats = result.get("statistics", {})
                df.at[idx, "entities_found"] = (
                    f"C:{stats.get('company',0)}|P:{stats.get('person',0)}"
                    f"|A:{stats.get('amount',0)}|%:{stats.get('percent',0)}"
                )
                self.processed_rows += 1
                self.consecutive_api_failures = 0
            else:
                error_text = result.get('error', '')
                df.at[idx, output_column] = f"خطا: {error_text}"
                df.at[idx, "anonymization_status"] = "ناموفق"
                self.failed_rows += 1
                # Only rate-limit failures count towards the abort threshold.
                if "rate limit" in error_text.lower() or "429" in error_text:
                    self.consecutive_api_failures += 1

            # Progress.
            self.current_progress = position / self.total_rows * 100
            elapsed = time.time() - self.start_time

            yield {
                "type": "progress",
                "current": position,
                "total": self.total_rows,
                "progress": self.current_progress,
                "processed": self.processed_rows,
                "failed": self.failed_rows,
                "elapsed": elapsed,
                "estimated_remaining": (elapsed / position) * (self.total_rows - position),
                "next_wait": self.anonymizer.rate_limiter.get_estimated_wait_time(),
                "last_result": result,
                "rate_status": self.anonymizer.rate_limiter.get_status(),
            }

            # Checkpoint every 100 rows.
            if position % 100 == 0:
                checkpoint_path = _derive_sibling_path(file_path, f'_checkpoint_{position}')
                df.to_csv(checkpoint_path, index=False, encoding='utf-8-sig')
                yield {"type": "info", "message": f"💾 Checkpoint: {checkpoint_path}"}

        # Final save.
        if not self.is_cancelled:
            output_path = _derive_sibling_path(file_path, '_anonymized')
            df.to_csv(output_path, index=False, encoding='utf-8-sig')

            yield {
                "type": "complete",
                "message": "✅ تکمیل شد!",
                "output_path": output_path,
                "total": self.total_rows,
                "processed": self.processed_rows,
                "failed": self.failed_rows,
                "total_time": time.time() - self.start_time,
            }


def create_interface():
    """Build and return the Gradio ``Blocks`` user interface."""

    custom_css = """
    .rtl-text {
        direction: rtl;
        text-align: right;
        font-family: 'Vazirmatn', 'Tahoma', sans-serif;
    }
    """

    with gr.Blocks(css=custom_css, title="ناشناس‌ساز چند-API", theme=gr.themes.Soft()) as interface:

        gr.Markdown(
            "\n"
            "# 🔒 سیستم ناشناس‌سازی متون فارسی\n"
            "### پشتیبانی از Cerebras، Groq و OpenAI\n",
            elem_classes=["rtl-text"],
        )

        # Holds the running BatchProcessor so the cancel button can reach it.
        batch_processor = {"instance": None}

        with gr.Tabs():
            # Single-text tab.
            with gr.Tab("پردازش تکی"):
                with gr.Row():
                    with gr.Column():
                        single_provider = gr.Dropdown(
                            label="🌐 پرووایدر API",
                            choices=["cerebras", "groq", "openai"],
                            value="groq",
                            elem_classes=["rtl-text"],
                        )
                        single_api_key = gr.Textbox(
                            label="🔑 کلید API",
                            placeholder="کلید API خود را وارد کنید",
                            type="password",
                        )
                        single_model = gr.Dropdown(
                            label="🤖 مدل",
                            choices=["llama-3.3-70b-versatile", "llama-3.1-8b-instant"],
                            value="llama-3.3-70b-versatile",
                        )
                        single_input = gr.Textbox(
                            label="متن ورودی",
                            lines=5,
                            elem_classes=["rtl-text"],
                        )
                        single_btn = gr.Button("🔄 ناشناس‌سازی", variant="primary")

                    with gr.Column():
                        single_output = gr.Textbox(
                            label="متن ناشناس‌شده", lines=5, elem_classes=["rtl-text"]
                        )
                        single_stats = gr.Textbox(label="آمار", lines=2)

            # Batch tab.
            with gr.Tab("پردازش دسته‌ای"):
                with gr.Row():
                    with gr.Column():
                        batch_provider = gr.Dropdown(
                            label="🌐 پرووایدر API",
                            choices=["cerebras", "groq", "openai"],
                            value="groq",
                        )
                        batch_api_key = gr.Textbox(
                            label="🔑 کلید API",
                            type="password",
                        )
                        batch_model = gr.Dropdown(
                            label="🤖 مدل",
                            choices=["llama-3.3-70b-versatile", "qwen-3-32b", "gpt-4o-mini"],
                            value="llama-3.3-70b-versatile",
                        )
                        csv_file = gr.File(label="📁 فایل CSV", file_types=[".csv"])
                        text_column = gr.Dropdown(
                            label="ستون متن", choices=[], allow_custom_value=True
                        )
                        output_column = gr.Textbox(
                            label="نام ستون خروجی", value="anonymized_text"
                        )

                    with gr.Column():
                        delay_slider = gr.Slider(1, 30, 3, step=0.5, label="تأخیر (ثانیه)")
                        rpm_slider = gr.Slider(5, 30, 20, step=1, label="حداکثر درخواست/دقیقه")
                        retries_slider = gr.Slider(1, 10, 5, step=1, label="تلاش مجدد")

                with gr.Row():
                    start_btn = gr.Button("🚀 شروع", variant="primary", size="lg")
                    cancel_btn = gr.Button("⏹️ لغو", variant="stop", size="lg")

                progress_bar = gr.Slider(0, 100, 0, label="پیشرفت", interactive=False)
                progress_text = gr.Markdown("در انتظار...")
                time_stats = gr.Markdown("")
                process_log = gr.Textbox(lines=10, label="لاگ")
                preview_table = gr.Dataframe(headers=["متن", "ناشناس‌شده", "وضعیت"])
                output_file = gr.File(label="دانلود", visible=False)

        # Callbacks.
        def update_models(provider):
            """Swap the model dropdown choices to match the chosen provider."""
            if provider == "cerebras":
                return gr.update(
                    choices=["qwen-3-32b", "llama-4-scout-17b-16e-instruct"],
                    value="qwen-3-32b",
                )
            elif provider == "groq":
                return gr.update(
                    choices=["llama-3.3-70b-versatile", "llama-3.1-8b-instant", "mixtral-8x7b-32768"],
                    value="llama-3.3-70b-versatile",
                )
            else:
                return gr.update(choices=["gpt-4o-mini", "gpt-4o"], value="gpt-4o-mini")

        def update_columns(file):
            """Fill the column dropdown from the uploaded CSV's header row."""
            if file is None:
                return gr.update(choices=[], value=None)
            df = _read_csv_with_fallback(file.name, nrows=0)
            columns = list(df.columns)
            return gr.update(choices=columns, value=columns[0] if columns else None)

        def process_single(provider, api_key, model, text):
            """Anonymize one text; returns (anonymized text, status line)."""
            if not text or not text.strip():
                return "", "❌ متن خالی"
            if not api_key:
                return "", "❌ کلید API وارد نشده"

            try:
                prov = APIProvider(provider)
                anonymizer = MultiProviderAnonymizer(provider=prov, api_key=api_key, model=model)
                result = anonymizer.anonymize_text(text)

                if result["success"]:
                    stats = result.get("statistics", {})
                    return (
                        result["anonymized_text"],
                        f"✅ C:{stats.get('company',0)} | P:{stats.get('person',0)} | A:{stats.get('amount',0)} | %:{stats.get('percent',0)}",
                    )
                else:
                    return "", f"❌ {result.get('error', '')}"
            except Exception as e:
                # UI boundary: show the problem instead of crashing the app.
                return "", f"❌ {str(e)}"

        def start_batch(file, text_col, output_col, delay, rpm, retries, provider, api_key, model):
            """Run a batch job, streaming UI updates.

            Each yield is a 6-tuple matching the click handler's outputs:
            progress value, progress text, time text, log, preview rows,
            download-file update.
            """
            if file is None:
                yield (0, "❌ فایل انتخاب نشده", "", "", None, gr.update(visible=False))
                return
            if not api_key:
                yield (0, "❌ کلید API وارد نشده", "", "", None, gr.update(visible=False))
                return

            prov = APIProvider(provider)
            rate_config = RateLimitConfig(
                requests_per_minute=int(rpm),
                min_delay_between_requests=float(delay),
                max_retries=int(retries),
            )

            processor = BatchProcessor(
                provider=prov, api_key=api_key, model=model, rate_limit_config=rate_config
            )
            batch_processor["instance"] = processor

            log_lines = []
            preview_data = []

            for update in processor.process_csv(file.name, text_col, output_col):
                update_type = update.get("type")

                if update_type == "error":
                    log_lines.append(f"❌ {update['message']}")
                    yield (
                        0,
                        f"❌ {update['message']}",
                        "",
                        "\n".join(log_lines),
                        None,
                        gr.update(visible=False),
                    )
                    return

                elif update_type == "info":
                    log_lines.append(f"ℹ️ {update['message']}")

                elif update_type == "progress":
                    progress = update["progress"]
                    current = update["current"]
                    total = update["total"]
                    elapsed = update["elapsed"]
                    remaining = update["estimated_remaining"]

                    progress_md = f"**{current}/{total}** ({progress:.1f}%) | ✅ {update['processed']} | ❌ {update['failed']}"
                    time_md = f"⏱️ {elapsed/60:.1f}m | باقیمانده: {remaining/60:.1f}m"

                    if current % 10 == 0:
                        log_lines.append(f"📊 {current}/{total}")

                    last_result = update.get("last_result", {})
                    if last_result.get("success"):
                        preview_data.append(
                            ["...", last_result.get("anonymized_text", "")[:80] + "...", "✅"]
                        )
                        # Keep only the five most recent preview rows.
                        if len(preview_data) > 5:
                            preview_data = preview_data[-5:]

                    yield (
                        progress,
                        progress_md,
                        time_md,
                        "\n".join(log_lines[-15:]),
                        preview_data if preview_data else None,
                        gr.update(visible=False),
                    )

                elif update_type == "cancelled":
                    log_lines.append("⏹️ لغو شد")
                    yield (
                        0,
                        "⏹️ لغو شد",
                        "",
                        "\n".join(log_lines),
                        preview_data if preview_data else None,
                        gr.update(visible=False),
                    )
                    return

                elif update_type == "complete":
                    log_lines.append(f"✅ {update['message']}")
                    progress_md = f"✅ تکمیل! {update['processed']}/{update['total']} موفق | {update['total_time']/60:.1f} دقیقه"
                    yield (
                        100,
                        progress_md,
                        "",
                        "\n".join(log_lines),
                        preview_data if preview_data else None,
                        gr.update(value=update['output_path'], visible=True),
                    )

        def cancel():
            """Ask the running batch job (if any) to stop."""
            if batch_processor["instance"]:
                batch_processor["instance"].cancel()
            return "⏹️ لغو شد..."

        # Event wiring.
        single_provider.change(update_models, [single_provider], [single_model])
        batch_provider.change(update_models, [batch_provider], [batch_model])
        csv_file.change(update_columns, [csv_file], [text_column])

        single_btn.click(
            process_single,
            [single_provider, single_api_key, single_model, single_input],
            [single_output, single_stats],
        )

        start_btn.click(
            start_batch,
            [
                csv_file,
                text_column,
                output_column,
                delay_slider,
                rpm_slider,
                retries_slider,
                batch_provider,
                batch_api_key,
                batch_model,
            ],
            [progress_bar, progress_text, time_stats, process_log, preview_table, output_file],
        )

        cancel_btn.click(cancel, outputs=[process_log])

    return interface


if __name__ == "__main__":
    interface = create_interface()
    # NOTE(review): binds to all interfaces and requests a share link -
    # confirm this exposure is intended, since API keys are typed into the UI.
    interface.launch(server_name="0.0.0.0", server_port=7860, share=True, show_error=True)