Spaces:
Sleeping
Sleeping
import logging
import math
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, PyMongoError

from src.llms.groqllm import GroqLLM
# Read a local .env file (if any) so the os.getenv() lookups below can see it.
load_dotenv()

# Root logging at INFO; everything in this module logs through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MongoDBConnection:
    """Singleton MongoDB connection handler.

    The connection string is taken from the first non-empty of the
    ``MONGODB_URL``, ``MONGO_URL`` and ``DATABASE_URL`` environment variables;
    the database name comes from ``MONGODB_DATABASE`` (default
    ``sparrow_logistics``).
    """

    _instance = None  # the one shared instance handed out by __new__
    _client = None    # MongoClient, set only after a successful ping
    _db = None        # database handle selected from _client

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every MongoDBConnection() call, so connect only
        # when no verified client exists yet.
        if self._client is None:
            self._connect()

    def _connect(self):
        """Open the connection, verify it with a ping and select the database.

        Instance state is only updated once the ping has succeeded, so a
        failed attempt never leaves a half-initialised client behind.

        Raises:
            ConnectionError: if no URL is configured or the connection
                attempt fails for any reason.
        """
        mongodb_url = (
            os.getenv('MONGODB_URL')
            or os.getenv('MONGO_URL')
            or os.getenv('DATABASE_URL')
        )
        if not mongodb_url:
            logger.error("No MongoDB URL found in environment variables")
            raise ConnectionError("MongoDB URL not configured")

        client = None
        try:
            client = MongoClient(mongodb_url, serverSelectionTimeoutMS=5000)
            # MongoClient connects lazily; the ping forces a round trip.
            client.admin.command('ping')
            # An empty MONGODB_DATABASE is treated like an unset one.
            db_name = os.getenv('MONGODB_DATABASE') or 'sparrow_logistics'
            database = client[db_name]
        except Exception as e:
            if client is not None:
                client.close()
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise ConnectionError(f"MongoDB connection failed: {e}") from e

        self._client = client
        self._db = database
        logger.info(f"Successfully connected to MongoDB database: {db_name}")

    @property
    def db(self):
        """Database handle; connects first if no connection exists yet."""
        if self._db is None:
            self._connect()
        return self._db

    @property
    def client(self):
        """Underlying client; connects first if no connection exists yet."""
        if self._client is None:
            self._connect()
        return self._client

    def test_connection(self) -> bool:
        """Return True if a ping succeeds; never raises."""
        if self._client is None:
            logger.error("MongoDB connection test failed: client is not initialized")
            return False
        try:
            self._client.admin.command('ping')
            return True
        except Exception as e:
            logger.error(f"MongoDB connection test failed: {e}")
            return False
# Shared connection used by every tool below. Instantiating it connects right
# away, so importing this module raises ConnectionError if MongoDB is unreachable.
mongo_conn = MongoDBConnection()
def get_today_str() -> str:
    """Return the current local date formatted like ``Mon Jan 05, 2026``."""
    # datetime.__format__ hands the spec to strftime.
    return f"{datetime.now():%a %b %d, %Y}"
@tool
def think_tool(reflection: str) -> str:
    """
    Tool for strategic reflection on execution progress and decision-making.

    Use this tool after each search or database query to analyze results and plan next steps systematically.
    This creates a deliberate pause in customer query execution workflow for quality decision-making.

    When to use:
    - After receiving database results: What key information did I find?
    - Before deciding next steps: Do I have enough to answer comprehensively?
    - When assessing execution gaps: What specific information am I still missing?
    - Before concluding execution: Can I provide a complete answer now?

    Args:
        reflection: Your detailed reflection on the execution progress, findings, gaps, and next steps

    Returns:
        Confirmation that reflection was recorded for decision-making
    """
    # NOTE: with @tool the docstring above is the description the model sees,
    # so it is written for the agent rather than for maintainers.
    logger.info("Agent reflection: %s", reflection)
    return f"Reflection recorded: {reflection}"
