"""WhatsApp shopping-assistant webhook.

A Flask app that receives WhatsApp Cloud API webhook calls (through the
``heyoo`` client), asks Gemini to extract the products a user wants from a
text message or an image, matches them against the catalogue loaded from
``supermarket_data1.csv`` and replies with interactive buttons that drive a
simple in-memory cart.
"""
import io
import json
import logging
import math
import os
import re
import socket
from typing import Dict, List, Optional, Union

import dns.resolver
import google.generativeai as genai
import pandas as pd
import requests
from dotenv import load_dotenv
from flask import Flask, make_response, request
from heyoo import WhatsApp
from PIL import Image

# Initialize Flask App
app = Flask(__name__)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Load environment variables (must run before any os.getenv call below)
load_dotenv()

# DNS Configuration
nameserver1 = os.getenv('nameserver1', '8.8.8.8')
nameserver2 = os.getenv('nameserver2', '8.8.4.4')

# Seconds to wait when downloading media; without a timeout a stalled
# download would block the webhook worker indefinitely.
MEDIA_TIMEOUT_SECONDS = 15

# Emoji used in user-facing messages. They are written as escapes so the
# text survives being saved or transferred with the wrong encoding.
EMOJI_CART = "\U0001F6D2"        # shopping cart
EMOJI_ARROW = "\u27A4"           # arrowhead bullet
EMOJI_MONEY = "\U0001F4B5"       # banknote
EMOJI_CROSS = "\u274C"           # cross mark
EMOJI_CAMERA = "\U0001F4F8"      # camera with flash
EMOJI_BAGS = "\U0001F6CD\uFE0F"  # shopping bags
EMOJI_CARD = "\U0001F4B3"        # credit card
EMOJI_CHECK = "\u2705"           # check mark
EMOJI_CLIPBOARD = "\U0001F4CB"   # clipboard
EMOJI_PENCIL = "\u270F\uFE0F"    # pencil


def setup_dns():
    """Configure DNS resolution globally.

    Replaces ``socket.getaddrinfo`` with a wrapper that resolves
    ``graph.facebook.com`` through the configured nameservers. Every other
    host, and any failure of the custom lookup, falls through to the original
    ``getaddrinfo``.
    """
    resolver = dns.resolver.Resolver()
    resolver.nameservers = [nameserver1, nameserver2]
    _orig_getaddrinfo = socket.getaddrinfo

    def new_getaddrinfo(*args, **kwargs):
        try:
            if args and args[0] == 'graph.facebook.com':
                answers = resolver.resolve('graph.facebook.com', 'A')
                ip = str(answers[0])
                return _orig_getaddrinfo(ip, *args[1:], **kwargs)
        except Exception as e:
            # Best effort: fall back to the system resolver below.
            logger.error(f"DNS resolution failed: {e}")
        return _orig_getaddrinfo(*args, **kwargs)

    socket.getaddrinfo = new_getaddrinfo


setup_dns()

# Initialize WhatsApp
try:
    messenger = WhatsApp(
        os.getenv("whatsapp_token"),
        phone_number_id=os.getenv("phone_number_id")
    )
except Exception as e:
    logger.error(f"Failed to initialize WhatsApp: {str(e)}")
    raise

# SECURITY: prefer supplying the token through the ``verify_token``
# environment variable. The literal default is kept only so existing
# deployments keep working and should be rotated out of source control.
VERIFY_TOKEN = os.getenv("verify_token", "30cca545-3838-48b2-80a7-9e43b1ae8ce4")

# Gemini Configuration
genai.configure(api_key=os.getenv("google_api_key"))
vision_model = genai.GenerativeModel('gemini-2.0-flash-exp')
text_model = genai.GenerativeModel('gemini-2.0-flash-exp')

# Product Data and Cart
try:
    df = pd.read_csv("supermarket_data1.csv")
    product_data = df.to_dict('records')
except Exception as e:
    logger.error(f"Failed to load product data: {str(e)}")
    raise

# In-memory carts keyed by the sender's mobile number (lost on restart).
carts: Dict[str, List[Dict]] = {}

