Spaces:
Paused
Paused
| import re | |
| import os | |
| import time | |
| import requests | |
| import base64 | |
| import asyncio | |
| from datetime import datetime, timedelta | |
| from bs4 import BeautifulSoup | |
| from sqlalchemy import select | |
| from fuzzywuzzy import fuzz | |
| from pydantic import BaseModel | |
| import smtplib | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form | |
| from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse | |
| import openai | |
| from textblob import TextBlob | |
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession | |
| from sqlalchemy.orm import sessionmaker, declarative_base | |
| from sqlalchemy import Column, Integer, String, DateTime, Text, Float | |
# Environment keys remain, but you can adjust them if needed.
# Each setting is read from the process environment. When a variable is unset,
# os.getenv returns the literal placeholder string given as its second
# argument, so none of these names is ever empty or None.
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "default_fallback_value")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "default_fallback_value")
DATABASE_URL = os.getenv("DATABASE_URL", "default_fallback_value")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "default_fallback_value")
openai.api_key = os.getenv("OPENAI_API_KEY", "default_fallback_value")
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY", "default_fallback_value")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID", "default_value")
WHATSAPP_ACCESS_TOKEN = os.getenv("WHATSAPP_ACCESS_TOKEN", "default_value")
MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "default_value")
# Updated shipping costs for popular Los Angeles neighborhoods (in dollars)
# Keys are lowercase area names that are matched as substrings of a lowercased
# address; the "default" entry is the fee used when no area matches.
TOWN_SHIPPING_COSTS = {
    "downtown los angeles": 5,
    "hollywood": 6,
    "santa monica": 7,
    "pasadena": 5,
    "default": 8
}
# Declarative base class that every ORM model in this module inherits from.
Base = declarative_base()
class ChatHistory(Base):
    """ORM model for one logged chat message (table ``chat_history``)."""

    __tablename__ = "chat_history"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    # Filled in with the current naive-UTC time when a row is inserted.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Free-form label supplied by the caller of log_chat_to_db.
    # NOTE(review): presumably distinguishes inbound from outbound - confirm.
    direction = Column(String)
    message = Column(Text)
class Order(Base):
    """ORM model for a customer order (table ``orders``)."""

    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    # Public order identifier (unique); distinct from the surrogate key `id`.
    order_id = Column(String, unique=True, index=True)
    user_id = Column(String, index=True)
    dish = Column(String)
    # Quantity and price are stored as strings, not numeric columns.
    quantity = Column(String)
    price = Column(String, default="0")
    status = Column(String, default="Pending Payment")
    payment_reference = Column(String, nullable=True)
    delivery_address = Column(String, default="")
    # Filled in with the current naive-UTC time when a row is inserted.
    timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
    """ORM model for a known user (table ``user_profiles``).

    NOTE(review): there is no ``address`` column here, although
    ``update_user_profile`` assigns ``profile.address``.
    """

    __tablename__ = "user_profiles"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, unique=True, index=True)
    phone_number = Column(String, unique=True, index=True, nullable=True)
    name = Column(String, default="Valued Customer")
    email = Column(String, default="unknown@example.com")
    preferences = Column(Text, default="")
    # Defaults to the current naive-UTC time when a row is inserted.
    last_interaction = Column(DateTime, default=datetime.utcnow)
    # Comma-separated order ids, appended to by update_user_profile_with_order.
    order_ids = Column(Text, default="")
class SentimentLog(Base):
    """ORM model for one scored message (table ``sentiment_logs``)."""

    __tablename__ = "sentiment_logs"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    # Filled in with the current naive-UTC time when a row is inserted.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Score passed in by the caller of log_sentiment.
    sentiment_score = Column(Float)
    message = Column(Text)
class OrderTracking(Base):
    """ORM model for one status update of an order (table ``order_tracking``)."""

    __tablename__ = "order_tracking"
    id = Column(Integer, primary_key=True, index=True)
    # Holds an Order.order_id value; not declared as a foreign key.
    order_id = Column(String, index=True)
    status = Column(String)
    message = Column(Text, nullable=True)
    # Filled in with the current naive-UTC time when a row is inserted;
    # track_order sorts updates by this column.
    timestamp = Column(DateTime, default=datetime.utcnow)
class MenuItem(Base):
    """ORM model for a dish or drink on the menu (table ``menu_items``)."""

    __tablename__ = "menu_items"
    id = Column(Integer, primary_key=True, index=True)
    # Unique display name; populate_menu_items uses it to detect existing rows.
    name = Column(String, unique=True, index=True)
    description = Column(Text)
    price = Column(Integer)  # Price in dollars
    nutrition = Column(Text)
class TownShippingCost(Base):
    """ORM model for the delivery fee of one area (table ``town_shipping_costs``)."""

    __tablename__ = "town_shipping_costs"
    id = Column(Integer, primary_key=True, index=True)
    # Area name; populate_shipping_costs seeds it from TOWN_SHIPPING_COSTS keys,
    # including the special "default" entry.
    town = Column(String, unique=True, index=True)
    cost = Column(Integer)
# Async engine built from DATABASE_URL; echo=True makes SQLAlchemy log the SQL
# it emits.
# NOTE(review): the name `engine` is assigned again further down this module.
engine = create_async_engine(DATABASE_URL, echo=True)
# Factory for AsyncSession objects bound to the engine created on the line
# above; expire_on_commit=False keeps loaded attributes usable after commit.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
    """Create every table declared on ``Base`` that does not exist yet."""
    async with engine.begin() as connection:
        # create_all is a synchronous API, so it is run through run_sync.
        await connection.run_sync(Base.metadata.create_all)
# In-memory conversation state: maps a user id to its ConversationState
# (see process_order_flow). Lives only as long as this process.
user_state = {}
# NOTE(review): not used in the visible part of the file; presumably keyed by
# user id as well - confirm against the callers.
conversation_context = {}
proactive_timer = {}
from datetime import datetime, timedelta
# Idle time after which ConversationState.is_expired() reports True.
SESSION_TIMEOUT = timedelta(minutes=5)
class ConversationState:
    """Mutable per-user state of a guided conversation.

    Attributes:
        flow: Name of the active flow, or ``None`` when idle.
        step: Integer position inside the flow (``0`` when idle).
        data: Dict of values collected so far.
        last_active: Naive-UTC timestamp of the last recorded activity.
    """

    def __init__(self):
        # A fresh state is exactly a reset one.
        self.reset()

    def update_last_active(self):
        """Stamp the state with the current UTC time."""
        self.last_active = datetime.utcnow()

    def is_expired(self):
        """Return True when the state has been idle longer than SESSION_TIMEOUT."""
        idle_for = datetime.utcnow() - self.last_active
        return idle_for > SESSION_TIMEOUT

    def reset(self):
        """Drop the flow, step and collected data and restart the idle clock."""
        self.flow, self.step, self.data = None, 0, {}
        self.last_active = datetime.utcnow()
