"""Streamlit "Smart Shopping Assistant".

Lets a user describe (or upload) a shopping list, uses Gemini to extract and
match the items against a product catalogue CSV, and manages a simple cart
with a downloadable PDF receipt.
"""

import os
import re
from difflib import get_close_matches

import google.generativeai as genai
import pandas as pd
import PyPDF2
import streamlit as st
from dotenv import load_dotenv
from langchain.agents import Tool, initialize_agent
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
from PIL import Image

# Load environment variables
load_dotenv()

# Configure Google API
genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))

# Splits 'ProductName == "X" quantity: 3' into the name part and quantity part.
# Case-insensitive because the text comes back from an LLM.
_QUANTITY_SPLIT = re.compile(r'quantity\s*:', re.IGNORECASE)
# Leading integer of the quantity part; tolerates trailing notes ("2 (large)").
_LEADING_INT = re.compile(r'\s*(\d+)')
# The 'ProductName ==' prefix after upper-casing, with flexible spacing.
_NAME_PREFIX = re.compile(r'PRODUCTNAME\s*==')
# Separators between entries handed to the agent tool: the word "or", or newlines.
_ENTRY_SEPARATOR = re.compile(r'\s+or\s+|\n+', re.IGNORECASE)
_WHITESPACE_RUN = re.compile(r'\s+')


def configure_gemini(api_key):
    """Configure the google.generativeai client and return the vision/text model.

    Args:
        api_key: Google API key passed to ``genai.configure``.

    Returns:
        A ``genai.GenerativeModel`` for 'gemini-2.0-flash-thinking-exp'.
    """
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


# Raises KeyError at import time when GOOGLE_API_KEY is not set (fail fast).
model = configure_gemini(os.environ['GOOGLE_API_KEY'])

# Initialize Gemini models
llm_flash_exp = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    max_retries=2
)


