GrantForge Bot
Deploy to Hugging Face
afd56bc
import os
import stripe
from fastapi import APIRouter, Request, HTTPException
from core.subscription.db import SessionLocal
from core.subscription.models import User
from clerk_backend_api import Clerk
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
endpoint_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
clerk_secret = os.getenv("CLERK_SECRET_KEY")
clerk = Clerk(bearer_auth=clerk_secret) if clerk_secret else None
router = APIRouter(prefix="/api/webhooks", tags=["webhooks"])
@router.post("/clerk")
async def clerk_webhook(request: Request):
import json
payload = await request.body()
try:
data = json.loads(payload)
evt_type = data.get("type")
if evt_type == "user.created":
clerk_id = data.get("data", {}).get("id")
if clerk_id:
from core.subscription.models import UserUsage
db = SessionLocal()
try:
user = db.query(User).filter(User.clerk_id == clerk_id).first()
if not user:
user = User(clerk_id=clerk_id, tier="free")
db.add(user)
usage = (
db.query(UserUsage)
.filter(UserUsage.user_id == clerk_id)
.first()
)
if not usage:
usage = UserUsage(user_id=clerk_id)
db.add(usage)
db.commit()
except Exception:
db.rollback()
finally:
db.close()
except Exception:
pass
return {"status": "success"}
@router.post("/stripe")
async def stripe_webhook(request: Request):
payload = await request.body()
sig_header = request.headers.get("stripe-signature")
if not endpoint_secret:
raise HTTPException(
status_code=500,
detail="Trzeba ustawić STRIPE_WEBHOOK_SECRET w zmiennych ENV. Względy bezpieczeństwa!",
)
try:
event = stripe.Webhook.construct_event(payload, sig_header, endpoint_secret)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid payload")
except stripe.error.SignatureVerificationError:
raise HTTPException(status_code=400, detail="Invalid signature")
if event["type"] == "checkout.session.completed":
session = event["data"]["object"]
clerk_id = session.get("client_reference_id")
customer_id = session.get("customer")
subscription_id = session.get("subscription")
if clerk_id:
await activate_subscription(clerk_id, customer_id, subscription_id, "pro")
elif event["type"] == "customer.subscription.deleted":
subscription = event["data"]["object"]
await disable_subscription_by_sub_id(subscription.get("id"))
return {"status": "success"}
async def activate_subscription(
clerk_id: str, customer_id: str, subscription_id: str, tier: str
):
# Aktualizacja 2-fazowa - najpierw uderzamy do systemu zew (Clerk), jeżeli się uda zatwierdzamy bazę do stanu aktualnego
if clerk:
try:
clerk.users.update_user(
clerk_id, public_metadata={"stripe_subscription": tier}
)
except Exception as e:
from core.audit_logger import audit_log
try:
audit_log(
"ERROR", f"Failed Clerk sub update - DB ROLLBACK triggered: {e}"
)
except Exception:
pass
return # Nie uderzamy o bazę jeśli Clerk odrzucił (fail fast)
db = SessionLocal()
try:
user = db.query(User).filter(User.clerk_id == clerk_id).first()
if not user:
user = User(clerk_id=clerk_id)
db.add(user)
user.tier = tier
user.stripe_customer_id = customer_id
user.stripe_subscription_id = subscription_id
db.commit()
except Exception as e:
db.rollback()
from core.audit_logger import audit_log
try:
audit_log("ERROR", f"Failed DB sub update po pomyslnym Clerk: {e}")
except Exception:
pass
# System jest trochę rozjechany: Clerk załapał PRO, baza nie załapała
# W checker.py od teraz uderzamy Clerk jako główne źródło wiedzy więc check limits zadziała dla PRO
finally:
db.close()
async def disable_subscription_by_sub_id(subscription_id: str):
db = SessionLocal()
try:
user = (
db.query(User)
.filter(User.stripe_subscription_id == subscription_id)
.first()
)
if user:
clerk_id = user.clerk_id
if clerk:
try:
clerk.users.update_user(
clerk_id, public_metadata={"stripe_subscription": "free"}
)
except Exception:
pass
user.tier = "free"
db.commit()
except Exception:
db.rollback()
finally:
db.close()