"""
RAG Query Engine for Lab Report Decoder
Uses Hugging Face models - OPTIMIZED for speed
"""

import os
import re
from typing import List, Dict

import chromadb
from sentence_transformers import SentenceTransformer
from transformers import pipeline

from pdf_extractor import LabResult


class LabReportRAG:
    """RAG system for explaining lab results - fast and efficient.

    Explanations are assembled from fixed templates plus snippets retrieved
    from the ``lab_reports`` Chroma collection. A text-generation pipeline is
    also loaded and exposed through ``_generate_text``, but the template-based
    methods of this class do not call it.
    """

    # Placeholder sentences returned by _retrieve_context when it has no real
    # document text to offer. They are longer than the minimum context length,
    # so callers must reject them explicitly or they end up pasted into
    # explanations as if they were medical information.
    _NO_DATABASE_CONTEXT = "Limited medical information available."
    _NO_MATCH_CONTEXT = "No specific information found."
    _RETRIEVAL_ERROR_CONTEXT = "Error retrieving information."
    _PLACEHOLDER_CONTEXTS = frozenset(
        (_NO_DATABASE_CONTEXT, _NO_MATCH_CONTEXT, _RETRIEVAL_ERROR_CONTEXT)
    )

    # Retrieved text at or below this length is treated as noise.
    _MIN_CONTEXT_CHARS = 20

    # Anchored at a word start so "eating"/"foods"/"dietary" match while
    # "treatment" or "great" (which merely contain "eat") do not.
    _DIET_QUESTION = re.compile(r"\b(?:food|eat|diet)")
    # Plain substring match, so "because" and "causes" count as well.
    _CAUSE_QUESTION = re.compile(r"why|cause")

    def __init__(self, db_path: str = "./chroma_db"):
        """Load the embedding model, the text generator and the vector store.

        Args:
            db_path: Directory of the persistent Chroma database.

        A failure to load the text generator or to open the ``lab_reports``
        collection is reported on stdout and leaves ``text_generator`` /
        ``collection`` set to ``None``. A failure to load the embedding model
        is not caught and propagates to the caller.
        """
        print("Loading models (optimized for speed)...")

        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✅ Embeddings loaded")

        print("Loading text generation model...")
        try:
            self.text_generator = pipeline(
                "text2text-generation",
                model="google/flan-t5-small",
                max_length=256,
                device=-1,  # -1 selects the CPU
            )
            print("✅ Text generation model loaded (Flan-T5-small)")
        except Exception as e:
            print(f"⚠️ Model loading error: {e}")
            self.text_generator = None

        # Define both attributes up front so they exist even when opening the
        # database fails part-way through.
        self.client = None
        self.collection = None
        try:
            self.client = chromadb.PersistentClient(path=db_path)
            self.collection = self.client.get_collection("lab_reports")
            print("✅ Vector database loaded")
        except Exception as e:
            print(f"⚠️ Vector database not found: {e}")
            self.collection = None

    def _is_usable_context(self, context: str) -> bool:
        """Return True when *context* is real retrieved text worth showing."""
        return (
            bool(context)
            and len(context) > self._MIN_CONTEXT_CHARS
            and context not in self._PLACEHOLDER_CONTEXTS
        )

    def _retrieve_context(self, query: str, k: int = 2) -> str:
        """Retrieve relevant context from the vector database.

        Args:
            query: Free-text search query.
            k: Number of documents to request.

        Returns:
            The matching documents joined by newlines and capped at 1000
            characters, or a short placeholder sentence when there is no
            collection, no match, or the lookup raised.
        """
        if self.collection is None:
            return self._NO_DATABASE_CONTEXT

        try:
            query_embedding = self.embedding_model.encode(query).tolist()
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=k,
            )

            if results and results['documents'] and len(results['documents'][0]) > 0:
                context = "\n".join(results['documents'][0])
                return context[:1000]
            return self._NO_MATCH_CONTEXT
        except Exception as e:
            print(f"Retrieval error: {e}")
            return self._RETRIEVAL_ERROR_CONTEXT

    def _generate_text(self, prompt: str) -> str:
        """Generate text with the loaded pipeline.

        Returns a fixed notice when no generator is loaded, and a fixed
        failure message when generation raises.
        """
        if self.text_generator is None:
            return "AI model not available. Using basic explanation."

        try:
            result = self.text_generator(
                prompt,
                max_length=256,
                do_sample=True,
                temperature=0.7,
                num_return_sequences=1,
            )
            return result[0]['generated_text'].strip()
        except Exception as e:
            print(f"Generation error: {e}")
            return "Unable to generate detailed explanation."

    def explain_result(self, result: LabResult) -> str:
        """Generate an explanation for a single lab result.

        Dispatches on ``result.status`` ('normal', 'high' or 'low'); any other
        status gets the "could not determine" template.
        """
        print(f" Explaining: {result.test_name} ({result.status})...")

        if result.status == 'normal':
            return self._explain_normal(result)
        elif result.status == 'high':
            return self._explain_high(result)
        elif result.status == 'low':
            return self._explain_low(result)
        else:
            return self._explain_unknown(result)

    def _explain_normal(self, result: LabResult) -> str:
        """Fast template for normal results."""
        context = self._retrieve_context(f"{result.test_name} normal meaning", k=1)

        explanation = (
            f"✅ Your {result.test_name} level of {result.value} {result.unit} "
            f"is within the normal range ({result.reference_range}).\n"
            "\n"
            "This indicates healthy levels. "
        )

        if self._is_usable_context(context):
            explanation += f"\n\n{context[:300]}"

        return explanation

    def _explain_abnormal(
        self,
        result: LabResult,
        direction: str,
        query_keyword: str,
        recommendation: str,
    ) -> str:
        """Shared template for out-of-range results.

        Args:
            result: The lab result being explained.
            direction: Word placed before "the normal range" (ABOVE / BELOW).
            query_keyword: Status word used in the retrieval query.
            recommendation: Closing sentence appended to the explanation.
        """
        context = self._retrieve_context(
            f"{result.test_name} {query_keyword} causes treatment", k=2
        )

        explanation = (
            f"⚠️ Your {result.test_name} level of {result.value} {result.unit} "
            f"is {direction} the normal range ({result.reference_range}).\n"
            "\n"
        )

        if self._is_usable_context(context):
            explanation += f"{context[:400]}\n\n"

        explanation += recommendation
        return explanation

    def _explain_high(self, result: LabResult) -> str:
        """Fast template for high results."""
        return self._explain_abnormal(
            result,
            "ABOVE",
            "high",
            "💡 Recommendation: Discuss these results with your healthcare "
            "provider for personalized advice.",
        )

    def _explain_low(self, result: LabResult) -> str:
        """Fast template for low results."""
        return self._explain_abnormal(
            result,
            "BELOW",
            "low",
            "💡 Recommendation: Consult with your healthcare provider about "
            "these results.",
        )

    def _explain_unknown(self, result: LabResult) -> str:
        """Template for results whose status is not normal/high/low."""
        return (
            f"Your {result.test_name} result is {result.value} {result.unit}.\n"
            "\n"
            f"Reference range: {result.reference_range}\n"
            "\n"
            "We couldn't automatically determine if this is within normal "
            "range. Please consult your healthcare provider to interpret "
            "this result."
        )

    def explain_all_results(self, results: List[LabResult]) -> Dict[str, str]:
        """Generate explanations for all lab results.

        Returns:
            A mapping from test name to explanation. Results that share a
            test name overwrite each other (the last one wins). A result
            whose explanation raises gets a fixed "unable to generate"
            message instead of aborting the whole batch.
        """
        explanations = {}

        print(f"🧠 Generating explanations for {len(results)} results...")

        for i, result in enumerate(results, 1):
            print(f" [{i}/{len(results)}] {result.test_name}...")
            try:
                explanations[result.test_name] = self.explain_result(result)
            except Exception as e:
                print(f" Error: {e}")
                explanations[result.test_name] = (
                    f"Unable to generate explanation for {result.test_name}."
                )

        print("✅ All explanations generated")
        return explanations

    def answer_followup_question(self, question: str, lab_results: List[LabResult]) -> str:
        """Answer a follow-up question about the user's results.

        The question is routed by keyword: diet questions (food / eat / diet)
        first, then cause questions (why / cause), then everything else. Each
        route shows retrieved context when usable context exists and a fixed
        fallback sentence otherwise.

        Args:
            question: The user's question.
            lab_results: The user's results; only the first 10 are summarised.
        """
        print(f"💬 Processing question: {question[:50]}...")

        results_context = "\n".join(
            f"{r.test_name}: {r.value} {r.unit} ({r.status})"
            for r in lab_results[:10]
        )

        medical_context = self._retrieve_context(question, k=2)
        has_context = self._is_usable_context(medical_context)
        snippet = medical_context[:500]

        question_lower = question.lower()

        if self._DIET_QUESTION.search(question_lower):
            answer = f"Based on your lab results:\n\n{results_context}\n\n"
            if has_context:
                answer += snippet
            else:
                answer += (
                    "For dietary recommendations specific to your results, "
                    "please consult with a healthcare provider or nutritionist."
                )
        elif self._CAUSE_QUESTION.search(question_lower):
            answer = "Regarding your question about your results:\n\n"
            if has_context:
                answer += snippet
            else:
                answer += (
                    "There can be various causes for abnormal lab results. "
                    "Your healthcare provider can help identify the specific "
                    "cause in your case."
                )
        elif has_context:
            answer = snippet
        else:
            answer = (
                f"Based on your results:\n{results_context}\n\n"
                "For specific medical advice about your results, please "
                "consult with your healthcare provider."
            )

        print("✅ Answer generated")
        return answer

    def generate_summary(self, results: List[LabResult]) -> str:
        """Generate an overall summary of the results.

        Returns:
            * a short notice when *results* is empty;
            * the "all normal" message only when every result is 'normal';
            * otherwise a count summary, up to five out-of-range tests, any
              usable retrieved context, and the next-steps text.
        """
        print("Generating summary...")

        if not results:
            return "No lab results were provided, so there is nothing to summarize."

        abnormal = [r for r in results if r.status in ('high', 'low')]
        normal = [r for r in results if r.status == 'normal']
        # Results whose status is neither normal nor high/low; they must not
        # be silently counted as good news.
        undetermined_count = len(results) - len(abnormal) - len(normal)

        if not abnormal and not undetermined_count:
            return (
                "✅ Excellent news! All your lab results are within normal ranges.\n"
                "\n"
                "This suggests that the tested parameters are functioning "
                "well. Continue maintaining your current health habits, and "
                "follow your healthcare provider's recommendations for "
                "routine monitoring."
            )

        summary = (
            "Lab Results Summary\n"
            "\n"
            f"Total Tests: {len(results)}\n"
            f"✅ Normal: {len(normal)}\n"
            f"⚠️ Abnormal: {len(abnormal)}\n"
        )
        if undetermined_count:
            summary += f"Undetermined: {undetermined_count}\n"
        summary += "\n"

        if abnormal:
            summary += "**Tests Outside Normal Range:**\n"
            for r in abnormal[:5]:
                status_emoji = "↑" if r.status == "high" else "↓"
                summary += f"{status_emoji} {r.test_name}: {r.value} {r.unit} ({r.status})\n"

            if len(abnormal) > 5:
                summary += f"... and {len(abnormal) - 5} more\n"

            summary += "\n"

            # Only the first three names go into the query to keep it focused.
            abnormal_names = ", ".join(r.test_name for r in abnormal[:3])
            context = self._retrieve_context(f"{abnormal_names} interpretation", k=2)

            if self._is_usable_context(context):
                summary += f"**Key Information:**\n{context[:400]}\n\n"

        summary += (
            "**Next Steps:**\n"
            "1. Review these results with your healthcare provider\n"
            "2. Discuss any concerns or symptoms you're experiencing\n"
            "3. Follow recommended treatment or monitoring plans\n"
            "\n"
            "Remember: These results are for educational purposes. Always "
            "consult your doctor for medical advice."
        )

        print("✅ Summary generated")
        return summary


if __name__ == "__main__":
    # Manual smoke test: build the RAG system and explain one sample result.
    print("Testing RAG system...")

    try:
        rag = LabReportRAG()
        print("\n✅ RAG system initialized successfully!")

        # LabResult comes from the module-level import of pdf_extractor.
        test_result = LabResult(
            test_name="Hemoglobin",
            value="10.5",
            unit="g/dL",
            reference_range="12.0-15.5",
            status="low",
        )

        explanation = rag.explain_result(test_result)
        print(f"\nTest Explanation:\n{explanation}")

    except Exception as e:
        # Top-level boundary of a manual test run: report and exit normally.
        print(f"\nError: {e}")