Spaces:
Sleeping
Sleeping
| """ | |
| Payments Router - API endpoints for credit purchases via Razorpay. | |
| Endpoints: | |
| - GET /payments/packages - List available credit packages | |
| - POST /payments/create-order - Create a Razorpay order | |
| - POST /payments/verify - Verify payment and add credits | |
| - POST /payments/webhook/razorpay - Handle Razorpay webhooks | |
| - GET /payments/history - Get user's payment history | |
| """ | |
| import logging | |
| import uuid | |
| from datetime import datetime | |
| from typing import List, Optional | |
| from fastapi import APIRouter, Depends, HTTPException, Query, Request, status, BackgroundTasks | |
| from pydantic import BaseModel | |
| from sqlalchemy import select, desc, func | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from core.database import get_db | |
| from core.models import User, PaymentTransaction | |
| from services.drive_service import DriveService | |
| from services.razorpay_service import ( | |
| RazorpayService, | |
| RazorpayConfigError, | |
| RazorpayOrderError, | |
| RazorpayVerificationError, | |
| get_razorpay_service, | |
| is_razorpay_configured, | |
| get_package, | |
| list_packages, | |
| CREDIT_PACKAGES | |
| ) | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the payment endpoints; routes registered on it are served under /payments.
router = APIRouter(prefix="/payments", tags=["payments"])
# NOTE(review): drive_service is not referenced anywhere in the visible part of
# this file — confirm it is still needed before keeping the eager instantiation.
drive_service = DriveService()
| # ============================================================================= | |
| # Request/Response Models | |
| # ============================================================================= | |
class PackageResponse(BaseModel):
    """Credit package details."""
    # Instances are built in get_packages() by unpacking each entry returned by
    # list_packages(), so every entry has to supply the fields declared here.
    id: str
    name: str
    credits: int
    # NOTE(review): presumably the same price expressed in paise and in rupees —
    # confirm against list_packages() in services.razorpay_service.
    amount_paise: int
    amount_rupees: float
    currency: str
class PackagesListResponse(BaseModel):
    """List of available packages."""
    # Returned by get_packages(); one PackageResponse per entry of list_packages().
    packages: List[PackageResponse]
class CreateOrderRequest(BaseModel):
    """Request to create a payment order."""
    # Looked up with get_package() in create_order(); an unknown ID is rejected
    # there with HTTP 400.
    package_id: str
class CreateOrderResponse(BaseModel):
    """Response with Razorpay order details."""
    # Our own ID, produced by generate_transaction_id().
    transaction_id: str
    # The "id" field of the order returned by the Razorpay service.
    razorpay_order_id: str
    # amount_paise, currency, package_id and credits are copied from the
    # selected package in create_order().
    amount_paise: int
    currency: str
    # Taken from razorpay_service.key_id in create_order().
    key_id: str
    package_id: str
    credits: int
class VerifyPaymentRequest(BaseModel):
    """Request to verify a payment."""
    # verify_payment() uses the order ID to find the caller's transaction and
    # passes all three values to razorpay_service.verify_payment_signature().
    razorpay_order_id: str
    razorpay_payment_id: str
    razorpay_signature: str
class VerifyPaymentResponse(BaseModel):
    """Response after payment verification."""
    success: bool
    message: str
    transaction_id: str
    # 0 when verify_payment() finds the transaction already marked "paid".
    credits_added: int
    # Read from user.credits in verify_payment().
    new_balance: int
class PaymentHistoryItem(BaseModel):
    """Single payment in history."""
    transaction_id: str
    razorpay_payment_id: Optional[str] = None  # Razorpay payment ID
    package_id: str
    credits_amount: int
    amount_paise: int
    currency: str
    status: str
    gateway: str
    # Nullable on purpose: get_payment_history() passes None when a row has no
    # created_at value, which a plain `str` field rejects with a validation
    # error. When present, the value is the timestamp's isoformat() string.
    created_at: Optional[str] = None
    # isoformat() string of the payment time, or None when not paid.
    paid_at: Optional[str] = None
    error_message: Optional[str] = None  # Failure reason from webhook
