File size: 4,101 Bytes
b515e8c
 
 
 
 
 
 
 
 
4459bdc
b515e8c
 
4459bdc
b515e8c
 
 
 
 
4459bdc
b515e8c
 
 
 
 
 
 
 
 
4459bdc
b515e8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4459bdc
b515e8c
 
 
 
 
 
4459bdc
6ed180c
 
 
 
 
 
 
 
 
 
b515e8c
 
 
4459bdc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import re
import hashlib
import io
import json
from datetime import datetime
from typing import Dict, List, Tuple
from bson import ObjectId
import logging

# Module-level logger named after this module; used below to report
# section-extraction failures.
logger = logging.getLogger(__name__)

def clean_text_response(text: str) -> str:
    """Normalize whitespace and drop Markdown emphasis markers.

    Steps, in order:
      1. Any run of blank / whitespace-only lines becomes one blank line.
      2. Any run of space characters becomes a single space.
      3. Every ``**`` and ``__`` marker is removed.
      4. Leading and trailing whitespace is trimmed.

    Args:
        text: Raw text to tidy up.

    Returns:
        The cleaned text.
    """
    cleaned = re.sub(r'\n\s*\n', '\n\n', text)
    cleaned = re.sub(r'[ ]+', ' ', cleaned)
    # Strip bold/underline emphasis markers, keeping the emphasized words.
    for marker in ("**", "__"):
        cleaned = cleaned.replace(marker, "")
    return cleaned.strip()

def extract_section(text: str, heading: str) -> str:
    """Return the body of the section introduced by ``heading``.

    The section starts on the line after ``<heading>:`` and runs until the
    next line that begins with a letter and contains a colon, or until the
    end of ``text``. The heading is matched case-insensitively.

    NOTE: ``re.IGNORECASE`` applies to the whole pattern, so the ``[A-Z]``
    in the terminating lookahead also matches lowercase letters.

    Args:
        text: Full text to search.
        heading: Heading label, without the trailing colon.

    Returns:
        The stripped section body, or ``""`` when the heading is not found
        or when building/running the pattern raises.
    """
    try:
        section_re = rf"{re.escape(heading)}:\s*\n(.*?)(?=\n[A-Z][^\n]*:|\Z)"
        found = re.search(section_re, text, re.DOTALL | re.IGNORECASE)
    except Exception as e:
        # Best-effort helper: log the problem and report "no section".
        logger.error(f"Section extraction failed for heading '{heading}': {e}")
        return ""

    if not found:
        return ""
    return found.group(1).strip()

def structure_medical_response(text: str) -> Dict:
    """Split a free-text medical response into four named sections.

    ``**`` and ``__`` markers are removed from ``text`` first. Each section
    is then looked up under a primary heading and, if that yields nothing,
    under a fallback heading.

    Args:
        text: The response text to structure.

    Returns:
        A dict with the keys ``summary``, ``risks``, ``missed_issues`` and
        ``recommendations``; a section that cannot be found maps to ``""``.
    """
    search_flags = re.DOTALL | re.IGNORECASE

    # (result key, primary heading, fallback heading), in output order.
    section_headings = (
        ("summary",
         "Summary of Patient's Medical History",
         "Summarize the patient's medical history"),
        ("risks",
         "Identify Risks or Red Flags",
         "Risks or Red Flags"),
        ("missed_issues",
         "Missed Diagnoses or Treatments",
         "What the doctor might have missed"),
        ("recommendations",
         "Suggest Next Clinical Steps",
         "Suggested Clinical Actions"),
    )

    def extract_improved(body: str, heading: str) -> str:
        """Return the first paragraph following ``heading``, or ``""``.

        Heading layouts are tried in order; the first one that matches wins.
        The captured paragraph ends at the first blank line or end of text,
        and leading ``-`` / ``*`` bullet markers are removed from its lines.
        """
        layouts = (
            # "Heading:" followed by a newline
            rf"{re.escape(heading)}:\s*\n(.*?)(?=\n\s*\n|\Z)",
            # "**Heading**:" -- cannot match here, since the caller strips
            # every "**" before searching; kept to mirror the pattern list.
            rf"\*\*{re.escape(heading)}\*\*:\s*\n(.*?)(?=\n\s*\n|\Z)",
            # "Heading" followed by whitespace and/or dashes
            rf"{re.escape(heading)}[\s\-]+(.*?)(?=\n\s*\n|\Z)",
            # "Heading" alone on its own line
            rf"\n{re.escape(heading)}\s*\n(.*?)(?=\n\s*\n|\Z)",
        )
        for layout in layouts:
            hit = re.search(layout, body, search_flags)
            if hit is None:
                continue
            paragraph = hit.group(1).strip()
            return re.sub(r'^\s*[\-\*]\s*', '', paragraph, flags=re.MULTILINE)
        return ""

    plain = text.replace('**', '').replace('__', '')
    return {
        key: extract_improved(plain, primary) or extract_improved(plain, fallback)
        for key, primary, fallback in section_headings
    }

