Spaces:
Paused
Paused
| import re | |
| import os | |
| import time | |
| import requests | |
| import base64 | |
| import asyncio | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
| from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form | |
| from fastapi.responses import JSONResponse, StreamingResponse, RedirectResponse | |
| from fastapi.encoders import jsonable_encoder | |
| from sqlalchemy import select, update, func | |
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession | |
| from sqlalchemy.orm import sessionmaker, declarative_base | |
| from sqlalchemy import Column, Integer, String, DateTime, Text, Float, Boolean | |
| from sqlalchemy import JSON | |
| import logging | |
| import hmac | |
| import hashlib | |
| import json | |
| import uuid | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from pydantic import BaseModel | |
| import os | |
| import random | |
| PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "your_default_secret_key") | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("delivery_service") | |
| # -------------------- | |
| # DATABASE SETUP | |
| # -------------------- | |
| DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./test.db") | |
| engine = create_async_engine(DATABASE_URL, echo=True) | |
| async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) | |
| Base = declarative_base() | |
| # -------------------- | |
| # DATABASE MODELS | |
| # -------------------- | |
| class UserProfile(Base): | |
| __tablename__ = "user_profiles" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(String(36), unique=True, index=True) | |
| phone_number = Column(String(20)) | |
| name = Column(String(100)) | |
| email = Column(String(100)) | |
| preferences = Column(JSON, default={"favorite_cuisines": [], "delivery_times": []}) | |
| last_interaction = Column(DateTime) | |
| ux_mode = Column(String(20), default="default") | |
| accessibility_prefs = Column(JSON, default={}) | |
| current_order_status = Column(String(50)) | |
| # Add other missing columns as needed | |
| class UXPreferences(Base): | |
| __tablename__ = "ux_preferences" | |
| user_id = Column(String(36), primary_key=True) | |
| interaction_speed = Column(String(20), default="normal") | |
| color_scheme = Column(String(20), default="light") | |
| input_preference = Column(String(20), default="buttons") | |
| class ChatHistory(Base): | |
| __tablename__ = "chat_history" | |
| id = Column(Integer, primary_key=True, index=True) | |
| user_id = Column(String(36), index=True) | |
| message = Column(Text) | |
| timestamp = Column(DateTime, default=datetime.utcnow) | |
| class Order(Base): | |
| __tablename__ = "orders" | |
| id = Column(Integer, primary_key=True, index=True) | |
| order_id = Column(String, unique=True, index=True) | |
| user_id = Column(String, index=True) | |
| dish = Column(String, nullable=True) | |
| quantity = Column(String, nullable=True) | |
| price = Column(String, default="0") | |
| status = Column(String, default="Pending Payment") | |
| payment_reference = Column(String, nullable=True) | |
| delivery_address = Column(String, default="") | |
| timestamp = Column(DateTime, default=datetime.utcnow) | |
| pickup_address = Column(String, nullable=True) # <-- Add this | |
| last_location = Column(String, nullable=True) # <-- Add this | |
| contact_number = Column(String, nullable=True) # <-- Add this | |
| class OrderTracking(Base): | |
| __tablename__ = "order_tracking" | |
| id = Column(Integer, primary_key=True, index=True) | |
| order_id = Column(String(36), index=True) | |
| status = Column(String(50)) | |
| message = Column(Text) | |
| timestamp = Column(DateTime, default=datetime.utcnow) | |
| location = Column(Text, nullable=True) # Stored as a JSON string | |
| class DeliveryOrderRequest(BaseModel): | |
| user_id: str | |
| package_description: str | |
| pickup_address: str | |
| delivery_address: str | |
| contact_number: str | |
| class TrackingUpdate(BaseModel): | |
| status: str | |
| message: str | |
| timestamp: str # using ISO formatted datetime strings | |
| location: Optional[Dict] = None | |
| estimated_delivery: Optional[str] = None | |
| # -------------------- | |
| # INITIALIZATION | |
| # -------------------- | |
| async def init_db(): | |
| async with engine.begin() as conn: | |
| await conn.run_sync(Base.metadata.create_all) | |
| logger.info("Database tables created.") | |
| # -------------------- | |
| # STATE MANAGEMENT | |
| # -------------------- | |
| class ConversationState: | |
| def __init__(self): | |
| self.flow: Optional[str] = None | |
| self.step: int = 0 | |
| self.data: Dict = {} | |
| self.context: List[Dict] = [] | |
| self.last_active: datetime = datetime.utcnow() | |
| def update_last_active(self): | |
| self.last_active = datetime.utcnow() | |
| def add_context(self, role: str, message: str): | |
| self.context.append({ | |
| "timestamp": datetime.utcnow(), | |
| "role": role, | |
| "message": message | |
| }) | |
| # Keep only the last 5 messages | |
| if len(self.context) > 5: | |
| self.context = self.context[-5:] | |
| # Global state dictionary | |
| user_state: Dict[str, ConversationState] = {} | |
| # -------------------- | |
| # CORE UTILITIES | |
| # -------------------- | |
| async def update_user_last_interaction(user_id: str): | |
| async with async_session() as session: | |
| await session.execute( | |
| update(UserProfile) | |
| .where(UserProfile.user_id == user_id) | |
| .values(last_interaction=datetime.utcnow()) | |
| ) | |
| await session.commit() | |
| async def get_or_create_user_profile(user_id: str, phone: str = None) -> UserProfile: | |
| async with async_session() as session: | |
| result = await session.execute( | |
| select(UserProfile).where(UserProfile.user_id == user_id) | |
| ) | |
| profile = result.scalar_one_or_none() | |
| if not profile: | |
| profile = UserProfile( | |
| user_id=user_id, | |
| phone_number=phone, | |
| last_interaction=datetime.utcnow() | |
| ) | |
| session.add(profile) | |
| await session.commit() | |
| await session.refresh(profile) | |
| return profile | |
| # -------------------- | |
| # ENHANCED UX FEATURES | |
| # -------------------- | |
| class DeliveryUXManager: | |
| async def generate_response_template(user_id: str) -> Dict: | |
| """Generate a personalized response structure.""" | |
| profile = await get_or_create_user_profile(user_id) | |
| return { | |
| "meta": { | |
| "ux_mode": profile.ux_mode, | |
| "color_scheme": "light", | |
| "interaction_mode": "text" | |
| }, | |
| "content": { | |
| "text": "", | |
| "quick_replies": [], | |
| "carousel": None, | |
| "status_overlay": None | |
| } | |
| } | |
| async def handle_ux_preferences(user_id: str, preference: str): | |
| """Update UX preferences.""" | |
| async with async_session() as session: | |
| prefs = await session.get(UXPreferences, user_id) | |
| if not prefs: | |
| prefs = UXPreferences(user_id=user_id) | |
| session.add(prefs) | |
| # Update specific preference based on input | |
| if preference.startswith("color_"): | |
| prefs.color_scheme = preference.split("_")[1] | |
| elif preference.startswith("speed_"): | |
| prefs.interaction_speed = preference.split("_")[1] | |
| await session.commit() | |
| # -------------------- | |
| # HELPER FUNCTIONS FOR PAYMENT CALLBACK & TRACKING | |
| # -------------------- | |
| async def log_order_tracking(order_id: str, status: str, message: str): | |
| async with async_session() as session: | |
| tracking_entry = OrderTracking( | |
| order_id=order_id, | |
| status=status, | |
| message=message | |
| ) | |
| session.add(tracking_entry) | |
| await session.commit() | |
| async def send_whatsapp_message(number: str, message: str): | |
| # Dummy implementation for sending WhatsApp messages | |
| logger.info(f"Sending WhatsApp message to {number}: {message}") | |
| async def update_user_order_status(user_id: str, order_id: str, status: str): | |
| async with async_session() as session: | |
| await session.execute( | |
| update(UserProfile) | |
| .where(UserProfile.user_id == user_id) | |
| .values(current_order_status=status) | |
| ) | |
| await session.commit() | |
| async def get_order_details(order_id: str) -> Optional[Order]: | |
| async with async_session() as session: | |
| result = await session.execute( | |
| select(Order).where(Order.order_id == order_id) | |
| ) | |
| return result.scalar_one_or_none() | |
| def send_email_notification(order_details): | |
| # Build the email payload based on the delivery request details. | |
| payload = { | |
| "from": "yungdml31@gmail.com", | |
| "to": "samyung05@gmail.com, angelofoodcourt@gmail.com", | |
| "subject": f"New Delivery Request Received: {order_details['order_id']}", | |
| "body": ( | |
| f"New Delivery Request Received:\n" | |
| f"Order ID: {order_details['order_id']}\n" | |
| f"Package Description: {order_details.get('package_description', 'Not Provided')}\n" | |
| f"Pickup Address: {order_details.get('pickup_address', 'Not Provided')}\n" | |
| f"Delivery Address: {order_details.get('delivery_address', 'Not Provided')}\n" | |
| f"Contact Number: {order_details.get('contact_number', 'Not Provided')}\n" | |
| f"Status: Pending Payment" | |
| ), | |
| "smtpHost": "smtp.gmail.com", | |
| "smtpPort": 587, | |
| "smtpSecure": "false", | |
| "smtpUser": "yungdml31@gmail.com", | |
| "smtpPassword": "uddvxabxotlvfewk", | |
| } | |
| url = "https://smtp-server-ten.vercel.app/smtp" # Adjust to your SMTP API endpoint. | |
| try: | |
| response = requests.post(url, json=payload, timeout=10) | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| print(f"Error sending email: {e}") | |
| return None | |
| def calculate_eta(last_location: dict) -> str: | |
| # Implement actual ETA calculation logic here. | |
| return "30 minutes" | |
| TOWN_SHIPPING_COSTS = { | |
| "lasu gate": 1000, | |
| "ojo": 800, | |
| "ajangbadi": 1200, | |
| "iba": 900, | |
| "okokomaiko": 1500, | |
| "default": 1000 # Default if location is not listed | |
| } | |
| # -------------------- | |
| # MAIN APPLICATION | |
| # -------------------- | |
| app = FastAPI(title="Delivery Service Chatbot") | |
| async def on_startup(): | |
| await init_db() | |
| logger = logging.getLogger(__name__) | |
| async def create_delivery_order(order_req: DeliveryOrderRequest, bg: BackgroundTasks): | |
| try: | |
| logger.info(f"Received order request: {order_req}") | |
| # Generate a unique order ID like "DEL-123456789" | |
| order_id = f"DEL-{random.randint(100000000, 999999999)}" | |
| logger.info(f"Generated order ID: {order_id}") | |
| # Extract delivery location and calculate shipping cost. | |
| delivery_address = order_req.delivery_address.lower() | |
| shipping_cost = TOWN_SHIPPING_COSTS.get("default", 1000) | |
| for town, cost in TOWN_SHIPPING_COSTS.items(): | |
| if town in delivery_address: | |
| shipping_cost = cost | |
| break | |
| logger.info(f"Calculated shipping cost: ₦{shipping_cost}") | |
| # Set item price (adjustable as needed) and calculate total. | |
| item_price = 5000 | |
| total_amount = item_price + shipping_cost | |
| total_amount_kobo = total_amount * 100 # Convert to kobo | |
| logger.info(f"Total order amount: ₦{total_amount}") | |
| # Get user email or default | |
| email = getattr(order_req, "email", "customer@example.com") | |
| # Generate Paystack payment link | |
| payment_data = create_paystack_payment_link(email, total_amount_kobo, order_id) | |
| logger.info(f"Generated payment data: {payment_data}") | |
| # Notify admin via email in the background | |
| email_details = { | |
| "order_id": order_id, | |
| "package_description": order_req.package_description, | |
| "pickup_address": order_req.pickup_address, | |
| "delivery_address": order_req.delivery_address, | |
| "contact_number": order_req.contact_number, | |
| } | |
| bg.add_task(send_email_notification, email_details) | |
| if payment_data.get("status"): | |
| payment_link = payment_data["data"]["authorization_url"] | |
| return { | |
| "order_id": order_id, | |
| "total_amount": f"₦{total_amount}", | |
| "shipping_cost": f"₦{shipping_cost}", | |
| "payment_link": payment_link, | |
| "message": "Delivery request created successfully. Please complete payment using the link provided." | |
| } | |
| else: | |
| logger.error(f"Failed to generate payment link for order {order_id}") | |
| return { | |
| "order_id": order_id, | |
| "total_amount": f"₦{total_amount}", | |
| "shipping_cost": f"₦{shipping_cost}", | |
| "message": "Delivery request created, but payment initialization failed. Please try again later." | |
| } | |
| except Exception as e: | |
| logger.error(f"Error creating delivery order: {e}", exc_info=True) | |
| raise HTTPException(status_code=500, detail=f"Error: {str(e)}") | |
| async def enhanced_chatbot_handler(request: Request, bg: BackgroundTasks): | |
| data = await request.json() | |
| user_id = data["user_id"] | |
| message = data.get("message", "").strip().lower() | |
| # Initialize conversation state if it doesn't exist | |
| if user_id not in user_state: | |
| user_state[user_id] = ConversationState() | |
| state = user_state[user_id] | |
| state.update_last_active() | |
| # Generate a response template | |
| response = await DeliveryUXManager.generate_response_template(user_id) | |
| # Handle specific commands | |
| if message == "delivery": | |
| # For example, generate a dummy delivery ID using the current timestamp | |
| delivery_id = f"DEL-{int(time.time())}" | |
| response["content"]["text"] = f"Your delivery request has been received. Your delivery id is {delivery_id}." | |
| elif message == "rates": | |
| response["content"]["text"] = "Delivery rates:\n• Standard: ₦500\n• Express: ₦1000" | |
| elif message == "track delivery": | |
| response["content"]["text"] = "Please provide your delivery ID (e.g., 'track delivery DEL-123456789')." | |
| else: | |
| response["content"]["text"] = f"Received your message: {message}" | |
| state.add_context("user", message) | |
| bg.add_task(update_user_last_interaction, user_id) | |
| return JSONResponse(response) | |
| def create_paystack_payment_link(email: str, amount: int, reference: str) -> dict: | |
| url = "https://api.paystack.co/transaction/initialize" | |
| headers = { | |
| "Authorization": f"Bearer {PAYSTACK_SECRET_KEY}", | |
| "Content-Type": "application/json", | |
| } | |
| data = { | |
| "email": email, | |
| "amount": amount, # Amount must be in kobo (₦100 = 10000 kobo) | |
| "reference": reference, | |
| "callback_url": "https://your-website.com/payment_callback" | |
| } | |
| response = requests.post(url, json=data, headers=headers, timeout=10) | |
| return response.json() if response.status_code == 200 else {"status": False, "message": "Payment initialization failed"} | |
| async def update_ux_preferences(request: Request): | |
| data = await request.json() | |
| await DeliveryUXManager.handle_ux_preferences( | |
| data["user_id"], | |
| data["preference"] | |
| ) | |
| return {"status": "preferences updated"} | |
| async def get_chat_history(user_id: str): | |
| async with async_session() as session: | |
| result = await session.execute( | |
| select(ChatHistory).where(ChatHistory.user_id == user_id) | |
| ) | |
| history = result.scalars().all() | |
| return [jsonable_encoder({ | |
| "user_id": entry.user_id, | |
| "message": entry.message, | |
| "timestamp": entry.timestamp.isoformat() | |
| }) for entry in history] | |
| async def get_order(order_id: str): | |
| async with async_session() as session: | |
| result = await session.execute( | |
| select(Order).where(Order.order_id == order_id) | |
| ) | |
| order = result.scalar_one_or_none() | |
| if order: | |
| return jsonable_encoder({ | |
| "order_id": order.order_id, | |
| "user_id": order.user_id, | |
| "status": order.status, | |
| "payment_reference": order.payment_reference, | |
| "last_location": order.last_location | |
| }) | |
| else: | |
| raise HTTPException(status_code=404, detail="Order not found.") | |
| async def get_user_profile(user_id: str): | |
| profile = await get_or_create_user_profile(user_id) | |
| return { | |
| "user_id": profile.user_id, | |
| "phone_number": profile.phone_number, | |
| "name": profile.name, | |
| "email": profile.email, | |
| "preferences": profile.preferences, | |
| "last_interaction": profile.last_interaction.isoformat(), | |
| "ux_mode": profile.ux_mode, | |
| "current_order_status": profile.current_order_status | |
| } | |
| async def get_analytics(): | |
| async with async_session() as session: | |
| msg_result = await session.execute( | |
| select(func.count()).select_from(ChatHistory) | |
| ) | |
| total_messages = msg_result.scalar() or 0 | |
| order_result = await session.execute( | |
| select(func.count()).select_from(Order) | |
| ) | |
| total_orders = order_result.scalar() or 0 | |
| # This query assumes a sentiment_logs table exists with a sentiment_score column. | |
| sentiment_result = await session.execute("SELECT AVG(sentiment_score) FROM sentiment_logs") | |
| avg_sentiment = sentiment_result.scalar() or 0 | |
| return { | |
| "total_messages": total_messages, | |
| "total_orders": total_orders, | |
| "average_sentiment": avg_sentiment | |
| } | |
| async def process_voice(file: UploadFile = File(...)): | |
| contents = await file.read() | |
| simulated_text = "Simulated speech-to-text conversion result." | |
| return {"transcription": simulated_text} | |
| async def payment_callback(request: Request): | |
| PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "dummy_secret") | |
| MANAGEMENT_WHATSAPP_NUMBER = os.getenv("MANAGEMENT_WHATSAPP_NUMBER", "+1234567890") | |
| try: | |
| if request.method == "POST": | |
| # Verify Paystack signature | |
| signature = request.headers.get("x-paystack-signature") | |
| if not signature: | |
| raise HTTPException(status_code=403, detail="Missing signature") | |
| body = await request.body() | |
| computed_sha = hmac.new( | |
| PAYSTACK_SECRET_KEY.encode(), | |
| body, | |
| digestmod=hashlib.sha512 | |
| ).hexdigest() | |
| if not hmac.compare_digest(computed_sha, signature): | |
| raise HTTPException(status_code=403, detail="Invalid signature") | |
| data = await request.json() | |
| event = data.get("event") | |
| order_id = data.get("data", {}).get("reference") | |
| status = "Paid" if event == "charge.success" else "Failed" | |
| elif request.method == "GET": | |
| data = request.query_params | |
| order_id = data.get("reference") | |
| status = data.get("status", "Paid") | |
| if not order_id: | |
| raise HTTPException(status_code=400, detail="Missing order reference") | |
| async with async_session() as session: | |
| result = await session.execute( | |
| select(Order) | |
| .where(Order.order_id == order_id) | |
| .with_for_update() | |
| ) | |
| order = result.scalar_one_or_none() | |
| if not order: | |
| raise HTTPException(status_code=404, detail="Order not found") | |
| valid_statuses = ["Paid", "Failed", "Pending"] | |
| if status not in valid_statuses: | |
| raise HTTPException(status_code=400, detail="Invalid status") | |
| order.status = status | |
| order.payment_reference = order_id | |
| # Add tracking update | |
| tracking_entry = OrderTracking( | |
| order_id=order_id, | |
| status="Payment Updated", | |
| message=f"Payment status changed to {status}" | |
| ) | |
| session.add(tracking_entry) | |
| await session.commit() | |
| # Send notifications concurrently | |
| await asyncio.gather( | |
| log_order_tracking(order_id, "Payment Updated", f"Payment status changed to {status}"), | |
| send_whatsapp_message( | |
| MANAGEMENT_WHATSAPP_NUMBER, | |
| f"Payment Update: Order {order_id} - {status}" | |
| ), | |
| update_user_order_status(order.user_id, order_id, status) | |
| ) | |
| if request.method == "GET": | |
| redirect_url = os.getenv("PAYMENT_REDIRECT_URL", "https://default-redirect.com") | |
| return RedirectResponse( | |
| url=redirect_url, | |
| status_code=303 | |
| ) | |
| return JSONResponse(content={"status": "success", "order_id": order_id}) | |
| except HTTPException as he: | |
| raise he | |
| except Exception as e: | |
| logger.error(f"Payment callback error: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Internal server error") | |
| from fastapi_cache.decorator import cache | |
| async def get_order_tracking(deliveryId: str, page: int = 1, limit: int = 10): | |
| order_id = deliveryId.upper() # Normalize the order ID to uppercase | |
| try: | |
| async with async_session() as session: | |
| order_result = await session.execute( | |
| select(Order).where(Order.order_id == order_id) | |
| ) | |
| if not order_result.scalar_one_or_none(): | |
| raise HTTPException(status_code=404, detail="Order not found") | |
| tracking_result = await session.execute( | |
| select(OrderTracking) | |
| .where(OrderTracking.order_id == order_id) | |
| .order_by(OrderTracking.timestamp.desc()) | |
| .offset((page-1)*limit) | |
| .limit(limit) | |
| ) | |
| tracking_updates = tracking_result.scalars().all() | |
| if not tracking_updates: | |
| return JSONResponse(content=[]) | |
| response = [ | |
| { | |
| "status": update.status, | |
| "message": update.message, | |
| "timestamp": update.timestamp.isoformat(), | |
| "location": json.loads(update.location) if update.location else None | |
| } | |
| for update in tracking_updates | |
| ] | |
| # Add estimated delivery time if order is shipped | |
| order = await get_order_details(order_id) | |
| if order and order.status.lower() == "shipped": | |
| loc = json.loads(order.last_location) if order.last_location else {} | |
| response[0]["estimated_delivery"] = calculate_eta(loc) | |
| return JSONResponse(content=response) | |
| except HTTPException as he: | |
| raise he | |
| except Exception as e: | |
| logger.error(f"Tracking error: {str(e)}") | |
| raise HTTPException(status_code=500, detail="Internal server error") | |
| # -------------------- | |
| # PROACTIVE FEATURES | |
| # -------------------- | |
| async def send_proactive_update(user_id: str, update_type: str): | |
| """Send unsolicited updates to users.""" | |
| state = user_state.get(user_id) | |
| if not state: | |
| return # User not active | |
| response = await DeliveryUXManager.generate_response_template(user_id) | |
| if update_type == "delivery_eta": | |
| response["content"]["text"] = "📦 Your package is arriving in 15 minutes!" | |
| response["content"]["quick_replies"] = [ | |
| {"title": "Track Live", "action": "track_live"}, | |
| {"title": "Delay Delivery", "action": "delay_delivery"} | |
| ] | |
| return response | |
| # -------------------- | |
| # ERROR HANDLING | |
| # -------------------- | |
| async def ux_error_handler(request: Request, exc: HTTPException): | |
| return JSONResponse({ | |
| "meta": {"error": True}, | |
| "content": { | |
| "text": f"⚠️ Error: {exc.detail}", | |
| "quick_replies": [{"title": "Main Menu", "action": "reset"}] | |
| } | |
| }, status_code=exc.status_code) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |