Botpy-808 / app.py
Fred808's picture
Update app.py
7f62c94 verified
raw
history blame
37.3 kB
import re
import os
import time
import requests
import base64
import asyncio
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from sqlalchemy import select
from pydantic import BaseModel
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse
import openai
from textblob import TextBlob
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Text, Float
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY", "default_fallback_value")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")
TOWN_SHIPPING_COSTS = {
"lasu gate": 1000,
"ojo": 800,
"ajangbadi": 1200,
"iba": 900,
"okokomaiko": 1500,
"default": 1000
}
Base = declarative_base()
class ChatHistory(Base):
__tablename__ = "chat_history"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
direction = Column(String)
message = Column(Text)
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
order_id = Column(String, unique=True, index=True)
user_id = Column(String, index=True)
dish = Column(String)
quantity = Column(String)
price = Column(String, default="0")
status = Column(String, default="Pending Payment")
payment_reference = Column(String, nullable=True)
delivery_address = Column(String, default="")
timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
__tablename__ = "user_profiles"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, unique=True, index=True)
phone_number = Column(String, unique=True, index=True, nullable=True)
name = Column(String, default="Valued Customer")
email = Column(String, default="unknown@example.com")
preferences = Column(Text, default="")
last_interaction = Column(DateTime, default=datetime.utcnow)
order_ids = Column(Text, default="")
class SentimentLog(Base):
__tablename__ = "sentiment_logs"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
sentiment_score = Column(Float)
message = Column(Text)
class OrderTracking(Base):
__tablename__ = "order_tracking"
id = Column(Integer, primary_key=True, index=True)
order_id = Column(String, index=True)
status = Column(String)
message = Column(Text, nullable=True)
timestamp = Column(DateTime, default=datetime.utcnow)
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
user_state = {}
conversation_context = {}
proactive_timer = {}
menu_items = [
{"name": "Jollof Rice", "description": "A spicy and flavorful rice dish", "price": 1500, "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g"},
{"name": "Fried Rice", "description": "A savory rice dish with vegetables and meat", "price": 1200, "nutrition": "Calories: 350 kcal, Carbs: 55g, Protein: 12g, Fat: 8g"},
{"name": "Chicken Wings", "description": "Crispy fried chicken wings", "price": 2000, "nutrition": "Calories: 400 kcal, Carbs: 20g, Protein: 25g, Fat: 15g"},
{"name": "Egusi Soup", "description": "A rich and hearty soup made with melon seeds", "price": 1000, "nutrition": "Calories: 250 kcal, Carbs: 15g, Protein: 8g, Fat: 10g"}
]
class ConversationState:
def __init__(self):
self.flow = None
self.step = 0
self.data = {}
self.last_active = datetime.utcnow()
def update_last_active(self):
self.last_active = datetime.utcnow()
def is_expired(self):
return datetime.utcnow() - self.last_active > SESSION_TIMEOUT
def reset(self):
self.flow = None
self.step = 0
self.data = {}
self.last_active = datetime.utcnow()
SESSION_TIMEOUT = timedelta(minutes=5)
async def log_chat_to_db(user_id: str, direction: str, message: str):
async with async_session() as session:
entry = ChatHistory(user_id=user_id, direction=direction, message=message)
session.add(entry)
await session.commit()
async def log_sentiment(user_id: str, message: str, score: float):
async with async_session() as session:
entry = SentimentLog(user_id=user_id, sentiment_score=score, message=message)
session.add(entry)
await session.commit()
def analyze_sentiment(text: str) -> float:
blob = TextBlob(text)
return blob.sentiment.polarity
def google_image_scrape(query: str) -> str:
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
search_url = f"https://www.google.com/search?tbm=isch&q={query}"
try:
response = requests.get(search_url, headers=headers, timeout=5)
except Exception:
return ""
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
img_tags = soup.find_all("img")
for img in img_tags:
src = img.get("src")
if src and src.startswith("http"):
return src
return ""
def create_paystack_payment_link(email: str, amount: int, reference: str) -> dict:
url = "https://api.paystack.co/transaction/initialize"
headers = {
"Authorization": f"Bearer {PAYSTACK_SECRET_KEY}",
"Content-Type": "application/json",
}
data = {
"email": email,
"amount": amount,
"reference": reference,
"callback_url": "https://custy-bot.vercel.app/payment_callback"
}
try:
response = requests.post(url, json=data, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
else:
return {"status": False, "message": "Failed to initialize payment."}
except Exception as e:
return {"status": False, "message": str(e)}
def send_whatsapp_message(recipient: str, message_body: str) -> dict:
url = f"https://graph.facebook.com/v15.0/{WHATSAPP_PHONE_NUMBER_ID}/messages"
headers = {
"Authorization": f"Bearer {WHATSAPP_ACCESS_TOKEN}",
"Content-Type": "application/json"
}
payload = {
"messaging_product": "whatsapp",
"to": recipient,
"type": "text",
"text": {"body": message_body}
}
response = requests.post(url, headers=headers, json=payload)
return response.json()
def stream_text_completion(prompt: str):
from openai import OpenAI
client = OpenAI(
base_url="https://integrate.api.nvidia.com/v1",
api_key=NVIDIA_API_KEY
)
try:
completion = client.chat.completions.create(
model="meta/llama-3.1-405b-instruct",
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
top_p=0.7,
max_tokens=1024,
stream=True
)
for chunk in completion:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"Error: {str(e)}"
def stream_image_completion(image_b64: str):
invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
headers = {
"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Accept": "text/event-stream"
}
payload = {
"model": "meta/llama-3.2-90b-vision-instruct",
"messages": [
{
"role": "user",
"content": f'What is in this image? <img src="data:image/png;base64,{image_b64}" />'
}
],
"max_tokens": 512,
"temperature": 1.00,
"top_p": 1.00,
"stream": True
}
response = requests.post(invoke_url, headers=headers, json=payload, stream=True)
for line in response.iter_lines():
if line:
yield line.decode("utf-8") + "\n"
async def log_order_tracking(order_id: str, status: str, message: str = None):
async with async_session() as session:
tracking_entry = OrderTracking(
order_id=order_id,
status=status,
message=message
)
session.add(tracking_entry)
await session.commit()
from fuzzywuzzy import fuzz
def match_dish(user_input: str, threshold: int = 70) -> str:
best_match = None
best_score = 0
for item in menu_items:
dish_name = item["name"]
score = fuzz.token_sort_ratio(user_input.lower(), dish_name.lower())
if score > best_score:
best_score = score
best_match = dish_name
if best_score >= threshold:
return best_match
return None
def calculate_shipping_cost(address: str) -> int:
address_lower = address.lower()
for area, cost in TOWN_SHIPPING_COSTS.items():
if area in address_lower:
return cost
return TOWN_SHIPPING_COSTS["default"]
def calculate_eta(destination: str) -> str:
if not GOOGLE_MAPS_API_KEY:
return "ETA unavailable (Google Maps API key missing)."
origin = "Plot 13 Isashi Road, Iyana Isashi, Off Lagos - Badagry Expy, Lagos"
url = f"https://maps.googleapis.com/maps/api/directions/json?origin={origin}&destination={destination}&key={GOOGLE_MAPS_API_KEY}"
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get("routes"):
duration = data["routes"][0]["legs"][0]["duration"]["text"]
return f"Estimated delivery time: {duration}"
else:
return "ETA unavailable (no route found)."
else:
return "ETA unavailable (API error)."
except Exception as e:
return f"ETA unavailable (error: {str(e)})."
def is_order_intent(message: str) -> bool:
order_keywords = ["order", "menu", "dish", "food", "deliver", "hungry"]
order_phrases = ["i want to order", "can i order", "i'd like to order", "get food", "place an order"]
message_lower = message.lower()
# Check if the message contains any dish from the menu
dish_candidates = [item["name"].lower() for item in menu_items]
for dish in dish_candidates:
if dish in message_lower:
return True
for phrase in order_phrases:
if phrase in message_lower:
return True
for keyword in order_keywords:
if re.search(rf"\b{keyword}\b", message_lower):
return True
return False
async def track_order(user_id: str, order_id: str) -> str:
async with async_session() as session:
order_result = await session.execute(
select(Order).where(Order.order_id == order_id)
)
order = order_result.scalars().first()
if not order:
return "Order not found. Please check your order ID."
tracking_result = await session.execute(
select(OrderTracking)
.where(OrderTracking.order_id == order_id)
.order_by(OrderTracking.timestamp)
)
tracking_updates = tracking_result.scalars().all()
eta = calculate_eta(order.delivery_address)
response = f"Order ID: {order_id}\nStatus: {order.status}\n"
if tracking_updates:
response += "Tracking Updates:\n"
for update in tracking_updates:
response += f"- {update.status} ({update.timestamp}): {update.message or 'No details'}\n"
response += f"\n{eta}"
return response
async def update_user_profile(user_id: str, phone_number: str = None, address: str = None):
async with async_session() as session:
result = await session.execute(
select(UserProfile).where(UserProfile.user_id == user_id)
)
profile = result.scalars().first()
if profile:
if phone_number:
profile.phone_number = phone_number
if address:
# Assuming your UserProfile has an 'address' field; otherwise, adjust accordingly.
profile.address = address
else:
profile = UserProfile(user_id=user_id, phone_number=phone_number)
session.add(profile)
await session.commit()
async def update_user_profile_with_order(user_id: str, order_id: str):
async with async_session() as session:
result = await session.execute(
select(UserProfile).where(UserProfile.user_id == user_id)
)
profile = result.scalars().first()
if profile:
if profile.order_ids:
profile.order_ids += f",{order_id}"
else:
profile.order_ids = order_id
await session.commit()
async def process_order_flow(user_id: str, message: str) -> str:
state = user_state.get(user_id)
if state and state.is_expired():
state.reset()
del user_state[user_id]
state = None
if message.lower() in ["order", "menu"]:
state = ConversationState()
state.flow = "order"
state.step = 1
state.update_last_active()
user_state[user_id] = state
if message.lower() == "order":
return "Sure! What dish would you like to order?"
return ""
if not state and "order" in message.lower():
state = ConversationState()
state.flow = "order"
state.step = 1
state.update_last_active()
user_state[user_id] = state
return "Sure! What dish would you like to order?"
# If no state exists or not in order flow, try to detect a dish using fuzzy matching.
if not state or state.flow != "order":
found_dish = match_dish(message)
if found_dish:
state = ConversationState()
state.flow = "order"
state.data["dish"] = found_dish
state.update_last_active()
user_state[user_id] = state
numbers = re.findall(r'\d+', message)
if numbers:
quantity = int(numbers[0])
if quantity <= 0:
return "Please enter a valid quantity (e.g., 1, 2, 3)."
state.data["quantity"] = quantity
state.step = 3
phone_pattern = r'(\+?\d{10,15})'
phone_match = re.search(phone_pattern, message)
address = None
if phone_match:
phone_number = phone_match.group(1)
address_start = phone_match.end()
address = message[address_start:].strip()
address = re.sub(r'^[,\s]+|[,\s]+$', '', address)
if phone_match and address:
state.data["phone_number"] = phone_number
state.data["address"] = address
asyncio.create_task(update_user_profile(user_id, phone_number, address))
shipping_cost = calculate_shipping_cost(address)
state.data["shipping_cost"] = shipping_cost
state.step = 5
return (f"Thanks! Your phone number is recorded as: {phone_number}.\n"
f"Your delivery address is: {address}.\n"
f"Your delivery cost is N{shipping_cost}. Would you like to add extras (yes/no)?")
elif phone_match:
state.data["phone_number"] = phone_match.group(1)
asyncio.create_task(update_user_profile(user_id, phone_number))
return "Thank you. Please provide your delivery address."
else:
return ("Please provide both your phone number and delivery address. "
"For example: '09162409591, 1, Iyana Isashi, Isashi, Ojo, Lagos'.")
else:
state.step = 2
return f"You selected {found_dish}. How many servings would you like?"
# If state exists and we're already in order flow:
if state and state.flow == "order":
state.update_last_active()
if state.step == 1:
# Use fuzzy matching on the message in case of typos.
found_dish = match_dish(message)
numbers = re.findall(r'\d+', message)
if found_dish:
state.data["dish"] = found_dish
if numbers:
quantity = int(numbers[0])
if quantity <= 0:
return "Please enter a valid quantity (e.g., 1, 2, 3)."
state.data["quantity"] = quantity
state.step = 3
return (f"You selected {found_dish} with {quantity} serving(s). "
"Please provide your phone number and delivery address.")
else:
state.step = 2
return f"You selected {found_dish}. How many servings would you like?"
else:
return "I couldn't identify the dish. Please type the dish name from our menu."
if state.step == 2:
numbers = re.findall(r'\d+', message)
if not numbers:
return "Please enter a valid number for the quantity (e.g., 1, 2, 3)."
quantity = int(numbers[0])
if quantity <= 0:
return "Please enter a valid quantity (e.g., 1, 2, 3)."
state.data["quantity"] = quantity
state.step = 3
return f"Got it. {quantity} serving(s) of {state.data.get('dish')}. Please provide your phone number and delivery address."
if state.step == 3:
phone_pattern = r'(\+?\d{10,15})'
phone_match = re.search(phone_pattern, message)
address = None
if phone_match:
phone_number = phone_match.group(1)
address_start = phone_match.end()
address = message[address_start:].strip()
address = re.sub(r'^[,\s]+|[,\s]+$', '', address)
if phone_match and address:
state.data["phone_number"] = phone_number
state.data["address"] = address
asyncio.create_task(update_user_profile(user_id, phone_number, address))
shipping_cost = calculate_shipping_cost(address)
state.data["shipping_cost"] = shipping_cost
state.step = 5
return (f"Thanks! Your phone number is recorded as: {phone_number}.\n"
f"Your delivery address is: {address}.\n"
f"Your delivery cost is N{shipping_cost}. Would you like extras (yes/no)?")
elif phone_match:
state.data["phone_number"] = phone_match.group(1)
asyncio.create_task(update_user_profile(user_id, phone_number))
return "Thank you. Please provide your delivery address."
else:
return ("Please provide both your phone number and delivery address. "
"For example: '09162409591, 1, Iyana Isashi, Isashi, Ojo, Lagos'.")
# Steps 4, 5, 6, and 7 remain unchanged.
if state.step == 4:
state.data["address"] = message
asyncio.create_task(update_user_profile(user_id, address=message))
shipping_cost = calculate_shipping_cost(message)
state.data["shipping_cost"] = shipping_cost
state.step = 5
return (f"Thanks. Your delivery address is recorded as: {message}.\n"
f"Your delivery cost is N{shipping_cost}. Would you like extras (yes/no)?")
if state.step == 5:
if message.lower() in ["yes", "y"]:
state.step = 6
return "Please list the extras you'd like (e.g., drinks, sides)."
elif message.lower() in ["no", "n"]:
state.data["extras"] = ""
state.step = 7
dish = state.data.get("dish", "")
quantity = state.data.get("quantity", 1)
phone = state.data.get("phone_number", "")
address = state.data.get("address", "")
shipping_cost = state.data.get("shipping_cost", 0)
price_per_serving = 1500
total_price = (quantity * price_per_serving) + shipping_cost
summary = (f"Order Summary:\nDish: {dish}\nQuantity: {quantity}\n"
f"Phone: {phone}\nAddress: {address}\n"
f"Shipping Cost: N{shipping_cost}\n"
f"Total Price: N{total_price}\n"
f"Extras: None\nConfirm order? (yes/no)")
return summary
else:
return "Please respond with 'yes' or 'no' regarding extras."
if state.step == 6:
state.data["extras"] = message
state.step = 7
dish = state.data.get("dish", "")
quantity = state.data.get("quantity", 1)
phone = state.data.get("phone_number", "")
address = state.data.get("address", "")
shipping_cost = state.data.get("shipping_cost", 0)
extras = state.data.get("extras", "")
price_per_serving = 1500
total_price = (quantity * price_per_serving) + shipping_cost
summary = (f"Order Summary:\nDish: {dish}\nQuantity: {quantity}\n"
f"Phone: {phone}\nAddress: {address}\n"
f"Shipping Cost: N{shipping_cost}\n"
f"Total Price: N{total_price}\n"
f"Extras: {extras}\nConfirm order? (yes/no)")
return summary
if state.step == 7:
if message.lower() in ["yes", "y"]:
order_id = f"ORD-{int(time.time())}"
state.data["order_id"] = order_id
price_per_serving = 1500
quantity = state.data.get("quantity", 1)
shipping_cost = state.data.get("shipping_cost", 0)
total_price = (quantity * price_per_serving) + shipping_cost
state.data["price"] = str(total_price)
async def save_order():
async with async_session() as session:
order = Order(
order_id=order_id,
user_id=user_id,
dish=state.data["dish"],
quantity=str(quantity),
price=str(total_price),
status="Pending Payment",
delivery_address=state.data.get("address", ""),
shipping_cost=str(shipping_cost)
)
session.add(order)
await session.commit()
asyncio.create_task(save_order())
asyncio.create_task(log_order_tracking(order_id, "Order Placed", "Order placed and awaiting payment."))
async def notify_management_order(order_details: dict):
message_body = (
f"New Order Received:\n"
f"Order ID: {order_details['order_id']}\n"
f"Dish: {order_details['dish']}\n"
f"Quantity: {order_details['quantity']}\n"
f"Total Price: {order_details['price']}\n"
f"Phone: {state.data.get('phone_number', '')}\n"
f"Delivery Address: {order_details.get('address', 'Not Provided')}\n"
f"Extras: {state.data.get('extras', 'None')}\n"
f"Status: Pending Payment"
)
await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER, message_body)
order_details = {
"order_id": order_id,
"dish": state.data["dish"],
"quantity": state.data["quantity"],
"price": state.data["price"],
"address": state.data.get("address", "")
}
asyncio.create_task(notify_management_order(order_details))
email = "customer@example.com"
payment_data = create_paystack_payment_link(email, total_price * 100, order_id)
dish_name = state.data.get("dish", "")
state.reset()
if user_id in user_state:
del user_state[user_id]
if payment_data.get("status"):
payment_link = payment_data["data"]["authorization_url"]
return (f"Thank you for your order of {quantity} serving(s) of {dish_name}! "
f"Your Order ID is {order_id}.\nPlease complete payment here: {payment_link}\n"
"You can track your order status using your Order ID.\n"
"Is there anything else you'd like to order?")
else:
return (f"Your order has been placed with Order ID {order_id}, "
"but we could not initialize payment. Please try again later.")
else:
state.reset()
if user_id in user_state:
del user_state[user_id]
return "Order canceled. Let me know if you'd like to try again."
return ""
async def get_or_create_user_profile(user_id: str, phone_number: str = None) -> UserProfile:
async with async_session() as session:
result = await session.execute(
select(UserProfile).where(UserProfile.user_id == user_id)
)
profile = result.scalars().first()
if profile is None:
profile = UserProfile(
user_id=user_id,
phone_number=phone_number,
last_interaction=datetime.utcnow()
)
session.add(profile)
await session.commit()
return profile
async def update_user_last_interaction(user_id: str):
async with async_session() as session:
result = await session.execute(
select(UserProfile).where(UserProfile.user_id == user_id)
)
profile = result.scalars().first()
if profile:
profile.last_interaction = datetime.utcnow()
await session.commit()
async def send_proactive_greeting(user_id: str):
greeting = "Hi again! We miss you. Would you like to see our new menu items or get personalized recommendations?"
await log_chat_to_db(user_id, "outbound", greeting)
return greeting
app = FastAPI()
@app.on_event("startup")
async def on_startup():
await init_db()
@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
data = await request.json()
user_id = data.get("user_id")
user_message = data.get("message", "").strip()
if user_id not in conversation_context:
conversation_context[user_id] = []
conversation_context[user_id].append({
"timestamp": datetime.utcnow().isoformat(),
"role": "user",
"message": user_message
})
background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)
sentiment_score = analyze_sentiment(user_message)
background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score)
sentiment_modifier = "Great to hear from you! " if sentiment_score > 0.3 else ""
if user_message.strip() == "1" or "menu" in user_message.lower():
if user_id in user_state:
del user_state[user_id]
menu_with_images = []
for index, item in enumerate(menu_items, start=1):
image_url = google_image_scrape(item["name"])
menu_with_images.append({
"number": index,
"name": item["name"],
"description": item["description"],
"price": item["price"],
"image_url": image_url
})
response_payload = {
"response": sentiment_modifier + "Here’s our delicious menu:",
"menu": menu_with_images,
"follow_up": (
"To order, type the *number* or *name* of the dish you'd like. "
"For example, type '1' or 'Jollof Rice' to order Jollof Rice.\n\n"
"You can also ask for nutritional facts by typing, for example, 'Nutritional facts for Jollof Rice'."
)
}
background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
conversation_context[user_id].append({
"timestamp": datetime.utcnow().isoformat(),
"role": "bot",
"message": response_payload["response"]
})
return JSONResponse(content=response_payload)
if is_order_intent(user_message) or (user_id in user_state and user_state[user_id].flow == "order"):
order_response = await process_order_flow(user_id, user_message)
if order_response:
background_tasks.add_task(log_chat_to_db, user_id, "outbound", order_response)
conversation_context[user_id].append({
"timestamp": datetime.utcnow().isoformat(),
"role": "bot",
"message": order_response
})
return JSONResponse(content={"response": sentiment_modifier + order_response})
# Instead of calling the LLM fallback, use a default response:
default_response = "I'm sorry, I didn't understand that. Please type 'menu' to see our options or 'order' to place an order."
background_tasks.add_task(log_chat_to_db, user_id, "outbound", default_response)
conversation_context[user_id].append({
"timestamp": datetime.utcnow().isoformat(),
"role": "bot",
"message": default_response
})
return JSONResponse(content={"response": sentiment_modifier + default_response})
@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
async with async_session() as session:
result = await session.execute(
ChatHistory.__table__.select().where(ChatHistory.user_id == user_id)
)
history = result.fetchall()
return [dict(row) for row in history]
@app.get("/order/{order_id}")
async def get_order(order_id: str):
async with async_session() as session:
result = await session.execute(
Order.__table__.select().where(Order.order_id == order_id)
)
order = result.fetchone()
if order:
return dict(order)
else:
raise HTTPException(status_code=404, detail="Order not found.")
@app.get("/user_profile/{user_id}")
async def get_user_profile(user_id: str):
profile = await get_or_create_user_profile(user_id)
return {
"user_id": profile.user_id,
"phone_number": profile.phone_number,
"name": profile.name,
"email": profile.email,
"preferences": profile.preferences,
"last_interaction": profile.last_interaction.isoformat(),
"order_ids": profile.order_ids
}
@app.get("/analytics")
async def get_analytics():
async with async_session() as session:
msg_result = await session.execute(ChatHistory.__table__.count())
total_messages = msg_result.scalar() or 0
order_result = await session.execute(Order.__table__.count())
total_orders = order_result.scalar() or 0
sentiment_result = await session.execute("SELECT AVG(sentiment_score) FROM sentiment_logs")
avg_sentiment = sentiment_result.scalar() or 0
return {
"total_messages": total_messages,
"total_orders": total_orders,
"average_sentiment": avg_sentiment
}
HUGGING_FACE_API_TOKEN = os.getenv("HUGGING_FACE_API_TOKEN")
if not HUGGING_FACE_API_TOKEN:
raise ValueError("Hugging Face API token not found in environment variables.")
WHISPER_API_URL = "https://router.huggingface.co/fal-ai"
WHISPER_API_HEADERS = {"Authorization": f"Bearer {HUGGING_FACE_API_TOKEN}"}
class TranscriptionResponse(BaseModel):
transcription: str
@app.post("/voice", response_model=TranscriptionResponse)
async def process_voice(file: UploadFile = File(...)):
try:
contents = await file.read()
temp_file_path = f"temp_{file.filename}"
with open(temp_file_path, "wb") as temp_file:
temp_file.write(contents)
with open(temp_file_path, "rb") as audio_file:
response = requests.post(
WHISPER_API_URL,
headers=WHISPER_API_HEADERS,
files={"file": audio_file}
)
os.remove(temp_file_path)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Failed to transcribe audio.")
transcription = response.json().get("text", "")
return {"transcription": transcription}
except Exception as e:
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
@app.api_route("/payment_callback", methods=["GET", "POST"])
async def payment_callback(request: Request):
if request.method == "GET":
params = request.query_params
order_id = params.get("reference")
status = params.get("status", "Paid")
if not order_id:
raise HTTPException(status_code=400, detail="Missing order reference in callback.")
async with async_session() as session:
result = await session.execute(
Order.__table__.select().where(Order.order_id == order_id)
)
order = result.scalar_one_or_none()
if order:
order.status = status
await session.commit()
else:
raise HTTPException(status_code=404, detail="Order not found.")
await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {status}.")
await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER,
f"Payment Update:\nOrder ID: {order_id} is now {status}."
)
redirect_url = f"https://wa.link/am87s2"
return RedirectResponse(url=redirect_url)
else:
data = await request.json()
order_id = data.get("reference")
new_status = data.get("status", "Paid")
if not order_id:
raise HTTPException(status_code=400, detail="Missing order reference in callback.")
async with async_session() as session:
result = await session.execute(
Order.__table__.select().where(Order.order_id == order_id)
)
order = result.scalar_one_or_none()
if order:
order.status = new_status
await session.commit()
await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {new_status}.")
await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER,
f"Payment Update:\nOrder ID: {order_id} is now {new_status}."
)
return JSONResponse(content={"message": "Order updated successfully."})
else:
raise HTTPException(status_code=404, detail="Order not found.")
@app.get("/track_order/{order_id}")
async def track_order(order_id: str):
async with async_session() as session:
result = await session.execute(
select(OrderTracking)
.where(OrderTracking.order_id == order_id)
.order_by(OrderTracking.timestamp)
)
tracking_updates = result.scalars().all()
if tracking_updates:
response = []
for update in tracking_updates:
response.append({
"status": update.status,
"message": update.message,
"timestamp": update.timestamp.isoformat(),
})
return JSONResponse(content=response)
else:
raise HTTPException(status_code=404, detail="No tracking information found for this order.")