# Botpy-808 / app.py
# Fred808's picture
# Update app.py
# 8744ed4 verified
import re
import os
import time
import requests
import base64
import asyncio
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from sqlalchemy import select
from fuzzywuzzy import fuzz
from pydantic import BaseModel
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
import openai
from textblob import TextBlob
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Text, Float
# --- Credentials and identifiers for external services, read from the environment ---
# NOTE(review): every lookup falls back to a non-empty placeholder string, so a
# missing variable cannot be detected with a plain truthiness check.
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
# Passed to create_async_engine() below.
DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY", "default_fallback_value")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")

# Delivery fee per town, keyed by lowercase town name. The "default" entry is
# the fallback returned when no town name is found in an address. These values
# also seed the town_shipping_costs table via populate_shipping_costs().
TOWN_SHIPPING_COSTS = {
    "lasu gate": 1000,
    "ojo": 800,
    "ajangbadi": 1200,
    "iba": 900,
    "okokomaiko": 1500,
    "default": 1000
}

# Declarative base class shared by all ORM models in this file.
Base = declarative_base()
class ChatHistory(Base):
    """ORM model for the ``chat_history`` table: one row per logged chat message."""

    __tablename__ = "chat_history"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    # Defaults to the current UTC time (naive datetime) when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Free-form direction label; send_proactive_greeting() logs "outbound".
    direction = Column(String)
    message = Column(Text)
class Order(Base):
    """ORM model for the ``orders`` table: one row per placed order."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    # Public order identifier; process_order_flow() generates "ORD-<unix time>".
    order_id = Column(String, unique=True, index=True)
    user_id = Column(String, index=True)
    # Dish name, or a comma-separated summary for multi-dish orders.
    dish = Column(String)
    # Quantity and price are stored as strings; callers convert with str().
    quantity = Column(String)
    price = Column(String, default="0")
    status = Column(String, default="Pending Payment")
    payment_reference = Column(String, nullable=True)
    delivery_address = Column(String, default="")
    # Defaults to the current UTC time (naive datetime) when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
    """ORM model for the ``user_profiles`` table: one row per known user."""

    __tablename__ = "user_profiles"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, unique=True, index=True)
    # Unique when present; may be NULL.
    phone_number = Column(String, unique=True, index=True, nullable=True)
    name = Column(String, default="Valued Customer")
    email = Column(String, default="unknown@example.com")
    preferences = Column(Text, default="")
    last_interaction = Column(DateTime, default=datetime.utcnow)
    # Comma-separated order IDs, appended to by update_user_profile_with_order().
    order_ids = Column(Text, default="")
    # NOTE(review): there is no "address" column here, although
    # update_user_profile() assigns profile.address.
class SentimentLog(Base):
    """ORM model for the ``sentiment_logs`` table: a sentiment score recorded for a message."""

    __tablename__ = "sentiment_logs"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    # Defaults to the current UTC time (naive datetime) when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Score written by log_sentiment(); presumably the TextBlob polarity
    # returned by analyze_sentiment() — confirm against the caller.
    sentiment_score = Column(Float)
    message = Column(Text)
class OrderTracking(Base):
    """ORM model for the ``order_tracking`` table: one row per status update of an order."""

    __tablename__ = "order_tracking"

    id = Column(Integer, primary_key=True, index=True)
    # Matches Order.order_id by value; no foreign key is declared.
    order_id = Column(String, index=True)
    status = Column(String)
    # Optional free-text detail for the update.
    message = Column(Text, nullable=True)
    # track_order() sorts updates by this column.
    timestamp = Column(DateTime, default=datetime.utcnow)
class MenuItem(Base):
    """ORM model for the ``menu_items`` table: one row per dish or drink on the menu."""

    __tablename__ = "menu_items"

    id = Column(Integer, primary_key=True, index=True)
    # Unique, so populate_menu_items() inserts each name at most once.
    name = Column(String, unique=True, index=True)
    description = Column(Text)
    price = Column(Integer) # Price as an integer (in Naira)
    # Free-text nutrition summary, e.g. "Calories: 300 kcal, Carbs: 50g, ...".
    nutrition = Column(Text)
class TownShippingCost(Base):
    """ORM model for the ``town_shipping_costs`` table: delivery fee per town."""

    __tablename__ = "town_shipping_costs"

    id = Column(Integer, primary_key=True, index=True)
    # Town name as seeded from TOWN_SHIPPING_COSTS (lowercase keys, including
    # the special "default" fallback row).
    town = Column(String, unique=True, index=True)
    cost = Column(Integer)
# Async SQLAlchemy engine for DATABASE_URL; echo=True turns on SQL statement logging.
engine = create_async_engine(DATABASE_URL, echo=True)
# Factory for AsyncSession objects bound to the engine. expire_on_commit=False
# keeps already-loaded attributes readable after a commit.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
    """Create the tables declared on ``Base`` using a connection from ``engine``."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)
