| """ |
| llm_processor.py - LLM API integration with key rotation, retry, and chunking. |
| Handles all communication with the HuggingFace Router API. |
| """ |
|
|
| import time |
| import random |
| import logging |
| import threading |
| from typing import List, Dict, Optional, Tuple |
| from dataclasses import dataclass, field |
|
|
| from openai import OpenAI |
| from openai import ( |
| APIError, |
| APIConnectionError, |
| RateLimitError, |
| APIStatusError, |
| ) |
|
|
| from config import ( |
| HF_API_BASE_URL, |
| SUPPORTED_MODELS, |
| DEFAULT_MODEL, |
| MAX_RETRIES, |
| INITIAL_BACKOFF_SECONDS, |
| MAX_BACKOFF_SECONDS, |
| MAX_CHUNK_CHARS, |
| ) |
| from translator import pre_translate_to_hindi, is_chinese_text |
| from glossary_manager import ( |
| build_system_prompt, |
| extract_new_mappings, |
| process_and_store_mappings, |
| ) |
| from chapter_detector import split_long_chapter |
| import database as db |
|
|
# Module-level logger, named after this module via __name__; used by every
# function and class in this file.
logger = logging.getLogger(__name__)
|
|
|
|
| |
|
|
class APIKeyManager:
    """
    Thread-safe API key pool manager with rotation and failure tracking.

    Keys are handed out round-robin by ``get_next_key``. A key reported via
    ``mark_key_failed`` is skipped for ``_failure_cooldown`` seconds. If every
    key is cooling down, the key whose failure is oldest is handed out anyway,
    so a non-empty pool always yields a key.
    """

    def __init__(self):
        self._keys: List[str] = []
        self._current_index: int = 0
        # Maps key -> time.time() of its most recent failure. Only keys that
        # are currently in the pool are ever stored here.
        self._failed_keys: Dict[str, float] = {}
        self._lock = threading.Lock()
        # Seconds a failed key is skipped before it is offered again.
        self._failure_cooldown = 300

    def load_keys(self, keys_text: str):
        """
        Load API keys from text (one per line or comma-separated).

        Replaces the current pool, resets the rotation position and clears
        all failure state. Entries of 10 characters or fewer are discarded,
        and duplicates are dropped (first occurrence wins) so a repeated key
        is neither counted nor rotated twice. Empty or ``None`` input leaves
        the pool empty.
        """
        raw_keys = (keys_text or "").replace(',', '\n').split('\n')
        stripped = (k.strip() for k in raw_keys)
        # dict.fromkeys removes duplicates while preserving input order.
        unique_keys = list(dict.fromkeys(k for k in stripped if len(k) > 10))

        with self._lock:
            self._keys = unique_keys
            self._current_index = 0
            self._failed_keys.clear()

        logger.info(f"Loaded {len(unique_keys)} API keys")

    @property
    def total_keys(self) -> int:
        """Number of keys currently in the pool."""
        return len(self._keys)

    @property
    def available_keys(self) -> int:
        """Number of pool keys that are not inside their failure cooldown."""
        now = time.time()
        with self._lock:
            cooling_down = sum(
                1 for failed_at in self._failed_keys.values()
                if now - failed_at < self._failure_cooldown
            )
            return len(self._keys) - cooling_down

    def get_next_key(self) -> Optional[str]:
        """
        Get the next available API key, skipping recently failed ones.

        Returns:
            The next key in rotation, or ``None`` when the pool is empty.
            When every key is in cooldown, the key that failed longest ago
            is returned and its failure record is cleared.
        """
        with self._lock:
            if not self._keys:
                return None

            now = time.time()
            total = len(self._keys)

            # One full lap over the pool at most.
            for _ in range(total):
                key = self._keys[self._current_index]
                self._current_index = (self._current_index + 1) % total

                failed_at = self._failed_keys.get(key)
                if failed_at is not None:
                    if now - failed_at < self._failure_cooldown:
                        continue
                    # Cooldown expired: forget the old failure.
                    del self._failed_keys[key]

                return key

            # Every pool key was skipped, so every pool key has a failure
            # record; pick among pool keys only.
            logger.warning("All API keys are in cooldown! Using oldest failed key.")
            oldest_key = min(self._keys, key=self._failed_keys.__getitem__)
            del self._failed_keys[oldest_key]
            return oldest_key

    def mark_key_failed(self, key: str):
        """
        Mark a key as failed (rate limited, quota exceeded, etc.).

        Keys that are not in the current pool are ignored: they can show up
        when the pool is reloaded while a request is still in flight, and
        recording them would skew ``available_keys`` and could get a stale
        key handed out again by the all-in-cooldown fallback.
        """
        with self._lock:
            if key not in self._keys:
                return
            self._failed_keys[key] = time.time()
            failed_count = len(self._failed_keys)
            total = len(self._keys)
        logger.warning(
            f"API key marked as failed. {failed_count}/{total} keys failed."
        )

    def mark_key_success(self, key: str):
        """Mark a key as successful (remove from failed if present)."""
        with self._lock:
            self._failed_keys.pop(key, None)
