# ai-textbook-backend/api/translation.py
# AI Development Team
# Fix backend API files for HF deployment
# 8a34091
from fastapi import APIRouter, HTTPException, Header
from pydantic import BaseModel
from typing import Optional
import hashlib
import logging
import os
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which the endpoint decorators in this module register their routes.
router = APIRouter()
# Import dependencies
# Each optional dependency group is imported defensively: when an import
# fails, the matching feature flag is switched off and a warning is logged
# instead of the module failing to load.
try:
    from services.translation_service import TranslationService
    from services.rate_limiter import RateLimiter
    from auth.jwt_utils import get_current_user_id_from_token
except ImportError as services_import_error:
    SERVICES_ENABLED = False
    logging.warning(f"Services not available: {str(services_import_error)}")
else:
    SERVICES_ENABLED = True

try:
    from database.db import SessionLocal
    from database.models import Translation
except ImportError as database_import_error:
    DB_ENABLED = False
    logging.warning(f"Database not available: {str(database_import_error)}")
else:
    DB_ENABLED = True
# Initialize services
# Both singletons exist only when the service imports succeeded; otherwise
# they are None and the endpoints answer 503 via the SERVICES_ENABLED check.
translation_service = TranslationService() if SERVICES_ENABLED else None
rate_limiter = (
    RateLimiter(max_requests=10, window_seconds=3600) if SERVICES_ENABLED else None
)
# Cost and performance metrics (in-memory)
# Every counter starts at zero and lives only in this module's memory.
translation_metrics = dict.fromkeys(
    (
        "total_translations",
        "cache_hits",
        "cache_misses",
        "api_calls",
        "total_latency_ms",
        "api_failures",
    ),
    0,
)
def log_metrics():
    """Write one INFO line summarising the in-memory translation counters.

    Nothing is logged until at least one translation request has been counted,
    which also avoids dividing by zero.
    """
    stats = translation_metrics
    request_count = stats["total_translations"]
    if not request_count > 0:
        return

    hit_percentage = (stats["cache_hits"] / request_count) * 100
    mean_latency = stats["total_latency_ms"] / request_count
    # Cost savings is reported as the share of requests answered from cache,
    # so it is numerically the same as the cache hit rate.
    savings_percentage = (stats["cache_hits"] / request_count) * 100

    logger.info(
        f"Translation Metrics: "
        f"Total={request_count}, Cache Hit Rate={hit_percentage:.1f}%, "
        f"API Calls={stats['api_calls']}, "
        f"Avg Latency={mean_latency:.0f}ms, "
        f"Cost Savings={savings_percentage:.1f}%, "
        f"Failures={stats['api_failures']}"
    )
class UrduTranslationRequest(BaseModel):
    # Request body for POST /translate/urdu.

    # Chapter identifier; together with content_hash it forms the cache lookup key.
    chapter_id: str
    # Source text to translate.
    content: str
    # Hex SHA-256 digest of `content` (UTF-8 encoded) supplied by the client;
    # the endpoint recomputes it and rejects a mismatch with HTTP 400.
    content_hash: str
class UrduTranslationResponse(BaseModel):
    # Response body for POST /translate/urdu.

    # The translated text, either read from the database cache or freshly produced.
    translated_content: str
    # True when the result came from the database cache, False when it was
    # translated during this request.
    cached: bool
    # String form of the stored translation row id; None when the result was
    # not persisted (database disabled or the save failed).
    translation_id: Optional[str] = None
class TranslationRequest(BaseModel):
    """Legacy endpoint - kept for backward compatibility"""
    # Text to translate.
    text: str
    # Language codes; the legacy endpoint accepts only "en" -> "ur" and
    # "ur" -> "en" and answers HTTP 400 for any other pair.
    source_lang: str = "en"
    target_lang: str = "ur"
class TranslationResponse(BaseModel):
    """Legacy endpoint response"""
    # The request's text, echoed back unchanged.
    original_text: str
    # Result returned by the translation service.
    translated_text: str
    # Language codes echoed back from the request.
    source_lang: str
    target_lang: str
def _lookup_cached_urdu_translation(
    request: UrduTranslationRequest,
) -> Optional[UrduTranslationResponse]:
    """Return a cached Urdu translation for *request*, or None on a miss.

    The lookup is best-effort: any database error is logged as a warning and
    treated as a cache miss so the caller can fall back to a live translation.
    The response object is built while the session is still open.
    """
    db = None
    try:
        db = SessionLocal()
        cached_translation = db.query(Translation).filter(
            Translation.chapter_id == request.chapter_id,
            Translation.content_hash == request.content_hash,
            Translation.target_language == "urdu"
        ).first()
        if not cached_translation:
            return None
        return UrduTranslationResponse(
            translated_content=cached_translation.translated_content,
            cached=True,
            translation_id=str(cached_translation.id)
        )
    except Exception as db_err:
        logger.warning(f"Database cache check failed: {str(db_err)}")
        return None
    finally:
        if db:
            db.close()


