# Hosting-page header ("Spaces: Sleeping / Sleeping") captured along with the
# source; it is not part of the program.
import datetime
import hashlib
import hmac
import json
import os
import pickle
from collections import Counter
from pathlib import Path

import altair as alt
import speech_recognition as sr
import streamlit as st
import torch
from dotenv import load_dotenv
from gtts import gTTS
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_groq import ChatGroq
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline
# Load environment variables
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Phrases whose presence in a message is treated as a crisis signal
# (matched as substrings by is_crisis).
CRISIS_KEYWORDS = [
    "suicide",
    "kill myself",
    "end it all",
    "worthless",
    "can't go on",
    "hurt myself",
    "self harm",
    "want to disappear",
    "no reason to live",
]

# Initialize session state: seed each key only when it is not already present,
# so values set during earlier runs of the same session are left untouched.
for _state_key, _initial_value in (
    ("authenticated", False),
    ("username", None),
    ("transcribed_text", ""),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# User management functions
def hash_password(password):
    """Return the hex SHA-256 digest of ``password`` followed by a fixed salt.

    NOTE(review): one hard-coded salt shared by all users plus a single,
    unstretched SHA-256 pass is weak for password storage. Switching schemes
    would invalidate every stored hash, so the weakness is only flagged here.
    """
    salt = "dilbot_secure_salt_2024"  # we can change this
    hasher = hashlib.sha256()
    hasher.update((password + salt).encode())
    return hasher.hexdigest()
def get_secure_users_path():
    """Return the path of the users file, creating its directory if needed.

    The file lives in the hidden ``.secure_data`` directory, relative to the
    current working directory.
    """
    hidden_dir = ".secure_data"
    users_file = "users_encrypted.json"
    os.makedirs(hidden_dir, exist_ok=True)
    return os.path.join(hidden_dir, users_file)
def load_users():
    """Load the username -> account-record mapping from the users file.

    Returns:
        The parsed mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not hold a JSON object.
    """
    users_path = get_secure_users_path()
    try:
        with open(users_path, "r", encoding="utf-8") as f:
            users = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError: invalid JSON or
        # undecodable bytes (JSONDecodeError and UnicodeDecodeError are both
        # ValueError subclasses). Anything else is a real bug and propagates,
        # unlike the previous bare ``except``.
        return {}
    # Callers index the result by username, so anything but a dict is unusable.
    return users if isinstance(users, dict) else {}
def save_users(users):
    """Persist the users mapping to the users file as indented JSON.

    The data is written to a sibling temporary file and then moved over the
    real file, so a crash or serialization error mid-write leaves the
    previous users file intact instead of truncated.

    Args:
        users: JSON-serializable mapping of username to account record.
    """
    users_path = get_secure_users_path()
    tmp_path = f"{users_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(users, f, indent=4)
    # os.replace overwrites the destination in a single rename step.
    os.replace(tmp_path, users_path)
def create_user_directory(username):
    """Ensure ``users/<username>`` exists and return that path.

    NOTE(review): ``username`` is used verbatim as a path component, so callers
    are responsible for rejecting names containing path separators.
    """
    target = "users/{}".format(username)
    os.makedirs(target, exist_ok=True)
    return target
def get_user_file_path(username, filename):
    """Return the path of ``filename`` inside the ``users/<username>`` directory.

    The path is only computed; nothing is created on disk.
    """
    return os.path.join("users/{}".format(username), filename)
def signup(username, password, email):
    """Register a new user.

    Args:
        username: Desired account name; also used as the name of the user's
            data directory.
        password: Plain-text password; only its hash is stored.
        email: Contact address stored with the account.

    Returns:
        ``(True, message)`` when the account was created, otherwise
        ``(False, reason)``.
    """
    # The username becomes a directory name under users/, so reject anything
    # that could leave that directory ("..", "/", "\\") before touching state.
    if not username or not all(ch.isalnum() or ch in "_-" for ch in username):
        return False, "Username may only contain letters, numbers, underscores and hyphens"

    users = load_users()
    if username in users:
        return False, "Username already exists"

    users[username] = {
        "password": hash_password(password),
        "email": email,
        "created_at": str(datetime.datetime.now()),
    }
    save_users(users)
    create_user_directory(username)
    return True, "Account created successfully!"
def login(username, password):
    """Authenticate a user against the stored password hash.

    Args:
        username: Account name to look up.
        password: Plain-text password to verify.

    Returns:
        ``(True, "Login successful!")`` on a match, otherwise
        ``(False, reason)``.
    """
    users = load_users()
    if username not in users:
        return False, "Username not found"

    record = users[username]
    # A hand-edited or damaged record must fail the login, not raise.
    stored_hash = record.get("password") if isinstance(record, dict) else None
    if isinstance(stored_hash, str):
        candidate = hash_password(password)
        # Constant-time comparison; bytes are used so non-ASCII content in a
        # damaged record cannot make compare_digest raise.
        if hmac.compare_digest(stored_hash.encode("utf-8"), candidate.encode("utf-8")):
            return True, "Login successful!"
    return False, "Incorrect password"
# Emotion detection
@st.cache_resource
def load_emotion_model():
    """Return the emotion text-classification pipeline.

    The pipeline is built with ``top_k=1`` and ``device=-1``. It is cached with
    ``st.cache_resource`` so the model is constructed once and reused across
    script reruns instead of being rebuilt on every call.
    """
    return pipeline(
        "text-classification",
        model="j-hartmann/emotion-english-distilroberta-base",
        top_k=1,
        device=-1,
    )
def detect_emotion(text):
    """Classify ``text`` and return ``(label, score)`` for the top prediction.

    The label is lower-cased; the score is returned as produced by the
    pipeline.
    """
    classifier = load_emotion_model()
    results = classifier(text)
    # Results are nested: the first element of the first entry is the top hit.
    top = results[0][0]
    return top["label"].lower(), top["score"]
# Authentication UI
def show_auth_page():
    """Render the login / sign-up page.

    A successful login marks the session as authenticated, stores the username
    in session state and reruns the script. A successful sign-up only reports
    success; the user still has to log in.

    NOTE(review): the icon strings below look mis-encoded (mojibake) in this
    copy of the file; they are kept as found — confirm against the original.
    """
    st.set_page_config(page_title="DilBot - Login", page_icon="π§ ")
    st.title("π§ DilBot - Emotional AI Companion")
    st.markdown("Welcome! Please login or create an account to continue.")

    login_tab, signup_tab = st.tabs(["Login", "Sign Up"])

    with login_tab:
        st.subheader("Login to Your Account")
        login_username = st.text_input("Username", key="login_user")
        login_password = st.text_input("Password", type="password", key="login_pass")

        if st.button("Login", key="login_btn"):
            if not (login_username and login_password):
                st.warning("Please fill in all fields")
            else:
                success, message = login(login_username, login_password)
                if not success:
                    st.error(message)
                else:
                    st.session_state.authenticated = True
                    st.session_state.username = login_username
                    st.success(message)
                    st.rerun()

    with signup_tab:
        st.subheader("Create New Account")
        signup_username = st.text_input("Choose Username", key="signup_user")
        signup_email = st.text_input("Email Address", key="signup_email")
        signup_password = st.text_input("Choose Password", type="password", key="signup_pass")
        signup_confirm = st.text_input("Confirm Password", type="password", key="signup_confirm")

        if st.button("Create Account", key="signup_btn"):
            required = (signup_username, signup_email, signup_password, signup_confirm)
            if not all(required):
                st.warning("Please fill in all fields")
            elif signup_password != signup_confirm:
                st.error("Passwords don't match!")
            elif len(signup_password) < 6:
                st.error("Password must be at least 6 characters long!")
            else:
                success, message = signup(signup_username, signup_password, signup_email)
                if success:
                    st.success(message)
                    st.info("You can now login with your credentials!")
                else:
                    st.error(message)
# Main app functions
def build_user_vectorstore(username, quotes):
    """Embed ``quotes`` into a FAISS index, save it for the user and return it.

    The index is saved under the user's ``vectorstore`` path.
    """
    embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    store = FAISS.from_texts(quotes, embedding=embedder)
    # Save vectorstore for user
    store.save_local(get_user_file_path(username, "vectorstore"))
    return store
def load_user_vectorstore(username):
    """Return the user's saved FAISS index, or ``None`` if none exists on disk."""
    store_path = get_user_file_path(username, "vectorstore")
    if not os.path.exists(store_path):
        return None

    embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    # NOTE(review): allow_dangerous_deserialization=True loads pickled data.
    # Acceptable only while this directory contains nothing but indexes this
    # app wrote itself.
    return FAISS.load_local(store_path, embedder, allow_dangerous_deserialization=True)
def save_user_journal(username, user_input, emotion, score, response):
    """Append one conversation entry to the user's ``journal.json``.

    The entry records today's date, the current timestamp, the user's text,
    the detected emotion, the score as a percentage rounded to two decimals,
    and the reply. The whole journal is read, extended and rewritten.
    """
    journal_path = get_user_file_path(username, "journal.json")
    new_entry = dict(
        date=str(datetime.date.today()),
        timestamp=str(datetime.datetime.now()),
        user_input=user_input,
        emotion=emotion,
        confidence=round(score * 100, 2),
        response=response,
    )

    if os.path.exists(journal_path):
        with open(journal_path, "r") as reader:
            entries = json.load(reader)
    else:
        entries = []

    entries.append(new_entry)
    with open(journal_path, "w") as writer:
        json.dump(entries, writer, indent=4)
def load_user_journal(username):
    """Return the parsed contents of the user's ``journal.json``.

    Returns an empty list when the file does not exist.
    """
    journal_path = get_user_file_path(username, "journal.json")
    if not os.path.exists(journal_path):
        return []
    with open(journal_path, "r") as reader:
        return json.load(reader)
def is_crisis(text):
    """Return True when ``text`` contains any phrase from ``CRISIS_KEYWORDS``.

    Matching is case-insensitive substring search. Before matching, curly
    apostrophes are mapped to the straight ``'`` used in the keyword list and
    runs of whitespace are collapsed, so input such as "can’t go on" or
    "kill   myself" is still recognised. Empty or ``None`` input yields False.
    """
    if not text:
        return False
    normalized = text.lower().replace("\u2019", "'").replace("\u2018", "'")
    normalized = " ".join(normalized.split())
    return any(phrase in normalized for phrase in CRISIS_KEYWORDS)
def speak(text, username):
    """Synthesize ``text`` to the user's ``response.mp3`` and embed a player.

    The file is overwritten on every call.
    """
    speech = gTTS(text)
    destination = get_user_file_path(username, "response.mp3")
    speech.save(destination)
    st.audio(destination, format="audio/mp3")
def transcribe_audio_file(uploaded_audio):
    """Transcribe an uploaded audio file with Google speech recognition.

    Returns:
        The recognised text, or a string starting with ``"Error: "`` when
        anything goes wrong. Callers detect failure by that prefix.
    """
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(uploaded_audio) as source:
            recording = recognizer.record(source)
            return recognizer.recognize_google(recording)
    except Exception as exc:
        return f"Error: {exc}"
def _render_dashboard(journal_data):
    """Render the mood chart, recent conversations and summary metrics.

    Args:
        journal_data: Non-empty list of journal entries; each entry is read
            for its ``date``, ``emotion``, ``confidence``, ``user_input`` and
            ``response`` keys.
    """
    # Mood tracker
    st.subheader("π Your Daily Mood Tracker")
    chart_rows = [
        {
            "date": entry["date"],
            "emotion": entry["emotion"].capitalize(),
            "confidence": entry["confidence"],
        }
        for entry in journal_data
    ]
    chart = alt.Chart(alt.Data(values=chart_rows)).mark_bar().encode(
        x=alt.X('date:N', title='Date'),
        y=alt.Y('count():Q', title='Frequency'),
        color=alt.Color('emotion:N', title='Emotion'),
        tooltip=['date:N', 'emotion:N', 'count():Q'],
    ).properties(
        width=600,
        height=300,
        title="Your Emotional Journey Over Time",
    )
    st.altair_chart(chart, use_container_width=True)

    # Recent conversations: the last five entries, newest first. Slicing
    # already copes with journals shorter than five entries.
    st.subheader("π¬ Recent Conversations")
    for entry in reversed(journal_data[-5:]):
        with st.expander(f"π {entry['date']} - {entry['emotion'].capitalize()} ({entry['confidence']}%)"):
            st.markdown(f"**You said:** {entry['user_input']}")
            st.markdown(f"**DilBot replied:** {entry['response']}")

    # Statistics
    st.subheader("π Your Emotional Statistics")
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Total Conversations", len(journal_data))
    with col2:
        # Counter is linear; on ties the emotion seen first wins.
        emotion_counts = Counter(entry['emotion'] for entry in journal_data)
        most_common = emotion_counts.most_common(1)[0][0]
        st.metric("Most Common Emotion", most_common.capitalize())
    with col3:
        avg_confidence = sum(entry['confidence'] for entry in journal_data) / len(journal_data)
        st.metric("Avg. Confidence", f"{avg_confidence:.1f}%")


def show_main_app():
    """Main DilBot application page for the logged-in user.

    Renders the header with logout, quote-theme / upload controls, optional
    voice transcription, the chat interaction (emotion detection, quote
    retrieval, LLM reply, journal save, spoken reply) and the personal
    dashboard built from the user's journal.

    NOTE(review): the icon strings look mis-encoded (mojibake) in this copy of
    the file; they are kept as found — confirm against the original.
    """
    username = st.session_state.username
    st.set_page_config(page_title="DilBot - Emotional AI", page_icon="π§ ")

    # Header with logout
    col1, col2 = st.columns([4, 1])
    with col1:
        st.title(f"π§ DilBot - Welcome back, {username}!")
        st.markdown("Your personal emotional AI companion")
    with col2:
        if st.button("Logout", key="logout_btn"):
            st.session_state.authenticated = False
            st.session_state.username = None
            st.rerun()

    # Quote categories
    quote_categories = {
        "Grief": ["Grief is the price we pay for love.", "Tears are the silent language of grief.", "What we have once enjoyed we can never lose; all that we love deeply becomes a part of us."],
        "Motivation": ["Believe in yourself and all that you are.", "Tough times never last, but tough people do.", "The only way to do great work is to love what you do."],
        "Healing": ["Every wound has its own time to heal.", "It's okay to take your time to feel better.", "Healing is not linear, and that's perfectly okay."],
        "Relationships": ["The best relationships are built on trust.", "Love is not about possession but appreciation.", "Healthy relationships require both people to show up authentically."],
    }

    # UI for quote selection and file upload
    col1, col2 = st.columns(2)
    with col1:
        selected_category = st.selectbox("π― Choose a quote theme:", list(quote_categories.keys()))
    with col2:
        uploaded_quotes = st.file_uploader("π Upload your own quotes (.txt)", type=["txt"])
    uploaded_audio = st.file_uploader("π€ Upload a voice message (.wav)", type=["wav"])

    # Handle vectorstore
    custom_quotes = []
    if uploaded_quotes:
        try:
            raw_text = uploaded_quotes.read().decode("utf-8")
        except UnicodeDecodeError:
            st.error("Could not read the quotes file: it must be UTF-8 encoded text.")
        else:
            custom_quotes = [line.strip() for line in raw_text.splitlines() if line.strip()]
            if not custom_quotes:
                # An index cannot be built from zero texts; fall back to the theme.
                st.warning("The uploaded file contains no quotes; using the selected theme instead.")

    if custom_quotes:
        vectorstore = build_user_vectorstore(username, custom_quotes)
        current_quotes = custom_quotes
        st.success(f"β {len(custom_quotes)} custom quotes uploaded and saved!")
    else:
        current_quotes = quote_categories[selected_category]
        # NOTE(review): a previously saved index is reused regardless of the
        # theme selected now, so retrieval context may come from other quotes
        # than `current_quotes` — confirm this is intended.
        vectorstore = load_user_vectorstore(username)
        if vectorstore is None:
            vectorstore = build_user_vectorstore(username, current_quotes)

    # Voice transcription
    if uploaded_audio and st.button("ποΈ Transcribe Voice"):
        with st.spinner("Transcribing your voice..."):
            transcribed = transcribe_audio_file(uploaded_audio)
        if transcribed.startswith("Error:"):
            st.error(transcribed)
        else:
            st.session_state.transcribed_text = transcribed
            st.success("β Voice transcribed successfully!")

    # Input area
    user_input = st.text_area(
        "π¬ What's on your mind?",
        value=st.session_state.transcribed_text,
        height=100,
        placeholder="Share your thoughts, feelings, or experiences...",
    )
    final_input = user_input.strip() or st.session_state.transcribed_text.strip()

    # Main interaction button
    if st.button("π§ Talk to DilBot", type="primary"):
        if not final_input:
            st.warning("β οΈ Please enter something to share or upload a voice message.")
        else:
            with st.spinner("DilBot is thinking and feeling..."):
                # Emotion detection
                emotion, score = detect_emotion(final_input)

                # Get AI response
                prompt_template = PromptTemplate(
                    input_variables=["context", "user_input", "username"],
                    template=(
                        "You are DilBot, an empathetic emotional support AI companion for {username}.\n"
                        "Use the following emotional quote context to respond gently, supportively, and personally.\n"
                        "Context quotes:\n"
                        "{context}\n"
                        "User's message:\n"
                        "{user_input}\n"
                        "Respond as DilBot with warmth, empathy, and understanding. Keep it conversational and supportive."
                    ),
                )

                # Get similar quotes
                similar_docs = vectorstore.similarity_search(final_input, k=2)
                context = "\n".join(doc.page_content for doc in similar_docs)

                # Generate response
                groq_llm = ChatGroq(api_key=GROQ_API_KEY, model="llama3-70b-8192")
                chain = LLMChain(llm=groq_llm, prompt=prompt_template)
                response = chain.run(context=context, user_input=final_input, username=username)

                # Save to user's journal
                save_user_journal(username, final_input, emotion, score, response)

            # Display results
            col1, col2 = st.columns([2, 1])
            with col1:
                st.success(f"**Emotion Detected:** {emotion.capitalize()} ({round(score*100)}% confidence)")
            with col2:
                # NOTE(review): this alert is only reached when the model calls
                # above succeed; consider checking before them.
                if is_crisis(final_input):
                    st.error("π¨ Crisis detected! Please reach out to a mental health professional immediately.")

            # Show relevant quote: the quote most similar to the user's message.
            if current_quotes:
                model = SentenceTransformer("all-MiniLM-L6-v2")
                quote_embeddings = model.encode(current_quotes, convert_to_tensor=True)
                user_embedding = model.encode(final_input, convert_to_tensor=True)
                sims = util.pytorch_cos_sim(user_embedding, quote_embeddings)[0]
                selected_quote = current_quotes[sims.argmax().item()]
                st.info(f"π **Quote for you:** *{selected_quote}*")

            # Show response
            st.markdown("### π€ DilBot's Response:")
            st.markdown(f"> {response}")

            # Audio response
            speak(response, username)

            # Clear transcribed text after successful interaction
            st.session_state.transcribed_text = ""

    # User's personal dashboard
    st.markdown("---")
    st.header("π Your Personal Dashboard")

    # Load user's journal (after any save above, so the newest entry is shown)
    journal_data = load_user_journal(username)
    if journal_data:
        _render_dashboard(journal_data)
    else:
        st.info("π Start your first conversation with DilBot to see your personal dashboard!")

    st.markdown("---")
    st.caption("Built by Ahmad Sana Farooq (Member of CSG Hackathon Team) | Your data is stored privately and securely")
# Main app logic
def main():
    """Show the main app to authenticated sessions and the auth page otherwise."""
    render_page = show_main_app if st.session_state.authenticated else show_auth_page
    render_page()


if __name__ == "__main__":
    main()