# Penny_V2.2 / models / translation / translation_utils.py
# (hosting-page residue: "pythonprincess's picture", "Upload translation_utils.py", "f583ae5 verified")
# models/translation/translation_utils.py
"""
Translation Model Utilities for PENNY Project
Handles multilingual translation using NLLB-200 for civic engagement accessibility.
Provides async translation with structured error handling and language code normalization.
"""
import asyncio
import time
from typing import Dict, Any, Optional, List
# --- Logging Imports ---
from app.logging_utils import log_interaction, sanitize_for_logging
# --- Model Loader Import ---
# The model loader is treated as optional: when app.model_loader cannot be
# imported, the module still imports, MODEL_LOADER_AVAILABLE is set to False,
# and a warning is emitted through the standard logging module.
try:
    from app.model_loader import load_model_pipeline
    MODEL_LOADER_AVAILABLE = True
except ImportError:
    MODEL_LOADER_AVAILABLE = False
    # Imported here because logging is only needed on this failure path.
    import logging
    logging.getLogger(__name__).warning("Could not import load_model_pipeline. Translation service unavailable.")
# Global variable to store the loaded pipeline for re-use.
# Stays None until _initialize_translation_pipeline() assigns a loaded pipeline.
TRANSLATION_PIPELINE: Optional[Any] = None
# Name handed to load_model_pipeline() to select the translation model.
AGENT_NAME = "penny-translate-agent"
# Flipped to True on the first initialization attempt so that loading is only
# ever tried once per process, whether or not it succeeded.
INITIALIZATION_ATTEMPTED = False
# NLLB-200 Language Code Mapping (Common languages for civic engagement)
# Keys are lowercase language names or short codes; normalize_language_code()
# lowercases and strips its input before looking it up here.
# Values use the "xxx_Xxxx" (language_Script) form.
LANGUAGE_CODES: Dict[str, str] = {
    # English variants
    "english": "eng_Latn",
    "en": "eng_Latn",
    # Spanish variants
    "spanish": "spa_Latn",
    "es": "spa_Latn",
    "español": "spa_Latn",
    # French
    "french": "fra_Latn",
    "fr": "fra_Latn",
    "français": "fra_Latn",
    # Mandarin Chinese
    "chinese": "zho_Hans",
    "mandarin": "zho_Hans",
    "zh": "zho_Hans",
    # Arabic
    "arabic": "arb_Arab",
    "ar": "arb_Arab",
    # Hindi
    "hindi": "hin_Deva",
    "hi": "hin_Deva",
    # Portuguese
    "portuguese": "por_Latn",
    "pt": "por_Latn",
    # Russian
    "russian": "rus_Cyrl",
    "ru": "rus_Cyrl",
    # German
    "german": "deu_Latn",
    "de": "deu_Latn",
    # Vietnamese
    "vietnamese": "vie_Latn",
    "vi": "vie_Latn",
    # Tagalog
    "tagalog": "tgl_Latn",
    "tl": "tgl_Latn",
    # Urdu
    "urdu": "urd_Arab",
    "ur": "urd_Arab",
    # Swahili
    "swahili": "swh_Latn",
    "sw": "swh_Latn",
}
# Pre-translated civic phrases for common queries
# Outer keys are NLLB-200 language codes, inner keys are phrase identifiers.
# Only English and Spanish are provided; get_civic_phrase() returns an empty
# string for any other language or for an unknown phrase key.
CIVIC_PHRASES: Dict[str, Dict[str, str]] = {
    "eng_Latn": {
        "voting_location": "Where is my polling place?",
        "voter_registration": "How do I register to vote?",
        "city_services": "What city services are available?",
        "report_issue": "I want to report a problem.",
        "contact_city": "How do I contact city hall?",
    },
    "spa_Latn": {
        "voting_location": "¿Dónde está mi lugar de votación?",
        "voter_registration": "¿Cómo me registro para votar?",
        "city_services": "¿Qué servicios de la ciudad están disponibles?",
        "report_issue": "Quiero reportar un problema.",
        "contact_city": "¿Cómo contacto al ayuntamiento?",
    }
}
def _initialize_translation_pipeline() -> bool:
    """
    Load the translation pipeline, at most once per process.

    The first call marks initialization as attempted and tries to load the
    model named by AGENT_NAME. Later calls do not retry; they only report
    whether a pipeline is currently stored. Every outcome is recorded through
    log_interaction under the "translation_initialization" intent.

    Returns:
        bool: True if a pipeline is loaded, False otherwise.
    """
    global TRANSLATION_PIPELINE, INITIALIZATION_ATTEMPTED

    init_intent = "translation_initialization"

    # Subsequent calls short-circuit to the stored state.
    if INITIALIZATION_ATTEMPTED:
        already_loaded = TRANSLATION_PIPELINE is not None
        return already_loaded
    INITIALIZATION_ATTEMPTED = True

    if MODEL_LOADER_AVAILABLE:
        try:
            log_interaction(
                intent=init_intent,
                success=None,
                details=f"Loading {AGENT_NAME}"
            )
            TRANSLATION_PIPELINE = load_model_pipeline(AGENT_NAME)
            if TRANSLATION_PIPELINE is not None:
                log_interaction(
                    intent=init_intent,
                    success=True,
                    details=f"Model {AGENT_NAME} loaded successfully"
                )
                return True
            # The loader ran without raising but produced nothing usable.
            log_interaction(
                intent=init_intent,
                success=False,
                error="Pipeline returned None"
            )
        except Exception as load_error:
            log_interaction(
                intent=init_intent,
                success=False,
                error=str(load_error)
            )
        return False

    # The loader module itself could not be imported.
    log_interaction(
        intent=init_intent,
        success=False,
        error="model_loader unavailable"
    )
    return False
