|
|
|
|
|
"""
|
|
|
Warp Account Request Limit Checker
|
|
|
获取Warp账户的请求额度信息
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
import json
|
|
|
import sqlite3
|
|
|
import sys
|
|
|
from datetime import datetime
|
|
|
from typing import Dict, Any, Optional
|
|
|
import httpx
|
|
|
import platform
|
|
|
|
|
|
|
|
|
class WarpRequestLimitChecker:
|
|
|
"""Warp账户请求额度检查器"""
|
|
|
|
|
|
def __init__(self, db_path: str = "../warp_accounts.db"):
|
|
|
"""
|
|
|
初始化检查器
|
|
|
|
|
|
Args:
|
|
|
db_path: 数据库路径
|
|
|
"""
|
|
|
self.db_path = db_path
|
|
|
self.async_client = httpx.AsyncClient(timeout=30.0)
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
return self
|
|
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
|
await self.async_client.aclose()
|
|
|
|
|
|
def get_account_from_db(self, email: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
|
|
"""
|
|
|
从数据库获取账户信息
|
|
|
|
|
|
Args:
|
|
|
email: 账户邮箱,如果为None则获取第一个active账户
|
|
|
|
|
|
Returns:
|
|
|
账户信息字典或None
|
|
|
"""
|
|
|
try:
|
|
|
conn = sqlite3.connect(self.db_path)
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
if email:
|
|
|
cursor.execute("""
|
|
|
SELECT *
|
|
|
FROM accounts
|
|
|
WHERE email = ?
|
|
|
AND status = 'active'
|
|
|
""", (email,))
|
|
|
else:
|
|
|
cursor.execute("""
|
|
|
SELECT *
|
|
|
FROM accounts
|
|
|
WHERE status = 'active'
|
|
|
ORDER BY last_used ASC, id ASC LIMIT 1
|
|
|
""")
|
|
|
|
|
|
row = cursor.fetchone()
|
|
|
conn.close()
|
|
|
|
|
|
if row:
|
|
|
return dict(row)
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ 数据库查询错误: {e}")
|
|
|
return None
|
|
|
|
|
|
async def get_request_limit(self, id_token: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
获取账户请求额度
|
|
|
|
|
|
Args:
|
|
|
id_token: Firebase ID Token
|
|
|
|
|
|
Returns:
|
|
|
包含额度信息的字典
|
|
|
"""
|
|
|
if not id_token:
|
|
|
return {"success": False, "error": "缺少Firebase ID Token"}
|
|
|
|
|
|
try:
|
|
|
url = "https://app.warp.dev/graphql/v2"
|
|
|
|
|
|
|
|
|
query = """query GetRequestLimitInfo($requestContext: RequestContext!) {
|
|
|
user(requestContext: $requestContext) {
|
|
|
__typename
|
|
|
... on UserOutput {
|
|
|
user {
|
|
|
requestLimitInfo {
|
|
|
isUnlimited
|
|
|
nextRefreshTime
|
|
|
requestLimit
|
|
|
requestsUsedSinceLastRefresh
|
|
|
requestLimitRefreshDuration
|
|
|
isUnlimitedAutosuggestions
|
|
|
acceptedAutosuggestionsLimit
|
|
|
acceptedAutosuggestionsSinceLastRefresh
|
|
|
isUnlimitedVoice
|
|
|
voiceRequestLimit
|
|
|
voiceRequestsUsedSinceLastRefresh
|
|
|
voiceTokenLimit
|
|
|
voiceTokensUsedSinceLastRefresh
|
|
|
isUnlimitedCodebaseIndices
|
|
|
maxCodebaseIndices
|
|
|
maxFilesPerRepo
|
|
|
embeddingGenerationBatchSize
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
... on UserFacingError {
|
|
|
error {
|
|
|
__typename
|
|
|
... on SharedObjectsLimitExceeded {
|
|
|
limit
|
|
|
objectType
|
|
|
message
|
|
|
}
|
|
|
... on PersonalObjectsLimitExceeded {
|
|
|
limit
|
|
|
objectType
|
|
|
message
|
|
|
}
|
|
|
... on AccountDelinquencyError {
|
|
|
message
|
|
|
}
|
|
|
... on GenericStringObjectUniqueKeyConflict {
|
|
|
message
|
|
|
}
|
|
|
}
|
|
|
responseContext {
|
|
|
serverVersion
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
"""
|
|
|
|
|
|
|
|
|
os_category = "Web"
|
|
|
os_name = "Windows"
|
|
|
os_version = "NT 10.0"
|
|
|
app_version = "v0.2025.10.01.08.12.stable_02"
|
|
|
|
|
|
data = {
|
|
|
"operationName": "GetRequestLimitInfo",
|
|
|
"variables": {
|
|
|
"requestContext": {
|
|
|
"clientContext": {
|
|
|
"version": app_version
|
|
|
},
|
|
|
"osContext": {
|
|
|
"category": os_category,
|
|
|
"linuxKernelVersion": None,
|
|
|
"name": os_name,
|
|
|
"version": os_version
|
|
|
}
|
|
|
}
|
|
|
},
|
|
|
"query": query
|
|
|
}
|
|
|
|
|
|
headers = {
|
|
|
"Content-Type": "application/json",
|
|
|
"authorization": f"Bearer {id_token}",
|
|
|
"x-warp-client-id": "warp-app",
|
|
|
"x-warp-client-version": app_version,
|
|
|
"x-warp-os-category": os_category,
|
|
|
"x-warp-os-name": os_name,
|
|
|
"x-warp-os-version": os_version,
|
|
|
}
|
|
|
|
|
|
print("📊 调用GetRequestLimitInfo接口...")
|
|
|
|
|
|
response = await self.async_client.post(
|
|
|
url,
|
|
|
params={"op": "GetRequestLimitInfo"},
|
|
|
json=data,
|
|
|
headers=headers,
|
|
|
)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
|
|
|
|
|
|
if "errors" in result:
|
|
|
error_msg = result["errors"][0].get("message", "Unknown error")
|
|
|
print(f"❌ GraphQL错误: {error_msg}")
|
|
|
return {"success": False, "error": error_msg}
|
|
|
|
|
|
|
|
|
data_result = result.get("data", {})
|
|
|
user_data = data_result.get("user", {})
|
|
|
|
|
|
if user_data.get("__typename") == "UserOutput":
|
|
|
user_info = user_data.get("user", {})
|
|
|
request_limit_info = user_info.get("requestLimitInfo", {})
|
|
|
|
|
|
|
|
|
request_limit = request_limit_info.get("requestLimit", 0)
|
|
|
requests_used = request_limit_info.get("requestsUsedSinceLastRefresh", 0)
|
|
|
is_unlimited = request_limit_info.get("isUnlimited", False)
|
|
|
next_refresh_time = request_limit_info.get("nextRefreshTime", "N/A")
|
|
|
refresh_duration = request_limit_info.get("requestLimitRefreshDuration", "WEEKLY")
|
|
|
|
|
|
|
|
|
requests_remaining = request_limit - requests_used
|
|
|
|
|
|
|
|
|
if is_unlimited:
|
|
|
quota_type = "🚀 无限额度"
|
|
|
elif request_limit >= 2500:
|
|
|
quota_type = "🎉 高额度"
|
|
|
else:
|
|
|
quota_type = "📋 普通额度"
|
|
|
|
|
|
print(f"\n✅ 账户额度信息:")
|
|
|
print(f" {quota_type}: {request_limit}")
|
|
|
print(f" 📊 已使用: {requests_used}/{request_limit}")
|
|
|
print(f" 💎 剩余: {requests_remaining}")
|
|
|
print(f" 🔄 刷新周期: {refresh_duration}")
|
|
|
print(f" ⏰ 下次刷新: {next_refresh_time}")
|
|
|
|
|
|
|
|
|
if request_limit_info.get("isUnlimitedAutosuggestions"):
|
|
|
print(f" ✨ 自动建议: 无限制")
|
|
|
if request_limit_info.get("maxCodebaseIndices"):
|
|
|
print(f" 📚 最大代码库索引: {request_limit_info.get('maxCodebaseIndices')}")
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"requestLimit": request_limit,
|
|
|
"requestsUsed": requests_used,
|
|
|
"requestsRemaining": requests_remaining,
|
|
|
"isUnlimited": is_unlimited,
|
|
|
"nextRefreshTime": next_refresh_time,
|
|
|
"refreshDuration": refresh_duration,
|
|
|
"quotaType": "unlimited" if is_unlimited else ("high" if request_limit >= 2500 else "normal"),
|
|
|
"autosuggestions": {
|
|
|
"isUnlimited": request_limit_info.get("isUnlimitedAutosuggestions", False),
|
|
|
"limit": request_limit_info.get("acceptedAutosuggestionsLimit", 0),
|
|
|
"used": request_limit_info.get("acceptedAutosuggestionsSinceLastRefresh", 0)
|
|
|
},
|
|
|
"voice": {
|
|
|
"isUnlimited": request_limit_info.get("isUnlimitedVoice", False),
|
|
|
"requestLimit": request_limit_info.get("voiceRequestLimit", 0),
|
|
|
"requestsUsed": request_limit_info.get("voiceRequestsUsedSinceLastRefresh", 0),
|
|
|
"tokenLimit": request_limit_info.get("voiceTokenLimit", 0),
|
|
|
"tokensUsed": request_limit_info.get("voiceTokensUsedSinceLastRefresh", 0)
|
|
|
},
|
|
|
"codebase": {
|
|
|
"isUnlimited": request_limit_info.get("isUnlimitedCodebaseIndices", False),
|
|
|
"maxIndices": request_limit_info.get("maxCodebaseIndices", 0),
|
|
|
"maxFilesPerRepo": request_limit_info.get("maxFilesPerRepo", 0)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
elif user_data.get("__typename") == "UserFacingError":
|
|
|
error = user_data.get("error", {}).get("message", "Unknown error")
|
|
|
print(f"❌ 获取额度失败: {error}")
|
|
|
return {"success": False, "error": error}
|
|
|
else:
|
|
|
print(f"❌ 响应中没有找到用户信息")
|
|
|
return {"success": False, "error": "未找到用户信息"}
|
|
|
|
|
|
else:
|
|
|
error_text = response.text[:500]
|
|
|
print(f"❌ HTTP错误 {response.status_code}")
|
|
|
return {"success": False, "error": f"HTTP {response.status_code}: {error_text}"}
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ 获取额度错误: {e}")
|
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
def update_account_usage(self, email: str) -> bool:
|
|
|
"""
|
|
|
更新账户使用信息
|
|
|
|
|
|
Args:
|
|
|
email: 账户邮箱
|
|
|
|
|
|
Returns:
|
|
|
是否更新成功
|
|
|
"""
|
|
|
try:
|
|
|
conn = sqlite3.connect(self.db_path)
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
cursor.execute("""
|
|
|
UPDATE accounts
|
|
|
SET last_used = CURRENT_TIMESTAMP,
|
|
|
use_count = use_count + 1,
|
|
|
updated_at = CURRENT_TIMESTAMP
|
|
|
WHERE email = ?
|
|
|
""", (email,))
|
|
|
|
|
|
conn.commit()
|
|
|
conn.close()
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ 更新账户使用信息失败: {e}")
|
|
|
return False
|
|
|
|
|
|
|
|
|
async def check_single_account(email: Optional[str] = None):
|
|
|
"""
|
|
|
检查单个账户的请求额度
|
|
|
|
|
|
Args:
|
|
|
email: 账户邮箱,如果为None则检查第一个active账户
|
|
|
"""
|
|
|
async with WarpRequestLimitChecker() as checker:
|
|
|
|
|
|
account = checker.get_account_from_db(email)
|
|
|
|
|
|
if not account:
|
|
|
print(f"❌ 未找到账户: {email if email else '没有active账户'}")
|
|
|
return
|
|
|
|
|
|
print(f"\n🔍 检查账户: {account['email']}")
|
|
|
print(f" 📅 创建时间: {account['created_at']}")
|
|
|
print(f" 🔢 使用次数: {account['use_count']}")
|
|
|
print(f" ⏱️ 上次使用: {account['last_used']}")
|
|
|
|
|
|
|
|
|
result = await checker.get_request_limit(account['id_token'])
|
|
|
|
|
|
if result['success']:
|
|
|
|
|
|
checker.update_account_usage(account['email'])
|
|
|
|
|
|
|
|
|
output_file = f"request_limit_{account['email'].split('@')[0]}.json"
|
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(result, f, ensure_ascii=False, indent=2)
|
|
|
print(f"\n💾 结果已保存到: {output_file}")
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
async def check_all_accounts():
|
|
|
"""检查所有active账户的请求额度"""
|
|
|
async with WarpRequestLimitChecker() as checker:
|
|
|
|
|
|
conn = sqlite3.connect(checker.db_path)
|
|
|
conn.row_factory = sqlite3.Row
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
cursor.execute("""
|
|
|
SELECT email, id_token
|
|
|
FROM accounts
|
|
|
WHERE status = 'active'
|
|
|
ORDER BY id
|
|
|
""")
|
|
|
|
|
|
accounts = cursor.fetchall()
|
|
|
conn.close()
|
|
|
|
|
|
if not accounts:
|
|
|
print("❌ 没有找到active账户")
|
|
|
return
|
|
|
|
|
|
print(f"📋 找到 {len(accounts)} 个active账户")
|
|
|
|
|
|
results = []
|
|
|
for idx, account in enumerate(accounts, 1):
|
|
|
print(f"\n========== [{idx}/{len(accounts)}] ==========")
|
|
|
print(f"🔍 检查账户: {account['email']}")
|
|
|
|
|
|
result = await checker.get_request_limit(account['id_token'])
|
|
|
result['email'] = account['email']
|
|
|
results.append(result)
|
|
|
|
|
|
|
|
|
if result['success']:
|
|
|
checker.update_account_usage(account['email'])
|
|
|
|
|
|
|
|
|
if idx < len(accounts):
|
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
|
|
|
output_file = f"all_accounts_limit_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(results, f, ensure_ascii=False, indent=2)
|
|
|
print(f"\n💾 所有结果已保存到: {output_file}")
|
|
|
|
|
|
|
|
|
print("\n📊 统计摘要:")
|
|
|
success_count = sum(1 for r in results if r['success'])
|
|
|
unlimited_count = sum(1 for r in results if r.get('success') and r.get('isUnlimited'))
|
|
|
high_quota_count = sum(1 for r in results if r.get('success') and r.get('quotaType') == 'high')
|
|
|
normal_quota_count = sum(1 for r in results if r.get('success') and r.get('quotaType') == 'normal')
|
|
|
|
|
|
print(f" ✅ 成功检查: {success_count}/{len(accounts)}")
|
|
|
print(f" 🚀 无限额度: {unlimited_count}")
|
|
|
print(f" 🎉 高额度账户: {high_quota_count}")
|
|
|
print(f" 📋 普通额度账户: {normal_quota_count}")
|
|
|
|
|
|
|
|
|
def main():
|
|
|
"""主函数"""
|
|
|
import argparse
|
|
|
|
|
|
parser = argparse.ArgumentParser(description="Warp账户请求额度检查器")
|
|
|
parser.add_argument("--email", help="指定要检查的账户邮箱")
|
|
|
parser.add_argument("--all", action="store_true", help="检查所有active账户")
|
|
|
parser.add_argument("--db", default="warp_accounts.db", help="数据库路径")
|
|
|
parser.add_argument("--test", action="store_true", help="使用测试数据")
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
if args.test:
|
|
|
|
|
|
test_id_token = ""
|
|
|
|
|
|
async def test_with_token():
|
|
|
async with WarpRequestLimitChecker() as checker:
|
|
|
print("🧪 测试模式 - 使用提供的ID Token")
|
|
|
result = await checker.get_request_limit(test_id_token)
|
|
|
|
|
|
if result['success']:
|
|
|
print("\n✅ 测试成功!")
|
|
|
with open("test_result.json", 'w', encoding='utf-8') as f:
|
|
|
json.dump(result, f, ensure_ascii=False, indent=2)
|
|
|
print("💾 结果已保存到: test_result.json")
|
|
|
else:
|
|
|
print(f"\n❌ 测试失败: {result.get('error')}")
|
|
|
|
|
|
asyncio.run(test_with_token())
|
|
|
|
|
|
elif args.all:
|
|
|
asyncio.run(check_all_accounts())
|
|
|
else:
|
|
|
asyncio.run(check_single_account(args.email))
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
|
|