Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Form, Request,Depends,HTTPException | |
| from service import ChatService | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from request import RequestChat | |
| from typing import Optional | |
| from fastapi.requests import Request | |
| router = APIRouter() | |
| class JWTBearer(HTTPBearer): | |
| def __init__(self, auto_error: bool = True): | |
| super(JWTBearer, self).__init__(auto_error=auto_error) | |
| async def __call__(self, request: Request): | |
| credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request) | |
| if credentials: | |
| if credentials.scheme != "Bearer": | |
| raise HTTPException(status_code=401, detail="Invalid authentication scheme.") | |
| return credentials.credentials | |
| else: | |
| raise HTTPException(status_code=401, detail="Invalid authorization code.") | |
| jwt_bearer = JWTBearer() | |
| import decode_token | |
| from service import RedirectService | |
| from typing import Optional | |
| from fastapi import Request, Query | |
| from fastapi.responses import JSONResponse | |
| async def zalo_callback_get( | |
| amount: Optional[int] = Query(None), | |
| appid: Optional[str] = Query(None), | |
| apptransid: Optional[str] = Query(None), | |
| bankcode: Optional[str] = Query(None), | |
| checksum: Optional[str] = Query(None), | |
| discountamount: Optional[int] = Query(None), | |
| pmcid: Optional[int] = Query(None), | |
| status: Optional[int] = Query(None) | |
| ): | |
| return await RedirectService.zalo_redirect(status,apptransid) | |
| async def zalo_callback_get( | |
| amount: Optional[int] = Query(None), | |
| appid: Optional[str] = Query(None), | |
| apptransid: Optional[str] = Query(None), | |
| bankcode: Optional[str] = Query(None), | |
| checksum: Optional[str] = Query(None), | |
| discountamount: Optional[int] = Query(None), | |
| pmcid: Optional[int] = Query(None), | |
| status: Optional[int] = Query(None) | |
| ): | |
| return await RedirectService.zalo_group_redirect(status,apptransid) | |
| async def zalo_web_callback_get(request: Request): | |
| return await RedirectService.zalo_web_redirect(request.query_params) | |
| async def return_url_handler_1(request: Request): | |
| query_params = request.query_params | |
| return await RedirectService.android_momo_redirect(query_params) | |
| async def return_url_handler_group_momo(request: Request): | |
| query_params = request.query_params | |
| return await RedirectService.android_group_momo_redirect(query_params) | |
| from urllib.parse import urlencode | |
| async def return_url_handler_web(request: Request): | |
| query_params = request.query_params | |
| return await RedirectService.web_momo_redirect(query_params) | |
| from typing import Dict | |
| async def vnpay_ipn_redirect(request: Request): | |
| params: Dict[str, str] = dict(request.query_params) | |
| return await RedirectService.android_vnpay_ipn_redirect(params) | |
| async def vnpay_ipn_redirect(request: Request): | |
| params: Dict[str, str] = dict(request.query_params) | |
| return await RedirectService.android_vnpay_group_ipn_redirect(params) | |
| async def vnpay_ipn_redirect(request: Request): | |
| query_params = request.query_params | |
| return await RedirectService.web_vnpay_ipn_redirect(query_params) |