# src/streamlit_app.py import os import re import tempfile import pandas as pd import matplotlib.pyplot as plt import streamlit as st import requests import logging import traceback import io import time from pydub.utils import mediainfo from datetime import datetime from typing import Optional, Tuple, Dict, Any from transformers import pipeline import torch from predict import predict, load_model from config import config # ========================= # API CONFIGURATION # ========================= @st.cache_data(ttl=60) def get_api_base(): """Get API base URL from secrets or environment""" try: api_base = st.secrets.get("API_BASE", "") if api_base: return api_base except: pass return os.getenv("API_BASE", "http://localhost:5000") API_BASE = get_api_base() @st.cache_data(ttl=30) def check_api_health(): """Check if Flask API is healthy""" try: response = requests.get(f"{API_BASE}/healthz", timeout=5) if response.status_code == 200: return True, response.json() else: return False, {"error": f"HTTP {response.status_code}"} except requests.exceptions.RequestException as e: return False, {"error": str(e)} def send_analysis_to_api(analysis_data): """Send analysis results to Flask API for storage""" try: response = requests.post(f"{API_BASE}/api/analysis", json=analysis_data, timeout=10) return response.status_code == 200 except requests.exceptions.RequestException as e: logger.error(f"Failed to send analysis to API: {e}") return False # Access keywords via config instance DRUG_KEYWORDS = config.DRUG_KEYWORDS HIGH_RISK_KEYWORDS = config.HIGH_RISK_KEYWORDS from utils import ( logger, security_manager, file_manager, model_manager, setup_production_logging, AudioValidator, is_valid_audio ) # Load model once at app startup load_model(config.MODEL_PATH) # Additional context patterns for better detection DRUG_CONTEXT_PATTERNS = [ r'(?i)(picked?\s*(it|them)\s*up|got\s*the\s*(stuff|package|goods))', r'(?i)(meet\s*(at|near|behind)|behind\s*the\s*(metro|station))', r'(?i)(too\s*risky|cops?\s*(were|are)\s*there)', r'(?i)(same\s*source|better\s*this\s*time)', r'(?i)(payment|pay|crypto|money|cash)\s*(through|via|using)', r'(?i)(bringing|getting|delivery)', r'(?i)(saturday|party|rave)', r'(?i)(mumbai|supplier)', r'(?i)(straight\s*from|coming\s*from)' ] @st.cache_resource def load_whisper_model(): """Load Whisper model using Transformers pipeline - HF Spaces compatible""" try: device = 0 if torch.cuda.is_available() else -1 transcriber = pipeline( "automatic-speech-recognition", model="openai/whisper-tiny.en", device=device, return_timestamps=False ) logger.info("Loaded Whisper model via transformers pipeline") return transcriber except Exception as e: logger.error(f"Failed to load Whisper model: {e}") st.error("Failed to load speech recognition model. Please contact system administrator.") st.stop() def transcribe_audio_production(model, audio_path: str, progress_callback=None) -> str: """Transcribe audio using transformers pipeline""" try: if progress_callback: progress_callback(50, "Transcribing audio...") result = model(audio_path) transcription = result.get("text", "").strip() if progress_callback: progress_callback(80, "Transcription completed") if not transcription: logger.warning("Empty transcription result") return "" logger.info(f"Transcription completed: {len(transcription)} characters") return transcription except Exception as e: logger.error(f"Transcription failed: {e}") raise e import re from sentence_transformers import SentenceTransformer, util # Load sentence embedding model (for semantic similarity) model = SentenceTransformer('all-MiniLM-L6-v2') def simulate_conversation_voice_based(transcribed_text: str, similarity_threshold: float = 0.7) -> str: """ Enhanced voice-based conversation simulation. Detects speaker changes using: - Heuristic patterns - Punctuation and prosody - Semantic similarity between sentences """ if not transcribed_text: return "" # Split transcript into sentences sentences = re.split(r'(?<=[?.!âĻ])\s+', transcribed_text.strip()) # Voice change patterns (expanded fillers, discourse markers, interjections) voice_change_patterns = [ r'\b(yeah|yes|okay|alright|sure|no|nah|uh|um|hmm|oh)\b', r'\b(but|however|actually|wait|hold on|well|anyway|so|then|now|listen|look|see|hey)\b', r'\b(what|how|when|where|why|who)\b', r'\b(i think|i mean|you know|like|I guess|maybe)\b' ] # Compile regex once for speed voice_change_regex = re.compile("|".join(voice_change_patterns), re.IGNORECASE) convo_lines = [] current_speaker = "Speaker_A" for i, sentence in enumerate(sentences): if not sentence.strip(): continue speaker_change = False # Heuristic: voice change indicators if voice_change_regex.search(sentence): if i > 0 and convo_lines: if re.match(r'^\s*(yeah|yes|okay|but|what|no|hey|well|listen|um|uh|oh)', sentence.strip(), re.IGNORECASE): speaker_change = True # Additional heuristics if i > 0 and not speaker_change: prev_sentence = sentences[i-1] if i-1 < len(sentences) else "" # Punctuation-based: previous sentence ends with question, current starts like an answer if prev_sentence.endswith("?") and re.match(r'^\s*(yeah|yes|no|sure|maybe|i|uh|um)', sentence.strip(), re.IGNORECASE): speaker_change = True # Semantic similarity check try: sim = util.cos_sim(model.encode(prev_sentence), model.encode(sentence)).item() if sim < similarity_threshold: speaker_change = True except: pass # fallback if embedding fails # Length + transition word heuristic if re.search(r'\b(anyway|so|well|now|then|alright|listen)\b', sentence.strip(), re.IGNORECASE) and len(sentence.split()) > 3: speaker_change = True # Switch speaker if change detected if speaker_change: current_speaker = "Speaker_B" if current_speaker == "Speaker_A" else "Speaker_A" convo_lines.append(f"{current_speaker}: {sentence.strip()}") return "\n".join(convo_lines) def highlight_drug_lines_html(conversation_text: str, keywords: list) -> Tuple[str, Dict]: """ENHANCED version with regex word boundary matching and context-aware keyword detection""" if not conversation_text: return "", {} AMBIGUOUS_TERMS = {"e", "x", "line", "ice", "horse", "420"} def has_context_verbs(text): """Check if drug-related verbs are present in the text around slang keywords""" return bool(re.search(r'\b(smoke|roll|pop|hit|take|buy|sell|party|snort|inject)\b', text, re.IGNORECASE)) def is_keyword_in_line(line: str, kw: str) -> bool: """Return True if keyword found with appropriate context for ambiguous slang""" pattern = rf'\b{re.escape(kw)}\b' if re.search(pattern, line, re.IGNORECASE): if kw in AMBIGUOUS_TERMS: return has_context_verbs(line) return True return False lines = conversation_text.split("\n") line_hits = {} highlighted_lines = [] total_keyword_matches = 0 for line in lines: hits = [] for kw in sorted(keywords, key=len, reverse=True): if ' ' in kw: # Multi-word keywords pattern = rf'\b{re.escape(kw)}\b' if re.search(pattern, line, re.IGNORECASE): hits.append(kw) total_keyword_matches += 1 else: # Single word keywords if is_keyword_in_line(line, kw): hits.append(kw) total_keyword_matches += 1 if hits: highlighted_lines.append(f"
[DRUG] {line}
") line_hits[line] = hits else: highlighted_lines.append(f"{line}
") logger.info(f"Keyword detection: {total_keyword_matches} matches across {len(line_hits)} lines") return "".join(highlighted_lines), line_hits def compute_enhanced_drug_score(text: str, conversation_text: str, detected_keywords: Dict) -> Tuple[float, int, int]: """ENHANCED drug detection scoring""" try: # Count keywords from detected_keywords high_risk_count = 0 total_keyword_count = 0 for line_keywords in detected_keywords.values(): total_keyword_count += len(line_keywords) for kw in line_keywords: if kw.lower() in [hr.lower() for hr in HIGH_RISK_KEYWORDS]: high_risk_count += 1 # Check full text for missed keywords text_lower = text.lower() additional_high_risk = sum(1 for kw in HIGH_RISK_KEYWORDS if kw.lower() in text_lower) additional_total = sum(1 for kw in DRUG_KEYWORDS if kw.lower() in text_lower) # Use the higher count high_risk_count = max(high_risk_count, additional_high_risk) total_keyword_count = max(total_keyword_count, additional_total) # Keyword density total_words = len(text.split()) keyword_density = total_keyword_count / max(total_words, 1) # Context pattern scoring context_score = 0 matched_patterns = 0 for pattern in DRUG_CONTEXT_PATTERNS: if re.search(pattern, text): context_score += 0.15 matched_patterns += 1 # Enhanced scoring calculation enhanced_score = 0 if high_risk_count > 0: enhanced_score += min(high_risk_count * 0.4, 0.8) enhanced_score += min(keyword_density * 2, 0.2) enhanced_score += min(context_score, 0.5) enhanced_score = min(enhanced_score, 1.0) logger.info(f"Enhanced scoring - High-risk: {high_risk_count}, " f"Total: {total_keyword_count}, " f"Density: {keyword_density:.3f}, " f"Context: {context_score:.3f}, " f"Score: {enhanced_score:.3f}, " f"Patterns: {matched_patterns}") return enhanced_score, high_risk_count, total_keyword_count except Exception as e: logger.error(f"Enhanced scoring error: {e}") return 0.0, 0, 0 def compute_multimodal_risk(pred_label: int, pred_prob: float, text: str, simulated_text: str, detected_keywords: Dict) -> Tuple[float, int]: """ENHANCED multimodal risk assessment""" try: enhanced_score, high_risk_count, total_keyword_count = compute_enhanced_drug_score( text, simulated_text, detected_keywords ) # Weighting logic if high_risk_count >= 1: model_weight, keyword_weight = 0.2, 0.8 decision_reason = f"High-risk keywords detected (count={high_risk_count})" elif total_keyword_count >= 3: model_weight, keyword_weight = 0.3, 0.7 decision_reason = f"Strong keyword evidence (count={total_keyword_count})" elif high_risk_count >= 1 or total_keyword_count >= 2: model_weight, keyword_weight = 0.4, 0.6 decision_reason = f"Moderate keyword evidence" else: model_weight, keyword_weight = 0.7, 0.3 decision_reason = f"Relying on ML model" # Score combination risk_score = (model_weight * pred_prob) + (keyword_weight * enhanced_score) # Decision logic if high_risk_count >= 1: adjusted_pred_label = 1 final_reason = f"DRUG - High-risk keywords: {high_risk_count}" elif enhanced_score >= 0.4: adjusted_pred_label = 1 final_reason = f"DRUG - Strong keyword evidence: {enhanced_score:.3f}" elif enhanced_score >= 0.3 and pred_prob >= 0.2: adjusted_pred_label = 1 final_reason = f"DRUG - Combined evidence: enhanced={enhanced_score:.3f}, ml={pred_prob:.3f}" elif pred_prob >= config.THRESHOLD: adjusted_pred_label = 1 final_reason = f"DRUG - High ML confidence: {pred_prob:.3f}" else: adjusted_pred_label = 0 final_reason = f"NON_DRUG - Low confidence: enhanced={enhanced_score:.3f}, ml={pred_prob:.3f}" # Risk score adjustment if adjusted_pred_label == 1 and risk_score < 0.5: risk_score = max(risk_score, 0.6) logger.info(f"Risk assessment - {final_reason}, final_risk={risk_score:.4f}") return min(max(risk_score, 0.0), 1.0), adjusted_pred_label except Exception as e: logger.error(f"Risk assessment error: {e}") return 0.5, 0 def is_valid_audio(file_path) -> bool: """Check if the file is a valid audio by inspecting metadata""" try: info = mediainfo(file_path) return info.get("duration") is not None except: return False def estimate_processing_time(audio_path): """Estimate processing time based on audio duration""" try: info = mediainfo(audio_path) duration_seconds = float(info.get("duration", 0)) transcription_time = max(duration_seconds * 0.25, 5) analysis_time = 5 total_time = transcription_time + analysis_time return { "total": int(total_time), "transcription": int(transcription_time), "analysis": analysis_time } except: return {"total": 30, "transcription": 25, "analysis": 5} def show_activity_indicator(): """Show that system is active during long operations""" activity_messages = [ "đ§ AI models are thinking...", "đ Analyzing speech patterns...", "đ Computing risk scores...", "đ¯ Detecting keywords...", "⥠Almost done...", ] import random return random.choice(activity_messages) def main(): """Production main application""" # Initialize variables early uploaded_file = None audio_path = None try: # Initialize production logging setup_production_logging() # Page configuration st.set_page_config( page_title="đ¨ Drug Audio Analyzer", layout="wide", initial_sidebar_state="collapsed" ) st.title("đ¨ Audio-Based Drug Conversation Detection System") st.markdown( "This AI powered system analyzes uploaded conversations to detect potential drug-related content, " "highlight risk keywords, and provide actionable insights to the Karnataka Police." ) # Initialize models with progress tracking init_progress = st.progress(0) init_status = st.empty() init_status.text("Step 1/3: Validating model files...") init_progress.progress(33) # Model validation model_available, model_msg = model_manager.validate_model_availability() if model_available: init_status.text("Step 2/3: Model validation successful") init_progress.progress(66) else: init_status.error(f"â Model validation failed: {model_msg}") st.stop() init_status.text("Step 3/3: System ready for audio processing") init_progress.progress(100) # Clear initialization progress init_progress.empty() init_status.empty() # Sidebar with system info with st.sidebar: st.success("â System Status: Operational") if st.button("đī¸ Clear System Cache"): st.cache_resource.clear() st.success("Cache cleared successfully!") # File Input Section st.subheader("đ Select Audio Source") st.info(f"đĩ Formats: {', '.join(config.ALLOWED_EXTENSIONS)}") st.info(f"âąī¸ Max duration: {config.MAX_AUDIO_DURATION//60} minutes") input_option = st.radio( "Choose audio input:", ["Upload your own file", "Use sample test file"] ) if input_option == "Upload your own file": uploaded_file = st.file_uploader( "đ Upload an audio file", type=None, help="All audio formats supported (wav, mp3, m4a, flac, ogg, etc.)" ) if uploaded_file: # Check file size file_size_mb = uploaded_file.size / (1024 * 1024) if file_size_mb > config.MAX_FILE_SIZE_MB: st.error(f"â File too large: {file_size_mb:.2f} MB. Max allowed is {config.MAX_FILE_SIZE_MB} MB.") st.stop() # Validate file file_valid, file_msg = AudioValidator.validate_file(uploaded_file) if not file_valid: st.error(f"â {file_msg}") logger.warning(f"File validation failed: {file_msg}") st.stop() st.success(f"â {file_msg}") # Create temp file and validate audio audio_path = file_manager.create_secure_temp_file(uploaded_file) if not is_valid_audio(audio_path): st.error("â Uploaded file is not a valid audio") st.stop() elif input_option == "Use sample test file": sample_dir = "data/audio_sample" if os.path.exists(sample_dir): sample_files = [ f for f in os.listdir(sample_dir) if f.lower().endswith((".wav", ".mp3", ".flac", ".ogg", ".m4a")) ] if sample_files: sample_files_display = ["-- Select a sample file --"] for f in sample_files: file_path = os.path.join(sample_dir, f) size_mb = os.path.getsize(file_path) / (1024 * 1024) sample_files_display.append(f"{f} ({size_mb:.2f} MB)") selected_sample = st.selectbox("đĩ Choose a sample test file:", sample_files_display) if selected_sample != "-- Select a sample file --": selected_file = selected_sample.split(" (")[0] audio_path = os.path.join(sample_dir, selected_file) file_size_mb = os.path.getsize(audio_path) / (1024 * 1024) if file_size_mb > config.MAX_FILE_SIZE_MB: st.warning(f"â ī¸ This sample file exceeds the max allowed size ({config.MAX_FILE_SIZE_MB} MB).") else: st.warning("â ī¸ No sample files found.") else: st.error(f"â Sample folder not found: {sample_dir}") # Check if audio file is selected if not audio_path: st.info("Please upload a file or select a sample test file to continue.") st.stop() # Audio player and file info st.audio(audio_path) file_info = os.path.getsize(audio_path) / (1024 * 1024) # Get the correct filename to display if uploaded_file: display_filename = uploaded_file.name else: display_filename = os.path.basename(audio_path) # Start processing button if st.button("đ Start Audio Analysis", type="primary", use_container_width=True): st.balloons() # Create processing stages st.markdown("---") st.markdown("### đ Audio Processing Pipeline") # Stage 1: Model Loading with st.container(): stage1_col1, stage1_col2 = st.columns([1, 4]) with stage1_col1: st.markdown("**Stage 1:**") with stage1_col2: with st.spinner("Loading Whisper speech recognition model..."): model = load_whisper_model() st.success("â Speech recognition model loaded successfully") # Stage 2: Audio Transcription with st.container(): stage2_col1, stage2_col2 = st.columns([1, 4]) with stage2_col1: st.markdown("**Stage 2:**") with stage2_col2: transcription_container = st.empty() transcription_container.info("đ¤ Starting audio transcription...") progress_bar = st.progress(0) status_text = st.empty() start_time = time.time() def update_progress(pct, message=""): elapsed = time.time() - start_time estimated_total = elapsed / (pct/100) if pct > 0 else 0 remaining = max(0, estimated_total - elapsed) progress_bar.progress(pct) status_text.text(f"đš {message} ({pct}%, ETA ~{int(remaining)}s)") status_text.text("đš Preparing audio for transcription...") progress_bar.progress(10) time.sleep(0.5) status_text.text("đš Running speech-to-text analysis...") progress_bar.progress(30) transcription = transcribe_audio_production(model, audio_path, progress_callback=update_progress) progress_bar.empty() status_text.empty() transcription_container.success(f"â Transcription completed ({len(transcription)} characters)") # Show transcription results if transcription: st.markdown("### đ Transcription Results") # Raw transcription with st.expander("View Raw Transcription", expanded=True): st.text_area("Transcribed Text:", value=transcription, height=100, disabled=True) # Generate voice-based conversation simulation st.info("đ¤ Analyzing speech patterns to identify potential speakers...") with st.spinner("Processing speaker analysis..."): simulated_text = simulate_conversation_voice_based(transcription) if simulated_text: with st.expander("View Voice-Based Speaker Analysis", expanded=False): st.text_area("Speaker Analysis:", value=simulated_text, height=150, disabled=True) st.caption("đ§ AI-detected speaker changes based on speech patterns, tone indicators, and conversational cues") # Compute speaker statistics lines = simulated_text.split('\n') speaker_a_lines = sum(1 for line in lines if line.startswith('Speaker_A:')) speaker_b_lines = sum(1 for line in lines if line.startswith('Speaker_B:')) col1, col2 = st.columns(2) with col1: st.metric("đī¸ Speaker A Lines", speaker_a_lines) with col2: st.metric("đī¸ Speaker B Lines", speaker_b_lines) if speaker_b_lines > 0: st.success("â Multiple speakers detected in conversation") else: st.info("âšī¸ Single speaker detected (monologue)") else: # Fallback if voice-based detection fails simulated_text = transcription # Use raw transcription as fallback st.warning("â ī¸ Voice-based speaker detection failed. Using raw transcription for analysis.") else: st.error("â ī¸ No transcription produced. Please check the audio file.") st.stop() # Stage 3: Analysis with st.container(): stage4_col1, stage4_col2 = st.columns([1, 4]) with stage4_col1: st.markdown("**Stage 3:**") with stage4_col2: analysis_container = st.empty() analysis_container.info("đ§ Running AI analysis and keyword detection...") analysis_steps = st.empty() analysis_steps.text("â Running ML model prediction...") pred_label, raw_prob = predict(transcription) analysis_steps.text("â Using voice-based conversation analysis...") analysis_steps.text("â Detecting drug-related keywords...") highlighted_html, detected_keywords = highlight_drug_lines_html(simulated_text, DRUG_KEYWORDS) analysis_steps.text("â Computing risk assessment...") risk_score, adjusted_prediction = compute_multimodal_risk( pred_label, raw_prob, transcription, simulated_text, detected_keywords ) analysis_steps.empty() analysis_container.success("â Analysis completed successfully") st.markdown("---") st.success("đ **Processing Complete!** Results are shown below.") # Enhanced Analysis Section st.subheader("đ Enhanced Analysis") enhanced_score, high_risk_count, total_keyword_count = compute_enhanced_drug_score( transcription, simulated_text, detected_keywords ) st.write(f"**High-Risk Keywords Detected:** {high_risk_count}") st.write(f"**Total Drug Keywords Detected:** {total_keyword_count}") st.write(f"**Enhanced Drug Score:** {enhanced_score:.2f}/1.0") # Results presentation st.markdown("---") st.subheader("đ Analysis Results") # Main result display if adjusted_prediction == 1: st.markdown( """High-confidence detection of drug-related conversation patterns
Conversation appears to be non-drug related