lightning / subscriptions.py
sharktide's picture
Update subscriptions.py
bbfb6eb verified
import os
import psycopg
from psycopg.rows import dict_row
from supabase import create_client, Client
import asyncio
conn = None
conn_lock = asyncio.Lock()
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
POSTGRE_SECRET = os.getenv("POSTGRE_SECRET")
conn = psycopg.connect(POSTGRE_SECRET, row_factory=dict_row, sslmode="verify-full", sslrootcert="prod-ca-2021.crt")
async def get_conn():
global conn
if conn is None or conn.closed:
conn = psycopg.connect(POSTGRE_SECRET, row_factory=dict_row, sslmode="verify-full", sslrootcert="prod-ca-2021.crt")
return conn
def normalize_plan_key(plan_name: str | None) -> str:
if not plan_name:
return "free"
normalized = "".join(ch for ch in str(plan_name).lower() if ch.isalpha())
if "professional" in normalized:
return "professional"
if "creator" in normalized:
return "creator"
if "core" in normalized:
return "core"
if "light" in normalized:
return "light"
return "free"
async def fetch_subscription(jwt: str):
auth_res = supabase.auth.get_user(jwt)
if auth_res.user is None:
return {"error": "Invalid or expired session"}
user = auth_res.user
email = user.email
async with conn_lock:
connection = await get_conn()
try:
with connection.cursor() as cur:
cur.execute("""
with cust as (
select id
from stripe.customers
where email = %s
),
subs as (
select
s.id as subscription_id,
s.status,
s.current_period_end,
s.items->'data'->0->'price'->>'id' as price_id
from stripe.subscriptions s
join cust on s.customer = cust.id
where s.status in ('active', 'trialing', 'past_due')
)
select
subs.subscription_id,
subs.status,
subs.current_period_end,
subs.price_id,
prices.nickname,
prices.product as product_id,
products.name as product_name
from subs
left join stripe.prices prices
on prices.id = subs.price_id
left join stripe.products products
on prices.product = products.id;
""", (email,))
rows = cur.fetchall()
except psycopg.OperationalError:
connection = psycopg.connect(POSTGRE_SECRET, row_factory=dict_row)
with connection.cursor() as cur:
cur.execute("""
with cust as (
select id
from stripe.customers
where email = %s
),
subs as (
select
s.id as subscription_id,
s.status,
s.current_period_end,
s.items->'data'->0->'price'->>'id' as price_id
from stripe.subscriptions s
join cust on s.customer = cust.id
where s.status in ('active', 'trialing', 'past_due')
)
select
subs.subscription_id,
subs.status,
subs.current_period_end,
subs.price_id,
prices.nickname,
prices.product as product_id,
products.name as product_name
from subs
left join stripe.prices prices
on prices.id = subs.price_id
left join stripe.products products
on prices.product = products.id;
""", (email,))
rows = cur.fetchall()
if not rows:
return {
"email": email,
"signed_up": user.created_at.isoformat(),
"subscription": None,
"plan_key": "free",
}
subscriptions = []
preferred_plan_key = "free"
for row in rows:
plan_key = normalize_plan_key(row["product_name"] or row["nickname"])
subscriptions.append({
"subscription_id": row["subscription_id"],
"status": row["status"],
"current_period_end": row["current_period_end"],
"price_id": row["price_id"],
"product_name": row["product_name"],
"nickname": row["nickname"],
"plan_key": plan_key,
})
if row["status"] in ("active", "trialing"):
preferred_plan_key = plan_key
return {
"email": email,
"signed_up": user.created_at.isoformat(),
"subscription": subscriptions,
"plan_key": preferred_plan_key,
}