| import re |
| import os |
| import time |
| import requests |
| import base64 |
| import asyncio |
| from datetime import datetime, timedelta |
| from bs4 import BeautifulSoup |
| from sqlalchemy import select |
| from pydantic import BaseModel |
|
|
| from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form |
| from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse |
|
|
| import openai |
|
|
| from textblob import TextBlob |
|
|
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession |
| from sqlalchemy.orm import sessionmaker, declarative_base |
| from sqlalchemy import Column, Integer, String, DateTime, Text, Float |
|
|
| SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value") |
| PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value") |
| DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value") |
| NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value") |
| openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value") |
| GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY", "default_fallback_value") |
|
|
| WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value") |
| WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value") |
| MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value") |
|
|
| TOWN_SHIPPING_COSTS = { |
| "lasu gate": 1000, |
| "ojo": 800, |
| "ajangbadi": 1200, |
| "iba": 900, |
| "okokomaiko": 1500, |
| "default": 1000 |
| } |
|
|
| Base = declarative_base() |
|
|
| class ChatHistory(Base): |
| __tablename__ = "chat_history" |
| id = Column(Integer, primary_key=True, index=True) |
| user_id = Column(String, index=True) |
| timestamp = Column(DateTime, default=datetime.utcnow) |
| direction = Column(String) |
| message = Column(Text) |
|
|
| class Order(Base): |
| __tablename__ = "orders" |
| id = Column(Integer, primary_key=True, index=True) |
| order_id = Column(String, unique=True, index=True) |
| user_id = Column(String, index=True) |
| dish = Column(String) |
| quantity = Column(String) |
| price = Column(String, default="0") |
| status = Column(String, default="Pending Payment") |
| payment_reference = Column(String, nullable=True) |
| delivery_address = Column(String, default="") |
| timestamp = Column(DateTime, default=datetime.utcnow) |
|
|
| class UserProfile(Base): |
| __tablename__ = "user_profiles" |
| id = Column(Integer, primary_key=True, index=True) |
| user_id = Column(String, unique=True, index=True) |
| phone_number = Column(String, unique=True, index=True, nullable=True) |
| name = Column(String, default="Valued Customer") |
| email = Column(String, default="unknown@example.com") |
| preferences = Column(Text, default="") |
| last_interaction = Column(DateTime, default=datetime.utcnow) |
| order_ids = Column(Text, default="") |
|
|
| class SentimentLog(Base): |
| __tablename__ = "sentiment_logs" |
| id = Column(Integer, primary_key=True, index=True) |
| user_id = Column(String, index=True) |
| timestamp = Column(DateTime, default=datetime.utcnow) |
| sentiment_score = Column(Float) |
| message = Column(Text) |
|
|
| class OrderTracking(Base): |
| __tablename__ = "order_tracking" |
| id = Column(Integer, primary_key=True, index=True) |
| order_id = Column(String, index=True) |
| status = Column(String) |
| message = Column(Text, nullable=True) |
| timestamp = Column(DateTime, default=datetime.utcnow) |
|
|
| engine = create_async_engine(DATABASE_URL, echo=True) |
| async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) |
|
|
| async def init_db(): |
| async with engine.begin() as conn: |
| await conn.run_sync(Base.metadata.create_all) |
|
|
| user_state = {} |
| conversation_context = {} |
| proactive_timer = {} |
|
|
| menu_items = [ |
| {"name": "Jollof Rice", "description": "A spicy and flavorful rice dish", "price": 1500, "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g"}, |
| {"name": "Fried Rice", "description": "A savory rice dish with vegetables and meat", "price": 1200, "nutrition": "Calories: 350 kcal, Carbs: 55g, Protein: 12g, Fat: 8g"}, |
| {"name": "Chicken Wings", "description": "Crispy fried chicken wings", "price": 2000, "nutrition": "Calories: 400 kcal, Carbs: 20g, Protein: 25g, Fat: 15g"}, |
| {"name": "Egusi Soup", "description": "A rich and hearty soup made with melon seeds", "price": 1000, "nutrition": "Calories: 250 kcal, Carbs: 15g, Protein: 8g, Fat: 10g"} |
| ] |
|
|
| class ConversationState: |
| def __init__(self): |
| self.flow = None |
| self.step = 0 |
| self.data = {} |
| self.last_active = datetime.utcnow() |
|
|
| def update_last_active(self): |
| self.last_active = datetime.utcnow() |
|
|
| def is_expired(self): |
| return datetime.utcnow() - self.last_active > SESSION_TIMEOUT |
|
|
| def reset(self): |
| self.flow = None |
| self.step = 0 |
| self.data = {} |
| self.last_active = datetime.utcnow() |
|
|
| SESSION_TIMEOUT = timedelta(minutes=5) |
|
|
| async def log_chat_to_db(user_id: str, direction: str, message: str): |
| async with async_session() as session: |
| entry = ChatHistory(user_id=user_id, direction=direction, message=message) |
| session.add(entry) |
| await session.commit() |
|
|
| async def log_sentiment(user_id: str, message: str, score: float): |
| async with async_session() as session: |
| entry = SentimentLog(user_id=user_id, sentiment_score=score, message=message) |
| session.add(entry) |
| await session.commit() |
|
|
| def analyze_sentiment(text: str) -> float: |
| blob = TextBlob(text) |
| return blob.sentiment.polarity |
|
|
| def google_image_scrape(query: str) -> str: |
| headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"} |
| search_url = f"https://www.google.com/search?tbm=isch&q={query}" |
| try: |
| response = requests.get(search_url, headers=headers, timeout=5) |
| except Exception: |
| return "" |
| if response.status_code == 200: |
| soup = BeautifulSoup(response.text, "html.parser") |
| img_tags = soup.find_all("img") |
| for img in img_tags: |
| src = img.get("src") |
| if src and src.startswith("http"): |
| return src |
| return "" |
|
|
| def create_paystack_payment_link(email: str, amount: int, reference: str) -> dict: |
| url = "https://api.paystack.co/transaction/initialize" |
| headers = { |
| "Authorization": f"Bearer {PAYSTACK_SECRET_KEY}", |
| "Content-Type": "application/json", |
| } |
| data = { |
| "email": email, |
| "amount": amount, |
| "reference": reference, |
| "callback_url": "https://custy-bot.vercel.app/payment_callback" |
| } |
| try: |
| response = requests.post(url, json=data, headers=headers, timeout=10) |
| if response.status_code == 200: |
| return response.json() |
| else: |
| return {"status": False, "message": "Failed to initialize payment."} |
| except Exception as e: |
| return {"status": False, "message": str(e)} |
|
|
| def send_whatsapp_message(recipient: str, message_body: str) -> dict: |
| url = f"https://graph.facebook.com/v15.0/{WHATSAPP_PHONE_NUMBER_ID}/messages" |
| headers = { |
| "Authorization": f"Bearer {WHATSAPP_ACCESS_TOKEN}", |
| "Content-Type": "application/json" |
| } |
| payload = { |
| "messaging_product": "whatsapp", |
| "to": recipient, |
| "type": "text", |
| "text": {"body": message_body} |
| } |
| response = requests.post(url, headers=headers, json=payload) |
| return response.json() |
|
|
| def stream_text_completion(prompt: str): |
| from openai import OpenAI |
| client = OpenAI( |
| base_url="https://integrate.api.nvidia.com/v1", |
| api_key=NVIDIA_API_KEY |
| ) |
| try: |
| completion = client.chat.completions.create( |
| model="meta/llama-3.1-405b-instruct", |
| messages=[{"role": "user", "content": prompt}], |
| temperature=0.2, |
| top_p=0.7, |
| max_tokens=1024, |
| stream=True |
| ) |
| for chunk in completion: |
| if chunk.choices[0].delta.content is not None: |
| yield chunk.choices[0].delta.content |
| except Exception as e: |
| yield f"Error: {str(e)}" |
|
|
| def stream_image_completion(image_b64: str): |
| invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions" |
| headers = { |
| "Authorization": f"Bearer {NVIDIA_API_KEY}", |
| "Accept": "text/event-stream" |
| } |
| payload = { |
| "model": "meta/llama-3.2-90b-vision-instruct", |
| "messages": [ |
| { |
| "role": "user", |
| "content": f'What is in this image? <img src="data:image/png;base64,{image_b64}" />' |
| } |
| ], |
| "max_tokens": 512, |
| "temperature": 1.00, |
| "top_p": 1.00, |
| "stream": True |
| } |
| response = requests.post(invoke_url, headers=headers, json=payload, stream=True) |
| for line in response.iter_lines(): |
| if line: |
| yield line.decode("utf-8") + "\n" |
|
|
| async def log_order_tracking(order_id: str, status: str, message: str = None): |
| async with async_session() as session: |
| tracking_entry = OrderTracking( |
| order_id=order_id, |
| status=status, |
| message=message |
| ) |
| session.add(tracking_entry) |
| await session.commit() |
|
|
| from fuzzywuzzy import fuzz |
|
|
|
|
| def match_dish(user_input: str, threshold: int = 70) -> str: |
| best_match = None |
| best_score = 0 |
| for item in menu_items: |
| dish_name = item["name"] |
| score = fuzz.token_sort_ratio(user_input.lower(), dish_name.lower()) |
| if score > best_score: |
| best_score = score |
| best_match = dish_name |
| if best_score >= threshold: |
| return best_match |
| return None |
|
|
|
|
| def calculate_shipping_cost(address: str) -> int: |
| address_lower = address.lower() |
| for area, cost in TOWN_SHIPPING_COSTS.items(): |
| if area in address_lower: |
| return cost |
| return TOWN_SHIPPING_COSTS["default"] |
|
|
| def calculate_eta(destination: str) -> str: |
| if not GOOGLE_MAPS_API_KEY: |
| return "ETA unavailable (Google Maps API key missing)." |
|
|
| origin = "Plot 13 Isashi Road, Iyana Isashi, Off Lagos - Badagry Expy, Lagos" |
| url = f"https://maps.googleapis.com/maps/api/directions/json?origin={origin}&destination={destination}&key={GOOGLE_MAPS_API_KEY}" |
| |
| try: |
| response = requests.get(url, timeout=10) |
| if response.status_code == 200: |
| data = response.json() |
| if data.get("routes"): |
| duration = data["routes"][0]["legs"][0]["duration"]["text"] |
| return f"Estimated delivery time: {duration}" |
| else: |
| return "ETA unavailable (no route found)." |
| else: |
| return "ETA unavailable (API error)." |
| except Exception as e: |
| return f"ETA unavailable (error: {str(e)})." |
|
|
| def is_order_intent(message: str) -> bool: |
| order_keywords = ["order", "menu", "dish", "food", "deliver", "hungry"] |
| order_phrases = ["i want to order", "can i order", "i'd like to order", "get food", "place an order"] |
| message_lower = message.lower() |
| |
| |
| dish_candidates = [item["name"].lower() for item in menu_items] |
| for dish in dish_candidates: |
| if dish in message_lower: |
| return True |
|
|
| for phrase in order_phrases: |
| if phrase in message_lower: |
| return True |
| for keyword in order_keywords: |
| if re.search(rf"\b{keyword}\b", message_lower): |
| return True |
| return False |
|
|
| async def track_order(user_id: str, order_id: str) -> str: |
| async with async_session() as session: |
| order_result = await session.execute( |
| select(Order).where(Order.order_id == order_id) |
| ) |
| order = order_result.scalars().first() |
| if not order: |
| return "Order not found. Please check your order ID." |
|
|
| tracking_result = await session.execute( |
| select(OrderTracking) |
| .where(OrderTracking.order_id == order_id) |
| .order_by(OrderTracking.timestamp) |
| ) |
| tracking_updates = tracking_result.scalars().all() |
|
|
| eta = calculate_eta(order.delivery_address) |
|
|
| response = f"Order ID: {order_id}\nStatus: {order.status}\n" |
| if tracking_updates: |
| response += "Tracking Updates:\n" |
| for update in tracking_updates: |
| response += f"- {update.status} ({update.timestamp}): {update.message or 'No details'}\n" |
| response += f"\n{eta}" |
| return response |
|
|
| async def update_user_profile(user_id: str, phone_number: str = None, address: str = None): |
| async with async_session() as session: |
| result = await session.execute( |
| select(UserProfile).where(UserProfile.user_id == user_id) |
| ) |
| profile = result.scalars().first() |
| if profile: |
| if phone_number: |
| profile.phone_number = phone_number |
| if address: |
| |
| profile.address = address |
| else: |
| profile = UserProfile(user_id=user_id, phone_number=phone_number) |
| session.add(profile) |
| await session.commit() |
|
|
| async def update_user_profile_with_order(user_id: str, order_id: str): |
| async with async_session() as session: |
| result = await session.execute( |
| select(UserProfile).where(UserProfile.user_id == user_id) |
| ) |
| profile = result.scalars().first() |
| if profile: |
| if profile.order_ids: |
| profile.order_ids += f",{order_id}" |
| else: |
| profile.order_ids = order_id |
| await session.commit() |
|
|
| async def process_order_flow(user_id: str, message: str) -> str: |
| state = user_state.get(user_id) |
| if state and state.is_expired(): |
| state.reset() |
| del user_state[user_id] |
| state = None |
|
|
| if message.lower() in ["order", "menu"]: |
| state = ConversationState() |
| state.flow = "order" |
| state.step = 1 |
| state.update_last_active() |
| user_state[user_id] = state |
| if message.lower() == "order": |
| return "Sure! What dish would you like to order?" |
| return "" |
|
|
| if not state and "order" in message.lower(): |
| state = ConversationState() |
| state.flow = "order" |
| state.step = 1 |
| state.update_last_active() |
| user_state[user_id] = state |
| return "Sure! What dish would you like to order?" |
|
|
| |
| if not state or state.flow != "order": |
| found_dish = match_dish(message) |
| if found_dish: |
| state = ConversationState() |
| state.flow = "order" |
| state.data["dish"] = found_dish |
| state.update_last_active() |
| user_state[user_id] = state |
| numbers = re.findall(r'\d+', message) |
| if numbers: |
| quantity = int(numbers[0]) |
| if quantity <= 0: |
| return "Please enter a valid quantity (e.g., 1, 2, 3)." |
| state.data["quantity"] = quantity |
| state.step = 3 |
| phone_pattern = r'(\+?\d{10,15})' |
| phone_match = re.search(phone_pattern, message) |
| address = None |
| if phone_match: |
| phone_number = phone_match.group(1) |
| address_start = phone_match.end() |
| address = message[address_start:].strip() |
| address = re.sub(r'^[,\s]+|[,\s]+$', '', address) |
| if phone_match and address: |
| state.data["phone_number"] = phone_number |
| state.data["address"] = address |
| asyncio.create_task(update_user_profile(user_id, phone_number, address)) |
| shipping_cost = calculate_shipping_cost(address) |
| state.data["shipping_cost"] = shipping_cost |
| state.step = 5 |
| return (f"Thanks! Your phone number is recorded as: {phone_number}.\n" |
| f"Your delivery address is: {address}.\n" |
| f"Your delivery cost is N{shipping_cost}. Would you like to add extras (yes/no)?") |
| elif phone_match: |
| state.data["phone_number"] = phone_match.group(1) |
| asyncio.create_task(update_user_profile(user_id, phone_number)) |
| return "Thank you. Please provide your delivery address." |
| else: |
| return ("Please provide both your phone number and delivery address. " |
| "For example: '09162409591, 1, Iyana Isashi, Isashi, Ojo, Lagos'.") |
| else: |
| state.step = 2 |
| return f"You selected {found_dish}. How many servings would you like?" |
| |
| |
| if state and state.flow == "order": |
| state.update_last_active() |
| if state.step == 1: |
| |
| found_dish = match_dish(message) |
| numbers = re.findall(r'\d+', message) |
| if found_dish: |
| state.data["dish"] = found_dish |
| if numbers: |
| quantity = int(numbers[0]) |
| if quantity <= 0: |
| return "Please enter a valid quantity (e.g., 1, 2, 3)." |
| state.data["quantity"] = quantity |
| state.step = 3 |
| return (f"You selected {found_dish} with {quantity} serving(s). " |
| "Please provide your phone number and delivery address.") |
| else: |
| state.step = 2 |
| return f"You selected {found_dish}. How many servings would you like?" |
| else: |
| return "I couldn't identify the dish. Please type the dish name from our menu." |
| |
| if state.step == 2: |
| numbers = re.findall(r'\d+', message) |
| if not numbers: |
| return "Please enter a valid number for the quantity (e.g., 1, 2, 3)." |
| quantity = int(numbers[0]) |
| if quantity <= 0: |
| return "Please enter a valid quantity (e.g., 1, 2, 3)." |
| state.data["quantity"] = quantity |
| state.step = 3 |
| return f"Got it. {quantity} serving(s) of {state.data.get('dish')}. Please provide your phone number and delivery address." |
| |
| if state.step == 3: |
| phone_pattern = r'(\+?\d{10,15})' |
| phone_match = re.search(phone_pattern, message) |
| address = None |
| if phone_match: |
| phone_number = phone_match.group(1) |
| address_start = phone_match.end() |
| address = message[address_start:].strip() |
| address = re.sub(r'^[,\s]+|[,\s]+$', '', address) |
| if phone_match and address: |
| state.data["phone_number"] = phone_number |
| state.data["address"] = address |
| asyncio.create_task(update_user_profile(user_id, phone_number, address)) |
| shipping_cost = calculate_shipping_cost(address) |
| state.data["shipping_cost"] = shipping_cost |
| state.step = 5 |
| return (f"Thanks! Your phone number is recorded as: {phone_number}.\n" |
| f"Your delivery address is: {address}.\n" |
| f"Your delivery cost is N{shipping_cost}. Would you like extras (yes/no)?") |
| elif phone_match: |
| state.data["phone_number"] = phone_match.group(1) |
| asyncio.create_task(update_user_profile(user_id, phone_number)) |
| return "Thank you. Please provide your delivery address." |
| else: |
| return ("Please provide both your phone number and delivery address. " |
| "For example: '09162409591, 1, Iyana Isashi, Isashi, Ojo, Lagos'.") |
| |
| |
| if state.step == 4: |
| state.data["address"] = message |
| asyncio.create_task(update_user_profile(user_id, address=message)) |
| shipping_cost = calculate_shipping_cost(message) |
| state.data["shipping_cost"] = shipping_cost |
| state.step = 5 |
| return (f"Thanks. Your delivery address is recorded as: {message}.\n" |
| f"Your delivery cost is N{shipping_cost}. Would you like extras (yes/no)?") |
| |
| if state.step == 5: |
| if message.lower() in ["yes", "y"]: |
| state.step = 6 |
| return "Please list the extras you'd like (e.g., drinks, sides)." |
| elif message.lower() in ["no", "n"]: |
| state.data["extras"] = "" |
| state.step = 7 |
| dish = state.data.get("dish", "") |
| quantity = state.data.get("quantity", 1) |
| phone = state.data.get("phone_number", "") |
| address = state.data.get("address", "") |
| shipping_cost = state.data.get("shipping_cost", 0) |
| price_per_serving = 1500 |
| total_price = (quantity * price_per_serving) + shipping_cost |
| summary = (f"Order Summary:\nDish: {dish}\nQuantity: {quantity}\n" |
| f"Phone: {phone}\nAddress: {address}\n" |
| f"Shipping Cost: N{shipping_cost}\n" |
| f"Total Price: N{total_price}\n" |
| f"Extras: None\nConfirm order? (yes/no)") |
| return summary |
| else: |
| return "Please respond with 'yes' or 'no' regarding extras." |
| |
| if state.step == 6: |
| state.data["extras"] = message |
| state.step = 7 |
| dish = state.data.get("dish", "") |
| quantity = state.data.get("quantity", 1) |
| phone = state.data.get("phone_number", "") |
| address = state.data.get("address", "") |
| shipping_cost = state.data.get("shipping_cost", 0) |
| extras = state.data.get("extras", "") |
| price_per_serving = 1500 |
| total_price = (quantity * price_per_serving) + shipping_cost |
| summary = (f"Order Summary:\nDish: {dish}\nQuantity: {quantity}\n" |
| f"Phone: {phone}\nAddress: {address}\n" |
| f"Shipping Cost: N{shipping_cost}\n" |
| f"Total Price: N{total_price}\n" |
| f"Extras: {extras}\nConfirm order? (yes/no)") |
| return summary |
| |
| if state.step == 7: |
| if message.lower() in ["yes", "y"]: |
| order_id = f"ORD-{int(time.time())}" |
| state.data["order_id"] = order_id |
| price_per_serving = 1500 |
| quantity = state.data.get("quantity", 1) |
| shipping_cost = state.data.get("shipping_cost", 0) |
| total_price = (quantity * price_per_serving) + shipping_cost |
| state.data["price"] = str(total_price) |
| |
| async def save_order(): |
| async with async_session() as session: |
| order = Order( |
| order_id=order_id, |
| user_id=user_id, |
| dish=state.data["dish"], |
| quantity=str(quantity), |
| price=str(total_price), |
| status="Pending Payment", |
| delivery_address=state.data.get("address", ""), |
| shipping_cost=str(shipping_cost) |
| ) |
| session.add(order) |
| await session.commit() |
| asyncio.create_task(save_order()) |
| asyncio.create_task(log_order_tracking(order_id, "Order Placed", "Order placed and awaiting payment.")) |
| |
| async def notify_management_order(order_details: dict): |
| message_body = ( |
| f"New Order Received:\n" |
| f"Order ID: {order_details['order_id']}\n" |
| f"Dish: {order_details['dish']}\n" |
| f"Quantity: {order_details['quantity']}\n" |
| f"Total Price: {order_details['price']}\n" |
| f"Phone: {state.data.get('phone_number', '')}\n" |
| f"Delivery Address: {order_details.get('address', 'Not Provided')}\n" |
| f"Extras: {state.data.get('extras', 'None')}\n" |
| f"Status: Pending Payment" |
| ) |
| await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER, message_body) |
| order_details = { |
| "order_id": order_id, |
| "dish": state.data["dish"], |
| "quantity": state.data["quantity"], |
| "price": state.data["price"], |
| "address": state.data.get("address", "") |
| } |
| asyncio.create_task(notify_management_order(order_details)) |
| |
| email = "customer@example.com" |
| payment_data = create_paystack_payment_link(email, total_price * 100, order_id) |
| dish_name = state.data.get("dish", "") |
| state.reset() |
| if user_id in user_state: |
| del user_state[user_id] |
| if payment_data.get("status"): |
| payment_link = payment_data["data"]["authorization_url"] |
| return (f"Thank you for your order of {quantity} serving(s) of {dish_name}! " |
| f"Your Order ID is {order_id}.\nPlease complete payment here: {payment_link}\n" |
| "You can track your order status using your Order ID.\n" |
| "Is there anything else you'd like to order?") |
| else: |
| return (f"Your order has been placed with Order ID {order_id}, " |
| "but we could not initialize payment. Please try again later.") |
| else: |
| state.reset() |
| if user_id in user_state: |
| del user_state[user_id] |
| return "Order canceled. Let me know if you'd like to try again." |
| return "" |
|
|
| |
| async def get_or_create_user_profile(user_id: str, phone_number: str = None) -> UserProfile: |
| async with async_session() as session: |
| result = await session.execute( |
| select(UserProfile).where(UserProfile.user_id == user_id) |
| ) |
| profile = result.scalars().first() |
| if profile is None: |
| profile = UserProfile( |
| user_id=user_id, |
| phone_number=phone_number, |
| last_interaction=datetime.utcnow() |
| ) |
| session.add(profile) |
| await session.commit() |
| return profile |
|
|
| async def update_user_last_interaction(user_id: str): |
| async with async_session() as session: |
| result = await session.execute( |
| select(UserProfile).where(UserProfile.user_id == user_id) |
| ) |
| profile = result.scalars().first() |
| if profile: |
| profile.last_interaction = datetime.utcnow() |
| await session.commit() |
|
|
| async def send_proactive_greeting(user_id: str): |
| greeting = "Hi again! We miss you. Would you like to see our new menu items or get personalized recommendations?" |
| await log_chat_to_db(user_id, "outbound", greeting) |
| return greeting |
|
|
| app = FastAPI() |
|
|
| @app.on_event("startup") |
| async def on_startup(): |
| await init_db() |
|
|
| @app.post("/chatbot") |
| async def chatbot_response(request: Request, background_tasks: BackgroundTasks): |
| data = await request.json() |
| user_id = data.get("user_id") |
| user_message = data.get("message", "").strip() |
|
|
| if user_id not in conversation_context: |
| conversation_context[user_id] = [] |
|
|
| conversation_context[user_id].append({ |
| "timestamp": datetime.utcnow().isoformat(), |
| "role": "user", |
| "message": user_message |
| }) |
|
|
| background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message) |
|
|
| sentiment_score = analyze_sentiment(user_message) |
| background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score) |
| sentiment_modifier = "Great to hear from you! " if sentiment_score > 0.3 else "" |
|
|
| if user_message.strip() == "1" or "menu" in user_message.lower(): |
| if user_id in user_state: |
| del user_state[user_id] |
| menu_with_images = [] |
| for index, item in enumerate(menu_items, start=1): |
| image_url = google_image_scrape(item["name"]) |
| menu_with_images.append({ |
| "number": index, |
| "name": item["name"], |
| "description": item["description"], |
| "price": item["price"], |
| "image_url": image_url |
| }) |
| response_payload = { |
| "response": sentiment_modifier + "Here’s our delicious menu:", |
| "menu": menu_with_images, |
| "follow_up": ( |
| "To order, type the *number* or *name* of the dish you'd like. " |
| "For example, type '1' or 'Jollof Rice' to order Jollof Rice.\n\n" |
| "You can also ask for nutritional facts by typing, for example, 'Nutritional facts for Jollof Rice'." |
| ) |
| } |
| background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload)) |
| conversation_context[user_id].append({ |
| "timestamp": datetime.utcnow().isoformat(), |
| "role": "bot", |
| "message": response_payload["response"] |
| }) |
| return JSONResponse(content=response_payload) |
|
|
| if is_order_intent(user_message) or (user_id in user_state and user_state[user_id].flow == "order"): |
| order_response = await process_order_flow(user_id, user_message) |
| if order_response: |
| background_tasks.add_task(log_chat_to_db, user_id, "outbound", order_response) |
| conversation_context[user_id].append({ |
| "timestamp": datetime.utcnow().isoformat(), |
| "role": "bot", |
| "message": order_response |
| }) |
| return JSONResponse(content={"response": sentiment_modifier + order_response}) |
|
|
| |
| default_response = "I'm sorry, I didn't understand that. Please type 'menu' to see our options or 'order' to place an order." |
| background_tasks.add_task(log_chat_to_db, user_id, "outbound", default_response) |
| conversation_context[user_id].append({ |
| "timestamp": datetime.utcnow().isoformat(), |
| "role": "bot", |
| "message": default_response |
| }) |
| return JSONResponse(content={"response": sentiment_modifier + default_response}) |
|
|
|
|
| @app.get("/chat_history/{user_id}") |
| async def get_chat_history(user_id: str): |
| async with async_session() as session: |
| result = await session.execute( |
| ChatHistory.__table__.select().where(ChatHistory.user_id == user_id) |
| ) |
| history = result.fetchall() |
| return [dict(row) for row in history] |
|
|
| @app.get("/order/{order_id}") |
| async def get_order(order_id: str): |
| async with async_session() as session: |
| result = await session.execute( |
| Order.__table__.select().where(Order.order_id == order_id) |
| ) |
| order = result.fetchone() |
| if order: |
| return dict(order) |
| else: |
| raise HTTPException(status_code=404, detail="Order not found.") |
|
|
| @app.get("/user_profile/{user_id}") |
| async def get_user_profile(user_id: str): |
| profile = await get_or_create_user_profile(user_id) |
| return { |
| "user_id": profile.user_id, |
| "phone_number": profile.phone_number, |
| "name": profile.name, |
| "email": profile.email, |
| "preferences": profile.preferences, |
| "last_interaction": profile.last_interaction.isoformat(), |
| "order_ids": profile.order_ids |
| } |
|
|
| @app.get("/analytics") |
| async def get_analytics(): |
| async with async_session() as session: |
| msg_result = await session.execute(ChatHistory.__table__.count()) |
| total_messages = msg_result.scalar() or 0 |
| order_result = await session.execute(Order.__table__.count()) |
| total_orders = order_result.scalar() or 0 |
| sentiment_result = await session.execute("SELECT AVG(sentiment_score) FROM sentiment_logs") |
| avg_sentiment = sentiment_result.scalar() or 0 |
| return { |
| "total_messages": total_messages, |
| "total_orders": total_orders, |
| "average_sentiment": avg_sentiment |
| } |
|
|
| HUGGING_FACE_API_TOKEN = os.getenv("HUGGING_FACE_API_TOKEN") |
| if not HUGGING_FACE_API_TOKEN: |
| raise ValueError("Hugging Face API token not found in environment variables.") |
|
|
| WHISPER_API_URL = "https://router.huggingface.co/fal-ai" |
| WHISPER_API_HEADERS = {"Authorization": f"Bearer {HUGGING_FACE_API_TOKEN}"} |
|
|
| class TranscriptionResponse(BaseModel): |
| transcription: str |
|
|
| @app.post("/voice", response_model=TranscriptionResponse) |
| async def process_voice(file: UploadFile = File(...)): |
| try: |
| contents = await file.read() |
| temp_file_path = f"temp_{file.filename}" |
| with open(temp_file_path, "wb") as temp_file: |
| temp_file.write(contents) |
|
|
| with open(temp_file_path, "rb") as audio_file: |
| response = requests.post( |
| WHISPER_API_URL, |
| headers=WHISPER_API_HEADERS, |
| files={"file": audio_file} |
| ) |
|
|
| os.remove(temp_file_path) |
|
|
| if response.status_code != 200: |
| raise HTTPException(status_code=response.status_code, detail="Failed to transcribe audio.") |
|
|
| transcription = response.json().get("text", "") |
| return {"transcription": transcription} |
|
|
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") |
|
|
| @app.api_route("/payment_callback", methods=["GET", "POST"]) |
| async def payment_callback(request: Request): |
| if request.method == "GET": |
| params = request.query_params |
| order_id = params.get("reference") |
| status = params.get("status", "Paid") |
| if not order_id: |
| raise HTTPException(status_code=400, detail="Missing order reference in callback.") |
| async with async_session() as session: |
| result = await session.execute( |
| Order.__table__.select().where(Order.order_id == order_id) |
| ) |
| order = result.scalar_one_or_none() |
| if order: |
| order.status = status |
| await session.commit() |
| else: |
| raise HTTPException(status_code=404, detail="Order not found.") |
| await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {status}.") |
| await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER, |
| f"Payment Update:\nOrder ID: {order_id} is now {status}." |
| ) |
| redirect_url = f"https://wa.link/am87s2" |
| return RedirectResponse(url=redirect_url) |
| else: |
| data = await request.json() |
| order_id = data.get("reference") |
| new_status = data.get("status", "Paid") |
| if not order_id: |
| raise HTTPException(status_code=400, detail="Missing order reference in callback.") |
| async with async_session() as session: |
| result = await session.execute( |
| Order.__table__.select().where(Order.order_id == order_id) |
| ) |
| order = result.scalar_one_or_none() |
| if order: |
| order.status = new_status |
| await session.commit() |
| await log_order_tracking(order_id, "Payment Confirmed", f"Payment status updated to {new_status}.") |
| await asyncio.to_thread(send_whatsapp_message, MANAGEMENT_WHATSAPP_NUMBER, |
| f"Payment Update:\nOrder ID: {order_id} is now {new_status}." |
| ) |
| return JSONResponse(content={"message": "Order updated successfully."}) |
| else: |
| raise HTTPException(status_code=404, detail="Order not found.") |
|
|
| @app.get("/track_order/{order_id}") |
| async def track_order(order_id: str): |
| |
| async with async_session() as session: |
| result = await session.execute( |
| select(OrderTracking) |
| .where(OrderTracking.order_id == order_id) |
| .order_by(OrderTracking.timestamp) |
| ) |
| tracking_updates = result.scalars().all() |
| if tracking_updates: |
| response = [] |
| for update in tracking_updates: |
| response.append({ |
| "status": update.status, |
| "message": update.message, |
| "timestamp": update.timestamp.isoformat(), |
| }) |
| return JSONResponse(content=response) |
| else: |
| raise HTTPException(status_code=404, detail="No tracking information found for this order.") |
| |