# Page-scrape residue (not part of the program): "Spaces: Sleeping"
import os
import re
import time
from datetime import datetime, timedelta

import gspread
import numpy as np
import pandas as pd
import plotly.express as px
import pyaudio
import seaborn as sns
import speech_recognition as sr
import streamlit as st
from google.oauth2.service_account import Credentials
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from textblob import TextBlob

# --- File locations ---------------------------------------------------------
# Each location can be overridden with an environment variable so the app is
# not tied to one developer machine; the defaults are the original paths.
csv_file_path = os.environ.get(
    "CONTEXT_CSV_PATH",
    r"C:\Users\Muthuraja\OneDrive\Attachments\Desktop\second\context.csv",
)  # CSV holding the question/product data
# Query results are appended to the same file as the source data unless
# QUERY_LOG_CSV_PATH points somewhere else.
output_csv_path = os.environ.get("QUERY_LOG_CSV_PATH", csv_file_path)

# --- Google Sheets setup ----------------------------------------------------
SCOPE = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
CREDS_PATH = os.environ.get(
    "GOOGLE_CREDS_PATH",
    r"C:\Users\Muthuraja\Downloads\modern-cycling-444916-g6-82c207d3eb47.json",
)  # service-account key file

# Initialize Google Sheets connection
def initialize_google_sheets():
    """Open the first worksheet of the Google Sheet named "infosys".

    Credentials are read from ``CREDS_PATH`` with the scopes in ``SCOPE``.

    Returns:
        The worksheet (``.sheet1`` of the spreadsheet), or ``None`` when the
        credentials cannot be loaded or the sheet cannot be opened. Every
        failure is reported through ``st.error`` instead of being raised.
    """
    try:
        # Loading the key file is inside the ``try`` so that a missing or
        # malformed file is reported rather than crashing the app on import.
        credentials = Credentials.from_service_account_file(CREDS_PATH, scopes=SCOPE)
        client = gspread.authorize(credentials)
        return client.open("infosys").sheet1
    except gspread.exceptions.APIError as e:
        st.error(f"Google Sheets API error: {e}")
    except gspread.exceptions.SpreadsheetNotFound:
        st.error('Google Sheet "infosys" was not found or is not shared with the service account.')
    except (OSError, ValueError) as e:
        # OSError: key file missing/unreadable; ValueError: not a valid key file.
        st.error(f"Could not load Google credentials from {CREDS_PATH}: {e}")
    return None


sheet = initialize_google_sheets()

# Function to safely load the CSV dataset
def load_csv_safely(file_path):
    """Load the product/question CSV and validate its schema.

    Malformed lines are skipped while reading. The file must contain the
    columns ``question``, ``product``, ``price``, ``features``, ``ratings``
    and ``discount``. The ``Timestamp`` column is parsed to datetimes
    (unparseable values become ``NaT``) and is created, filled with ``NaT``,
    when it is absent.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        The loaded ``DataFrame``, or ``None`` when the file cannot be read or
        a required column is missing; the problem is reported via ``st.error``.
    """
    required_columns = ['question', 'product', 'price', 'features', 'ratings', 'discount']
    try:
        df = pd.read_csv(file_path, on_bad_lines='skip')  # Skips malformed lines

        # Report every missing column at once instead of only the first one.
        missing_columns = [column for column in required_columns if column not in df.columns]
        if missing_columns:
            missing_list = ", ".join(f"'{column}'" for column in missing_columns)
            raise ValueError(
                f"CSV does not contain the required column(s): {missing_list}. Please check your CSV."
            )

        if 'Timestamp' in df.columns:
            # CSV timestamps arrive as strings; convert them so date
            # comparisons work. Bad values become NaT rather than raising.
            df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
        else:
            df['Timestamp'] = pd.NaT
        return df
    except pd.errors.ParserError as e:
        st.error(f"Error reading CSV file: {e}")
    except Exception as e:
        # Deliberate catch-all: a load failure must not take the whole app down.
        st.error(f"An error occurred: {e}")
    return None

