Spaces:
Sleeping
Sleeping
import hashlib
import json
import secrets

import base58
import httpx
import requests
from fastapi import Request
from fastapi.responses import JSONResponse
from geopy.exc import GeocoderServiceError
from geopy.geocoders import Nominatim
from nacl.exceptions import BadSignatureError
from nacl.signing import VerifyKey
from starlette.concurrency import run_in_threadpool
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
# Module-level geopy Nominatim client; GEOCODER uses it for reverse lookups.
# "velocity" is the user-agent string passed to the geocoding service.
geolocator = Nominatim(user_agent="velocity")
def GEOCODER(coords):
    """Reverse-geocode a coordinate mapping into a country name and code.

    Args:
        coords: Mapping read via ``.get("latitude")`` and
            ``.get("longitude")``.

    Returns:
        A dict with keys ``status``, ``message`` and ``data``:

        * ``status == "success"``: ``data`` is
          ``{"country": <name>, "code": <upper-cased country code or None>}``.
        * ``status == "not_found"``: the lookup returned nothing usable.
        * ``status == "error"``: a coordinate was missing, the geocoding
          service failed, or any other exception occurred. ``data`` is
          ``None`` and ``message`` carries the reason.

    This function never raises; every exception is converted into an
    ``"error"`` reply.
    """

    def reply(status, message, data=None):
        # Single place that fixes the shape (and key order) of every result.
        return {"status": status, "message": message, "data": data}

    try:
        latitude = coords.get("latitude")
        longitude = coords.get("longitude")
        if latitude is None or longitude is None:
            return reply("error", "Missing latitude or longitude.")

        place = geolocator.reverse(f"{latitude}, {longitude}", exactly_one=True)
        if not place or "address" not in place.raw:
            return reply("not_found", "Location could not be resolved.")

        address = place.raw["address"]
        country_name = address.get("country")
        iso_code = address.get("country_code")
        return reply(
            "success",
            "Location resolved successfully.",
            {
                "country": country_name,
                "code": iso_code.upper() if iso_code else None,
            },
        )
    except GeocoderServiceError as err:
        return reply("error", f"Geocoding service error: {str(err)}")
    except Exception as err:
        # Deliberate catch-all: callers only ever inspect the reply dict.
        return reply("error", f"Unexpected error: {str(err)}")
def SignatureVerification(X_401_Addr, X_401_Nonce, X_401_Sign, challange):
    """Check that ``X_401_Sign`` is a valid signature of ``challange``.

    Args:
        X_401_Addr: Base58-encoded public key used as the verify key.
        X_401_Nonce: Client-supplied nonce. Only its presence is checked
            here; it is not otherwise used by this function.
        X_401_Sign: Base58-encoded detached signature.
        challange: The exact string that must have been signed; it is
            UTF-8 encoded before verification.

    Returns:
        ``True`` when the signature verifies. ``False`` when any of the three
        header values is missing/empty, when either value is not valid
        base58, when the key or signature is malformed, or when the
        signature does not match.
    """
    # Always answer with a real bool: the original fell through and returned
    # None here.
    if not (X_401_Addr and X_401_Nonce and X_401_Sign):
        return False

    try:
        signature_bytes = base58.b58decode(X_401_Sign)
        verify_key = VerifyKey(base58.b58decode(X_401_Addr))
        verify_key.verify(challange.encode("utf-8"), signature_bytes)
    except (BadSignatureError, ValueError, TypeError):
        # ValueError/TypeError cover undecodable base58 and wrong-length
        # keys or signatures. These values come straight from request
        # headers, so malformed input must be a rejection, not a crash.
        return False
    return True
def SecretNonceGenerator():
    """Return a fresh nonce as a 64-character lowercase hex string.

    The nonce is the SHA-256 digest of 32 bytes drawn from ``secrets``.
    """
    hasher = hashlib.sha256()
    hasher.update(secrets.token_bytes(32))
    return hasher.hexdigest()
# SECURITY: this API key was committed to source and must be treated as
# leaked. Rotate it and supply a real key through ``helius_api_key``. The
# literals below remain only so that existing one-argument
# ``TokenCheck(wallet)`` calls keep their previous behaviour.
_LEGACY_HELIUS_API_KEY = "4e833ada-d32c-48c5-b020-c11b2253f25b"
_LEGACY_REQUIRED_MINT = "3d4XyPWkUJzruF5c2qc1QfpLgsaNaDLMtTya1bWBpump"
_LEGACY_MIN_TOKEN_AMOUNT = 100000.0
_HELIUS_RPC_URL = "https://mainnet.helius-rpc.com/"
_RPC_TIMEOUT_SECONDS = 10


