Spaces:
Runtime error
Runtime error
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from playwright.sync_api import sync_playwright | |
| from bs4 import BeautifulSoup | |
| from fastapi import FastAPI | |
| from random import choice | |
| from io import BytesIO | |
| from time import sleep | |
| from PIL import Image | |
| import requests | |
| #-------------------------------------------------- << RUN >> --------------------------------------------------# | |
| # uvicorn app:app | |
| #-------------------------------------------------- << VARIABLES >> --------------------------------------------------# | |
| #-------------------------------------------------- << FUNCTIONS >> --------------------------------------------------# | |
| def get_email(): | |
| try: | |
| url = 'https://temp-mail107.p.rapidapi.com/domains' | |
| headers = { | |
| 'x-rapidapi-key': 'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e', | |
| 'x-rapidapi-host': 'temp-mail107.p.rapidapi.com' | |
| } | |
| response = requests.get(url, headers = headers) | |
| data = response.json() | |
| domains = data['data'] | |
| domain = choice(domains) | |
| url = 'https://temp-mail107.p.rapidapi.com/create-account' | |
| payload = { 'domain': domain } | |
| headers = { | |
| 'x-rapidapi-key': 'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e', | |
| 'x-rapidapi-host': 'temp-mail107.p.rapidapi.com', | |
| 'Content-Type': 'application/json' | |
| } | |
| response = requests.post(url, json = payload, headers = headers) | |
| data = response.json() | |
| email = data['data']['address'] | |
| return email | |
| except Exception as error: | |
| print(f'error : {error}') | |
| def get_verification_url(email): | |
| try: | |
| url = "https://temp-mail107.p.rapidapi.com/check-message" | |
| querystring = {"address" : email} | |
| headers = { | |
| "x-rapidapi-key": "d3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e", | |
| "x-rapidapi-host": "temp-mail107.p.rapidapi.com" | |
| } | |
| response = requests.get(url, headers = headers, params = querystring) | |
| data = response.json() | |
| while data['data'] == None: | |
| sleep(2) | |
| response = requests.get(url, headers = headers, params = querystring) | |
| data = response.json() | |
| html = data['data']['html'] | |
| soup = BeautifulSoup(html, 'html.parser') | |
| verification_url = soup.find('a').get('href') | |
| return verification_url | |
| except Exception as error: | |
| print(f'error : {error}') | |
| class Bot: | |
| def __init__(self): | |
| self.playwright = None | |
| self.browser = None | |
| self.page = None | |
| def get_captcha(self): | |
| if self.page: | |
| self.page.close() | |
| if self.browser: | |
| self.browser.close() | |
| if self.playwright: | |
| self.playwright.stop() | |
| self.playwright = sync_playwright().start() | |
| self.browser = self.playwright.chromium.launch(headless = True, args = ['--start-maximized']) | |
| self.page = self.browser.new_page(no_viewport = True) | |
| self.page.goto('https://www.thinkdiffusion.com/api/auth/signup') | |
| self.page.wait_for_timeout(1000) | |
| captcha_element = self.page.query_selector('body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form > div.c8b219657.c9a020846 > div.c8b219657.c1f2d1695 > div > div.cc9af1055.cddcd8014 > div.c4026b06e.cb1295f34 > img') | |
| captcha_base64 = captcha_element.screenshot(type = 'png') | |
| captcha_image = Image.open(BytesIO(captcha_base64)) | |
| captcha_buffer = BytesIO() | |
| captcha_image.save(captcha_buffer, format = 'png') | |
| captcha_buffer.seek(0) | |
| return captcha_buffer | |
| def generate_account(self, captcha_solution): | |
| if self.playwright and self.browser and self.page: | |
| if captcha_solution.strip() != '': | |
| try: | |
| email = get_email() | |
| password = 'Qwerty12321*' | |
| self.page.locator('#email').press_sequentially(email) | |
| self.page.locator('#password').press_sequentially(password) | |
| self.page.locator('#captcha').press_sequentially(captcha_solution) | |
| self.page.locator('body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form > div.c2f62a829 > button').click() | |
| verification_url = get_verification_url(email) | |
| self.page.goto(verification_url) | |
| self.page.wait_for_timeout(3000) | |
| cookies = self.page.context.cookies() | |
| csrf_token = [cookie['value'] for cookie in cookies if cookie['name'] == 'csrf_token'][0] | |
| app_session = [cookie['value'] for cookie in cookies if cookie['name'] == 'appSession'][0] | |
| self.page.close() | |
| self.browser.close() | |
| self.playwright.stop() | |
| self.page = None | |
| self.browser = None | |
| self.playwright = None | |
| headers = { | |
| 'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36' | |
| } | |
| data = { | |
| 'civitaiModelVersionId' : None, | |
| 'presetSlug' : 'turbo-comfy-exp', | |
| 'runningTime' : 1200, | |
| 'teamId' : None | |
| } | |
| cookies = { | |
| 'csrf_token' : csrf_token, | |
| 'appSession' : app_session | |
| } | |
| url = 'https://www.thinkdiffusion.com/api/machines/create' | |
| response = requests.post(url = url, headers = headers, cookies = cookies, data = data) | |
| data = response.json() | |
| subdomain = data['subdomain'] | |
| url = f'https://{subdomain}.thinkdiffusion.xyz' | |
| return {'status' : 'success', 'url' : url} | |
| except Exception as exception: | |
| self.page.close() | |
| self.browser.close() | |
| self.playwright.stop() | |
| self.page = None | |
| self.browser = None | |
| self.playwright = None | |
| return {'status' : 'error', 'message' : str(exception)} | |
| else: | |
| return {'status' : 'error', 'message' : 'captcha solution cannot be empty.'} | |
| else: | |
| return {'status' : 'error', 'message' : 'no browser available'} | |
| #-------------------------------------------------- << APP >> --------------------------------------------------# | |
| app = FastAPI() | |
| app.add_middleware(CORSMiddleware, allow_origins = ['*'], allow_credentials = True, allow_methods = ['*'], allow_headers = ['*']) | |
| #-------------------------------------------------- << BOT >> --------------------------------------------------# | |
| bot = Bot() | |
| #-------------------------------------------------- << ROUTES >> --------------------------------------------------# | |
| def main_route(): | |
| try: | |
| return JSONResponse(status_code = 200, content = {'status' : 'success'}) | |
| except Exception as exception: | |
| return JSONResponse(status_code = 500, content = {'status' : 'error', 'message' : exception}) | |
| def get_captcha_route(): | |
| try: | |
| captcha_buffer = bot.get_captcha() | |
| return StreamingResponse(status_code = 200, content = captcha_buffer, media_type = 'image/png') | |
| except Exception as exception: | |
| return JSONResponse(status_code = 500, content = {'status' : 'error', 'message' : exception}) | |
| def generate_account_route(captcha_solution: str): | |
| try: | |
| response = bot.generate_account(captcha_solution) | |
| if response['status'] == 'success': | |
| return JSONResponse(status_code = 200, content = {'status' : response['status'], 'url' : response['url']}) | |
| if response['status'] == 'error': | |
| return JSONResponse(status_code = 500, content = {'status' : response['status'], 'message' : response['message']}) | |
| except Exception as exception: | |
| return JSONResponse(status_code = 500, content = {'status' : 'error', 'message' : exception}) |