"""Streamlit app: a Gemini-backed shopping assistant over a product catalogue.

The user types a shopping request (or uploads an image/PDF of a shopping
list); the request is turned into catalogue product names by Gemini and then
looked up in the catalogue with fuzzy string matching.
"""

import os
import re
from difflib import get_close_matches

import google.generativeai as genai
import pandas as pd
import PyPDF2
import streamlit as st
from dotenv import load_dotenv
from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
from PIL import Image

# Load environment variables
load_dotenv()

# Configure Google API
genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))


def configure_gemini(api_key):
    """Configure the google.generativeai client and return the Gemini model."""
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


# Raises KeyError at import time when GOOGLE_API_KEY is not set.
model = configure_gemini(os.environ['GOOGLE_API_KEY'])

# Initialize Gemini models
llm_flash_exp = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    max_retries=2
)

# Separates product entries in a search query: the word "or" surrounded by
# whitespace, or a line break.  A plain ``str.split('or')`` would also cut
# inside product names that merely contain the letters "or".
_ENTRY_SEPARATOR = re.compile(r"\s+or\s+|[\r\n]+")
# Marks the start of the quantity part of an entry ("quantity: 2").
_QUANTITY_MARKER = re.compile(r"quantity\s*:", re.IGNORECASE)
# Optional 'ProductName ==' prefix in front of the product name.
_NAME_PREFIX = re.compile(r"^\s*ProductName\s*==\s*")


def _parse_product_entry(entry):
    """Split one entry like 'ProductName == "X" quantity: 2' into (name, qty).

    The quantity is the first integer found after the ``quantity:`` marker;
    it defaults to 1 when the marker or a number is missing.  The returned
    name has the ``ProductName ==`` prefix and surrounding quotes removed.
    """
    parts = _QUANTITY_MARKER.split(entry, maxsplit=1)
    quantity = 1
    if len(parts) > 1:
        number = re.search(r"\d+", parts[1])
        if number:
            quantity = int(number.group())
    name = _NAME_PREFIX.sub("", parts[0]).strip(' "\'')
    return name, quantity


