| from typing import List |
|
|
| from fastapi import APIRouter, Depends, HTTPException, Query, Request |
|
|
| from app.core.auth import verify_app_key |
| from app.core.batch import create_task, expire_task |
| from app.services.grok.batch_services.assets import ListService, DeleteService |
| from app.services.token.manager import get_token_manager |
| router = APIRouter() |
|
|
|
|
| @router.get("/cache", dependencies=[Depends(verify_app_key)]) |
| async def cache_stats(request: Request): |
| """获取缓存统计""" |
| from app.services.grok.utils.cache import CacheService |
|
|
| try: |
| cache_service = CacheService() |
| image_stats = cache_service.get_stats("image") |
| video_stats = cache_service.get_stats("video") |
|
|
| mgr = await get_token_manager() |
| pools = mgr.pools |
| accounts = [] |
| for pool_name, pool in pools.items(): |
| for info in pool.list(): |
| raw_token = ( |
| info.token[4:] if info.token.startswith("sso=") else info.token |
| ) |
| masked = ( |
| f"{raw_token[:8]}...{raw_token[-16:]}" |
| if len(raw_token) > 24 |
| else raw_token |
| ) |
| accounts.append( |
| { |
| "token": raw_token, |
| "token_masked": masked, |
| "pool": pool_name, |
| "status": info.status, |
| "last_asset_clear_at": info.last_asset_clear_at, |
| } |
| ) |
|
|
| scope = request.query_params.get("scope") |
| selected_token = request.query_params.get("token") |
| tokens_param = request.query_params.get("tokens") |
| selected_tokens = [] |
| if tokens_param: |
| selected_tokens = [t.strip() for t in tokens_param.split(",") if t.strip()] |
|
|
| online_stats = { |
| "count": 0, |
| "status": "unknown", |
| "token": None, |
| "last_asset_clear_at": None, |
| } |
| online_details = [] |
| account_map = {a["token"]: a for a in accounts} |
| if selected_tokens: |
| total = 0 |
| raw_results = await ListService.fetch_assets_details( |
| selected_tokens, |
| account_map, |
| ) |
| for token, res in raw_results.items(): |
| if res.get("ok"): |
| data = res.get("data", {}) |
| detail = data.get("detail") |
| total += data.get("count", 0) |
| else: |
| account = account_map.get(token) |
| detail = { |
| "token": token, |
| "token_masked": account["token_masked"] if account else token, |
| "count": 0, |
| "status": f"error: {res.get('error')}", |
| "last_asset_clear_at": account["last_asset_clear_at"] |
| if account |
| else None, |
| } |
| if detail: |
| online_details.append(detail) |
| online_stats = { |
| "count": total, |
| "status": "ok" if selected_tokens else "no_token", |
| "token": None, |
| "last_asset_clear_at": None, |
| } |
| scope = "selected" |
| elif scope == "all": |
| total = 0 |
| tokens = list(dict.fromkeys([account["token"] for account in accounts])) |
| raw_results = await ListService.fetch_assets_details( |
| tokens, |
| account_map, |
| ) |
| for token, res in raw_results.items(): |
| if res.get("ok"): |
| data = res.get("data", {}) |
| detail = data.get("detail") |
| total += data.get("count", 0) |
| else: |
| account = account_map.get(token) |
| detail = { |
| "token": token, |
| "token_masked": account["token_masked"] if account else token, |
| "count": 0, |
| "status": f"error: {res.get('error')}", |
| "last_asset_clear_at": account["last_asset_clear_at"] |
| if account |
| else None, |
| } |
| if detail: |
| online_details.append(detail) |
| online_stats = { |
| "count": total, |
| "status": "ok" if accounts else "no_token", |
| "token": None, |
| "last_asset_clear_at": None, |
| } |
| else: |
| token = selected_token |
| if token: |
| raw_results = await ListService.fetch_assets_details( |
| [token], |
| account_map, |
| ) |
| res = raw_results.get(token, {}) |
| data = res.get("data", {}) |
| detail = data.get("detail") if res.get("ok") else None |
| if detail: |
| online_stats = { |
| "count": data.get("count", 0), |
| "status": detail.get("status", "ok"), |
| "token": detail.get("token"), |
| "token_masked": detail.get("token_masked"), |
| "last_asset_clear_at": detail.get("last_asset_clear_at"), |
| } |
| else: |
| match = next((a for a in accounts if a["token"] == token), None) |
| online_stats = { |
| "count": 0, |
| "status": f"error: {res.get('error')}", |
| "token": token, |
| "token_masked": match["token_masked"] if match else token, |
| "last_asset_clear_at": match["last_asset_clear_at"] |
| if match |
| else None, |
| } |
| else: |
| online_stats = { |
| "count": 0, |
| "status": "not_loaded", |
| "token": None, |
| "last_asset_clear_at": None, |
| } |
|
|
| response = { |
| "local_image": image_stats, |
| "local_video": video_stats, |
| "online": online_stats, |
| "online_accounts": accounts, |
| "online_scope": scope or "none", |
| "online_details": online_details, |
| } |
| return response |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @router.get("/cache/list", dependencies=[Depends(verify_app_key)]) |
| async def list_local( |
| cache_type: str = "image", |
| type_: str = Query(default=None, alias="type"), |
| page: int = 1, |
| page_size: int = 1000, |
| ): |
| """列出本地缓存文件""" |
| from app.services.grok.utils.cache import CacheService |
|
|
| try: |
| if type_: |
| cache_type = type_ |
| cache_service = CacheService() |
| result = cache_service.list_files(cache_type, page, page_size) |
| return {"status": "success", **result} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @router.post("/cache/clear", dependencies=[Depends(verify_app_key)]) |
| async def clear_local(data: dict): |
| """清理本地缓存""" |
| from app.services.grok.utils.cache import CacheService |
|
|
| cache_type = data.get("type", "image") |
|
|
| try: |
| cache_service = CacheService() |
| result = cache_service.clear(cache_type) |
| return {"status": "success", "result": result} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @router.post("/cache/item/delete", dependencies=[Depends(verify_app_key)]) |
| async def delete_local_item(data: dict): |
| """删除单个本地缓存文件""" |
| from app.services.grok.utils.cache import CacheService |
|
|
| cache_type = data.get("type", "image") |
| name = data.get("name") |
| if not name: |
| raise HTTPException(status_code=400, detail="Missing file name") |
| try: |
| cache_service = CacheService() |
| result = cache_service.delete_file(cache_type, name) |
| return {"status": "success", "result": result} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @router.post("/cache/online/clear", dependencies=[Depends(verify_app_key)]) |
| async def clear_online(data: dict): |
| """清理在线缓存""" |
| try: |
| mgr = await get_token_manager() |
| tokens = data.get("tokens") |
|
|
| if isinstance(tokens, list): |
| token_list = [t.strip() for t in tokens if isinstance(t, str) and t.strip()] |
| if not token_list: |
| raise HTTPException(status_code=400, detail="No tokens provided") |
|
|
| token_list = list(dict.fromkeys(token_list)) |
|
|
| results = {} |
| raw_results = await DeleteService.clear_assets( |
| token_list, |
| mgr, |
| ) |
| for token, res in raw_results.items(): |
| if res.get("ok"): |
| results[token] = res.get("data", {}) |
| else: |
| results[token] = {"status": "error", "error": res.get("error")} |
|
|
| return {"status": "success", "results": results} |
|
|
| token = data.get("token") or mgr.get_token() |
| if not token: |
| raise HTTPException( |
| status_code=400, detail="No available token to perform cleanup" |
| ) |
|
|
| raw_results = await DeleteService.clear_assets( |
| [token], |
| mgr, |
| ) |
| res = raw_results.get(token, {}) |
| data = res.get("data", {}) |
| if res.get("ok") and data.get("status") == "success": |
| return {"status": "success", "result": data.get("result")} |
| return {"status": "error", "error": data.get("error") or res.get("error")} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @router.post("/cache/online/clear/async", dependencies=[Depends(verify_app_key)]) |
| async def clear_online_async(data: dict): |
| """清理在线缓存(异步批量 + SSE 进度)""" |
| mgr = await get_token_manager() |
| tokens = data.get("tokens") |
| if not isinstance(tokens, list): |
| raise HTTPException(status_code=400, detail="No tokens provided") |
|
|
| token_list = [t.strip() for t in tokens if isinstance(t, str) and t.strip()] |
| if not token_list: |
| raise HTTPException(status_code=400, detail="No tokens provided") |
|
|
| task = create_task(len(token_list)) |
|
|
| async def _run(): |
| try: |
| async def _on_item(item: str, res: dict): |
| ok = bool(res.get("data", {}).get("ok")) |
| task.record(ok) |
|
|
| raw_results = await DeleteService.clear_assets( |
| token_list, |
| mgr, |
| include_ok=True, |
| on_item=_on_item, |
| should_cancel=lambda: task.cancelled, |
| ) |
|
|
| if task.cancelled: |
| task.finish_cancelled() |
| return |
|
|
| results = {} |
| ok_count = 0 |
| fail_count = 0 |
| for token, res in raw_results.items(): |
| data = res.get("data", {}) |
| if data.get("ok"): |
| ok_count += 1 |
| results[token] = {"status": "success", "result": data.get("result")} |
| else: |
| fail_count += 1 |
| results[token] = {"status": "error", "error": data.get("error")} |
|
|
| result = { |
| "status": "success", |
| "summary": { |
| "total": len(token_list), |
| "ok": ok_count, |
| "fail": fail_count, |
| }, |
| "results": results, |
| } |
| task.finish(result) |
| except Exception as e: |
| task.fail_task(str(e)) |
| finally: |
| import asyncio |
| asyncio.create_task(expire_task(task.id, 300)) |
|
|
| import asyncio |
| asyncio.create_task(_run()) |
|
|
| return { |
| "status": "success", |
| "task_id": task.id, |
| "total": len(token_list), |
| } |
|
|
|
|
| @router.post("/cache/online/load/async", dependencies=[Depends(verify_app_key)]) |
| async def load_cache_async(data: dict): |
| """在线资产统计(异步批量 + SSE 进度)""" |
| from app.services.grok.utils.cache import CacheService |
|
|
| mgr = await get_token_manager() |
|
|
| accounts = [] |
| for pool_name, pool in mgr.pools.items(): |
| for info in pool.list(): |
| raw_token = info.token[4:] if info.token.startswith("sso=") else info.token |
| masked = ( |
| f"{raw_token[:8]}...{raw_token[-16:]}" |
| if len(raw_token) > 24 |
| else raw_token |
| ) |
| accounts.append( |
| { |
| "token": raw_token, |
| "token_masked": masked, |
| "pool": pool_name, |
| "status": info.status, |
| "last_asset_clear_at": info.last_asset_clear_at, |
| } |
| ) |
|
|
| account_map = {a["token"]: a for a in accounts} |
|
|
| tokens = data.get("tokens") |
| scope = data.get("scope") |
| selected_tokens: List[str] = [] |
| if isinstance(tokens, list): |
| selected_tokens = [str(t).strip() for t in tokens if str(t).strip()] |
|
|
| if not selected_tokens and scope == "all": |
| selected_tokens = [account["token"] for account in accounts] |
| scope = "all" |
| elif selected_tokens: |
| scope = "selected" |
| else: |
| raise HTTPException(status_code=400, detail="No tokens provided") |
|
|
| task = create_task(len(selected_tokens)) |
|
|
| async def _run(): |
| try: |
| cache_service = CacheService() |
| image_stats = cache_service.get_stats("image") |
| video_stats = cache_service.get_stats("video") |
|
|
| async def _on_item(item: str, res: dict): |
| ok = bool(res.get("data", {}).get("ok")) |
| task.record(ok) |
|
|
| raw_results = await ListService.fetch_assets_details( |
| selected_tokens, |
| account_map, |
| include_ok=True, |
| on_item=_on_item, |
| should_cancel=lambda: task.cancelled, |
| ) |
|
|
| if task.cancelled: |
| task.finish_cancelled() |
| return |
|
|
| online_details = [] |
| total = 0 |
| for token, res in raw_results.items(): |
| data = res.get("data", {}) |
| detail = data.get("detail") |
| if detail: |
| online_details.append(detail) |
| total += data.get("count", 0) |
|
|
| online_stats = { |
| "count": total, |
| "status": "ok" if selected_tokens else "no_token", |
| "token": None, |
| "last_asset_clear_at": None, |
| } |
|
|
| result = { |
| "local_image": image_stats, |
| "local_video": video_stats, |
| "online": online_stats, |
| "online_accounts": accounts, |
| "online_scope": scope or "none", |
| "online_details": online_details, |
| } |
| task.finish(result) |
| except Exception as e: |
| task.fail_task(str(e)) |
| finally: |
| import asyncio |
| asyncio.create_task(expire_task(task.id, 300)) |
|
|
| import asyncio |
| asyncio.create_task(_run()) |
|
|
| return { |
| "status": "success", |
| "task_id": task.id, |
| "total": len(selected_tokens), |
| } |
|
|
|
|