Ali2206 commited on
Commit
d1ab767
Β·
verified Β·
1 Parent(s): 1fbdffc

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +229 -190
app.py CHANGED
@@ -7,14 +7,42 @@ import requests
7
  import logging
8
  import time
9
  from typing import Optional, Dict, Any
10
- import json
 
 
 
 
 
11
 
12
  # Configure logging
13
- logging.basicConfig(level=logging.DEBUG)
 
 
 
14
  logger = logging.getLogger(__name__)
15
- logger.debug("Initializing application")
16
 
17
- app = FastAPI()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
  # CORS Configuration
20
  app.add_middleware(
@@ -27,249 +55,260 @@ app.add_middleware(
27
 
28
  app.include_router(api_router)
29
 
30
- # Constants
31
- BACKEND_URL = "https://rocketfarmstudios-cps-api.hf.space"
32
- ADMIN_EMAIL = "yakdhanali97@gmail.com"
33
- ADMIN_PASSWORD = "123456"
34
- MAX_TOKEN_RETRIES = 3
35
- TOKEN_RETRY_DELAY = 2 # seconds
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
 
37
  class TokenManager:
38
  def __init__(self):
39
- self.token: Optional[str] = None
40
- self.last_obtained: float = 0
41
- self.token_expiry: int = 3600 # 1 hour default expiry
42
-
43
- def _make_login_request(self, payload: Dict[str, Any]) -> Optional[str]:
44
- """Helper method to make the login request with different approaches"""
45
- approaches = [
46
- self._try_json_request,
47
- self._try_form_data_request,
48
- self._try_urlencoded_request
49
  ]
50
-
51
- for approach in approaches:
52
  try:
53
- result = approach(payload)
54
- if result:
55
- return result
 
 
 
 
 
 
 
 
 
 
56
  except Exception as e:
57
- logger.warning(f"Approach failed: {approach.__name__}, error: {str(e)}")
58
- continue
59
-
60
  return None
61
 
62
- def _try_json_request(self, payload: Dict[str, Any]) -> Optional[str]:
63
- """Try sending as JSON with proper headers"""
64
- headers = {
65
- "Content-Type": "application/json",
66
- "Accept": "application/json"
67
- }
68
- res = requests.post(
69
- f"{BACKEND_URL}/auth/login",
70
- json=payload,
71
- headers=headers,
72
- timeout=10
73
- )
74
- return self._process_response(res)
75
-
76
- def _try_form_data_request(self, payload: Dict[str, Any]) -> Optional[str]:
77
- """Try sending as form data"""
78
- res = requests.post(
79
- f"{BACKEND_URL}/auth/login",
80
- data=payload,
81
- timeout=10
82
- )
83
- return self._process_response(res)
84
-
85
- def _try_urlencoded_request(self, payload: Dict[str, Any]) -> Optional[str]:
86
- """Try sending as x-www-form-urlencoded"""
87
- headers = {
88
- "Content-Type": "application/x-www-form-urlencoded",
89
  }
90
- res = requests.post(
91
- f"{BACKEND_URL}/auth/login",
92
- data=payload,
93
- headers=headers,
94
- timeout=10
95
- )
96
- return self._process_response(res)
97
-
98
- def _process_response(self, response: requests.Response) -> Optional[str]:
99
- """Process the response and extract token if successful"""
100
- if response.status_code == 200:
101
- token_data = response.json()
102
- return token_data.get("access_token")
103
- logger.error(f"Request failed: {response.status_code} - {response.text}")
104
- return None
105
 
106
- def get_token(self, force_refresh: bool = False) -> str:
107
- current_time = time.time()
108
-
109
- if not force_refresh and self.token and (current_time - self.last_obtained) < self.token_expiry:
110
- return self.token
111
-
112
- login_payload = {
113
- "username": ADMIN_EMAIL,
114
- "password": ADMIN_PASSWORD,
115
- "device_token": "admin-device-token"
116
- }
117
-
118
- for attempt in range(MAX_TOKEN_RETRIES):
119
  try:
120
- logger.debug(f"Attempting to obtain admin token (attempt {attempt + 1})")
 
121
 
122
- token = self._make_login_request(login_payload)
123
- if token:
124
- self.token = token
125
- self.last_obtained = current_time
126
- logger.info("Successfully obtained admin token")
127
  return self.token
128
 
129
- if attempt < MAX_TOKEN_RETRIES - 1:
130
- time.sleep(TOKEN_RETRY_DELAY * (attempt + 1))
131
-
132
- except requests.exceptions.RequestException as e:
133
- logger.error(f"Network error during token fetch: {str(e)}")
134
- if attempt < MAX_TOKEN_RETRIES - 1:
135
- time.sleep(TOKEN_RETRY_DELAY * (attempt + 1))
136
- continue
137
- except Exception as e:
138
- logger.error(f"Unexpected error during token fetch: {str(e)}")
139
- if attempt < MAX_TOKEN_RETRIES - 1:
140
- time.sleep(TOKEN_RETRY_DELAY)
141
- continue
142
 
143
- raise Exception("Failed to obtain admin token after multiple attempts and approaches")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
  token_manager = TokenManager()
146
 
 
 
 
147
 
148
  @app.get("/")
149
- def root():
150
- logger.debug("Root endpoint accessed")
151
- return {"message": "πŸš€ FastAPI with MongoDB + JWT is running."}
152
 
153
  @app.post("/login")
154
- async def redirect_login(request: Request):
155
- logger.info("Redirecting /login to /auth/login")
156
  return RedirectResponse(url="/auth/login", status_code=307)
157
 
158
- def authenticate_admin(email: str = None, password: str = None):
159
- if email != ADMIN_EMAIL or password != ADMIN_PASSWORD:
160
- logger.warning(f"Failed admin login attempt with email: {email}")
161
- raise HTTPException(status_code=401, detail="Unauthorized: Invalid email or password")
162
- logger.info(f"Admin authenticated successfully: {email}")
163
- return True
164
 
165
- def create_doctor(full_name: str, email: str, matricule: str, password: str, specialty: str) -> str:
 
 
 
 
 
 
 
166
  try:
167
  token = token_manager.get_token()
 
 
 
 
168
 
169
  payload = {
170
  "full_name": full_name,
171
  "email": email,
172
  "license_number": matricule,
173
  "password": password,
174
- "specialty": specialty,
175
  }
176
-
177
- headers = {
178
- "Authorization": f"Bearer {token}",
179
- "Content-Type": "application/json"
180
- }
181
-
182
- res = requests.post(
183
  f"{BACKEND_URL}/auth/admin/doctors",
184
  json=payload,
185
  headers=headers,
186
- timeout=10
187
  )
188
-
189
- if res.status_code == 201:
190
  return "βœ… Doctor created successfully!"
191
- elif res.status_code == 401: # Token might be expired
192
- logger.warning("Token expired, attempting refresh...")
193
- token = token_manager.get_token(force_refresh=True)
 
194
  headers["Authorization"] = f"Bearer {token}"
195
- res = requests.post(
196
  f"{BACKEND_URL}/auth/admin/doctors",
197
  json=payload,
198
  headers=headers,
199
- timeout=10
200
  )
201
- if res.status_code == 201:
202
  return "βœ… Doctor created successfully!"
203
-
204
- error_detail = res.json().get('detail', res.text)
205
- return f"❌ Error: {error_detail} (Status: {res.status_code})"
206
-
207
  except requests.exceptions.RequestException as e:
208
- return f"❌ Network Error: {str(e)}"
209
  except Exception as e:
210
- return f"❌ System Error: {str(e)}"
 
211
 
212
- admin_ui = gr.Blocks(css="""
 
 
 
 
 
213
  .gradio-container {
214
- background-color: #1A1B1F;
215
- color: #E2E8F0;
216
  font-family: 'Segoe UI', sans-serif;
217
- padding: 3rem;
 
 
 
 
218
  }
219
- .title-text { text-align: center; font-size: 2rem; font-weight: 700; color: #37B6E9; margin-bottom: 0.5rem; }
220
- .description-text { text-align: center; font-size: 1rem; color: #A0AEC0; margin-bottom: 2rem; }
221
- .gr-box, .gr-form, .gr-column, .gr-panel { background-color: #2D2F36 !important; border-radius: 16px !important; padding: 2rem !important; max-width: 600px; margin: auto; box-shadow: 0 0 0 1px #3B3E47; }
222
- label { font-weight: 600; color: #F7FAFC; margin-bottom: 6px; }
223
- input, select, textarea { background-color: #1A1B1F !important; color: #F7FAFC !important; border: 1px solid #4A5568 !important; font-size: 14px; padding: 10px; border-radius: 10px; }
224
- button { background-color: #37B6E9 !important; color: #1A1B1F !important; border-radius: 10px !important; font-weight: 600; padding: 12px; width: 100%; margin-top: 1.5rem; }
225
- .output-box textarea { background-color: transparent !important; border: none; color: #90CDF4; font-size: 14px; margin-top: 1rem; }
226
- """)
227
 
228
  with admin_ui:
229
- gr.Markdown("<div class='title-text'>πŸ‘¨β€βš•οΈ Doctor Account Creator</div>")
230
- gr.Markdown("<div class='description-text'>Admins can register new doctors using this secure panel</div>")
231
-
232
- with gr.Column():
233
- full_name = gr.Textbox(label="Full Name", placeholder="e.g. Dr. Sarah Hopkins")
234
- email = gr.Textbox(label="Email", placeholder="e.g. doctor@clinic.org")
235
- matricule = gr.Textbox(label="Matricule", placeholder="e.g. DOC-1234")
236
- specialty = gr.Dropdown(
237
- label="Specialty",
238
- choices=["Cardiology", "Neurology", "Pediatrics", "Oncology",
239
- "General Practice", "Psychiatry", "Dermatology", "Orthopedics"],
240
- value="General Practice"
241
- )
242
- password = gr.Textbox(label="Password", type="password", placeholder="Secure password")
243
- submit_btn = gr.Button("Create Doctor Account")
244
- output = gr.Textbox(label="", show_label=False, elem_classes=["output-box"])
245
-
246
- submit_btn.click(
247
- fn=create_doctor,
248
- inputs=[full_name, email, matricule, specialty, password],
249
- outputs=output
250
- )
 
 
 
 
 
 
 
 
 
251
 
252
- app = gr.mount_gradio_app(app, admin_ui, path="/admin-auth")
 
253
 
254
- @app.get("/admin")
255
- async def admin_dashboard(email: str = None, password: str = None, response: Response = None):
256
- logger.debug("Admin dashboard accessed")
257
- try:
258
- authenticate_admin(email, password)
259
- return RedirectResponse(url="/admin-auth", status_code=307)
260
- except HTTPException as e:
 
 
 
 
 
 
261
  response.status_code = 401
262
- return HTMLResponse(content="""
263
- <h1>401 Unauthorized</h1>
264
- <p>Invalid email or password.</p>
265
- <p>Please use the following credentials:</p>
266
- <ul>
267
- <li>Email: yakdhanali97@gmail.com</li>
268
- <li>Password: 123456</li>
269
- </ul>
270
  """)
 
 
 
 
 
271
 
272
  if __name__ == "__main__":
273
- logger.info("Starting application")
274
  import uvicorn
275
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
 
 
 
 
 
 
7
  import logging
8
  import time
9
  from typing import Optional, Dict, Any
10
+ from passlib.context import CryptContext
11
+ from passlib.exc import UnknownHashError
12
+
13
+ # ======================
14
+ # Configuration
15
+ # ======================
16
 
17
  # Configure logging
18
+ logging.basicConfig(
19
+ level=logging.DEBUG,
20
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
21
+ )
22
  logger = logging.getLogger(__name__)
 
23
 
24
+ # Password hashing context
25
+ pwd_context = CryptContext(
26
+ schemes=["bcrypt", "sha256_crypt"], # Supported schemes
27
+ deprecated="auto",
28
+ bcrypt__rounds=12,
29
+ sha256_crypt__rounds=10000
30
+ )
31
+
32
+ # Constants
33
+ BACKEND_URL = "https://rocketfarmstudios-cps-api.hf.space"
34
+ ADMIN_CREDENTIALS = {
35
+ "email": "yakdhanali97@gmail.com",
36
+ "password": "123456"
37
+ }
38
+ MAX_TOKEN_RETRIES = 3
39
+ TOKEN_RETRY_DELAY = 2 # seconds with exponential backoff
40
+
41
+ # ======================
42
+ # Application Setup
43
+ # ======================
44
+
45
+ app = FastAPI(title="Medical Admin Portal")
46
 
47
  # CORS Configuration
48
  app.add_middleware(
 
55
 
56
  app.include_router(api_router)
57
 
58
+ # ======================
59
+ # Security Utilities
60
+ # ======================
61
+
62
+ def verify_password(plain_password: str, hashed_password: str) -> bool:
63
+ """Safely verify password against stored hash with multiple fallbacks"""
64
+ try:
65
+ return pwd_context.verify(plain_password, hashed_password)
66
+ except UnknownHashError:
67
+ logger.warning(f"Unrecognized hash format: {hashed_password[:15]}...")
68
+ # Try common legacy formats if needed
69
+ if hashed_password.startswith("$2a$") or hashed_password.startswith("$2b$"):
70
+ try:
71
+ return pwd_context.verify(plain_password, hashed_password.replace("$2a$", "$2b$"))
72
+ except Exception:
73
+ pass
74
+ return False
75
+ except Exception as e:
76
+ logger.error(f"Password verification error: {str(e)}")
77
+ return False
78
+
79
+ # ======================
80
+ # Token Management
81
+ # ======================
82
 
83
  class TokenManager:
84
  def __init__(self):
85
+ self.token = None
86
+ self.last_refresh = 0
87
+ self.expires_in = 3600 # Default expiry
88
+
89
+ def _make_auth_request(self, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
90
+ """Make authentication request with multiple content-type attempts"""
91
+ content_types = [
92
+ ("application/json", lambda d: {"json": d}),
93
+ ("application/x-www-form-urlencoded", lambda d: {"data": d}),
 
94
  ]
95
+
96
+ for content_type, payload_processor in content_types:
97
  try:
98
+ headers = {
99
+ "Content-Type": content_type,
100
+ "Accept": "application/json"
101
+ }
102
+ res = requests.post(
103
+ f"{BACKEND_URL}/auth/login",
104
+ headers=headers,
105
+ **payload_processor(payload),
106
+ timeout=10
107
+ )
108
+ if res.status_code == 200:
109
+ return res.json()
110
+ logger.debug(f"Attempt with {content_type} failed: {res.status_code}")
111
  except Exception as e:
112
+ logger.debug(f"Request with {content_type} failed: {str(e)}")
113
+
 
114
  return None
115
 
116
+ def refresh_token(self) -> str:
117
+ """Obtain a new token with retry logic"""
118
+ payload = {
119
+ "username": ADMIN_CREDENTIALS["email"],
120
+ "password": ADMIN_CREDENTIALS["password"],
121
+ "device_token": "admin-console"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
+ for attempt in range(1, MAX_TOKEN_RETRIES + 1):
 
 
 
 
 
 
 
 
 
 
 
 
125
  try:
126
+ logger.info(f"Attempting token refresh (attempt {attempt})")
127
+ response = self._make_auth_request(payload)
128
 
129
+ if response and "access_token" in response:
130
+ self.token = response["access_token"]
131
+ self.expires_in = response.get("expires_in", 3600)
132
+ self.last_refresh = time.time()
133
+ logger.info("Successfully refreshed admin token")
134
  return self.token
135
 
136
+ logger.warning(f"Token refresh failed: {response}")
 
 
 
 
 
 
 
 
 
 
 
 
137
 
138
+ except Exception as e:
139
+ logger.error(f"Token refresh error: {str(e)}")
140
+
141
+ if attempt < MAX_TOKEN_RETRIES:
142
+ delay = TOKEN_RETRY_DELAY * (2 ** (attempt - 1)) # Exponential backoff
143
+ time.sleep(delay)
144
+
145
+ raise HTTPException(
146
+ status_code=500,
147
+ detail="Failed to obtain admin token after multiple attempts"
148
+ )
149
+
150
+ def get_token(self) -> str:
151
+ """Get current valid token, refreshing if needed"""
152
+ if not self.token or (time.time() - self.last_refresh) > (self.expires_in - 60):
153
+ return self.refresh_token()
154
+ return self.token
155
 
156
  token_manager = TokenManager()
157
 
158
+ # ======================
159
+ # Core Routes
160
+ # ======================
161
 
162
  @app.get("/")
163
+ async def root():
164
+ return {"status": "active", "service": "Medical Admin Portal"}
 
165
 
166
  @app.post("/login")
167
+ async def handle_login(request: Request):
 
168
  return RedirectResponse(url="/auth/login", status_code=307)
169
 
170
+ # ======================
171
+ # Admin Functions
172
+ # ======================
 
 
 
173
 
174
+ def create_doctor(
175
+ full_name: str,
176
+ email: str,
177
+ matricule: str,
178
+ password: str,
179
+ specialty: str
180
+ ) -> str:
181
+ """Create new doctor account with error handling"""
182
  try:
183
  token = token_manager.get_token()
184
+ headers = {
185
+ "Authorization": f"Bearer {token}",
186
+ "Content-Type": "application/json"
187
+ }
188
 
189
  payload = {
190
  "full_name": full_name,
191
  "email": email,
192
  "license_number": matricule,
193
  "password": password,
194
+ "specialty": specialty
195
  }
196
+
197
+ response = requests.post(
 
 
 
 
 
198
  f"{BACKEND_URL}/auth/admin/doctors",
199
  json=payload,
200
  headers=headers,
201
+ timeout=15
202
  )
203
+
204
+ if response.status_code == 201:
205
  return "βœ… Doctor created successfully!"
206
+
207
+ # Handle token expiration
208
+ if response.status_code == 401:
209
+ token = token_manager.refresh_token()
210
  headers["Authorization"] = f"Bearer {token}"
211
+ response = requests.post(
212
  f"{BACKEND_URL}/auth/admin/doctors",
213
  json=payload,
214
  headers=headers,
215
+ timeout=15
216
  )
217
+ if response.status_code == 201:
218
  return "βœ… Doctor created successfully!"
219
+
220
+ error_detail = response.json().get("detail", "Unknown error")
221
+ return f"❌ Error: {error_detail} (Code: {response.status_code})"
222
+
223
  except requests.exceptions.RequestException as e:
224
+ return f"❌ Network error: {str(e)}"
225
  except Exception as e:
226
+ logger.error(f"Doctor creation failed: {str(e)}")
227
+ return f"❌ System error: {str(e)}"
228
 
229
+ # ======================
230
+ # Gradio Interface
231
+ # ======================
232
+
233
+ admin_ui = gr.Blocks(
234
+ css="""
235
  .gradio-container {
 
 
236
  font-family: 'Segoe UI', sans-serif;
237
+ max-width: 800px;
238
+ margin: 0 auto;
239
+ }
240
+ .input-group {
241
+ margin-bottom: 1.5rem;
242
  }
243
+ """
244
+ )
 
 
 
 
 
 
245
 
246
  with admin_ui:
247
+ gr.Markdown("# πŸ‘¨β€βš•οΈ Doctor Account Management")
248
+ gr.Markdown("Create new doctor accounts with appropriate specialties")
249
+
250
+ with gr.Row():
251
+ with gr.Column():
252
+ full_name = gr.Textbox(label="Full Name", placeholder="Dr. First Last")
253
+ email = gr.Textbox(label="Email", placeholder="doctor@hospital.org")
254
+ matricule = gr.Textbox(label="License Number")
255
+
256
+ with gr.Column():
257
+ specialty = gr.Dropdown(
258
+ label="Specialty",
259
+ choices=[
260
+ "General Practice", "Cardiology", "Neurology",
261
+ "Pediatrics", "Orthopedics", "Dermatology"
262
+ ]
263
+ )
264
+ password = gr.Textbox(
265
+ label="Password",
266
+ type="password",
267
+ info="Minimum 8 characters"
268
+ )
269
+
270
+ submit_btn = gr.Button("Create Account", variant="primary")
271
+ output = gr.Textbox(label="Status", interactive=False)
272
+
273
+ submit_btn.click(
274
+ fn=create_doctor,
275
+ inputs=[full_name, email, matricule, specialty, password],
276
+ outputs=output
277
+ )
278
 
279
+ # Mount Gradio interface
280
+ app = gr.mount_gradio_app(app, admin_ui, path="/admin")
281
 
282
+ # ======================
283
+ # Admin Dashboard
284
+ # ======================
285
+
286
+ @app.get("/admin-auth")
287
+ async def admin_auth(
288
+ email: str,
289
+ password: str,
290
+ response: Response
291
+ ):
292
+ """Secure admin dashboard entry point"""
293
+ if (email != ADMIN_CREDENTIALS["email"] or
294
+ not verify_password(password, pwd_context.hash(ADMIN_CREDENTIALS["password"]))):
295
  response.status_code = 401
296
+ return HTMLResponse("""
297
+ <h1>Access Denied</h1>
298
+ <p>Invalid admin credentials</p>
 
 
 
 
 
299
  """)
300
+ return RedirectResponse(url="/admin")
301
+
302
+ # ======================
303
+ # Application Entry
304
+ # ======================
305
 
306
  if __name__ == "__main__":
 
307
  import uvicorn
308
+ uvicorn.run(
309
+ app,
310
+ host="0.0.0.0",
311
+ port=7860,
312
+ log_level="info",
313
+ timeout_keep_alive=60
314
+ )