# In-memory, per-process conversation bookkeeping (lost on restart).
# user_id -> ConversationState; read and written by process_order_flow().
user_state = {}
# NOTE(review): not referenced in the visible part of this file — presumably
# keyed by user_id; confirm against the request handlers.
conversation_context = {}
# NOTE(review): not referenced in the visible part of this file — confirm usage.
proactive_timer = {}
# In-memory menu. Each entry has "name", "description", "price" (integer) and
# "nutrition" (free text). This list seeds the menu_items table via
# populate_menu_items() and is also searched directly by the dish-matching and
# pricing helpers in this file.
# NOTE(review): "Chicken Wings" appears twice with different prices (2000 and
# 3960). Lookups that stop at the first match use 2000, and the second entry is
# skipped when seeding the table because the name already exists.
menu_items = [
    {"name": "Jollof Rice", "description": "A spicy and flavorful rice dish", "price": 1500, "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g"},
    {"name": "Fried Rice", "description": "A savory rice dish with vegetables and meat", "price": 1200, "nutrition": "Calories: 350 kcal, Carbs: 55g, Protein: 12g, Fat: 8g"},
    {"name": "Chicken Wings", "description": "Crispy fried chicken wings", "price": 2000, "nutrition": "Calories: 400 kcal, Carbs: 20g, Protein: 25g, Fat: 15g"},
    {"name": "Egusi Soup", "description": "A rich and hearty soup made with melon seeds", "price": 1000, "nutrition": "Calories: 250 kcal, Carbs: 15g, Protein: 8g, Fat: 10g"},
    {"name": "Native Rice", "description": "Traditional native-style rice", "price": 1500, "nutrition": "Calories: 350 kcal, Carbs: 60g, Protein: 10g, Fat: 7g"},
    {"name": "Spaghetti Jollof", "description": "Jollof-style spaghetti dish", "price": 1200, "nutrition": "Calories: 400 kcal, Carbs: 65g, Protein: 10g, Fat: 8g"},
    {"name": "Macaroni", "description": "Jollof-style macaroni dish", "price": 1200, "nutrition": "Calories: 380 kcal, Carbs: 60g, Protein: 9g, Fat: 7g"},
    {"name": "White Rice with Vegetable Sauce", "description": "White rice served with vegetable sauce and chicken", "price": 3700, "nutrition": "Calories: 450 kcal, Carbs: 60g, Protein: 15g, Fat: 10g"},
    {"name": "Vegetable Sauce with Chicken", "description": "Vegetable sauce with chicken, no rice", "price": 2500, "nutrition": "Calories: 300 kcal, Carbs: 20g, Protein: 15g, Fat: 10g"},
    {"name": "Chicken Drumstick", "description": "Fried or grilled chicken drumstick", "price": 2500, "nutrition": "Calories: 350 kcal, Carbs: 5g, Protein: 30g, Fat: 15g"},
    # Second "Chicken Wings" entry (see note above).
    {"name": "Chicken Wings", "description": "Crispy fried chicken wings", "price": 3960, "nutrition": "Calories: 450 kcal, Carbs: 20g, Protein: 30g, Fat: 20g"},
    {"name": "Fried Chicken", "description": "Crispy deep-fried chicken", "price": 4140, "nutrition": "Calories: 500 kcal, Carbs: 15g, Protein: 35g, Fat: 25g"},
    {"name": "Turkey", "description": "Crispy deep-fried turkey", "price": 4000, "nutrition": "Calories: 480 kcal, Carbs: 10g, Protein: 40g, Fat: 20g"},
    {"name": "Gizzard", "description": "Fried or grilled gizzard", "price": 3000, "nutrition": "Calories: 200 kcal, Carbs: 5g, Protein: 20g, Fat: 8g"},
    {"name": "Fish", "description": "Fried or grilled fish", "price": 2500, "nutrition": "Calories: 250 kcal, Carbs: 5g, Protein: 25g, Fat: 10g"},
    {"name": "Beef", "description": "Spicy fried or grilled beef", "price": 1200, "nutrition": "Calories: 300 kcal, Carbs: 2g, Protein: 28g, Fat: 15g"},
    {"name": "Pepper Chicken", "description": "Spicy peppered chicken", "price": 1500, "nutrition": "Calories: 400 kcal, Carbs: 10g, Protein: 35g, Fat: 18g"},
    {"name": "Pepper Chicken Big", "description": "Larger portion of spicy peppered chicken", "price": 2500, "nutrition": "Calories: 600 kcal, Carbs: 15g, Protein: 50g, Fat: 25g"},
    {"name": "Salad", "description": "Fresh vegetable salad", "price": 800, "nutrition": "Calories: 100 kcal, Carbs: 15g, Protein: 3g, Fat: 5g"},
    {"name": "Egg and Yam", "description": "Boiled yam served with eggs", "price": 1000, "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g"},
    {"name": "Moi Moi", "description": "Steamed bean pudding", "price": 1000, "nutrition": "Calories: 250 kcal, Carbs: 30g, Protein: 12g, Fat: 8g"},
    {"name": "Plantain", "description": "Fried or boiled ripe plantain", "price": 1000, "nutrition": "Calories: 200 kcal, Carbs: 50g, Protein: 2g, Fat: 1g"},
    {"name": "Soup with Goat Meat and Semo", "description": "Rich soup with goat meat served with semo", "price": 4600, "nutrition": "Calories: 500 kcal, Carbs: 60g, Protein: 30g, Fat: 15g"},
    # NOTE(review): priced at 500, the same as "Extra Semo" and far below the
    # goat-meat variant above — confirm this is intended.
    {"name": "Soup with Chicken and Semo", "description": "Soup served with chicken and semo", "price": 500, "nutrition": "Calories: 450 kcal, Carbs: 55g, Protein: 28g, Fat: 12g"},
    {"name": "Extra Semo", "description": "Additional portion of semo", "price": 500, "nutrition": "Calories: 200 kcal, Carbs: 45g, Protein: 5g, Fat: 1g"},
    {"name": "Banga Soup", "description": "Traditional dish with catfish", "price": 5000, "nutrition": "Calories: 600 kcal, Carbs: 50g, Protein: 35g, Fat: 15g"}
]
# Drinks use the same entry shape as the dishes above.
nigerian_drinks = [
    {"name": "Chapman", "description": "Popular Nigerian cocktail with a fruity, fizzy taste", "price": 1500, "nutrition": "Calories: 180 kcal, Carbs: 40g, Protein: 1g, Fat: 0g"},
    {"name": "VitaMilk", "description": "Healthy, protein-rich plant-based milk", "price": 1000, "nutrition": "Calories: 180 kcal, Carbs: 20g, Protein: 8g, Fat: 6g"},
    {"name": "Malt Drink", "description": "Non-alcoholic malt-based drink", "price": 1000, "nutrition": "Calories: 200 kcal, Carbs: 50g, Protein: 2g, Fat: 0g"},
    {"name": "Soft Drinks", "description": "Carbonated sodas like Coke, Fanta, and Sprite", "price": 800, "nutrition": "Calories: 150 kcal, Carbs: 39g, Protein: 0g, Fat: 0g"},
    {"name": "Bottled Water", "description": "Pure, refreshing drinking water", "price": 500, "nutrition": "Calories: 0 kcal, Carbs: 0g, Protein: 0g, Fat: 0g"},
    {"name": "Energy Drink", "description": "Boost of energy with caffeine and vitamins", "price": 1500, "nutrition": "Calories: 200 kcal, Carbs: 50g, Protein: 1g, Fat: 0g"}
]
# Drinks are appended so every menu helper sees one combined list.
menu_items.extend(nigerian_drinks)
# Idle time after which a conversation is considered expired.
SESSION_TIMEOUT = timedelta(minutes=5)


class ConversationState:
    """Mutable per-user state of a multi-step conversation.

    Attributes:
        flow: Name of the active flow, or ``None`` when idle.
        step: Position within the flow, ``0`` when idle.
        data: Scratch dictionary holding values collected so far.
        last_active: Naive UTC timestamp of the last activity.
    """

    def __init__(self):
        # A new state is exactly a freshly reset one.
        self.reset()

    def update_last_active(self):
        """Stamp the state with the current UTC time."""
        self.last_active = datetime.utcnow()

    def is_expired(self):
        """Return True when the state has been idle longer than SESSION_TIMEOUT."""
        idle_for = datetime.utcnow() - self.last_active
        return idle_for > SESSION_TIMEOUT

    def reset(self):
        """Clear the flow, step and collected data and refresh the activity stamp."""
        self.flow, self.step, self.data = None, 0, {}
        self.update_last_active()
async def populate_menu_items():
    """Seed the ``menu_items`` table from the in-memory ``menu_items`` list.

    An entry is inserted only when no row with the same name exists yet, so
    repeated start-ups do not create duplicates.
    """
    async with async_session() as session:
        for entry in menu_items:
            lookup = select(MenuItem).where(MenuItem.name == entry["name"])
            already_there = (await session.execute(lookup)).scalars().first()
            if already_there:
                continue
            session.add(
                MenuItem(
                    name=entry["name"],
                    description=entry["description"],
                    price=entry["price"],
                    nutrition=entry["nutrition"],
                )
            )
        await session.commit()
async def populate_shipping_costs():
    """Seed the ``town_shipping_costs`` table from ``TOWN_SHIPPING_COSTS``.

    A town is inserted only when it has no row yet.
    """
    async with async_session() as session:
        for town_name, fee in TOWN_SHIPPING_COSTS.items():
            lookup = select(TownShippingCost).where(TownShippingCost.town == town_name)
            already_there = (await session.execute(lookup)).scalars().first()
            if already_there:
                continue
            session.add(TownShippingCost(town=town_name, cost=fee))
        await session.commit()
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Store one chat message as a ``ChatHistory`` row and commit it."""
    record = ChatHistory(user_id=user_id, direction=direction, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
async def log_sentiment(user_id: str, message: str, score: float):
    """Store a message and its sentiment score as a ``SentimentLog`` row and commit it."""
    record = SentimentLog(user_id=user_id, sentiment_score=score, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
def analyze_sentiment(text: str) -> float:
    """Return the TextBlob sentiment polarity of *text*."""
    return TextBlob(text).sentiment.polarity
def google_image_scrape(query: str) -> str:
    """Return the first absolute image URL found on a Google image search page.

    Args:
        query: Free-text search term. It is sent through ``params`` so that
            spaces, ``&``, ``#`` and similar characters are URL-encoded
            instead of corrupting the query string.

    Returns:
        The ``src`` of the first ``<img>`` tag whose source starts with
        ``"http"``, or ``""`` when the request fails, the status is not 200,
        or no such image exists.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    try:
        response = requests.get(
            "https://www.google.com/search",
            params={"tbm": "isch", "q": query},
            headers=headers,
            timeout=5,
        )
    except Exception:
        # Best-effort lookup: any request failure simply means "no image".
        return ""
    if response.status_code != 200:
        return ""
    soup = BeautifulSoup(response.text, "html.parser")
    for img in soup.find_all("img"):
        src = img.get("src")
        if src and src.startswith("http"):
            return src
    return ""