def _format_package_status(tracking_number: str, package: dict) -> str:
    """Render one ``packages`` document as the customer-facing status sentence."""
    status = package.get('status', 'Unknown')
    location = package.get('current_location', 'Location unknown')
    destination = package.get('destination', 'Not specified')
    estimated_delivery = package.get('estimated_delivery', 'Not available')
    last_updated = package.get('last_updated', 'Unknown')

    # Only the last entry of tracking_events is reported; anything that is not
    # a non-empty list of dicts is ignored rather than failing the lookup.
    events_summary = ""
    tracking_events = package.get('tracking_events')
    if isinstance(tracking_events, (list, tuple)) and tracking_events:
        latest_event = tracking_events[-1]
        if isinstance(latest_event, dict):
            events_summary = (
                f" Latest event: {latest_event.get('description', 'No description')}"
                f" at {latest_event.get('location', 'unknown location')}"
            )

    return (
        f"Package {tracking_number}: Status is '{status}'. Currently at: {location}. "
        f"Destination: {destination}. Estimated delivery: {estimated_delivery}. "
        f"Last updated: {last_updated}.{events_summary}"
    )


@tool
def track_package(tracking_number: str) -> str:
    """
    Track customer packages/parcels by looking up tracking information in the database.

    This tool queries the MongoDB database to find package tracking information,
    delivery status, location updates, and estimated delivery times.

    Args:
        tracking_number: The tracking number provided by the customer

    Returns:
        A string describing comprehensive information about the parcel status, location, and delivery details
    """
    # NOTE: with @tool the docstring above is the description the model sees.
    # Every failure is turned into a customer-facing string; nothing is raised.
    try:
        if not tracking_number or not tracking_number.strip():
            return "Error: Please provide a valid tracking number to track your package."

        # Lookups are exact matches on the upper-cased number.
        tracking_number = tracking_number.strip().upper()
        logger.info("Tracking package: %s", tracking_number)

        db = mongo_conn.db
        package = db.packages.find_one({
            "$or": [
                {"tracking_number": tracking_number},
                {"tracking_id": tracking_number},
                {"reference_number": tracking_number},
            ]
        })

        if not package:
            # Fall back to the tracking_history collection before giving up.
            tracking_record = db.tracking_history.find_one({
                "tracking_number": tracking_number
            })
            if tracking_record:
                return (
                    f"Found tracking history for {tracking_number}: "
                    f"{tracking_record.get('status', 'Status unknown')}. "
                    f"Last update: {tracking_record.get('last_updated', 'Unknown')}"
                )
            return (
                f"Sorry, I couldn't find any package with tracking number '{tracking_number}'. "
                "Please double-check the tracking number and try again."
            )

        response = _format_package_status(tracking_number, package)
        logger.info("Successfully retrieved tracking info for %s", tracking_number)
        return response

    except PyMongoError as e:
        logger.error("Database error while tracking package %s: %s", tracking_number, e)
        return (
            "Sorry, I'm having trouble accessing the tracking database right now. "
            "Please try again in a moment. (Error: Database connection issue)"
        )
    except Exception as e:
        # Boundary handler: keep the traceback in the log, not in the reply.
        logger.exception("Unexpected error while tracking package %s: %s", tracking_number, e)
        return (
            "Sorry, I encountered an unexpected error while tracking your package. "
            "Please try again or contact support if the issue persists."
        )
def _created_at_sort_key(package: dict) -> tuple:
    """Sort key for ``created_at`` that tolerates missing or mixed-type values.

    Datetimes rank above strings, which rank above everything else, so values
    of different types are never compared with each other.
    """
    created = package.get('created_at')
    if isinstance(created, datetime):
        return (2, created)
    if isinstance(created, str):
        return (1, created)
    return (0, 0)


