zenith-backend / app /routers /feature_flags.py
teoat's picture
fix(backend): fix port and health check robustness
d29a5a0 verified
"""
Feature Flags API Router
Provides secure admin endpoints for managing feature flags
"""
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.services.infrastructure.rbac_service import require_admin
from core.database import User, get_db
from core.feature_flags.models import FeatureFlag
from core.feature_flags.service import feature_flag_service
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router the feature flag endpoints in this module are registered on.
# No prefix or tags are configured here.
router = APIRouter()
# ===== PYDANTIC SCHEMAS =====
class FeatureFlagBase(BaseModel):
    """Fields common to the feature flag create and response schemas."""

    # Required flag name, 1-100 characters.
    name: str = Field(
        ...,
        min_length=1,
        max_length=100,
        description="Feature flag name",
    )
    # Optional free-text description, at most 500 characters.
    description: Optional[str] = Field(
        default=None,
        max_length=500,
        description="Feature flag description",
    )
    # Flags default to off.
    enabled: bool = Field(
        default=False,
        description="Whether the flag is enabled",
    )
    # Integer percentage, validated to lie in the inclusive range 0-100.
    rollout_percentage: int = Field(
        default=0,
        ge=0,
        le=100,
        description="Rollout percentage (0-100)",
    )
    # Optional explicit targeting data; both default to None.
    target_users: Optional[list[str]] = Field(
        default=None,
        description="List of target user IDs",
    )
    target_contexts: Optional[dict[str, Any]] = Field(
        default=None,
        description="Targeting rules/context",
    )
class FeatureFlagCreate(FeatureFlagBase):
    """Request body for creating a feature flag; adds nothing to the base schema."""
class FeatureFlagUpdate(BaseModel):
    """Request body for a partial feature flag update.

    Every field is optional and defaults to None. The flag name is not part
    of this schema.
    """

    description: Optional[str] = Field(default=None, max_length=500)
    enabled: Optional[bool] = Field(default=None)
    # Same 0-100 inclusive bound as on the base schema.
    rollout_percentage: Optional[int] = Field(default=None, ge=0, le=100)
    target_users: Optional[list[str]] = Field(default=None)
    target_contexts: Optional[dict[str, Any]] = Field(default=None)
class FeatureFlagResponse(FeatureFlagBase):
    """Feature flag representation used as the response model of the admin endpoints."""

    flag_id: str
    # NOTE(review): timestamps are declared as plain strings; confirm the ORM
    # model exposes strings (not datetime objects) for these attributes.
    created_at: str
    updated_at: str
    # Both stay None unless the flag record carries disable information.
    disabled_at: Optional[str] = Field(default=None)
    disabled_reason: Optional[str] = Field(default=None)

    class Config:
        # Allow building the schema from object attributes (ORM instances).
        from_attributes = True
class FeatureFlagListResponse(BaseModel):
    """Envelope returned by the flag listing endpoint."""

    # The flags included in this response.
    flags: list[FeatureFlagResponse]
    # Integer counters reported alongside the flags.
    total: int
    enabled_count: int
class PublicFeatureFlagResponse(BaseModel):
    """Reduced flag view for public use: name, enabled state and rollout percentage only."""

    name: str
    enabled: bool
    rollout_percentage: int

    class Config:
        # Allow building the schema from object attributes (ORM instances).
        from_attributes = True
class PublicFeatureFlagsResponse(BaseModel):
    """Payload of the public feature flags endpoint."""

    # Maps each flag name to its evaluated on/off state.
    flags: dict[str, bool]
    # Timestamp string supplied by the endpoint when the response is built.
    timestamp: str
