| from __future__ import annotations | |
| import asyncio | |
| import time | |
| import json | |
| from aiohttp import ClientSession, BaseConnector | |
| from urllib.parse import quote | |
| from typing import List, Dict | |
| try: | |
| from bs4 import BeautifulSoup | |
| has_requirements = True | |
| except ImportError: | |
| has_requirements = False | |
| from ...helper import get_connector | |
| from ....errors import MissingRequirementsError, RateLimitError | |
| BING_URL = "https://www.bing.com" | |
| TIMEOUT_LOGIN = 1200 | |
| TIMEOUT_IMAGE_CREATION = 300 | |
| ERRORS = [ | |
| "this prompt is being reviewed", | |
| "this prompt has been blocked", | |
| "we're working hard to offer image creator in more languages", | |
| "we can't create your images right now" | |
| ] | |
| BAD_IMAGES = [ | |
| "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", | |
| "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", | |
| ] | |
| def create_session(cookies: Dict[str, str], proxy: str = None, connector: BaseConnector = None) -> ClientSession: | |
| """ | |
| Creates a new client session with specified cookies and headers. | |
| Args: | |
| cookies (Dict[str, str]): Cookies to be used for the session. | |
| Returns: | |
| ClientSession: The created client session. | |
| """ | |
| headers = { | |
| "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", | |
| "accept-encoding": "gzip, deflate, br", | |
| "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6", | |
| "content-type": "application/x-www-form-urlencoded", | |
| "referrer-policy": "origin-when-cross-origin", | |
| "referrer": "https://www.bing.com/images/create/", | |
| "origin": "https://www.bing.com", | |
| "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54", | |
| "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"", | |
| "sec-ch-ua-mobile": "?0", | |
| "sec-fetch-dest": "document", | |
| "sec-fetch-mode": "navigate", | |
| "sec-fetch-site": "same-origin", | |
| "sec-fetch-user": "?1", | |
| "upgrade-insecure-requests": "1", | |
| } | |
| if cookies: | |
| headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) | |
| return ClientSession(headers=headers, connector=get_connector(connector, proxy)) | |
| async def create_images(session: ClientSession, prompt: str, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]: | |
| """ | |
| Creates images based on a given prompt using Bing's service. | |
| Args: | |
| session (ClientSession): Active client session. | |
| prompt (str): Prompt to generate images. | |
| proxy (str, optional): Proxy configuration. | |
| timeout (int): Timeout for the request. | |
| Returns: | |
| List[str]: A list of URLs to the created images. | |
| Raises: | |
| RuntimeError: If image creation fails or times out. | |
| """ | |
| if not has_requirements: | |
| raise MissingRequirementsError('Install "beautifulsoup4" package') | |
| url_encoded_prompt = quote(prompt) | |
| payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE" | |
| url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" | |
| async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response: | |
| response.raise_for_status() | |
| text = (await response.text()).lower() | |
| if "0 coins available" in text: | |
| raise RateLimitError("No coins left. Log in with a different account or wait a while") | |
| for error in ERRORS: | |
| if error in text: | |
| raise RuntimeError(f"Create images failed: {error}") | |
| if response.status != 302: | |
| url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE" | |
| async with session.post(url, allow_redirects=False, timeout=timeout) as response: | |
| if response.status != 302: | |
| raise RuntimeError(f"Create images failed. Code: {response.status}") | |
| redirect_url = response.headers["Location"].replace("&nfy=1", "") | |
| redirect_url = f"{BING_URL}{redirect_url}" | |
| request_id = redirect_url.split("id=")[-1] | |
| async with session.get(redirect_url) as response: | |
| response.raise_for_status() | |
| polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}" | |
| start_time = time.time() | |
| while True: | |
| if time.time() - start_time > timeout: | |
| raise RuntimeError(f"Timeout error after {timeout} sec") | |
| async with session.get(polling_url) as response: | |
| if response.status != 200: | |
| raise RuntimeError(f"Polling images faild. Code: {response.status}") | |
| text = await response.text() | |
| if not text or "GenerativeImagesStatusPage" in text: | |
| await asyncio.sleep(1) | |
| else: | |
| break | |
| error = None | |
| try: | |
| error = json.loads(text).get("errorMessage") | |
| except: | |
| pass | |
| if error == "Pending": | |
| raise RuntimeError("Prompt is been blocked") | |
| elif error: | |
| raise RuntimeError(error) | |
| return read_images(text) | |
| def read_images(html_content: str) -> List[str]: | |
| """ | |
| Extracts image URLs from the HTML content. | |
| Args: | |
| html_content (str): HTML content containing image URLs. | |
| Returns: | |
| List[str]: A list of image URLs. | |
| """ | |
| soup = BeautifulSoup(html_content, "html.parser") | |
| tags = soup.find_all("img", class_="mimg") | |
| if not tags: | |
| tags = soup.find_all("img", class_="gir_mmimg") | |
| images = [img["src"].split("?w=")[0] for img in tags] | |
| if any(im in BAD_IMAGES for im in images): | |
| raise RuntimeError("Bad images found") | |
| if not images: | |
| raise RuntimeError("No images found") | |
| return images | |