class PaymentHistoryResponse(BaseModel):
    """Payment history response with pagination."""
    # Only the requested page of transactions.
    transactions: List[PaymentHistoryItem]
    # Number of all transactions of the user, not just those on this page.
    total_count: int
    # Echo of the page / limit query parameters of get_payment_history().
    page: int
    limit: int
| # ============================================================================= | |
| # Helper Functions | |
| # ============================================================================= | |
def generate_transaction_id() -> str:
    """Return a new transaction ID: ``txn_`` followed by the first 16 hex digits of a UUID4."""
    random_hex = uuid.uuid4().hex
    return "txn_" + random_hex[:16]
def update_verified_by(transaction: PaymentTransaction, source: str) -> bool:
    """
    Record which side has verified a payment on ``transaction.verified_by``.

    The field ends up as ``source`` on the first verification and as "both"
    once the opposite side has verified as well.

    Args:
        transaction: The payment transaction to update
        source: Either "client" or "webhook"

    Returns:
        True if ``verified_by`` was changed, False otherwise
    """
    previous = transaction.verified_by
    counterpart = "client" if source != "client" else "webhook"

    # The opposite side got there first: both have now verified.
    if previous == counterpart:
        transaction.verified_by = "both"
        return True

    # Already "both" (or some other value): leave it alone.
    if previous is not None and previous != source:
        return False

    # First verification, or a repeat from the same side.
    transaction.verified_by = source
    return previous != source
async def process_successful_payment(
    transaction: PaymentTransaction,
    user: User,
    payment_id: str,
    source: str,
    db: AsyncSession,
    signature: Optional[str] = None
) -> int:
    """
    Mark a transaction as paid and credit the user, at most once per transaction.

    When the transaction is already "paid", only the verification source is
    recorded. The session is committed in both cases.

    Args:
        transaction: The payment transaction
        user: The user to credit
        payment_id: Gateway payment ID
        source: "client" or "webhook"
        db: Database session
        signature: Optional Razorpay signature (client-side only)

    Returns:
        Number of credits added (0 if already processed)
    """
    if transaction.status == "paid":
        # Credits were granted earlier; just note that this source verified too.
        if update_verified_by(transaction, source):
            logger.info(
                f"Payment {transaction.transaction_id} was already verified, "
                f"{source} also verified - marked as '{transaction.verified_by}'"
            )
        await db.commit()
        return 0

    # First time this payment is processed: grant the credits.
    from services.credit_service import CreditTransactionManager

    credit_metadata = {
        "package_id": transaction.package_id,
        "gateway": "razorpay",
        "gateway_payment_id": payment_id,
        "amount_paise": transaction.amount_paise,
        "verified_by": source,
        "signature": signature
    }
    await CreditTransactionManager.add_credits(
        session=db,
        user=user,
        amount=transaction.credits_amount,
        source="payment",
        reference_type="payment",
        reference_id=transaction.transaction_id,
        reason=f"Purchase: {transaction.package_id}",
        metadata=credit_metadata
    )

    # Only after the credits were added is the transaction flagged as paid.
    transaction.status = "paid"
    transaction.gateway_payment_id = payment_id
    transaction.paid_at = datetime.utcnow()
    if signature:
        transaction.razorpay_signature = signature
    update_verified_by(transaction, source)

    credits_added = transaction.credits_amount
    logger.info(
        f"Payment processed ({source}): {transaction.transaction_id}, "
        f"added {credits_added} credits to user {user.user_id}, "
        f"new balance: {user.credits}"
    )
    await db.commit()
    return credits_added
| # ============================================================================= | |
| # Endpoints | |
| # ============================================================================= | |
async def get_packages():
    """
    Return every credit package that can be purchased.

    Public pricing information - no authentication is needed.
    """
    package_models = [PackageResponse(**entry) for entry in list_packages()]
    return PackagesListResponse(packages=package_models)