# --- Updated Menu for Creole Kings ---
# Each entry is a dict with "name", "description", "price" (an integer; the
# MenuItem model documents prices as dollars) and "nutrition" (free text).
# The list is used for in-memory dish matching and to seed the menu_items table.
menu_items = [
    {"name": "Jambalaya", "description": "A classic Creole dish with rice, chicken, sausage, and shrimp", "price": 15, "nutrition": "Approximately 500 kcal"},
    {"name": "Gumbo", "description": "A hearty stew with chicken, sausage, okra, and spices", "price": 18, "nutrition": "Approximately 600 kcal"},
    {"name": "Crawfish Étouffée", "description": "A rich, flavorful dish with crawfish simmered in a spiced roux", "price": 20, "nutrition": "Approximately 550 kcal"},
    {"name": "Red Beans & Rice", "description": "Slow-cooked red beans served over rice with Creole spices", "price": 12, "nutrition": "Approximately 450 kcal"},
    {"name": "Shrimp Po' Boy", "description": "Crispy fried shrimp served on a French roll with lettuce and tomato", "price": 16, "nutrition": "Approximately 500 kcal"},
    {"name": "Muffuletta Sandwich", "description": "A hearty sandwich with layered meats, cheeses, and olive salad", "price": 14, "nutrition": "Approximately 650 kcal"},
    {"name": "Blackened Fish", "description": "Fish fillet seasoned with Cajun spices and seared", "price": 17, "nutrition": "Approximately 400 kcal"},
    {"name": "Creole Salad", "description": "Fresh mixed greens with a Creole vinaigrette", "price": 10, "nutrition": "Approximately 300 kcal"},
    {"name": "Beignets", "description": "Light and fluffy deep-fried pastries dusted with powdered sugar", "price": 8, "nutrition": "Approximately 350 kcal"},
    {"name": "Bananas Foster", "description": "Sautéed bananas in a rich butter and brown sugar sauce, flambéed with rum", "price": 9, "nutrition": "Approximately 400 kcal"}
]
# Drinks use the same dict shape as the dishes above.
creole_drinks = [
    {"name": "Hurricane Cocktail", "description": "A sweet and tangy cocktail with rum and passion fruit", "price": 12, "nutrition": "Approximately 250 kcal"},
    {"name": "Cajun Spice Margarita", "description": "A margarita with a hint of Cajun spice", "price": 11, "nutrition": "Approximately 200 kcal"},
    {"name": "Iced Tea", "description": "Refreshing iced tea, unsweetened or sweetened", "price": 5, "nutrition": "Approximately 100 kcal"},
    {"name": "Lemonade", "description": "Freshly squeezed lemonade", "price": 6, "nutrition": "Approximately 120 kcal"}
]
# From here on menu_items holds dishes followed by drinks.
menu_items.extend(creole_drinks)
# Idle time after which ConversationState.is_expired() reports True.
SESSION_TIMEOUT = timedelta(minutes=5)
import ssl
from sqlalchemy.ext.asyncio import create_async_engine

ssl_context = ssl.create_default_context()
# For development, you might disable certificate verification:
# NOTE(review): these two lines turn off hostname and certificate checks for
# the database connection. Remove them (or supply the provider's CA bundle)
# before relying on this in production.
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# The connection string comes from DATABASE_URL like every other secret in this
# module; credentials must not be committed to source. The connect_args below
# are asyncpg options, so DATABASE_URL is expected to be a
# "postgresql+asyncpg://..." URL.
engine = create_async_engine(
    DATABASE_URL,
    echo=True,
    connect_args={
        "ssl": ssl_context,
        "statement_cache_size": 0,  # Disable prepared statement caching
    },
)
# Rebind the session factory to the engine configured above so that sessions
# and init_db() talk to the database through the same engine and settings.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def populate_menu_items():
    """Seed the ``menu_items`` table from the in-memory ``menu_items`` list.

    Entries whose name is already present in the table are left untouched, so
    the function can be run repeatedly.
    """
    async with async_session() as db:
        for entry in menu_items:
            # Check if this menu item already exists to avoid duplicates.
            lookup = select(MenuItem).where(MenuItem.name == entry["name"])
            already_there = (await db.execute(lookup)).scalars().first()
            if already_there:
                continue
            db.add(
                MenuItem(
                    name=entry["name"],
                    description=entry["description"],
                    price=entry["price"],
                    nutrition=entry["nutrition"],
                )
            )
        await db.commit()
async def populate_shipping_costs():
    """Seed the ``town_shipping_costs`` table from ``TOWN_SHIPPING_COSTS``.

    Towns that already have a row are skipped, so repeated runs add nothing new.
    """
    async with async_session() as db:
        for town_name, fee in TOWN_SHIPPING_COSTS.items():
            # Check if the shipping cost entry for this town already exists.
            lookup = select(TownShippingCost).where(TownShippingCost.town == town_name)
            already_there = (await db.execute(lookup)).scalars().first()
            if already_there:
                continue
            db.add(TownShippingCost(town=town_name, cost=fee))
        await db.commit()
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Insert one ``ChatHistory`` row for the given user and commit it."""
    async with async_session() as db:
        db.add(ChatHistory(user_id=user_id, direction=direction, message=message))
        await db.commit()
async def log_sentiment(user_id: str, message: str, score: float):
    """Insert one ``SentimentLog`` row holding *message* and its *score*."""
    async with async_session() as db:
        db.add(SentimentLog(user_id=user_id, sentiment_score=score, message=message))
        await db.commit()
def analyze_sentiment(text: str) -> float:
    """Return the TextBlob sentiment polarity of *text*."""
    return TextBlob(text).sentiment.polarity
def google_image_scrape(query: str) -> str:
    """Return the URL of the first absolute ``<img>`` on a Google image search page.

    Args:
        query: Free-text search terms.

    Returns:
        The first ``src`` attribute that starts with ``http``, or ``""`` when
        the request fails, the status is not 200, or no such image is found.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    try:
        # Pass the query through `params` so requests URL-encodes it; pasting it
        # into the URL breaks on characters such as '&', '#' or '+'.
        response = requests.get(
            "https://www.google.com/search",
            params={"tbm": "isch", "q": query},
            headers=headers,
            timeout=5,
        )
    except requests.RequestException:
        return ""
    if response.status_code != 200:
        return ""
    soup = BeautifulSoup(response.text, "html.parser")
    for img in soup.find_all("img"):
        src = img.get("src")
        # Skip inline data: URIs and relative paths.
        if src and src.startswith("http"):
            return src
    return ""
