Spaces:
Sleeping
Sleeping
| # src/streamlit_app.py | |
| import os | |
| import re | |
| import tempfile | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import streamlit as st | |
| import requests | |
| import logging | |
| import traceback | |
| import io | |
| import time | |
| from pydub.utils import mediainfo | |
| from datetime import datetime | |
| from typing import Optional, Tuple, Dict, Any | |
| from transformers import pipeline | |
| import torch | |
| from predict import predict, load_model | |
| from config import config | |
| # ========================= | |
| # API CONFIGURATION | |
| # ========================= | |
def get_api_base():
    """Resolve the Flask API base URL.

    Precedence: Streamlit secrets ``API_BASE`` entry, then the ``API_BASE``
    environment variable, then a localhost default.

    Returns:
        str: Base URL of the backend API (no trailing slash expected).
    """
    try:
        api_base = st.secrets.get("API_BASE", "")
        if api_base:
            return api_base
    except Exception:
        # st.secrets raises when no secrets file is configured (e.g. local
        # dev); fall through to the environment variable. Narrowed from a
        # bare except so SystemExit/KeyboardInterrupt are not swallowed.
        pass
    return os.getenv("API_BASE", "http://localhost:5000")


# Resolved once at import time; all API helpers below use this base URL.
API_BASE = get_api_base()
def check_api_health():
    """Ping the Flask API health endpoint.

    Returns:
        tuple: ``(healthy, payload)`` where payload is the parsed JSON body
        on success, or a dict with an ``"error"`` key on failure.
    """
    try:
        resp = requests.get(f"{API_BASE}/healthz", timeout=5)
        # Anything other than 200 is reported as unhealthy with the code.
        if resp.status_code != 200:
            return False, {"error": f"HTTP {resp.status_code}"}
        return True, resp.json()
    except requests.exceptions.RequestException as exc:
        return False, {"error": str(exc)}
def send_analysis_to_api(analysis_data):
    """POST analysis results to the Flask API for storage.

    Returns:
        bool: True only when the API answered HTTP 200; network errors are
        logged and reported as False (best-effort delivery).
    """
    try:
        resp = requests.post(
            f"{API_BASE}/api/analysis",
            json=analysis_data,
            timeout=10)
        return resp.status_code == 200
    except requests.exceptions.RequestException as exc:
        logger.error(f"Failed to send analysis to API: {exc}")
        return False
# Access keywords via config instance
# Keyword lists come from the shared config object so this Streamlit app and
# the backend stay in sync on what counts as drug-related vocabulary.
DRUG_KEYWORDS = config.DRUG_KEYWORDS
HIGH_RISK_KEYWORDS = config.HIGH_RISK_KEYWORDS
| from utils import ( | |
| logger, | |
| security_manager, | |
| file_manager, | |
| model_manager, | |
| setup_production_logging, | |
| AudioValidator, | |
| is_valid_audio | |
| ) | |
# Load model once at app startup
# Done at module import time so Streamlit reruns do not reload the classifier.
load_model(config.MODEL_PATH)
# Additional context patterns for better detection
# Case-insensitive regexes matching conversational phrasing that often
# surrounds deals (pickups, meeting spots, police mentions, payment,
# logistics). Each match contributes a fixed bonus to the context score in
# compute_enhanced_drug_score().
DRUG_CONTEXT_PATTERNS = [
    r'(?i)(picked?\s*(it|them)\s*up|got\s*the\s*(stuff|package|goods))',
    r'(?i)(meet\s*(at|near|behind)|behind\s*the\s*(metro|station))',
    r'(?i)(too\s*risky|cops?\s*(were|are)\s*there)',
    r'(?i)(same\s*source|better\s*this\s*time)',
    r'(?i)(payment|pay|crypto|money|cash)\s*(through|via|using)',
    r'(?i)(bringing|getting|delivery)',
    r'(?i)(saturday|party|rave)',
    r'(?i)(mumbai|supplier)',
    r'(?i)(straight\s*from|coming\s*from)'
]
def load_whisper_model():
    """Load Whisper model using Transformers pipeline - HF Spaces compatible.

    Returns the ASR pipeline on success; on failure shows a user-facing
    error and halts the Streamlit script via st.stop().
    """
    try:
        # GPU index 0 when CUDA is present, -1 selects CPU for the pipeline.
        target_device = 0 if torch.cuda.is_available() else -1
        asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-tiny.en",
            device=target_device,
            return_timestamps=False
        )
        logger.info("Loaded Whisper model via transformers pipeline")
        return asr_pipeline
    except Exception as exc:
        logger.error(f"Failed to load Whisper model: {exc}")
        st.error("Failed to load speech recognition model. Please contact system administrator.")
        st.stop()
