# Novelind / llm_processor.py
# Ruhivig65's picture
# Upload 10 files
# 851e238 verified
"""
llm_processor.py - LLM API integration with key rotation, retry, and chunking.
Handles all communication with the HuggingFace Router API.
"""
import time
import random
import logging
import threading
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, field
from openai import OpenAI
from openai import (
APIError,
APIConnectionError,
RateLimitError,
APIStatusError,
)
from config import (
HF_API_BASE_URL,
SUPPORTED_MODELS,
DEFAULT_MODEL,
MAX_RETRIES,
INITIAL_BACKOFF_SECONDS,
MAX_BACKOFF_SECONDS,
MAX_CHUNK_CHARS,
)
from translator import pre_translate_to_hindi, is_chinese_text
from glossary_manager import (
build_system_prompt,
extract_new_mappings,
process_and_store_mappings,
)
from chapter_detector import split_long_chapter
import database as db
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# ─── API Key Manager ────────────────────────────────────────────────────────────
class APIKeyManager:
    """
    Thread-safe API key pool manager with rotation and failure tracking.

    Keys are handed out round-robin. A key reported through
    ``mark_key_failed`` is skipped for ``_failure_cooldown`` seconds. When
    every key is cooling down, the key whose failure is oldest is handed out
    again, so a non-empty pool always yields a key.
    """

    def __init__(self):
        self._keys: List[str] = []
        self._current_index: int = 0
        self._failed_keys: Dict[str, float] = {}  # key -> failure_timestamp
        self._lock = threading.Lock()
        self._failure_cooldown = 300  # 5 minutes cooldown for failed keys

    def load_keys(self, keys_text: str):
        """Replace the pool with keys parsed from text.

        Keys may be separated by newlines and/or commas. Each entry is
        stripped; entries of 10 characters or fewer are discarded. Duplicate
        keys are collapsed (first occurrence wins) so a single key cannot
        occupy several rotation slots. Loading resets the rotation position
        and clears all failure records.

        Args:
            keys_text: Raw text containing the keys; ``None`` or an empty
                string results in an empty pool.
        """
        raw_keys = (keys_text or "").replace(',', '\n').split('\n')
        stripped = (k.strip() for k in raw_keys)
        with self._lock:
            # dict.fromkeys de-duplicates while keeping first-seen order.
            self._keys = list(dict.fromkeys(k for k in stripped if len(k) > 10))
            self._current_index = 0
            self._failed_keys.clear()
            loaded = len(self._keys)
        logger.info(f"Loaded {loaded} API keys")

    @property
    def total_keys(self) -> int:
        """Number of keys currently in the pool."""
        with self._lock:
            return len(self._keys)

    @property
    def available_keys(self) -> int:
        """Number of pool keys that are not currently in cooldown."""
        now = time.time()
        with self._lock:
            cooling = sum(
                1 for key in self._keys if self._in_cooldown(key, now)
            )
            return len(self._keys) - cooling

    def _in_cooldown(self, key: str, now: float) -> bool:
        """Return True if *key* failed less than the cooldown period ago.

        Must be called with ``self._lock`` held.
        """
        failed_at = self._failed_keys.get(key)
        return failed_at is not None and now - failed_at < self._failure_cooldown

    def get_next_key(self) -> Optional[str]:
        """Get the next available API key, skipping recently failed ones.

        Returns:
            The next key in round-robin order that is not cooling down. If
            all keys are cooling down, the key with the oldest failure is
            returned and its failure record is dropped. ``None`` if the pool
            is empty.
        """
        with self._lock:
            if not self._keys:
                return None
            now = time.time()
            total = len(self._keys)
            # One full lap over the pool, starting at the rotation cursor.
            for _ in range(total):
                key = self._keys[self._current_index]
                self._current_index = (self._current_index + 1) % total
                if self._in_cooldown(key, now):
                    continue
                # Either never failed or its cooldown expired: forget the failure.
                self._failed_keys.pop(key, None)
                return key
            # All keys are in cooldown; return the oldest failed one
            logger.warning("All API keys are in cooldown! Using oldest failed key.")
            # Search the pool itself so only a key that is still loaded can win.
            oldest_key = min(
                self._keys, key=lambda k: self._failed_keys.get(k, now)
            )
            self._failed_keys.pop(oldest_key, None)
            return oldest_key

    def mark_key_failed(self, key: str):
        """Mark a key as failed (rate limited, quota exceeded, etc.).

        Keys that are not part of the current pool are ignored. This happens
        when a caller reports a key obtained before the pool was reloaded;
        tracking it would distort ``available_keys``.
        """
        with self._lock:
            if key not in self._keys:
                return
            self._failed_keys[key] = time.time()
            failed_count = len(self._failed_keys)
            total = len(self._keys)
        logger.warning(
            f"API key marked as failed. {failed_count}/{total} keys failed."
        )

    def mark_key_success(self, key: str):
        """Mark a key as successful (remove from failed if present)."""
        with self._lock:
            self._failed_keys.pop(key, None)
# ─── Global Key Manager Instance ────────────────────────────────────────────────
# Single shared key pool for this module; call_llm() draws its keys from it.
# The pool starts empty, so load_keys() must be called before a key can be
# handed out.
key_manager = APIKeyManager()
# ─── LLM API Call ────────────────────────────────────────────────────────────────
# HTTP statuses that point at the API key itself (invalid token, exhausted
# credits, forbidden, rate limited) rather than at the request or the server.
_KEY_FAILURE_STATUS_CODES = frozenset({401, 402, 403, 429})


def call_llm(
    system_prompt: str,
    user_content: str,
    model_id: str = DEFAULT_MODEL,
    max_tokens: int = 8192,
    temperature: float = 0.3,
) -> Tuple[str, bool]:
    """
    Call the LLM API with automatic key rotation and retry logic.

    Each attempt takes the next key from ``key_manager`` and sends one
    non-streaming chat completion. Failed attempts are retried up to
    ``MAX_RETRIES`` times with exponential backoff plus jitter.

    Only failures attributable to the key (rate limit, or HTTP 401/402/403/
    429) put the key into cooldown. Server errors, connection problems and
    empty completions do not, because sidelining a healthy key for them only
    drains the pool. Rotation still moves to the next key on every attempt.

    Args:
        system_prompt: Content of the system message.
        user_content: Content of the user message.
        model_id: Model identifier passed to the API.
        max_tokens: Completion token limit passed to the API.
        temperature: Sampling temperature passed to the API.

    Returns:
        Tuple of (response_text, success_bool). On failure the text is an
        ``"ERROR: ..."`` message and the flag is False.
    """
    backoff = INITIAL_BACKOFF_SECONDS
    for attempt in range(MAX_RETRIES):
        api_key = key_manager.get_next_key()
        if not api_key:
            logger.error("No API keys available!")
            return "ERROR: No API keys available", False
        try:
            # The client owns an HTTP connection pool; the context manager
            # closes it so repeated attempts do not leak connections.
            with OpenAI(
                base_url=HF_API_BASE_URL,
                api_key=api_key,
            ) as client:
                response = client.chat.completions.create(
                    model=model_id,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_content},
                    ],
                    max_tokens=max_tokens,
                    temperature=temperature,
                    stream=False,
                )
            # A response without choices is treated like an empty completion.
            choices = response.choices
            result = choices[0].message.content if choices else None
            if result and result.strip():
                key_manager.mark_key_success(api_key)
                return result.strip(), True
            logger.warning(f"Empty response from API on attempt {attempt + 1}")
        except RateLimitError as e:
            logger.warning(
                f"Rate limit hit (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            key_manager.mark_key_failed(api_key)
        except APIStatusError as e:
            status_code = getattr(e, 'status_code', None)
            logger.warning(
                f"API status error {status_code} (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
            if status_code in _KEY_FAILURE_STATUS_CODES:
                key_manager.mark_key_failed(api_key)
            elif status_code and status_code >= 500:
                # Server error, wait longer
                backoff = min(backoff * 2, MAX_BACKOFF_SECONDS)
        except APIConnectionError as e:
            logger.warning(
                f"API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
        except APIError as e:
            logger.warning(
                f"API error (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
        except Exception as e:
            # Boundary handler: keep the retry loop alive, but record the
            # traceback so unexpected failures can be diagnosed.
            logger.exception(
                f"Unexpected error (attempt {attempt + 1}/{MAX_RETRIES}): {e}"
            )
        # Exponential backoff with jitter
        if attempt < MAX_RETRIES - 1:
            jitter = random.uniform(0, backoff * 0.3)
            sleep_time = backoff + jitter
            logger.info(f"Retrying in {sleep_time:.1f} seconds...")
            time.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_SECONDS)
    return "ERROR: All retry attempts exhausted", False
# ─── Chapter Translation Engine ─────────────────────────────────────────────────
def translate_chapter(
    novel_id: int,
    chapter_id: int,
    chapter_number: int,
    chapter_text: str,
    model_id: str = DEFAULT_MODEL,
    mc_original_name: Optional[str] = None,
    mc_indian_name: Optional[str] = None,
    use_pre_translation: bool = True,
) -> Tuple[str, bool]:
    """
    Translate a single chapter with chunking support.

    Steps:
    1. Split the original chapter text into chunks
    2. For each chunk: if the chapter is Chinese and pre-translation is
       enabled, pre-translate that chunk to rough Hindi and pair it with
       its own original text
    3. For each chunk: build prompt with glossary, call LLM
    4. Extract new mappings and store them
    5. Combine all translated chunks and save them

    The text is split BEFORE pairing. Splitting a concatenated
    "original + rough translation" blob by length would leave early chunks
    with only the original and later chunks with only the rough translation.

    Args:
        novel_id: Novel the chapter belongs to (used for prompt and glossary).
        chapter_id: Database id of the chapter row to update.
        chapter_number: Chapter number used in prompts, logs and mappings.
        chapter_text: Original chapter text.
        model_id: LLM model to use.
        mc_original_name: Passed through to ``build_system_prompt``.
        mc_indian_name: Passed through to ``build_system_prompt``.
        use_pre_translation: Whether to add the rough pre-translation when
            the chapter text is detected as Chinese.

    Returns:
        Tuple of (full_translation, all_success). Failed chunks are replaced
        by an error marker in the text and make ``all_success`` False.
    """
    logger.info(
        f"Translating Chapter {chapter_number} "
        f"({len(chapter_text)} chars) with model {model_id}"
    )
    # Decide once, on the whole chapter, whether chunks get a rough translation.
    pair_with_pre_translation = use_pre_translation and is_chinese_text(chapter_text)
    if pair_with_pre_translation:
        logger.info(f"Pre-translating Chinese text for Chapter {chapter_number}")
        # Each request carries the chunk twice (original + rough Hindi), so
        # halve the budget for the original.
        # NOTE(review): heuristic only; the Hindi text may be longer than the
        # original, so requests can still exceed MAX_CHUNK_CHARS.
        chunk_budget = max(1, MAX_CHUNK_CHARS // 2)
    else:
        chunk_budget = MAX_CHUNK_CHARS
    # Step 1: Split the ORIGINAL text into chunks
    chunks = split_long_chapter(chapter_text, chunk_budget)
    logger.info(f"Chapter {chapter_number} split into {len(chunks)} chunks")
    translated_parts = []
    all_success = True
    for chunk_idx, chunk in enumerate(chunks):
        # Step 2: Pair the chunk with its own rough translation
        if pair_with_pre_translation:
            pre_translated = pre_translate_to_hindi(chunk)
            # Send both original and pre-translated to LLM
            processing_text = (
                f"[मूल Chinese text]:\n{chunk}\n\n"
                f"[Google Translate Hindi (rough, refine this)]:\n{pre_translated}"
            )
        else:
            processing_text = chunk
        # Step 3: Build prompt with current glossary. Rebuilt per chunk because
        # mappings stored for earlier chunks may have changed the glossary.
        system_prompt = build_system_prompt(
            novel_id=novel_id,
            mc_original_name=mc_original_name,
            mc_indian_name=mc_indian_name,
        )
        if len(chunks) > 1:
            user_content = (
                f"[Chapter {chapter_number}, Part {chunk_idx + 1}/{len(chunks)}]\n\n"
                f"{processing_text}"
            )
        else:
            user_content = f"[Chapter {chapter_number}]\n\n{processing_text}"
        response, success = call_llm(
            system_prompt=system_prompt,
            user_content=user_content,
            model_id=model_id,
        )
        if not success:
            logger.error(
                f"Failed to translate Chapter {chapter_number}, chunk {chunk_idx + 1}"
            )
            db.mark_chapter_error(
                chapter_id,
                f"Chunk {chunk_idx + 1}/{len(chunks)} failed: {response}"
            )
            all_success = False
            # Keep a visible placeholder so the gap is obvious in the output.
            translated_parts.append(
                f"\n[अनुवाद त्रुटि - Chapter {chapter_number}, Part {chunk_idx + 1}]\n"
            )
            continue
        # Step 4: Extract mappings from response
        clean_text, new_mappings = extract_new_mappings(response)
        if new_mappings:
            process_and_store_mappings(
                novel_id=novel_id,
                mappings=new_mappings,
                chapter_number=chapter_number,
            )
        translated_parts.append(clean_text)
        # Small delay between chunks
        if chunk_idx < len(chunks) - 1:
            time.sleep(0.5)
    # Step 5: Combine all parts
    full_translation = "\n\n".join(translated_parts)
    # Save to database; a partial translation is saved as well.
    db.save_translated_chapter(chapter_id, full_translation)
    if all_success:
        logger.info(f"Chapter {chapter_number} translated successfully")
    else:
        logger.warning(f"Chapter {chapter_number} translated with some errors")
    return full_translation, all_success
# ─── Batch Processing ───────────────────────────────────────────────────────────
def process_novel(
    novel_id: int,
    model_id: str = DEFAULT_MODEL,
    progress_callback=None,
) -> Dict:
    """
    Translate every unprocessed chapter of a novel, one at a time.

    After each chapter the processed counter is written back to the novel's
    status. Whenever the counter reaches a multiple of 20, or equals the
    novel's total chapter count, an output file is generated for the batch
    of up to 20 chapters that the counter falls into.

    Args:
        novel_id: ID of the novel in database
        model_id: LLM model to use
        progress_callback: Optional function called with progress updates,
            signature: callback(chapter_num, total_chapters, status_message)

    Returns:
        Dict with keys "total", "success", "failed" and "output_files", or
        {"error": "Novel not found"} when the novel does not exist.
    """
    novel_data = db.get_novel(novel_id)
    if not novel_data:
        return {"error": "Novel not found"}

    mc_original = novel_data.mc_original_name
    mc_indian = novel_data.mc_indian_name
    total_chapters = novel_data.total_chapters

    def notify(current_chapter, message):
        # Progress reporting is optional; do nothing without a callback.
        if progress_callback:
            progress_callback(current_chapter, total_chapters, message)

    db.update_novel_status(novel_id, "processing")
    results = dict(total=total_chapters, success=0, failed=0, output_files=[])
    # Start from what the database already counts as processed.
    done = db.get_processed_chapter_count(novel_id)

    while True:
        # Fetch exactly one pending chapter per iteration.
        pending = db.get_unprocessed_chapters(novel_id, limit=1)
        if not pending:
            break
        chapter = pending[0]
        number = chapter["chapter_number"]
        notify(number, f"अध्याय {number}/{total_chapters} का अनुवाद हो रहा है...")

        _, ok = translate_chapter(
            novel_id=novel_id,
            chapter_id=chapter["id"],
            chapter_number=number,
            chapter_text=chapter["original_text"],
            model_id=model_id,
            mc_original_name=mc_original,
            mc_indian_name=mc_indian,
        )
        results["success" if ok else "failed"] += 1
        done += 1
        db.update_novel_status(novel_id, "processing", done)

        # An output file is due at every 20th chapter and at the last one.
        at_batch_boundary = done % 20 == 0 or done == total_chapters
        if not at_batch_boundary:
            continue
        batch_start = ((done - 1) // 20) * 20 + 1
        batch_end = min(batch_start + 19, total_chapters)
        output_file = generate_output_file(
            novel_id, batch_start, batch_end, novel_data.title
        )
        if not output_file:
            continue
        results["output_files"].append(output_file)
        notify(number, f"✅ अध्याय {batch_start}-{batch_end} की फ़ाइल तैयार!")

    # Final status reflects whether any chapter failed during this run.
    final_status = "completed" if results["failed"] == 0 else "completed_with_errors"
    db.update_novel_status(novel_id, final_status, done)
    return results
def generate_output_file(
    novel_id: int,
    chapter_start: int,
    chapter_end: int,
    novel_title: str,
) -> Optional[Dict]:
    """Assemble the translated chapters of a range into one .txt document.

    The document is stored through ``db.save_output_file``.

    Returns:
        A dict with "filename", "chapter_start", "chapter_end" and "size"
        (character count of the content), or None when the database returns
        no translated chapters for the range.
    """
    chapters = db.get_translated_chapters_range(novel_id, chapter_start, chapter_end)
    if not chapters:
        return None

    banner = "=" * 60
    divider = "─" * 40

    # File header
    parts = [
        banner,
        f" {novel_title} - Hindi Localized Version",
        f" अध्याय {chapter_start} से {chapter_end}",
        " Automated Translation & Indianization",
        f"{banner}\n",
    ]

    # One section per chapter: heading, optional original title, then the text.
    for ch in chapters:
        section = [f"\n{divider}", f"अध्याय {ch['chapter_number']}"]
        original_title = ch.get("original_title")
        if original_title:
            section.append(f"(मूल: {original_title})")
        section += [f"{divider}\n", ch["translated_text"], ""]
        parts.extend(section)

    file_content = "\n".join(parts)
    filename = f"{novel_title}_chapters_{chapter_start}-{chapter_end}.txt"

    # Save to database
    db.save_output_file(
        novel_id=novel_id,
        filename=filename,
        chapter_start=chapter_start,
        chapter_end=chapter_end,
        file_content=file_content,
    )
    return dict(
        filename=filename,
        chapter_start=chapter_start,
        chapter_end=chapter_end,
        size=len(file_content),
    )