| import os |
| import json |
| import re |
| from typing import Tuple, Dict, List |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as ThreadTimeout |
|
|
|
|
| |
| |
| |
|
|
# Runtime switches, read from the environment once at import time.
# A boolean flag is on only when its variable is the word "true" in any letter case.
DEBUG_MODE = "true" == os.environ.get("DEBUG_MODE", "False").lower()
USE_HF_API = "true" == os.environ.get("USE_HF_API", "False").lower()
# Hugging Face access token; empty string when the variable is unset.
HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN", "")
|
|
| |
| |
| |
def log(msg):
    """Print ``msg`` with the ``[LLM Debug]`` prefix when ``DEBUG_MODE`` is on.

    Does nothing when ``DEBUG_MODE`` is falsy.
    """
    if not DEBUG_MODE:
        return
    print(f"[LLM Debug] {msg}")
|
|
|
|
def get_system_prompt(interviewee_type: str, is_summary: bool = False) -> str:
    """Build the system prompt for a transcript-analysis request.

    Every prompt starts with the same analyst preamble; one task-specific
    section is appended to it.

    Args:
        interviewee_type: ``"HCP"`` or ``"Patient"`` select a dedicated
            section; any other value selects the general-purpose section.
        is_summary: When truthy, the cross-interview synthesis section is
            appended and ``interviewee_type`` is ignored.

    Returns:
        The preamble followed by the selected section.
    """
    preamble = """You are an expert medical transcript analyzer specializing in healthcare interviews.

Your task is to extract structured, actionable insights from interview transcripts.

Core Principles:
- Focus on factual, verifiable medical information
- Distinguish between speaker roles accurately
- Filter out pleasantries, disclaimers, and off-topic content
- Extract specific medical terms, dosages, and treatment details
- Identify patterns and clinical reasoning
"""

    # Summary mode takes precedence over the interviewee type.
    if is_summary:
        synthesis_section = """
CROSS-INTERVIEW SYNTHESIS & VALIDATION TASK:

You are analyzing multiple transcripts. Extract verified patterns and flag inconsistencies.

STEP 1 - PATTERN IDENTIFICATION:
For each theme, count occurrences across transcripts:
- How many participants mentioned X? (e.g., "7 out of 10 participants")
- Calculate percentages when relevant
- What's the range of perspectives?

STEP 2 - CLASSIFY BY CONSENSUS LEVEL:
- STRONG CONSENSUS (80%+ agreement): Findings most participants agree on
- MAJORITY VIEW (60-79%): Significant but not universal agreement
- SPLIT PERSPECTIVES (40-59%): Where views diverge
- OUTLIERS (<40%): Unique but noteworthy perspectives

STEP 3 - CROSS-VALIDATE:
- Check for contradictions between transcripts
- Note where perspectives differ and why
- Flag quality issues (brief transcripts, vague responses)

STEP 4 - CITE EVIDENCE:
- Reference specific transcript numbers
- Include brief supporting quotes/details
- Distinguish fact from interpretation

OUTPUT FORMAT:
Start with 2-3 sentence executive overview, then:

**STRONG CONSENSUS FINDINGS:**
[List with counts and evidence]

**MAJORITY FINDINGS:**
[List with counts]

**DIVERGENT PERSPECTIVES:**
[Where participants disagreed and context]

**NOTABLE OUTLIERS:**
[Unique but important points]

**QUALITY NOTES:**
[Any gaps or transcript issues]

CRITICAL RULES:
- NEVER use vague terms like "many," "most," "some" - always use specific numbers
- ALWAYS cite transcript numbers for claims
- FLAG weak evidence explicitly
- Separate facts from interpretations
- NO JSON output - write in clear narrative prose
"""
        return preamble + synthesis_section

    if interviewee_type == "HCP":
        hcp_section = """
Healthcare Professional Analysis Focus:
- Prescribing patterns and medication choices
- Diagnostic reasoning and clinical decision-making
- Treatment protocols and guidelines referenced
- Peer perspectives on efficacy and safety
- Barriers to treatment or adoption
- Off-label uses or emerging practices

Extract and structure:
1. Diagnoses mentioned with context
2. Prescriptions with dosage, frequency, and rationale
3. Treatment strategies and their justifications
4. Clinical guidelines or studies referenced
5. Challenges or barriers discussed
6. Key clinical insights or pearls
"""
        return preamble + hcp_section

    if interviewee_type == "Patient":
        patient_section = """
Patient Interview Analysis Focus:
- Symptom descriptions and severity
- Treatment experiences and outcomes
- Side effects and tolerability
- Quality of life impacts
- Adherence challenges and enablers
- Emotional and psychological factors
- Healthcare system interactions

Extract and structure:
1. Primary symptoms with duration and severity
2. Current and past treatments
3. Treatment effectiveness and satisfaction
4. Side effects experienced
5. Concerns and unmet needs
6. Quality of life impacts
7. Support systems and resources
"""
        return preamble + patient_section

    # Any other interviewee type gets the general-purpose instructions.
    general_section = """
General Interview Analysis Focus:
- Main themes and topics discussed
- Key insights and observations
- Recommendations or suggestions
- Contextual factors
- Areas of emphasis or concern

Extract and structure relevant information based on interview content.
"""
    return preamble + general_section