def transcribe_audio_production(model, audio_path: str, progress_callback=None) -> str:
    """Transcribe audio using the transformers ASR pipeline.

    Args:
        model: Callable ASR pipeline (as returned by load_whisper_model()).
        audio_path: Path of the audio file to transcribe.
        progress_callback: Optional ``callback(percent, message)`` for UI updates.

    Returns:
        The stripped transcription text; "" when the model produced nothing.

    Raises:
        Exception: any error from the underlying pipeline is logged and re-raised.
    """
    try:
        if progress_callback:
            progress_callback(50, "Transcribing audio...")
        result = model(audio_path)
        transcription = result.get("text", "").strip()
        if progress_callback:
            progress_callback(80, "Transcription completed")
        if not transcription:
            logger.warning("Empty transcription result")
            return ""
        logger.info(f"Transcription completed: {len(transcription)} characters")
        return transcription
    except Exception as e:
        logger.error(f"Transcription failed: {e}")
        # Bare `raise` re-raises with the original traceback intact;
        # `raise e` would needlessly rewrite the traceback chain.
        raise
import re
from sentence_transformers import SentenceTransformer, util
# Load sentence embedding model (for semantic similarity)
# NOTE(review): `re` is already imported at the top of the file; this second
# import is redundant but harmless. The embedding model is loaded at import
# time so reruns reuse it.
model = SentenceTransformer('all-MiniLM-L6-v2')
def simulate_conversation_voice_based(transcribed_text: str, similarity_threshold: float = 0.7) -> str:
    """
    Enhanced voice-based conversation simulation.
    Detects speaker changes using:
    - Heuristic patterns
    - Punctuation and prosody
    - Semantic similarity between sentences

    Returns the transcript re-rendered as alternating "Speaker_A:" /
    "Speaker_B:" lines ("" for empty input).
    """
    if not transcribed_text:
        return ""
    # Split transcript into sentences
    sentences = re.split(r'(?<=[?.!β¦])\s+', transcribed_text.strip())
    # Voice change patterns (expanded fillers, discourse markers, interjections)
    voice_change_patterns = [
        r'\b(yeah|yes|okay|alright|sure|no|nah|uh|um|hmm|oh)\b',
        r'\b(but|however|actually|wait|hold on|well|anyway|so|then|now|listen|look|see|hey)\b',
        r'\b(what|how|when|where|why|who)\b',
        r'\b(i think|i mean|you know|like|I guess|maybe)\b'
    ]
    # Compile regex once for speed
    voice_change_regex = re.compile("|".join(voice_change_patterns), re.IGNORECASE)
    convo_lines = []
    current_speaker = "Speaker_A"
    for i, sentence in enumerate(sentences):
        if not sentence.strip():
            continue
        speaker_change = False
        # Heuristic: voice change indicators — only flip on sentence-initial
        # interjections/answers, and never on the very first sentence.
        if voice_change_regex.search(sentence):
            if i > 0 and convo_lines:
                if re.match(r'^\s*(yeah|yes|okay|but|what|no|hey|well|listen|um|uh|oh)', sentence.strip(), re.IGNORECASE):
                    speaker_change = True
        # Additional heuristics
        if i > 0 and not speaker_change:
            prev_sentence = sentences[i-1] if i-1 < len(sentences) else ""
            # Punctuation-based: previous sentence ends with question, current starts like an answer
            if prev_sentence.endswith("?") and re.match(r'^\s*(yeah|yes|no|sure|maybe|i|uh|um)', sentence.strip(), re.IGNORECASE):
                speaker_change = True
            # Semantic similarity check: low similarity to the previous
            # sentence suggests a topic/speaker shift.
            try:
                sim = util.cos_sim(model.encode(prev_sentence), model.encode(sentence)).item()
                if sim < similarity_threshold:
                    speaker_change = True
            except:
                # NOTE(review): bare except deliberately lets diarization
                # degrade gracefully when embedding fails — consider
                # narrowing to Exception.
                pass  # fallback if embedding fails
        # Length + transition word heuristic
        if re.search(r'\b(anyway|so|well|now|then|alright|listen)\b', sentence.strip(), re.IGNORECASE) and len(sentence.split()) > 3:
            speaker_change = True
        # Switch speaker if change detected
        if speaker_change:
            current_speaker = "Speaker_B" if current_speaker == "Speaker_A" else "Speaker_A"
        convo_lines.append(f"{current_speaker}: {sentence.strip()}")
    return "\n".join(convo_lines)
