"""Webhook endpoints that keep local user records in sync with Clerk and Stripe.

NOTE(review): the handlers are ``async def`` but call the database session and
the Clerk client without awaiting them, so those calls appear to run
synchronously inside the event loop. Consider a thread pool if these calls are
slow in practice.
"""

import json
import logging
import os

import stripe
from clerk_backend_api import Clerk
from fastapi import APIRouter, HTTPException, Request

from core.subscription.db import SessionLocal
from core.subscription.models import User, UserUsage

logger = logging.getLogger(__name__)

stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
endpoint_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
clerk_secret = os.getenv("CLERK_SECRET_KEY")
# The Clerk client is optional: without CLERK_SECRET_KEY all Clerk updates are skipped.
clerk = Clerk(bearer_auth=clerk_secret) if clerk_secret else None

router = APIRouter(prefix="/api/webhooks", tags=["webhooks"])


def _audit_error(message: str) -> None:
    """Report *message* to the module logger and, best-effort, to the audit log."""
    logger.error(message)
    try:
        # Imported lazily, exactly where the original code imported it.
        from core.audit_logger import audit_log

        audit_log("ERROR", message)
    except Exception:
        # Audit logging must never break webhook processing.
        logger.exception("audit_log call failed")


def _ensure_user_records(clerk_id: str) -> None:
    """Create the ``User`` (tier ``free``) and ``UserUsage`` rows if they are missing.

    Rolls back and re-raises on any database error; the session is always closed.
    """
    db = SessionLocal()
    try:
        if db.query(User).filter(User.clerk_id == clerk_id).first() is None:
            db.add(User(clerk_id=clerk_id, tier="free"))

        existing_usage = (
            db.query(UserUsage).filter(UserUsage.user_id == clerk_id).first()
        )
        if existing_usage is None:
            db.add(UserUsage(user_id=clerk_id))

        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()


@router.post("/clerk")
async def clerk_webhook(request: Request):
    """Handle Clerk webhooks; on ``user.created`` provision local user records.

    Always answers ``{"status": "success"}``: malformed payloads and database
    failures are logged instead of being surfaced to the sender.

    SECURITY NOTE(review): the payload is not authenticated here, so any caller
    can trigger creation of user rows. Clerk webhooks are expected to be
    verified via their signature headers — confirm and add verification.
    """
    payload = await request.body()

    try:
        data = json.loads(payload)
    except ValueError:
        # Covers both invalid JSON and undecodable bytes.
        logger.warning("Clerk webhook: payload is not valid JSON")
        return {"status": "success"}

    if not isinstance(data, dict) or data.get("type") != "user.created":
        return {"status": "success"}

    user_data = data.get("data")
    clerk_id = user_data.get("id") if isinstance(user_data, dict) else None

    if isinstance(clerk_id, str) and clerk_id:
        try:
            _ensure_user_records(clerk_id)
        except Exception:
            logger.exception(
                "Clerk webhook: failed to provision records for user %s", clerk_id
            )
    else:
        logger.warning("Clerk webhook: user.created event without a usable id")

    return {"status": "success"}