def create_paystack_payment_link(email: str, amount: int, reference: str) -> dict:
    """Ask Paystack to initialize a transaction and return its JSON reply.

    Args:
        email: Customer e-mail sent to Paystack.
        amount: Amount forwarded unchanged in the request body.
        reference: Transaction reference forwarded unchanged.

    Returns:
        The decoded JSON body on HTTP 200. Otherwise a dict with
        ``"status": False`` and a ``"message"`` describing the failure.
    """
    endpoint = "https://api.paystack.co/transaction/initialize"
    auth_headers = {
        "Authorization": f"Bearer {PAYSTACK_SECRET_KEY}",
        "Content-Type": "application/json",
    }
    body = {
        "email": email,
        "amount": amount,
        "reference": reference,
        "callback_url": "https://creolekings.com/payment_callback"  # Updated callback URL example
    }
    try:
        reply = requests.post(endpoint, json=body, headers=auth_headers, timeout=10)
        if reply.status_code != 200:
            return {"status": False, "message": "Failed to initialize payment."}
        return reply.json()
    except Exception as exc:
        # Network errors and undecodable bodies are reported, not raised.
        return {"status": False, "message": str(exc)}
def send_whatsapp_message(recipient: str, message_body: str) -> dict:
    """Send a plain-text WhatsApp message through the Graph API.

    Args:
        recipient: Destination value placed in the ``to`` field.
        message_body: Text of the message.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    url = f"https://graph.facebook.com/v15.0/{WHATSAPP_PHONE_NUMBER_ID}/messages"
    headers = {
        "Authorization": f"Bearer {WHATSAPP_ACCESS_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "messaging_product": "whatsapp",
        "to": recipient,
        "type": "text",
        "text": {"body": message_body}
    }
    # Without a timeout requests waits forever on a stalled connection; use the
    # same 10 s budget as the other outbound calls in this module.
    response = requests.post(url, headers=headers, json=payload, timeout=10)
    return response.json()
def stream_text_completion(prompt: str):
    """Yield the text pieces of a streamed chat completion for *prompt*.

    The request goes to NVIDIA's OpenAI-compatible endpoint. Any exception
    raised while requesting or iterating the stream is turned into a single
    ``"Error: ..."`` item instead of propagating.
    """
    from openai import OpenAI

    nvidia_client = OpenAI(
        base_url="https://integrate.api.nvidia.com/v1",
        api_key=NVIDIA_API_KEY,
    )
    request_args = {
        "model": "meta/llama-3.1-405b-instruct",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.2,
        "top_p": 0.7,
        "max_tokens": 1024,
        "stream": True,
    }
    try:
        stream = nvidia_client.chat.completions.create(**request_args)
        for part in stream:
            piece = part.choices[0].delta.content
            # Chunks without text (content is None) are skipped.
            if piece is not None:
                yield piece
    except Exception as e:
        yield f"Error: {str(e)}"
def stream_image_completion(image_b64: str):
    """Yield the raw event-stream lines of a vision completion for an image.

    Args:
        image_b64: Base64-encoded image data, embedded in the prompt as a
            ``data:image/png`` URI.

    Yields:
        Each non-empty response line decoded as UTF-8 with a trailing newline.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Accept": "text/event-stream"
    }
    payload = {
        "model": "meta/llama-3.2-90b-vision-instruct",
        "messages": [
            {
                "role": "user",
                "content": f'What is in this image? <img src="data:image/png;base64,{image_b64}" />'
            }
        ],
        "max_tokens": 512,
        "temperature": 1.00,
        "top_p": 1.00,
        "stream": True
    }
    # With stream=True the connection is only released once the body is fully
    # consumed or the response is closed. The `with` block guarantees the close
    # even when the consumer abandons the generator early. The timeout bounds
    # the connect phase and each wait between received chunks.
    with requests.post(
        invoke_url, headers=headers, json=payload, stream=True, timeout=60
    ) as response:
        for line in response.iter_lines():
            if line:
                yield line.decode("utf-8") + "\n"
async def log_order_tracking(order_id: str, status: str, message: str = None):
    """Insert one ``OrderTracking`` row for *order_id* and commit it.

    Args:
        order_id: Identifier of the order the update belongs to.
        status: Status label to record.
        message: Optional detail text stored with the update.
    """
    async with async_session() as db:
        db.add(OrderTracking(order_id=order_id, status=status, message=message))
        await db.commit()
def calculate_shipping_cost(address: str) -> int:
    """Return the delivery fee for *address* from ``TOWN_SHIPPING_COSTS``.

    The fee of the first area name found as a substring of the lowercased
    address wins; otherwise the ``"default"`` fee is returned.
    """
    haystack = address.lower()
    matching_fees = (
        fee for area, fee in TOWN_SHIPPING_COSTS.items() if area in haystack
    )
    for fee in matching_fees:
        return fee
    return TOWN_SHIPPING_COSTS["default"]
def calculate_eta(destination: str) -> str:
    """Return a human-readable delivery-time estimate for *destination*.

    Queries the Google Maps Directions API from the fixed restaurant address.

    Args:
        destination: Delivery address as free text.

    Returns:
        ``"Estimated delivery time: <duration>"`` on success, otherwise a
        sentence starting with ``"ETA unavailable"`` that names the reason.
        Never raises.
    """
    # The module falls back to a placeholder string when the variable is unset,
    # so an emptiness check alone would never detect a missing key.
    if not GOOGLE_MAPS_API_KEY or GOOGLE_MAPS_API_KEY == "default_fallback_value":
        return "ETA unavailable (Google Maps API key missing)."
    # Updated origin address for Creole Kings in Los Angeles
    origin = "123 Main Street, Los Angeles, CA"
    try:
        # `params` URL-encodes the values; the destination is user-supplied and
        # must not be able to inject extra query parameters.
        response = requests.get(
            "https://maps.googleapis.com/maps/api/directions/json",
            params={
                "origin": origin,
                "destination": destination,
                "key": GOOGLE_MAPS_API_KEY,
            },
            timeout=10,
        )
        if response.status_code != 200:
            return "ETA unavailable (API error)."
        data = response.json()
        if not data.get("routes"):
            return "ETA unavailable (no route found)."
        duration = data["routes"][0]["legs"][0]["duration"]["text"]
        return f"Estimated delivery time: {duration}"
    except Exception as e:
        # The exception text can contain the request URL, API key included, and
        # this string is shown to the customer, so keep the details in the log.
        print(f"calculate_eta failed: {type(e).__name__}")
        return "ETA unavailable (request failed)."
