ggload / app /api /v1 /admin_api /cache.py
f2d90b38's picture
Upload 120 files
8cdca00 verified
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from app.core.auth import verify_app_key
from app.core.batch import create_task, expire_task
from app.services.grok.batch_services.assets import ListService, DeleteService
from app.services.token.manager import get_token_manager
router = APIRouter()
@router.get("/cache", dependencies=[Depends(verify_app_key)])
async def cache_stats(request: Request):
"""获取缓存统计"""
from app.services.grok.utils.cache import CacheService
try:
cache_service = CacheService()
image_stats = cache_service.get_stats("image")
video_stats = cache_service.get_stats("video")
mgr = await get_token_manager()
pools = mgr.pools
accounts = []
for pool_name, pool in pools.items():
for info in pool.list():
raw_token = (
info.token[4:] if info.token.startswith("sso=") else info.token
)
masked = (
f"{raw_token[:8]}...{raw_token[-16:]}"
if len(raw_token) > 24
else raw_token
)
accounts.append(
{
"token": raw_token,
"token_masked": masked,
"pool": pool_name,
"status": info.status,
"last_asset_clear_at": info.last_asset_clear_at,
}
)
scope = request.query_params.get("scope")
selected_token = request.query_params.get("token")
tokens_param = request.query_params.get("tokens")
selected_tokens = []
if tokens_param:
selected_tokens = [t.strip() for t in tokens_param.split(",") if t.strip()]
online_stats = {
"count": 0,
"status": "unknown",
"token": None,
"last_asset_clear_at": None,
}
online_details = []
account_map = {a["token"]: a for a in accounts}
if selected_tokens:
total = 0
raw_results = await ListService.fetch_assets_details(
selected_tokens,
account_map,
)
for token, res in raw_results.items():
if res.get("ok"):
data = res.get("data", {})
detail = data.get("detail")
total += data.get("count", 0)
else:
account = account_map.get(token)
detail = {
"token": token,
"token_masked": account["token_masked"] if account else token,
"count": 0,
"status": f"error: {res.get('error')}",
"last_asset_clear_at": account["last_asset_clear_at"]
if account
else None,
}
if detail:
online_details.append(detail)
online_stats = {
"count": total,
"status": "ok" if selected_tokens else "no_token",
"token": None,
"last_asset_clear_at": None,
}
scope = "selected"
elif scope == "all":
total = 0
tokens = list(dict.fromkeys([account["token"] for account in accounts]))
raw_results = await ListService.fetch_assets_details(
tokens,
account_map,
)
for token, res in raw_results.items():
if res.get("ok"):
data = res.get("data", {})
detail = data.get("detail")
total += data.get("count", 0)
else:
account = account_map.get(token)
detail = {
"token": token,
"token_masked": account["token_masked"] if account else token,
"count": 0,
"status": f"error: {res.get('error')}",
"last_asset_clear_at": account["last_asset_clear_at"]
if account
else None,
}
if detail:
online_details.append(detail)
online_stats = {
"count": total,
"status": "ok" if accounts else "no_token",
"token": None,
"last_asset_clear_at": None,
}
else:
token = selected_token
if token:
raw_results = await ListService.fetch_assets_details(
[token],
account_map,
)
res = raw_results.get(token, {})
data = res.get("data", {})
detail = data.get("detail") if res.get("ok") else None
if detail:
online_stats = {
"count": data.get("count", 0),
"status": detail.get("status", "ok"),
"token": detail.get("token"),
"token_masked": detail.get("token_masked"),
"last_asset_clear_at": detail.get("last_asset_clear_at"),
}
else:
match = next((a for a in accounts if a["token"] == token), None)
online_stats = {
"count": 0,
"status": f"error: {res.get('error')}",
"token": token,
"token_masked": match["token_masked"] if match else token,
"last_asset_clear_at": match["last_asset_clear_at"]
if match
else None,
}
else:
online_stats = {
"count": 0,
"status": "not_loaded",
"token": None,
"last_asset_clear_at": None,
}
response = {
"local_image": image_stats,
"local_video": video_stats,
"online": online_stats,
"online_accounts": accounts,
"online_scope": scope or "none",
"online_details": online_details,
}
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/cache/list", dependencies=[Depends(verify_app_key)])
async def list_local(
cache_type: str = "image",
type_: str = Query(default=None, alias="type"),
page: int = 1,
page_size: int = 1000,
):
"""列出本地缓存文件"""
from app.services.grok.utils.cache import CacheService
try:
if type_:
cache_type = type_
cache_service = CacheService()
result = cache_service.list_files(cache_type, page, page_size)
return {"status": "success", **result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/cache/clear", dependencies=[Depends(verify_app_key)])
async def clear_local(data: dict):
"""清理本地缓存"""
from app.services.grok.utils.cache import CacheService
cache_type = data.get("type", "image")
try:
cache_service = CacheService()
result = cache_service.clear(cache_type)
return {"status": "success", "result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/cache/item/delete", dependencies=[Depends(verify_app_key)])
async def delete_local_item(data: dict):
"""删除单个本地缓存文件"""
from app.services.grok.utils.cache import CacheService
cache_type = data.get("type", "image")
name = data.get("name")
if not name:
raise HTTPException(status_code=400, detail="Missing file name")
try:
cache_service = CacheService()
result = cache_service.delete_file(cache_type, name)
return {"status": "success", "result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/cache/online/clear", dependencies=[Depends(verify_app_key)])
async def clear_online(data: dict):
"""清理在线缓存"""
try:
mgr = await get_token_manager()
tokens = data.get("tokens")
if isinstance(tokens, list):
token_list = [t.strip() for t in tokens if isinstance(t, str) and t.strip()]
if not token_list:
raise HTTPException(status_code=400, detail="No tokens provided")
token_list = list(dict.fromkeys(token_list))
results = {}
raw_results = await DeleteService.clear_assets(
token_list,
mgr,
)
for token, res in raw_results.items():
if res.get("ok"):
results[token] = res.get("data", {})
else:
results[token] = {"status": "error", "error": res.get("error")}
return {"status": "success", "results": results}
token = data.get("token") or mgr.get_token()
if not token:
raise HTTPException(
status_code=400, detail="No available token to perform cleanup"
)
raw_results = await DeleteService.clear_assets(
[token],
mgr,
)
res = raw_results.get(token, {})
data = res.get("data", {})
if res.get("ok") and data.get("status") == "success":
return {"status": "success", "result": data.get("result")}
return {"status": "error", "error": data.get("error") or res.get("error")}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/cache/online/clear/async", dependencies=[Depends(verify_app_key)])
async def clear_online_async(data: dict):
"""清理在线缓存(异步批量 + SSE 进度)"""
mgr = await get_token_manager()
tokens = data.get("tokens")
if not isinstance(tokens, list):
raise HTTPException(status_code=400, detail="No tokens provided")
token_list = [t.strip() for t in tokens if isinstance(t, str) and t.strip()]
if not token_list:
raise HTTPException(status_code=400, detail="No tokens provided")
task = create_task(len(token_list))
async def _run():
try:
async def _on_item(item: str, res: dict):
ok = bool(res.get("data", {}).get("ok"))
task.record(ok)
raw_results = await DeleteService.clear_assets(
token_list,
mgr,
include_ok=True,
on_item=_on_item,
should_cancel=lambda: task.cancelled,
)
if task.cancelled:
task.finish_cancelled()
return
results = {}
ok_count = 0
fail_count = 0
for token, res in raw_results.items():
data = res.get("data", {})
if data.get("ok"):
ok_count += 1
results[token] = {"status": "success", "result": data.get("result")}
else:
fail_count += 1
results[token] = {"status": "error", "error": data.get("error")}
result = {
"status": "success",
"summary": {
"total": len(token_list),
"ok": ok_count,
"fail": fail_count,
},
"results": results,
}
task.finish(result)
except Exception as e:
task.fail_task(str(e))
finally:
import asyncio
asyncio.create_task(expire_task(task.id, 300))
import asyncio
asyncio.create_task(_run())
return {
"status": "success",
"task_id": task.id,
"total": len(token_list),
}
@router.post("/cache/online/load/async", dependencies=[Depends(verify_app_key)])
async def load_cache_async(data: dict):
"""在线资产统计(异步批量 + SSE 进度)"""
from app.services.grok.utils.cache import CacheService
mgr = await get_token_manager()
accounts = []
for pool_name, pool in mgr.pools.items():
for info in pool.list():
raw_token = info.token[4:] if info.token.startswith("sso=") else info.token
masked = (
f"{raw_token[:8]}...{raw_token[-16:]}"
if len(raw_token) > 24
else raw_token
)
accounts.append(
{
"token": raw_token,
"token_masked": masked,
"pool": pool_name,
"status": info.status,
"last_asset_clear_at": info.last_asset_clear_at,
}
)
account_map = {a["token"]: a for a in accounts}
tokens = data.get("tokens")
scope = data.get("scope")
selected_tokens: List[str] = []
if isinstance(tokens, list):
selected_tokens = [str(t).strip() for t in tokens if str(t).strip()]
if not selected_tokens and scope == "all":
selected_tokens = [account["token"] for account in accounts]
scope = "all"
elif selected_tokens:
scope = "selected"
else:
raise HTTPException(status_code=400, detail="No tokens provided")
task = create_task(len(selected_tokens))
async def _run():
try:
cache_service = CacheService()
image_stats = cache_service.get_stats("image")
video_stats = cache_service.get_stats("video")
async def _on_item(item: str, res: dict):
ok = bool(res.get("data", {}).get("ok"))
task.record(ok)
raw_results = await ListService.fetch_assets_details(
selected_tokens,
account_map,
include_ok=True,
on_item=_on_item,
should_cancel=lambda: task.cancelled,
)
if task.cancelled:
task.finish_cancelled()
return
online_details = []
total = 0
for token, res in raw_results.items():
data = res.get("data", {})
detail = data.get("detail")
if detail:
online_details.append(detail)
total += data.get("count", 0)
online_stats = {
"count": total,
"status": "ok" if selected_tokens else "no_token",
"token": None,
"last_asset_clear_at": None,
}
result = {
"local_image": image_stats,
"local_video": video_stats,
"online": online_stats,
"online_accounts": accounts,
"online_scope": scope or "none",
"online_details": online_details,
}
task.finish(result)
except Exception as e:
task.fail_task(str(e))
finally:
import asyncio
asyncio.create_task(expire_task(task.id, 300))
import asyncio
asyncio.create_task(_run())
return {
"status": "success",
"task_id": task.id,
"total": len(selected_tokens),
}