import logging
import os
import re

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline


class MedicalReportAnalyzer:
    """
    Analyze medical text reports with Hugging Face models and keyword rules.

    This analyzer can:
    1. Extract medical entities (conditions, treatments, tests)
    2. Classify report severity
    3. Extract key findings
    4. Identify suggested follow-up actions

    Entity extraction runs the token-classification pipeline loaded in
    ``__init__``. Severity, findings and follow-up suggestions are computed
    with the keyword heuristics implemented in this class.
    """

    # Minimum pipeline score an entity must exceed to be reported.
    _MIN_ENTITY_SCORE = 0.7

    # Maps lower-cased pipeline ``entity_group`` labels to output categories.
    _ENTITY_CATEGORIES = {
        "problem": "problem",
        "disease": "problem",
        "condition": "problem",
        "diagnosis": "problem",
        "test": "test",
        "procedure": "test",
        "examination": "test",
        "treatment": "treatment",
        "medication": "treatment",
        "drug": "treatment",
        "body_part": "anatomy",
        "anatomy": "anatomy",
        "organ": "anatomy",
    }

    # Severity heuristic rules, applied in order:
    # (keywords, max counted hits, score weight per hit, confidence bonus).
    _SEVERITY_RULES = (
        (
            ("severe", "critical", "urgent", "emergency", "immediate attention"),
            2,
            1.5,
            0.1,
        ),
        (("moderate", "concerning", "follow-up", "monitor"), 3, 0.75, 0.05),
        (("mild", "minimal", "slight", "minor"), 3, 0.25, 0.05),
        (
            ("normal", "unremarkable", "no abnormalities", "within normal limits"),
            3,
            -0.75,
            0.1,
        ),
    )

    # Phrases that mark a sentence as reporting a finding.
    _FINDING_MARKERS = (
        "finding",
        "observed",
        "noted",
        "shows",
        "reveals",
        "demonstrates",
        "indicates",
        "evident",
        "apparent",
        "consistent with",
        "suggestive of",
    )

    # Whole-word tokens that mark a sentence as containing a negation.
    _NEGATION_MARKERS = frozenset(
        {"no", "not", "none", "negative", "without", "denies"}
    )

    # Phrases indicating the report itself already mentions a follow-up.
    _FOLLOWUP_PHRASES = (
        "follow up",
        "follow-up",
        "followup",
        "return",
        "refer",
        "consult",
    )

    # Terms that warrant follow-up imaging, checked in this order.
    _IMAGING_TERMS = ("mass", "tumor", "nodule", "opacity")

    # Sentence boundary: terminal punctuation followed by whitespace.
    _SENTENCE_SPLIT = re.compile(r"[.!?]\s+")

    def __init__(
        self,
        ner_model="samrawal/bert-base-uncased_medical-ner",
        classifier_model="medicalai/ClinicalBERT",
        device=None,
    ):
        """
        Initialize the text analyzer with specific pre-trained models.

        A model that fails to load is logged and left as ``None``; the
        constructor itself does not raise for load failures.

        Args:
            ner_model (str): Model for named entity recognition
            classifier_model (str): Model for text classification
            device (str, optional): Device to run models on ('cuda' or 'cpu').
                When omitted, 'cuda' is used if torch reports it available.
        """
        self.logger = logging.getLogger(__name__)

        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device
        self.logger.info("Using device: %s", self.device)

        # Optional access token, read from the environment.
        hf_token = os.environ.get("HF_TOKEN") or os.environ.get(
            "HUGGING_FACE_HUB_TOKEN"
        )

        # Pre-set every model attribute so it exists even when loading fails
        # part-way (the tokenizer attribute was previously left undefined).
        self.ner_pipeline = None
        self.tokenizer = None
        self.classifier = None

        try:
            self.ner_pipeline = pipeline(
                "token-classification",
                model=ner_model,
                aggregation_strategy="simple",
                # NOTE(review): only the exact string "cuda" selects GPU 0
                # here; any other value (e.g. "cuda:1") runs the pipeline on
                # CPU while the classifier below is moved to self.device.
                device=0 if self.device == "cuda" else -1,
                token=hf_token,
            )
            self.logger.info("Successfully loaded NER model: %s", ner_model)
        except Exception as e:
            self.logger.error("Failed to load NER model: %s", e)
            self.ner_pipeline = None

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(
                classifier_model, token=hf_token
            )
            self.classifier = AutoModelForSequenceClassification.from_pretrained(
                classifier_model, token=hf_token
            )
            self.classifier.to(self.device)
            self.classifier.eval()
            self.logger.info(
                "Successfully loaded classifier model: %s", classifier_model
            )
        except Exception as e:
            self.logger.error("Failed to load classifier model: %s", e)
            self.classifier = None

        # Integer severity level -> display name.
        self.severity_levels = {
            0: "Normal",
            1: "Mild",
            2: "Moderate",
            3: "Severe",
            4: "Critical",
        }

        # Finding term -> severity weight used by the scoring heuristic.
        self.finding_severity = {
            "pneumonia": 3,
            "fracture": 3,
            "tumor": 4,
            "nodule": 2,
            "mass": 3,
            "edema": 2,
            "effusion": 2,
            "hemorrhage": 3,
            "opacity": 1,
            "atelectasis": 2,
            "pneumothorax": 3,
            "consolidation": 2,
            "cardiomegaly": 2,
        }

    @staticmethod
    def _count_keyword(text_lower, keyword):
        """Count occurrences of ``keyword`` that begin at a word boundary."""
        # The leading \b stops "normal" from matching inside "abnormal" while
        # still accepting inflections such as "mildly" or "monitoring".
        return len(re.findall(r"\b" + re.escape(keyword), text_lower))

    def _is_negated(self, sentence_lower):
        """Return True if the sentence contains a negation word as a token."""
        # Tokenise on word characters so "negative," or "none." still match.
        tokens = re.findall(r"\w+", sentence_lower)
        return not self._NEGATION_MARKERS.isdisjoint(tokens)

    def _has_affirmed_mention(self, text, term):
        """Return True if ``term`` occurs in a sentence without any negation."""
        for sentence in self._SENTENCE_SPLIT.split(text.lower()):
            if term in sentence and not self._is_negated(sentence):
                return True
        return False

    def extract_entities(self, text):
        """
        Extract medical entities from the report text.

        Entities whose label is not in the category map, or whose score does
        not exceed the minimum score, are dropped. Duplicate words within a
        category are reported once.

        Args:
            text (str): Medical report text

        Returns:
            dict: Entity lists keyed by "problem", "test", "treatment" and
            "anatomy"; an empty dict when the NER model is unavailable or
            extraction fails.
        """
        if self.ner_pipeline is None:
            self.logger.warning("NER model not available")
            return {}

        try:
            grouped_entities = {
                "problem": [],
                "test": [],
                "treatment": [],
                "anatomy": [],
            }

            for entity in self.ner_pipeline(text):
                label = entity.get("entity_group", "").lower()
                category = self._ENTITY_CATEGORIES.get(label)
                if category is None:
                    continue

                word = entity.get("word", "")
                score = entity.get("score", 0)
                bucket = grouped_entities[category]
                if score > self._MIN_ENTITY_SCORE and word not in bucket:
                    bucket.append(word)

            return grouped_entities

        except Exception as e:
            self.logger.error("Error extracting entities: %s", e)
            return {}

    def assess_severity(self, text):
        """
        Assess the severity level of the medical report.

        The score starts at 0, is adjusted by severity keyword counts and by
        each known finding term present in the text, then clamped to [0, 4].

        Args:
            text (str): Medical report text

        Returns:
            dict: "level" (severity name), "score" (rounded to 1 decimal) and
            "confidence" (capped at 0.95). When the classifier is unavailable
            or an error occurs, level is "Unknown" with score and confidence
            0.0.
        """
        # NOTE(review): the scoring below never calls the classifier; it is
        # only used as an availability gate. Kept to preserve the existing
        # "Unknown" contract - confirm whether the gate is still wanted.
        if self.classifier is None:
            self.logger.warning("Classifier model not available")
            return {"level": "Unknown", "score": 0.0, "confidence": 0.0}

        try:
            text_lower = text.lower()
            severity_score = 0.0
            confidence = 0.5

            for keywords, cap, weight, bonus in self._SEVERITY_RULES:
                hits = sum(self._count_keyword(text_lower, kw) for kw in keywords)
                if hits > 0:
                    severity_score += min(hits, cap) * weight
                    confidence += bonus

            # NOTE(review): finding terms are matched by plain substring and
            # negated mentions ("without consolidation") still add weight.
            for finding, level in self.finding_severity.items():
                if finding in text_lower:
                    severity_score += level * 0.5
                    confidence += 0.05

            severity_score = max(0, min(4, severity_score))

            # Round half up: built-in round() rounds ties to the even integer,
            # which would map 0.5 to "Normal" and 2.5 to "Moderate".
            severity_level = int(severity_score + 0.5)
            severity = self.severity_levels.get(severity_level, "Moderate")

            confidence = min(0.95, confidence)

            return {
                "level": severity,
                "score": round(severity_score, 1),
                "confidence": round(confidence, 2),
            }

        except Exception as e:
            self.logger.error("Error assessing severity: %s", e)
            return {"level": "Unknown", "score": 0.0, "confidence": 0.0}

    def extract_findings(self, text):
        """
        Extract key clinical findings from the report.

        A sentence is kept when it contains a finding marker phrase, or when
        it contains a negation word together with a known finding term.
        Sentences with fewer than three words are skipped.

        Args:
            text (str): Medical report text

        Returns:
            list: List of key findings (stripped sentences, in report order);
            an empty list on error.
        """
        try:
            findings = []

            for raw_sentence in self._SENTENCE_SPLIT.split(text):
                if len(raw_sentence.split()) < 3:
                    continue

                sentence = raw_sentence.strip()
                lowered = sentence.lower()

                if any(marker in lowered for marker in self._FINDING_MARKERS):
                    findings.append(sentence)
                    continue

                # Explicitly negated findings ("No pleural effusion") are
                # reported as findings too.
                if self._is_negated(lowered) and any(
                    term in lowered for term in self.finding_severity
                ):
                    findings.append(sentence)

            return findings

        except Exception as e:
            self.logger.error("Error extracting findings: %s", e)
            return []

    def suggest_followup(self, text, entities, severity):
        """
        Suggest follow-up actions based on report analysis.

        Args:
            text (str): Medical report text
            entities (dict): Extracted entities; only "problem" is read
            severity (dict): Severity assessment; only "level" is read

        Returns:
            list: Suggested follow-up actions. On error, a single-item list
            stating that recommendations could not be generated.
        """
        try:
            entities = entities or {}
            severity = severity or {}

            severity_level = severity.get("level", "Unknown")
            problems = entities.get("problem", [])

            text_lower = text.lower()
            followup_mentioned = any(
                phrase in text_lower for phrase in self._FOLLOWUP_PHRASES
            )

            followups = []
            # A "no follow-up needed" statement is held back so that it can be
            # dropped if follow-up imaging ends up being recommended below.
            no_followup_note = None

            if severity_level == "Critical":
                followups.append("Immediate specialist consultation recommended.")

            elif severity_level == "Severe":
                followups.append("Prompt follow-up with specialist is recommended.")

                # At most one problem-specific recommendation per problem.
                for problem in problems:
                    problem_lower = problem.lower()
                    if "pneumonia" in problem_lower:
                        followups.append(
                            "Consider antibiotic therapy and close monitoring."
                        )
                    elif "fracture" in problem_lower:
                        followups.append(
                            "Orthopedic consultation for treatment planning."
                        )
                    elif "mass" in problem_lower or "tumor" in problem_lower:
                        followups.append(
                            "Further imaging and possible biopsy recommended."
                        )

            elif severity_level == "Moderate":
                followups.append("Follow-up with primary care physician recommended.")
                if not followup_mentioned and problems:
                    followups.append(
                        "Consider additional imaging or tests for further evaluation."
                    )

            elif severity_level == "Mild":
                if problems:
                    followups.append(
                        "Routine follow-up with primary care physician as needed."
                    )
                else:
                    no_followup_note = "No immediate follow-up required."

            else:
                no_followup_note = (
                    "No specific follow-up indicated based on this report."
                )

            # Recommend imaging only when no follow-up has been recommended
            # yet. The "no follow-up" statements deliberately do not count:
            # they contain the word "follow-up" and previously suppressed
            # this recommendation for every non-critical report.
            already_recommended = any(
                "follow-up" in recommendation.lower() for recommendation in followups
            )
            if not already_recommended:
                for critical_term in self._IMAGING_TERMS:
                    # Negated mentions ("No mass") must not trigger imaging.
                    if self._has_affirmed_mention(text, critical_term):
                        followups.append(
                            f"Follow-up imaging recommended to monitor {critical_term}."
                        )
                        no_followup_note = None
                        break

            if no_followup_note is not None:
                followups.append(no_followup_note)

            return followups

        except Exception as e:
            self.logger.error("Error suggesting follow-up: %s", e)
            return ["Unable to generate follow-up recommendations."]

    def analyze(self, text):
        """
        Perform comprehensive analysis of medical report text.

        Runs entity extraction, severity assessment, finding extraction and
        follow-up suggestion, in that order.

        Args:
            text (str): Medical report text

        Returns:
            dict: Results under "entities", "severity", "findings" and
            "followup_recommendations"; or {"error": message} if an
            unexpected exception escapes the individual steps.
        """
        try:
            entities = self.extract_entities(text)
            severity = self.assess_severity(text)
            findings = self.extract_findings(text)
            followups = self.suggest_followup(text, entities, severity)

            return {
                "entities": entities,
                "severity": severity,
                "findings": findings,
                "followup_recommendations": followups,
            }

        except Exception as e:
            self.logger.error("Error analyzing report: %s", e)
            return {"error": str(e)}