def is_order_intent(message: str) -> bool:
    """Return True when *message* looks like the user wants to order food.

    Three checks run on the lowercased message, in this order: it contains a
    menu item name, it contains one of the fixed order phrases, or it contains
    one of the order keywords as a whole word.
    """
    order_keywords = ["order", "menu", "dish", "food", "deliver", "hungry"]
    order_phrases = ["i want to order", "can i order", "i'd like to order", "get food", "place an order"]
    text = message.lower()
    # Check if the message contains any dish from the menu
    if any(item["name"].lower() in text for item in menu_items):
        return True
    if any(phrase in text for phrase in order_phrases):
        return True
    # Keywords must match as whole words.
    return any(re.search(rf"\b{keyword}\b", text) for keyword in order_keywords)
async def get_menu_items():
    """Load every ``MenuItem`` row and return them as plain dicts.

    Returns:
        A list of dicts with the keys ``name``, ``description``, ``price`` and
        ``nutrition``, one per row.
    """
    async with async_session() as db:
        rows = (await db.execute(select(MenuItem))).scalars().all()
        # Build a list of dictionaries to send as a response.
        serialized = []
        for row in rows:
            serialized.append(
                {
                    "name": row.name,
                    "description": row.description,
                    "price": row.price,
                    "nutrition": row.nutrition,
                }
            )
        return serialized
async def track_order(user_id: str, order_id: str) -> str:
    """Build a text report with an order's status, tracking updates and ETA.

    Args:
        user_id: Accepted for interface compatibility; not used in the lookup.
        order_id: Public order identifier to look up.

    Returns:
        A not-found message when no order has this id. Otherwise the order id
        and status, then any tracking updates (oldest first), then the ETA line.
    """
    async with async_session() as db:
        order_query = select(Order).where(Order.order_id == order_id)
        order = (await db.execute(order_query)).scalars().first()
        if not order:
            return "Order not found. Please check your order ID."

        updates_query = (
            select(OrderTracking)
            .where(OrderTracking.order_id == order_id)
            .order_by(OrderTracking.timestamp)
        )
        updates = (await db.execute(updates_query)).scalars().all()
        eta = calculate_eta(order.delivery_address)

        parts = [f"Order ID: {order_id}\nStatus: {order.status}\n"]
        if updates:
            parts.append("Tracking Updates:\n")
            parts.extend(
                f"- {entry.status} ({entry.timestamp}): {entry.message or 'No details'}\n"
                for entry in updates
            )
        parts.append(f"\n{eta}")
        return "".join(parts)
async def update_user_profile(user_id: str, phone_number: str = None, address: str = None):
    """Update the stored profile of *user_id*, creating it when missing.

    Args:
        user_id: Identifier of the profile to update.
        phone_number: New phone number; ignored when falsy on an existing profile.
        address: New address; ignored when falsy.

    A newly created profile receives the user id and phone number only.
    """
    async with async_session() as db:
        lookup = select(UserProfile).where(UserProfile.user_id == user_id)
        profile = (await db.execute(lookup)).scalars().first()
        if not profile:
            db.add(UserProfile(user_id=user_id, phone_number=phone_number))
        else:
            if phone_number:
                profile.phone_number = phone_number
            if address:
                # Assuming your UserProfile has an 'address' field; otherwise, adjust accordingly.
                # NOTE(review): the UserProfile model in this file declares no
                # `address` column, so this value is presumably not persisted.
                profile.address = address
        await db.commit()
async def update_user_profile_with_order(user_id: str, order_id: str):
    """Append *order_id* to the comma-separated ``order_ids`` of a profile.

    Nothing is changed when no profile exists for *user_id*.
    """
    async with async_session() as db:
        lookup = select(UserProfile).where(UserProfile.user_id == user_id)
        profile = (await db.execute(lookup)).scalars().first()
        if profile:
            previous = profile.order_ids
            # The first order is stored bare; later ones are comma-joined.
            profile.order_ids = f"{previous},{order_id}" if previous else order_id
        await db.commit()
def match_dish(user_input: str, threshold: int = 70) -> str:
    """Return the menu item name that best fuzzy-matches *user_input*.

    Args:
        user_input: Text typed by the user.
        threshold: Minimum ``token_sort_ratio`` score (0-100) to accept.

    Returns:
        The best-scoring dish name, or ``None`` when the best score is below
        *threshold*. On a tie the earlier menu entry wins.
    """
    top_name, top_score = None, 0
    for entry in menu_items:
        candidate = entry["name"]
        current = fuzz.token_sort_ratio(user_input.lower(), candidate.lower())
        if current > top_score:
            top_name, top_score = candidate, current
    return top_name if top_score >= threshold else None
def match_dishes(user_input: str, threshold: int = 70) -> list:
    """
    Returns a list of dish names that appear as whole words in the input.
    First, normalize the input to collapse multiple spaces.
    Then, try an exact whole-word match using a regex with word boundaries.
    If no exact match is found, fall back to fuzzy matching.

    Args:
        user_input: Text typed by the user.
        threshold: Minimum ``token_sort_ratio`` score (0-100) for the fuzzy
            fallback.

    Returns:
        Matching dish names without duplicates, in menu order. Empty when
        nothing matches.
    """
    normalized_input = re.sub(r"\s+", " ", user_input.lower().strip())
    # dict.fromkeys de-duplicates while keeping menu order. A set would make the
    # order (and so "the first match" and the prompt shown to the user) vary
    # from run to run.
    dish_names = list(dict.fromkeys(item["name"] for item in menu_items))

    exact = [
        name
        for name in dish_names
        if re.search(r"\b" + re.escape(name.lower()) + r"\b", normalized_input)
    ]
    if exact:
        return exact

    return [
        name
        for name in dish_names
        if fuzz.token_sort_ratio(normalized_input, name.lower()) >= threshold
    ]
# Use the asynchronous get_dish_price from the database
async def get_dish_price(dish: str) -> int:
    """Return the stored price of *dish*, or ``0`` when it is not on the menu.

    The name is compared with a case-insensitive ``ILIKE``.
    """
    async with async_session() as db:
        lookup = select(MenuItem).where(MenuItem.name.ilike(dish))
        found = (await db.execute(lookup)).scalars().first()
        if not found:
            return 0
        return found.price
async def get_shipping_cost(address: str) -> int:
    """Return the delivery fee for *address* from the ``town_shipping_costs`` table.

    The first row whose town name occurs in the lowercased address wins. If
    none matches, the row named ``"default"`` is used, and ``8`` when that row
    is missing too.
    """
    async with async_session() as db:
        rows = (await db.execute(select(TownShippingCost))).scalars().all()
        haystack = address.lower()
        for row in rows:
            if row.town in haystack:
                return row.cost
        for row in rows:
            if row.town == "default":
                return row.cost
        return 8
