import os import requests from dotenv import load_dotenv # 加载 .env 文件中的环境变量 load_dotenv() UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL") UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN") if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN: print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。") exit(1) headers = { "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}", "Content-Type": "application/json" } def execute_redis_command(command, *args): url = "" send_json_payload = False payload = [] if command.upper() == "SET": if len(args) != 2: print("SET 命令需要两个参数:key 和 value。") return None key, value = args url = f"{UPSTASH_REDIS_REST_URL}/set/{key}/{value}" elif command.upper() == "GET": if len(args) != 1: print("GET 命令需要一个参数:key。") return None key = args[0] url = f"{UPSTASH_REDIS_REST_URL}/get/{key}" elif command.upper() == "DEL": if len(args) != 1: print("DEL 命令需要一个参数:key。") return None key = args[0] url = f"{UPSTASH_REDIS_REST_URL}/del/{key}" else: # 对于其他命令,使用通用格式,并发送 JSON 数组作为 payload url = f"{UPSTASH_REDIS_REST_URL}/{command}" payload = [arg for arg in args] send_json_payload = True try: if send_json_payload: response = requests.post(url, headers=headers, json=payload) else: response = requests.post(url, headers=headers) # 不发送 json=payload response.raise_for_status() # 如果请求失败,抛出 HTTPError return response.json() except requests.exceptions.HTTPError as http_err: print(f"HTTP 错误发生: {http_err}") print(f"响应内容: {response.text}") except requests.exceptions.ConnectionError as conn_err: print(f"连接错误发生: {conn_err}") except requests.exceptions.Timeout as timeout_err: print(f"请求超时: {timeout_err}") except requests.exceptions.RequestException as req_err: print(f"发生未知错误: {req_err}") return None def test_redis_connection(): print("正在测试 Upstash Redis 连接...") # SET 命令 key = "mykey" value = "hello_upstash" print(f"正在设置键 '{key}' 为值 '{value}'...") set_result = execute_redis_command("SET", key, value) if set_result: print(f"SET 结果: {set_result}") if set_result and set_result.get('result') == "OK": print("SET 命令成功。") else: print("SET 命令失败。") return # 如果 SET 失败,才返回 # GET 命令 print(f"正在获取键 '{key}' 的值...") get_result = execute_redis_command("GET", key) if get_result is not None: print(f"GET 结果: {get_result}") if get_result and get_result.get('result') == value: print("GET 命令成功,值匹配。") else: print("GET 命令失败,值不匹配或结果格式不正确。") else: print("GET 命令失败。") return # DEL 命令 (可选,用于清理) print(f"正在删除键 '{key}'...") del_result = execute_redis_command("DEL", key) if del_result is not None: print(f"DEL 结果: {del_result}") if del_result and del_result.get('result') == 1: print("DEL 命令成功。") else: print("DEL 命令失败或结果格式不正确。") else: print("DEL 命令失败。") if __name__ == "__main__": test_redis_connection()