Spaces:
Paused
Paused
| """ | |
| Feature Flags API Router | |
| Provides secure admin endpoints for managing feature flags | |
| """ | |
| import logging | |
| from typing import Any, Optional | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from pydantic import BaseModel, Field | |
| from sqlalchemy.orm import Session | |
| from app.services.infrastructure.rbac_service import require_admin | |
| from core.database import User, get_db | |
| from core.feature_flags.models import FeatureFlag | |
| from core.feature_flags.service import feature_flag_service | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # ===== PYDANTIC SCHEMAS ===== | |
| class FeatureFlagBase(BaseModel): | |
| """Base feature flag schema""" | |
| name: str = Field(..., min_length=1, max_length=100, description="Feature flag name") | |
| description: Optional[str] = Field(None, max_length=500, description="Feature flag description") | |
| enabled: bool = Field(default=False, description="Whether the flag is enabled") | |
| rollout_percentage: int = Field(default=0, ge=0, le=100, description="Rollout percentage (0-100)") | |
| target_users: Optional[list[str]] = Field(default=None, description="List of target user IDs") | |
| target_contexts: Optional[dict[str, Any]] = Field(default=None, description="Targeting rules/context") | |
| class FeatureFlagCreate(FeatureFlagBase): | |
| """Schema for creating a feature flag""" | |
| pass | |
| class FeatureFlagUpdate(BaseModel): | |
| """Schema for updating a feature flag""" | |
| description: Optional[str] = Field(None, max_length=500) | |
| enabled: Optional[bool] = None | |
| rollout_percentage: Optional[int] = Field(None, ge=0, le=100) | |
| target_users: Optional[list[str]] = None | |
| target_contexts: Optional[dict[str, Any]] = None | |
| class FeatureFlagResponse(FeatureFlagBase): | |
| """Response schema for feature flag""" | |
| flag_id: str | |
| created_at: str | |
| updated_at: str | |
| disabled_at: Optional[str] = None | |
| disabled_reason: Optional[str] = None | |
| class Config: | |
| from_attributes = True | |
| class FeatureFlagListResponse(BaseModel): | |
| """Response for listing feature flags""" | |
| flags: list[FeatureFlagResponse] | |
| total: int | |
| enabled_count: int | |
| class PublicFeatureFlagResponse(BaseModel): | |
| """Public response schema for feature flags (no sensitive data)""" | |
| name: str | |
| enabled: bool | |
| rollout_percentage: int | |
| class Config: | |
| from_attributes = True | |
| class PublicFeatureFlagsResponse(BaseModel): | |
| """Response for public feature flags endpoint""" | |
| flags: dict[str, bool] | |
| timestamp: str | |
| # ===== PUBLIC ENDPOINTS (NO AUTH REQUIRED) ===== | |
| async def get_public_feature_flags( | |
| user_id: Optional[str] = None, | |
| db: Session = Depends(get_db), | |
| ) -> PublicFeatureFlagsResponse: | |
| """ | |
| Get feature flags for public/frontend consumption (NO AUTH REQUIRED) | |
| This endpoint is used by the frontend to check feature flags without authentication. | |
| It evaluates rollout percentages and returns a simple enabled/disabled status. | |
| Args: | |
| user_id: Optional user identifier for percentage-based rollout | |
| Returns: | |
| Dictionary of feature flag names to their enabled status | |
| """ | |
| import hashlib | |
| from datetime import datetime | |
| try: | |
| # Get all non-disabled flags | |
| flags = db.query(FeatureFlag).filter(FeatureFlag.disabled_at.is_(None)).all() | |
| result = {} | |
| for flag in flags: | |
| # Simple enabled/disabled check | |
| if not flag.enabled: | |
| result[flag.name] = False | |
| continue | |
| # Check rollout percentage | |
| if flag.rollout_percentage >= 100: | |
| result[flag.name] = True | |
| elif flag.rollout_percentage <= 0: | |
| result[flag.name] = False | |
| elif user_id: | |
| # Consistent hash-based rollout | |
| hash_value = int(hashlib.md5(f"{flag.name}:{user_id}".encode()).hexdigest()[:8], 16) | |
| percentage = (hash_value % 100) / 100.0 | |
| result[flag.name] = percentage <= (flag.rollout_percentage / 100.0) | |
| else: | |
| # No user_id provided, use random for percentage | |
| result[flag.name] = flag.rollout_percentage >= 50 # Conservative default | |
| return PublicFeatureFlagsResponse(flags=result, timestamp=datetime.now().isoformat()) | |
| except Exception as e: | |
| # Log error but don't expose details to public | |
| logger.error(f"Error getting public feature flags: {e}") | |
| return PublicFeatureFlagsResponse(flags={}, timestamp=datetime.now().isoformat()) | |
| # ===== FEATURE FLAG ENDPOINTS ===== | |
| async def create_feature_flag( | |
| flag_data: FeatureFlagCreate, | |
| current_user: User = Depends(require_admin), # 🔴 ADMIN REQUIRED | |
| db: Session = Depends(get_db), | |
| ) -> FeatureFlagResponse: | |
| """ | |
| Create a new feature flag. | |
| **Requires admin role.** | |
| This endpoint allows administrators to create new feature flags | |
| for controlling application behavior and gradual feature rollouts. | |
| """ | |
| try: | |
| # Check if flag with this name already exists | |
| existing = db.query(FeatureFlag).filter(FeatureFlag.name == flag_data.name).first() | |
| if existing: | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, detail=f"Feature flag '{flag_data.name}' already exists" | |
| ) | |
| # Create the feature flag | |
| flag = feature_flag_service.create_flag( | |
| name=flag_data.name, | |
| description=flag_data.description, | |
| enabled=flag_data.enabled, | |
| rollout_percentage=flag_data.rollout_percentage, | |
| target_users=flag_data.target_users, | |
| target_contexts=flag_data.target_contexts, | |
| created_by=current_user.id if hasattr(current_user, "id") else "system", | |
| ) | |
| return FeatureFlagResponse.from_orm(flag) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create feature flag: {str(e)}" | |
| ) | |
| async def update_feature_flag( | |
| flag_name: str, | |
| flag_update: FeatureFlagUpdate, | |
| current_user: User = Depends(require_admin), # 🔴 ADMIN REQUIRED | |
| db: Session = Depends(get_db), | |
| ) -> FeatureFlagResponse: | |
| """ | |
| Update an existing feature flag. | |
| **Requires admin role.** | |
| This endpoint allows administrators to modify feature flag settings, | |
| including enabling/disabling features and adjusting rollout percentages. | |
| """ | |
| try: | |
| # Check if flag exists | |
| existing = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first() | |
| if not existing: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Feature flag '{flag_name}' not found") | |
| # Update the flag | |
| success = feature_flag_service.update_flag( | |
| flag_name=flag_name, | |
| updates={k: v for k, v in flag_update.dict(exclude_unset=True).items() if v is not None}, | |
| updated_by=current_user.id if hasattr(current_user, "id") else "system", | |
| ) | |
| if not success: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update feature flag '{flag_name}'" | |
| ) | |
| # Get updated flag | |
| updated_flag = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first() | |
| return FeatureFlagResponse.from_orm(updated_flag) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update feature flag: {str(e)}" | |
| ) | |
| async def delete_feature_flag( | |
| flag_name: str, | |
| disabled_reason: Optional[str] = None, | |
| current_user: User = Depends(require_admin), # 🔴 ADMIN REQUIRED | |
| db: Session = Depends(get_db), | |
| ): | |
| """ | |
| Delete (disable) a feature flag. | |
| **Requires admin role.** | |
| This endpoint allows administrators to permanently disable feature flags. | |
| The flag is marked as disabled rather than deleted to maintain audit trail. | |
| """ | |
| try: | |
| # Check if flag exists and is not already disabled | |
| existing = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first() | |
| if not existing: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Feature flag '{flag_name}' not found") | |
| if existing.disabled_at: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, detail=f"Feature flag '{flag_name}' is already disabled" | |
| ) | |
| # Delete (disable) the flag | |
| success = feature_flag_service.delete_flag( | |
| flag_name=flag_name, | |
| disabled_reason=disabled_reason or "Disabled by admin", | |
| disabled_by=current_user.id if hasattr(current_user, "id") else "system", | |
| ) | |
| if not success: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete feature flag '{flag_name}'" | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete feature flag: {str(e)}" | |
| ) | |
| async def list_feature_flags( | |
| enabled_only: bool = False, | |
| skip: int = 0, | |
| limit: int = 100, | |
| current_user: User = Depends(require_admin), # 🔴 ADMIN REQUIRED | |
| db: Session = Depends(get_db), | |
| ) -> FeatureFlagListResponse: | |
| """ | |
| List all feature flags. | |
| **Requires admin role.** | |
| Returns a paginated list of all feature flags with summary statistics. | |
| """ | |
| try: | |
| query = db.query(FeatureFlag) | |
| if enabled_only: | |
| query = query.filter(FeatureFlag.enabled, FeatureFlag.disabled_at.is_(None)) | |
| total = query.count() | |
| flags = query.offset(skip).limit(limit).all() | |
| enabled_count = sum(1 for f in flags if f.enabled and not f.disabled_at) | |
| return FeatureFlagListResponse( | |
| flags=[FeatureFlagResponse.from_orm(flag) for flag in flags], total=total, enabled_count=enabled_count | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to list feature flags: {str(e)}" | |
| ) | |
| async def get_feature_flag( | |
| flag_name: str, | |
| current_user: User = Depends(require_admin), # 🔴 ADMIN REQUIRED | |
| db: Session = Depends(get_db), | |
| ) -> FeatureFlagResponse: | |
| """ | |
| Get a specific feature flag by name. | |
| **Requires admin role.** | |
| """ | |
| try: | |
| flag = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first() | |
| if not flag: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Feature flag '{flag_name}' not found") | |
| return FeatureFlagResponse.from_orm(flag) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get feature flag: {str(e)}" | |
| ) | |
| async def toggle_feature_flag( | |
| flag_name: str, | |
| current_user: User = Depends(require_admin), # 🔴 ADMIN REQUIRED | |
| db: Session = Depends(get_db), | |
| ) -> FeatureFlagResponse: | |
| """ | |
| Toggle a feature flag on/off. | |
| **Requires admin role.** | |
| This is a convenience endpoint for quickly enabling/disabling flags. | |
| """ | |
| try: | |
| # Check if flag exists | |
| existing = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first() | |
| if not existing: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Feature flag '{flag_name}' not found") | |
| if existing.disabled_at: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=f"Feature flag '{flag_name}' is disabled and cannot be toggled", | |
| ) | |
| # Toggle the flag | |
| new_state = not existing.enabled | |
| success = feature_flag_service.update_flag( | |
| flag_name=flag_name, | |
| updates={"enabled": new_state}, | |
| updated_by=current_user.id if hasattr(current_user, "id") else "system", | |
| ) | |
| if not success: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to toggle feature flag '{flag_name}'" | |
| ) | |
| # Get updated flag | |
| updated_flag = db.query(FeatureFlag).filter(FeatureFlag.name == flag_name).first() | |
| return FeatureFlagResponse.from_orm(updated_flag) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to toggle feature flag: {str(e)}" | |
| ) | |