Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import torch | |
| import json | |
| import os | |
| import glob | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| import edge_tts | |
| import asyncio | |
| import requests | |
| from collections import defaultdict | |
| from audio_recorder_streamlit import audio_recorder | |
| import streamlit.components.v1 as components | |
| from urllib.parse import quote | |
| from xml.etree import ElementTree as ET | |
| from datasets import load_dataset | |
| # π§ Initialize session state variables | |
| SESSION_VARS = { | |
| 'search_history': [], # Track search history | |
| 'last_voice_input': "", # Last voice input | |
| 'transcript_history': [], # Conversation history | |
| 'should_rerun': False, # Trigger for UI updates | |
| 'search_columns': [], # Available search columns | |
| 'initial_search_done': False, # First search flag | |
| 'tts_voice': "en-US-AriaNeural", # Default voice | |
| 'arxiv_last_query': "", # Last ArXiv search | |
| 'dataset_loaded': False, # Dataset load status | |
| 'current_page': 0, # Current data page | |
| 'data_cache': None, # Data cache | |
| 'dataset_info': None, # Dataset metadata | |
| 'nps_submitted': False, # Track if user submitted NPS | |
| 'nps_last_shown': None, # When NPS was last shown | |
| 'voice_recorder_key': str(datetime.now()) # Unique key for voice recorder | |
| } | |
| # π Constants | |
| ROWS_PER_PAGE = 100 | |
| MIN_SEARCH_SCORE = 0.3 | |
| EXACT_MATCH_BOOST = 2.0 | |
| # Initialize session state | |
| for var, default in SESSION_VARS.items(): | |
| if var not in st.session_state: | |
| st.session_state[var] = default | |
| class NPSTracker: | |
| """π― Net Promoter Score Tracker - Measuring happiness in numbers!""" | |
| def __init__(self, log_file="nps_logs.csv"): | |
| self.log_file = Path(log_file) | |
| self.initialize_log() | |
| def initialize_log(self): | |
| """π Create log file if it doesn't exist""" | |
| if not self.log_file.exists(): | |
| df = pd.DataFrame(columns=['timestamp', 'score', 'feedback']) | |
| df.to_csv(self.log_file, index=False) | |
| def log_response(self, score, feedback=""): | |
| """ποΈ Log new NPS response""" | |
| new_entry = pd.DataFrame([{ | |
| 'timestamp': datetime.now().isoformat(), | |
| 'score': score, | |
| 'feedback': feedback | |
| }]) | |
| if self.log_file.exists(): | |
| df = pd.read_csv(self.log_file) | |
| df = pd.concat([df, new_entry], ignore_index=True) | |
| else: | |
| df = new_entry | |
| df.to_csv(self.log_file, index=False) | |
| def get_nps_stats(self, days=30): | |
| """π Calculate NPS stats for recent period""" | |
| if not self.log_file.exists(): | |
| return { | |
| 'nps_score': 0, | |
| 'promoters': 0, | |
| 'passives': 0, | |
| 'detractors': 0, | |
| 'total_responses': 0, | |
| 'recent_feedback': [] | |
| } | |
| df = pd.read_csv(self.log_file) | |
| df['timestamp'] = pd.to_datetime(df['timestamp']) | |
| cutoff = datetime.now() - timedelta(days=days) | |
| recent_df = df[df['timestamp'] > cutoff] | |
| if len(recent_df) == 0: | |
| return { | |
| 'nps_score': 0, | |
| 'promoters': 0, | |
| 'passives': 0, | |
| 'detractors': 0, | |
| 'total_responses': 0, | |
| 'recent_feedback': [] | |
| } | |
| total = len(recent_df) | |
| promoters = len(recent_df[recent_df['score'] >= 9]) | |
| passives = len(recent_df[recent_df['score'].between(7, 8)]) | |
| detractors = len(recent_df[recent_df['score'] <= 6]) | |
| nps = ((promoters/total) - (detractors/total)) * 100 | |
| recent_feedback = recent_df[recent_df['feedback'].notna()].sort_values( | |
| 'timestamp', ascending=False | |
| )['feedback'].head(5).tolist() | |
| return { | |
| 'nps_score': round(nps, 1), | |
| 'promoters': promoters, | |
| 'passives': passives, | |
| 'detractors': detractors, | |
| 'total_responses': total, | |
| 'recent_feedback': recent_feedback | |
| } | |
| def setup_voice_recorder(): | |
| """π€ Create an in-browser voice recorder component""" | |
| return components.html( | |
| """ | |
| <div style="display: flex; flex-direction: column; align-items: center; gap: 10px;"> | |
| <button id="startButton" | |
| style="padding: 10px 20px; background: #ff4b4b; color: white; border: none; border-radius: 5px; cursor: pointer"> | |
| Start Recording | |
| </button> | |
| <button id="stopButton" | |
| style="padding: 10px 20px; background: #4b4bff; color: white; border: none; border-radius: 5px; cursor: pointer" | |
| disabled> | |
| Stop Recording | |
| </button> | |
| <audio id="audioPlayback" controls style="display: none;"></audio> | |
| <div id="statusText" style="color: #666;">Ready to record...</div> | |
| </div> | |
| <script> | |
| let mediaRecorder; | |
| let audioChunks = []; | |
| document.getElementById('startButton').onclick = async () => { | |
| try { | |
| const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); | |
| mediaRecorder = new MediaRecorder(stream); | |
| mediaRecorder.ondataavailable = (e) => { | |
| audioChunks.push(e.data); | |
| }; | |
| mediaRecorder.onstop = () => { | |
| const audioBlob = new Blob(audioChunks, { type: 'audio/wav' }); | |
| const audioUrl = URL.createObjectURL(audioBlob); | |
| document.getElementById('audioPlayback').src = audioUrl; | |
| document.getElementById('audioPlayback').style.display = 'block'; | |
| // Send to Python | |
| const reader = new FileReader(); | |
| reader.readAsDataURL(audioBlob); | |
| reader.onloadend = () => { | |
| window.parent.postMessage({ | |
| type: 'voiceData', | |
| data: reader.result | |
| }, '*'); | |
| }; | |
| }; | |
| mediaRecorder.start(); | |
| document.getElementById('startButton').disabled = true; | |
| document.getElementById('stopButton').disabled = false; | |
| document.getElementById('statusText').textContent = 'Recording...'; | |
| } catch (err) { | |
| console.error('Error:', err); | |
| document.getElementById('statusText').textContent = 'Error: ' + err.message; | |
| } | |
| }; | |
| document.getElementById('stopButton').onclick = () => { | |
| mediaRecorder.stop(); | |
| document.getElementById('startButton').disabled = false; | |
| document.getElementById('stopButton').disabled = true; | |
| document.getElementById('statusText').textContent = 'Recording complete!'; | |
| }; | |
| </script> | |
| """, | |
| height=200, | |
| ) | |
| def render_nps_sidebar(): | |
| """π¨ Show NPS metrics in sidebar""" | |
| tracker = NPSTracker() | |
| stats = tracker.get_nps_stats() | |
| st.sidebar.markdown("### π User Satisfaction Metrics") | |
| score_color = ( | |
| "π’" if stats['nps_score'] >= 50 else | |
| "π‘" if stats['nps_score'] >= 0 else | |
| "π΄" | |
| ) | |
| st.sidebar.metric( | |
| "Net Promoter Score", | |
| f"{score_color} {stats['nps_score']}" | |
| ) | |
| st.sidebar.markdown("#### Response Breakdown") | |
| col1, col2, col3 = st.sidebar.columns(3) | |
| with col1: | |
| st.metric("π", stats['promoters']) | |
| with col2: | |
| st.metric("π", stats['passives']) | |
| with col3: | |
| st.metric("π", stats['detractors']) | |
| if stats['recent_feedback']: | |
| st.sidebar.markdown("#### Recent Feedback") | |
| for feedback in stats['recent_feedback']: | |
| st.sidebar.info(feedback[:100] + "..." if len(feedback) > 100 else feedback) | |
| def render_nps_survey(): | |
| """π― Show NPS survey form""" | |
| tracker = NPSTracker() | |
| st.markdown("### π Your Feedback Matters!") | |
| score = st.slider( | |
| "How likely are you to recommend this search tool to others?", | |
| 0, 10, | |
| help="0 = Not likely at all, 10 = Extremely likely" | |
| ) | |
| feedback = st.text_area("Additional feedback (optional)") | |
| if st.button("Submit Feedback", key="nps_submit"): | |
| tracker.log_response(score, feedback) | |
| st.session_state['nps_submitted'] = True | |
| st.success("Thank you for your feedback! π") | |
| st.experimental_rerun() | |
| [... Rest of your existing code for search functionality ...] | |
| def main(): | |
| st.title("π₯ Smart Video Search with Voice & Feedback") | |
| # Initialize search | |
| search = VideoSearch() | |
| # Add NPS metrics to sidebar | |
| with st.sidebar: | |
| render_nps_sidebar() | |
| # Show survey periodically | |
| current_time = datetime.now() | |
| if (not st.session_state.get('nps_submitted') and | |
| (not st.session_state.get('nps_last_shown') or | |
| current_time - st.session_state['nps_last_shown'] > timedelta(hours=24))): | |
| with st.expander("π Quick Feedback", expanded=True): | |
| render_nps_survey() | |
| st.session_state['nps_last_shown'] = current_time | |
| # Create main tabs | |
| tab1, tab2, tab3, tab4 = st.tabs([ | |
| "π Search", "ποΈ Voice Input", "π ArXiv", "π Files" | |
| ]) | |
| # Search Tab | |
| with tab1: | |
| st.subheader("Search Videos") | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| query = st.text_input("Enter search query:", | |
| value="" if st.session_state['initial_search_done'] else "aliens") | |
| with col2: | |
| search_column = st.selectbox("Search in:", | |
| ["All Fields"] + st.session_state['search_columns']) | |
| col3, col4 = st.columns(2) | |
| with col3: | |
| num_results = st.slider("Max results:", 1, 100, 20) | |
| with col4: | |
| search_button = st.button("π Search") | |
| if (search_button or not st.session_state['initial_search_done']) and query: | |
| st.session_state['initial_search_done'] = True | |
| selected_column = None if search_column == "All Fields" else search_column | |
| with st.spinner("Searching..."): | |
| results = search.search(query, selected_column, num_results) | |
| if results: | |
| st.session_state['search_history'].append({ | |
| 'query': query, | |
| 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
| 'results': results[:5] | |
| }) | |
| st.write(f"Found {len(results)} results:") | |
| for i, result in enumerate(results, 1): | |
| with st.expander(f"Result {i}", expanded=(i==1)): | |
| render_result(result) | |
| else: | |
| st.warning("No matching results found.") | |
| # Voice Input Tab | |
| with tab2: | |
| st.subheader("Voice Search") | |
| st.write("ποΈ Record your query:") | |
| voice_recorder = setup_voice_recorder() | |
| if 'voice_data' in st.session_state: | |
| with st.spinner("Processing voice..."): | |
| voice_query = transcribe_audio(st.session_state['voice_data']) | |
| st.markdown("**Transcribed Text:**") | |
| st.write(voice_query) | |
| if st.button("π Search with Voice"): | |
| results = search.search(voice_query, None, 20) | |
| for i, result in enumerate(results, 1): | |
| with st.expander(f"Result {i}", expanded=(i==1)): | |
| render_result(result) | |
| # ArXiv Tab | |
| with tab3: | |
| st.subheader("ArXiv Search") | |
| arxiv_query = st.text_input("Search ArXiv:", value=st.session_state['arxiv_last_query']) | |
| vocal_summary = st.checkbox("π Quick Audio Summary", value=True) | |
| titles_summary = st.checkbox("π Titles Only", value=True) | |
| full_audio = st.checkbox("π Full Audio Summary", value=False) | |
| if st.button("π Search ArXiv"): | |
| st.session_state['arxiv_last_query'] = arxiv_query | |
| perform_arxiv_lookup(arxiv_query, vocal_summary, titles_summary, full_audio) | |
| # File Manager Tab | |
| with tab4: | |
| show_file_manager() | |
| if __name__ == "__main__": | |
| main() |