File size: 6,561 Bytes
5564ecb
df37f6e
5564ecb
 
 
 
 
df37f6e
5564ecb
df37f6e
 
5564ecb
df37f6e
 
 
 
 
 
5564ecb
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
74168a7
 
 
5564ecb
312de80
df37f6e
 
 
 
74168a7
df37f6e
b8799b3
 
 
df37f6e
74168a7
 
 
 
 
df37f6e
 
 
74168a7
df37f6e
 
 
74168a7
df37f6e
 
 
5564ecb
 
 
 
df37f6e
 
 
 
 
 
78566ad
5564ecb
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
da67be0
df37f6e
 
 
 
da67be0
 
 
 
 
 
 
 
df37f6e
 
 
da67be0
df37f6e
 
 
da67be0
df37f6e
 
 
5564ecb
 
 
 
df37f6e
 
 
5564ecb
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
df37f6e
 
 
5564ecb
 
 
 
df37f6e
 
 
 
 
5564ecb
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
df37f6e
 
 
5564ecb
 
 
df37f6e
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import httpx
from loguru import logger
from core.conf import settings
from typing import Optional, Dict, Any
from core.token_manager import get_access_token

# Default base URL that endpoints are appended to; read from project settings.
# `get` and `post` callers may override it per call via their `api_base` argument.
API_BASE_URL = settings.API_BASE_URL
# Hard-coded alternative, left disabled:
# API_BASE_URL = "https://api.futabus.vn"


class API:
    """Async HTTP helpers built on ``httpx.AsyncClient``.

    Each public method resolves the target URL, awaits ``get_access_token()``,
    sends a single request with a bearer ``Authorization`` header and returns
    the decoded JSON body.

    Exceptions raised while sending the request or decoding the response are
    caught and reported as ``{"error": "<message>"}`` instead of being raised.
    Exceptions from ``get_access_token()`` itself are not caught here.

    The caller's ``headers`` dict is never modified; every call works on a
    private copy.
    """

    @staticmethod
    def _resolve_url(endpoint: str, api_base: Optional[str] = None) -> str:
        """Append *endpoint* to *api_base*, or to ``API_BASE_URL`` when no base is given."""
        return f"{api_base or API_BASE_URL}{endpoint}"

    @staticmethod
    def _prepare_headers(
        headers: Optional[Dict[str, str]], access_token: Any
    ) -> Dict[str, str]:
        """Return a new header dict with the default content type and bearer token set.

        A copy is made so that the caller's dict is not polluted with
        ``Authorization`` (or, after a retry, with the fallback token headers).
        """
        prepared = dict(headers or {})
        # Respect a caller-supplied Content-Type; only fill in the default.
        prepared.setdefault("Content-Type", "application/json")
        prepared["Authorization"] = f"Bearer {access_token}"
        return prepared

    @staticmethod
    def _error_payload(exc: Exception, prefix: str = "") -> Dict[str, str]:
        """Turn an exception into the ``{"error": ...}`` dict handed back to callers."""
        if isinstance(exc, httpx.HTTPStatusError):
            detail = f"HTTP {exc.response.status_code}: {exc.response.text}"
        else:
            detail = f"Request failed: {str(exc)}"
        return {"error": f"{prefix}{detail}"}

    @staticmethod
    async def _dispatch(
        send,
        headers: Dict[str, str],
        access_token: Any,
        retry_statuses=(),
        token_header_statuses=(),
    ) -> Any:
        """Send a request, optionally retrying once with different auth headers.

        Args:
            send: Callable ``(client, headers)`` returning the awaitable httpx
                request (e.g. ``client.get(...)``).
            headers: Private header dict for this call; it is modified in
                place before a retry.
            access_token: Token placed in ``x-access-token`` on a retry.
            retry_statuses: HTTP status codes that trigger one retry without
                the ``Authorization`` header.
            token_header_statuses: Subset of status codes for which the retry
                also carries ``x-access-token`` and ``token_type: anonymous``.

        Returns:
            The decoded JSON body, or an ``{"error": ...}`` dict on failure.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await send(client, headers)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as http_err:
                status = http_err.response.status_code
                if status not in retry_statuses:
                    return API._error_payload(http_err)
            except Exception as err:
                return API._error_payload(err)

            # Only reached for a status listed in retry_statuses: drop the
            # bearer header and try once more on the same client.
            headers.pop("Authorization", None)
            if status in token_header_statuses:
                headers["x-access-token"] = access_token
                headers["token_type"] = "anonymous"

            try:
                response = await send(client, headers)
                response.raise_for_status()
                return response.json()
            except Exception as retry_err:
                # The "[403 RETRY]" label is also used for retries caused by a 401.
                return API._error_payload(retry_err, prefix="[403 RETRY] ")

    @staticmethod
    async def get(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ):
        """Send a GET request and return the decoded JSON body.

        On a 401 or 403 response the request is retried once without the
        ``Authorization`` header. After a 403 the retry additionally sends
        ``x-access-token`` and ``token_type: anonymous``; after a 401 it sends
        neither.
        NOTE(review): ``post`` sends the token headers for both 401 and 403 --
        confirm whether the difference is intentional.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query parameters.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL used instead of ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        request_headers = API._prepare_headers(headers, access_token)
        return await API._dispatch(
            lambda client, hdrs: client.get(url, headers=hdrs, params=params),
            request_headers,
            access_token,
            retry_statuses=(401, 403),
            token_header_statuses=(403,),
        )

    @staticmethod
    async def post(
        endpoint: str,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ):
        """Send a POST request with a JSON body and return the decoded JSON response.

        On a 401 or 403 response the request is retried once with the
        ``Authorization`` header replaced by ``x-access-token`` and
        ``token_type: anonymous``.

        Args:
            endpoint: Path appended to the base URL.
            payload: Optional JSON-serialisable request body.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL used instead of ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        request_headers = API._prepare_headers(headers, access_token)
        return await API._dispatch(
            lambda client, hdrs: client.post(url, json=payload, headers=hdrs),
            request_headers,
            access_token,
            retry_statuses=(401, 403),
            token_header_statuses=(401, 403),
        )

    @staticmethod
    async def put_api(
        endpoint: str,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ):
        """Send a PUT request with a JSON body and return the decoded JSON response.

        No retry is attempted; any HTTP error status is reported immediately.

        Args:
            endpoint: Path appended to the base URL.
            payload: JSON-serialisable request body.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL used instead of ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        request_headers = API._prepare_headers(headers, access_token)
        return await API._dispatch(
            lambda client, hdrs: client.put(url, json=payload, headers=hdrs),
            request_headers,
            access_token,
        )

    @staticmethod
    async def delete_api(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ):
        """Send a DELETE request and return the decoded JSON response.

        No retry is attempted; any HTTP error status is reported immediately.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query parameters.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL used instead of ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        url = API._resolve_url(endpoint, api_base)
        access_token = await get_access_token()
        request_headers = API._prepare_headers(headers, access_token)
        return await API._dispatch(
            lambda client, hdrs: client.delete(url, headers=hdrs, params=params),
            request_headers,
            access_token,
        )


# Module-level instance of API. Every method on the class is a staticmethod,
# so this object holds no state of its own.
api: API = API()