Ali2206 commited on
Commit
1fbdffc
·
verified ·
1 Parent(s): 61dcdad

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +79 -36
app.py CHANGED
@@ -6,7 +6,8 @@ import gradio as gr
6
  import requests
7
  import logging
8
  import time
9
- from typing import Optional
 
10
 
11
  # Configure logging
12
  logging.basicConfig(level=logging.DEBUG)
@@ -39,58 +40,99 @@ class TokenManager:
39
  self.last_obtained: float = 0
40
  self.token_expiry: int = 3600 # 1 hour default expiry
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  def get_token(self, force_refresh: bool = False) -> str:
43
  current_time = time.time()
44
 
45
  if not force_refresh and self.token and (current_time - self.last_obtained) < self.token_expiry:
46
  return self.token
47
 
 
 
 
 
 
 
48
  for attempt in range(MAX_TOKEN_RETRIES):
49
  try:
50
- login_payload = {
51
- "username": ADMIN_EMAIL,
52
- "password": ADMIN_PASSWORD,
53
- "device_token": "admin-device-token"
54
- }
55
-
56
  logger.debug(f"Attempting to obtain admin token (attempt {attempt + 1})")
57
 
58
- # Ensure proper headers for JSON content
59
- headers = {
60
- "Content-Type": "application/json",
61
- "Accept": "application/json"
62
- }
63
-
64
- res = requests.post(
65
- f"{BACKEND_URL}/auth/login",
66
- json=login_payload, # Using json parameter automatically sets content-type
67
- headers=headers,
68
- timeout=10
69
- )
70
-
71
- logger.debug(f"Response status: {res.status_code}, content: {res.text}")
72
-
73
- if res.status_code == 200:
74
- token_data = res.json()
75
- self.token = token_data.get("access_token")
76
- if not self.token:
77
- raise Exception("No access_token in response")
78
-
79
  self.last_obtained = current_time
80
- self.token_expiry = token_data.get("expires_in", 3600)
81
  logger.info("Successfully obtained admin token")
82
  return self.token
83
 
84
- logger.error(f"Token request failed: {res.status_code} - {res.text}")
85
- if res.status_code == 422:
86
- raise Exception(f"Validation error: {res.json().get('detail', 'Unknown validation error')}")
87
- if res.status_code == 401:
88
- raise Exception("Invalid admin credentials")
89
 
90
  except requests.exceptions.RequestException as e:
91
  logger.error(f"Network error during token fetch: {str(e)}")
92
  if attempt < MAX_TOKEN_RETRIES - 1:
93
- time.sleep(TOKEN_RETRY_DELAY * (attempt + 1)) # Exponential backoff
94
  continue
95
  except Exception as e:
96
  logger.error(f"Unexpected error during token fetch: {str(e)}")
@@ -98,10 +140,11 @@ class TokenManager:
98
  time.sleep(TOKEN_RETRY_DELAY)
99
  continue
100
 
101
- raise Exception("Failed to obtain admin token after multiple attempts")
102
 
103
  token_manager = TokenManager()
104
 
 
105
  @app.get("/")
106
  def root():
107
  logger.debug("Root endpoint accessed")
 
6
  import requests
7
  import logging
8
  import time
9
+ from typing import Optional, Dict, Any
10
+ import json
11
 
12
  # Configure logging
13
  logging.basicConfig(level=logging.DEBUG)
 
40
  self.last_obtained: float = 0
41
  self.token_expiry: int = 3600 # 1 hour default expiry
42
 
43
+ def _make_login_request(self, payload: Dict[str, Any]) -> Optional[str]:
44
+ """Helper method to make the login request with different approaches"""
45
+ approaches = [
46
+ self._try_json_request,
47
+ self._try_form_data_request,
48
+ self._try_urlencoded_request
49
+ ]
50
+
51
+ for approach in approaches:
52
+ try:
53
+ result = approach(payload)
54
+ if result:
55
+ return result
56
+ except Exception as e:
57
+ logger.warning(f"Approach failed: {approach.__name__}, error: {str(e)}")
58
+ continue
59
+
60
+ return None
61
+
62
+ def _try_json_request(self, payload: Dict[str, Any]) -> Optional[str]:
63
+ """Try sending as JSON with proper headers"""
64
+ headers = {
65
+ "Content-Type": "application/json",
66
+ "Accept": "application/json"
67
+ }
68
+ res = requests.post(
69
+ f"{BACKEND_URL}/auth/login",
70
+ json=payload,
71
+ headers=headers,
72
+ timeout=10
73
+ )
74
+ return self._process_response(res)
75
+
76
+ def _try_form_data_request(self, payload: Dict[str, Any]) -> Optional[str]:
77
+ """Try sending as form data"""
78
+ res = requests.post(
79
+ f"{BACKEND_URL}/auth/login",
80
+ data=payload,
81
+ timeout=10
82
+ )
83
+ return self._process_response(res)
84
+
85
+ def _try_urlencoded_request(self, payload: Dict[str, Any]) -> Optional[str]:
86
+ """Try sending as x-www-form-urlencoded"""
87
+ headers = {
88
+ "Content-Type": "application/x-www-form-urlencoded",
89
+ }
90
+ res = requests.post(
91
+ f"{BACKEND_URL}/auth/login",
92
+ data=payload,
93
+ headers=headers,
94
+ timeout=10
95
+ )
96
+ return self._process_response(res)
97
+
98
+ def _process_response(self, response: requests.Response) -> Optional[str]:
99
+ """Process the response and extract token if successful"""
100
+ if response.status_code == 200:
101
+ token_data = response.json()
102
+ return token_data.get("access_token")
103
+ logger.error(f"Request failed: {response.status_code} - {response.text}")
104
+ return None
105
+
106
  def get_token(self, force_refresh: bool = False) -> str:
107
  current_time = time.time()
108
 
109
  if not force_refresh and self.token and (current_time - self.last_obtained) < self.token_expiry:
110
  return self.token
111
 
112
+ login_payload = {
113
+ "username": ADMIN_EMAIL,
114
+ "password": ADMIN_PASSWORD,
115
+ "device_token": "admin-device-token"
116
+ }
117
+
118
  for attempt in range(MAX_TOKEN_RETRIES):
119
  try:
 
 
 
 
 
 
120
  logger.debug(f"Attempting to obtain admin token (attempt {attempt + 1})")
121
 
122
+ token = self._make_login_request(login_payload)
123
+ if token:
124
+ self.token = token
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
  self.last_obtained = current_time
 
126
  logger.info("Successfully obtained admin token")
127
  return self.token
128
 
129
+ if attempt < MAX_TOKEN_RETRIES - 1:
130
+ time.sleep(TOKEN_RETRY_DELAY * (attempt + 1))
 
 
 
131
 
132
  except requests.exceptions.RequestException as e:
133
  logger.error(f"Network error during token fetch: {str(e)}")
134
  if attempt < MAX_TOKEN_RETRIES - 1:
135
+ time.sleep(TOKEN_RETRY_DELAY * (attempt + 1))
136
  continue
137
  except Exception as e:
138
  logger.error(f"Unexpected error during token fetch: {str(e)}")
 
140
  time.sleep(TOKEN_RETRY_DELAY)
141
  continue
142
 
143
+ raise Exception("Failed to obtain admin token after multiple attempts and approaches")
144
 
145
  token_manager = TokenManager()
146
 
147
+
148
  @app.get("/")
149
  def root():
150
  logger.debug("Root endpoint accessed")