|
|
|
|
def build_extraction_template(interviewee_type: str) -> str:
    """Return the example JSON object the model is asked to fill in.

    Args:
        interviewee_type: ``"HCP"`` and ``"Patient"`` each have a dedicated
            field set; any other value gets the generic field set.

    Returns:
        A JSON-formatted string whose values are placeholder lists that
        describe what belongs in each field.
    """
    if interviewee_type == "HCP":
        hcp_template = """{
"diagnoses": ["condition 1", "condition 2"],
"prescriptions": ["medication (dose, frequency, indication)"],
"treatment_rationale": ["reason for treatment choice"],
"guidelines_mentioned": ["guideline or study name"],
"clinical_decisions": ["key clinical decision with reasoning"],
"barriers": ["barrier to treatment"],
"key_insights": ["notable clinical insight"]
}"""
        return hcp_template

    if interviewee_type == "Patient":
        patient_template = """{
"symptoms": ["symptom (severity, duration)"],
"concerns": ["patient concern or question"],
"treatments_current": ["current treatment"],
"treatments_past": ["past treatment with outcome"],
"treatment_response": ["description of how treatment is working"],
"side_effects": ["side effect experienced"],
"quality_of_life": ["impact on daily life"],
"adherence_factors": ["factor affecting medication adherence"]
}"""
        return patient_template

    # Fallback used for every other interviewee type.
    generic_template = """{
"key_insights": ["main insight or finding"],
"themes": ["recurring theme"],
"recommendations": ["recommendation or suggestion"],
"context": ["important contextual information"]
}"""
    return generic_template
