| import os |
| import json |
| import random |
| import json |
| import os |
| import uuid |
| import ssl |
| import certifi |
| import aiohttp |
| import asyncio |
|
|
| import requests |
| from ...typing import sha256, Dict, get_type_hints |
|
|
| url = 'https://bing.com/chat' |
| model = ['gpt-4'] |
| supports_stream = True |
| needs_auth = False |
|
|
| ssl_context = ssl.create_default_context() |
| ssl_context.load_verify_locations(certifi.where()) |
|
|
|
|
| class optionsSets: |
| optionSet: dict = { |
| 'tone': str, |
| 'optionsSets': list |
| } |
|
|
| jailbreak: dict = { |
| "optionsSets": [ |
| 'saharasugg', |
| 'enablenewsfc', |
| 'clgalileo', |
| 'gencontentv3', |
| "nlu_direct_response_filter", |
| "deepleo", |
| "disable_emoji_spoken_text", |
| "responsible_ai_policy_235", |
| "enablemm", |
| "h3precise" |
| |
| "dtappid", |
| "cricinfo", |
| "cricinfov2", |
| "dv3sugg", |
| "nojbfedge" |
| ] |
| } |
|
|
|
|
| class Defaults: |
| delimiter = '\x1e' |
| ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}' |
|
|
| allowedMessageTypes = [ |
| 'Chat', |
| 'Disengaged', |
| 'AdsQuery', |
| 'SemanticSerp', |
| 'GenerateContentQuery', |
| 'SearchQuery', |
| 'ActionRequest', |
| 'Context', |
| 'Progress', |
| 'AdsQuery', |
| 'SemanticSerp' |
| ] |
|
|
| sliceIds = [ |
|
|
| |
| |
| |
|
|
| 'winmuid3tf', |
| 'osbsdusgreccf', |
| 'ttstmout', |
| 'crchatrev', |
| 'winlongmsgtf', |
| 'ctrlworkpay', |
| 'norespwtf', |
| 'tempcacheread', |
| 'temptacache', |
| '505scss0', |
| '508jbcars0', |
| '515enbotdets0', |
| '5082tsports', |
| '515vaoprvs', |
| '424dagslnv1s0', |
| 'kcimgattcf', |
| '427startpms0' |
| ] |
|
|
| location = { |
| 'locale': 'en-US', |
| 'market': 'en-US', |
| 'region': 'US', |
| 'locationHints': [ |
| { |
| 'country': 'United States', |
| 'state': 'California', |
| 'city': 'Los Angeles', |
| 'timezoneoffset': 8, |
| 'countryConfidence': 8, |
| 'Center': { |
| 'Latitude': 34.0536909, |
| 'Longitude': -118.242766 |
| }, |
| 'RegionType': 2, |
| 'SourceType': 1 |
| } |
| ], |
| } |
|
|
|
|
| def _format(msg: dict) -> str: |
| return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter |
|
|
|
|
| async def create_conversation(): |
| for _ in range(5): |
| create = requests.get('https://www.bing.com/turing/conversation/create', |
| headers={ |
| 'authority': 'edgeservices.bing.com', |
| 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', |
| 'accept-language': 'en-US,en;q=0.9', |
| 'cache-control': 'max-age=0', |
| 'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"', |
| 'sec-ch-ua-arch': '"x86"', |
| 'sec-ch-ua-bitness': '"64"', |
| 'sec-ch-ua-full-version': '"110.0.1587.69"', |
| 'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', |
| 'sec-ch-ua-mobile': '?0', |
| 'sec-ch-ua-model': '""', |
| 'sec-ch-ua-platform': '"Windows"', |
| 'sec-ch-ua-platform-version': '"15.0.0"', |
| 'sec-fetch-dest': 'document', |
| 'sec-fetch-mode': 'navigate', |
| 'sec-fetch-site': 'none', |
| 'sec-fetch-user': '?1', |
| 'upgrade-insecure-requests': '1', |
| 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69', |
| 'x-edge-shopping-flag': '1', |
| 'x-forwarded-for': Defaults.ip_address |
| }) |
|
|
| conversationId = create.json().get('conversationId') |
| clientId = create.json().get('clientId') |
| conversationSignature = create.json().get('conversationSignature') |
|
|
| if not conversationId or not clientId or not conversationSignature and _ == 4: |
| raise Exception('Failed to create conversation.') |
|
|
| return conversationId, clientId, conversationSignature |
|
|
|
|
| async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False): |
| timeout = aiohttp.ClientTimeout(total=900) |
| session = aiohttp.ClientSession(timeout=timeout) |
|
|
| conversationId, clientId, conversationSignature = await create_conversation() |
|
|
| wss = await session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', ssl=ssl_context, autoping=False, |
| headers={ |
| 'accept': 'application/json', |
| 'accept-language': 'en-US,en;q=0.9', |
| 'content-type': 'application/json', |
| 'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"', |
| 'sec-ch-ua-arch': '"x86"', |
| 'sec-ch-ua-bitness': '"64"', |
| 'sec-ch-ua-full-version': '"109.0.1518.78"', |
| 'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', |
| 'sec-ch-ua-mobile': '?0', |
| 'sec-ch-ua-model': '', |
| 'sec-ch-ua-platform': '"Windows"', |
| 'sec-ch-ua-platform-version': '"15.0.0"', |
| 'sec-fetch-dest': 'empty', |
| 'sec-fetch-mode': 'cors', |
| 'sec-fetch-site': 'same-origin', |
| 'x-ms-client-request-id': str(uuid.uuid4()), |
| 'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32', |
| 'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx', |
| 'Referrer-Policy': 'origin-when-cross-origin', |
| 'x-forwarded-for': Defaults.ip_address |
| }) |
|
|
| await wss.send_str(_format({'protocol': 'json', 'version': 1})) |
| await wss.receive(timeout=900) |
|
|
| struct = { |
| 'arguments': [ |
| { |
| **mode, |
| 'source': 'cib', |
| 'allowedMessageTypes': Defaults.allowedMessageTypes, |
| 'sliceIds': Defaults.sliceIds, |
| 'traceId': os.urandom(16).hex(), |
| 'isStartOfSession': True, |
| 'message': Defaults.location | { |
| 'author': 'user', |
| 'inputMethod': 'Keyboard', |
| 'text': prompt, |
| 'messageType': 'Chat' |
| }, |
| 'conversationSignature': conversationSignature, |
| 'participant': { |
| 'id': clientId |
| }, |
| 'conversationId': conversationId |
| } |
| ], |
| 'invocationId': '0', |
| 'target': 'chat', |
| 'type': 4 |
| } |
|
|
| if context: |
| struct['arguments'][0]['previousMessages'] = [ |
| { |
| "author": "user", |
| "description": context, |
| "contextType": "WebPage", |
| "messageType": "Context", |
| "messageId": "discover-web--page-ping-mriduna-----" |
| } |
| ] |
|
|
| await wss.send_str(_format(struct)) |
|
|
| final = False |
| draw = False |
| resp_txt = '' |
| result_text = '' |
| resp_txt_no_link = '' |
| cache_text = '' |
|
|
| while not final: |
| msg = await wss.receive(timeout=900) |
| objects = msg.data.split(Defaults.delimiter) |
|
|
| for obj in objects: |
| if obj is None or not obj: |
| continue |
|
|
| response = json.loads(obj) |
| if response.get('type') == 1 and response['arguments'][0].get('messages',): |
| if not draw: |
| if (response['arguments'][0]['messages'][0]['contentOrigin'] != 'Apology') and not draw: |
| resp_txt = result_text + \ |
| response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0].get( |
| 'text', '') |
| resp_txt_no_link = result_text + \ |
| response['arguments'][0]['messages'][0].get( |
| 'text', '') |
|
|
| if response['arguments'][0]['messages'][0].get('messageType',): |
| resp_txt = ( |
| resp_txt |
| + response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text') |
| + '\n' |
| ) |
| result_text = ( |
| result_text |
| + response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text') |
| + '\n' |
| ) |
|
|
| if cache_text.endswith(' '): |
| final = True |
| if wss and not wss.closed: |
| await wss.close() |
| if session and not session.closed: |
| await session.close() |
| |
| yield (resp_txt.replace(cache_text, '')) |
| cache_text = resp_txt |
|
|
| elif response.get('type') == 2: |
| if response['item']['result'].get('error'): |
| if wss and not wss.closed: |
| await wss.close() |
| if session and not session.closed: |
| await session.close() |
|
|
| raise Exception( |
| f"{response['item']['result']['value']}: {response['item']['result']['message']}") |
|
|
| if draw: |
| cache = response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] |
| response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] = ( |
| cache + resp_txt) |
|
|
| if (response['item']['messages'][-1]['contentOrigin'] == 'Apology' and resp_txt): |
| response['item']['messages'][-1]['text'] = resp_txt_no_link |
| response['item']['messages'][-1]['adaptiveCards'][0]['body'][0]['text'] = resp_txt |
|
|
| |
|
|
| final = True |
| if wss and not wss.closed: |
| await wss.close() |
| if session and not session.closed: |
| await session.close() |
|
|
|
|
| def run(generator): |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| gen = generator.__aiter__() |
|
|
| while True: |
| try: |
| next_val = loop.run_until_complete(gen.__anext__()) |
| yield next_val |
|
|
| except StopAsyncIteration: |
| break |
| |
|
|
| def convert(messages): |
| context = "" |
|
|
| for message in messages: |
| context += "[%s](#message)\n%s\n\n" % (message['role'], |
| message['content']) |
|
|
| return context |
|
|
|
|
| def _create_completion(model: str, messages: list, stream: bool, **kwargs): |
| if len(messages) < 2: |
| prompt = messages[0]['content'] |
| context = False |
|
|
| else: |
| prompt = messages[-1]['content'] |
| context = convert(messages[:-1]) |
|
|
| response = run(stream_generate(prompt, optionsSets.jailbreak, context)) |
| for token in response: |
| yield (token) |
|
|
| |
|
|
|
|
| params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + \ |
| '(%s)' % ', '.join( |
| [f"{name}: {get_type_hints(_create_completion)[name].__name__}" for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]]) |
|
|