Spaces:
Sleeping
Sleeping
| import requests | |
| import os | |
| from datetime import datetime | |
| from supabase import create_client | |
# Supabase connection settings are read from the process environment;
# either value is None when the corresponding variable is unset.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_API_KEY = os.environ.get("SUPABASE_API_KEY")

# Module-level client shared by refresh_snapchat_token().
supabase = create_client(
    SUPABASE_URL,
    SUPABASE_API_KEY,
)
def refresh_snapchat_token():
    """Exchange the most recently stored Snapchat refresh token for new tokens.

    Reads the newest row of the ``snapchat_tokens`` Supabase table (ordered by
    ``refreshed_at`` descending), posts its ``refresh_token`` to the Snapchat
    OAuth2 token endpoint together with the ``CLIENT_ID`` and
    ``CLIENT_SECRET`` environment variables, and inserts the returned tokens
    as a new row in the same table.

    Success and failure are reported with ``print``; the function always
    returns ``None``. Exceptions raised by ``requests`` (connection errors,
    timeouts) and by the Supabase client are not caught here.
    """
    # Get latest token from Supabase
    result = (
        supabase.table("snapchat_tokens")
        .select("*")
        .order("refreshed_at", desc=True)
        .limit(1)
        .execute()
    )
    if not result.data:
        print("β No tokens found in Supabase")
        return

    latest = result.data[0]
    data = {
        "grant_type": "refresh_token",
        "refresh_token": latest["refresh_token"],
        "client_id": os.getenv("CLIENT_ID"),
        "client_secret": os.getenv("CLIENT_SECRET"),
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    url = "https://accounts.snapchat.com/login/oauth2/access_token"

    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.post(url, headers=headers, data=data, timeout=30)

    # An error page (e.g. an HTML 5xx body) is not JSON; report it through the
    # same failure path instead of raising a decode error.
    try:
        res_data = response.json()
    except ValueError:
        print(
            "β Failed to refresh token:",
            f"HTTP {response.status_code}: {response.text}",
        )
        return

    if not isinstance(res_data, dict) or "access_token" not in res_data:
        print("β Failed to refresh token:", res_data)
        return

    # Take the timestamp once so the logged time matches the stored row.
    refreshed_at = datetime.utcnow().isoformat()
    supabase.table("snapchat_tokens").insert({
        "access_token": res_data["access_token"],
        # OAuth2 allows the server to omit a rotated refresh token; in that
        # case the previous one remains the one to use next time.
        "refresh_token": res_data.get("refresh_token") or latest["refresh_token"],
        "refreshed_at": refreshed_at,
    }).execute()
    print(f"β Token refreshed and saved at {refreshed_at}")