File size: 14,993 Bytes
2b44e69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358

"""Validation Agent for Invoice Processing"""

# TODO: Implement agent
import asyncio
import os
import pandas as pd
from typing import Dict, Any, List, Tuple
from fuzzywuzzy import fuzz
import numpy as np
import time
from agents.base_agent import BaseAgent
from state import (
    InvoiceProcessingState, ValidationResult, ValidationStatus,
    ProcessingStatus
)
from datetime import datetime, timedelta

from utils.logger import StructuredLogger
from difflib import SequenceMatcher

class ValidationAgent(BaseAgent):
    """Agent responsible for validating invoice data against purchase orders"""
    
    health_history: List[Dict[str, Any]] = []  # global history for metrics

    def __init__(self, config: Dict[str, Any] = None):
        # pass
        super().__init__(agent_name="validation_agent",config=config or {})
        self.logger = StructuredLogger(__name__)
        self.po_file = self.config.get("po_file","data/purchase_orders.csv")
        self.tolerance = self.config.get("tolerance",0.05)
        self.successful_executions = 0
        self.failed_executions = 0
        self.total_duration = 0.0
        self.total_executions = 0
        self.last_run = None
        # self.match_threshold = self.config.get("match_threshold",80)

    def _validate_preconditions(self, state: InvoiceProcessingState, workflow_type) -> bool:
        # pass
        if not state.invoice_data:
            self.logger.logger.error("No invoice data available for validation.")
            return False
        return True

    def _validate_postconditions(self, state: InvoiceProcessingState) -> bool:
        # pass
        return hasattr(state,'validation_result') and state.validation_result is not None

    async def execute(self, state: InvoiceProcessingState, workflow_type) -> InvoiceProcessingState:
        # pass
        self.logger.logger.info(f"[ValidationAgent] Starting validation for {state.file_name}")
        start_time = time.time()
        try:
            if not self._validate_preconditions(state, workflow_type):
                state.status = ProcessingStatus.FAILED
                self._log_decision(state,"Validation Failed","Precondition not met",confidence = 0.0)
                return state
            invoice_data = state.invoice_data
            matching_pos = await self._find_matching_pos(invoice_data)
            validation_result = await self._validate_against_pos(invoice_data,matching_pos)
            state.validation_result = validation_result
            state.current_agent = "validation_agent"
            state.overall_status = ProcessingStatus.IN_PROGRESS
    
            if self._should_escalate_validation(validation_result, invoice_data):
                state.escalation_required = True
            self._validate_postconditions(state)
            self.successful_executions += 1
            self.last_run = datetime.utcnow().isoformat()
            # print("ValidationResult().confidence_score", state.validation_result.confidence_score)
            self._log_decision(
                state,
                "Validation Successful",
                "PDF text successfully validated and checked by AI",
                state.validation_result.confidence_score,
                state.process_id
            )
            return state
        except Exception as e:
            self.logger.logger.error(f"[ValidationAgent] Execution failed: {e}")
            self.failed_executions += 1
            state.overall_status = ProcessingStatus.FAILED
            return state

        finally:
            duration = (time.time() - start_time) * 1000  # ms
            self.total_executions += 1
            self.total_duration += duration
            self._record_health_metrics(duration)
    
    def _load_purchase_orders(self) -> pd.DataFrame:
        # pass
        """load po data from csv"""
        try:
            df = pd.read_csv(self.po_file)
            self.logger.logger.info(f"[ValidationAgent] Loaded {len(df)} purchase orders")
            return df
        except Exception as e:
            self.logger.logger.error(f"[ValidationAgent] failed to load purchase order: {e}")
            raise

    async def _find_matching_pos(self, invoice_data) -> List[Dict[str, Any]]:
        """find POs matching invoice order_id or fuzzy customer/items"""
        po_df = self._load_purchase_orders()
        matches = []
        for _,po in po_df.iterrows():
            customer_score = fuzz.token_sort_ratio(po["customer_name"], invoice_data.customer_name)
            order_id_score = fuzz.token_sort_ratio(po["order_id"], invoice_data.order_id)
            for item in invoice_data.item_details:
                item_score = fuzz.token_sort_ratio(po["item_name"],item.item_name)
                print(f"Compairing PO item {po['item_name']} with invoice item {item.item_name}: score = {item_score}")

            if (customer_score >= 80) and (item_score >=80) and (order_id_score >=80) and (po['invoice_number'] == int(invoice_data.invoice_number)):
                matches.append(po.to_dict())

        print("matches.....", matches)
        return matches


    async def _validate_against_pos(self, invoice_data, matching_pos: List[Dict[str, Any]]) -> ValidationResult:
        # pass

        if not matching_pos:
            return ValidationResult(po_found=False, validation_status='missing_po',validation_result='No matching purchase order found',
            discrepancies = [],
            confidence_score = 0.0)
        po_data = matching_pos[0]
        discrepancies = self._validate_item_against_po(invoice_data,po_data)
        discrepancies += self._validate_totals(invoice_data,po_data)
        actual_amount = [item.amount for item in invoice_data.item_details][0]
        actual_quantity = [item.quantity for item in invoice_data.item_details][0]
        actual_rate = [item.rate for item in invoice_data.item_details][0]
        amount_diff = abs(actual_amount - po_data.get('expected_amount',0))
        tolerance_limit = po_data.get('expected_amount',0)*self.tolerance
        amount_match = amount_diff <= tolerance_limit
        
        validation_result = ValidationResult(
            po_found=True,
            quantity_match=actual_quantity == po_data.get('quantity'),
            rate_match=abs(actual_rate - po_data.get('rate', 0)) <= tolerance_limit,
            amount_match=amount_match,
            validation_status=ValidationStatus.NOT_STARTED,  # temporary
            validation_result="; ".join(discrepancies) if discrepancies else "All checks passed",
            discrepencies=discrepancies,
            confidence_score=0.0,  # temporary
            expected_amount=po_data.get('amount'),
            po_data=po_data
        )
        validation_result.validation_status = self._determine_validation_status(validation_result)
        validation_result.confidence_score = self._calculate_validation_confidence(validation_result, matching_pos, invoice_data)
        return validation_result

    def _validate_item_against_po(self, item, po_data: Dict[str, Any]) -> List[str]:
        # pass
        # print("itemmmmmmmmm", item.item_details.quantity)
        print("po_-------------", po_data)
        discrepancies = []
        for item in item.item_details:
            if item.quantity != po_data.get('quantity'):
                discrepancies.append(f"Quantity mismatch: Expected {po_data['quantity']}, Found {item.quantity}")
            if abs(item.rate - po_data.get('rate',0)) > po_data.get('rate',0)*self.tolerance:
                discrepancies.append(f"Rate mismatch: Expected {po_data['rate']}, Found {item.rate}")
            return discrepancies

    def _validate_totals(self, invoice_data, po_data: Dict[str, Any]) -> List[str]:
        # pass
        discrepancies = []
        expected = po_data.get('expected_amount',0)
        actual = [item.amount for item in invoice_data.item_details][0]
        diff = abs(expected-actual)
        if diff > expected*self.tolerance:
            discrepancies.append(f"Total amount mismatch: Expected {expected}, Actual {actual} (Difference:{diff:.2f})")
        return discrepancies

    def _calculate_validation_confidence(self, validation_result: ValidationResult,
                                         matching_pos: List[Dict[str, Any]], invoice_data) -> float:
        """
        Compute an intelligent, weighted confidence score across 7 key dimensions:
        invoice_number, order_id, customer_name, item_name, amount, rate, quantity.
        Each field contributes based on importance.
        """
    
        if not validation_result.po_found or not matching_pos:
            return 0.0
    
        po_data = matching_pos[0]
    
        # Extract PO (expected) values
        expected = {
            "invoice_number": po_data.get("invoice_number", ""),
            "order_id": po_data.get("order_id", ""),
            "customer_name": po_data.get("customer_name", ""),
            "item_name": po_data.get("item_name", ""),
            "amount": float(po_data.get("expected_amount", po_data.get("amount", 0))),
            "rate": float(po_data.get("rate", 0)),
            "quantity": float(po_data.get("quantity", 0))
        }
    
        # Extract actual (from invoice)
        actual = {
            "invoice_number": invoice_data.invoice_number,
            "order_id": invoice_data.order_id,
            "customer_name": invoice_data.customer_name,
        }
    
        # Handle line-item level (assuming single dominant item)
        if invoice_data.item_details:
            item = invoice_data.item_details[0]
            actual.update({
                "item_name": item.item_name,
                "amount": float(item.amount or 0),
                "rate": float(item.rate or 0),
                "quantity": float(item.quantity or 0)
            })
    
        # Define weights intelligently (sum = 1)
        weights = {
            "invoice_number": 0.20,
            "order_id": 0.15,
            "customer_name": 0.05,
            "item_name": 0.05,
            "amount": 0.25,
            "rate": 0.15,
            "quantity": 0.15
        }
    
        # --- Similarity functions ---
        def numeric_similarity(expected_val, actual_val):
            if expected_val == 0:
                return 1.0 if actual_val == 0 else 0.0
            diff_ratio = abs(expected_val - actual_val) / (abs(expected_val) + 1e-6)
            return max(0.0, 1.0 - diff_ratio)
    
        def text_similarity(a, b):
            return SequenceMatcher(None, str(a).lower(), str(b).lower()).ratio()
    
        # --- Compute weighted similarities ---
        weighted_scores = []
        for field, weight in weights.items():
            exp_val, act_val = expected.get(field), actual.get(field)
    
            if isinstance(exp_val, (int, float)) and isinstance(act_val, (int, float)):
                score = numeric_similarity(exp_val, act_val)
            else:
                score = text_similarity(exp_val, act_val)
    
            weighted_scores.append(weight * score)
    
        # Combine to final confidence
        confidence = sum(weighted_scores)
        confidence = round(confidence * 100, 2)  # convert to %
        confidence = max(0.0, min(confidence, 100.0))  # clamp 0–100
    
        self.logger.logger.debug(f"Validation Confidence (weighted): {confidence}%")
        return confidence



    def _determine_validation_status(self, validation_result: ValidationResult) -> ValidationStatus:
        """
        Determine the final validation status based on PO existence, discrepancies, and amount match.
        """
        if not validation_result.po_found:
            return ValidationStatus.MISSING_PO
    
        discrepancies_count = len(validation_result.discrepencies)
    
        if discrepancies_count == 0 and validation_result.amount_match:
            return ValidationStatus.VALID
    
        if validation_result.amount_match and discrepancies_count <= 2:
            return ValidationStatus.PARTIAL_MATCH
    
        return ValidationStatus.INVALID


    def _should_escalate_validation(self, validation_result: ValidationResult, invoice_data) -> bool:
        # pass
        return validation_result.validation_status in ['invalid','missing_po']

    def _record_health_metrics(self, duration: float):
        """Record the health metrics after each execution"""
        success_rate = (
            (self.successful_executions / self.total_executions) * 100
            if self.total_executions > 0 else 0
        )
        avg_duration = (
            self.total_duration / self.total_executions
            if self.total_executions > 0 else 0
        )

        metrics = {
            "Agent": "Validation Agent βœ…",
            "Executions": self.total_executions,
            "Success Rate (%)": round(success_rate, 2),
            "Avg Duration (ms)": round(avg_duration, 2),
            "Total Failures": self.failed_executions,
            # "Timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
        }
        metrics_data = {}
        executions = 0
        success_rate = 0.0
        avg_duration = 0.0
        failures = 0
        last_run = None

        if self.metrics:
            print("self.metrics from validation agent", self.metrics)
            executions = self.metrics["processed"]
            print("executions.....", executions)
            avg_duration = self.metrics["avg_latency_ms"]
            failures = self.metrics["errors"]
            last_run = self.metrics["last_run_at"]
            print("last_run.....", last_run)
            success_rate = (executions - failures) / (executions + 1e-6)

        # if last_run == None:
        last_run = self.last_run

        # 3. Health logic
        overall_status = "🟒 Healthy"
        if failures > 3:
            overall_status = "🟠 Degraded"
        if executions > 0 and success_rate < 0.5:
            overall_status = "πŸ”΄ Unhealthy"
    
        print("metrics from val---....1", metrics)
    
        metrics.update({
            "Last Run": str(last_run) if last_run else "Not applicable",
            "Overall Health": overall_status,
        })
        print("metrics from val---....", metrics)
        # maintain up to last 50 records
        ValidationAgent.health_history.append(metrics)
        # ValidationAgent.health_history = ValidationAgent.health_history[-50:]

    async def health_check(self) -> Dict[str, Any]:
        """
        Returns the health metrics summary for UI display.
        """
        await asyncio.sleep(0.05)
        if not ValidationAgent.health_history:
            return {
                "Agent": "Validation Agent βœ…",
                "Executions": 0,
                "Success Rate (%)": 0.0,
                "Avg Duration (ms)": 0.0,
                "Total Failures": 0,
            }


        latest = ValidationAgent.health_history[-1]
        print("latest.....", latest)
        return latest