| | import os |
| | import json |
| | import random |
| | import json |
| | import os |
| | import uuid |
| | import ssl |
| | import certifi |
| | import aiohttp |
| | import asyncio |
| |
|
| | import requests |
| | from typing import Dict, NewType, Union, Optional, List, get_type_hints |
| |
|
| | sha256 = NewType('sha_256_hash', str) |
| |
|
| | url = 'https://bing.com/chat' |
| | model = ['gpt-4'] |
| | supports_stream = True |
| | needs_auth = False |
| |
|
| | ssl_context = ssl.create_default_context() |
| | ssl_context.load_verify_locations(certifi.where()) |
| |
|
| |
|
| | class optionsSets: |
| | optionSet: dict = { |
| | 'tone': str, |
| | 'optionsSets': list |
| | } |
| |
|
| | jailbreak: dict = { |
| | "optionsSets": [ |
| | 'saharasugg', |
| | 'enablenewsfc', |
| | 'clgalileo', |
| | 'gencontentv3', |
| | "nlu_direct_response_filter", |
| | "deepleo", |
| | "disable_emoji_spoken_text", |
| | "responsible_ai_policy_235", |
| | "enablemm", |
| | "h3precise" |
| | |
| | "dtappid", |
| | "cricinfo", |
| | "cricinfov2", |
| | "dv3sugg", |
| | "nojbfedge" |
| | ] |
| | } |
| |
|
| |
|
| | class Defaults: |
| | delimiter = '\x1e' |
| | ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}' |
| |
|
| | allowedMessageTypes = [ |
| | 'Chat', |
| | 'Disengaged', |
| | 'AdsQuery', |
| | 'SemanticSerp', |
| | 'GenerateContentQuery', |
| | 'SearchQuery', |
| | 'ActionRequest', |
| | 'Context', |
| | 'Progress', |
| | 'AdsQuery', |
| | 'SemanticSerp' |
| | ] |
| |
|
| | sliceIds = [ |
| |
|
| | |
| | |
| | |
| |
|
| | 'winmuid3tf', |
| | 'osbsdusgreccf', |
| | 'ttstmout', |
| | 'crchatrev', |
| | 'winlongmsgtf', |
| | 'ctrlworkpay', |
| | 'norespwtf', |
| | 'tempcacheread', |
| | 'temptacache', |
| | '505scss0', |
| | '508jbcars0', |
| | '515enbotdets0', |
| | '5082tsports', |
| | '515vaoprvs', |
| | '424dagslnv1s0', |
| | 'kcimgattcf', |
| | '427startpms0' |
| | ] |
| |
|
| | location = { |
| | 'locale': 'en-US', |
| | 'market': 'en-US', |
| | 'region': 'US', |
| | 'locationHints': [ |
| | { |
| | 'country': 'United States', |
| | 'state': 'California', |
| | 'city': 'Los Angeles', |
| | 'timezoneoffset': 8, |
| | 'countryConfidence': 8, |
| | 'Center': { |
| | 'Latitude': 34.0536909, |
| | 'Longitude': -118.242766 |
| | }, |
| | 'RegionType': 2, |
| | 'SourceType': 1 |
| | } |
| | ], |
| | } |
| |
|
| |
|
| | def _format(msg: dict) -> str: |
| | return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter |
| |
|
| |
|
| | async def create_conversation(): |
| | for _ in range(5): |
| | create = requests.get('https://www.bing.com/turing/conversation/create', |
| | headers={ |
| | 'authority': 'edgeservices.bing.com', |
| | 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', |
| | 'accept-language': 'en-US,en;q=0.9', |
| | 'cache-control': 'max-age=0', |
| | 'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"', |
| | 'sec-ch-ua-arch': '"x86"', |
| | 'sec-ch-ua-bitness': '"64"', |
| | 'sec-ch-ua-full-version': '"110.0.1587.69"', |
| | 'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', |
| | 'sec-ch-ua-mobile': '?0', |
| | 'sec-ch-ua-model': '""', |
| | 'sec-ch-ua-platform': '"Windows"', |
| | 'sec-ch-ua-platform-version': '"15.0.0"', |
| | 'sec-fetch-dest': 'document', |
| | 'sec-fetch-mode': 'navigate', |
| | 'sec-fetch-site': 'none', |
| | 'sec-fetch-user': '?1', |
| | 'upgrade-insecure-requests': '1', |
| | 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69', |
| | 'x-edge-shopping-flag': '1', |
| | 'x-forwarded-for': Defaults.ip_address |
| | }) |
| |
|
| | conversationId = create.json().get('conversationId') |
| | clientId = create.json().get('clientId') |
| | conversationSignature = create.json().get('conversationSignature') |
| |
|
| | if not conversationId or not clientId or not conversationSignature and _ == 4: |
| | raise Exception('Failed to create conversation.') |
| |
|
| | return conversationId, clientId, conversationSignature |
| |
|
| |
|
| | async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False): |
| | timeout = aiohttp.ClientTimeout(total=900) |
| | session = aiohttp.ClientSession(timeout=timeout) |
| |
|
| | conversationId, clientId, conversationSignature = await create_conversation() |
| |
|
| | wss = await session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', ssl=ssl_context, autoping=False, |
| | headers={ |
| | 'accept': 'application/json', |
| | 'accept-language': 'en-US,en;q=0.9', |
| | 'content-type': 'application/json', |
| | 'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"', |
| | 'sec-ch-ua-arch': '"x86"', |
| | 'sec-ch-ua-bitness': '"64"', |
| | 'sec-ch-ua-full-version': '"109.0.1518.78"', |
| | 'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', |
| | 'sec-ch-ua-mobile': '?0', |
| | 'sec-ch-ua-model': '', |
| | 'sec-ch-ua-platform': '"Windows"', |
| | 'sec-ch-ua-platform-version': '"15.0.0"', |
| | 'sec-fetch-dest': 'empty', |
| | 'sec-fetch-mode': 'cors', |
| | 'sec-fetch-site': 'same-origin', |
| | 'x-ms-client-request-id': str(uuid.uuid4()), |
| | 'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32', |
| | 'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx', |
| | 'Referrer-Policy': 'origin-when-cross-origin', |
| | 'x-forwarded-for': Defaults.ip_address |
| | }) |
| |
|
| | await wss.send_str(_format({'protocol': 'json', 'version': 1})) |
| | await wss.receive(timeout=900) |
| |
|
| | struct = { |
| | 'arguments': [ |
| | { |
| | **mode, |
| | 'source': 'cib', |
| | 'allowedMessageTypes': Defaults.allowedMessageTypes, |
| | 'sliceIds': Defaults.sliceIds, |
| | 'traceId': os.urandom(16).hex(), |
| | 'isStartOfSession': True, |
| | 'message': Defaults.location | { |
| | 'author': 'user', |
| | 'inputMethod': 'Keyboard', |
| | 'text': prompt, |
| | 'messageType': 'Chat' |
| | }, |
| | 'conversationSignature': conversationSignature, |
| | 'participant': { |
| | 'id': clientId |
| | }, |
| | 'conversationId': conversationId |
| | } |
| | ], |
| | 'invocationId': '0', |
| | 'target': 'chat', |
| | 'type': 4 |
| | } |
| |
|
| | if context: |
| | struct['arguments'][0]['previousMessages'] = [ |
| | { |
| | "author": "user", |
| | "description": context, |
| | "contextType": "WebPage", |
| | "messageType": "Context", |
| | "messageId": "discover-web--page-ping-mriduna-----" |
| | } |
| | ] |
| |
|
| | await wss.send_str(_format(struct)) |
| |
|
| | final = False |
| | draw = False |
| | resp_txt = '' |
| | result_text = '' |
| | resp_txt_no_link = '' |
| | cache_text = '' |
| |
|
| | while not final: |
| | msg = await wss.receive(timeout=900) |
| | objects = msg.data.split(Defaults.delimiter) |
| |
|
| | for obj in objects: |
| | if obj is None or not obj: |
| | continue |
| |
|
| | response = json.loads(obj) |
| | if response.get('type') == 1 and response['arguments'][0].get('messages',): |
| | if not draw: |
| | if (response['arguments'][0]['messages'][0]['contentOrigin'] != 'Apology') and not draw: |
| | resp_txt = result_text + \ |
| | response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0].get( |
| | 'text', '') |
| | resp_txt_no_link = result_text + \ |
| | response['arguments'][0]['messages'][0].get( |
| | 'text', '') |
| |
|
| | if response['arguments'][0]['messages'][0].get('messageType',): |
| | resp_txt = ( |
| | resp_txt |
| | + response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text') |
| | + '\n' |
| | ) |
| | result_text = ( |
| | result_text |
| | + response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text') |
| | + '\n' |
| | ) |
| |
|
| | if cache_text.endswith(' '): |
| | final = True |
| | if wss and not wss.closed: |
| | await wss.close() |
| | if session and not session.closed: |
| | await session.close() |
| | |
| | yield (resp_txt.replace(cache_text, '')) |
| | cache_text = resp_txt |
| |
|
| | elif response.get('type') == 2: |
| | if response['item']['result'].get('error'): |
| | if wss and not wss.closed: |
| | await wss.close() |
| | if session and not session.closed: |
| | await session.close() |
| |
|
| | raise Exception( |
| | f"{response['item']['result']['value']}: {response['item']['result']['message']}") |
| |
|
| | if draw: |
| | cache = response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] |
| | response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] = ( |
| | cache + resp_txt) |
| |
|
| | if (response['item']['messages'][-1]['contentOrigin'] == 'Apology' and resp_txt): |
| | response['item']['messages'][-1]['text'] = resp_txt_no_link |
| | response['item']['messages'][-1]['adaptiveCards'][0]['body'][0]['text'] = resp_txt |
| |
|
| | |
| |
|
| | final = True |
| | if wss and not wss.closed: |
| | await wss.close() |
| | if session and not session.closed: |
| | await session.close() |
| |
|
| |
|
| | def run(generator): |
| | loop = asyncio.get_event_loop() |
| | gen = generator.__aiter__() |
| |
|
| | while True: |
| | try: |
| | next_val = loop.run_until_complete(gen.__anext__()) |
| | yield next_val |
| |
|
| | except StopAsyncIteration: |
| | break |
| |
|
| | |
| |
|
| |
|
| | def convert(messages): |
| | context = "" |
| |
|
| | for message in messages: |
| | context += "[%s](#message)\n%s\n\n" % (message['role'], |
| | message['content']) |
| |
|
| | return context |
| |
|
| |
|
| | def _create_completion(model: str, messages: list, stream: bool, **kwargs): |
| | if len(messages) < 2: |
| | prompt = messages[0]['content'] |
| | context = False |
| |
|
| | else: |
| | prompt = messages[-1]['content'] |
| | context = convert(messages[:-1]) |
| |
|
| | response = run(stream_generate(prompt, optionsSets.jailbreak, context)) |
| | for token in response: |
| | yield (token) |
| |
|
| | |
| |
|
| |
|
| | params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + \ |
| | '(%s)' % ', '.join( |
| | [f"{name}: {get_type_hints(_create_completion)[name].__name__}" for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]]) |