Spaces:
Running
Running
| """ | |
| Payment Transaction Manager | |
| Centralized payment transaction management similar to CreditTransactionManager. | |
| Handles order creation, verification, and analytics. | |
| """ | |
| import uuid | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, func | |
| from core.models import PaymentTransaction, User | |
| logger = logging.getLogger(__name__) | |
| class PaymentTransactionError(Exception): | |
| """Base exception for payment transaction errors.""" | |
| pass | |
| class TransactionNotFoundError(PaymentTransactionError): | |
| """Transaction not found.""" | |
| pass | |
| class PaymentTransactionManager: | |
| """ | |
| Centralized manager for all payment transactions. | |
| Similar to CreditTransactionManager, provides single source of truth | |
| for payment operations. | |
| """ | |
| async def create_order( | |
| session: AsyncSession, | |
| user: User, | |
| package_id: str, | |
| credits_amount: int, | |
| amount_paise: int, | |
| currency: str, | |
| gateway: str, | |
| gateway_order_id: str, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> PaymentTransaction: | |
| """ | |
| Create a payment order. | |
| Args: | |
| session: Database session | |
| user: User making payment | |
| package_id: Package identifier (e.g., "basic_50") | |
| credits_amount: Number of credits | |
| amount_paise: Amount in paise | |
| currency: Currency code (INR) | |
| gateway: Payment gateway (razorpay) | |
| gateway_order_id: Gateway's order ID | |
| metadata: Additional metadata | |
| Returns: | |
| PaymentTransaction instance | |
| """ | |
| transaction = PaymentTransaction( | |
| transaction_id=f"txn_{uuid.uuid4().hex[:16]}", | |
| user_id=user.id, | |
| gateway=gateway, | |
| gateway_order_id=gateway_order_id, | |
| package_id=package_id, | |
| credits_amount=credits_amount, | |
| amount_paise=amount_paise, | |
| currency=currency, | |
| status="created", | |
| extra_data=metadata or {}, | |
| created_at=datetime.utcnow() | |
| ) | |
| session.add(transaction) | |
| logger.info( | |
| f"Created payment order: {transaction.transaction_id} " | |
| f"for user {user.id}, {credits_amount} credits, " | |
| f"₹{amount_paise/100:.2f}" | |
| ) | |
| return transaction | |
| async def verify_payment( | |
| session: AsyncSession, | |
| transaction_id: str, | |
| gateway_payment_id: str, | |
| verified_by: str = "signature", | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> PaymentTransaction: | |
| """ | |
| Verify and mark payment as successful. | |
| Args: | |
| session: Database session | |
| transaction_id: Our transaction ID | |
| gateway_payment_id: Gateway's payment ID | |
| verified_by: Verification method (signature/webhook) | |
| metadata: Additional metadata | |
| Returns: | |
| Updated PaymentTransaction | |
| Raises: | |
| TransactionNotFoundError: If transaction not found | |
| """ | |
| # Get transaction | |
| query = select(PaymentTransaction).where( | |
| PaymentTransaction.transaction_id == transaction_id | |
| ) | |
| result = await session.execute(query) | |
| transaction = result.scalar_one_or_none() | |
| if not transaction: | |
| raise TransactionNotFoundError(f"Transaction {transaction_id} not found") | |
| # Update transaction | |
| transaction.gateway_payment_id = gateway_payment_id | |
| transaction.status = "paid" | |
| transaction.verified_by = verified_by | |
| transaction.paid_at = datetime.utcnow() | |
| if metadata: | |
| transaction.extra_data.update(metadata) | |
| logger.info( | |
| f"Verified payment: {transaction_id}, " | |
| f"payment_id: {gateway_payment_id}, " | |
| f"method: {verified_by}" | |
| ) | |
| return transaction | |
| async def mark_failed( | |
| session: AsyncSession, | |
| transaction_id: str, | |
| error_message: str, | |
| metadata: Optional[Dict[str, Any]] = None | |
| ) -> PaymentTransaction: | |
| """ | |
| Mark payment as failed. | |
| Args: | |
| session: Database session | |
| transaction_id: Our transaction ID | |
| error_message: Failure reason | |
| metadata: Additional metadata | |
| Returns: | |
| Updated PaymentTransaction | |
| """ | |
| query = select(PaymentTransaction).where( | |
| PaymentTransaction.transaction_id == transaction_id | |
| ) | |
| result = await session.execute(query) | |
| transaction = result.scalar_one_or_none() | |
| if not transaction: | |
| raise TransactionNotFoundError(f"Transaction {transaction_id} not found") | |
| transaction.status = "failed" | |
| transaction.error_message = error_message[:1000] # Truncate | |
| if metadata: | |
| transaction.extra_data.update(metadata) | |
| logger.warning(f"Payment failed: {transaction_id}, reason: {error_message}") | |
| return transaction | |
| async def get_transaction_history( | |
| session: AsyncSession, | |
| user_id: int, | |
| limit: int = 50, | |
| offset: int = 0, | |
| status: Optional[str] = None | |
| ) -> List[PaymentTransaction]: | |
| """ | |
| Get payment transaction history for a user. | |
| Args: | |
| session: Database session | |
| user_id: User ID | |
| limit: Maximum number of transactions | |
| offset: Offset for pagination | |
| status: Filter by status (paid/failed/created) | |
| Returns: | |
| List of PaymentTransaction objects | |
| """ | |
| query = select(PaymentTransaction).where( | |
| PaymentTransaction.user_id == user_id | |
| ) | |
| if status: | |
| query = query.where(PaymentTransaction.status == status) | |
| query = query.order_by( | |
| PaymentTransaction.created_at.desc() | |
| ).offset(offset).limit(limit) | |
| result = await session.execute(query) | |
| return list(result.scalars().all()) | |
| async def get_analytics( | |
| session: AsyncSession, | |
| user_id: Optional[int] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Get payment analytics. | |
| Args: | |
| session: Database session | |
| user_id: Optional user ID (if None, get global stats) | |
| Returns: | |
| Dict with analytics data | |
| """ | |
| # Base query | |
| base_query = select(PaymentTransaction) | |
| if user_id: | |
| base_query = base_query.where(PaymentTransaction.user_id == user_id) | |
| # Total transactions | |
| total_count = await session.scalar( | |
| select(func.count()).select_from(base_query.subquery()) | |
| ) | |
| # Successful transactions | |
| paid_query = base_query.where(PaymentTransaction.status == "paid") | |
| paid_count = await session.scalar( | |
| select(func.count()).select_from(paid_query.subquery()) | |
| ) | |
| # Total revenue (in paise) | |
| revenue_result = await session.scalar( | |
| select(func.sum(PaymentTransaction.amount_paise)).select_from( | |
| paid_query.subquery() | |
| ) | |
| ) | |
| total_revenue_paise = revenue_result or 0 | |
| # Total credits purchased | |
| credits_result = await session.scalar( | |
| select(func.sum(PaymentTransaction.credits_amount)).select_from( | |
| paid_query.subquery() | |
| ) | |
| ) | |
| total_credits = credits_result or 0 | |
| # Failed transactions | |
| failed_count = await session.scalar( | |
| select(func.count()).select_from( | |
| base_query.where(PaymentTransaction.status == "failed").subquery() | |
| ) | |
| ) | |
| # Success rate | |
| success_rate = (paid_count / total_count * 100) if total_count > 0 else 0 | |
| return { | |
| "total_transactions": total_count, | |
| "successful_payments": paid_count, | |
| "failed_payments": failed_count, | |
| "success_rate_percent": round(success_rate, 2), | |
| "total_revenue_paise": total_revenue_paise, | |
| "total_revenue_rupees": total_revenue_paise / 100, | |
| "total_credits_purchased": total_credits, | |
| "average_transaction_value_paise": ( | |
| total_revenue_paise // paid_count if paid_count > 0 else 0 | |
| ) | |
| } | |