Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| from dotenv import load_dotenv | |
| # 加载 .env 文件中的环境变量 | |
| load_dotenv() | |
| UPSTASH_REDIS_REST_URL = os.getenv("UPSTASH_REDIS_REST_URL") | |
| UPSTASH_REDIS_REST_TOKEN = os.getenv("UPSTASH_REDIS_REST_TOKEN") | |
| if not UPSTASH_REDIS_REST_URL or not UPSTASH_REDIS_REST_TOKEN: | |
| print("错误:UPSTASH_REDIS_REST_URL 或 UPSTASH_REDIS_REST_TOKEN 未设置。请检查 .env 文件。") | |
| exit(1) | |
| headers = { | |
| "Authorization": f"Bearer {UPSTASH_REDIS_REST_TOKEN}", | |
| "Content-Type": "application/json" | |
| } | |
| def execute_redis_command(command, *args): | |
| url = "" | |
| send_json_payload = False | |
| payload = [] | |
| if command.upper() == "SET": | |
| if len(args) != 2: | |
| print("SET 命令需要两个参数:key 和 value。") | |
| return None | |
| key, value = args | |
| url = f"{UPSTASH_REDIS_REST_URL}/set/{key}/{value}" | |
| elif command.upper() == "GET": | |
| if len(args) != 1: | |
| print("GET 命令需要一个参数:key。") | |
| return None | |
| key = args[0] | |
| url = f"{UPSTASH_REDIS_REST_URL}/get/{key}" | |
| elif command.upper() == "DEL": | |
| if len(args) != 1: | |
| print("DEL 命令需要一个参数:key。") | |
| return None | |
| key = args[0] | |
| url = f"{UPSTASH_REDIS_REST_URL}/del/{key}" | |
| else: | |
| # 对于其他命令,使用通用格式,并发送 JSON 数组作为 payload | |
| url = f"{UPSTASH_REDIS_REST_URL}/{command}" | |
| payload = [arg for arg in args] | |
| send_json_payload = True | |
| try: | |
| if send_json_payload: | |
| response = requests.post(url, headers=headers, json=payload) | |
| else: | |
| response = requests.post(url, headers=headers) # 不发送 json=payload | |
| response.raise_for_status() # 如果请求失败,抛出 HTTPError | |
| return response.json() | |
| except requests.exceptions.HTTPError as http_err: | |
| print(f"HTTP 错误发生: {http_err}") | |
| print(f"响应内容: {response.text}") | |
| except requests.exceptions.ConnectionError as conn_err: | |
| print(f"连接错误发生: {conn_err}") | |
| except requests.exceptions.Timeout as timeout_err: | |
| print(f"请求超时: {timeout_err}") | |
| except requests.exceptions.RequestException as req_err: | |
| print(f"发生未知错误: {req_err}") | |
| return None | |
| def test_redis_connection(): | |
| print("正在测试 Upstash Redis 连接...") | |
| # SET 命令 | |
| key = "mykey" | |
| value = "hello_upstash" | |
| print(f"正在设置键 '{key}' 为值 '{value}'...") | |
| set_result = execute_redis_command("SET", key, value) | |
| if set_result: | |
| print(f"SET 结果: {set_result}") | |
| if set_result and set_result.get('result') == "OK": | |
| print("SET 命令成功。") | |
| else: | |
| print("SET 命令失败。") | |
| return # 如果 SET 失败,才返回 | |
| # GET 命令 | |
| print(f"正在获取键 '{key}' 的值...") | |
| get_result = execute_redis_command("GET", key) | |
| if get_result is not None: | |
| print(f"GET 结果: {get_result}") | |
| if get_result and get_result.get('result') == value: | |
| print("GET 命令成功,值匹配。") | |
| else: | |
| print("GET 命令失败,值不匹配或结果格式不正确。") | |
| else: | |
| print("GET 命令失败。") | |
| return | |
| # DEL 命令 (可选,用于清理) | |
| print(f"正在删除键 '{key}'...") | |
| del_result = execute_redis_command("DEL", key) | |
| if del_result is not None: | |
| print(f"DEL 结果: {del_result}") | |
| if del_result and del_result.get('result') == 1: | |
| print("DEL 命令成功。") | |
| else: | |
| print("DEL 命令失败或结果格式不正确。") | |
| else: | |
| print("DEL 命令失败。") | |
| if __name__ == "__main__": | |
| test_redis_connection() | |