def send_email_notification(order_details):
    """E-mail a new-order notice to management through the HTTP SMTP relay.

    Args:
        order_details: Dict with the required keys ``order_id``, ``dish``,
            ``quantity`` and ``price``, and the optional keys
            ``phone_number``, ``address`` and ``extras``.

    Returns:
        The relay's decoded JSON reply, or ``None`` when the SMTP password is
        not configured or the request fails.

    Environment:
        SMTP_PASSWORD: Required. Password for the SMTP account.
        SMTP_USER: Optional override for the SMTP account name.
    """
    # Updated email notification for Creole Kings
    # The SMTP password is a secret and is read from the environment like the
    # other credentials in this module; it must not live in source control.
    smtp_password = os.getenv("SMTP_PASSWORD")
    if not smtp_password:
        print("Error sending email: SMTP_PASSWORD is not configured.")
        return None
    payload = {
        "from": "yungdml31@gmail.com",
        "to": "samyung05@gmail.com",
        "subject": f"New Order Received: {order_details['order_id']}",
        "body": (
            f"New Order Received:\n"
            f"Order ID: {order_details['order_id']}\n"
            f"Dish: {order_details['dish']}\n"
            f"Quantity: {order_details['quantity']}\n"
            f"Total Price: ${order_details['price']}\n"
            f"Phone: {order_details.get('phone_number', 'Not Provided')}\n"
            f"Delivery Address: {order_details.get('address', 'Not Provided')}\n"
            f"Extras: {order_details.get('extras', 'None')}\n"
            f"Status: Pending Payment"
        ),
        "smtpHost": "smtp.gmail.com",
        "smtpPort": 587,
        "smtpSecure": "false",
        # NOTE(review): this account differs from the "from" address above
        # (…41 vs …31) - confirm which one is intended.
        "smtpUser": os.getenv("SMTP_USER", "yungdml41@gmail.com"),
        "smtpPassword": smtp_password,
    }
    url = "https://smtp-server-ten.vercel.app/smtp"
    try:
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Notification is best-effort: report the failure and carry on.
        print(f"Error sending email: {e}")
        return None
# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so a task nobody references can be garbage-collected
# before it finishes.
_background_tasks = set()


