"""Streamlit shopping assistant backed by Gemini and a CSV product catalogue.

The app extracts a shopping list from free text, an image, or a PDF, asks
Gemini to map the items onto catalogue product names, fuzzy-matches those
names against the catalogue, and maintains a simple session-based cart with
a downloadable PDF receipt.
"""
import os
import re
from difflib import get_close_matches

import google.generativeai as genai
import pandas as pd
import PyPDF2
import streamlit as st
from dotenv import load_dotenv
from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
from PIL import Image

# Load environment variables
load_dotenv()

# Configure Google API
genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))


def configure_gemini(api_key):
    """Configure the google.generativeai SDK with *api_key* and return the model.

    The returned ``GenerativeModel`` is the one used for image extraction.
    """
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


model = configure_gemini(os.environ['GOOGLE_API_KEY'])

# Initialize Gemini models
llm_flash_exp = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    max_retries=2
)

# Entries are separated by a standalone lowercase "or" or by newlines.
# A plain str.split('or') would also cut product names such as "CORN" apart
# after lower-casing, or "Flour" as written.
_ENTRY_SEPARATOR = re.compile(r'\s+or\s+|\n+')
_QUANTITY_LABEL = re.compile(r'quantity\s*:', re.IGNORECASE)
_PRODUCT_PREFIX = re.compile(r'PRODUCTNAME\s*==')
_WHITESPACE = re.compile(r'\s+')
_FIRST_INT = re.compile(r'\d+')