async def _translate_with_retry(content: str, retry_delay_seconds: float = 2) -> str:
    """Translate *content* to Urdu, retrying once after a fixed delay.

    Each failed attempt increments the ``api_failures`` metric.

    Raises:
        HTTPException: 503 when both attempts fail.
    """
    import asyncio
    import time

    api_start_time = time.time()
    try:
        # NOTE(review): translate_to_urdu is called synchronously; if it does
        # blocking network I/O it still occupies the event loop - confirm.
        translated_text = translation_service.translate_to_urdu(content)
        api_latency_ms = (time.time() - api_start_time) * 1000
        logger.info(f"OpenRouter API call successful | API Latency: {api_latency_ms:.0f}ms")
        return translated_text
    except Exception as api_error:
        translation_metrics["api_failures"] += 1
        logger.error(f"OpenRouter API error: {str(api_error)}")

    # Retry once after a fixed delay. asyncio.sleep yields to the event loop;
    # time.sleep here would stall every other request served by this worker.
    await asyncio.sleep(retry_delay_seconds)
    try:
        translated_text = translation_service.translate_to_urdu(content)
        api_latency_ms = (time.time() - api_start_time) * 1000
        logger.info(f"Retry successful after {api_latency_ms:.0f}ms")
        return translated_text
    except Exception as retry_error:
        translation_metrics["api_failures"] += 1
        logger.error(f"Retry failed: {str(retry_error)}")
        raise HTTPException(
            status_code=503,
            detail="Translation service temporarily unavailable. Please try again."
        ) from retry_error


def _save_urdu_translation(
    request: UrduTranslationRequest,
    user_id: str,
    translated_text: str,
) -> Optional[str]:
    """Persist a fresh translation and return its id as a string.

    Saving is best-effort: on failure the error is logged, the transaction is
    rolled back and None is returned so the translation can still be served.
    """
    import uuid

    db = None
    try:
        db = SessionLocal()
        db_translation = Translation(
            id=uuid.uuid4(),
            chapter_id=request.chapter_id,
            content_hash=request.content_hash,
            source_language="english",
            target_language="urdu",
            original_content=request.content,
            translated_content=translated_text,
            user_id=uuid.UUID(user_id)
        )
        db.add(db_translation)
        db.commit()
        db.refresh(db_translation)
        translation_id = str(db_translation.id)
        logger.info(f"Saved translation to database: {translation_id}")
        return translation_id
    except Exception as save_err:
        logger.warning(f"Failed to save translation: {str(save_err)}")
        if db:
            db.rollback()
        return None
    finally:
        if db:
            db.close()


