jmisak's picture
Upload 57 files
52d0298 verified
raw
history blame
22.1 kB
import os
import json
import re
from typing import Tuple, Dict, List, Union, Any
from concurrent.futures import ThreadPoolExecutor, TimeoutError as ThreadTimeout
# Import structured logging
try:
from logger import get_logger
logger = get_logger()
except ImportError:
# Fallback if logger module not available
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Option 1: Use Hugging Face Inference API (recommended for better quality)
# Option 2: Use larger local model
# Option 3: Use OpenAI/Anthropic API if available
DEBUG_MODE = os.getenv("DEBUG_MODE", "False").lower() == "true"
USE_HF_API = os.getenv("USE_HF_API", "False").lower() == "true" # Set default to False
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN", "")
def log(msg):
"""Legacy debug logging function - deprecated, use logger instead"""
if DEBUG_MODE:
logger.debug(msg)
def get_system_prompt(interviewee_type: str, is_summary: bool = False) -> str:
"""Generate context-aware system prompts"""
base_prompt = """You are an expert medical transcript analyzer specializing in healthcare interviews.
Your task is to extract structured, actionable insights from interview transcripts.
Core Principles:
- Focus on factual, verifiable medical information
- Distinguish between speaker roles accurately
- Filter out pleasantries, disclaimers, and off-topic content
- Extract specific medical terms, dosages, and treatment details
- Identify patterns and clinical reasoning
"""
if is_summary:
return base_prompt + """
CROSS-INTERVIEW SYNTHESIS & VALIDATION TASK:
You are analyzing multiple transcripts. Extract verified patterns and flag inconsistencies.
STEP 1 - PATTERN IDENTIFICATION:
For each theme, count occurrences across transcripts:
- How many participants mentioned X? (e.g., "7 out of 10 participants")
- Calculate percentages when relevant
- What's the range of perspectives?
STEP 2 - CLASSIFY BY CONSENSUS LEVEL:
- STRONG CONSENSUS (80%+ agreement): Findings most participants agree on
- MAJORITY VIEW (60-79%): Significant but not universal agreement
- SPLIT PERSPECTIVES (40-59%): Where views diverge
- OUTLIERS (<40%): Unique but noteworthy perspectives
STEP 3 - CROSS-VALIDATE:
- Check for contradictions between transcripts
- Note where perspectives differ and why
- Flag quality issues (brief transcripts, vague responses)
STEP 4 - CITE EVIDENCE:
- Reference specific transcript numbers
- Include brief supporting quotes/details
- Distinguish fact from interpretation
OUTPUT FORMAT:
Start with 2-3 sentence executive overview, then:
**STRONG CONSENSUS FINDINGS:**
[List with counts and evidence]
**MAJORITY FINDINGS:**
[List with counts]
**DIVERGENT PERSPECTIVES:**
[Where participants disagreed and context]
**NOTABLE OUTLIERS:**
[Unique but important points]
**QUALITY NOTES:**
[Any gaps or transcript issues]
CRITICAL RULES:
- NEVER use vague terms like "many," "most," "some" - always use specific numbers
- ALWAYS cite transcript numbers for claims
- FLAG weak evidence explicitly
- Separate facts from interpretations
- NO JSON output - write in clear narrative prose
"""
if interviewee_type == "HCP":
return base_prompt + """
Healthcare Professional Analysis Focus:
- Prescribing patterns and medication choices
- Diagnostic reasoning and clinical decision-making
- Treatment protocols and guidelines referenced
- Peer perspectives on efficacy and safety
- Barriers to treatment or adoption
- Off-label uses or emerging practices
Extract and structure:
1. Diagnoses mentioned with context
2. Prescriptions with dosage, frequency, and rationale
3. Treatment strategies and their justifications
4. Clinical guidelines or studies referenced
5. Challenges or barriers discussed
6. Key clinical insights or pearls
"""
elif interviewee_type == "Patient":
return base_prompt + """
Patient Interview Analysis Focus:
- Symptom descriptions and severity
- Treatment experiences and outcomes
- Side effects and tolerability
- Quality of life impacts
- Adherence challenges and enablers
- Emotional and psychological factors
- Healthcare system interactions
Extract and structure:
1. Primary symptoms with duration and severity
2. Current and past treatments
3. Treatment effectiveness and satisfaction
4. Side effects experienced
5. Concerns and unmet needs
6. Quality of life impacts
7. Support systems and resources
"""
else:
return base_prompt + """
General Interview Analysis Focus:
- Main themes and topics discussed
- Key insights and observations
- Recommendations or suggestions
- Contextual factors
- Areas of emphasis or concern
Extract and structure relevant information based on interview content.
"""
def build_extraction_template(interviewee_type: str) -> str:
"""Create JSON template for structured data extraction"""
if interviewee_type == "HCP":
return """{
"diagnoses": ["condition 1", "condition 2"],
"prescriptions": ["medication (dose, frequency, indication)"],
"treatment_rationale": ["reason for treatment choice"],
"guidelines_mentioned": ["guideline or study name"],
"clinical_decisions": ["key clinical decision with reasoning"],
"barriers": ["barrier to treatment"],
"key_insights": ["notable clinical insight"]
}"""
elif interviewee_type == "Patient":
return """{
"symptoms": ["symptom (severity, duration)"],
"concerns": ["patient concern or question"],
"treatments_current": ["current treatment"],
"treatments_past": ["past treatment with outcome"],
"treatment_response": ["description of how treatment is working"],
"side_effects": ["side effect experienced"],
"quality_of_life": ["impact on daily life"],
"adherence_factors": ["factor affecting medication adherence"]
}"""
else:
return """{
"key_insights": ["main insight or finding"],
"themes": ["recurring theme"],
"recommendations": ["recommendation or suggestion"],
"context": ["important contextual information"]
}"""
def ensure_string_response(response: Any) -> str:
"""
Ensure LLM response is a string, converting if necessary
This function standardizes all LLM responses to prevent dict vs string errors
that were causing issues in app.py lines 240-251, 531-587
Args:
response: LLM response (may be str, dict, or other type)
Returns:
String representation of the response
"""
if isinstance(response, str):
return response
if isinstance(response, dict):
# Try to extract meaningful text from dict
if 'content' in response:
return str(response['content'])
elif 'generated_text' in response:
return str(response['generated_text'])
elif 'text' in response:
return str(response['text'])
elif 'output' in response:
return str(response['output'])
else:
# Fallback: stringify the entire dict
logger.warning(f"Converting dict response to string: {list(response.keys())}")
return str(response)
if response is None:
logger.warning("LLM returned None, using empty string")
return ""
# For any other type, convert to string
logger.warning(f"Converting {type(response).__name__} response to string")
return str(response)
def parse_structured_response(text: str, interviewee_type: str) -> Dict:
"""Extract structured data from LLM response"""
# Ensure text is a string
text = ensure_string_response(text)
log(f"Parsing response ({len(text)} chars) for type: {interviewee_type}")
log(f"Response preview: {text[:500]}...")
# Try to find JSON block
json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', text, re.DOTALL)
if json_match:
log(f"Found JSON match: {json_match.group()[:200]}...")
try:
data = json.loads(json_match.group())
log(f"✅ Successfully extracted JSON with {len(data)} fields: {list(data.keys())}")
return data
except json.JSONDecodeError as e:
log(f"❌ JSON parsing failed: {e}")
log(f"Attempted to parse: {json_match.group()[:300]}")
else:
log("⚠️ No JSON block found in response, using regex fallback")
# Fallback: Extract from text using patterns
data = {}
if interviewee_type == "HCP":
log("Using HCP extraction patterns...")
# Extract diagnoses
diag_pattern = r'(?:diagnos[ei]s|condition):\s*([^\n]+)'
data["diagnoses"] = re.findall(diag_pattern, text, re.IGNORECASE)
# Extract prescriptions
rx_pattern = r'(?:prescri[bp]\w*|medication):\s*([^\n]+)'
data["prescriptions"] = re.findall(rx_pattern, text, re.IGNORECASE)
# Extract treatment rationale
treat_pattern = r'(?:treatment|therapy|rationale):\s*([^\n]+)'
data["treatment_rationale"] = re.findall(treat_pattern, text, re.IGNORECASE)
elif interviewee_type == "Patient":
log("Using Patient extraction patterns...")
# Extract symptoms
symptom_pattern = r'(?:symptom|complaint|experienc\w*):\s*([^\n]+)'
data["symptoms"] = re.findall(symptom_pattern, text, re.IGNORECASE)
# Extract concerns
concern_pattern = r'(?:concern|worry|question|anxious):\s*([^\n]+)'
data["concerns"] = re.findall(concern_pattern, text, re.IGNORECASE)
# Extract side effects
se_pattern = r'(?:side effect|adverse|reaction):\s*([^\n]+)'
data["side_effects"] = re.findall(se_pattern, text, re.IGNORECASE)
# Clean and deduplicate
for key in data:
data[key] = list(set([item.strip() for item in data[key] if item.strip()]))
log(f"Fallback extraction result: {len(data)} fields, {sum(len(v) for v in data.values())} total items")
log(f"Extracted fields: {data}")
return data
def query_llm_hf_api(prompt: str, max_tokens: int = 1500) -> str:
"""Use Hugging Face Inference API with proper authentication"""
import requests
import json
hf_token = os.getenv("HUGGINGFACE_TOKEN", "")
if not hf_token:
error_msg = "[Error] HUGGINGFACE_TOKEN not set in environment!"
logger.error(error_msg)
return error_msg
logger.debug(f"Using HF token for authentication (first 20 chars): {hf_token[:20]}...")
try:
# Get model from environment variable (default to Phi-3 if not set)
hf_model = os.getenv("HF_MODEL", "microsoft/Phi-3-mini-4k-instruct")
API_URL = f"https://api-inference.huggingface.co/models/{hf_model}"
# Use Bearer token in Authorization header
headers = {
"Authorization": f"Bearer {hf_token}",
"Content-Type": "application/json"
}
# Get temperature from environment
temperature = float(os.getenv("LLM_TEMPERATURE", "0.5"))
# Use the FULL prompt (don't truncate - the model can handle it)
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_tokens, # Use parameter passed to function
"temperature": temperature,
"return_full_text": False
}
}
# Get timeout from environment
timeout = int(os.getenv("LLM_TIMEOUT", "60"))
logger.info(f"Calling HF API: {hf_model} (max_tokens={max_tokens}, temp={temperature})")
response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
logger.debug(f"HF API status code: {response.status_code}")
if response.status_code == 200:
result = response.json()
if isinstance(result, list) and len(result) > 0:
generated_text = result[0].get("generated_text", "")
logger.success(f"HF API response received: {len(generated_text)} characters")
logger.debug(f"Response preview: {generated_text[:200]}")
return generated_text
else:
logger.warning(f"Unexpected HF API response format: {result}")
return "[Error] Unexpected API response format"
elif response.status_code == 401:
logger.error("HF API 401 Unauthorized - Token invalid or expired")
logger.debug(f"Response: {response.text[:500]}")
return "[Error] Invalid HuggingFace token - create a new one at https://huggingface.co/settings/tokens"
else:
logger.error(f"HF API failed with status {response.status_code}")
logger.debug(f"Response: {response.text[:500]}")
return f"[Error] API returned status {response.status_code}"
except Exception as e:
import traceback
full_error = traceback.format_exc()
logger.error(f"HF API error: {e}")
logger.debug(full_error)
return f"[Error] HF API failed: {e}"
def query_llm_lmstudio(prompt: str, max_tokens: int = 1500) -> str:
"""Query LM Studio local server (OpenAI-compatible API)"""
import requests
import json
lmstudio_url = os.getenv("LMSTUDIO_URL", "http://localhost:1234/v1/chat/completions")
logger.info(f"Calling LM Studio: {lmstudio_url}")
try:
payload = {
"messages": [
{
"role": "user",
"content": prompt
}
],
"temperature": float(os.getenv("LLM_TEMPERATURE", "0.7")),
"max_tokens": max_tokens,
"stream": False
}
response = requests.post(lmstudio_url, json=payload, timeout=120)
logger.debug(f"LM Studio status code: {response.status_code}")
if response.status_code == 200:
result = response.json()
generated_text = result["choices"][0]["message"]["content"]
logger.success(f"LM Studio response received: {len(generated_text)} characters")
logger.debug(f"Response preview: {generated_text[:300]}")
return generated_text
else:
error_msg = f"[Error] LM Studio returned status {response.status_code}: {response.text[:200]}"
logger.error(error_msg)
return error_msg
except requests.exceptions.ConnectionError:
error_msg = "[Error] Cannot connect to LM Studio. Make sure:\n1. LM Studio is running\n2. Server is started (in LM Studio's Server tab)\n3. A model is loaded\n4. Server is on http://localhost:1234"
logger.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"[Error] LM Studio failed: {e}"
logger.error(error_msg)
import traceback
logger.debug(traceback.format_exc())
return error_msg
def query_llm_local(prompt: str, max_tokens: int = 1500) -> str:
"""
Local model inference optimized for HuggingFace Spaces
Uses Phi-3-mini for better instruction following and JSON generation
"""
try:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
# Get model name from environment (can be set in Spaces Variables)
model_name = os.getenv("LOCAL_MODEL", "microsoft/Phi-3-mini-4k-instruct")
# Load model once and cache it
if not hasattr(query_llm_local, 'model'):
logger.info(f"Loading local model: {model_name}")
query_llm_local.tokenizer = AutoTokenizer.from_pretrained(
model_name,
trust_remote_code=True
)
query_llm_local.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device_map="auto",
trust_remote_code=True
)
logger.success(f"Model loaded on {query_llm_local.model.device}")
# Get temperature from environment
temperature = float(os.getenv("LLM_TEMPERATURE", "0.7"))
# Tokenize with proper truncation for 4k context
inputs = query_llm_local.tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=3500 # Leave room for response
)
# Move to device
device = query_llm_local.model.device
inputs = {k: v.to(device) for k, v in inputs.items()}
# Generate with proper parameters
logger.info(f"Generating with local model (max_tokens={max_tokens}, temp={temperature})")
outputs = query_llm_local.model.generate(
**inputs,
max_new_tokens=max_tokens,
temperature=temperature,
do_sample=temperature > 0,
pad_token_id=query_llm_local.tokenizer.eos_token_id
)
# Decode only the new tokens (not the prompt)
response = query_llm_local.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
logger.success(f"Local model generated {len(response)} characters")
return response.strip()
except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error(f"Local model error: {e}")
logger.debug(error_details)
return f"[Error] Local model failed: {e}"
def query_llm(
chunk: str,
user_context: str,
interviewee_type: str,
extract_structured: bool = False,
is_summary: bool = False,
timeout: int = 120
) -> Tuple[str, Dict]:
"""
Main LLM query function with structured extraction
Returns:
Tuple of (response_text, structured_data_dict)
"""
system_prompt = get_system_prompt(interviewee_type, is_summary)
extraction_template = build_extraction_template(interviewee_type) if extract_structured else ""
# Build comprehensive prompt
full_prompt = f"""{system_prompt}
User Instructions:
{user_context}
Transcript Segment to Analyze:
{chunk}
"""
if extract_structured:
full_prompt += f"""
IMPORTANT: Provide your analysis in two parts:
1. A clear narrative summary (3-5 sentences)
2. Structured data in this exact JSON format:
{extraction_template}
Be specific and include relevant details (dosages, durations, severity levels, etc.)
"""
# Truncate if needed (but increased limit)
max_prompt_length = 6000 # Increased from 2000
if len(full_prompt) > max_prompt_length:
chunk_limit = max_prompt_length - len(system_prompt) - len(user_context) - len(extraction_template) - 500
chunk = chunk[:chunk_limit]
full_prompt = f"{system_prompt}\n\nUser Instructions:\n{user_context}\n\nTranscript Segment:\n{chunk}\n\n"
if extract_structured:
full_prompt += f"Provide analysis and structured JSON: {extraction_template}"
log(f"Prompt truncated to {len(full_prompt)} characters")
def generate():
# Check environment variables dynamically (not using module-level USE_HF_API)
use_lmstudio = os.getenv("USE_LMSTUDIO", "False").lower() == "true"
use_hf_api = os.getenv("USE_HF_API", "False").lower() == "true"
hf_token = os.getenv("HUGGINGFACE_TOKEN", "")
if use_lmstudio:
return query_llm_lmstudio(full_prompt, max_tokens=2000)
elif use_hf_api and hf_token:
return query_llm_hf_api(full_prompt, max_tokens=1500)
else:
return query_llm_local(full_prompt, max_tokens=1500)
# Execute with timeout
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(generate)
try:
response = future.result(timeout=timeout)
# CRITICAL: Ensure response is a string before any processing
response = ensure_string_response(response)
log(f"LLM response received ({len(response)} chars)")
# Extract structured data if requested
structured_data = {}
clean_response = response
if extract_structured:
structured_data = parse_structured_response(response, interviewee_type)
# Remove JSON blocks from the narrative text (handle nested braces)
# Remove all {....} blocks repeatedly until none remain
prev_response = ""
while prev_response != clean_response:
prev_response = clean_response
clean_response = re.sub(r'\{[^{}]*\}', '', clean_response, flags=re.DOTALL)
# Also remove common JSON artifacts
clean_response = re.sub(r'###\s*JSON\s*Structure:', '', clean_response, flags=re.IGNORECASE)
clean_response = re.sub(r'###\s*Analysis:', '', clean_response, flags=re.IGNORECASE)
clean_response = re.sub(r'###\s*Response:', '', clean_response, flags=re.IGNORECASE)
clean_response = re.sub(r'Please provide.*?structured JSON.*', '', clean_response, flags=re.IGNORECASE|re.DOTALL)
clean_response = clean_response.strip()
log(f"Cleaned response: {len(clean_response)} chars (removed JSON)")
# Final safety check: ensure we're returning a string
clean_response = ensure_string_response(clean_response)
return clean_response, structured_data
except ThreadTimeout:
logger.error("LLM generation timed out")
return "[Error] LLM generation timed out.", {}
except Exception as e:
logger.error(f"LLM generation failed: {e}")
return f"[Error] LLM generation failed: {e}", {}
def extract_structured_data(text: str, interviewee_type: str) -> Dict:
"""
Standalone function to extract structured data from existing text
Useful for post-processing
"""
return parse_structured_response(text, interviewee_type)