Spaces:
Sleeping
Sleeping
import os
import re
from difflib import get_close_matches

import google.generativeai as genai
import pandas as pd
import PyPDF2
import streamlit as st
from dotenv import load_dotenv
from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
from PIL import Image
# Load environment variables (GOOGLE_API_KEY is read below) from a .env file.
load_dotenv()


def configure_gemini(api_key):
    """Configure the google.generativeai SDK and return a Gemini model handle.

    Args:
        api_key: Google API key passed to ``genai.configure``.

    Returns:
        A ``genai.GenerativeModel`` for ``gemini-2.0-flash-thinking-exp``.
    """
    genai.configure(api_key=api_key)
    return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')


# Configure the SDK exactly once. Indexing os.environ (rather than os.getenv)
# makes a missing key fail immediately with a KeyError instead of silently
# configuring the SDK with api_key=None.
model = configure_gemini(os.environ['GOOGLE_API_KEY'])

# Initialize the LangChain chat model used for text prompts and the agent.
llm_flash_exp = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    max_retries=2,
)
class SmartShoppingAssistant:
    """Match free-text shopping requests against a product catalogue.

    The catalogue is a DataFrame that must provide a ``ProductName`` column
    (used for matching) and a ``RetailPrice`` column (used when formatting
    search results).
    """

    def __init__(self, products_df):
        """Store the catalogue, create conversation memory and build the agent.

        Args:
            products_df: Catalogue DataFrame with ``ProductName`` and
                ``RetailPrice`` columns.
        """
        self.df = products_df
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.setup_agent()

    def find_closest_product(self, product_name, threshold=0.6):
        """Return up to three upper-cased catalogue names close to *product_name*.

        Args:
            product_name: Name to look up; compared case-insensitively.
            threshold: ``cutoff`` passed to ``difflib.get_close_matches``.

        Returns:
            A list of upper-cased product names, best match first; empty when
            nothing reaches the threshold.
        """
        # Drop missing names: a NaN in the candidate list makes difflib raise.
        candidates = self.df['ProductName'].str.upper().dropna().tolist()
        return get_close_matches(
            product_name.upper(),
            candidates,
            n=3,
            cutoff=threshold,
        )

    def match_products_with_catalogue(self, extracted_items):
        """Ask Gemini to map extracted items onto catalogue product names.

        Args:
            extracted_items: Text listing the requested items and quantities.

        Returns:
            The stripped model response, or an ``"Error matching products: ..."``
            string if the model call fails.
        """
        # Skip missing names so str.join never sees a non-string value.
        product_list = self.df['ProductName'].dropna().astype(str).tolist()
        product_string = ", ".join(product_list)
        prompt = f"""
        Given these extracted items and quantities:
        {extracted_items}
        And this product catalogue:
        {product_string}
        Match each item with the most appropriate product from the catalogue.
        For each item, provide:
        1. The exact product name from the catalogue
        2. The quantity (if specified, otherwise assume 1)
        3. Any specific requirements (brand, size, etc.)
        Format the response as:
        ProductName == "MATCHED_PRODUCT" quantity: NUMBER or ProductName == "MATCHED_PRODUCT" quantity: NUMBER
        Only include products that have good matches in the catalogue.
        """
        try:
            matches = llm_flash_exp.predict(prompt)
            return matches.strip()
        except Exception as e:
            return f"Error matching products: {str(e)}"

    @staticmethod
    def _split_product_entries(query):
        """Split a tool query into individual product entries.

        Entries are separated by the word ``or`` surrounded by whitespace, or
        by newlines. Splitting on the bare substring "or" would cut product
        names such as "Pork Chops" or "Corn Flakes" in half.
        """
        pieces = re.split(r'\s+or\s+|\n+', query)
        return [piece.strip() for piece in pieces if piece.strip()]

    @staticmethod
    def _parse_product_entry(entry):
        """Parse one entry into ``(product_name, quantity)``.

        Accepts ``ProductName == "NAME" quantity: N`` as well as a bare name.
        The quantity is the first integer after ``quantity:``; it defaults to 1
        when the marker or the number is missing, so trailing text such as
        "2 packs" no longer aborts the search.
        """
        parts = re.split(r'quantity\s*:', entry, maxsplit=1, flags=re.IGNORECASE)
        product_name = parts[0].strip()
        quantity = 1
        if len(parts) > 1:
            number = re.search(r'\d+', parts[1])
            if number:
                quantity = int(number.group())
        # Clean up product name
        if 'ProductName ==' in product_name:
            product_name = product_name.split('==')[1].strip(' "\'')
        return product_name, quantity

    def search_products_fuzzy(self, product_names_with_quantities):
        """Look up each entry in the catalogue using fuzzy name matching.

        Args:
            product_names_with_quantities: Iterable of entry strings, each a
                product name optionally followed by ``quantity: N``.

        Returns:
            A DataFrame with the best catalogue row(s) for every entry that
            matched, plus a ``Quantity`` column; an empty DataFrame when
            nothing matched. The catalogue itself is not modified.
        """
        upper_names = self.df['ProductName'].str.upper()
        frames = []
        for item in product_names_with_quantities:
            product_name, quantity = self._parse_product_entry(item)
            for match in self.find_closest_product(product_name):
                matched_products = self.df[upper_names == match.upper()]
                if not matched_products.empty:
                    # assign() returns a new frame, so the filtered slice of
                    # the catalogue is never written to.
                    frames.append(matched_products.assign(Quantity=quantity))
                    break
        # Concatenate once at the end instead of growing a frame in the loop.
        return pd.concat(frames) if frames else pd.DataFrame()

    def setup_agent(self):
        """Set up the LangChain agent with its product-search tool.

        The agent is stored on ``self.agent``.
        """

        def search_products(query):
            """Tool callback: search the catalogue and format the hits as text."""
            try:
                product_entries = self._split_product_entries(query)
                results = self.search_products_fuzzy(product_entries)
                if results.empty:
                    return "No products found matching your criteria."
                # Format results with quantity
                formatted_results = results.apply(
                    lambda x: f"{x['ProductName']} (Quantity: {x['Quantity']})\nPrice: ${x['RetailPrice']:.2f}\n",
                    axis=1,
                )
                return "\n".join(formatted_results)
            except Exception as e:
                # The tool reports failures to the agent as text rather than raising.
                return f"Error executing query: {str(e)}"

        tools = [
            Tool(
                name="Product Search",
                func=search_products,
                description="Search for products in the supermarket database using fuzzy matching",
            )
        ]
        self.agent = initialize_agent(
            tools=tools,
            memory=self.memory,
            llm=llm_flash_exp,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            max_iterations=3,
        )

    def process_natural_language_query(self, query):
        """Answer a shopping request: extract items, match them, then search.

        Args:
            query: The user's free-text shopping request.

        Returns:
            The agent's answer, or an ``"Error processing query: ..."`` string
            if any step raises.
        """
        try:
            # First step: Extract items and quantities
            extraction_prompt = f"""
            Extract the products and their quantities from this shopping request.
            Include any specific requirements mentioned.
            Shopping request: {query}
            Format each item with its quantity (assume 1 if not specified).
            """
            extracted_items = llm_flash_exp.predict(extraction_prompt)
            # Second step: Match with catalogue
            matched_products = self.match_products_with_catalogue(extracted_items)
            # Third step: Search and return results
            result = self.agent.run(f"Search for products matching the specified names: {matched_products}")
            return result
        except Exception as e:
            return f"Error processing query: {str(e)}"

    def extract_text_from_image(self, image):
        """Ask Gemini to list the products and quantities shown in *image*.

        Returns:
            The model's text response, or an ``"Error processing image: ..."``
            string if the call fails.
        """
        prompt = """
        Analyze this image and extract products and their quantities.
        If quantities aren't specified, make reasonable assumptions based on typical shopping patterns.
        List each item with its quantity.
        """
        try:
            response = model.generate_content([prompt, image])
            return response.text
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def extract_text_from_pdf(self, pdf_file):
        """Return the concatenated text of every page in *pdf_file*.

        Returns:
            The extracted text, or an ``"Error processing PDF: ..."`` string
            if reading fails.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # Guard against a page yielding None instead of a string.
            return "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            return f"Error processing PDF: {str(e)}"
def main():
    """Render the Streamlit UI: upload sidebar, product search and cart panel.

    Streamlit re-executes this function on every interaction, so the two
    expensive steps are guarded: the CSV load is cached, and an uploaded
    file is sent for text extraction only once per upload.
    """
    st.set_page_config(page_title="Smart Shopping Assistant", layout="wide")
    st.title("🛒 Smart Shopping Assistant")

    @st.cache_data
    def load_product_data():
        """Read the product catalogue from disk (cached across reruns)."""
        return pd.read_csv('supermarket4.csv')

    df = load_product_data()
    assistant = SmartShoppingAssistant(df)

    with st.sidebar:
        st.header("Upload Shopping List")
        uploaded_file = st.file_uploader(
            "Upload an image or PDF of your shopping list",
            type=['png', 'jpg', 'jpeg', 'pdf']
        )
        if uploaded_file:
            # Identify the upload so a rerun (e.g. clicking "Search") does not
            # trigger another extraction call for the same file.
            upload_key = (uploaded_file.name, uploaded_file.size)
            if st.session_state.get('processed_upload') != upload_key:
                try:
                    if uploaded_file.type.startswith('image'):
                        with st.spinner("Extracting items from image..."):
                            image = Image.open(uploaded_file)
                            extracted_text = assistant.extract_text_from_image(image)
                            st.session_state.query = extracted_text
                    elif uploaded_file.type == 'application/pdf':
                        with st.spinner("Extracting items from PDF..."):
                            extracted_text = assistant.extract_text_from_pdf(uploaded_file)
                            st.session_state.query = extracted_text
                    # Only mark as processed on success so a failure is retried.
                    st.session_state.processed_upload = upload_key
                except Exception as e:
                    st.error(f"Error processing file: {str(e)}")
        else:
            # Forget the last upload so re-uploading the same file is processed again.
            st.session_state.pop('processed_upload', None)

    col1, col2 = st.columns([2, 1])
    with col1:
        st.header("Search Products")
        query = st.text_area(
            "Describe what you're looking for (include quantities if needed):",
            height=100,
            placeholder="Example: 2 boxes of healthy breakfast cereals under $5, 1 gallon of milk",
            value=st.session_state.get('query', '')
        )
        if st.button("Search"):
            if query:
                with st.spinner("Searching for products..."):
                    results = assistant.process_natural_language_query(query)
                    st.write("### Results")
                    st.write(results)
            else:
                st.warning("Please enter a search query or upload a shopping list.")
    with col2:
        st.header("Shopping Cart")
        if 'cart' not in st.session_state:
            st.session_state.cart = []
        st.write("Your cart is empty" if not st.session_state.cart else "Cart items here")
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    main()