# --- Hugging Face Spaces listing header (converted to comments; the scraped
# --- lines below were not valid Python) ---
# audio-dashboard / src/streamlit_app.py
# lawlevisan — "Update src/streamlit_app.py", commit 63c80f9 (verified)
# src/streamlit_app.py
import os
import re
import tempfile
import pandas as pd
import matplotlib.pyplot as plt
import streamlit as st
import requests
import logging
import traceback
import io
import time
from pydub.utils import mediainfo
from datetime import datetime
from typing import Optional, Tuple, Dict, Any
from transformers import pipeline
import torch
from predict import predict, load_model
from config import config
# =========================
# API CONFIGURATION
# =========================
@st.cache_data(ttl=60)
def get_api_base():
    """Return the Flask API base URL.

    Resolution order: Streamlit secrets ("API_BASE") -> API_BASE environment
    variable -> localhost default. Cached for 60s so a secrets change is
    picked up without restarting the app.
    """
    try:
        api_base = st.secrets.get("API_BASE", "")
        if api_base:
            return api_base
    except Exception:
        # fix: was a bare `except:` (which also swallowed SystemExit /
        # KeyboardInterrupt). st.secrets raises when no secrets file exists;
        # fall through to the environment variable in that case.
        pass
    return os.getenv("API_BASE", "http://localhost:5000")


API_BASE = get_api_base()
@st.cache_data(ttl=30)
def check_api_health():
    """Probe the Flask API's /healthz endpoint.

    Returns (True, payload) when the API answers HTTP 200, otherwise
    (False, {"error": ...}) describing the failure. Cached for 30s.
    """
    try:
        resp = requests.get(f"{API_BASE}/healthz", timeout=5)
        if resp.status_code != 200:
            return False, {"error": f"HTTP {resp.status_code}"}
        return True, resp.json()
    except requests.exceptions.RequestException as exc:
        return False, {"error": str(exc)}
def send_analysis_to_api(analysis_data):
    """POST one analysis record to the Flask API; return True on HTTP 200."""
    try:
        resp = requests.post(
            f"{API_BASE}/api/analysis",
            json=analysis_data,
            timeout=10,
        )
    except requests.exceptions.RequestException as exc:
        # Best-effort reporting: log and carry on, the UI flow must not break.
        logger.error(f"Failed to send analysis to API: {exc}")
        return False
    return resp.status_code == 200
# Access keywords via config instance
DRUG_KEYWORDS = config.DRUG_KEYWORDS
HIGH_RISK_KEYWORDS = config.HIGH_RISK_KEYWORDS
# NOTE(review): this import sits mid-file (after functions that already use
# `logger`); that works because function bodies resolve names at call time,
# but it also imports an `is_valid_audio` helper that the local definition
# further below shadows — confirm which one is intended.
from utils import (
    logger,
    security_manager,
    file_manager,
    model_manager,
    setup_production_logging,
    AudioValidator,
    is_valid_audio
)
# Load model once at app startup
load_model(config.MODEL_PATH)
# Additional context patterns for better detection: case-insensitive regexes
# for conversational context typical of drug-deal chatter (pick-ups, meeting
# spots, risk talk, payment channels, sourcing).
DRUG_CONTEXT_PATTERNS = [
    r'(?i)(picked?\s*(it|them)\s*up|got\s*the\s*(stuff|package|goods))',
    r'(?i)(meet\s*(at|near|behind)|behind\s*the\s*(metro|station))',
    r'(?i)(too\s*risky|cops?\s*(were|are)\s*there)',
    r'(?i)(same\s*source|better\s*this\s*time)',
    r'(?i)(payment|pay|crypto|money|cash)\s*(through|via|using)',
    r'(?i)(bringing|getting|delivery)',
    r'(?i)(saturday|party|rave)',
    r'(?i)(mumbai|supplier)',
    r'(?i)(straight\s*from|coming\s*from)'
]
@st.cache_resource
def load_whisper_model():
    """Load the Whisper ASR model via the Transformers pipeline.

    Cached as a resource so it is built once per process (HF Spaces
    compatible). Halts the Streamlit script if loading fails.
    """
    try:
        # GPU device index 0 when CUDA is available, otherwise CPU (-1).
        target_device = 0 if torch.cuda.is_available() else -1
        asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-tiny.en",
            device=target_device,
            return_timestamps=False,
        )
        logger.info("Loaded Whisper model via transformers pipeline")
        return asr_pipeline
    except Exception as exc:
        logger.error(f"Failed to load Whisper model: {exc}")
        st.error("Failed to load speech recognition model. Please contact system administrator.")
        st.stop()
def transcribe_audio_production(model, audio_path: str, progress_callback=None) -> str:
    """Transcribe audio using the transformers ASR pipeline.

    Args:
        model: ASR callable (transformers pipeline) mapping a file path to a
            dict with a "text" key.
        audio_path: path to the audio file on disk.
        progress_callback: optional ``fn(percent: int, message: str)`` used to
            drive the UI progress bar.

    Returns:
        The stripped transcription text, or "" when the model produced none.

    Raises:
        Whatever the underlying pipeline raises (logged, then re-raised).
    """
    try:
        if progress_callback:
            progress_callback(50, "Transcribing audio...")
        result = model(audio_path)
        transcription = result.get("text", "").strip()
        if progress_callback:
            progress_callback(80, "Transcription completed")
        if not transcription:
            logger.warning("Empty transcription result")
            return ""
        logger.info(f"Transcription completed: {len(transcription)} characters")
        return transcription
    except Exception as e:
        logger.error(f"Transcription failed: {e}")
        # fix: idiomatic bare `raise` instead of `raise e` — re-raises the
        # active exception without adding a redundant frame.
        raise
# NOTE(review): `re` is already imported at the top of the file — this
# duplicate mid-file import is harmless but redundant.
import re
from sentence_transformers import SentenceTransformer, util
# Load sentence embedding model (for semantic similarity)
# NOTE(review): this module-level `model` global is read by
# simulate_conversation_voice_based(); inside main() a *local* variable of
# the same name holds the Whisper pipeline — distinct objects, easy to
# confuse when reading the code.
model = SentenceTransformer('all-MiniLM-L6-v2')
def simulate_conversation_voice_based(transcribed_text: str, similarity_threshold: float = 0.7) -> str:
    """
    Enhanced voice-based conversation simulation.

    Splits the transcript into sentences and alternates between two
    pseudo-speakers (Speaker_A / Speaker_B) whenever a speaker change is
    suspected, based on:
    - Heuristic patterns (fillers, discourse markers, interjections)
    - Punctuation and prosody (question -> answer pairs)
    - Semantic similarity between adjacent sentences (sentence embeddings)

    Args:
        transcribed_text: raw transcription text.
        similarity_threshold: cosine-similarity cutoff below which adjacent
            sentences are attributed to different speakers.

    Returns:
        Newline-joined "Speaker_X: sentence" lines ("" for empty input).
    """
    if not transcribed_text:
        return ""
    # Split transcript into sentences
    sentences = re.split(r'(?<=[?.!…])\s+', transcribed_text.strip())
    # Voice change patterns (expanded fillers, discourse markers, interjections)
    voice_change_patterns = [
        r'\b(yeah|yes|okay|alright|sure|no|nah|uh|um|hmm|oh)\b',
        r'\b(but|however|actually|wait|hold on|well|anyway|so|then|now|listen|look|see|hey)\b',
        r'\b(what|how|when|where|why|who)\b',
        r'\b(i think|i mean|you know|like|I guess|maybe)\b'
    ]
    # Compile regex once for speed
    voice_change_regex = re.compile("|".join(voice_change_patterns), re.IGNORECASE)
    convo_lines = []
    current_speaker = "Speaker_A"
    for i, sentence in enumerate(sentences):
        if not sentence.strip():
            continue
        speaker_change = False
        # Heuristic: voice change indicators (only for non-first sentences)
        if voice_change_regex.search(sentence):
            if i > 0 and convo_lines:
                if re.match(r'^\s*(yeah|yes|okay|but|what|no|hey|well|listen|um|uh|oh)', sentence.strip(), re.IGNORECASE):
                    speaker_change = True
        # Additional heuristics
        if i > 0 and not speaker_change:
            prev_sentence = sentences[i-1] if i-1 < len(sentences) else ""
            # Punctuation-based: previous sentence ends with a question and the
            # current one starts like an answer
            if prev_sentence.endswith("?") and re.match(r'^\s*(yeah|yes|no|sure|maybe|i|uh|um)', sentence.strip(), re.IGNORECASE):
                speaker_change = True
            # Semantic similarity check: low similarity suggests a new speaker
            try:
                sim = util.cos_sim(model.encode(prev_sentence), model.encode(sentence)).item()
                if sim < similarity_threshold:
                    speaker_change = True
            except Exception:
                # fix: was a bare `except:`; embedding failure deliberately
                # falls back to the lexical heuristics alone
                pass
            # Length + transition word heuristic
            if re.search(r'\b(anyway|so|well|now|then|alright|listen)\b', sentence.strip(), re.IGNORECASE) and len(sentence.split()) > 3:
                speaker_change = True
        # Switch speaker if change detected
        if speaker_change:
            current_speaker = "Speaker_B" if current_speaker == "Speaker_A" else "Speaker_A"
        convo_lines.append(f"{current_speaker}: {sentence.strip()}")
    return "\n".join(convo_lines)
