Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, HTTPException, Depends, Form | |
| from fastapi.security import OAuth2PasswordBearer | |
| import httpx | |
| import os | |
| from dotenv import load_dotenv | |
# Populate the process environment (python-dotenv) before any setting is read.
load_dotenv()

app = FastAPI()

# Service-level bearer token sent upstream by login/logout; None when unset.
BEARER_TOKEN = os.environ.get("MEDUCINE_API_BEARER_TOKEN")

# Prefix that every Meducine API path below is appended to; None when unset.
BASE_URL = os.environ.get("BASE_URL")

# Dependency that hands the caller's bearer token to the endpoints using it.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
async def login(email: str = Form(...), password: str = Form(...)):
    """Forward a login request to the Meducine API.

    The ``email`` and ``password`` form fields are posted to the upstream
    ``auth/login`` action, authenticated with the service ``BEARER_TOKEN``.

    Returns:
        Whatever ``handle_response`` returns for the upstream response.

    Raises:
        HTTPException: with the upstream status code and body text when the
            upstream call fails ``raise_for_status()``; unchanged when raised
            by ``handle_response``; with status 500 for any other failure.

    NOTE(review): no route decorator is visible on this function in this
    view -- confirm it is registered with ``app`` elsewhere.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{BASE_URL}/actions/meducine-restapi/auth/login",
                data={"email": email, "password": password},
                headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
            )
            # Turn an upstream error status into httpx.HTTPStatusError.
            response.raise_for_status()
            return handle_response(response)
        except httpx.HTTPStatusError as e:
            # Mirror the upstream status and body back to our caller.
            raise HTTPException(
                status_code=e.response.status_code, detail=e.response.text
            ) from e
        except HTTPException:
            # Already the right shape (raised by handle_response); without
            # this clause the handler below would re-wrap it as a generic 500
            # and lose its status code and detail.
            raise
        except Exception as e:
            # Boundary handler: transport errors, decode errors, etc.
            raise HTTPException(status_code=500, detail=str(e)) from e
async def logout(email: str = Form(...), password: str = Form(...)):
    """Forward a logout request to the Meducine API.

    Posts the ``email`` and ``password`` form fields to the upstream
    ``auth/logout`` action using the service ``BEARER_TOKEN`` and returns
    the result of ``handle_response`` for the upstream reply.
    """
    endpoint = f"{BASE_URL}/actions/meducine-restapi/auth/logout"
    credentials = {"email": email, "password": password}
    auth_header = {"Authorization": f"Bearer {BEARER_TOKEN}"}

    async with httpx.AsyncClient() as session:
        upstream = await session.post(
            endpoint, data=credentials, headers=auth_header
        )
        return handle_response(upstream)
async def get_identity(token: str = Depends(oauth2_scheme)):
    """Fetch the upstream ``auth/identity`` resource for the caller's token.

    The token supplied by ``oauth2_scheme`` is forwarded as the bearer
    credential; the upstream reply is returned via ``handle_response``.
    """
    endpoint = f"{BASE_URL}/actions/meducine-restapi/auth/identity"
    auth_header = {"Authorization": f"Bearer {token}"}

    async with httpx.AsyncClient() as session:
        upstream = await session.get(endpoint, headers=auth_header)
        return handle_response(upstream)
async def check_premium_access(feature: str, token: str = Depends(oauth2_scheme)):
    """Query the upstream ``user/has-premium-access`` action for ``feature``.

    ``feature`` is sent as a query parameter and the caller's token as the
    bearer credential; the upstream reply is returned via
    ``handle_response``.
    """
    endpoint = f"{BASE_URL}/actions/meducine-restapi/user/has-premium-access"
    query = {"feature": feature}
    auth_header = {"Authorization": f"Bearer {token}"}

    async with httpx.AsyncClient() as session:
        upstream = await session.get(endpoint, params=query, headers=auth_header)
        return handle_response(upstream)
def _decode_body(response: httpx.Response):
    """Return the body parsed as JSON, or its raw text when it is not JSON."""
    try:
        return response.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError; empty bodies and HTML or
        # plain-text error pages land here instead of crashing the caller.
        return response.text


def handle_response(response: httpx.Response):
    """Translate a Meducine API response into a return value or an HTTPException.

    Args:
        response: The upstream response to interpret.

    Returns:
        For a 2xx status, the decoded JSON body, or the raw body text when
        the body is not valid JSON.

    Raises:
        HTTPException: for 4xx, with the upstream status and the decoded
            body (JSON, else raw text) as detail; for 5xx, with the upstream
            status and detail "Server error"; for any other status, with
            status 500 and detail "Unexpected error".
    """
    status = response.status_code
    if 200 <= status < 300:
        return _decode_body(response)  # Successful request
    if 400 <= status < 500:
        # Client error: pass the upstream explanation through.
        raise HTTPException(status_code=status, detail=_decode_body(response))
    if 500 <= status < 600:
        # Server error: the upstream body is deliberately not exposed.
        raise HTTPException(status_code=status, detail="Server error")
    # 1xx / 3xx or anything else is not expected from this API.
    raise HTTPException(status_code=500, detail="Unexpected error")
# Serve the app with uvicorn on 127.0.0.1:8000 when run as a script.
if __name__ == "__main__":
    from uvicorn import run as serve

    serve(app, host="127.0.0.1", port=8000)