_TEXT_PROMPT = (
    "As a retail shopping assistant, analyze this request and extract "
    "shopping context, specific products needed, pick only one item per "
    "product extracted unless the user specifies the amount or number of "
    "items for a particular product. If the user specifies budget make sure "
    "your suggest is within that budget. If a user suggest a scenario "
    "planning a dinner, planning a party, I want to clean my car etc, be "
    "analytic and creative and suggest appropriate product"
)
_IMAGE_PROMPT = "Analyze this shopping-related image and identify products"

# The replies are parsed with json.loads, so the model must be told which
# shape to produce; without this it answers in free-form prose.
_JSON_REPLY_INSTRUCTIONS = (
    "\n\nRespond with ONLY a JSON object (no markdown, no commentary) of the "
    'form {"products": [{"name": "<product name>", "quantity": <number>}]}.'
)

# Matches a leading ```/```json fence or a trailing ``` fence.
_CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", re.IGNORECASE)


def _parse_model_json(raw: str) -> dict:
    """Parse a model reply into a dict, tolerating fences and stray prose.

    Raises:
        ValueError: if no JSON object or array can be extracted from *raw*.
    """
    cleaned = _CODE_FENCE_RE.sub("", (raw or "").strip()).strip()
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span when prose surrounds the JSON.
        start, end = cleaned.find("{"), cleaned.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(cleaned[start:end + 1])
    if isinstance(parsed, list):
        return {"products": parsed}
    if not isinstance(parsed, dict):
        raise ValueError("Model reply is not a JSON object")
    return parsed


def _coerce_quantity(value) -> Union[int, float]:
    """Return *value* as a positive number, defaulting to 1 when unusable."""
    try:
        quantity = float(value)
    except (TypeError, ValueError):
        return 1
    if not math.isfinite(quantity) or quantity <= 0:
        return 1
    return int(quantity) if quantity.is_integer() else quantity


class ShoppingAssistant:
    """Turns user text or images into catalogue matches via Gemini.

    Attributes:
        product_data: catalogue rows (dicts) loaded at module import.
        last_analyzed_products: most recent matches per mobile number, read
            by the "add to cart" button handler.
    """

    def __init__(self):
        self.product_data = product_data
        self.last_analyzed_products = {}
        # Case-insensitive name -> row lookup. setdefault keeps the first row
        # for a duplicated name, like a first-match linear scan would.
        self._catalog_index: Dict[str, Dict] = {}
        for row in self.product_data:
            name = row.get('product')
            if name is None:
                continue
            self._catalog_index.setdefault(str(name).strip().lower(), row)

    def process_input(self, content: Union[str, bytes], content_type: str, mobile: str) -> dict:
        """Multimodal input processor.

        Args:
            content: message text, or raw image bytes.
            content_type: ``"text"`` or ``"image"``.
            mobile: sender's number, used to remember the matches.

        Returns:
            The dict built by ``_match_products`` on success, otherwise
            ``{"error": ...}``. Exceptions are logged, never propagated.
        """
        try:
            if content_type == "text":
                return self._process_text(content, mobile)
            elif content_type == "image":
                return self._process_image(content, mobile)
            else:
                return {"error": "Unsupported content type"}
        except Exception:
            logger.exception("Processing error")
            return {"error": "Failed to process input"}

    def _process_text(self, text: str, mobile: str) -> dict:
        """Process text input using Gemini."""
        prompt = f"{_TEXT_PROMPT}{_JSON_REPLY_INSTRUCTIONS}\n\nRequest: {text}"
        response = text_model.generate_content(prompt)
        analysis = _parse_model_json(response.text)
        return self._match_products(analysis.get('products', []), mobile)

    def _process_image(self, image_bytes: bytes, mobile: str) -> dict:
        """Process image input using Gemini Vision."""
        img = Image.open(io.BytesIO(image_bytes))
        response = vision_model.generate_content([
            f"{_IMAGE_PROMPT}{_JSON_REPLY_INSTRUCTIONS}",
            img
        ])
        analysis = _parse_model_json(response.text)
        return self._match_products(analysis.get('products', []), mobile)

    def _match_products(self, detected_items: List[dict], mobile: str) -> dict:
        """Match detected items with inventory.

        Items may be dicts with ``name`` and optional ``quantity``, or bare
        name strings. Items without a usable name, or with no
        case-insensitive exact match in the catalogue, are skipped.

        Returns:
            ``{"products": [...], "total": <sum of price * quantity>,
            "message": ...}``.
        """
        matched_products = []
        for item in detected_items or []:
            if isinstance(item, str):
                name, quantity = item, 1
            elif isinstance(item, dict):
                name, quantity = item.get('name'), item.get('quantity', 1)
            else:
                continue
            if not name:
                continue
            product = self._catalog_index.get(str(name).strip().lower())
            if product is None:
                continue
            matched_products.append({
                **product,
                'quantity': _coerce_quantity(quantity)
            })

        if mobile:
            self.last_analyzed_products[mobile] = matched_products

        return {
            "products": matched_products,
            "total": sum(p['price'] * p.get('quantity', 1) for p in matched_products),
            "message": f"Found {len(matched_products)} matching products"
        }