# Module-level state shared by the query helpers below.
# ``dataset`` is None when load_csv_safely could not read or validate the CSV.
dataset = load_csv_safely(file_path=csv_file_path)
# Pre-trained sentence transformer used to embed queries and dataset text.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Function to filter data by date
def filter_data_by_date(data, date_filter):
    """Return the rows of ``data`` that fall inside the selected time window.

    Args:
        data: DataFrame with a ``Timestamp`` column (datetimes or strings).
        date_filter: ``"Today"`` keeps rows since local midnight,
            ``"One Week"`` keeps rows from the last seven days; any other
            value returns ``data`` unchanged.

    Returns:
        The filtered DataFrame. Rows whose timestamp is missing or cannot be
        parsed are excluded whenever a window is applied.
    """
    now = datetime.now()
    if date_filter == "Today":
        start_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elif date_filter == "One Week":
        start_date = now - timedelta(weeks=1)
    else:
        return data

    # Timestamps read back from CSV are plain strings; comparing those with a
    # datetime raises TypeError, so coerce first (bad values become NaT and
    # compare as False).
    timestamps = pd.to_datetime(data['Timestamp'], errors='coerce')
    return data[timestamps >= start_date]

# Function to recognize speech using SpeechRecognition and PyAudio
def listen_to_speech():
    """Capture one phrase from the microphone and transcribe it.

    Status and error messages are written to the Streamlit page.

    Returns:
        The recognized text, or ``None`` when the microphone is unavailable,
        nothing was said, the audio was unintelligible, or the recognition
        service could not be reached.
    """
    recognizer = sr.Recognizer()
    try:
        # Opening the microphone is inside the ``try`` so a missing audio
        # device is reported instead of crashing the app.
        with sr.Microphone() as source:
            recognizer.adjust_for_ambient_noise(source)
            st.write("Listening...")
            # Wait up to 5 s for speech to start, then record at most 10 s.
            audio = recognizer.listen(source, timeout=5, phrase_time_limit=10)

        # The microphone is released before the (network) recognition call.
        st.write("Recognizing...")
        text = recognizer.recognize_google(audio)
        st.write(f"Recognized: {text}")
        return text
    except sr.WaitTimeoutError:
        # No speech started within the timeout; this is not a real error.
        st.warning("No speech detected. Please try again.")
    except sr.UnknownValueError:
        st.error("Sorry, I could not understand the audio.")
    except sr.RequestError:
        st.error("Could not request results from Google Speech Recognition service.")
    except Exception as e:
        # Deliberate catch-all, e.g. for audio-device errors.
        st.error(f"An error occurred: {e}")
    return None

# Function to check if the text is a greeting
_GREETINGS = ("hello", "hi", "hey", "good morning", "good afternoon", "good evening", "hola")
# Match greetings as whole words/phrases only: plain substring search treats
# "this" (contains "hi") or "they" (contains "hey") as greetings.
_GREETING_PATTERN = re.compile(
    r"\b(?:"
    + "|".join(r"\s+".join(re.escape(word) for word in greeting.split()) for greeting in _GREETINGS)
    + r")\b",
    re.IGNORECASE,
)


def is_greeting(text):
    """Return True when ``text`` contains a greeting word or phrase.

    Matching is case-insensitive and respects word boundaries. Empty or
    non-string input is never a greeting.
    """
    return isinstance(text, str) and _GREETING_PATTERN.search(text) is not None

# Function to respond to greetings
def respond_to_greeting():
    """Write the canned greeting reply to the Streamlit page."""
    reply = "Hi there! How can I assist you today? 😊"
    st.write(reply)

# Function to extract the product name from the query
def extract_product_name(query):
    """Find a product from the dataset that is mentioned in ``query``.

    Matching is a case-insensitive substring test of each product name
    against the query. When several names match, the longest one wins, so
    "iPhone 15 Pro" is preferred over "iPhone".

    Args:
        query: The user's query text.

    Returns:
        The matching product name exactly as stored in the dataset, or
        ``None`` when nothing matches, the query is empty, or the dataset is
        not loaded.
    """
    if dataset is None or not query:
        return None

    query_lower = query.lower()
    best_match = None
    # Missing names are dropped rather than turned into a matchable
    # placeholder, and blank names are skipped because "" is a substring of
    # every query.
    for product in dataset['product'].dropna().astype(str).unique():
        if not product.strip():
            continue
        if product.lower() in query_lower:
            if best_match is None or len(product) > len(best_match):
                best_match = product
    return best_match

