Spaces:
Sleeping
Sleeping
| from fastapi import Request | |
| from fastapi.responses import JSONResponse | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from nacl.signing import VerifyKey | |
| from nacl.exceptions import BadSignatureError | |
| import hashlib | |
| import secrets | |
| import base58 | |
| import requests | |
| import json | |
| import httpx | |
| import jwt | |
| from jwt.exceptions import ( | |
| ExpiredSignatureError, | |
| InvalidAudienceError, | |
| InvalidSignatureError, | |
| DecodeError | |
| ) | |
| import datetime | |
| import time | |
| import base64 | |
| #from geopy.geocoders import Nominatim | |
| #from geopy.exc import GeocoderServiceError | |
| #geolocator = Nominatim(user_agent="velocity") | |
| def SignatureVerification(mpc,X_401_Addr,X_401_Nonce,X_401_Sign,challange): | |
| if X_401_Addr and X_401_Nonce and X_401_Sign: | |
| try: | |
| signature_bytes=None | |
| if mpc=='true': | |
| signature_bytes = base64.b64decode(X_401_Sign) | |
| elif mpc=='false': | |
| signature_bytes=base58.b58decode(X_401_Sign) | |
| verify_key = VerifyKey(base58.b58decode(X_401_Addr)) | |
| payload_bytes = challange.encode("utf-8") | |
| verify_key.verify(payload_bytes, signature_bytes) | |
| return True | |
| except BadSignatureError: | |
| return False | |
| def SecretNonceGenerator(): | |
| random_bytes = secrets.token_bytes(32) | |
| return hashlib.sha256(random_bytes).hexdigest() | |
| def verifyJWT(token_string,aud): | |
| try: | |
| payload = jwt.decode( | |
| token_string, | |
| "jwttoken", | |
| algorithms=["HS256"], | |
| audience=aud, | |
| issuer="velocity401" | |
| ) | |
| wallet_address = payload.get("sub") | |
| print(f"β Token valid for wallet: {wallet_address}") | |
| return wallet_address | |
| except ExpiredSignatureError: | |
| print("π¨ Token rejected: Signature has expired.") | |
| return None | |
| except InvalidAudienceError: | |
| print(f"π¨ Token rejected: Invalid audience. Expected '{EXPECTED_AUDIENCE}'.") | |
| return None | |
| except InvalidSignatureError: | |
| print("π¨ Token rejected: Invalid signature (key mismatch).") | |
| return None | |
| except DecodeError as e: | |
| print(f"π¨ Token rejected: Malformed JWT structure. Error: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"β οΈ An unexpected error occurred during verification: {e}") | |
| return None | |
| def generateJWT(wallet,aud): | |
| EXPIRY_DURATION = datetime.timedelta(days=3) | |
| try: | |
| payload = { | |
| "sub": wallet, | |
| "aud":aud, | |
| "issuer":"velocity401", | |
| "exp": datetime.datetime.utcnow() + EXPIRY_DURATION, | |
| "iat": datetime.datetime.utcnow() | |
| } | |
| token = jwt.encode(payload, "jwttoken", algorithm="HS256") | |
| return token | |
| except Exception as e: | |
| return None | |
| def TokenCheck(walletPublicKey,required_mint,mint_amount): | |
| url = f"https://mainnet.helius-rpc.com/?api-key=4e833ada-d32c-48c5-b020-c11b2253f25b" | |
| payload = { | |
| "jsonrpc": "2.0", | |
| "id": "1", | |
| "method": "getTokenAccountsByOwner", | |
| "params": [ | |
| walletPublicKey, | |
| {"mint":required_mint}, | |
| {"encoding": "jsonParsed"} | |
| ] | |
| } | |
| headers = {"Content-Type": "application/json"} | |
| try: | |
| response = requests.post(url, json=payload, headers=headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| except requests.exceptions.RequestException as e: | |
| return {"status": False, "message": f"API Request Failed: {e}"} | |
| token_accounts = data.get("result", {}).get("value") | |
| if not token_accounts: | |
| return False | |
| try: | |
| account_info = token_accounts[0]["account"]["data"]["parsed"]["info"] | |
| token_amount = account_info["tokenAmount"] | |
| ui_amount = float(token_amount.get("uiAmount")) | |
| except (TypeError, KeyError, IndexError): | |
| return {"status": False, "message": "Could not parse token account data"} | |
| if float(ui_amount) >= float(mint_amount): | |
| print(ui_amount) | |
| print(True) | |
| return True | |
| else: | |
| print(ui_amount) | |
| return False | |
| class x401Kit(BaseHTTPMiddleware): | |
| def __init__(self, app,protected_paths:list): | |
| super().__init__(app) | |
| self.protected_paths = protected_paths | |
| async def dispatch(self, request: Request, call_next): | |
| if request.method == "OPTIONS": | |
| return await call_next(request) | |
| if not any(request.url.path.startswith(p) for p in self.protected_paths): | |
| return await call_next(request) | |
| NONCE=SecretNonceGenerator() | |
| aud=request.headers.get("origin") | |
| X_401_Nonce=request.headers.get("X-401-Nonce") | |
| X_401_Sign=request.headers.get("X-401-Signature") | |
| X_401_Addr=request.headers.get("X-401-Addr") | |
| client_jwt=request.headers.get("x-jwt") | |
| REQUIRED_SERVICE=None | |
| if client_jwt: | |
| decoded=verifyJWT(client_jwt,aud) | |
| return JSONResponse( | |
| content={"status":"identified","token":decoded,"message":"identified already"}, | |
| status_code=200, | |
| headers={ | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Credentials": "true" | |
| }) | |
| if not X_401_Addr and not X_401_Nonce and not X_401_Sign : | |
| payload401={ | |
| "X-401-Status":"Authrequired", | |
| "x-401-Mechanism":"SOLANA", | |
| "X-401-Nonce":NONCE, | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Credentials": "true", | |
| "Access-Control-Expose-Headers": "x-401-Nonce, x-401-Mechanism, x-401-Status" | |
| } | |
| return JSONResponse(content={ | |
| "message":"401 Auth Required", | |
| "information":"Non persistant stateless auth", | |
| "issuer":"velocityinfra" | |
| },headers=payload401,status_code=401) | |
| required_mint = request.headers.get("required_mint") | |
| mint_amount=float(request.headers.get("mint_amount")) | |
| print(mint_amount) | |
| helius_api_key = request.headers.get("helius_api_key") | |
| mpc=request.headers.get("mpc") | |
| challange=f"CHALLENGE::{X_401_Nonce}::{request.url.path}::VELOCITY401" | |
| signverify=SignatureVerification(mpc,X_401_Addr,X_401_Nonce,X_401_Sign,challange) | |
| tokenverify=None | |
| if required_mint=="no": | |
| tokenverify=True | |
| else: | |
| tokenverify=TokenCheck(X_401_Addr,required_mint,mint_amount) | |
| print(tokenverify) | |
| if signverify == True and tokenverify ==True: | |
| response = await call_next(request) | |
| if response.headers.get("content-type") == "application/json": | |
| body_bytes = b"" | |
| async for chunk in response.body_iterator: | |
| body_bytes += chunk | |
| try: | |
| data = json.loads(body_bytes.decode()) | |
| except json.JSONDecodeError: | |
| return response | |
| JWTTOKEN=generateJWT(X_401_Addr,aud) | |
| data["token"] = JWTTOKEN | |
| response_headers = dict(response.headers) | |
| response_headers.pop("content-length", None) | |
| print(data) | |
| return JSONResponse( | |
| content=data, | |
| status_code=response.status_code, | |
| headers=response_headers | |
| ) | |
| return response | |
| elif tokenverify==False: | |
| return JSONResponse( | |
| content={"status": "tokenerror", "message": "Missing required token"}, | |
| status_code=500, | |
| headers={ | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Credentials": "true" | |
| }) | |
| elif signverify==False: | |
| print("failed") | |
| return JSONResponse( | |
| content={"status": "signerror", "message": "bad signature"}, | |
| status_code=500, | |
| headers={ | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Credentials": "true" | |
| }) | |
| else: | |
| return JSONResponse( | |
| content={"status": "autherror", "message": "Authentication failed"}, | |
| status_code=500, | |
| headers={ | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Credentials": "true" | |
| }) |