import requests import json import gradio as gr from typing import Dict, Any, List, Generator import os from dataclasses import dataclass import re import pandas as pd import time from datetime import datetime import threading from queue import Queue import io from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry @dataclass class CerebrasConfig: """تنظیمات Cerebras API""" api_key: str base_url: str = "https://api.cerebras.ai/v1" model: str = "qwen-3-32b" max_tokens: int = 2000 temperature: float = 0.1 @dataclass class RateLimitConfig: """تنظیمات محدودیت نرخ درخواست برای Cerebras""" requests_per_minute: int = 30 tokens_per_minute: int = 60000 min_delay_between_requests: float = 2.0 max_retries: int = 5 initial_backoff: float = 5.0 max_backoff: float = 120.0 backoff_multiplier: float = 2.0 # تنظیمات جدید برای بازیابی بهتر recovery_window: float = 60.0 # زمان پاک شدن شمارنده خطا (ثانیه) quota_exhausted_wait: float = 300.0 # زمان انتظار وقتی quota تمام شده (5 دقیقه) class RateLimiter: """مدیریت محدودیت نرخ درخواست با بازیابی بهتر""" def __init__(self, config: RateLimitConfig): self.config = config self.request_times: List[float] = [] self.lock = threading.Lock() self.consecutive_failures = 0 self.last_failure_time = 0 self.last_success_time = time.time() self.total_429_errors = 0 self.is_quota_exhausted = False def wait_if_needed(self) -> float: """انتظار تا زمان مجاز ارسال درخواست بعدی""" with self.lock: now = time.time() # اگر quota تمام شده و زمان کافی گذشته، ریست کن if self.is_quota_exhausted: if now - self.last_failure_time > self.config.quota_exhausted_wait: self.is_quota_exhausted = False self.consecutive_failures = 0 self.total_429_errors = 0 print(f"🔄 ریست محدودیت quota - زمان انتظار گذشت") else: remaining_wait = self.config.quota_exhausted_wait - (now - self.last_failure_time) print(f"⏳ انتظار برای بازیابی quota: {remaining_wait:.0f} ثانیه باقیمانده") time.sleep(min(remaining_wait, 30)) # حداکثر 30 ثانیه منتظر بمان return remaining_wait # پاک کردن درخواست‌های قدیمی‌تر از 1 دقیقه self.request_times = [t for t in self.request_times if now - t < 60] # بازیابی از خطاهای قبلی اگر زمان کافی گذشته if self.consecutive_failures > 0 and (now - self.last_failure_time) > self.config.recovery_window: old_failures = self.consecutive_failures self.consecutive_failures = max(0, self.consecutive_failures - 1) print(f"🔄 کاهش شمارنده خطا از {old_failures} به {self.consecutive_failures}") wait_time = 0.0 # اگر به محدودیت درخواست در دقیقه رسیده‌ایم if len(self.request_times) >= self.config.requests_per_minute: oldest_request = min(self.request_times) wait_time = max(wait_time, 60 - (now - oldest_request) + 1) # حداقل تأخیر بین درخواست‌ها if self.request_times: time_since_last = now - max(self.request_times) if time_since_last < self.config.min_delay_between_requests: wait_time = max(wait_time, self.config.min_delay_between_requests - time_since_last) # افزایش تأخیر در صورت خطاهای متوالی (با سقف کمتر) if self.consecutive_failures > 0: failure_wait = min( self.config.initial_backoff * (self.config.backoff_multiplier ** min(self.consecutive_failures, 4)), 60.0 # حداکثر 60 ثانیه بجای 120 ) wait_time = max(wait_time, failure_wait) if wait_time > 0: print(f"⏳ انتظار {wait_time:.1f} ثانیه قبل از درخواست بعدی...") time.sleep(wait_time) self.request_times.append(time.time()) return wait_time def report_success(self): """گزارش موفقیت درخواست""" with self.lock: self.last_success_time = time.time() # ریست کامل‌تر بعد از موفقیت if self.consecutive_failures > 0: print(f"✅ موفقیت - ریست شمارنده خطا از {self.consecutive_failures}") self.consecutive_failures = 0 self.is_quota_exhausted = False def report_failure(self, is_rate_limit: bool = False, status_code: int = None): """گزارش شکست درخواست""" with self.lock: self.last_failure_time = time.time() if is_rate_limit: self.consecutive_failures += 1 self.total_429_errors += 1 # اگر خطاهای 429 زیاد شد، quota رو exhausted بزن if self.total_429_errors >= 5 and self.consecutive_failures >= 3: self.is_quota_exhausted = True print(f"🚫 quota احتمالاً تمام شده - {self.total_429_errors} خطای 429") else: self.consecutive_failures = min(self.consecutive_failures + 0.5, 3) def get_estimated_wait_time(self) -> float: """تخمین زمان انتظار برای درخواست بعدی""" with self.lock: now = time.time() self.request_times = [t for t in self.request_times if now - t < 60] if self.is_quota_exhausted: return self.config.quota_exhausted_wait - (now - self.last_failure_time) if len(self.request_times) >= self.config.requests_per_minute: oldest_request = min(self.request_times) return max(0, 60 - (now - oldest_request) + 1) return self.config.min_delay_between_requests def get_status(self) -> Dict[str, Any]: """وضعیت فعلی rate limiter""" with self.lock: return { "consecutive_failures": self.consecutive_failures, "total_429_errors": self.total_429_errors, "is_quota_exhausted": self.is_quota_exhausted, "requests_in_last_minute": len(self.request_times) } class AdvancedCerebrasAnonymizer: """سیستم پیشرفته ناشناس‌سازی متون مالی/خبری فارسی""" def __init__(self, api_key: str = None, rate_limit_config: RateLimitConfig = None): if api_key is None: api_key = os.getenv("CEREBRAS_API_KEY") if not api_key: raise ValueError("کلید API یافت نشد") self.config = CerebrasConfig(api_key=api_key) self.rate_limit_config = rate_limit_config or RateLimitConfig() self.rate_limiter = RateLimiter(self.rate_limit_config) self.system_prompt = self._create_advanced_system_prompt() # ایجاد session با connection pooling و retry self.session = self._create_session() def _create_session(self) -> requests.Session: """ایجاد session با تنظیمات بهینه""" session = requests.Session() # تنظیم retry strategy retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504], allowed_methods=["POST"] ) adapter = HTTPAdapter( max_retries=retry_strategy, pool_connections=10, pool_maxsize=10 ) session.mount("https://", adapter) session.mount("http://", adapter) return session def _create_advanced_system_prompt(self) -> str: """ایجاد دستورالعمل سیستمی پیشرفته برای Cerebras""" return """شما یک «ناشناس‌ساز متون مالی/خبری فارسی» هستید. وظیفه‌تان جایگزینی اسامی خاص و مقادیر عددی با شناسه‌های بی‌معناست. ## **قوانین اندیس‌گذاری - CRITICAL** ### **1. ترتیب شماره‌گذاری الزامی:** - شرکت‌ها: company-01, company-02, company-03, company-04, ... (پیوسته و بدون گپ) - اشخاص: person-01, person-02, person-03, ... (پیوسته و بدون گپ) - اعداد: amount-01, amount-02, amount-03, ... (پیوسته و بدون گپ) - درصدها: percent-01, percent-02, percent-03, ... (پیوسته و بدون گپ) ### **2. ثبات شناسه‌ها در متن:** - اگر "همراه اول" اول‌بار company-01 شد، در تمام متن همان باشد - اگر "مهدی احمدی" اول‌بار person-01 شد، در تمام متن همان باشد ### **3. تشخیص صحیح انواع:** **شرکت/سازمان:** همراه اول، بانک ملی، ایران‌خودرو، سایپا، بانک مرکزی، سامانه کدال، وزارت نفت، سازمان تنظیم مقررات رادیویی، سازمان تامین اجتماعی **⚠️ CRITICAL - گروه‌ها:** "گروه همراه اول"، "گروه اقتصادی آزادگان"، "گروه مالی صبا" → همه company-XX هستند (نه group-XX) **⚠️ CRITICAL - کلمات عمومی:** "سه شرکت دارویی"، "چند بانک"، "یک شرکت" → کلمات عمومی هستند، موجودیت نیستند (حفظ شوند) **⚠️ CRITICAL - نام‌های مستعار:** "فاما" همان "فولاد مبارکه اصفهان" است → هر دو company-01 **شخص:** مهدی اخوان بهابادی، محمدرضا فرزین، ابوالفضل نجارزاده **عدد:** 37، 70، 677، 73.7، 178 (هر عددی) **درصد:** 37 درصدی، 15 درصدی، 53 درصد، 43% ## **مثال‌های صحیح:** ### **مثال 1 (الگوی کامل):** **ورودی:** مهدی اخوان بهابادی، مدیرعامل همراه اول، اعلام کرد درآمد عملیاتی شرکت با رشد 37 درصدی به 70 هزار و 677 میلیارد تومان رسیده است. **خروجی صحیح:** person-01، مدیرعامل company-01، اعلام کرد درآمد عملیاتی شرکت با رشد percent-01 به amount-01 رسیده است. ### **مثال 2:** **ورودی:** بانک مرکزی و بانک ملی با همکاری محمدرضا فرزین، 60 درصد سپرده‌ها را مدیریت کردند. **خروجی:** company-01 و company-02 با همکاری person-01، percent-01 سپرده‌ها را مدیریت کردند. ## **⚠️ CRITICAL - دوره‌های زمانی را حفظ کن:** - "۹ ماهه" → حفظ شود (نه amount-XX) - "۵ ماهه سال" → حفظ شود (نه amount-XX) - "۳ ماهه اول" → حفظ شود (نه amount-XX) ## **موارد حفظ شده:** - تاریخ‌ها: 1404/04/23، 30 آذر 1403، پاییز 1401 - فصل‌های سال: پاییز، بهار، تابستان، زمستان - عناوین شغلی: مدیرعامل، رئیس کل، مدیرکل - واحدها: میلیارد تومان، همت، ریال، ماه، سال - مکان‌ها: تهران، اصفهان، ایران **فقط متن ناشناس‌شده را برگردان - هیچ توضیح اضافی نیاز نیست.** """ def _make_api_request_with_retry(self, text: str) -> Dict[str, Any]: """ارسال درخواست به Cerebras API با مدیریت rate limit و retry""" headers = { "Authorization": f"Bearer {self.config.api_key}", "Content-Type": "application/json" } payload = { "messages": [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": text} ], "model": self.config.model, "temperature": self.config.temperature, "max_tokens": self.config.max_tokens } last_error = None for attempt in range(self.rate_limit_config.max_retries): # انتظار قبل از ارسال درخواست wait_time = self.rate_limiter.wait_if_needed() # چک کردن وضعیت rate limiter status = self.rate_limiter.get_status() if status["is_quota_exhausted"]: print(f"⚠️ quota تمام شده - انتظار برای بازیابی...") try: response = self.session.post( f"{self.config.base_url}/chat/completions", headers=headers, json=payload, timeout=90 # افزایش timeout به 90 ثانیه ) # بررسی خطای rate limit (429) if response.status_code == 429: self.rate_limiter.report_failure(is_rate_limit=True, status_code=429) # استخراج زمان انتظار از هدر retry_after = response.headers.get('Retry-After') if retry_after: wait_seconds = int(retry_after) else: wait_seconds = min( self.rate_limit_config.initial_backoff * (self.rate_limit_config.backoff_multiplier ** attempt), 60.0 # کاهش حداکثر انتظار ) last_error = f"محدودیت نرخ درخواست (429). تلاش {attempt + 1}/{self.rate_limit_config.max_retries}. انتظار {wait_seconds:.1f} ثانیه..." print(f"🚫 {last_error}") time.sleep(wait_seconds) continue # بررسی سایر خطاها if response.status_code == 401: raise Exception("کلید API نامعتبر است") if response.status_code == 503: self.rate_limiter.report_failure(is_rate_limit=False, status_code=503) last_error = f"سرویس موقتاً در دسترس نیست (503). تلاش {attempt + 1}/{self.rate_limit_config.max_retries}" print(f"⚠️ {last_error}") time.sleep(10) continue response.raise_for_status() self.rate_limiter.report_success() return response.json() except requests.exceptions.Timeout: self.rate_limiter.report_failure(is_rate_limit=False) last_error = f"خطای timeout (90s). تلاش {attempt + 1}/{self.rate_limit_config.max_retries}" print(f"⏱️ {last_error}") time.sleep(self.rate_limit_config.initial_backoff) except requests.exceptions.ConnectionError as e: self.rate_limiter.report_failure(is_rate_limit=False) last_error = f"خطای اتصال: {str(e)[:50]}. تلاش {attempt + 1}/{self.rate_limit_config.max_retries}" print(f"🔌 {last_error}") time.sleep(self.rate_limit_config.initial_backoff * 2) # انتظار بیشتر برای مشکل شبکه except requests.exceptions.RequestException as e: self.rate_limiter.report_failure(is_rate_limit=False) last_error = f"خطای شبکه: {str(e)[:50]}. تلاش {attempt + 1}/{self.rate_limit_config.max_retries}" print(f"❌ {last_error}") time.sleep(self.rate_limit_config.initial_backoff) raise Exception(f"ناموفق پس از {self.rate_limit_config.max_retries} تلاش. آخرین خطا: {last_error}") def anonymize_text(self, text: str) -> Dict[str, Any]: """ناشناس‌سازی متن با استفاده از Cerebras""" if not text or not text.strip(): return { "success": False, "error": "متن ورودی خالی است", "anonymized_text": "" } try: response = self._make_api_request_with_retry(text) if "choices" not in response or not response["choices"]: return { "success": False, "error": "پاسخ نامعتبر از API", "anonymized_text": "" } content = response["choices"][0]["message"]["content"] content = self._clean_markdown(content) content = content.strip() analysis = self._analyze_anonymized_text(content) return { "success": True, "anonymized_text": content, "entities": analysis["entities"], "statistics": analysis["statistics"], "usage": response.get("usage", {}) } except Exception as e: return { "success": False, "error": f"خطا در پردازش: {str(e)}", "anonymized_text": "" } def _clean_markdown(self, content: str) -> str: """پاک کردن markdown از پاسخ""" if "```" in content: lines = content.split('\n') clean_lines = [] skip = False for line in lines: if line.strip().startswith('```'): skip = not skip continue if not skip: clean_lines.append(line) content = '\n'.join(clean_lines) return content def _analyze_anonymized_text(self, text: str) -> Dict[str, Any]: """تحلیل متن ناشناس‌سازی شده""" companies = re.findall(r'company-(\d+)', text) persons = re.findall(r'person-(\d+)', text) amounts = re.findall(r'amount-(\d+)', text) percents = re.findall(r'percent-(\d+)', text) statistics = { "company": len(set(companies)), "person": len(set(persons)), "amount": len(set(amounts)), "percent": len(set(percents)), "total_replacements": len(companies) + len(persons) + len(amounts) + len(percents) } entities = { "companies": sorted(list(set(companies)), key=lambda x: int(x)), "persons": sorted(list(set(persons)), key=lambda x: int(x)), "amounts": sorted(list(set(amounts)), key=lambda x: int(x)), "percents": sorted(list(set(percents)), key=lambda x: int(x)) } return { "statistics": statistics, "entities": entities } class BatchProcessor: """پردازشگر دسته‌ای فایل‌های CSV""" def __init__(self, api_key: str, rate_limit_config: RateLimitConfig = None): self.api_key = api_key self.rate_limit_config = rate_limit_config or RateLimitConfig() self.anonymizer = None self.is_cancelled = False self.current_progress = 0 self.total_rows = 0 self.processed_rows = 0 self.failed_rows = 0 self.start_time = None self.consecutive_api_failures = 0 self.max_consecutive_failures = 10 # حداکثر خطاهای متوالی قبل از توقف def cancel(self): """لغو پردازش""" self.is_cancelled = True def reset(self): """بازنشانی وضعیت""" self.is_cancelled = False self.current_progress = 0 self.total_rows = 0 self.processed_rows = 0 self.failed_rows = 0 self.start_time = None self.consecutive_api_failures = 0 def process_csv( self, file_path: str, text_column: str, output_column: str = "anonymized_text", progress_callback=None ) -> Generator[Dict[str, Any], None, None]: """پردازش فایل CSV به صورت streaming""" self.reset() self.start_time = time.time() # خواندن فایل CSV try: df = pd.read_csv(file_path, encoding='utf-8') except UnicodeDecodeError: try: df = pd.read_csv(file_path, encoding='utf-8-sig') except: df = pd.read_csv(file_path, encoding='cp1256') if text_column not in df.columns: yield { "type": "error", "message": f"ستون '{text_column}' در فایل یافت نشد. ستون‌های موجود: {list(df.columns)}" } return self.total_rows = len(df) # ایجاد anonymizer self.anonymizer = AdvancedCerebrasAnonymizer( api_key=self.api_key, rate_limit_config=self.rate_limit_config ) # ایجاد ستون خروجی df[output_column] = "" df["anonymization_status"] = "" df["entities_found"] = "" yield { "type": "info", "message": f"🚀 شروع پردازش {self.total_rows} ردیف...", "total": self.total_rows } results = [] for idx, row in df.iterrows(): if self.is_cancelled: yield { "type": "cancelled", "message": "پردازش توسط کاربر لغو شد", "processed": self.processed_rows, "failed": self.failed_rows } break # چک کردن خطاهای متوالی if self.consecutive_api_failures >= self.max_consecutive_failures: yield { "type": "error", "message": f"❌ توقف به دلیل {self.consecutive_api_failures} خطای متوالی API. لطفاً API key را بررسی کنید یا چند دقیقه صبر کنید.", "processed": self.processed_rows, "failed": self.failed_rows } # ذخیره پیشرفت فعلی partial_output_path = file_path.replace('.csv', f'_partial_{idx}.csv') df.to_csv(partial_output_path, index=False, encoding='utf-8-sig') yield { "type": "info", "message": f"📁 پیشرفت ذخیره شد: {partial_output_path}" } return text = str(row[text_column]) if pd.notna(row[text_column]) else "" if not text.strip(): df.at[idx, output_column] = "" df.at[idx, "anonymization_status"] = "خالی" df.at[idx, "entities_found"] = "" self.processed_rows += 1 continue # پردازش متن result = self.anonymizer.anonymize_text(text) if result["success"]: df.at[idx, output_column] = result["anonymized_text"] df.at[idx, "anonymization_status"] = "موفق" stats = result.get("statistics", {}) entities_summary = f"شرکت:{stats.get('company',0)} | شخص:{stats.get('person',0)} | مبلغ:{stats.get('amount',0)} | درصد:{stats.get('percent',0)}" df.at[idx, "entities_found"] = entities_summary self.processed_rows += 1 self.consecutive_api_failures = 0 # ریست شمارنده خطا else: df.at[idx, output_column] = f"خطا: {result.get('error', 'نامشخص')}" df.at[idx, "anonymization_status"] = "ناموفق" df.at[idx, "entities_found"] = "" self.failed_rows += 1 # افزایش شمارنده خطای متوالی if "rate limit" in result.get('error', '').lower() or "429" in result.get('error', ''): self.consecutive_api_failures += 1 yield { "type": "warning", "message": f"⚠️ خطای rate limit ({self.consecutive_api_failures}/{self.max_consecutive_failures})" } # محاسبه پیشرفت و زمان باقیمانده self.current_progress = (idx + 1) / self.total_rows * 100 elapsed = time.time() - self.start_time avg_time_per_row = elapsed / (idx + 1) remaining_rows = self.total_rows - (idx + 1) estimated_remaining = avg_time_per_row * remaining_rows # تخمین زمان انتظار بعدی next_wait = self.anonymizer.rate_limiter.get_estimated_wait_time() # اطلاعات وضعیت rate limiter rate_status = self.anonymizer.rate_limiter.get_status() yield { "type": "progress", "current": idx + 1, "total": self.total_rows, "progress": self.current_progress, "processed": self.processed_rows, "failed": self.failed_rows, "elapsed": elapsed, "estimated_remaining": estimated_remaining, "next_wait": next_wait, "last_result": result, "rate_status": rate_status } # ذخیره میانی هر 100 ردیف if (idx + 1) % 100 == 0: checkpoint_path = file_path.replace('.csv', f'_checkpoint_{idx+1}.csv') df.to_csv(checkpoint_path, index=False, encoding='utf-8-sig') yield { "type": "info", "message": f"💾 Checkpoint ذخیره شد: {checkpoint_path}" } # ذخیره نتیجه نهایی if not self.is_cancelled: output_path = file_path.replace('.csv', '_anonymized.csv') if output_path == file_path: output_path = file_path + '_anonymized.csv' df.to_csv(output_path, index=False, encoding='utf-8-sig') total_time = time.time() - self.start_time yield { "type": "complete", "message": "✅ پردازش با موفقیت تکمیل شد!", "output_path": output_path, "total": self.total_rows, "processed": self.processed_rows, "failed": self.failed_rows, "total_time": total_time, "dataframe": df } def create_batch_interface(): """ایجاد رابط کاربری برای پردازش دسته‌ای""" api_key_available = bool(os.getenv("CEREBRAS_API_KEY")) custom_css = """ .rtl-text { direction: rtl; text-align: right; font-family: 'Vazirmatn', 'Tahoma', sans-serif; } .progress-box { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 10px; padding: 15px; color: white; } .status-success { color: #10B981; font-weight: bold; } .status-error { color: #EF4444; font-weight: bold; } .warning-box { background-color: #FEF3C7; border: 1px solid #F59E0B; border-radius: 8px; padding: 10px; margin: 10px 0; } """ with gr.Blocks(css=custom_css, title="ناشناس‌ساز متون فارسی - Cerebras", theme=gr.themes.Soft()) as interface: gr.Markdown(""" # 🔒 سیستم ناشناس‌سازی متون مالی/خبری فارسی ### نسخه بهبود یافته با مدیریت بهتر Rate Limit """, elem_classes=["rtl-text"]) # نمایش هشدار وضعیت with gr.Row(): status_box = gr.Markdown("", elem_classes=["warning-box"]) # ذخیره وضعیت پردازشگر batch_processor = {"instance": None} with gr.Tabs(): # تب پردازش تکی with gr.Tab("پردازش تکی", elem_classes=["rtl-text"]): with gr.Row(): with gr.Column(scale=1): single_api_key = gr.Textbox( label="کلید API (اختیاری)", placeholder="اگر خالی باشد از متغیر محیطی استفاده می‌شود", type="password", elem_classes=["rtl-text"] ) single_input = gr.Textbox( label="متن ورودی", placeholder="متن فارسی خود را وارد کنید...", lines=5, elem_classes=["rtl-text"] ) single_btn = gr.Button("🔄 ناشناس‌سازی", variant="primary") with gr.Column(scale=1): single_output = gr.Textbox( label="متن ناشناس‌شده", lines=5, elem_classes=["rtl-text"] ) single_stats = gr.Textbox( label="آمار", lines=2, elem_classes=["rtl-text"] ) # تب پردازش دسته‌ای with gr.Tab("پردازش دسته‌ای CSV", elem_classes=["rtl-text"]): with gr.Row(): with gr.Column(scale=1): batch_api_key = gr.Textbox( label="🔑 کلید API Cerebras", placeholder="اگر خالی باشد از متغیر محیطی استفاده می‌شود", type="password", elem_classes=["rtl-text"] ) csv_file = gr.File( label="📁 فایل CSV", file_types=[".csv"], elem_classes=["rtl-text"] ) text_column = gr.Dropdown( label="ستون متن", choices=[], allow_custom_value=True, elem_classes=["rtl-text"] ) output_column = gr.Textbox( label="نام ستون خروجی", value="anonymized_text", elem_classes=["rtl-text"] ) with gr.Column(scale=1): gr.Markdown("### ⚙️ تنظیمات Rate Limit") delay_between_requests = gr.Slider( minimum=1, maximum=30, value=3, step=0.5, label="⏱️ تأخیر بین درخواست‌ها (ثانیه)", info="مقدار بیشتر = پایداری بیشتر" ) requests_per_minute = gr.Slider( minimum=5, maximum=30, value=20, step=1, label="📊 حداکثر درخواست در دقیقه", info="Free tier: 30 RPM" ) max_retries = gr.Slider( minimum=1, maximum=10, value=5, step=1, label="🔄 حداکثر تلاش مجدد", info="تعداد تلاش در صورت خطای 429" ) with gr.Row(): start_btn = gr.Button("🚀 شروع پردازش", variant="primary", size="lg") cancel_btn = gr.Button("⏹️ لغو پردازش", variant="stop", size="lg") # نمایش پیشرفت with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📈 پیشرفت کلی") progress_bar = gr.Slider( minimum=0, maximum=100, value=0, label="", interactive=False ) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### ⏱️ زمان‌بندی") time_stats = gr.Markdown("...در انتظار شروع") with gr.Column(scale=1): gr.Markdown("### ✅ وضعیت پردازش") progress_text = gr.Markdown("...در انتظار شروع") with gr.Row(): with gr.Column(): gr.Markdown("### 📋 لاگ پردازش") process_log = gr.Textbox( lines=10, max_lines=20, elem_classes=["rtl-text"], show_label=False ) with gr.Row(): gr.Markdown("### 👁️ پیش‌نمایش نتایج") preview_table = gr.Dataframe( headers=["متن اصلی", "متن ناشناس‌شده", "وضعیت"], label="", visible=True ) output_file = gr.File( label="📥 دانلود فایل خروجی", visible=False ) # توابع کمکی def update_columns(file): """بروزرسانی لیست ستون‌ها""" if file is None: return gr.update(choices=[], value=None) try: df = pd.read_csv(file.name, nrows=0, encoding='utf-8') except: try: df = pd.read_csv(file.name, nrows=0, encoding='utf-8-sig') except: df = pd.read_csv(file.name, nrows=0, encoding='cp1256') columns = list(df.columns) default = columns[0] if columns else None # تلاش برای یافتن ستون متن for col in columns: if any(term in col.lower() for term in ['text', 'متن', 'content', 'body', 'news']): default = col break return gr.update(choices=columns, value=default) def process_single_text(text, api_key): """پردازش متن تکی""" if not text.strip(): return "", "❌ متن ورودی خالی است" try: key = api_key if api_key else os.getenv("CEREBRAS_API_KEY") if not key: return "", "❌ کلید API وارد نشده است" anonymizer = AdvancedCerebrasAnonymizer(api_key=key) result = anonymizer.anonymize_text(text) if result["success"]: stats = result.get("statistics", {}) stats_text = f"✅ شرکت: {stats.get('company',0)} | شخص: {stats.get('person',0)} | مبلغ: {stats.get('amount',0)} | درصد: {stats.get('percent',0)}" return result["anonymized_text"], stats_text else: return "", f"❌ خطا: {result.get('error', 'نامشخص')}" except Exception as e: return "", f"❌ خطا: {str(e)}" def start_batch_processing( file, text_col, output_col, delay, rpm, retries, api_key ): """شروع پردازش دسته‌ای""" if file is None: yield ( 0, "### ❌ خطا\nفایل انتخاب نشده است", "", "", None, gr.update(visible=False) ) return key = api_key if api_key else os.getenv("CEREBRAS_API_KEY") if not key: yield ( 0, "### ❌ خطا\nکلید API وارد نشده است", "", "", None, gr.update(visible=False) ) return # تنظیم rate limit rate_config = RateLimitConfig( requests_per_minute=int(rpm), min_delay_between_requests=float(delay), max_retries=int(retries) ) # ایجاد پردازشگر processor = BatchProcessor(api_key=key, rate_limit_config=rate_config) batch_processor["instance"] = processor log_lines = [] preview_data = [] # پردازش for update in processor.process_csv(file.name, text_col, output_col): update_type = update.get("type") if update_type == "error": log_lines.append(f"❌ {update['message']}") yield ( 0, f"### ❌ خطا\n{update['message']}", "", "\n".join(log_lines), None, gr.update(visible=False) ) return elif update_type == "warning": log_lines.append(f"⚠️ {update['message']}") elif update_type == "info": log_lines.append(f"ℹ️ {update['message']}") elif update_type == "progress": progress = update["progress"] current = update["current"] total = update["total"] processed = update["processed"] failed = update["failed"] elapsed = update["elapsed"] remaining = update["estimated_remaining"] next_wait = update.get("next_wait", 0) rate_status = update.get("rate_status", {}) # نمایش وضعیت rate limiter rate_info = "" if rate_status.get("is_quota_exhausted"): rate_info = "\n- **⚠️ وضعیت:** انتظار برای بازیابی quota" elif rate_status.get("consecutive_failures", 0) > 0: rate_info = f"\n- **⚠️ خطاهای متوالی:** {rate_status['consecutive_failures']}" progress_md = f""" ### 📈 وضعیت پردازش - **پردازش شده:** {current}/{total} ({progress:.1f}%) - **موفق:** {processed} ✅ - **ناموفق:** {failed} ❌ - **تأخیر بعدی:** {next_wait:.1f} ثانیه{rate_info} """ time_md = f""" ### ⏱️ زمان‌بندی - **سپری شده:** {elapsed/60:.1f} دقیقه - **تخمین باقیمانده:** {remaining/60:.1f} دقیقه - **سرعت:** {current/elapsed*60:.1f} ردیف/دقیقه """ # بروزرسانی لاگ هر 10 ردیف if current % 10 == 0 or current == total: log_lines.append(f"📊 پردازش {current}/{total} - موفق: {processed}, ناموفق: {failed}") # بروزرسانی پیش‌نمایش last_result = update.get("last_result", {}) if last_result.get("success"): preview_data.append([ "...", last_result.get("anonymized_text", "")[:100] + "...", "✅ موفق" ]) if len(preview_data) > 5: preview_data = preview_data[-5:] yield ( progress, progress_md, time_md, "\n".join(log_lines[-20:]), preview_data if preview_data else None, gr.update(visible=False) ) elif update_type == "cancelled": log_lines.append(f"⏹️ {update['message']}") yield ( 0, f"### ⏹️ لغو شد\nپردازش شده: {update['processed']}, ناموفق: {update['failed']}", "", "\n".join(log_lines), preview_data if preview_data else None, gr.update(visible=False) ) return elif update_type == "complete": total_time = update["total_time"] log_lines.append(f"✅ {update['message']}") log_lines.append(f"📁 فایل خروجی: {update['output_path']}") progress_md = f""" ### ✅ پردازش تکمیل شد! - **کل ردیف‌ها:** {update['total']} - **موفق:** {update['processed']} ✅ - **ناموفق:** {update['failed']} ❌ - **زمان کل:** {total_time/60:.1f} دقیقه """ time_md = f""" ### 📊 آمار نهایی - **سرعت میانگین:** {update['total']/total_time*60:.1f} ردیف/دقیقه - **نرخ موفقیت:** {update['processed']/update['total']*100:.1f}% """ yield ( 100, progress_md, time_md, "\n".join(log_lines), preview_data if preview_data else None, gr.update(value=update['output_path'], visible=True) ) def cancel_processing(): """لغو پردازش""" if batch_processor["instance"]: batch_processor["instance"].cancel() return "⏹️ درخواست لغو ارسال شد..." # اتصال رویدادها csv_file.change( fn=update_columns, inputs=[csv_file], outputs=[text_column] ) single_btn.click( fn=process_single_text, inputs=[single_input, single_api_key], outputs=[single_output, single_stats] ) start_btn.click( fn=start_batch_processing, inputs=[ csv_file, text_column, output_column, delay_between_requests, requests_per_minute, max_retries, batch_api_key ], outputs=[ progress_bar, progress_text, time_stats, process_log, preview_table, output_file ] ) cancel_btn.click( fn=cancel_processing, outputs=[process_log] ) return interface # اجرای برنامه if __name__ == "__main__": interface = create_batch_interface() interface.launch( server_name="0.0.0.0", server_port=7860, share=True, show_error=True )