ml / task /scheduled_tasks.py
devin15's picture
Upload 31 files
3979178 verified
import json
import os
import time
import uuid
from datetime import datetime
import httpx
import pytz
from core.utils import refresh_token_via_rest
from core.config import get_settings
# 2. 使用切片并处理长度不足的情况
def show_last_20(text: str) -> str:
return text[-20:] if len(text) > 20 else text
async def daily_task():
"""异步每日任务"""
current_time = datetime.now(pytz.timezone(get_settings().TIMEZONE))
print(f"执行每日任务: {current_time}")
# 定时发起聊天会话 签到
url = 'https://arcane.getmerlin.in/v1/thread/unified'
ready_refresh_tokens = os.getenv("READY_REFRESH_TOKEN")
if ready_refresh_tokens:
ready_tokens_array = ready_refresh_tokens.split(',')
for e in ready_tokens_array:
id_token = await refresh_token_via_rest(e)
print(f'{e} 准备签到: {id_token}')
request_headers = {**get_settings().HEADERS, 'authorization': f"Bearer {id_token}"} # 从环境变量中获取新的TOKEN
json_data = {
"attachments": [],
"chatId": str(uuid.uuid4()),
"language": "AUTO",
"message": {
"childId": str(uuid.uuid4()),
"content": '介绍二叉树',
"context": "",
"id": str(uuid.uuid4()),
"parentId": 'root'
},
"metadata": {
"largeContext": False,
"merlinMagic": False,
"proFinderMode": False,
"webAccess": False
},
"mode": "UNIFIED_CHAT",
# "model": "claude-3.5-sonnet"
"model": 'gpt-4o-mini'
}
# 发送 POST 请求并处理流式响应
with httpx.Client(timeout=httpx.Timeout(30.0)) as client:
try:
with client.stream('POST', url, headers=request_headers, json=json_data) as response:
if response.status_code == 200:
current_time = datetime.now(pytz.timezone(get_settings().TIMEZONE))
# print(f'{id_token}在{current_time} 签到成功')
print(f'{show_last_20(id_token)}{current_time} 签到成功')
# for line in response.iter_lines():
# print(line)
# if line:
# # 处理每一行数据
# # 通常需要去掉 "data: " 前缀并解析 JSON
# if line and line.startswith('data: '):
# try:
# json_str = line[6:] # 去掉 "data: " 前缀
# if json_str.strip() == '[DONE]':
# break
# json_data = json.loads(json_str)
# # 处理 json_data
# if 'choices' in json_data and len(json_data['choices']) > 0:
# content = json_data['choices'][0].get('delta', {}).get('content', '')
# if content:
# print(content, end='', flush=True)
# except json.JSONDecodeError:
# continue
#
except httpx.HTTPStatusError as exc:
print(f"HTTP 错误发生: {e}")
raise
time.sleep(30)
async def weekly_report_task():
"""异步每周报告任务"""
current_time = datetime.now(pytz.timezone(get_settings().TIMEZONE))
print(f"生成每周报告: {current_time}")
# 实现异步周报生成逻辑
async def data_cleanup_task():
"""异步数据清理任务"""
current_time = datetime.now(pytz.timezone(get_settings().TIMEZONE))
print(f"执行数据清理: {current_time}")
# 实现异步数据清理逻辑