Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| import google.generativeai as genai | |
| from PIL import Image | |
| import PyPDF2 | |
| import os | |
| from langchain.agents import initialize_agent, Tool | |
| from langchain.agents.agent_types import AgentType | |
| from difflib import get_close_matches | |
| from dotenv import load_dotenv | |
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()

# Hand the Google API key to the Gemini SDK; the lookup yields None when the
# variable is not set.
genai.configure(
    api_key=os.environ.get('GOOGLE_API_KEY'),
)
def configure_gemini(api_key):
    """Configure the Gemini SDK with ``api_key`` and return a generative model.

    Args:
        api_key: Google API key passed to ``genai.configure``.

    Returns:
        A ``genai.GenerativeModel`` for ``gemini-2.0-flash-thinking-exp``.
    """
    genai.configure(api_key=api_key)
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    return gemini_model
# Native Gemini model (used below for image input). Indexing os.environ
# raises KeyError at import time when GOOGLE_API_KEY is not set.
model = configure_gemini(api_key=os.environ['GOOGLE_API_KEY'])

# LangChain chat wrapper around Gemini, used for text prompts and the agent.
llm_flash_exp = ChatGoogleGenerativeAI(
    max_retries=2,
    model="gemini-2.0-flash-exp",
)
class SmartShoppingAssistant:
    """Turn free-text, image or PDF shopping lists into catalogue matches.

    The catalogue DataFrame must provide a ``ProductName`` column; the agent
    tool additionally formats ``RetailPrice``. Item extraction and catalogue
    matching are delegated to the module-level Gemini clients
    (``llm_flash_exp`` and ``model``); the final row lookup uses ``difflib``
    fuzzy matching against normalised product names.
    """

    @staticmethod
    def _normalize_name(name):
        """Return ``name`` upper-cased with whitespace trimmed and collapsed."""
        return " ".join(str(name).upper().split())

    def __init__(self, products_df):
        """Index the catalogue and build the LangChain agent.

        Args:
            products_df: Catalogue DataFrame with a ``ProductName`` column.
        """
        # Copy so that adding the helper column never mutates the caller's
        # DataFrame.
        self.df = products_df.copy()
        # Pre-compute normalised names once; missing names become '' so they
        # can never crash the matcher.
        self.df['CleanName'] = self.df['ProductName'].map(
            lambda value: '' if pd.isna(value) else self._normalize_name(value)
        )
        self.product_names = [name for name in self.df['CleanName'].tolist() if name]
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.setup_agent()

    def find_closest_product(self, product_name, threshold=0.7):
        """Return up to three normalised catalogue names close to ``product_name``.

        Args:
            product_name: Name to look up; normalised the same way as the
                catalogue so case and repeated spaces do not affect the score.
            threshold: ``difflib`` similarity cutoff in the range [0, 1].

        Returns:
            A list of matching ``CleanName`` values, best match first
            (empty when nothing reaches the cutoff).
        """
        return get_close_matches(
            self._normalize_name(product_name),
            self.product_names,
            n=3,
            cutoff=threshold
        )

    def match_products_with_catalogue(self, extracted_items):
        """Match extracted items with catalogue products using Gemini.

        Returns:
            The model's answer, one ``ProductName == "..." quantity: N`` entry
            per line, or a string starting with ``Error matching products:``
            when the call fails.
        """
        # Skip missing names: str.join would raise on NaN values.
        product_list = self.df['ProductName'].dropna().astype(str).tolist()
        product_string = ", ".join(product_list)
        prompt = f"""
        Given these extracted items and quantities:
        {extracted_items}
        And this product catalogue:
        {product_string}
        Match each item with the most appropriate product from the catalogue.
        For each item, provide:
        1. The exact product name from the catalogue
        2. The quantity (if specified, otherwise assume 1)
        3. Any specific requirements (brand, size, etc.)
        Format the response as one entry per line:
        ProductName == "MATCHED_PRODUCT" quantity: NUMBER
        """
        try:
            matches = llm_flash_exp.predict(prompt)
            return matches.strip()
        except Exception as e:
            return f"Error matching products: {str(e)}"

    def search_products_fuzzy(self, product_names_with_quantities):
        """Perform fuzzy search and return a DataFrame with product details.

        Args:
            product_names_with_quantities: Iterable of strings shaped like
                ``ProductName == "Some Name" quantity: 3``. Lines without a
                ``quantity:`` marker are skipped.

        Returns:
            Catalogue rows (all catalogue columns plus ``Quantity``), one per
            distinct matched product. When nothing matches, an empty frame
            with those same columns is returned so callers can still index
            ``RetailPrice``/``Quantity``.
        """
        import re  # function-scope import: the module header has no ``re``

        # Split "<name part> quantity: <quantity part>", whatever the casing.
        entry_pattern = re.compile(r'^(.*?)quantity\s*:(.*)$', re.IGNORECASE | re.DOTALL)
        # Drops everything up to the 'PRODUCTNAME ==' label (bullets, numbering).
        label_pattern = re.compile(r'^.*?PRODUCTNAME\s*==', re.DOTALL)

        frames = []
        matched_products = set()
        for item in product_names_with_quantities:
            parsed = entry_pattern.match(item)
            if parsed is None:
                continue  # skip badly formatted lines
            name_part, quantity_part = parsed.groups()

            unlabelled = label_pattern.sub('', name_part.upper(), count=1)
            clean_name = self._normalize_name(unlabelled.strip(' \t"\','))

            # Take the first integer so values like "3 loaves" still parse.
            number = re.search(r'\d+', quantity_part)
            quantity = int(number.group()) if number else 1

            if not clean_name or clean_name in matched_products:
                continue
            for match in self.find_closest_product(clean_name):
                matched = self.df[self.df['CleanName'] == match]
                if matched.empty:
                    continue
                matched = matched.copy()
                matched['Quantity'] = quantity
                frames.append(matched)
                matched_products.add(clean_name)
                break  # use the first good match

        if not frames:
            empty = self.df.iloc[0:0].copy()
            empty['Quantity'] = pd.Series(dtype='int64')
            return empty
        # Concatenate once instead of growing a DataFrame inside the loop.
        results = pd.concat(frames, ignore_index=True)
        return results.drop_duplicates(subset=['CleanName'])

    def setup_agent(self):
        """Set up the LangChain agent with the product-search tool."""
        def search_products(query):
            """Agent tool: look up one or more entries and format the hits."""
            import re  # function-scope import: the module header has no ``re``

            try:
                # Split on the standalone word "or" or on line breaks. A plain
                # substring split would cut names such as "Corn Flakes".
                pieces = re.split(r'\s+or\s+|[\r\n]+', query, flags=re.IGNORECASE)
                product_entries = [entry.strip() for entry in pieces if entry.strip()]
                results = self.search_products_fuzzy(product_entries)
                if not results.empty:
                    formatted_results = results.apply(
                        lambda x: f"{x['ProductName']} (Quantity: {x['Quantity']}) - Price: ${x['RetailPrice']:.2f}",
                        axis=1
                    )
                    return "\n".join(formatted_results)
                return "No products found matching your criteria."
            except Exception as e:
                return f"Error executing query: {str(e)}"

        tools = [
            Tool(
                name="Product Search",
                func=search_products,
                description="Search for products in the supermarket database using fuzzy matching"
            )
        ]
        self.agent = initialize_agent(
            tools=tools,
            memory=self.memory,
            llm=llm_flash_exp,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            max_iterations=5
        )

    def process_natural_language_query(self, query):
        """Resolve a free-text shopping request to catalogue rows.

        Steps:
        1. Ask the LLM to extract items and quantities.
        2. Ask the LLM to match them with the catalogue.
        3. Convert the matches into a DataFrame so that quantity and price
           can be displayed and the total computed.

        Returns:
            A DataFrame of matched products, or an error-message string when
            extraction or matching fails.
        """
        try:
            extraction_prompt = f"""
            Extract the products and their quantities from this shopping request.
            If a quantity is not specified, assume 1.
            Shopping request: {query}
            Format each item on a separate line as:
            ProductName == "EXTRACTED_PRODUCT" quantity: NUMBER
            """
            extracted_items = llm_flash_exp.predict(extraction_prompt)
            matched_products_str = self.match_products_with_catalogue(extracted_items)
            # Surface a matching failure instead of parsing the error text as
            # product lines and silently returning no results.
            if matched_products_str.startswith("Error matching products:"):
                return matched_products_str
            product_entries = [line.strip() for line in matched_products_str.splitlines() if line.strip()]
            return self.search_products_fuzzy(product_entries)
        except Exception as e:
            return f"Error processing query: {str(e)}"

    def extract_text_from_image(self, image):
        """Extract products and quantities from an uploaded image using Gemini.

        Returns:
            The model's text answer, or a string starting with
            ``Error processing image:`` when the call fails.
        """
        prompt = """
        Analyze this image and extract products and their quantities.
        If quantities aren't specified, assume 1.
        List each item with its quantity.
        """
        try:
            response = model.generate_content([prompt, image])
            return response.text
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def extract_text_from_pdf(self, pdf_file):
        """Extract the text of every page of an uploaded PDF.

        Returns:
            The concatenated page text, or a string starting with
            ``Error processing PDF:`` when the file cannot be read.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # A page without a text layer may yield None; treat it as empty.
            return "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            return f"Error processing PDF: {str(e)}"
| # --- Cart Management Functions --- | |
def add_to_cart(product):
    """Add ``product`` to the session cart, merging with an existing line.

    Args:
        product: Mapping with at least ``ProductName`` and ``Quantity``.
            If the cart already holds an entry with the same
            ``ProductName``, its quantity is increased; otherwise
            ``product`` itself is appended.
    """
    if 'cart' not in st.session_state:
        st.session_state.cart = []

    for entry in st.session_state.cart:
        if entry['ProductName'] == product['ProductName']:
            # Already in the cart: bump the quantity and stop.
            entry['Quantity'] += product['Quantity']
            return

    st.session_state.cart.append(product)
def remove_from_cart(product_name):
    """Drop every cart entry whose ``ProductName`` equals ``product_name``."""
    remaining = []
    for entry in st.session_state.cart:
        if entry['ProductName'] != product_name:
            remaining.append(entry)
    st.session_state.cart = remaining
def generate_receipt():
    """Render the current cart as a PDF receipt.

    Reads ``st.session_state.cart`` (entries with ``ProductName``,
    ``Quantity`` and ``RetailPrice``) and writes one line per item followed
    by the grand total.

    Returns:
        The PDF document as ``bytes``.
    """
    from fpdf import FPDF

    def latin1_safe(text):
        # The built-in Arial font only covers Latin-1; replace anything else
        # so an unusual product name cannot abort receipt generation.
        return str(text).encode('latin-1', 'replace').decode('latin-1')

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    pdf.cell(200, 10, txt="Bon Marche Receipt", ln=1, align='C')
    pdf.cell(200, 10, txt=f"Date: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')}", ln=1)

    total = 0
    for item in st.session_state.get('cart', []):
        price = item['RetailPrice'] * item['Quantity']
        line = f"{item['ProductName']} x{item['Quantity']} - ${price:.2f}"
        pdf.cell(200, 10, txt=latin1_safe(line), ln=1)
        total += price
    pdf.cell(200, 10, txt=f"Total: ${total:.2f}", ln=1)

    rendered = pdf.output(dest='S')
    # Legacy PyFPDF returns a Latin-1 str here, while fpdf2 returns a
    # bytearray (which has no .encode); support both.
    if isinstance(rendered, str):
        return rendered.encode('latin1')
    return bytes(rendered)
| # --- Main App Function --- | |
def main():
    """Render the Streamlit UI: shopping-list upload, product search and cart.

    State kept in ``st.session_state``:
        query: text pre-filled into the search box (set from an upload).
        last_results: DataFrame or error string from the last search.
        cart: list of product dicts added by the user.
        processed_upload: (name, size) of the upload already extracted.
    """
    st.set_page_config(page_title="Smart Shopping Assistant", layout="wide")
    st.title("🛒 Smart Shopping Assistant")

    # Load product data only once: cache it so the CSV is not re-read on
    # every script rerun.
    @st.cache_data
    def load_product_data():
        return pd.read_csv('supermarket4i.csv')

    df = load_product_data()
    assistant = SmartShoppingAssistant(df)

    # Initialize session state variables if not present.
    if 'query' not in st.session_state:
        st.session_state.query = ""
    if 'last_results' not in st.session_state:
        st.session_state.last_results = None
    if 'cart' not in st.session_state:
        st.session_state.cart = []
    if 'processed_upload' not in st.session_state:
        st.session_state.processed_upload = None

    # st.experimental_rerun was replaced by st.rerun; use whichever exists.
    rerun = getattr(st, 'rerun', None) or st.experimental_rerun

    with st.sidebar:
        st.header("Upload Shopping List")
        uploaded_file = st.file_uploader(
            "Upload an image or PDF of your shopping list",
            type=['png', 'jpg', 'jpeg', 'pdf']
        )
        if uploaded_file is None:
            # Forget the previous upload so the same file can be sent again.
            st.session_state.processed_upload = None
        else:
            upload_id = (uploaded_file.name, uploaded_file.size)
            # Extract only once per upload. Every button click reruns this
            # script, and re-extracting would repeat the model call and
            # overwrite the user's query.
            if st.session_state.processed_upload != upload_id:
                try:
                    if uploaded_file.type.startswith('image'):
                        with st.spinner("Extracting items from image..."):
                            image = Image.open(uploaded_file)
                            extracted_text = assistant.extract_text_from_image(image)
                            st.session_state.query = extracted_text
                    elif uploaded_file.type == 'application/pdf':
                        with st.spinner("Extracting items from PDF..."):
                            extracted_text = assistant.extract_text_from_pdf(uploaded_file)
                            st.session_state.query = extracted_text
                    st.session_state.processed_upload = upload_id
                except Exception as e:
                    st.error(f"Error processing file: {str(e)}")

    col1, col2 = st.columns([2, 1])

    with col1:
        st.header("Search Products")
        # Use session_state query so that we can clear it after search if desired.
        query = st.text_area(
            "Describe what you're looking for (include quantities if needed):",
            height=100,
            value=st.session_state.query
        )
        if st.button("Search", key="search_button"):
            if query:
                with st.spinner("Searching..."):
                    results = assistant.process_natural_language_query(query)
                    st.session_state.last_results = results
                    # Optionally clear the query after search:
                    st.session_state.query = ""

        # Display search results if available.
        last_results = st.session_state.last_results
        if last_results is not None:
            if isinstance(last_results, str):
                # A string result is an error message from the assistant.
                st.write(last_results)
            elif last_results.empty:
                # An empty frame may lack the price/quantity columns, so do
                # not try to total it.
                st.write("No products found matching your criteria.")
            else:
                st.subheader("Results")
                for index, row in last_results.iterrows():
                    with st.container():
                        cols = st.columns([3, 1])
                        with cols[0]:
                            st.write(f"**{row['ProductName']}**")
                            st.write(f"Price: ${row['RetailPrice']:.2f} | Qty: {row['Quantity']}")
                        with cols[1]:
                            # The cart column is rendered later in this same
                            # run, so no rerun is needed after adding.
                            if st.button("Add", key=f"add_{index}"):
                                add_to_cart(row.to_dict())
                                st.success(f"Added {row['ProductName']} to cart")
                total_search = (last_results['RetailPrice'] * last_results['Quantity']).sum()
                st.markdown(f"**Total for these items: ${total_search:.2f}**")

    with col2:
        st.header("Shopping Cart")
        if st.session_state.cart:
            total_cart = 0
            for item in st.session_state.cart:
                cols = st.columns([3, 1, 1])
                with cols[0]:
                    st.write(f"{item['ProductName']} x{item['Quantity']}")
                with cols[1]:
                    cost = item['RetailPrice'] * item['Quantity']
                    st.write(f"${cost:.2f}")
                with cols[2]:
                    if st.button("❌", key=f"del_{item['ProductName']}"):
                        remove_from_cart(item['ProductName'])
                        # Rerun so that the cart updates immediately.
                        rerun()
                total_cart += cost
            st.divider()
            st.write(f"**Total: ${total_cart:.2f}**")
            if st.button("Checkout"):
                receipt = generate_receipt()
                st.download_button(
                    label="Download Receipt",
                    data=receipt,
                    file_name="bon_marche_receipt.pdf",
                    mime="application/pdf"
                )
        else:
            st.write("Your cart is empty")
# Start the UI only when this file is executed as the main script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()