File size: 5,909 Bytes
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
74168a7
 
 
5564ecb
312de80
e29792b
74168a7
b8799b3
 
 
 
 
74168a7
 
 
 
 
 
 
 
 
 
5564ecb
 
 
 
 
78566ad
5564ecb
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
da67be0
e29792b
da67be0
 
 
 
 
 
 
 
 
 
 
 
 
5564ecb
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dd9b2b9
5564ecb
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import httpx
from core.conf import settings
from typing import Optional, Dict, Any
from core.token_manager import get_access_token

# Base URL taken from project settings; the API helpers below prepend it to
# endpoint paths (get/post use it only when no explicit api_base is passed).
API_BASE_URL = settings.API_BASE_URL

class API():
    """Thin async HTTP helper that sends bearer-authenticated JSON requests.

    Every public method fetches an access token via ``get_access_token()``,
    sends the request with ``httpx.AsyncClient`` and returns the decoded JSON
    body on success.  HTTP and transport failures are not raised: they are
    reported as ``{"error": "<message>"}``.  Exceptions raised by
    ``get_access_token()`` itself are not caught and propagate to the caller.

    The ``headers`` dict supplied by a caller is never modified; each request
    works on its own copy.
    """

    @staticmethod
    def _build_url(endpoint: str, api_base: Optional[str] = None) -> str:
        """Join *endpoint* onto *api_base*, or onto ``API_BASE_URL`` if it is falsy."""
        return f"{api_base or API_BASE_URL}{endpoint}"

    @staticmethod
    def _auth_headers(headers: Optional[Dict[str, str]], access_token: str) -> Dict[str, str]:
        """Return a copy of *headers* with JSON content type and bearer auth set."""
        # Copy first so the caller's dict is left untouched.
        merged = dict(headers or {})
        merged.setdefault("Content-Type", "application/json")
        merged["Authorization"] = f"Bearer {access_token}"
        return merged

    @staticmethod
    def _fallback_headers(
        headers: Dict[str, str],
        access_token: str,
        status_code: int,
        anonymous_on_401: bool,
    ) -> Dict[str, str]:
        """Return the headers used for the single retry after a 401/403.

        The ``Authorization`` header is always dropped.  The token is re-sent
        as ``x-access-token`` with ``token_type: anonymous`` when the first
        response was 403, or also on 401 if *anonymous_on_401* is true.
        """
        retry_headers = {k: v for k, v in headers.items() if k != "Authorization"}
        if status_code == 403 or anonymous_on_401:
            retry_headers["x-access-token"] = access_token
            retry_headers["token_type"] = "anonymous"
        return retry_headers

    @staticmethod
    async def _send(
        method: str,
        url: str,
        headers: Dict[str, str],
        access_token: str,
        *,
        retry_on_auth_error: bool = False,
        anonymous_on_401: bool = False,
        **request_kwargs: Any,
    ) -> Any:
        """Send one request and translate failures into ``{"error": ...}`` dicts.

        Args:
            method: HTTP verb passed to ``AsyncClient.request``.
            url: Fully built request URL.
            headers: Headers for the first attempt (already a private copy).
            access_token: Token re-used for the fallback headers on retry.
            retry_on_auth_error: Retry once when the first response is 401/403.
            anonymous_on_401: See ``_fallback_headers``.
            **request_kwargs: Extra keyword arguments for the request
                (``params=...`` or ``json=...``).

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.request(method, url, headers=headers, **request_kwargs)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as http_err:
                status_code = http_err.response.status_code
                if not (retry_on_auth_error and status_code in (401, 403)):
                    return {"error": f"HTTP {status_code}: {http_err.response.text}"}

                retry_headers = API._fallback_headers(
                    headers, access_token, status_code, anonymous_on_401
                )
                # NOTE: the "[403 RETRY]" label is also emitted for retries
                # triggered by a 401; the text is kept unchanged because
                # callers may match on it.
                try:
                    response = await client.request(
                        method, url, headers=retry_headers, **request_kwargs
                    )
                    response.raise_for_status()
                    return response.json()
                except httpx.HTTPStatusError as second_err:
                    return {"error": f"[403 RETRY] HTTP {second_err.response.status_code}: {second_err.response.text}"}
                except Exception as second_err:
                    return {"error": f"[403 RETRY] Request failed: {str(second_err)}"}
            except Exception as err:
                # Covers transport errors and bodies that fail JSON decoding.
                return {"error": f"Request failed: {str(err)}"}

    @staticmethod
    async def get(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a GET request and return the decoded JSON body.

        On a 401 or 403 the request is retried once without the
        ``Authorization`` header; after a 403 the token is re-sent as
        ``x-access-token`` with ``token_type: anonymous``.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query parameters.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        access_token = await get_access_token()
        # NOTE(review): unlike post(), a 401 here is retried with no token at
        # all. Preserved as found - confirm whether the asymmetry is intended.
        return await API._send(
            "GET",
            API._build_url(endpoint, api_base),
            API._auth_headers(headers, access_token),
            access_token,
            retry_on_auth_error=True,
            anonymous_on_401=False,
            params=params,
        )

    @staticmethod
    async def post(
        endpoint: str,
        payload: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a POST request with a JSON body and return the decoded JSON reply.

        On a 401 or 403 the request is retried once with the token moved from
        ``Authorization`` to ``x-access-token`` plus ``token_type: anonymous``.

        Args:
            endpoint: Path appended to the base URL.
            payload: Optional JSON-serialisable request body.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        access_token = await get_access_token()
        return await API._send(
            "POST",
            API._build_url(endpoint, api_base),
            API._auth_headers(headers, access_token),
            access_token,
            retry_on_auth_error=True,
            anonymous_on_401=True,
            json=payload,
        )

    @staticmethod
    async def put_api(
        endpoint: str,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a PUT request with a JSON body; no retry is attempted.

        Args:
            endpoint: Path appended to the base URL.
            payload: JSON-serialisable request body.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure.
        """
        access_token = await get_access_token()
        return await API._send(
            "PUT",
            API._build_url(endpoint, api_base),
            API._auth_headers(headers, access_token),
            access_token,
            json=payload,
        )

    @staticmethod
    async def delete_api(
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        api_base: Optional[str] = None,
    ) -> Any:
        """Send a DELETE request; no retry is attempted.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query parameters.
            headers: Optional extra headers (not modified).
            api_base: Optional base URL overriding ``API_BASE_URL``.

        Returns:
            The decoded JSON body, or ``{"error": "<message>"}`` on failure
            (including a response whose body is not valid JSON).
        """
        access_token = await get_access_token()
        return await API._send(
            "DELETE",
            API._build_url(endpoint, api_base),
            API._auth_headers(headers, access_token),
            access_token,
            params=params,
        )

# Module-level API instance; all of API's public methods are staticmethods,
# so this object holds no per-instance state.
api: API = API()