class SmartShoppingAssistant:
    """Matches free-form shopping requests against a product catalogue.

    The catalogue DataFrame must contain a 'ProductName' column; the UI and
    receipt code additionally read 'RetailPrice'.
    """

    def __init__(self, products_df):
        """Store a private copy of the catalogue and precompute match keys.

        Args:
            products_df: Catalogue DataFrame with a 'ProductName' column.
        """
        # Work on a copy so the caller's frame is not mutated.
        self.df = products_df.copy()
        # Preprocess product names for faster matching. Missing names become
        # empty strings so they never reach difflib as NaN floats.
        self.df['CleanName'] = (
            self.df['ProductName']
            .fillna('')
            .astype(str)
            .str.upper()
            .str.strip()
            .str.replace(r'\s+', ' ', regex=True)
        )
        self.product_names = [name for name in self.df['CleanName'].tolist() if name]
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.setup_agent()

    def find_closest_product(self, product_name, threshold=0.7):
        """Return up to three catalogue 'CleanName' values close to *product_name*.

        Args:
            product_name: Name to look up (any case/spacing).
            threshold: ``difflib`` similarity cutoff in [0, 1].

        Returns:
            A list of matching clean names, best first; empty if none match.
        """
        # Normalize exactly like CleanName (upper, trimmed, single spaces).
        normalized = _WHITESPACE_RUN.sub(' ', product_name.upper().strip())
        return get_close_matches(
            normalized,
            self.product_names,
            n=3,
            cutoff=threshold
        )

    def match_products_with_catalogue(self, extracted_items):
        """Match extracted items with catalogue products using Gemini.

        Args:
            extracted_items: Text listing requested items and quantities.

        Returns:
            The model's response (one 'ProductName == "..." quantity: N' entry
            per line is requested), or an 'Error matching products: ...' string
            if the model call fails.
        """
        # Skip missing names: str.join would raise on NaN floats.
        product_list = self.df['ProductName'].dropna().astype(str).tolist()
        product_string = ", ".join(product_list)

        prompt = f"""
        Given these extracted items and quantities:
        {extracted_items}

        And this product catalogue:
        {product_string}

        Match each item with the most appropriate product from the catalogue.
        For each item, provide:
        1. The exact product name from the catalogue
        2. The quantity (if specified, otherwise assume 1)
        3. Any specific requirements (brand, size, etc.)

        Format the response as one entry per line:
        ProductName == "MATCHED_PRODUCT" quantity: NUMBER
        """
        try:
            matches = llm_flash_exp.predict(prompt)
            return matches.strip()
        except Exception as e:
            return f"Error matching products: {str(e)}"

    def search_products_fuzzy(self, product_names_with_quantities):
        """Perform fuzzy search and return a DataFrame with product details.

        Args:
            product_names_with_quantities: Iterable of strings shaped like
                'ProductName == "Some Name" quantity: 3'. Entries without a
                'quantity:' marker are skipped.

        Returns:
            A DataFrame with the catalogue columns plus 'Quantity', one row per
            distinct matched product. When nothing matches, an EMPTY DataFrame
            with those same columns is returned (never raises KeyError).
        """
        frames = []
        matched_products = set()

        for item in product_names_with_quantities:
            parts = _QUANTITY_SPLIT.split(item, maxsplit=1)
            if len(parts) < 2:
                continue  # skip badly formatted lines

            clean_name = _NAME_PREFIX.sub('', parts[0].strip().upper()).strip(' "\'')
            if not clean_name or clean_name in matched_products:
                continue

            # Use the leading integer; default to 1 when none is present.
            qty_match = _LEADING_INT.match(parts[1])
            quantity = int(qty_match.group(1)) if qty_match else 1

            for match in self.find_closest_product(clean_name):
                matched = self.df[self.df['CleanName'] == match]
                if not matched.empty:
                    matched = matched.copy()
                    matched['Quantity'] = quantity
                    frames.append(matched)
                    matched_products.add(clean_name)
                    break  # Use the first good match

        if not frames:
            # Keep the schema so callers can still read 'RetailPrice'/'Quantity'.
            empty = self.df.iloc[0:0].copy()
            empty['Quantity'] = pd.Series(dtype='int64')
            return empty

        # Concatenate once instead of growing a DataFrame inside the loop.
        results = pd.concat(frames, ignore_index=True)
        return results.drop_duplicates(subset=['CleanName'])

    def setup_agent(self):
        """Set up the LangChain agent with necessary tools (if needed)."""

        def search_products(query):
            """Tool entry point: fuzzy-search entries separated by 'or'/newlines."""
            try:
                # Split on the WORD "or" (or newlines), not on the substring,
                # so names such as "corn flour" stay intact.
                product_entries = [
                    entry.strip()
                    for entry in _ENTRY_SEPARATOR.split(query)
                    if entry.strip()
                ]
                results = self.search_products_fuzzy(product_entries)
                if not results.empty:
                    formatted_results = results.apply(
                        lambda x: f"{x['ProductName']} (Quantity: {x['Quantity']}) - Price: ${x['RetailPrice']:.2f}",
                        axis=1
                    )
                    return "\n".join(formatted_results)
                return "No products found matching your criteria."
            except Exception as e:
                return f"Error executing query: {str(e)}"

        tools = [
            Tool(
                name="Product Search",
                func=search_products,
                description="Search for products in the supermarket database using fuzzy matching"
            )
        ]

        self.agent = initialize_agent(
            tools=tools,
            memory=self.memory,
            llm=llm_flash_exp,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            max_iterations=5
        )

    def process_natural_language_query(self, query):
        """Process natural language query:
        1. Extract items and quantities.
        2. Match them with the catalogue.
        3. Convert the matches into a DataFrame so that quantity and price
           can be displayed and the total computed.

        Args:
            query: The user's shopping request as free text.

        Returns:
            A DataFrame of matched products (possibly empty), or an
            'Error processing query: ...' string if any step raises.
        """
        try:
            extraction_prompt = f"""
            Extract the products and their quantities from this shopping request.
            If a quantity is not specified, assume 1.

            Shopping request: {query}

            Format each item on a separate line as:
            ProductName == "EXTRACTED_PRODUCT" quantity: NUMBER
            """
            extracted_items = llm_flash_exp.predict(extraction_prompt)
            matched_products_str = self.match_products_with_catalogue(extracted_items)
            product_entries = [
                line.strip() for line in matched_products_str.splitlines() if line.strip()
            ]
            results_df = self.search_products_fuzzy(product_entries)
            return results_df
        except Exception as e:
            return f"Error processing query: {str(e)}"

    def extract_text_from_image(self, image):
        """Extract text from an uploaded image using Gemini.

        Args:
            image: Image object passed to the model alongside the prompt.

        Returns:
            The model's text response, or an 'Error processing image: ...'
            string if the call fails.
        """
        prompt = """
        Analyze this image and extract products and their quantities.
        If quantities aren't specified, assume 1.
        List each item with its quantity.
        """
        try:
            response = model.generate_content([prompt, image])
            return response.text
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def extract_text_from_pdf(self, pdf_file):
        """Extract text from an uploaded PDF.

        Args:
            pdf_file: File-like object (or path) accepted by ``PyPDF2.PdfReader``.

        Returns:
            The concatenated text of all pages, or an
            'Error processing PDF: ...' string if reading fails.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # NOTE(review): extract_text() is assumed to be able to return
            # None for pages without a text layer in some PyPDF2 versions,
            # so fall back to an empty string.
            return "".join((page.extract_text() or "") for page in pdf_reader.pages)
        except Exception as e:
            return f"Error processing PDF: {str(e)}"


# --- Cart Management Functions ---
def add_to_cart(product):
    """Add *product* to the session cart, merging quantities by 'ProductName'.

    Args:
        product: Dict with at least 'ProductName' and 'Quantity' keys.
    """
    if 'cart' not in st.session_state:
        st.session_state.cart = []
    # Check if product exists in the cart and update quantity if so.
    existing = next(
        (item for item in st.session_state.cart if item['ProductName'] == product['ProductName']),
        None
    )
    if existing is not None:
        existing['Quantity'] += product['Quantity']
    else:
        st.session_state.cart.append(product)


def remove_from_cart(product_name):
    """Remove every cart entry whose 'ProductName' equals *product_name*."""
    # .get() keeps this safe when the cart has not been initialized yet.
    st.session_state.cart = [
        item for item in st.session_state.get('cart', [])
        if item['ProductName'] != product_name
    ]


def _latin1_safe(text):
    """Return *text* with characters outside Latin-1 replaced by '?'.

    The receipt uses a core PDF font, which only covers Latin-1.
    """
    return str(text).encode('latin-1', 'replace').decode('latin-1')


def generate_receipt():
    """Build a PDF receipt for the current cart.

    Returns:
        The PDF document as ``bytes``.
    """
    from fpdf import FPDF

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    pdf.cell(200, 10, txt="Bon Marche Receipt", ln=1, align='C')
    pdf.cell(200, 10, txt=f"Date: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')}", ln=1)

    total = 0
    for item in st.session_state.get('cart', []):
        price = item['RetailPrice'] * item['Quantity']
        line = f"{item['ProductName']} x{item['Quantity']} - ${price:.2f}"
        pdf.cell(200, 10, txt=_latin1_safe(line), ln=1)
        total += price

    pdf.cell(200, 10, txt=f"Total: ${total:.2f}", ln=1)

    # PyFPDF returns a str here, fpdf2 returns a bytearray; support both.
    raw = pdf.output(dest='S')
    return raw.encode('latin1') if isinstance(raw, str) else bytes(raw)


def _rerun():
    """Trigger a Streamlit rerun on both current and legacy Streamlit versions."""
    rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
    rerun()


def _handle_upload(assistant):
    """Render the sidebar uploader and turn a new upload into the search query."""
    with st.sidebar:
        st.header("Upload Shopping List")
        uploaded_file = st.file_uploader(
            "Upload an image or PDF of your shopping list",
            type=['png', 'jpg', 'jpeg', 'pdf']
        )

        if not uploaded_file:
            # Forget the previous upload so the same file can be re-uploaded.
            st.session_state.processed_upload = None
            return

        # Process each upload once. Without this guard every rerun (any button
        # click) would call the model again and overwrite the query box.
        upload_key = (uploaded_file.name, uploaded_file.size)
        if st.session_state.get('processed_upload') == upload_key:
            return

        try:
            if uploaded_file.type.startswith('image'):
                with st.spinner("Extracting items from image..."):
                    image = Image.open(uploaded_file)
                    st.session_state.query = assistant.extract_text_from_image(image)
            elif uploaded_file.type == 'application/pdf':
                with st.spinner("Extracting items from PDF..."):
                    st.session_state.query = assistant.extract_text_from_pdf(uploaded_file)
            st.session_state.processed_upload = upload_key
        except Exception as e:
            st.error(f"Error processing file: {str(e)}")


def _render_search(assistant):
    """Render the query box, run searches and list the latest results."""
    st.header("Search Products")
    # Use session_state query so that we can clear it after search if desired.
    query = st.text_area(
        "Describe what you're looking for (include quantities if needed):",
        height=100,
        value=st.session_state.query
    )

    if st.button("Search", key="search_button"):
        if query:
            with st.spinner("Searching..."):
                results = assistant.process_natural_language_query(query)
                st.session_state.last_results = results
                # Optionally clear the query after search:
                st.session_state.query = ""

    # Display search results if available.
    last_results = st.session_state.last_results
    if last_results is None:
        return
    if isinstance(last_results, str):
        # process_natural_language_query returns a string only on error.
        st.write(last_results)
        return
    if last_results.empty:
        st.info("No products found matching your criteria.")
        return

    st.subheader("Results")
    for index, row in last_results.iterrows():
        with st.container():
            cols = st.columns([3, 1])
            with cols[0]:
                st.write(f"**{row['ProductName']}**")
                st.write(f"Price: ${row['RetailPrice']:.2f} | Qty: {row['Quantity']}")
            with cols[1]:
                # When a product is added, update the cart state without
                # forcing a full rerun.
                if st.button("Add", key=f"add_{index}"):
                    add_to_cart(row.to_dict())
                    st.success(f"Added {row['ProductName']} to cart")
    total_search = (last_results['RetailPrice'] * last_results['Quantity']).sum()
    st.markdown(f"**Total for these items: ${total_search:.2f}**")


def _render_cart():
    """Render the cart with per-item removal, the total and checkout."""
    st.header("Shopping Cart")
    if not st.session_state.cart:
        st.write("Your cart is empty")
        return

    total_cart = 0
    for item in st.session_state.cart:
        cost = item['RetailPrice'] * item['Quantity']
        cols = st.columns([3, 1, 1])
        with cols[0]:
            st.write(f"{item['ProductName']} x{item['Quantity']}")
        with cols[1]:
            st.write(f"${cost:.2f}")
        with cols[2]:
            if st.button("❌", key=f"del_{item['ProductName']}"):
                remove_from_cart(item['ProductName'])
                # Rerun so that the cart updates immediately.
                _rerun()
        total_cart += cost

    st.divider()
    st.write(f"**Total: ${total_cart:.2f}**")

    if st.button("Checkout"):
        receipt = generate_receipt()
        st.download_button(
            label="Download Receipt",
            data=receipt,
            file_name="bon_marche_receipt.pdf",
            mime="application/pdf"
        )


# --- Main App Function ---
def main():
    """Streamlit entry point: page setup, data loading and layout."""
    st.set_page_config(page_title="Smart Shopping Assistant", layout="wide")
    st.title("🛒 Smart Shopping Assistant")

    # Load product data only once.
    @st.cache_data
    def load_product_data():
        return pd.read_csv('supermarket4i.csv')

    df = load_product_data()
    assistant = SmartShoppingAssistant(df)

    # Initialize session state variables if not present.
    if 'query' not in st.session_state:
        st.session_state.query = ""
    if 'last_results' not in st.session_state:
        st.session_state.last_results = None
    if 'cart' not in st.session_state:
        st.session_state.cart = []

    _handle_upload(assistant)

    col1, col2 = st.columns([2, 1])
    with col1:
        _render_search(assistant)
    with col2:
        _render_cart()


if __name__ == "__main__":
    main()