def highlight_drug_lines_html(conversation_text: str, keywords: list) -> Tuple[str, Dict]:
    """ENHANCED version with regex word boundary matching and context-aware keyword detection.

    Returns (html, line_hits): html wraps every flagged line in a red <p>
    tag prefixed with [DRUG]; line_hits maps each flagged line to the list
    of keywords found on it.
    """
    if not conversation_text:
        return "", {}
    # Slang terms that only count when a drug-related verb appears nearby.
    AMBIGUOUS_TERMS = {"e", "x", "line", "ice", "horse", "420"}
    context_verbs_re = re.compile(
        r'\b(smoke|roll|pop|hit|take|buy|sell|party|snort|inject)\b', re.IGNORECASE
    )

    def _single_word_hit(line: str, kw: str) -> bool:
        """Whole-word match; ambiguous slang additionally needs a context verb."""
        if not re.search(rf'\b{re.escape(kw)}\b', line, re.IGNORECASE):
            return False
        if kw in AMBIGUOUS_TERMS:
            return bool(context_verbs_re.search(line))
        return True

    rendered = []
    line_hits = {}
    match_total = 0
    # Longest keywords first so multi-word phrases are tried before their parts.
    for line in conversation_text.split("\n"):
        hits = []
        for kw in sorted(keywords, key=len, reverse=True):
            if ' ' in kw:
                # Multi-word keywords: plain word-boundary match, no ambiguity filter
                if re.search(rf'\b{re.escape(kw)}\b', line, re.IGNORECASE):
                    hits.append(kw)
                    match_total += 1
            elif _single_word_hit(line, kw):
                hits.append(kw)
                match_total += 1
        if hits:
            rendered.append(f"<p style='color:#e57373'><b>[DRUG]</b> {line}</p>")
            line_hits[line] = hits
        else:
            rendered.append(f"<p>{line}</p>")
    logger.info(f"Keyword detection: {match_total} matches across {len(line_hits)} lines")
    return "".join(rendered), line_hits
def compute_enhanced_drug_score(text: str, conversation_text: str, detected_keywords: Dict) -> Tuple[float, int, int]:
    """ENHANCED drug detection scoring.

    Combines per-line keyword hits, a whole-text keyword sweep, keyword
    density, and contextual regex patterns into a score in [0, 1].

    Returns:
        (enhanced_score, high_risk_count, total_keyword_count);
        (0.0, 0, 0) if anything goes wrong.
    """
    try:
        high_risk_lower = {hr.lower() for hr in HIGH_RISK_KEYWORDS}
        # Count keywords from the per-line detection results
        total_keyword_count = sum(len(kws) for kws in detected_keywords.values())
        high_risk_count = sum(
            1
            for kws in detected_keywords.values()
            for kw in kws
            if kw.lower() in high_risk_lower
        )
        # Sweep the full text for keywords the per-line pass may have missed
        text_lower = text.lower()
        additional_high_risk = sum(1 for kw in HIGH_RISK_KEYWORDS if kw.lower() in text_lower)
        additional_total = sum(1 for kw in DRUG_KEYWORDS if kw.lower() in text_lower)
        # Use whichever source found more
        high_risk_count = max(high_risk_count, additional_high_risk)
        total_keyword_count = max(total_keyword_count, additional_total)
        # Keyword density (keywords per word, guarded against empty text)
        keyword_density = total_keyword_count / max(len(text.split()), 1)
        # Context pattern scoring: +0.15 per matched conversational pattern
        context_score = 0
        matched_patterns = 0
        for pattern in DRUG_CONTEXT_PATTERNS:
            if re.search(pattern, text):
                context_score += 0.15
                matched_patterns += 1
        # Weighted combination, each component capped, total capped at 1.0
        enhanced_score = 0
        if high_risk_count > 0:
            enhanced_score += min(high_risk_count * 0.4, 0.8)
        enhanced_score += min(keyword_density * 2, 0.2)
        enhanced_score += min(context_score, 0.5)
        enhanced_score = min(enhanced_score, 1.0)
        logger.info(f"Enhanced scoring - High-risk: {high_risk_count}, "
                    f"Total: {total_keyword_count}, "
                    f"Density: {keyword_density:.3f}, "
                    f"Context: {context_score:.3f}, "
                    f"Score: {enhanced_score:.3f}, "
                    f"Patterns: {matched_patterns}")
        return enhanced_score, high_risk_count, total_keyword_count
    except Exception as e:
        logger.error(f"Enhanced scoring error: {e}")
        return 0.0, 0, 0