# Attempt initialization at module load.
# The boolean return value is discarded here; is_translation_available()
# reports whether a pipeline ended up being stored.
_initialize_translation_pipeline()
def is_translation_available() -> bool:
    """
    Report whether a translation pipeline is currently stored.

    Returns:
        bool: True when TRANSLATION_PIPELINE holds a loaded pipeline,
        False while it is still None.
    """
    pipeline_missing = TRANSLATION_PIPELINE is None
    return not pipeline_missing
def normalize_language_code(lang: str) -> str:
    """
    Converts common language names/codes to NLLB-200 format.

    NLLB-200 codes are case-sensitive ("eng_Latn", not "eng_latn"), and the
    tables in this module are keyed by that canonical casing. Input that
    already has the NLLB shape (three letters, underscore, four letters) is
    therefore returned in canonical casing rather than lowercased.

    Args:
        lang: Language name or code (e.g., "spanish", "es", "español",
            "spa_Latn"). Case and surrounding whitespace are ignored.

    Returns:
        NLLB-200 language code (e.g., "spa_Latn").
        - Empty, whitespace-only or non-string input yields "eng_Latn".
        - Other underscore-containing input (e.g. "en_US") is returned
          lowercased and is not looked up.
        - Unknown names are returned lowercased and unchanged.
    """
    if not lang or not isinstance(lang, str):
        return "eng_Latn"  # Default to English

    cleaned = lang.strip()
    if not cleaned:
        # Whitespace-only input carries no language information.
        return "eng_Latn"

    # Already in NLLB shape: restore canonical "xxx_Xxxx" casing.
    language_part, separator, script_part = cleaned.partition("_")
    if (
        separator
        and len(language_part) == 3
        and len(script_part) == 4
        and language_part.isalpha()
        and script_part.isalpha()
    ):
        return f"{language_part.lower()}_{script_part.capitalize()}"

    lowered = cleaned.lower()
    # Any other underscore form is passed through lowercased.
    if "_" in lowered:
        return lowered

    # Look up in mapping
    return LANGUAGE_CODES.get(lowered, lowered)
def get_supported_languages() -> List[str]:
    """
    Get list of supported language codes.

    Returns:
        List of the distinct NLLB-200 language codes that appear as values in
        LANGUAGE_CODES, sorted alphabetically. Sorting keeps the order stable
        across processes; a bare set would not.
    """
    return sorted(set(LANGUAGE_CODES.values()))
# Inputs longer than this many characters are rejected rather than truncated.
_MAX_TRANSLATION_CHARS = 5000
# Successful translations slower than this get an extra "translation_slow" log entry.
_SLOW_TRANSLATION_MS = 5000


