BraveShopper / main.py
rairo's picture
Update main.py
2713cdf verified
raw
history blame
10.7 kB
import os
import logging
import socket
import io
import requests
import dns.resolver
import pandas as pd
import google.generativeai as genai
from heyoo import WhatsApp
from dotenv import load_dotenv
from flask import Flask, request, make_response
from typing import Dict, List, Union
import json
from PIL import Image
# Initialize Flask App
app = Flask(__name__)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# DNS Configuration
nameserver1 = os.getenv('nameserver1', '8.8.8.8')
nameserver2 = os.getenv('nameserver2', '8.8.4.4')
def setup_dns():
"""Configure DNS resolution globally"""
resolver = dns.resolver.Resolver()
resolver.nameservers = [nameserver1, nameserver2]
_orig_getaddrinfo = socket.getaddrinfo
def new_getaddrinfo(*args, **kwargs):
try:
if args and args[0] == 'graph.facebook.com':
answers = resolver.resolve('graph.facebook.com', 'A')
ip = str(answers[0])
return _orig_getaddrinfo(ip, *args[1:], **kwargs)
except Exception as e:
logger.error(f"DNS resolution failed: {e}")
return _orig_getaddrinfo(*args, **kwargs)
socket.getaddrinfo = new_getaddrinfo
setup_dns()
# Initialize WhatsApp
try:
messenger = WhatsApp(
os.getenv("whatsapp_token"),
phone_number_id=os.getenv("phone_number_id")
)
except Exception as e:
logger.error(f"Failed to initialize WhatsApp: {str(e)}")
raise
VERIFY_TOKEN = "30cca545-3838-48b2-80a7-9e43b1ae8ce4"
# Gemini Configuration
genai.configure(api_key=os.getenv("google_api_key"))
vision_model = genai.GenerativeModel('gemini-2.0-flash-exp')
text_model = genai.GenerativeModel('gemini-2.0-flash-exp')
# Product Data and Cart
try:
df = pd.read_csv("supermarket_data1.csv")
product_data = df.to_dict('records')
except Exception as e:
logger.error(f"Failed to load product data: {str(e)}")
raise
carts: Dict[str, List[Dict]] = {}
class ShoppingAssistant:
def __init__(self):
self.product_data = product_data
self.last_analyzed_products = {}
def process_input(self, content: Union[str, bytes], content_type: str, mobile: str) -> dict:
"""Multimodal input processor"""
try:
if content_type == "text":
return self._process_text(content, mobile)
elif content_type == "image":
return self._process_image(content, mobile)
else:
return {"error": "Unsupported content type"}
except Exception as e:
logger.error(f"Processing error: {str(e)}")
return {"error": "Failed to process input"}
def _process_text(self, text: str, mobile: str) -> dict:
"""Process text input using Gemini"""
response = text_model.generate_content(
f"""As a retail shopping assistant, analyze this request and extract shopping context,
specific products needed, pick only one item per product extracted
unless the user specifies the amount or number of items for a particular product.
If the user specifies budget make sure your suggest is within that budget.
If a user suggest a scenario planning a dinner, planning a party, I want to clean my car etc, be analytic and creative
and suggest appropriate product
Request: {text}"""
)
analysis = json.loads(response.text)
return self._match_products(analysis.get('products', []), mobile)
def _process_image(self, image_bytes: bytes, mobile: str) -> dict:
"""Process image input using Gemini Vision"""
img = Image.open(io.BytesIO(image_bytes))
response = vision_model.generate_content([
"Analyze this shopping-related image and identify products",
img
])
analysis = json.loads(response.text)
return self._match_products(analysis.get('products', []), mobile)
def _match_products(self, detected_items: List[dict], mobile: str) -> dict:
"""Match detected items with inventory"""
matched_products = []
for item in detected_items:
product = next(
(p for p in self.product_data
if p['product'].lower() == item['name'].lower()),
None
)
if product:
matched_products.append({
**product,
'quantity': item.get('quantity', 1)
})
if mobile:
self.last_analyzed_products[mobile] = matched_products
return {
"products": matched_products,
"total": sum(p['price']*p.get('quantity',1) for p in matched_products),
"message": f"Found {len(matched_products)} matching products"
}
def create_reply_button_message(text: str, buttons: List[Dict]) -> Dict:
"""Create interactive button message"""
return {
"type": "button",
"body": {"text": text},
"action": {
"buttons": [
{
"type": "reply",
"reply": {
"id": btn["id"],
"title": btn["title"]
}
} for btn in buttons
]
}
}
@app.get("/")
def verify_token():
"""Webhook verification"""
if request.args.get("hub.verify_token") == VERIFY_TOKEN:
return request.args.get("hub.challenge")
return "Invalid verification token"
@app.post("/")
def webhook():
"""Main webhook handler"""
try:
data = request.get_json()
mobile = messenger.get_mobile(data)
message_type = messenger.get_message_type(data)
assistant = ShoppingAssistant()
# Handle text input
if message_type == "text":
message = messenger.get_message(data)
result = assistant.process_input(message, "text", mobile)
if result.get('products'):
products_text = "πŸ›’ *Suggested Products:*\n\n"
for p in result['products']:
products_text += f"➀ {p['product']} x{p.get('quantity',1)}\n Price: ${p['price']}\n\n"
products_text += f"πŸ’΅ Total: ${result['total']:.2f}"
buttons = [
{"id": "add_to_cart", "title": "πŸ›’ Add to Cart"},
{"id": "cancel", "title": "❌ Cancel"}
]
messenger.send_reply_button(
mobile,
create_reply_button_message(products_text, buttons)
)
else:
messenger.send_message("❌ No matching products found", mobile)
# Handle image input
elif message_type == "image":
image_url = messenger.get_image_url(data)
response = requests.get(image_url, headers={
"Authorization": f"Bearer {os.getenv('whatsapp_token')}"
})
if response.status_code == 200:
result = assistant.process_input(response.content, "image", mobile)
if result.get('products'):
products_text = "πŸ“Έ *Products Found in Your Image:*\n\n"
for p in result['products']:
products_text += f"➀ {p['product']} x{p.get('quantity',1)}\n Price: ${p['price']}\n\n"
products_text += f"πŸ’΅ Total: ${result['total']:.2f}"
buttons = [
{"id": "add_to_cart", "title": "πŸ›’ Add to Cart"},
{"id": "cancel", "title": "❌ Cancel"}
]
messenger.send_reply_button(
mobile,
create_reply_button_message(products_text, buttons)
)
else:
messenger.send_message("❌ No products found in the image", mobile)
# Handle button interactions
elif message_type == "interactive":
response = messenger.get_interactive_response(data)
button_id = response.get("id")
if button_id == "add_to_cart":
if mobile in assistant.last_analyzed_products:
if mobile not in carts:
carts[mobile] = []
carts[mobile].extend(assistant.last_analyzed_products[mobile])
buttons = [
{"id": "continue_shopping", "title": "πŸ›οΈ Continue Shopping"},
{"id": "checkout", "title": "πŸ’³ Checkout"}
]
messenger.send_reply_button(
mobile,
create_reply_button_message(
"βœ… Items added to cart!\nWhat would you like to do next?",
buttons
)
)
else:
messenger.send_message("❌ No products to add", mobile)
elif button_id == "checkout":
if mobile in carts and carts[mobile]:
total = sum(p['price']*p.get('quantity',1) for p in carts[mobile])
receipt = "πŸ“‹ *Your Cart Summary:*\n\n"
receipt += "\n".join(
f"➀ {p['product']} x{p.get('quantity',1)} - ${p['price']}"
for p in carts[mobile]
)
receipt += f"\n\nπŸ’³ Total: ${total:.2f}"
buttons = [
{"id": "confirm_checkout", "title": "βœ… Confirm Order"},
{"id": "edit_cart", "title": "✏️ Edit Cart"}
]
messenger.send_reply_button(
mobile,
create_reply_button_message(receipt, buttons)
)
else:
messenger.send_message("πŸ›’ Your cart is empty!", mobile)
return "OK", 200
except Exception as e:
logger.error(f"Webhook error: {str(e)}")
return "Internal Server Error", 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)