def serialize_patient(patient: dict) -> dict:
    """Return a shallow copy of ``patient`` with ``_id`` rendered as a string.

    The input mapping is not modified. When there is no ``_id`` key the
    copy is returned unchanged.

    Args:
        patient: Patient record.

    Returns:
        A shallow copy whose ``_id`` value, if present, is ``str(_id)``.
    """
    serialized = patient.copy()
    if "_id" not in serialized:
        return serialized
    serialized["_id"] = str(serialized["_id"])
    return serialized

def compute_patient_data_hash(data: dict) -> str:
    """Return a SHA-256 hex digest of ``data`` for change detection.

    ``data`` is dumped to JSON with sorted keys, so dicts that differ only
    in key order hash identically. ``datetime`` values are written in ISO
    format and ``ObjectId`` values as their string form.

    Args:
        data: Patient data to fingerprint.

    Returns:
        The hexadecimal SHA-256 digest of the JSON text.

    Raises:
        TypeError: If ``data`` contains a value the encoder cannot handle.
    """

    class _HashEncoder(json.JSONEncoder):
        """JSON encoder that also understands datetime and ObjectId."""

        def default(self, o):
            if isinstance(o, datetime):
                return o.isoformat()
            if isinstance(o, ObjectId):
                return str(o)
            # Anything else: defer to the base class, which raises TypeError.
            return super().default(o)

    canonical = json.dumps(data, sort_keys=True, cls=_HashEncoder)
    digest = hashlib.sha256()
    digest.update(canonical.encode())
    return digest.hexdigest()

def compute_file_content_hash(file_content: bytes) -> str:
    """Return the SHA-256 hex digest of raw file bytes.

    Args:
        file_content: The file's content as bytes.

    Returns:
        The hexadecimal SHA-256 digest.
    """
    digest = hashlib.sha256()
    digest.update(file_content)
    return digest.hexdigest()

def create_notification(user_id: str, title: str, message: str, notification_type: str = "info", patient_id: str = None) -> dict:
    """Build a new, unread notification record.

    Args:
        user_id: Stored under ``user_id``.
        title: Stored under ``title``.
        message: Stored under ``message``.
        notification_type: Stored under ``type``; defaults to ``"info"``.
        patient_id: Stored under ``patient_id``; defaults to ``None``.

    Returns:
        A dict with the fields above plus ``read`` (always ``False``) and
        ``timestamp`` (the naive value of ``datetime.utcnow()`` at call
        time).
    """
    created_at = datetime.utcnow()
    notification = dict(
        user_id=user_id,
        title=title,
        message=message,
        type=notification_type,
        read=False,
        timestamp=created_at,
        patient_id=patient_id,
    )
    return notification

# Accepted spellings (lowercase) mapped to the canonical risk level.
_RISK_LEVEL_ALIASES = {
    'low': 'low',
    'medium': 'moderate',
    'moderate': 'moderate',
    'high': 'high',
    'severe': 'severe',
    'critical': 'severe',
    'none': 'none',
    'unknown': 'none',
}


def format_risk_level(risk_level: str) -> str:
    """Normalize a risk level name to its canonical form.

    Matching ignores case and surrounding whitespace, so ``" High "`` maps
    to ``'high'`` and is not silently downgraded to ``'none'``.

    Args:
        risk_level: Risk level label. ``None`` or any other non-string
            value is treated as an unrecognized level.

    Returns:
        One of ``'low'``, ``'moderate'``, ``'high'``, ``'severe'`` or
        ``'none'``; unrecognized input yields ``'none'``.
    """
    # A missing or non-string level has no .lower(); treat it like any
    # other unrecognized value and return the default.
    if not isinstance(risk_level, str):
        return 'none'
    return _RISK_LEVEL_ALIASES.get(risk_level.strip().lower(), 'none')