class SmartShoppingAssistant:
    """Match free-form shopping requests against a product catalogue.

    The catalogue DataFrame must have a ``ProductName`` column; the agent's
    search tool additionally reads ``RetailPrice`` when formatting results.
    """

    def __init__(self, products_df):
        # Work on a copy so the caller's DataFrame is not mutated.
        self.df = products_df.copy()
        # Preprocess product names for faster matching
        self.df['CleanName'] = (
            self.df['ProductName']
            .str.upper()
            .str.strip()
            .str.replace(r'\s+', ' ', regex=True)
        )
        # NaN names cannot be compared by difflib, so leave them out.
        self.product_names = self.df['CleanName'].dropna().tolist()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.setup_agent()

    def find_closest_product(self, product_name, threshold=0.7):
        """Return up to three catalogue ``CleanName`` values close to *product_name*.

        The query is normalised the same way as ``CleanName`` (upper-cased,
        trimmed, internal whitespace collapsed). Returns an empty list when
        nothing reaches the similarity *threshold*.
        """
        normalised = _WHITESPACE.sub(' ', product_name.upper().strip())
        return get_close_matches(normalised, self.product_names, n=3, cutoff=threshold)

    def match_products_with_catalogue(self, extracted_items):
        """Match extracted items with catalogue products using Gemini.

        Returns the model's stripped text response, or an error message
        string if the call fails.
        """
        product_list = self.df['ProductName'].dropna().astype(str).tolist()
        product_string = ", ".join(product_list)

        prompt = f"""
        Given these extracted items and quantities:
        {extracted_items}

        And this product catalogue:
        {product_string}

        Match each item with the most appropriate product from the catalogue.
        For each item, provide:
        1. The exact product name from the catalogue
        2. The quantity (if specified, otherwise assume 1)
        3. Any specific requirements (brand, size, etc.)

        Format the response as:
        ProductName == "MATCHED_PRODUCT" quantity: NUMBER
        or
        ProductName == "MATCHED_PRODUCT" quantity: NUMBER

        Only include products that have good matches in the catalogue.
        """

        try:
            matches = llm_flash_exp.predict(prompt)
            return matches.strip()
        except Exception as e:
            return f"Error matching products: {str(e)}"

    @staticmethod
    def _parse_entry(entry):
        """Split one ``ProductName == "X" quantity: N`` entry into (name, quantity).

        The name is upper-cased with the ``ProductName ==`` prefix and the
        surrounding quotes removed. The quantity is the first integer found
        after the ``quantity:`` label and defaults to 1 when absent or
        unparseable.
        """
        parts = _QUANTITY_LABEL.split(entry, maxsplit=1)
        name = _PRODUCT_PREFIX.sub('', parts[0].strip().upper()).strip(' "\'')
        name = _WHITESPACE.sub(' ', name)

        quantity = 1
        if len(parts) > 1:
            found = _FIRST_INT.search(parts[1])
            if found:
                quantity = int(found.group())
        return name, quantity

    def search_products_fuzzy(self, product_names_with_quantities):
        """Fuzzy-match each entry against the catalogue.

        *product_names_with_quantities* is an iterable of entry strings (see
        ``_parse_entry``). Returns a DataFrame of the matched catalogue rows
        with an added ``Quantity`` column, de-duplicated on ``CleanName``.
        When nothing matches, an empty DataFrame with the same columns is
        returned.
        """
        frames = []
        matched_products = set()

        for item in product_names_with_quantities:
            clean_name, quantity = self._parse_entry(item)

            if not clean_name or clean_name in matched_products:
                continue  # Skip blanks and already matched products

            for match in self.find_closest_product(clean_name):
                matched = self.df[self.df['CleanName'] == match]
                if not matched.empty:
                    frames.append(matched.assign(Quantity=quantity))
                    matched_products.add(clean_name)
                    break  # Take first good match

        if not frames:
            # drop_duplicates(subset=...) raises KeyError on a column-less
            # frame, so build the empty result with explicit columns.
            return pd.DataFrame(columns=[*self.df.columns, 'Quantity'])

        return pd.concat(frames).drop_duplicates(subset=['CleanName'])

    def setup_agent(self):
        """Set up the LangChain agent with necessary tools"""

        def search_products(query):
            """Tool entry point: look up every product entry in *query*."""
            try:
                # Split into individual product entries
                product_entries = [
                    entry.strip() for entry in _ENTRY_SEPARATOR.split(query)
                ]
                results = self.search_products_fuzzy(product_entries)

                if not results.empty:
                    # Format results with quantity
                    formatted_results = results.apply(
                        lambda x: f"{x['ProductName']} (Quantity: {x['Quantity']})\nPrice: ${x['RetailPrice']:.2f}\n",
                        axis=1
                    )
                    return "\n".join(formatted_results)
                return "No products found matching your criteria."
            except Exception as e:
                return f"Error executing query: {str(e)}"

        tools = [
            Tool(
                name="Product Search",
                func=search_products,
                description="Search for products in the supermarket database using fuzzy matching"
            )
        ]

        self.agent = initialize_agent(
            tools=tools,
            memory=self.memory,
            llm=llm_flash_exp,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            max_iterations=5
        )

    def process_natural_language_query(self, query):
        """Process natural language query with two-step matching.

        Returns the agent's answer as a string, or an error message string
        if any step fails.
        """
        if not query or not query.strip():
            return "Please enter a shopping request."

        try:
            # First step: Extract items and quantities
            extraction_prompt = f"""
            Extract the products and their quantities from this shopping request.
            Include any specific requirements mentioned.

            Shopping request: {query}

            Format each item with its quantity (assume 1 if not specified).
            """

            extracted_items = llm_flash_exp.predict(extraction_prompt)

            # Second step: Match with catalogue
            matched_products = self.match_products_with_catalogue(extracted_items)

            # Third step: Search and return results
            result = self.agent.run(
                f"Search for products matching the specified names: {matched_products}"
            )
            return result

        except Exception as e:
            return f"Error processing query: {str(e)}"

    def extract_text_from_image(self, image):
        """Extract text from uploaded image using Gemini.

        Returns the model's text, or an error message string on failure.
        """
        prompt = """
        Analyze this image and extract products and their quantities.
        If quantities aren't specified, make reasonable assumptions based on typical shopping patterns.
        List each item with its quantity.
        """
        try:
            response = model.generate_content([prompt, image])
            return response.text
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def extract_text_from_pdf(self, pdf_file):
        """Extract text from uploaded PDF.

        Returns the concatenated page text, or an error message string on
        failure.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # extract_text() may yield None for pages without a text layer.
            return "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            return f"Error processing PDF: {str(e)}"


# Add cart management functions
def add_to_cart(product):
    """Add *product* (a dict with ``ProductName`` and ``Quantity``) to the cart.

    If a product with the same name is already in the cart, its quantity is
    increased instead of adding a second line.
    """
    if 'cart' not in st.session_state:
        st.session_state.cart = []

    # Check if product exists in cart
    existing = next(
        (item for item in st.session_state.cart
         if item['ProductName'] == product['ProductName']),
        None
    )

    if existing:
        existing['Quantity'] += product['Quantity']
    else:
        # Store a copy so later quantity updates do not touch the caller's dict.
        st.session_state.cart.append(dict(product))


def remove_from_cart(product_name):
    """Remove every cart line whose ``ProductName`` equals *product_name*."""
    st.session_state.cart = [
        item for item in st.session_state.get('cart', [])
        if item['ProductName'] != product_name
    ]


def _latin1_safe(text):
    """Return *text* with characters outside Latin-1 replaced by ``?``."""
    return str(text).encode('latin-1', 'replace').decode('latin-1')


def generate_receipt():
    """Build a PDF receipt for the current cart and return it as bytes."""
    from fpdf import FPDF

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    pdf.cell(200, 10, txt="Bon Marche Receipt", ln=1, align='C')
    pdf.cell(200, 10, txt=f"Date: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')}", ln=1)

    total = 0
    for item in st.session_state.get('cart', []):
        price = item['RetailPrice'] * item['Quantity']
        line = f"{item['ProductName']} x{item['Quantity']} - ${price:.2f}"
        pdf.cell(200, 10, txt=_latin1_safe(line), ln=1)
        total += price

    pdf.cell(200, 10, txt=f"Total: ${total:.2f}", ln=1)

    # Depending on the installed fpdf package, output(dest='S') returns
    # either a str or a bytearray; normalise both to bytes.
    raw = pdf.output(dest='S')
    if isinstance(raw, str):
        return raw.encode('latin1')
    return bytes(raw)


def _render_search_results(results):
    """Show search results; DataFrame rows get an "Add" button each."""
    if isinstance(results, str):
        st.write(results)
        return

    for idx, row in results.iterrows():
        cola, colb = st.columns([3, 1])
        with cola:
            st.write(f"**{row['ProductName']}**")
            st.write(f"Price: ${row['RetailPrice']} | Qty: {row['Quantity']}")
        with colb:
            if st.button("Add", key=f"add_{idx}_{row['ProductName']}"):
                add_to_cart(row.to_dict())


# Update main function
def main():
    """Render the Streamlit page: upload sidebar, search column, and cart."""
    st.set_page_config(page_title="Smart Shopping Assistant", layout="wide")
    st.title("🛒 Smart Shopping Assistant")

    @st.cache_data
    def load_product_data():
        return pd.read_csv('supermarket4i.csv')  # Ensure correct filename

    df = load_product_data()
    assistant = SmartShoppingAssistant(df)

    with st.sidebar:
        st.header("Upload Shopping List")
        uploaded_file = st.file_uploader(
            "Upload an image or PDF of your shopping list",
            type=['png', 'jpg', 'jpeg', 'pdf']
        )

        if uploaded_file:
            # Streamlit reruns the script on every interaction; only run the
            # extraction once per uploaded file.
            upload_key = (uploaded_file.name, uploaded_file.size)
            if st.session_state.get('processed_upload') != upload_key:
                try:
                    if uploaded_file.type.startswith('image'):
                        with st.spinner("Extracting items from image..."):
                            image = Image.open(uploaded_file)
                            extracted_text = assistant.extract_text_from_image(image)
                            st.session_state.query = extracted_text
                    elif uploaded_file.type == 'application/pdf':
                        with st.spinner("Extracting items from PDF..."):
                            extracted_text = assistant.extract_text_from_pdf(uploaded_file)
                            st.session_state.query = extracted_text
                    st.session_state.processed_upload = upload_key
                except Exception as e:
                    st.error(f"Error processing file: {str(e)}")

    col1, col2 = st.columns([2, 1])

    with col1:
        st.header("Search Products")
        query = st.text_area(
            "Describe what you're looking for (include quantities if needed):",
            height=100,
            value=st.session_state.get('query', '')
        )

        if st.button("Search"):
            if query:
                with st.spinner("Searching..."):
                    results = assistant.process_natural_language_query(query)
                    st.session_state.last_results = results

        # Render from session state, outside the Search branch: a click on a
        # nested button reruns the script with "Search" reporting False, so
        # widgets drawn only inside that branch would vanish before their
        # click could be handled.
        last_results = st.session_state.get('last_results')
        if last_results is not None:
            _render_search_results(last_results)

    with col2:
        st.header("Shopping Cart")
        if 'cart' in st.session_state and st.session_state.cart:
            total = 0
            for item in st.session_state.cart:
                cols = st.columns([3, 1, 1])
                with cols[0]:
                    st.write(f"{item['ProductName']} x{item['Quantity']}")
                with cols[1]:
                    st.write(f"${item['RetailPrice'] * item['Quantity']:.2f}")
                with cols[2]:
                    if st.button("❌", key=f"del_{item['ProductName']}"):
                        remove_from_cart(item['ProductName'])
                        st.rerun()
                total += item['RetailPrice'] * item['Quantity']

            st.divider()
            st.write(f"**Total: ${total:.2f}**")

            if st.button("Checkout"):
                receipt = generate_receipt()
                st.download_button(
                    label="Download Receipt",
                    data=receipt,
                    file_name="bon_marche_receipt.pdf",
                    mime="application/pdf"
                )
        else:
            st.write("Your cart is empty")


if __name__ == "__main__":
    main()