def create_paystack_payment_link(
    email: str,
    amount: int,
    reference: str,
    callback_url: str = "https://fred808-botpy-808.hf.space/payment_callback",
) -> dict:
    """Initialize a Paystack transaction and return the API response.

    Args:
        email: Customer e-mail address sent to Paystack.
        amount: Amount to charge; the caller in this file passes the order
            total multiplied by 100.
        reference: Transaction reference (the order ID in this file).
        callback_url: URL Paystack is told to redirect to after payment.
            Defaults to the previously hard-coded deployment URL, so existing
            callers are unaffected.

    Returns:
        The decoded JSON body on HTTP 200. Otherwise a dict of the form
        ``{"status": False, "message": ...}`` describing the failure; request
        and decoding errors are reported the same way rather than raised.
    """
    url = "https://api.paystack.co/transaction/initialize"
    headers = {
        "Authorization": f"Bearer {PAYSTACK_SECRET_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "email": email,
        "amount": amount,
        "reference": reference,
        "callback_url": callback_url,
    }
    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
        if response.status_code == 200:
            return response.json()
        return {"status": False, "message": "Failed to initialize payment."}
    except Exception as e:
        return {"status": False, "message": str(e)}
def send_whatsapp_message(recipient: str, message_body: str) -> dict:
    """Send a plain-text WhatsApp message through the Graph API.

    Args:
        recipient: Destination phone number, passed as the ``"to"`` field.
        message_body: Text of the message.

    Returns:
        The decoded JSON response of the API call.

    Raises:
        requests.RequestException: When the request fails or times out.
    """
    url = f"https://graph.facebook.com/v15.0/{WHATSAPP_PHONE_NUMBER_ID}/messages"
    headers = {
        "Authorization": f"Bearer {WHATSAPP_ACCESS_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "messaging_product": "whatsapp",
        "to": recipient,
        "type": "text",
        "text": {"body": message_body}
    }
    # A timeout keeps an unresponsive endpoint from blocking the caller
    # indefinitely; 10 seconds matches the other outbound calls in this file.
    response = requests.post(url, headers=headers, json=payload, timeout=10)
    return response.json()
def stream_text_completion(prompt: str):
    """Yield the text fragments of a streamed chat completion for *prompt*.

    The request goes to the NVIDIA-hosted, OpenAI-compatible endpoint using
    ``NVIDIA_API_KEY``. Any exception raised while creating or reading the
    stream ends the generator with a single ``"Error: ..."`` fragment.
    """
    from openai import OpenAI

    nvidia_client = OpenAI(
        api_key=NVIDIA_API_KEY,
        base_url="https://integrate.api.nvidia.com/v1",
    )
    request_args = {
        "model": "meta/llama-3.1-405b-instruct",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.2,
        "top_p": 0.7,
        "max_tokens": 1024,
        "stream": True,
    }
    try:
        for piece in nvidia_client.chat.completions.create(**request_args):
            fragment = piece.choices[0].delta.content
            if fragment is None:
                continue
            yield fragment
    except Exception as e:
        yield f"Error: {str(e)}"
def stream_image_completion(image_b64: str):
    """Yield the raw server-sent-event lines describing a base64-encoded image.

    Args:
        image_b64: Base64 text of the image; it is embedded in the prompt as a
            ``data:image/png;base64,...`` URL.

    Yields:
        Each non-empty line of the streamed response, decoded as UTF-8 and
        terminated with ``"\\n"``.

    Raises:
        requests.RequestException: When the request fails or times out.
    """
    invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Accept": "text/event-stream"
    }
    payload = {
        "model": "meta/llama-3.2-90b-vision-instruct",
        "messages": [
            {
                "role": "user",
                "content": f'What is in this image? <img src="data:image/png;base64,{image_b64}" />'
            }
        ],
        "max_tokens": 512,
        "temperature": 1.00,
        "top_p": 1.00,
        "stream": True
    }
    # The context manager releases the connection even when the consumer stops
    # iterating early. The (connect, read) timeout bounds how long a stalled
    # server can block the caller.
    with requests.post(
        invoke_url,
        headers=headers,
        json=payload,
        stream=True,
        timeout=(10, 120),
    ) as response:
        for line in response.iter_lines():
            if line:
                yield line.decode("utf-8") + "\n"
async def log_order_tracking(order_id: str, status: str, message: str = None):
    """Append a status update for *order_id* to the ``order_tracking`` table."""
    update = OrderTracking(order_id=order_id, status=status, message=message)
    async with async_session() as db:
        db.add(update)
        await db.commit()
def calculate_shipping_cost(address: str) -> int:
    """Return the delivery fee for *address* from ``TOWN_SHIPPING_COSTS``.

    A town matches only when its name appears in the address as whole words
    (case-insensitive). Plain substring matching would charge the "iba" rate
    for an address in "Ibadan". The ``"default"`` key is the fallback fee and
    is never treated as a town name.

    Args:
        address: Free-text delivery address; ``None`` or ``""`` yields the
            default fee.

    Returns:
        The fee of the first matching town, otherwise the ``"default"`` fee.
    """
    address_lower = (address or "").lower()
    for area, cost in TOWN_SHIPPING_COSTS.items():
        if area == "default":
            continue
        if re.search(rf"\b{re.escape(area)}\b", address_lower):
            return cost
    return TOWN_SHIPPING_COSTS["default"]
def calculate_eta(destination: str) -> str:
    """Return a human-readable delivery-time estimate for *destination*.

    The estimate comes from the Google Maps Directions API, measured from the
    restaurant's fixed address.

    Args:
        destination: Free-text delivery address.

    Returns:
        ``"Estimated delivery time: <duration>"`` on success, otherwise an
        ``"ETA unavailable (...)"`` message. This function never raises.
    """
    # The configuration falls back to a placeholder string when the variable
    # is unset, so the placeholder must count as "missing" too.
    if not GOOGLE_MAPS_API_KEY or GOOGLE_MAPS_API_KEY == "default_fallback_value":
        return "ETA unavailable (Google Maps API key missing)."
    if not destination:
        # Nothing to route to; skip the network call.
        return "ETA unavailable (no route found)."
    origin = "Plot 13 Isashi Road, Iyana Isashi, Off Lagos - Badagry Expy, Lagos"
    try:
        # `params` URL-encodes the addresses; interpolating them into the URL
        # breaks on characters such as "&" or "#".
        response = requests.get(
            "https://maps.googleapis.com/maps/api/directions/json",
            params={
                "origin": origin,
                "destination": destination,
                "key": GOOGLE_MAPS_API_KEY,
            },
            timeout=10,
        )
        if response.status_code != 200:
            return "ETA unavailable (API error)."
        data = response.json()
        if not data.get("routes"):
            return "ETA unavailable (no route found)."
        duration = data["routes"][0]["legs"][0]["duration"]["text"]
        return f"Estimated delivery time: {duration}"
    except Exception as e:
        # Only the exception type is reported. The full message of a request
        # error can contain the request URL, which includes the API key, and
        # this string is shown to end users.
        return f"ETA unavailable (error: {type(e).__name__})."
def is_order_intent(message: str) -> bool:
    """Tell whether *message* looks like the user wants to order food.

    The message counts as an order intent when, case-insensitively, it
    contains a menu item name, one of a few ordering phrases, or one of the
    order keywords as a whole word.
    """
    text = message.lower()

    # A dish name anywhere in the message is the strongest signal.
    dish_names = [item["name"].lower() for item in menu_items]
    if any(name in text for name in dish_names):
        return True

    phrases = ("i want to order", "can i order", "i'd like to order", "get food", "place an order")
    if any(phrase in text for phrase in phrases):
        return True

    # Keywords must match as whole words.
    keywords = ("order", "menu", "dish", "food", "deliver", "hungry")
    return any(re.search(rf"\b{word}\b", text) is not None for word in keywords)