|
|
|
|
def parse_structured_response(text: str, interviewee_type: str) -> Dict:
    """Extract structured data from an LLM response.

    Strategy:
      1. Scan ``text`` for brace-delimited candidates (the pattern allows at
         most one level of nested braces) and return the first candidate
         that ``json.loads`` accepts. Candidates that fail to parse are
         skipped, so an earlier stray ``{...}`` does not hide a valid JSON
         object that follows it.
      2. If no candidate parses, fall back to ``label: value`` line patterns
         selected by ``interviewee_type``.

    Args:
        text: Raw model output. ``None`` is treated as an empty string.
        interviewee_type: ``"HCP"`` or ``"Patient"`` enable the regex
            fallback; any other value yields an empty fallback result.

    Returns:
        The parsed JSON object, or a dict mapping fallback field names to
        de-duplicated lists of matched values in first-seen order (``{}``
        when the interviewee type has no fallback patterns).
    """
    # Tolerate a missing response instead of failing on len(None).
    text = text or ""

    log(f"Parsing response ({len(text)} chars) for type: {interviewee_type}")
    log(f"Response preview: {text[:500]}...")

    json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
    found_candidate = False
    for json_match in re.finditer(json_pattern, text, re.DOTALL):
        found_candidate = True
        candidate = json_match.group()
        log(f"Found JSON match: {candidate[:200]}...")
        try:
            data = json.loads(candidate)
        except json.JSONDecodeError as e:
            log(f"❌ JSON parsing failed: {e}")
            log(f"Attempted to parse: {candidate[:300]}")
            continue
        log(f"✅ Successfully extracted JSON with {len(data)} fields: {list(data.keys())}")
        return data

    if not found_candidate:
        log("⚠️ No JSON block found in response, using regex fallback")

    # Fallback: pull "label: value" lines out of free text.
    if interviewee_type == "HCP":
        log("Using HCP extraction patterns...")
        patterns = {
            "diagnoses": r'(?:diagnos[ei]s|condition):\s*([^\n]+)',
            "prescriptions": r'(?:prescri[bp]\w*|medication):\s*([^\n]+)',
            "treatment_rationale": r'(?:treatment|therapy|rationale):\s*([^\n]+)',
        }
    elif interviewee_type == "Patient":
        log("Using Patient extraction patterns...")
        patterns = {
            "symptoms": r'(?:symptom|complaint|experienc\w*):\s*([^\n]+)',
            "concerns": r'(?:concern|worry|question|anxious):\s*([^\n]+)',
            "side_effects": r'(?:side effect|adverse|reaction):\s*([^\n]+)',
        }
    else:
        patterns = {}

    data = {}
    for field, pattern in patterns.items():
        stripped = (item.strip() for item in re.findall(pattern, text, re.IGNORECASE))
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # result is stable across runs (a set would not be).
        data[field] = list(dict.fromkeys(item for item in stripped if item))

    log(f"Fallback extraction result: {len(data)} fields, {sum(len(v) for v in data.values())} total items")
    log(f"Extracted fields: {data}")
    return data
|
|
|
|
def query_llm_hf_api(prompt: str, max_tokens: int = 1500) -> str:
    """Send ``prompt`` to the Hugging Face Inference API and return the text.

    Environment variables read on every call:
        HUGGINGFACE_TOKEN: bearer token (required).
        HF_MODEL: model id, default ``microsoft/Phi-3-mini-4k-instruct``.
        LLM_TEMPERATURE: sampling temperature, default ``0.5``.
        LLM_TIMEOUT: HTTP timeout in seconds, default ``60``.

    Args:
        prompt: Full prompt text, sent as the ``inputs`` field.
        max_tokens: Sent as ``max_new_tokens``.

    Returns:
        The ``generated_text`` of the first result on HTTP 200. Every
        failure (missing token, non-200 status, unexpected payload, or any
        exception, including a missing ``requests`` package) is reported as
        a string starting with ``[Error]`` rather than raised.
    """
    hf_token = os.getenv("HUGGINGFACE_TOKEN", "")

    if not hf_token:
        error_msg = "[Error] HUGGINGFACE_TOKEN not set in environment!"
        print(f"❌ {error_msg}")
        return error_msg

    # Never echo the token (or a prefix of it): console output ends up in logs.
    print("[HF API] Using token from HUGGINGFACE_TOKEN for authentication")

    try:
        # Imported here so the missing-token path needs no third-party
        # package and an ImportError is reported like any other failure.
        import requests

        hf_model = os.getenv("HF_MODEL", "microsoft/Phi-3-mini-4k-instruct")
        API_URL = f"https://api-inference.huggingface.co/models/{hf_model}"

        headers = {
            "Authorization": f"Bearer {hf_token}",
            "Content-Type": "application/json",
        }

        temperature = float(os.getenv("LLM_TEMPERATURE", "0.5"))

        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": max_tokens,
                "temperature": temperature,
                "return_full_text": False,
            },
        }

        timeout = int(os.getenv("LLM_TIMEOUT", "60"))

        print(f"[HF API] Calling {hf_model} ({max_tokens} tokens, temp={temperature})...")
        response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)

        print(f"[HF API] Status code: {response.status_code}")

        if response.status_code == 200:
            result = response.json()
            if isinstance(result, list) and len(result) > 0:
                generated_text = result[0].get("generated_text", "")
                print(f"[HF API] ✅ Response: {len(generated_text)} characters")
                print(f"[HF API] First 200 chars: {generated_text[:200]}")
                return generated_text
            print(f"[HF API] Unexpected response format: {result}")
            return "[Error] Unexpected API response format"

        if response.status_code == 401:
            # The rejected token is deliberately not printed.
            print("[HF API] ❌ 401 Unauthorized - Token invalid or expired")
            print(f"[HF API] Response: {response.text[:500]}")
            return "[Error] Invalid HuggingFace token - create a new one at https://huggingface.co/settings/tokens"

        print(f"[HF API] Failed with status {response.status_code}")
        print(f"[HF API] Response: {response.text[:500]}")
        return f"[Error] API returned status {response.status_code}"

    except Exception as e:
        import traceback
        full_error = traceback.format_exc()
        print(f"[HF API] Error:\n{full_error}")
        return f"[Error] HF API failed: {e}"