def highlight_drug_lines_html(conversation_text: str, keywords: list) -> Tuple[str, Dict]:
    """ENHANCED version with regex word boundary matching and context-aware keyword detection.

    Returns:
        tuple: (HTML string with flagged lines styled red, mapping of
        flagged line -> list of keywords found on it).
    """
    if not conversation_text:
        return "", {}

    # Single letters / common words that are only drug slang with context.
    AMBIGUOUS_TERMS = {"e", "x", "line", "ice", "horse", "420"}
    # Compiled once: verbs that disambiguate slang usage.
    context_verb_re = re.compile(
        r'\b(smoke|roll|pop|hit|take|buy|sell|party|snort|inject)\b',
        re.IGNORECASE)

    def single_word_hit(line: str, kw: str) -> bool:
        """Word-boundary match; ambiguous slang additionally needs a drug verb."""
        if not re.search(rf'\b{re.escape(kw)}\b', line, re.IGNORECASE):
            return False
        if kw in AMBIGUOUS_TERMS:
            return bool(context_verb_re.search(line))
        return True

    # Longest keywords first so multi-word phrases win over their sub-words.
    ordered_keywords = sorted(keywords, key=len, reverse=True)

    line_hits = {}
    html_parts = []
    total_keyword_matches = 0

    for line in conversation_text.split("\n"):
        found = []
        for kw in ordered_keywords:
            if ' ' in kw:
                # Multi-word keywords: plain word-boundary match, no context check.
                matched = bool(re.search(rf'\b{re.escape(kw)}\b', line, re.IGNORECASE))
            else:
                matched = single_word_hit(line, kw)
            if matched:
                found.append(kw)
                total_keyword_matches += 1
        if found:
            html_parts.append(f"<p style='color:#e57373'><b>[DRUG]</b> {line}</p>")
            line_hits[line] = found
        else:
            html_parts.append(f"<p>{line}</p>")

    logger.info(f"Keyword detection: {total_keyword_matches} matches across {len(line_hits)} lines")
    return "".join(html_parts), line_hits
def compute_enhanced_drug_score(text: str, conversation_text: str, detected_keywords: Dict) -> Tuple[float, int, int]:
    """ENHANCED drug detection scoring.

    Combines per-line keyword hits, whole-text keyword counts, keyword
    density and contextual phrase patterns into a single 0..1 score.

    Args:
        text: Raw transcription text.
        conversation_text: Speaker-simulated conversation (kept for signature
            parity with callers; not used directly here).
        detected_keywords: Mapping of flagged line -> list of keywords found.

    Returns:
        (enhanced_score, high_risk_count, total_keyword_count);
        (0.0, 0, 0) on any internal error.
    """
    try:
        # Hoisted: build the lowercase high-risk set once instead of
        # re-creating a list comprehension for every keyword in the loop.
        high_risk_lower = {hr.lower() for hr in HIGH_RISK_KEYWORDS}
        high_risk_count = 0
        total_keyword_count = 0
        for line_keywords in detected_keywords.values():
            total_keyword_count += len(line_keywords)
            for kw in line_keywords:
                if kw.lower() in high_risk_lower:
                    high_risk_count += 1
        # Check full text for keywords missed by line-level detection.
        # NOTE(review): substring containment here is looser than the
        # word-boundary matching in highlight_drug_lines_html — kept as-is.
        text_lower = text.lower()
        additional_high_risk = sum(1 for kw in HIGH_RISK_KEYWORDS if kw.lower() in text_lower)
        additional_total = sum(1 for kw in DRUG_KEYWORDS if kw.lower() in text_lower)
        # Use the higher count from either detection pass
        high_risk_count = max(high_risk_count, additional_high_risk)
        total_keyword_count = max(total_keyword_count, additional_total)
        # Keyword density relative to the word count (guard against empty text)
        total_words = len(text.split())
        keyword_density = total_keyword_count / max(total_words, 1)
        # Context pattern scoring: each matched phrase adds a fixed bonus
        context_score = 0
        matched_patterns = 0
        for pattern in DRUG_CONTEXT_PATTERNS:
            if re.search(pattern, text):
                context_score += 0.15
                matched_patterns += 1
        # Weighted combination, each component capped, total capped at 1.0
        enhanced_score = 0
        if high_risk_count > 0:
            enhanced_score += min(high_risk_count * 0.4, 0.8)
        enhanced_score += min(keyword_density * 2, 0.2)
        enhanced_score += min(context_score, 0.5)
        enhanced_score = min(enhanced_score, 1.0)
        logger.info(f"Enhanced scoring - High-risk: {high_risk_count}, "
                    f"Total: {total_keyword_count}, "
                    f"Density: {keyword_density:.3f}, "
                    f"Context: {context_score:.3f}, "
                    f"Score: {enhanced_score:.3f}, "
                    f"Patterns: {matched_patterns}")
        return enhanced_score, high_risk_count, total_keyword_count
    except Exception as e:
        logger.error(f"Enhanced scoring error: {e}")
        return 0.0, 0, 0