# ===== PUBLIC ENDPOINTS (NO AUTH REQUIRED) =====
def _rollout_includes(flag_name: str, rollout_percentage: Optional[int], user_id: Optional[str]) -> bool:
    """Decide whether an enabled flag is on for a caller under its rollout percentage.

    Args:
        flag_name: Name of the flag; part of the hash input so each flag
            buckets users independently.
        rollout_percentage: Rollout percentage; None is treated as 0.
        user_id: Caller identifier, or None/empty for anonymous callers.

    Returns:
        True when the flag should be reported as enabled for this caller.
    """
    import hashlib

    percentage = rollout_percentage or 0
    if percentage >= 100:
        return True
    if percentage <= 0:
        return False
    if not user_id:
        # Without a user id there is nothing stable to hash, so apply a fixed
        # threshold: partial rollouts are visible only at 50% or more.
        return percentage >= 50
    # MD5 is used purely as a stable, well-distributed bucketing hash.
    digest = hashlib.md5(f"{flag_name}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 100  # 0..99
    # Strict comparison: exactly `percentage` of the 100 buckets are enabled.
    return bucket < percentage


@router.get("/public", response_model=PublicFeatureFlagsResponse)
async def get_public_feature_flags(
    user_id: Optional[str] = None,
    db: Session = Depends(get_db),
) -> PublicFeatureFlagsResponse:
    """
    Get feature flags for public/frontend consumption (NO AUTH REQUIRED)

    This endpoint is used by the frontend to check feature flags without authentication.
    It evaluates rollout percentages and returns a simple enabled/disabled status.

    Args:
        user_id: Optional user identifier for percentage-based rollout

    Returns:
        Dictionary of feature flag names to their enabled status
    """
    from datetime import datetime

    try:
        # Only flags that have not been disabled are reported.
        active_flags = db.query(FeatureFlag).filter(FeatureFlag.disabled_at.is_(None)).all()
        result = {
            flag.name: bool(flag.enabled) and _rollout_includes(flag.name, flag.rollout_percentage, user_id)
            for flag in active_flags
        }
        return PublicFeatureFlagsResponse(flags=result, timestamp=datetime.now().isoformat())
    except Exception:
        # Best effort: log the failure with its traceback, but never expose
        # error details to unauthenticated callers.
        logger.exception("Error getting public feature flags")
        return PublicFeatureFlagsResponse(flags={}, timestamp=datetime.now().isoformat())
# ===== FEATURE FLAG ENDPOINTS =====
@router.post("/", response_model=FeatureFlagResponse, status_code=status.HTTP_201_CREATED)
async def create_feature_flag(
    flag_data: FeatureFlagCreate,
    current_user: User = Depends(require_admin),  # 🔴 ADMIN REQUIRED
    db: Session = Depends(get_db),
) -> FeatureFlagResponse:
    """
    Create a new feature flag.

    **Requires admin role.**

    This endpoint allows administrators to create new feature flags
    for controlling application behavior and gradual feature rollouts.

    Responds with 409 if a flag with the same name already exists and
    with 500 if creation fails for any other reason.
    """
    try:
        # Reject duplicates up front so the caller gets a clear conflict error.
        existing = db.query(FeatureFlag).filter(FeatureFlag.name == flag_data.name).first()
        if existing:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Feature flag '{flag_data.name}' already exists",
            )

        flag = feature_flag_service.create_flag(
            name=flag_data.name,
            description=flag_data.description,
            enabled=flag_data.enabled,
            rollout_percentage=flag_data.rollout_percentage,
            target_users=flag_data.target_users,
            target_contexts=flag_data.target_contexts,
            # Fall back to "system" when the user object exposes no id.
            created_by=getattr(current_user, "id", "system"),
        )
        return FeatureFlagResponse.from_orm(flag)
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 409 above) through unchanged
        # instead of rewrapping them as a 500.
        raise
    except Exception as e:
        logger.exception("Failed to create feature flag '%s'", flag_data.name)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create feature flag: {str(e)}",
        ) from e
@router.put("/{flag_name}", response_model=FeatureFlagResponse)
async def update_feature_flag(
    flag_name: str,
    flag_update: FeatureFlagUpdate,
    current_user: User = Depends(require_admin),  # 🔴 ADMIN REQUIRED
    db: Session = Depends(get_db),
) -> FeatureFlagResponse:
    """
    Update an existing feature flag.

    **Requires admin role.**

    This endpoint allows administrators to modify feature flag settings,
    including enabling/disabling features and adjusting rollout percentages.

    Only fields that are present in the request body with a non-null value
    are applied. Responds with 404 if the flag does not exist.
    """
    try:
        flag_query = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name)
        if not flag_query.first():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature flag '{flag_name}' not found",
            )

        # Drop unset fields and explicit nulls so they never overwrite stored values.
        updates = {
            field: value
            for field, value in flag_update.dict(exclude_unset=True).items()
            if value is not None
        }
        success = feature_flag_service.update_flag(
            flag_name=flag_name,
            updates=updates,
            # Fall back to "system" when the user object exposes no id.
            updated_by=getattr(current_user, "id", "system"),
        )
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to update feature flag '{flag_name}'",
            )

        # The instance loaded above is cached in this session's identity map,
        # and a plain re-query would not overwrite its already-loaded
        # attributes. populate_existing() forces them to be reloaded from the
        # row so the response reflects the update.
        updated_flag = flag_query.populate_existing().first()
        if updated_flag is None:
            # The flag disappeared between the update and the re-read.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature flag '{flag_name}' not found",
            )
        return FeatureFlagResponse.from_orm(updated_flag)
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Failed to update feature flag '%s'", flag_name)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update feature flag: {str(e)}",
        ) from e
