Spaces:
Paused
Paused
| # src/core/grok_api_client.py | |
| import json | |
| import time | |
| import base64 | |
| import re | |
| import requests | |
| from curl_cffi import requests as curl_requests | |
| from src.core.logger import logger # 从新的位置导入 logger | |
| from src.core.utils import Utils # 从新的位置导入 Utils | |
| from app import CONFIG, DEFAULT_HEADERS # 导入 CONFIG 和 DEFAULT_HEADERS | |
| class GrokApiClient: | |
def __init__(self, model_id):
    """Bind the client to the upstream model mapped from ``model_id``.

    Args:
        model_id: Key looked up in ``CONFIG["MODELS"]``.

    Raises:
        ValueError: If ``model_id`` is not a key of ``CONFIG["MODELS"]``.
    """
    if model_id in CONFIG["MODELS"]:
        # Keep the mapped value from the config, not the requested key.
        self.model_id = CONFIG["MODELS"][model_id]
        return
    raise ValueError(f"不支持的模型: {model_id}")
def process_message_content(self, content):
    """Return ``content`` unchanged when it is a string, otherwise ``None``."""
    return content if isinstance(content, str) else None
def get_image_type(self, base64_string):
    """Derive the MIME type and an upload file name for an image payload.

    Args:
        base64_string: Either a ``data:`` URI such as
            ``data:image/png;base64,...`` or a bare base64 string.

    Returns:
        dict: ``{"mimeType": <type/subtype>, "fileName": "image.<subtype>"}``.
        The MIME type defaults to ``image/jpeg`` when no data-URI header
        can be parsed from the input.
    """
    mime_type = 'image/jpeg'
    if 'data:image' in base64_string:
        # Data URIs may carry optional parameters between the MIME type and
        # the ";base64," marker (e.g. ";charset=utf-8"); skip over them so the
        # real MIME type is still recognised instead of falling back to JPEG.
        matches = re.search(
            r'data:([a-zA-Z0-9]+/[a-zA-Z0-9.+-]+)(?:;[^;,]*)*;base64,',
            base64_string,
        )
        if matches:
            mime_type = matches.group(1)
    # The file extension is simply the MIME subtype.
    extension = mime_type.split('/')[1]
    return {
        "mimeType": mime_type,
        "fileName": f"image.{extension}",
    }
def upload_base64_file(self, message, model):
    """Upload ``message`` as a base64-encoded ``message.txt`` attachment.

    The text is UTF-8 encoded, base64-encoded and POSTed as JSON to
    ``https://grok.com/rest/app-chat/upload-file``. The cookie is the result
    of ``Utils.create_auth_headers(model, True)`` joined by ``;`` with
    ``CONFIG['SERVER']['CF_CLEARANCE']``.

    Args:
        message: Text to upload.
        model: Model name forwarded to ``Utils.create_auth_headers``.

    Returns:
        The ``fileMetadataId`` field of the JSON response, or ``""`` when
        the response does not contain one.

    Raises:
        Exception: If the request cannot be sent, the response status is not
            200, or the response body cannot be decoded. The original error
            is attached as ``__cause__``.
    """
    # Pre-bind so the except handler can tell "no response at all" apart from
    # "got a response"; previously a failed POST hit an unbound ``response``.
    response = None
    try:
        message_base64 = base64.b64encode(message.encode('utf-8')).decode('utf-8')
        upload_data = {
            "fileName": "message.txt",
            "fileMimeType": "text/plain",
            "content": message_base64,
        }
        logger.info("发送文字文件请求", "Server")
        cookie = f"{Utils.create_auth_headers(model, True)};{CONFIG['SERVER']['CF_CLEARANCE']}"
        proxy_options = Utils.get_proxy_options()
        response = curl_requests.post(
            "https://grok.com/rest/app-chat/upload-file",
            headers={
                **DEFAULT_HEADERS,
                "Cookie": cookie,
            },
            json=upload_data,
            impersonate="chrome133a",
            verify=False,
            **proxy_options
        )
        if response.status_code != 200:
            # Logged once by the handler below.
            raise Exception(f"上传文件失败,状态码:{response.status_code}")
        result = response.json()
        logger.info(f"上传文件成功: {result}", "Server")
        return result.get("fileMetadataId", "")
    except Exception as error:
        logger.error(str(error), "Server")
        if response is None:
            # The request never produced a response (encoding, config or
            # network failure), so there is no status code to report.
            raise Exception(f"上传文件失败: {error}") from error
        raise Exception(f"上传文件失败,状态码:{response.status_code}") from error
def upload_base64_image(self, base64_data, url):
    """Upload a base64 image through the ``uploadFile`` RPC at ``url``.

    Args:
        base64_data: A ``data:image/...;base64,...`` URI or a bare base64
            string. For a data URI only the part after the first comma is
            sent as content.
        url: Endpoint that receives the JSON RPC body.

    Returns:
        The ``fileMetadataId`` from the JSON response (``""`` if missing).
        Best effort: a non-200 status or any exception is logged and ``''``
        is returned instead of raising.
    """
    try:
        has_data_uri_header = 'data:image' in base64_data
        payload = base64_data.split(',')[1] if has_data_uri_header else base64_data

        info = self.get_image_type(base64_data)
        rpc_body = {
            "rpc": "uploadFile",
            "req": {
                "fileName": info["fileName"],
                "fileMimeType": info["mimeType"],
                "content": payload,
            },
        }

        logger.info("发送图片请求", "Server")
        proxy_kwargs = Utils.get_proxy_options()
        request_headers = {**DEFAULT_HEADERS, "Cookie": CONFIG["SERVER"]['COOKIE']}
        response = curl_requests.post(
            url,
            headers=request_headers,
            json=rpc_body,
            impersonate="chrome133a",
            verify=False,
            **proxy_kwargs
        )

        if response.status_code == 200:
            result = response.json()
            logger.info(f"上传图片成功: {result}", "Server")
            return result.get("fileMetadataId", "")

        logger.error(f"上传图片失败,状态码:{response.status_code}", "Server")
        return ''
    except Exception as error:
        # Deliberately swallow the failure: callers treat '' as "no attachment".
        logger.error(str(error), "Server")
        return ''