def _build_translation_result(
    translated_text: str,
    source_lang: str,
    target_lang: str,
    original_text: str,
    available: bool,
    **extra: Any
) -> Dict[str, Any]:
    """Assemble the result dictionary shared by every translate_text outcome."""
    result: Dict[str, Any] = {
        "translated_text": translated_text,
        "source_lang": source_lang,
        "target_lang": target_lang,
        "original_text": original_text,
        "available": available,
    }
    result.update(extra)
    return result


async def translate_text(
    text: str,
    source_language: str = "eng_Latn",
    target_language: str = "spa_Latn",
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Translates text from source language to target language using NLLB-200.

    The function does not raise on translation failure: every failure path
    returns a result dictionary with an "error" entry and, where possible, the
    original text as "translated_text". Only task cancellation is re-raised.

    Args:
        text: The text to translate.
        source_language: Source language code (e.g., "eng_Latn", "spanish", "es")
        target_language: Target language code (e.g., "spa_Latn", "french", "fr")
        tenant_id: Optional tenant identifier for logging.

    Returns:
        A dictionary containing:
        - translated_text (str): The translated text (original text on fallback,
          empty string for invalid input)
        - source_lang (str): Normalized source language code
        - target_lang (str): Normalized target language code
        - original_text (str): The input text
        - available (bool): Whether the service was available
        - error (str, optional): Error message if translation failed
        - skipped (bool, optional): True when source and target are identical
        - response_time_ms (int, optional): Translation time in milliseconds

    Raises:
        asyncio.CancelledError: Re-raised after logging if the task is cancelled.
    """
    # Monotonic clock: elapsed time must not be affected by wall-clock changes.
    started = time.monotonic()

    def elapsed_ms() -> int:
        return int((time.monotonic() - started) * 1000)

    # Normalize up front so every result, including early exits, reports
    # normalized codes as the docstring promises.
    src_lang = normalize_language_code(source_language)
    tgt_lang = normalize_language_code(target_language)

    # Bind the pipeline once so the worker thread uses the object checked here,
    # even if the module global is reassigned while the call is in flight.
    pipeline = TRANSLATION_PIPELINE if is_translation_available() else None

    # Check availability
    if pipeline is None:
        log_interaction(
            intent="translation",
            tenant_id=tenant_id,
            success=False,
            error="Translation pipeline not available",
            fallback_used=True
        )
        return _build_translation_result(
            text,  # Return original text as fallback
            src_lang,
            tgt_lang,
            text,
            False,
            error="Translation service is temporarily unavailable."
        )

    # Validate input
    if not text or not isinstance(text, str):
        log_interaction(
            intent="translation",
            tenant_id=tenant_id,
            success=False,
            error="Invalid text input"
        )
        return _build_translation_result(
            "",
            src_lang,
            tgt_lang,
            text if isinstance(text, str) else "",
            True,
            error="Invalid text input provided."
        )

    # Check text length (prevent processing extremely long texts)
    if len(text) > _MAX_TRANSLATION_CHARS:
        log_interaction(
            intent="translation",
            tenant_id=tenant_id,
            success=False,
            error=f"Text too long: {len(text)} characters",
            text_preview=sanitize_for_logging(text[:100])
        )
        return _build_translation_result(
            text,
            src_lang,
            tgt_lang,
            text,
            True,
            error=f"Text is too long for translation (max {_MAX_TRANSLATION_CHARS:,} characters)."
        )

    # Skip translation if source and target are the same
    if src_lang == tgt_lang:
        log_interaction(
            intent="translation_skipped",
            tenant_id=tenant_id,
            success=True,
            details="Source and target languages are identical"
        )
        return _build_translation_result(
            text, src_lang, tgt_lang, text, True, skipped=True
        )

    try:
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()

        # Run model inference in thread executor
        # load_model_pipeline returns a wrapper that calls client.predict()
        # predict() returns: {"translation": "...", "source_lang": "...", "target_lang": "...", "success": True}
        result_dict = await loop.run_in_executor(
            None,
            lambda: pipeline(
                text,
                source_lang=src_lang,
                target_lang=tgt_lang
            )
        )
        response_time_ms = elapsed_ms()

        # Validate results - check if predict() returned an error
        if not result_dict or not isinstance(result_dict, dict):
            log_interaction(
                intent="translation",
                tenant_id=tenant_id,
                success=False,
                error="Empty or invalid model output",
                response_time_ms=response_time_ms,
                source_lang=src_lang,
                target_lang=tgt_lang
            )
            return _build_translation_result(
                text,  # Fallback to original
                src_lang,
                tgt_lang,
                text,
                True,
                error="Translation returned unexpected format."
            )

        # Check for error in result
        if not result_dict.get("success", False) or "error" in result_dict:
            error_msg = result_dict.get("error", "Translation failed")
            log_interaction(
                intent="translation",
                tenant_id=tenant_id,
                success=False,
                error=error_msg,
                response_time_ms=response_time_ms,
                source_lang=src_lang,
                target_lang=tgt_lang
            )
            return _build_translation_result(
                text,  # Fallback to original
                src_lang,
                tgt_lang,
                text,
                False,
                error=error_msg
            )

        # Extract translation from predict() result format.
        # A missing or non-string value is treated as an empty translation
        # instead of raising on .strip().
        raw_translation = result_dict.get("translation", "")
        translated = raw_translation.strip() if isinstance(raw_translation, str) else ""

        if not translated:
            log_interaction(
                intent="translation",
                tenant_id=tenant_id,
                success=False,
                error="Empty translation result",
                response_time_ms=response_time_ms,
                source_lang=src_lang,
                target_lang=tgt_lang
            )
            return _build_translation_result(
                text,  # Fallback to original
                src_lang,
                tgt_lang,
                text,
                True,
                error="Translation produced empty result."
            )

        # Log slow translations
        if response_time_ms > _SLOW_TRANSLATION_MS:
            log_interaction(
                intent="translation_slow",
                tenant_id=tenant_id,
                success=True,
                response_time_ms=response_time_ms,
                details="Slow translation detected",
                source_lang=src_lang,
                target_lang=tgt_lang,
                text_length=len(text)
            )

        log_interaction(
            intent="translation",
            tenant_id=tenant_id,
            success=True,
            response_time_ms=response_time_ms,
            source_lang=src_lang,
            target_lang=tgt_lang,
            text_length=len(text)
        )
        return _build_translation_result(
            translated,
            src_lang,
            tgt_lang,
            text,
            True,
            response_time_ms=response_time_ms
        )

    except asyncio.CancelledError:
        # Cancellation must propagate; it is logged only for traceability.
        log_interaction(
            intent="translation",
            tenant_id=tenant_id,
            success=False,
            error="Translation cancelled",
            source_lang=src_lang,
            target_lang=tgt_lang
        )
        raise

    except Exception as e:
        # Boundary handler: any other failure becomes a fallback result.
        response_time_ms = elapsed_ms()
        log_interaction(
            intent="translation",
            tenant_id=tenant_id,
            success=False,
            error=str(e),
            response_time_ms=response_time_ms,
            source_lang=src_lang,
            target_lang=tgt_lang,
            text_preview=sanitize_for_logging(text[:100]),
            fallback_used=True
        )
        return _build_translation_result(
            text,  # Fallback to original
            src_lang,
            tgt_lang,
            text,
            False,
            error=str(e),
            response_time_ms=response_time_ms
        )
# Characters treated as evidence of Spanish. Compared against lowercased text
# so uppercase forms such as "Ñ" or "Á" are recognised as well.
_SPANISH_MARKERS = frozenset("¿¡ñáéíóú")

# (first code point, last code point, NLLB code), checked in this order.
_SCRIPT_RANGES = (
    ("\u4e00", "\u9fff", "zho_Hans"),  # CJK ideographs -> Chinese
    ("\u0600", "\u06ff", "arb_Arab"),  # Arabic script
    ("\u0400", "\u04ff", "rus_Cyrl"),  # Cyrillic (Russian)
    ("\u0900", "\u097f", "hin_Deva"),  # Devanagari (Hindi)
)


def _guess_language(text: str) -> str:
    """Guess an NLLB code from characteristic characters, defaulting to English."""
    if any(char in _SPANISH_MARKERS for char in text.lower()):
        return "spa_Latn"
    for first, last, code in _SCRIPT_RANGES:
        if any(first <= char <= last for char in text):
            return code
    return "eng_Latn"  # Default assumption


async def detect_and_translate(
    text: str,
    target_language: str = "eng_Latn",
    tenant_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Attempts to detect the source language and translate to target.

    Note: This is a simplified heuristic-based detection. It looks for Spanish
    punctuation/accented letters first, then for characters in the Chinese,
    Arabic, Cyrillic and Devanagari ranges, and otherwise assumes English.
    Other languages that share those characters or scripts are attributed to
    the language listed here. For production, consider integrating a
    dedicated language detection model.

    Args:
        text: The text to translate
        target_language: Target language code
        tenant_id: Optional tenant identifier for logging

    Returns:
        Translation result dictionary from translate_text, with an added
        "detected_lang" entry. For empty or non-string input, a dictionary
        with "detected_lang" set to "unknown" and an "error" message is
        returned without attempting translation.
    """
    if not text or not isinstance(text, str):
        return {
            "translated_text": "",
            "detected_lang": "unknown",
            "target_lang": target_language,
            "original_text": text if isinstance(text, str) else "",
            "available": True,
            "error": "Invalid text input."
        }

    detected_lang = _guess_language(text)

    log_interaction(
        intent="language_detection",
        tenant_id=tenant_id,
        success=True,
        detected_lang=detected_lang,
        text_preview=sanitize_for_logging(text[:50])
    )

    result = await translate_text(text, detected_lang, target_language, tenant_id)
    result["detected_lang"] = detected_lang
    return result
async def batch_translate(
    texts: List[str],
    source_language: str = "eng_Latn",
    target_language: str = "spa_Latn",
    tenant_id: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Translate several texts one after another with translate_text.

    Entries that are not strings, or that are blank after stripping, are
    dropped before translation, and only the first 50 remaining entries are
    processed. The returned list therefore lines up with the filtered input,
    not necessarily with ``texts`` itself.

    Args:
        texts: List of strings to translate
        source_language: Source language code
        target_language: Target language code
        tenant_id: Optional tenant identifier for logging

    Returns:
        List of translation result dictionaries, one per text that was
        translated; an empty list if the input is not a non-empty list or
        contains no usable text.
    """
    batch_intent = "batch_translation"
    batch_limit = 50

    if not texts or not isinstance(texts, list):
        log_interaction(
            intent=batch_intent,
            tenant_id=tenant_id,
            success=False,
            error="Invalid texts input"
        )
        return []

    # Keep only non-blank strings, then cap the batch size.
    usable = [entry for entry in texts if isinstance(entry, str) and entry.strip()]
    if len(usable) > batch_limit:
        usable = usable[:batch_limit]
        log_interaction(
            intent=batch_intent,
            tenant_id=tenant_id,
            success=None,
            details="Batch size limited to 50 texts"
        )

    if not usable:
        log_interaction(
            intent=batch_intent,
            tenant_id=tenant_id,
            success=False,
            error="No valid texts in batch"
        )
        return []

    batch_started = time.time()
    # Each item is awaited in turn, so translations run sequentially.
    translations = [
        await translate_text(entry, source_language, target_language, tenant_id)
        for entry in usable
    ]
    elapsed_ms = int((time.time() - batch_started) * 1000)

    log_interaction(
        intent=batch_intent,
        tenant_id=tenant_id,
        success=True,
        response_time_ms=elapsed_ms,
        batch_size=len(usable),
        source_lang=normalize_language_code(source_language),
        target_lang=normalize_language_code(target_language)
    )
    return translations
def get_civic_phrase(
    phrase_key: str,
    language: str = "eng_Latn"
) -> str:
    """
    Look up a pre-translated civic phrase in CIVIC_PHRASES.

    Args:
        phrase_key: Key for the civic phrase (e.g., "voting_location")
        language: Target language name or code; it is passed through
            normalize_language_code before the lookup

    Returns:
        The stored phrase, or an empty string when the key is empty or not a
        string, or when no phrase exists for that language/key pair.
        Successful lookups are logged; misses are not.
    """
    if not (phrase_key and isinstance(phrase_key, str)):
        return ""

    lang_code = normalize_language_code(language)
    phrases_for_language = CIVIC_PHRASES.get(lang_code, {})
    found = phrases_for_language.get(phrase_key, "")

    if not found:
        return found

    log_interaction(
        intent="civic_phrase_lookup",
        success=True,
        phrase_key=phrase_key,
        language=lang_code
    )
    return found