GrowthOps_OS / app /main.py
Corin1998's picture
Update app/main.py
105ee76 verified
from fastapi import FastAPI, Depends, HTTPException, Request, status, Form
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, generate_latest, CONTENT_TYPE_LATEST
from sqlalchemy.orm import Session
from loguru import logger
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from starlette.templating import Jinja2Templates
from .config import Settings
from .database import Base, engine, get_db
from .models import *
from .schemas import *
from .security import *
from .tasks import celery_app, generate_pptx, generate_docx
from .telemetry import setup_tracing
import os
# Module-level application wiring. Everything below runs once, at import time,
# and the order matters: tracing is configured and the schema is created
# before the FastAPI app and its middleware are built.

# Application configuration object (not referenced again in the visible part
# of this module).
settings = Settings()
# Observability
# Configures tracing via the project-local telemetry helper.
setup_tracing()
# DB init
# Issues CREATE TABLE for every model registered on ``Base`` whose table does
# not exist yet. NOTE(review): this runs on every import of the module, so it
# needs a reachable database at import time; there is no migration step here.
Base.metadata.create_all(bind=engine)
# FastAPI app
app = FastAPI(title="GrowthOps OS", version="0.1.0")
# CORS
# NOTE(review): a wildcard origin list together with allow_credentials=True is
# the most permissive configuration possible; the Starlette documentation
# advises listing origins explicitly when credentials are allowed. Confirm this
# is intended before exposing the service publicly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus counter incremented once per request by AuditMiddleware, labelled
# with the request path, HTTP method and response status code.
API_CALLS = Counter("growthops_api_calls", "Total API calls", ["path", "method", "status"])
# Jinja2 template loader rooted at the relative directory "templates"
# (presumably resolved against the process working directory; verify).
templates = Jinja2Templates(directory="templates")
class AuditMiddleware(BaseHTTPMiddleware):
    """Record a metrics sample and an audit row for every HTTP request.

    For each request the middleware:

    * increments the ``API_CALLS`` Prometheus counter,
    * inserts an ``AuditLog`` row (path, method, status code, client host and,
      when a valid bearer token is present, the caller's tenant/user ids),
    * inserts an ``ErrorLog`` row when the downstream handler raised.

    Bookkeeping is best-effort: failures while recording are logged and never
    replace the response or the original exception.

    NOTE(review): the database work below is synchronous and runs inside the
    async ``dispatch`` coroutine, so it blocks the event loop for the duration
    of the queries. Consider moving it to a thread pool if latency matters.
    """

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler, then record metrics and the audit trail.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The response produced by the downstream handler, unchanged.

        Raises:
            Exception: Whatever the downstream handler raised, re-raised
                after it has been logged and recorded.
        """
        response = None
        error_text = None
        try:
            response = await call_next(request)
            return response
        except Exception as e:
            # Fall back to repr() so exceptions raised without a message
            # (str(e) == "") still yield a non-empty ErrorLog entry.
            error_text = str(e) or repr(e)
            logger.exception("Unhandled error")
            raise
        finally:
            # No response object means the handler raised: report 500.
            status_code = getattr(response, "status_code", 500)
            self._count_call(request, status_code)
            self._write_audit(request, status_code, error_text)

    @staticmethod
    def _count_call(request, status_code):
        """Increment the API call counter; independent of the database."""
        try:
            API_CALLS.labels(
                path=request.url.path,
                method=request.method,
                status=str(status_code),
            ).inc()
        except Exception as e:
            logger.error(f"metrics failed: {e}")

    @staticmethod
    def _resolve_identity(db, request):
        """Return ``(tenant_id, user_id)`` for the bearer token, else ``(None, None)``."""
        auth = request.headers.get("authorization", "")
        if not auth.startswith("Bearer "):
            return None, None
        token = auth.split(" ", 1)[1]
        try:
            payload = parse_token(token)
            user = db.query(User).filter(User.email == payload.get("sub")).first()
        except Exception as e:
            # An invalid or expired token must not prevent the audit row from
            # being written; the request is simply recorded as anonymous.
            logger.debug(f"audit: could not resolve caller identity: {e}")
            # Leave the session usable for the inserts that follow.
            db.rollback()
            return None, None
        if user:
            return user.tenant_id, user.id
        return None, None

    def _write_audit(self, request, status_code, error_text):
        """Persist the AuditLog row (and ErrorLog row, if any) in one session."""
        try:
            # get_db is consumed with next() elsewhere in this module, so it is
            # treated as a generator dependency; closing the generator lets its
            # own cleanup code release the session deterministically.
            db_gen = get_db()
            try:
                db = next(db_gen)
                tenant_id, user_id = self._resolve_identity(db, request)
                db.add(
                    AuditLog(
                        tenant_id=tenant_id,
                        user_id=user_id,
                        path=request.url.path,
                        method=request.method,
                        status_code=status_code,
                        meta={"client": request.client.host if request.client else None},
                    )
                )
                if error_text is not None:
                    db.add(
                        ErrorLog(
                            tenant_id=tenant_id,
                            user_id=user_id,
                            path=request.url.path,
                            error=error_text,
                        )
                    )
                db.commit()
            finally:
                db_gen.close()
        except Exception as e:
            logger.error(f"audit failed: {e}")
# Install the audit middleware. It is registered after CORSMiddleware;
# NOTE(review): Starlette places the most recently added middleware outermost,
# so this presumably wraps the CORS layer — confirm that ordering is intended.
app.add_middleware(AuditMiddleware)
@app.get("/healthz")
def healthz():
    """Liveness probe: always reports the service as up."""
    health_payload = {"ok": True}
    return health_payload
@app.get("/metrics")
def metrics():
    """Serve the current Prometheus metrics in the text exposition format."""
    exposition = generate_latest()
    return Response(content=exposition, media_type=CONTENT_TYPE_LATEST)
# ------------------ Auth ------------------
@app.post("/auth/bootstrap_admin", response_model=SimpleMessage)
def bootstrap_admin(body: dict, db: Session = Depends(get_db)):
    """Create the first tenant admin, attached to the "default" tenant.

    This endpoint is unauthenticated by design (no user exists yet on a fresh
    install), so it only creates an account while no tenant admin exists.
    Repeating the call with an e-mail that is already registered is a no-op.

    Args:
        body: JSON object with string fields ``email`` and ``password``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"message": "exists"}`` if the e-mail is already registered,
        otherwise ``{"message": "admin created"}``.

    Raises:
        HTTPException: 400 if ``email``/``password`` are missing or are not
            strings; 403 if a tenant admin already exists.
    """
    email = body.get("email")
    password = body.get("password")
    if not email or not password:
        raise HTTPException(400, "email/password required")
    # ``body`` is an untyped dict, so reject non-string values before they
    # reach the password hasher or the query.
    if not isinstance(email, str) or not isinstance(password, str):
        raise HTTPException(400, "email/password must be strings")

    if db.query(User).filter(User.email == email).first():
        return {"message": "exists"}

    # Without this guard any anonymous caller could mint a new tenant admin
    # at any time. Further admins must be created through /auth/register.
    if db.query(User).filter(User.is_tenant_admin.is_(True)).first():
        raise HTTPException(403, "already bootstrapped")

    tenant = db.query(Tenant).filter(Tenant.name == "default").first()
    if not tenant:
        tenant = Tenant(name="default")
        db.add(tenant)
        # flush() assigns tenant.id without committing, so the tenant and the
        # admin are persisted together in a single transaction.
        db.flush()

    db.add(
        User(
            email=email,
            password_hash=hash_password(password),
            tenant_id=tenant.id,
            is_tenant_admin=True,
        )
    )
    db.commit()
    return {"message": "admin created"}
@app.post("/auth/register", response_model=UserOut)
def register(user: UserCreate, current: User = Depends(require_tenant_admin), db: Session = Depends(get_db)):
    """Create a user account from the submitted payload.

    The ``require_tenant_admin`` dependency must resolve before the handler
    runs. Responds with HTTP 400 when the e-mail is already registered;
    otherwise stores the new user and returns it.
    """
    existing = db.query(User).filter(User.email == user.email).first()
    if existing:
        raise HTTPException(status_code=400, detail="Email already exists")

    new_user = User(
        email=user.email,
        password_hash=hash_password(user.password),
        tenant_id=user.tenant_id,
        is_tenant_admin=user.is_tenant_admin,
    )
    db.add(new_user)
    db.commit()
    # Reload so database-generated columns are populated in the response.
    db.refresh(new_user)
    return new_user
@app.post("/auth/login", response_model=TokenOut)
def login(req: LoginRequest, db: Session = Depends(get_db)):
    """Exchange an e-mail/password pair for a bearer access token.

    Responds with HTTP 401 when the e-mail is unknown or the password does
    not verify. The token is issued with ``expires_in=3600``.
    """
    account = db.query(User).filter(User.email == req.email).first()
    # The password is only checked when an account was found.
    if not (account and verify_password(req.password, account.password_hash)):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    access_token = create_access_token(sub=account.email, expires_in=3600)
    return {"access_token": access_token, "token_type": "bearer"}
@app.get("/auth/me", response_model=UserOut)
def me(current: User = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current
# ------------------ Tenants / Plans ------------------
@app.post("/tenants", response_model=TenantOut)
def create_tenant_api(t: TenantCreate, current: User = Depends(require_tenant_admin), db: Session = Depends(get_db)):
    """Create a tenant with the requested name.

    Responds with HTTP 400 when a tenant of that name already exists;
    otherwise stores the tenant and returns it.
    """
    duplicate = db.query(Tenant).filter(Tenant.name == t.name).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="Tenant exists")

    created = Tenant(name=t.name)
    db.add(created)
    db.commit()
    # Reload so database-generated columns are populated in the response.
    db.refresh(created)
    return created
@app.get("/tenants", response_model=list[TenantOut])
def list_tenants(current: User = Depends(require_tenant_admin), db: Session = Depends(get_db)):
    """Return every tenant row, unfiltered and unpaginated."""
    all_tenants = db.query(Tenant).all()
    return all_tenants
@app.post("/plans", response_model=SimpleMessage)
def create_plan(p: PlanCreate, current: User = Depends(require_tenant_admin), db: Session = Depends(get_db)):
    """Store a new plan built from the submitted name, monthly quota and features."""
    new_plan = Plan(
        name=p.name,
        monthly_quota=p.monthly_quota,
        features=p.features,
    )
    db.add(new_plan)
    db.commit()
    return {"message": "ok"}
@app.post("/subscriptions", response_model=SimpleMessage)
def create_subscription(s: SubscriptionCreate, current: User = Depends(require_tenant_admin), db: Session = Depends(get_db)):
sub = Subscription(tenant_id=s.tenant_id, plan_id=s.plan_id, active=True)