@tool
def get_user_information(user_id: str) -> str:
    """
    Retrieve comprehensive user information including account details, shipping history,
    and preferences from the database.

    Args:
        user_id: The unique identifier of the user (could be user ID, email, or phone number)

    Returns:
        A string containing user details, shipping history, preferences, and account status
    """
    # NOTE: with @tool the docstring above is the description the model sees.
    # Every failure is turned into a customer-facing string; nothing is raised.
    try:
        if not user_id or not user_id.strip():
            return "Error: Please provide a valid user ID, email, or phone number to look up user information."

        user_id = user_id.strip()
        logger.info("Looking up user information for: %s", user_id)

        db = mongo_conn.db

        # NOTE(review): the `_id` clause compares against a plain string, so it
        # only matches if `_id` values are stored as strings — confirm schema.
        user = db.users.find_one({
            "$or": [
                {"user_id": user_id},
                {"_id": user_id},
                {"email": user_id},
                {"phone": user_id},
                {"customer_id": user_id},
            ]
        })
        if not user:
            return (
                f"Sorry, I couldn't find any user account with the identifier '{user_id}'. "
                "Please check your user ID, email, or phone number and try again."
            )

        name = user.get('name', user.get('full_name', 'Name not available'))
        email = user.get('email', 'Email not provided')
        phone = user.get('phone', 'Phone not provided')
        account_status = user.get('status', 'Unknown')
        join_date = user.get('created_at', user.get('join_date', 'Unknown'))

        # Only these three fields are read below, so project the rest away.
        owner_key = user.get('user_id', user.get('_id'))
        user_packages = list(db.packages.find(
            {"user_id": owner_key},
            {"status": 1, "created_at": 1, "tracking_number": 1},
        ))

        # A stored status of None (or any non-string) must not break the counts.
        statuses = [str(pkg.get('status') or '').lower() for pkg in user_packages]
        total_packages = len(user_packages)
        delivered_packages = sum(s in ('delivered', 'completed') for s in statuses)
        in_transit = sum(s in ('in_transit', 'shipped', 'out_for_delivery') for s in statuses)

        # Three most recently created packages, newest first.
        recent_packages = sorted(user_packages, key=_created_at_sort_key, reverse=True)[:3]
        recent_summary = ""
        if recent_packages:
            recent_list = [
                f"#{pkg.get('tracking_number', 'No tracking')} ({pkg.get('status', 'Unknown status')})"
                for pkg in recent_packages
            ]
            recent_summary = f" Recent packages: {', '.join(recent_list)}."

        preferences = user.get('preferences')
        pref_summary = ""
        if isinstance(preferences, dict) and preferences:
            delivery_pref = preferences.get('delivery_preference', 'Standard')
            notification_pref = preferences.get('notifications', 'Email')
            pref_summary = f" Preferences: {delivery_pref} delivery, {notification_pref} notifications."

        response = (
            f"User found: {name} ({email}, {phone}). Account status: {account_status}. "
            f"Member since: {join_date}. Shipping history: {total_packages} total packages, "
            f"{delivered_packages} delivered, {in_transit} currently in transit.{recent_summary}{pref_summary}"
        )
        logger.info("Successfully retrieved user info for %s", user_id)
        return response

    except PyMongoError as e:
        logger.error("Database error while looking up user %s: %s", user_id, e)
        return (
            "Sorry, I'm having trouble accessing the user database right now. "
            "Please try again in a moment. (Error: Database connection issue)"
        )
    except Exception as e:
        # Boundary handler: keep the traceback in the log, not in the reply.
        logger.exception("Unexpected error while looking up user %s: %s", user_id, e)
        return (
            "Sorry, I encountered an unexpected error while looking up user information. "
            "Please try again or contact support if the issue persists."
        )
