Spaces:
Sleeping
Sleeping
File size: 15,590 Bytes
be3beaf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
import os
import pyaudio
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import time
import speech_recognition as sr
from textblob import TextBlob
import streamlit as st
import seaborn as sns
import plotly.express as px
from datetime import datetime, timedelta
import gspread
from google.oauth2.service_account import Credentials
# Set up paths
csv_file_path = r"C:\Users\Muthuraja\OneDrive\Attachments\Desktop\second\context.csv" # Path to your CSV file
output_csv_path = r"C:\Users\Muthuraja\OneDrive\Attachments\Desktop\second\context.csv" # Path to save query results
# Google Sheets setup
SCOPE = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
CREDS_PATH = r"C:\Users\Muthuraja\Downloads\modern-cycling-444916-g6-82c207d3eb47.json" # Provide your Google credentials path
# Initialize Google Sheets connection
def initialize_google_sheets():
credentials = Credentials.from_service_account_file(CREDS_PATH, scopes=SCOPE)
try:
client = gspread.authorize(credentials)
sheet = client.open("infosys").sheet1 # Change Google Sheet name to "SalesStores"
return sheet
except gspread.exceptions.APIError as e:
st.error(f"Google Sheets API error: {e}")
return None
sheet = initialize_google_sheets()
# Function to safely load the CSV dataset
def load_csv_safely(file_path):
try:
# Attempt to read with error handling for bad lines
df = pd.read_csv(file_path, on_bad_lines='skip') # Skips malformed lines
# Check if the required columns exist
required_columns = ['question', 'product', 'price', 'features', 'ratings', 'discount']
for column in required_columns:
if column not in df.columns:
raise Exception(f"CSV does not contain the required column: '{column}'. Please check your CSV.")
# If 'Timestamp' column doesn't exist, create it as NaT or empty
if 'Timestamp' not in df.columns:
df['Timestamp'] = pd.NaT # Set it to NaT (Not a Time) initially
return df
except pd.errors.ParserError as e:
st.error(f"Error reading CSV file: {e}")
return None
except Exception as e:
st.error(f"An error occurred: {e}")
return None
dataset = load_csv_safely(csv_file_path) # Load the dataset safely
embedding_model = SentenceTransformer('all-MiniLM-L6-v2') # Pre-trained sentence transformer model
# Function to filter data by date
def filter_data_by_date(data, date_filter):
if date_filter == "Today":
start_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
data = data[data['Timestamp'] >= start_date]
elif date_filter == "One Week":
start_date = datetime.now() - timedelta(weeks=1)
data = data[data['Timestamp'] >= start_date]
return data
# Function to recognize speech using SpeechRecognition and PyAudio in chunks
def listen_to_speech():
recognizer = sr.Recognizer()
# Initialize PyAudio microphone stream
with sr.Microphone() as source:
recognizer.adjust_for_ambient_noise(source)
st.write("Listening...") # Optional: Add a message to indicate listening state
try:
# Listen for the audio input
audio = recognizer.listen(source, timeout=5, phrase_time_limit=10) # Listen for up to 10 seconds
st.write("Recognizing...") # Optional: Add a message for recognition process
# Use Google's speech recognition to convert audio to text
text = recognizer.recognize_google(audio)
st.write(f"Recognized: {text}")
return text # Return the text detected from the audio
except sr.UnknownValueError:
st.error("Sorry, I could not understand the audio.") # Handle case when the audio is unclear
return None
except sr.RequestError:
st.error("Could not request results from Google Speech Recognition service.") # Handle network issues
return None
except Exception as e:
st.error(f"An error occurred: {e}")
return None
# Function to check if the text is a greeting
def is_greeting(text):
greetings = ["hello", "hi", "hey", "good morning", "good afternoon", "good evening", "hola"]
return any(greeting in text.lower() for greeting in greetings)
# Function to respond to greetings
def respond_to_greeting():
st.write("Hi there! How can I assist you today? 😊")
# Function to extract the product name from the query
def extract_product_name(query):
# Ensure that all product names are strings and handle NaN values
for product in dataset['product'].fillna('Unknown').astype(str):
if product.lower() in query.lower():
return product
return None
# Function to find the best matching answer using embeddings (Retrieve part of RAG)
def find_answer(query):
if dataset is None:
return "Dataset not loaded properly."
# Compute the embedding of the query
query_embedding = embedding_model.encode([query])
# Compute embeddings for all the dataset questions
dataset_embeddings = embedding_model.encode(dataset['question'].tolist())
# Find the closest match using cosine similarity
similarities = cosine_similarity(query_embedding, dataset_embeddings)
# Get the index of the most similar question
closest_idx = np.argmax(similarities)
# Retrieve the product info associated with the closest question
closest_question = dataset.iloc[closest_idx]
product_name = closest_question['product']
price = closest_question['price']
features = closest_question['features']
ratings = closest_question['ratings']
discount = closest_question['discount']
# Ensure 'Timestamp' column exists before appending
if 'Timestamp' not in closest_question.index:
closest_question['Timestamp'] = datetime.now()
# Save the query and response to CSV
save_query_to_csv(query, product_name, price, features, ratings, discount)
# Return specific info based on query
if "price" in query.lower():
return f"The price of {product_name} is {price}"
elif "features" in query.lower():
return f"Features of {product_name}: {features}"
elif "discount" in query.lower():
return f"The discount on {product_name} is {discount}%"
else:
return f"Product: {product_name}\nPrice: {price}\nFeatures: {features}\nRatings: {ratings}\nDiscount: {discount}%"
# Function to save the query and answer to 'context.csv'
def save_query_to_csv(query, product_name, price, features, ratings, discount):
new_entry = {
'question': query,
'product': product_name,
'price': price,
'features': features,
'ratings': ratings,
'discount': discount,
'Timestamp': datetime.now() # Ensure the timestamp is correct
}
new_entry_df = pd.DataFrame([new_entry])
# Append to CSV (ensure header is only added for the first entry)
new_entry_df.to_csv(output_csv_path, mode='a', header=not os.path.exists(output_csv_path), index=False)
# Function for sentiment analysis using TextBlob with emojis
def analyze_sentiment_with_emoji(text):
# Create a TextBlob object
blob = TextBlob(text)
# Get the sentiment polarity (-1 to 1)
sentiment_score = blob.sentiment.polarity
# Determine sentiment and corresponding emoji based on the polarity score
if sentiment_score > 0:
sentiment = "Positive"
emoji = "😊" # Happy emoji for positive sentiment
elif sentiment_score < 0:
sentiment = "Negative"
emoji = "😞" # Sad emoji for negative sentiment
else:
sentiment = "Neutral"
emoji = "😐" # Neutral emoji for neutral sentiment
return sentiment, sentiment_score, emoji
# Function to provide product recommendations (only product names) based on the query
def recommend_products(query):
if dataset is None:
return "Dataset not loaded properly."
# Ensure all product names are strings and handle missing data
dataset['product'] = dataset['product'].fillna('Unknown').astype(str)
# Compute the embedding of the query
query_embedding = embedding_model.encode([query])
# Compute embeddings for all the dataset product names
dataset_embeddings = embedding_model.encode(dataset['product'].tolist())
# Find the closest match using cosine similarity
similarities = cosine_similarity(query_embedding, dataset_embeddings)
# Get the indices of the top 3 recommendations
top_indices = np.argsort(similarities[0])[-3:][::-1] # Get top 3 recommendations
# Return at least 3 recommendations
recommendations = []
for idx in top_indices:
product = dataset.iloc[idx]
recommendations.append({
'product': product['product'],
'price': product['price'],
'features': product['features'],
'ratings': product['ratings'],
'discount': product['discount']
}) # Append product details
# If there are less than 3 recommendations, pad with default responses
while len(recommendations) < 3:
recommendations.append({
'product': 'No recommendation available',
'price': 'N/A',
'features': 'N/A',
'ratings': 'N/A',
'discount': 'N/A'
})
return recommendations
# Function to handle the entire continuous interaction loop
def continuous_interaction():
st.title("Speech Recognition with Product Queries")
if st.button("Start Speech Recognition"):
while True: # Loop for continuous listening
user_input = listen_to_speech()
if user_input:
# Check if the user is greeting
if is_greeting(user_input):
respond_to_greeting()
continue # Skip the rest of the code and just greet
# Extract product name if mentioned
product_name = extract_product_name(user_input)
if product_name:
# If the user asks for a product like "iPhone price", respond with product details
st.write(f"Let me check the details for {product_name}:")
product_details = dataset[dataset['product'].str.lower() == product_name.lower()]
if not product_details.empty:
product_info = product_details.iloc[0]
st.write(f"Product: {product_info['product']}")
st.write(f"Price: {product_info['price']}")
st.write(f"Features: {product_info['features']}")
st.write(f"Ratings: {product_info['ratings']}")
st.write(f"Discount: {product_info['discount']}%")
else:
st.write("Sorry, I couldn't find the product you're asking for.")
else:
# If no specific product is mentioned, perform normal question answering
answer = find_answer(user_input)
st.write(f"Answer: {answer}")
# Sentiment Analysis with Emoji
sentiment, sentiment_score, emoji = analyze_sentiment_with_emoji(user_input)
st.write(f"Sentiment: {sentiment} (Score: {sentiment_score}) {emoji}")
# Product Recommendations based on query
st.write("Here are some product recommendations based on your query: ")
recommendations = recommend_products(user_input)
if recommendations:
for idx, rec in enumerate(recommendations, 1):
st.write(f"**Recommendation {idx}:**")
st.write(f"**Product**: {rec['product']}")
st.write(f"**Price**: {rec['price']}")
st.write(f"**Features**: {rec['features']}")
st.write(f"**Ratings**: {rec['ratings']}")
st.write(f"**Discount**: {rec['discount']}%")
st.write("---") # Separator between recommendations
# Handle objections if any
st.write("Do you like the recommendation or should I try again?")
# Dashboard function with time filtering
def display_dashboard():
st.title("Product Dashboard")
st.write("Welcome to the product query dashboard!")
# Sidebar time filter
time_filter = st.sidebar.selectbox(
"Select time period",
["All Time", "Today", "One Week"]
)
query_results_df = pd.read_csv(output_csv_path, on_bad_lines='skip') # Load query results from 'context.csv'
# Check if 'Timestamp' column exists
if 'Timestamp' not in query_results_df.columns:
query_results_df['Timestamp'] = pd.to_datetime('now') # Add current timestamp if column is missing
# Filter data based on time selection
query_results_df = filter_data_by_date(query_results_df, time_filter)
st.subheader(f"Recent Queries Summary ({time_filter})")
st.write(query_results_df.tail(10)) # Show the last 10 queries
sentiment_counts = query_results_df['question'].apply(lambda x: analyze_sentiment_with_emoji(x)[0]).value_counts()
st.subheader(f"Sentiment Analysis Distribution ({time_filter})")
st.write(sentiment_counts)
sentiment_fig = px.pie(
sentiment_counts,
names=sentiment_counts.index,
values=sentiment_counts.values,
title=f"Sentiment Distribution of Queries ({time_filter})"
)
st.plotly_chart(sentiment_fig)
# Ensure 'Timestamp' is properly converted to datetime
query_results_df['Timestamp'] = pd.to_datetime(query_results_df['Timestamp'], errors='coerce')
query_results_df['sentiment_score'] = query_results_df['question'].apply(lambda x: analyze_sentiment_with_emoji(x)[1])
sentiment_time_fig = px.line(
query_results_df,
x='Timestamp',
y='sentiment_score',
title=f"Sentiment Score Over Time ({time_filter})"
)
st.plotly_chart(sentiment_time_fig)
product_counts = query_results_df['product'].value_counts()
st.subheader(f"Product Popularity ({time_filter})")
st.write(product_counts)
product_popularity_fig = px.pie(
product_counts,
names=product_counts.index,
values=product_counts.values,
title=f"Product Popularity ({time_filter})"
)
st.plotly_chart(product_popularity_fig)
# Most recommended products
recommended_products = query_results_df['product'].value_counts()
st.subheader(f"Most Recommended Products ({time_filter})")
st.write(recommended_products)
recommended_products_fig = px.bar(
recommended_products,
x=recommended_products.index,
y=recommended_products.values,
title=f"Top Recommended Products ({time_filter})"
)
st.plotly_chart(recommended_products_fig)
# Main code logic for running the application
if __name__ == "__main__":
choice = st.sidebar.selectbox("Select Mode", ["Dashboard", "Speech Recognition"])
if choice == "Dashboard":
display_dashboard() # Display dashboard if selected
else:
continuous_interaction() # Speech recognition interaction
|