class SmartShoppingAssistant:
    """Matches shopping requests against a product catalogue DataFrame.

    The catalogue is read through its ``ProductName`` column; formatted
    search results additionally read ``RetailPrice``.
    """

    def __init__(self, products_df):
        """Store the catalogue, create conversation memory and build the agent."""
        self.df = products_df
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.setup_agent()

    def find_closest_product(self, product_name, threshold=0.6):
        """Return up to three upper-cased catalogue names close to *product_name*.

        Matching is case-insensitive (both sides are upper-cased) and uses
        ``difflib.get_close_matches`` with *threshold* as the cutoff.  An
        empty list is returned when nothing is close enough.
        """
        # Missing names are dropped: difflib cannot compare NaN floats.
        catalogue_names = (
            self.df['ProductName'].dropna().astype(str).str.upper().tolist()
        )
        return get_close_matches(
            product_name.upper(),
            catalogue_names,
            n=3,  # Increased to get more potential matches
            cutoff=threshold
        )

    def match_products_with_catalogue(self, extracted_items):
        """Match extracted items with catalogue products using Gemini.

        Returns the model's stripped text response, or an
        ``"Error matching products: ..."`` string if building the prompt or
        calling the model fails.
        """
        try:
            # Missing names are dropped so that join() only sees strings.
            product_list = self.df['ProductName'].dropna().astype(str).tolist()
            product_string = ", ".join(product_list)

            prompt = f"""
            Given these extracted items and quantities: {extracted_items}
            And this product catalogue: {product_string}

            Match each item with the most appropriate product from the catalogue.
            For each item, provide:
            1. The exact product name from the catalogue
            2. The quantity (if specified, otherwise assume 1)
            3. Any specific requirements (brand, size, etc.)

            Format the response as:
            ProductName == "MATCHED_PRODUCT" quantity: NUMBER or ProductName == "MATCHED_PRODUCT" quantity: NUMBER

            Only include products that have good matches in the catalogue.
            """

            matches = llm_flash_exp.predict(prompt)
            return matches.strip()
        except Exception as e:
            return f"Error matching products: {str(e)}"

    def search_products_fuzzy(self, product_names_with_quantities):
        """Search for products using fuzzy matching with quantity information.

        Each entry is a string such as ``'ProductName == "MILK" quantity: 2'``
        (prefix and quantity are optional).  For every entry the first close
        match that exists in the catalogue contributes its rows, with an
        added ``Quantity`` column.  Returns an empty DataFrame when nothing
        matches.
        """
        upper_names = self.df['ProductName'].str.upper()
        found = []
        for item in product_names_with_quantities:
            product_name, quantity = _parse_product_entry(item)
            if not product_name:
                continue
            for match in self.find_closest_product(product_name):
                matched_products = self.df[upper_names == match.upper()]
                if not matched_products.empty:
                    # assign() builds a new frame instead of writing into a
                    # filtered slice of the catalogue.
                    found.append(matched_products.assign(Quantity=quantity))
                    break
        return pd.concat(found) if found else pd.DataFrame()

    def setup_agent(self):
        """Set up the LangChain agent with necessary tools"""

        def search_products(query):
            """Tool entry point: look up every product entry in *query*."""
            try:
                # Split into individual product entries
                product_entries = [
                    entry.strip()
                    for entry in _ENTRY_SEPARATOR.split(query)
                    if entry.strip()
                ]
                results = self.search_products_fuzzy(product_entries)
                if results.empty:
                    return "No products found matching your criteria."
                # Format results with quantity
                formatted_results = results.apply(
                    lambda x: f"{x['ProductName']} (Quantity: {x['Quantity']})\nPrice: ${x['RetailPrice']:.2f}\n",
                    axis=1
                )
                return "\n".join(formatted_results)
            except Exception as e:
                return f"Error executing query: {str(e)}"

        tools = [
            Tool(
                name="Product Search",
                func=search_products,
                description="Search for products in the supermarket database using fuzzy matching"
            )
        ]

        self.agent = initialize_agent(
            tools=tools,
            memory=self.memory,
            llm=llm_flash_exp,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            max_iterations=3
        )

    def process_natural_language_query(self, query):
        """Process natural language query with two-step matching.

        Returns the agent's answer, or an ``"Error processing query: ..."``
        string if any step raises.
        """
        try:
            # First step: Extract items and quantities
            extraction_prompt = f"""
            Extract the products and their quantities from this shopping request.
            Include any specific requirements mentioned.

            Shopping request: {query}

            Format each item with its quantity (assume 1 if not specified).
            """
            extracted_items = llm_flash_exp.predict(extraction_prompt)

            # Second step: Match with catalogue
            matched_products = self.match_products_with_catalogue(extracted_items)

            # Third step: Search and return results
            result = self.agent.run(f"Search for products matching the specified names: {matched_products}")
            return result
        except Exception as e:
            return f"Error processing query: {str(e)}"

    def extract_text_from_image(self, image):
        """Extract text from uploaded image using Gemini.

        Returns the model's text, or an ``"Error processing image: ..."``
        string on failure.
        """
        prompt = """
        Analyze this image and extract products and their quantities.
        If quantities aren't specified, make reasonable assumptions based on typical shopping patterns.
        List each item with its quantity.
        """
        try:
            response = model.generate_content([prompt, image])
            return response.text
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def extract_text_from_pdf(self, pdf_file):
        """Extract text from uploaded PDF.

        Page texts are joined with newlines so that the last word of one page
        is not fused with the first word of the next.  Returns an
        ``"Error processing PDF: ..."`` string on failure.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # A page without extractable text contributes an empty string.
            return "\n".join(
                page.extract_text() or "" for page in pdf_reader.pages
            )
        except Exception as e:
            return f"Error processing PDF: {str(e)}"


def main():
    """Render the Streamlit page: upload sidebar, search column and cart."""
    st.set_page_config(page_title="Smart Shopping Assistant", layout="wide")
    st.title("🛒 Smart Shopping Assistant")

    @st.cache_data
    def load_product_data():
        return pd.read_csv('supermarket4.csv')

    df = load_product_data()
    assistant = SmartShoppingAssistant(df)

    with st.sidebar:
        st.header("Upload Shopping List")
        uploaded_file = st.file_uploader(
            "Upload an image or PDF of your shopping list",
            type=['png', 'jpg', 'jpeg', 'pdf']
        )

        if uploaded_file:
            # Streamlit reruns this script on every interaction.  Extract
            # only once per distinct upload so that a Search click does not
            # call the model again and overwrite the query text.
            upload_key = (uploaded_file.name, uploaded_file.size)
            if st.session_state.get('processed_upload') != upload_key:
                try:
                    extracted_text = None
                    if uploaded_file.type.startswith('image'):
                        with st.spinner("Extracting items from image..."):
                            image = Image.open(uploaded_file)
                            extracted_text = assistant.extract_text_from_image(image)
                    elif uploaded_file.type == 'application/pdf':
                        with st.spinner("Extracting items from PDF..."):
                            extracted_text = assistant.extract_text_from_pdf(uploaded_file)
                    if extracted_text is not None:
                        st.session_state.query = extracted_text
                        st.session_state.processed_upload = upload_key
                except Exception as e:
                    st.error(f"Error processing file: {str(e)}")

    col1, col2 = st.columns([2, 1])

    with col1:
        st.header("Search Products")
        query = st.text_area(
            "Describe what you're looking for (include quantities if needed):",
            height=100,
            placeholder="Example: 2 boxes of healthy breakfast cereals under $5, 1 gallon of milk",
            value=st.session_state.get('query', '')
        )

        if st.button("Search"):
            if query:
                with st.spinner("Searching for products..."):
                    results = assistant.process_natural_language_query(query)
                    st.write("### Results")
                    st.write(results)
            else:
                st.warning("Please enter a search query or upload a shopping list.")

    with col2:
        st.header("Shopping Cart")
        if 'cart' not in st.session_state:
            st.session_state.cart = []
        st.write("Your cart is empty" if not st.session_state.cart else "Cart items here")


if __name__ == "__main__":
    main()