""" llm_processor.py - LLM API integration with key rotation, retry, and chunking. Handles all communication with the HuggingFace Router API. """ import time import random import logging import threading from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, field from openai import OpenAI from openai import ( APIError, APIConnectionError, RateLimitError, APIStatusError, ) from config import ( HF_API_BASE_URL, SUPPORTED_MODELS, DEFAULT_MODEL, MAX_RETRIES, INITIAL_BACKOFF_SECONDS, MAX_BACKOFF_SECONDS, MAX_CHUNK_CHARS, ) from translator import pre_translate_to_hindi, is_chinese_text from glossary_manager import ( build_system_prompt, extract_new_mappings, process_and_store_mappings, ) from chapter_detector import split_long_chapter import database as db logger = logging.getLogger(__name__) # ─── API Key Manager ──────────────────────────────────────────────────────────── class APIKeyManager: """ Thread-safe API key pool manager with rotation and failure tracking. """ def __init__(self): self._keys: List[str] = [] self._current_index: int = 0 self._failed_keys: Dict[str, float] = {} # key -> failure_timestamp self._lock = threading.Lock() self._failure_cooldown = 300 # 5 minutes cooldown for failed keys def load_keys(self, keys_text: str): """Load API keys from text (one per line or comma-separated).""" with self._lock: # Support both newline and comma separated raw_keys = keys_text.replace(',', '\n').split('\n') self._keys = [ k.strip() for k in raw_keys if k.strip() and len(k.strip()) > 10 ] self._current_index = 0 self._failed_keys.clear() logger.info(f"Loaded {len(self._keys)} API keys") @property def total_keys(self) -> int: return len(self._keys) @property def available_keys(self) -> int: now = time.time() with self._lock: failed_count = sum( 1 for ts in self._failed_keys.values() if now - ts < self._failure_cooldown ) return len(self._keys) - failed_count def get_next_key(self) -> Optional[str]: """Get the next available API key, skipping recently failed ones.""" with self._lock: if not self._keys: return None now = time.time() attempts = 0 total = len(self._keys) while attempts < total: key = self._keys[self._current_index] self._current_index = (self._current_index + 1) % total # Check if key is in cooldown if key in self._failed_keys: if now - self._failed_keys[key] < self._failure_cooldown: attempts += 1 continue else: # Cooldown expired, remove from failed del self._failed_keys[key] return key # All keys are in cooldown; return the oldest failed one logger.warning("All API keys are in cooldown! Using oldest failed key.") if self._failed_keys: oldest_key = min(self._failed_keys, key=self._failed_keys.get) del self._failed_keys[oldest_key] return oldest_key return self._keys[0] if self._keys else None def mark_key_failed(self, key: str): """Mark a key as failed (rate limited, quota exceeded, etc.).""" with self._lock: self._failed_keys[key] = time.time() failed_count = len(self._failed_keys) total = len(self._keys) logger.warning( f"API key marked as failed. {failed_count}/{total} keys failed." ) def mark_key_success(self, key: str): """Mark a key as successful (remove from failed if present).""" with self._lock: self._failed_keys.pop(key, None) # ─── Global Key Manager Instance ──────────────────────────────────────────────── key_manager = APIKeyManager() # ─── LLM API Call ──────────────────────────────────────────────────────────────── def call_llm( system_prompt: str, user_content: str, model_id: str = DEFAULT_MODEL, max_tokens: int = 8192, temperature: float = 0.3, ) -> Tuple[str, bool]: """ Call the LLM API with automatic key rotation and retry logic. Returns: Tuple of (response_text, success_bool) """ backoff = INITIAL_BACKOFF_SECONDS for attempt in range(MAX_RETRIES): api_key = key_manager.get_next_key() if not api_key: logger.error("No API keys available!") return "ERROR: No API keys available", False try: client = OpenAI( base_url=HF_API_BASE_URL, api_key=api_key, ) response = client.chat.completions.create( model=model_id, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}, ], max_tokens=max_tokens, temperature=temperature, stream=False, ) result = response.choices[0].message.content if result and result.strip(): key_manager.mark_key_success(api_key) return result.strip(), True else: logger.warning(f"Empty response from API on attempt {attempt + 1}") key_manager.mark_key_failed(api_key) except RateLimitError as e: logger.warning( f"Rate limit hit (attempt {attempt + 1}/{MAX_RETRIES}): {e}" ) key_manager.mark_key_failed(api_key) except APIStatusError as e: status_code = getattr(e, 'status_code', None) logger.warning( f"API status error {status_code} (attempt {attempt + 1}/{MAX_RETRIES}): {e}" ) key_manager.mark_key_failed(api_key) if status_code and status_code >= 500: # Server error, wait longer backoff = min(backoff * 2, MAX_BACKOFF_SECONDS) except APIConnectionError as e: logger.warning( f"API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}" ) key_manager.mark_key_failed(api_key) except APIError as e: logger.warning( f"API error (attempt {attempt + 1}/{MAX_RETRIES}): {e}" ) key_manager.mark_key_failed(api_key) except Exception as e: logger.error( f"Unexpected error (attempt {attempt + 1}/{MAX_RETRIES}): {e}" ) key_manager.mark_key_failed(api_key) # Exponential backoff with jitter if attempt < MAX_RETRIES - 1: jitter = random.uniform(0, backoff * 0.3) sleep_time = backoff + jitter logger.info(f"Retrying in {sleep_time:.1f} seconds...") time.sleep(sleep_time) backoff = min(backoff * 2, MAX_BACKOFF_SECONDS) return "ERROR: All retry attempts exhausted", False # ─── Chapter Translation Engine ───────────────────────────────────────────────── def translate_chapter( novel_id: int, chapter_id: int, chapter_number: int, chapter_text: str, model_id: str = DEFAULT_MODEL, mc_original_name: Optional[str] = None, mc_indian_name: Optional[str] = None, use_pre_translation: bool = True, ) -> Tuple[str, bool]: """ Translate a single chapter with chunking support. Steps: 1. Pre-translate Chinese -> Hindi via Google Translate (rough) 2. Split into chunks if needed 3. For each chunk: build prompt with glossary, call LLM 4. Extract new mappings and store them 5. Combine all translated chunks """ logger.info( f"Translating Chapter {chapter_number} " f"({len(chapter_text)} chars) with model {model_id}" ) # Step 1: Pre-translate if Chinese if use_pre_translation and is_chinese_text(chapter_text): logger.info(f"Pre-translating Chinese text for Chapter {chapter_number}") pre_translated = pre_translate_to_hindi(chapter_text) # Send both original and pre-translated to LLM processing_text = ( f"[मूल Chinese text]:\n{chapter_text}\n\n" f"[Google Translate Hindi (rough, refine this)]:\n{pre_translated}" ) else: processing_text = chapter_text # Step 2: Split into chunks chunks = split_long_chapter(processing_text, MAX_CHUNK_CHARS) logger.info(f"Chapter {chapter_number} split into {len(chunks)} chunks") translated_parts = [] all_success = True for chunk_idx, chunk in enumerate(chunks): # Step 3: Build prompt with current glossary system_prompt = build_system_prompt( novel_id=novel_id, mc_original_name=mc_original_name, mc_indian_name=mc_indian_name, ) if len(chunks) > 1: user_content = ( f"[Chapter {chapter_number}, Part {chunk_idx + 1}/{len(chunks)}]\n\n" f"{chunk}" ) else: user_content = f"[Chapter {chapter_number}]\n\n{chunk}" # Step 4: Call LLM response, success = call_llm( system_prompt=system_prompt, user_content=user_content, model_id=model_id, ) if not success: logger.error( f"Failed to translate Chapter {chapter_number}, chunk {chunk_idx + 1}" ) db.mark_chapter_error( chapter_id, f"Chunk {chunk_idx + 1}/{len(chunks)} failed: {response}" ) all_success = False translated_parts.append( f"\n[अनुवाद त्रुटि - Chapter {chapter_number}, Part {chunk_idx + 1}]\n" ) continue # Step 5: Extract mappings from response clean_text, new_mappings = extract_new_mappings(response) if new_mappings: process_and_store_mappings( novel_id=novel_id, mappings=new_mappings, chapter_number=chapter_number, ) translated_parts.append(clean_text) # Small delay between chunks if chunk_idx < len(chunks) - 1: time.sleep(0.5) # Combine all parts full_translation = "\n\n".join(translated_parts) # Save to database if all_success: db.save_translated_chapter(chapter_id, full_translation) logger.info(f"Chapter {chapter_number} translated successfully") else: # Save partial translation db.save_translated_chapter(chapter_id, full_translation) logger.warning(f"Chapter {chapter_number} translated with some errors") return full_translation, all_success # ─── Batch Processing ─────────────────────────────────────────────────────────── def process_novel( novel_id: int, model_id: str = DEFAULT_MODEL, progress_callback=None, ) -> Dict: """ Process an entire novel: translate all chapters sequentially. Args: novel_id: ID of the novel in database model_id: LLM model to use progress_callback: Function to call with progress updates signature: callback(chapter_num, total_chapters, status_message) Returns: Dict with processing results """ novel_data = db.get_novel(novel_id) if not novel_data: return {"error": "Novel not found"} mc_original = novel_data.mc_original_name mc_indian = novel_data.mc_indian_name total_chapters = novel_data.total_chapters # Update status db.update_novel_status(novel_id, "processing") results = { "total": total_chapters, "success": 0, "failed": 0, "output_files": [], } processed_count = db.get_processed_chapter_count(novel_id) while True: # Get next batch of unprocessed chapters unprocessed = db.get_unprocessed_chapters(novel_id, limit=1) if not unprocessed: break chapter = unprocessed[0] chapter_num = chapter["chapter_number"] if progress_callback: progress_callback( chapter_num, total_chapters, f"अध्याय {chapter_num}/{total_chapters} का अनुवाद हो रहा है..." ) translation, success = translate_chapter( novel_id=novel_id, chapter_id=chapter["id"], chapter_number=chapter_num, chapter_text=chapter["original_text"], model_id=model_id, mc_original_name=mc_original, mc_indian_name=mc_indian, ) if success: results["success"] += 1 else: results["failed"] += 1 processed_count += 1 db.update_novel_status(novel_id, "processing", processed_count) # Check if we've completed a batch of 20 if processed_count % 20 == 0 or processed_count == total_chapters: batch_start = ((processed_count - 1) // 20) * 20 + 1 batch_end = min(batch_start + 19, total_chapters) output_file = generate_output_file( novel_id, batch_start, batch_end, novel_data.title ) if output_file: results["output_files"].append(output_file) if progress_callback: progress_callback( chapter_num, total_chapters, f"✅ अध्याय {batch_start}-{batch_end} की फ़ाइल तैयार!" ) # Final status if results["failed"] == 0: db.update_novel_status(novel_id, "completed", processed_count) else: db.update_novel_status(novel_id, "completed_with_errors", processed_count) return results def generate_output_file( novel_id: int, chapter_start: int, chapter_end: int, novel_title: str, ) -> Optional[Dict]: """Generate an output .txt file for a range of chapters.""" chapters = db.get_translated_chapters_range(novel_id, chapter_start, chapter_end) if not chapters: return None # Build file content lines = [] lines.append(f"{'='*60}") lines.append(f" {novel_title} - Hindi Localized Version") lines.append(f" अध्याय {chapter_start} से {chapter_end}") lines.append(f" Automated Translation & Indianization") lines.append(f"{'='*60}\n") for ch in chapters: lines.append(f"\n{'─'*40}") lines.append(f"अध्याय {ch['chapter_number']}") if ch.get("original_title"): lines.append(f"(मूल: {ch['original_title']})") lines.append(f"{'─'*40}\n") lines.append(ch["translated_text"]) lines.append("") file_content = "\n".join(lines) filename = f"{novel_title}_chapters_{chapter_start}-{chapter_end}.txt" # Save to database db.save_output_file( novel_id=novel_id, filename=filename, chapter_start=chapter_start, chapter_end=chapter_end, file_content=file_content, ) return { "filename": filename, "chapter_start": chapter_start, "chapter_end": chapter_end, "size": len(file_content), }