ppt-web / src /landppt /api /config_api.py
26fwyzpz6f-max
Clean deploy without binary files
6aecb2e
Raw
History Blame Contribute Delete
27.9 kB
"""
Configuration management API for LandPPT
Supports per-user isolated configuration and system-level defaults
"""
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Depends, Request
from pydantic import BaseModel
import logging
from ..services.db_config_service import (
get_db_config_service,
DatabaseConfigService,
get_user_llm_timeout_seconds,
)
from ..auth.middleware import get_current_admin_user, get_current_user_required
from ..database.models import User
logger = logging.getLogger(__name__)
router = APIRouter()
# Pydantic models
class ConfigUpdateRequest(BaseModel):
config: Dict[str, Any]
class DefaultProviderRequest(BaseModel):
provider: str
# ==================== User Config Endpoints ====================
# These endpoints allow users to manage their own configuration
@router.get("/api/config/user")
async def get_user_config(
user: User = Depends(get_current_user_required)
):
"""Get current user's configuration (merged with system defaults, excluding admin-only categories)"""
try:
config_service = get_db_config_service()
# Use get_all_config_for_user which filters admin-only categories for non-admins
config = await config_service.get_all_config_for_user(user_id=user.id, is_admin=user.is_admin)
return {
"success": True,
"config": config,
"user_id": user.id,
"is_admin": user.is_admin
}
except Exception as e:
logger.error(f"Failed to get user configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to get user configuration")
@router.get("/api/config/user/{category}")
async def get_user_config_by_category(
category: str,
user: User = Depends(get_current_user_required)
):
"""Get user's configuration for a specific category"""
try:
config_service = get_db_config_service()
schema = config_service.get_config_schema(include_admin_only=True)
config = await config_service.get_all_config_for_user(user_id=user.id, is_admin=user.is_admin)
config = {
key: value
for key, value in config.items()
if schema.get(key, {}).get("category") == category
}
return {
"success": True,
"config": config,
"category": category
}
except Exception as e:
logger.error(f"Failed to get user configuration for category {category}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get configuration for category {category}")
@router.post("/api/config/user")
async def update_user_config(
request: ConfigUpdateRequest,
user: User = Depends(get_current_user_required)
):
"""Update current user's configuration"""
try:
config_service = get_db_config_service()
# Prevent non-admin users from updating admin-only keys (which are stored at system scope).
schema = config_service.get_config_schema(include_admin_only=True)
admin_only_keys = {
key for key, settings in schema.items()
if settings.get("admin_only", False)
}
filtered_config = request.config
if not user.is_admin:
filtered_config = {
key: value for key, value in (request.config or {}).items()
if key not in admin_only_keys
}
success = await config_service.update_config(filtered_config, user_id=user.id)
if success:
return {
"success": True,
"message": "User configuration updated successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to update user configuration")
except Exception as e:
logger.error(f"Failed to update user configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to update user configuration")
@router.post("/api/config/user/{category}")
async def update_user_config_by_category(
category: str,
request: ConfigUpdateRequest,
user: User = Depends(get_current_user_required)
):
"""Update user's configuration for a specific category"""
try:
config_service = get_db_config_service()
# Disallow updates to admin-only categories for non-admin users.
if not user.is_admin and category in getattr(config_service, "ADMIN_ONLY_CATEGORIES", set()):
raise HTTPException(status_code=403, detail="Not authorized to update this category")
# Filter config to only include keys from the specified category
schema = config_service.get_config_schema()
filtered_config = {
key: value
for key, value in request.config.items()
if key in schema
and schema[key].get("category") == category
and (user.is_admin or not schema[key].get("admin_only", False))
}
success = await config_service.update_config(filtered_config, user_id=user.id)
if success:
return {
"success": True,
"message": f"User configuration for {category} updated successfully"
}
else:
raise HTTPException(status_code=500, detail=f"Failed to update configuration for {category}")
except Exception as e:
logger.error(f"Failed to update user configuration for category {category}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to update configuration for {category}")
@router.post("/api/config/user/reset")
async def reset_user_config(
user: User = Depends(get_current_user_required)
):
"""Reset user's configuration to system defaults"""
try:
config_service = get_db_config_service()
success = await config_service.reset_user_config(user.id)
if success:
return {
"success": True,
"message": "User configuration reset to system defaults"
}
else:
raise HTTPException(status_code=500, detail="Failed to reset user configuration")
except Exception as e:
logger.error(f"Failed to reset user configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to reset user configuration")
@router.post("/api/config/user/reset/{category}")
async def reset_user_config_by_category(
category: str,
user: User = Depends(get_current_user_required)
):
"""Reset user's configuration for a specific category to system defaults"""
try:
config_service = get_db_config_service()
success = await config_service.reset_user_config(user.id, category=category)
if success:
return {
"success": True,
"message": f"User configuration for {category} reset to system defaults"
}
else:
raise HTTPException(status_code=500, detail=f"Failed to reset configuration for {category}")
except Exception as e:
logger.error(f"Failed to reset user configuration for category {category}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to reset configuration for {category}")
# Set default AI provider for user
@router.post("/api/config/default-provider")
async def set_user_default_provider(
request: DefaultProviderRequest,
user: User = Depends(get_current_user_required)
):
"""Set default AI provider for current user"""
try:
config_service = get_db_config_service()
success = await config_service.update_config(
{"default_ai_provider": request.provider},
user_id=user.id
)
if success:
return {
"success": True,
"message": f"Default provider set to {request.provider}",
"provider": request.provider
}
else:
raise HTTPException(status_code=500, detail="Failed to set default provider")
except Exception as e:
logger.error(f"Failed to set default provider: {e}")
raise HTTPException(status_code=500, detail="Failed to set default provider")
@router.get("/api/config/current-provider")
async def get_user_current_provider(
user: User = Depends(get_current_user_required)
):
"""Get current user's default AI provider"""
try:
config_service = get_db_config_service()
provider = await config_service.get_config_value("default_ai_provider", user_id=user.id)
return {
"success": True,
"current_provider": provider
}
except Exception as e:
logger.error(f"Failed to get current provider: {e}")
raise HTTPException(status_code=500, detail="Failed to get current provider")
@router.get("/api/config/schema")
async def get_config_schema(
user: User = Depends(get_current_user_required)
):
"""Get configuration schema (excluding admin-only categories for non-admins)"""
try:
config_service = get_db_config_service()
# Filter out admin-only categories for non-admin users
schema = config_service.get_config_schema(include_admin_only=user.is_admin)
return {
"success": True,
"schema": schema,
"is_admin": user.is_admin
}
except Exception as e:
logger.error(f"Failed to get configuration schema: {e}")
raise HTTPException(status_code=500, detail="Failed to get configuration schema")
# ==================== Admin System Config Endpoints ====================
# These endpoints allow admins to manage system-wide default configuration
@router.get("/api/config/system")
async def get_system_config(
user: User = Depends(get_current_admin_user)
):
"""Get system default configuration (admin only)"""
try:
config_service = get_db_config_service()
await config_service.initialize_system_defaults()
config = await config_service.get_all_config(user_id=None) # None = system defaults
return {
"success": True,
"config": config,
"is_system": True
}
except Exception as e:
logger.error(f"Failed to get system configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to get system configuration")
@router.post("/api/config/system")
async def update_system_config(
request: ConfigUpdateRequest,
user: User = Depends(get_current_admin_user)
):
"""Update system default configuration (admin only)"""
try:
config_service = get_db_config_service()
success = await config_service.update_config(request.config, user_id=None) # None = system
if success:
return {
"success": True,
"message": "System configuration updated successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to update system configuration")
except Exception as e:
logger.error(f"Failed to update system configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to update system configuration")
@router.post("/api/config/system/{category}")
async def update_system_config_by_category(
category: str,
request: ConfigUpdateRequest,
user: User = Depends(get_current_admin_user)
):
"""Update system configuration for a specific category (admin only)"""
try:
config_service = get_db_config_service()
# Filter config to only include keys from the specified category
schema = config_service.get_config_schema()
filtered_config = {
key: value
for key, value in request.config.items()
if key in schema and schema[key].get("category") == category
}
success = await config_service.update_config(filtered_config, user_id=None)
if success:
return {
"success": True,
"message": f"System configuration for {category} updated successfully"
}
else:
raise HTTPException(status_code=500, detail=f"Failed to update system configuration for {category}")
except Exception as e:
logger.error(f"Failed to update system configuration for category {category}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to update system configuration for {category}")
@router.post("/api/config/system/initialize")
async def initialize_system_config(
user: User = Depends(get_current_admin_user)
):
"""Initialize system default configurations from schema defaults (admin only)"""
try:
config_service = get_db_config_service()
count = await config_service.initialize_system_defaults()
return {
"success": True,
"message": f"Initialized {count} system default configurations",
"count": count
}
except Exception as e:
logger.error(f"Failed to initialize system configurations: {e}")
raise HTTPException(status_code=500, detail="Failed to initialize system configurations")
# ==================== Legacy Compatibility Endpoints ====================
# These maintain backward compatibility with old API calls
@router.get("/api/config/all")
async def get_all_config(
user: User = Depends(get_current_user_required)
):
"""Get all configuration values for current user (legacy compatibility)"""
try:
config_service = get_db_config_service()
# Backward-compatible promotion: if an admin previously saved a Tavily key as a user override,
# promote it to system scope so other users can use research without configuring their own key.
if getattr(user, "is_admin", False):
try:
system_tavily = await config_service.get_config_value("tavily_api_key", user_id=None)
if not system_tavily and await config_service.is_user_override(user.id, "tavily_api_key"):
admin_config = await config_service.get_all_config(user_id=user.id)
admin_tavily = admin_config.get("tavily_api_key")
if admin_tavily:
await config_service.update_config({"tavily_api_key": admin_tavily}, user_id=None)
except Exception:
pass
config = await config_service.get_all_config(user_id=user.id)
# Filter out admin_only fields for non-admin users
if not user.is_admin:
schema = config_service.get_config_schema()
admin_only_keys = [
key for key, settings in schema.items()
if settings.get("admin_only", False)
]
for key in admin_only_keys:
if key in config:
del config[key]
# Redact sensitive values so system/admin defaults are never sent to the browser.
# If a normal user has their own override, it's OK to return it to that user.
tavily_value = config.get("tavily_api_key")
tavily_configured = bool(str(tavily_value).strip()) if tavily_value is not None else False
user_has_override = False
if not getattr(user, "is_admin", False):
try:
user_has_override = await config_service.is_user_override(user.id, "tavily_api_key")
except Exception:
user_has_override = False
if getattr(user, "is_admin", False) or not user_has_override:
config.pop("tavily_api_key", None)
config["tavily_api_key_configured"] = tavily_configured
config["tavily_uses_admin_default"] = bool(tavily_configured and (not user.is_admin) and (not user_has_override))
return {
"success": True,
"config": config
}
except Exception as e:
logger.error(f"Failed to get configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to get configuration")
@router.post("/api/config/all")
async def update_all_config(
request: ConfigUpdateRequest,
user: User = Depends(get_current_user_required)
):
"""Update all configuration values for current user (legacy compatibility)"""
try:
config_service = get_db_config_service()
# Prevent non-admin users from updating admin-only keys (which are stored at system scope).
schema = config_service.get_config_schema(include_admin_only=True)
admin_only_keys = {
key for key, settings in schema.items()
if settings.get("admin_only", False)
}
filtered_config = request.config
if not user.is_admin:
filtered_config = {
key: value for key, value in (request.config or {}).items()
if key not in admin_only_keys
}
# For sensitive settings, admins act as the "system default" so other users can use
# the feature without configuring their own key.
system_scoped = {}
user_scoped = dict(filtered_config or {})
if getattr(user, "is_admin", False) and "tavily_api_key" in user_scoped:
system_scoped["tavily_api_key"] = user_scoped.pop("tavily_api_key")
if system_scoped:
ok = await config_service.update_config(system_scoped, user_id=None)
if not ok:
raise HTTPException(status_code=500, detail="Failed to update configuration")
success = True
if user_scoped:
success = await config_service.update_config(user_scoped, user_id=user.id)
if success:
return {
"success": True,
"message": "Configuration updated successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to update configuration")
except Exception as e:
logger.error(f"Failed to update configuration: {e}")
raise HTTPException(status_code=500, detail="Failed to update configuration")
@router.get("/api/config/landppt/models")
async def get_landppt_models(
user: User = Depends(get_current_user_required)
):
"""
Fetch available models for LandPPT using system-level config.
The API key is used on the backend and never exposed to the frontend.
"""
import aiohttp
try:
config_service = get_db_config_service()
# Get system-level config (user_id=None)
api_key = await config_service.get_config_value("landppt_api_key", user_id=None)
base_url = await config_service.get_config_value("landppt_base_url", user_id=None)
if not api_key:
return {
"success": False,
"error": "管理员尚未配置 LandPPT API Key"
}
if not base_url:
base_url = "https://api.openai.com/v1"
# Ensure base URL ends with /v1
if not base_url.endswith('/v1'):
base_url = base_url.rstrip('/') + '/v1'
# Fetch models from the API
models_url = f"{base_url}/models"
timeout_seconds = await get_user_llm_timeout_seconds(user.id)
async with aiohttp.ClientSession() as session:
async with session.get(
models_url,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
timeout=aiohttp.ClientTimeout(total=timeout_seconds)
) as response:
if response.status == 200:
data = await response.json()
models = data.get("data", [])
# Filter and sort models
model_ids = sorted([m.get("id", "") for m in models if m.get("id")])
return {
"success": True,
"models": model_ids
}
else:
error_text = await response.text()
logger.error(f"Failed to fetch LandPPT models: {response.status} - {error_text}")
return {
"success": False,
"error": f"获取模型列表失败: HTTP {response.status}"
}
except Exception as e:
logger.error(f"Failed to get LandPPT models: {e}")
return {
"success": False,
"error": f"获取模型列表失败: {str(e)}"
}
@router.post("/api/config/landppt/test")
async def test_landppt_provider(
user: User = Depends(get_current_user_required)
):
"""
Test LandPPT provider using system-level config.
The API key is used on the backend and never exposed to the frontend.
"""
import aiohttp
try:
config_service = get_db_config_service()
# Get system-level config (user_id=None)
api_key = await config_service.get_config_value("landppt_api_key", user_id=None)
base_url = await config_service.get_config_value("landppt_base_url", user_id=None)
model = await config_service.get_config_value("landppt_model", user_id=user.id)
if not api_key:
return {
"success": False,
"error": "管理员尚未配置 LandPPT API Key"
}
if not base_url:
base_url = "https://api.openai.com/v1"
if not model:
model = "gpt-4o"
# Ensure base URL ends with /v1
if not base_url.endswith('/v1'):
base_url = base_url.rstrip('/') + '/v1'
# Test with a simple chat completion
test_url = f"{base_url}/chat/completions"
timeout_seconds = await get_user_llm_timeout_seconds(user.id)
async with aiohttp.ClientSession() as session:
async with session.post(
test_url,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={
"model": model,
"messages": [{"role": "user", "content": "Hello"}]
},
timeout=aiohttp.ClientTimeout(total=timeout_seconds)
) as response:
if response.status == 200:
data = await response.json()
return {
"success": True,
"message": "LandPPT 提供者测试成功",
"model": model,
"response_preview": data.get("choices", [{}])[0].get("message", {}).get("content", "")[:50]
}
else:
error_text = await response.text()
logger.error(f"LandPPT test failed: {response.status} - {error_text}")
return {
"success": False,
"error": f"测试失败: HTTP {response.status}"
}
except Exception as e:
logger.error(f"Failed to test LandPPT provider: {e}")
return {
"success": False,
"error": f"测试失败: {str(e)}"
}
# Generic category routes last
@router.get("/api/config/{category}")
async def get_config_by_category(
category: str,
user: User = Depends(get_current_user_required)
):
"""Get configuration values by category for current user"""
try:
config_service = get_db_config_service()
if not user.is_admin and category in getattr(config_service, "ADMIN_ONLY_CATEGORIES", set()):
raise HTTPException(status_code=403, detail="Forbidden")
config = await config_service.get_config_by_category(category, user_id=user.id)
# Filter out admin_only fields for non-admin users
if not user.is_admin:
schema = config_service.get_config_schema()
admin_only_keys = [
key for key, settings in schema.items()
if settings.get("admin_only", False)
]
for key in admin_only_keys:
if key in config:
del config[key]
# Redact sensitive values so system/admin defaults are never sent to the browser.
# If a normal user has their own override, it's OK to return it to that user.
tavily_value = config.get("tavily_api_key")
tavily_configured = bool(str(tavily_value).strip()) if tavily_value is not None else False
user_has_override = False
if not getattr(user, "is_admin", False):
try:
user_has_override = await config_service.is_user_override(user.id, "tavily_api_key")
except Exception:
user_has_override = False
if getattr(user, "is_admin", False) or not user_has_override:
config.pop("tavily_api_key", None)
config["tavily_api_key_configured"] = tavily_configured
config["tavily_uses_admin_default"] = bool(tavily_configured and (not user.is_admin) and (not user_has_override))
return {
"success": True,
"config": config,
"category": category
}
except Exception as e:
logger.error(f"Failed to get configuration for category {category}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get configuration for category {category}")
@router.post("/api/config/{category}")
async def update_config_by_category(
category: str,
request: ConfigUpdateRequest,
user: User = Depends(get_current_user_required)
):
"""Update configuration values for a specific category for current user"""
try:
config_service = get_db_config_service()
if not user.is_admin and category in getattr(config_service, "ADMIN_ONLY_CATEGORIES", set()):
raise HTTPException(status_code=403, detail="Forbidden")
# Filter config to only include keys from the specified category
schema = config_service.get_config_schema()
filtered_config = {
key: value
for key, value in request.config.items()
if key in schema and schema[key].get("category") == category
}
# For sensitive settings (e.g. Tavily key), admins set the system default.
system_scoped = {}
user_scoped = dict(filtered_config or {})
if category == "generation_params" and getattr(user, "is_admin", False) and "tavily_api_key" in user_scoped:
system_scoped["tavily_api_key"] = user_scoped.pop("tavily_api_key")
if system_scoped:
ok = await config_service.update_config(system_scoped, user_id=None)
if not ok:
raise HTTPException(status_code=500, detail=f"Failed to update configuration for {category}")
success = True
if user_scoped:
success = await config_service.update_config(user_scoped, user_id=user.id)
if success:
return {
"success": True,
"message": f"Configuration for {category} updated successfully"
}
else:
raise HTTPException(status_code=500, detail=f"Failed to update configuration for {category}")
except Exception as e:
logger.error(f"Failed to update configuration for category {category}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to update configuration for {category}")