Spaces:
Sleeping
Sleeping
| """Payment Agent for Invoice Processing""" | |
| # TODO: Implement agent | |
| import os | |
| import json | |
| import requests | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime, timedelta | |
| import google.generativeai as genai | |
| from dotenv import load_dotenv | |
| import time | |
| import requests | |
| from agents.base_agent import BaseAgent | |
| from state import ( | |
| InvoiceProcessingState, PaymentDecision, PaymentStatus, | |
| RiskLevel, ValidationStatus, ProcessingStatus, RiskAssessment | |
| ) | |
| from utils.logger import StructuredLogger | |
| load_dotenv() | |
| class PaymentAgent(BaseAgent): | |
| """Agent responsible for payment processing decisions and execution""" | |
| # Persistent in-memory history (like validation agent) | |
| health_history = [] | |
| def __init__(self, config: Dict[str, Any] = None): | |
| # pass | |
| super().__init__("payment_agent", config) | |
| self.logger = StructuredLogger("PaymentAgent") | |
| self.approved_vendor_list = ["Acme Corporation", "TechNova Ltd", "SupplyCo"] | |
| self.retry_limit = 3 | |
| # Health metrics tracking | |
| self.total_executions = 0 | |
| self.successful_executions = 0 | |
| self.failed_executions = 0 | |
| self.total_duration = 0.0 | |
| self.last_transaction_id = None | |
| self.last_run = None | |
| def _validate_preconditions(self, state: InvoiceProcessingState, workflow_type) -> bool: | |
| # pass | |
| if workflow_type == "expedited": | |
| return bool(state.validation_result.validation_status.VALID and state.invoice_data) | |
| else: | |
| return bool(state.risk_assessment and state.invoice_data) | |
| def _validate_postconditions(self, state: InvoiceProcessingState) -> bool: | |
| # pass | |
| return bool(state.payment_decision) | |
| async def execute(self, state: InvoiceProcessingState, workflow_type) -> InvoiceProcessingState: | |
| # pass | |
| start_time = time.time() | |
| try: | |
| if not self._validate_preconditions(state, workflow_type): | |
| state.overall_status = ProcessingStatus.FAILED | |
| self._log_decision(state, "Payment Agent Failed", "Preconditions not met", confidence=0.0) | |
| return state | |
| invoice_data = state.invoice_data | |
| validation_result = state.validation_result | |
| if workflow_type == "expedited": | |
| risk_assessment = RiskAssessment( | |
| risk_level = RiskLevel.LOW, | |
| risk_score = 0.3, | |
| fraud_indicators = None, | |
| compliance_issues = None, | |
| recommendation = None, | |
| reason = "Expedited Workflow Called", | |
| requires_human_review = "Not needed due to Expedited Workflow" | |
| ) | |
| payment_decision = PaymentDecision( | |
| decision = "auto_pay", | |
| status = PaymentStatus.APPROVED, | |
| approved_amount = invoice_data.total, | |
| transaction_id = f"TXN-{datetime.utcnow().strftime('%Y-%m-%d-%H%M%S')}", | |
| payment_method = self._select_payment_method(invoice_data.total), | |
| approval_chain = ["system_auto_approval"], | |
| rejection_reason = None, | |
| scheduled_date = self._calculate_payment_date(invoice_data.due_date, "ACH") | |
| ) | |
| payment_result = await self._execute_payment(invoice_data, payment_decision) | |
| payment_decision = self._update_payment_decision(payment_decision, payment_result) | |
| justification = await self._generate_payment_justification( | |
| invoice_data, payment_decision, validation_result, risk_assessment | |
| ) | |
| state.payment_decision = payment_decision | |
| state.overall_status = ProcessingStatus.COMPLETED | |
| state.current_agent = "payment_agent" | |
| # success criteria | |
| if payment_decision.status == PaymentStatus.APPROVED: | |
| self.successful_executions += 1 | |
| else: | |
| self.failed_executions += 1 | |
| self.last_transaction_id = payment_decision.transaction_id | |
| self._log_decision(state, payment_decision.status, justification, 95.0, state.process_id) | |
| return state | |
| else: | |
| risk_assessment = state.risk_assessment | |
| payment_decision = await self._make_payment_decision( | |
| invoice_data, validation_result, risk_assessment, state | |
| ) | |
| if payment_decision.decision == "auto_pay": | |
| state.approval_chain = [ | |
| { | |
| "approved_by":"system_auto_approval in payment_agent", | |
| "timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") | |
| } | |
| ] | |
| else: | |
| state.approval_chain = [{"payment_agent":"Failed or Rejected"}] | |
| payment_result = await self._execute_payment(invoice_data, payment_decision) | |
| payment_decision = self._update_payment_decision(payment_decision, payment_result) | |
| justification = await self._generate_payment_justification( | |
| invoice_data, payment_decision, validation_result, risk_assessment | |
| ) | |
| state.payment_decision = payment_decision | |
| state.overall_status = ProcessingStatus.COMPLETED | |
| state.current_agent = "payment_agent" | |
| # success criteria | |
| if payment_decision.status == PaymentStatus.APPROVED: | |
| print("self.successful_executions---", self.successful_executions) | |
| self.successful_executions += 1 | |
| else: | |
| self.failed_executions += 1 | |
| self.last_transaction_id = payment_decision.transaction_id | |
| self._log_decision(state, payment_decision.status, justification, 95.0, state.process_id) | |
| return state | |
| except Exception as e: | |
| self.failed_executions += 1 | |
| self.logger.logger.error(f"[PaymentAgent] Execution failed: {e}") | |
| state.overall_status = ProcessingStatus.FAILED | |
| return state | |
| finally: | |
| duration = (time.time() - start_time) * 1000 # in ms | |
| print("self.total_executions---", self.total_executions) | |
| self.last_run = datetime.utcnow().isoformat() | |
| self.total_executions += 1 | |
| self.total_duration += duration | |
| self._record_health_metrics(duration) | |
| async def _make_payment_decision(self, invoice_data, validation_result, | |
| risk_assessment, state: InvoiceProcessingState) -> PaymentDecision: | |
| # pass | |
| amount = invoice_data.total or invoice_data.total_amount or 0.0 | |
| risk_level = risk_assessment.risk_level | |
| validation_status = validation_result.validation_status | |
| if risk_level == RiskLevel.CRITICAL or validation_status == ValidationStatus.INVALID: | |
| decision = PaymentDecision( | |
| decision = "reject", | |
| status = PaymentStatus.FAILED, | |
| approved_amount = 0.0, | |
| transaction_id = None, | |
| payment_method = None, | |
| approval_chain = [], | |
| rejection_reason = "Critical Risk or Invalid Validation", | |
| scheduled_date = None | |
| ) | |
| elif risk_level == RiskLevel.LOW or amount < 5000: | |
| decision = PaymentDecision( | |
| decision = "auto_pay", | |
| status = PaymentStatus.APPROVED, | |
| approved_amount = amount, | |
| transaction_id = f"TXN-{datetime.utcnow().strftime('%Y-%m-%d-%H%M%S')}", | |
| payment_method = self._select_payment_method(amount), | |
| approval_chain = ["system_auto_approval"], | |
| rejection_reason = None, | |
| scheduled_date = self._calculate_payment_date(invoice_data.due_date, "ACH") | |
| ) | |
| elif risk_level == RiskLevel.MEDIUM or validation_status == ValidationStatus.PARTIAL_MATCH: | |
| decision = PaymentDecision( | |
| decision = "hold", | |
| status = PaymentStatus.PENDING_APPROVAL, | |
| approved_amount = amount, | |
| transaction_id = None, | |
| payment_method = self._select_payment_method(amount), | |
| approval_chain = ["system_auto_approval", "finance_manager_approval"], | |
| rejection_reason = None, | |
| scheduled_date = self._calculate_payment_date(invoice_data.due_date, "ACH") | |
| ) | |
| else: | |
| decision = PaymentDecision( | |
| decision = "manual_approval", | |
| status = PaymentStatus.PENDING_APPROVAL, | |
| approved_amount = amount, | |
| transaction_id = None, | |
| payment_method = self._select_payment_method(amount), | |
| approval_chain = ["system_auto_approval", "executive_approval"], | |
| rejection_reason = None, | |
| scheduled_date = self._calculate_payment_date(invoice_data.due_date, "WIRE") | |
| ) | |
| return decision | |
| def _select_payment_method(self, amount: float) -> str: | |
| # pass | |
| if amount < 5000: | |
| return "ACH" | |
| elif amount < 25000: | |
| return "WIRE" | |
| return "MANUAL" | |
| def _calculate_payment_date(self, due_date_str: Optional[str], payment_method: str) -> datetime: | |
| # pass | |
| due_date = self._parse_date(due_date_str) | |
| if not due_date: | |
| due_date = datetime.utcnow().date() + timedelta(days=3) | |
| offset = 1 if payment_method == "ACH" else 2 | |
| return datetime.combine(due_date, datetime.min.time()) + timedelta(days=offset) | |
| def _parse_date(self, date_str: str) -> Optional[datetime.date]: | |
| # pass | |
| if not date_str: | |
| return None | |
| try: | |
| return datetime.strptime(date_str, "%Y-%m-%d").date() | |
| except Exception: | |
| return None | |
| # async def _execute_payment(self, invoice_data, payment_decision: PaymentDecision) -> Dict[str, Any]: | |
| # # pass | |
| # await self._async_sleep(1) | |
| # response = requests.post("http://localhost:8000", data=PaymentRequest) | |
| # if payment_decision.status == PaymentStatus.FAILED: | |
| # return {"status": "failed", "message": "Payment rejected by policy."} | |
| # return {"status": "success", "transaction_id": payment_decision.transaction_id or f"TXN-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", "message": "Payment executed."} | |
| async def _execute_payment(self, invoice_data, payment_decision: PaymentDecision) -> Dict[str, Any]: | |
| """Send payment request to web API and return response with transaction_id""" | |
| import asyncio | |
| await asyncio.sleep(1) | |
| payment_payload = { | |
| "order_id": invoice_data.invoice_number or f"INV-{int(datetime.utcnow().timestamp())}", | |
| "customer_name": invoice_data.customer_name or "Unknown Vendor", | |
| "amount": float(invoice_data.total), | |
| "currency": "USD", | |
| # "method": payment_decision.payment_method.lower(), | |
| "recipient_account": "auto_generated_account", | |
| "due_date": str(invoice_data.due_date or datetime.utcnow().date()) | |
| } | |
| try: | |
| response = requests.post("https://partha181098-invoicepaymentapi.hf.space/initiate_payment", json=payment_payload, timeout=10) | |
| if response.status_code == 200: | |
| result = response.json() | |
| print("res from apiii =======", result) | |
| return { | |
| "status": "success" if result["status"] == "SUCCESS" else "failed", | |
| "transaction_id": result["transaction_id"], | |
| "message": result["message"] | |
| } | |
| else: | |
| print("res from apiii111111 =======", result) | |
| return {"status": "failed", "message": f"HTTP {response.status_code}: {response.text}"} | |
| except Exception as e: | |
| print("res from apiii111111222222222222 =======", result) | |
| return {"status": "failed", "message": f"Payment API error: {e}"} | |
| async def _async_sleep(self, seconds: int): | |
| # pass | |
| import asyncio | |
| await asyncio.sleep(seconds) | |
| def _update_payment_decision(self, payment_decision: PaymentDecision, | |
| payment_result: Dict[str, Any]) -> PaymentDecision: | |
| # pass | |
| if payment_result.get("status") == "success": | |
| payment_decision.status = PaymentStatus.APPROVED | |
| payment_decision.transaction_id = payment_result.get("transaction_id") | |
| else: | |
| payment_decision.status = PaymentStatus.FAILED | |
| payment_decision.rejection_reason = payment_result.get("message") | |
| return payment_decision | |
| async def _generate_payment_justification(self, invoice_data, payment_decision: PaymentDecision, | |
| validation_result, risk_assessment) -> str: | |
| # pass | |
| reason = f"Payment Decision: {payment_decision.status}. " | |
| if payment_decision.status == PaymentStatus.FAILED: | |
| reason += f"Reason: {payment_decision.rejection_reason}" | |
| reason += f"Risk level: {risk_assessment.risk_level}. Validation: {validation_result.validation_status}." | |
| return reason | |
| def _record_health_metrics(self, duration: float): | |
| """Update and record health statistics""" | |
| success_rate = ( | |
| (self.successful_executions / self.total_executions) * 100 | |
| if self.total_executions else 0 | |
| ) | |
| avg_duration = ( | |
| self.total_duration / self.total_executions | |
| if self.total_executions else 0 | |
| ) | |
| overall_status = "π’ Healthy" | |
| if success_rate < 70: | |
| overall_status = "π Degraded" | |
| if success_rate < 60: | |
| overall_status = "π΄ Unhealthy" | |
| metrics = { | |
| "Agent": "Payment Agent π³", | |
| "Executions": self.total_executions, | |
| "Success Rate (%)": round(success_rate, 2), | |
| "Avg Duration (ms)": round(avg_duration, 2), | |
| "Total Failures": self.failed_executions, | |
| "Last Transaction ID": self.last_transaction_id or "N/A", | |
| # "Timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), | |
| "Last Run": self.last_run, | |
| "Overall Health": overall_status, | |
| } | |
| PaymentAgent.health_history.append(metrics) | |
| PaymentAgent.health_history = PaymentAgent.health_history[-50:] # keep last 50 | |
| async def health_check(self) -> Dict[str, Any]: | |
| """Return the current or last known health state""" | |
| await self._async_sleep(0.05) | |
| if not PaymentAgent.health_history: | |
| return { | |
| "Agent": "Payment Agent π³", | |
| "Executions": 0, | |
| "Success Rate (%)": 0.0, | |
| "Avg Duration (ms)": 0.0, | |
| "Total Failures": 0, | |
| "Last Transaction ID": "N/A", | |
| } | |
| return PaymentAgent.health_history[-1] | |