import asyncio
import hashlib
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

import httpx
import torch
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
|
|
app = FastAPI(title="Distributed M2M100 API")

# Fully permissive CORS: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is an
# unusual pairing (browsers do not accept "*" for credentialed requests) —
# confirm credentialed cross-origin access is really intended.
_CORS_SETTINGS = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
|
|
| |
| |
| |
MODEL_NAME = "facebook/m2m100_418M"

# Configure CPU threading *before* touching the model. PyTorch allows
# torch.set_num_interop_threads to be called only once and only before any
# inter-op parallel work has started, so doing it first cannot fail because
# of work done during loading; the intra-op cap now also covers loading.
torch.set_num_threads(2)
torch.set_num_interop_threads(1)

print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Inference only: eval() switches off training-time behavior such as dropout.
model.eval()
print("Model loaded!")
|
|
| |
| |
| |
# --- Capacity limits -------------------------------------------------------
MAX_ACTIVE_REQUESTS = 3       # threshold on in-flight translations (see /translate)
MAX_QUEUE_SIZE = 3            # maxsize of request_queue
MAX_CHARACTERS = 3_000        # per request, spaces not counted

# --- Sibling Space discovery -----------------------------------------------
USERNAME = "velost"           # owner in https://{USERNAME}-{space}.hf.space
SPACE_PREFIX = "trans"        # sibling Spaces are trans, trans1, trans2, ...
DISCOVERY_CHUNK_SIZE = 50     # Spaces probed concurrently per discovery round

# --- Caching and rate limiting ---------------------------------------------
CACHE_TTL = 86_400            # seconds (24 h) a cached translation stays valid
MAX_CACHE_SIZE = 10_000       # cached translations kept before eviction
MAX_RPS_LANGUAGES = 20_000    # /languages requests accepted per second
|
|
| |
| |
| |
# --- Mutable runtime state -------------------------------------------------
# Count of /translate calls currently running in the executor.
active_requests = 0
# Bounded queue that /translate uses as a counter of in-flight requests.
request_queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
# Worker threads that run the blocking model calls.
executor = ThreadPoolExecutor(max_workers=3)
# Guards the increments/decrements of active_requests.
lock = asyncio.Lock()

# Space URL -> time.time() value until which probes of that Space are skipped.
cooldown = dict()

# Cache key -> {"translated_text": ..., "timestamp": ...}.
translation_cache = dict()

# Timestamps of recently accepted /languages requests (1-second sliding window)
# and the lock that guards them.
languages_request_times = list()
languages_lock = asyncio.Lock()
|
|
| |
| |
| |
# Language code -> English display name; GET /languages returns this mapping
# verbatim (insertion order is preserved in the JSON response).
SUPPORTED_LANGS = {
    "af": "Afrikaans", "am": "Amharic", "ar": "Arabic", "ast": "Asturian",
    "az": "Azerbaijani", "ba": "Bashkir", "be": "Belarusian", "bg": "Bulgarian",
    "bn": "Bengali", "br": "Breton", "bs": "Bosnian", "ca": "Catalan",
    "ceb": "Cebuano", "cs": "Czech", "cy": "Welsh", "da": "Danish",
    "de": "German", "el": "Greek", "en": "English", "es": "Spanish",
    "et": "Estonian", "fa": "Persian", "ff": "Fulah", "fi": "Finnish",
    "fr": "French", "fy": "Western Frisian", "ga": "Irish", "gd": "Scottish Gaelic",
    "gl": "Galician", "gu": "Gujarati", "ha": "Hausa", "he": "Hebrew",
    "hi": "Hindi", "hr": "Croatian", "ht": "Haitian Creole", "hu": "Hungarian",
    "hy": "Armenian", "id": "Indonesian", "ig": "Igbo", "ilo": "Ilocano",
    "is": "Icelandic", "it": "Italian", "ja": "Japanese", "jv": "Javanese",
    "ka": "Georgian", "kk": "Kazakh", "km": "Khmer", "kn": "Kannada",
    "ko": "Korean", "lb": "Luxembourgish", "lg": "Ganda", "ln": "Lingala",
    "lo": "Lao", "lt": "Lithuanian", "lv": "Latvian", "mg": "Malagasy",
    "mk": "Macedonian", "ml": "Malayalam", "mn": "Mongolian", "mr": "Marathi",
    "ms": "Malay", "my": "Myanmar", "ne": "Nepali", "nl": "Dutch",
    "no": "Norwegian", "ns": "Northern Sotho", "oc": "Occitan", "or": "Odia",
    "pa": "Punjabi", "pl": "Polish", "ps": "Pashto", "pt": "Portuguese",
    "ro": "Romanian", "ru": "Russian", "sd": "Sindhi", "si": "Sinhala",
    "sk": "Slovak", "sl": "Slovenian", "so": "Somali", "sq": "Albanian",
    "sr": "Serbian", "ss": "Swati", "su": "Sundanese", "sv": "Swedish",
    "sw": "Swahili", "ta": "Tamil", "th": "Thai", "tl": "Tagalog",
    "tn": "Tswana", "tr": "Turkish", "uk": "Ukrainian", "ur": "Urdu",
    "uz": "Uzbek", "vi": "Vietnamese", "wo": "Wolof", "xh": "Xhosa",
    "yi": "Yiddish", "yo": "Yoruba", "zh": "Chinese", "zu": "Zulu",
}
|
|
| |
| |
| |
class TranslateRequest(BaseModel):
    """JSON body accepted by ``POST /translate``."""

    # Text to translate.
    text: str
    # Language code of ``text`` — presumably one of the keys served by /languages.
    source_lang: str
    # Language code to translate into — presumably one of the keys served by /languages.
    target_lang: str