|
|
|
|
def query_llm_lmstudio(prompt: str, max_tokens: int = 1500) -> str:
    """Query an LM Studio server through its chat-completions endpoint.

    Environment variables read on every call:
        LMSTUDIO_URL: endpoint URL, default
            ``http://localhost:1234/v1/chat/completions``.
        LLM_TEMPERATURE: sampling temperature, default ``0.7``.

    Args:
        prompt: Sent as the content of a single ``user`` message.
        max_tokens: Sent as ``max_tokens``.

    Returns:
        ``choices[0].message.content`` from the JSON reply on HTTP 200.
        Every failure (missing ``requests`` package, connection error,
        non-200 status, malformed reply, or any other exception) is reported
        as a string starting with ``[Error]`` rather than raised.
    """
    try:
        import requests
    except ImportError as e:
        # Report a missing dependency the same way as any other failure.
        error_msg = f"[Error] LM Studio failed: {e}"
        print(f"[LM Studio] {error_msg}")
        return error_msg

    lmstudio_url = os.getenv("LMSTUDIO_URL", "http://localhost:1234/v1/chat/completions")

    print(f"[LM Studio] Calling {lmstudio_url}...")

    try:
        payload = {
            "messages": [
                {
                    "role": "user",
                    "content": prompt,
                }
            ],
            "temperature": float(os.getenv("LLM_TEMPERATURE", "0.7")),
            "max_tokens": max_tokens,
            "stream": False,
        }

        response = requests.post(lmstudio_url, json=payload, timeout=120)

        print(f"[LM Studio] Status code: {response.status_code}")

        if response.status_code != 200:
            error_msg = f"[Error] LM Studio returned status {response.status_code}: {response.text[:200]}"
            print(f"[LM Studio] {error_msg}")
            return error_msg

        result = response.json()
        generated_text = result["choices"][0]["message"]["content"]
        print(f"[LM Studio] ✓ Response: {len(generated_text)} characters")
        print(f"[LM Studio] First 300 chars: {generated_text[:300]}")
        return generated_text

    except requests.exceptions.ConnectionError:
        # Name the URL that was actually tried; LMSTUDIO_URL may point
        # somewhere other than the default localhost address.
        error_msg = (
            "[Error] Cannot connect to LM Studio. Make sure:\n"
            "1. LM Studio is running\n"
            "2. Server is started (in LM Studio's Server tab)\n"
            "3. A model is loaded\n"
            f"4. Server is reachable at {lmstudio_url}"
        )
        print(f"[LM Studio] {error_msg}")
        return error_msg
    except Exception as e:
        error_msg = f"[Error] LM Studio failed: {e}"
        print(f"[LM Studio] {error_msg}")
        import traceback
        traceback.print_exc()
        return error_msg