async def get_menu_items():
    """Load every ``MenuItem`` row and return the rows as plain dictionaries.

    Each dictionary has the keys ``name``, ``description``, ``price`` and
    ``nutrition``.
    """
    async with async_session() as session:
        rows = (await session.execute(select(MenuItem))).scalars().all()
        menu = []
        for row in rows:
            menu.append(
                {
                    "name": row.name,
                    "description": row.description,
                    "price": row.price,
                    "nutrition": row.nutrition,
                }
            )
        return menu
async def track_order(user_id: str, order_id: str) -> str:
    """Build a text report of an order's status, tracking history and ETA.

    Args:
        user_id: Accepted for interface compatibility; not used in the lookup.
        order_id: Public order identifier to look up.

    Returns:
        The report, or a "not found" message when no order has that ID.
    """
    async with async_session() as session:
        order_query = select(Order).where(Order.order_id == order_id)
        order = (await session.execute(order_query)).scalars().first()
        if not order:
            return "Order not found. Please check your order ID."

        history_query = (
            select(OrderTracking)
            .where(OrderTracking.order_id == order_id)
            .order_by(OrderTracking.timestamp)
        )
        history = (await session.execute(history_query)).scalars().all()
        eta = calculate_eta(order.delivery_address)

        # Assemble the report piece by piece and join once at the end.
        parts = [f"Order ID: {order_id}\nStatus: {order.status}\n"]
        if history:
            parts.append("Tracking Updates:\n")
            parts.extend(
                f"- {entry.status} ({entry.timestamp}): {entry.message or 'No details'}\n"
                for entry in history
            )
        parts.append(f"\n{eta}")
        return "".join(parts)
async def update_user_profile(user_id: str, phone_number: str = None, address: str = None):
    """Create or update the ``UserProfile`` row for *user_id*.

    Args:
        user_id: Identifier of the user whose profile is written.
        phone_number: New phone number; ignored when falsy on an existing
            profile.
        address: Delivery address. It is stored only when the ``UserProfile``
            model declares an ``address`` column. The model in this file does
            not, and assigning to an undeclared attribute would look like a
            write without persisting anything.
    """
    # NOTE(review): add an `address` column to UserProfile (with a migration)
    # if addresses should be remembered between orders.
    can_store_address = hasattr(UserProfile, "address")
    async with async_session() as session:
        result = await session.execute(
            select(UserProfile).where(UserProfile.user_id == user_id)
        )
        profile = result.scalars().first()
        if profile is None:
            profile = UserProfile(user_id=user_id, phone_number=phone_number)
            session.add(profile)
        elif phone_number:
            profile.phone_number = phone_number
        if address and can_store_address:
            profile.address = address
        await session.commit()
async def update_user_profile_with_order(user_id: str, order_id: str):
    """Append *order_id* to the comma-separated ``order_ids`` of the user's profile.

    Nothing is written when the user has no profile.
    """
    async with async_session() as session:
        query = select(UserProfile).where(UserProfile.user_id == user_id)
        profile = (await session.execute(query)).scalars().first()
        if profile:
            previous = profile.order_ids
            profile.order_ids = f"{previous},{order_id}" if previous else order_id
            await session.commit()
def match_dish(user_input: str, threshold: int = 70) -> str:
    """Return the menu item name that best fuzzy-matches *user_input*.

    Names are compared case-insensitively with ``fuzz.token_sort_ratio``. The
    earliest best-scoring name wins. ``None`` is returned when the best score
    is below *threshold* or when nothing scores above zero.
    """
    needle = user_input.lower()
    scored = [
        (fuzz.token_sort_ratio(needle, item["name"].lower()), item["name"])
        for item in menu_items
    ]
    # max() keeps the first of several equal scores.
    top_score, top_name = max(scored, key=lambda pair: pair[0], default=(0, None))
    if top_score <= 0 or top_score < threshold:
        return None
    return top_name
def match_dishes(user_input: str, threshold: int = 70) -> list:
    """
    Return the distinct menu item names mentioned in the input.

    The input is lower-cased, stripped and has runs of whitespace collapsed.
    Names that occur as whole words are returned first. Only when there are
    none does the function fall back to fuzzy matching against the whole
    input with ``fuzz.token_sort_ratio`` and *threshold*.

    Note: a later definition of ``match_dishes`` in this file rebinds the
    name, so that later definition is the one callers actually reach.
    """
    text = re.sub(r'\s+', ' ', user_input.lower().strip())
    names = [item["name"] for item in menu_items]

    # Pass 1: whole-word matches, e.g. r'\bfried rice\b'.
    exact_hits = {
        name
        for name in names
        if re.search(r'\b' + re.escape(name.lower()) + r'\b', text)
    }
    if exact_hits:
        return list(exact_hits)

    # Pass 2: fuzzy similarity.
    fuzzy_hits = {
        name
        for name in names
        if fuzz.token_sort_ratio(text, name.lower()) >= threshold
    }
    return list(fuzzy_hits)
# def get_dish_price(dish: str) -> int:
# for item in menu_items:
# if item["name"].lower() == dish.lower():
# return item["price"]
# return 0 # or raise an error if dish not found
# Fetch dish price from the MenuItem table
async def get_dish_price(dish: str) -> int:
    """Look up the price of *dish* in the ``menu_items`` table.

    The name is compared with ``ilike`` (case-insensitive). Returns ``0`` when
    no row matches.

    Note: a later synchronous ``get_dish_price`` in this file rebinds the
    name, so that later definition is the one callers actually reach.
    """
    query = select(MenuItem).where(MenuItem.name.ilike(dish))
    async with async_session() as session:
        row = (await session.execute(query)).scalars().first()
        if not row:
            return 0
        return row.price
# Fetch shipping cost from the TownShippingCost table based on the address.
async def get_shipping_cost(address: str) -> int:
    """Return the delivery fee for *address* from the ``town_shipping_costs`` table.

    A town matches only when its name appears in the address as whole words
    (case-insensitive). Plain substring matching would charge the "iba" rate
    for an address in "Ibadan". The ``"default"`` row is the fallback fee and
    is never treated as a town name. Rows with an empty town are ignored.

    Args:
        address: Free-text delivery address; ``None`` or ``""`` yields the
            fallback fee.

    Returns:
        The fee of the first matching town, otherwise the fee of the
        ``"default"`` row, otherwise ``1000`` when that row is absent.
    """
    async with async_session() as session:
        result = await session.execute(select(TownShippingCost))
        costs = result.scalars().all()
        address_lower = (address or "").lower()
        fallback = 1000
        for entry in costs:
            town = (entry.town or "").strip().lower()
            if not town:
                continue
            if town == "default":
                fallback = entry.cost
                continue
            if re.search(rf"\b{re.escape(town)}\b", address_lower):
                return entry.cost
        return fallback