def prepare_chat_request(self, request):
    """Translate an OpenAI-style chat request into the chat request body.

    The conversation is flattened into a single ``ROLE: text`` transcript in
    which consecutive messages of the same role are merged. Images attached
    to the final message are uploaded with ``upload_base64_image``. Once the
    transcript reaches 40000 characters it is uploaded as a text file with
    ``upload_base64_file`` and only the final message is sent inline.

    Args:
        request: Mapping with ``"model"`` and ``"messages"`` plus an optional
            ``"stream"`` flag. A message's ``content`` may be a string, a list
            of ``{"type": "text" | "image_url", ...}`` parts, or one such
            part as a dict.

    Returns:
        dict: The request body, including the transcript (``"message"``), up
        to four ``"fileAttachments"`` and the per-model tool switches.

    Raises:
        ValueError: If an image-generation model is streamed without a
            PICGO/TUMY key, if the last message for an image-generation or
            deepsearch model is not a user message, or if there is no
            message content to send.
    """
    model = request["model"]
    is_image_gen = model in ('grok-2-imageGen', 'grok-3-imageGen')
    if (is_image_gen and not CONFIG["API"]["PICGO_KEY"]
            and not CONFIG["API"]["TUMY_KEY"] and request.get("stream", False)):
        raise ValueError("该模型流式输出需要配置PICGO或者TUMY图床密钥!")

    todo_messages = request["messages"]
    if is_image_gen or model == 'grok-3-deepsearch':
        # These models only receive the final user message.
        if not todo_messages:
            raise ValueError('消息内容为空!')
        last_message = todo_messages[-1]
        if last_message["role"] != 'user':
            raise ValueError('此模型最后一条消息必须是用户消息!')
        todo_messages = [last_message]

    max_inline_chars = 40000
    file_attachments = []
    messages = ''
    last_role = None
    last_content = ''
    last_entry_start = 0  # offset in ``messages`` where the newest entry begins
    convert_to_file = False
    last_message_content = ''
    search = model in ('grok-2-search', 'grok-3-search')

    def remove_think_tags(text):
        # Remove <think>...</think> blocks together with their content, and
        # replace inline base64 images with a short placeholder.
        text = re.sub(r'<think>[\s\S]*?</think>', '', text).strip()
        text = re.sub(r'!\[image\]\(data:.*?base64,.*?\)', '[图片]', text)
        return text

    def process_content(content):
        # Reduce any supported content shape to plain text.
        if isinstance(content, list):
            text_content = ''
            for item in content:
                item_type = item.get("type")
                if item_type == 'image_url':
                    piece = "[图片]"
                elif item_type == 'text':
                    piece = remove_think_tags(item["text"])
                else:
                    continue
                # Separate pieces with a newline once something was emitted.
                text_content += piece if not text_content else '\n' + piece
            return text_content
        if isinstance(content, dict):
            if content.get("type") == 'image_url':
                return "[图片]"
            if content.get("type") == 'text':
                return remove_think_tags(content["text"])
        # Non-string content (None, unknown dict shapes, ...) yields no text
        # instead of passing None to re.sub.
        return remove_think_tags(self.process_message_content(content) or '')

    last_index = len(todo_messages) - 1
    for index, current in enumerate(todo_messages):
        role = 'assistant' if current["role"] == 'assistant' else 'user'
        # Positional check: an earlier message that merely equals the final
        # one must not be treated as the final message.
        is_last_message = index == last_index

        if is_last_message and "content" in current:
            content = current["content"]
            if isinstance(content, list):
                image_parts = [part for part in content if part.get("type") == 'image_url']
            elif isinstance(content, dict) and content.get("type") == 'image_url':
                image_parts = [content]
            else:
                image_parts = []
            for part in image_parts:
                processed_image = self.upload_base64_image(
                    part["image_url"]["url"],
                    f"{CONFIG['API']['BASE_URL']}/api/rpc"
                )
                if processed_image:
                    file_attachments.append(processed_image)

        text_content = process_content(current.get("content", ""))

        if is_last_message and convert_to_file:
            # The history goes into the uploaded file; keep the final message
            # aside so it can be sent inline.
            last_message_content = f"{role.upper()}: {text_content or '[图片]'}\n"
            continue

        if text_content or (is_last_message and file_attachments):
            if role == last_role and text_content:
                # Merge with the previous entry of the same role by rewriting
                # it from its recorded start offset (searching for the role
                # prefix could match inside message text).
                last_content += '\n' + text_content
                messages = messages[:last_entry_start] + f"{role.upper()}: {last_content}\n"
            else:
                last_entry_start = len(messages)
                messages += f"{role.upper()}: {text_content or '[图片]'}\n"
                last_content = text_content
                last_role = role

        # Compare the actual transcript size with the limit; summing the
        # cumulative length on every turn would trip the limit far too early.
        if len(messages) >= max_inline_chars:
            convert_to_file = True

    if convert_to_file:
        file_id = self.upload_base64_file(messages, request["model"])
        if file_id:
            # The transcript file goes first so it survives the cap below.
            file_attachments.insert(0, file_id)
        messages = last_message_content.strip()

    if messages.strip() == '':
        if convert_to_file:
            messages = '基于txt文件内容进行回复:'
        else:
            raise ValueError('消息内容为空!')

    return {
        "temporary": CONFIG["API"].get("IS_TEMP_CONVERSATION", False),
        "modelName": self.model_id,
        "message": messages.strip(),
        # Only the first four attachments are sent.
        "fileAttachments": file_attachments[:4],
        "imageAttachments": [],
        "disableSearch": False,
        "enableImageGeneration": True,
        "returnImageBytes": False,
        "enableImageStreaming": False,
        "imageGenerationCount": 1,
        "forceConcise": False,
        "toolOverrides": {
            "imageGen": is_image_gen,
            "webSearch": search,
            "xSearch": search,
            "xMediaSearch": search,
            "trendsSearch": search,
            "xPostAnalyze": search
        },
        "enableSideBySide": True,
        "isPreset": False,
        "sendFinalMetadata": True,
        "customInstructions": "",
        "deepsearchPreset": "default" if model == 'grok-3-deepsearch' else "",
        "isReasoning": model == 'grok-3-reasoning'
    }