import base64 import hashlib import imghdr import json import logging import os import re from collections import deque import requests from DrissionPage._configs.chromium_options import ChromiumOptions from flask import Response, jsonify from DrissionPage import ChromiumPage from requests.exceptions import ProxyError from app.config import configure_logging, IMAGE_MODEL_NAMES, ProxyPool, G_TOKEN, POPAI_BASE_URL configure_logging() proxy_pool = ProxyPool() current_token_index = 0 def send_http_request(url, headers, data): try: response = requests.post(url, headers=headers, json=data) response.raise_for_status() return response except requests.exceptions.RequestException as e: logging.error("HTTP request error: %s", e) raise def get_env_variable(var_name): return os.getenv(var_name) def send_chat_message(req, auth_token, channel_id, final_user_content, model_name, user_stream, image_url, user_model_name): logging.info("Channel ID: %s", channel_id) # logging.info("Final User Content: %s", final_user_content) logging.info("Model Name: %s", model_name) logging.info("Image URL: %s", image_url) logging.info("User stream: %s", user_stream) url = "https://api.popai.pro/api/v1/chat/send" headers = { "Accept": "text/event-stream", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "App-Name": "popai-web", "Authorization": auth_token, "Content-Type": "application/json", "Device-Info": '{"web_id":"drBt-M9G_I9eKAgB8TdnY","baidu_id":"18f1fd3dc7749443876b69"}', "Gtoken": G_TOKEN, "Origin": "https://www.popai.pro", "Priority": "u=1, i", "Referer": "https://www.popai.pro/", "Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": "Windows", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-site", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36", "Cache-Control": "no-cache" } data = { "isGetJson": True, "version": "1.3.6", "language": "zh-CN", "channelId": channel_id, "message": final_user_content, "model": model_name, "messageIds": [], "imageUrls": image_url, "improveId": None, "richMessageId": None, "isNewChat": False, "action": None, "isGeneratePpt": False, "isSlidesChat": False, "roleEnum": None, "pptCoordinates": "", "translateLanguage": None, "docPromptTemplateId": None } max_retries = 3 for attempt in range(max_retries): try: logging.info("Using G_TOKEN: %s", headers["Gtoken"]) response = request_with_proxy_chat(url, headers, data, True) # logging.info("Response headers: %s", response.headers) # 检查响应头中的错误码 if response.headers.get('YJ-X-Content'): raise Exception(f"Popai response error. Error: {response.headers.get('YJ-X-Content')}") # 如果响应的内容类型是 'text/event-stream;charset=UTF-8' if response.headers.get('Content-Type') == 'text/event-stream;charset=UTF-8': if not user_stream: return stream_2_json(response, model_name, user_model_name) return stream_response(response, model_name) else: return stream_2_json(response, model_name, user_model_name) except requests.exceptions.RequestException as e: logging.error("send_chat_message error: %s", e) if attempt == max_retries - 1: return handle_error(e) except Exception as e: logging.error("send_chat_message error: %s", e) try: if "60001" in str(e): logging.warning(f"Received 60001 error code on attempt {attempt + 1}. Retrying...") if attempt == 1: # 第二次失败后更新 G_TOKEN headers["Gtoken"] = updateGtoken() # 更新G_TOKEN后需要更新header continue if attempt == max_retries - 1: return handle_error(e) except Exception as e: logging.error("Update_gtoken error: %s", e) return handle_error(e) return Exception(f"All attempts to send chat message failed.") def stream_response(resp, model_name): logging.info("Entering stream_response function") def generate(): for message in handle_http_response(resp): message_id = message.get("messageId", "") objectid = message.get("chunkId", "") content = message.get("content", "") wrapped_chunk = { "id": message_id, "object": "chat.completion", "created": 0, "model": model_name, "choices": [ { "index": 0, "delta": { "role": "assistant", "content": content }, "finish_reason": "stop", } ], "usage": { "prompt_tokens": 13, "completion_tokens": 7, "total_tokens": 20 }, "system_fingerprint": None } event_data = f"data: {json.dumps(wrapped_chunk, ensure_ascii=False)}\n\n" yield event_data.encode('utf-8') logging.info("Exiting stream_response function") return Response(generate(), mimetype='text/event-stream; charset=UTF-8') def stream_2_json(resp, model_name, user_model_name): logging.info("Entering stream_2_json function") chunks = [] merged_content = "" append_to_chunks = chunks.append for message in handle_http_response(resp): message_id = message.get("messageId", "") objectid = message.get("chunkId", "") content = message.get("content", "") merged_content += content if user_model_name in IMAGE_MODEL_NAMES: # 如果 model_name 在 IMAGE_MODEL_NAMES 内,转换为包含 URL 的格式 wrapped_chunk = { "created": 0, "data": [ {"url": extract_url_from_content(merged_content)} ] } else: wrapped_chunk = { "id": message_id, "object": "chat.completion", "created": 0, "model": model_name, "choices": [ { "index": 0, "message": { "role": "assistant", "content": merged_content }, "finish_reason": "stop", } ], "usage": { "prompt_tokens": 13, "completion_tokens": 7, "total_tokens": 20 }, "system_fingerprint": None } append_to_chunks(wrapped_chunk) logging.info("Exiting stream_2_json function") if chunks: return jsonify(chunks[-1]) else: raise Exception("No data available") def process_content(message): text_array = [] image_url_array = [] if isinstance(message, str): return message, image_url_array if isinstance(message, list): for msg in message: content_type = msg.get("type") if content_type == "text": text_array.append(msg.get("text", "")) elif content_type == "image_url": url = msg.get("image_url", {}).get("url", "") if is_base64_image(url): url = upload_image_to_telegraph(url) image_url_array.append(url) return '\n'.join(text_array), image_url_array def upload_image_to_telegraph(base64_string): try: if base64_string.startswith('data:image'): base64_string = base64_string.split(',')[1] image_data = base64.b64decode(base64_string) image_type = imghdr.what(None, image_data) if image_type is None: raise ValueError("Invalid image data") mime_type = f"image/{image_type}" files = {'file': (f'image.{image_type}', image_data, mime_type)} response = request_with_proxy_image('https://telegra.ph/upload', files=files) response.raise_for_status() json_response = response.json() if isinstance(json_response, list) and 'src' in json_response[0]: return 'https://telegra.ph' + json_response[0]['src'] else: raise ValueError(f"Unexpected response format: {json_response}") except requests.exceptions.RequestException as e: raise Exception(f"Failed to upload image. Error: {e}") except Exception as e: raise Exception(f"Failed to upload image. An error occurred: {e}") def is_base64_image(base64_string): return base64_string.startswith('data:image') def process_msg_content(content): if isinstance(content, str): return content elif isinstance(content, list): return ' '.join(item.get("text") for item in content if item.get("type") == "text") return None def get_user_contents(messages, limit): limit = int(limit) selected_messages = deque(maxlen=limit) first_user_message = None # 过滤并处理用户消息 for message in messages: if message.get("role") == "user": content = process_msg_content(message.get("content")) if content: selected_messages.append(content) if first_user_message is None: first_user_message = content # 检查是否有足够的消息 if selected_messages: end_user_message = selected_messages[-1] else: end_user_message = None # 拼接消息内容 if selected_messages: selected_messages.pop() # 移除最后一条数据 concatenated_messages = ' \n'.join(selected_messages) return first_user_message, end_user_message, concatenated_messages # def get_user_contents(messages, limit=3): # user_messages = [str(message.get("content", '')) for message in messages if message.get("role") == "user"] # end_message = user_messages[-1] if user_messages else None # selected_messages = user_messages[-limit-1:-1] if len(user_messages) > limit else user_messages[:-1] # concatenated_messages = ' '.join(selected_messages) # return end_message, concatenated_messages # def get_user_contents(messages, limit=3): # contents = [] # user_content_added = False # for message in messages: # if message.get("role") == "user" and not user_content_added: # contents.append(str(message.get("content", ''))) # user_content_added = True # return contents def fetch_channel_id(auth_token, model_name, content, template_id): url = "https://api.popai.pro/api/v1/chat/getChannel" headers = { "Accept": "application/json", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "App-Name": "popai-web", "Authorization": auth_token, "Content-Type": "application/json", "Device-Info": '{"web_id":"drBt-M9G_I9eKAgB8TdnY","baidu_id":"18f1fd3dc7749443876b69"}', "Language": "en", "Origin": "https://www.popai.pro", "Referer": "https://www.popai.pro/", "Pop-Url": "https://www.popai.pro/creation/All/Image", "Sec-Ch-Ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": "Windows", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-site", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" } data = { "model": model_name, "templateId": template_id, "message": content, "language": "English", "fileType": None } try: response = request_with_proxy_chat(url, headers, data, False) response.raise_for_status() response_data = response.json() return response_data.get('data', {}).get('channelId') except requests.exceptions.RequestException as e: logging.error("fetch_channel_id error: %s", e) raise Exception(f"Failed to fetch channel_id. Error: {e}") from e def map_model_name(model_name): model_mapping = { "gpt-4": "GPT-4", "dalle3": "GPT-4", "dalle-3": "GPT-4", "dall-e-3": "GPT-4", "gpt-3.5": "Standard", "websearch": "Web Search", "internet": "Web Search", "gpt-4o": "GPT-4o" } sorted_keys = sorted(model_mapping.keys(), key=len, reverse=True) for key in sorted_keys: if model_name.lower().startswith(key): return model_mapping[key] return "GPT-4" def generate_hash(contents, model_name, token): concatenated = ''.join(contents) return token + model_name + hashlib.md5(concatenated.encode('utf-8')).hexdigest() def handle_http_response(resp): buffer = "" json_object_counter = 0 for chunk in resp.iter_content(chunk_size=None): buffer += chunk.decode('utf-8') while "\n\n" in buffer: json_object, buffer = buffer.split("\n\n", 1) if json_object.startswith("data:"): json_object = json_object[len("data:"):].strip() json_object_counter += 1 if json_object_counter == 1: continue try: chunk_json = json.loads(json_object) except json.JSONDecodeError as e: logging.error(f"Failed to parse JSON: {e}") continue for message in chunk_json: yield message def get_next_auth_token(tokens): if not tokens: raise ValueError("No tokens provided.") auth_tokens = tokens.split(',') global current_token_index token = auth_tokens[current_token_index] current_token_index = (current_token_index + 1) % len(auth_tokens) logging.info("Using token: %s", token) return token def handle_error(e): error_response = { "error": { "message": str(e), "type": "popai_2_api_error" } } return jsonify(error_response), 500 def get_request_parameters(body): messages = body.get("messages", []) model_name = body.get("model") prompt = body.get("prompt", False) stream = body.get("stream", False) return messages, model_name, prompt, stream def extract_url_from_content(content): # 使用正则表达式从 Markdown 内容中提取 URL match = re.search(r'\!\[.*?\]\((.*?)\)', content) return match.group(1) if match else content def request_with_proxy_image(url, files): return request_with_proxy(url, None, None, False, files) def request_with_proxy_chat(url, headers, data, stream): return request_with_proxy(url, headers, data, stream, None) def request_with_proxy(url, headers, data, stream, files): try: proxies = proxy_pool.get_random_proxy() logging.info("Use proxy url %s", proxies) if proxies: response = requests.post(url, headers=headers, json=data, stream=stream, files=files, proxies=proxies) else: response = requests.post(url, headers=headers, json=data, stream=stream, files=files) except ProxyError as e: logging.error(f"Proxy error occurred: {e}") raise Exception("Proxy error occurred") return response # def get_gtoken_back(): # gtoken = None # try: # with open('./recaptcha__zh_cn.js', 'r', encoding='utf-8', errors='ignore') as f: # str_js = f.read() # # options = uc.ChromeOptions() # # 无头模式 # # options.add_argument('--headless') # options.add_argument('--disable-gpu') # options.add_argument('--no-sandbox') # options.add_argument('--disable-dev-shm-usage') # # driver = uc.Chrome(options=options) # try: # driver.get(POPAI_BASE_URL) # # 设置最长等待时间s # WebDriverWait(driver, 20).until(lambda d: d.execute_async_script(str_js)) # gtoken = driver.execute_async_script(str_js) # # with open('gtoken.txt', 'a', encoding='utf-8', errors='ignore') as f: # f.write(gtoken) # f.write('\n') # finally: # driver.quit() # 确保浏览器实例在异常情况下也能关闭 # except Exception as e: # logging.error(f"An error occurred: {e}") # # return gtoken def get_gtoken(): gtoken = '' try: co = ChromiumOptions() co.headless() co.set_argument('--headless=new') co.set_argument('--no-sandbox') page = ChromiumPage(co) page.get('https://www.popai.pro') gtoken = page.run_js_loaded(''' return window.grecaptcha.enterprise.execute("6LfP64kpAAAAAP_Jl8kdL0-09UKzowM87iddJqXA", {action: "LOGIN"}); ''') except Exception as e: logging.error(f"An error occurred: {e}") return gtoken def updateGtoken(): global G_TOKEN G_TOKEN = get_gtoken() logging.info("G_TOKEN updated successfully") # logging.info("G_TOKEN: %s", G_TOKEN) return G_TOKEN