def send_email_notification(order_details):
    """Post a "new order" e-mail to the HTTP-to-SMTP relay.

    Args:
        order_details: Mapping with the required keys ``order_id``, ``dish``,
            ``quantity`` and ``price``, and the optional keys
            ``phone_number``, ``address`` and ``extras``.

    Returns:
        The decoded JSON response of the relay, or ``None`` when the request
        fails (the error is printed).

    Raises:
        KeyError: When a required key is missing from *order_details*.
    """
    # SECURITY NOTE(review): the SMTP credentials used to be hard-coded only.
    # They can now be supplied through SMTP_USER / SMTP_PASSWORD. The literals
    # remain solely as a backward-compatible fallback. The password below is
    # exposed in source control: rotate it and drop the fallback once the
    # environment variables are set.
    smtp_user = os.getenv("SMTP_USER", "yungdml31@gmail.com")
    smtp_password = os.getenv("SMTP_PASSWORD", "uddvxabxotlvfewk")

    payload = {
        "from": smtp_user,
        # Comma-separated list of recipients.
        "to": "samyung05@gmail.com, angelofoodcourt@gmail.com",
        "subject": f"New Order Received: {order_details['order_id']}",
        "body": (
            f"New Order Received:\n"
            f"Order ID: {order_details['order_id']}\n"
            f"Dish: {order_details['dish']}\n"
            f"Quantity: {order_details['quantity']}\n"
            f"Total Price: N{order_details['price']}\n"
            f"Phone: {order_details.get('phone_number', 'Not Provided')}\n"
            f"Delivery Address: {order_details.get('address', 'Not Provided')}\n"
            f"Extras: {order_details.get('extras', 'None')}\n"
            f"Status: Pending Payment"
        ),
        # SMTP connection settings forwarded to the relay endpoint.
        "smtpHost": "smtp.gmail.com",
        "smtpPort": 587,
        "smtpSecure": "false",  # sent as a string, not a boolean
        "smtpUser": smtp_user,
        "smtpPassword": smtp_password,
    }
    url = "https://smtp-server-ten.vercel.app/smtp"
    try:
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Notification is best-effort; a failure must not abort the order.
        print(f"Error sending email: {e}")
        return None
def match_dishes(user_input: str, threshold: int = 70) -> list:
    """Return the distinct menu item names that match *user_input*.

    A name matches when it is a case-insensitive substring of the input, or
    when ``fuzz.token_sort_ratio`` between the whole input and the name is at
    least *threshold*. The order of the returned list is not defined.
    """
    needle = user_input.lower()

    def _is_match(name: str) -> bool:
        lowered = name.lower()
        # The fuzzy score is only computed when the substring test fails.
        return lowered in needle or fuzz.token_sort_ratio(needle, lowered) >= threshold

    return list({item["name"] for item in menu_items if _is_match(item["name"])})
def get_dish_price(dish: str) -> int:
    """Return the price of *dish* from the in-memory menu, or ``0`` if absent.

    The name comparison is case-insensitive and the first matching entry wins.
    """
    return next(
        (item["price"] for item in menu_items if item["name"].lower() == dish.lower()),
        0,
    )