@router.post("/translate/urdu", response_model=UrduTranslationResponse)
async def translate_to_urdu(
    request: UrduTranslationRequest,
    authorization: Optional[str] = Header(None)
):
    """
    Translate chapter content to Urdu with JWT authentication, caching, and rate limiting

    Steps: authenticate the bearer token, verify the client-supplied SHA-256
    content hash, enforce the per-user rate limit, try the database cache,
    otherwise call the translation service (one retry) and store the result.

    Raises:
        HTTPException: 503 if services are unavailable or translation fails
            twice, 401 for a missing/invalid token, 400 for a content hash
            mismatch, 429 when the rate limit is exceeded, 500 for any other
            unexpected error.
    """
    import time

    try:
        if not SERVICES_ENABLED:
            raise HTTPException(status_code=503, detail="Translation service not available")

        # 1. Verify JWT authentication
        bearer_prefix = "Bearer "
        if not authorization or not authorization.startswith(bearer_prefix):
            raise HTTPException(status_code=401, detail="Missing or invalid authorization header")
        # Strip only the leading scheme; str.replace would also delete any
        # later "Bearer " occurrence inside the credential.
        token = authorization[len(bearer_prefix):]
        user_id = get_current_user_id_from_token(token)
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid or expired token")

        start_time = time.time()
        logger.info(f"Translation request from user {user_id} for chapter {request.chapter_id}")
        translation_metrics["total_translations"] += 1

        # 2. Validate content hash
        computed_hash = hashlib.sha256(request.content.encode('utf-8')).hexdigest()
        if computed_hash != request.content_hash:
            logger.warning(f"Content hash mismatch: expected {computed_hash}, got {request.content_hash}")
            raise HTTPException(
                status_code=400,
                detail=f"Content hash mismatch. Expected: {computed_hash}, Got: {request.content_hash}"
            )

        # 3. Check rate limit
        if not rate_limiter.check_rate_limit(user_id):
            retry_after = rate_limiter.get_retry_after(user_id)
            logger.warning(f"Rate limit exceeded for user {user_id}")
            raise HTTPException(
                status_code=429,
                detail=f"Translation rate limit exceeded. Try again in {retry_after} seconds.",
                headers={"Retry-After": str(retry_after)}
            )

        # 4. Check database cache (if enabled)
        if DB_ENABLED:
            cached_response = _lookup_cached_urdu_translation(request)
            if cached_response is not None:
                latency_ms = (time.time() - start_time) * 1000
                translation_metrics["cache_hits"] += 1
                translation_metrics["total_latency_ms"] += latency_ms
                logger.info(f"Cache HIT for chapter {request.chapter_id} | Latency: {latency_ms:.0f}ms")
                log_metrics()
                return cached_response

        # 5. Cache MISS - Translate using OpenRouter
        translation_metrics["cache_misses"] += 1
        translation_metrics["api_calls"] += 1
        logger.info(f"Cache MISS for chapter {request.chapter_id} - calling OpenRouter API")
        translated_text = await _translate_with_retry(request.content)

        # 6. Save to database (if enabled)
        translation_id = None
        if DB_ENABLED:
            translation_id = _save_urdu_translation(request, user_id, translated_text)

        # Log final metrics
        total_latency_ms = (time.time() - start_time) * 1000
        translation_metrics["total_latency_ms"] += total_latency_ms
        logger.info(f"Translation complete | Total Latency: {total_latency_ms:.0f}ms | Cached: False")
        log_metrics()

        return UrduTranslationResponse(
            translated_content=translated_text,
            cached=False,
            translation_id=translation_id
        )
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Unexpected error in translate_to_urdu: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
# Legacy endpoints - kept for backward compatibility
@router.post("/translation/translate", response_model=TranslationResponse)
async def translate_text(request: TranslationRequest):
    """
    Legacy translation endpoint - translate text between English and Urdu

    Only the pairs "en" -> "ur" and "ur" -> "en" are supported; any other
    pair is rejected with HTTP 400. Responds 503 when the translation
    services are unavailable and 500 on unexpected errors.
    """
    # Supported (source, target) pairs and the service method handling each.
    # Names are resolved lazily so only the method actually needed is looked up.
    method_name_by_pair = {
        ("en", "ur"): "translate_to_urdu",
        ("ur", "en"): "translate_to_english",
    }
    try:
        if not SERVICES_ENABLED:
            raise HTTPException(status_code=503, detail="Translation service not available")

        language_pair = (request.source_lang, request.target_lang)
        method_name = method_name_by_pair.get(language_pair)
        if method_name is None:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported language pair: {request.source_lang} to {request.target_lang}"
            )

        translate = getattr(translation_service, method_name)
        translated_text = translate(request.text)

        return TranslationResponse(
            original_text=request.text,
            translated_text=translated_text,
            source_lang=request.source_lang,
            target_lang=request.target_lang
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Translation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error during translation: {str(e)}")
@router.get("/translation/health")
async def translation_health():
    """Report which optional components of the translation API are active"""
    limiter_ready = rate_limiter is not None
    health_report = {
        "status": "translation service is running",
        "services_enabled": SERVICES_ENABLED,
        "database_enabled": DB_ENABLED,
        "rate_limiter_enabled": limiter_ready,
    }
    return health_report
@router.get("/translate/stats")
async def translation_stats(
    authorization: Optional[str] = Header(None)
):
    """Get translation statistics for current user

    Returns the caller's remaining translations, the configured limit and
    window, and the seconds to wait when no translations remain (0 otherwise).

    Raises:
        HTTPException: 503 if services are unavailable, 401 for a missing or
            invalid bearer token, 500 on unexpected errors.
    """
    try:
        if not SERVICES_ENABLED:
            raise HTTPException(status_code=503, detail="Service not available")

        bearer_prefix = "Bearer "
        if not authorization or not authorization.startswith(bearer_prefix):
            raise HTTPException(status_code=401, detail="Missing authorization header")
        # Strip only the leading scheme; str.replace would also delete any
        # later "Bearer " occurrence inside the credential.
        token = authorization[len(bearer_prefix):]
        user_id = get_current_user_id_from_token(token)
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")

        remaining = rate_limiter.get_remaining(user_id)
        # A wait time is only meaningful once the quota is exhausted.
        retry_after = rate_limiter.get_retry_after(user_id) if remaining == 0 else 0

        return {
            "translations_remaining": remaining,
            "translations_limit": rate_limiter.limit,
            "window_seconds": rate_limiter.window_seconds,
            "retry_after_seconds": retry_after
        }
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error getting stats: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))