async def create_order(
    req: Request,
    request: CreateOrderRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Create a Razorpay order for the requested credit package.

    The returned order ID is what the client hands to Razorpay checkout;
    once the payment is done the client calls the /verify endpoint.
    The authenticated user is read from request.state.user (set by AuthMiddleware).

    Raises:
        HTTPException: 503 when Razorpay is not (correctly) configured,
            400 for an unknown package ID, 500 when the order cannot be created.
    """
    user = req.state.user

    # Refuse early when the gateway has not been set up at all.
    if not is_razorpay_configured():
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Payment service is not configured"
        )

    package = get_package(request.package_id)
    if not package:
        available = list(CREDIT_PACKAGES.keys())
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid package ID. Available packages: {available}"
        )

    try:
        razorpay_service = get_razorpay_service()
        transaction_id = generate_transaction_id()

        # Ask Razorpay for the order first; its ID is stored with our record.
        order_notes = {
            "user_id": user.user_id,
            "package_id": package.id,
            "credits": str(package.credits)
        }
        order = razorpay_service.create_order(
            amount_paise=package.amount_paise,
            transaction_id=transaction_id,
            currency=package.currency,
            notes=order_notes
        )
        razorpay_order_id = order["id"]

        # Persist the pending transaction so /verify and the webhook can find it.
        db.add(
            PaymentTransaction(
                transaction_id=transaction_id,
                user_id=user.id,  # Integer FK to users.id
                gateway="razorpay",
                gateway_order_id=razorpay_order_id,
                package_id=package.id,
                credits_amount=package.credits,
                amount_paise=package.amount_paise,
                currency=package.currency,
                status="created",
                extra_data={"razorpay_order": order}
            )
        )
        await db.commit()

        logger.info(
            f"Created payment order: {transaction_id} for user {user.user_id}, "
            f"package={package.id}, amount={package.amount_paise}"
        )

        return CreateOrderResponse(
            transaction_id=transaction_id,
            razorpay_order_id=razorpay_order_id,
            amount_paise=package.amount_paise,
            currency=package.currency,
            key_id=razorpay_service.key_id,
            package_id=package.id,
            credits=package.credits
        )
    except RazorpayConfigError as e:
        logger.error(f"Razorpay config error: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Payment service configuration error"
        )
    except RazorpayOrderError as e:
        logger.error(f"Razorpay order error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create payment order"
        )
async def verify_payment(
    req: Request,
    request: VerifyPaymentRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    Verify Razorpay payment and add credits.

    Called after successful Razorpay checkout.
    Verifies the payment signature and credits the user.
    Auth handled by AuthMiddleware - user in request.state.user

    Returns:
        VerifyPaymentResponse; credits_added is 0 when the transaction had
        already been marked as paid.

    Raises:
        HTTPException: 404 when the caller has no transaction for the order,
            409 when the payment ID is already attached to a paid transaction,
            400 when the signature is invalid or verification errors out,
            503 when the Razorpay service is not configured.
    """
    user = req.state.user
    try:
        razorpay_service = get_razorpay_service()

        # Find the transaction; restricting to the caller's own rows keeps one
        # user from verifying another user's order.
        result = await db.execute(
            select(PaymentTransaction).where(
                PaymentTransaction.gateway_order_id == request.razorpay_order_id,
                PaymentTransaction.user_id == user.id  # Integer FK comparison
            )
        )
        transaction = result.scalar_one_or_none()
        if not transaction:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Transaction not found"
            )

        # Check if already processed - still track verification source
        if transaction.status == "paid":
            update_verified_by(transaction, "client")
            await db.commit()
            return VerifyPaymentResponse(
                success=True,
                message="Payment already processed",
                transaction_id=transaction.transaction_id,
                credits_added=0,
                new_balance=user.credits
            )

        # Security Check: Ensure payment_id hasn't been used for another transaction
        # This prevents replay attacks where a valid payment_id is used for multiple orders
        existing_usage = await db.execute(
            select(PaymentTransaction).where(
                PaymentTransaction.gateway_payment_id == request.razorpay_payment_id,
                PaymentTransaction.status == "paid"
            )
        )
        if existing_usage.scalar_one_or_none():
            logger.warning(
                f"Payment replay attempt: Payment ID {request.razorpay_payment_id} "
                f"already used for another transaction. User: {user.user_id}"
            )
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Payment ID already used"
            )

        # Verify signature
        is_valid = razorpay_service.verify_payment_signature(
            order_id=request.razorpay_order_id,
            payment_id=request.razorpay_payment_id,
            signature=request.razorpay_signature
        )
        if not is_valid:
            # Persist the failure before rejecting the request.
            transaction.status = "failed"
            transaction.error_message = "Invalid payment signature"
            await db.commit()
            logger.warning(
                f"Invalid payment signature for transaction {transaction.transaction_id}"
            )
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Payment verification failed"
            )

        # Process the payment using helper function
        credits_added = await process_successful_payment(
            transaction=transaction,
            user=user,
            payment_id=request.razorpay_payment_id,
            source="client",
            db=db,
            signature=request.razorpay_signature
        )

        # Sync DB to Drive (Async)
        from services.backup_service import get_backup_service
        backup_service = get_backup_service()
        background_tasks.add_task(backup_service.backup_async)

        return VerifyPaymentResponse(
            success=True,
            message="Payment successful! Credits added.",
            transaction_id=transaction.transaction_id,
            credits_added=credits_added,
            new_balance=user.credits
        )
    except RazorpayConfigError as e:
        # Same mapping as create_order: a missing/invalid gateway configuration
        # is reported as 503 instead of surfacing as an unhandled 500.
        logger.error(f"Razorpay config error: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Payment service configuration error"
        ) from e
    except RazorpayVerificationError as e:
        logger.error(f"Payment verification error: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Payment verification failed"
        ) from e