async def process_order_flow(user_id: str, message: str) -> str:
    """
    Advance the in-memory order conversation for ``user_id`` by one message.

    The conversation is kept in ``user_state[user_id]`` as a
    ``ConversationState`` with ``flow == "order"`` and one of these steps:

    * 1 - the flow was just opened by "order" / "menu"
    * 2 - waiting for the quantity (or for a choice among candidate dishes)
    * 3 - waiting for the phone number and delivery address
    * 7 - waiting for the final yes/no confirmation

    Args:
        user_id: Key of the conversation in ``user_state``.
        message: The raw text the user sent.

    Returns:
        The reply to send to the user. An empty string means this function
        has nothing to say for the message.

    NOTE(review): no branch in this function handles step 1 or step 5. After
    "order" (step 1), and after the single-message path that asks about
    extras (step 5), later messages fall through to the final ``return ""``.
    Confirm whether the caller handles those steps.
    NOTE(review): ``send_email_notification`` and
    ``create_paystack_payment_link`` are blocking HTTP calls made from this
    coroutine.
    """
    state = user_state.get(user_id)
    # Discard conversations that have been idle longer than SESSION_TIMEOUT.
    if state and state.is_expired():
        state.reset()
        del user_state[user_id]
        state = None

    # 1) If user says "order" or "menu", initialize the order flow.
    if message.lower() in ["order", "menu"]:
        state = ConversationState()
        state.flow = "order"
        state.step = 1
        state.update_last_active()
        user_state[user_id] = state
        if message.lower() == "order":
            return "Sure! What dish would you like to order?"
        # "menu": the flow is opened but no reply is produced here.
        return ""
    if not state and "order" in message.lower():
        state = ConversationState()
        state.flow = "order"
        state.step = 1
        state.update_last_active()
        user_state[user_id] = state
        return "Sure! What dish would you like to order?"

    # 2) If we aren't in the order flow yet, try to detect dish(es)
    if not state or state.flow != "order":
        matched = match_dishes(message)
        if matched:
            if len(matched) == 1:
                # Single dish order
                found_dish = matched[0]
                state = ConversationState()
                state.flow = "order"
                state.update_last_active()
                user_state[user_id] = state
                state.data["dish"] = found_dish
                # The same message may already carry quantity, phone and address.
                single_parse = _parse_single_dish_line(message, found_dish)
                if single_parse["quantity"]:
                    state.data["quantity"] = single_parse["quantity"]
                    state.step = 3
                else:
                    state.step = 2
                if single_parse["phone"]:
                    state.data["phone_number"] = single_parse["phone"]
                if single_parse["address"]:
                    state.data["address"] = single_parse["address"]
                if state.step == 2 and not state.data.get("quantity"):
                    return f"You selected {found_dish}. How many servings would you like?"
                elif state.step == 3:
                    if state.data.get("phone_number") and state.data.get("address"):
                        # This path prices delivery from the in-memory table,
                        # unlike step 3 below, which queries the database.
                        shipping_cost = calculate_shipping_cost(state.data["address"])
                        state.data["shipping_cost"] = shipping_cost
                        state.step = 5
                        return (f"Thanks! Your phone number is recorded as: {state.data['phone_number']}.\n"
                                f"Your delivery address is: {state.data['address']}.\n"
                                f"Your delivery cost is N{shipping_cost}. Would you like extras (yes/no)?")
                    elif state.data.get("phone_number") and not state.data.get("address"):
                        return "Thank you. Please provide your delivery address."
                    else:
                        return "Please provide your phone number and address."
            else:
                # Multiple dishes detected – store all in candidate_dishes.
                state = ConversationState()
                state.flow = "order"
                state.update_last_active()
                user_state[user_id] = state
                state.data["candidate_dishes"] = matched
                state.step = 2
                dish_options = ", ".join(matched)
                return (f"We found multiple dishes in your request: {dish_options}. "
                        "Please specify which one you'd like to order or type 'both' if you'd like all.")
        else:
            return "I couldn't identify the dish. Please type the dish name from our menu."

    # --- Candidate Dishes Clarification Branch ---
    if state and state.flow == "order" and "candidate_dishes" in state.data:
        normalized = message.strip().lower()
        if normalized in ["both", "all"]:
            state.data["dishes"] = state.data["candidate_dishes"]
            del state.data["candidate_dishes"]
            state.step = 2
            dishes_str = ", ".join(state.data["dishes"])
            return (f"You have selected: {dishes_str}. How many servings of each would you like? "
                    "(For example, '2 for Jollof Rice, 3 for Chicken Wings, 1 for Fish')")
        else:
            for dish in state.data["candidate_dishes"]:
                if dish.lower() in normalized:
                    state.data["dish"] = dish
                    del state.data["candidate_dishes"]
                    state.step = 2
                    return f"You selected {dish}. How many servings would you like?"
            dish_options = ", ".join(state.data["candidate_dishes"])
            return (f"Please specify which one you'd like to order from: {dish_options} "
                    "(or type 'both' if you'd like to order all).")

    # 3) If state exists and we're at step 2: parse quantity details.
    if state and state.flow == "order" and state.step == 2:
        # For multi-dish orders, expect input like "2 for Jollof Rice, 3 for Chicken Wings, 1 for Fish"
        if "dishes" in state.data and len(state.data["dishes"]) > 1:
            pairs = re.findall(r'(\d+)\s*for\s*([a-zA-Z\s]+)', message, flags=re.IGNORECASE)
            if not pairs:
                return ("I'm sorry, I didn't understand the quantity details. "
                        "Please specify like '2 for Jollof Rice, 3 for Chicken Wings, 1 for Fish'.")
            order_quantities = {}
            for quantity, dish_text in pairs:
                dish_text = dish_text.strip().lower()
                for candidate in state.data["dishes"]:
                    # Accept a match in either direction so partial names still count.
                    if candidate.lower() in dish_text or dish_text in candidate.lower():
                        order_quantities[candidate] = int(quantity)
            if order_quantities:
                state.data["orders"] = order_quantities
                state.step = 3
                summary = "\n".join([f"{q} serving(s) of {d}" for d, q in order_quantities.items()])
                return (f"Got it. You have ordered:\n{summary}\n"
                        "Please provide your phone number and delivery address.")
            else:
                return ("I'm sorry, I couldn't match those dishes. "
                        "Please try something like '2 for Jollof Rice, 3 for Chicken Wings, 1 for Fish'.")
        # For single-dish order at step 2: user types just a quantity.
        numbers = re.findall(r'\d+', message)
        if not numbers:
            return "Please enter a valid number for the quantity (e.g., 1, 2, 3)."
        quantity = int(numbers[0])
        if quantity <= 0:
            return "Please enter a valid quantity (e.g., 1, 2, 3)."
        state.data["quantity"] = quantity
        state.step = 3
        dish = state.data.get("dish", "")
        return (f"Got it. {quantity} serving(s) of {dish}.\n"
                "Please provide your phone number and delivery address.")

    # --- Step 3: Parse phone & address, compute shipping cost, and jump to confirmation ---
    if state and state.flow == "order" and state.step == 3:
        phone_pattern = r'(\+?\d{10,15})'
        phone_match = re.search(phone_pattern, message)
        if phone_match:
            phone_number = phone_match.group(1)
            # Everything after the phone number is taken as the address.
            address_start = phone_match.end()
            address = message[address_start:].strip()
            address = re.sub(r'^[,\s]+|[,\s]+$', '', address)
            state.data["phone_number"] = phone_number
            state.data["address"] = address
            asyncio.create_task(update_user_profile(user_id, phone_number, address))
        else:
            if "phone_number" in state.data:
                # Phone already known: treat the whole message as the address.
                state.data["address"] = message.strip()
            else:
                return ("Please provide both your phone number and address. "
                        "For example: '09162409591, 1, Iyana Isashi, Isashi, Lagos'.")
        if not state.data.get("address"):
            return "Thank you. Please provide your delivery address."
        # Get shipping cost from the database
        shipping_cost = await get_shipping_cost(state.data["address"])
        state.data["shipping_cost"] = shipping_cost
        # Automatically set extras to empty (since we're not using extras)
        state.data["extras"] = ""
        # Jump directly to order confirmation (Step 7)
        state.step = 7
        # For order summary, determine dish summary and quantity:
        if "orders" in state.data:
            dish_summary = ", ".join(state.data["orders"].keys())
            quantity_total = sum(state.data["orders"].values())
            total_price = 0
            for dish, qty in state.data["orders"].items():
                total_price += (get_dish_price(dish)) * qty
            total_price += shipping_cost
        else:
            dish_summary = state.data.get("dish", "")
            quantity_total = state.data.get("quantity", 1)
            dish_price = get_dish_price(dish_summary)
            total_price = (quantity_total * dish_price) + shipping_cost
        return (f"Order Summary:\n"
                f"Dish(es): {dish_summary}\n"
                f"Quantity: {quantity_total}\n"
                f"Phone: {state.data.get('phone_number', '')}\n"
                f"Address: {state.data.get('address', '')}\n"
                f"Shipping Cost: N{shipping_cost}\n"
                f"Total Price: N{total_price}\n"
                "Confirm order? (yes/no)")

    # 7) Step 7: Order Confirmation and Payment Link Generation
    if state and state.flow == "order" and state.step == 7:
        if message.lower() in ["yes", "y"]:
            # NOTE(review): second-resolution IDs can collide when two orders
            # are confirmed within the same second.
            order_id = f"ORD-{int(time.time())}"
            state.data["order_id"] = order_id
            # Calculate total price:
            if "orders" in state.data:
                total_price = 0
                for dish, qty in state.data["orders"].items():
                    total_price += (get_dish_price(dish)) * qty
                total_price += state.data.get("shipping_cost", 0)
                dish_summary = ", ".join(state.data["orders"].keys())
                quantity_total = sum(state.data["orders"].values())
            else:
                dish_summary = state.data.get("dish", "")
                quantity_total = state.data.get("quantity", 1)
                dish_price = get_dish_price(dish_summary)
                total_price = (quantity_total * dish_price) + state.data.get("shipping_cost", 0)
            state.data["price"] = str(total_price)

            # Capture the address now. save_order() is only scheduled here and
            # runs after state.reset() below has emptied state.data, so reading
            # it inside the task would always store an empty delivery address.
            delivery_address = state.data.get("address", "")

            async def save_order():
                async with async_session() as session:
                    order = Order(
                        order_id=order_id,
                        user_id=user_id,
                        dish=dish_summary,
                        quantity=str(quantity_total),
                        price=str(total_price),
                        status="Pending Payment",
                        delivery_address=delivery_address
                    )
                    session.add(order)
                    await session.commit()

            asyncio.create_task(save_order())
            asyncio.create_task(log_order_tracking(order_id, "Order Placed", "Order placed and awaiting payment."))
            # Prepare order details for email notification.
            order_details = {
                "order_id": order_id,
                "dish": dish_summary,
                "quantity": quantity_total,
                "price": total_price,
                "phone_number": state.data.get("phone_number", ""),
                "address": state.data.get("address", "Not Provided"),
                "extras": state.data.get("extras", "None")
            }
            email_response = send_email_notification(order_details)
            if email_response:
                print("Email notification sent successfully.")
            else:
                print("Failed to send email notification.")
            email_for_paystack = "customer@example.com"  # Replace with user's email if available
            # The amount is passed as the total multiplied by 100.
            payment_data = create_paystack_payment_link(email_for_paystack, total_price * 100, order_id)
            # The conversation is finished whether or not payment initialization worked.
            state.reset()
            if user_id in user_state:
                del user_state[user_id]
            if payment_data.get("status"):
                payment_link = payment_data["data"]["authorization_url"]
                return (f"Thank you for your order of {quantity_total} serving(s) of {dish_summary}! "
                        f"Your Order ID is {order_id}.\n\n"
                        "Please complete your payment using one of the following options:\n"
                        f"1. Pay online via our Paystack link: {payment_link}\n"
                        "2. Alternatively, you can make a bank transfer to the following account:\n"
                        " Account Number: 1433042821\n"
                        " Bank: Access Bank\n"
                        " Account Name: Angelo Food Court 2\n\n"
                        "If you choose the bank transfer option, please send a screenshot of your payment confirmation to this chatbot.\n\n"
                        "You can track your order status using your Order ID.\n"
                        "Is there anything else you'd like to order?")
            else:
                return (f"Your order has been placed with Order ID {order_id}, "
                        "but we could not initialize online payment. Please try again later, or "
                        "you may opt to pay via bank transfer to Account Number 1433042821, Access Bank, Angelo Food Court 2 "
                        "and send your payment screenshot to this chatbot.")
        elif message.lower() in ["no", "n"]:
            state.reset()
            if user_id in user_state:
                del user_state[user_id]
            return "Order canceled. Let me know if you'd like to try again."
        else:
            return "I didn't understand that. Please type 'yes' to confirm your order or 'no' to cancel it."
    return ""