# Function to find the best matching answer using embeddings (Retrieve part of RAG)
def find_answer(query):
    """Answer ``query`` from the dataset row whose question is most similar.

    The query and every dataset question are embedded with
    ``embedding_model``; the row with the highest cosine similarity supplies
    the product details. The query and the retrieved details are also logged
    through ``save_query_to_csv``.

    Args:
        query: The user's question.

    Returns:
        A message string. It contains only the price, features or discount
        when the query mentions that word, otherwise the full product
        summary. A fixed message is returned when the dataset is not loaded
        or has no rows.
    """
    if dataset is None:
        return "Dataset not loaded properly."
    if dataset.empty:
        # Nothing to compare against; argmax over zero similarities would fail.
        return "No product information is available."

    # Missing questions become empty strings so the encoder receives text only.
    questions = dataset['question'].fillna('').astype(str).tolist()

    # NOTE: the whole question column is re-encoded on every call.
    query_embedding = embedding_model.encode([query])
    dataset_embeddings = embedding_model.encode(questions)
    similarities = cosine_similarity(query_embedding, dataset_embeddings)

    # ``similarities`` has one row (the single query); pick its best column.
    closest_idx = int(np.argmax(similarities[0]))
    best_row = dataset.iloc[closest_idx]
    product_name = best_row['product']
    price = best_row['price']
    features = best_row['features']
    ratings = best_row['ratings']
    discount = best_row['discount']

    # Save the query and response to CSV (the timestamp is added there).
    save_query_to_csv(query, product_name, price, features, ratings, discount)

    # Return specific info based on query
    query_lower = query.lower()
    if "price" in query_lower:
        return f"The price of {product_name} is {price}"
    if "features" in query_lower:
        return f"Features of {product_name}: {features}"
    if "discount" in query_lower:
        return f"The discount on {product_name} is {discount}%"
    return f"Product: {product_name}\nPrice: {price}\nFeatures: {features}\nRatings: {ratings}\nDiscount: {discount}%"

# Function to save the query and answer to the query-results CSV
def save_query_to_csv(query, product_name, price, features, ratings, discount):
    """Append one query record, stamped with the current time, to ``output_csv_path``.

    Args:
        query: The user's query text (stored in the ``question`` column).
        product_name: Product returned for the query.
        price: Price of that product.
        features: Feature description of that product.
        ratings: Ratings of that product.
        discount: Discount of that product.
    """
    new_entry_df = pd.DataFrame([{
        'question': query,
        'product': product_name,
        'price': price,
        'features': features,
        'ratings': ratings,
        'discount': discount,
        'Timestamp': datetime.now(),
    }])

    # Write the header when the file does not exist yet OR exists but is
    # empty; otherwise the first record would later be read as the header.
    needs_header = not os.path.exists(output_csv_path) or os.path.getsize(output_csv_path) == 0
    new_entry_df.to_csv(output_csv_path, mode='a', header=needs_header, index=False)

# Function for sentiment analysis using TextBlob with emojis
def analyze_sentiment_with_emoji(text):
    """Classify the sentiment of ``text`` using TextBlob polarity.

    Args:
        text: Text to analyse. Non-string values (for example NaN read from
            a CSV) and blank strings are treated as neutral.

    Returns:
        A ``(sentiment, sentiment_score, emoji)`` tuple, where ``sentiment``
        is ``"Positive"``, ``"Negative"`` or ``"Neutral"`` and
        ``sentiment_score`` is the polarity value.
    """
    # TextBlob rejects non-string input, so handle that before calling it.
    if not isinstance(text, str) or not text.strip():
        return "Neutral", 0.0, "😐"

    sentiment_score = TextBlob(text).sentiment.polarity
    if sentiment_score > 0:
        return "Positive", sentiment_score, "😊"
    if sentiment_score < 0:
        return "Negative", sentiment_score, "😞"
    return "Neutral", sentiment_score, "😐"