|
|
|
|
| |
# Module-level key pool that call_llm() draws its API keys from. It starts
# with no keys; load_keys() must be called on it before any key is returned.
key_manager = APIKeyManager()
|
|
|
|
| |
|
|
def call_llm(
    system_prompt: str,
    user_content: str,
    model_id: str = DEFAULT_MODEL,
    max_tokens: int = 8192,
    temperature: float = 0.3,
) -> Tuple[str, bool]:
    """
    Call the LLM API with automatic key rotation and retry logic.

    Makes up to ``MAX_RETRIES`` attempts, each with the next key from
    ``key_manager``, sleeping with exponential backoff plus jitter between
    attempts. A key is put into cooldown only when the failure can be the
    key's fault (rate limit, auth/quota statuses, or an unclassified error).
    Server-side 5xx errors, request-level 4xx errors and connection errors
    do not penalize the key, so an outage cannot drain the whole pool.

    Args:
        system_prompt: Content of the system message.
        user_content: Content of the user message.
        model_id: Model identifier passed to the API.
        max_tokens: Completion token limit passed to the API.
        temperature: Sampling temperature passed to the API.

    Returns:
        Tuple of (response_text, success_bool). On failure the text is an
        ``"ERROR: ..."`` message and the flag is ``False``.
    """
    # HTTP statuses that implicate the key itself: invalid credentials,
    # payment/quota problems, forbidden access, rate limiting.
    key_fault_statuses = (401, 402, 403, 429)

    backoff = INITIAL_BACKOFF_SECONDS

    for attempt in range(MAX_RETRIES):
        api_key = key_manager.get_next_key()
        if not api_key:
            logger.error("No API keys available!")
            return "ERROR: No API keys available", False

        try:
            client = OpenAI(
                base_url=HF_API_BASE_URL,
                api_key=api_key,
            )

            response = client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_content},
                ],
                max_tokens=max_tokens,
                temperature=temperature,
                stream=False,
            )

            # Guard against an empty `choices` list instead of letting it
            # surface as an "unexpected" IndexError.
            choices = response.choices
            result = choices[0].message.content if choices else None
            if result and result.strip():
                key_manager.mark_key_success(api_key)
                return result.strip(), True

            logger.warning(f"Empty response from API on attempt {attempt + 1}")
            key_manager.mark_key_failed(api_key)

        except RateLimitError as e:
            logger.warning(
                f"Rate limit hit (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            key_manager.mark_key_failed(api_key)

        except APIStatusError as e:
            status_code = getattr(e, 'status_code', None)
            logger.warning(
                f"API status error {status_code} (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            # Unknown status: stay conservative and cool the key down.
            if status_code is None or status_code in key_fault_statuses:
                key_manager.mark_key_failed(api_key)

            if status_code and status_code >= 500:
                # Server-side trouble: back off more aggressively.
                backoff = min(backoff * 2, MAX_BACKOFF_SECONDS)

        except APIConnectionError as e:
            # Network-level failure; it is not specific to this key.
            logger.warning(
                f"API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )

        except APIError as e:
            logger.warning(
                f"API error (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            key_manager.mark_key_failed(api_key)

        except Exception as e:
            # Last-resort boundary: log with traceback and keep retrying.
            logger.exception(
                f"Unexpected error (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            key_manager.mark_key_failed(api_key)

        # No point sleeping after the final attempt.
        if attempt < MAX_RETRIES - 1:
            jitter = random.uniform(0, backoff * 0.3)
            sleep_time = backoff + jitter
            logger.info(f"Retrying in {sleep_time:.1f} seconds...")
            time.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_SECONDS)

    return "ERROR: All retry attempts exhausted", False
|
|
|
|
| |
|
|
def translate_chapter(
    novel_id: int,
    chapter_id: int,
    chapter_number: int,
    chapter_text: str,
    model_id: str = DEFAULT_MODEL,
    mc_original_name: Optional[str] = None,
    mc_indian_name: Optional[str] = None,
    use_pre_translation: bool = True,
) -> Tuple[str, bool]:
    """
    Translate a single chapter with chunking support.

    Steps:
    1. Split the source text into chunks if needed
    2. For each chunk: optionally pre-translate Chinese -> Hindi (rough),
       build the prompt with the glossary, call the LLM
    3. Extract new mappings and store them
    4. Combine all translated chunks and save the result

    The source text is split before pre-translation so every chunk is sent
    together with its own rough translation. Splitting the combined
    "original + rough translation" text instead would put the two halves of
    a long chapter into different chunks and translate the content twice.

    Args:
        novel_id: Novel the chapter belongs to (used for glossary lookups).
        chapter_id: Database id of the chapter row to update.
        chapter_number: Chapter number, used in prompts and log messages.
        chapter_text: Source text of the chapter.
        model_id: LLM model to use.
        mc_original_name: Main character's original name, passed to the
            prompt builder.
        mc_indian_name: Main character's localized name, passed to the
            prompt builder.
        use_pre_translation: When true and the text is detected as Chinese,
            attach a rough pre-translation to each chunk.

    Returns:
        Tuple of (full_translation, all_chunks_succeeded). Failed chunks are
        represented by an error marker inside the returned text.
    """
    logger.info(
        f"Translating Chapter {chapter_number} "
        f"({len(chapter_text)} chars) with model {model_id}"
    )

    pre_translate = use_pre_translation and is_chinese_text(chapter_text)

    if pre_translate:
        logger.info(f"Pre-translating Chinese text for Chapter {chapter_number}")
        # Each prompt carries the source chunk plus its rough translation,
        # so halve the source budget to keep the combined prompt closer to
        # MAX_CHUNK_CHARS (heuristic; the rough text may be longer).
        chunk_limit = max(1, MAX_CHUNK_CHARS // 2)
    else:
        chunk_limit = MAX_CHUNK_CHARS

    chunks = split_long_chapter(chapter_text, chunk_limit)
    logger.info(f"Chapter {chapter_number} split into {len(chunks)} chunks")

    translated_parts = []
    all_success = True

    for chunk_idx, chunk in enumerate(chunks):
        # Rebuilt for every chunk so mappings stored while processing
        # earlier chunks are available to later ones.
        system_prompt = build_system_prompt(
            novel_id=novel_id,
            mc_original_name=mc_original_name,
            mc_indian_name=mc_indian_name,
        )

        chunk_text = chunk
        if pre_translate:
            pre_translated = pre_translate_to_hindi(chunk)
            # If no rough translation came back, send the source alone
            # rather than an empty "refine this" section.
            if pre_translated:
                chunk_text = (
                    f"[मूल Chinese text]:\n{chunk}\n\n"
                    f"[Google Translate Hindi (rough, refine this)]:\n{pre_translated}"
                )

        if len(chunks) > 1:
            user_content = (
                f"[Chapter {chapter_number}, Part {chunk_idx + 1}/{len(chunks)}]\n\n"
                f"{chunk_text}"
            )
        else:
            user_content = f"[Chapter {chapter_number}]\n\n{chunk_text}"

        response, success = call_llm(
            system_prompt=system_prompt,
            user_content=user_content,
            model_id=model_id,
        )

        if not success:
            logger.error(
                f"Failed to translate Chapter {chapter_number}, chunk {chunk_idx + 1}"
            )
            db.mark_chapter_error(
                chapter_id,
                f"Chunk {chunk_idx + 1}/{len(chunks)} failed: {response}"
            )
            all_success = False
            # Keep a visible marker where the missing part belongs.
            translated_parts.append(
                f"\n[अनुवाद त्रुटि - Chapter {chapter_number}, Part {chunk_idx + 1}]\n"
            )
            continue

        clean_text, new_mappings = extract_new_mappings(response)

        if new_mappings:
            process_and_store_mappings(
                novel_id=novel_id,
                mappings=new_mappings,
                chapter_number=chapter_number,
            )

        translated_parts.append(clean_text)

        # Short pause between consecutive chunk requests.
        if chunk_idx < len(chunks) - 1:
            time.sleep(0.5)

    full_translation = "\n\n".join(translated_parts)

    # Partial results are saved too, so successful chunks are not lost.
    db.save_translated_chapter(chapter_id, full_translation)
    if all_success:
        logger.info(f"Chapter {chapter_number} translated successfully")
    else:
        logger.warning(f"Chapter {chapter_number} translated with some errors")

    return full_translation, all_success
|
|
|
|
| |
|
|
def process_novel(
    novel_id: int,
    model_id: str = DEFAULT_MODEL,
    progress_callback=None,
) -> Dict:
    """
    Process an entire novel: translate all chapters sequentially.

    Chapters are pulled one at a time from the database until none remain
    unprocessed. An output file is generated after every 20th processed
    chapter and once the processed count reaches the novel's total.

    Args:
        novel_id: ID of the novel in database
        model_id: LLM model to use
        progress_callback: Function to call with progress updates
            signature: callback(chapter_num, total_chapters, status_message)

    Returns:
        Dict with processing results: ``total``, ``success``, ``failed`` and
        ``output_files``. ``{"error": ...}`` is returned when the novel does
        not exist. ``stalled_chapter`` is added only when processing was
        stopped because the database kept returning the same chapter.
    """
    novel_data = db.get_novel(novel_id)
    if not novel_data:
        return {"error": "Novel not found"}

    mc_original = novel_data.mc_original_name
    mc_indian = novel_data.mc_indian_name
    total_chapters = novel_data.total_chapters

    db.update_novel_status(novel_id, "processing")

    results = {
        "total": total_chapters,
        "success": 0,
        "failed": 0,
        "output_files": [],
    }

    # Start from what is already done so a resumed run keeps counting.
    processed_count = db.get_processed_chapter_count(novel_id)

    last_chapter_id = None
    stalled = False

    while True:
        unprocessed = db.get_unprocessed_chapters(novel_id, limit=1)
        if not unprocessed:
            break

        chapter = unprocessed[0]
        chapter_num = chapter["chapter_number"]

        # The loop only ends if each chapter leaves the unprocessed set once
        # handled. If the same chapter comes back, stop instead of
        # re-translating it forever.
        if chapter["id"] == last_chapter_id:
            logger.error(
                f"Chapter {chapter_num} is still unprocessed after translation; "
                f"stopping novel {novel_id} to avoid an endless loop"
            )
            results["stalled_chapter"] = chapter_num
            stalled = True
            break
        last_chapter_id = chapter["id"]

        if progress_callback:
            progress_callback(
                chapter_num,
                total_chapters,
                f"अध्याय {chapter_num}/{total_chapters} का अनुवाद हो रहा है..."
            )

        translation, success = translate_chapter(
            novel_id=novel_id,
            chapter_id=chapter["id"],
            chapter_number=chapter_num,
            chapter_text=chapter["original_text"],
            model_id=model_id,
            mc_original_name=mc_original,
            mc_indian_name=mc_indian,
        )

        if success:
            results["success"] += 1
        else:
            results["failed"] += 1

        processed_count += 1
        db.update_novel_status(novel_id, "processing", processed_count)

        # Emit a file per block of 20 chapters, plus one for the final
        # (possibly shorter) block.
        if processed_count % 20 == 0 or processed_count == total_chapters:
            batch_start = ((processed_count - 1) // 20) * 20 + 1
            batch_end = min(batch_start + 19, total_chapters)

            output_file = generate_output_file(
                novel_id, batch_start, batch_end, novel_data.title
            )
            if output_file:
                results["output_files"].append(output_file)

            if progress_callback:
                progress_callback(
                    chapter_num,
                    total_chapters,
                    f"✅ अध्याय {batch_start}-{batch_end} की फ़ाइल तैयार!"
                )

    if results["failed"] == 0 and not stalled:
        db.update_novel_status(novel_id, "completed", processed_count)
    else:
        db.update_novel_status(novel_id, "completed_with_errors", processed_count)

    return results
|
|
|
|
def generate_output_file(
    novel_id: int,
    chapter_start: int,
    chapter_end: int,
    novel_title: str,
) -> Optional[Dict]:
    """
    Generate an output .txt file for a range of chapters.

    Builds a text document (header plus one section per chapter) from the
    translated chapters in ``chapter_start..chapter_end`` and stores it
    through ``db.save_output_file``.

    Args:
        novel_id: ID of the novel in database.
        chapter_start: First chapter number of the range.
        chapter_end: Last chapter number of the range.
        novel_title: Title shown in the header and used for the filename.

    Returns:
        Dict with ``filename``, ``chapter_start``, ``chapter_end`` and
        ``size`` (length of the content in characters, not bytes), or
        ``None`` when the range has no translated chapters.
    """
    chapters = db.get_translated_chapters_range(novel_id, chapter_start, chapter_end)
    if not chapters:
        return None

    header_rule = "=" * 60
    chapter_rule = "─" * 40

    lines = [
        header_rule,
        f" {novel_title} - Hindi Localized Version",
        f" अध्याय {chapter_start} से {chapter_end}",
        " Automated Translation & Indianization",
        f"{header_rule}\n",
    ]

    for ch in chapters:
        lines.append(f"\n{chapter_rule}")
        lines.append(f"अध्याय {ch['chapter_number']}")
        if ch.get("original_title"):
            lines.append(f"(मूल: {ch['original_title']})")
        lines.append(f"{chapter_rule}\n")
        # A missing translation must not break the join below.
        lines.append(ch["translated_text"] or "")
        lines.append("")

    file_content = "\n".join(lines)

    # Titles are free text: replace path separators and other characters
    # that are invalid in filenames so the name is safe to write or
    # download. Non-ASCII letters are kept.
    unsafe_chars = '\\/:*?"<>|\r\n\t'
    safe_title = (
        str(novel_title).translate({ord(c): "_" for c in unsafe_chars}).strip()
        or "novel"
    )
    filename = f"{safe_title}_chapters_{chapter_start}-{chapter_end}.txt"

    db.save_output_file(
        novel_id=novel_id,
        filename=filename,
        chapter_start=chapter_start,
        chapter_end=chapter_end,
        file_content=file_content,
    )

    return {
        "filename": filename,
        "chapter_start": chapter_start,
        "chapter_end": chapter_end,
        "size": len(file_content),
    }
|
|