def _token_account_ui_amount(token_account):
    """Return the UI balance of one ``jsonParsed`` token account as a float."""
    token_amount = token_account["account"]["data"]["parsed"]["info"]["tokenAmount"]
    ui_amount = token_amount.get("uiAmount")
    if ui_amount is None:
        # Fall back to the string form when the numeric field is null.
        ui_amount = token_amount.get("uiAmountString")
    return float(ui_amount)


def TokenCheck(walletPublicKey, required_mint=None, min_amount=None, helius_api_key=None):
    """Check whether a wallet holds at least ``min_amount`` of a token mint.

    Sends a ``getTokenAccountsByOwner`` JSON-RPC request to the Helius RPC
    endpoint and sums the balance over every token account returned for the
    mint.

    Args:
        walletPublicKey: Owner address passed as the first RPC parameter.
        required_mint: Mint address to filter by. ``None``/empty uses the
            previously hard-coded mint.
        min_amount: Minimum total UI amount required. ``None`` uses the
            previously hard-coded threshold of ``100000.0``.
        helius_api_key: API key sent as the ``api-key`` query parameter.
            ``None``/empty uses the previously hard-coded key.

    Returns:
        ``True`` if the summed balance is ``>= min_amount``; ``False`` if it
        is lower, if the wallet has no token account for the mint, or if
        ``walletPublicKey`` is empty. On a transport/HTTP failure or an
        unparseable response, a dict ``{"status": False, "message": ...}``
        is returned instead (callers comparing with ``== True`` /
        ``== False`` therefore see neither).
    """
    if not walletPublicKey:
        # Nothing to look up; avoid an RPC round-trip with a null owner.
        return False

    threshold = _LEGACY_MIN_TOKEN_AMOUNT if min_amount is None else min_amount
    payload = {
        "jsonrpc": "2.0",
        "id": "1",
        "method": "getTokenAccountsByOwner",
        "params": [
            walletPublicKey,
            {"mint": required_mint or _LEGACY_REQUIRED_MINT},
            {"encoding": "jsonParsed"},
        ],
    }

    try:
        response = requests.post(
            _HELIUS_RPC_URL,
            params={"api-key": helius_api_key or _LEGACY_HELIUS_API_KEY},
            json=payload,
            headers={"Content-Type": "application/json"},
            # Without a timeout a stalled RPC node would hang the caller forever.
            timeout=_RPC_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        return {"status": False, "message": f"API Request Failed: {e}"}
    except ValueError:
        # Body was not valid JSON; newer requests versions raise a
        # RequestException subclass here, which the clause above handles.
        return {"status": False, "message": "Could not parse token account data"}

    # Tolerate an error-shaped response ("result" missing or null).
    result = data.get("result") if isinstance(data, dict) else None
    token_accounts = result.get("value") if isinstance(result, dict) else None
    if not token_accounts:
        return False

    try:
        # A wallet may hold the same mint in several token accounts, so the
        # balances are added up rather than reading only the first account.
        total_amount = sum(
            _token_account_ui_amount(account) for account in token_accounts
        )
    except (TypeError, KeyError, IndexError, ValueError):
        return {"status": False, "message": "Could not parse token account data"}

    return total_amount >= threshold


class x401Kit(BaseHTTPMiddleware):
    """Middleware gating protected paths behind a signed wallet challenge.

    For requests whose path starts with one of ``protected_paths`` the
    middleware:

    1. Answers 401 with a fresh ``X-401-Nonce`` header unless all of
       ``X-401-Addr``, ``X-401-Nonce`` and ``X-401-Signature`` are present.
    2. Verifies the signature over
       ``CHALLENGE::<nonce>::<path>::<secret_domain>``.
    3. Checks the wallet's balance of ``required_mint`` via ``TokenCheck``.
    4. Optionally reverse-geocodes ``X-Lat``/``X-Long`` and rejects country
       codes listed in ``geo_code_locs``.
    5. Forwards the request and, for ``application/json`` object bodies,
       adds an ``"address"`` field holding the wallet address.

    ``OPTIONS`` requests and unprotected paths pass straight through.

    NOTE(review): the nonce is taken from the request and is not compared
    with anything issued by the server, so a captured header set appears to
    be replayable. Confirm whether that is acceptable.
    NOTE(review): authentication failures answer with HTTP 500 and the
    geo-denial with 401. These codes are kept unchanged because clients may
    depend on them, but 401/403 would be the conventional choices.
    """

    # NOTE(review): browsers reject a wildcard origin combined with
    # credentials; kept as-is because it is part of the emitted responses.
    _CORS_HEADERS = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Credentials": "true",
    }

    def __init__(self, app, protected_paths: list, required_mint: str, mint_amount: int, helius_api_key: str, geo_code: bool, geo_code_locs: list, secret_domain="VELOCITY"):
        """Store the gate configuration.

        Args:
            app: The wrapped ASGI application.
            protected_paths: Path prefixes that require authentication.
            required_mint: Token mint the wallet must hold.
            mint_amount: Minimum balance of ``required_mint``.
            helius_api_key: API key for the token-balance RPC call.
            geo_code: Whether to apply the country check.
            geo_code_locs: Country codes that are denied when ``geo_code``
                is enabled.
            secret_domain: Suffix mixed into the signed challenge string.
        """
        super().__init__(app)
        self.protected_paths = protected_paths
        self.required_mint = required_mint
        self.mint_amount = mint_amount
        self.helius_api_key = helius_api_key
        self.secret_domain = secret_domain
        self.geo_code = geo_code
        self.geo_code_locs = geo_code_locs

    def _error_response(self, status, message, status_code):
        """Build a JSON error reply carrying the shared CORS headers."""
        return JSONResponse(
            content={"status": status, "message": message},
            status_code=status_code,
            headers=dict(self._CORS_HEADERS),
        )

    def _challenge_response(self):
        """Build the 401 reply that hands the client a fresh nonce."""
        challenge_headers = {
            "X-401-Status": "Authrequired",
            "x-401-Mechanism": "SOLANA",
            "X-401-Nonce": SecretNonceGenerator(),
            **self._CORS_HEADERS,
            "Access-Control-Expose-Headers": "x-401-Nonce, x-401-Mechanism, x-401-Status",
        }
        return JSONResponse(
            content={
                "message": "401 Auth Required",
                "information": "Non persistant stateless auth",
            },
            headers=challenge_headers,
            status_code=401,
        )

    async def _check_location(self, request):
        """Return a denial response for the request's location, or ``None``."""
        coords = {
            "latitude": request.headers.get("X-Lat"),
            "longitude": request.headers.get("X-Long"),
        }
        # The geocoder performs blocking network I/O; keep it off the event loop.
        country = await run_in_threadpool(GEOCODER, coords)
        if country["data"] is None:
            return self._error_response("locerror", "Auth error", 500)
        if country["data"]["code"] in (self.geo_code_locs or ()):
            return self._error_response(
                "locdeny",
                f"access denied for {country['data']['country']}",
                401,
            )
        return None

    async def _attach_address(self, response, address):
        """Add ``address`` to a JSON object body; pass anything else through."""
        if response.headers.get("content-type") != "application/json":
            return response

        body_bytes = b"".join([chunk async for chunk in response.body_iterator])
        # NOTE(review): dict() keeps only one value per header name, as the
        # original did; repeated headers such as Set-Cookie would collapse.
        passthrough_headers = dict(response.headers)
        passthrough_headers.pop("content-length", None)

        try:
            data = json.loads(body_bytes.decode())
        except (json.JSONDecodeError, UnicodeDecodeError):
            data = None

        if not isinstance(data, dict):
            # The body iterator is already drained, so the original response
            # object can no longer be sent; rebuild it from the bytes read.
            return Response(
                content=body_bytes,
                status_code=response.status_code,
                headers=passthrough_headers,
            )

        data["address"] = address
        return JSONResponse(
            content=data,
            status_code=response.status_code,
            headers=passthrough_headers,
        )

    async def dispatch(self, request: Request, call_next):
        """Authenticate requests to protected paths, then forward them."""
        # CORS preflight requests carry no auth headers; let them through.
        if request.method == "OPTIONS":
            return await call_next(request)
        if not any(request.url.path.startswith(p) for p in self.protected_paths):
            return await call_next(request)

        X_401_Nonce = request.headers.get("X-401-Nonce")
        X_401_Sign = request.headers.get("X-401-Signature")
        X_401_Addr = request.headers.get("X-401-Addr")

        # Any missing header means the client has not answered a challenge
        # yet, so issue one (previously only done when all three were absent).
        if not (X_401_Addr and X_401_Nonce and X_401_Sign):
            return self._challenge_response()

        challange = f"CHALLENGE::{X_401_Nonce}::{request.url.path}::{self.secret_domain}"

        # Verify the signature first so unauthenticated callers cannot
        # trigger outbound RPC requests.
        if SignatureVerification(X_401_Addr, X_401_Nonce, X_401_Sign, challange) is not True:
            return self._error_response("error", "bad signature", 500)

        # Blocking HTTP call: run it in the thread pool, using this
        # middleware's configured mint, threshold and API key.
        tokenverify = await run_in_threadpool(
            TokenCheck,
            X_401_Addr,
            self.required_mint,
            self.mint_amount,
            self.helius_api_key,
        )
        if tokenverify is False:
            return self._error_response("error", "Missing required token", 500)
        if tokenverify is not True:
            # TokenCheck returned its error dict (RPC or parse failure).
            return self._error_response("error", "Authentication failed", 500)

        if self.geo_code:
            denial = await self._check_location(request)
            if denial is not None:
                return denial

        response = await call_next(request)
        return await self._attach_address(response, X_401_Addr)