def _parse_single_dish_line(message: str, dish_name: str) -> dict:
result = {"quantity": None, "phone": None, "address": None}
# 1) Parse quantity
numbers = re.findall(r'\d+', message)
if numbers:
# We'll assume the first numeric is quantity if no "for" pattern is used
result["quantity"] = int(numbers[0])
# 2) Parse phone
phone_pattern = r'(\+?\d{10,15})'
phone_match = re.search(phone_pattern, message)
address = None
if phone_match:
phone_number = phone_match.group(1)
address_start = phone_match.end()
address = message[address_start:].strip()
address = re.sub(r'^[,\s]+|[,\s]+$', '', address)
result["phone"] = phone_number
if address:
result["address"] = address
return result
async def get_or_create_user_profile(user_id: str, phone_number: str = None) -> UserProfile:
    """Return the stored profile for ``user_id``, inserting one if none exists.

    Args:
        user_id: Identifier matched against ``UserProfile.user_id``.
        phone_number: Phone number stored on a newly created profile. It is
            not applied to a profile that already exists.

    Returns:
        The existing profile, or the freshly committed new one whose
        ``last_interaction`` is set to the current UTC time.
    """
    lookup = select(UserProfile).where(UserProfile.user_id == user_id)
    async with async_session() as session:
        existing = (await session.execute(lookup)).scalars().first()
        if existing is not None:
            return existing

        # No row yet: create and persist a fresh profile.
        created = UserProfile(
            user_id=user_id,
            phone_number=phone_number,
            last_interaction=datetime.utcnow()
        )
        session.add(created)
        await session.commit()
        return created
async def update_user_last_interaction(user_id: str):
    """Stamp the user's profile with the current UTC time.

    Does nothing when no profile row matches ``user_id``.
    """
    async with async_session() as session:
        rows = await session.execute(
            select(UserProfile).where(UserProfile.user_id == user_id)
        )
        matched = rows.scalars().first()
        if not matched:
            return
        matched.last_interaction = datetime.utcnow()
        await session.commit()
async def send_proactive_greeting(user_id: str):
    """Log the re-engagement greeting as an outbound message and return it.

    The function only records the text via ``log_chat_to_db`` and hands it
    back to the caller; it does not deliver the message itself.
    """
    greeting_text = (
        "Hi again! We miss you. "
        "Would you like to see our new menu items or get personalized recommendations?"
    )
    await log_chat_to_db(user_id, "outbound", greeting_text)
    return greeting_text
# FastAPI application instance; the lifecycle hook and route handlers below
# are registered on it through its decorators.
app = FastAPI()
@app.on_event("startup")
async def on_startup():
    """Run the startup routines one after another when the application boots.

    The steps are awaited sequentially in this order: ``init_db``,
    ``populate_menu_items``, ``populate_shipping_costs``.
    """
    for startup_step in (init_db, populate_menu_items, populate_shipping_costs):
        await startup_step()
@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """Handle one inbound chat message and return the bot's JSON reply.

    The request body is JSON with a ``user_id`` and a ``message``. The
    message is appended to the in-memory ``conversation_context``, logged to
    the database (together with its sentiment score) in background tasks, and
    then routed:

    1. Menu request ("menu" anywhere in the text, or a bare "1" when the user
       is not in the middle of an order) -> the menu with image URLs.
    2. Order intent, or an order flow already in progress -> whatever
       ``process_order_flow`` returns, if non-empty.
    3. Anything else -> a fixed "didn't understand" reply.

    Args:
        request: Incoming HTTP request carrying the JSON body.
        background_tasks: FastAPI task queue used for the database logging.

    Returns:
        A ``JSONResponse`` whose ``response`` field holds the reply text; the
        menu reply additionally carries ``menu`` and ``follow_up``.
    """
    data = await request.json()
    user_id = data.get("user_id")
    # A missing or null "message" is treated as empty text instead of
    # crashing on ``None.strip()``.
    raw_message = data.get("message")
    user_message = "" if raw_message is None else str(raw_message).strip()

    if user_id not in conversation_context:
        conversation_context[user_id] = []

    def remember(role: str, message: str) -> None:
        """Append one turn to this user's in-memory conversation history."""
        conversation_context[user_id].append({
            "timestamp": datetime.utcnow().isoformat(),
            "role": role,
            "message": message
        })

    remember("user", user_message)
    background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)
    sentiment_score = analyze_sentiment(user_message)
    background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score)
    sentiment_modifier = "Great to hear from you! " if sentiment_score > 0.3 else ""

    def reply(text: str) -> JSONResponse:
        """Log a plain-text bot reply, remember it, and wrap it for the client."""
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", text)
        remember("bot", text)
        return JSONResponse(content={"response": sentiment_modifier + text})

    in_order_flow = user_id in user_state and user_state[user_id].flow == "order"

    # "1" is only a menu shortcut when no order is in progress; during an
    # order it is a legitimate answer (dish number, quantity) and must reach
    # the order flow instead of wiping the user's order state.
    wants_menu = "menu" in user_message.lower() or (user_message == "1" and not in_order_flow)
    if wants_menu:
        user_state.pop(user_id, None)
        menu_with_images = []
        for index, item in enumerate(menu_items, start=1):
            # google_image_scrape is a synchronous call (presumably a blocking
            # HTTP lookup); run it in a worker thread so the event loop stays
            # free to serve other requests.
            image_url = await asyncio.to_thread(google_image_scrape, item["name"])
            menu_with_images.append({
                "number": index,
                "name": item["name"],
                "description": item["description"],
                "price": item["price"],
                "image_url": image_url
            })
        response_payload = {
            "response": sentiment_modifier + "Here’s our delicious menu:",
            "menu": menu_with_images,
            "follow_up": (
                "To order, type the *number* or *name* of the dish you'd like. "
                "For example, type '1' or 'Jollof Rice' to order Jollof Rice.\n\n"
                "You can also ask for nutritional facts by typing, for example, 'Nutritional facts for Jollof Rice'."
            )
        }
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
        remember("bot", response_payload["response"])
        return JSONResponse(content=response_payload)

    if is_order_intent(user_message) or in_order_flow:
        order_response = await process_order_flow(user_id, user_message)
        # An empty reply means the order flow had nothing to say; fall
        # through to the default response below.
        if order_response:
            return reply(order_response)

    # Instead of calling the LLM fallback, use a default response:
    return reply(
        "I'm sorry, I didn't understand that. Please type 'menu' to see our options or 'order' to place an order."
    )
@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
    """Return every logged chat message for ``user_id``, oldest first.

    Args:
        user_id: Value matched against ``ChatHistory.user_id``.

    Returns:
        A list of dicts, one per ``chat_history`` row, keyed by column name.
        The list is empty when the user has no history.
    """
    query = (
        ChatHistory.__table__.select()
        .where(ChatHistory.user_id == user_id)
        .order_by(ChatHistory.timestamp, ChatHistory.id)
    )
    async with async_session() as session:
        result = await session.execute(query)
        # SQLAlchemy 1.4+/2.0 rows are tuple-like, so ``dict(row)`` no longer
        # works; ``mappings()`` yields dict-like rows keyed by column name.
        rows = result.mappings().all()
    return [dict(row) for row in rows]
@app.get("/order/{order_id}")
async def get_order(order_id: str):
    """Return the order row identified by ``order_id``.

    Args:
        order_id: Value matched against ``Order.order_id``.

    Returns:
        A dict of the order's columns keyed by column name.

    Raises:
        HTTPException: 404 when no order has that ID.
    """
    async with async_session() as session:
        result = await session.execute(
            Order.__table__.select().where(Order.order_id == order_id)
        )
        # ``mappings()`` gives a dict-like row; plain rows in SQLAlchemy
        # 1.4+/2.0 are tuple-like and cannot be passed to ``dict()``.
        order = result.mappings().first()
    if order is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(order)
