datapass-server / stripe_webhook.py
waroca's picture
Upload folder using huggingface_hub
f1b8a40 verified
import json
import os
from datetime import datetime, timedelta, timezone

import stripe
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse

import datasets_registry
import subscriptions_ledger
async def handle_stripe_webhook(request: Request):
    """Verify an incoming Stripe webhook and dispatch it to its handler.

    The raw request body is verified against the ``stripe-signature`` header
    using the secret read from the ``STRIPE_WEBHOOK_SECRET`` environment
    variable. ``checkout.session.completed`` and ``invoice.paid`` events are
    forwarded to their handlers; any other event type is acknowledged
    without further processing.

    Args:
        request: The incoming FastAPI request carrying the webhook.

    Returns:
        A ``JSONResponse`` with the body ``{"status": "success"}``.

    Raises:
        HTTPException: 500 when the webhook secret is not configured;
            400 when the payload is invalid or the signature check fails.
    """
    # Signature verification needs the exact bytes Stripe sent, so read the
    # raw body rather than a parsed representation.
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")

    # Fail loudly on misconfiguration instead of handing ``None`` to the
    # Stripe library, where it surfaces as an unrelated-looking error.
    if not webhook_secret:
        raise HTTPException(
            status_code=500, detail="Webhook secret not configured"
        )

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, webhook_secret
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid payload") from exc
    except stripe.error.SignatureVerificationError as exc:
        raise HTTPException(
            status_code=400, detail="Invalid signature"
        ) from exc

    event_type = event["type"]
    data_object = event["data"]["object"]

    if event_type == "checkout.session.completed":
        await handle_checkout_session(data_object, event["id"])
    elif event_type == "invoice.paid":
        await handle_invoice_paid(data_object, event["id"])

    return JSONResponse({"status": "success"})
async def handle_checkout_session(session, event_id):
    """Record a ledger entry for a completed subscription checkout.

    The session's metadata must carry ``hf_user`` and ``dataset_id``. The
    plan is resolved from the price ID of the subscription's first item,
    and the subscription end is taken from its ``current_period_end``.
    Sessions with missing metadata, without a subscription, or with a price
    ID unknown to the registry are logged and skipped.

    Args:
        session: The Checkout Session object from the webhook event.
        event_id: ID of the Stripe event, stored on the ledger entry.

    Returns:
        None. On success one entry is appended to the subscriptions ledger.
    """
    # Stripe can send ``metadata: null``; treat it like an empty mapping so
    # the lookups below do not fail on ``None``.
    metadata = session.get("metadata") or {}
    hf_user = metadata.get("hf_user")
    dataset_id = metadata.get("dataset_id")
    if not hf_user or not dataset_id:
        print(f"Missing metadata in session {session['id']}")
        return

    subscription_id = session.get("subscription")
    if not subscription_id:
        # One-time payments are not handled; only subscription checkouts.
        print("Non-subscription checkout not fully supported yet.")
        return

    # NOTE(review): this is a synchronous call made from a coroutine, so it
    # holds up the event loop while it runs.
    sub = stripe.Subscription.retrieve(subscription_id)
    # Only the first subscription item is considered when resolving the plan.
    price_id = sub["items"]["data"][0]["price"]["id"]

    # ``current_period_end`` is a Unix timestamp. Convert it explicitly in
    # UTC: the ledger labels the value with a "Z" suffix, so a local-time
    # conversion would store the wrong instant on non-UTC hosts.
    period_end = datetime.fromtimestamp(
        sub["current_period_end"], tz=timezone.utc
    )

    plan_info = datasets_registry.get_plan_by_price_id(price_id)
    if not plan_info:
        print(f"Unknown price_id {price_id}")
        return
    plan_id = plan_info["plan"]["plan_id"]

    # Each new checkout gets its own access token.
    access_token = subscriptions_ledger.generate_access_token()

    # Take one timestamp so subscription_start and created_at agree; the
    # tzinfo is dropped only to keep the existing "<naive ISO>Z" format.
    now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    end_iso = period_end.replace(tzinfo=None).isoformat() + "Z"

    ledger_entry = {
        "event_id": event_id,
        "hf_user": hf_user,
        "dataset_id": dataset_id,
        "plan_id": plan_id,
        "subscription_start": now_iso,
        "subscription_end": end_iso,
        "source": "stripe",
        "access_token": access_token,
        "created_at": now_iso,
        "stripe_customer_id": session.get("customer"),
        "stripe_subscription_id": subscription_id,
    }
    subscriptions_ledger.append_subscription_event(ledger_entry)
async def handle_invoice_paid(invoice, event_id):
    """Record a ledger entry for a paid subscription invoice.

    The user and dataset are read from the metadata of the Stripe
    subscription referenced by the invoice. If the user already has an
    active subscription with an access token, that token is reused;
    otherwise a new one is generated. Invoices without a subscription,
    subscriptions without the required metadata, and price IDs unknown to
    the registry are skipped.

    Args:
        invoice: The Invoice object from the webhook event.
        event_id: ID of the Stripe event, stored on the ledger entry.

    Returns:
        None. On success one entry is appended to the subscriptions ledger.
    """
    subscription_id = invoice.get("subscription")
    if not subscription_id:
        return

    # NOTE(review): this is a synchronous call made from a coroutine, so it
    # holds up the event loop while it runs.
    sub = stripe.Subscription.retrieve(subscription_id)

    # Guard against a null metadata value before reading keys from it.
    metadata = sub.get("metadata") or {}
    hf_user = metadata.get("hf_user")
    dataset_id = metadata.get("dataset_id")
    if not hf_user or not dataset_id:
        # This handler relies on hf_user/dataset_id being present on the
        # subscription's own metadata in Stripe; there is no ledger fallback.
        print(f"Missing metadata in subscription {subscription_id}")
        return

    # Only the first subscription item is considered when resolving the plan.
    price_id = sub["items"]["data"][0]["price"]["id"]

    # ``current_period_end`` is a Unix timestamp. Convert it explicitly in
    # UTC: the ledger labels the value with a "Z" suffix, so a local-time
    # conversion would store the wrong instant on non-UTC hosts.
    period_end = datetime.fromtimestamp(
        sub["current_period_end"], tz=timezone.utc
    )

    plan_info = datasets_registry.get_plan_by_price_id(price_id)
    if not plan_info:
        # Log the skip, matching how the checkout handler reports this case.
        print(f"Unknown price_id {price_id}")
        return
    plan_id = plan_info["plan"]["plan_id"]

    # Renewals keep the existing access token when there is one.
    existing_sub = subscriptions_ledger.get_active_subscription(
        hf_user, dataset_id
    )
    access_token = existing_sub.get("access_token") if existing_sub else None
    if not access_token:
        access_token = subscriptions_ledger.generate_access_token()

    # Take one timestamp so subscription_start and created_at agree; the
    # tzinfo is dropped only to keep the existing "<naive ISO>Z" format.
    now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    end_iso = period_end.replace(tzinfo=None).isoformat() + "Z"

    ledger_entry = {
        "event_id": event_id,
        "hf_user": hf_user,
        "dataset_id": dataset_id,
        "plan_id": plan_id,
        # Start is the processing time, not the invoice period start.
        "subscription_start": now_iso,
        "subscription_end": end_iso,
        "source": "stripe",
        "access_token": access_token,
        "created_at": now_iso,
        "stripe_customer_id": invoice.get("customer"),
        "stripe_subscription_id": subscription_id,
    }
    subscriptions_ledger.append_subscription_event(ledger_entry)