import asyncio from concurrent.futures import ThreadPoolExecutor import threading import torch from transformers import AutoTokenizer, AutoModelForSeq2SeqLM import time import json import hashlib import re from datetime import datetime, timedelta import threading from queue import Queue import logging from typing import Dict, List, Tuple, Optional from fastapi import FastAPI, HTTPException, Request, Form from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import uvicorn import uuid # Enhanced logging configuration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('translation.log') ] ) logger = logging.getLogger(__name__) # Global storage for translation requests (WordPress integration) translation_requests = {} completed_translations = {} translation_requests_lock = threading.Lock() # Pydantic models for request/response class TranslationRequest(BaseModel): text: str source_lang: str target_lang: str api_key: Optional[str] = None class TranslationResponse(BaseModel): translation: str source_language: str target_language: str processing_time: float character_count: int status: str chunks_processed: Optional[int] = None estimated_time_remaining: Optional[float] = None current_chunk: Optional[int] = None total_chunks: Optional[int] = None class TranslationCache: def __init__(self, cache_duration_minutes: int = 60): self.cache = {} self.cache_duration = timedelta(minutes=cache_duration_minutes) self.lock = threading.Lock() def _generate_key(self, text: str, source_lang: str, target_lang: str) -> str: """Generate cache key from text and languages""" content = f"{text}_{source_lang}_{target_lang}" return hashlib.md5(content.encode()).hexdigest() def get(self, text: str, source_lang: str, target_lang: str) -> str: """Get translation from cache if exists and not expired""" with self.lock: key = self._generate_key(text, source_lang, target_lang) if key in self.cache: translation, timestamp = self.cache[key] if datetime.now() - timestamp < self.cache_duration: logger.info(f"[CACHE HIT] Retrieved cached translation for key: {key[:8]}... | Length: {len(translation)} chars") return translation else: # Remove expired entry del self.cache[key] logger.info(f"[CACHE EXPIRED] Removed expired cache entry for key: {key[:8]}...") logger.info(f"[CACHE MISS] No cached translation found for key: {key[:8]}...") return None def set(self, text: str, source_lang: str, target_lang: str, translation: str): """Store translation in cache""" with self.lock: key = self._generate_key(text, source_lang, target_lang) self.cache[key] = (translation, datetime.now()) logger.info(f"[CACHE STORE] Cached translation for key: {key[:8]}... | Translation length: {len(translation)} chars") class TranslationQueue: def __init__(self, max_workers: int = 3): self.queue = Queue() self.max_workers = max_workers self.current_workers = 0 self.lock = threading.Lock() def add_task(self, task_func, *args, **kwargs): """Add translation task to queue""" self.queue.put((task_func, args, kwargs)) logger.info(f"[QUEUE] Added task to queue | Queue size: {self.queue.qsize()}") def process_queue(self): """Process tasks from queue""" while not self.queue.empty(): with self.lock: if self.current_workers >= self.max_workers: time.sleep(0.1) continue if not self.queue.empty(): task_func, args, kwargs = self.queue.get() self.current_workers += 1 logger.info(f"[QUEUE] Starting worker | Current workers: {self.current_workers}") def worker(): try: result = task_func(*args, **kwargs) return result finally: with self.lock: self.current_workers -= 1 logger.info(f"[QUEUE] Worker finished | Current workers: {self.current_workers}") thread = threading.Thread(target=worker) thread.start() class TextChunker: """کلاس برای تقسیم متن طولانی به بخش‌های کوچک‌تر""" @staticmethod def split_text_smart(text: str, max_chunk_size: int = 400) -> List[str]: """تقسیم هوشمند متن بر اساس جملات و پاراگراف‌ها""" logger.info(f"[CHUNKER] Starting smart text splitting | Text length: {len(text)} chars | Max chunk size: {max_chunk_size}") if len(text) <= max_chunk_size: logger.info(f"[CHUNKER] Text is small, no chunking needed | Length: {len(text)}") return [text] chunks = [] # تقسیم بر اساس پاراگراف‌ها paragraphs = text.split('\n\n') current_chunk = "" for i, paragraph in enumerate(paragraphs): logger.debug(f"[CHUNKER] Processing paragraph {i+1}/{len(paragraphs)} | Length: {len(paragraph)}") # اگر پاراگراف خودش بزرگ است آن را تقسیم کن if len(paragraph) > max_chunk_size: # ذخیره قسمت فعلی اگر وجود دارد if current_chunk.strip(): chunks.append(current_chunk.strip()) logger.debug(f"[CHUNKER] Added chunk from accumulated paragraphs | Length: {len(current_chunk.strip())}") current_chunk = "" # تقسیم پاراگراف بزرگ sub_chunks = TextChunker._split_paragraph(paragraph, max_chunk_size) chunks.extend(sub_chunks) logger.debug(f"[CHUNKER] Split large paragraph into {len(sub_chunks)} sub-chunks") else: # بررسی اینکه آیا اضافه کردن این پاراگراف از حد تجاوز می‌کند if len(current_chunk) + len(paragraph) + 2 > max_chunk_size: if current_chunk.strip(): chunks.append(current_chunk.strip()) logger.debug(f"[CHUNKER] Added chunk | Length: {len(current_chunk.strip())}") current_chunk = paragraph else: if current_chunk: current_chunk += "\n\n" + paragraph else: current_chunk = paragraph # اضافه کردن آخرین قسمت if current_chunk.strip(): chunks.append(current_chunk.strip()) logger.debug(f"[CHUNKER] Added final chunk | Length: {len(current_chunk.strip())}") logger.info(f"[CHUNKER] Text splitting completed | Total chunks: {len(chunks)} | Average chunk size: {sum(len(c) for c in chunks) / len(chunks):.1f} chars") return chunks @staticmethod def _split_paragraph(paragraph: str, max_chunk_size: int) -> List[str]: """تقسیم پاراگراف بزرگ به جملات""" logger.debug(f"[CHUNKER] Splitting large paragraph | Length: {len(paragraph)}") # تقسیم بر اساس جملات sentences = re.split(r'[.!?]+\s+', paragraph) chunks = [] current_chunk = "" for sentence in sentences: if not sentence.strip(): continue # اضافه کردن علامت نقطه اگر حذف شده if not sentence.endswith(('.', '!', '?')): sentence += '.' if len(sentence) > max_chunk_size: # جمله خودش خیلی بلند است - تقسیم بر اساس کاما if current_chunk.strip(): chunks.append(current_chunk.strip()) current_chunk = "" sub_chunks = TextChunker._split_by_comma(sentence, max_chunk_size) chunks.extend(sub_chunks) else: if len(current_chunk) + len(sentence) + 1 > max_chunk_size: if current_chunk.strip(): chunks.append(current_chunk.strip()) current_chunk = sentence else: if current_chunk: current_chunk += " " + sentence else: current_chunk = sentence if current_chunk.strip(): chunks.append(current_chunk.strip()) logger.debug(f"[CHUNKER] Paragraph split into {len(chunks)} sentence chunks") return chunks @staticmethod def _split_by_comma(sentence: str, max_chunk_size: int) -> List[str]: """تقسیم جمله طولانی بر اساس کاما""" logger.debug(f"[CHUNKER] Splitting long sentence by comma | Length: {len(sentence)}") parts = sentence.split(', ') chunks = [] current_chunk = "" for part in parts: if len(part) > max_chunk_size: # قسمت خودش خیلی بلند است - تقسیم اجباری if current_chunk.strip(): chunks.append(current_chunk.strip()) current_chunk = "" # تقسیم اجباری بر اساس طول while len(part) > max_chunk_size: chunks.append(part[:max_chunk_size].strip()) part = part[max_chunk_size:].strip() if part: current_chunk = part else: if len(current_chunk) + len(part) + 2 > max_chunk_size: if current_chunk.strip(): chunks.append(current_chunk.strip()) current_chunk = part else: if current_chunk: current_chunk += ", " + part else: current_chunk = part if current_chunk.strip(): chunks.append(current_chunk.strip()) return chunks class MultilingualTranslator: def __init__(self, cache_duration_minutes: int = 60): self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f"[INIT] Using device: {self.device}") # Initialize cache and queue self.cache = TranslationCache(cache_duration_minutes) self.queue = TranslationQueue() # Add thread pool for parallel processing self.executor = ThreadPoolExecutor(max_workers=3) self.background_tasks = {} logger.info(f"[INIT] Thread pool initialized with 3 workers") # Load model - using a powerful multilingual model self.model_name = "facebook/m2m100_1.2B" logger.info(f"[INIT] Loading model: {self.model_name}") try: self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name) self.model.to(self.device) logger.info(f"[INIT] Model loaded successfully on {self.device}!") except Exception as e: logger.error(f"[INIT] Error loading model: {e}") raise # تنظیمات بهینه برای ترجمه متن‌های بلند self.max_chunk_size = 350 # حداکثر طول هر قسمت self.min_chunk_overlap = 20 # همپوشانی بین قسمت‌ها # Track translation progress self.current_translation = {} self.translation_lock = threading.Lock() logger.info(f"[INIT] Translator initialized | Max chunk size: {self.max_chunk_size} chars") def translate_chunk(self, text: str, source_lang: str, target_lang: str, chunk_index: int = 0, total_chunks: int = 1) -> str: """ترجمه یک قسمت کوچک از متن""" try: logger.info(f"[TRANSLATE] Starting chunk translation [{chunk_index+1}/{total_chunks}] | {source_lang} → {target_lang} | Length: {len(text)} chars") # Set source language for tokenizer self.tokenizer.src_lang = source_lang # Encode input encoded = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(self.device) logger.debug(f"[TRANSLATE] Text encoded | Input tokens: {encoded.input_ids.shape[1]}") # Generate translation with optimized parameters start_time = time.time() generated_tokens = self.model.generate( **encoded, forced_bos_token_id=self.tokenizer.get_lang_id(target_lang), max_length=1024, # افزایش طول خروجی min_length=10, # حداقل طول خروجی num_beams=5, # افزایش تعداد beam ها برای کیفیت بهتر early_stopping=True, no_repeat_ngram_size=3, # جلوگیری از تکرار length_penalty=1.0, # تنظیم جریمه طول repetition_penalty=1.2, # جلوگیری از تکرار کلمات do_sample=False, # استفاده از روش قطعی temperature=0.7, # کنترل تنوع pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id ) generation_time = time.time() - start_time # Decode result translation = self.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0] # پاک‌سازی ترجمه از کاراکترهای اضافی translation = translation.strip() logger.info(f"[TRANSLATE] Chunk translation completed [{chunk_index+1}/{total_chunks}] | Generation time: {generation_time:.2f}s | Output length: {len(translation)} chars") return translation except Exception as e: logger.error(f"[TRANSLATE] Chunk translation error [{chunk_index+1}/{total_chunks}]: {e}") return f"[Translation Error: {str(e)}]" def translate_text(self, text: str, source_lang: str, target_lang: str, session_id: str = None) -> Tuple[str, float, int]: """ترجمه متن با پشتیبانی از متن‌های طولانی و لاگ‌های مفصل""" start_time = time.time() if not session_id: session_id = hashlib.md5(f"{text[:100]}{time.time()}".encode()).hexdigest()[:8] logger.info(f"[SESSION:{session_id}] Starting translation | {source_lang} → {target_lang} | Text length: {len(text)} chars") # بررسی کش برای کل متن cached_result = self.cache.get(text, source_lang, target_lang) if cached_result: logger.info(f"[SESSION:{session_id}] Translation completed from cache | Time: {time.time() - start_time:.2f}s") return cached_result, time.time() - start_time, 1 try: # اگر متن کوتاه است مستقیماً ترجمه کن if len(text) <= self.max_chunk_size: logger.info(f"[SESSION:{session_id}] Processing as short text") translation = self.translate_chunk(text, source_lang, target_lang, 0, 1) # ذخیره در کش self.cache.set(text, source_lang, target_lang, translation) processing_time = time.time() - start_time logger.info(f"[SESSION:{session_id}] Short text translation completed | Total time: {processing_time:.2f}s") return translation, processing_time, 1 # تقسیم متن طولانی به قسمت‌های کوچک‌تر logger.info(f"[SESSION:{session_id}] Processing as long text - starting chunking") chunks = TextChunker.split_text_smart(text, self.max_chunk_size) logger.info(f"[SESSION:{session_id}] Text split into {len(chunks)} chunks") # Initialize progress tracking with self.translation_lock: self.current_translation[session_id] = { 'total_chunks': len(chunks), 'completed_chunks': 0, 'start_time': start_time, 'source_lang': source_lang, 'target_lang': target_lang } # ترجمه هر قسمت translated_chunks = [] for i, chunk in enumerate(chunks): chunk_start_time = time.time() logger.info(f"[SESSION:{session_id}] Starting chunk {i+1}/{len(chunks)} | Chunk length: {len(chunk)} chars") # بررسی کش برای هر قسمت chunk_translation = self.cache.get(chunk, source_lang, target_lang) if not chunk_translation: # Estimate remaining time if i > 0: elapsed_time = time.time() - start_time avg_time_per_chunk = elapsed_time / i estimated_remaining = avg_time_per_chunk * (len(chunks) - i) logger.info(f"[SESSION:{session_id}] Progress: {i}/{len(chunks)} | Avg time per chunk: {avg_time_per_chunk:.1f}s | Estimated remaining: {estimated_remaining:.1f}s") chunk_translation = self.translate_chunk(chunk, source_lang, target_lang, i, len(chunks)) # ذخیره قسمت در کش self.cache.set(chunk, source_lang, target_lang, chunk_translation) chunk_time = time.time() - chunk_start_time logger.info(f"[SESSION:{session_id}] Chunk {i+1}/{len(chunks)} translated in {chunk_time:.2f}s") else: logger.info(f"[SESSION:{session_id}] Chunk {i+1}/{len(chunks)} retrieved from cache") translated_chunks.append(chunk_translation) # Update progress with self.translation_lock: if session_id in self.current_translation: self.current_translation[session_id]['completed_chunks'] = i + 1 # کمی استراحت بین ترجمه‌ها برای جلوگیری از بارذاری زیاد if i < len(chunks) - 1: time.sleep(0.1) # ترکیب قسمت‌های ترجمه شده logger.info(f"[SESSION:{session_id}] Combining translated chunks") final_translation = self._combine_translations(translated_chunks, text) # ذخیره نتیجه نهایی در کش self.cache.set(text, source_lang, target_lang, final_translation) processing_time = time.time() - start_time # Mark as completed for WordPress integration logger.info(f"[SESSION:{session_id}] Long text translation completed | Total time: {processing_time:.2f}s | Chunks: {len(chunks)} | Final length: {len(final_translation)} chars") # Store in completed_translations for WordPress to check with translation_requests_lock: completed_translations[session_id] = { 'translation': final_translation, 'processing_time': processing_time, 'character_count': len(text), 'source_lang': source_lang, 'target_lang': target_lang, 'completed_at': datetime.now().isoformat(), 'request_id': session_id, 'status': 'completed' } # Remove from processing requests if exists if session_id in translation_requests: del translation_requests[session_id] # Clean up progress tracking with self.translation_lock: self.current_translation.pop(session_id, None) return final_translation, processing_time, len(chunks) except Exception as e: logger.error(f"[SESSION:{session_id}] Translation error: {e}") # Clean up progress tracking with self.translation_lock: self.current_translation.pop(session_id, None) return f"Translation error: {str(e)}", time.time() - start_time, 0 def get_translation_progress(self, session_id: str) -> Dict: """Get current translation progress""" with self.translation_lock: if session_id not in self.current_translation: return None progress = self.current_translation[session_id].copy() elapsed_time = time.time() - progress['start_time'] if progress['completed_chunks'] > 0: avg_time_per_chunk = elapsed_time / progress['completed_chunks'] remaining_chunks = progress['total_chunks'] - progress['completed_chunks'] estimated_remaining = avg_time_per_chunk * remaining_chunks else: estimated_remaining = None return { 'total_chunks': progress['total_chunks'], 'completed_chunks': progress['completed_chunks'], 'elapsed_time': elapsed_time, 'estimated_remaining': estimated_remaining, 'progress_percentage': (progress['completed_chunks'] / progress['total_chunks']) * 100 } def _combine_translations(self, translated_chunks: List[str], original_text: str) -> str: """ترکیب قسمت‌های ترجمه شده به یک متن یکپارچه""" if not translated_chunks: return "" if len(translated_chunks) == 1: return translated_chunks[0] logger.debug(f"[COMBINER] Combining {len(translated_chunks)} translated chunks") # ترکیب قسمت‌ها با در نظر گیری ساختار اصلی متن combined = [] for i, chunk in enumerate(translated_chunks): # پاک‌سازی قسمت chunk = chunk.strip() if not chunk: continue # اضافه کردن فاصله مناسب بین قسمت‌ها if i > 0 and combined: # اگر قسمت قبلی با نقطه تمام نمی‌شود فاصله اضافه کن if not combined[-1].rstrip().endswith(('.', '!', '?', ':', '۔', '.')): combined[-1] += '.' # بررسی اینکه آیا نیاز به پاراگراف جدید دارکم if '\n\n' in original_text: combined.append('\n\n' + chunk) else: combined.append(' ' + chunk) else: combined.append(chunk) result = ''.join(combined) # پاک‌سازی نهایی result = re.sub(r'\s+', ' ', result) # حذف فاصله‌های اضافی result = re.sub(r'\.+', '.', result) # حذف نقطه‌های تکراری result = result.strip() logger.debug(f"[COMBINER] Combined translation length: {len(result)} chars") return result async def translate_text_async(self, text: str, source_lang: str, target_lang: str, session_id: str = None): """Async wrapper for translate_text""" loop = asyncio.get_event_loop() return await loop.run_in_executor( self.executor, self.translate_text, text, source_lang, target_lang, session_id ) def process_heavy_translation_background(request_id: str, text: str, source_lang: str, target_lang: str): """ Background function to process heavy text translations for WordPress integration. Updates the completed_translations dict when done and automatically charges credits. """ try: logger.info(f"[HF Server] Background processing started for request: {request_id}") start_time = time.time() # Update progress in requests with translation_requests_lock: if request_id in translation_requests: translation_requests[request_id]['progress'] = 10 # Perform actual translation translation, processing_time, chunks_count = translator.translate_text( text, source_lang, target_lang, request_id ) processing_time = time.time() - start_time # Store completed translation with translation_requests_lock: completed_translations[request_id] = { 'translation': translation, 'processing_time': processing_time, 'character_count': len(text), 'source_lang': source_lang, 'target_lang': target_lang, 'completed_at': datetime.now().isoformat(), 'request_id': request_id, 'status': 'completed', 'auto_charged': False # فلگ برای ردیابی کسر خودکار اعتبار } # Remove from processing queue if request_id in translation_requests: del translation_requests[request_id] logger.info(f"[HF Server] Long text translation completed for request: {request_id} in {processing_time:.2f}s") # NEW: اطلاع‌رسانی خودکار به ووردپرس برای کسر اعتبار charge_success = notify_wordpress_completion_and_charge(request_id) if charge_success: # علامت‌گذاری به عنوان کسر شده with translation_requests_lock: if request_id in completed_translations: completed_translations[request_id]['auto_charged'] = True logger.info(f"[HF Server] Automatic charging completed for request: {request_id}") else: logger.warning(f"[HF Server] Automatic charging failed for request: {request_id}") except Exception as e: logger.error(f"[HF Server] Background processing error for {request_id}: {str(e)}") # Mark as failed with translation_requests_lock: completed_translations[request_id] = { 'translation': '', 'error': str(e), 'status': 'failed', 'processing_time': time.time() - start_time if 'start_time' in locals() else 0, 'completed_at': datetime.now().isoformat(), 'request_id': request_id, 'auto_charged': False } # Remove from processing queue if request_id in translation_requests: del translation_requests[request_id] def notify_wordpress_completion_and_charge(request_id: str, wordpress_url: str = None): """ اطلاع‌رسانی به ووردپرس پس از تکمیل ترجمه و کسر خودکار اعتبار """ try: if not wordpress_url: # آدرس ووردپرس باید از متغیر محیطی یا تنظیمات دریافت شود wordpress_url = os.getenv('WORDPRESS_URL', 'https://your-wordpress-site.com') # پیدا کردن اطلاعات ترجمه تکمیل شده with translation_requests_lock: if request_id not in completed_translations: logger.error(f"[AUTO CHARGE] Translation not found in completed cache: {request_id}") return False translation_data = completed_translations[request_id] # ارسال درخواست به ووردپرس برای کسر خودکار اعتبار charge_url = f"{wordpress_url.rstrip('/')}/wp-admin/admin-ajax.php" charge_payload = { 'action': 'amt_auto_charge_completed', 'request_id': request_id, 'character_count': translation_data.get('character_count', 0), 'processing_time': translation_data.get('processing_time', 0), 'translation_length': len(translation_data.get('translation', '')), 'source_lang': translation_data.get('source_lang', ''), 'target_lang': translation_data.get('target_lang', ''), 'completed_at': translation_data.get('completed_at', ''), 'nonce': 'auto_charge_nonce' # باید از ووردپرس دریافت شود } logger.info(f"[AUTO CHARGE] Notifying WordPress for automatic charging: {request_id}") # ارسال درخواست POST به ووردپرس import requests response = requests.post( charge_url, data=charge_payload, timeout=30, headers={ 'Content-Type': 'application/x-www-form-urlencoded', 'User-Agent': 'HuggingFace-Translation-Server/2.1.0' } ) if response.status_code == 200: try: result = response.json() if result.get('success'): logger.info(f"[AUTO CHARGE] WordPress automatic charging successful: {request_id} - Cost: {result.get('cost', 0)}") return True else: logger.error(f"[AUTO CHARGE] WordPress charging failed: {result.get('data', {}).get('message', 'Unknown error')}") return False except: logger.error(f"[AUTO CHARGE] Invalid JSON response from WordPress") return False else: logger.error(f"[AUTO CHARGE] WordPress request failed with status: {response.status_code}") return False except Exception as e: logger.error(f"[AUTO CHARGE] Error notifying WordPress: {str(e)}") return False def perform_translation_internal(text: str, source_lang: str, target_lang: str) -> str: """ Internal translation function - wrapper for translator.translate_text """ try: translation, _, _ = translator.translate_text(text, source_lang, target_lang) return translation except Exception as e: logger.error(f"[INTERNAL] Translation error: {str(e)}") return f"Translation error: {str(e)}" # Language mappings for M2M100 model LANGUAGE_MAP = { "English": "en", "Persian (Farsi)": "fa", "Arabic": "ar", "French": "fr", "German": "de", "Spanish": "es", "Italian": "it", "Portuguese": "pt", "Russian": "ru", "Chinese (Simplified)": "zh", "Japanese": "ja", "Korean": "ko", "Hindi": "hi", "Turkish": "tr", "Dutch": "nl", "Polish": "pl", "Swedish": "sv", "Norwegian": "no", "Danish": "da", "Finnish": "fi", "Greek": "el", "Hebrew": "he", "Thai": "th", "Vietnamese": "vi", "Indonesian": "id", "Malay": "ms", "Czech": "cs", "Slovak": "sk", "Hungarian": "hu", "Romanian": "ro", "Bulgarian": "bg", "Croatian": "hr", "Serbian": "sr", "Slovenian": "sl", "Lithuanian": "lt", "Latvian": "lv", "Estonian": "et", "Ukrainian": "uk", "Belarusian": "be", "Kazakh": "kk", "Uzbek": "uz", "Georgian": "ka", "Armenian": "hy", "Azerbaijani": "az", "Bengali": "bn", "Urdu": "ur", "Tamil": "ta", "Telugu": "te", "Malayalam": "ml", "Kannada": "kn", "Gujarati": "gu", "Punjabi": "pa", "Marathi": "mr", "Nepali": "ne", "Sinhala": "si", "Burmese": "my", "Khmer": "km", "Lao": "lo", "Mongolian": "mn", "Afrikaans": "af", "Amharic": "am", "Yoruba": "yo", "Igbo": "ig", "Hausa": "ha", "Swahili": "sw", "Xhosa": "xh", "Zulu": "zu" } # Initialize translator translator = MultilingualTranslator(60) # Create FastAPI app app = FastAPI(title="Enhanced Multilingual Translation API", version="2.1.0") # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ========== NEW WORDPRESS INTEGRATION ENDPOINTS ========== @app.post("/api/check-completion") async def check_completion(request: Request): """ Endpoint to verify if a translation request has been completed. WordPress calls this to confirm before charging credits. """ try: form_data = await request.form() request_id = form_data.get('request_id', '').strip() if not request_id: return { 'status': 'error', 'message': 'Request ID is required' } logger.info(f"[HF Server] Completion verification requested for: {request_id}") with translation_requests_lock: # Check if request exists in completed translations if request_id in completed_translations: completion_data = completed_translations[request_id] logger.info(f"[HF Server] Completion verification for {request_id}: COMPLETED") return { 'status': 'completed', 'request_id': request_id, 'completed_at': completion_data.get('completed_at'), 'processing_time': completion_data.get('processing_time', 0), 'verified': True } # Check if request is still processing elif request_id in translation_requests: logger.info(f"[HF Server] Completion verification for {request_id}: STILL PROCESSING") return { 'status': 'processing', 'request_id': request_id, 'verified': False } else: logger.info(f"[HF Server] Completion verification for {request_id}: NOT FOUND") return { 'status': 'not_found', 'request_id': request_id, 'message': 'Request ID not found' } except Exception as e: logger.error(f"[HF Server] Error in check_completion: {str(e)}") return { 'status': 'error', 'message': 'Server error occurred' } @app.post("/api/check-translation-status") async def check_translation_status(request: Request): """ Endpoint to get the current status and result of a translation request. Returns translation content if completed. """ try: form_data = await request.form() request_id = form_data.get('request_id', '').strip() if not request_id: return { 'status': 'error', 'message': 'Request ID is required' } logger.info(f"[HF Server] Translation status check for: {request_id}") with translation_requests_lock: # Check if translation is completed if request_id in completed_translations: result = completed_translations[request_id] logger.info(f"[HF Server] Translation status check for {request_id}: COMPLETED - returning translation") return { 'status': 'completed', 'request_id': request_id, 'translation': result.get('translation', ''), 'processing_time': result.get('processing_time', 0), 'character_count': result.get('character_count', 0), 'completed_at': result.get('completed_at'), 'source_lang': result.get('source_lang', ''), 'target_lang': result.get('target_lang', '') } # Check if still processing elif request_id in translation_requests: req_data = translation_requests[request_id] logger.info(f"[HF Server] Translation status check for {request_id}: STILL PROCESSING") return { 'status': 'processing', 'request_id': request_id, 'started_at': req_data.get('started_at'), 'progress': req_data.get('progress', 0) } else: logger.info(f"[HF Server] Translation status check for {request_id}: NOT FOUND") return { 'status': 'not_found', 'request_id': request_id, 'message': 'Translation request not found' } except Exception as e: logger.error(f"[HF Server] Error in check_translation_status: {str(e)}") return { 'status': 'error', 'message': 'Server error occurred' } # ========== UPDATED MAIN TRANSLATION ENDPOINT ========== @app.post("/api/translate/form") async def api_translate_form(request: Request): """ Enhanced translation endpoint that handles both short and long texts. For long texts, returns immediately with request_id for background processing. """ try: form_data = await request.form() text = form_data.get("text", "") source_lang = form_data.get("source_lang", "") target_lang = form_data.get("target_lang", "") api_key = form_data.get("api_key", None) except: try: json_data = await request.json() text = json_data.get("text", "") source_lang = json_data.get("source_lang", "") target_lang = json_data.get("target_lang", "") api_key = json_data.get("api_key", None) except: return {"status": "error", "message": "Invalid request format"} if not text.strip(): logger.error("[FORM API] No text provided") return {"status": "error", "message": "Text, source language, and target language are required"} source_code = LANGUAGE_MAP.get(source_lang) target_code = LANGUAGE_MAP.get(target_lang) if not source_code or not target_code: logger.error(f"[FORM API] Invalid language codes: {source_lang} -> {target_lang}") return {"status": "error", "message": "Invalid language codes"} char_count = len(text) is_heavy_text = char_count > 1000 # Same threshold as WordPress logger.info(f"[FORM API] Translation request: {char_count} chars, {source_lang} → {target_lang}, Heavy: {is_heavy_text}") if is_heavy_text: # Generate request ID for background processing request_id = str(uuid.uuid4()) # First check cache for immediate return cached_result = translator.cache.get(text, source_code, target_code) if cached_result: logger.info(f"[FORM API] Returning cached translation immediately for request: {request_id}") return { "translation": cached_result, "source_language": source_lang, "target_language": target_lang, "processing_time": 0.0, "character_count": char_count, "status": "success", "chunks_processed": None, "request_id": request_id, "cached": True } # Store request for processing with translation_requests_lock: translation_requests[request_id] = { 'text': text, 'source_lang': source_code, 'target_lang': target_code, 'started_at': datetime.now().isoformat(), 'character_count': char_count, 'progress': 0 } # Start background processing thread = threading.Thread( target=process_heavy_translation_background, args=(request_id, text, source_code, target_code) ) thread.daemon = True thread.start() logger.info(f"[FORM API] Started background processing for request: {request_id}") return { 'is_background': True, 'session_id': request_id, 'request_id': request_id, 'status': 'processing', 'message': f'Long text ({char_count} characters) is being processed in background. Use the request ID to check status.', 'character_count': char_count } else: # Process short text immediately try: start_time = time.time() translation, processing_time, chunks_count = translator.translate_text( text, source_code, target_code ) # Check translation content if not translation or not translation.strip() or translation.startswith("Translation error"): logger.error(f"[FORM API] Invalid translation result: {translation[:100] if translation else 'None'}") return { "status": "error", "message": "Translation failed - empty or invalid result" } logger.info(f"[FORM API] Short text translation completed in {processing_time:.2f}s") return { 'status': 'success', 'translation': translation, 'processing_time': processing_time, 'character_count': char_count, 'source_lang': source_lang, 'target_lang': target_lang } except Exception as e: logger.error(f"[FORM API] Translation error: {str(e)}") return {"status": "error", "message": f"Translation failed: {str(e)}"} # ========== EXISTING ENDPOINTS (UPDATED) ========== @app.get("/") async def root(): return { "message": "Enhanced Multilingual Translation API v2.1 with WordPress Integration", "status": "active", "features": [ "enhanced_logging", "progress_tracking", "long_text_support", "smart_chunking", "cache_optimization", "wordpress_integration", "delayed_charging_support" ] } @app.post("/api/translate") async def api_translate(request: TranslationRequest): """API endpoint for translation with enhanced logging and progress tracking""" if not request.text.strip(): raise HTTPException(status_code=400, detail="No text provided") source_code = LANGUAGE_MAP.get(request.source_lang) target_code = LANGUAGE_MAP.get(request.target_lang) if not source_code or not target_code: raise HTTPException(status_code=400, detail="Invalid language codes") try: # Generate session ID for tracking session_id = hashlib.md5(f"{request.text[:100]}{time.time()}".encode()).hexdigest()[:8] translation, processing_time, chunks_count = translator.translate_text( request.text, source_code, target_code, session_id ) return TranslationResponse( translation=translation, source_language=request.source_lang, target_language=request.target_lang, processing_time=processing_time, character_count=len(request.text), status="success", chunks_processed=chunks_count ) except Exception as e: logger.error(f"[API] Translation error: {str(e)}") raise HTTPException(status_code=500, detail=f"Translation error: {str(e)}") @app.get("/api/progress/{session_id}") async def get_translation_progress(session_id: str): """Get translation progress for a session""" progress = translator.get_translation_progress(session_id) if progress is None: raise HTTPException(status_code=404, detail="Session not found or completed") return { "status": "success", "progress": progress } @app.get("/api/languages") async def get_languages(): """Get supported languages""" return { "languages": list(LANGUAGE_MAP.keys()), "language_codes": LANGUAGE_MAP, "status": "success" } @app.get("/api/health") async def health_check(): """Health check endpoint""" with translation_requests_lock: active_requests = len(translation_requests) completed_cache = len(completed_translations) return { "status": "healthy", "device": str(translator.device), "model": translator.model_name, "cache_size": len(translator.cache.cache), "max_chunk_size": translator.max_chunk_size, "active_translations": len(translator.current_translation), "active_requests": active_requests, "completed_cache": completed_cache, "version": "2.1.0" } @app.get("/api/status/{session_id}") async def get_session_status(session_id: str): """Get translation status - non-blocking""" # Check if task is in background tasks if session_id in translator.background_tasks: task = translator.background_tasks[session_id] if task.done(): try: translation, processing_time, chunks_count = await task # Clean up completed task del translator.background_tasks[session_id] return { "status": "completed", "translation": translation, "processing_time": processing_time, "chunks_processed": chunks_count, "message": "Translation completed successfully" } except Exception as e: del translator.background_tasks[session_id] return { "status": "failed", "message": f"Translation failed: {str(e)}" } else: # Task still running - get progress progress = translator.get_translation_progress(session_id) if progress: return { "status": "processing", "progress": progress, "message": f"Processing chunk {progress['completed_chunks']}/{progress['total_chunks']}", "estimated_remaining": progress.get('estimated_remaining', 0) } else: return { "status": "processing", "message": "Translation in progress...", "progress": None } # Check current active translations progress = translator.get_translation_progress(session_id) if progress: return { "status": "processing", "progress": progress, "message": f"Processing chunk {progress['completed_chunks']}/{progress['total_chunks']}", "estimated_remaining": progress.get('estimated_remaining', 0) } return { "status": "not_found", "message": "Session not found or completed" } # اضافه کردن endpoint جدید برای بررسی وضعیت کسر اعتبار @app.post("/api/check-auto-charge-status") async def check_auto_charge_status(request: Request): """ بررسی وضعیت کسر خودکار اعتبار برای درخواست خاص """ try: form_data = await request.form() request_id = form_data.get('request_id', '').strip() if not request_id: return { 'status': 'error', 'message': 'Request ID is required' } with translation_requests_lock: if request_id in completed_translations: translation_data = completed_translations[request_id] return { 'status': 'completed', 'request_id': request_id, 'auto_charged': translation_data.get('auto_charged', False), 'completed_at': translation_data.get('completed_at'), 'processing_time': translation_data.get('processing_time', 0), 'character_count': translation_data.get('character_count', 0) } else: return { 'status': 'not_found', 'request_id': request_id, 'message': 'Translation not found' } except Exception as e: logger.error(f"[HF Server] Error checking auto charge status: {str(e)}") return { 'status': 'error', 'message': 'Server error occurred' } @app.get("/api/server-status") async def get_server_status(): """Get current server status - enhanced for WordPress integration""" active_sessions = [] with translation_requests_lock: background_tasks_count = len(translation_requests) completed_count = len(completed_translations) with translator.translation_lock: for session_id, progress in translator.current_translation.items(): elapsed_time = time.time() - progress['start_time'] if progress['completed_chunks'] > 0: avg_time_per_chunk = elapsed_time / progress['completed_chunks'] remaining_chunks = progress['total_chunks'] - progress['completed_chunks'] estimated_remaining = avg_time_per_chunk * remaining_chunks else: estimated_remaining = None active_sessions.append({ 'session_id': session_id, 'source_lang': progress['source_lang'], 'target_lang': progress['target_lang'], 'total_chunks': progress['total_chunks'], 'completed_chunks': progress['completed_chunks'], 'progress_percentage': (progress['completed_chunks'] / progress['total_chunks']) * 100, 'elapsed_time': elapsed_time, 'estimated_remaining': estimated_remaining }) total_active = len(active_sessions) + background_tasks_count if total_active > 0: if active_sessions: latest_session = active_sessions[-1] message = f"Processing chunk {latest_session['completed_chunks']}/{latest_session['total_chunks']} | {latest_session['source_lang']} → {latest_session['target_lang']}" else: message = f"{background_tasks_count} translation(s) in background queue" return { "has_active_translation": True, "status": "processing", "message": message, "active_sessions": len(active_sessions), "background_tasks": background_tasks_count, "total_active": total_active, "completed_cache": completed_count } else: return { "has_active_translation": False, "status": "idle", "message": "Server is ready for new translations", "active_sessions": 0, "background_tasks": 0, "completed_cache": completed_count } # ========== CLEANUP AND MAINTENANCE FUNCTIONS ========== def cleanup_old_requests(): """ Clean up old completed translations and stuck processing requests. Should be called periodically. """ current_time = datetime.now() with translation_requests_lock: # Clean completed translations older than 2 hours to_remove_completed = [] for req_id, data in completed_translations.items(): try: completed_time = datetime.fromisoformat(data.get('completed_at', '')) if (current_time - completed_time).total_seconds() > 7200: # 2 hours to_remove_completed.append(req_id) except: to_remove_completed.append(req_id) # Remove invalid entries for req_id in to_remove_completed: del completed_translations[req_id] # Clean stuck processing requests older than 1 hour to_remove_processing = [] for req_id, data in translation_requests.items(): try: started_time = datetime.fromisoformat(data.get('started_at', '')) if (current_time - started_time).total_seconds() > 3600: # 1 hour to_remove_processing.append(req_id) except: to_remove_processing.append(req_id) # Remove invalid entries for req_id in to_remove_processing: del translation_requests[req_id] logger.info(f"[HF Server] Cleanup: Removed {len(to_remove_completed)} completed, {len(to_remove_processing)} stuck requests") return len(to_remove_completed), len(to_remove_processing) # Schedule periodic cleanup (runs every hour) def periodic_cleanup(): """Run cleanup every hour""" while True: time.sleep(3600) # 1 hour try: cleanup_old_requests() except Exception as e: logger.error(f"[CLEANUP] Error during periodic cleanup: {e}") # Start cleanup thread cleanup_thread = threading.Thread(target=periodic_cleanup, daemon=True) cleanup_thread.start() # ========== SERVER STARTUP ========== if __name__ == "__main__": logger.info("[HF Server] Starting Enhanced Multilingual Translation API with WordPress Integration") uvicorn.run(app, host="0.0.0.0", port=7860)