import os
import json
import re
from typing import Tuple, Dict, List, Union, Any
from concurrent.futures import ThreadPoolExecutor, TimeoutError as ThreadTimeout

# Import structured logging
try:
    from logger import get_logger
    logger = get_logger()
except ImportError:
    # Fallback if logger module not available
    import logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)
    # The structured logger exposes .success(); alias it to .info() so the
    # fallback does not raise AttributeError where .success() is called below.
    logger.success = logger.info

# Option 1: Use Hugging Face Inference API (recommended for better quality)
# Option 2: Use a larger local model
# Option 3: Use OpenAI/Anthropic API if available
DEBUG_MODE = os.getenv("DEBUG_MODE", "False").lower() == "true"
USE_HF_API = os.getenv("USE_HF_API", "False").lower() == "true"  # Default is False
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN", "")


def log(msg):
    """Legacy debug logging function - deprecated, use logger instead."""
    if DEBUG_MODE:
        logger.debug(msg)

def get_system_prompt(interviewee_type: str, is_summary: bool = False) -> str:
    """Generate context-aware system prompts."""
    base_prompt = """You are an expert medical transcript analyzer specializing in healthcare interviews.
Your task is to extract structured, actionable insights from interview transcripts.

Core Principles:
- Focus on factual, verifiable medical information
- Distinguish between speaker roles accurately
- Filter out pleasantries, disclaimers, and off-topic content
- Extract specific medical terms, dosages, and treatment details
- Identify patterns and clinical reasoning
"""

    if is_summary:
        return base_prompt + """
CROSS-INTERVIEW SYNTHESIS & VALIDATION TASK:
You are analyzing multiple transcripts. Extract verified patterns and flag inconsistencies.

STEP 1 - PATTERN IDENTIFICATION:
For each theme, count occurrences across transcripts:
- How many participants mentioned X? (e.g., "7 out of 10 participants")
- Calculate percentages when relevant
- What's the range of perspectives?

STEP 2 - CLASSIFY BY CONSENSUS LEVEL:
- STRONG CONSENSUS (80%+ agreement): Findings most participants agree on
- MAJORITY VIEW (60-79%): Significant but not universal agreement
- SPLIT PERSPECTIVES (40-59%): Where views diverge
- OUTLIERS (<40%): Unique but noteworthy perspectives

STEP 3 - CROSS-VALIDATE:
- Check for contradictions between transcripts
- Note where perspectives differ and why
- Flag quality issues (brief transcripts, vague responses)

STEP 4 - CITE EVIDENCE:
- Reference specific transcript numbers
- Include brief supporting quotes/details
- Distinguish fact from interpretation

OUTPUT FORMAT:
Start with a 2-3 sentence executive overview, then:

**STRONG CONSENSUS FINDINGS:**
[List with counts and evidence]

**MAJORITY FINDINGS:**
[List with counts]

**DIVERGENT PERSPECTIVES:**
[Where participants disagreed and context]

**NOTABLE OUTLIERS:**
[Unique but important points]

**QUALITY NOTES:**
[Any gaps or transcript issues]

CRITICAL RULES:
- NEVER use vague terms like "many," "most," "some" - always use specific numbers
- ALWAYS cite transcript numbers for claims
- FLAG weak evidence explicitly
- Separate facts from interpretations
- NO JSON output - write in clear narrative prose
"""

    if interviewee_type == "HCP":
        return base_prompt + """
Healthcare Professional Analysis Focus:
- Prescribing patterns and medication choices
- Diagnostic reasoning and clinical decision-making
- Treatment protocols and guidelines referenced
- Peer perspectives on efficacy and safety
- Barriers to treatment or adoption
- Off-label uses or emerging practices

Extract and structure:
1. Diagnoses mentioned with context
2. Prescriptions with dosage, frequency, and rationale
3. Treatment strategies and their justifications
4. Clinical guidelines or studies referenced
5. Challenges or barriers discussed
6. Key clinical insights or pearls
"""
    elif interviewee_type == "Patient":
        return base_prompt + """
Patient Interview Analysis Focus:
- Symptom descriptions and severity
- Treatment experiences and outcomes
- Side effects and tolerability
- Quality of life impacts
- Adherence challenges and enablers
- Emotional and psychological factors
- Healthcare system interactions

Extract and structure:
1. Primary symptoms with duration and severity
2. Current and past treatments
3. Treatment effectiveness and satisfaction
4. Side effects experienced
5. Concerns and unmet needs
6. Quality of life impacts
7. Support systems and resources
"""
    else:
        return base_prompt + """
General Interview Analysis Focus:
- Main themes and topics discussed
- Key insights and observations
- Recommendations or suggestions
- Contextual factors
- Areas of emphasis or concern

Extract and structure relevant information based on interview content.
"""

def build_extraction_template(interviewee_type: str) -> str:
    """Create JSON template for structured data extraction."""
    if interviewee_type == "HCP":
        return """{
  "diagnoses": ["condition 1", "condition 2"],
  "prescriptions": ["medication (dose, frequency, indication)"],
  "treatment_rationale": ["reason for treatment choice"],
  "guidelines_mentioned": ["guideline or study name"],
  "clinical_decisions": ["key clinical decision with reasoning"],
  "barriers": ["barrier to treatment"],
  "key_insights": ["notable clinical insight"]
}"""
    elif interviewee_type == "Patient":
        return """{
  "symptoms": ["symptom (severity, duration)"],
  "concerns": ["patient concern or question"],
  "treatments_current": ["current treatment"],
  "treatments_past": ["past treatment with outcome"],
  "treatment_response": ["description of how treatment is working"],
  "side_effects": ["side effect experienced"],
  "quality_of_life": ["impact on daily life"],
  "adherence_factors": ["factor affecting medication adherence"]
}"""
    else:
        return """{
  "key_insights": ["main insight or finding"],
  "themes": ["recurring theme"],
  "recommendations": ["recommendation or suggestion"],
  "context": ["important contextual information"]
}"""

def ensure_string_response(response: Any) -> str:
    """
    Ensure LLM response is a string, converting if necessary.

    This function standardizes all LLM responses to prevent dict vs string
    errors that were causing issues in app.py lines 240-251, 531-587.

    Args:
        response: LLM response (may be str, dict, or other type)

    Returns:
        String representation of the response
    """
    if isinstance(response, str):
        return response
    if isinstance(response, dict):
        # Try to extract meaningful text from dict
        if 'content' in response:
            return str(response['content'])
        elif 'generated_text' in response:
            return str(response['generated_text'])
        elif 'text' in response:
            return str(response['text'])
        elif 'output' in response:
            return str(response['output'])
        else:
            # Fallback: stringify the entire dict
            logger.warning(f"Converting dict response to string: {list(response.keys())}")
            return str(response)
    if response is None:
        logger.warning("LLM returned None, using empty string")
        return ""
    # For any other type, convert to string
    logger.warning(f"Converting {type(response).__name__} response to string")
    return str(response)

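# Illustrative behavior (the dict shapes below are typical provider payloads,
# not an exhaustive contract):
#   ensure_string_response("plain text")                  -> "plain text"
#   ensure_string_response({"generated_text": "summary"}) -> "summary"
#   ensure_string_response(None)                          -> ""
#   ensure_string_response(["a", "b"])                    -> "['a', 'b']" (with a warning)
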
def parse_structured_response(text: str, interviewee_type: str) -> Dict:
    """Extract structured data from LLM response."""
    # Ensure text is a string
    text = ensure_string_response(text)

    log(f"Parsing response ({len(text)} chars) for type: {interviewee_type}")
    log(f"Response preview: {text[:500]}...")

    # Try to find a JSON block (handles one level of nested braces)
    json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', text, re.DOTALL)
    if json_match:
        log(f"Found JSON match: {json_match.group()[:200]}...")
        try:
            data = json.loads(json_match.group())
            log(f"✅ Successfully extracted JSON with {len(data)} fields: {list(data.keys())}")
            return data
        except json.JSONDecodeError as e:
            log(f"❌ JSON parsing failed: {e}")
            log(f"Attempted to parse: {json_match.group()[:300]}")
    else:
        log("⚠️ No JSON block found in response, using regex fallback")

    # Fallback: extract from text using patterns
    data = {}
    if interviewee_type == "HCP":
        log("Using HCP extraction patterns...")
        # Extract diagnoses
        diag_pattern = r'(?:diagnos[ei]s|condition):\s*([^\n]+)'
        data["diagnoses"] = re.findall(diag_pattern, text, re.IGNORECASE)
        # Extract prescriptions
        rx_pattern = r'(?:prescri[bp]\w*|medication):\s*([^\n]+)'
        data["prescriptions"] = re.findall(rx_pattern, text, re.IGNORECASE)
        # Extract treatment rationale
        treat_pattern = r'(?:treatment|therapy|rationale):\s*([^\n]+)'
        data["treatment_rationale"] = re.findall(treat_pattern, text, re.IGNORECASE)
    elif interviewee_type == "Patient":
        log("Using Patient extraction patterns...")
        # Extract symptoms
        symptom_pattern = r'(?:symptom|complaint|experienc\w*):\s*([^\n]+)'
        data["symptoms"] = re.findall(symptom_pattern, text, re.IGNORECASE)
        # Extract concerns
        concern_pattern = r'(?:concern|worry|question|anxious):\s*([^\n]+)'
        data["concerns"] = re.findall(concern_pattern, text, re.IGNORECASE)
        # Extract side effects
        se_pattern = r'(?:side effect|adverse|reaction):\s*([^\n]+)'
        data["side_effects"] = re.findall(se_pattern, text, re.IGNORECASE)

    # Clean and deduplicate (dict.fromkeys preserves first-seen order,
    # unlike set(), so the output is deterministic)
    for key in data:
        data[key] = list(dict.fromkeys(item.strip() for item in data[key] if item.strip()))

    log(f"Fallback extraction result: {len(data)} fields, {sum(len(v) for v in data.values())} total items")
    log(f"Extracted fields: {data}")
    return data

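# Illustrative round trip (hypothetical model output, not a recorded response):
#   text = 'The patient reports... {"symptoms": ["fatigue (moderate, 3 weeks)"], "concerns": []}'
#   parse_structured_response(text, "Patient")
#   -> {"symptoms": ["fatigue (moderate, 3 weeks)"], "concerns": []}
# If no JSON block is present, lines like "Symptom: fatigue" are picked up by
# the regex fallback instead.
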
def query_llm_hf_api(prompt: str, max_tokens: int = 1500) -> str:
    """Use Hugging Face Inference API with proper authentication."""
    import requests

    hf_token = os.getenv("HUGGINGFACE_TOKEN", "")
    if not hf_token:
        error_msg = "[Error] HUGGINGFACE_TOKEN not set in environment!"
        logger.error(error_msg)
        return error_msg

    logger.debug(f"Using HF token for authentication (first 20 chars): {hf_token[:20]}...")

    try:
        # Get model from environment variable (default to Phi-3 if not set)
        hf_model = os.getenv("HF_MODEL", "microsoft/Phi-3-mini-4k-instruct")
        API_URL = f"https://api-inference.huggingface.co/models/{hf_model}"

        # Use Bearer token in Authorization header
        headers = {
            "Authorization": f"Bearer {hf_token}",
            "Content-Type": "application/json"
        }

        # Get temperature from environment
        temperature = float(os.getenv("LLM_TEMPERATURE", "0.5"))

        # Use the FULL prompt (don't truncate - the model can handle it)
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": max_tokens,  # Use parameter passed to function
                "temperature": temperature,
                "return_full_text": False
            }
        }

        # Get timeout from environment
        timeout = int(os.getenv("LLM_TIMEOUT", "60"))

        logger.info(f"Calling HF API: {hf_model} (max_tokens={max_tokens}, temp={temperature})")
        response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
        logger.debug(f"HF API status code: {response.status_code}")

        if response.status_code == 200:
            result = response.json()
            if isinstance(result, list) and len(result) > 0:
                generated_text = result[0].get("generated_text", "")
                logger.success(f"HF API response received: {len(generated_text)} characters")
                logger.debug(f"Response preview: {generated_text[:200]}")
                return generated_text
            else:
                logger.warning(f"Unexpected HF API response format: {result}")
                return "[Error] Unexpected API response format"
        elif response.status_code == 401:
            logger.error("HF API 401 Unauthorized - Token invalid or expired")
            logger.debug(f"Response: {response.text[:500]}")
            return "[Error] Invalid HuggingFace token - create a new one at https://huggingface.co/settings/tokens"
        else:
            logger.error(f"HF API failed with status {response.status_code}")
            logger.debug(f"Response: {response.text[:500]}")
            return f"[Error] API returned status {response.status_code}"
    except Exception as e:
        import traceback
        full_error = traceback.format_exc()
        logger.error(f"HF API error: {e}")
        logger.debug(full_error)
        return f"[Error] HF API failed: {e}"

def query_llm_lmstudio(prompt: str, max_tokens: int = 1500) -> str:
    """Query LM Studio local server (OpenAI-compatible API)."""
    import requests

    lmstudio_url = os.getenv("LMSTUDIO_URL", "http://localhost:1234/v1/chat/completions")
    logger.info(f"Calling LM Studio: {lmstudio_url}")

    try:
        payload = {
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": float(os.getenv("LLM_TEMPERATURE", "0.7")),
            "max_tokens": max_tokens,
            "stream": False
        }
        response = requests.post(lmstudio_url, json=payload, timeout=120)
        logger.debug(f"LM Studio status code: {response.status_code}")

        if response.status_code == 200:
            result = response.json()
            generated_text = result["choices"][0]["message"]["content"]
            logger.success(f"LM Studio response received: {len(generated_text)} characters")
            logger.debug(f"Response preview: {generated_text[:300]}")
            return generated_text
        else:
            error_msg = f"[Error] LM Studio returned status {response.status_code}: {response.text[:200]}"
            logger.error(error_msg)
            return error_msg
    except requests.exceptions.ConnectionError:
        error_msg = (
            "[Error] Cannot connect to LM Studio. Make sure:\n"
            "1. LM Studio is running\n"
            "2. Server is started (in LM Studio's Server tab)\n"
            "3. A model is loaded\n"
            "4. Server is on http://localhost:1234"
        )
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"[Error] LM Studio failed: {e}"
        logger.error(error_msg)
        import traceback
        logger.debug(traceback.format_exc())
        return error_msg

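# Example environment for the LM Studio path (placeholders; requires a running
# LM Studio server with a model loaded):
#   USE_LMSTUDIO=true
#   LMSTUDIO_URL=http://localhost:1234/v1/chat/completions
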
def query_llm_local(prompt: str, max_tokens: int = 1500) -> str:
    """
    Local model inference optimized for HuggingFace Spaces.

    Uses Phi-3-mini for better instruction following and JSON generation.
    """
    try:
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        # Get model name from environment (can be set in Spaces Variables)
        model_name = os.getenv("LOCAL_MODEL", "microsoft/Phi-3-mini-4k-instruct")

        # Load model once and cache it on the function object
        if not hasattr(query_llm_local, 'model'):
            logger.info(f"Loading local model: {model_name}")
            query_llm_local.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                trust_remote_code=True
            )
            query_llm_local.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto",
                trust_remote_code=True
            )
            logger.success(f"Model loaded on {query_llm_local.model.device}")

        # Get temperature from environment
        temperature = float(os.getenv("LLM_TEMPERATURE", "0.7"))

        # Tokenize with proper truncation for the 4k context window
        inputs = query_llm_local.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=3500  # Leave room for the response
        )

        # Move tensors to the model's device
        device = query_llm_local.model.device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Generate with proper parameters
        logger.info(f"Generating with local model (max_tokens={max_tokens}, temp={temperature})")
        outputs = query_llm_local.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=temperature,
            do_sample=temperature > 0,
            pad_token_id=query_llm_local.tokenizer.eos_token_id
        )

        # Decode only the new tokens (not the prompt)
        response = query_llm_local.tokenizer.decode(
            outputs[0][inputs['input_ids'].shape[1]:],
            skip_special_tokens=True
        )
        logger.success(f"Local model generated {len(response)} characters")
        return response.strip()
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Local model error: {e}")
        logger.debug(error_details)
        return f"[Error] Local model failed: {e}"

def query_llm(
    chunk: str,
    user_context: str,
    interviewee_type: str,
    extract_structured: bool = False,
    is_summary: bool = False,
    timeout: int = 120
) -> Tuple[str, Dict]:
    """
    Main LLM query function with structured extraction.

    Returns:
        Tuple of (response_text, structured_data_dict)
    """
    system_prompt = get_system_prompt(interviewee_type, is_summary)
    extraction_template = build_extraction_template(interviewee_type) if extract_structured else ""

    # Build comprehensive prompt
    full_prompt = f"""{system_prompt}

User Instructions:
{user_context}

Transcript Segment to Analyze:
{chunk}
"""
    if extract_structured:
        full_prompt += f"""
IMPORTANT: Provide your analysis in two parts:
1. A clear narrative summary (3-5 sentences)
2. Structured data in this exact JSON format:
{extraction_template}

Be specific and include relevant details (dosages, durations, severity levels, etc.)
"""

    # Truncate if needed (limit increased from 2000 to 6000 characters)
    max_prompt_length = 6000
    if len(full_prompt) > max_prompt_length:
        # Guard against a negative slice when the fixed parts alone exceed the budget
        chunk_limit = max(0, max_prompt_length - len(system_prompt) - len(user_context) - len(extraction_template) - 500)
        chunk = chunk[:chunk_limit]
        full_prompt = f"{system_prompt}\n\nUser Instructions:\n{user_context}\n\nTranscript Segment:\n{chunk}\n\n"
        if extract_structured:
            full_prompt += f"Provide analysis and structured JSON: {extraction_template}"
        log(f"Prompt truncated to {len(full_prompt)} characters")

    def generate():
        # Check environment variables dynamically (not the module-level USE_HF_API)
        use_lmstudio = os.getenv("USE_LMSTUDIO", "False").lower() == "true"
        use_hf_api = os.getenv("USE_HF_API", "False").lower() == "true"
        hf_token = os.getenv("HUGGINGFACE_TOKEN", "")
        if use_lmstudio:
            return query_llm_lmstudio(full_prompt, max_tokens=2000)
        elif use_hf_api and hf_token:
            return query_llm_hf_api(full_prompt, max_tokens=1500)
        else:
            return query_llm_local(full_prompt, max_tokens=1500)

    # Execute with timeout
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(generate)
        try:
            response = future.result(timeout=timeout)
            # CRITICAL: Ensure response is a string before any processing
            response = ensure_string_response(response)
            log(f"LLM response received ({len(response)} chars)")

            # Extract structured data if requested
            structured_data = {}
            clean_response = response
            if extract_structured:
                structured_data = parse_structured_response(response, interviewee_type)
                # Remove JSON blocks from the narrative text: strip innermost
                # {...} blocks repeatedly until none remain (handles nesting)
                prev_response = ""
                while prev_response != clean_response:
                    prev_response = clean_response
                    clean_response = re.sub(r'\{[^{}]*\}', '', clean_response, flags=re.DOTALL)
                # Also remove common JSON artifacts
                clean_response = re.sub(r'###\s*JSON\s*Structure:', '', clean_response, flags=re.IGNORECASE)
                clean_response = re.sub(r'###\s*Analysis:', '', clean_response, flags=re.IGNORECASE)
                clean_response = re.sub(r'###\s*Response:', '', clean_response, flags=re.IGNORECASE)
                clean_response = re.sub(r'Please provide.*?structured JSON.*', '', clean_response, flags=re.IGNORECASE | re.DOTALL)
                clean_response = clean_response.strip()
                log(f"Cleaned response: {len(clean_response)} chars (removed JSON)")

            # Final safety check: ensure we're returning a string
            clean_response = ensure_string_response(clean_response)
            return clean_response, structured_data
        except ThreadTimeout:
            logger.error("LLM generation timed out")
            return "[Error] LLM generation timed out.", {}
        except Exception as e:
            logger.error(f"LLM generation failed: {e}")
            return f"[Error] LLM generation failed: {e}", {}

def extract_structured_data(text: str, interviewee_type: str) -> Dict:
    """
    Standalone function to extract structured data from existing text.

    Useful for post-processing.
    """
    return parse_structured_response(text, interviewee_type)
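

# Minimal smoke test (a sketch, not one of the app's entry points). It assumes
# one of the backends is configured via the environment variables shown above;
# with none set, it falls through to the local transformers model, which
# downloads weights on first run. The transcript text is invented for
# illustration.
if __name__ == "__main__":
    sample_chunk = (
        "Interviewer: How have you been since starting the new medication?\n"
        "Patient: The fatigue is better, but I get mild nausea most mornings."
    )
    summary, structured = query_llm(
        chunk=sample_chunk,
        user_context="Summarize the patient's treatment response.",
        interviewee_type="Patient",
        extract_structured=True,
    )
    print("Narrative:", summary)
    print("Structured:", json.dumps(structured, indent=2))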