|
|
|
|
def query_llm_local(prompt: str, max_tokens: int = 1500) -> str:
    """Generate a completion with an in-process ``transformers`` model.

    The checkpoint named by ``LOCAL_MODEL`` (default
    ``microsoft/Phi-3-mini-4k-instruct``) is loaded on the first call and
    stored on the function object (``query_llm_local.tokenizer`` and
    ``query_llm_local.model``), so later calls reuse it. The sampling
    temperature comes from ``LLM_TEMPERATURE`` (default ``0.7``); sampling
    is enabled only when it is greater than zero.

    Args:
        prompt: Prompt text; tokenized with truncation at 3500 tokens.
        max_tokens: Upper bound on newly generated tokens.

    Returns:
        The decoded continuation (prompt tokens excluded) with surrounding
        whitespace stripped, or ``"[Error] Local model failed: ..."`` if
        anything raises.
    """
    try:
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        checkpoint = os.getenv("LOCAL_MODEL", "microsoft/Phi-3-mini-4k-instruct")

        # The function object doubles as the cache for the loaded model.
        cache = query_llm_local
        if not hasattr(cache, 'model'):
            print(f"[Local Model] Loading {checkpoint}...")
            cache.tokenizer = AutoTokenizer.from_pretrained(
                checkpoint,
                trust_remote_code=True,
            )
            # Half precision when CUDA is available, full precision otherwise.
            if torch.cuda.is_available():
                weights_dtype = torch.float16
            else:
                weights_dtype = torch.float32
            cache.model = AutoModelForCausalLM.from_pretrained(
                checkpoint,
                torch_dtype=weights_dtype,
                device_map="auto",
                trust_remote_code=True,
            )
            print(f"[Local Model] ✅ Model loaded on {cache.model.device}")

        tokenizer = cache.tokenizer
        model = cache.model

        temperature = float(os.getenv("LLM_TEMPERATURE", "0.7"))

        encoded = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=3500,
        )

        # Move every input tensor to the device the model reports.
        target_device = model.device
        encoded = {name: tensor.to(target_device) for name, tensor in encoded.items()}

        print(f"[Local Model] Generating ({max_tokens} max tokens, temp={temperature})...")
        generated = model.generate(
            **encoded,
            max_new_tokens=max_tokens,
            temperature=temperature,
            do_sample=temperature > 0,
            pad_token_id=tokenizer.eos_token_id,
        )

        # Skip the prompt tokens so only the continuation is decoded.
        prompt_token_count = encoded['input_ids'].shape[1]
        continuation = generated[0][prompt_token_count:]
        response = tokenizer.decode(continuation, skip_special_tokens=True)

        print(f"[Local Model] ✅ Generated {len(response)} characters")
        return response.strip()

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        log(f"Local model error:\n{error_details}")
        return f"[Error] Local model failed: {e}"