@router.post("/stripe")
async def stripe_webhook(request: Request):
    """Handle Stripe webhooks after verifying the ``stripe-signature`` header.

    ``checkout.session.completed`` activates the ``pro`` tier for the user named
    by ``client_reference_id``; ``customer.subscription.deleted`` downgrades the
    owner of that subscription to ``free``. Other event types are acknowledged
    and ignored.

    Raises:
        HTTPException: 500 when STRIPE_WEBHOOK_SECRET is not configured,
            400 when the payload or the signature is invalid.
    """
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")

    if not endpoint_secret:
        raise HTTPException(
            status_code=500,
            detail="Trzeba ustawić STRIPE_WEBHOOK_SECRET w zmiennych ENV. Względy bezpieczeństwa!",
        )

    if not sig_header:
        # Without the header there is nothing to verify; reject explicitly.
        raise HTTPException(status_code=400, detail="Invalid signature")

    try:
        event = stripe.Webhook.construct_event(payload, sig_header, endpoint_secret)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid payload") from exc
    except stripe.error.SignatureVerificationError as exc:
        raise HTTPException(status_code=400, detail="Invalid signature") from exc

    event_type = event["type"]

    if event_type == "checkout.session.completed":
        session = event["data"]["object"]
        clerk_id = session.get("client_reference_id")
        customer_id = session.get("customer")
        subscription_id = session.get("subscription")
        if clerk_id:
            await activate_subscription(clerk_id, customer_id, subscription_id, "pro")
        else:
            logger.warning(
                "Stripe webhook: checkout.session.completed without client_reference_id"
            )
    elif event_type == "customer.subscription.deleted":
        subscription = event["data"]["object"]
        await disable_subscription_by_sub_id(subscription.get("id"))

    return {"status": "success"}


async def activate_subscription(
    clerk_id: str, customer_id: str, subscription_id: str, tier: str
):
    """Mark the user as subscribed to *tier* in Clerk first, then in the database.

    Two-phase update: the external system (Clerk) is updated first and the
    database is written only if that succeeded. Failures are reported through
    the audit log and are not raised to the caller.

    Args:
        clerk_id: Clerk user id; a ``User`` row is created if none exists.
        customer_id: Stripe customer id stored on the user.
        subscription_id: Stripe subscription id stored on the user.
        tier: Tier name written to Clerk public metadata and to ``User.tier``.
    """
    if clerk:
        try:
            # NOTE(review): confirm that `users.update_user` exists in the
            # installed clerk_backend_api version and whether it replaces or
            # merges existing public metadata.
            clerk.users.update_user(
                clerk_id, public_metadata={"stripe_subscription": tier}
            )
        except Exception as e:
            _audit_error(f"Failed Clerk sub update - DB ROLLBACK triggered: {e}")
            return  # Fail fast: do not touch the database if Clerk rejected the update.

    db = SessionLocal()
    try:
        user = db.query(User).filter(User.clerk_id == clerk_id).first()
        if user is None:
            user = User(clerk_id=clerk_id)
            db.add(user)
        user.tier = tier
        user.stripe_customer_id = customer_id
        user.stripe_subscription_id = subscription_id
        db.commit()
    except Exception as e:
        db.rollback()
        # The systems are now out of sync: Clerk already holds the new tier but
        # the database does not. Per the original author's note, checker.py
        # treats Clerk as the primary source of truth, so limit checks are
        # expected to still work for the new tier.
        _audit_error(f"Failed DB sub update po pomyslnym Clerk: {e}")
    finally:
        db.close()


async def disable_subscription_by_sub_id(subscription_id: str):
    """Downgrade the owner of the Stripe subscription *subscription_id* to ``free``.

    The Clerk update is best-effort: if it fails, the failure is logged and the
    database is still updated. Database errors are rolled back and logged.
    Nothing happens when no user owns the subscription.
    """
    if not subscription_id:
        # A missing id would turn the filter into an IS NULL match and
        # downgrade an arbitrary user that has no subscription recorded.
        logger.warning("disable_subscription_by_sub_id called without a subscription id")
        return

    db = SessionLocal()
    try:
        user = (
            db.query(User)
            .filter(User.stripe_subscription_id == subscription_id)
            .first()
        )
        if user is None:
            return

        if clerk:
            try:
                clerk.users.update_user(
                    user.clerk_id, public_metadata={"stripe_subscription": "free"}
                )
            except Exception:
                logger.exception(
                    "Failed to downgrade Clerk metadata for subscription %s",
                    subscription_id,
                )

        user.tier = "free"
        db.commit()
    except Exception:
        db.rollback()
        logger.exception(
            "Failed to downgrade subscription %s in the database", subscription_id
        )
    finally:
        db.close()