def compute_multimodal_risk(pred_label: int, pred_prob: float, text: str,
                            simulated_text: str, detected_keywords: Dict) -> Tuple[float, int]:
    """ENHANCED multimodal risk assessment.

    Blends the ML probability with the keyword-based enhanced score, trusting
    keywords more the stronger the keyword evidence is, then derives a final
    DRUG / NON_DRUG label.

    Args:
        pred_label: Raw ML prediction (1 = drug, 0 = non-drug).
        pred_prob: ML model probability for the drug class.
        text: Raw transcription text.
        simulated_text: Speaker-simulated conversation text.
        detected_keywords: Mapping of flagged line -> keywords found.

    Returns:
        (risk_score clamped to [0, 1], adjusted_pred_label);
        (0.5, 0) on any internal error.
    """
    try:
        enhanced_score, high_risk_count, total_keyword_count = compute_enhanced_drug_score(
            text, simulated_text, detected_keywords
        )
        # Weighting logic: stronger keyword evidence shifts weight away from
        # the ML model. decision_reason is informational only.
        if high_risk_count >= 1:
            model_weight, keyword_weight = 0.2, 0.8
            decision_reason = f"High-risk keywords detected (count={high_risk_count})"
        elif total_keyword_count >= 3:
            model_weight, keyword_weight = 0.3, 0.7
            decision_reason = f"Strong keyword evidence (count={total_keyword_count})"
        elif total_keyword_count >= 2:
            # Fixed: the former `high_risk_count >= 1 or ...` clause was dead
            # code — that case is already taken by the first branch.
            model_weight, keyword_weight = 0.4, 0.6
            decision_reason = "Moderate keyword evidence"
        else:
            model_weight, keyword_weight = 0.7, 0.3
            decision_reason = "Relying on ML model"
        # Score combination
        risk_score = (model_weight * pred_prob) + (keyword_weight * enhanced_score)
        # Decision logic: any high-risk keyword forces a DRUG label; otherwise
        # fall through progressively weaker evidence thresholds.
        if high_risk_count >= 1:
            adjusted_pred_label = 1
            final_reason = f"DRUG - High-risk keywords: {high_risk_count}"
        elif enhanced_score >= 0.4:
            adjusted_pred_label = 1
            final_reason = f"DRUG - Strong keyword evidence: {enhanced_score:.3f}"
        elif enhanced_score >= 0.3 and pred_prob >= 0.2:
            adjusted_pred_label = 1
            final_reason = f"DRUG - Combined evidence: enhanced={enhanced_score:.3f}, ml={pred_prob:.3f}"
        elif pred_prob >= config.THRESHOLD:
            adjusted_pred_label = 1
            final_reason = f"DRUG - High ML confidence: {pred_prob:.3f}"
        else:
            adjusted_pred_label = 0
            final_reason = f"NON_DRUG - Low confidence: enhanced={enhanced_score:.3f}, ml={pred_prob:.3f}"
        # Floor the risk score when flagged as DRUG so the displayed severity
        # never contradicts the label.
        if adjusted_pred_label == 1 and risk_score < 0.5:
            risk_score = max(risk_score, 0.6)
        logger.info(f"Risk assessment - {final_reason}, final_risk={risk_score:.4f}")
        return min(max(risk_score, 0.0), 1.0), adjusted_pred_label
    except Exception as e:
        logger.error(f"Risk assessment error: {e}")
        return 0.5, 0
def is_valid_audio(file_path) -> bool:
    """Check if the file is a valid audio by inspecting metadata.

    NOTE(review): this redefinition shadows the `is_valid_audio` imported
    from utils above — confirm which implementation is intended.

    Returns:
        bool: True when ffprobe/mediainfo reports a duration for the file.
    """
    try:
        info = mediainfo(file_path)
        return info.get("duration") is not None
    except Exception:
        # Narrowed from a bare except: any probe failure means "not valid
        # audio" without masking KeyboardInterrupt/SystemExit.
        return False
def estimate_processing_time(audio_path):
    """Estimate processing time (seconds) based on audio duration.

    Args:
        audio_path: Path of the audio file to probe.

    Returns:
        dict: "total", "transcription" and "analysis" second estimates;
        conservative defaults when the duration cannot be probed.
    """
    try:
        info = mediainfo(audio_path)
        duration_seconds = float(info.get("duration", 0))
        # Heuristic: transcription at roughly 4x real time, floor of 5s.
        transcription_time = max(duration_seconds * 0.25, 5)
        analysis_time = 5
        total_time = transcription_time + analysis_time
        return {
            "total": int(total_time),
            "transcription": int(transcription_time),
            "analysis": analysis_time
        }
    except Exception:
        # Narrowed from a bare except: probe/parse failures (missing file,
        # non-numeric duration) fall back to safe defaults.
        return {"total": 30, "transcription": 25, "analysis": 5}
def show_activity_indicator():
    """Show that system is active during long operations.

    Returns one randomly chosen status message suitable for display while
    long-running transcription/analysis work is in flight.
    """
    import random

    busy_messages = (
        "π§ AI models are thinking...",
        "π Analyzing speech patterns...",
        "π Computing risk scores...",
        "π― Detecting keywords...",
        "β‘ Almost done...",
    )
    return random.choice(busy_messages)