# One shared instance: the matches remembered while handling a text/image
# message must still be there when the follow-up button press arrives in a
# later request.
assistant = ShoppingAssistant()


def create_reply_button_message(text: str, buttons: List[Dict]) -> Dict:
    """Create interactive button message.

    Args:
        text: body text shown above the buttons.
        buttons: dicts with ``id`` and ``title`` keys.

    Returns:
        The ``interactive`` payload expected by ``send_reply_button``.
    """
    return {
        "type": "button",
        "body": {"text": text},
        "action": {
            "buttons": [
                {
                    "type": "reply",
                    "reply": {
                        "id": btn["id"],
                        "title": btn["title"]
                    }
                }
                for btn in buttons
            ]
        }
    }


def _send_buttons(mobile: str, text: str, buttons: List[Dict]) -> None:
    """Send *text* with reply *buttons* to *mobile*."""
    # Keyword arguments: heyoo's positional order is (button, recipient_id).
    messenger.send_reply_button(
        button=create_reply_button_message(text, buttons),
        recipient_id=mobile
    )


def _send_product_suggestions(mobile: str, result: dict, heading: str, empty_message: str) -> None:
    """Reply with the matched products, or *empty_message* if there are none."""
    products = result.get('products')
    if not products:
        messenger.send_message(empty_message, mobile)
        return

    parts = [f"{heading}\n\n"]
    parts.extend(
        f"{EMOJI_ARROW} {p['product']} x{p.get('quantity', 1)}\n Price: ${p['price']}\n\n"
        for p in products
    )
    parts.append(f"{EMOJI_MONEY} Total: ${result['total']:.2f}")
    buttons = [
        {"id": "add_to_cart", "title": f"{EMOJI_CART} Add to Cart"},
        {"id": "cancel", "title": f"{EMOJI_CROSS} Cancel"}
    ]
    _send_buttons(mobile, "".join(parts), buttons)


def _download_image(data: dict) -> Optional[bytes]:
    """Return the bytes of the image attached to the message, or None."""
    image = messenger.get_image(data)
    if not image or "id" not in image:
        return None
    # The webhook carries only a media id; the download URL is looked up.
    image_url = messenger.query_media_url(image["id"])
    if not image_url:
        return None
    response = requests.get(
        image_url,
        headers={"Authorization": f"Bearer {os.getenv('whatsapp_token')}"},
        timeout=MEDIA_TIMEOUT_SECONDS
    )
    if response.status_code != 200:
        logger.warning("Image download failed with status %s", response.status_code)
        return None
    return response.content


def _extract_button_id(response: Optional[dict]) -> Optional[str]:
    """Return the id of the pressed button from an interactive response."""
    if not response:
        return None
    # heyoo returns {"type": "button_reply", "button_reply": {"id": ...}};
    # fall back to a flat {"id": ...} shape if the nested entry is absent.
    reply = response.get(response.get("type"))
    if not isinstance(reply, dict):
        reply = response
    return reply.get("id")