def compute_multimodal_risk(pred_label: int, pred_prob: float, text: str,
                            simulated_text: str, detected_keywords: Dict) -> Tuple[float, int]:
    """ENHANCED multimodal risk assessment.

    Blends the ML model probability with the keyword-based enhanced score,
    weighting keyword evidence more heavily the stronger it is.

    Args:
        pred_label: raw ML prediction (1 = drug, 0 = non-drug).
        pred_prob: ML confidence for the drug class.
        text: full transcription.
        simulated_text: speaker-attributed conversation text.
        detected_keywords: per-line hits from highlight_drug_lines_html.

    Returns:
        (risk_score clamped to [0, 1], adjusted_pred_label);
        (0.5, 0) on error.
    """
    try:
        enhanced_score, high_risk_count, total_keyword_count = compute_enhanced_drug_score(
            text, simulated_text, detected_keywords
        )
        # Weighting logic: stronger keyword evidence -> less weight on the ML model.
        if high_risk_count >= 1:
            model_weight, keyword_weight = 0.2, 0.8
        elif total_keyword_count >= 3:
            model_weight, keyword_weight = 0.3, 0.7
        elif total_keyword_count >= 2:
            # fix: the original condition also tested `high_risk_count >= 1`
            # here, which was unreachable — the first branch already catches it.
            model_weight, keyword_weight = 0.4, 0.6
        else:
            model_weight, keyword_weight = 0.7, 0.3
        # Score combination
        risk_score = (model_weight * pred_prob) + (keyword_weight * enhanced_score)
        # Decision logic: any high-risk keyword forces a DRUG label; otherwise
        # fall through progressively weaker evidence thresholds.
        if high_risk_count >= 1:
            adjusted_pred_label = 1
            final_reason = f"DRUG - High-risk keywords: {high_risk_count}"
        elif enhanced_score >= 0.4:
            adjusted_pred_label = 1
            final_reason = f"DRUG - Strong keyword evidence: {enhanced_score:.3f}"
        elif enhanced_score >= 0.3 and pred_prob >= 0.2:
            adjusted_pred_label = 1
            final_reason = f"DRUG - Combined evidence: enhanced={enhanced_score:.3f}, ml={pred_prob:.3f}"
        elif pred_prob >= config.THRESHOLD:
            adjusted_pred_label = 1
            final_reason = f"DRUG - High ML confidence: {pred_prob:.3f}"
        else:
            adjusted_pred_label = 0
            final_reason = f"NON_DRUG - Low confidence: enhanced={enhanced_score:.3f}, ml={pred_prob:.3f}"
        # Risk score adjustment: a positive call should never display below 0.6
        if adjusted_pred_label == 1 and risk_score < 0.5:
            risk_score = max(risk_score, 0.6)
        logger.info(f"Risk assessment - {final_reason}, final_risk={risk_score:.4f}")
        return min(max(risk_score, 0.0), 1.0), adjusted_pred_label
    except Exception as e:
        logger.error(f"Risk assessment error: {e}")
        return 0.5, 0
def is_valid_audio(file_path) -> bool:
    """Check if the file is valid audio by inspecting its metadata.

    NOTE(review): this definition shadows the `is_valid_audio` imported from
    `utils` earlier in the file — confirm which implementation is intended.

    Returns True when ffprobe/mediainfo reports a duration, False otherwise
    (including when probing fails entirely).
    """
    try:
        info = mediainfo(file_path)
        return info.get("duration") is not None
    except Exception:
        # fix: was a bare `except:` (which also swallowed SystemExit /
        # KeyboardInterrupt); any probe failure means "not valid audio".
        return False
def estimate_processing_time(audio_path):
    """Estimate processing time (in seconds) from the audio duration.

    Returns a dict with "total", "transcription" and "analysis" keys.
    Heuristic: ~0.25s of transcription per second of audio, floored at 5s,
    plus a flat 5s for analysis. Falls back to a fixed 30s estimate when
    the duration cannot be probed.
    """
    try:
        info = mediainfo(audio_path)
        duration_seconds = float(info.get("duration", 0))
        transcription_time = max(duration_seconds * 0.25, 5)
        analysis_time = 5
        total_time = transcription_time + analysis_time
        return {
            "total": int(total_time),
            "transcription": int(transcription_time),
            "analysis": analysis_time
        }
    except Exception:
        # fix: was a bare `except:`; any probing/parsing failure yields the
        # conservative default estimate.
        return {"total": 30, "transcription": 25, "analysis": 5}
def show_activity_indicator():
    """Show that system is active during long operations"""
    import random

    status_lines = (
        "🧠 AI models are thinking...",
        "πŸ” Analyzing speech patterns...",
        "πŸ“Š Computing risk scores...",
        "🎯 Detecting keywords...",
        "⚑ Almost done...",
    )
    # Pick one at random so repeated calls feel "alive" in the UI.
    return random.choice(status_lines)
