query_analysis / app.py
Muthuraja
Update app.py (#12)
be3beaf verified
raw
history blame
15.6 kB
import os
import pyaudio
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import time
import speech_recognition as sr
from textblob import TextBlob
import streamlit as st
import seaborn as sns
import plotly.express as px
from datetime import datetime, timedelta
import gspread
from google.oauth2.service_account import Credentials
# Set up paths
csv_file_path = r"C:\Users\Muthuraja\OneDrive\Attachments\Desktop\second\context.csv" # Path to your CSV file
output_csv_path = r"C:\Users\Muthuraja\OneDrive\Attachments\Desktop\second\context.csv" # Path to save query results
# Google Sheets setup
SCOPE = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
CREDS_PATH = r"C:\Users\Muthuraja\Downloads\modern-cycling-444916-g6-82c207d3eb47.json" # Provide your Google credentials path
# Initialize Google Sheets connection
def initialize_google_sheets():
credentials = Credentials.from_service_account_file(CREDS_PATH, scopes=SCOPE)
try:
client = gspread.authorize(credentials)
sheet = client.open("infosys").sheet1 # Change Google Sheet name to "SalesStores"
return sheet
except gspread.exceptions.APIError as e:
st.error(f"Google Sheets API error: {e}")
return None
sheet = initialize_google_sheets()
# Function to safely load the CSV dataset
def load_csv_safely(file_path):
try:
# Attempt to read with error handling for bad lines
df = pd.read_csv(file_path, on_bad_lines='skip') # Skips malformed lines
# Check if the required columns exist
required_columns = ['question', 'product', 'price', 'features', 'ratings', 'discount']
for column in required_columns:
if column not in df.columns:
raise Exception(f"CSV does not contain the required column: '{column}'. Please check your CSV.")
# If 'Timestamp' column doesn't exist, create it as NaT or empty
if 'Timestamp' not in df.columns:
df['Timestamp'] = pd.NaT # Set it to NaT (Not a Time) initially
return df
except pd.errors.ParserError as e:
st.error(f"Error reading CSV file: {e}")
return None
except Exception as e:
st.error(f"An error occurred: {e}")
return None
dataset = load_csv_safely(csv_file_path) # Load the dataset safely
embedding_model = SentenceTransformer('all-MiniLM-L6-v2') # Pre-trained sentence transformer model
# Function to filter data by date
def filter_data_by_date(data, date_filter):
if date_filter == "Today":
start_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
data = data[data['Timestamp'] >= start_date]
elif date_filter == "One Week":
start_date = datetime.now() - timedelta(weeks=1)
data = data[data['Timestamp'] >= start_date]
return data
# Function to recognize speech using SpeechRecognition and PyAudio in chunks
def listen_to_speech():
recognizer = sr.Recognizer()
# Initialize PyAudio microphone stream
with sr.Microphone() as source:
recognizer.adjust_for_ambient_noise(source)
st.write("Listening...") # Optional: Add a message to indicate listening state
try:
# Listen for the audio input
audio = recognizer.listen(source, timeout=5, phrase_time_limit=10) # Listen for up to 10 seconds
st.write("Recognizing...") # Optional: Add a message for recognition process
# Use Google's speech recognition to convert audio to text
text = recognizer.recognize_google(audio)
st.write(f"Recognized: {text}")
return text # Return the text detected from the audio
except sr.UnknownValueError:
st.error("Sorry, I could not understand the audio.") # Handle case when the audio is unclear
return None
except sr.RequestError:
st.error("Could not request results from Google Speech Recognition service.") # Handle network issues
return None
except Exception as e:
st.error(f"An error occurred: {e}")
return None
# Function to check if the text is a greeting
def is_greeting(text):
greetings = ["hello", "hi", "hey", "good morning", "good afternoon", "good evening", "hola"]
return any(greeting in text.lower() for greeting in greetings)
# Function to respond to greetings
def respond_to_greeting():
st.write("Hi there! How can I assist you today? 😊")
# Function to extract the product name from the query
def extract_product_name(query):
# Ensure that all product names are strings and handle NaN values
for product in dataset['product'].fillna('Unknown').astype(str):
if product.lower() in query.lower():
return product
return None
# Function to find the best matching answer using embeddings (Retrieve part of RAG)
def find_answer(query):
if dataset is None:
return "Dataset not loaded properly."
# Compute the embedding of the query
query_embedding = embedding_model.encode([query])
# Compute embeddings for all the dataset questions
dataset_embeddings = embedding_model.encode(dataset['question'].tolist())
# Find the closest match using cosine similarity
similarities = cosine_similarity(query_embedding, dataset_embeddings)
# Get the index of the most similar question
closest_idx = np.argmax(similarities)
# Retrieve the product info associated with the closest question
closest_question = dataset.iloc[closest_idx]
product_name = closest_question['product']
price = closest_question['price']
features = closest_question['features']
ratings = closest_question['ratings']
discount = closest_question['discount']
# Ensure 'Timestamp' column exists before appending
if 'Timestamp' not in closest_question.index:
closest_question['Timestamp'] = datetime.now()
# Save the query and response to CSV
save_query_to_csv(query, product_name, price, features, ratings, discount)
# Return specific info based on query
if "price" in query.lower():
return f"The price of {product_name} is {price}"
elif "features" in query.lower():
return f"Features of {product_name}: {features}"
elif "discount" in query.lower():
return f"The discount on {product_name} is {discount}%"
else:
return f"Product: {product_name}\nPrice: {price}\nFeatures: {features}\nRatings: {ratings}\nDiscount: {discount}%"
# Function to save the query and answer to 'context.csv'
def save_query_to_csv(query, product_name, price, features, ratings, discount):
new_entry = {
'question': query,
'product': product_name,
'price': price,
'features': features,
'ratings': ratings,
'discount': discount,
'Timestamp': datetime.now() # Ensure the timestamp is correct
}
new_entry_df = pd.DataFrame([new_entry])
# Append to CSV (ensure header is only added for the first entry)
new_entry_df.to_csv(output_csv_path, mode='a', header=not os.path.exists(output_csv_path), index=False)
# Function for sentiment analysis using TextBlob with emojis
def analyze_sentiment_with_emoji(text):
# Create a TextBlob object
blob = TextBlob(text)
# Get the sentiment polarity (-1 to 1)
sentiment_score = blob.sentiment.polarity
# Determine sentiment and corresponding emoji based on the polarity score
if sentiment_score > 0:
sentiment = "Positive"
emoji = "😊" # Happy emoji for positive sentiment
elif sentiment_score < 0:
sentiment = "Negative"
emoji = "😞" # Sad emoji for negative sentiment
else:
sentiment = "Neutral"
emoji = "😐" # Neutral emoji for neutral sentiment
return sentiment, sentiment_score, emoji
# Function to provide product recommendations (only product names) based on the query
def recommend_products(query):
if dataset is None:
return "Dataset not loaded properly."
# Ensure all product names are strings and handle missing data
dataset['product'] = dataset['product'].fillna('Unknown').astype(str)
# Compute the embedding of the query
query_embedding = embedding_model.encode([query])
# Compute embeddings for all the dataset product names
dataset_embeddings = embedding_model.encode(dataset['product'].tolist())
# Find the closest match using cosine similarity
similarities = cosine_similarity(query_embedding, dataset_embeddings)
# Get the indices of the top 3 recommendations
top_indices = np.argsort(similarities[0])[-3:][::-1] # Get top 3 recommendations
# Return at least 3 recommendations
recommendations = []
for idx in top_indices:
product = dataset.iloc[idx]
recommendations.append({
'product': product['product'],
'price': product['price'],
'features': product['features'],
'ratings': product['ratings'],
'discount': product['discount']
}) # Append product details
# If there are less than 3 recommendations, pad with default responses
while len(recommendations) < 3:
recommendations.append({
'product': 'No recommendation available',
'price': 'N/A',
'features': 'N/A',
'ratings': 'N/A',
'discount': 'N/A'
})
return recommendations
# Function to handle the entire continuous interaction loop
def continuous_interaction():
st.title("Speech Recognition with Product Queries")
if st.button("Start Speech Recognition"):
while True: # Loop for continuous listening
user_input = listen_to_speech()
if user_input:
# Check if the user is greeting
if is_greeting(user_input):
respond_to_greeting()
continue # Skip the rest of the code and just greet
# Extract product name if mentioned
product_name = extract_product_name(user_input)
if product_name:
# If the user asks for a product like "iPhone price", respond with product details
st.write(f"Let me check the details for {product_name}:")
product_details = dataset[dataset['product'].str.lower() == product_name.lower()]
if not product_details.empty:
product_info = product_details.iloc[0]
st.write(f"Product: {product_info['product']}")
st.write(f"Price: {product_info['price']}")
st.write(f"Features: {product_info['features']}")
st.write(f"Ratings: {product_info['ratings']}")
st.write(f"Discount: {product_info['discount']}%")
else:
st.write("Sorry, I couldn't find the product you're asking for.")
else:
# If no specific product is mentioned, perform normal question answering
answer = find_answer(user_input)
st.write(f"Answer: {answer}")
# Sentiment Analysis with Emoji
sentiment, sentiment_score, emoji = analyze_sentiment_with_emoji(user_input)
st.write(f"Sentiment: {sentiment} (Score: {sentiment_score}) {emoji}")
# Product Recommendations based on query
st.write("Here are some product recommendations based on your query: ")
recommendations = recommend_products(user_input)
if recommendations:
for idx, rec in enumerate(recommendations, 1):
st.write(f"**Recommendation {idx}:**")
st.write(f"**Product**: {rec['product']}")
st.write(f"**Price**: {rec['price']}")
st.write(f"**Features**: {rec['features']}")
st.write(f"**Ratings**: {rec['ratings']}")
st.write(f"**Discount**: {rec['discount']}%")
st.write("---") # Separator between recommendations
# Handle objections if any
st.write("Do you like the recommendation or should I try again?")
# Dashboard function with time filtering
def display_dashboard():
st.title("Product Dashboard")
st.write("Welcome to the product query dashboard!")
# Sidebar time filter
time_filter = st.sidebar.selectbox(
"Select time period",
["All Time", "Today", "One Week"]
)
query_results_df = pd.read_csv(output_csv_path, on_bad_lines='skip') # Load query results from 'context.csv'
# Check if 'Timestamp' column exists
if 'Timestamp' not in query_results_df.columns:
query_results_df['Timestamp'] = pd.to_datetime('now') # Add current timestamp if column is missing
# Filter data based on time selection
query_results_df = filter_data_by_date(query_results_df, time_filter)
st.subheader(f"Recent Queries Summary ({time_filter})")
st.write(query_results_df.tail(10)) # Show the last 10 queries
sentiment_counts = query_results_df['question'].apply(lambda x: analyze_sentiment_with_emoji(x)[0]).value_counts()
st.subheader(f"Sentiment Analysis Distribution ({time_filter})")
st.write(sentiment_counts)
sentiment_fig = px.pie(
sentiment_counts,
names=sentiment_counts.index,
values=sentiment_counts.values,
title=f"Sentiment Distribution of Queries ({time_filter})"
)
st.plotly_chart(sentiment_fig)
# Ensure 'Timestamp' is properly converted to datetime
query_results_df['Timestamp'] = pd.to_datetime(query_results_df['Timestamp'], errors='coerce')
query_results_df['sentiment_score'] = query_results_df['question'].apply(lambda x: analyze_sentiment_with_emoji(x)[1])
sentiment_time_fig = px.line(
query_results_df,
x='Timestamp',
y='sentiment_score',
title=f"Sentiment Score Over Time ({time_filter})"
)
st.plotly_chart(sentiment_time_fig)
product_counts = query_results_df['product'].value_counts()
st.subheader(f"Product Popularity ({time_filter})")
st.write(product_counts)
product_popularity_fig = px.pie(
product_counts,
names=product_counts.index,
values=product_counts.values,
title=f"Product Popularity ({time_filter})"
)
st.plotly_chart(product_popularity_fig)
# Most recommended products
recommended_products = query_results_df['product'].value_counts()
st.subheader(f"Most Recommended Products ({time_filter})")
st.write(recommended_products)
recommended_products_fig = px.bar(
recommended_products,
x=recommended_products.index,
y=recommended_products.values,
title=f"Top Recommended Products ({time_filter})"
)
st.plotly_chart(recommended_products_fig)
# Main code logic for running the application
if __name__ == "__main__":
choice = st.sidebar.selectbox("Select Mode", ["Dashboard", "Speech Recognition"])
if choice == "Dashboard":
display_dashboard() # Display dashboard if selected
else:
continuous_interaction() # Speech recognition interaction