def _spawn_background(coro):
    """Schedule *coro* as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _start_order_state(user_id: str) -> "ConversationState":
    """Create a fresh order-flow state for *user_id* and register it."""
    state = ConversationState()
    state.flow = "order"
    state.update_last_active()
    user_state[user_id] = state
    return state


async def _order_totals(state: "ConversationState") -> tuple:
    """Return ``(dish_summary, quantity_total, total_price)`` for *state*.

    Handles both the multi-dish form (``state.data["orders"]`` maps dish to
    quantity) and the single-dish form (``"dish"`` / ``"quantity"``). The
    stored shipping cost is included in the total.
    """
    shipping_cost = state.data.get("shipping_cost", 0)
    if "orders" in state.data:
        orders = state.data["orders"]
        subtotal = 0
        for dish, qty in orders.items():
            dish_price = await get_dish_price(dish)
            subtotal += dish_price * qty
        return ", ".join(orders.keys()), sum(orders.values()), subtotal + shipping_cost
    dish = state.data.get("dish", "")
    quantity = state.data.get("quantity", 1)
    dish_price = await get_dish_price(dish)
    return dish, quantity, (quantity * dish_price) + shipping_cost


async def _summarize_for_confirmation(state: "ConversationState") -> str:
    """Price delivery, move *state* to the confirmation step, return the summary."""
    shipping_cost = await get_shipping_cost(state.data["address"])
    state.data["shipping_cost"] = shipping_cost
    state.data["extras"] = ""
    state.step = 7
    dish_summary, quantity_total, total_price = await _order_totals(state)
    return (f"Order Summary:\n"
            f"Dish(es): {dish_summary}\n"
            f"Quantity: {quantity_total}\n"
            f"Phone: {state.data.get('phone_number', '')}\n"
            f"Address: {state.data.get('address', '')}\n"
            f"Delivery Fee: ${shipping_cost}\n"
            f"Total Price: ${total_price}\n"
            "Confirm order? (yes/no)")


async def process_order_flow(user_id: str, message: str) -> str:
    """
    Processes order flow allowing an unlimited number of dishes.

    The conversation is a small state machine stored in ``user_state``:

    * step 1 - waiting for a dish name
    * step 2 - waiting for a quantity (or for a choice between candidates)
    * step 3 - waiting for phone number and delivery address
    * step 7 - waiting for the yes/no confirmation

    Args:
        user_id: Key of the conversation in ``user_state``.
        message: The user's latest message.

    Returns:
        The reply to send back, or ``""`` when the message was not handled.
    """
    state = user_state.get(user_id)
    if state and state.is_expired():
        state.reset()
        del user_state[user_id]
        state = None
    elif state:
        # Every message counts as activity; otherwise the session would expire
        # a fixed time after it was created, even in mid-conversation.
        state.update_last_active()

    lowered = message.lower()

    # 1) Initialize order flow with a welcome message for Creole Kings.
    if lowered in ["order", "menu"] or (not state and "order" in lowered):
        state = _start_order_state(user_id)
        state.step = 1
        return "Welcome to Creole Kings! What dish would you like to order today?"

    # 2) Detect dish(es)
    # Runs when no order is in progress and also at step 1, where the user is
    # answering the welcome question with a dish name.
    if not state or state.flow != "order" or state.step == 1:
        matched = match_dishes(message)
        if not matched:
            return "I couldn't identify the dish. Please type the dish name from our menu."

        state = _start_order_state(user_id)
        if len(matched) > 1:
            state.data["candidate_dishes"] = matched
            state.step = 2
            dish_options = ", ".join(matched)
            return (f"We found multiple dishes in your request: {dish_options}. "
                    "Please specify which one you'd like to order or type 'both' if you'd like all.")

        found_dish = matched[0]
        state.data["dish"] = found_dish
        single_parse = _parse_single_dish_line(message, found_dish)
        if single_parse["phone"]:
            state.data["phone_number"] = single_parse["phone"]
        if single_parse["address"]:
            state.data["address"] = single_parse["address"]
        if not single_parse["quantity"]:
            state.step = 2
            return f"You selected {found_dish}. How many servings would you like?"

        state.data["quantity"] = single_parse["quantity"]
        state.step = 3
        if state.data.get("phone_number") and state.data.get("address"):
            # Everything arrived in one message: go straight to the summary,
            # exactly as the step-3 handler does.
            _spawn_background(update_user_profile(
                user_id, state.data["phone_number"], state.data["address"]))
            return await _summarize_for_confirmation(state)
        if state.data.get("phone_number"):
            return "Thank you. Please provide your delivery address (e.g., '3105551234, 123 Main St, Los Angeles, CA')."
        return "Please provide your phone number and address (e.g., '3105551234, 123 Main St, Los Angeles, CA')."

    # Several dishes were detected earlier: resolve which one(s) are wanted.
    if state and state.flow == "order" and "candidate_dishes" in state.data:
        normalized = message.strip().lower()
        if normalized in ["both", "all"]:
            state.data["dishes"] = state.data["candidate_dishes"]
            del state.data["candidate_dishes"]
            state.step = 2
            dishes_str = ", ".join(state.data["dishes"])
            return (f"You have selected: {dishes_str}. How many servings of each would you like? "
                    "(For example, '2 for Jambalaya, 3 for Gumbo')")
        for dish in state.data["candidate_dishes"]:
            if dish.lower() in normalized:
                state.data["dish"] = dish
                del state.data["candidate_dishes"]
                state.step = 2
                return f"You selected {dish}. How many servings would you like?"
        dish_options = ", ".join(state.data["candidate_dishes"])
        return (f"Please specify which one you'd like to order from: {dish_options} "
                "(or type 'both' if you'd like to order all).")

    # 3) Step 2: Parse quantity details.
    if state and state.flow == "order" and state.step == 2:
        if "dishes" in state.data and len(state.data["dishes"]) > 1:
            pairs = re.findall(r'(\d+)\s*for\s*([a-zA-Z\s]+)', message, flags=re.IGNORECASE)
            if not pairs:
                return ("I'm sorry, I didn't understand the quantity details. "
                        "Please specify like '2 for Jambalaya, 3 for Gumbo'.")
            order_quantities = {}
            for quantity, dish_text in pairs:
                dish_text = dish_text.strip().lower()
                count = int(quantity)
                # An empty dish text is a substring of every name and would
                # select all dishes; zero servings is not an order.
                if not dish_text or count <= 0:
                    continue
                for candidate in state.data["dishes"]:
                    if candidate.lower() in dish_text or dish_text in candidate.lower():
                        order_quantities[candidate] = count
            if order_quantities:
                state.data["orders"] = order_quantities
                state.step = 3
                summary = "\n".join([f"{q} serving(s) of {d}" for d, q in order_quantities.items()])
                return (f"Got it. You have ordered:\n{summary}\n"
                        "Please provide your phone number and delivery address (e.g., '3105551234, 123 Main St, Los Angeles, CA').")
            return ("I'm sorry, I couldn't match those dishes. "
                    "Please try something like '2 for Jambalaya, 3 for Gumbo'.")

        numbers = re.findall(r'\d+', message)
        if not numbers:
            return "Please enter a valid number for the quantity (e.g., 1, 2, 3)."
        quantity = int(numbers[0])
        if quantity <= 0:
            return "Please enter a valid quantity (e.g., 1, 2, 3)."
        state.data["quantity"] = quantity
        state.step = 3
        dish = state.data.get("dish", "")
        return (f"Got it. {quantity} serving(s) of {dish}.\n"
                "Please provide your phone number and delivery address (e.g., '3105551234, 123 Main St, Los Angeles, CA').")

    # 4) Step 3: Parse phone & address.
    if state and state.flow == "order" and state.step == 3:
        phone_pattern = r'(\+?\d{10,15})'
        phone_match = re.search(phone_pattern, message)
        if phone_match:
            phone_number = phone_match.group(1)
            # Whatever follows the phone number is taken as the address.
            address = message[phone_match.end():].strip()
            address = re.sub(r'^[,\s]+|[,\s]+$', '', address)
            state.data["phone_number"] = phone_number
            state.data["address"] = address
            _spawn_background(update_user_profile(user_id, phone_number, address))
        elif "phone_number" in state.data:
            # The phone number is already known, so this message is the address.
            state.data["address"] = message.strip()
        else:
            return ("Please provide both your phone number and address. "
                    "For example: '3105551234, 123 Main St, Los Angeles, CA'.")
        if not state.data.get("address"):
            return "Thank you. Please provide your delivery address."
        return await _summarize_for_confirmation(state)

    # 7) Step 7: Order Confirmation and Payment Link Generation
    if state and state.flow == "order" and state.step == 7:
        if lowered in ["yes", "y"]:
            order_id = f"ORD-{int(time.time())}"
            state.data["order_id"] = order_id
            dish_summary, quantity_total, total_price = await _order_totals(state)
            state.data["price"] = str(total_price)
            # Capture the address now: the save task runs only after this
            # coroutine yields, by which time state.reset() has emptied
            # state.data.
            delivery_address = state.data.get("address", "")

            async def save_order():
                async with async_session() as session:
                    order = Order(
                        order_id=order_id,
                        user_id=user_id,
                        dish=dish_summary,
                        quantity=str(quantity_total),
                        price=str(total_price),
                        status="Pending Payment",
                        delivery_address=delivery_address
                    )
                    session.add(order)
                    await session.commit()

            _spawn_background(save_order())
            _spawn_background(log_order_tracking(order_id, "Order Placed", "Order placed and awaiting payment."))
            order_details = {
                "order_id": order_id,
                "dish": dish_summary,
                "quantity": quantity_total,
                "price": total_price,
                "phone_number": state.data.get("phone_number", ""),
                "address": state.data.get("address", "Not Provided"),
                "extras": state.data.get("extras", "None")
            }
            email_response = send_email_notification(order_details)
            if email_response:
                print("Email notification sent successfully.")
            else:
                print("Failed to send email notification.")
            email_for_paystack = "customer@example.com"  # Replace with customer's email if available
            # The price is multiplied by 100 before being sent to Paystack.
            payment_data = create_paystack_payment_link(email_for_paystack, total_price * 100, order_id)
            state.reset()
            if user_id in user_state:
                del user_state[user_id]
            if payment_data.get("status"):
                payment_link = payment_data["data"]["authorization_url"]
                return (f"Thank you for your order of {quantity_total} serving(s) of {dish_summary} from Creole Kings! "
                        f"Your Order ID is {order_id}.\n\n"
                        "Please complete your payment online using the link below:\n"
                        f"{payment_link}\n\n"
                        "You can track your order status using your Order ID.\nWould you like to place another order?")
            return (f"Your order has been placed with Order ID {order_id}, "
                    "but we could not initialize online payment. Please try again later.")
        if lowered in ["no", "n"]:
            state.reset()
            if user_id in user_state:
                del user_state[user_id]
            return "Order canceled. Let me know if you'd like to try again."
        return "I didn't understand that. Please type 'yes' to confirm your order or 'no' to cancel it."

    return ""
| def _parse_single_dish_line(message: str, dish_name: str) -> dict: | |
| result = {"quantity": None, "phone": None, "address": None} | |
| # Parse quantity | |
| numbers = re.findall(r'\d+', message) | |
| if numbers: | |
| result["quantity"] = int(numbers[0]) | |
| # Parse phone | |
| phone_pattern = r'(\+?\d{10,15})' | |
| phone_match = re.search(phone_pattern, message) | |
| address = None | |
| if phone_match: | |
| phone_number = phone_match.group(1) | |
| address_start = phone_match.end() | |
| address = message[address_start:].strip() | |
| address = re.sub(r'^[,\s]+|[,\s]+$', '', address) | |
| result["phone"] = phone_number | |
| if address: | |
| result["address"] = address | |
| return result | |
async def get_or_create_user_profile(user_id: str, phone_number: str = None) -> UserProfile:
    """Return the stored profile for ``user_id``, inserting one if none exists.

    A newly inserted profile carries the given ``phone_number`` and has its
    ``last_interaction`` set to the current UTC time.
    """
    lookup = select(UserProfile).where(UserProfile.user_id == user_id)
    async with async_session() as db:
        existing = (await db.execute(lookup)).scalars().first()
        if existing is not None:
            return existing

        # First contact from this user: persist a fresh profile row.
        created = UserProfile(
            user_id=user_id,
            phone_number=phone_number,
            last_interaction=datetime.utcnow(),
        )
        db.add(created)
        await db.commit()
        return created
async def update_user_last_interaction(user_id: str):
    """Stamp the profile of ``user_id`` with the current UTC time.

    Does nothing when no profile exists for that user.
    """
    async with async_session() as db:
        rows = await db.execute(
            select(UserProfile).where(UserProfile.user_id == user_id)
        )
        found = rows.scalars().first()
        if not found:
            return
        found.last_interaction = datetime.utcnow()
        await db.commit()
async def send_proactive_greeting(user_id: str):
    """Log a fixed re-engagement greeting as an outbound message and return it."""
    text = (
        "Hi again! We miss you at Creole Kings. "
        "Would you like to see our new menu items or get personalized recommendations?"
    )
    await log_chat_to_db(user_id, "outbound", text)
    return text
# Module-level FastAPI application object.
app = FastAPI()


async def on_startup():
    """Run the initialisation coroutines one after another.

    Order: create the database schema, then seed menu items, then seed
    shipping costs.
    NOTE(review): no startup-hook registration is visible next to this
    function — confirm it is actually wired to the application.
    """
    for startup_step in (init_db, populate_menu_items, populate_shipping_costs):
        await startup_step()
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """Handle one chat message and answer with a JSON payload.

    The request body is JSON with ``user_id`` and ``message``. The message is
    recorded in the in-memory conversation context, logged (together with its
    sentiment score) through background tasks, and then routed:

    1. ``"1"`` or any text containing "menu" -> the menu with images.
    2. An order intent, or a user already in the order flow -> the order flow.
    3. Anything else (or an empty order-flow reply) -> a fallback message.
    """
    body = await request.json()
    user_id = body.get("user_id")
    user_message = body.get("message", "").strip()

    def _remember(role, text):
        # Append one turn to this user's in-memory conversation history.
        conversation_context[user_id].append({
            "timestamp": datetime.utcnow().isoformat(),
            "role": role,
            "message": text,
        })

    def _record_bot_reply(logged_text, remembered_text):
        # Queue the DB log first, then store the turn in memory.
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", logged_text)
        _remember("bot", remembered_text)

    conversation_context.setdefault(user_id, [])
    _remember("user", user_message)
    background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)

    sentiment_score = analyze_sentiment(user_message)
    background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score)
    if sentiment_score > 0.3:
        sentiment_modifier = "Great to hear from you! "
    else:
        sentiment_modifier = ""

    # --- Menu request -----------------------------------------------------
    wants_menu = user_message == "1" or "menu" in user_message.lower()
    if wants_menu:
        # Asking for the menu abandons any flow the user was in.
        user_state.pop(user_id, None)
        menu_with_images = [
            {
                "number": position,
                "name": entry["name"],
                "description": entry["description"],
                "price": entry["price"],
                "image_url": google_image_scrape(entry["name"]),
            }
            for position, entry in enumerate(menu_items, start=1)
        ]
        response_payload = {
            "response": sentiment_modifier + "Here’s our mouthwatering Creole menu:",
            "menu": menu_with_images,
            "follow_up": (
                "To order, type the *number* or *name* of the dish you'd like. "
                "For example, type '1' or 'Jambalaya' to order.\n\n"
                "You can also ask for nutritional facts by typing, for example, 'Nutritional facts for Gumbo'."
            ),
        }
        _record_bot_reply(str(response_payload), response_payload["response"])
        return JSONResponse(content=response_payload)

    # --- Order flow -------------------------------------------------------
    in_order_flow = is_order_intent(user_message) or (
        user_id in user_state and user_state[user_id].flow == "order"
    )
    if in_order_flow:
        order_response = await process_order_flow(user_id, user_message)
        if order_response:
            _record_bot_reply(order_response, order_response)
            return JSONResponse(content={"response": sentiment_modifier + order_response})

    # --- Fallback ---------------------------------------------------------
    default_response = "I'm sorry, I didn't understand that. Please type 'menu' to see our options or 'order' to place an order at Creole Kings."
    _record_bot_reply(default_response, default_response)
    return JSONResponse(content={"response": sentiment_modifier + default_response})
async def get_chat_history(user_id: str):
    """Return every stored chat-history row for ``user_id`` as a list of dicts.

    Each dict maps column name to value for one ``ChatHistory`` row.
    """
    async with async_session() as session:
        result = await session.execute(
            ChatHistory.__table__.select().where(ChatHistory.user_id == user_id)
        )
        # Rows returned through an AsyncSession are tuple-like, so dict(row)
        # fails; .mappings() yields dict-like rows keyed by column name.
        return [dict(row) for row in result.mappings().all()]
async def get_order(order_id: str):
    """Return the order with the given ``order_id`` as a column-name -> value dict.

    Raises:
        HTTPException: 404 when no order has that ID.
    """
    async with async_session() as session:
        result = await session.execute(
            Order.__table__.select().where(Order.order_id == order_id)
        )
        # Rows returned through an AsyncSession are tuple-like, so dict(row)
        # fails; .mappings() yields a dict-like row keyed by column name.
        row = result.mappings().first()
    if row is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(row)
async def get_user_profile(user_id: str):
    """Return the profile of ``user_id`` as a plain dict, creating it if needed.

    ``last_interaction`` is rendered as an ISO-8601 string, or ``None`` when
    the stored profile has no timestamp.
    """
    profile = await get_or_create_user_profile(user_id)
    last_seen = profile.last_interaction
    return {
        "user_id": profile.user_id,
        "phone_number": profile.phone_number,
        "name": profile.name,
        "email": profile.email,
        "preferences": profile.preferences,
        # Guard against rows without a timestamp instead of raising
        # AttributeError on None.
        "last_interaction": last_seen.isoformat() if last_seen is not None else None,
        "order_ids": profile.order_ids,
    }
async def get_analytics():
    """Return aggregate usage figures.

    Returns:
        A dict with ``total_messages`` (rows in the chat-history table),
        ``total_orders`` (rows in the orders table) and ``average_sentiment``
        (mean ``sentiment_score`` in ``sentiment_logs``); each is 0 when the
        query yields nothing.
    """
    # Local import keeps this block self-contained; sqlalchemy is already a
    # dependency of this module.
    from sqlalchemy import func, text

    async with async_session() as session:
        # Table.count() no longer exists in SQLAlchemy 1.4+; build the COUNT
        # explicitly instead.
        msg_result = await session.execute(
            select(func.count()).select_from(ChatHistory)
        )
        total_messages = msg_result.scalar() or 0

        order_result = await session.execute(
            select(func.count()).select_from(Order)
        )
        total_orders = order_result.scalar() or 0

        # Raw SQL strings must be wrapped in text() before being executed.
        sentiment_result = await session.execute(
            text("SELECT AVG(sentiment_score) FROM sentiment_logs")
        )
        avg_sentiment = sentiment_result.scalar() or 0

    return {
        "total_messages": total_messages,
        "total_orders": total_orders,
        "average_sentiment": avg_sentiment,
    }
# Speech-to-text configuration. The token is mandatory: loading this module
# fails with ValueError when the variable is unset or empty.
HUGGING_FACE_API_TOKEN = os.environ.get("HUGGING_FACE_API_TOKEN")
if HUGGING_FACE_API_TOKEN is None or HUGGING_FACE_API_TOKEN == "":
    raise ValueError("Hugging Face API token not found in environment variables.")

WHISPER_API_URL = "https://router.huggingface.co/fal-ai"
WHISPER_API_HEADERS = {"Authorization": "Bearer " + HUGGING_FACE_API_TOKEN}
class TranscriptionResponse(BaseModel):
    """Pydantic model with a single string field, ``transcription``."""

    transcription: str
async def process_voice(file: UploadFile = File(...)):
    """Send an uploaded audio file to the Whisper endpoint and return its text.

    Args:
        file: The uploaded audio clip.

    Returns:
        ``{"transcription": <text>}``, where the text is the ``"text"`` field
        of the upstream JSON response (empty string when absent).

    Raises:
        HTTPException: with the upstream status code when the endpoint does
            not answer 200, or 500 for any other failure.
    """
    try:
        contents = await file.read()
        # The audio is posted straight from memory. The previous version wrote
        # it to "temp_<client filename>" first, which let a crafted filename
        # escape the working directory, collided between concurrent uploads,
        # and left the file behind whenever the request raised.
        upload_name = os.path.basename(file.filename or "") or "audio"
        # requests is blocking; run it in a worker thread so the event loop
        # keeps serving other requests during the upload.
        response = await asyncio.to_thread(
            requests.post,
            WHISPER_API_URL,
            headers=WHISPER_API_HEADERS,
            files={"file": (upload_name, contents)},
        )
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to transcribe audio.")
        transcription = response.json().get("text", "")
        return {"transcription": transcription}
    except HTTPException:
        # Keep the upstream status code instead of rewriting it to 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
async def payment_callback(request: Request):
    """Handle the payment provider's callback for an order.

    GET requests carry ``reference`` and ``status`` as query parameters and
    are answered with a redirect to the thank-you page; any other method
    carries them in a JSON body and is answered with a JSON confirmation.

    Raises:
        HTTPException: 400 when the order reference is missing, 404 when no
            order matches it.
    """
    # NOTE(review): the status is taken from the request as-is and defaults to
    # "Paid"; nothing here verifies it with the payment provider. Confirm that
    # verification happens elsewhere before trusting this endpoint.
    if request.method == "GET":
        params = request.query_params
        await _apply_payment_status(params.get("reference"), params.get("status", "Paid"))
        return RedirectResponse(url="https://creolekings.com/thankyou")

    data = await request.json()
    await _apply_payment_status(data.get("reference"), data.get("status", "Paid"))
    return JSONResponse(content={"message": "Order updated successfully."})


async def _apply_payment_status(order_id, reported_status):
    """Store the reported payment status on the order, log it and notify management."""
    if not order_id:
        raise HTTPException(status_code=400, detail="Missing order reference in callback.")

    payment_failed = (reported_status or "").lower() == "failed"
    if payment_failed:
        # A failed payment must never be recorded as "Paid".
        new_status = "Payment Failed"
        tracking_event = "Payment Failed"
    else:
        new_status = reported_status or "Paid"
        tracking_event = "Payment Confirmed"

    async with async_session() as session:
        result = await session.execute(
            select(Order).where(Order.order_id == order_id)
        )
        order = result.scalar_one_or_none()
        if order is None:
            raise HTTPException(status_code=404, detail="Order not found.")
        order.status = new_status
        await session.commit()

    await log_order_tracking(order_id, tracking_event, f"Payment status updated to {new_status}.")

    # The management notification is best-effort: a WhatsApp failure must not
    # fail the callback.
    try:
        await asyncio.to_thread(
            send_whatsapp_message,
            MANAGEMENT_WHATSAPP_NUMBER,
            f"Payment Update:\nOrder ID: {order_id} is now {new_status}.",
        )
    except Exception as e:
        print(f"WhatsApp message sending failed: {e}")
    return new_status
async def track_order(order_id: str):
    """Return the tracking history of an order, ordered by timestamp.

    The response is a JSON list of ``status`` / ``message`` / ``timestamp``
    entries (timestamps in ISO-8601 form).

    Raises:
        HTTPException: 404 when the order has no tracking entries.
    """
    timeline_query = (
        select(OrderTracking)
        .where(OrderTracking.order_id == order_id)
        .order_by(OrderTracking.timestamp)
    )
    async with async_session() as db:
        events = (await db.execute(timeline_query)).scalars().all()
        if not events:
            raise HTTPException(status_code=404, detail="No tracking information found for this order.")
        timeline = [
            {
                "status": event.status,
                "message": event.message,
                "timestamp": event.timestamp.isoformat(),
            }
            for event in events
        ]
        return JSONResponse(content=timeline)