@router.delete("/{flag_name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_feature_flag(
    flag_name: str,
    disabled_reason: Optional[str] = None,
    current_user: User = Depends(require_admin),  # admin-only endpoint
    db: Session = Depends(get_db),
):
    """
    Delete (disable) a feature flag.

    **Requires admin role.**

    The flag is disabled through the feature flag service rather than
    removed by this endpoint. Responds with 404 when the flag is unknown
    and 400 when it has already been disabled.
    """
    try:
        target = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first()

        # Guard clauses: unknown flag, then already-disabled flag.
        if not target:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature flag '{flag_name}' not found",
            )
        if target.disabled_at:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Feature flag '{flag_name}' is already disabled",
            )

        # Use a generic reason when none was given, and "system" as the actor
        # when the user object exposes no id.
        reason = disabled_reason or "Disabled by admin"
        actor = getattr(current_user, "id", "system")

        disabled_ok = feature_flag_service.delete_flag(
            flag_name=flag_name,
            disabled_reason=reason,
            disabled_by=actor,
        )
        if not disabled_ok:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to delete feature flag '{flag_name}'",
            )
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete feature flag: {str(e)}",
        )
@router.get("/", response_model=FeatureFlagListResponse)
async def list_feature_flags(
    enabled_only: bool = False,
    skip: int = 0,
    limit: int = 100,
    current_user: User = Depends(require_admin),  # 🔴 ADMIN REQUIRED
    db: Session = Depends(get_db),
) -> FeatureFlagListResponse:
    """
    List all feature flags.

    **Requires admin role.**

    Returns a paginated list of feature flags ordered by name, together
    with summary statistics. `total` and `enabled_count` describe the whole
    result set, not just the returned page.
    """
    try:
        # A flag counts as active when it is enabled and has not been disabled.
        active_criteria = (FeatureFlag.enabled, FeatureFlag.disabled_at.is_(None))

        query = db.query(FeatureFlag)
        if enabled_only:
            query = query.filter(*active_criteria)

        total = query.count()
        # Count active flags in the database so the figure covers every
        # matching flag rather than only the current page.
        enabled_count = total if enabled_only else query.filter(*active_criteria).count()

        # A stable ordering keeps offset/limit pagination deterministic.
        flags = query.order_by(FeatureFlag.name).offset(skip).limit(limit).all()

        return FeatureFlagListResponse(
            flags=[FeatureFlagResponse.from_orm(flag) for flag in flags],
            total=total,
            enabled_count=enabled_count,
        )
    except Exception as e:
        logger.exception("Failed to list feature flags")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list feature flags: {str(e)}",
        ) from e
@router.get("/{flag_name}", response_model=FeatureFlagResponse)
async def get_feature_flag(
    flag_name: str,
    current_user: User = Depends(require_admin),  # admin-only endpoint
    db: Session = Depends(get_db),
) -> FeatureFlagResponse:
    """
    Get a specific feature flag by name.

    **Requires admin role.**

    Responds with 404 when no flag has the given name.
    """
    try:
        by_name = FeatureFlag.name == flag_name
        match = db.query(FeatureFlag).filter(by_name).first()
        if match:
            return FeatureFlagResponse.from_orm(match)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Feature flag '{flag_name}' not found",
        )
    except HTTPException:
        # The 404 above must reach the client unchanged.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get feature flag: {str(e)}",
        )
@router.post("/{flag_name}/toggle", response_model=FeatureFlagResponse)
async def toggle_feature_flag(
    flag_name: str,
    current_user: User = Depends(require_admin),  # 🔴 ADMIN REQUIRED
    db: Session = Depends(get_db),
) -> FeatureFlagResponse:
    """
    Toggle a feature flag on/off.

    **Requires admin role.**

    This is a convenience endpoint for quickly enabling/disabling flags.
    Responds with 404 when the flag is unknown and 400 when the flag has
    been disabled and therefore cannot be toggled.
    """
    try:
        flag_query = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name)
        existing = flag_query.first()
        if not existing:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature flag '{flag_name}' not found",
            )
        if existing.disabled_at:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Feature flag '{flag_name}' is disabled and cannot be toggled",
            )

        # Flip the current state.
        new_state = not existing.enabled
        success = feature_flag_service.update_flag(
            flag_name=flag_name,
            updates={"enabled": new_state},
            # Fall back to "system" when the user object exposes no id.
            updated_by=getattr(current_user, "id", "system"),
        )
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to toggle feature flag '{flag_name}'",
            )

        # `existing` is cached in this session's identity map, and a plain
        # re-query would not overwrite its already-loaded attributes.
        # populate_existing() forces them to be reloaded from the row so the
        # response shows the new state.
        updated_flag = flag_query.populate_existing().first()
        if updated_flag is None:
            # The flag disappeared between the update and the re-read.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature flag '{flag_name}' not found",
            )
        return FeatureFlagResponse.from_orm(updated_flag)
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Failed to toggle feature flag '%s'", flag_name)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to toggle feature flag: {str(e)}",
        ) from e