# Function to provide product recommendations based on the query
def recommend_products(query):
    """Recommend the three products whose names are most similar to ``query``.

    Product names are embedded with ``embedding_model`` and ranked by cosine
    similarity to the query. Each distinct product name is considered once,
    so the same product cannot fill several slots.

    Args:
        query: The user's query text.

    Returns:
        A list of exactly three dicts with the keys ``product``, ``price``,
        ``features``, ``ratings`` and ``discount``, padded with placeholder
        entries when fewer than three products exist. When the dataset is
        not loaded, the string ``"Dataset not loaded properly."`` is returned
        instead.
    """
    if dataset is None:
        return "Dataset not loaded properly."

    # Work on a derived frame so the shared global dataset is not modified.
    catalog = dataset.assign(product=dataset['product'].fillna('Unknown').astype(str))
    catalog = catalog.drop_duplicates(subset='product', keep='first')

    recommendations = []
    if not catalog.empty:
        query_embedding = embedding_model.encode([query])
        product_embeddings = embedding_model.encode(catalog['product'].tolist())
        similarities = cosine_similarity(query_embedding, product_embeddings)

        # Indices of the (up to) three highest scores, best first.
        top_indices = np.argsort(similarities[0])[-3:][::-1]
        for idx in top_indices:
            row = catalog.iloc[idx]
            recommendations.append({
                'product': row['product'],
                'price': row['price'],
                'features': row['features'],
                'ratings': row['ratings'],
                'discount': row['discount'],
            })

    # If there are fewer than 3 recommendations, pad with default responses.
    while len(recommendations) < 3:
        recommendations.append({
            'product': 'No recommendation available',
            'price': 'N/A',
            'features': 'N/A',
            'ratings': 'N/A',
            'discount': 'N/A',
        })
    return recommendations

# Function to handle the entire continuous interaction loop
# Stop listening after this many failed recognition attempts in a row, so a
# missing microphone or unreachable service cannot spin the loop forever.
_MAX_CONSECUTIVE_FAILURES = 3


def _show_product_details(product_name):
    """Write the dataset details of ``product_name`` to the page."""
    st.write(f"Let me check the details for {product_name}:")
    product_details = None
    if dataset is not None:
        # astype(str) keeps the .str accessor working for non-text names.
        names = dataset['product'].astype(str).str.lower()
        product_details = dataset[names == product_name.lower()]

    if product_details is None or product_details.empty:
        st.write("Sorry, I couldn't find the product you're asking for.")
        return

    product_info = product_details.iloc[0]
    st.write(f"Product: {product_info['product']}")
    st.write(f"Price: {product_info['price']}")
    st.write(f"Features: {product_info['features']}")
    st.write(f"Ratings: {product_info['ratings']}")
    st.write(f"Discount: {product_info['discount']}%")


def _show_recommendations(query):
    """Write the product recommendations for ``query`` to the page."""
    st.write("Here are some product recommendations based on your query: ")
    recommendations = recommend_products(query)
    if isinstance(recommendations, str):
        # recommend_products returns a message string when the dataset is not
        # loaded; iterating it as a list of dicts would raise a TypeError.
        st.write(recommendations)
        return
    for idx, rec in enumerate(recommendations, 1):
        st.write(f"**Recommendation {idx}:**")
        st.write(f"**Product**: {rec['product']}")
        st.write(f"**Price**: {rec['price']}")
        st.write(f"**Features**: {rec['features']}")
        st.write(f"**Ratings**: {rec['ratings']}")
        st.write(f"**Discount**: {rec['discount']}%")
        st.write("---")  # Separator between recommendations


def continuous_interaction():
    """Run the voice-query page: listen, answer, analyse and recommend.

    After the start button is pressed, phrases are captured in a loop. A
    greeting gets the greeting reply only. Any other query is answered with
    product details (when a known product is named) or with the best
    matching dataset answer, followed by the query's sentiment and product
    recommendations. The loop ends after ``_MAX_CONSECUTIVE_FAILURES``
    failed recognition attempts in a row.
    """
    st.title("Speech Recognition with Product Queries")
    if not st.button("Start Speech Recognition"):
        return

    failed_attempts = 0
    while True:  # Loop for continuous listening
        user_input = listen_to_speech()
        if not user_input:
            failed_attempts += 1
            if failed_attempts >= _MAX_CONSECUTIVE_FAILURES:
                st.warning("Stopped listening after repeated failed attempts. Press the button to start again.")
                break
            continue
        failed_attempts = 0

        # A greeting is answered on its own; skip the query handling.
        if is_greeting(user_input):
            respond_to_greeting()
            continue

        product_name = extract_product_name(user_input)
        if product_name:
            _show_product_details(product_name)
        else:
            # No specific product mentioned: normal question answering.
            answer = find_answer(user_input)
            st.write(f"Answer: {answer}")

        # Sentiment Analysis with Emoji
        sentiment, sentiment_score, emoji = analyze_sentiment_with_emoji(user_input)
        st.write(f"Sentiment: {sentiment} (Score: {sentiment_score}) {emoji}")

        _show_recommendations(user_input)

        # Handle objections if any
        st.write("Do you like the recommendation or should I try again?")

