| import gradio as gr |
| import os |
| from datetime import datetime |
| from typing import List, Dict, Tuple |
| from extractors import extract_docx, extract_pdf, validate_extraction |
| from tagging import tag_speakers_advanced |
| from chunking import chunk_text_semantic |
| from llm import query_llm, extract_structured_data |
| from reporting import generate_enhanced_csv, generate_enhanced_pdf |
| from dashboard import generate_comprehensive_dashboard |
| from validation import validate_transcript_quality, check_data_completeness |
|
|
| |
# Prefer the project's structured logger (with LogContext support); fall back
# to the stdlib logging module when the logger module is not installed.
try:
    from logger import get_logger, LogContext
    logger = get_logger()
except ImportError:
    import logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)

    class LogContext:
        # No-op context manager so `with LogContext(...):` call sites keep working.
        def __init__(self, *args, **kwargs): pass
        def __enter__(self): return self
        def __exit__(self, *args): pass
|
|
# Optional PII redaction support; feature availability is tracked in
# HAS_REDACTION and checked before any redaction call in analyze().
try:
    from redaction import PIIRedactor, redact_quotes, generate_redaction_report
    HAS_REDACTION = True
except ImportError:
    HAS_REDACTION = False
    logger.warning("Redaction module not available - PII masking disabled")
|
|
| |
# Optional production logging; availability tracked in HAS_PRODUCTION_LOGGING.
# NOTE(review): the fallback ProductionLogger / PerformanceMonitor /
# init_session defined later in this file are unconditional, so they shadow
# these imports even when the import succeeds — confirm whether they were
# meant to be guarded by `if not HAS_PRODUCTION_LOGGING:`.
try:
    from production_logger import init_session, ProductionLogger, PerformanceMonitor
    HAS_PRODUCTION_LOGGING = True
except ImportError:
    HAS_PRODUCTION_LOGGING = False
    print("β οΈ Production logging not available - using basic logging")
|
|
| |
| from contextlib import contextmanager |
|
|
class ProductionLogger:
    """Console-only fallback logger mirroring the production_logger interface.

    Exposes itself via ``self.logger`` so call sites can use both
    ``prod_logger.log_*`` helpers and ``prod_logger.logger.info(...)``.
    All output goes to stdout through a single formatting helper.
    """

    def __init__(self, session_id):
        # Session id is only used for the final banner in finalize_session().
        self.session_id = session_id
        self.logger = self

    @staticmethod
    def _say(level, msg):
        # Single choke point for every console line: "[LEVEL] message".
        print(f"[{level}] {msg}")

    def info(self, msg):
        self._say("INFO", msg)

    def warning(self, msg):
        self._say("WARNING", msg)

    def error(self, msg):
        self._say("ERROR", msg)

    def log_warning(self, msg):
        self._say("WARNING", msg)

    def log_transcript_start(self, file_name, file_type, interviewee_type):
        self._say("INFO", f"Processing started: {file_name}")

    def log_transcript_complete(self, file_name, quality_score, word_count, processing_time):
        self._say("INFO", f"Processing complete: {file_name} | Quality: {quality_score:.2f}")

    def log_transcript_error(self, file_name, error_type, error_details):
        self._say("ERROR", f"Processing failed: {file_name} - {error_type}")

    def log_quote_extraction(self, quote_count, top_score, themes):
        self._say("INFO", f"Quote extraction complete: {quote_count} quotes")

    def finalize_session(self):
        """Print the session-complete banner and return an empty summary dict."""
        self._say("INFO", f"Session {self.session_id} complete")
        return {}
|
|
class PerformanceMonitor:
    """Fallback named-timer registry used when production_logger is unavailable.

    Fix vs. original: elapsed time is measured with ``time.perf_counter()``
    (monotonic, high resolution) instead of ``time.time()``, which can jump
    backwards on system clock adjustments and yield negative durations.
    """

    def __init__(self, logger):
        self.logger = logger   # kept for interface parity with the real monitor
        self.timers = {}       # timer name -> start timestamp (perf_counter)

    def start_timer(self, name):
        """Record the start time for *name*, overwriting any previous start."""
        import time
        self.timers[name] = time.perf_counter()

    def end_timer(self, name):
        """Return seconds elapsed since start_timer(name); 0 if never started."""
        import time
        start = self.timers.pop(name, None)
        if start is None:
            return 0
        return time.perf_counter() - start

    @contextmanager
    def measure(self, name):
        """Context manager timing the wrapped block under *name*."""
        self.start_timer(name)
        try:
            yield
        finally:
            self.end_timer(name)
|
|
def init_session(session_id):
    """Fallback session initializer: wrap *session_id* in a ProductionLogger."""
    fallback_logger = ProductionLogger(session_id)
    return fallback_logger
|
|
| |
# Optional quote extraction; availability tracked in HAS_QUOTE_EXTRACTION.
try:
    from quote_extractor import extract_quotes_from_results
    HAS_QUOTE_EXTRACTION = True
except ImportError:
    HAS_QUOTE_EXTRACTION = False
    print("β οΈ Quote extraction not available - reports will not include storytelling quotes")
    def extract_quotes_from_results(results, interviewee_type):
        """Stub function when quote_extractor is not available.

        Returns a dict containing every key the downstream summary stage
        reads ('all_quotes', 'by_theme', 'top_quotes'). The original stub
        only returned 'quotes'/'themes', which caused a KeyError in
        analyze() when the real extractor was missing; the legacy keys are
        kept for backward compatibility.
        """
        return {
            "quotes": [],
            "themes": {},
            "top_quotes": [],
            "all_quotes": [],
            "by_theme": {},
        }