@app.get("/user_profile/{user_id}")
async def get_user_profile(user_id: str):
    """Return the profile for ``user_id`` as a JSON-serialisable dict.

    The profile is fetched through ``get_or_create_user_profile``, so calling
    this endpoint for an unknown user creates a profile for them.

    Returns:
        A dict with the profile's ``user_id``, ``phone_number``, ``name``,
        ``email``, ``preferences``, ``last_interaction`` (ISO-8601 string, or
        ``None`` when the stored value is empty) and ``order_ids``.
    """
    profile = await get_or_create_user_profile(user_id)
    # Guard against a profile row whose last_interaction was never set;
    # calling .isoformat() on None would turn the request into a 500.
    last_seen = profile.last_interaction
    return {
        "user_id": profile.user_id,
        "phone_number": profile.phone_number,
        "name": profile.name,
        "email": profile.email,
        "preferences": profile.preferences,
        "last_interaction": last_seen.isoformat() if last_seen is not None else None,
        "order_ids": profile.order_ids
    }
@app.get("/analytics")
async def get_analytics():
    """Return headline usage figures for the bot.

    Returns:
        A dict with ``total_messages`` (rows in ``chat_history``),
        ``total_orders`` (rows in ``orders``) and ``average_sentiment``
        (mean ``sentiment_score`` in ``sentiment_logs``). Each value falls
        back to 0 when its query yields nothing.
    """
    # Imported locally: the module-level imports bring in ``select`` but not
    # ``func`` or ``text``.
    from sqlalchemy import func, text

    async with async_session() as session:
        # ``Table.count()`` does not exist in SQLAlchemy 1.4+; count through
        # an explicit SELECT COUNT(*) instead.
        msg_result = await session.execute(
            select(func.count()).select_from(ChatHistory)
        )
        total_messages = msg_result.scalar() or 0
        order_result = await session.execute(
            select(func.count()).select_from(Order)
        )
        total_orders = order_result.scalar() or 0
        # Raw SQL strings must be wrapped in text() before being executed.
        sentiment_result = await session.execute(
            text("SELECT AVG(sentiment_score) FROM sentiment_logs")
        )
        avg_sentiment = sentiment_result.scalar() or 0
    return {
        "total_messages": total_messages,
        "total_orders": total_orders,
        "average_sentiment": avg_sentiment
    }
# API token read from the environment. Unlike the other keys in this module
# there is no fallback value: a missing token raises at import time, so the
# application will not start without it.
HUGGING_FACE_API_TOKEN = os.getenv("HUGGING_FACE_API_TOKEN")
if not HUGGING_FACE_API_TOKEN:
    raise ValueError("Hugging Face API token not found in environment variables.")
# Endpoint and bearer-token header used by process_voice when it posts audio
# for transcription.
WHISPER_API_URL = "https://router.huggingface.co/fal-ai"
WHISPER_API_HEADERS = {"Authorization": f"Bearer {HUGGING_FACE_API_TOKEN}"}
# Response schema of the /voice endpoint.
class TranscriptionResponse(BaseModel):
    # Transcribed text; process_voice fills it from the "text" field of the
    # transcription service's reply, using "" when that field is absent.
    transcription: str
@app.post("/voice", response_model=TranscriptionResponse)
async def process_voice(file: UploadFile = File(...)):
    """Forward an uploaded audio file for transcription and return the text.

    The upload is read into memory and posted as multipart form data to
    ``WHISPER_API_URL``; the ``text`` field of the JSON reply is returned.

    Args:
        file: The uploaded audio file.

    Returns:
        ``{"transcription": <text>}``; the text is ``""`` when the reply has
        no ``text`` field.

    Raises:
        HTTPException: with the upstream status code when the transcription
            service does not answer 200, or 500 for any other failure.
    """
    try:
        contents = await file.read()
        # Send the bytes straight from memory. The previous temp file was
        # named after the client-supplied filename (path traversal, clashes
        # between concurrent uploads) and was left behind if the request
        # raised. basename() drops any directory part of the client's name.
        upload_name = f"temp_{os.path.basename(file.filename or 'audio')}"
        # requests is blocking; run it in a worker thread so the event loop
        # keeps serving other requests while the transcription is pending.
        # NOTE(review): no timeout is set, so a stalled upstream holds the
        # worker thread indefinitely - consider adding one.
        response = await asyncio.to_thread(
            requests.post,
            WHISPER_API_URL,
            headers=WHISPER_API_HEADERS,
            files={"file": (upload_name, contents)},
        )
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to transcribe audio.")
        transcription = response.json().get("text", "")
        return {"transcription": transcription}
    except HTTPException:
        # Keep the upstream status code instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
async def _record_payment_status(order_id: str, status: str) -> None:
    """Store ``status`` on the order, log a tracking event and notify management.

    Raises:
        HTTPException: 404 when no order has the given ``order_id``.
    """
    async with async_session() as session:
        result = await session.execute(
            select(Order).where(Order.order_id == order_id)
        )
        order = result.scalar_one_or_none()
        if order is None:
            raise HTTPException(status_code=404, detail="Order not found.")
        order.status = status
        await session.commit()

    tracking_event = "Payment Failed" if status.lower() == "failed" else "Payment Confirmed"
    await log_order_tracking(order_id, tracking_event, f"Payment status updated to {status}.")
    # The WhatsApp notification is best-effort: a delivery failure must not
    # undo or fail the already-committed status update.
    try:
        await asyncio.to_thread(
            send_whatsapp_message,
            MANAGEMENT_WHATSAPP_NUMBER,
            f"Payment Update:\nOrder ID: {order_id} is now {status}."
        )
    except Exception as e:
        print(f"WhatsApp message sending failed: {e}")


@app.api_route("/payment_callback", methods=["GET", "POST"])
async def payment_callback(request: Request):
    """Update an order's payment status from a payment callback.

    GET requests carry ``reference`` and ``status`` as query parameters and
    are answered with a redirect to the WhatsApp link. POST requests carry
    the same two fields in a JSON body and are answered with a JSON
    confirmation. ``status`` defaults to ``"Paid"`` when absent.

    NOTE(review): the status is taken from the caller as-is; nothing in this
    handler checks it with the payment provider, so anyone who knows an order
    reference can mark it paid. Consider verifying the transaction
    server-side before trusting it.

    Raises:
        HTTPException: 400 when ``reference`` is missing, 404 when no order
            matches it.
    """
    is_redirect = request.method == "GET"
    payload = request.query_params if is_redirect else await request.json()

    order_id = payload.get("reference")
    status = payload.get("status", "Paid")
    if status is None:
        # An explicit JSON null is treated like an absent status.
        status = "Paid"
    status = str(status)
    # A failed payment must be recorded as failed; it used to be rewritten
    # to "Paid", marking unpaid orders as paid.
    if status.lower() == "failed":
        status = "Failed"

    if not order_id:
        raise HTTPException(status_code=400, detail="Missing order reference in callback.")

    await _record_payment_status(order_id, status)

    if is_redirect:
        redirect_url = "https://wa.link/es8qdg"
        return RedirectResponse(url=redirect_url)
    return JSONResponse(content={"message": "Order updated successfully."})
@app.get("/track_order/{order_id}")
async def track_order(order_id: str):
    """Return the tracking timeline of an order, ordered by timestamp.

    Args:
        order_id: Value matched against ``OrderTracking.order_id``.

    Returns:
        A ``JSONResponse`` holding a list of ``{"status", "message",
        "timestamp"}`` entries, with the timestamp as an ISO-8601 string.

    Raises:
        HTTPException: 404 when the order has no tracking entries.
    """
    timeline_query = (
        select(OrderTracking)
        .where(OrderTracking.order_id == order_id)
        .order_by(OrderTracking.timestamp)
    )
    async with async_session() as session:
        entries = (await session.execute(timeline_query)).scalars().all()
        if not entries:
            raise HTTPException(status_code=404, detail="No tracking information found for this order.")
        timeline = [
            {
                "status": entry.status,
                "message": entry.message,
                "timestamp": entry.timestamp.isoformat(),
            }
            for entry in entries
        ]
        return JSONResponse(content=timeline)