# app.py — Enhanced multilingual translation API (Hugging Face Space "TR")
import asyncio
import hashlib
import json
import logging
import os
import re
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from queue import Queue
from typing import Dict, List, Optional, Tuple

import requests
import torch
import uvicorn
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# Enhanced logging configuration: mirror every record to stdout and to
# 'translation.log'.
# NOTE(review): FileHandler grows without bound — consider RotatingFileHandler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('translation.log')
    ]
)
logger = logging.getLogger(__name__)

# Global storage for translation requests (WordPress integration).
# translation_requests: request_id -> metadata of an in-flight request
# completed_translations: request_id -> finished result payload
# Both dicts must only be touched while holding translation_requests_lock.
translation_requests = {}
completed_translations = {}
translation_requests_lock = threading.Lock()
# Pydantic models for request/response
class TranslationRequest(BaseModel):
    """JSON request body for POST /api/translate."""
    text: str                      # source text to translate
    source_lang: str               # human-readable name; must be a key of LANGUAGE_MAP
    target_lang: str               # human-readable name; must be a key of LANGUAGE_MAP
    api_key: Optional[str] = None  # accepted but not validated anywhere in this file
class TranslationResponse(BaseModel):
    """JSON response body returned by POST /api/translate."""
    translation: str               # translated text (or an error string)
    source_language: str           # echo of the requested source language name
    target_language: str           # echo of the requested target language name
    processing_time: float         # wall-clock seconds spent translating
    character_count: int           # length of the input text
    status: str                    # "success" on the happy path
    chunks_processed: Optional[int] = None          # number of chunks for long texts
    estimated_time_remaining: Optional[float] = None  # reserved; not set by /api/translate
    current_chunk: Optional[int] = None               # reserved; not set by /api/translate
    total_chunks: Optional[int] = None                # reserved; not set by /api/translate
class TranslationCache:
    """Thread-safe in-memory TTL cache for translations.

    Keys are MD5 digests of (text, source_lang, target_lang); values are
    (translation, insertion-timestamp) pairs. Expired entries are evicted
    lazily on lookup.
    """

    def __init__(self, cache_duration_minutes: int = 60):
        self.cache = {}  # key -> (translation, datetime when stored)
        self.cache_duration = timedelta(minutes=cache_duration_minutes)
        self.lock = threading.Lock()

    def _generate_key(self, text: str, source_lang: str, target_lang: str) -> str:
        """Generate cache key from text and languages.

        MD5 is fine here: the digest is used only as a cache key, not for
        anything security-sensitive.
        """
        content = f"{text}_{source_lang}_{target_lang}"
        return hashlib.md5(content.encode()).hexdigest()

    def get(self, text: str, source_lang: str, target_lang: str) -> Optional[str]:
        """Return the cached translation, or None if absent or expired.

        Fixed: the original annotated the return type as plain ``str``
        although it returns ``None`` on a cache miss.
        """
        with self.lock:
            key = self._generate_key(text, source_lang, target_lang)
            entry = self.cache.get(key)  # single lookup instead of `in` + index
            if entry is not None:
                translation, timestamp = entry
                if datetime.now() - timestamp < self.cache_duration:
                    logger.info(f"[CACHE HIT] Retrieved cached translation for key: {key[:8]}... | Length: {len(translation)} chars")
                    return translation
                # Lazily evict the stale entry.
                del self.cache[key]
                logger.info(f"[CACHE EXPIRED] Removed expired cache entry for key: {key[:8]}...")
            logger.info(f"[CACHE MISS] No cached translation found for key: {key[:8]}...")
            return None

    def set(self, text: str, source_lang: str, target_lang: str, translation: str):
        """Store a translation, stamping it with the current (naive local) time."""
        with self.lock:
            key = self._generate_key(text, source_lang, target_lang)
            self.cache[key] = (translation, datetime.now())
            logger.info(f"[CACHE STORE] Cached translation for key: {key[:8]}... | Translation length: {len(translation)} chars")
class TranslationQueue:
    """Simple bounded-concurrency task runner backed by a Queue.

    Tasks added via add_task are executed on daemon-less threads by
    process_queue, with at most max_workers running at once.
    """

    def __init__(self, max_workers: int = 3):
        self.queue = Queue()
        self.max_workers = max_workers
        self.current_workers = 0          # number of threads currently running
        self.lock = threading.Lock()      # guards current_workers

    def add_task(self, task_func, *args, **kwargs):
        """Add translation task to queue."""
        self.queue.put((task_func, args, kwargs))
        logger.info(f"[QUEUE] Added task to queue | Queue size: {self.queue.qsize()}")

    def process_queue(self):
        """Drain the queue, capping concurrency at max_workers.

        Fixes over the original:
        - The sleep while waiting for a free slot now happens OUTSIDE the
          lock. The original slept while holding it, which blocked finished
          workers from acquiring the lock to decrement current_workers.
        - The worker closure binds its task via default arguments; the
          original closed over loop variables that were reassigned on the
          next iteration, racing with the just-started thread.
        """
        from queue import Empty  # local import: the module only imports Queue

        while True:
            # Wait for a free worker slot (sleep outside the lock).
            with self.lock:
                have_slot = self.current_workers < self.max_workers
            if not have_slot:
                time.sleep(0.1)
                continue
            # Claim the next task without blocking; stop when the queue is drained.
            try:
                task_func, args, kwargs = self.queue.get_nowait()
            except Empty:
                break
            with self.lock:
                self.current_workers += 1
                logger.info(f"[QUEUE] Starting worker | Current workers: {self.current_workers}")

            def worker(fn=task_func, a=args, kw=kwargs):
                try:
                    return fn(*a, **kw)
                finally:
                    with self.lock:
                        self.current_workers -= 1
                        logger.info(f"[QUEUE] Worker finished | Current workers: {self.current_workers}")

            thread = threading.Thread(target=worker)
            thread.start()