|
|
|
|
def query_llm(
    chunk: str,
    user_context: str,
    interviewee_type: str,
    extract_structured: bool = False,
    is_summary: bool = False,
    timeout: int = 120
) -> Tuple[str, Dict]:
    """Run one transcript segment through the configured LLM backend.

    The prompt is the system prompt, the user's instructions, the transcript
    segment and, when ``extract_structured`` is set, the JSON template to
    fill in. Prompts longer than 6000 characters are rebuilt in a shorter
    form with the transcript segment truncated.

    Backend selection (environment variables, read per call):
        ``USE_LMSTUDIO=true``: LM Studio server;
        ``USE_HF_API=true`` with a non-empty ``HUGGINGFACE_TOKEN``: Hugging
        Face Inference API; otherwise the local model.

    Args:
        chunk: Transcript text to analyze.
        user_context: Free-form instructions from the user.
        interviewee_type: Passed to the prompt/template builders and the
            response parser.
        extract_structured: Request and parse a JSON block in addition to
            the narrative.
        is_summary: Use the cross-interview synthesis prompt.
        timeout: Seconds to wait for the backend before giving up.

    Returns:
        Tuple of (response_text, structured_data_dict). On timeout or any
        other failure the text starts with ``[Error]`` and the dict is empty.
    """
    system_prompt = get_system_prompt(interviewee_type, is_summary)
    extraction_template = build_extraction_template(interviewee_type) if extract_structured else ""

    full_prompt = f"""{system_prompt}

User Instructions:
{user_context}

Transcript Segment to Analyze:
{chunk}

"""

    if extract_structured:
        full_prompt += f"""
IMPORTANT: Provide your analysis in two parts:
1. A clear narrative summary (3-5 sentences)
2. Structured data in this exact JSON format:
{extraction_template}

Be specific and include relevant details (dosages, durations, severity levels, etc.)
"""

    max_prompt_length = 6000
    if len(full_prompt) > max_prompt_length:
        # Clamp at zero: a negative limit would slice from the END of the
        # chunk (chunk[:-n]) and keep almost all of it instead of truncating.
        chunk_limit = max(
            0,
            max_prompt_length - len(system_prompt) - len(user_context) - len(extraction_template) - 500,
        )
        chunk = chunk[:chunk_limit]
        full_prompt = f"{system_prompt}\n\nUser Instructions:\n{user_context}\n\nTranscript Segment:\n{chunk}\n\n"
        if extract_structured:
            full_prompt += f"Provide analysis and structured JSON: {extraction_template}"
        log(f"Prompt truncated to {len(full_prompt)} characters")

    def generate():
        """Dispatch the prompt to the backend selected by the environment."""
        use_lmstudio = os.getenv("USE_LMSTUDIO", "False").lower() == "true"
        use_hf_api = os.getenv("USE_HF_API", "False").lower() == "true"
        hf_token = os.getenv("HUGGINGFACE_TOKEN", "")

        if use_lmstudio:
            return query_llm_lmstudio(full_prompt, max_tokens=2000)
        elif use_hf_api and hf_token:
            return query_llm_hf_api(full_prompt, max_tokens=1500)
        else:
            return query_llm_local(full_prompt, max_tokens=1500)

    # Deliberately NOT a `with` block: the context manager's exit calls
    # shutdown(wait=True), which would block until the worker finished and
    # make the timeout below ineffective.
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(generate)
    try:
        response = future.result(timeout=timeout)
        log(f"LLM response received ({len(response)} chars)")

        structured_data = {}
        clean_response = response
        if extract_structured:
            structured_data = parse_structured_response(response, interviewee_type)

            # NOTE(review): nesting reconstructed from a whitespace-stripped
            # source; confirm the cleanup is meant to run only for
            # structured requests.
            # Strip brace-delimited blocks innermost-first until none remain.
            prev_response = ""
            while prev_response != clean_response:
                prev_response = clean_response
                clean_response = re.sub(r'\{[^{}]*\}', '', clean_response, flags=re.DOTALL)

            # Remove headings and echoed instructions left around the JSON.
            clean_response = re.sub(r'###\s*JSON\s*Structure:', '', clean_response, flags=re.IGNORECASE)
            clean_response = re.sub(r'###\s*Analysis:', '', clean_response, flags=re.IGNORECASE)
            clean_response = re.sub(r'###\s*Response:', '', clean_response, flags=re.IGNORECASE)
            clean_response = re.sub(r'Please provide.*?structured JSON.*', '', clean_response, flags=re.IGNORECASE|re.DOTALL)

            clean_response = clean_response.strip()
            log(f"Cleaned response: {len(clean_response)} chars (removed JSON)")

        return clean_response, structured_data

    except ThreadTimeout:
        log("LLM generation timed out")
        return "[Error] LLM generation timed out.", {}
    except Exception as e:
        log(f"LLM generation failed: {e}")
        return f"[Error] LLM generation failed: {e}", {}
    finally:
        # Return to the caller immediately. A worker that is still running
        # cannot be interrupted and finishes in the background.
        executor.shutdown(wait=False)
| |
|
|
def extract_structured_data(text: str, interviewee_type: str) -> Dict:
    """Parse already-generated text into structured data.

    Public wrapper that forwards both arguments unchanged to
    ``parse_structured_response`` and returns its result.
    """
    structured = parse_structured_response(text, interviewee_type)
    return structured