# Round-robin rotator + paraphrasing + translation/backtranslation

import os
import re
import logging
import requests
import time
from typing import Optional


# Dynamic import for Google GenAI (only when not in local mode)
def _import_google_genai():
    """Dynamically import Google GenAI only when needed."""
    try:
        from google import genai
        return genai
    except ImportError as e:
        raise ImportError(f"Google GenAI not available: {e}. Make sure IS_LOCAL=false and google-genai is installed.")


# Check if we're in local mode
IS_LOCAL = os.getenv("IS_LOCAL", "false").lower() == "true"

# Only import Google GenAI if not in local mode
if not IS_LOCAL:
    try:
        genai = _import_google_genai()
    except ImportError:
        genai = None
else:
    genai = None

logger = logging.getLogger("llm")
if not logger.handlers:
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    logger.addHandler(handler)


# Truncate text to its first n words so log lines stay short
def snip(s: str, n: int = 12) -> str:
    if not isinstance(s, str):
        return "∅"
    parts = s.strip().split()
    return " ".join(parts[:n]) + (" …" if len(parts) > n else "")


class KeyRotator:
    def __init__(self, env_prefix: str, max_keys: int = 5):
        self.env_prefix = env_prefix  # Store as instance variable
        keys = []
        for i in range(1, max_keys + 1):
            v = os.getenv(f"{env_prefix}_{i}")
            if v:
                keys.append(v.strip())
        if not keys:
            logger.warning(f"[LLM] No keys found for prefix {env_prefix}_*")
        self.keys = keys
        self.dead = set()       # Permanently dead keys
        self.temp_dead = {}     # Temporarily dead keys -> retry timestamp
        self.retry_counts = {}  # Retry attempts per key
        self.idx = 0
        self.max_retries = 3    # Max retries before permanent death
        self.retry_delay = 60   # Base seconds to wait before retry

    def next_key(self) -> Optional[str]:
        if not self.keys:
            return None
        # Clean up expired temporary dead keys
        current_time = time.time()
        expired_keys = [k for k, retry_time in self.temp_dead.items() if current_time > retry_time]
        for k in expired_keys:
            del self.temp_dead[k]
            logger.info(f"[LLM] Key {k[:6]}*** is back in rotation after cooldown")
        # Try to find an available key
        for _ in range(len(self.keys)):
            k = self.keys[self.idx % len(self.keys)]
            self.idx += 1
            # Skip permanently dead keys
            if k in self.dead:
                continue
            # Skip temporarily dead keys
            if k in self.temp_dead and current_time < self.temp_dead[k]:
                continue
            return k
        # All keys are dead or temporarily unavailable
        logger.warning(f"[LLM] All keys for {self.env_prefix} are unavailable")
        return None

    def mark_bad(self, key: Optional[str], error_type: str = "unknown"):
        if not key:
            return
        current_time = time.time()
        retry_count = self.retry_counts.get(key, 0)
        # Determine if this is a temporary or permanent failure
        is_temporary = self._is_temporary_error(error_type)
        if is_temporary and retry_count < self.max_retries:
            # Temporary failure - quarantine with exponential backoff
            retry_delay = self.retry_delay * (2 ** retry_count)
            self.temp_dead[key] = current_time + retry_delay
            self.retry_counts[key] = retry_count + 1
            logger.warning(f"[LLM] Key {key[:6]}*** temporarily quarantined for {retry_delay}s (attempt {retry_count + 1}/{self.max_retries})")
        else:
            # Permanent failure or max retries reached
            self.dead.add(key)
            self.temp_dead.pop(key, None)
            self.retry_counts.pop(key, None)
            logger.error(f"[LLM] Key {key[:6]}*** permanently quarantined after {retry_count} retries")

    def _is_temporary_error(self, error_type: str) -> bool:
        """Determine if an error is temporary and worth retrying."""
        temporary_errors = [
            "rate_limit", "quota_exceeded", "too_many_requests", "429",
            "service_unavailable", "503", "bad_gateway", "502",
            "timeout", "connection_error", "network_error"
        ]
        error_lower = error_type.lower()
        return any(temp_err in error_lower for temp_err in temporary_errors)

    def get_stats(self) -> dict:
        """Get rotator statistics."""
        return {
            "total_keys": len(self.keys),
            "dead_keys": len(self.dead),
            "temp_dead_keys": len(self.temp_dead),
            "available_keys": len(self.keys) - len(self.dead) - len(self.temp_dead),
            "retry_counts": self.retry_counts.copy()
        }
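
# Example usage (a minimal sketch; assumes env vars NVIDIA_API_1, NVIDIA_API_2, ...
# hold keys, per KeyRotator.__init__ above):
#
#   rotator = KeyRotator("NVIDIA_API")
#   key = rotator.next_key()           # round-robins over available keys
#   rotator.mark_bad(key, "HTTP 429")  # "429" matches _is_temporary_error,
#                                      #   so the key is quarantined with backoff
#   print(rotator.get_stats())         # e.g. {"total_keys": 2, "dead_keys": 0, ...}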


class GeminiClient:
    def __init__(self, rotator: KeyRotator, default_model: str):
        self.rotator = rotator
        self.default_model = default_model
        self.available = genai is not None and not IS_LOCAL

    def generate(self, prompt: str, model: Optional[str] = None, temperature: float = 0.2, max_output_tokens: int = 512) -> Optional[str]:
        if not self.available:
            logger.warning("[LLM][Gemini] Google GenAI not available (local mode or import failed)")
            return None
        key = self.rotator.next_key()
        if not key:
            return None
        try:
            client = genai.Client(api_key=key)
            # NOTE: matches the required call pattern. temperature and
            # max_output_tokens are accepted for call-site symmetry with
            # NvidiaClient but are not forwarded here (see sketch below).
            res = client.models.generate_content(
                model=model or self.default_model,
                contents=prompt
            )
            text = getattr(res, "text", None)
            if text:
                logger.info(f"[LLM][Gemini] out={snip(text)}")
                return text
        except Exception as e:
            error_msg = str(e)
            logger.error(f"[LLM][Gemini] {error_msg}")
            self.rotator.mark_bad(key, error_msg)
        return None
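
# To make the generation parameters take effect, the google-genai SDK accepts
# a config object. A sketch, assuming genai.types.GenerateContentConfig is
# available in the installed SDK version (verify the field names locally):
#
#   res = client.models.generate_content(
#       model=model or self.default_model,
#       contents=prompt,
#       config=genai.types.GenerateContentConfig(
#           temperature=temperature,
#           max_output_tokens=max_output_tokens,
#       ),
#   )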


class NvidiaClient:
    def __init__(self, rotator: KeyRotator, default_model: str):
        self.rotator = rotator
        self.default_model = default_model
        self.url = os.getenv("NVIDIA_API_URL", "https://integrate.api.nvidia.com/v1/chat/completions")

    # Regex-based removal of boilerplate prefixes from the response
    def _clean_resp(self, resp: str) -> str:
        if not resp:
            return resp
        txt = resp.strip()
        # Remove common boilerplate prefixes
        for pat in [
            r"^Here is (a|the) .*?:\s*",
            r"^Paraphrased(?: version)?:\s*",
            r"^Sure[,.]?\s*",
            r"^Okay[,.]?\s*"
        ]:
            txt = re.sub(pat, "", txt, flags=re.I)
        return txt.strip()

    def generate(self, prompt: str, model: Optional[str] = None, temperature: float = 0.2, max_tokens: int = 512) -> Optional[str]:
        key = self.rotator.next_key()
        if not key:
            return None
        try:
            headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json"}
            payload = {
                "model": model or self.default_model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": temperature,
                "max_tokens": max_tokens
            }
            r = requests.post(self.url, headers=headers, json=payload, timeout=45)
            if r.status_code >= 400:
                raise RuntimeError(f"HTTP {r.status_code}: {r.text[:200]}")
            data = r.json()
            text = data["choices"][0]["message"]["content"]
            clean = self._clean_resp(text)
            # Log the cleaned output
            logger.info(f"[LLM][NVIDIA] out={snip(clean)}")
            return clean
        except Exception as e:
            error_msg = str(e)
            logger.error(f"[LLM][NVIDIA] {error_msg}")
            self.rotator.mark_bad(key, error_msg)
            return None
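
# Example usage (a sketch; the model name here is a hypothetical placeholder,
# and the NVIDIA_API_1..5 env vars must hold valid keys):
#
#   nv = NvidiaClient(KeyRotator("NVIDIA_API"), "meta/llama-3.1-8b-instruct")
#   text = nv.generate("Summarize: ...", temperature=0.0, max_tokens=128)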


class Paraphraser:
    """Intelligent API load balancer with rate limiting and cost optimization."""

    def __init__(self, nvidia_model: str, gemini_model_easy: str, gemini_model_hard: str):
        self.nv = NvidiaClient(KeyRotator("NVIDIA_API"), nvidia_model)
        self.gm_easy = GeminiClient(KeyRotator("GEMINI_API"), gemini_model_easy)
        # Only use GEMINI_MODEL_EASY; the hard model is ignored completely
        self.gm_hard = None  # Disabled - only use easy model
        # Rate limiting and load balancing
        self.last_nvidia_call = 0
        self.last_gemini_call = 0
        self.min_call_interval = 0.1  # Minimum 100ms between calls
        self.nvidia_success_rate = 1.0  # Track success rates for load balancing
        self.gemini_success_rate = 1.0
        self.call_counts = {"nvidia": 0, "gemini": 0, "failures": 0}
        logger.info("Paraphraser initialized with intelligent load balancing: NVIDIA -> GEMINI_EASY")

    def _rate_limit(self, api_type: str):
        """Apply rate limiting to prevent API exhaustion."""
        current_time = time.time()
        if api_type == "nvidia":
            time_since_last = current_time - self.last_nvidia_call
            if time_since_last < self.min_call_interval:
                time.sleep(self.min_call_interval - time_since_last)
            self.last_nvidia_call = time.time()
        elif api_type == "gemini":
            time_since_last = current_time - self.last_gemini_call
            if time_since_last < self.min_call_interval:
                time.sleep(self.min_call_interval - time_since_last)
            self.last_gemini_call = time.time()

    def _select_api(self, prefer_cheap: bool = True) -> str:
        """Select an API based on key availability and rolling success rates."""
        nvidia_stats = self.nv.rotator.get_stats()
        gemini_stats = self.gm_easy.rotator.get_stats()
        nvidia_available = nvidia_stats["available_keys"] > 0
        gemini_available = gemini_stats["available_keys"] > 0
        if not nvidia_available and not gemini_available:
            return "none"
        elif not nvidia_available:
            return "gemini"
        elif not gemini_available:
            return "nvidia"
        # Both available - pick by preference and success rates
        if prefer_cheap:
            # Prefer NVIDIA (cheaper) but consider success rates
            if self.nvidia_success_rate > 0.8 or self.gemini_success_rate < 0.5:
                return "nvidia"
            else:
                return "gemini"
        else:
            # Prefer quality (Gemini) but consider success rates
            if self.gemini_success_rate > 0.8 or self.nvidia_success_rate < 0.5:
                return "gemini"
            else:
                return "nvidia"

    def _update_success_rate(self, api_type: str, success: bool):
        """Update success-rate tracking (exponential moving average) for load balancing."""
        alpha = 0.1
        if api_type == "nvidia":
            self.nvidia_success_rate = alpha * (1.0 if success else 0.0) + (1 - alpha) * self.nvidia_success_rate
        elif api_type == "gemini":
            self.gemini_success_rate = alpha * (1.0 if success else 0.0) + (1 - alpha) * self.gemini_success_rate
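
    # Worked example of the EMA above with alpha = 0.1: starting from a
    # success rate of 1.0, one failure yields 0.1*0.0 + 0.9*1.0 = 0.90, a
    # second straight failure yields 0.1*0.0 + 0.9*0.90 = 0.81, and roughly
    # seven consecutive failures are needed to fall below the 0.5 threshold
    # that _select_api uses (0.9**7 ≈ 0.48).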

    def _call_api(self, prompt: str, api_type: str, **kwargs) -> Optional[str]:
        """Make an API call with rate limiting and error tracking."""
        self._rate_limit(api_type)
        try:
            if api_type == "nvidia":
                result = self.nv.generate(prompt, **kwargs)
                self.call_counts["nvidia"] += 1
                self._update_success_rate("nvidia", result is not None)
                return result
            elif api_type == "gemini":
                result = self.gm_easy.generate(prompt, **kwargs)
                self.call_counts["gemini"] += 1
                self._update_success_rate("gemini", result is not None)
                return result
        except Exception as e:
            self.call_counts["failures"] += 1
            self._update_success_rate(api_type, False)
            logger.error(f"[LLM] API call failed for {api_type}: {e}")
            return None
        return None

    # Enhanced cleaning to remove conversational elements and comments
    def _clean_resp(self, resp: str) -> str:
        if not resp:
            return resp
        txt = resp.strip()
        # Remove common conversational prefixes and comments
        prefixes_to_remove = [
            "Here's a rewritten version of",
            "Here is a rewritten version of",
            "Here's the rewritten text:",
            "Here is the rewritten text:",
            "Here's the translation:",
            "Here is the translation:",
            "Here's the enhanced text:",
            "Here is the enhanced text:",
            "Here's the improved text:",
            "Here is the improved text:",
            "Here's the medical context:",
            "Here is the medical context:",
            "Here's the cleaned text:",
            "Here is the cleaned text:",
            "Here's the answer:",
            "Here is the answer:",
            "Here's a paraphrased version:",
            "Here is a paraphrased version:",
            "Paraphrased version:",
            "Paraphrased:",
            "Sure,",
            "Okay,",
            "Certainly,",
            "Of course,",
            "I can help you with that.",
            "I'll help you with that.",
            "Let me help you with that.",
            "I can rewrite that for you.",
            "I'll rewrite that for you.",
            "Let me rewrite that for you.",
            "I can translate that for you.",
            "I'll translate that for you.",
            "Let me translate that for you.",
        ]
        # Remove the first matching prefix
        for prefix in prefixes_to_remove:
            if txt.lower().startswith(prefix.lower()):
                txt = txt[len(prefix):].strip()
                break
        # Remove common boilerplate prefixes with regex
        for pat in [
            r"^Here is (a|the) .*?:\s*",
            r"^Paraphrased(?: version)?:\s*",
            r"^Sure[,.]?\s*",
            r"^Okay[,.]?\s*",
            r"^Certainly[,.]?\s*",
            r"^Of course[,.]?\s*",
            r"^I can .*?:\s*",
            r"^I'll .*?:\s*",
            r"^Let me .*?:\s*"
        ]:
            txt = re.sub(pat, "", txt, flags=re.I)
        # Drop any remaining lines that look conversational
        lines = txt.split('\n')
        cleaned_lines = []
        for line in lines:
            line = line.strip()
            if line and not any(phrase in line.lower() for phrase in [
                "here's", "here is", "let me", "i can", "i'll", "sure,", "okay,",
                "certainly,", "of course,", "i hope this helps", "hope this helps",
                "does this help", "is this what you", "let me know if"
            ]):
                cleaned_lines.append(line)
        return '\n'.join(cleaned_lines).strip()
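
    # Example of the cleaning pass above (traced by hand):
    #   _clean_resp("Sure, here's the answer:\nAspirin inhibits COX-1.")
    # strips the "Sure," prefix, drops the remaining conversational line, and
    # returns "Aspirin inhibits COX-1."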

    # ————— Paraphrase —————
    def paraphrase(self, text: str, difficulty: str = "easy", custom_prompt: Optional[str] = None) -> str:
        if not text or len(text) < 12:
            return text
        # Use custom prompt if provided, otherwise use optimized medical prompts
        if custom_prompt:
            prompt = custom_prompt
        elif difficulty == "easy":
            prompt = (
                "Rewrite the following medical text using different words while preserving all medical facts, clinical terms, and meaning. Keep the same level of detail and accuracy. Return only the rewritten text without any introduction or commentary.\n\n"
                f"{text}"
            )
        else:  # hard difficulty
            prompt = (
                "Rewrite the following medical text using more sophisticated medical language and different sentence structures while preserving all clinical facts, medical terminology, and diagnostic information. Maintain professional medical tone. Return only the rewritten text without any introduction or commentary.\n\n"
                f"{text}"
            )
        # Tune temperature and token limit by difficulty
        temperature = 0.1 if difficulty == "easy" else 0.3
        max_tokens = min(600, max(128, len(text) // 2))
        # Intelligent API selection with fallback
        api_type = self._select_api(prefer_cheap=True)
        if api_type == "nvidia":
            out = self._call_api(prompt, "nvidia", temperature=temperature, max_tokens=max_tokens)
            if out:
                return self._clean_resp(out)
        # Fall back to Gemini if NVIDIA failed or was not selected
        api_type = self._select_api(prefer_cheap=False)
        if api_type == "gemini":
            out = self._call_api(prompt, "gemini", max_output_tokens=max_tokens)
            if out:
                clean = self._clean_resp(out)
                logger.info(f"[LLM][GEMINI] out={snip(clean)}")
                return clean
        # Both APIs failed
        logger.warning("[LLM] All APIs failed for paraphrase, returning original text")
        return text
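
    # Usage (a sketch; model names elided):
    #   p = Paraphraser(nvidia_model="...", gemini_model_easy="...", gemini_model_hard="...")
    #   easy = p.paraphrase("The patient presents with acute chest pain ...")
    #   hard = p.paraphrase("The patient presents with acute chest pain ...", difficulty="hard")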

    # ————— Translate & Backtranslate —————
    def translate(self, text: str, target_lang: str = "vi") -> Optional[str]:
        if not text:
            return text
        # Optimized medical translation prompts
        if target_lang == "vi":
            prompt = (
                "Translate the following English medical text to Vietnamese while preserving all medical terminology, clinical facts, and professional medical language. Use appropriate Vietnamese medical terms. Return only the translation without any introduction or commentary.\n\n"
                f"{text}"
            )
        else:
            prompt = (
                f"Translate the following medical text to {target_lang} while preserving all medical terminology, clinical facts, and professional medical language. Return only the translation without any introduction or commentary.\n\n"
                f"{text}"
            )
        # Intelligent API selection for translation
        api_type = self._select_api(prefer_cheap=True)
        if api_type == "nvidia":
            out = self._call_api(prompt, "nvidia", temperature=0.0, max_tokens=min(800, len(text) + 100))
            if out:
                return out.strip()
        # Fall back to Gemini if NVIDIA failed or was not selected
        api_type = self._select_api(prefer_cheap=False)
        if api_type == "gemini":
            out = self._call_api(prompt, "gemini", max_output_tokens=min(800, len(text) + 100))
            if out:
                return out.strip()
        return None

    def backtranslate(self, text: str, via_lang: str = "vi") -> Optional[str]:
        if not text:
            return text
        mid = self.translate(text, target_lang=via_lang)
        if not mid:
            return None
        # Optimized backtranslation prompt with medical focus
        if via_lang == "vi":
            prompt = (
                "Translate the following Vietnamese medical text back to English while preserving all medical terminology, clinical facts, and professional medical language. Ensure the translation is medically accurate. Return only the translation without any introduction or commentary.\n\n"
                f"{mid}"
            )
        else:
            prompt = (
                f"Translate the following {via_lang} medical text back to English while preserving all medical terminology, clinical facts, and professional medical language. Return only the translation without any introduction or commentary.\n\n"
                f"{mid}"
            )
        # Intelligent API selection for backtranslation
        api_type = self._select_api(prefer_cheap=True)
        if api_type == "nvidia":
            out = self._call_api(prompt, "nvidia", temperature=0.0, max_tokens=min(900, len(text) + 150))
            if out:
                return out.strip()
        # Fall back to Gemini if NVIDIA failed or was not selected
        api_type = self._select_api(prefer_cheap=False)
        if api_type == "gemini":
            out = self._call_api(prompt, "gemini", max_output_tokens=min(900, len(text) + 150))
            if out:
                return out.strip()
        return None
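
    # Round-trip augmentation example (a sketch): backtranslate() pivots the
    # text through Vietnamese by default, so
    #   aug = p.backtranslate("Administer 325 mg of aspirin orally once daily.")
    # returns an English rewording of the dosing instruction (exact output
    # depends on the models), or None if either translation leg fails.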

    # ————— Consistency Judge (cheap, ratio-based) —————
    def consistency_check(self, user: str, output: str) -> bool:
        """Return True if 'output' appears supported by 'user' (context/question). Optimized medical validation."""
        prompt = (
            "Evaluate if the medical answer is consistent with the question/context and medically accurate. Consider medical accuracy, clinical appropriateness, consistency with the question, safety standards, and completeness of medical information. Reply with exactly 'PASS' if the answer is medically sound and consistent, otherwise 'FAIL'.\n\n"
            f"Question/Context: {user}\n\n"
            f"Medical Answer: {output}"
        )
        # Use intelligent API selection for the consistency check
        api_type = self._select_api(prefer_cheap=True)
        if api_type == "nvidia":
            out = self._call_api(prompt, "nvidia", temperature=0.0, max_tokens=5)
            if out:
                return "PASS" in out.upper()
        # Fall back to Gemini if NVIDIA failed or was not selected
        api_type = self._select_api(prefer_cheap=False)
        if api_type == "gemini":
            out = self._call_api(prompt, "gemini", max_output_tokens=5)
            if out:
                return "PASS" in out.upper()
        # If both APIs fail, assume consistency (permissive fallback: keep data rather than drop it)
        logger.warning("[LLM] Consistency check failed due to API unavailability, assuming consistent")
        return True
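
    # Example gate in a data pipeline (a sketch; `accepted` is a hypothetical
    # list used for illustration):
    #   if p.consistency_check(question, answer):
    #       accepted.append((question, answer))
    # Note the failure mode: when no API is reachable the check returns True,
    # so pairs pass through rather than being discarded.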

    def get_api_stats(self) -> dict:
        """Get comprehensive API usage statistics."""
        nvidia_stats = self.nv.rotator.get_stats()
        gemini_stats = self.gm_easy.rotator.get_stats()
        return {
            "call_counts": self.call_counts.copy(),
            "success_rates": {
                "nvidia": self.nvidia_success_rate,
                "gemini": self.gemini_success_rate
            },
            "nvidia_rotator": nvidia_stats,
            "gemini_rotator": gemini_stats,
            "total_calls": sum(self.call_counts.values()),
            "failure_rate": self.call_counts["failures"] / max(1, sum(self.call_counts.values()))
        }

    def medical_accuracy_check(self, question: str, answer: str) -> bool:
        """Check medical accuracy of Q&A pairs using cloud APIs."""
        if not question or not answer:
            return False
        prompt = (
            "Evaluate if the medical answer is accurate and appropriate for the question. Consider medical facts, clinical knowledge, appropriate medical terminology, clinical reasoning, logic, and safety considerations. Reply with exactly 'ACCURATE' if the answer is medically correct, otherwise 'INACCURATE'.\n\n"
            f"Medical Question: {question}\n\n"
            f"Medical Answer: {answer}"
        )
        out = self.nv.generate(prompt, temperature=0.0, max_tokens=5)
        if not out:
            out = self.gm_easy.generate(prompt, max_output_tokens=5)
        if not isinstance(out, str):
            return False
        # 'ACCURATE' is a substring of 'INACCURATE', so check the negative verdict explicitly
        verdict = out.upper()
        return "ACCURATE" in verdict and "INACCURATE" not in verdict

    def enhance_medical_terminology(self, text: str) -> str:
        """Enhance medical terminology in text using cloud APIs."""
        if not text or len(text) < 20:
            return text
        prompt = (
            "Improve the medical terminology in the following text while preserving all factual information and clinical accuracy. Use more precise medical terms where appropriate. Return only the improved text without any introduction or commentary.\n\n"
            f"{text}"
        )
        out = self.nv.generate(prompt, temperature=0.1, max_tokens=min(800, len(text) + 100))
        if not out:
            out = self.gm_easy.generate(prompt, max_output_tokens=min(800, len(text) + 100))
        return out if out else text

    def create_clinical_scenarios(self, question: str, answer: str) -> list:
        """Create different clinical scenarios from Q&A pairs using cloud APIs."""
        scenarios = []
        # Different clinical context prompts
        context_prompts = [
            (
                "Rewrite this medical question as if asked by a patient in an emergency room setting. Return only the rewritten question without any introduction or commentary:\n\n{question}",
                "emergency_room"
            ),
            (
                "Rewrite this medical question as if asked by a patient during a routine checkup. Return only the rewritten question without any introduction or commentary:\n\n{question}",
                "routine_checkup"
            ),
            (
                "Rewrite this medical question as if asked by a patient with chronic conditions. Return only the rewritten question without any introduction or commentary:\n\n{question}",
                "chronic_care"
            ),
            (
                "Rewrite this medical question as if asked by a patient's family member. Return only the rewritten question without any introduction or commentary:\n\n{question}",
                "family_inquiry"
            )
        ]
        for prompt_template, scenario_type in context_prompts:
            try:
                prompt = prompt_template.format(question=question)
                scenario_question = self.paraphrase(question, difficulty="hard", custom_prompt=prompt)
                if scenario_question and not self._is_invalid_response(scenario_question):
                    scenarios.append((scenario_question, answer, scenario_type))
            except Exception as e:
                logger.warning(f"Failed to create clinical scenario {scenario_type}: {e}")
                continue
        return scenarios

    def _is_invalid_response(self, text: str) -> bool:
        """Check if a model response is invalid (refusal, error, or too short)."""
        if not text or not isinstance(text, str):
            return True
        text_lower = text.lower().strip()
        if len(text_lower) < 3:
            return True
        invalid_patterns = [
            "fail", "invalid", "i couldn't", "i can't", "i cannot", "unable to",
            "sorry", "error", "not available", "no answer", "insufficient",
            "don't know", "do not know", "not sure", "cannot determine",
            "unable to provide", "not possible", "not applicable", "n/a"
        ]
        return any(pattern in text_lower for pattern in invalid_patterns)
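

# Minimal end-to-end smoke test (a sketch; NVIDIA_MODEL, GEMINI_MODEL_EASY and
# GEMINI_MODEL_HARD are assumed env vars naming the deployment's models, and
# the NVIDIA_API_*/GEMINI_API_* env vars must hold real keys):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    p = Paraphraser(
        nvidia_model=os.getenv("NVIDIA_MODEL", ""),
        gemini_model_easy=os.getenv("GEMINI_MODEL_EASY", ""),
        gemini_model_hard=os.getenv("GEMINI_MODEL_HARD", ""),
    )
    sample = "Aspirin irreversibly inhibits cyclooxygenase, reducing platelet aggregation."
    print("paraphrase:", p.paraphrase(sample))
    print("backtranslate:", p.backtranslate(sample))
    print("stats:", p.get_api_stats())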