async def razorpay_webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    Handle Razorpay webhook events.

    This provides backup verification in case the client-side
    verification fails (e.g., network issues after payment).
    Razorpay will retry webhooks for up to 24 hours if not acknowledged.

    Handled events are "payment.captured" (credit the user) and
    "payment.failed" (mark the transaction as failed); any other event is
    acknowledged without further action.

    Returns:
        {"status": "ok"} once the event has been handled or ignored.

    Raises:
        HTTPException: 401 for a missing or invalid signature, 400 when the
            body is not a JSON object, 503 when Razorpay is not configured.
    """
    # Get signature from headers
    signature = request.headers.get("X-Razorpay-Signature")
    if not signature:
        logger.warning("Webhook received without signature")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing webhook signature"
        )

    # The raw bytes are what gets passed to signature verification, so read
    # the body before parsing it.
    body = await request.body()

    try:
        razorpay_service = get_razorpay_service()

        # Verify webhook signature
        if not razorpay_service.verify_webhook_signature(body, signature):
            logger.warning("Invalid webhook signature")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid webhook signature"
            )

        # Parse webhook payload; a body that is not a JSON object is answered
        # with 400 instead of crashing with an unhandled error.
        import json
        try:
            payload = json.loads(body)
        except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
            logger.warning("Webhook payload is not valid JSON")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid webhook payload"
            ) from exc
        if not isinstance(payload, dict):
            logger.warning("Webhook payload is not a JSON object")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid webhook payload"
            )

        event = payload.get("event")
        logger.info(f"Received Razorpay webhook: {event}")

        if event == "payment.captured":
            await _handle_payment_captured(
                _webhook_payment_entity(payload), background_tasks, db
            )
        elif event == "payment.failed":
            await _handle_payment_failed(_webhook_payment_entity(payload), db)

        return {"status": "ok"}
    except RazorpayConfigError:
        logger.error("Razorpay not configured for webhook processing")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Payment service not configured"
        )


def _webhook_payment_entity(payload: dict) -> dict:
    """Return payload["payload"]["payment"]["entity"], or {} if any level is missing or not an object."""
    node = payload
    for key in ("payload", "payment", "entity"):
        node = node.get(key)
        if not isinstance(node, dict):
            return {}
    return node


def _format_failure_message(payment: dict) -> str:
    """Join the error fields of a failed payment entity into one " | "-separated message."""
    error_code = payment.get("error_code", "")
    error_description = payment.get("error_description", "Payment failed")
    error_reason = payment.get("error_reason", "")
    error_source = payment.get("error_source", "")

    error_parts = []
    if error_description:
        error_parts.append(error_description)
    # Skip the reason when it merely repeats the description.
    if error_reason and error_reason != error_description:
        error_parts.append(f"Reason: {error_reason}")
    if error_source:
        error_parts.append(f"Source: {error_source}")
    if error_code:
        error_parts.append(f"Code: {error_code}")
    return " | ".join(error_parts) if error_parts else "Payment failed"


async def _handle_payment_captured(
    payment: dict,
    background_tasks: BackgroundTasks,
    db: AsyncSession
) -> None:
    """Credit the owner of the order named in a "payment.captured" entity and schedule a backup."""
    order_id = payment.get("order_id")
    payment_id = payment.get("id")
    if not order_id:
        return

    # Find transaction
    result = await db.execute(
        select(PaymentTransaction).where(
            PaymentTransaction.gateway_order_id == order_id
        )
    )
    transaction = result.scalar_one_or_none()
    if not transaction:
        logger.warning(f"Webhook: no transaction found for order {order_id}")
        return

    # Find user
    user_result = await db.execute(
        select(User).where(User.id == transaction.user_id)  # Integer FK lookup
    )
    user = user_result.scalar_one_or_none()
    if not user:
        logger.warning(
            f"Webhook: no user found for transaction {transaction.transaction_id}"
        )
        return

    # No signature is passed here; that argument is documented as client-side only.
    await process_successful_payment(
        transaction=transaction,
        user=user,
        payment_id=payment_id,
        source="webhook",
        db=db
    )

    # Sync DB to Drive (Async)
    from services.backup_service import get_backup_service
    backup_service = get_backup_service()
    background_tasks.add_task(backup_service.backup_async)


async def _handle_payment_failed(payment: dict, db: AsyncSession) -> None:
    """Mark the still-"created" transaction of a "payment.failed" entity as failed."""
    order_id = payment.get("order_id")
    full_error_message = _format_failure_message(payment)
    if not order_id:
        return

    result = await db.execute(
        select(PaymentTransaction).where(
            PaymentTransaction.gateway_order_id == order_id
        )
    )
    transaction = result.scalar_one_or_none()
    # Only transactions still in "created" are touched, so a paid transaction
    # is never downgraded by a late failure event.
    if transaction and transaction.status == "created":
        transaction.status = "failed"
        transaction.error_message = full_error_message
        await db.commit()
        logger.info(
            f"Webhook: Marked transaction {transaction.transaction_id} as failed: {full_error_message}"
        )
async def get_payment_history(
    req: Request,
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(20, ge=1, le=100, description="Items per page"),
    db: AsyncSession = Depends(get_db)
):
    """
    Return one page of the current user's payment transactions, newest first.

    The response also carries the total number of the user's transactions.
    The authenticated user is read from request.state.user (set by AuthMiddleware).
    """
    user = req.state.user
    owned_by_user = PaymentTransaction.user_id == user.id  # Integer FK comparison

    # Total number of rows for this user, independent of the requested page.
    count_query = select(func.count(PaymentTransaction.id)).where(owned_by_user)
    total_count = (await db.execute(count_query)).scalar() or 0

    # The requested page, newest transactions first.
    page_query = (
        select(PaymentTransaction)
        .where(owned_by_user)
        .order_by(desc(PaymentTransaction.created_at))
        .offset((page - 1) * limit)
        .limit(limit)
    )
    rows = (await db.execute(page_query)).scalars().all()

    history = [
        PaymentHistoryItem(
            transaction_id=txn.transaction_id,
            razorpay_payment_id=txn.gateway_payment_id,
            package_id=txn.package_id,
            credits_amount=txn.credits_amount,
            amount_paise=txn.amount_paise,
            currency=txn.currency,
            status=txn.status,
            gateway=txn.gateway,
            created_at=txn.created_at.isoformat() if txn.created_at else None,
            paid_at=txn.paid_at.isoformat() if txn.paid_at else None,
            error_message=txn.error_message
        )
        for txn in rows
    ]

    return PaymentHistoryResponse(
        transactions=history,
        total_count=total_count,
        page=page,
        limit=limit
    )
async def get_payment_analytics(
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Return payment analytics for the current user.

    The computation is delegated to PaymentTransactionManager.get_analytics,
    whose result is returned unchanged.
    """
    current_user = request.state.user
    from services.payment_service import PaymentTransactionManager

    return await PaymentTransactionManager.get_analytics(
        session=db,
        user_id=current_user.id
    )