|
|
| |
| |
| |
# Serializes the "set tokenizer.src_lang, then tokenize" sequence across the
# executor's worker threads (see blocking_translate).
_tokenizer_lock = threading.Lock()


def blocking_translate(text, source_lang, target_lang):
    """Translate *text* synchronously with the shared M2M100 model.

    ``translate`` dispatches this to the ``executor`` thread pool, so several
    calls can run at the same time.

    Args:
        text: Text to translate.
        source_lang: Language code of *text*; assigned to ``tokenizer.src_lang``.
        target_lang: Language code whose id is forced as the first generated token.

    Returns:
        The decoded translation of *text*.
    """
    # tokenizer.src_lang is state shared by every worker thread. Without the
    # lock, another request could change it between the assignment and the
    # tokenization, encoding this text with the wrong source language.
    with _tokenizer_lock:
        tokenizer.src_lang = source_lang
        encoded = tokenizer(text, return_tensors="pt")
    encoded = encoded.to(device)

    # Generation runs outside the lock so concurrent requests still overlap.
    generated_tokens = model.generate(
        **encoded,
        forced_bos_token_id=tokenizer.get_lang_id(target_lang),
        max_length=1024,
        num_beams=1,
    )
    return tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
|
|
| |
| |
| |
def get_cache_key(text, source_lang, target_lang):
    """Return a hex digest that uniquely identifies a translation request.

    The fields are serialized as the ``repr`` of a tuple rather than joined
    with a delimiter. With the old ``"|"`` join, ``("x|en", "fr", "de")`` and
    ``("x", "en|fr", "de")`` produced the same key. ``repr`` also escapes lone
    surrogates, so ``.encode()`` cannot fail on them.
    MD5 serves only as a compact fingerprint here, not for security.

    Args:
        text: Source text.
        source_lang: Source language code.
        target_lang: Target language code.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    raw_key = repr((text, source_lang, target_lang))
    return hashlib.md5(raw_key.encode()).hexdigest()
|
|
def get_from_cache(text, source_lang, target_lang):
    """Look up a previously stored translation.

    An entry older than ``CACHE_TTL`` seconds counts as a miss and is removed
    from ``translation_cache`` during the lookup.

    Returns:
        The cached translated text, or None when there is no fresh entry.
    """
    key = get_cache_key(text, source_lang, target_lang)
    entry = translation_cache.get(key)
    if entry is None:
        return None

    age = time.time() - entry["timestamp"]
    if age >= CACHE_TTL:
        # Expired: drop it so the next add_to_cache starts fresh.
        del translation_cache[key]
        return None

    return entry["translated_text"]
|
|
def add_to_cache(text, source_lang, target_lang, translated_text):
    """Store a translation stamped with the current time.

    Storing an existing key replaces its entry (and refreshes the timestamp).
    Inserting a new key into a cache that already holds ``MAX_CACHE_SIZE``
    entries first evicts the entry with the oldest timestamp.
    """
    key = get_cache_key(text, source_lang, target_lang)

    needs_room = key not in translation_cache and len(translation_cache) >= MAX_CACHE_SIZE
    if needs_room:
        stalest = min(translation_cache, key=lambda k: translation_cache[k]["timestamp"])
        translation_cache.pop(stalest)

    translation_cache[key] = {
        "translated_text": translated_text,
        "timestamp": time.time(),
    }
|
|
| |
| |
| |
def is_blocked(url):
    """Return True while *url* is still inside its cooldown window."""
    blocked_until = cooldown.get(url, 0)
    return time.time() < blocked_until
|
|
| |
| |
| |
async def check_space(client, i):
    """Probe the i-th sibling Space and describe what was found.

    Index 0 maps to ``SPACE_PREFIX`` and index ``i > 0`` to
    ``f"{SPACE_PREFIX}{i}"``, both under ``USERNAME``'s ``hf.space`` domain.

    Args:
        client: Shared ``httpx.AsyncClient`` used for the request.
        i: Space index.

    Returns:
        A dict with ``exists``, ``space_name`` and ``url``. When ``exists`` is
        True it also carries ``status_data``. That value is one of:
        the sibling's ``/status`` JSON object; None if the body was not a JSON
        object; or a synthetic ``{"current_space_status": ...}`` set to
        ``"cooldown"``, ``"rate_limited"`` or ``"error"``. A 404 or any
        request failure reports ``exists=False``.
    """
    space_name = SPACE_PREFIX if i == 0 else f"{SPACE_PREFIX}{i}"
    url = f"https://{USERNAME}-{space_name}.hf.space"
    base = {"space_name": space_name, "url": url}

    def present(status_data):
        return {"exists": True, **base, "status_data": status_data}

    def synthetic(state):
        return present({"current_space_status": state})

    if is_blocked(url):
        return synthetic("cooldown")

    try:
        # discover=false asks the sibling to answer without running its own
        # discovery. Otherwise every /status fans out to every sibling's
        # /status (including back to this Space), which fans out again,
        # recursively. FastAPI ignores undeclared query parameters, so Spaces
        # still running older code answer as before.
        response = await client.get(f"{url}/status", params={"discover": "false"})
    except Exception:
        # Best effort: an unreachable Space is reported as missing. Catching
        # Exception (not a bare except) lets asyncio.CancelledError and
        # KeyboardInterrupt propagate instead of being swallowed.
        return {"exists": False, **base}

    if response.status_code == 200:
        try:
            status_data = response.json()
        except ValueError:  # body was not valid JSON
            status_data = None
        # Only a JSON object is usable downstream (discover_spaces_parallel
        # calls .get() on it); anything else is treated as unreadable.
        if not isinstance(status_data, dict):
            status_data = None
        return present(status_data)
    if response.status_code == 404:
        return {"exists": False, **base}
    if response.status_code == 429:
        # Skip this Space for the next second.
        cooldown[url] = time.time() + 1
        return synthetic("rate_limited")
    return synthetic("error")


async def discover_spaces_parallel():
    """Scan sibling Spaces in parallel chunks until the first missing one.

    Spaces are probed ``DISCOVERY_CHUNK_SIZE`` at a time (5 s timeout per
    request). Results are then examined in index order. Every existing Space
    before the first missing one is collected, except those reported as
    rate-limited or in cooldown. The first missing Space's URL is reported as
    the free slot.

    Returns:
        ``{"existing_spaces": [{"space_name", "url", "status"}, ...],
        "empty_space_url": str | None}``
    """
    existing_spaces = []
    empty_space_url = None
    timeout = httpx.Timeout(5.0)

    async with httpx.AsyncClient(timeout=timeout) as client:
        start = 0
        while True:
            results = await asyncio.gather(*[
                check_space(client, i)
                for i in range(start, start + DISCOVERY_CHUNK_SIZE)
            ])

            found_empty = False
            for result in results:
                if not result["exists"]:
                    empty_space_url = result["url"]
                    found_empty = True
                    break
                status_data = result.get("status_data")
                state = (status_data or {}).get("current_space_status")
                if state in ["rate_limited", "cooldown"]:
                    continue
                existing_spaces.append({
                    "space_name": result["space_name"],
                    "url": result["url"],
                    "status": status_data,
                })

            if found_empty:
                break
            start += DISCOVERY_CHUNK_SIZE

    return {
        "existing_spaces": existing_spaces,
        "empty_space_url": empty_space_url,
    }


@app.get("/status")
async def status(discover: bool = True):
    """Report this Space's load and, optionally, the state of its siblings.

    Args:
        discover: When True (the default) also run sibling discovery to fill
            ``empty_space_url`` and ``total_spaces``. ``check_space`` passes
            ``discover=false`` so that probing a sibling never triggers a
            recursive discovery on it; in that case both fields are None.
    """
    queue_size = request_queue.qsize()
    full = (
        active_requests >= MAX_ACTIVE_REQUESTS
        and queue_size >= MAX_QUEUE_SIZE
    )

    empty_space_url = None
    total_spaces = None
    if discover:
        discovered = await discover_spaces_parallel()
        empty_space_url = discovered["empty_space_url"]
        total_spaces = len(discovered["existing_spaces"])

    return {
        "current_space_status": "full" if full else "available",
        "active_requests": active_requests,
        "queue_size": queue_size,
        "empty_space_url": empty_space_url,
        "total_spaces": total_spaces,
        "cached_translations": len(translation_cache),
    }
|
|
| |
| |
| |
@app.get("/languages")
async def languages(request: Request):
    """Return ``SUPPORTED_LANGS``, subject to a global requests-per-second cap.

    Timestamps of accepted requests from the last second are kept in
    ``languages_request_times``. Once ``MAX_RPS_LANGUAGES`` of them are
    present, the request is refused with HTTP 429. The response carries the
    URL of a free sibling Space, found by sibling discovery.

    Args:
        request: Incoming request (unused; kept for the route signature).
    """
    global languages_request_times

    async with languages_lock:
        now = time.time()
        # Keep only timestamps inside the trailing one-second window.
        languages_request_times = [
            t for t in languages_request_times if now - t < 1.0
        ]
        rate_limited = len(languages_request_times) >= MAX_RPS_LANGUAGES
        if not rate_limited:
            languages_request_times.append(now)

    if rate_limited:
        # Discovery performs network I/O with multi-second timeouts, so it
        # runs after the lock is released. Holding the lock across it would
        # stall every other /languages request for the whole scan.
        discovered = await discover_spaces_parallel()
        return JSONResponse(
            status_code=429,
            content={
                "status": "space_full",
                "message": f"Rate limit exceeded. Maximum {MAX_RPS_LANGUAGES} requests per second allowed.",
                "empty_space_url": discovered["empty_space_url"],
            },
        )

    return SUPPORTED_LANGS
|
|
| |
| |
| |
@app.post("/translate")
async def translate(req: TranslateRequest):
    """Translate ``req.text`` from ``req.source_lang`` to ``req.target_lang``.

    Checks, in order:
      1. Text length (spaces excluded) against ``MAX_CHARACTERS``.
      2. Both language codes against ``SUPPORTED_LANGS``.
      3. The translation cache.
      4. Capacity: when this Space is saturated, respond ``space_full`` with
         a free sibling Space URL instead of translating.
    Otherwise the model runs in ``executor`` and the result is cached.

    Returns:
        A JSON object whose ``status`` is ``character_limit_exceeded``,
        ``unsupported_language``, ``space_full`` or ``success``.
    """
    global active_requests

    char_count_without_spaces = len(req.text.replace(" ", ""))
    if char_count_without_spaces > MAX_CHARACTERS:
        return {
            "status": "character_limit_exceeded",
            "message": f"Text exceeds maximum character limit (without spaces). Maximum allowed: {MAX_CHARACTERS} characters, received: {char_count_without_spaces} characters",
            "max_characters": MAX_CHARACTERS,
            "received_characters": char_count_without_spaces
        }

    # Unknown codes would otherwise reach the tokenizer (tokenizer.src_lang /
    # get_lang_id inside blocking_translate) and fail there as an unhandled
    # server error; report them explicitly instead. dict.fromkeys de-duplicates
    # while keeping order (source before target).
    unsupported = list(dict.fromkeys(
        lang for lang in (req.source_lang, req.target_lang)
        if lang not in SUPPORTED_LANGS
    ))
    if unsupported:
        return {
            "status": "unsupported_language",
            "message": f"Unsupported language code(s): {', '.join(unsupported)}. See /languages for the supported codes.",
            "unsupported_languages": unsupported,
        }

    cached_result = get_from_cache(req.text, req.source_lang, req.target_lang)
    if cached_result is not None:
        return {
            "status": "success",
            "translated_text": cached_result,
            "from_cache": True
        }

    queue_size = request_queue.qsize()
    if active_requests >= MAX_ACTIVE_REQUESTS and queue_size >= MAX_QUEUE_SIZE:
        discovered = await discover_spaces_parallel()
        return {
            "status": "space_full",
            "empty_space_url": discovered["empty_space_url"]
        }

    # request_queue serves as a counter of in-flight requests: an item is put
    # here and taken back out in the finally block below.
    await request_queue.put("req")
    async with lock:
        active_requests += 1
    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            executor,
            blocking_translate,
            req.text,
            req.source_lang,
            req.target_lang
        )

        add_to_cache(req.text, req.source_lang, req.target_lang, result)

        return {
            "status": "success",
            "translated_text": result,
            "from_cache": False
        }
    finally:
        async with lock:
            active_requests -= 1
        await request_queue.get()
        request_queue.task_done()
|
|
| |
| |
| |
@app.get("/")
def home():
    """Root endpoint: always answers with a fixed ``{"status": "running"}`` body.

    Presumably used as a simple liveness check.
    """
    response_body = {"status": "running"}
    return response_body