if __name__ == "__main__":
    # Demo: analyze a sample chest X-ray report and print the results.
    logging.basicConfig(level=logging.INFO)

    # Loads the default models; a failed load is logged by the constructor.
    analyzer = MedicalReportAnalyzer()

    sample_report = """
CHEST X-RAY EXAMINATION

CLINICAL HISTORY: 55-year-old male with cough and fever.

FINDINGS: The heart size is at the upper limits of normal. The lungs are clear without focal consolidation,
effusion, or pneumothorax. There is mild prominence of the pulmonary vasculature. No pleural effusion is seen.
There is a small nodular opacity noted in the right lower lobe measuring approximately 8mm, which is suspicious
and warrants further investigation. The mediastinum is unremarkable. The visualized bony structures show no acute abnormalities.

IMPRESSION:
1. Mild cardiomegaly.
2. 8mm nodular opacity in the right lower lobe, recommend follow-up CT for further evaluation.
3. No acute pulmonary parenchymal abnormality.

RECOMMENDATIONS: Follow-up chest CT to further characterize the nodular opacity in the right lower lobe.
"""

    results = analyzer.analyze(sample_report)

    # analyze() returns {"error": ...} when an unexpected exception escapes;
    # exit with that message instead of failing with a KeyError below.
    if "error" in results:
        raise SystemExit(f"Analysis failed: {results['error']}")

    print("\nMedical Report Analysis:")
    print(
        f"\nSeverity: {results['severity']['level']} (Score: {results['severity']['score']})"
    )

    print("\nKey Findings:")
    for finding in results["findings"]:
        print(f"- {finding}")

    print("\nEntities:")
    for category, items in results["entities"].items():
        # Skip categories for which nothing was extracted.
        if items:
            print(f"- {category.capitalize()}: {', '.join(items)}")

    print("\nFollow-up Recommendations:")
    for rec in results["followup_recommendations"]:
        print(f"- {rec}")