Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import socket | |
| import io | |
| import requests | |
| import dns.resolver | |
| import pandas as pd | |
| import google.generativeai as genai | |
| from heyoo import WhatsApp | |
| from dotenv import load_dotenv | |
| from flask import Flask, request, make_response | |
| from typing import Dict, List, Union | |
| import json | |
| from PIL import Image | |
# Initialize Flask App
# The WSGI application object; it is started by the `__main__` guard at the
# bottom of the file.
app = Flask(__name__)

# Configure logging
# Root logger at INFO with timestamp, logger name and level on every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Load environment variables
# Must run before any os.getenv() call below so values from a .env file are
# visible to them.
load_dotenv()

# DNS Configuration
# Nameservers used by setup_dns(); overridable through the `nameserver1` /
# `nameserver2` environment variables, defaulting to 8.8.8.8 and 8.8.4.4.
nameserver1 = os.getenv('nameserver1', '8.8.8.8')
nameserver2 = os.getenv('nameserver2', '8.8.4.4')
def setup_dns():
    """Install a process-wide `socket.getaddrinfo` wrapper for the Graph API host.

    Lookups whose first positional argument is ``graph.facebook.com`` are
    answered by resolving an A record through the configured nameservers and
    handing the resulting IP to the original ``getaddrinfo``. Every other
    lookup, and any lookup where that custom path raises, goes to the original
    ``getaddrinfo`` untouched.
    """
    custom_resolver = dns.resolver.Resolver()
    custom_resolver.nameservers = [nameserver1, nameserver2]

    # Keep a handle on the unpatched function: the wrapper delegates to it.
    system_getaddrinfo = socket.getaddrinfo

    def patched_getaddrinfo(*args, **kwargs):
        try:
            host_is_graph_api = args and args[0] == 'graph.facebook.com'
            if host_is_graph_api:
                records = custom_resolver.resolve('graph.facebook.com', 'A')
                first_ip = str(records[0])
                remaining_args = args[1:]
                return system_getaddrinfo(first_ip, *remaining_args, **kwargs)
        except Exception as e:
            # Best effort only: log and fall back to the normal lookup below.
            logger.error(f"DNS resolution failed: {e}")
        return system_getaddrinfo(*args, **kwargs)

    socket.getaddrinfo = patched_getaddrinfo


# Patch name resolution once at import time, before any client is created.
setup_dns()
# Initialize WhatsApp
# Module-level heyoo client used by the webhook handler for every send/parse
# call. Credentials come from the `whatsapp_token` and `phone_number_id`
# environment variables; os.getenv() yields None when one is unset.
# Any exception raised by the constructor is logged and re-raised, which
# aborts the import of this module.
try:
    messenger = WhatsApp(
        os.getenv("whatsapp_token"),
        phone_number_id=os.getenv("phone_number_id")
    )
except Exception as e:
    logger.error(f"Failed to initialize WhatsApp: {str(e)}")
    raise
# Shared secret compared against `hub.verify_token` during webhook
# verification. Read from the `verify_token` environment variable (lowercase,
# like the other settings in this file) so the secret can live outside the
# source tree. The previous hard-coded value is kept only as a fallback for
# existing deployments.
# NOTE(review): that fallback has been committed in plain text -- rotate it
# and set `verify_token` in the environment.
VERIFY_TOKEN = os.getenv("verify_token") or "30cca545-3838-48b2-80a7-9e43b1ae8ce4"
# Gemini Configuration
# The API key is read from the `google_api_key` environment variable.
genai.configure(api_key=os.getenv("google_api_key"))
# Two separate handles, one for image prompts and one for text prompts; both
# currently point at the same model name.
vision_model = genai.GenerativeModel('gemini-2.0-flash-exp')
text_model = genai.GenerativeModel('gemini-2.0-flash-exp')
# Product Data and Cart
# The catalogue is loaded once at import time from a CSV in the working
# directory and kept as a list of row dicts. Code elsewhere in this file reads
# the `product` and `price` keys of each row. A load failure is logged and
# re-raised, which aborts the import of this module.
try:
    df = pd.read_csv("supermarket_data1.csv")
    product_data = df.to_dict('records')
except Exception as e:
    logger.error(f"Failed to load product data: {str(e)}")
    raise

