import os
import time

import pyaudio
import pandas as pd
import numpy as np
import requests
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import speech_recognition as sr
from textblob import TextBlob
import streamlit as st
import seaborn as sns
import plotly.express as px
from datetime import datetime, timedelta
import gspread
from google.oauth2.service_account import Credentials

# Groq API setup.
# SECURITY NOTE(review): this API key was previously hardcoded in source.
# Prefer supplying it through the GROQ_API_KEY environment variable; the
# inline value is kept only as a backward-compatible fallback and should be
# revoked/rotated and removed.
GROQ_API_KEY = os.environ.get('GROQ_API_KEY', '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')
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"

# Paths for the CSV files used by the app.
csv_file_path = "context.csv"     # source product/Q&A dataset
output_csv_path = "contents.csv"  # append-only log of answered queries

# Google Sheets setup.
SCOPE = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
CREDS_PATH = "modern-cycling-444916-g6-82c207d3eb47.json"


def initialize_google_sheets():
    """Authorize against Google Sheets and return the first worksheet of 'infosys'.

    Returns None (after surfacing the error in the Streamlit UI) when the
    service-account credentials file is missing/invalid or the Sheets API
    rejects the request.
    """
    try:
        # Loading the credentials inside the try block so a missing or
        # unreadable service-account file is reported instead of crashing
        # the whole app at import time.
        credentials = Credentials.from_service_account_file(CREDS_PATH, scopes=SCOPE)
        client = gspread.authorize(credentials)
        sheet = client.open("infosys").sheet1
        return sheet
    except FileNotFoundError:
        st.error(f"Google Sheets credentials file not found: {CREDS_PATH}")
        return None
    except gspread.exceptions.APIError as e:
        st.error(f"Google Sheets API error: {e}")
        return None


sheet = initialize_google_sheets()


def load_csv_safely(file_path):
    """Load the product CSV, validating that all required columns exist.

    Returns the DataFrame (adding an empty 'Timestamp' column when absent)
    or None after reporting the problem in the Streamlit UI.
    """
    try:
        df = pd.read_csv(file_path, encoding='latin1', on_bad_lines='skip')
        required_columns = ['question', 'product', 'price', 'features', 'ratings', 'discount']
        for column in required_columns:
            if column not in df.columns:
                raise Exception(f"CSV does not contain the required column: '{column}'")
        if 'Timestamp' not in df.columns:
            df['Timestamp'] = pd.NaT
        return df
    except pd.errors.ParserError as e:
        st.error(f"Error reading CSV file: {e}")
        return None
    except Exception as e:
        st.error(f"An error occurred: {e}")
        return None


dataset = load_csv_safely(csv_file_path)

# Sentence-embedding model used for semantic product search.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
def filter_data_by_date(data, date_filter):
    """Return rows of *data* whose 'Timestamp' falls inside the selected window.

    date_filter: "Today", "One Week", or anything else (e.g. "All Time") for
    no filtering. Works on a copy so the caller's DataFrame is never mutated.
    """
    data = data.copy()  # fix: the original mutated the caller's DataFrame in place
    data['Timestamp'] = pd.to_datetime(data['Timestamp'], errors='coerce')
    if date_filter == "Today":
        start_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        data = data[data['Timestamp'] >= start_date]
    elif date_filter == "One Week":
        start_date = datetime.now() - timedelta(weeks=1)
        data = data[data['Timestamp'] >= start_date]
    return data