class TextChunker:
    """Splits long text into smaller chunks for translation.

    Strategy: split on paragraphs first, then sentences, then commas, and
    finally by raw length, keeping each chunk under max_chunk_size.
    """

    @staticmethod
    def split_text_smart(text: str, max_chunk_size: int = 400) -> List[str]:
        """Smart text splitting based on sentences and paragraphs.

        Returns a list of chunks; may be empty for whitespace-only input
        (the original crashed with ZeroDivisionError in that case).
        """
        logger.info(f"[CHUNKER] Starting smart text splitting | Text length: {len(text)} chars | Max chunk size: {max_chunk_size}")
        if len(text) <= max_chunk_size:
            logger.info(f"[CHUNKER] Text is small, no chunking needed | Length: {len(text)}")
            return [text]
        chunks = []
        # Split on paragraph boundaries first.
        paragraphs = text.split('\n\n')
        current_chunk = ""
        for i, paragraph in enumerate(paragraphs):
            logger.debug(f"[CHUNKER] Processing paragraph {i+1}/{len(paragraphs)} | Length: {len(paragraph)}")
            # If the paragraph itself is too big, split it further.
            if len(paragraph) > max_chunk_size:
                # Flush any accumulated paragraphs first.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                    logger.debug(f"[CHUNKER] Added chunk from accumulated paragraphs | Length: {len(current_chunk.strip())}")
                    current_chunk = ""
                # Split the oversized paragraph into sentence-level chunks.
                sub_chunks = TextChunker._split_paragraph(paragraph, max_chunk_size)
                chunks.extend(sub_chunks)
                logger.debug(f"[CHUNKER] Split large paragraph into {len(sub_chunks)} sub-chunks")
            else:
                # Would appending this paragraph overflow the chunk? (+2 for "\n\n")
                if len(current_chunk) + len(paragraph) + 2 > max_chunk_size:
                    if current_chunk.strip():
                        chunks.append(current_chunk.strip())
                        logger.debug(f"[CHUNKER] Added chunk | Length: {len(current_chunk.strip())}")
                    current_chunk = paragraph
                else:
                    if current_chunk:
                        current_chunk += "\n\n" + paragraph
                    else:
                        current_chunk = paragraph
        # Flush the trailing chunk.
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
            logger.debug(f"[CHUNKER] Added final chunk | Length: {len(current_chunk.strip())}")
        # Guard against division by zero: whitespace-only text longer than
        # max_chunk_size produces no chunks at all.
        avg_size = (sum(len(c) for c in chunks) / len(chunks)) if chunks else 0.0
        logger.info(f"[CHUNKER] Text splitting completed | Total chunks: {len(chunks)} | Average chunk size: {avg_size:.1f} chars")
        return chunks

    @staticmethod
    def _split_paragraph(paragraph: str, max_chunk_size: int) -> List[str]:
        """Split an oversized paragraph into sentence-level chunks."""
        logger.debug(f"[CHUNKER] Splitting large paragraph | Length: {len(paragraph)}")
        # Split on sentence-ending punctuation followed by whitespace.
        sentences = re.split(r'[.!?]+\s+', paragraph)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if not sentence.strip():
                continue
            # Restore the terminator dropped by re.split.
            if not sentence.endswith(('.', '!', '?')):
                sentence += '.'
            if len(sentence) > max_chunk_size:
                # The sentence itself is too long — fall back to comma splits.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                    current_chunk = ""
                sub_chunks = TextChunker._split_by_comma(sentence, max_chunk_size)
                chunks.extend(sub_chunks)
            else:
                if len(current_chunk) + len(sentence) + 1 > max_chunk_size:
                    if current_chunk.strip():
                        chunks.append(current_chunk.strip())
                    current_chunk = sentence
                else:
                    if current_chunk:
                        current_chunk += " " + sentence
                    else:
                        current_chunk = sentence
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        logger.debug(f"[CHUNKER] Paragraph split into {len(chunks)} sentence chunks")
        return chunks

    @staticmethod
    def _split_by_comma(sentence: str, max_chunk_size: int) -> List[str]:
        """Split an overlong sentence on commas, force-slicing as a last resort."""
        logger.debug(f"[CHUNKER] Splitting long sentence by comma | Length: {len(sentence)}")
        parts = sentence.split(', ')
        chunks = []
        current_chunk = ""
        for part in parts:
            if len(part) > max_chunk_size:
                # The part itself is too long — flush, then force-split by length.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                    current_chunk = ""
                while len(part) > max_chunk_size:
                    chunks.append(part[:max_chunk_size].strip())
                    part = part[max_chunk_size:].strip()
                if part:
                    current_chunk = part
            else:
                if len(current_chunk) + len(part) + 2 > max_chunk_size:
                    if current_chunk.strip():
                        chunks.append(current_chunk.strip())
                    current_chunk = part
                else:
                    if current_chunk:
                        current_chunk += ", " + part
                    else:
                        current_chunk = part
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        return chunks
class MultilingualTranslator:
    """M2M100-based translator with TTL caching, smart chunking for long
    texts, and per-session progress tracking.

    Loading the model happens in __init__, so constructing this class is
    expensive (downloads/loads facebook/m2m100_1.2B).
    """

    def __init__(self, cache_duration_minutes: int = 60):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"[INIT] Using device: {self.device}")
        # Initialize cache and queue
        self.cache = TranslationCache(cache_duration_minutes)
        self.queue = TranslationQueue()
        # Thread pool used by translate_text_async for off-loop execution.
        self.executor = ThreadPoolExecutor(max_workers=3)
        self.background_tasks = {}
        logger.info(f"[INIT] Thread pool initialized with 3 workers")
        # Load model - using a powerful multilingual model
        self.model_name = "facebook/m2m100_1.2B"
        logger.info(f"[INIT] Loading model: {self.model_name}")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)
            self.model.to(self.device)
            logger.info(f"[INIT] Model loaded successfully on {self.device}!")
        except Exception as e:
            logger.error(f"[INIT] Error loading model: {e}")
            raise
        # Tuning for long-text translation.
        self.max_chunk_size = 350    # maximum characters per chunk
        self.min_chunk_overlap = 20  # overlap between chunks (currently unused)
        # Per-session progress tracking, guarded by translation_lock.
        self.current_translation = {}
        self.translation_lock = threading.Lock()
        logger.info(f"[INIT] Translator initialized | Max chunk size: {self.max_chunk_size} chars")

    def translate_chunk(self, text: str, source_lang: str, target_lang: str, chunk_index: int = 0, total_chunks: int = 1) -> str:
        """Translate one chunk of text.

        Never raises: on failure it returns a "[Translation Error: ...]"
        marker string, since callers stitch chunk results together.
        """
        try:
            logger.info(f"[TRANSLATE] Starting chunk translation [{chunk_index+1}/{total_chunks}] | {source_lang}{target_lang} | Length: {len(text)} chars")
            # Set source language for tokenizer
            self.tokenizer.src_lang = source_lang
            # Encode input; hard truncation at 512 tokens (chunking keeps inputs small).
            encoded = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(self.device)
            logger.debug(f"[TRANSLATE] Text encoded | Input tokens: {encoded.input_ids.shape[1]}")
            # Deterministic beam-search generation.
            start_time = time.time()
            generated_tokens = self.model.generate(
                **encoded,
                forced_bos_token_id=self.tokenizer.get_lang_id(target_lang),
                max_length=1024,         # allow long outputs
                min_length=10,           # minimum output length
                num_beams=5,             # wider beam for better quality
                early_stopping=True,
                no_repeat_ngram_size=3,  # avoid repeated n-grams
                length_penalty=1.0,
                repetition_penalty=1.2,  # discourage word repetition
                do_sample=False,         # deterministic decoding
                # Fixed: the original also passed temperature=0.7, which is
                # ignored (with a warning) when do_sample=False.
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
            generation_time = time.time() - start_time
            # Decode result
            translation = self.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
            # Strip stray surrounding whitespace.
            translation = translation.strip()
            logger.info(f"[TRANSLATE] Chunk translation completed [{chunk_index+1}/{total_chunks}] | Generation time: {generation_time:.2f}s | Output length: {len(translation)} chars")
            return translation
        except Exception as e:
            logger.error(f"[TRANSLATE] Chunk translation error [{chunk_index+1}/{total_chunks}]: {e}")
            return f"[Translation Error: {str(e)}]"

    def translate_text(self, text: str, source_lang: str, target_lang: str, session_id: Optional[str] = None) -> Tuple[str, float, int]:
        """Translate text, chunking long inputs and logging progress.

        Returns (translation, processing_time_seconds, chunks_processed).
        On error returns an "Translation error: ..." string with 0 chunks.
        """
        start_time = time.time()
        if not session_id:
            session_id = hashlib.md5(f"{text[:100]}{time.time()}".encode()).hexdigest()[:8]
        logger.info(f"[SESSION:{session_id}] Starting translation | {source_lang}{target_lang} | Text length: {len(text)} chars")
        # Check the cache for the whole text first.
        cached_result = self.cache.get(text, source_lang, target_lang)
        if cached_result:
            logger.info(f"[SESSION:{session_id}] Translation completed from cache | Time: {time.time() - start_time:.2f}s")
            return cached_result, time.time() - start_time, 1
        try:
            # Short texts are translated directly, no chunking.
            if len(text) <= self.max_chunk_size:
                logger.info(f"[SESSION:{session_id}] Processing as short text")
                translation = self.translate_chunk(text, source_lang, target_lang, 0, 1)
                self.cache.set(text, source_lang, target_lang, translation)
                processing_time = time.time() - start_time
                logger.info(f"[SESSION:{session_id}] Short text translation completed | Total time: {processing_time:.2f}s")
                return translation, processing_time, 1
            # Long text: split into smaller chunks.
            logger.info(f"[SESSION:{session_id}] Processing as long text - starting chunking")
            chunks = TextChunker.split_text_smart(text, self.max_chunk_size)
            logger.info(f"[SESSION:{session_id}] Text split into {len(chunks)} chunks")
            # Initialize progress tracking
            with self.translation_lock:
                self.current_translation[session_id] = {
                    'total_chunks': len(chunks),
                    'completed_chunks': 0,
                    'start_time': start_time,
                    'source_lang': source_lang,
                    'target_lang': target_lang
                }
            # Translate each chunk (per-chunk cache lookup first).
            translated_chunks = []
            for i, chunk in enumerate(chunks):
                chunk_start_time = time.time()
                logger.info(f"[SESSION:{session_id}] Starting chunk {i+1}/{len(chunks)} | Chunk length: {len(chunk)} chars")
                chunk_translation = self.cache.get(chunk, source_lang, target_lang)
                if not chunk_translation:
                    # Estimate remaining time from the average so far.
                    if i > 0:
                        elapsed_time = time.time() - start_time
                        avg_time_per_chunk = elapsed_time / i
                        estimated_remaining = avg_time_per_chunk * (len(chunks) - i)
                        logger.info(f"[SESSION:{session_id}] Progress: {i}/{len(chunks)} | Avg time per chunk: {avg_time_per_chunk:.1f}s | Estimated remaining: {estimated_remaining:.1f}s")
                    chunk_translation = self.translate_chunk(chunk, source_lang, target_lang, i, len(chunks))
                    self.cache.set(chunk, source_lang, target_lang, chunk_translation)
                    chunk_time = time.time() - chunk_start_time
                    logger.info(f"[SESSION:{session_id}] Chunk {i+1}/{len(chunks)} translated in {chunk_time:.2f}s")
                else:
                    logger.info(f"[SESSION:{session_id}] Chunk {i+1}/{len(chunks)} retrieved from cache")
                translated_chunks.append(chunk_translation)
                # Update progress
                with self.translation_lock:
                    if session_id in self.current_translation:
                        self.current_translation[session_id]['completed_chunks'] = i + 1
                # Brief pause between chunks to avoid overloading the device.
                if i < len(chunks) - 1:
                    time.sleep(0.1)
            # Combine the translated chunks into one text.
            logger.info(f"[SESSION:{session_id}] Combining translated chunks")
            final_translation = self._combine_translations(translated_chunks, text)
            # Cache the full result.
            self.cache.set(text, source_lang, target_lang, final_translation)
            processing_time = time.time() - start_time
            logger.info(f"[SESSION:{session_id}] Long text translation completed | Total time: {processing_time:.2f}s | Chunks: {len(chunks)} | Final length: {len(final_translation)} chars")
            # Store in completed_translations for WordPress to check.
            with translation_requests_lock:
                completed_translations[session_id] = {
                    'translation': final_translation,
                    'processing_time': processing_time,
                    'character_count': len(text),
                    'source_lang': source_lang,
                    'target_lang': target_lang,
                    'completed_at': datetime.now().isoformat(),
                    'request_id': session_id,
                    'status': 'completed'
                }
                # Remove from processing requests if exists
                if session_id in translation_requests:
                    del translation_requests[session_id]
            # Clean up progress tracking
            with self.translation_lock:
                self.current_translation.pop(session_id, None)
            return final_translation, processing_time, len(chunks)
        except Exception as e:
            logger.error(f"[SESSION:{session_id}] Translation error: {e}")
            # Clean up progress tracking
            with self.translation_lock:
                self.current_translation.pop(session_id, None)
            return f"Translation error: {str(e)}", time.time() - start_time, 0

    def get_translation_progress(self, session_id: str) -> Optional[Dict]:
        """Return a progress snapshot for a session, or None if unknown.

        Fixed: annotated Optional — the original claimed Dict but returns
        None for unknown sessions.
        """
        with self.translation_lock:
            if session_id not in self.current_translation:
                return None
            progress = self.current_translation[session_id].copy()
        elapsed_time = time.time() - progress['start_time']
        if progress['completed_chunks'] > 0:
            avg_time_per_chunk = elapsed_time / progress['completed_chunks']
            remaining_chunks = progress['total_chunks'] - progress['completed_chunks']
            estimated_remaining = avg_time_per_chunk * remaining_chunks
        else:
            estimated_remaining = None
        return {
            'total_chunks': progress['total_chunks'],
            'completed_chunks': progress['completed_chunks'],
            'elapsed_time': elapsed_time,
            'estimated_remaining': estimated_remaining,
            'progress_percentage': (progress['completed_chunks'] / progress['total_chunks']) * 100
        }

    def _combine_translations(self, translated_chunks: List[str], original_text: str) -> str:
        """Join translated chunks back into one coherent text.

        If the original text had paragraph breaks, chunks are joined with
        blank lines; otherwise with spaces. A terminating '.' is appended
        to a chunk that doesn't end with sentence punctuation.
        """
        if not translated_chunks:
            return ""
        if len(translated_chunks) == 1:
            return translated_chunks[0]
        logger.debug(f"[COMBINER] Combining {len(translated_chunks)} translated chunks")
        combined = []
        for i, chunk in enumerate(translated_chunks):
            chunk = chunk.strip()
            if not chunk:
                continue
            if i > 0 and combined:
                # Terminate the previous chunk if it lacks end punctuation.
                if not combined[-1].rstrip().endswith(('.', '!', '?', ':', '۔', '.')):
                    combined[-1] += '.'
                # Preserve paragraph structure when the source had it.
                if '\n\n' in original_text:
                    combined.append('\n\n' + chunk)
                else:
                    combined.append(' ' + chunk)
            else:
                combined.append(chunk)
        result = ''.join(combined)
        # Final cleanup. Fixed: the original collapsed ALL whitespace
        # (re.sub(r'\s+', ' ')), which destroyed the paragraph breaks
        # inserted just above; collapse only spaces/tabs instead.
        result = re.sub(r'[ \t]+', ' ', result)   # collapse runs of spaces/tabs
        result = re.sub(r'\n{3,}', '\n\n', result)  # at most one blank line
        result = re.sub(r'\.+', '.', result)      # drop repeated periods
        result = result.strip()
        logger.debug(f"[COMBINER] Combined translation length: {len(result)} chars")
        return result

    async def translate_text_async(self, text: str, source_lang: str, target_lang: str, session_id: Optional[str] = None):
        """Async wrapper: run translate_text on the instance's thread pool.

        Uses get_running_loop() (the modern API inside a coroutine) instead
        of the deprecated get_event_loop().
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self.translate_text,
            text, source_lang, target_lang, session_id
        )
def process_heavy_translation_background(request_id: str, text: str, source_lang: str, target_lang: str):
    """
    Background worker for heavy (long-text) WordPress translations.

    Runs translator.translate_text, records the outcome (success or
    failure) in completed_translations, and triggers automatic credit
    charging via notify_wordpress_completion_and_charge.
    """
    # Defined BEFORE the try so the except branch can always compute an
    # elapsed time (the original used a fragile `'start_time' in locals()` check).
    start_time = time.time()
    try:
        logger.info(f"[HF Server] Background processing started for request: {request_id}")
        # Mark early progress so status polls show movement.
        with translation_requests_lock:
            if request_id in translation_requests:
                translation_requests[request_id]['progress'] = 10
        # Perform actual translation
        translation, processing_time, chunks_count = translator.translate_text(
            text, source_lang, target_lang, request_id
        )
        processing_time = time.time() - start_time
        # Store completed translation
        with translation_requests_lock:
            completed_translations[request_id] = {
                'translation': translation,
                'processing_time': processing_time,
                'character_count': len(text),
                'source_lang': source_lang,
                'target_lang': target_lang,
                'completed_at': datetime.now().isoformat(),
                'request_id': request_id,
                'status': 'completed',
                'auto_charged': False  # flag tracking automatic credit deduction
            }
            # Remove from processing queue
            if request_id in translation_requests:
                del translation_requests[request_id]
        logger.info(f"[HF Server] Long text translation completed for request: {request_id} in {processing_time:.2f}s")
        # Automatically notify WordPress so it can deduct credits.
        charge_success = notify_wordpress_completion_and_charge(request_id)
        if charge_success:
            # Mark the request as charged.
            with translation_requests_lock:
                if request_id in completed_translations:
                    completed_translations[request_id]['auto_charged'] = True
            logger.info(f"[HF Server] Automatic charging completed for request: {request_id}")
        else:
            logger.warning(f"[HF Server] Automatic charging failed for request: {request_id}")
    except Exception as e:
        logger.error(f"[HF Server] Background processing error for {request_id}: {str(e)}")
        # Mark as failed
        with translation_requests_lock:
            completed_translations[request_id] = {
                'translation': '',
                'error': str(e),
                'status': 'failed',
                'processing_time': time.time() - start_time,
                'completed_at': datetime.now().isoformat(),
                'request_id': request_id,
                'auto_charged': False
            }
            # Remove from processing queue
            if request_id in translation_requests:
                del translation_requests[request_id]
def notify_wordpress_completion_and_charge(request_id: str, wordpress_url: Optional[str] = None) -> bool:
    """
    Notify WordPress that a translation completed and trigger automatic
    credit deduction. Returns True on a confirmed successful charge.

    Fixed: the original referenced `os` without importing it (NameError
    whenever wordpress_url was not supplied); `os` and `requests` are now
    module-level imports. The bare `except:` around JSON decoding was
    narrowed to ValueError.
    """
    try:
        if not wordpress_url:
            # The WordPress URL should come from the environment or settings.
            wordpress_url = os.getenv('WORDPRESS_URL', 'https://your-wordpress-site.com')
        # Look up the completed translation's data.
        with translation_requests_lock:
            if request_id not in completed_translations:
                logger.error(f"[AUTO CHARGE] Translation not found in completed cache: {request_id}")
                return False
            translation_data = completed_translations[request_id]
        # Build the WordPress AJAX request for automatic credit deduction.
        charge_url = f"{wordpress_url.rstrip('/')}/wp-admin/admin-ajax.php"
        charge_payload = {
            'action': 'amt_auto_charge_completed',
            'request_id': request_id,
            'character_count': translation_data.get('character_count', 0),
            'processing_time': translation_data.get('processing_time', 0),
            'translation_length': len(translation_data.get('translation', '')),
            'source_lang': translation_data.get('source_lang', ''),
            'target_lang': translation_data.get('target_lang', ''),
            'completed_at': translation_data.get('completed_at', ''),
            'nonce': 'auto_charge_nonce'  # NOTE(review): should be obtained from WordPress
        }
        logger.info(f"[AUTO CHARGE] Notifying WordPress for automatic charging: {request_id}")
        # POST to WordPress.
        response = requests.post(
            charge_url,
            data=charge_payload,
            timeout=30,
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
                'User-Agent': 'HuggingFace-Translation-Server/2.1.0'
            }
        )
        if response.status_code == 200:
            try:
                result = response.json()
                if result.get('success'):
                    logger.info(f"[AUTO CHARGE] WordPress automatic charging successful: {request_id} - Cost: {result.get('cost', 0)}")
                    return True
                else:
                    logger.error(f"[AUTO CHARGE] WordPress charging failed: {result.get('data', {}).get('message', 'Unknown error')}")
                    return False
            except ValueError:  # requests raises a ValueError subclass on bad JSON
                logger.error(f"[AUTO CHARGE] Invalid JSON response from WordPress")
                return False
        else:
            logger.error(f"[AUTO CHARGE] WordPress request failed with status: {response.status_code}")
            return False
    except Exception as e:
        logger.error(f"[AUTO CHARGE] Error notifying WordPress: {str(e)}")
        return False
def perform_translation_internal(text: str, source_lang: str, target_lang: str) -> str:
    """Run the global translator and return only the translated text.

    On any failure, returns an error string instead of raising (matching
    translate_text's own error-string convention).
    """
    try:
        translated, _elapsed, _chunks = translator.translate_text(text, source_lang, target_lang)
    except Exception as e:
        logger.error(f"[INTERNAL] Translation error: {str(e)}")
        return f"Translation error: {str(e)}"
    return translated
# Language mappings for the M2M100 model: human-readable language name
# (shown to API clients) -> ISO 639-1 code passed to the tokenizer/model.
LANGUAGE_MAP = {
    "English": "en",
    "Persian (Farsi)": "fa",
    "Arabic": "ar",
    "French": "fr",
    "German": "de",
    "Spanish": "es",
    "Italian": "it",
    "Portuguese": "pt",
    "Russian": "ru",
    "Chinese (Simplified)": "zh",
    "Japanese": "ja",
    "Korean": "ko",
    "Hindi": "hi",
    "Turkish": "tr",
    "Dutch": "nl",
    "Polish": "pl",
    "Swedish": "sv",
    "Norwegian": "no",
    "Danish": "da",
    "Finnish": "fi",
    "Greek": "el",
    "Hebrew": "he",
    "Thai": "th",
    "Vietnamese": "vi",
    "Indonesian": "id",
    "Malay": "ms",
    "Czech": "cs",
    "Slovak": "sk",
    "Hungarian": "hu",
    "Romanian": "ro",
    "Bulgarian": "bg",
    "Croatian": "hr",
    "Serbian": "sr",
    "Slovenian": "sl",
    "Lithuanian": "lt",
    "Latvian": "lv",
    "Estonian": "et",
    "Ukrainian": "uk",
    "Belarusian": "be",
    "Kazakh": "kk",
    "Uzbek": "uz",
    "Georgian": "ka",
    "Armenian": "hy",
    "Azerbaijani": "az",
    "Bengali": "bn",
    "Urdu": "ur",
    "Tamil": "ta",
    "Telugu": "te",
    "Malayalam": "ml",
    "Kannada": "kn",
    "Gujarati": "gu",
    "Punjabi": "pa",
    "Marathi": "mr",
    "Nepali": "ne",
    "Sinhala": "si",
    "Burmese": "my",
    "Khmer": "km",
    "Lao": "lo",
    "Mongolian": "mn",
    "Afrikaans": "af",
    "Amharic": "am",
    "Yoruba": "yo",
    "Igbo": "ig",
    "Hausa": "ha",
    "Swahili": "sw",
    "Xhosa": "xh",
    "Zulu": "zu"
}
# Initialize the module-level translator singleton.
# NOTE: this loads the M2M100 model at import time (slow, memory-heavy).
translator = MultilingualTranslator(60)
# Create FastAPI app
app = FastAPI(title="Enhanced Multilingual Translation API", version="2.1.0")
# Add CORS middleware.
# NOTE(review): wide-open CORS ("*" origins with credentials) — tighten in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ========== NEW WORDPRESS INTEGRATION ENDPOINTS ==========
@app.post("/api/check-completion")
async def check_completion(request: Request):
    """
    Verify whether a translation request has finished.

    WordPress calls this endpoint to confirm completion before charging
    the user's credits; it returns a status marker, never the translation.
    """
    try:
        form_data = await request.form()
        request_id = form_data.get('request_id', '').strip()
        if not request_id:
            return {'status': 'error', 'message': 'Request ID is required'}
        logger.info(f"[HF Server] Completion verification requested for: {request_id}")
        with translation_requests_lock:
            finished = completed_translations.get(request_id)
            if finished is not None:
                logger.info(f"[HF Server] Completion verification for {request_id}: COMPLETED")
                return {
                    'status': 'completed',
                    'request_id': request_id,
                    'completed_at': finished.get('completed_at'),
                    'processing_time': finished.get('processing_time', 0),
                    'verified': True,
                }
            if request_id in translation_requests:
                logger.info(f"[HF Server] Completion verification for {request_id}: STILL PROCESSING")
                return {
                    'status': 'processing',
                    'request_id': request_id,
                    'verified': False,
                }
            logger.info(f"[HF Server] Completion verification for {request_id}: NOT FOUND")
            return {
                'status': 'not_found',
                'request_id': request_id,
                'message': 'Request ID not found',
            }
    except Exception as e:
        logger.error(f"[HF Server] Error in check_completion: {str(e)}")
        return {'status': 'error', 'message': 'Server error occurred'}
@app.post("/api/check-translation-status")
async def check_translation_status(request: Request):
    """
    Report the current status of a translation request.

    Unlike /api/check-completion, this returns the translation text itself
    once the request has finished.
    """
    try:
        form_data = await request.form()
        request_id = form_data.get('request_id', '').strip()
        if not request_id:
            return {'status': 'error', 'message': 'Request ID is required'}
        logger.info(f"[HF Server] Translation status check for: {request_id}")
        with translation_requests_lock:
            done = completed_translations.get(request_id)
            if done is not None:
                logger.info(f"[HF Server] Translation status check for {request_id}: COMPLETED - returning translation")
                return {
                    'status': 'completed',
                    'request_id': request_id,
                    'translation': done.get('translation', ''),
                    'processing_time': done.get('processing_time', 0),
                    'character_count': done.get('character_count', 0),
                    'completed_at': done.get('completed_at'),
                    'source_lang': done.get('source_lang', ''),
                    'target_lang': done.get('target_lang', ''),
                }
            pending = translation_requests.get(request_id)
            if pending is not None:
                logger.info(f"[HF Server] Translation status check for {request_id}: STILL PROCESSING")
                return {
                    'status': 'processing',
                    'request_id': request_id,
                    'started_at': pending.get('started_at'),
                    'progress': pending.get('progress', 0),
                }
            logger.info(f"[HF Server] Translation status check for {request_id}: NOT FOUND")
            return {
                'status': 'not_found',
                'request_id': request_id,
                'message': 'Translation request not found',
            }
    except Exception as e:
        logger.error(f"[HF Server] Error in check_translation_status: {str(e)}")
        return {'status': 'error', 'message': 'Server error occurred'}
# ========== UPDATED MAIN TRANSLATION ENDPOINT ==========
@app.post("/api/translate/form")
async def api_translate_form(request: Request):
    """
    Enhanced translation endpoint that handles both short and long texts.

    Short texts (<= 1000 chars) are translated synchronously; long texts
    return immediately with a request_id and are processed in a background
    thread (poll /api/check-translation-status for the result).

    Fixed: the two bare `except:` clauses around body parsing are narrowed
    to `except Exception` so system-exiting exceptions are not swallowed;
    an unused `start_time` local was removed.
    """
    try:
        form_data = await request.form()
        text = form_data.get("text", "")
        source_lang = form_data.get("source_lang", "")
        target_lang = form_data.get("target_lang", "")
        api_key = form_data.get("api_key", None)
    except Exception:
        # Fall back to a JSON body if form parsing failed.
        try:
            json_data = await request.json()
            text = json_data.get("text", "")
            source_lang = json_data.get("source_lang", "")
            target_lang = json_data.get("target_lang", "")
            api_key = json_data.get("api_key", None)
        except Exception:
            return {"status": "error", "message": "Invalid request format"}
    if not text.strip():
        logger.error("[FORM API] No text provided")
        return {"status": "error", "message": "Text, source language, and target language are required"}
    # Map human-readable names to M2M100 codes (also validates the names).
    source_code = LANGUAGE_MAP.get(source_lang)
    target_code = LANGUAGE_MAP.get(target_lang)
    if not source_code or not target_code:
        logger.error(f"[FORM API] Invalid language codes: {source_lang} -> {target_lang}")
        return {"status": "error", "message": "Invalid language codes"}
    char_count = len(text)
    is_heavy_text = char_count > 1000  # Same threshold as WordPress
    logger.info(f"[FORM API] Translation request: {char_count} chars, {source_lang}{target_lang}, Heavy: {is_heavy_text}")
    if is_heavy_text:
        # Generate request ID for background processing
        request_id = str(uuid.uuid4())
        # Check the cache first for an immediate answer.
        cached_result = translator.cache.get(text, source_code, target_code)
        if cached_result:
            logger.info(f"[FORM API] Returning cached translation immediately for request: {request_id}")
            return {
                "translation": cached_result,
                "source_language": source_lang,
                "target_language": target_lang,
                "processing_time": 0.0,
                "character_count": char_count,
                "status": "success",
                "chunks_processed": None,
                "request_id": request_id,
                "cached": True
            }
        # Register the request so status polls can find it.
        with translation_requests_lock:
            translation_requests[request_id] = {
                'text': text,
                'source_lang': source_code,
                'target_lang': target_code,
                'started_at': datetime.now().isoformat(),
                'character_count': char_count,
                'progress': 0
            }
        # Start background processing on a daemon thread.
        thread = threading.Thread(
            target=process_heavy_translation_background,
            args=(request_id, text, source_code, target_code)
        )
        thread.daemon = True
        thread.start()
        logger.info(f"[FORM API] Started background processing for request: {request_id}")
        return {
            'is_background': True,
            'session_id': request_id,
            'request_id': request_id,
            'status': 'processing',
            'message': f'Long text ({char_count} characters) is being processed in background. Use the request ID to check status.',
            'character_count': char_count
        }
    else:
        # Process short text synchronously.
        try:
            translation, processing_time, chunks_count = translator.translate_text(
                text, source_code, target_code
            )
            # Reject empty or error-marker results.
            if not translation or not translation.strip() or translation.startswith("Translation error"):
                logger.error(f"[FORM API] Invalid translation result: {translation[:100] if translation else 'None'}")
                return {
                    "status": "error",
                    "message": "Translation failed - empty or invalid result"
                }
            logger.info(f"[FORM API] Short text translation completed in {processing_time:.2f}s")
            return {
                'status': 'success',
                'translation': translation,
                'processing_time': processing_time,
                'character_count': char_count,
                'source_lang': source_lang,
                'target_lang': target_lang
            }
        except Exception as e:
            logger.error(f"[FORM API] Translation error: {str(e)}")
            return {"status": "error", "message": f"Translation failed: {str(e)}"}
# ========== EXISTING ENDPOINTS (UPDATED) ==========
@app.get("/")
async def root():
    """Service banner: human-readable version string plus the feature list."""
    feature_flags = [
        "enhanced_logging",
        "progress_tracking",
        "long_text_support",
        "smart_chunking",
        "cache_optimization",
        "wordpress_integration",
        "delayed_charging_support",
    ]
    return {
        "message": "Enhanced Multilingual Translation API v2.1 with WordPress Integration",
        "status": "active",
        "features": feature_flags,
    }
@app.post("/api/translate")
async def api_translate(request: TranslationRequest):
    """JSON translation endpoint with progress tracking.

    Validates the text and language names, then runs the (blocking)
    translator and returns a TranslationResponse.
    """
    body_text = request.text
    if not body_text.strip():
        raise HTTPException(status_code=400, detail="No text provided")
    source_code = LANGUAGE_MAP.get(request.source_lang)
    target_code = LANGUAGE_MAP.get(request.target_lang)
    if source_code is None or target_code is None:
        raise HTTPException(status_code=400, detail="Invalid language codes")
    try:
        # Short hash of the text prefix + timestamp serves as the session id.
        session_id = hashlib.md5(f"{body_text[:100]}{time.time()}".encode()).hexdigest()[:8]
        translated, elapsed, n_chunks = translator.translate_text(
            body_text, source_code, target_code, session_id
        )
        return TranslationResponse(
            translation=translated,
            source_language=request.source_lang,
            target_language=request.target_lang,
            processing_time=elapsed,
            character_count=len(body_text),
            status="success",
            chunks_processed=n_chunks,
        )
    except Exception as e:
        logger.error(f"[API] Translation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Translation error: {str(e)}")
@app.get("/api/progress/{session_id}")
async def get_translation_progress(session_id: str):
    """Return the live progress record for a session, or HTTP 404 if unknown."""
    snapshot = translator.get_translation_progress(session_id)
    if snapshot is None:
        raise HTTPException(status_code=404, detail="Session not found or completed")
    return {"status": "success", "progress": snapshot}
@app.get("/api/languages")
async def get_languages():
    """List the supported language names and their internal language codes."""
    # Iterating a dict yields its keys, so list(d) == list(d.keys()).
    return {
        "languages": list(LANGUAGE_MAP),
        "language_codes": LANGUAGE_MAP,
        "status": "success",
    }
@app.get("/api/health")
async def health_check():
    """Report service health plus translator/cache/request counters."""
    # Snapshot the shared counters under the lock, then build the
    # response without holding it.
    with translation_requests_lock:
        pending_count = len(translation_requests)
        finished_count = len(completed_translations)
    return {
        "status": "healthy",
        "device": str(translator.device),
        "model": translator.model_name,
        "cache_size": len(translator.cache.cache),
        "max_chunk_size": translator.max_chunk_size,
        "active_translations": len(translator.current_translation),
        "active_requests": pending_count,
        "completed_cache": finished_count,
        "version": "2.1.0",
    }
@app.get("/api/status/{session_id}")
async def get_session_status(session_id: str):
    """Get translation status - non-blocking.

    Checks the background-task registry first, then the live progress
    tracker. The returned dict always carries a ``status`` key:
    ``completed`` / ``failed`` / ``processing`` / ``not_found``.
    """

    def _processing_payload(progress):
        # Shared shape for "still running" responses; the original code
        # duplicated this dict in two branches.
        if progress:
            return {
                "status": "processing",
                "progress": progress,
                "message": f"Processing chunk {progress['completed_chunks']}/{progress['total_chunks']}",
                "estimated_remaining": progress.get('estimated_remaining', 0)
            }
        return None

    # Check if task is in background tasks
    if session_id in translator.background_tasks:
        task = translator.background_tasks[session_id]
        if task.done():
            try:
                translation, processing_time, chunks_count = await task
                return {
                    "status": "completed",
                    "translation": translation,
                    "processing_time": processing_time,
                    "chunks_processed": chunks_count,
                    "message": "Translation completed successfully"
                }
            except Exception as e:
                return {
                    "status": "failed",
                    "message": f"Translation failed: {str(e)}"
                }
            finally:
                # Drop the finished task whether it succeeded or failed
                # (runs after the return value is evaluated).
                del translator.background_tasks[session_id]

        # Task still running - report chunk progress when the tracker has it.
        payload = _processing_payload(translator.get_translation_progress(session_id))
        if payload is not None:
            return payload
        return {
            "status": "processing",
            "message": "Translation in progress...",
            "progress": None
        }

    # No background task - check currently active (foreground) translations.
    payload = _processing_payload(translator.get_translation_progress(session_id))
    if payload is not None:
        return payload
    return {
        "status": "not_found",
        "message": "Session not found or completed"
    }
# New endpoint for checking the automatic credit-deduction status
@app.post("/api/check-auto-charge-status")
async def check_auto_charge_status(request: Request):
    """
    Check the automatic credit-charge status for a specific request.
    """
    try:
        form_data = await request.form()
        request_id = form_data.get('request_id', '').strip()
        if not request_id:
            return {
                'status': 'error',
                'message': 'Request ID is required'
            }

        with translation_requests_lock:
            record = completed_translations.get(request_id)
            if record is not None:
                return {
                    'status': 'completed',
                    'request_id': request_id,
                    'auto_charged': record.get('auto_charged', False),
                    'completed_at': record.get('completed_at'),
                    'processing_time': record.get('processing_time', 0),
                    'character_count': record.get('character_count', 0)
                }

        return {
            'status': 'not_found',
            'request_id': request_id,
            'message': 'Translation not found'
        }
    except Exception as e:
        logger.error(f"[HF Server] Error checking auto charge status: {str(e)}")
        return {
            'status': 'error',
            'message': 'Server error occurred'
        }
@app.get("/api/server-status")
async def get_server_status():
    """Get current server status - enhanced for WordPress integration.

    Combines the background queue counters (guarded by
    translation_requests_lock) with per-session chunk progress
    (guarded by translator.translation_lock) into one payload.
    """
    active_sessions = []
    with translation_requests_lock:
        background_tasks_count = len(translation_requests)
        completed_count = len(completed_translations)

    with translator.translation_lock:
        for session_id, progress in translator.current_translation.items():
            elapsed_time = time.time() - progress['start_time']
            total_chunks = progress['total_chunks']
            completed_chunks = progress['completed_chunks']
            if completed_chunks > 0:
                avg_time_per_chunk = elapsed_time / completed_chunks
                estimated_remaining = avg_time_per_chunk * (total_chunks - completed_chunks)
            else:
                estimated_remaining = None
            # Guard: a session registered with zero chunks must not raise
            # ZeroDivisionError here.
            pct = (completed_chunks / total_chunks) * 100 if total_chunks else 0.0
            active_sessions.append({
                'session_id': session_id,
                'source_lang': progress['source_lang'],
                'target_lang': progress['target_lang'],
                'total_chunks': total_chunks,
                'completed_chunks': completed_chunks,
                'progress_percentage': pct,
                'elapsed_time': elapsed_time,
                'estimated_remaining': estimated_remaining
            })

    total_active = len(active_sessions) + background_tasks_count
    if total_active > 0:
        if active_sessions:
            latest = active_sessions[-1]
            # Fix: the language pair previously rendered with no separator
            # (e.g. "enfa") - likely a lost arrow character.
            message = (
                f"Processing chunk {latest['completed_chunks']}/{latest['total_chunks']}"
                f" | {latest['source_lang']} -> {latest['target_lang']}"
            )
        else:
            message = f"{background_tasks_count} translation(s) in background queue"
        return {
            "has_active_translation": True,
            "status": "processing",
            "message": message,
            "active_sessions": len(active_sessions),
            "background_tasks": background_tasks_count,
            "total_active": total_active,
            "completed_cache": completed_count
        }
    return {
        "has_active_translation": False,
        "status": "idle",
        "message": "Server is ready for new translations",
        "active_sessions": 0,
        "background_tasks": 0,
        "completed_cache": completed_count
    }
# ========== CLEANUP AND MAINTENANCE FUNCTIONS ==========
def cleanup_old_requests():
    """
    Clean up old completed translations and stuck processing requests.
    Should be called periodically.

    Removes completed translations older than 2 hours and processing
    requests older than 1 hour. Entries whose timestamps are missing or
    unparseable are removed as invalid.

    Returns:
        tuple[int, int]: (completed_removed, stuck_removed) counts.
    """
    current_time = datetime.now()

    def _expired(stamp, max_age_seconds):
        # Narrowed from a bare `except:` - fromisoformat raises ValueError
        # on a malformed string and TypeError on non-string input; a bare
        # except would also swallow KeyboardInterrupt/SystemExit.
        try:
            started = datetime.fromisoformat(stamp)
        except (TypeError, ValueError):
            return True  # Remove invalid entries
        return (current_time - started).total_seconds() > max_age_seconds

    with translation_requests_lock:
        # Clean completed translations older than 2 hours
        to_remove_completed = [
            req_id for req_id, data in completed_translations.items()
            if _expired(data.get('completed_at', ''), 7200)
        ]
        for req_id in to_remove_completed:
            del completed_translations[req_id]

        # Clean stuck processing requests older than 1 hour
        to_remove_processing = [
            req_id for req_id, data in translation_requests.items()
            if _expired(data.get('started_at', ''), 3600)
        ]
        for req_id in to_remove_processing:
            del translation_requests[req_id]

    logger.info(f"[HF Server] Cleanup: Removed {len(to_remove_completed)} completed, {len(to_remove_processing)} stuck requests")
    return len(to_remove_completed), len(to_remove_processing)
# Schedule periodic cleanup (runs every hour)
def periodic_cleanup():
    """Background loop: invoke cleanup_old_requests() once per hour, forever."""
    while True:
        time.sleep(3600)  # 1 hour between sweeps
        try:
            cleanup_old_requests()
        except Exception as e:
            # Never let a single failed sweep kill the cleanup loop.
            logger.error(f"[CLEANUP] Error during periodic cleanup: {e}")
# Start cleanup thread
# Daemon thread: it must never block interpreter shutdown; it runs
# periodic_cleanup(), which sleeps for an hour between sweeps.
cleanup_thread = threading.Thread(target=periodic_cleanup, daemon=True)
cleanup_thread.start()
# ========== SERVER STARTUP ==========
if __name__ == "__main__":
    logger.info("[HF Server] Starting Enhanced Multilingual Translation API with WordPress Integration")
    # Bind on all interfaces; 7860 is the conventional Hugging Face Spaces port.
    uvicorn.run(app, host="0.0.0.0", port=7860)