|
|
| |
# Optional enhanced summary validation (consensus verification and summary
# quality scoring); availability tracked in HAS_ENHANCED_VALIDATION.
try:
    from validation import verify_consensus_claims, validate_summary_quality
    HAS_ENHANCED_VALIDATION = True
except ImportError:
    HAS_ENHANCED_VALIDATION = False
    print("β οΈ Enhanced validation functions not available - using basic validation only")
|
|
| |
def load_env_file(filepath='.env'):
    """Manually load environment variables from a .env file.

    Supports the common dotenv conventions: blank lines and ``#`` comments
    are skipped, a leading ``export `` prefix is tolerated, and matching
    single/double quotes around values are stripped. Values are written
    directly into ``os.environ`` (existing variables are overwritten,
    matching the original behavior).

    Args:
        filepath: Path to the .env file (default '.env').

    Returns:
        True if the file existed and was loaded, False otherwise.
    """
    if not os.path.exists(filepath):
        return False
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Skip blanks and comments.
            if not line or line.startswith('#'):
                continue
            # Tolerate shell-style "export KEY=value" lines.
            if line.startswith('export '):
                line = line[len('export '):]
            if '=' not in line:
                continue
            key, value = line.split('=', 1)
            key = key.strip()
            value = value.strip()
            # Strip matching surrounding quotes (dotenv convention).
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]
            if key:
                os.environ[key] = value
    print(f"β Loaded configuration from {filepath}")
    return True
|
|
| |
| |
| |
|
|
| |
# ---------------------------------------------------------------------------
# Startup configuration: load a local .env when present, otherwise rely on the
# hosting platform's environment (HuggingFace Spaces secrets/variables).
# ---------------------------------------------------------------------------
if os.path.exists('.env'):
    load_env_file('.env')
    print("β Loaded .env file (local development mode)")
else:
    print("βΉοΈ No .env file found - using HuggingFace Spaces configuration")

# Defaults for LLM backend configuration (only set when not already present).
os.environ.setdefault("USE_HF_API", "False")
os.environ.setdefault("USE_LMSTUDIO", "False")
# NOTE(review): setdefault combined with os.getenv of the same key is
# equivalent to setdefault("DEBUG_MODE", "False") — the getenv adds nothing.
os.environ.setdefault("DEBUG_MODE", os.getenv("DEBUG_MODE", "False"))
os.environ.setdefault("LLM_BACKEND", "local")
os.environ.setdefault("LLM_TIMEOUT", "120")
os.environ.setdefault("MAX_TOKENS_PER_REQUEST", "1500")
os.environ.setdefault("LLM_TEMPERATURE", "0.7")

print("β Configuration loaded for HuggingFace Spaces")

# Detect a cloud/Spaces deployment: no local .env plus Spaces env markers.
is_hf_spaces = not os.path.exists('.env') and (os.getenv('SPACE_ID') or os.getenv('SYSTEM') == 'spaces')
hf_token = os.getenv("HUGGINGFACE_TOKEN", "")

# NOTE(review): `is_hf_spaces or not os.path.exists('.env')` reduces to
# `not os.path.exists('.env')` since is_hf_spaces already implies it —
# confirm whether a plain SPACE_ID check was intended instead.
if is_hf_spaces or not os.path.exists('.env'):
    if hf_token:
        # Token available: force the hosted HF API backend (local models are
        # too slow on shared cloud hardware).
        print("π Detected cloud/Spaces environment - forcing HF API mode for best performance...")
        os.environ["USE_HF_API"] = "True"
        os.environ["USE_LMSTUDIO"] = "False"
        os.environ["LLM_BACKEND"] = "hf_api"
        os.environ["LLM_TIMEOUT"] = "180"
        print("β HF API mode enabled (local models disabled)")
    else:
        print("β οΈ WARNING: Running on cloud platform without HUGGINGFACE_TOKEN!")
        print(" Local models will likely timeout. Please add HUGGINGFACE_TOKEN in Settings.")
        print(" Get token from: https://huggingface.co/settings/tokens")
        # No token: give slow local models extra timeout headroom.
        os.environ["LLM_TIMEOUT"] = "300"