def get_groq_response(query):
    """Send *query* to the Groq chat-completions API and return the reply text.

    Returns a fallback message string on API errors rather than raising.
    """
    headers = {
        "Authorization": f"Bearer {GROQ_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "llama3-8b-8192",  # Update to the correct model ID used by Groq
        "messages": [{"role": "user", "content": query}]
    }
    try:
        # fix: a timeout is required so a stalled connection cannot hang the app forever
        response = requests.post(GROQ_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()
        if 'choices' in data and len(data['choices']) > 0:
            return data['choices'][0]['message']['content']
        else:
            return "No response from Groq API."
    except requests.exceptions.RequestException as e:
        st.error(f"Error making request to Groq API: {e}")
        return "Error in API request."


def listen_to_speech():
    """Record from the first available input device and return recognized text.

    Auto-detects a microphone via PyAudio, then uses Google Speech Recognition.
    Returns None when no microphone exists, the audio is unintelligible, or the
    recognition service cannot be reached.
    """
    recognizer = sr.Recognizer()
    try:
        audio = pyaudio.PyAudio()
        try:
            # Enumerate devices that can actually capture input.
            input_devices = []
            for i in range(audio.get_device_count()):
                device_info = audio.get_device_info_by_index(i)
                if device_info['maxInputChannels'] > 0:
                    input_devices.append((i, device_info['name']))
        finally:
            # fix: PyAudio holds native resources and was never released before
            audio.terminate()
        if not input_devices:
            st.error("❌ No input devices found. Please connect a microphone.")
            return None
        default_device_index, device_name = input_devices[0]
        st.info(f"🎙 Using microphone: {device_name}")
        with sr.Microphone(device_index=default_device_index) as source:
            recognizer.adjust_for_ambient_noise(source)
            st.write("Listening... 🎧")
            try:
                audio_data = recognizer.listen(source, timeout=5, phrase_time_limit=10)
                st.write("Recognizing... 🧠")
                text = recognizer.recognize_google(audio_data)
                st.success(f"Recognized: {text}")
                return text
            except sr.UnknownValueError:
                st.warning("Sorry, I could not understand the audio.")
                return None
            except sr.RequestError:
                st.error("Could not request results from Google Speech Recognition service.")
                return None
            except Exception as e:
                st.error(f"An unexpected error occurred: {e}")
                return None
    except Exception as e:
        st.error(f"Microphone access failed: {e}")
        return None


def is_greeting(text):
    """Return True when *text* contains any known greeting word (case-insensitive)."""
    greetings = ["hello", "hi", "hey", "good morning", "good afternoon", "good evening", "hola"]
    return any(greeting in text.lower() for greeting in greetings)


def respond_to_greeting():
    """Write a friendly greeting reply to the Streamlit UI."""
    st.write("Hi there! How can I assist you today? 😊")


def extract_product_name(query):
    """Return the first dataset product name mentioned in *query*, else None."""
    if dataset is None:  # fix: guard — the original crashed here when the CSV failed to load
        return None
    for product in dataset['product'].fillna('Unknown').astype(str):
        if product.lower() in query.lower():
            return product
    return None


def find_answer(query):
    """Semantic-search the dataset for *query* and return a formatted answer.

    Embeds the query and the combined question/product/features text, takes
    the most similar row above a 0.5 cosine-similarity threshold, logs the
    hit via save_query_to_csv, and tailors the reply to whether the query
    asks about price, features, or discount.
    """
    if dataset is None:
        return "Dataset not loaded properly."
    query_embedding = embedding_model.encode([query])
    combined_columns = (dataset['question'].fillna('') + " "
                        + dataset['product'].fillna('') + " "
                        + dataset['features'].fillna(''))
    combined_embeddings = embedding_model.encode(combined_columns.tolist())
    similarities = cosine_similarity(query_embedding, combined_embeddings)
    similarity_threshold = 0.5
    # Single query row, so a flat argmax is the column index of the best match.
    closest_idx = np.argmax(similarities)
    highest_similarity = similarities[0][closest_idx]
    if highest_similarity < similarity_threshold:
        return "Sorry, no product found for your query."
    closest_question = dataset.iloc[closest_idx]
    product_name = closest_question['product']
    price = closest_question['price']
    features = closest_question['features']
    ratings = closest_question['ratings']
    discount = closest_question['discount']
    # fix: removed the previous in-place Timestamp assignment on this row slice —
    # it only raised a pandas SettingWithCopy warning and the value was never
    # read; save_query_to_csv stamps its own timestamp.
    save_query_to_csv(query, product_name, price, features, ratings, discount)
    if "price" in query.lower():
        return f"The price of {product_name} is {price}"
    elif "features" in query.lower():
        return f"Features of {product_name}: {features}"
    elif "discount" in query.lower():
        return f"The discount on {product_name} is {discount}%"
    else:
        return f"Product: {product_name}\nPrice: {price}\nFeatures: {features}\nRatings: {ratings}\nDiscount: {discount}%"


def save_query_to_csv(query, product_name, price, features, ratings, discount):
    """Append the answered query to the log CSV (contents.csv).

    Writes the header row only when the file does not exist yet.
    (The original comment claimed 'context.csv'; the code has always
    written to output_csv_path, i.e. contents.csv.)
    """
    new_entry = {
        'question': query,
        'product': product_name,
        'price': price,
        'features': features,
        'ratings': ratings,
        'discount': discount,
        'Timestamp': datetime.now()
    }
    new_entry_df = pd.DataFrame([new_entry])
    new_entry_df.to_csv(output_csv_path, mode='a',
                        header=not os.path.exists(output_csv_path), index=False)


def analyze_sentiment_with_emoji(text):
    """Classify *text* sentiment via TextBlob polarity.

    Returns (label, polarity score in [-1, 1], emoji).
    """
    blob = TextBlob(text)
    sentiment_score = blob.sentiment.polarity
    if sentiment_score > 0:
        sentiment = "Positive"
        emoji = "😊"
    elif sentiment_score < 0:
        sentiment = "Negative"
        emoji = "😞"
    else:
        sentiment = "Neutral"
        emoji = "😐"
    return sentiment, sentiment_score, emoji


def recommend_products(query):
    """Return the 3 products whose names are most similar to *query*.

    Each entry is a dict of product/price/features/ratings/discount; the list
    is padded with placeholder entries when the dataset has fewer than 3 rows.
    """
    if dataset is None:
        return "Dataset not loaded properly."
    # fix: work on a local Series instead of writing the cleaned column back
    # into the module-level dataset as a side effect.
    products = dataset['product'].fillna('Unknown').astype(str)
    query_embedding = embedding_model.encode([query])
    dataset_embeddings = embedding_model.encode(products.tolist())
    similarities = cosine_similarity(query_embedding, dataset_embeddings)
    top_indices = np.argsort(similarities[0])[-3:][::-1]  # best 3, highest first
    recommendations = []
    for idx in top_indices:
        product = dataset.iloc[idx]
        recommendations.append({
            'product': products.iloc[idx],
            'price': product['price'],
            'features': product['features'],
            'ratings': product['ratings'],
            'discount': product['discount']
        })
    while len(recommendations) < 3:
        recommendations.append({
            'product': 'No recommendation available',
            'price': 'N/A',
            'features': 'N/A',
            'ratings': 'N/A',
            'discount': 'N/A'
        })
    return recommendations


def continuous_interaction():
    """Run the speech-driven Q&A loop inside Streamlit.

    NOTE(review): the `while True` loop fights Streamlit's rerun model and
    never yields control back; kept as-is since the app is built around it.
    """
    st.title("Speech Recognition with Product Queries")
    if st.button("Start Speech Recognition"):
        while True:
            user_input = listen_to_speech()
            if user_input:
                if is_greeting(user_input):
                    respond_to_greeting()
                    continue
                # Use Groq API for a free-form response to the query.
                groq_response = get_groq_response(user_input)
                st.write(f"Groq Response: {groq_response}")
                # Exact product-name match takes priority over semantic search.
                product_name = extract_product_name(user_input)
                if product_name:
                    st.write(f"Let me check the details for {product_name}:")
                    product_details = dataset[dataset['product'].str.lower() == product_name.lower()]
                    if not product_details.empty:
                        product_info = product_details.iloc[0]
                        st.write(f"Product: {product_info['product']}")
                        st.write(f"Price: {product_info['price']}")
                        st.write(f"Features: {product_info['features']}")
                        st.write(f"Ratings: {product_info['ratings']}")
                        st.write(f"Discount: {product_info['discount']}%")
                    else:
                        st.write("Sorry, I couldn't find the product you're asking for.")
                else:
                    answer = find_answer(user_input)
                    st.write(f"Answer: {answer}")
                sentiment, sentiment_score, emoji = analyze_sentiment_with_emoji(user_input)
                st.write(f"Sentiment: {sentiment} (Score: {sentiment_score}) {emoji}")
                st.write("Here are some product recommendations based on your query: ")
                recommendations = recommend_products(user_input)
                for idx, rec in enumerate(recommendations, 1):
                    st.write(f"Recommendation {idx}:")
                    st.write(f"Product: {rec['product']}")
                    st.write(f"Price: {rec['price']}")
                    st.write(f"Features: {rec['features']}")
                    st.write(f"Ratings: {rec['ratings']}")
                    st.write(f"Discount: {rec['discount']}%")
                    st.write("---")


def display_dashboard():
    """Render query-history analytics (sentiment, product popularity) in Streamlit."""
    st.title("Product Dashboard")
    st.write("Welcome to the product query dashboard!")
    time_filter = st.sidebar.selectbox("Select time period", ["All Time", "Today", "One Week"])
    # fix: the log file only exists after at least one query has been answered;
    # previously this crashed with FileNotFoundError on a fresh install.
    if not os.path.exists(output_csv_path):
        st.info("No query history found yet. Ask a question first to populate the dashboard.")
        return
    query_results_df = pd.read_csv(output_csv_path, on_bad_lines='skip')
    if 'Timestamp' not in query_results_df.columns:
        query_results_df['Timestamp'] = pd.to_datetime('now')
    query_results_df = filter_data_by_date(query_results_df, time_filter)
    st.subheader(f"Recent Queries Summary ({time_filter})")
    st.write(query_results_df.tail(10))
    sentiment_counts = query_results_df['question'].apply(
        lambda x: analyze_sentiment_with_emoji(x)[0]).value_counts()
    st.subheader(f"Sentiment Analysis Distribution ({time_filter})")
    st.write(sentiment_counts)
    sentiment_fig = px.pie(
        sentiment_counts,
        names=sentiment_counts.index,
        values=sentiment_counts.values,
        title=f"Sentiment Distribution of Queries ({time_filter})"
    )
    st.plotly_chart(sentiment_fig)
    query_results_df['sentiment_score'] = query_results_df['question'].apply(
        lambda x: analyze_sentiment_with_emoji(x)[1])
    sentiment_time_fig = px.line(
        query_results_df,
        x='Timestamp',
        y='sentiment_score',
        title=f"Sentiment Score Over Time ({time_filter})"
    )
    st.plotly_chart(sentiment_time_fig)
    product_counts = query_results_df['product'].value_counts()
    st.subheader(f"Product Popularity ({time_filter})")
    st.write(product_counts)
    product_popularity_fig = px.pie(
        product_counts,
        names=product_counts.index,
        values=product_counts.values,
        title=f"Product Popularity ({time_filter})"
    )
    st.plotly_chart(product_popularity_fig)
    recommended_products = query_results_df['product'].value_counts()
    st.subheader(f"Most Recommended Products ({time_filter})")
    st.write(recommended_products)
    recommended_products_fig = px.bar(
        recommended_products,
        x=recommended_products.index,
        y=recommended_products.values,
        title=f"Top Recommended Products ({time_filter})"
    )
    st.plotly_chart(recommended_products_fig)


# Main entry point: choose between the speech UI and the analytics dashboard.
if __name__ == '__main__':
    mode = st.sidebar.radio("Select Mode", ("Speech Recognition", "Dashboard"))
    if mode == "Speech Recognition":
        continuous_interaction()
    elif mode == "Dashboard":
        display_dashboard()