# src/core/grok_api_client.py
import json
import time
import base64
import re
import requests
from curl_cffi import requests as curl_requests
from src.core.logger import logger # 从新的位置导入 logger
from src.core.utils import Utils # 从新的位置导入 Utils
from app import CONFIG, DEFAULT_HEADERS # 导入 CONFIG 和 DEFAULT_HEADERS
class GrokApiClient:
    """Build Grok chat request payloads and upload their attachments.

    The client flattens an OpenAI-style ``messages`` list into a single
    ``ROLE: text`` transcript, uploads images found in the last message, and
    moves an oversized transcript into an uploaded text file.
    """

    # Models whose payload enables the image generation tool.
    _IMAGE_GEN_MODELS = ('grok-2-imageGen', 'grok-3-imageGen')
    # Models whose payload enables the search tool overrides.
    _SEARCH_MODELS = ('grok-2-search', 'grok-3-search')
    # Models for which only the final (user) message is sent.
    _LAST_MESSAGE_ONLY_MODELS = (
        'grok-2-imageGen',
        'grok-3-imageGen',
        'grok-3-deepsearch',
    )
    # Threshold compared against the running length counter in
    # prepare_chat_request before the transcript is sent as a file.
    _MAX_INLINE_LENGTH = 40000

    # A complete <think>...</think> block, including its content.
    _THINK_BLOCK_RE = re.compile(r'<think>[\s\S]*?</think>')
    # Inline markdown image carrying a base64 data URI.
    _INLINE_IMAGE_RE = re.compile(r'!\[image\]\(data:.*?base64,.*?\)')
    # MIME type embedded in a data URI header.
    _DATA_URI_RE = re.compile(r'data:([a-zA-Z0-9]+/[a-zA-Z0-9.+-]+);base64,')

    def __init__(self, model_id):
        """Resolve ``model_id`` through ``CONFIG["MODELS"]``.

        Raises:
            ValueError: if ``model_id`` is not a key of ``CONFIG["MODELS"]``.
        """
        if model_id not in CONFIG["MODELS"]:
            raise ValueError(f"不支持的模型: {model_id}")
        self.model_id = CONFIG["MODELS"][model_id]

    def process_message_content(self, content):
        """Return ``content`` unchanged when it is a string, else ``None``."""
        if isinstance(content, str):
            return content
        return None

    def get_image_type(self, base64_string):
        """Derive a MIME type and file name from a base64 image string.

        Falls back to ``image/jpeg`` when the string carries no parsable
        ``data:<mime>;base64,`` header.

        Returns:
            dict with the keys ``mimeType`` and ``fileName``.
        """
        mime_type = 'image/jpeg'
        if 'data:image' in base64_string:
            matches = self._DATA_URI_RE.search(base64_string)
            if matches:
                mime_type = matches.group(1)
        extension = mime_type.split('/')[1]
        return {
            "mimeType": mime_type,
            "fileName": f"image.{extension}"
        }

    def upload_base64_file(self, message, model):
        """Upload ``message`` as a base64-encoded ``message.txt`` file.

        Args:
            message: transcript text to upload.
            model: model name forwarded to ``Utils.create_auth_headers``.

        Returns:
            The ``fileMetadataId`` from the response, or ``""`` if absent.

        Raises:
            Exception: if the request fails or the status code is not 200.
        """
        # Stays None until the request returns, so the error handler can tell
        # a transport failure apart from an HTTP-level failure.
        response = None
        try:
            message_base64 = base64.b64encode(message.encode('utf-8')).decode('utf-8')
            upload_data = {
                "fileName": "message.txt",
                "fileMimeType": "text/plain",
                "content": message_base64
            }
            logger.info("发送文字文件请求", "Server")
            cookie = f"{Utils.create_auth_headers(model, True)};{CONFIG['SERVER']['CF_CLEARANCE']}"
            proxy_options = Utils.get_proxy_options()
            # NOTE(review): verify=False disables TLS certificate checks;
            # confirm this is intentional before relying on it.
            response = curl_requests.post(
                "https://grok.com/rest/app-chat/upload-file",
                headers={
                    **DEFAULT_HEADERS,
                    "Cookie": cookie
                },
                json=upload_data,
                impersonate="chrome133a",
                verify=False,
                **proxy_options
            )
            if response.status_code != 200:
                logger.error(f"上传文件失败,状态码:{response.status_code}", "Server")
                raise Exception(f"上传文件失败,状态码:{response.status_code}")
            result = response.json()
            logger.info(f"上传文件成功: {result}", "Server")
            return result.get("fileMetadataId", "")
        except Exception as error:
            logger.error(str(error), "Server")
            if response is not None:
                raise Exception(f"上传文件失败,状态码:{response.status_code}") from error
            # The request never produced a response, so report the
            # underlying error instead of a status code.
            raise Exception(f"上传文件失败: {error}") from error

    def upload_base64_image(self, base64_data, url):
        """Upload a base64 image through the ``uploadFile`` RPC at ``url``.

        Any failure is logged and swallowed on purpose, so a failed image
        upload does not abort the chat request.

        Returns:
            The ``fileMetadataId`` from the response, or ``''`` on failure.
        """
        try:
            if 'data:image' in base64_data:
                # Drop the "data:<mime>;base64," header and keep the payload.
                image_buffer = base64_data.split(',')[1]
            else:
                image_buffer = base64_data
            image_info = self.get_image_type(base64_data)
            upload_data = {
                "rpc": "uploadFile",
                "req": {
                    "fileName": image_info["fileName"],
                    "fileMimeType": image_info["mimeType"],
                    "content": image_buffer
                }
            }
            logger.info("发送图片请求", "Server")
            proxy_options = Utils.get_proxy_options()
            # NOTE(review): verify=False disables TLS certificate checks;
            # confirm this is intentional before relying on it.
            response = curl_requests.post(
                url,
                headers={
                    **DEFAULT_HEADERS,
                    "Cookie": CONFIG["SERVER"]['COOKIE']
                },
                json=upload_data,
                impersonate="chrome133a",
                verify=False,
                **proxy_options
            )
            if response.status_code != 200:
                logger.error(f"上传图片失败,状态码:{response.status_code}", "Server")
                return ''
            result = response.json()
            logger.info(f"上传图片成功: {result}", "Server")
            return result.get("fileMetadataId", "")
        except Exception as error:
            logger.error(str(error), "Server")
            return ''

    @classmethod
    def _remove_think_tags(cls, text):
        """Strip ``<think>`` blocks and replace inline base64 images.

        Non-string input yields ``''`` so callers can pass through whatever
        ``process_message_content`` returned.
        """
        if not isinstance(text, str):
            return ''
        text = cls._THINK_BLOCK_RE.sub('', text).strip()
        return cls._INLINE_IMAGE_RE.sub('[图片]', text)

    def _flatten_content(self, content):
        """Reduce a message ``content`` value to plain transcript text.

        Lists of parts are joined with newlines, image parts become the
        ``[图片]`` placeholder, and unsupported values become ``''``.
        """
        if isinstance(content, list):
            text_content = ''
            for item in content:
                kind = item.get("type") if isinstance(item, dict) else None
                if kind == 'image_url':
                    piece = "[图片]"
                elif kind == 'text':
                    piece = self._remove_think_tags(item["text"])
                else:
                    continue
                # A newline is only inserted once some text has accumulated.
                text_content = piece if not text_content else f"{text_content}\n{piece}"
            return text_content
        if isinstance(content, dict):
            kind = content.get("type")
            if kind == 'image_url':
                return "[图片]"
            if kind == 'text':
                return self._remove_think_tags(content["text"])
        return self._remove_think_tags(self.process_message_content(content))

    def _upload_message_images(self, content):
        """Upload every ``image_url`` part of ``content``.

        Returns:
            List of the non-empty file ids returned by the uploads.
        """
        if isinstance(content, list):
            parts = content
        elif isinstance(content, dict):
            parts = [content]
        else:
            return []
        uploaded = []
        for part in parts:
            if not isinstance(part, dict) or part.get("type") != 'image_url':
                continue
            file_id = self.upload_base64_image(
                part["image_url"]["url"],
                f"{CONFIG['API']['BASE_URL']}/api/rpc"
            )
            if file_id:
                uploaded.append(file_id)
        return uploaded

    def prepare_chat_request(self, request):
        """Translate an OpenAI-style chat request into a Grok payload.

        Args:
            request: dict with ``model`` and ``messages`` and, optionally,
                ``stream``.

        Returns:
            dict payload containing the flattened transcript, attachment ids
            and per-model tool switches.

        Raises:
            ValueError: if streaming image generation is requested without an
                image host key, if a last-message-only model does not end on
                a user message, or if there is no message content to send.
        """
        model = request["model"]
        is_image_gen = model in self._IMAGE_GEN_MODELS
        if (is_image_gen
                and not CONFIG["API"]["PICGO_KEY"]
                and not CONFIG["API"]["TUMY_KEY"]
                and request.get("stream", False)):
            raise ValueError("该模型流式输出需要配置PICGO或者TUMY图床密钥!")

        todo_messages = request["messages"]
        if not todo_messages:
            raise ValueError('消息内容为空!')
        if model in self._LAST_MESSAGE_ONLY_MODELS:
            last_message = todo_messages[-1]
            if last_message["role"] != 'user':
                raise ValueError('此模型最后一条消息必须是用户消息!')
            todo_messages = [last_message]

        file_attachments = []
        messages = ''
        last_role = None
        last_content = ''
        last_block_start = 0  # offset in `messages` where the latest block begins
        message_length = 0
        convert_to_file = False
        last_message_content = ''
        search = model in self._SEARCH_MODELS

        last_index = len(todo_messages) - 1
        for index, current in enumerate(todo_messages):
            role = 'assistant' if current["role"] == 'assistant' else 'user'
            label = role.upper()
            # Positional check: an earlier message that merely equals the last
            # one must not be treated as the last message.
            is_last_message = index == last_index

            if is_last_message and "content" in current:
                file_attachments.extend(self._upload_message_images(current["content"]))

            text_content = self._flatten_content(current.get("content", ""))

            if is_last_message and convert_to_file:
                # The history goes into the uploaded file; only the final
                # message is sent inline.
                last_message_content = f"{label}: {text_content or '[图片]'}\n"
                continue

            if text_content or (is_last_message and file_attachments):
                if role == last_role and text_content:
                    # Merge consecutive turns of one role into a single block.
                    last_content += '\n' + text_content
                    messages = messages[:last_block_start] + f"{label}: {last_content}\n"
                else:
                    last_block_start = len(messages)
                    messages += f"{label}: {text_content or '[图片]'}\n"
                    last_content = text_content
                last_role = role

            # NOTE(review): this adds the whole transcript length on every
            # turn, so the counter grows faster than the transcript itself.
            # Kept as-is; confirm whether a plain length check was intended.
            message_length += len(messages)
            if message_length >= self._MAX_INLINE_LENGTH:
                convert_to_file = True

        if convert_to_file:
            file_id = self.upload_base64_file(messages, model)
            if file_id:
                file_attachments.insert(0, file_id)
            messages = last_message_content.strip()

        if messages.strip() == '':
            if convert_to_file:
                messages = '基于txt文件内容进行回复:'
            else:
                raise ValueError('消息内容为空!')

        return {
            "temporary": CONFIG["API"].get("IS_TEMP_CONVERSATION", False),
            "modelName": self.model_id,
            "message": messages.strip(),
            "fileAttachments": file_attachments[:4],
            "imageAttachments": [],
            "disableSearch": False,
            "enableImageGeneration": True,
            "returnImageBytes": False,
            "enableImageStreaming": False,
            "imageGenerationCount": 1,
            "forceConcise": False,
            "toolOverrides": {
                "imageGen": is_image_gen,
                "webSearch": search,
                "xSearch": search,
                "xMediaSearch": search,
                "trendsSearch": search,
                "xPostAnalyze": search
            },
            "enableSideBySide": True,
            "isPreset": False,
            "sendFinalMetadata": True,
            "customInstructions": "",
            "deepsearchPreset": "default" if model == 'grok-3-deepsearch' else "",
            "isReasoning": model == 'grok-3-reasoning'
        }