import requests import json import gradio as gr from typing import Dict, Any, List, Generator import os from dataclasses import dataclass import re import pandas as pd import time from datetime import datetime import threading from queue import Queue import io @dataclass class CerebrasConfig: """تنظیمات Cerebras API""" api_key: str base_url: str = "https://api.cerebras.ai/v1" model: str = "llama-3.3-70b" max_tokens: int = 2000 temperature: float = 0.1 @dataclass class RateLimitConfig: """تنظیمات محدودیت نرخ درخواست برای Cerebras""" # محدودیتهای Cerebras Free Tier requests_per_minute: int = 30 # حداکثر درخواست در دقیقه tokens_per_minute: int = 60000 # حداکثر توکن در دقیقه min_delay_between_requests: float = 2.0 # حداقل تأخیر بین درخواستها (ثانیه) max_retries: int = 5 # حداکثر تلاش مجدد initial_backoff: float = 5.0 # تأخیر اولیه برای backoff (ثانیه) max_backoff: float = 120.0 # حداکثر تأخیر backoff (ثانیه) backoff_multiplier: float = 2.0 # ضریب افزایش تأخیر class RateLimiter: """مدیریت محدودیت نرخ درخواست""" def __init__(self, config: RateLimitConfig): self.config = config self.request_times: List[float] = [] self.lock = threading.Lock() self.consecutive_failures = 0 def wait_if_needed(self) -> float: """انتظار تا زمان مجاز ارسال درخواست بعدی""" with self.lock: now = time.time() # پاک کردن درخواستهای قدیمیتر از 1 دقیقه self.request_times = [t for t in self.request_times if now - t < 60] # محاسبه زمان انتظار wait_time = 0.0 # اگر به محدودیت درخواست در دقیقه رسیدهایم if len(self.request_times) >= self.config.requests_per_minute: oldest_request = min(self.request_times) wait_time = max(wait_time, 60 - (now - oldest_request) + 1) # حداقل تأخیر بین درخواستها if self.request_times: time_since_last = now - max(self.request_times) if time_since_last < self.config.min_delay_between_requests: wait_time = max(wait_time, self.config.min_delay_between_requests - time_since_last) # افزایش تأخیر در صورت خطاهای متوالی if self.consecutive_failures > 0: failure_wait = min( self.config.initial_backoff * (self.config.backoff_multiplier ** self.consecutive_failures), self.config.max_backoff ) wait_time = max(wait_time, failure_wait) if wait_time > 0: time.sleep(wait_time) self.request_times.append(time.time()) return wait_time def report_success(self): """گزارش موفقیت درخواست""" with self.lock: self.consecutive_failures = 0 def report_failure(self, is_rate_limit: bool = False): """گزارش شکست درخواست""" with self.lock: if is_rate_limit: self.consecutive_failures += 1 else: # برای خطاهای غیر rate limit، کمتر افزایش میدهیم self.consecutive_failures = min(self.consecutive_failures + 0.5, 3) def get_estimated_wait_time(self) -> float: """تخمین زمان انتظار برای درخواست بعدی""" with self.lock: now = time.time() self.request_times = [t for t in self.request_times if now - t < 60] if len(self.request_times) >= self.config.requests_per_minute: oldest_request = min(self.request_times) return max(0, 60 - (now - oldest_request) + 1) return self.config.min_delay_between_requests class AdvancedCerebrasAnonymizer: """سیستم پیشرفته ناشناسسازی متون مالی/خبری فارسی""" def __init__(self, api_key: str = None, rate_limit_config: RateLimitConfig = None): if api_key is None: api_key = os.getenv("CEREBRAS_API_KEY") if not api_key: raise ValueError("کلید API یافت نشد") self.config = CerebrasConfig(api_key=api_key) self.rate_limit_config = rate_limit_config or RateLimitConfig() self.rate_limiter = RateLimiter(self.rate_limit_config) self.system_prompt = self._create_advanced_system_prompt() def _create_advanced_system_prompt(self) -> str: """ایجاد دستورالعمل سیستمی پیشرفته برای Cerebras""" return """شما یک «ناشناسساز متون مالی/خبری فارسی» هستید. وظیفهتان جایگزینی اسامی خاص و مقادیر عددی با شناسههای بیمعناست. ## **قوانین اندیسگذاری - CRITICAL** ### **1. ترتیب شمارهگذاری الزامی:** - شرکتها: company-01, company-02, company-03, company-04, ... (پیوسته و بدون گپ) - اشخاص: person-01, person-02, person-03, ... (پیوسته و بدون گپ) - اعداد: amount-01, amount-02, amount-03, ... (پیوسته و بدون گپ) - درصدها: percent-01, percent-02, percent-03, ... (پیوسته و بدون گپ) ### **2. ثبات شناسهها در متن:** - اگر "همراه اول" اولبار company-01 شد، در تمام متن همان باشد - اگر "مهدی احمدی" اولبار person-01 شد، در تمام متن همان باشد ### **3. تشخیص صحیح انواع:** **شرکت/سازمان:** همراه اول، بانک ملی، ایرانخودرو، سایپا، بانک مرکزی، سامانه کدال، وزارت نفت، سازمان تنظیم مقررات رادیویی، سازمان تامین اجتماعی **⚠️ CRITICAL - گروهها:** "گروه همراه اول"، "گروه اقتصادی آزادگان"، "گروه مالی صبا" → همه company-XX هستند (نه group-XX) **⚠️ CRITICAL - کلمات عمومی:** "سه شرکت دارویی"، "چند بانک"، "یک شرکت" → کلمات عمومی هستند، موجودیت نیستند (حفظ شوند) **⚠️ CRITICAL - نامهای مستعار:** "فاما" همان "فولاد مبارکه اصفهان" است → هر دو company-01 **شخص:** مهدی اخوان بهابادی، محمدرضا فرزین، ابوالفضل نجارزاده **عدد:** 37، 70، 677، 73.7، 178 (هر عددی) **درصد:** 37 درصدی، 15 درصدی، 53 درصد، 43% ## **مثالهای صحیح:** ### **مثال 1 (الگوی کامل):** **ورودی:** مهدی اخوان بهابادی، مدیرعامل همراه اول، اعلام کرد درآمد عملیاتی شرکت با رشد 37 درصدی به 70 هزار و 677 میلیارد تومان رسیده است. **خروجی صحیح:** person-01، مدیرعامل company-01، اعلام کرد درآمد عملیاتی شرکت با رشد percent-01 به amount-01 رسیده است. ### **مثال 2:** **ورودی:** بانک مرکزی و بانک ملی با همکاری محمدرضا فرزین، 60 درصد سپردهها را مدیریت کردند. **خروجی:** company-01 و company-02 با همکاری person-01، percent-01 سپردهها را مدیریت کردند. ## **⚠️ CRITICAL - دورههای زمانی را حفظ کن:** - "۹ ماهه" → حفظ شود (نه amount-XX) - "۵ ماهه سال" → حفظ شود (نه amount-XX) - "۳ ماهه اول" → حفظ شود (نه amount-XX) ## **موارد حفظ شده:** - تاریخها: 1404/04/23، 30 آذر 1403، پاییز 1401 - فصلهای سال: پاییز، بهار، تابستان، زمستان - عناوین شغلی: مدیرعامل، رئیس کل، مدیرکل - واحدها: میلیارد تومان، همت، ریال، ماه، سال - مکانها: تهران، اصفهان، ایران **فقط متن ناشناسشده را برگردان - هیچ توضیح اضافی نیاز نیست.** """ def _make_api_request_with_retry(self, text: str) -> Dict[str, Any]: """ارسال درخواست به Cerebras API با مدیریت rate limit و retry""" headers = { "Authorization": f"Bearer {self.config.api_key}", "Content-Type": "application/json" } payload = { "messages": [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": text} ], "model": self.config.model, "temperature": self.config.temperature, "max_tokens": self.config.max_tokens } last_error = None for attempt in range(self.rate_limit_config.max_retries): # انتظار قبل از ارسال درخواست wait_time = self.rate_limiter.wait_if_needed() try: response = requests.post( f"{self.config.base_url}/chat/completions", headers=headers, json=payload, timeout=60 ) # بررسی خطای rate limit (429) if response.status_code == 429: self.rate_limiter.report_failure(is_rate_limit=True) # استخراج زمان انتظار از هدر (اگر موجود باشد) retry_after = response.headers.get('Retry-After') if retry_after: wait_seconds = int(retry_after) else: # محاسبه exponential backoff wait_seconds = min( self.rate_limit_config.initial_backoff * (self.rate_limit_config.backoff_multiplier ** attempt), self.rate_limit_config.max_backoff ) last_error = f"محدودیت نرخ درخواست (429). تلاش {attempt + 1}/{self.rate_limit_config.max_retries}. انتظار {wait_seconds:.1f} ثانیه..." time.sleep(wait_seconds) continue response.raise_for_status() self.rate_limiter.report_success() return response.json() except requests.exceptions.Timeout: self.rate_limiter.report_failure(is_rate_limit=False) last_error = f"خطای timeout. تلاش {attempt + 1}/{self.rate_limit_config.max_retries}" time.sleep(self.rate_limit_config.initial_backoff) except requests.exceptions.RequestException as e: self.rate_limiter.report_failure(is_rate_limit=False) last_error = f"خطای شبکه: {str(e)}. تلاش {attempt + 1}/{self.rate_limit_config.max_retries}" time.sleep(self.rate_limit_config.initial_backoff) raise Exception(f"ناموفق پس از {self.rate_limit_config.max_retries} تلاش. آخرین خطا: {last_error}") def anonymize_text(self, text: str) -> Dict[str, Any]: """ناشناسسازی متن با استفاده از Cerebras""" if not text or not text.strip(): return { "success": False, "error": "متن ورودی خالی است", "anonymized_text": "" } try: response = self._make_api_request_with_retry(text) if "choices" not in response or not response["choices"]: return { "success": False, "error": "پاسخ نامعتبر از API", "anonymized_text": "" } content = response["choices"][0]["message"]["content"] content = self._clean_markdown(content) content = content.strip() analysis = self._analyze_anonymized_text(content) return { "success": True, "anonymized_text": content, "entities": analysis["entities"], "statistics": analysis["statistics"], "usage": response.get("usage", {}) } except Exception as e: return { "success": False, "error": f"خطا در پردازش: {str(e)}", "anonymized_text": "" } def _clean_markdown(self, content: str) -> str: """پاک کردن markdown از پاسخ""" if "```" in content: lines = content.split('\n') clean_lines = [] skip = False for line in lines: if line.strip().startswith('```'): skip = not skip continue if not skip: clean_lines.append(line) content = '\n'.join(clean_lines) return content def _analyze_anonymized_text(self, text: str) -> Dict[str, Any]: """تحلیل متن ناشناسسازی شده""" companies = re.findall(r'company-(\d+)', text) persons = re.findall(r'person-(\d+)', text) amounts = re.findall(r'amount-(\d+)', text) percents = re.findall(r'percent-(\d+)', text) statistics = { "company": len(set(companies)), "person": len(set(persons)), "amount": len(set(amounts)), "percent": len(set(percents)), "total_replacements": len(companies) + len(persons) + len(amounts) + len(percents) } entities = { "companies": sorted(list(set(companies)), key=lambda x: int(x)), "persons": sorted(list(set(persons)), key=lambda x: int(x)), "amounts": sorted(list(set(amounts)), key=lambda x: int(x)), "percents": sorted(list(set(percents)), key=lambda x: int(x)) } return { "statistics": statistics, "entities": entities } class BatchProcessor: """پردازشگر دستهای فایلهای CSV""" def __init__(self, api_key: str, rate_limit_config: RateLimitConfig = None): self.api_key = api_key self.rate_limit_config = rate_limit_config or RateLimitConfig() self.anonymizer = None self.is_cancelled = False self.current_progress = 0 self.total_rows = 0 self.processed_rows = 0 self.failed_rows = 0 self.start_time = None def cancel(self): """لغو پردازش""" self.is_cancelled = True def reset(self): """بازنشانی وضعیت""" self.is_cancelled = False self.current_progress = 0 self.total_rows = 0 self.processed_rows = 0 self.failed_rows = 0 self.start_time = None def process_csv( self, file_path: str, text_column: str, output_column: str = "anonymized_text", progress_callback=None ) -> Generator[Dict[str, Any], None, None]: """پردازش فایل CSV به صورت streaming""" self.reset() self.start_time = time.time() # خواندن فایل CSV try: df = pd.read_csv(file_path, encoding='utf-8') except UnicodeDecodeError: try: df = pd.read_csv(file_path, encoding='utf-8-sig') except: df = pd.read_csv(file_path, encoding='cp1256') if text_column not in df.columns: yield { "type": "error", "message": f"ستون '{text_column}' در فایل یافت نشد. ستونهای موجود: {list(df.columns)}" } return self.total_rows = len(df) # ایجاد anonymizer self.anonymizer = AdvancedCerebrasAnonymizer( api_key=self.api_key, rate_limit_config=self.rate_limit_config ) # ایجاد ستون خروجی df[output_column] = "" df["anonymization_status"] = "" df["entities_found"] = "" yield { "type": "info", "message": f"🚀 شروع پردازش {self.total_rows} ردیف...", "total": self.total_rows } results = [] for idx, row in df.iterrows(): if self.is_cancelled: yield { "type": "cancelled", "message": "پردازش توسط کاربر لغو شد", "processed": self.processed_rows, "failed": self.failed_rows } break text = str(row[text_column]) if pd.notna(row[text_column]) else "" if not text.strip(): df.at[idx, output_column] = "" df.at[idx, "anonymization_status"] = "خالی" df.at[idx, "entities_found"] = "" self.processed_rows += 1 continue # پردازش متن result = self.anonymizer.anonymize_text(text) if result["success"]: df.at[idx, output_column] = result["anonymized_text"] df.at[idx, "anonymization_status"] = "موفق" stats = result.get("statistics", {}) entities_summary = f"شرکت:{stats.get('company',0)} | شخص:{stats.get('person',0)} | مبلغ:{stats.get('amount',0)} | درصد:{stats.get('percent',0)}" df.at[idx, "entities_found"] = entities_summary self.processed_rows += 1 else: df.at[idx, output_column] = f"خطا: {result.get('error', 'نامشخص')}" df.at[idx, "anonymization_status"] = "ناموفق" df.at[idx, "entities_found"] = "" self.failed_rows += 1 # محاسبه پیشرفت و زمان باقیمانده self.current_progress = (idx + 1) / self.total_rows * 100 elapsed = time.time() - self.start_time avg_time_per_row = elapsed / (idx + 1) remaining_rows = self.total_rows - (idx + 1) estimated_remaining = avg_time_per_row * remaining_rows # تخمین زمان انتظار بعدی next_wait = self.anonymizer.rate_limiter.get_estimated_wait_time() yield { "type": "progress", "current": idx + 1, "total": self.total_rows, "progress": self.current_progress, "processed": self.processed_rows, "failed": self.failed_rows, "elapsed": elapsed, "estimated_remaining": estimated_remaining, "next_wait": next_wait, "last_result": result } # ذخیره نتیجه نهایی if not self.is_cancelled: output_path = file_path.replace('.csv', '_anonymized.csv') if output_path == file_path: output_path = file_path + '_anonymized.csv' df.to_csv(output_path, index=False, encoding='utf-8-sig') total_time = time.time() - self.start_time yield { "type": "complete", "message": "✅ پردازش با موفقیت تکمیل شد!", "output_path": output_path, "total": self.total_rows, "processed": self.processed_rows, "failed": self.failed_rows, "total_time": total_time, "dataframe": df } def create_batch_interface(): """ایجاد رابط کاربری برای پردازش دستهای""" api_key_available = bool(os.getenv("CEREBRAS_API_KEY")) custom_css = """ .gradio-container { font-family: 'Tahoma', 'Arial', sans-serif !important; direction: rtl; max-width: 1400px; margin: 0 auto; } .progress-bar { background-color: #e9ecef; border-radius: 10px; height: 30px; overflow: hidden; } .progress-fill { background: linear-gradient(90deg, #28a745, #20c997); height: 100%; transition: width 0.3s ease; } .stats-card { background-color: #f8f9fa; border-radius: 10px; padding: 15px; margin: 10px 0; border: 1px solid #dee2e6; } .warning-box { background-color: #fff3cd; border: 2px solid #ffeaa7; border-radius: 12px; padding: 15px; color: #856404; margin: 10px 0; } .success-box { background-color: #d4edda; border: 2px solid #c3e6cb; border-radius: 12px; padding: 15px; color: #155724; margin: 10px 0; } .info-box { background-color: #d1ecf1; border: 2px solid #bee5eb; border-radius: 12px; padding: 15px; color: #0c5460; margin: 10px 0; } """ # متغیرهای سراسری برای مدیریت پردازش batch_processor = {"instance": None} with gr.Blocks(css=custom_css, title="پردازش دستهای ناشناسسازی با Cerebras", theme=gr.themes.Soft()) as interface: gr.Markdown(""" # 🔒 سیستم پردازش دستهای ناشناسسازی متون فارسی ### ⚡ قدرتگرفته از Cerebras AI با مدیریت هوشمند Rate Limit """) with gr.Tabs(): # تب پردازش تکی with gr.Tab("📝 پردازش تکی"): if api_key_available: gr.Markdown('