def _handle_text(data: dict, mobile: str) -> None:
    """Handle an incoming text message."""
    message = messenger.get_message(data)
    result = assistant.process_input(message, "text", mobile)
    _send_product_suggestions(
        mobile,
        result,
        f"{EMOJI_CART} *Suggested Products:*",
        f"{EMOJI_CROSS} No matching products found"
    )


def _handle_image(data: dict, mobile: str) -> None:
    """Handle an incoming image message."""
    image_bytes = _download_image(data)
    if image_bytes is None:
        messenger.send_message(
            f"{EMOJI_CROSS} Could not download the image. Please try again.",
            mobile
        )
        return
    result = assistant.process_input(image_bytes, "image", mobile)
    _send_product_suggestions(
        mobile,
        result,
        f"{EMOJI_CAMERA} *Products Found in Your Image:*",
        f"{EMOJI_CROSS} No products found in the image"
    )


def _handle_interactive(data: dict, mobile: str) -> None:
    """Handle a button press on a previously sent interactive message."""
    button_id = _extract_button_id(messenger.get_interactive_response(data))

    if button_id == "add_to_cart":
        pending = assistant.last_analyzed_products.get(mobile)
        if not pending:
            messenger.send_message(f"{EMOJI_CROSS} No products to add", mobile)
            return
        carts.setdefault(mobile, []).extend(pending)
        buttons = [
            {"id": "continue_shopping", "title": f"{EMOJI_BAGS} Continue Shopping"},
            {"id": "checkout", "title": f"{EMOJI_CARD} Checkout"}
        ]
        _send_buttons(
            mobile,
            f"{EMOJI_CHECK} Items added to cart!\nWhat would you like to do next?",
            buttons
        )
    elif button_id == "checkout":
        cart = carts.get(mobile)
        if not cart:
            messenger.send_message(f"{EMOJI_CART} Your cart is empty!", mobile)
            return
        total = sum(p['price'] * p.get('quantity', 1) for p in cart)
        receipt = f"{EMOJI_CLIPBOARD} *Your Cart Summary:*\n\n"
        receipt += "\n".join(
            f"{EMOJI_ARROW} {p['product']} x{p.get('quantity', 1)} - ${p['price']}"
            for p in cart
        )
        receipt += f"\n\n{EMOJI_CARD} Total: ${total:.2f}"
        buttons = [
            {"id": "confirm_checkout", "title": f"{EMOJI_CHECK} Confirm Order"},
            {"id": "edit_cart", "title": f"{EMOJI_PENCIL} Edit Cart"}
        ]
        _send_buttons(mobile, receipt, buttons)
    else:
        # cancel / continue_shopping / confirm_checkout / edit_cart have no
        # behaviour defined yet; log so the gap is visible.
        logger.info("Unhandled button id: %s", button_id)


@app.get("/")
def verify_token():
    """Webhook verification.

    Echoes ``hub.challenge`` when ``hub.verify_token`` matches
    ``VERIFY_TOKEN``; otherwise responds with HTTP 403.
    """
    if request.args.get("hub.verify_token") == VERIFY_TOKEN:
        # Default to "" so a missing challenge cannot make the view return None.
        return request.args.get("hub.challenge", "")
    return "Invalid verification token", 403


@app.post("/")
def webhook():
    """Main webhook handler.

    Dispatches on the incoming message type (text, image, interactive).
    Payloads of any other type are acknowledged with 200 and ignored.

    Returns:
        ``("OK", 200)`` on success, ``("Bad Request", 400)`` for a missing or
        non-JSON body, ``("Internal Server Error", 500)`` on any failure.
    """
    try:
        data = request.get_json(silent=True)
        if not data:
            return "Bad Request", 400

        mobile = messenger.get_mobile(data)
        message_type = messenger.get_message_type(data)

        if message_type == "text":
            _handle_text(data, mobile)
        elif message_type == "image":
            _handle_image(data, mobile)
        elif message_type == "interactive":
            _handle_interactive(data, mobile)

        return "OK", 200
    except Exception:
        logger.exception("Webhook error")
        return "Internal Server Error", 500


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)