# In-memory carts keyed by the sender's mobile number; each value is the list
# of product dicts added so far. Held only in this process's memory.
carts: Dict[str, List[Dict]] = {}
class ShoppingAssistant:
    """Turn a shopper's text or image message into matched catalogue products.

    The message is sent to a Gemini model, the reply is parsed as JSON, and
    the product names it contains are matched case-insensitively against the
    catalogue rows in ``product_data``.
    """

    # Most recent matches per mobile number. Kept on the class, not on the
    # instance, because the webhook handler builds a fresh ShoppingAssistant
    # for each request: a per-instance dict would be empty again by the time
    # the follow-up "add to cart" button press arrives.
    last_analyzed_products: Dict[str, List[dict]] = {}

    # Appended to every prompt. The replies are fed to json.loads and read as
    # {"products": [{"name": ..., "quantity": ...}]}, so the model has to be
    # told to answer in exactly that shape.
    _JSON_FORMAT_HINT = (
        'Respond with JSON only (no prose, no markdown), shaped as '
        '{"products": [{"name": "<product name>", "quantity": <integer>}]}.'
    )

    def __init__(self):
        self.product_data = product_data

    def process_input(self, content: Union[str, bytes], content_type: str, mobile: str) -> dict:
        """Multimodal input processor.

        Args:
            content: Message text for ``"text"``, raw image bytes for ``"image"``.
            content_type: ``"text"`` or ``"image"``.
            mobile: Sender's number; used as the key for the remembered matches.

        Returns:
            The dict produced by ``_match_products`` on success, otherwise
            ``{"error": <message>}``. This method never raises: failures in
            the model call, JSON parsing or matching are logged and reported
            through the ``error`` key.
        """
        try:
            if content_type == "text":
                return self._process_text(content, mobile)
            if content_type == "image":
                return self._process_image(content, mobile)
            return {"error": "Unsupported content type"}
        except Exception as e:
            logger.error(f"Processing error: {str(e)}")
            return {"error": "Failed to process input"}

    def _process_text(self, text: str, mobile: str) -> dict:
        """Ask the text model which products the request needs, then match them."""
        prompt = (
            "As a retail shopping assistant, analyze this request and extract shopping context,\n"
            "specific products needed, pick only one item per product extracted\n"
            "unless the user specifies the amount or number of items for a particular product.\n"
            "If the user specifies budget make sure your suggest is within that budget.\n"
            "If a user suggest a scenario planning a dinner, planning a party, "
            "I want to clean my car etc, be analytic and creative\n"
            "and suggest appropriate product\n"
            f"{self._JSON_FORMAT_HINT}\n"
            f"Request: {text}"
        )
        response = text_model.generate_content(prompt)
        analysis = self._parse_analysis(response.text)
        return self._match_products(analysis.get('products', []), mobile)

    def _process_image(self, image_bytes: bytes, mobile: str) -> dict:
        """Ask the vision model which products the image shows, then match them."""
        # The context manager releases the decoder's resources once the
        # request has been sent.
        with Image.open(io.BytesIO(image_bytes)) as img:
            response = vision_model.generate_content([
                "Analyze this shopping-related image and identify products. "
                + self._JSON_FORMAT_HINT,
                img
            ])
        analysis = self._parse_analysis(response.text)
        return self._match_products(analysis.get('products', []), mobile)

    @staticmethod
    def _parse_analysis(raw: str) -> dict:
        """Parse a model reply into a dict, tolerating a markdown code fence.

        A bare JSON list is treated as the ``products`` list. Any other
        non-object JSON value yields ``{}``.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        payload = (raw or "").strip()
        if payload.startswith("```"):
            # Drop the opening fence line ("```" or "```json") and the
            # closing fence, keeping only what lies between them.
            payload = payload.split("\n", 1)[1] if "\n" in payload else ""
            payload = payload.rsplit("```", 1)[0]
        parsed = json.loads(payload)
        if isinstance(parsed, list):
            return {"products": parsed}
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _normalize_name(value) -> str:
        """Return a case- and whitespace-insensitive key, or "" for non-strings."""
        return value.strip().casefold() if isinstance(value, str) else ""

    @staticmethod
    def _coerce_quantity(value) -> int:
        """Return ``value`` as an int of at least 1; fall back to 1 if unusable."""
        try:
            quantity = int(value)
        except (TypeError, ValueError, OverflowError):
            return 1
        return max(quantity, 1)

    def _match_products(self, detected_items: List[dict], mobile: str) -> dict:
        """Match detected items with inventory.

        Args:
            detected_items: Items reported by the model. Each is normally a
                dict with ``name`` and optional ``quantity``. A plain string
                is accepted as a name; anything else is skipped.
            mobile: When truthy, the matches are remembered under this key.

        Returns:
            ``{"products": [...], "total": <sum of price * quantity>,
            "message": <summary>}``. Each product is the catalogue row plus a
            ``quantity`` key.
        """
        # Index the catalogue once per call instead of scanning it for every
        # detected item. setdefault keeps the first row for a duplicated name,
        # the same row a first-match linear scan would have returned.
        catalogue = {}
        for row in self.product_data:
            key = self._normalize_name(row.get('product'))
            if key:
                catalogue.setdefault(key, row)

        matched_products = []
        for item in detected_items or []:
            if isinstance(item, str):
                item = {"name": item}
            if not isinstance(item, dict):
                continue
            product = catalogue.get(self._normalize_name(item.get('name')))
            if product is None:
                continue
            matched_products.append({
                **product,
                'quantity': self._coerce_quantity(item.get('quantity', 1)),
            })

        if mobile:
            self.last_analyzed_products[mobile] = matched_products
        return {
            "products": matched_products,
            "total": sum(p['price'] * p['quantity'] for p in matched_products),
            "message": f"Found {len(matched_products)} matching products"
        }
def create_reply_button_message(text: str, buttons: List[Dict]) -> Dict:
    """Build the payload dict for a message with quick-reply buttons.

    Args:
        text: Body text shown above the buttons.
        buttons: Button specs; only the ``id`` and ``title`` keys of each are
            used.

    Returns:
        A dict with ``type`` set to ``"button"``, the body text, and one
        ``reply`` entry per spec, in the order given.
    """
    def as_reply_button(spec: Dict) -> Dict:
        # Copy just the two fields that belong in the payload.
        reply_fields = {"id": spec["id"], "title": spec["title"]}
        return {"type": "reply", "reply": reply_fields}

    reply_buttons = [as_reply_button(spec) for spec in buttons]
    message = {
        "type": "button",
        "body": {"text": text},
        "action": {"buttons": reply_buttons},
    }
    return message
def verify_token():
    """Webhook verification.

    Compares the ``hub.verify_token`` query parameter with ``VERIFY_TOKEN``.

    Returns:
        ``(hub.challenge, 200)`` when the token matches (an empty body if the
        challenge parameter is absent), otherwise
        ``("Invalid verification token", 403)``.

    NOTE(review): no ``@app.route``/``@app.get`` decorator is visible on this
    function in this copy of the file -- confirm it is registered with the
    Flask app.
    """
    # Local import: keeps this fix self-contained within the function.
    import hmac

    supplied = request.args.get("hub.verify_token", "")
    # Constant-time comparison, so response timing does not reveal how much
    # of a guessed token was correct.
    if hmac.compare_digest(supplied.encode("utf-8"), VERIFY_TOKEN.encode("utf-8")):
        # Default to "" so a missing challenge cannot make the view return
        # None, which Flask rejects.
        return request.args.get("hub.challenge", ""), 200

    logger.warning("Webhook verification failed: token mismatch")
    # A rejected verification must not look like a success to the caller.
    return "Invalid verification token", 403
# NOTE(review): the emoji prefixes in the user-facing strings below look
# mis-encoded in this copy of the file; they are reproduced unchanged.

# Buttons shown under every product suggestion.
_SUGGESTION_BUTTONS = [
    {"id": "add_to_cart", "title": "π Add to Cart"},
    {"id": "cancel", "title": "β Cancel"}
]

# Created on first use and then reused for every request, so the products
# suggested in one webhook call are still available when the user's
# "Add to Cart" button press arrives in a later call.
_shared_assistant = None


def _get_assistant():
    """Return the process-wide ShoppingAssistant, creating it on first use."""
    global _shared_assistant
    if _shared_assistant is None:
        _shared_assistant = ShoppingAssistant()
    return _shared_assistant


def _send_buttons(mobile: str, text: str, buttons: List[Dict]) -> None:
    """Send ``text`` with quick-reply ``buttons`` to ``mobile``."""
    # Keyword arguments on purpose: heyoo declares this method as
    # send_reply_button(button, recipient_id), so passing the phone number
    # first positionally would send it as the button payload.
    messenger.send_reply_button(
        recipient_id=mobile,
        button=create_reply_button_message(text, buttons),
    )


def _send_product_suggestions(mobile: str, heading: str, result: dict) -> None:
    """Send the matched products and their total with Add to Cart / Cancel buttons."""
    parts = [heading]
    parts.extend(
        f"β€ {p['product']} x{p.get('quantity',1)}\n Price: ${p['price']}\n\n"
        for p in result['products']
    )
    parts.append(f"π΅ Total: ${result['total']:.2f}")
    _send_buttons(mobile, "".join(parts), _SUGGESTION_BUTTONS)


def _fetch_image_bytes(data: dict) -> Union[bytes, None]:
    """Download the image attached to the incoming message.

    Returns:
        The raw image bytes, or ``None`` when the message carries no image
        id, the media URL lookup yields nothing, or the download does not
        answer with HTTP 200.
    """
    image = messenger.get_image(data) or {}
    image_id = image.get("id")
    if not image_id:
        return None
    image_url = messenger.query_media_url(image_id)
    if not image_url:
        return None
    response = requests.get(
        image_url,
        headers={"Authorization": f"Bearer {os.getenv('whatsapp_token')}"},
        # Without a timeout a stalled download would block this request
        # handler indefinitely.
        timeout=30,
    )
    if response.status_code != 200:
        logger.warning("Image download failed with HTTP %s", response.status_code)
        return None
    return response.content


def _handle_button(mobile: str, button_id: str, assistant) -> None:
    """React to a quick-reply button press identified by ``button_id``."""
    if button_id == "add_to_cart":
        pending = assistant.last_analyzed_products.get(mobile)
        if pending:
            carts.setdefault(mobile, []).extend(pending)
            _send_buttons(
                mobile,
                "β Items added to cart!\nWhat would you like to do next?",
                [
                    {"id": "continue_shopping", "title": "ποΈ Continue Shopping"},
                    {"id": "checkout", "title": "π³ Checkout"}
                ],
            )
        else:
            messenger.send_message("β No products to add", mobile)
    elif button_id == "checkout":
        cart = carts.get(mobile)
        if cart:
            total = sum(p['price']*p.get('quantity',1) for p in cart)
            receipt = "π *Your Cart Summary:*\n\n"
            receipt += "\n".join(
                f"β€ {p['product']} x{p.get('quantity',1)} - ${p['price']}"
                for p in cart
            )
            receipt += f"\n\nπ³ Total: ${total:.2f}"
            _send_buttons(
                mobile,
                receipt,
                [
                    {"id": "confirm_checkout", "title": "β Confirm Order"},
                    {"id": "edit_cart", "title": "βοΈ Edit Cart"}
                ],
            )
        else:
            messenger.send_message("π Your cart is empty!", mobile)
    else:
        # TODO: "cancel", "continue_shopping", "confirm_checkout" and
        # "edit_cart" are offered to the user but have no handler yet.
        logger.info("Unhandled button id: %s", button_id)


def webhook():
    """Main webhook handler.

    Parses the incoming notification and dispatches on its message type:
    text and image messages are analysed and answered with product
    suggestions; interactive messages are treated as button presses.
    Notifications without a sender or message type are acknowledged and
    ignored.

    Returns:
        ``("OK", 200)`` when the notification was handled or ignored,
        ``("Internal Server Error", 500)`` if anything raised.

    NOTE(review): no ``@app.route``/``@app.post`` decorator is visible on
    this function in this copy of the file -- confirm it is registered with
    the Flask app.
    """
    try:
        data = request.get_json()
        mobile = messenger.get_mobile(data)
        message_type = messenger.get_message_type(data)
        if not mobile or not message_type:
            # Nothing to reply to (no sender or no message in the payload).
            return "OK", 200

        assistant = _get_assistant()

        # Handle text input
        if message_type == "text":
            message = messenger.get_message(data)
            result = assistant.process_input(message, "text", mobile)
            if result.get('products'):
                _send_product_suggestions(mobile, "π *Suggested Products:*\n\n", result)
            else:
                messenger.send_message("β No matching products found", mobile)

        # Handle image input
        elif message_type == "image":
            image_bytes = _fetch_image_bytes(data)
            if image_bytes is None:
                # Tell the user instead of silently dropping the message.
                messenger.send_message(
                    "Sorry, I couldn't download that image. Please try sending it again.",
                    mobile,
                )
            else:
                result = assistant.process_input(image_bytes, "image", mobile)
                if result.get('products'):
                    _send_product_suggestions(
                        mobile, "πΈ *Products Found in Your Image:*\n\n", result
                    )
                else:
                    messenger.send_message("β No products found in the image", mobile)

        # Handle button interactions
        elif message_type == "interactive":
            interactive = messenger.get_interactive_response(data) or {}
            # The reply is nested under a key named by "type" (for example
            # "button_reply"); fall back to the top level for flat payloads.
            reply = interactive.get(interactive.get("type"), interactive)
            if not isinstance(reply, dict):
                reply = interactive
            _handle_button(mobile, reply.get("id"), assistant)

        return "OK", 200
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Webhook error: %s", e)
        return "Internal Server Error", 500
# Start Flask's built-in server only when this file is executed directly.
# Binds to all interfaces on port 7860.
# NOTE(review): presumably the port the hosting platform expects -- confirm.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)