import json import os import uuid import traceback from datetime import datetime from typing import Dict, List, Any, Optional import pytz from langchain_openai import ChatOpenAI from langchain.schema import HumanMessage, SystemMessage from .config import logger from .github_storage import get_github_storage class MedicalAnswerValidator: """ Medical answer validation system that evaluates responses using a separate LLM instance. Produces structured JSON evaluations and saves them to evaluation_results.json. """ def __init__(self): """Initialize the validator with LLM and system prompt.""" self.validator_llm = self._create_validator_llm() self.validation_system_prompt = self._create_validation_system_prompt() self.evaluation_file = "evaluation_results.json" logger.info("Medical answer validator initialized successfully") def _get_next_interaction_id(self) -> str: """Get the next interaction ID by finding the highest existing ID and adding 1.""" try: # Try to get from GitHub first github_storage = get_github_storage() existing_content = github_storage._get_file_content("medical_data/evaluation_results.json") if existing_content: try: evaluations = json.loads(existing_content) if evaluations and isinstance(evaluations, list): logger.info(f"Found {len(evaluations)} existing evaluations in GitHub") # Find the highest existing ID max_id = 0 for eval_item in evaluations: try: current_id = int(eval_item.get("interaction_id", "0")) max_id = max(max_id, current_id) except (ValueError, TypeError): continue next_id = str(max_id + 1) logger.info(f"Next interaction ID will be: {next_id}") return next_id except json.JSONDecodeError as e: logger.warning(f"Failed to parse GitHub evaluation file: {e}") pass # Fallback to local file check if os.path.exists(self.evaluation_file): logger.info("GitHub file not found, checking local file") with open(self.evaluation_file, "r", encoding="utf-8") as f: evaluations = json.load(f) if evaluations: logger.info(f"Found {len(evaluations)} existing evaluations in local file") # Find the highest existing ID max_id = 0 for eval_item in evaluations: try: current_id = int(eval_item.get("interaction_id", "0")) max_id = max(max_id, current_id) except (ValueError, TypeError): continue next_id = str(max_id + 1) logger.info(f"Next interaction ID from local file: {next_id}") return next_id else: logger.info("Local file is empty, starting with ID 1") return "1" else: logger.info("No existing evaluation file found, starting with ID 1") return "1" except Exception as e: logger.error(f"Error getting next interaction ID: {e}") return "1" def _clean_documents_for_storage(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Clean documents by removing snippets and keeping only essential fields.""" cleaned_docs = [] for doc in documents: is_context_page = doc.get("context_enrichment", False) cleaned_doc = { "doc_id": doc.get("doc_id"), "source": doc.get("source", "unknown"), "provider": doc.get("provider", "unknown"), "page_number": doc.get("page_number", "unknown"), "disease": doc.get("disease", "unknown"), "page_type": "CONTEXT PAGE" if is_context_page else "ORIGINAL PAGE", "context_enrichment": is_context_page, "content": doc.get("content", "") } cleaned_docs.append(cleaned_doc) return cleaned_docs def _create_validation_system_prompt(self) -> str: """Create the system prompt for the validation LLM.""" return """Role You are medical information validator tasked with validating the following answer to ensure it is accurate, complete, relevant, well-structured (coherent), appropriately concise (length), and properly attributed (cited) based only on the provided documents. Here is your input: Question: [User's original question] Retrieved Answer: [The answer generated or retrieved from documents] Documents: [Provide a link or summary of the relevant document sections] Validation Task Criteria: For each criterion below, provide a Score (0-100%) and a detailed Comment explaining the score and noting any necessary improvements, specific issues, or confirming satisfactory performance. Accuracy (0-100%) Is the answer factually correct based only on the provided documents? Ensure that no information contradicts what is written in the documents. If you find any discrepancies or factual errors, point them out in the [Accuracy_Comment]. If the answer contains unsupported statements (hallucinations), highlight them in the [Accuracy_Comment]. Validation Score Guidelines: 100%: The answer is factually correct, with no contradictions or missing information based on the provided documents. 85-99%: The answer is mostly correct, but contains minor inaccuracies or omissions that don't substantially affect the overall accuracy. 70-84%: The answer contains notable factual errors or omissions that may affect the response's reliability. Below 70%: The answer is factually incorrect, contains critical errors, or misrepresents the content of the documents. Coherence (0-100%) Is the answer logically structured and clear? Ensure the answer flows well, uses appropriate language, and makes sense to a human reader. If the answer is unclear or poorly structured, suggest specific improvements in the [Coherence_Comment]. Coherence Score Guidelines: 100%: The answer is logically structured, easy to understand, and free from confusion or ambiguity. 85-99%: The answer is mostly clear but may have slight issues with flow or readability, such as minor disjointedness. 70-84%: The answer lacks clarity or contains some sections that confuse the reader due to poor structure. Below 70%: The answer is poorly structured or difficult to follow, requiring significant improvement in clarity and flow. Relevance (0-100%) Does the answer address the user's question adequately and fully? Ensure that the core topic of the question is covered and that no irrelevant or off-topic information is included. If parts of the question are missed or the answer is irrelevant, identify which parts need improvement in the [Relevance_Comment]. Relevance Score Guidelines: 100%: The answer directly addresses all parts of the user's question without unnecessary deviations. 85-99%: The answer is mostly relevant, but might include slight off-topic information or miss minor aspects of the question. 70-84%: The answer misses key points or includes significant irrelevant details that distract from the question. Below 70%: The answer is largely irrelevant to the user's question or includes significant off-topic information. Completeness (0-100%) Does the answer provide all necessary information that is available in the documents to fully address the question? Are there any critical details missing? If the answer is incomplete or vague, suggest what additional details should be included from the documents in the [Completeness_Comment]. Completeness Score Guidelines: 100%: The answer provides all necessary information in sufficient detail, covering all aspects of the question based on the documents. 85-99%: The answer covers most of the required details but may lack some minor points available in the source. 70-84%: The answer is missing critical information available in the documents or lacks important details to fully address the question. Below 70%: The answer is severely incomplete, leaving out essential information available in the documents. Citations/Attribution (0-100%) Is every claim in the answer correctly attributed (cited) to the relevant document(s)? Are all citations accurate and correctly placed? If any statement lacks a citation or has an incorrect citation, note the specific issue in the [Citations_Attribution_Comment]. Citations/Attribution Score Guidelines: 100%: Every piece of information is correctly and appropriately cited to the supporting document(s). 85-99%: Citations are mostly correct, but there are one or two minor errors (e.g., misplaced citation, minor formatting issue). 70-84%: Several statements are missing citations, or multiple citations are incorrectly attributed, leading to potential confusion about the source. Below 70%: The majority of the answer lacks proper citation, or citations are so poorly done they are unreliable. Length (0-100%) Is the answer the right length to fully answer the question, without being too short (lacking detail) or too long (causing distraction or including irrelevant information)? Provide a rating based on whether the answer strikes the right balance in the [Length_Comment]. Length Score Guidelines: 100%: The answer is appropriately detailed, offering enough information to fully address the question without unnecessary elaboration. 85-99%: The answer is sufficiently detailed but could be slightly more concise or might include minor irrelevant information. 70-84%: The answer is either too brief and lacks necessary detail or too lengthy with excessive, distracting information. Below 70%: The answer is either too short to be meaningful or too long, causing distractions or loss of focus. Final Evaluation Output Based on the above checks, provide a rating and a comment for each aspect, and a final overall rating. Your entire output must be a single JSON object that strictly follows the structure defined below. CRITICAL INSTRUCTIONS: - Output ONLY valid JSON - no additional text before or after - Use double quotes for all strings - Ensure all rating values are numbers between 0-100 (no quotes around numbers) - Do not include any markdown formatting or code blocks - Start your response immediately with { and end with } Required JSON Output Structure: { "Accuracy_Rating": "95", "Accuracy_Comment": "Detailed comment on factual correctness/issues", "Coherence_Rating": "90", "Coherence_Comment": "Detailed comment on flow, structure, and clarity", "Relevance_Rating": "88", "Relevance_Comment": "Detailed comment on addressing the question fully/irrelevant info", "Completeness_Rating": "92", "Completeness_Comment": "Detailed comment on missing critical details available in the documents", "Citations_Attribution_Rating": "85", "Citations_Attribution_Comment": "Detailed comment on citation accuracy and completeness", "Length_Rating": "90", "Length_Comment": "Detailed comment on conciseness and appropriate detail", "Overall_Rating": "90", "Final_Summary_and_Improvement_Plan": "Overall judgment. If rating is below 90%, describe what specific changes are needed to achieve a 100%. If 90% or above, state that the answer is ready." } REMEMBER: Output ONLY the JSON object above with your specific ratings and comments. No other text.""" def _create_validator_llm(self) -> ChatOpenAI: """Create a separate LLM instance for validation.""" try: openai_key = os.getenv("OPENAI_API_KEY") if not openai_key: raise ValueError("OpenAI API key is required for validation") return ChatOpenAI( model="gpt-4o", api_key=openai_key, # base_url=os.getenv("OPENAI_BASE_URL"), temperature=0.0, max_tokens=1024, request_timeout=60, max_retries=3, streaming=False, ) except Exception as e: logger.error(f"Failed to create validator LLM: {e}") raise def validate_answer( self, question: str, retrieved_documents: List[Dict[str, Any]], generated_answer: str ) -> Dict[str, Any]: """ Validate a medical answer and return structured evaluation. Args: question: The original user question retrieved_documents: List of retrieved documents with metadata generated_answer: The AI-generated answer to validate Returns: Dict containing the complete evaluation with metadata """ try: # Generate simple sequential interaction ID interaction_id = self._get_next_interaction_id() logger.info(f"Starting validation for interaction {interaction_id}") # Clean documents (remove snippets) for storage cleaned_documents = self._clean_documents_for_storage(retrieved_documents) # Format documents for validation formatted_docs = self._format_documents_for_validation(retrieved_documents) # Create validation prompt validation_prompt = f"""Question: {question} Retrieved Answer: {generated_answer} Documents: {formatted_docs}""" # Get validation from LLM with retry logic validation_report = None max_retries = 3 for attempt in range(max_retries): try: messages = [ SystemMessage(content=self.validation_system_prompt), HumanMessage(content=validation_prompt) ] response = self.validator_llm.invoke(messages) validation_content = response.content.strip() # Check if response is empty if not validation_content: logger.warning(f"Empty response from validation LLM (attempt {attempt + 1})") if attempt < max_retries - 1: continue else: validation_report = self._create_fallback_validation("Empty response from validation LLM") break # Try to parse JSON directly first try: validation_report = json.loads(validation_content) except json.JSONDecodeError: # Try to extract JSON from response that might have extra text validation_report = self._extract_json_from_response(validation_content) if validation_report is None: raise json.JSONDecodeError("Could not extract valid JSON", validation_content, 0) # Validate that all required fields are present required_fields = [ "Accuracy_Rating", "Accuracy_Comment", "Coherence_Rating", "Coherence_Comment", "Relevance_Rating", "Relevance_Comment", "Completeness_Rating", "Completeness_Comment", "Citations_Attribution_Rating", "Citations_Attribution_Comment", "Length_Rating", "Length_Comment", "Overall_Rating", "Final_Summary_and_Improvement_Plan" ] missing_fields = [field for field in required_fields if field not in validation_report] if missing_fields: logger.warning(f"Missing fields in validation response: {missing_fields}") if attempt < max_retries - 1: continue else: # Fill missing fields for field in missing_fields: if field.endswith("_Rating"): validation_report[field] = "0" else: validation_report[field] = f"Field missing from validation response: {field}" # Success - break out of retry loop break except json.JSONDecodeError as e: logger.error(f"Failed to parse validation JSON (attempt {attempt + 1}): {e}") logger.error(f"Raw response: {validation_content[:200]}...") if attempt < max_retries - 1: continue else: validation_report = self._create_fallback_validation(f"JSON parsing failed after {max_retries} attempts: {str(e)}") except Exception as e: logger.error(f"Validation LLM error (attempt {attempt + 1}): {e}") if attempt < max_retries - 1: continue else: # Use basic validation as final fallback logger.info("Using basic heuristic validation as fallback") validation_report = self._create_basic_validation(question, generated_answer, retrieved_documents) # Ensure we have a validation report if validation_report is None: logger.info("Creating basic validation as final fallback") validation_report = self._create_basic_validation(question, generated_answer, retrieved_documents) # Create complete evaluation structure evaluation = { "interaction_id": interaction_id, "timestamp": datetime.now(pytz.timezone('Africa/Cairo')).isoformat(), "question": question, "retrieved_documents": cleaned_documents, "generated_answer": generated_answer, "validation_report": validation_report } # Save to JSON file self._save_evaluation(evaluation) return evaluation except Exception as e: logger.error(f"Error during validation: {e}") return self._create_error_evaluation(question, retrieved_documents, generated_answer, str(e)) def _format_documents_for_validation(self, documents: List[Dict[str, Any]]) -> str: """Format retrieved documents for validation prompt.""" if not documents: return "No documents provided." formatted_docs = [] for i, doc in enumerate(documents, 1): doc_info = f"Document {i}:\n" doc_info += f"Source: {doc.get('source', 'Unknown')}\n" doc_info += f"Provider: {doc.get('provider', 'Unknown')}\n" doc_info += f"Page: {doc.get('page_number', 'Unknown')}\n" doc_info += f"Content: {doc.get('snippet', doc.get('content', 'No content'))}\n" formatted_docs.append(doc_info) return "\n\n".join(formatted_docs) def _create_fallback_validation(self, error_msg: str) -> Dict[str, str]: """Create a fallback validation report when JSON parsing fails.""" return { "Accuracy_Rating": "0", "Accuracy_Comment": f"Validation failed due to parsing error: {error_msg}", "Coherence_Rating": "0", "Coherence_Comment": "Unable to evaluate due to validation system error", "Relevance_Rating": "0", "Relevance_Comment": "Unable to evaluate due to validation system error", "Completeness_Rating": "0", "Completeness_Comment": "Unable to evaluate due to validation system error", "Citations_Attribution_Rating": "0", "Citations_Attribution_Comment": "Unable to evaluate due to validation system error", "Length_Rating": "0", "Length_Comment": "Unable to evaluate due to validation system error", "Overall_Rating": "0", "Final_Summary_and_Improvement_Plan": f"Validation system encountered an error: {error_msg}" } def _extract_json_from_response(self, response_text: str) -> Dict[str, str]: """Extract JSON from response that might contain extra text.""" try: # Try to find JSON in the response start_idx = response_text.find('{') end_idx = response_text.rfind('}') if start_idx != -1 and end_idx != -1 and end_idx > start_idx: json_text = response_text[start_idx:end_idx + 1] return json.loads(json_text) else: raise ValueError("No JSON object found in response") except Exception as e: logger.error(f"Failed to extract JSON from response: {e}") return None def _create_basic_validation(self, question: str, answer: str, documents: List[Dict[str, Any]]) -> Dict[str, str]: """Create a basic validation when LLM fails but we can still provide some assessment.""" # Basic heuristic scoring accuracy_score = "75" # Assume reasonable accuracy if documents are provided coherence_score = "80" if len(answer) > 100 and "." in answer else "60" relevance_score = "70" if any(word in answer.lower() for word in question.lower().split()) else "50" completeness_score = "70" if len(answer) > 200 else "50" citations_score = "80" if "Source:" in answer else "30" length_score = "75" if 100 < len(answer) < 2000 else "60" # Calculate overall as average scores = [int(accuracy_score), int(coherence_score), int(relevance_score), int(completeness_score), int(citations_score), int(length_score)] overall_score = str(sum(scores) // len(scores)) return { "Accuracy_Rating": accuracy_score, "Accuracy_Comment": "Basic heuristic assessment - LLM validation unavailable. Answer appears to reference provided documents.", "Coherence_Rating": coherence_score, "Coherence_Comment": "Basic heuristic assessment - Answer structure and length suggest reasonable coherence.", "Relevance_Rating": relevance_score, "Relevance_Comment": "Basic heuristic assessment - Answer appears to address key terms from the question.", "Completeness_Rating": completeness_score, "Completeness_Comment": "Basic heuristic assessment - Answer length suggests reasonable completeness.", "Citations_Attribution_Rating": citations_score, "Citations_Attribution_Comment": "Basic heuristic assessment - Citations detected in answer format." if "Source:" in answer else "Basic heuristic assessment - Limited citation formatting detected.", "Length_Rating": length_score, "Length_Comment": "Basic heuristic assessment - Answer length appears appropriate for medical question.", "Overall_Rating": overall_score, "Final_Summary_and_Improvement_Plan": f"Basic validation completed (Overall: {overall_score}/100). LLM-based validation was unavailable, so heuristic scoring was used. For full validation, ensure the validation LLM service is accessible." } def _create_error_evaluation( self, question: str, documents: List[Dict[str, Any]], answer: str, error_msg: str ) -> Dict[str, Any]: """Create an error evaluation when validation completely fails.""" return { "interaction_id": str(uuid.uuid4()), "timestamp": datetime.now(pytz.timezone('Africa/Cairo')).isoformat(), "question": question, "retrieved_documents": documents, "generated_answer": answer, "validation_report": { "Accuracy_Rating": "0", "Accuracy_Comment": f"Validation error: {error_msg}", "Coherence_Rating": "0", "Coherence_Comment": f"Validation error: {error_msg}", "Relevance_Rating": "0", "Relevance_Comment": f"Validation error: {error_msg}", "Completeness_Rating": "0", "Completeness_Comment": f"Validation error: {error_msg}", "Citations_Attribution_Rating": "0", "Citations_Attribution_Comment": f"Validation error: {error_msg}", "Length_Rating": "0", "Length_Comment": f"Validation error: {error_msg}", "Overall_Rating": "0", "Final_Summary_and_Improvement_Plan": f"System error prevented validation: {error_msg}" }, "error": error_msg } def _save_evaluation(self, evaluation: Dict[str, Any]) -> None: """Save evaluation to GitHub repository.""" try: logger.info(f"Attempting to save evaluation with ID: {evaluation['interaction_id']}") # Try to save to GitHub first github_storage = get_github_storage() logger.info("GitHub storage instance obtained, calling save_validation_results...") success = github_storage.save_validation_results(evaluation) if success: logger.info(f"✓ Evaluation saved to GitHub successfully with ID: {evaluation['interaction_id']}") else: logger.warning(f"GitHub save failed for evaluation {evaluation['interaction_id']}, falling back to local storage") # Fallback to local storage if GitHub fails evaluations = [] if os.path.exists(self.evaluation_file): try: with open(self.evaluation_file, 'r', encoding='utf-8') as f: evaluations = json.load(f) logger.info(f"Loaded {len(evaluations)} existing evaluations from local file") except (json.JSONDecodeError, FileNotFoundError) as e: logger.warning(f"Could not load local file: {e}") evaluations = [] # Add new evaluation evaluations.append(evaluation) # Save back to local file with open(self.evaluation_file, 'w', encoding='utf-8') as f: json.dump(evaluations, f, indent=2, ensure_ascii=False) logger.info(f"✓ Evaluation saved locally (GitHub failed) with ID: {evaluation['interaction_id']}") except Exception as e: logger.error(f"Failed to save evaluation: {e}") logger.error(f"Traceback: {traceback.format_exc()}") def get_evaluation_summary(self, limit: int = 10) -> Dict[str, Any]: """Get summary of recent evaluations from GitHub repository.""" try: # Try to get data from GitHub first github_storage = get_github_storage() github_results = github_storage.get_validation_results(limit) if github_results and "error" not in github_results: return github_results # Fallback to local file if GitHub fails if not os.path.exists(self.evaluation_file): return {"message": "No evaluations found", "evaluations": []} with open(self.evaluation_file, 'r', encoding='utf-8') as f: evaluations = json.load(f) # Get recent evaluations recent_evaluations = evaluations[-limit:] if evaluations else [] # Calculate average scores if recent_evaluations: total_scores = { "accuracy": 0, "coherence": 0, "relevance": 0, "completeness": 0, "citations": 0, "length": 0, "overall": 0 } count = len(recent_evaluations) for eval_data in recent_evaluations: report = eval_data.get("validation_report", {}) total_scores["accuracy"] += int(report.get("Accuracy_Rating", 0)) total_scores["coherence"] += int(report.get("Coherence_Rating", 0)) total_scores["relevance"] += int(report.get("Relevance_Rating", 0)) total_scores["completeness"] += int(report.get("Completeness_Rating", 0)) total_scores["citations"] += int(report.get("Citations_Attribution_Rating", 0)) total_scores["length"] += int(report.get("Length_Rating", 0)) total_scores["overall"] += int(report.get("Overall_Rating", 0)) averages = {key: round(value / count, 1) for key, value in total_scores.items()} else: averages = {} return { "total_evaluations": len(evaluations), "recent_count": len(recent_evaluations), "average_scores": averages, "evaluations": recent_evaluations } except Exception as e: logger.error(f"Failed to get evaluation summary: {e}") return {"error": str(e), "evaluations": []} # Global validator instance _validator = None def get_validator() -> MedicalAnswerValidator: """Get the global validator instance with lazy loading.""" global _validator if _validator is None: _validator = MedicalAnswerValidator() return _validator def validate_medical_answer( question: str, retrieved_documents: List[Dict[str, Any]], generated_answer: str ) -> Dict[str, Any]: """ Convenience function to validate a medical answer. Args: question: The original user question retrieved_documents: List of retrieved documents with metadata generated_answer: The AI-generated answer to validate Returns: Dict containing the complete evaluation with metadata """ validator = get_validator() return validator.validate_answer(question, retrieved_documents, generated_answer)