def main():
    """Production main application.

    Streamlit entry point: configures the page, validates model availability,
    accepts an uploaded or sample audio file, then runs the full pipeline
    (Whisper transcription -> speaker simulation -> keyword/ML analysis ->
    risk scoring) and renders results, export and debug panels.
    """
    # Initialize variables early so the finally-block cleanup can reference
    # them even if an exception fires before assignment.
    uploaded_file = None
    audio_path = None
    try:
        # Initialize production logging
        setup_production_logging()
        # Page configuration
        st.set_page_config(
            page_title="π¨ Drug Audio Analyzer",
            layout="wide",
            initial_sidebar_state="collapsed"
        )
        st.title("π¨ Audio-Based Drug Conversation Detection System")
        st.markdown(
            "This AI powered system analyzes uploaded conversations to detect potential drug-related content, "
            "highlight risk keywords, and provide actionable insights to the Karnataka Police."
        )
        # Initialize models with progress tracking
        init_progress = st.progress(0)
        init_status = st.empty()
        init_status.text("Step 1/3: Validating model files...")
        init_progress.progress(33)
        # Model validation: hard-stop the app when the classifier is missing.
        model_available, model_msg = model_manager.validate_model_availability()
        if model_available:
            init_status.text("Step 2/3: Model validation successful")
            init_progress.progress(66)
        else:
            init_status.error(f"β Model validation failed: {model_msg}")
            st.stop()
        init_status.text("Step 3/3: System ready for audio processing")
        init_progress.progress(100)
        # Clear initialization progress widgets once startup is done
        init_progress.empty()
        init_status.empty()
        # Sidebar with system info
        with st.sidebar:
            st.success("β System Status: Operational")
            if st.button("ποΈ Clear System Cache"):
                st.cache_resource.clear()
                st.success("Cache cleared successfully!")
        # File Input Section
        st.subheader("π Select Audio Source")
        st.info(f"π΅ Formats: {', '.join(config.ALLOWED_EXTENSIONS)}")
        st.info(f"β±οΈ Max duration: {config.MAX_AUDIO_DURATION//60} minutes")
        input_option = st.radio(
            "Choose audio input:",
            ["Upload your own file", "Use sample test file"]
        )
        if input_option == "Upload your own file":
            uploaded_file = st.file_uploader(
                "π Upload an audio file",
                type=None,
                help="All audio formats supported (wav, mp3, m4a, flac, ogg, etc.)"
            )
            if uploaded_file:
                # Check file size before doing any heavier validation
                file_size_mb = uploaded_file.size / (1024 * 1024)
                if file_size_mb > config.MAX_FILE_SIZE_MB:
                    st.error(f"β File too large: {file_size_mb:.2f} MB. Max allowed is {config.MAX_FILE_SIZE_MB} MB.")
                    st.stop()
                # Validate file (extension/content checks in AudioValidator)
                file_valid, file_msg = AudioValidator.validate_file(uploaded_file)
                if not file_valid:
                    st.error(f"β {file_msg}")
                    logger.warning(f"File validation failed: {file_msg}")
                    st.stop()
                st.success(f"β {file_msg}")
                # Create temp file and confirm it probes as real audio
                audio_path = file_manager.create_secure_temp_file(uploaded_file)
                if not is_valid_audio(audio_path):
                    st.error("β Uploaded file is not a valid audio")
                    st.stop()
        elif input_option == "Use sample test file":
            sample_dir = "data/audio_sample"
            if os.path.exists(sample_dir):
                sample_files = [
                    f for f in os.listdir(sample_dir)
                    if f.lower().endswith((".wav", ".mp3", ".flac", ".ogg", ".m4a"))
                ]
                if sample_files:
                    # Prepend a placeholder so nothing is auto-selected
                    sample_files_display = ["-- Select a sample file --"]
                    for f in sample_files:
                        file_path = os.path.join(sample_dir, f)
                        size_mb = os.path.getsize(file_path) / (1024 * 1024)
                        sample_files_display.append(f"{f} ({size_mb:.2f} MB)")
                    selected_sample = st.selectbox("π΅ Choose a sample test file:", sample_files_display)
                    if selected_sample != "-- Select a sample file --":
                        # Strip the " (x.xx MB)" suffix added for display
                        selected_file = selected_sample.split(" (")[0]
                        audio_path = os.path.join(sample_dir, selected_file)
                        file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
                        if file_size_mb > config.MAX_FILE_SIZE_MB:
                            st.warning(f"β οΈ This sample file exceeds the max allowed size ({config.MAX_FILE_SIZE_MB} MB).")
                else:
                    st.warning("β οΈ No sample files found.")
            else:
                st.error(f"β Sample folder not found: {sample_dir}")
        # Check if audio file is selected; nothing below runs without one
        if not audio_path:
            st.info("Please upload a file or select a sample test file to continue.")
            st.stop()
        # Audio player and file info
        st.audio(audio_path)
        file_info = os.path.getsize(audio_path) / (1024 * 1024)
        # Get the correct filename to display
        if uploaded_file:
            display_filename = uploaded_file.name
        else:
            display_filename = os.path.basename(audio_path)
        # Start processing button — everything below runs only on click
        if st.button("π Start Audio Analysis", type="primary", use_container_width=True):
            st.balloons()
            # Create processing stages
            st.markdown("---")
            st.markdown("### π Audio Processing Pipeline")
            # Stage 1: Model Loading
            with st.container():
                stage1_col1, stage1_col2 = st.columns([1, 4])
                with stage1_col1:
                    st.markdown("**Stage 1:**")
                with stage1_col2:
                    with st.spinner("Loading Whisper speech recognition model..."):
                        model = load_whisper_model()
                    st.success("β Speech recognition model loaded successfully")
            # Stage 2: Audio Transcription
            with st.container():
                stage2_col1, stage2_col2 = st.columns([1, 4])
                with stage2_col1:
                    st.markdown("**Stage 2:**")
                with stage2_col2:
                    transcription_container = st.empty()
                    transcription_container.info("π€ Starting audio transcription...")
                    progress_bar = st.progress(0)
                    status_text = st.empty()
                    start_time = time.time()
                    def update_progress(pct, message=""):
                        # ETA derived from elapsed time and percent complete
                        elapsed = time.time() - start_time
                        estimated_total = elapsed / (pct/100) if pct > 0 else 0
                        remaining = max(0, estimated_total - elapsed)
                        progress_bar.progress(pct)
                        status_text.text(f"πΉ {message} ({pct}%, ETA ~{int(remaining)}s)")
                    status_text.text("πΉ Preparing audio for transcription...")
                    progress_bar.progress(10)
                    time.sleep(0.5)
                    status_text.text("πΉ Running speech-to-text analysis...")
                    progress_bar.progress(30)
                    transcription = transcribe_audio_production(model, audio_path, progress_callback=update_progress)
                    progress_bar.empty()
                    status_text.empty()
                    transcription_container.success(f"β Transcription completed ({len(transcription)} characters)")
                    # Show transcription results
                    if transcription:
                        st.markdown("### π Transcription Results")
                        # Raw transcription
                        with st.expander("View Raw Transcription", expanded=True):
                            st.text_area("Transcribed Text:", value=transcription, height=100, disabled=True)
                        # Generate voice-based conversation simulation
                        st.info("π€ Analyzing speech patterns to identify potential speakers...")
                        with st.spinner("Processing speaker analysis..."):
                            simulated_text = simulate_conversation_voice_based(transcription)
                        if simulated_text:
                            with st.expander("View Voice-Based Speaker Analysis", expanded=False):
                                st.text_area("Speaker Analysis:", value=simulated_text, height=150, disabled=True)
                                st.caption("π§ AI-detected speaker changes based on speech patterns, tone indicators, and conversational cues")
                            # Compute speaker statistics from the labelled lines
                            lines = simulated_text.split('\n')
                            speaker_a_lines = sum(1 for line in lines if line.startswith('Speaker_A:'))
                            speaker_b_lines = sum(1 for line in lines if line.startswith('Speaker_B:'))
                            col1, col2 = st.columns(2)
                            with col1:
                                st.metric("ποΈ Speaker A Lines", speaker_a_lines)
                            with col2:
                                st.metric("ποΈ Speaker B Lines", speaker_b_lines)
                            if speaker_b_lines > 0:
                                st.success("β Multiple speakers detected in conversation")
                            else:
                                st.info("βΉοΈ Single speaker detected (monologue)")
                        else:
                            # Fallback if voice-based detection fails
                            simulated_text = transcription  # Use raw transcription as fallback
                            st.warning("β οΈ Voice-based speaker detection failed. Using raw transcription for analysis.")
                    else:
                        st.error("β οΈ No transcription produced. Please check the audio file.")
                        st.stop()
            # Stage 3: Analysis
            with st.container():
                stage4_col1, stage4_col2 = st.columns([1, 4])
                with stage4_col1:
                    st.markdown("**Stage 3:**")
                with stage4_col2:
                    analysis_container = st.empty()
                    analysis_container.info("π§ Running AI analysis and keyword detection...")
                    analysis_steps = st.empty()
                    analysis_steps.text("β Running ML model prediction...")
                    pred_label, raw_prob = predict(transcription)
                    analysis_steps.text("β Using voice-based conversation analysis...")
                    analysis_steps.text("β Detecting drug-related keywords...")
                    highlighted_html, detected_keywords = highlight_drug_lines_html(simulated_text, DRUG_KEYWORDS)
                    analysis_steps.text("β Computing risk assessment...")
                    risk_score, adjusted_prediction = compute_multimodal_risk(
                        pred_label, raw_prob, transcription, simulated_text, detected_keywords
                    )
                    analysis_steps.empty()
                    analysis_container.success("β Analysis completed successfully")
            st.markdown("---")
            st.success("π **Processing Complete!** Results are shown below.")
            # Enhanced Analysis Section
            st.subheader("π Enhanced Analysis")
            # NOTE(review): recomputes the enhanced score already derived
            # inside compute_multimodal_risk — duplicated work, same inputs.
            enhanced_score, high_risk_count, total_keyword_count = compute_enhanced_drug_score(
                transcription, simulated_text, detected_keywords
            )
            st.write(f"**High-Risk Keywords Detected:** {high_risk_count}")
            st.write(f"**Total Drug Keywords Detected:** {total_keyword_count}")
            st.write(f"**Enhanced Drug Score:** {enhanced_score:.2f}/1.0")
            # Results presentation
            st.markdown("---")
            st.subheader("π Analysis Results")
            # Main result display
            if adjusted_prediction == 1:
                st.markdown(
                    """
                    <div style='padding: 1.5rem; background: linear-gradient(90deg, #ffebee 0%, #ffcdd2 100%);
                         border-left: 6px solid #d32f2f; border-radius: 8px; margin: 1rem 0;'>
                        <h2 style='color: #c62828; margin: 0; display: flex; align-items: center;'>
                            π¨ DRUG-RELATED CONTENT DETECTED
                        </h2>
                        <p style='margin: 0.5rem 0 0 0; color: #5d4037; font-size: 1.1rem;'>
                            <strong>High-confidence detection of drug-related conversation patterns</strong>
                        </p>
                    </div>
                    """,
                    unsafe_allow_html=True
                )
                # Confidence assessment derived from the enhanced score bands
                if enhanced_score >= 0.6:
                    confidence_level = "HIGH"
                    confidence_color = "red"
                elif enhanced_score >= 0.3:
                    confidence_level = "MEDIUM"
                    confidence_color = "orange"
                else:
                    confidence_level = "LOW"
                    confidence_color = "yellow"
                st.markdown(f"**Confidence Level:** <span style='color: {confidence_color}; font-weight: bold;'>{confidence_level}</span>",
                            unsafe_allow_html=True)
            else:
                st.markdown(
                    """
                    <div style='padding: 1.5rem; background: linear-gradient(90deg, #e8f5e8 0%, #c8e6c9 100%);
                         border-left: 6px solid #388e3c; border-radius: 8px; margin: 1rem 0;'>
                        <h2 style='color: #2e7d32; margin: 0; display: flex; align-items: center;'>
                            β NO DRUG CONTENT DETECTED
                        </h2>
                        <p style='margin: 0.5rem 0 0 0; color: #2d5016; font-size: 1.1rem;'>
                            <strong>Conversation appears to be non-drug related</strong>
                        </p>
                    </div>
                    """,
                    unsafe_allow_html=True
                )
            # Metrics dashboard
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric(
                    "π€ ML Model Analysis",
                    f"{raw_prob:.1%}",
                    f"{'Drug' if pred_label == 1 else 'Non-Drug'}"
                )
            with col2:
                st.metric(
                    "π― Enhanced Score Analysis",
                    f"{enhanced_score:.1%}",
                    f"{high_risk_count} high-risk"
                )
            with col3:
                st.metric(
                    "β οΈ Risk Level",
                    f"{risk_score:.1%}",
                    "π΄ CRITICAL" if risk_score >= 0.7 else
                    "π HIGH" if risk_score >= 0.5 else
                    "π‘ MEDIUM" if risk_score >= 0.3 else "π’ LOW"
                )
            with col4:
                st.metric(
                    "π Keywords Found",
                    f"{total_keyword_count}",
                    f"{len(detected_keywords)} flagged lines"
                )
            # Drug highlights section (only shown for DRUG verdicts)
            if adjusted_prediction == 1:
                st.subheader("π‘ Drug-Related Lines Highlighted")
                st.markdown(highlighted_html, unsafe_allow_html=True)
                if detected_keywords:
                    st.subheader("π Detected Keywords per Line")
                    for line, kws in detected_keywords.items():
                        high_risk_kws = [kw for kw in kws if kw.lower() in [hr.lower() for hr in HIGH_RISK_KEYWORDS]]
                        regular_kws = [kw for kw in kws if kw not in high_risk_kws]
                        display_text = f"**Line:** `{line}`\n"
                        if high_risk_kws:
                            display_text += f"π¨ **High-Risk Keywords:** {', '.join(high_risk_kws)}\n"
                        if regular_kws:
                            display_text += f"β οΈ **Other Keywords:** {', '.join(regular_kws)}"
                        st.markdown(display_text)
            # Final Risk Assessment section
            st.subheader("π¨ Final Risk Assessment")
            st.write(f"**Overall Risk Score:** {risk_score:.2f}/1.0")
            # Determine risk level from the same bands used in the metrics
            if risk_score >= 0.7:
                risk_level = "π΄ **CRITICAL RISK**"
            elif risk_score >= 0.5:
                risk_level = "π **HIGH RISK**"
            elif risk_score >= 0.3:
                risk_level = "π‘ **MEDIUM RISK**"
            else:
                risk_level = "π’ **LOW RISK**"
            st.markdown(f"**Risk Level:** {risk_level}")
            # Show comparison between ML and enhanced prediction
            if pred_label != adjusted_prediction:
                st.info(f"π **Prediction Adjusted**: ML model predicted {'DRUG' if pred_label == 1 else 'NON_DRUG'}, "
                        f"but enhanced analysis adjusted it to {'DRUG' if adjusted_prediction == 1 else 'NON_DRUG'}")
            # System analysis summary
            st.markdown("---")
            st.subheader("π Analysis Summary")
            # Create summary dataframe
            summary_data = {
                "Analysis Component": [
                    "ML Model Prediction",
                    "Enhanced Prediction",
                    "Overall Risk Score",
                    "High-Risk Keywords",
                    "Total Keywords Detected",
                    "Flagged Conversation Lines",
                    "Processing Status"
                ],
                "Result": [
                    f"{'DRUG' if pred_label == 1 else 'NON_DRUG'} ({raw_prob:.1%} confidence)",
                    f"{'DRUG' if adjusted_prediction == 1 else 'NON_DRUG'}",
                    f"{risk_score:.1%} ({'CRITICAL' if risk_score >= 0.7 else 'HIGH' if risk_score >= 0.5 else 'MEDIUM' if risk_score >= 0.3 else 'LOW'})",
                    str(high_risk_count),
                    str(total_keyword_count),
                    str(len(detected_keywords)),
                    "β Complete"
                ]
            }
            summary_df = pd.DataFrame(summary_data)
            st.dataframe(summary_df, use_container_width=True, hide_index=True)
            # API integration for drug content (best-effort; failures only log)
            if adjusted_prediction == 1:
                analysis_data = {
                    "type": "audio_analysis",
                    "filename": uploaded_file.name if uploaded_file else "sample_file",
                    "prediction": "DRUG",
                    "confidence": risk_score,
                    "keywords_detected": total_keyword_count,
                    "timestamp": datetime.now().isoformat()
                }
                send_analysis_to_api(analysis_data)
            # Download analysis report
            if adjusted_prediction == 1:
                st.markdown("---")
                st.markdown("### π₯ Export Analysis Report")
                # Create detailed single-row report for CSV export
                report_data = {
                    "timestamp": [pd.Timestamp.now()],
                    "filename": [uploaded_file.name if uploaded_file else "sample_file"],
                    "file_size_mb": [uploaded_file.size / (1024*1024) if uploaded_file else file_info],
                    "ml_prediction": ["DRUG" if pred_label == 1 else "NON_DRUG"],
                    "ml_confidence": [raw_prob],
                    "enhanced_prediction": ["DRUG" if adjusted_prediction == 1 else "NON_DRUG"],
                    "risk_score": [risk_score],
                    "high_risk_keywords": [high_risk_count],
                    "total_keywords": [total_keyword_count],
                    "flagged_lines": [len(detected_keywords)],
                    "transcription_length": [len(transcription)]
                }
                report_df = pd.DataFrame(report_data)
                csv_data = report_df.to_csv(index=False).encode("utf-8")
                st.download_button(
                    label="π Download Analysis Report (CSV)",
                    data=csv_data,
                    file_name=f"drug_analysis_report_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}.csv",
                    mime="text/csv"
                )
            # Debug section: raw inputs and intermediate detections
            with st.expander("π Debug Information (Click to expand)"):
                st.write("**Text being analyzed:**")
                st.code(transcription)
                detected_keywords_full = [kw for kw in DRUG_KEYWORDS if kw.lower() in transcription.lower()]
                detected_high_risk = [kw for kw in HIGH_RISK_KEYWORDS if kw.lower() in transcription.lower()]
                st.write(f"**All keywords found in full text:** {detected_keywords_full}")
                st.write(f"**High-risk keywords found:** {detected_high_risk}")
                st.write(f"**Line-by-line detection:** {detected_keywords}")
                # Check context patterns
                matched_contexts = []
                for pattern in DRUG_CONTEXT_PATTERNS:
                    if re.search(pattern, transcription):
                        matched_contexts.append(pattern)
                st.write(f"**Context patterns matched:** {len(matched_contexts)}")
    except Exception as e:
        # Top-level boundary: log full traceback, show a generic error to the user
        logger.error(f"Processing error: {e}")
        logger.error(traceback.format_exc())
        st.error(f"β Processing failed: {str(e)}")
        st.error("Please check the logs for more details or contact the system administrator.")
    finally:
        # Cleanup temporary file (only uploads; sample files stay on disk)
        try:
            if uploaded_file and audio_path and not file_manager.is_sample_file(audio_path):
                file_manager.cleanup_file(audio_path, is_temp=True)
        except Exception as e:
            logger.warning(f"Failed to delete temporary file {audio_path}: {e}")
if __name__ == "__main__":
    main()