# Dashboard function with time filtering
def display_dashboard():
    """Render the query dashboard from the records in ``output_csv_path``.

    A sidebar selector restricts the records to all time, today or the last
    week. The page shows the ten most recent queries, the sentiment
    distribution, the sentiment score over time and product popularity. A
    message is shown instead when the file cannot be read, lacks the needed
    columns, or has no records for the selected period.
    """
    st.title("Product Dashboard")
    st.write("Welcome to the product query dashboard!")

    # Sidebar time filter
    time_filter = st.sidebar.selectbox(
        "Select time period",
        ["All Time", "Today", "One Week"],
    )

    try:
        query_results_df = pd.read_csv(output_csv_path, on_bad_lines='skip')
    except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError) as e:
        st.error(f"Could not load query results: {e}")
        return

    missing_columns = [c for c in ('question', 'product') if c not in query_results_df.columns]
    if missing_columns:
        st.error(f"Query results are missing required column(s): {', '.join(missing_columns)}")
        return

    if 'Timestamp' not in query_results_df.columns:
        query_results_df['Timestamp'] = datetime.now()  # Stamp rows when the column is missing
    # Convert BEFORE filtering: timestamps come back from CSV as strings and
    # cannot be compared with datetimes. Unparseable values become NaT.
    query_results_df['Timestamp'] = pd.to_datetime(query_results_df['Timestamp'], errors='coerce')

    # Filter data based on time selection; copy so columns can be added safely.
    query_results_df = filter_data_by_date(query_results_df, time_filter).copy()

    st.subheader(f"Recent Queries Summary ({time_filter})")
    if query_results_df.empty:
        st.info("No queries recorded for this period.")
        return
    st.write(query_results_df.tail(10))  # Show the last 10 queries

    # Analyse each question once and reuse the result for both charts.
    # Missing questions become empty strings, which score as neutral.
    questions = query_results_df['question'].fillna('').astype(str)
    sentiment_results = questions.apply(analyze_sentiment_with_emoji)
    sentiment_counts = sentiment_results.map(lambda result: result[0]).value_counts()

    st.subheader(f"Sentiment Analysis Distribution ({time_filter})")
    st.write(sentiment_counts)
    sentiment_fig = px.pie(
        sentiment_counts,
        names=sentiment_counts.index,
        values=sentiment_counts.values,
        title=f"Sentiment Distribution of Queries ({time_filter})",
    )
    st.plotly_chart(sentiment_fig)

    query_results_df['sentiment_score'] = sentiment_results.map(lambda result: result[1])
    sentiment_time_fig = px.line(
        query_results_df,
        x='Timestamp',
        y='sentiment_score',
        title=f"Sentiment Score Over Time ({time_filter})",
    )
    st.plotly_chart(sentiment_time_fig)

    product_counts = query_results_df['product'].value_counts()
    st.subheader(f"Product Popularity ({time_filter})")
    st.write(product_counts)
    product_popularity_fig = px.pie(
        product_counts,
        names=product_counts.index,
        values=product_counts.values,
        title=f"Product Popularity ({time_filter})",
    )
    st.plotly_chart(product_popularity_fig)

    # Most recommended products (same per-product counts, shown as a bar chart)
    recommended_products = product_counts
    st.subheader(f"Most Recommended Products ({time_filter})")
    st.write(recommended_products)
    recommended_products_fig = px.bar(
        recommended_products,
        x=recommended_products.index,
        y=recommended_products.values,
        title=f"Top Recommended Products ({time_filter})",
    )
    st.plotly_chart(recommended_products_fig)

# Entry point: the sidebar selector decides which page is rendered.
if __name__ == "__main__":
    # "Dashboard" has its own page; any other choice runs the speech mode.
    page_by_mode = {"Dashboard": display_dashboard}
    choice = st.sidebar.selectbox("Select Mode", ["Dashboard", "Speech Recognition"])
    render_page = page_by_mode.get(choice, continuous_interaction)
    render_page()