root_agent / upstash_redis_test.py
airsltd's picture
update
8abdadf
import os
import requests
from dotenv import load_dotenv
# 加载 .env 文件中的环境变量
load_dotenv()
UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL")
UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN")
if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN:
print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。")
exit(1)
headers = {
"Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}",
"Content-Type": "application/json"
}
def execute_redis_command(command, *args):
url = ""
send_json_payload = False
payload = []
if command.upper() == "SET":
if len(args) != 2:
print("SET 命令需要两个参数:key 和 value。")
return None
key, value = args
url = f"{UPSTASH_REDIS_REST_URL}/set/{key}/{value}"
elif command.upper() == "GET":
if len(args) != 1:
print("GET 命令需要一个参数:key。")
return None
key = args[0]
url = f"{UPSTASH_REDIS_REST_URL}/get/{key}"
elif command.upper() == "DEL":
if len(args) != 1:
print("DEL 命令需要一个参数:key。")
return None
key = args[0]
url = f"{UPSTASH_REDIS_REST_URL}/del/{key}"
else:
# 对于其他命令,使用通用格式,并发送 JSON 数组作为 payload
url = f"{UPSTASH_REDIS_REST_URL}/{command}"
payload = [arg for arg in args]
send_json_payload = True
try:
if send_json_payload:
response = requests.post(url, headers=headers, json=payload)
else:
response = requests.post(url, headers=headers) # 不发送 json=payload
response.raise_for_status() # 如果请求失败,抛出 HTTPError
return response.json()
except requests.exceptions.HTTPError as http_err:
print(f"HTTP 错误发生: {http_err}")
print(f"响应内容: {response.text}")
except requests.exceptions.ConnectionError as conn_err:
print(f"连接错误发生: {conn_err}")
except requests.exceptions.Timeout as timeout_err:
print(f"请求超时: {timeout_err}")
except requests.exceptions.RequestException as req_err:
print(f"发生未知错误: {req_err}")
return None
def test_redis_connection():
print("正在测试 Upstash Redis 连接...")
# SET 命令
key = "mykey"
value = "hello_upstash"
print(f"正在设置键 '{key}' 为值 '{value}'...")
set_result = execute_redis_command("SET", key, value)
if set_result:
print(f"SET 结果: {set_result}")
if set_result and set_result.get('result') == "OK":
print("SET 命令成功。")
else:
print("SET 命令失败。")
return # 如果 SET 失败,才返回
# GET 命令
print(f"正在获取键 '{key}' 的值...")
get_result = execute_redis_command("GET", key)
if get_result is not None:
print(f"GET 结果: {get_result}")
if get_result and get_result.get('result') == value:
print("GET 命令成功,值匹配。")
else:
print("GET 命令失败,值不匹配或结果格式不正确。")
else:
print("GET 命令失败。")
return
# DEL 命令 (可选,用于清理)
print(f"正在删除键 '{key}'...")
del_result = execute_redis_command("DEL", key)
if del_result is not None:
print(f"DEL 结果: {del_result}")
if del_result and del_result.get('result') == 1:
print("DEL 命令成功。")
else:
print("DEL 命令失败或结果格式不正确。")
else:
print("DEL 命令失败。")
if __name__ == "__main__":
test_redis_connection()