# Startup banner: echo the effective backend configuration for debugging.
print(f"π TranscriptorAI Enterprise - LLM Backend: {os.getenv('LLM_BACKEND')}")
print(f"π§ USE_HF_API: {os.getenv('USE_HF_API')}")
print(f"π§ USE_LMSTUDIO: {os.getenv('USE_LMSTUDIO')}")
print(f"π§ DEBUG_MODE: {os.getenv('DEBUG_MODE')}")
print(f"π§ LLM_TIMEOUT: {os.getenv('LLM_TIMEOUT')}s")
|
|
def analyze(files, file_type, user_comments, role_hint, debug_mode, interviewee_type,
            enable_pii_redaction, redaction_level, progress=gr.Progress()):
    """
    Enhanced analysis pipeline with robust error handling, validation, production logging,
    and optional PII redaction.

    Args:
        files: Uploaded transcript files
        file_type: DOCX or PDF
        user_comments: User analysis instructions
        role_hint: Speaker role mapping
        debug_mode: Enable debug output
        interviewee_type: HCP, Patient, or Other
        enable_pii_redaction: Whether to redact PII from outputs
        redaction_level: strict, moderate, or minimal
        progress: Gradio progress tracker

    Returns:
        Tuple of (report_markdown, csv_path, pdf_path, dashboard_figure);
        the last three are None on fatal errors.
    """
    # Per-run session id used for log correlation and output file naming.
    session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    prod_logger = init_session(session_id)
    perf_monitor = PerformanceMonitor(prod_logger)

    prod_logger.logger.info(f"="*80)
    prod_logger.logger.info(f"NEW ANALYSIS SESSION: {session_id}")
    prod_logger.logger.info(f"Files: {len(files)} | Type: {file_type} | Interviewee: {interviewee_type}")
    prod_logger.logger.info(f"="*80)

    # Propagate the UI toggle to downstream modules that read the env var.
    os.environ["DEBUG_MODE"] = str(debug_mode)

    if not files:
        prod_logger.log_warning("No files uploaded")
        return "Error: No files uploaded", None, None, None

    all_results = []         # per-transcript result dicts (including failures)
    csv_rows = []            # flattened rows for the CSV report
    processing_errors = []   # mix of strings (quality warnings) and dicts (failures)

    progress(0, desc="Initializing...")
    print(f"[Start] Processing {len(files)} file(s) as {file_type}")

    # Interviewee-specific analysis focus and the structured fields to extract.
    interviewee_context = {
        "HCP": {
            "focus": "clinical reasoning, peer communication, medical expertise, prescribing patterns",
            "extract": ["diagnoses", "treatment_rationale", "clinical_decisions", "prescriptions", "guidelines_mentioned"]
        },
        "Patient": {
            "focus": "symptoms, concerns, emotional state, treatment understanding, adherence",
            "extract": ["symptoms", "concerns", "treatment_response", "quality_of_life", "side_effects"]
        },
        "Other": {
            "focus": "context-dependent insights, relevant observations",
            "extract": ["key_insights", "context", "recommendations"]
        }
    }.get(interviewee_type, {})

    user_context = f"""
Interviewee Type: {interviewee_type}
Analysis Focus: {interviewee_context.get('focus', 'general insights')}
Key Data Points to Extract: {', '.join(interviewee_context.get('extract', []))}

Additional Instructions:
{user_comments}
""".strip()

    # 4 per-file stages (extract / validate / tag / chunk+LLM) + summary + reports.
    total_steps = len(files) * 4 + 2
    current_step = 0

    for i, file in enumerate(files):
        file_name = os.path.basename(file.name)
        prod_logger.log_transcript_start(file_name, file_type, interviewee_type)
        perf_monitor.start_timer(f"transcript_{i+1}_processing")

        try:
            # --- Stage 1: raw text extraction ---
            progress((current_step / total_steps), desc=f"Extracting {file_name}...")
            print(f"[File {i+1}/{len(files)}] Extracting: {file_name}")

            raw_text = extract_docx(file) if file_type == "DOCX" else extract_pdf(file)
            current_step += 1

            # --- Stage 2: extraction sanity check ---
            progress((current_step / total_steps), desc=f"Validating {file_name}...")
            is_valid, validation_msg = validate_extraction(raw_text, file_name)
            if not is_valid:
                raise ValueError(f"Extraction validation failed: {validation_msg}")

            print(f"[File {i+1}] Extracted {len(raw_text)} characters - Valid: {validation_msg}")
            current_step += 1

            # --- Stage 3: speaker tagging ---
            progress((current_step / total_steps), desc=f"Analyzing speakers in {file_name}...")
            tagged_text = tag_speakers_advanced(raw_text, role_hint, interviewee_type)
            print(f"[File {i+1}] Tagged {len(tagged_text)} characters")
            current_step += 1

            # --- Stage 4: semantic chunking ---
            progress((current_step / total_steps), desc=f"Processing {file_name}...")
            chunks = chunk_text_semantic(tagged_text, interviewee_type)
            print(f"[File {i+1}] Created {len(chunks)} semantic chunk(s)")
            current_step += 1

            # --- Per-chunk LLM analysis ---
            transcript_result = []
            structured_data = {}

            for j, chunk in enumerate(chunks):
                chunk_progress = (current_step + (j / len(chunks))) / total_steps
                progress(chunk_progress, desc=f"Analyzing {file_name} ({j+1}/{len(chunks)})...")

                result, chunk_data = query_llm(
                    chunk,
                    user_context,
                    interviewee_type,
                    extract_structured=True
                )

                # Defensive: some backends return dicts instead of plain text.
                if not isinstance(result, str):
                    print(f"[Warning] LLM result is not a string (type: {type(result)}), converting...")
                    if isinstance(result, dict):
                        result = str(result.get('content', str(result)))
                    else:
                        result = str(result)

                if result and isinstance(result, str) and len(result.strip()) > 0:
                    transcript_result.append(result)
                else:
                    print(f"[Warning] Skipping empty/invalid result for chunk {j+1}")

                # Merge chunk-level structured data into per-transcript lists.
                # (`or {}` guards against a backend returning None here.)
                for key, value in (chunk_data or {}).items():
                    if key not in structured_data:
                        structured_data[key] = []
                    if isinstance(value, list):
                        structured_data[key].extend(value)
                    else:
                        structured_data[key].append(value)

            current_step += 1

            # Final pass: drop (or salvage) any non-strings that slipped through.
            cleaned_results = []
            for idx, item in enumerate(transcript_result):
                if isinstance(item, str):
                    cleaned_results.append(item)
                else:
                    print(f"[Warning] Removing non-string item at index {idx}: {type(item)}")
                    if isinstance(item, dict) and 'content' in item:
                        cleaned_results.append(str(item['content']))

            full_text = "\n\n".join(cleaned_results)

            # Score transcript quality; low scores are recorded but not fatal.
            quality_score, quality_issues = validate_transcript_quality(
                full_text,
                structured_data,
                interviewee_type
            )

            if quality_score < 0.3:
                print(f"[Warning] Low quality score ({quality_score:.2f}) for {file_name}: {quality_issues}")
                processing_errors.append(f"{file_name}: Low quality - {quality_issues}")

            all_results.append({
                "transcript_id": f"Transcript {i+1}",
                "file_name": file_name,
                "full_text": full_text,
                "structured_data": structured_data,
                "quality_score": quality_score,
                "word_count": len(raw_text.split())
            })

            csv_row = {
                "Transcript ID": f"Transcript {i+1}",
                "File Name": file_name,
                "Quality Score": f"{quality_score:.2f}",
                "Word Count": len(raw_text.split()),
            }

            def safe_join(items):
                """Convert all items to strings before joining with '; '."""
                str_items = []
                for item in items:
                    if isinstance(item, str):
                        str_items.append(item)
                    elif isinstance(item, dict):
                        # Known structured shapes: {"name": ...} or
                        # {"condition": ..., "severity": ...}; anything else
                        # falls back to str().
                        if "name" in item:
                            str_items.append(str(item["name"]))
                        elif "condition" in item:
                            cond = item["condition"]
                            if "severity" in item:
                                str_items.append(f"{cond} ({item['severity']})")
                            else:
                                str_items.append(cond)
                        else:
                            str_items.append(str(item))
                    else:
                        str_items.append(str(item))
                return "; ".join(str_items)

            # Interviewee-specific CSV columns.
            if interviewee_type == "HCP":
                csv_row.update({
                    "Diagnoses": safe_join(structured_data.get("diagnoses", [])),
                    "Prescriptions": safe_join(structured_data.get("prescriptions", [])),
                    "Treatment Strategies": safe_join(structured_data.get("treatment_rationale", [])),
                    "Guidelines Mentioned": safe_join(structured_data.get("guidelines_mentioned", []))
                })
            elif interviewee_type == "Patient":
                csv_row.update({
                    "Primary Symptoms": safe_join(structured_data.get("symptoms", [])),
                    "Main Concerns": safe_join(structured_data.get("concerns", [])),
                    "Treatment Response": safe_join(structured_data.get("treatment_response", [])),
                    "Side Effects": safe_join(structured_data.get("side_effects", []))
                })
            else:
                csv_row.update({
                    "Key Insights": safe_join(structured_data.get("key_insights", [])),
                    "Recommendations": safe_join(structured_data.get("recommendations", []))
                })

            csv_rows.append(csv_row)

            processing_time = perf_monitor.end_timer(f"transcript_{i+1}_processing")
            prod_logger.log_transcript_complete(file_name, quality_score, len(raw_text.split()), processing_time)

            print(f"[File {i+1}] β Processing complete")

        except Exception as e:
            # Per-file failures are recorded and processing continues with the
            # remaining files.
            import traceback
            error_type = type(e).__name__
            error_details = str(e)
            error_traceback = traceback.format_exc()

            error_msg = f"[{error_type}] {file_name}: {error_details}"
            print(error_msg)

            perf_monitor.end_timer(f"transcript_{i+1}_processing")
            prod_logger.log_transcript_error(file_name, error_type, error_details[:200])

            processing_errors.append({
                "transcript_id": f"Transcript {i+1}",
                "file_name": file_name,
                "error_type": error_type,
                "error_message": error_details[:200],
                "timestamp": datetime.now().isoformat()
            })

            # Placeholder result keeps transcript numbering stable in reports.
            all_results.append({
                "transcript_id": f"Transcript {i+1}",
                "file_name": file_name,
                "full_text": error_msg,
                "structured_data": {},
                "quality_score": 0.0,
                "word_count": 0,
                "processing_status": "FAILED",
                "error_type": error_type
            })

            csv_rows.append({
                "Transcript ID": f"Transcript {i+1}",
                "File Name": file_name,
                "Quality Score": 0.0,
                "Word Count": 0,
                "Processing Status": "FAILED",
                "Error Type": error_type,
                "Error Message": error_details[:100]
            })

    # ------------------------------------------------------------------
    # Cross-transcript summary, validation, and report generation.
    # ------------------------------------------------------------------
    try:
        progress(0.9, desc="Generating summary and reports...")
        print("[Summary] Analyzing trends across transcripts")

        valid_results = [r for r in all_results if r["quality_score"] > 0]

        if not valid_results:
            return "Error: No transcripts were successfully processed", None, None, None

        print("[Quotes] Extracting impactful quotes from transcripts...")
        with perf_monitor.measure("quote_extraction"):
            quotes_data = extract_quotes_from_results(valid_results, interviewee_type)

        # Normalize quote keys so a stubbed extractor (which may only return
        # legacy 'quotes'/'themes' keys) cannot cause KeyError below.
        quotes_data.setdefault('all_quotes', quotes_data.get('quotes', []))
        quotes_data.setdefault('by_theme', quotes_data.get('themes', {}))
        quotes_data.setdefault('top_quotes', [])

        top_score = quotes_data['top_quotes'][0]['impact_score'] if quotes_data['top_quotes'] else 0
        themes = list(quotes_data['by_theme'].keys())
        prod_logger.log_quote_extraction(len(quotes_data['all_quotes']), top_score, themes)

        print(f"[Quotes] Extracted {len(quotes_data['all_quotes'])} quotes, top impact score: {top_score:.2f}" if quotes_data['top_quotes'] else "[Quotes] No quotes extracted")

        # Optional PII redaction of quotes and transcript text.
        if enable_pii_redaction and HAS_REDACTION:
            logger.info(f"Applying PII redaction (level: {redaction_level})")

            if quotes_data['all_quotes']:
                quotes_data['all_quotes'] = redact_quotes(quotes_data['all_quotes'], redaction_level)
                quotes_data['top_quotes'] = [q for q in quotes_data['all_quotes'] if q.get('impact_score', 0) > 0]
                quotes_data['top_quotes'].sort(key=lambda x: x['impact_score'], reverse=True)
                quotes_data['top_quotes'] = quotes_data['top_quotes'][:20]

            redactor = PIIRedactor(redaction_level)
            total_redactions = {"total": 0}

            for result in valid_results:
                redacted_text, redaction_report = redactor.redact_text(result['full_text'])
                result['full_text'] = redacted_text
                result['redaction_report'] = redaction_report
                total_redactions['total'] += sum(redaction_report.values())

            # Fix: loguru-style loggers expose .success(); the stdlib fallback
            # logger does not, so fall back to .info() to avoid AttributeError.
            getattr(logger, "success", logger.info)(
                f"Redacted {total_redactions['total']} PII items across {len(valid_results)} transcripts"
            )
        elif enable_pii_redaction and not HAS_REDACTION:
            logger.warning("PII redaction requested but redaction module not available!")

        # Prefer hierarchical summarization for larger batches when available.
        try:
            from summarizer_enhanced import (
                hierarchical_summarize,
                enhance_summary_with_quotes,
                validate_summary_consensus
            )
            use_hierarchical = True
            print("[Summary] Using enhanced hierarchical summarization")
        except ImportError:
            use_hierarchical = False
            print("[Summary] Using standard summarization (hierarchical not available)")

        # Build the single-pass summary prompt (also used as retry base).
        summary_prompt = f"""
CROSS-INTERVIEW SYNTHESIS TASK

SAMPLE: {len(valid_results)} {interviewee_type} transcripts
FOCUS AREAS: {interviewee_context.get('focus', 'general patterns')}
"""

        if quotes_data['top_quotes']:
            summary_prompt += f"""

TOP PARTICIPANT QUOTES (use these to bring findings to life):
"""
            # q_num avoids shadowing the file-loop variable `i` above.
            for q_num, quote in enumerate(quotes_data['top_quotes'][:10], 1):
                summary_prompt += f"\n{q_num}. [{quote['theme'].upper()}] (from {quote['transcript_id']})\n \"{quote['text']}\"\n"

        summary_prompt += """

COMPLETE TRANSCRIPT DATA:
"""

        for idx, result in enumerate(valid_results, 1):
            summary_prompt += f"\n{'='*60}\nTRANSCRIPT {idx}/{len(valid_results)}: {result['file_name']}\n{'='*60}\n"
            summary_prompt += f"{result['full_text'][:2000]}\n"

        summary_prompt += f"""

ANALYSIS REQUIREMENTS:

1. QUANTIFY EVERYTHING:
   - Count participants: "X out of {len(valid_results)} participants mentioned..."
   - Never use vague terms (many/most/some)
   - Calculate percentages where relevant

2. INTEGRATE PARTICIPANT VOICE:
   - Weave in quotes from the "TOP PARTICIPANT QUOTES" section above
   - Use quotes to bring data to life and prove points
   - Format as: "X out of {len(valid_results)} mentioned [finding]. As one {interviewee_type.lower()} described, '[quote]'"
   - Include 3-5 quotes in your narrative

3. IDENTIFY PATTERNS BY CONSENSUS LEVEL:
   - STRONG CONSENSUS (80%+ = {int(len(valid_results)*0.8)}+ transcripts agree)
   - MAJORITY VIEW (60-79% = {int(len(valid_results)*0.6)}-{int(len(valid_results)*0.79)} transcripts)
   - SPLIT PERSPECTIVES (40-59% = mixed views)
   - MINORITY/OUTLIER (<40% but notable)

4. CROSS-VALIDATE:
   - Check for contradictions between transcripts
   - Note where perspectives diverge and why
   - Flag any quality issues in individual transcripts

5. CITE EVIDENCE:
   - Reference specific transcript numbers
   - Brief supporting details
   - Use participant quotes as proof points
   - Distinguish verified facts from interpretation

OUTPUT FORMAT:
Write 2-3 sentence executive overview WITH a compelling quote, then structure as:

**STRONG CONSENSUS FINDINGS:**
- [Finding with count, supporting quote if available, and business implication]

**MAJORITY FINDINGS:**
- [Finding with count and quote]

**DIVERGENT PERSPECTIVES:**
- [Where views split, with quotes showing both sides if possible]

**NOTABLE OUTLIERS:**
- [Unique but important points, use quote if impactful]

**DATA QUALITY NOTES:**
- [Any gaps or transcript issues]

CRITICAL: Integrate quotes naturally. Use participant voice to make findings memorable and credible.
Be specific. Use numbers. Cite transcript IDs. Flag weak evidence.
"""

        print("[Summary] Generating cross-transcript summary...")
        print("[Summary] Note: This may take 30-60 seconds for large datasets")

        try:
            from llm_robust import query_llm_with_timeout

            if use_hierarchical and len(valid_results) > 3:
                # Hierarchical: summarize per-transcript then merge.
                print(f"[Summary] Using hierarchical approach for {len(valid_results)} transcripts")
                summary, summary_data = hierarchical_summarize(
                    valid_results,
                    quotes_data,
                    interviewee_type,
                    interviewee_context,
                    query_llm_with_timeout,
                    user_context
                )

                summary = enhance_summary_with_quotes(summary, quotes_data, max_quotes=6)

                consensus_warnings = validate_summary_consensus(summary, valid_results)
                if consensus_warnings:
                    print(f"[Summary] Consensus validation warnings: {len(consensus_warnings)}")
                    for warning in consensus_warnings[:3]:
                        print(f" - {warning}")
            else:
                print("[Summary] Using standard single-pass summarization")
                summary, summary_data = query_llm_with_timeout(
                    summary_prompt,
                    user_context,
                    interviewee_type,
                    extract_structured=False,
                    is_summary=True,
                    max_timeout=60
                )
        except Exception as e:
            print(f"[Summary] Critical error: {e}")
            print("[Summary] Using emergency fallback...")
            from llm_robust import generate_emergency_summary
            summary, summary_data = generate_emergency_summary(interviewee_type)

        # Defensive: some backends return dicts instead of plain text.
        if not isinstance(summary, str):
            print(f"[Warning] Summary is not a string (type: {type(summary)}), converting...")
            if isinstance(summary, dict):
                summary = str(summary.get('content', str(summary)))
            else:
                summary = str(summary)

        # Summary quality scoring (optional enhanced validation).
        if HAS_ENHANCED_VALIDATION:
            summary_score, summary_issues = validate_summary_quality(
                summary,
                len(valid_results)
            )
        else:
            summary_score = 1.0
            summary_issues = []

        if HAS_ENHANCED_VALIDATION and summary_score < 0.7:
            print(f"[Warning] Summary quality issues (score: {summary_score:.2f}): {summary_issues}")
            print("[Summary] Retrying with stricter validation...")

            retry_prompt = summary_prompt + f"""

CRITICAL: Previous attempt failed validation with these issues:
{chr(10).join('- ' + issue for issue in summary_issues)}

MANDATORY CORRECTIONS:
- Use ONLY specific numbers (e.g., "8 out of {len(valid_results)}" not "most")
- Include percentages in parentheses
- Cite transcript numbers for every claim
- Minimum length: 500 words
- No absolute terms (all/everyone/never/always) without 100% evidence
"""

            try:
                summary, summary_data = query_llm_with_timeout(
                    retry_prompt,
                    user_context,
                    interviewee_type,
                    extract_structured=False,
                    is_summary=True,
                    max_timeout=60
                )
            except Exception as e:
                print(f"[Summary] Retry also failed: {e}")
                print("[Summary] Using emergency fallback for retry...")
                # Fix: this name was never imported on the success path of the
                # first attempt, which made this handler raise NameError.
                from llm_robust import generate_emergency_summary
                summary, summary_data = generate_emergency_summary(interviewee_type)

            if not isinstance(summary, str):
                print(f"[Warning] Retry summary is not a string (type: {type(summary)}), converting...")
                if isinstance(summary, dict):
                    summary = str(summary.get('content', str(summary)))
                else:
                    summary = str(summary)

            summary_score, summary_issues = validate_summary_quality(summary, len(valid_results))

            if summary_score < 0.7:
                # Still failing: prepend a visible quality warning instead of
                # blocking the whole report.
                warning_header = f"""[QUALITY WARNING - Score: {summary_score:.2f}]
Validation issues detected: {'; '.join(summary_issues)}
Please review findings carefully and verify against source data.

{'='*60}

"""
                summary = warning_header + summary
                print(f"[Warning] Summary still has issues after retry (score: {summary_score:.2f})")
            else:
                print(f"[Summary] β Validation passed after retry (score: {summary_score:.2f})")
        else:
            print(f"[Summary] β Validation passed (score: {summary_score:.2f})")

        # Cross-check quantitative consensus claims against the source data.
        if HAS_ENHANCED_VALIDATION:
            consensus_warnings = verify_consensus_claims(summary, valid_results)
            if consensus_warnings:
                print(f"[Warning] Consensus verification issues: {len(consensus_warnings)} found")
                consensus_note = "\n\n[CONSENSUS VERIFICATION NOTES]:\n" + "\n".join(f"- {w}" for w in consensus_warnings) + "\n\n"
                summary = summary + consensus_note
            else:
                print("[Summary] β Consensus claims verified")
        else:
            print("[Summary] β οΈ Consensus verification skipped (enhanced validation not available)")

        # --- Report artifacts: CSV, PDF, dashboard ---
        csv_path = generate_enhanced_csv(csv_rows, interviewee_type)
        print(f"[CSV] β Saved to {csv_path}")

        pdf_path = generate_enhanced_pdf(
            summary,
            all_results,
            interviewee_type,
            processing_errors
        )
        print(f"[PDF] β Saved to {pdf_path}")

        dashboard = generate_comprehensive_dashboard(csv_rows, interviewee_type)
        print("[Dashboard] β Generated")

        # --- Assemble the markdown report shown in the UI ---
        output_text = f"""# Analysis Complete

## Summary of Findings
{summary}

## Processing Statistics
- Total Files: {len(files)}
- Successfully Processed: {len(valid_results)}
- Failed: {len(processing_errors)}
- Average Quality Score: {sum(r['quality_score'] for r in valid_results) / len(valid_results):.2f}

"""

        if processing_errors:
            # processing_errors holds both dicts (hard failures) and plain
            # strings (quality warnings); normalize both for display.
            error_messages = []
            for err in processing_errors:
                if isinstance(err, dict):
                    error_msg = f"{err.get('transcript_id', 'Unknown')} ({err.get('file_name', 'unknown')}): {err.get('error_type', 'Error')} - {err.get('error_message', 'Unknown error')}"
                    error_messages.append(error_msg)
                else:
                    error_messages.append(str(err))
            output_text += f"\n## Processing Errors\n" + "\n".join(f"- {msg}" for msg in error_messages)

        output_text += "\n\n---\n\n## Individual Transcript Results\n\n"

        for result in all_results:
            output_text += f"### {result['transcript_id']} - {result['file_name']}\n"
            output_text += f"Quality Score: {result['quality_score']:.2f} | Words: {result['word_count']}\n\n"
            output_text += result['full_text'] + "\n\n---\n\n"

        progress(1.0, desc="Complete!")

        session_summary = prod_logger.finalize_session()
        prod_logger.logger.info(f"Session logs saved to: logs/session_{session_id}.*")

        return output_text, csv_path, pdf_path, dashboard

    except Exception as e:
        error_msg = f"[Fatal Error] Summary or report generation failed: {str(e)}"
        print(error_msg)
        import traceback
        traceback.print_exc()

        prod_logger.log_transcript_error("SUMMARY_GENERATION", type(e).__name__, str(e))
        prod_logger.finalize_session()

        return error_msg, None, None, None