def _numeric_or_default(value, default):
    """Return ``value`` if it is a real int/float, otherwise ``default``."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return default
    return value


@tool
def estimated_time_analysis(origin: str, destination: str) -> str:
    """
    Estimate delivery time for a parcel based on origin and destination using
    database routing information, historical data, and current service conditions.

    Args:
        origin: The starting location/address for the shipment
        destination: The destination location/address for the shipment

    Returns:
        A string describing the estimated delivery time with detailed breakdown
    """
    # NOTE: with @tool the docstring above is the description the model sees.
    # Every failure is turned into a customer-facing string; nothing is raised.
    try:
        if not origin or not destination or not origin.strip() or not destination.strip():
            return "Error: Please provide both origin and destination locations for delivery time estimation."

        origin = origin.strip()
        destination = destination.strip()
        logger.info("Estimating delivery time from %s to %s", origin, destination)

        # The locations are free text from the user: escape them so they are
        # matched literally instead of being interpreted as regex syntax.
        origin_pattern = re.escape(origin)
        destination_pattern = re.escape(destination)
        origin_match = {"$regex": origin_pattern, "$options": "i"}
        destination_match = {"$regex": destination_pattern, "$options": "i"}

        db = mongo_conn.db

        # 1) A configured route, matched on its endpoints or on its name.
        route = db.delivery_routes.find_one({
            "$or": [
                {"origin": origin_match, "destination": destination_match},
                {"route_name": {"$regex": f"{origin_pattern}.*{destination_pattern}", "$options": "i"}},
            ]
        })

        # 2) Historical deliveries are only consulted when no route exists.
        delivery_times = []
        if not route:
            historical_deliveries = db.packages.find({
                "origin": origin_match,
                "destination": destination_match,
                "status": {"$in": ["delivered", "completed"]},
                "delivery_time_days": {"$exists": True},
            }).limit(10)
            for delivery in historical_deliveries:
                days = _numeric_or_default(delivery.get('delivery_time_days'), None)
                if days is not None:
                    delivery_times.append(days)

        if route:
            estimated_days = _numeric_or_default(route.get('estimated_days'), 2)
            service_type = route.get('service_type', 'Standard')
            confidence = "High (based on established route)"
        elif delivery_times:
            estimated_days = sum(delivery_times) / len(delivery_times)
            service_type = "Standard"
            confidence = f"Medium (based on {len(delivery_times)} similar deliveries)"
        else:
            # 3) No data at all: keyword heuristic on the destination text.
            estimated_days = 2
            service_type = "Standard"
            confidence = "Low (general estimation)"
            destination_lower = destination.lower()
            if any(word in destination_lower for word in ('international', 'overseas', 'abroad')):
                estimated_days = 7
                service_type = "International"
            elif any(word in destination_lower for word in ('express', 'priority', 'urgent')):
                estimated_days = 1
                service_type = "Express"

        # Only the first two matching alerts are reported and added to the
        # estimate, so there is no need to fetch more than two.
        current_alerts = db.service_alerts.find({
            "status": "active",
            "$or": [
                {"affected_locations": origin_match},
                {"affected_locations": destination_match},
            ],
        }).limit(2)

        delay_info = []
        for alert in current_alerts:
            alert_desc = alert.get('description', 'Service delay')
            additional_days = _numeric_or_default(alert.get('estimated_delay_days'), 1)
            delay_info.append(f"{alert_desc} (+{additional_days} days)")
            estimated_days += additional_days
        alert_info = ""
        if delay_info:
            alert_info = f" Current alerts affecting delivery: {'; '.join(delay_info)}."

        estimated_days = round(estimated_days, 1)
        # Round up: a 2.3-day estimate cannot be met within 2 business days.
        business_days = max(1, math.ceil(estimated_days))

        response = (
            f"Estimated delivery time from {origin} to {destination}: {estimated_days} days "
            f"({business_days} business days) via {service_type} service. "
            f"Confidence level: {confidence}.{alert_info}"
        )
        logger.info(
            "Successfully calculated delivery estimate: %s → %s = %s days",
            origin, destination, estimated_days,
        )
        return response

    except PyMongoError as e:
        logger.error("Database error while calculating delivery estimate: %s", e)
        return (
            "Sorry, I'm having trouble accessing the delivery routing database. "
            "Using general estimate: 2-3 business days for standard delivery. "
            "Please try again later for more accurate timing."
        )
    except Exception as e:
        # Boundary handler: keep the traceback in the log, not in the reply.
        logger.exception("Unexpected error during delivery estimation: %s", e)
        return (
            "Sorry, I encountered an unexpected error while calculating delivery time. "
            f"General estimate: 2-3 business days for standard delivery from {origin} to {destination}."
        )
# Keywords are matched as whole words so that e.g. "customer" or "boston" is
# not mistaken for the keyword "to".
_SEARCH_KEYWORD_RE = re.compile(r"\b(status|user|customer|destination|to|origin|from)\b")
_SEARCH_KEYWORD_FIELDS = {
    "user": "user",
    "customer": "user",
    "destination": "destination",
    "to": "destination",
    "origin": "origin",
    "from": "origin",
}
_KNOWN_STATUSES = ("delivered", "in_transit", "pending", "shipped", "out_for_delivery")


def _parse_search_criteria(criteria: str) -> dict:
    """Split lower-cased criteria into ``status``/``user``/``destination``/``origin`` values.

    Each keyword's value is the text up to the next keyword (or the end of the
    string); the first occurrence of a field wins. ``status`` is only set when
    one of the known status terms appears in the criteria.
    """
    parsed = {}
    matches = list(_SEARCH_KEYWORD_RE.finditer(criteria))

    for current, following in zip(matches, matches[1:] + [None]):
        keyword = current.group(1)
        if keyword == "status":
            continue  # handled below; here it only ends the previous value
        stop = following.start() if following else len(criteria)
        value = criteria[current.end():stop].strip(" \t:=,;")
        field = _SEARCH_KEYWORD_FIELDS[keyword]
        if value and field not in parsed:
            parsed[field] = value

    if any(m.group(1) == "status" for m in matches):
        # Accept "in transit" / "in-transit" as well as "in_transit".
        normalized = re.sub(r"[\s-]+", "_", criteria)
        status = next((s for s in _KNOWN_STATUSES if s in normalized), None)
        if status:
            parsed["status"] = status

    return parsed


@tool
def search_packages(search_criteria: str) -> str:
    """
    Search for packages in the database using flexible criteria like user name,
    destination, status, date range, or other package attributes.

    Args:
        search_criteria: Search terms or criteria (e.g., "user John", "status delivered", "destination New York")

    Returns:
        A string containing search results with package information
    """
    # NOTE: with @tool the docstring above is the description the model sees.
    # Every failure is turned into a customer-facing string; nothing is raised.
    try:
        if not search_criteria or not search_criteria.strip():
            return "Error: Please provide search criteria to find packages."

        criteria = search_criteria.strip().lower()
        logger.info("Searching packages with criteria: %s", criteria)

        def literal(text):
            # User text is matched literally, never as regex syntax.
            return {"$regex": re.escape(text), "$options": "i"}

        parsed = _parse_search_criteria(criteria)
        search_query = {}
        if "status" in parsed:
            search_query["status"] = {"$regex": parsed["status"], "$options": "i"}
        if "user" in parsed:
            search_query["$or"] = [
                {"user_id": literal(parsed["user"])},
                {"customer_name": literal(parsed["user"])},
                {"recipient_name": literal(parsed["user"])},
            ]
        if "destination" in parsed:
            search_query["destination"] = literal(parsed["destination"])
        if "origin" in parsed:
            search_query["origin"] = literal(parsed["origin"])

        # Nothing recognised: match the whole text against the common fields.
        if not search_query:
            search_query = {
                "$or": [
                    {"tracking_number": literal(criteria)},
                    {"customer_name": literal(criteria)},
                    {"destination": literal(criteria)},
                    {"origin": literal(criteria)},
                    {"description": literal(criteria)},
                ]
            }

        results = list(mongo_conn.db.packages.find(search_query).limit(10))
        if not results:
            return (
                f"No packages found matching '{search_criteria}'. "
                "Please try different search terms or check if the information is correct."
            )

        response_parts = [f"Found {len(results)} package(s) matching '{search_criteria}':"]
        for i, package in enumerate(results, 1):
            tracking_num = package.get('tracking_number', 'No tracking')
            status = package.get('status', 'Unknown')
            destination = package.get('destination', 'Unknown destination')
            customer = package.get('customer_name', package.get('recipient_name', 'Unknown customer'))
            response_parts.append(f"{i}. #{tracking_num} - {customer} → {destination} (Status: {status})")

        logger.info("Search completed: found %d packages", len(results))
        return "\n".join(response_parts)

    except PyMongoError as e:
        logger.error("Database error during package search: %s", e)
        return "Sorry, I'm having trouble searching the package database right now. Please try again in a moment."
    except Exception as e:
        # Boundary handler: keep the traceback in the log, not in the reply.
        logger.exception("Unexpected error during package search: %s", e)
        return "Sorry, I encountered an unexpected error while searching for packages. Please try again."
@tool
def get_service_alerts(location: str = "") -> str:
    """
    Retrieve current service alerts, delays, and operational updates that might affect deliveries.

    Args:
        location: Optional location to filter alerts (e.g., "New York", "California")

    Returns:
        A string containing current service alerts and operational information
    """
    # NOTE: with @tool the docstring above is the description the model sees.
    # Every failure is turned into a customer-facing string; nothing is raised.
    try:
        # Normalise once so a whitespace-only value behaves like "no filter"
        # in the query and in the messages alike.
        location = (location or "").strip()
        logger.info("Getting service alerts for location: %s", location or 'all locations')

        query = {"status": "active"}
        if location:
            # Match the user-supplied text literally, not as regex syntax.
            pattern = re.escape(location)
            query["$or"] = [
                {"affected_locations": {"$regex": pattern, "$options": "i"}},
                {"title": {"$regex": pattern, "$options": "i"}},
                {"description": {"$regex": pattern, "$options": "i"}},
            ]

        # Highest priority first, at most five alerts.
        alerts = list(mongo_conn.db.service_alerts.find(query).sort("priority", -1).limit(5))

        if not alerts:
            location_msg = f" for {location}" if location else ""
            return (
                f"Good news! There are currently no active service alerts{location_msg}. "
                "All services are operating normally."
            )

        location_msg = f" affecting {location}" if location else ""
        response_parts = [f"Current service alerts{location_msg}:"]

        for i, alert in enumerate(alerts, 1):
            title = alert.get('title', 'Service Alert')
            description = alert.get('description', 'No details available')
            severity = alert.get('severity', 'Medium')
            affected_locations = alert.get('affected_locations', [])
            estimated_resolution = alert.get('estimated_resolution', 'Unknown')

            location_info = ""
            if affected_locations:
                if isinstance(affected_locations, list):
                    # At most three locations are listed; str() guards against
                    # non-string entries breaking the join.
                    shown = ', '.join(str(loc) for loc in affected_locations[:3])
                    location_info = f" (Affects: {shown})"
                else:
                    location_info = f" (Affects: {affected_locations})"

            response_parts.append(f"{i}. [{severity}] {title}{location_info}")
            response_parts.append(f" {description}")
            if estimated_resolution != 'Unknown':
                response_parts.append(f" Expected resolution: {estimated_resolution}")

        logger.info("Retrieved %d service alerts", len(alerts))
        return "\n".join(response_parts)

    except PyMongoError as e:
        logger.error("Database error while getting service alerts: %s", e)
        return "Sorry, I'm having trouble accessing service alert information right now. Please check our website or contact support for current service status."
    except Exception as e:
        # Boundary handler: keep the traceback in the log, not in the reply.
        logger.exception("Unexpected error while getting service alerts: %s", e)
        return "Sorry, I encountered an unexpected error while retrieving service alerts. Please try again later."
# Every tool this module exposes, in registration order.
tools = [
    think_tool,
    track_package,
    get_user_information,
    estimated_time_analysis,
    search_packages,
    get_service_alerts,
]

# Lookup table keyed by each tool's `.name` attribute. The loop variable is
# deliberately not called `tool`, which is the imported decorator.
tools_by_name = {registered.name: registered for registered in tools}