Spaces:
Sleeping
Sleeping
import json
import logging
import os
import time
from typing import Optional, Dict, Any

import gradio as gr
import requests
from fastapi import FastAPI, Request, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse, HTMLResponse

from api import api_router
# Logging: root logger at DEBUG, plus a module-level logger for this file.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.debug("Initializing application")

# The ASGI application that every route and the Gradio UI hang off.
app = FastAPI()

# CORS: every origin, method and header is accepted, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended before exposing the app publicly.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Routes declared in the project's `api` module.
app.include_router(api_router)
# Constants
# Deployment-specific values are read from the environment so that secrets do
# not have to live in source control. The literals below are only fallbacks
# that keep existing deployments working when the variables are unset.
# NOTE(review): the fallback password is weak and is still visible in the
# source -- set ADMIN_EMAIL / ADMIN_PASSWORD in the environment and rotate it.
BACKEND_URL = os.environ.get("BACKEND_URL", "https://rocketfarmstudios-cps-api.hf.space")
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL", "yakdhanali97@gmail.com")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "123456")
MAX_TOKEN_RETRIES = 3  # login rounds attempted before giving up
TOKEN_RETRY_DELAY = 2  # seconds; multiplied by the attempt number between rounds
class TokenManager:
    """Obtain and cache the admin bearer token used for backend calls.

    The token is fetched from ``{BACKEND_URL}/auth/login`` using the admin
    credentials and reused until ``token_expiry`` seconds have elapsed since
    it was obtained, or until a caller forces a refresh.
    """

    def __init__(self):
        self.token: Optional[str] = None  # last token obtained, if any
        self.last_obtained: float = 0  # time.time() stamp for the cached token
        self.token_expiry: int = 3600  # 1 hour default expiry

    def _make_login_request(self, payload: Dict[str, Any]) -> Optional[str]:
        """Try each request encoding in turn and return the first token found.

        Any exception raised by an individual approach is logged and treated
        as "no token", so this method itself does not raise for request or
        parsing failures.
        """
        approaches = (
            self._try_json_request,
            self._try_form_data_request,
            self._try_urlencoded_request,
        )
        for approach in approaches:
            try:
                result = approach(payload)
            except Exception as e:
                logger.warning("Approach failed: %s, error: %s", approach.__name__, e)
                continue
            if result:
                return result
        return None

    def _try_json_request(self, payload: Dict[str, Any]) -> Optional[str]:
        """Try sending as JSON with proper headers."""
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        res = requests.post(
            f"{BACKEND_URL}/auth/login",
            json=payload,
            headers=headers,
            timeout=10,
        )
        return self._process_response(res)

    def _try_form_data_request(self, payload: Dict[str, Any]) -> Optional[str]:
        """Try sending as form data."""
        res = requests.post(
            f"{BACKEND_URL}/auth/login",
            data=payload,
            timeout=10,
        )
        return self._process_response(res)

    def _try_urlencoded_request(self, payload: Dict[str, Any]) -> Optional[str]:
        """Try sending as x-www-form-urlencoded with an explicit header.

        NOTE(review): requests already form-encodes a dict passed as
        ``data=``, so this likely duplicates ``_try_form_data_request``;
        it is kept so the sequence of attempts is unchanged.
        """
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }
        res = requests.post(
            f"{BACKEND_URL}/auth/login",
            data=payload,
            headers=headers,
            timeout=10,
        )
        return self._process_response(res)

    def _process_response(self, response: requests.Response) -> Optional[str]:
        """Return the ``access_token`` from a 200 response, else ``None``.

        A 200 response whose JSON body is not an object yields ``None``
        instead of failing on ``.get``. A body that is not JSON at all still
        raises from ``response.json()``; ``_make_login_request`` handles that.
        """
        if response.status_code == 200:
            token_data = response.json()
            if isinstance(token_data, dict):
                return token_data.get("access_token")
            logger.error("Request failed: login response body was not a JSON object")
            return None
        logger.error("Request failed: %s - %s", response.status_code, response.text)
        return None

    def get_token(self, force_refresh: bool = False) -> str:
        """Return a valid admin token, logging in again when necessary.

        Args:
            force_refresh: When true, ignore the cached token and log in again.

        Returns:
            The bearer token string.

        Raises:
            RuntimeError: If no token could be obtained after
                ``MAX_TOKEN_RETRIES`` rounds of login attempts.
        """
        current_time = time.time()
        cache_is_fresh = (current_time - self.last_obtained) < self.token_expiry
        if not force_refresh and self.token and cache_is_fresh:
            return self.token

        login_payload = {
            "username": ADMIN_EMAIL,
            "password": ADMIN_PASSWORD,
            "device_token": "admin-device-token",
        }

        for attempt in range(MAX_TOKEN_RETRIES):
            logger.debug("Attempting to obtain admin token (attempt %d)", attempt + 1)
            try:
                token = self._make_login_request(login_payload)
            except requests.exceptions.RequestException as e:
                logger.error("Network error during token fetch: %s", e)
                token = None
            except Exception as e:
                logger.error("Unexpected error during token fetch: %s", e)
                token = None

            if token:
                self.token = token
                # Stamped with the time this call started, so the cache errs
                # on the side of expiring slightly early.
                self.last_obtained = current_time
                logger.info("Successfully obtained admin token")
                return self.token

            # One linear backoff for every kind of failure; no sleep after
            # the final attempt.
            if attempt < MAX_TOKEN_RETRIES - 1:
                time.sleep(TOKEN_RETRY_DELAY * (attempt + 1))

        raise RuntimeError("Failed to obtain admin token after multiple attempts and approaches")