|
|
def generate_narrative_report_ui(csv_file, summary_text, interviewee_type, report_style):
    """
    Wrapper function for Gradio UI to generate narrative reports.

    Args:
        csv_file: CSV output from a prior analysis run (file object or path).
        summary_text: Optional pasted executive summary text.
        interviewee_type: HCP, Patient, or Other.
        report_style: executive, detailed, or presentation.

    Returns:
        Tuple of (status_message, pdf_path, word_path, html_path); the paths
        are None on error.
    """
    import os
    import tempfile

    # Validate the input BEFORE importing the generator, so a missing CSV
    # yields this clear message rather than an unrelated import traceback.
    if csv_file is None:
        return "Error: No CSV file provided. Please run analysis first.", None, None, None

    summary_path = None
    try:
        from narrative_report_generator import generate_narrative_report

        # Persist the pasted summary (if any) to a temp file for the generator.
        if summary_text and summary_text.strip():
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
                f.write(summary_text)
                summary_path = f.name

        # Backend selection mirrors the app-wide configuration.
        llm_backend = "lmstudio" if os.getenv("USE_LMSTUDIO", "False").lower() == "true" else "hf_api"

        pdf_path, word_path, html_path = generate_narrative_report(
            csv_path=csv_file.name if hasattr(csv_file, 'name') else csv_file,
            summary_path=summary_path,
            interviewee_type=interviewee_type,
            report_style=report_style,
            llm_backend=llm_backend
        )

        return (
            f"β Narrative reports generated successfully!\n\nPDF: {pdf_path}\nWord: {word_path}\nHTML: {html_path}",
            pdf_path,
            word_path,
            html_path
        )

    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        return f"Error generating narrative report: {str(e)}\n\n{error_detail}", None, None, None
    finally:
        # Always delete the temp summary file — the original only removed it
        # on the success path, leaking it when generation raised.
        if summary_path and os.path.exists(summary_path):
            os.remove(summary_path)
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: two workflow tabs (transcript analysis, narrative report) plus a
# help tab. Component handles are bound to the analyze / narrative callbacks.
# ---------------------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # π― TranscriptorAI - Enterprise Transcript Analyzer

    Upload multiple transcripts and generate comprehensive, structured insights with advanced AI analysis.
    """)

    with gr.Tabs():

        # Tab 1: core transcript analysis pipeline (inputs feed analyze()).
        with gr.TabItem("π Transcript Analysis"):
            with gr.Row():
                with gr.Column(scale=1):
                    files = gr.File(
                        label="π Upload Transcripts",
                        file_types=[".docx", ".pdf"],
                        file_count="multiple"
                    )
                    file_type = gr.Radio(
                        ["DOCX", "PDF"],
                        label="File Type",
                        value="DOCX"
                    )
                    interviewee_type = gr.Radio(
                        ["HCP", "Patient", "Other"],
                        label="Interviewee Type",
                        value="Patient",
                        info="Select the type of person being interviewed"
                    )

                with gr.Column(scale=1):
                    user_comments = gr.Textbox(
                        label="Analysis Instructions",
                        lines=6,
                        placeholder="Enter specific analysis goals, questions to answer, or context...",
                        info="Provide guidance for the AI analyzer"
                    )
                    role_hint = gr.Textbox(
                        label="Speaker Role Mapping (Optional)",
                        placeholder="e.g., Speaker 1 = Interviewer, Speaker 2 = Doctor",
                        info="Help identify speakers if needed"
                    )

            with gr.Row():
                debug_mode = gr.Checkbox(label="π Enable Debug Mode", value=False)

            # PII redaction controls (consumed by analyze()).
            with gr.Row():
                with gr.Column():
                    enable_pii_redaction = gr.Checkbox(
                        label="π Enable PII Redaction",
                        value=False,
                        info="Mask sensitive information (names, dates, SSN, emails, etc.)"
                    )
                with gr.Column():
                    redaction_level = gr.Radio(
                        ["minimal", "moderate", "strict"],
                        label="Redaction Level",
                        value="moderate",
                        info="minimal=IDs only, moderate=common PII, strict=all PII including names"
                    )

            with gr.Row():
                gr.Markdown("""
                **β οΈ IMPORTANT PRIVACY NOTICE:**
                - If using real patient/healthcare data, ALWAYS enable PII redaction
                - Private HF Spaces are NOT HIPAA-compliant - use de-identified data only
                - For HIPAA compliance, deploy on your own HIPAA-certified infrastructure
                """)

            with gr.Row():
                analyze_btn = gr.Button("π Analyze Transcripts", variant="primary", scale=2)

            with gr.Row():
                output_text = gr.Textbox(label="π Analysis Report", lines=40)

            with gr.Row():
                csv_output = gr.File(label="π₯ Download CSV")
                pdf_output = gr.File(label="π₯ Download PDF")

            with gr.Row():
                dashboard_output = gr.Plot(label="π Dashboard Visualization")

            # Wire the analysis button: inputs/outputs must match analyze()'s
            # signature and 4-tuple return.
            analyze_btn.click(
                fn=analyze,
                inputs=[files, file_type, user_comments, role_hint, debug_mode, interviewee_type,
                        enable_pii_redaction, redaction_level],
                outputs=[output_text, csv_output, pdf_output, dashboard_output]
            )

        # Tab 2: narrative report generation from a prior analysis run.
        with gr.TabItem("π Narrative Report"):
            gr.Markdown("""
            ## Generate Storytelling Report

            Transform your analysis into a narrative report with:
            - Executive summary with key insights
            - Data-driven storytelling
            - Professional formatting (PDF, Word, HTML)
            - Actionable recommendations

            **Instructions:** First run the analysis in the previous tab, then use the outputs here to generate a narrative report.
            """)

            with gr.Row():
                with gr.Column():
                    narrative_csv = gr.File(
                        label="CSV Output from Analysis",
                        file_types=[".csv"]
                    )
                    narrative_summary = gr.Textbox(
                        label="Copy/Paste Summary Text from Analysis (Optional)",
                        lines=10,
                        placeholder="Paste the executive summary text here..."
                    )

                with gr.Column():
                    narrative_interviewee_type = gr.Radio(
                        ["HCP", "Patient", "Other"],
                        label="Interviewee Type",
                        value="Patient"
                    )
                    narrative_report_style = gr.Radio(
                        ["executive", "detailed", "presentation"],
                        label="Report Style",
                        value="executive",
                        info="Executive = concise C-level report, Detailed = thorough analysis, Presentation = slide-ready"
                    )
                    generate_narrative_btn = gr.Button("π Generate Narrative Report", variant="primary")

            narrative_status = gr.Textbox(label="Status", lines=5)

            with gr.Row():
                narrative_pdf_output = gr.File(label="π₯ Download PDF Report")
                narrative_word_output = gr.File(label="π₯ Download Word Report")
                narrative_html_output = gr.File(label="π₯ Download HTML Report")

            generate_narrative_btn.click(
                fn=generate_narrative_report_ui,
                inputs=[narrative_csv, narrative_summary, narrative_interviewee_type, narrative_report_style],
                outputs=[narrative_status, narrative_pdf_output, narrative_word_output, narrative_html_output]
            )

        # Tab 3: static help content.
        with gr.TabItem("β Help"):
            gr.Markdown("""
            ### Quick Start Guide

            **Step 1: Analyze Transcripts**
            1. Upload your DOCX or PDF files
            2. Select interviewee type (HCP, Patient, or Other)
            3. Add analysis instructions
            4. Click "Analyze Transcripts"
            5. Download CSV, PDF, and view dashboard

            **Step 2: Generate Narrative Report (Optional)**
            1. Go to "Narrative Report" tab
            2. Upload the CSV from Step 1
            3. Optionally paste the summary text
            4. Select report style
            5. Click "Generate Narrative Report"
            6. Download PDF, Word, or HTML versions

            ### Tips
            - **CSV Upload**: Download the CSV from analysis, then upload it to narrative report generator
            - **Summary Text**: Copy from the "Analysis Report" textbox and paste into narrative generator
            - **Report Styles**:
              - **Executive**: Best for C-level, investors, decision-makers
              - **Detailed**: Best for researchers, comprehensive analysis
              - **Presentation**: Best for slides, briefings, quick overviews

            ### LLM Configuration
            - Set `USE_LMSTUDIO=True` to use your local LM Studio
            - Set `HUGGINGFACE_TOKEN` to use HF API for faster processing
            - Default: Uses local model (slower but free)

            ### Support
            For issues, check the console output or enable debug mode.
            """)

    gr.Markdown("""
    ---
    **TranscriptorAI** | Enterprise-grade transcript analysis with narrative reporting
    """)
|
|
# Script entry point: enable request queuing (bounded backlog, API schema
# hidden) and serve the app.
if __name__ == "__main__":
    demo.queue(
        max_size=10,      # cap concurrent queued requests
        api_open=False    # do not expose the REST API schema
    ).launch(
        server_name="0.0.0.0",   # listen on all interfaces (required in containers)
        server_port=7860,        # HuggingFace Spaces conventional port
        show_error=True          # surface tracebacks in the UI
    )
|
|