import asyncio
import json
import os
import time
import uuid
from datetime import datetime

import httpx
import pytz

from core.utils import refresh_token_via_rest
from core.config import get_settings

# Endpoint the daily check-in posts a chat message to.
_CHECKIN_URL = 'https://arcane.getmerlin.in/v1/thread/unified'
# Pause between two consecutive check-ins, in seconds.
_CHECKIN_INTERVAL_SECONDS = 30


def _current_time() -> datetime:
    """Return the current time in the timezone named by the settings' TIMEZONE."""
    return datetime.now(pytz.timezone(get_settings().TIMEZONE))


def _build_checkin_payload() -> dict:
    """Build the JSON body of one check-in chat request with fresh UUIDs."""
    return {
        "attachments": [],
        "chatId": str(uuid.uuid4()),
        "language": "AUTO",
        "message": {
            "childId": str(uuid.uuid4()),
            "content": '介绍二叉树',
            "context": "",
            "id": str(uuid.uuid4()),
            "parentId": 'root',
        },
        "metadata": {
            "largeContext": False,
            "merlinMagic": False,
            "proFinderMode": False,
            "webAccess": False,
        },
        "mode": "UNIFIED_CHAT",
        # Alternative model previously used here: "claude-3.5-sonnet".
        "model": 'gpt-4o-mini',
    }


def show_last_20(text: str) -> str:
    """Return the last 20 characters of *text*, or all of it when it is shorter.

    Used to log a recognizable tail of a token instead of the whole value.
    """
    # Slicing already copes with strings shorter than 20 characters.
    return text[-20:]


async def daily_task():
    """Run the daily check-in for every configured refresh token.

    Reads a comma-separated list of refresh tokens from the
    ``READY_REFRESH_TOKEN`` environment variable. For each token it obtains an
    id token via ``refresh_token_via_rest``, opens a chat thread with a POST
    request, and logs the outcome. It then waits 30 seconds before handling
    the next token. Nothing is sent when the variable is unset or empty.

    Raises:
        httpx.HTTPError: Re-raised after logging when the request fails
            (timeout, connection error, ...); remaining tokens are skipped.
    """
    print(f"执行每日任务: {_current_time()}")

    ready_refresh_tokens = os.getenv("READY_REFRESH_TOKEN")
    if not ready_refresh_tokens:
        return

    # Drop blank entries (e.g. from a trailing comma) and surrounding spaces
    # so they are never sent to the refresh endpoint as if they were tokens.
    refresh_tokens = [
        token.strip()
        for token in ready_refresh_tokens.split(',')
        if token.strip()
    ]

    for refresh_token in refresh_tokens:
        id_token = await refresh_token_via_rest(refresh_token)
        # NOTE(review): this writes the complete refresh token and id token to
        # stdout; consider masking them (see show_last_20) if logs are shared.
        print(f'{refresh_token} 准备签到: {id_token}')
        request_headers = {
            **get_settings().HEADERS,
            'authorization': f"Bearer {id_token}",
        }

        # Use the async client so the event loop is not blocked while waiting
        # on the network.
        async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
            try:
                async with client.stream(
                    'POST',
                    _CHECKIN_URL,
                    headers=request_headers,
                    json=_build_checkin_payload(),
                ) as response:
                    # Only the status code matters for the check-in; the
                    # streamed body is intentionally not consumed.
                    if response.status_code == 200:
                        print(f'{show_last_20(id_token)}在{_current_time()} 签到成功')
                    else:
                        print(
                            f'{show_last_20(refresh_token)} 签到失败, '
                            f'状态码: {response.status_code}'
                        )
            except httpx.HTTPError as exc:
                # Covers transport failures as well as status errors; report
                # the actual exception rather than the token being processed.
                print(f"HTTP 错误发生: {exc}")
                raise

        # Space out consecutive check-ins without blocking other coroutines.
        await asyncio.sleep(_CHECKIN_INTERVAL_SECONDS)


async def weekly_report_task():
    """Log the start of the weekly report job (generation not implemented yet)."""
    print(f"生成每周报告: {_current_time()}")
    # TODO: implement the asynchronous weekly report generation.


async def data_cleanup_task():
    """Log the start of the data cleanup job (cleanup not implemented yet)."""
    print(f"执行数据清理: {_current_time()}")
    # TODO: implement the asynchronous data cleanup.