# Shared instance used by the request handlers below.
token_manager = TokenManager()
# NOTE(review): no route decorator is visible on this function in this view;
# confirm how it is registered with the app.
def root():
    """Log the access and return a small status payload."""
    logger.debug("Root endpoint accessed")
    status = {"message": "π FastAPI with MongoDB + JWT is running."}
    return status
# NOTE(review): no route decorator is visible on this function in this view;
# confirm how it is registered with the app.
async def redirect_login(request: Request):
    """Send the caller to ``/auth/login`` with a 307 redirect.

    ``request`` is accepted but not inspected.
    """
    target = "/auth/login"
    logger.info("Redirecting /login to /auth/login")
    return RedirectResponse(url=target, status_code=307)
def authenticate_admin(email: str = None, password: str = None):
    """Check the supplied credentials against the configured admin account.

    Args:
        email: Email supplied by the caller; ``None`` or a non-string value
            is treated as a mismatch.
        password: Password supplied by the caller; same handling as ``email``.

    Returns:
        ``True`` when both values match ``ADMIN_EMAIL`` / ``ADMIN_PASSWORD``.

    Raises:
        HTTPException: With status 401 when either value does not match.
    """
    import hmac  # local import so this function does not depend on other edits

    credentials_ok = False
    if isinstance(email, str) and isinstance(password, str):
        # compare_digest avoids leaking, through response timing, how much of
        # a guess was correct. Both comparisons always run. Values are encoded
        # to bytes because the str form only accepts ASCII.
        email_ok = hmac.compare_digest(email.encode("utf-8"), ADMIN_EMAIL.encode("utf-8"))
        password_ok = hmac.compare_digest(password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8"))
        credentials_ok = email_ok and password_ok

    if not credentials_ok:
        logger.warning("Failed admin login attempt with email: %s", email)
        raise HTTPException(status_code=401, detail="Unauthorized: Invalid email or password")

    logger.info("Admin authenticated successfully: %s", email)
    return True
def _post_doctor(payload, token):
    """Send one doctor-creation request authorised with *token*."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    return requests.post(
        f"{BACKEND_URL}/auth/admin/doctors",
        json=payload,
        headers=headers,
        timeout=10,
    )


def _doctor_error_detail(res):
    """Return the backend's ``detail`` field, or the raw body text as fallback."""
    try:
        body = res.json()
    except ValueError:
        # Body is not JSON (e.g. an HTML error page); report the text instead
        # of letting the parse failure hide the real HTTP status.
        return res.text
    if isinstance(body, dict):
        return body.get("detail", res.text)
    return res.text


def create_doctor(full_name: str, email: str, matricule: str, password: str, specialty: str) -> str:
    """Create a doctor account on the backend and return a status message.

    The request is authorised with the cached admin token. If the backend
    answers 401, the token is refreshed once and the request is retried.

    Args:
        full_name: Doctor's display name.
        email: Doctor's email address.
        matricule: Sent to the backend as ``license_number``.
        password: Initial password for the new account.
        specialty: Medical specialty label.

    Returns:
        A human-readable message describing success or the failure reason.
        Failures are reported in the returned string rather than raised.
    """
    try:
        payload = {
            "full_name": full_name,
            "email": email,
            "license_number": matricule,
            "password": password,
            "specialty": specialty,
        }

        res = _post_doctor(payload, token_manager.get_token())
        if res.status_code == 401:  # Token might be expired
            logger.warning("Token expired, attempting refresh...")
            res = _post_doctor(payload, token_manager.get_token(force_refresh=True))

        if res.status_code == 201:
            return "β Doctor created successfully!"
        return f"β Error: {_doctor_error_detail(res)} (Status: {res.status_code})"
    except requests.exceptions.RequestException as e:
        return f"β Network Error: {str(e)}"
    except Exception as e:
        return f"β System Error: {str(e)}"
# Stylesheet for the admin panel: dark theme with a single centred form card.
_ADMIN_UI_CSS = """
.gradio-container {
background-color: #1A1B1F;
color: #E2E8F0;
font-family: 'Segoe UI', sans-serif;
padding: 3rem;
}
.title-text { text-align: center; font-size: 2rem; font-weight: 700; color: #37B6E9; margin-bottom: 0.5rem; }
.description-text { text-align: center; font-size: 1rem; color: #A0AEC0; margin-bottom: 2rem; }
.gr-box, .gr-form, .gr-column, .gr-panel { background-color: #2D2F36 !important; border-radius: 16px !important; padding: 2rem !important; max-width: 600px; margin: auto; box-shadow: 0 0 0 1px #3B3E47; }
label { font-weight: 600; color: #F7FAFC; margin-bottom: 6px; }
input, select, textarea { background-color: #1A1B1F !important; color: #F7FAFC !important; border: 1px solid #4A5568 !important; font-size: 14px; padding: 10px; border-radius: 10px; }
button { background-color: #37B6E9 !important; color: #1A1B1F !important; border-radius: 10px !important; font-weight: 600; padding: 12px; width: 100%; margin-top: 1.5rem; }
.output-box textarea { background-color: transparent !important; border: none; color: #90CDF4; font-size: 14px; margin-top: 1rem; }
"""

# Container for the doctor-creation form; its components are added further down.
admin_ui = gr.Blocks(css=_ADMIN_UI_CSS)
# Build the doctor-creation form inside the Blocks container.
with admin_ui:
    gr.Markdown("<div class='title-text'>π¨ββοΈ Doctor Account Creator</div>")
    gr.Markdown("<div class='description-text'>Admins can register new doctors using this secure panel</div>")
    with gr.Column():
        full_name = gr.Textbox(label="Full Name", placeholder="e.g. Dr. Sarah Hopkins")
        email = gr.Textbox(label="Email", placeholder="e.g. doctor@clinic.org")
        matricule = gr.Textbox(label="Matricule", placeholder="e.g. DOC-1234")
        specialty = gr.Dropdown(
            label="Specialty",
            choices=["Cardiology", "Neurology", "Pediatrics", "Oncology",
                     "General Practice", "Psychiatry", "Dermatology", "Orthopedics"],
            value="General Practice",
        )
        password = gr.Textbox(label="Password", type="password", placeholder="Secure password")
        submit_btn = gr.Button("Create Doctor Account")
        output = gr.Textbox(label="", show_label=False, elem_classes=["output-box"])

    # Inputs are passed to the callback positionally, so their order must
    # follow create_doctor(full_name, email, matricule, password, specialty).
    # The on-screen order (specialty above password) comes from the order in
    # which the components are created and is not affected by this list.
    submit_btn.click(
        fn=create_doctor,
        inputs=[full_name, email, matricule, password, specialty],
        outputs=output,
    )

# Serve the Gradio UI from the FastAPI app under /admin-auth.
app = gr.mount_gradio_app(app, admin_ui, path="/admin-auth")
# NOTE(review): no route decorator is visible on this function in this view;
# confirm how it is registered with the app.
async def admin_dashboard(email: str = None, password: str = None, response: Response = None):
    """Gate the admin UI behind the admin credentials.

    Args:
        email: Admin email supplied by the caller.
        password: Admin password supplied by the caller.
        response: Kept for interface compatibility and no longer used. The
            status code is set on the returned response itself, because a
            response object returned directly does not pick up changes made
            to this parameter.

    Returns:
        A 307 redirect to ``/admin-auth`` when the credentials are valid,
        otherwise an HTML page sent with status 401.
    """
    logger.debug("Admin dashboard accessed")
    try:
        authenticate_admin(email, password)
    except HTTPException:
        # The error page must never reveal the expected credentials.
        return HTMLResponse(
            content="""
<h1>401 Unauthorized</h1>
<p>Invalid email or password.</p>
""",
            status_code=401,
        )
    return RedirectResponse(url="/admin-auth", status_code=307)
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    logger.info("Starting application")
    import uvicorn

    _server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **_server_options)