def main():
    """Production main application.

    Drives the whole Streamlit workflow: audio upload / sample selection,
    Whisper transcription, pseudo-speaker simulation, keyword + ML risk
    analysis, results dashboard, optional API reporting and CSV export.
    """
    # Initialize variables early so the `finally` cleanup below can safely
    # reference them even when an exception fires before input handling runs.
    uploaded_file = None
    audio_path = None
    try:
        # Initialize production logging
        setup_production_logging()
        # Page configuration
        st.set_page_config(
            page_title="🚨 Drug Audio Analyzer",
            layout="wide",
            initial_sidebar_state="collapsed"
        )
        st.title("🚨 Audio-Based Drug Conversation Detection System")
        st.markdown(
            "This AI powered system analyzes uploaded conversations to detect potential drug-related content, "
            "highlight risk keywords, and provide actionable insights to the Karnataka Police."
        )
        # Initialize models with progress tracking
        init_progress = st.progress(0)
        init_status = st.empty()
        init_status.text("Step 1/3: Validating model files...")
        init_progress.progress(33)
        # Model validation — abort the whole app run if the classifier weights
        # are missing.
        model_available, model_msg = model_manager.validate_model_availability()
        if model_available:
            init_status.text("Step 2/3: Model validation successful")
            init_progress.progress(66)
        else:
            init_status.error(f"❌ Model validation failed: {model_msg}")
            st.stop()
        init_status.text("Step 3/3: System ready for audio processing")
        init_progress.progress(100)
        # Clear initialization progress
        init_progress.empty()
        init_status.empty()
        # Sidebar with system info
        with st.sidebar:
            st.success("βœ… System Status: Operational")
            if st.button("πŸ—‘οΈ Clear System Cache"):
                st.cache_resource.clear()
                st.success("Cache cleared successfully!")
        # File Input Section
        st.subheader("πŸŽ™ Select Audio Source")
        st.info(f"🎡 Formats: {', '.join(config.ALLOWED_EXTENSIONS)}")
        st.info(f"⏱️ Max duration: {config.MAX_AUDIO_DURATION//60} minutes")
        input_option = st.radio(
            "Choose audio input:",
            ["Upload your own file", "Use sample test file"]
        )
        if input_option == "Upload your own file":
            uploaded_file = st.file_uploader(
                "πŸ“‚ Upload an audio file",
                type=None,
                help="All audio formats supported (wav, mp3, m4a, flac, ogg, etc.)"
            )
            if uploaded_file:
                # Check file size
                file_size_mb = uploaded_file.size / (1024 * 1024)
                if file_size_mb > config.MAX_FILE_SIZE_MB:
                    st.error(f"❌ File too large: {file_size_mb:.2f} MB. Max allowed is {config.MAX_FILE_SIZE_MB} MB.")
                    st.stop()
                # Validate file (extension/content checks from utils)
                file_valid, file_msg = AudioValidator.validate_file(uploaded_file)
                if not file_valid:
                    st.error(f"❌ {file_msg}")
                    logger.warning(f"File validation failed: {file_msg}")
                    st.stop()
                st.success(f"βœ… {file_msg}")
                # Create temp file and validate audio
                audio_path = file_manager.create_secure_temp_file(uploaded_file)
                if not is_valid_audio(audio_path):
                    st.error("❌ Uploaded file is not a valid audio")
                    st.stop()
        elif input_option == "Use sample test file":
            sample_dir = "data/audio_sample"
            if os.path.exists(sample_dir):
                sample_files = [
                    f for f in os.listdir(sample_dir)
                    if f.lower().endswith((".wav", ".mp3", ".flac", ".ogg", ".m4a"))
                ]
                if sample_files:
                    # Build "name (size MB)" labels for the selectbox
                    sample_files_display = ["-- Select a sample file --"]
                    for f in sample_files:
                        file_path = os.path.join(sample_dir, f)
                        size_mb = os.path.getsize(file_path) / (1024 * 1024)
                        sample_files_display.append(f"{f} ({size_mb:.2f} MB)")
                    selected_sample = st.selectbox("🎡 Choose a sample test file:", sample_files_display)
                    if selected_sample != "-- Select a sample file --":
                        # Strip the " (size MB)" label suffix back off to
                        # recover the actual filename.
                        selected_file = selected_sample.split(" (")[0]
                        audio_path = os.path.join(sample_dir, selected_file)
                        file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
                        if file_size_mb > config.MAX_FILE_SIZE_MB:
                            st.warning(f"⚠️ This sample file exceeds the max allowed size ({config.MAX_FILE_SIZE_MB} MB).")
                else:
                    st.warning("⚠️ No sample files found.")
            else:
                st.error(f"❌ Sample folder not found: {sample_dir}")
        # Check if audio file is selected
        if not audio_path:
            st.info("Please upload a file or select a sample test file to continue.")
            st.stop()
        # Audio player and file info
        st.audio(audio_path)
        file_info = os.path.getsize(audio_path) / (1024 * 1024)  # size in MB
        # Get the correct filename to display
        # NOTE(review): `display_filename` is computed but never used below —
        # confirm whether it was meant to be shown in the UI.
        if uploaded_file:
            display_filename = uploaded_file.name
        else:
            display_filename = os.path.basename(audio_path)
        # Start processing button
        if st.button("πŸš€ Start Audio Analysis", type="primary", use_container_width=True):
            st.balloons()
            # Create processing stages
            st.markdown("---")
            st.markdown("### πŸ”„ Audio Processing Pipeline")
            # Stage 1: Model Loading
            with st.container():
                stage1_col1, stage1_col2 = st.columns([1, 4])
                with stage1_col1:
                    st.markdown("**Stage 1:**")
                with stage1_col2:
                    with st.spinner("Loading Whisper speech recognition model..."):
                        # Local `model` here is the Whisper pipeline — distinct
                        # from the module-level SentenceTransformer `model`.
                        model = load_whisper_model()
                    st.success("βœ… Speech recognition model loaded successfully")
            # Stage 2: Audio Transcription
            with st.container():
                stage2_col1, stage2_col2 = st.columns([1, 4])
                with stage2_col1:
                    st.markdown("**Stage 2:**")
                with stage2_col2:
                    transcription_container = st.empty()
                    transcription_container.info("🎀 Starting audio transcription...")
                    progress_bar = st.progress(0)
                    status_text = st.empty()
                    start_time = time.time()
                    def update_progress(pct, message=""):
                        # Progress callback: shows percent done plus a rough
                        # ETA extrapolated from elapsed time.
                        elapsed = time.time() - start_time
                        estimated_total = elapsed / (pct/100) if pct > 0 else 0
                        remaining = max(0, estimated_total - elapsed)
                        progress_bar.progress(pct)
                        status_text.text(f"πŸ”Ή {message} ({pct}%, ETA ~{int(remaining)}s)")
                    status_text.text("πŸ”Ή Preparing audio for transcription...")
                    progress_bar.progress(10)
                    time.sleep(0.5)
                    status_text.text("πŸ”Ή Running speech-to-text analysis...")
                    progress_bar.progress(30)
                    transcription = transcribe_audio_production(model, audio_path, progress_callback=update_progress)
                    progress_bar.empty()
                    status_text.empty()
                    transcription_container.success(f"βœ… Transcription completed ({len(transcription)} characters)")
                    # Show transcription results
                    if transcription:
                        st.markdown("### πŸ“ Transcription Results")
                        # Raw transcription
                        with st.expander("View Raw Transcription", expanded=True):
                            st.text_area("Transcribed Text:", value=transcription, height=100, disabled=True)
                        # Generate voice-based conversation simulation
                        st.info("🎀 Analyzing speech patterns to identify potential speakers...")
                        with st.spinner("Processing speaker analysis..."):
                            simulated_text = simulate_conversation_voice_based(transcription)
                        if simulated_text:
                            with st.expander("View Voice-Based Speaker Analysis", expanded=False):
                                st.text_area("Speaker Analysis:", value=simulated_text, height=150, disabled=True)
                                st.caption("🧠 AI-detected speaker changes based on speech patterns, tone indicators, and conversational cues")
                            # Compute speaker statistics
                            lines = simulated_text.split('\n')
                            speaker_a_lines = sum(1 for line in lines if line.startswith('Speaker_A:'))
                            speaker_b_lines = sum(1 for line in lines if line.startswith('Speaker_B:'))
                            col1, col2 = st.columns(2)
                            with col1:
                                st.metric("πŸŽ™οΈ Speaker A Lines", speaker_a_lines)
                            with col2:
                                st.metric("πŸŽ™οΈ Speaker B Lines", speaker_b_lines)
                            if speaker_b_lines > 0:
                                st.success("βœ… Multiple speakers detected in conversation")
                            else:
                                st.info("ℹ️ Single speaker detected (monologue)")
                        else:
                            # Fallback if voice-based detection fails
                            simulated_text = transcription  # Use raw transcription as fallback
                            st.warning("⚠️ Voice-based speaker detection failed. Using raw transcription for analysis.")
                    else:
                        st.error("⚠️ No transcription produced. Please check the audio file.")
                        st.stop()
            # Stage 3: Analysis
            with st.container():
                stage4_col1, stage4_col2 = st.columns([1, 4])
                with stage4_col1:
                    st.markdown("**Stage 3:**")
                with stage4_col2:
                    analysis_container = st.empty()
                    analysis_container.info("🧠 Running AI analysis and keyword detection...")
                    analysis_steps = st.empty()
                    analysis_steps.text("β†’ Running ML model prediction...")
                    pred_label, raw_prob = predict(transcription)
                    analysis_steps.text("β†’ Using voice-based conversation analysis...")
                    analysis_steps.text("β†’ Detecting drug-related keywords...")
                    highlighted_html, detected_keywords = highlight_drug_lines_html(simulated_text, DRUG_KEYWORDS)
                    analysis_steps.text("β†’ Computing risk assessment...")
                    risk_score, adjusted_prediction = compute_multimodal_risk(
                        pred_label, raw_prob, transcription, simulated_text, detected_keywords
                    )
                    analysis_steps.empty()
                    analysis_container.success("βœ… Analysis completed successfully")
            st.markdown("---")
            st.success("πŸŽ‰ **Processing Complete!** Results are shown below.")
            # Enhanced Analysis Section
            st.subheader("πŸ” Enhanced Analysis")
            enhanced_score, high_risk_count, total_keyword_count = compute_enhanced_drug_score(
                transcription, simulated_text, detected_keywords
            )
            st.write(f"**High-Risk Keywords Detected:** {high_risk_count}")
            st.write(f"**Total Drug Keywords Detected:** {total_keyword_count}")
            st.write(f"**Enhanced Drug Score:** {enhanced_score:.2f}/1.0")
            # Results presentation
            st.markdown("---")
            st.subheader("πŸ“Š Analysis Results")
            # Main result display (red banner for DRUG, green for NON_DRUG)
            if adjusted_prediction == 1:
                st.markdown(
                    """
                    <div style='padding: 1.5rem; background: linear-gradient(90deg, #ffebee 0%, #ffcdd2 100%);
                    border-left: 6px solid #d32f2f; border-radius: 8px; margin: 1rem 0;'>
                    <h2 style='color: #c62828; margin: 0; display: flex; align-items: center;'>
                    🚨 DRUG-RELATED CONTENT DETECTED
                    </h2>
                    <p style='margin: 0.5rem 0 0 0; color: #5d4037; font-size: 1.1rem;'>
                    <strong>High-confidence detection of drug-related conversation patterns</strong>
                    </p>
                    </div>
                    """,
                    unsafe_allow_html=True
                )
                # Confidence assessment
                if enhanced_score >= 0.6:
                    confidence_level = "HIGH"
                    confidence_color = "red"
                elif enhanced_score >= 0.3:
                    confidence_level = "MEDIUM"
                    confidence_color = "orange"
                else:
                    confidence_level = "LOW"
                    confidence_color = "yellow"
                st.markdown(f"**Confidence Level:** <span style='color: {confidence_color}; font-weight: bold;'>{confidence_level}</span>",
                            unsafe_allow_html=True)
            else:
                st.markdown(
                    """
                    <div style='padding: 1.5rem; background: linear-gradient(90deg, #e8f5e8 0%, #c8e6c9 100%);
                    border-left: 6px solid #388e3c; border-radius: 8px; margin: 1rem 0;'>
                    <h2 style='color: #2e7d32; margin: 0; display: flex; align-items: center;'>
                    βœ… NO DRUG CONTENT DETECTED
                    </h2>
                    <p style='margin: 0.5rem 0 0 0; color: #2d5016; font-size: 1.1rem;'>
                    <strong>Conversation appears to be non-drug related</strong>
                    </p>
                    </div>
                    """,
                    unsafe_allow_html=True
                )
            # Metrics dashboard
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric(
                    "πŸ€– ML Model Analysis",
                    f"{raw_prob:.1%}",
                    f"{'Drug' if pred_label == 1 else 'Non-Drug'}"
                )
            with col2:
                st.metric(
                    "🎯 Enhanced Score Analysis",
                    f"{enhanced_score:.1%}",
                    f"{high_risk_count} high-risk"
                )
            with col3:
                st.metric(
                    "⚠️ Risk Level",
                    f"{risk_score:.1%}",
                    "πŸ”΄ CRITICAL" if risk_score >= 0.7 else
                    "🟠 HIGH" if risk_score >= 0.5 else
                    "🟑 MEDIUM" if risk_score >= 0.3 else "🟒 LOW"
                )
            with col4:
                st.metric(
                    "πŸ” Keywords Found",
                    f"{total_keyword_count}",
                    f"{len(detected_keywords)} flagged lines"
                )
            # Drug highlights section
            if adjusted_prediction == 1:
                st.subheader("πŸ’‘ Drug-Related Lines Highlighted")
                st.markdown(highlighted_html, unsafe_allow_html=True)
                if detected_keywords:
                    st.subheader("πŸ” Detected Keywords per Line")
                    for line, kws in detected_keywords.items():
                        high_risk_kws = [kw for kw in kws if kw.lower() in [hr.lower() for hr in HIGH_RISK_KEYWORDS]]
                        regular_kws = [kw for kw in kws if kw not in high_risk_kws]
                        display_text = f"**Line:** `{line}`\n"
                        if high_risk_kws:
                            display_text += f"🚨 **High-Risk Keywords:** {', '.join(high_risk_kws)}\n"
                        if regular_kws:
                            display_text += f"⚠️ **Other Keywords:** {', '.join(regular_kws)}"
                        st.markdown(display_text)
            # Final Risk Assessment section
            st.subheader("🚨 Final Risk Assessment")
            st.write(f"**Overall Risk Score:** {risk_score:.2f}/1.0")
            # Determine risk level
            if risk_score >= 0.7:
                risk_level = "πŸ”΄ **CRITICAL RISK**"
            elif risk_score >= 0.5:
                risk_level = "🟠 **HIGH RISK**"
            elif risk_score >= 0.3:
                risk_level = "🟑 **MEDIUM RISK**"
            else:
                risk_level = "🟒 **LOW RISK**"
            st.markdown(f"**Risk Level:** {risk_level}")
            # Show comparison between ML and enhanced prediction
            if pred_label != adjusted_prediction:
                st.info(f"πŸ”„ **Prediction Adjusted**: ML model predicted {'DRUG' if pred_label == 1 else 'NON_DRUG'}, "
                        f"but enhanced analysis adjusted it to {'DRUG' if adjusted_prediction == 1 else 'NON_DRUG'}")
            # System analysis summary
            st.markdown("---")
            st.subheader("πŸ“ˆ Analysis Summary")
            # Create summary dataframe
            summary_data = {
                "Analysis Component": [
                    "ML Model Prediction",
                    "Enhanced Prediction",
                    "Overall Risk Score",
                    "High-Risk Keywords",
                    "Total Keywords Detected",
                    "Flagged Conversation Lines",
                    "Processing Status"
                ],
                "Result": [
                    f"{'DRUG' if pred_label == 1 else 'NON_DRUG'} ({raw_prob:.1%} confidence)",
                    f"{'DRUG' if adjusted_prediction == 1 else 'NON_DRUG'}",
                    f"{risk_score:.1%} ({'CRITICAL' if risk_score >= 0.7 else 'HIGH' if risk_score >= 0.5 else 'MEDIUM' if risk_score >= 0.3 else 'LOW'})",
                    str(high_risk_count),
                    str(total_keyword_count),
                    str(len(detected_keywords)),
                    "βœ… Complete"
                ]
            }
            summary_df = pd.DataFrame(summary_data)
            st.dataframe(summary_df, use_container_width=True, hide_index=True)
            # API integration for drug content (best-effort, result ignored)
            if adjusted_prediction == 1:
                analysis_data = {
                    "type": "audio_analysis",
                    "filename": uploaded_file.name if uploaded_file else "sample_file",
                    "prediction": "DRUG",
                    "confidence": risk_score,
                    "keywords_detected": total_keyword_count,
                    "timestamp": datetime.now().isoformat()
                }
                send_analysis_to_api(analysis_data)
            # Download analysis report
            if adjusted_prediction == 1:
                st.markdown("---")
                st.markdown("### πŸ“₯ Export Analysis Report")
                # Create detailed report (single-row dataframe -> CSV bytes)
                report_data = {
                    "timestamp": [pd.Timestamp.now()],
                    "filename": [uploaded_file.name if uploaded_file else "sample_file"],
                    "file_size_mb": [uploaded_file.size / (1024*1024) if uploaded_file else file_info],
                    "ml_prediction": ["DRUG" if pred_label == 1 else "NON_DRUG"],
                    "ml_confidence": [raw_prob],
                    "enhanced_prediction": ["DRUG" if adjusted_prediction == 1 else "NON_DRUG"],
                    "risk_score": [risk_score],
                    "high_risk_keywords": [high_risk_count],
                    "total_keywords": [total_keyword_count],
                    "flagged_lines": [len(detected_keywords)],
                    "transcription_length": [len(transcription)]
                }
                report_df = pd.DataFrame(report_data)
                csv_data = report_df.to_csv(index=False).encode("utf-8")
                st.download_button(
                    label="πŸ“„ Download Analysis Report (CSV)",
                    data=csv_data,
                    file_name=f"drug_analysis_report_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}.csv",
                    mime="text/csv"
                )
            # Debug section
            with st.expander("πŸ› Debug Information (Click to expand)"):
                st.write("**Text being analyzed:**")
                st.code(transcription)
                detected_keywords_full = [kw for kw in DRUG_KEYWORDS if kw.lower() in transcription.lower()]
                detected_high_risk = [kw for kw in HIGH_RISK_KEYWORDS if kw.lower() in transcription.lower()]
                st.write(f"**All keywords found in full text:** {detected_keywords_full}")
                st.write(f"**High-risk keywords found:** {detected_high_risk}")
                st.write(f"**Line-by-line detection:** {detected_keywords}")
                # Check context patterns
                matched_contexts = []
                for pattern in DRUG_CONTEXT_PATTERNS:
                    if re.search(pattern, transcription):
                        matched_contexts.append(pattern)
                st.write(f"**Context patterns matched:** {len(matched_contexts)}")
    except Exception as e:
        # Top-level boundary: log with traceback, surface a friendly error.
        logger.error(f"Processing error: {e}")
        logger.error(traceback.format_exc())
        st.error(f"❌ Processing failed: {str(e)}")
        st.error("Please check the logs for more details or contact the system administrator.")
    finally:
        # Cleanup temporary file (only uploads — never delete bundled samples)
        try:
            if uploaded_file and audio_path and not file_manager.is_sample_file(audio_path):
                file_manager.cleanup_file(audio_path, is_temp=True)
        except Exception as e:
            logger.warning(f"Failed to delete temporary file {audio_path}: {e}")


if __name__ == "__main__":
    main()