Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import shutil | |
| import numpy as np | |
| from PIL import Image | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from gradio_client import Client | |
| from fastapi import FastAPI, Request | |
| from fastapi.staticfiles import StaticFiles | |
| from starlette.responses import Response | |
| import uvicorn | |
| import requests | |
| from urllib.parse import quote | |
| from unicodedata import normalize | |
| import json | |
| import time | |
| from html.parser import HTMLParser | |
| try: | |
| import cv2 | |
| except ImportError: | |
| os.system('pip install opencv-python') | |
| import cv2 | |
| root = os.path.dirname(os.path.abspath(__file__)) | |
| textures_folder = os.path.join(root, 'textures') | |
| os.makedirs(textures_folder, exist_ok=True) | |
| valid_extensions = ['.jpeg', '.jpg', '.png'] | |
| textures_repo = "https://huggingface.co/datasets/2ch/textures/resolve/main/" | |
| textures_for_download = [ | |
| f"{textures_repo}гауссовский_шум_и_мелкое_зерно.png?download=true", | |
| f"{textures_repo}грязная_матрица.png?download=true", | |
| f"{textures_repo}для_ночных_и_тёмных_кадров_сильный_шум_и_пыль.png?download=true", | |
| f"{textures_repo}для_ночных_и_тёмных_кадров_царапины_шум_пыль_дымка.png?download=true", | |
| f"{textures_repo}для_светлых_и_солнечных_ярких_фото_мелкое_констрастное_зерно.png?download=true", | |
| f"{textures_repo}зернистость_плёнки.png?download=true", | |
| f"{textures_repo}зернистость_плёнки_с_грязью.png?download=true", | |
| f"{textures_repo}испорченная_ворсом_плёнка.png?download=true", | |
| f"{textures_repo}мелкий_цветной_шум.png?download=true", | |
| f"{textures_repo}мелкое_контрастное_зерно_и_средний_цветвой_шум.png?download=true", | |
| f"{textures_repo}очень_мелкое_зерно.png?download=true", | |
| f"{textures_repo}пыльная_плёнка.png?download=true", | |
| f"{textures_repo}сильный_цветовой_шум_для_ночных_фото.png?download=true", | |
| f"{textures_repo}слабый_естественный_шум_матрицы_смартфона.png?download=true", | |
| f"{textures_repo}среднее_зерно.png?download=true", | |
| f"{textures_repo}среднее_монохромное_зерно_пыль_и_ворсинки.png?download=true", | |
| f"{textures_repo}средний_цветной_шум.png?download=true", | |
| f"{textures_repo}старая_матрица.png?download=true", | |
| f"{textures_repo}старая_потёртая_плёнка.png?download=true", | |
| f"{textures_repo}цветной_шум_матрицы.png?download=true", | |
| f"{textures_repo}цветной_шум_на_плёнке.png?download=true", | |
| f"{textures_repo}шумная_матрица.png?download=true", | |
| ] | |
| def dl_textures(texture_url: str) -> None: | |
| texture_for_download = quote(normalize('NFD', texture_url), safe='/?:=') | |
| filename = texture_url.split('/')[-1].split('?')[0] | |
| file_path = os.path.join(textures_folder, filename) | |
| response = requests.get(texture_for_download, stream=True) | |
| response.raise_for_status() | |
| with open(file_path, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| def create_texture_preview(texture_folder: str, output_folder: str, size: tuple[int] = (246, 246)) -> None: | |
| os.makedirs(output_folder, exist_ok=True) | |
| for texture in os.listdir(texture_folder): | |
| img_path = os.path.join(texture_folder, texture) | |
| img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED) | |
| start_x = np.random.randint(0, img.shape[1] - size[1]) | |
| start_y = np.random.randint(0, img.shape[0] - size[0]) | |
| img = img[start_y:start_y + size[0], start_x:start_x + size[1]] | |
| cv2.imwrite(os.path.join(output_folder, texture), img) | |
| def prepare_textures(texture_folder: str, output_folder: str) -> None: | |
| with ThreadPoolExecutor(max_workers=len(textures_for_download)) as executor: | |
| futures = [executor.submit(dl_textures, texture_for_download) for texture_for_download in | |
| textures_for_download] | |
| for future in as_completed(futures): | |
| future.result() | |
| create_texture_preview(texture_folder, output_folder, size=(246, 246)) | |
| preview_css = "" | |
| prepare_textures(textures_folder, os.path.join(root, 'preview')) | |
| for i, texture in enumerate(os.listdir(textures_folder), start=1): | |
| if os.path.splitext(texture)[1].lower() in valid_extensions: | |
| preview_css += f"""[data-testid="{i:02d}-radio-label"]::before {{ | |
| background-color: transparent !important; | |
| background-image: url("./preview/{texture}") !important; | |
| }}\n""" | |
| radio_css = """ | |
| html, | |
| body { | |
| background: var(--body-background-fill); | |
| } | |
| .gradio-container { | |
| max-width: 1396px !important; | |
| } | |
| #textures label { | |
| position: relative; | |
| width: 256px; | |
| height: 256px; | |
| display: flex; | |
| flex-direction: row; | |
| align-items: flex-end; | |
| background: none !important; | |
| padding: 4px !important; | |
| transition: .3s; | |
| } | |
| #textures label::before { | |
| width: 246px; | |
| height: 246px; | |
| border-radius: 8px; | |
| display: block; | |
| content: ""; | |
| transition: .3s; | |
| background: red; | |
| position: relative; | |
| top: 0px; | |
| } | |
| #textures label:hover::before, | |
| #textures label:active::before, | |
| #textures label.selected::before { | |
| mix-blend-mode: soft-light; | |
| transition: .3s | |
| } | |
| #textures span:not([data-testid="block-info"]), | |
| #textures input { | |
| position: absolute; | |
| z-index: 999; | |
| } | |
| #textures input { | |
| position: absolute; | |
| z-index: 999; | |
| bottom: 9px; | |
| left: 9px; | |
| } | |
| #textures span:not([data-testid="block-info"]) { | |
| left: 21px; | |
| padding: 2px 8px; | |
| background: rgba(0, 0, 0, .57); | |
| backdrop-filter: blur(3px) | |
| } | |
| #textures { | |
| background-color: hsla(0, 0%, 50%, 1); | |
| } | |
| .built-with, | |
| .show-api, | |
| footer .svelte-mpyp5e { | |
| display: none !important; | |
| } | |
| footer:after { | |
| content: "ну пролапс, ну и что?"; | |
| } | |
| #zoom { | |
| position: absolute; | |
| top: 50%; | |
| left: 50%; | |
| width: 250px; | |
| height: 250px; | |
| background-repeat: no-repeat; | |
| box-shadow: 0px 0px 10px 5px rgba(0, 0, 0, .2); | |
| border-radius: 50%; | |
| cursor: none; | |
| pointer-events: none; | |
| z-index: 999; | |
| opacity: 0; | |
| transform: scale(0); | |
| transition: opacity 500ms, transform 500ms; | |
| } | |
| #textures_tab .image-button { | |
| cursor: none; | |
| } | |
| #textured_result-download-link, | |
| #restored_image-download-link, | |
| #upscaled_image-download-link { | |
| position: absolute; | |
| z-index: 9999; | |
| padding: 2px 4px; | |
| margin: 0 7px; | |
| background: black; | |
| bottom: 0; | |
| right: 0; | |
| font-size: 20px; | |
| transition: 300ms | |
| } | |
| #textured_result-download-link:hover, | |
| #restored_image-download-link:hover, | |
| #upscaled_image-download-link:hover { | |
| color: #99f7a8 | |
| } | |
| #restored_images.disabled, | |
| #upscaled_images.disabled { | |
| height: 0px !important; | |
| opacity: 0; | |
| transition: 300ms | |
| } | |
| #restored_images.enabled, | |
| #upscaled_images.enabled { | |
| transition: 300ms | |
| } | |
| """ + preview_css | |
| custom_js = """ | |
| const PageLoadObserver = new MutationObserver((mutationsList, observer) => { | |
| for (let mutation of mutationsList) { | |
| if (mutation.type === 'childList') { | |
| const tabsDiv = document.querySelector('div.tab-nav'); | |
| if (tabsDiv) { | |
| observer.disconnect(); | |
| document.querySelector('#textures_tab-button').addEventListener('click', () => { | |
| setTimeout(() => { | |
| let labels = document.querySelectorAll('label[data-testid]'); | |
| labels.forEach((label) => { | |
| let input = label.querySelector('input[type="radio"]'); | |
| if (input) { | |
| let title = input.value.split('.')[0].replace(/_/g, ' '); | |
| label.title = title; | |
| } | |
| }); | |
| document.querySelector("label[data-testid='05-radio-label']").click() | |
| }, 150); | |
| }) | |
| function checkImagesAndSetClass(galleryElement) { | |
| const firstDiv = galleryElement.querySelector('div:first-child'); | |
| const hasChildElements = firstDiv && firstDiv.children.length > 0; | |
| const hasImages = galleryElement.querySelectorAll('img').length > 0; | |
| if (hasChildElements || hasImages) { | |
| galleryElement.classList.add('enabled'); | |
| galleryElement.classList.remove('disabled'); | |
| } else { | |
| galleryElement.classList.add('disabled'); | |
| galleryElement.classList.remove('enabled'); | |
| } | |
| } | |
| function setupGalleryObserver(galleryId) { | |
| let gallery = document.getElementById(galleryId); | |
| const observer = new MutationObserver(() => { | |
| checkImagesAndSetClass(gallery); | |
| }); | |
| observer.observe(gallery, { childList: true, subtree: true }); | |
| checkImagesAndSetClass(gallery); | |
| } | |
| // setupGalleryObserver('restored_images'); | |
| // setupGalleryObserver('upscaled_images'); | |
| function magnify(imgID, zoom) { | |
| var img, glass, w, h, bw; | |
| img = document.querySelector(imgID); | |
| glass = document.createElement("DIV"); | |
| glass.setAttribute("id", "zoom"); | |
| img.parentElement.insertBefore(glass, img); | |
| glass.style.backgroundImage = "url('" + img.src + "')"; | |
| glass.style.backgroundRepeat = "no-repeat"; | |
| glass.style.backgroundSize = (img.width * zoom) + "px " + (img.height * zoom) + "px"; | |
| bw = 3; | |
| w = glass.offsetWidth / 2; | |
| h = glass.offsetHeight / 2; | |
| glass.addEventListener("mousemove", moveMagnifier); | |
| img.addEventListener("mousemove", moveMagnifier); | |
| glass.addEventListener("touchmove", moveMagnifier); | |
| img.addEventListener("touchmove", moveMagnifier); | |
| function moveMagnifier(e) { | |
| var pos, x, y; | |
| e.preventDefault(); | |
| pos = getCursorPos(e); | |
| x = pos.x; | |
| y = pos.y; | |
| if (x > img.width - (w / zoom)) { x = img.width - (w / zoom); } | |
| if (x < w / zoom) { x = w / zoom; } | |
| if (y > img.height - (h / zoom)) { y = img.height - (h / zoom); } | |
| if (y < h / zoom) { y = h / zoom; } | |
| glass.style.left = (x - w) + "px"; | |
| glass.style.top = (y - h) + "px"; | |
| glass.style.backgroundPosition = "-" + ((x * zoom) - w + bw) + "px -" + ((y * zoom) - h) + "px"; | |
| glass.style.backgroundImage = "url('" + img.src + "')"; | |
| } | |
| function getCursorPos(e) { | |
| var a, x = 0, y = 0; | |
| e = e || window.event; | |
| a = img.getBoundingClientRect(); | |
| x = e.pageX - a.left; | |
| y = e.pageY - a.top; | |
| x = x - window.scrollX; | |
| y = y - window.scrollY; | |
| return { x: x, y: y }; | |
| } | |
| img.addEventListener("mouseover", function () { | |
| glass.style.opacity = "1"; | |
| glass.style.transform = "scale(1)"; | |
| }); | |
| img.addEventListener("mouseout", function () { | |
| glass.style.opacity = "0"; | |
| glass.style.transform = "scale(0)"; | |
| }); | |
| } | |
| function setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage) { | |
| const imgElement = document.querySelector(imgSelector); | |
| if (imgElement && imgElement.src) { | |
| let downloadLink = document.querySelector(linkSelector); | |
| if (!downloadLink) { | |
| if (magnifyImage) { | |
| magnify(magnifyImage, 3); | |
| } | |
| downloadLink = document.createElement('a'); | |
| downloadLink.id = linkId; | |
| downloadLink.innerText = 'скачать'; | |
| imgElement.after(downloadLink); | |
| } | |
| downloadLink.href = imgElement.src; | |
| downloadLink.download = ''; | |
| } | |
| } | |
| const DownloadLinkObserverCallback = (mutationsList, observer, imgSelector, linkSelector, linkId, magnifyImage) => { | |
| setupDownloadLink(imgSelector, linkSelector, linkId, magnifyImage); | |
| }; | |
| const DownloadLinkObserverOptions = { childList: true, subtree: true, attributes: true, attributeFilter: ['src'] }; | |
| const ImageTexturedObserver = new MutationObserver((mutationsList, observer) => { | |
| DownloadLinkObserverCallback(mutationsList, observer, '#textured_result img[data-testid="detailed-image"]', '#textured_result-download-link', 'textured_result-download-link', "#textured_result .image-button img"); | |
| }); | |
| ImageTexturedObserver.observe(document, DownloadLinkObserverOptions); | |
| const ImageRestoredObserver = new MutationObserver((mutationsList, observer) => { | |
| DownloadLinkObserverCallback(mutationsList, observer, '#restored_images img[data-testid="detailed-image"]', '#restored_image-download-link', 'restored_image-download-link'); | |
| }); | |
| ImageRestoredObserver.observe(document, DownloadLinkObserverOptions); | |
| const ImageUpscaledObserver = new MutationObserver((mutationsList, observer) => { | |
| DownloadLinkObserverCallback(mutationsList, observer, '#upscaled_images img[data-testid="detailed-image"]', '#upscaled_image-download-link', 'upscaled_image-download-link'); | |
| }); | |
| ImageUpscaledObserver.observe(document, DownloadLinkObserverOptions); | |
| } | |
| } | |
| } | |
| }); | |
| PageLoadObserver.observe(document, { childList: true, subtree: true }); | |
| """ | |
| def extract_path_from_result(predict_answer: str | list[str] | tuple[str]) -> str: | |
| if isinstance(predict_answer, (tuple, list)): | |
| result = predict_answer[0] | |
| shutil.rmtree(os.path.dirname(predict_answer[1]), ignore_errors=True) | |
| else: | |
| result = predict_answer | |
| return result | |
| def restore_face_common(img_path: str, predict_answer: str, model: str) -> None: | |
| result = extract_path_from_result(predict_answer) | |
| if os.path.exists(result): | |
| if os.path.exists(img_path): | |
| os.unlink(img_path) | |
| new_file, new_extension = os.path.splitext(result) | |
| old_file, old_extension = os.path.splitext(img_path) | |
| old_filename = os.path.basename(old_file) | |
| new_location = os.path.join(os.path.dirname(img_path), f"{old_filename}_{model}{new_extension}") | |
| shutil.move(result, new_location) | |
| shutil.rmtree(os.path.dirname(result), ignore_errors=True) | |
| def restore_face_gfpgan(img_path: str) -> None: | |
| client = Client(src="https://xintao-gfpgan.hf.space/", verbose=False) | |
| result = client.predict(img_path, "v1.4", 4, api_name="/predict") | |
| restore_face_common(img_path, result, "gfpgan") | |
| def restore_face_codeformer(img_path: str) -> None: | |
| client = Client(src="https://sczhou-codeformer.hf.space/", verbose=False) | |
| result = client.predict(img_path, True, True, True, 2, 0, api_name="/predict") | |
| restore_face_common(img_path, result, "codeformer") | |
| async def restore_faces_one_image(img_path: str, func_list: list) -> bool: | |
| def run_func(func) -> bool: | |
| for _ in range(3): | |
| try: | |
| func(img_path) | |
| return True | |
| except Exception as e: | |
| print(f"ошибка в {func.__name__}: {e}") | |
| return False | |
| loop = asyncio.get_event_loop() | |
| with ThreadPoolExecutor(max_workers=len(func_list)) as executor: | |
| futures = [loop.run_in_executor(executor, run_func, func) for func in func_list] | |
| results = await asyncio.gather(*futures) | |
| return any(results) | |
| async def restore_faces_batch(input_images: list[str], func_list: list, batch_size: int = 3) -> bool: | |
| results = False | |
| try: | |
| batches = [input_images[i:i + batch_size] for i in range(0, len(input_images), batch_size)] | |
| for batch in batches: | |
| tasks = [restore_faces_one_image(img_path, func_list) for img_path in batch] | |
| results = await asyncio.gather(*tasks) | |
| return any(results) | |
| except Exception as error: | |
| print(error) | |
| return results | |
| def get_file_paths(input_path: str | list[str], extensions_list: list[str]) -> list[str]: | |
| files = [] | |
| def add_files_from_directory(directory): | |
| for file_name in os.listdir(directory): | |
| if os.path.splitext(file_name)[1] in extensions_list: | |
| files.append(os.path.abspath(os.path.join(directory, file_name))) | |
| if isinstance(input_path, list): | |
| for file_path in input_path: | |
| parent_directory = os.path.dirname(file_path) | |
| add_files_from_directory(parent_directory) | |
| else: | |
| add_files_from_directory(input_path) | |
| return files | |
| async def restore_upscale(files: tuple, restore_method: str) -> list[str]: | |
| file_paths = [file.name for file in files] | |
| if restore_method == 'codeformer': | |
| func_list = [restore_face_codeformer] | |
| elif restore_method == 'gfpgan': | |
| func_list = [restore_face_gfpgan] | |
| else: | |
| func_list = [restore_face_codeformer, restore_face_gfpgan] | |
| results = await restore_faces_batch(file_paths, func_list, batch_size=3) | |
| if results: | |
| file_paths = get_file_paths(file_paths, valid_extensions) | |
| return file_paths | |
| else: | |
| return ['https://iili.io/JzrxjDP.png'] | |
| def image_noise_softlight_layer_mix(img, texture, output: str = None, opacity: float = 0.7): | |
| if isinstance(img, Image.Image): | |
| img = np.array(img).astype(float) | |
| elif isinstance(img, np.ndarray): | |
| img = img.astype(float) | |
| if img.shape[2] == 3 and not isinstance(img, Image.Image): | |
| img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2BGR).astype(float) | |
| overlay = cv2.imread(texture, cv2.IMREAD_UNCHANGED).astype(float) | |
| start_x = np.random.randint(0, overlay.shape[1] - img.shape[1]) | |
| start_y = np.random.randint(0, overlay.shape[0] - img.shape[0]) | |
| overlay = overlay[start_y:start_y + img.shape[0], start_x:start_x + img.shape[1]] | |
| if img.shape[2] == 3: | |
| img = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_RGB2RGBA).astype(float) | |
| if overlay.shape[2] == 3: | |
| overlay = cv2.cvtColor(overlay.astype(np.uint8), cv2.COLOR_RGB2RGBA).astype(float) | |
| overlay[..., 3] *= opacity | |
| img_in_norm = img / 255.0 | |
| img_layer_norm = overlay / 255.0 | |
| comp_alpha = np.minimum(img_in_norm[:, :, 3], img_layer_norm[:, :, 3]) * 1.0 | |
| new_alpha = img_in_norm[:, :, 3] + (1.0 - img_in_norm[:, :, 3]) * comp_alpha | |
| np.seterr(divide='ignore', invalid='ignore') | |
| ratio = comp_alpha / new_alpha | |
| ratio[ratio == np.NAN] = 0.0 | |
| comp = (1.0 - img_in_norm[:, :, :3]) * img_in_norm[:, :, :3] * img_layer_norm[:, :, :3] + img_in_norm[:, :, :3] * ( | |
| 1.0 - (1.0 - img_in_norm[:, :, :3]) * (1.0 - img_layer_norm[:, :, :3])) | |
| ratio_rs = np.reshape(np.repeat(ratio, 3), [comp.shape[0], comp.shape[1], comp.shape[2]]) | |
| img_out = comp * ratio_rs + img_in_norm[:, :, :3] * (1.0 - ratio_rs) | |
| img_out = np.nan_to_num(np.dstack((img_out, img_in_norm[:, :, 3]))) | |
| result = img_out * 255.0 | |
| rgb_image = cv2.cvtColor(result.astype(np.uint8), cv2.COLOR_BGR2RGB) | |
| image = Image.fromarray(rgb_image) | |
| return np.array(image) | |
| def apply_texture(input_image, textures_choice: str, opacity_slider: float): | |
| result = image_noise_softlight_layer_mix(input_image, os.path.join(textures_folder, textures_choice), opacity=opacity_slider) | |
| return [result] | |
| def temp_upload_file(file_path: str) -> str | None: | |
| servers = [ | |
| ('https://transfer.sh/', 'fileToUpload'), | |
| ('https://x0.at/', 'file'), | |
| ('https://tmpfiles.org/api/v1/upload', 'file'), | |
| ('https://uguu.se/upload.php', 'files[]') | |
| ] | |
| for i in range(3): | |
| for server, file_key in servers: | |
| try: | |
| with open(file_path, 'rb') as f: | |
| files = {file_key: f} | |
| response = requests.post(server, files=files) | |
| if response.status_code == 200: | |
| if server == 'https://transfer.sh/': | |
| return response.text.replace("https://transfer.sh/","https://transfer.sh/get/").replace("\n","") | |
| elif server == 'https://tmpfiles.org/api/v1/upload': | |
| response_json = response.json() | |
| if response_json['status'] == 'success': | |
| return response_json['data']['url'].replace("https://tmpfiles.org/", "https://tmpfiles.org/dl/") | |
| elif server == 'https://uguu.se/upload.php': | |
| response_json = response.json() | |
| if response_json['success']: | |
| return response_json['files'][0]['url'] | |
| else: | |
| return response.text | |
| except Exception as e: | |
| print(f'{server}: {e}') | |
| return None | |
| def upload_image(image_path: str) -> str | None: | |
| files = {'source': open(image_path, "rb")} | |
| data = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'} | |
| response = requests.post('https://freeimage.host/api/1/upload', files=files, data=data) | |
| if response.json()["status_code"] == 200: | |
| return response.json()["image"]["url"] | |
| else: | |
| return temp_upload_file(image_path) | |
| def get_headers(url: str) -> dict: | |
| session = requests.Session() | |
| anon_auth = session.get(url) | |
| cookies = session.cookies.get_dict() | |
| return { | |
| 'content-type': 'application/json', | |
| 'cookie': f'csrftoken={cookies["csrftoken"]}; replicate_anonymous_id={cookies["replicate_anonymous_id"]};', | |
| 'origin': 'https://replicate.com', | |
| 'x-csrftoken': cookies['csrftoken'], | |
| 'authority': 'replicate.com', | |
| 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/jxl,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', | |
| 'accept-language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7', | |
| 'cache-control': 'no-cache', | |
| 'dnt': '1', | |
| 'pragma': 'no-cache', | |
| 'referer': f'{url}?input=http', | |
| 'sec-ch-ua': '"Chromium";v="117", "Not;A=Brand";v="8"', | |
| 'sec-ch-ua-mobile': '?0', | |
| 'sec-ch-ua-platform': '"Windows"', | |
| 'sec-fetch-dest': 'document', | |
| 'sec-fetch-mode': 'navigate', | |
| 'sec-fetch-site': 'same-origin', | |
| 'sec-fetch-user': '?1', | |
| 'upgrade-insecure-requests': '1', | |
| 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36' | |
| } | |
| def get_version(url: str) -> str: | |
| url = url.rstrip('/') + '/versions' | |
| response = requests.get(url) | |
| class Version(HTMLParser): | |
| def __init__(self): | |
| super().__init__() | |
| self.recording = 0 | |
| self.data = '' | |
| def handle_starttag(self, tag, attrs): | |
| if tag == 'a': | |
| for name, value in attrs: | |
| if name == 'href' and '/versions/' in value: | |
| self.recording = 1 | |
| def handle_endtag(self, tag): | |
| if tag == 'a' and self.recording: | |
| self.recording -= 1 | |
| def handle_data(self, data): | |
| if self.recording: | |
| self.data = data | |
| parser = Version() | |
| parser.feed(response.text) | |
| return parser.data.strip() | |
| def replicate_upscale(url: str, image_url: str, upscale: int = 2) -> str: | |
| version = get_version(url) | |
| headers = get_headers(url) | |
| session = requests.Session() | |
| anon_auth = session.get(url, headers=headers) | |
| data = { | |
| "version": version, | |
| "input": { | |
| "img": image_url, | |
| "image": image_url, | |
| "upscale": upscale, | |
| "scale": upscale, | |
| "version": "General - RealESRGANplus", | |
| }, | |
| "face_enhance": False, | |
| "is_training": False, | |
| "stream": False | |
| } | |
| response = session.post('https://replicate.com/api/predictions', headers=headers, data=json.dumps(data)) | |
| prediction_id = response.json()['id'] | |
| while True: | |
| response = session.get(f'https://replicate.com/api/predictions/{prediction_id}', headers=headers) | |
| if 'status' in response.json(): | |
| status = response.json()['status'] | |
| else: | |
| status = 'processing' | |
| if status == 'succeeded': | |
| break | |
| time.sleep(1) | |
| session.close() | |
| return response.json()['output'] | |
| def upscaler(img_url: str) -> list[str] | None: | |
| def run(url): | |
| try: | |
| return replicate_upscale(url, img_url) | |
| except Exception as e: | |
| print(e) | |
| return None | |
| urls = [ | |
| 'https://replicate.com/cjwbw/real-esrgan', | |
| 'https://replicate.com/daanelson/real-esrgan-a100', | |
| 'https://replicate.com/xinntao/realesrgan', | |
| ] | |
| with ThreadPoolExecutor() as executor: | |
| futures = {executor.submit(run, url) for url in urls} | |
| for future in as_completed(futures): | |
| result = future.result() | |
| if result is not None: | |
| break | |
| return [result] | |
| def check_upscale_result(image: str) -> list[str]: | |
| attempt = 0 | |
| response = None | |
| while attempt < 3: | |
| response = upscaler(upload_image(image)) | |
| if response: | |
| return response | |
| attempt += 1 | |
| return ['https://iili.io/JzrxjDP.png'] | |
| with gr.Blocks(analytics_enabled=False, css=radio_css, theme='Taithrah/Minimal', title='апскейл') as demo: | |
| with gr.Tab(label="наложение зернистости пленки и шума", id=2, elem_id="textures_tab"): | |
| with gr.Row(variant="compact", elem_id="textures_tab_images"): | |
| input_image = gr.Image(label="исходник", sources=["upload", "clipboard"], type="numpy") | |
| result_image = gr.Gallery(label="результат", elem_id="textured_result", allow_preview=True, preview=True, show_share_button=False, show_download_button=False) | |
| opacity_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="видимость") | |
| apply_button = gr.Button(value="применить", variant="primary") | |
| texture_files = [(f"{i:02d}", texture) for i, texture in enumerate(os.listdir(textures_folder), start=1) if os.path.splitext(texture)[1].lower() in valid_extensions] | |
| textures_choice = gr.Radio(texture_files, show_label=False, interactive=True, elem_id="textures") | |
| apply_button.click(fn=apply_texture, inputs=[input_image, textures_choice, opacity_slider], outputs=result_image, api_name="texturize") | |
| app = FastAPI() | |
| async def some_fastapi_middleware(request: Request, call_next): | |
| response = await call_next(request) | |
| path = request.url.path | |
| if path == "/": | |
| response_body = "" | |
| async for chunk in response.body_iterator: | |
| response_body += chunk.decode() | |
| javascript = f""" | |
| <script type="text/javascript"> | |
| {custom_js} | |
| </script> | |
| """ | |
| response_body = response_body.replace("</body>", javascript + "</body>") | |
| del response.headers["content-length"] | |
| return Response( | |
| content=response_body, | |
| status_code=response.status_code, | |
| headers=dict(response.headers), | |
| media_type=response.media_type | |
| ) | |
| return response | |
| app.mount("/preview", StaticFiles(directory=os.path.join(root, 'preview')), name="preview") | |
| gr.mount_gradio_app(app, demo, path="/") | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |