MedAI_Processing / utils /cloud_llm.py
LiamKhoaLe's picture
Upd env pref
7fbd1d7
# Round-robin rotator + paraphrasing + translation/backtranslation
import os
import logging
import requests
import time
from typing import Optional
# Dynamic import for Google GenAI (only when not in local mode)
def _import_google_genai():
"""Dynamically import Google GenAI only when needed"""
try:
from google import genai
return genai
except ImportError as e:
raise ImportError(f"Google GenAI not available: {e}. Make sure IS_LOCAL=false and google-genai is installed.")
# Check if we're in local mode
IS_LOCAL = os.getenv("IS_LOCAL", "false").lower() == "true"
# Only import Google GenAI if not in local mode
if not IS_LOCAL:
try:
genai = _import_google_genai()
except ImportError:
genai = None
else:
genai = None
logger = logging.getLogger("llm")
if not logger.handlers:
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
logger.addHandler(handler)
# LLM parser limit text to log-out
def snip(s: str, n: int = 12) -> str:
if not isinstance(s, str): return "∅"
parts = s.strip().split()
return " ".join(parts[:n]) + (" …" if len(parts) > n else "")
class KeyRotator:
def __init__(self, env_prefix: str, max_keys: int = 5):
self.env_prefix = env_prefix # Store as instance variable
keys = []
for i in range(1, max_keys + 1):
v = os.getenv(f"{env_prefix}_{i}")
if v:
keys.append(v.strip())
if not keys:
logger.warning(f"[LLM] No keys found for prefix {env_prefix}_*")
self.keys = keys
self.dead = set() # Permanently dead keys
self.temp_dead = {} # Temporarily dead keys with retry time
self.retry_counts = {} # Track retry attempts per key
self.idx = 0
self.max_retries = 3 # Max retries before permanent death
self.retry_delay = 60 # Seconds to wait before retry
def next_key(self) -> Optional[str]:
if not self.keys:
return None
# Clean up expired temporary dead keys
current_time = time.time()
expired_keys = [k for k, retry_time in self.temp_dead.items() if current_time > retry_time]
for k in expired_keys:
del self.temp_dead[k]
logger.info(f"[LLM] Key {k[:6]}*** is back in rotation after cooldown")
# Try to find an available key
for _ in range(len(self.keys)):
k = self.keys[self.idx % len(self.keys)]
self.idx += 1
# Skip permanently dead keys
if k in self.dead:
continue
# Skip temporarily dead keys
if k in self.temp_dead and current_time < self.temp_dead[k]:
continue
return k
# All keys are dead or temporarily unavailable
logger.warning(f"[LLM] All keys for {self.env_prefix} are unavailable")
return None
def mark_bad(self, key: Optional[str], error_type: str = "unknown"):
if not key:
return
current_time = time.time()
retry_count = self.retry_counts.get(key, 0)
# Determine if this is a temporary or permanent failure
is_temporary = self._is_temporary_error(error_type)
if is_temporary and retry_count < self.max_retries:
# Temporary failure - add to temp_dead with retry time
retry_delay = self.retry_delay * (2 ** retry_count) # Exponential backoff
self.temp_dead[key] = current_time + retry_delay
self.retry_counts[key] = retry_count + 1
logger.warning(f"[LLM] Key {key[:6]}*** temporarily quarantined for {retry_delay}s (attempt {retry_count + 1}/{self.max_retries})")
else:
# Permanent failure or max retries reached
self.dead.add(key)
if key in self.temp_dead:
del self.temp_dead[key]
if key in self.retry_counts:
del self.retry_counts[key]
logger.error(f"[LLM] Key {key[:6]}*** permanently quarantined after {retry_count} retries")
def _is_temporary_error(self, error_type: str) -> bool:
"""Determine if an error is temporary and worth retrying"""
temporary_errors = [
"rate_limit", "quota_exceeded", "too_many_requests", "429",
"service_unavailable", "503", "bad_gateway", "502",
"timeout", "connection_error", "network_error"
]
error_lower = error_type.lower()
return any(temp_err in error_lower for temp_err in temporary_errors)
def get_stats(self) -> dict:
"""Get rotator statistics"""
return {
"total_keys": len(self.keys),
"dead_keys": len(self.dead),
"temp_dead_keys": len(self.temp_dead),
"available_keys": len(self.keys) - len(self.dead) - len(self.temp_dead),
"retry_counts": self.retry_counts.copy()
}
class GeminiClient:
def __init__(self, rotator: KeyRotator, default_model: str):
self.rotator = rotator
self.default_model = default_model
self.available = genai is not None and not IS_LOCAL
def generate(self, prompt: str, model: Optional[str] = None, temperature: float = 0.2, max_output_tokens: int = 512) -> Optional[str]:
if not self.available:
logger.warning("[LLM][Gemini] Google GenAI not available (local mode or import failed)")
return None
key = self.rotator.next_key()
if not key:
return None
try:
client = genai.Client(api_key=key)
# NOTE: matches your required pattern/use
res = client.models.generate_content(
model=model or self.default_model,
contents=prompt
)
text = getattr(res, "text", None)
if text:
logger.info(f"[LLM][Gemini] out={snip(text)}")
return text
except Exception as e:
error_msg = str(e)
logger.error(f"[LLM][Gemini] {error_msg}")
self.rotator.mark_bad(key, error_msg)
return None
class NvidiaClient:
def __init__(self, rotator: KeyRotator, default_model: str):
self.rotator = rotator
self.default_model = default_model
self.url = os.getenv("NVIDIA_API_URL", "https://integrate.api.nvidia.com/v1/chat/completions")
# Regex-based cleaning resp from quotes
def _clean_resp(self, resp: str) -> str:
if not resp: return resp
txt = resp.strip()
# Remove common boilerplate prefixes
for pat in [
r"^Here is (a|the) .*?:\s*",
r"^Paraphrased(?: version)?:\s*",
r"^Sure[,.]?\s*",
r"^Okay[,.]?\s*"
]:
import re
txt = re.sub(pat, "", txt, flags=re.I)
return txt.strip()
def generate(self, prompt: str, model: Optional[str] = None, temperature: float = 0.2, max_tokens: int = 512) -> Optional[str]:
key = self.rotator.next_key()
if not key:
return None
try:
headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"}
payload = {
"model": model or self.default_model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature,
"max_tokens": max_tokens
}
r = requests.post(self.url, headers=headers, json=payload, timeout=45)
if r.status_code >= 400:
raise RuntimeError(f"HTTP {r.status_code}: {r.text[:200]}")
data = r.json()
text = data["choices"][0]["message"]["content"]
clean = self._clean_resp(text)
# Log the output here
logger.info(f"[LLM][NVIDIA] out={snip(clean)}")
return clean
except Exception as e:
error_msg = str(e)
logger.error(f"[LLM][NVIDIA] {error_msg}")
self.rotator.mark_bad(key, error_msg)
return None
class Paraphraser:
"""Intelligent API load balancer with rate limiting and cost optimization."""
def __init__(self, nvidia_model: str, gemini_model_easy: str, gemini_model_hard: str):
self.nv = NvidiaClient(KeyRotator("NVIDIA_API"), nvidia_model)
self.gm_easy = GeminiClient(KeyRotator("GEMINI_API"), gemini_model_easy)
# Only use GEMINI_MODEL_EASY, ignore hard model completely
self.gm_hard = None # Disabled - only use easy model
# Rate limiting and load balancing
self.last_nvidia_call = 0
self.last_gemini_call = 0
self.min_call_interval = 0.1 # Minimum 100ms between calls
self.nvidia_success_rate = 1.0 # Track success rates for load balancing
self.gemini_success_rate = 1.0
self.call_counts = {"nvidia": 0, "gemini": 0, "failures": 0}
logger.info("Paraphraser initialized with intelligent load balancing: NVIDIA -> GEMINI_EASY")
def _rate_limit(self, api_type: str):
"""Apply rate limiting to prevent API exhaustion"""
current_time = time.time()
if api_type == "nvidia":
time_since_last = current_time - self.last_nvidia_call
if time_since_last < self.min_call_interval:
sleep_time = self.min_call_interval - time_since_last
time.sleep(sleep_time)
self.last_nvidia_call = time.time()
elif api_type == "gemini":
time_since_last = current_time - self.last_gemini_call
if time_since_last < self.min_call_interval:
sleep_time = self.min_call_interval - time_since_last
time.sleep(sleep_time)
self.last_gemini_call = time.time()
def _select_api(self, prefer_cheap: bool = True) -> str:
"""Intelligently select API based on success rates and availability"""
nvidia_stats = self.nv.rotator.get_stats()
gemini_stats = self.gm_easy.rotator.get_stats()
nvidia_available = nvidia_stats["available_keys"] > 0
gemini_available = gemini_stats["available_keys"] > 0
if not nvidia_available and not gemini_available:
return "none"
elif not nvidia_available:
return "gemini"
elif not gemini_available:
return "nvidia"
# Both available - use intelligent selection
if prefer_cheap:
# Prefer NVIDIA (cheaper) but consider success rates
if self.nvidia_success_rate > 0.8 or self.gemini_success_rate < 0.5:
return "nvidia"
else:
return "gemini"
else:
# Prefer quality (Gemini) but consider success rates
if self.gemini_success_rate > 0.8 or self.nvidia_success_rate < 0.5:
return "gemini"
else:
return "nvidia"
def _update_success_rate(self, api_type: str, success: bool):
"""Update success rate tracking for load balancing"""
if api_type == "nvidia":
# Exponential moving average
alpha = 0.1
self.nvidia_success_rate = alpha * (1.0 if success else 0.0) + (1 - alpha) * self.nvidia_success_rate
elif api_type == "gemini":
alpha = 0.1
self.gemini_success_rate = alpha * (1.0 if success else 0.0) + (1 - alpha) * self.gemini_success_rate
def _call_api(self, prompt: str, api_type: str, **kwargs) -> Optional[str]:
"""Make API call with rate limiting and error tracking"""
self._rate_limit(api_type)
try:
if api_type == "nvidia":
result = self.nv.generate(prompt, **kwargs)
self.call_counts["nvidia"] += 1
success = result is not None
self._update_success_rate("nvidia", success)
return result
elif api_type == "gemini":
result = self.gm_easy.generate(prompt, **kwargs)
self.call_counts["gemini"] += 1
success = result is not None
self._update_success_rate("gemini", success)
return result
except Exception as e:
self.call_counts["failures"] += 1
self._update_success_rate(api_type, False)
logger.error(f"[LLM] API call failed for {api_type}: {e}")
return None
return None
# Enhanced cleaning to remove conversational elements and comments
def _clean_resp(self, resp: str) -> str:
if not resp: return resp
txt = resp.strip()
# Remove common conversational prefixes and comments
prefixes_to_remove = [
"Here's a rewritten version of",
"Here is a rewritten version of",
"Here's the rewritten text:",
"Here is the rewritten text:",
"Here's the translation:",
"Here is the translation:",
"Here's the enhanced text:",
"Here is the enhanced text:",
"Here's the improved text:",
"Here is the improved text:",
"Here's the medical context:",
"Here is the medical context:",
"Here's the cleaned text:",
"Here is the cleaned text:",
"Here's the answer:",
"Here is the answer:",
"Here's a paraphrased version:",
"Here is a paraphrased version:",
"Paraphrased version:",
"Paraphrased:",
"Sure,",
"Okay,",
"Certainly,",
"Of course,",
"I can help you with that.",
"I'll help you with that.",
"Let me help you with that.",
"I can rewrite that for you.",
"I'll rewrite that for you.",
"Let me rewrite that for you.",
"I can translate that for you.",
"I'll translate that for you.",
"Let me translate that for you.",
]
# Remove prefixes
for prefix in prefixes_to_remove:
if txt.lower().startswith(prefix.lower()):
txt = txt[len(prefix):].strip()
break
# Remove common boilerplate prefixes with regex
import re
for pat in [
r"^Here is (a|the) .*?:\s*",
r"^Paraphrased(?: version)?:\s*",
r"^Sure[,.]?\s*",
r"^Okay[,.]?\s*",
r"^Certainly[,.]?\s*",
r"^Of course[,.]?\s*",
r"^I can .*?:\s*",
r"^I'll .*?:\s*",
r"^Let me .*?:\s*"
]:
txt = re.sub(pat, "", txt, flags=re.I)
# Remove any remaining conversational elements
lines = txt.split('\n')
cleaned_lines = []
for line in lines:
line = line.strip()
if line and not any(phrase in line.lower() for phrase in [
"here's", "here is", "let me", "i can", "i'll", "sure,", "okay,",
"certainly,", "of course,", "i hope this helps", "hope this helps",
"does this help", "is this what you", "let me know if"
]):
cleaned_lines.append(line)
return '\n'.join(cleaned_lines).strip()
# ————— Paraphrase —————
def paraphrase(self, text: str, difficulty: str = "easy", custom_prompt: str = None) -> str:
if not text or len(text) < 12:
return text
# Use custom prompt if provided, otherwise use optimized medical prompts
if custom_prompt:
prompt = custom_prompt
else:
# Optimized medical paraphrasing prompts based on difficulty
if difficulty == "easy":
prompt = (
"Rewrite the following medical text using different words while preserving all medical facts, clinical terms, and meaning. Keep the same level of detail and accuracy. Return only the rewritten text without any introduction or commentary.\n\n"
f"{text}"
)
else: # hard difficulty
prompt = (
"Rewrite the following medical text using more sophisticated medical language and different sentence structures while preserving all clinical facts, medical terminology, and diagnostic information. Maintain professional medical tone. Return only the rewritten text without any introduction or commentary.\n\n"
f"{text}"
)
# Optimize temperature and token limits based on difficulty
temperature = 0.1 if difficulty == "easy" else 0.3
max_tokens = min(600, max(128, len(text)//2))
# Intelligent API selection with fallback
api_type = self._select_api(prefer_cheap=True)
if api_type == "nvidia":
out = self._call_api(prompt, "nvidia", temperature=temperature, max_tokens=max_tokens)
if out:
return self._clean_resp(out)
# Fallback to Gemini if NVIDIA fails
api_type = self._select_api(prefer_cheap=False)
if api_type == "gemini":
out = self._call_api(prompt, "gemini", max_output_tokens=max_tokens)
if out:
logger.info(f"[LLM][GEMINI] out={snip(self._clean_resp(out))}")
return self._clean_resp(out)
# Both APIs failed
logger.warning(f"[LLM] All APIs failed for paraphrase, returning original text")
return text
# ————— Translate & Backtranslate —————
def translate(self, text: str, target_lang: str = "vi") -> Optional[str]:
if not text: return text
# Optimized medical translation prompts
if target_lang == "vi":
prompt = (
"Translate the following English medical text to Vietnamese while preserving all medical terminology, clinical facts, and professional medical language. Use appropriate Vietnamese medical terms. Return only the translation without any introduction or commentary.\n\n"
f"{text}"
)
else:
prompt = (
f"Translate the following medical text to {target_lang} while preserving all medical terminology, clinical facts, and professional medical language. Return only the translation without any introduction or commentary.\n\n"
f"{text}"
)
# Intelligent API selection for translation
api_type = self._select_api(prefer_cheap=True)
if api_type == "nvidia":
out = self._call_api(prompt, "nvidia", temperature=0.0, max_tokens=min(800, len(text)+100))
if out:
return out.strip()
# Fallback to Gemini if NVIDIA fails
api_type = self._select_api(prefer_cheap=False)
if api_type == "gemini":
out = self._call_api(prompt, "gemini", max_output_tokens=min(800, len(text)+100))
if out:
return out.strip()
return None
def backtranslate(self, text: str, via_lang: str = "vi") -> Optional[str]:
if not text: return text
mid = self.translate(text, target_lang=via_lang)
if not mid: return None
# Optimized backtranslation prompt with medical focus
if via_lang == "vi":
prompt = (
"Translate the following Vietnamese medical text back to English while preserving all medical terminology, clinical facts, and professional medical language. Ensure the translation is medically accurate. Return only the translation without any introduction or commentary.\n\n"
f"{mid}"
)
else:
prompt = (
f"Translate the following {via_lang} medical text back to English while preserving all medical terminology, clinical facts, and professional medical language. Return only the translation without any introduction or commentary.\n\n"
f"{mid}"
)
# Intelligent API selection for backtranslation
api_type = self._select_api(prefer_cheap=True)
if api_type == "nvidia":
out = self._call_api(prompt, "nvidia", temperature=0.0, max_tokens=min(900, len(text)+150))
if out:
return out.strip()
# Fallback to Gemini if NVIDIA fails
api_type = self._select_api(prefer_cheap=False)
if api_type == "gemini":
out = self._call_api(prompt, "gemini", max_output_tokens=min(900, len(text)+150))
if out:
return out.strip()
return None
# ————— Consistency Judge (cheap, ratio-based) —————
def consistency_check(self, user: str, output: str) -> bool:
"""Return True if 'output' appears supported by 'user' (context/question). Optimized medical validation."""
prompt = (
"Evaluate if the medical answer is consistent with the question/context and medically accurate. Consider medical accuracy, clinical appropriateness, consistency with the question, safety standards, and completeness of medical information. Reply with exactly 'PASS' if the answer is medically sound and consistent, otherwise 'FAIL'.\n\n"
f"Question/Context: {user}\n\n"
f"Medical Answer: {output}"
)
# Use intelligent API selection for consistency check
api_type = self._select_api(prefer_cheap=True)
if api_type == "nvidia":
out = self._call_api(prompt, "nvidia", temperature=0.0, max_tokens=5)
if out:
return isinstance(out, str) and "PASS" in out.upper()
# Fallback to Gemini if NVIDIA fails
api_type = self._select_api(prefer_cheap=False)
if api_type == "gemini":
out = self._call_api(prompt, "gemini", max_output_tokens=5)
if out:
return isinstance(out, str) and "PASS" in out.upper()
# If both APIs fail, assume consistency (conservative approach)
logger.warning("[LLM] Consistency check failed due to API unavailability, assuming consistent")
return True
def get_api_stats(self) -> dict:
"""Get comprehensive API usage statistics"""
nvidia_stats = self.nv.rotator.get_stats()
gemini_stats = self.gm_easy.rotator.get_stats()
return {
"call_counts": self.call_counts.copy(),
"success_rates": {
"nvidia": self.nvidia_success_rate,
"gemini": self.gemini_success_rate
},
"nvidia_rotator": nvidia_stats,
"gemini_rotator": gemini_stats,
"total_calls": sum(self.call_counts.values()),
"failure_rate": self.call_counts["failures"] / max(1, sum(self.call_counts.values()))
}
def medical_accuracy_check(self, question: str, answer: str) -> bool:
"""Check medical accuracy of Q&A pairs using cloud APIs"""
if not question or not answer:
return False
prompt = (
"Evaluate if the medical answer is accurate and appropriate for the question. Consider medical facts, clinical knowledge, appropriate medical terminology, clinical reasoning, logic, and safety considerations. Reply with exactly 'ACCURATE' if the answer is medically correct, otherwise 'INACCURATE'.\n\n"
f"Medical Question: {question}\n\n"
f"Medical Answer: {answer}"
)
out = self.nv.generate(prompt, temperature=0.0, max_tokens=5)
if not out:
out = self.gm_easy.generate(prompt, max_output_tokens=5)
return isinstance(out, str) and "ACCURATE" in out.upper()
def enhance_medical_terminology(self, text: str) -> str:
"""Enhance medical terminology in text using cloud APIs"""
if not text or len(text) < 20:
return text
prompt = (
"Improve the medical terminology in the following text while preserving all factual information and clinical accuracy. Use more precise medical terms where appropriate. Return only the improved text without any introduction or commentary.\n\n"
f"{text}"
)
out = self.nv.generate(prompt, temperature=0.1, max_tokens=min(800, len(text)+100))
if not out:
out = self.gm_easy.generate(prompt, max_output_tokens=min(800, len(text)+100))
return out if out else text
def create_clinical_scenarios(self, question: str, answer: str) -> list:
"""Create different clinical scenarios from Q&A pairs using cloud APIs"""
scenarios = []
# Different clinical context prompts
context_prompts = [
(
"Rewrite this medical question as if asked by a patient in an emergency room setting. Return only the rewritten question without any introduction or commentary:\n\n{question}",
"emergency_room"
),
(
"Rewrite this medical question as if asked by a patient during a routine checkup. Return only the rewritten question without any introduction or commentary:\n\n{question}",
"routine_checkup"
),
(
"Rewrite this medical question as if asked by a patient with chronic conditions. Return only the rewritten question without any introduction or commentary:\n\n{question}",
"chronic_care"
),
(
"Rewrite this medical question as if asked by a patient's family member. Return only the rewritten question without any introduction or commentary:\n\n{question}",
"family_inquiry"
)
]
for prompt_template, scenario_type in context_prompts:
try:
prompt = prompt_template.format(question=question)
scenario_question = self.paraphrase(question, difficulty="hard", custom_prompt=prompt)
if scenario_question and not self._is_invalid_response(scenario_question):
scenarios.append((scenario_question, answer, scenario_type))
except Exception as e:
logger.warning(f"Failed to create clinical scenario {scenario_type}: {e}")
continue
return scenarios
def _is_invalid_response(self, text: str) -> bool:
"""Check if response is invalid"""
if not text or not isinstance(text, str):
return True
text_lower = text.lower().strip()
invalid_patterns = [
"fail", "invalid", "i couldn't", "i can't", "i cannot", "unable to",
"sorry", "error", "not available", "no answer", "insufficient",
"don't know", "do not know", "not sure", "cannot determine",
"unable to provide", "not possible", "not applicable", "n/a"
]
if len(text_lower) < 3:
return True
for pattern in invalid_patterns:
if pattern in text_lower:
return True
return False