File size: 6,942 Bytes
63c9bbb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import pandas as pd
import os
import logging
import re

# Configure logging.
# NOTE: basicConfig() runs at import time and configures the *root* logger at
# DEBUG level (it is a no-op if the root logger already has handlers), so
# importing this module can change the logging verbosity of the host application.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)

def extract_order_number(query):
    """Return the first order number found in ``query``, or ``None``.

    An order number is matched as three digits, a hyphen, seven digits,
    a hyphen and seven more digits, delimited by word boundaries
    (for example ``123-1234567-1234567``).
    """
    found = re.search(r'\b\d{3}-\d{7}-\d{7}\b', query)
    return found.group(0) if found else None

def extract_tracking_number(query):
    """Return the first tracking number found in ``query``, or ``None``.

    A tracking number is matched as a run of at least ten uppercase ASCII
    letters and/or digits delimited by word boundaries.
    """
    found = re.search(r'\b[A-Z0-9]{10,}\b', query)
    return found.group(0) if found else None

class OrderTrackingAgent:
    """Answer order-tracking queries and drive refund/replacement conversations.

    Order data is read from a CSV file into a pandas DataFrame. Tracking
    look-ups are stateless; refund and replacement requests are multi-turn
    conversations whose progress is kept in ``pending_actions``:

    1. collect (and verify) the order number,
    2. ask the user to confirm,
    3. collect a pickup time, then reset the conversation state.
    """

    # CSV column names used for look-ups.
    ORDER_COLUMN = "Order number"
    TRACKING_COLUMN = "Tracking number"

    _GENERIC_ERROR = "An error occurred while processing your request."

    # Whole-word matching so that e.g. "yesterday" is not read as "yes".
    _YES_RE = re.compile(r"\byes\b")
    _NO_RE = re.compile(r"\b(?:no|cancel)\b")
    _CANCEL_RE = re.compile(r"\bcancel\b")

    # User-facing prompts for each step of each flow.
    _FLOW_MESSAGES = {
        "refund": {
            "ask_order": "Sure! Please provide your order number to proceed with the refund.",
            "ask_confirm": "Order found! Confirm if you'd like to initiate the refund.",
            "ask_pickup": (
                "Thank you! I have initiated your refund process. What time shall I "
                "arrange to pick up the item? Please provide a convenient date and time."
            ),
            "scheduled": (
                "Got it! Your pickup is scheduled for {pickup_time}. Please ensure the "
                "parcel is securely packed. Once we receive it, the refund will be "
                "credited to your Amazon Pay balance."
            ),
        },
        "replacement": {
            "ask_order": "Sure! Please provide your order number to process the replacement.",
            "ask_confirm": "Order found! Confirm if you'd like to proceed with the replacement request.",
            "ask_pickup": (
                "Your replacement process has been initiated. What time shall I arrange "
                "to pick up the item for replacement? Please provide a convenient date and time."
            ),
            "scheduled": (
                "Got it! Your pickup is scheduled for {pickup_time}. Please ensure the "
                "item is in its original condition and securely packed. Once we receive "
                "it, your replacement will be processed."
            ),
        },
    }

    def __init__(self, csv_path='./data/order_track.csv'):
        """Create the agent and eagerly load the order CSV.

        Args:
            csv_path (str): Path of the CSV file holding the order data.
        """
        self.csv_path = csv_path
        # DataFrame with the order data, or None when loading failed.
        self.order_data = None
        # State of the refund/replacement conversation currently in progress.
        # Keys used: "flow", "order number", "confirmed".
        self.pending_actions = {}
        self.load_data()

    def load_data(self):
        """Load the order tracking data from the CSV file.

        Loading is best-effort: failures are logged and ``order_data`` is left
        unchanged, so the agent can still be constructed without data.
        """
        if not os.path.exists(self.csv_path):
            logger.error(
                "Error loading order tracking data: CSV file not found at %s",
                self.csv_path,
            )
            return
        try:
            self.order_data = pd.read_csv(self.csv_path)
        except Exception as exc:  # boundary: never let a bad file break start-up
            logger.exception("Error loading order tracking data: %s", exc)
            return
        logger.info("Order tracking data loaded successfully.")
        logger.debug("Columns in CSV: %s", self.order_data.columns.tolist())

    def process_query(self, query, order_number=None, tracking_number=None):
        """Process an order-related query.

        Args:
            query (str): The user's query.
            order_number (str, optional): The order number, if already known.
            tracking_number (str, optional): The tracking number, if already known.

        Returns:
            str: The response to the user's query.
        """
        query = query or ""
        logger.info("Processing query: %s", query)

        # Extract identifiers if not explicitly provided
        if not order_number and not tracking_number:
            order_number = self.extract_order_number(query)
            tracking_number = self.extract_tracking_number(query)

        logger.debug("Extracted Order Number: %s", order_number)
        logger.debug("Extracted Tracking Number: %s", tracking_number)

        lowered = query.lower()
        if "refund" in lowered:
            return self.handle_refund_flow(query, order_number)
        if "replace" in lowered:
            return self.handle_replacement_flow(query, order_number)

        # Follow-up messages ("yes", a pickup time, ...) do not repeat the
        # keyword, so route them to whichever flow is already in progress.
        active_flow = self.pending_actions.get("flow")
        if active_flow == "refund":
            return self.handle_refund_flow(query, order_number)
        if active_flow == "replacement":
            return self.handle_replacement_flow(query, order_number)

        # Normal tracking queries
        if order_number:
            return self._find_order_by_column(self.ORDER_COLUMN, order_number)
        if tracking_number:
            return self._find_order_by_column(self.TRACKING_COLUMN, tracking_number)

        return "Please provide a valid order number or tracking number to proceed."

    def _find_order_by_column(self, column_name, value):
        """Return a formatted description of the first order matching ``value``.

        Args:
            column_name (str): CSV column to search.
            value: Value to look for; compared as a whitespace-stripped string.

        Returns:
            str: The order details, or a message explaining why none are shown.
        """
        if self.order_data is None:
            logger.error("Order data is not loaded; cannot search column %s", column_name)
            return self._GENERIC_ERROR

        needle = str(value).strip()
        try:
            column = self.order_data[column_name].astype(str).str.strip()
            matches = self.order_data[column == needle]
            if matches.empty:
                return (
                    f"No order found with {column_name.lower()}: {value}. "
                    "Please check the number and try again."
                )
            record = matches.head(1).to_dict(orient="records")[0]
            lines = ["**Order Details:**"]
            lines.extend(f"**{key}:** {val}" for key, val in record.items())
            return "\n".join(lines).strip()
        except KeyError:
            return f"Invalid column name: {column_name}. Unable to process the query."
        except Exception as exc:
            logger.exception("Error retrieving order data: %s", exc)
            return self._GENERIC_ERROR

    def _order_exists(self, order_number):
        """Tell whether ``order_number`` is present in the loaded data.

        Returns:
            bool | None: True/False when the data can be checked, None when it
            cannot (data not loaded or order column missing).
        """
        data = self.order_data
        if data is None or self.ORDER_COLUMN not in data.columns:
            return None
        column = data[self.ORDER_COLUMN].astype(str).str.strip()
        return bool((column == str(order_number).strip()).any())

    def _cancel_flow(self, flow):
        """Reset the conversation state and return the cancellation message."""
        self.pending_actions.clear()
        return (
            f"No problem, your {flow} request has been cancelled. "
            "Let me know if there's anything else I can help with!"
        )

    def _advance_flow(self, flow, query, order_number):
        """Advance the ``flow`` ("refund" or "replacement") conversation one step."""
        messages = self._FLOW_MESSAGES[flow]
        state = self.pending_actions
        text = (query or "").strip()
        lowered = text.lower()

        # State left over from the *other* flow must not leak into this one.
        if state.get("flow", flow) != flow:
            state.clear()
        in_progress = bool(state)
        state["flow"] = flow

        # Step 1: collect the order number.
        if "order number" not in state:
            if order_number:
                # Only claim "Order found!" when the data does not say otherwise;
                # when the data cannot be checked we proceed best-effort.
                if self._order_exists(order_number) is False:
                    return (
                        f"No order found with order number: {order_number}. "
                        "Please check the number and try again."
                    )
                state["order number"] = order_number
                return messages["ask_confirm"]
            if in_progress and self._CANCEL_RE.search(lowered):
                return self._cancel_flow(flow)
            return messages["ask_order"]

        # Step 2: wait for an explicit confirmation.
        if "confirmed" not in state:
            if self._YES_RE.search(lowered):
                state["confirmed"] = True
                return messages["ask_pickup"]
            if self._NO_RE.search(lowered):
                return self._cancel_flow(flow)
            # Anything else is not consent and is not a pickup time either.
            return f"Please reply 'yes' to confirm the {flow}, or 'no' to cancel."

        # Step 3: collect the pickup time and finish.
        if not text:
            return "Please provide a convenient date and time for the pickup."
        if self._CANCEL_RE.search(lowered):
            return self._cancel_flow(flow)
        # The flow is complete: reset so the next request starts from scratch.
        state.clear()
        return messages["scheduled"].format(pickup_time=text)

    def handle_refund_flow(self, query, order_number):
        """Handle one turn of the refund conversation.

        Args:
            query (str): The user's message for this turn.
            order_number (str | None): Order number extracted for this turn.

        Returns:
            str: The prompt or confirmation to show the user.
        """
        return self._advance_flow("refund", query, order_number)

    def handle_replacement_flow(self, query, order_number):
        """Handle one turn of the replacement conversation.

        Args:
            query (str): The user's message for this turn.
            order_number (str | None): Order number extracted for this turn.

        Returns:
            str: The prompt or confirmation to show the user.
        """
        return self._advance_flow("replacement", query, order_number)

    def extract_order_number(self, query):
        """Extracts the order number from the query."""
        return extract_order_number(query)

    def extract_tracking_number(self, query):
        """Extracts the tracking number from the query."""
        return extract_tracking_number(query)

    def clear_context(self):
        """Reset the conversation state, abandoning any flow in progress."""
        self.pending_actions.clear()