| import os |
| import time |
| import logging |
| import requests |
| import concurrent.futures |
| from datetime import datetime, timedelta |
| from apscheduler.schedulers.background import BackgroundScheduler |
| from flask import Flask, request, jsonify, Response, stream_with_context |
|
|
# Emit timestamp, level and message for every record at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


# Upstream SiliconFlow endpoints.
# Account information; get_credit_summary() reads the balance from it.
API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info"
# Chat completions; used both for probing models and for proxying requests.
TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions"
# Model catalogue; queried by get_all_models().
MODELS_ENDPOINT = "https://api.siliconflow.cn/v1/models"
|
|
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)
|
|
# Model id caches populated by refresh_models().
all_models = []
free_models = []


# API keys grouped by load_keys():
#   invalid    - the balance lookup failed
#   free       - balance is zero or negative
#   unverified - positive balance, but the test-model probe failed
#   valid      - positive balance and the test-model probe succeeded
invalid_keys_global = []
free_keys_global = []
unverified_keys_global = []
valid_keys_global = []
|
|
def get_credit_summary(api_key):
    """
    Fetch the account balance for an API key.

    Args:
        api_key: API key sent upstream as a Bearer token.

    Returns:
        ``{"total_balance": <float>}`` on success, or ``None`` when the
        request fails or the response cannot be interpreted.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        # Without a timeout a stalled connection would block the caller
        # (key loading, billing endpoints) indefinitely.
        response = requests.get(API_ENDPOINT, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json().get("data", {})
        total_balance = data.get("totalBalance", 0)
        return {"total_balance": float(total_balance)}
    except requests.exceptions.RequestException as e:
        logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except (KeyError, TypeError, AttributeError) as e:
        # AttributeError covers a JSON body (or its "data" member) that is
        # null or not an object, where .get() does not exist.
        logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except ValueError as e:
        logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}")
        return None
|
|
# Key used by refresh_models() to list models and to probe each of them.
# NOTE(review): a credential committed to source control should be treated as
# public. Set the FREE_MODEL_TEST_KEY environment variable to supply your own;
# the literal below is kept only as a backward-compatible default.
FREE_MODEL_TEST_KEY = (
    os.environ.get("FREE_MODEL_TEST_KEY")
    or "sk-bmjbjzleaqfgtqfzmcnsbagxrlohriadnxqrzfocbizaxukw"
)
|
|
def test_model_availability(api_key, model_name):
    """
    Probe whether a model can be called with the given API key.

    Sends a minimal non-streaming chat completion and returns True when the
    upstream answers 200 or 429, False for any other status or when the
    request itself fails.
    """
    probe_payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 10,
        "stream": False
    }
    auth_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    try:
        reply = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=auth_headers,
            json=probe_payload,
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}")
        return False

    # A rate-limited reply (429) is counted as available, same as 200.
    return reply.status_code in (200, 429)
|
|
def refresh_models():
    """
    Refresh the cached list of all models and the cached list of free models.

    The model list is fetched with FREE_MODEL_TEST_KEY and every model is then
    probed concurrently with that same key; models whose probe succeeds are
    recorded as free. Both module-level lists are replaced only once probing
    has finished, so request handlers never observe a half-built list. When
    the fetch yields no models the previous lists are kept.
    """
    global all_models, free_models

    fetched_models = get_all_models(FREE_MODEL_TEST_KEY)
    if not fetched_models:
        # get_all_models() returns [] on any failure; do not wipe a working
        # cache because of a transient upstream error.
        logging.warning("刷新模型列表未返回任何模型,保留之前的模型列表。")
        return

    usable = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_model = {
            executor.submit(test_model_availability, FREE_MODEL_TEST_KEY, model): model
            for model in fetched_models
        }

        for future in concurrent.futures.as_completed(future_to_model):
            model = future_to_model[future]
            try:
                if future.result():
                    usable.add(model)
            except Exception as exc:
                logging.error(f"模型 {model} 测试生成异常: {exc}")

    # Publish both lists together; free models keep the upstream ordering
    # instead of the arbitrary order in which the probes completed.
    all_models = fetched_models
    free_models = [model for model in fetched_models if model in usable]

    logging.info(f"所有模型列表:{all_models}")
    logging.info(f"免费模型列表:{free_models}")
|
|
def _mask_key(key):
    """Return a shortened form of *key* that is safe to write to logs."""
    if len(key) <= 12:
        return "***"
    return f"{key[:6]}...{key[-4:]}"


def load_keys():
    """
    Load API keys from the KEYS environment variable and classify them.

    KEYS is a comma-separated list. Each key is placed in exactly one group:
    invalid (balance lookup failed), free (balance <= 0), valid (positive
    balance and the TEST_MODEL probe succeeded) or unverified (positive
    balance but the probe failed). The groups are logged with the keys masked
    and then published to the module-level ``*_keys_global`` lists. When KEYS
    is unset or empty only a warning is logged and the lists are left as-is.
    """
    global invalid_keys_global, free_keys_global, unverified_keys_global, valid_keys_global

    keys_str = os.environ.get("KEYS")
    if not keys_str:
        logging.warning("环境变量 KEYS 未设置。")
        return

    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")

    # Drop blank entries produced by trailing or doubled commas so they are
    # not sent upstream as empty credentials.
    keys = [key.strip() for key in keys_str.split(',') if key.strip()]
    masked_keys = [_mask_key(key) for key in keys]
    logging.info(f"加载的 keys:{masked_keys}")

    invalid_keys = []
    free_keys = []
    unverified_keys = []
    valid_keys = []

    for key in keys:
        credit_summary = get_credit_summary(key)
        if credit_summary is None:
            invalid_keys.append(key)
        elif credit_summary.get("total_balance", 0) <= 0:
            free_keys.append(key)
        elif test_model_availability(key, test_model):
            valid_keys.append(key)
        else:
            unverified_keys.append(key)

    # Secrets are masked before logging; prefix and suffix still let an
    # operator tell the keys apart.
    masked_invalid = [_mask_key(key) for key in invalid_keys]
    masked_free = [_mask_key(key) for key in free_keys]
    masked_unverified = [_mask_key(key) for key in unverified_keys]
    masked_valid = [_mask_key(key) for key in valid_keys]
    logging.info(f"无效 KEY:{masked_invalid}")
    logging.info(f"免费 KEY:{masked_free}")
    logging.info(f"未实名 KEY:{masked_unverified}")
    logging.info(f"有效 KEY:{masked_valid}")

    invalid_keys_global = invalid_keys
    free_keys_global = free_keys
    unverified_keys_global = unverified_keys
    valid_keys_global = valid_keys
|
|
def get_all_models(api_key):
    """
    Fetch the ids of all chat models from the upstream models endpoint.

    Args:
        api_key: API key sent upstream as a Bearer token.

    Returns:
        A list of model ids, or an empty list when the request fails or the
        response does not have the expected ``{"data": [...]}`` shape.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        # Without a timeout a stalled connection would block the scheduler
        # job (and startup) indefinitely.
        response = requests.get(
            MODELS_ENDPOINT,
            headers=headers,
            params={"sub_type": "chat"},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()

        if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list):
            # Entries that are not objects or carry no "id" are skipped.
            return [model.get("id") for model in data["data"] if isinstance(model, dict) and "id" in model]
        else:
            logging.error("获取模型列表失败:响应数据格式不正确")
            return []
    except requests.exceptions.RequestException as e:
        logging.error(f"获取模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
    except (KeyError, TypeError) as e:
        logging.error(f"解析模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
|
|
def determine_request_type(model_name):
    """
    Classify a requested model against the cached model lists.

    Returns "free" when the model is in the free list, "paid" when it is only
    in the full model list, and "unknown" otherwise.
    """
    if model_name in free_models:
        return "free"
    return "paid" if model_name in all_models else "unknown"
|
|
def select_key(request_type):
    """
    Pick an API key suitable for the given request type.

    "paid" requests draw from the unverified and valid keys; every other
    request type draws from the free, unverified and valid keys. Returns
    None when the pool is empty.
    """
    if request_type == "paid":
        candidates = unverified_keys_global + valid_keys_global
    else:
        candidates = free_keys_global + unverified_keys_global + valid_keys_global

    if not candidates:
        return None

    # The current time in milliseconds, modulo the pool size, picks the slot.
    slot = int(time.time() * 1000) % len(candidates)
    return candidates[slot]
|
|
def check_authorization(request):
    """
    Check the request's Authorization header against AUTHORIZATION_KEY.

    Args:
        request: Object exposing ``headers.get(name)``, e.g. a Flask request.

    Returns:
        True only when the AUTHORIZATION_KEY environment variable is set and
        the header equals ``"Bearer <AUTHORIZATION_KEY>"``; False otherwise.
    """
    import hmac

    authorization_key = os.environ.get("AUTHORIZATION_KEY")
    if not authorization_key:
        logging.warning("环境变量 AUTHORIZATION_KEY 未设置,请设置后重试。")
        return False

    auth_header = request.headers.get('Authorization')
    if not auth_header:
        logging.warning("请求头中缺少 Authorization 字段。")
        return False

    expected_header = f"Bearer {authorization_key}"
    # Constant-time comparison avoids leaking how much of the secret matched.
    # Both sides are encoded because compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(auth_header.encode("utf-8"),
                               expected_header.encode("utf-8")):
        # The submitted value is deliberately not logged: it may be a real
        # credential (for example one with a typo).
        logging.warning("无效的 Authorization 密钥。")
        return False

    return True
|
|
| |
# Background scheduler for the periodic maintenance jobs. It is only
# configured here; it is started from the __main__ block.
scheduler = BackgroundScheduler()

# Re-classify the configured API keys every hour.
scheduler.add_job(func=load_keys, trigger='interval', hours=1)
# Re-fetch the model lists every ten minutes.
scheduler.add_job(func=refresh_models, trigger='interval', minutes=10)
|
|
@app.route('/')
def index():
    """
    Serve the landing page for the root route.
    """
    welcome_html = "<h1>Welcome to SiliconFlow</h1>"
    return welcome_html
|
|
@app.route('/check_tokens', methods=['POST'])
def check_tokens():
    """
    Classify the tokens submitted by the front end.

    Expects a JSON object such as ``{"tokens": ["sk-...", ...]}``. Responds
    with a JSON list holding one entry per token (token, type, balance,
    message), or with HTTP 400 when the body is not a JSON object or
    ``tokens`` is not a list.
    """
    # silent=True yields None for a missing or malformed body, so the shape
    # can be validated here rather than failing on ``.get`` of None.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid request data"}), 400

    tokens = payload.get('tokens', [])
    if not isinstance(tokens, list):
        return jsonify({"error": "Invalid request data"}), 400

    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")

    results = []
    for token in tokens:
        credit_summary = get_credit_summary(token)
        if credit_summary is None:
            results.append({"token": token, "type": "无效 KEY", "balance": 0, "message": "无法获取额度信息"})
            continue

        total_balance = credit_summary.get("total_balance", 0)
        if total_balance <= 0:
            results.append({"token": token, "type": "免费 KEY", "balance": total_balance, "message": "额度不足"})
        elif test_model_availability(token, test_model):
            results.append({"token": token, "type": "有效 KEY", "balance": total_balance, "message": "可以使用指定模型"})
        else:
            results.append({"token": token, "type": "未实名 KEY", "balance": total_balance, "message": "无法使用指定模型"})

    return jsonify(results)
|
|
@app.route('/handsome/v1/chat/completions', methods=['POST'])
def handsome_chat_completions():
    """
    Proxy an authorized chat-completion request to the upstream API.

    The caller must pass check_authorization(). The JSON body must be an
    object containing ``model``; an upstream key is chosen from the model's
    request type and the body is forwarded unchanged. Streaming requests are
    relayed chunk by chunk with the upstream status code; non-streaming
    requests return the upstream JSON.

    Responses: 401 when unauthorized, 400 for an invalid body or when no key
    is available, 429 passed through from upstream, 500 when the upstream
    call fails.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json()
    # Reject non-object bodies (lists, strings) up front; indexing them with
    # 'model' below would raise TypeError.
    if not isinstance(data, dict) or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400

    model_name = data['model']
    request_type = determine_request_type(model_name)
    api_key = select_key(request_type)

    if not api_key:
        return jsonify({"error": "No available API key for this request type"}), 400

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    wants_stream = data.get("stream", False)

    try:
        response = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=headers,
            json=data,
            stream=wants_stream,
            timeout=60
        )

        if response.status_code == 429:
            return jsonify(response.json()), 429

        if wants_stream:
            # Forward the upstream status so an error reply is not relayed
            # to the client as a 200, and tolerate a missing Content-Type.
            return Response(
                stream_with_context(response.iter_content(chunk_size=1024)),
                status=response.status_code,
                content_type=response.headers.get('Content-Type', 'text/event-stream'),
            )

        response.raise_for_status()
        return jsonify(response.json())
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON upstream body on requests versions
        # whose .json() raises a plain JSON decode error.
        return jsonify({"error": str(e)}), 500
|
|
@app.route('/handsome/v1/models', methods=['GET'])
def list_models():
    """
    Return the cached model list to an authorized caller.

    Responds with 401 when the Authorization check fails; otherwise with a
    JSON object holding one entry per cached model under "data" and the
    cached free-model ids under "free_models".
    """
    authorized = check_authorization(request)
    if not authorized:
        return jsonify({"error": "Unauthorized"}), 401

    model_entries = []
    for model in all_models:
        model_entries.append({"id": model, "object": "model"})

    return jsonify({"data": model_entries, "free_models": free_models})
|
|
def get_billing_info():
    """
    Sum the current balance of every valid and unverified key.

    Keys whose balance lookup fails are skipped.
    """
    balance_sum = 0
    paid_keys = valid_keys_global + unverified_keys_global
    for api_key in paid_keys:
        summary = get_credit_summary(api_key)
        if not summary:
            continue
        balance_sum += summary.get("total_balance", 0)
    return balance_sum
|
|
@app.route('/handsome/v1/dashboard/billing/usage', methods=['GET'])
def billing_usage():
    """
    Return usage figures for the last 30 days; usage is always reported as 0.

    Responds with 401 when the Authorization check fails; otherwise with one
    zero-usage entry per day from 30 days ago up to now, plus a zero total.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    window_end = datetime.now()
    window_start = window_end - timedelta(days=30)

    # Both ends of the window are included, hence the + 1.
    day_count = (window_end - window_start).days + 1
    daily_usage = [
        {
            "timestamp": int((window_start + timedelta(days=offset)).timestamp()),
            "daily_usage": 0
        }
        for offset in range(day_count)
    ]

    return jsonify({
        "object": "list",
        "data": daily_usage,
        "total_usage": 0
    })
|
|
@app.route('/handsome/v1/dashboard/billing/subscription', methods=['GET'])
def billing_subscription():
    """
    Return subscription information for an authorized caller.

    Responds with 401 when the Authorization check fails. Otherwise the
    summed balance from get_billing_info() is reported as every hard limit,
    soft limits are 0, and the access expiry is set to the end of year 9999.
    """
    from datetime import timezone

    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    total_balance = get_billing_info()

    # Use an aware UTC datetime: timestamp() on a naive datetime goes through
    # the platform mktime(), which may raise OSError/OverflowError for dates
    # this far in the future and varies with the server's time zone.
    access_until = int(datetime(9999, 12, 31, tzinfo=timezone.utc).timestamp())

    return jsonify({
        "object": "billing_subscription",
        "has_payment_method": False,
        "canceled": False,
        "canceled_at": None,
        "delinquent": None,
        "access_until": access_until,
        "soft_limit": 0,
        "hard_limit": total_balance,
        "system_hard_limit": total_balance,
        "soft_limit_usd": 0,
        "hard_limit_usd": total_balance,
        "system_hard_limit_usd": total_balance,
        "plan": {
            "name": "SiliconFlow API",
            "id": "siliconflow-api"
        },
        "account_name": "SiliconFlow User",
        "po_number": None,
        "billing_email": None,
        "tax_ids": [],
        "billing_address": None,
        "business_address": None
    })
|
|
if __name__ == '__main__':
    # Log only the NAMES of the environment variables: the values include
    # secrets such as KEYS and AUTHORIZATION_KEY and must not reach the log.
    logging.info(f"环境变量:{sorted(os.environ)}")

    # Start the periodic jobs (hourly key reload, 10-minute model refresh).
    scheduler.start()

    # Run both jobs once right away so the key and model lists are populated
    # before the first request instead of after the first interval elapses.
    load_keys()
    logging.info("首次加载 keys 已手动触发执行")

    refresh_models()
    logging.info("首次刷新模型列表已手动触发执行")

    # Listen on all interfaces; the port comes from PORT (default 7860).
    app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))
|
|