Spaces:
Paused
Paused
| import gradio as gr | |
| import requests | |
| import time | |
| import json | |
| from contextlib import closing | |
| from websocket import create_connection | |
| from deep_translator import GoogleTranslator | |
| from langdetect import detect | |
| import os | |
| from PIL import Image | |
| import io | |
| import base64 | |
| import os | |
| import random | |
| import tempfile | |
| import re | |
| from gradio_client import Client | |
| import moviepy.editor as mp | |
def animate_img(encoded_string, model):
    """Animate a still image with the hosted Stable Video Diffusion service.

    The image file is uploaded, then the service's result page is polled
    until it stops reporting that generation is in progress.

    Args:
        encoded_string: Filesystem path of the source image (despite the
            name, the value is opened as a file, not decoded).
        model: Name of the selected model; only "Stable Video Diffusion"
            is handled.

    Returns:
        URL of the generated ``.mp4``, or ``None`` when the model is not
        supported, any step fails, the finished page contains no video
        link, or generation is still in progress after ten polls.
    """
    if model != "Stable Video Diffusion":
        return None

    in_progress_marker = "Generation has been in progress for"
    video_url_pattern = r'https://storage\.stable-video-diffusion\.com/[a-f0-9]{32}\.mp4'
    # Without a timeout a stalled server would block this worker forever.
    request_timeout = 120

    try:
        # The context manager guarantees the image handle is closed even
        # when the upload raises.
        with open(encoded_string, 'rb') as image_file:
            r = requests.post(
                "https://stable-video-diffusion.com/api/upload",
                files={"file": image_file},
                timeout=request_timeout,
            )
        hash_ = r.json()['hash']

        # Give the service a head start before the first poll.
        time.sleep(10)
        for _ in range(10):
            source_string = requests.get(
                f"https://stable-video-diffusion.com/result?hash={hash_}",
                timeout=request_timeout,
            ).text
            if in_progress_marker in source_string:
                time.sleep(15)
                continue
            found = re.search(video_url_pattern, source_string)
            if found is None:
                # Generation finished but the page carries no video link.
                return None
            print("s_1")
            return found.group(0)
        print("f_1")
    except Exception as err:
        # UI handler boundary: report the failure and show an empty result
        # instead of crashing the event.
        print(f"animate_img failed: {err!r}")
    return None
# Disabled fallback (kept for reference):
#print("2")
#client1 = Client(url_hg1)
#result1 = client1.predict(encoded_string, api_name="/resize_image")
#client = Client(url_hg1)
#result = client.predict(result1, 0, True, 1, 15, api_name="/video")
#res = result[0]['video']
#print("s_2")
#return res
#if model == "AnimateDiff":
#    client = Client(url_hg2)
#    result = client.predict(encoded_string, "zoom-out", api_name="/predict")
#    return result
# Negative prompt sent to both text-to-image endpoints.
_SD_NEGATIVE_PROMPT = (
    "[deformed | disfigured], poorly drawn, [bad : wrong] anatomy, "
    "[extra | missing | floating | disconnected] limb, "
    "(mutated hands and fingers), blurry"
)

# Seconds; without a timeout a stalled server would block the worker forever.
_HTTP_TIMEOUT = 120


def _decode_data_uri_image(photo):
    """Strip a JPEG/PNG data-URI prefix and return the decoded image bytes."""
    base64_string = photo.replace('data:image/jpeg;base64,', '').replace('data:image/png;base64,', '')
    return base64.b64decode(base64_string)


def _image_from_primary_endpoint(url, prompt):
    """Generate an image over the first WebSocket endpoint (fn_index 3)."""
    with closing(create_connection(f"{url}", timeout=120)) as conn:
        conn.send('{"fn_index":3,"session_hash":""}')
        # json.dumps escapes quotes/backslashes/newlines in the user prompt,
        # which hand-built JSON text would not.
        conn.send(json.dumps({
            "data": [prompt, _SD_NEGATIVE_PROMPT, 7.5, "(No style)"],
            "event_data": None,
            "fn_index": 3,
            "session_hash": "",
        }))
        estimations = 0
        # Skip queue messages until processing starts (at most 60 estimates).
        while estimations < 60:
            status = json.loads(conn.recv())['msg']
            if status == 'estimation':
                estimations += 1
                time.sleep(1)
                continue
            if status == 'process_starts':
                break
        photo = json.loads(conn.recv())['output']['data'][0][0]
    return _decode_data_uri_image(photo)


def _image_from_fallback_endpoint(url, prompt):
    """Generate an image over the second WebSocket endpoint (fn_index 0)."""
    with closing(create_connection(f"{url}", timeout=120)) as conn:
        conn.send('{"fn_index":0,"session_hash":""}')
        conn.send(json.dumps({
            "data": [
                prompt,
                _SD_NEGATIVE_PROMPT,
                "dreamshaperXL10_alpha2.safetensors [c8afe2ef]",
                30,
                "DPM++ 2M Karras",
                7,
                1024,
                1024,
                -1,
            ],
            "event_data": None,
            "fn_index": 0,
            "session_hash": "",
        }))
        # The first four messages are discarded; the fifth carries the image.
        for _ in range(4):
            conn.recv()
        photo = json.loads(conn.recv())['output']['data'][0]
    return _decode_data_uri_image(photo)


def _svd_video_url_from_bytes(image_bytes):
    """Upload image bytes to Stable Video Diffusion and poll for the video URL."""
    r = requests.post(
        "https://stable-video-diffusion.com/api/upload",
        files={"file": image_bytes},
        timeout=_HTTP_TIMEOUT,
    )
    print(r.text)
    hash_ = r.json()['hash']
    time.sleep(10)
    for _ in range(10):
        source_string = requests.get(
            f"https://stable-video-diffusion.com/result?hash={hash_}",
            timeout=_HTTP_TIMEOUT,
        ).text
        if "Generation has been in progress for" in source_string:
            time.sleep(15)
            continue
        found = re.search(
            r'https://storage\.stable-video-diffusion\.com/[a-f0-9]{32}\.mp4',
            source_string,
        )
        if found is None:
            # Finished page without a video link: nothing to return.
            return None
        print(found.group(0))
        return found.group(0)
    return None


def _animatediff_video_file(prompt):
    """Request a GIF from the txt2gif service and convert it to an MP4 file."""
    data = {"prompt": prompt, "negative_prompt": "EasyNegative"}
    r = requests.post("https://sd.cuilutech.com/sdapi/async/txt2gif", json=data, timeout=_HTTP_TIMEOUT)
    task_id = r.json()['data']['task_id']

    photo = None
    for _ in range(60):
        r2 = requests.post(
            "https://sd.cuilutech.com/sdapi/get_task_info",
            json={'task_id': task_id},
            timeout=_HTTP_TIMEOUT,
        )
        time.sleep(2)
        task_data = r2.json()['data']
        if task_data:
            photo = task_data['image_urls'][0]
            break
    if photo is None:
        # Polling budget exhausted before the task produced an image.
        return None

    with tempfile.NamedTemporaryFile(delete=False) as temp:
        temp.write(requests.get(photo, timeout=_HTTP_TIMEOUT).content)
        temp_file_path = temp.name

    n_im2 = f"{time.time()}"
    # Only the name is needed; close the handle before the encoder writes to it.
    with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as out_file:
        out_path = out_file.name

    clip = mp.VideoFileClip(temp_file_path)
    try:
        clip.write_videofile(out_path)
    finally:
        clip.close()
        try:
            os.remove(temp_file_path)
        except OSError:
            # Best-effort cleanup of the downloaded GIF.
            pass
    return out_path


def create_video(prompt, model):
    """Generate a video from a text prompt.

    For "Stable Video Diffusion" an image is first generated from the prompt
    (endpoint from env var ``url_sd3``, falling back to ``url_sd4``), then
    animated by the Stable Video Diffusion service. For "AnimateDiff" a GIF
    is requested from the txt2gif service and converted to MP4.
    A Hugging Face Space based fallback for the animation step used to
    follow the Stable Video Diffusion branch and is currently disabled.

    Args:
        prompt: Text description of the desired video.
        model: "Stable Video Diffusion" or "AnimateDiff".

    Returns:
        A video URL (Stable Video Diffusion) or a path to a local ``.mp4``
        file (AnimateDiff); ``None`` when the model is unknown, generation
        fails, or no result is ready within the polling budget.

    Raises:
        Errors from the AnimateDiff service calls or the video conversion
        propagate to the caller.
    """
    if model == "Stable Video Diffusion":
        try:
            image_bytes = _image_from_primary_endpoint(os.getenv("url_sd3"), prompt)
        except Exception as primary_err:
            print(f"create_video: primary image endpoint failed: {primary_err!r}")
            print("c_2")
            try:
                image_bytes = _image_from_fallback_endpoint(os.getenv("url_sd4"), prompt)
            except Exception as fallback_err:
                print(f"create_video: fallback image endpoint failed: {fallback_err!r}")
                return None
        try:
            return _svd_video_url_from_bytes(image_bytes)
        except Exception as err:
            # UI handler boundary: report and show an empty result.
            print(f"create_video: video generation failed: {err!r}")
            return None

    if model == "AnimateDiff":
        return _animatediff_video_file(prompt)

    return None
def flip_text1(prompt, motion):
    """Generate a video from a text prompt with a camera motion.

    A Russian prompt is translated to English first. A generation task is
    submitted to the endpoint named by env var ``url_video_g`` and then
    polled through the endpoint named by ``url_video_c``.

    Args:
        prompt: Text description; replaced by ``'video'`` if language
            detection or translation fails.
        motion: Camera motion, either one of the Russian UI labels or a
            value that is passed through unchanged.

    Returns:
        Path of a temporary ``.mp4`` file holding the response body, or
        ``None`` if the file could not be written or the task reports a
        status other than QUEUED/PROCESSING.

    Raises:
        Errors from submitting the task (network failure, missing
        ``task_id``) propagate to the caller.
    """
    try:
        if detect(prompt) == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
        print(prompt)
    except Exception:
        # Detection/translation is best-effort; fall back to a generic prompt.
        prompt = 'video'

    url_video_g = os.getenv("url_video_g")
    url_video_c = os.getenv("url_video_c")

    # UI label -> value sent to the service; unknown values pass through.
    motion_by_label = {
        "Приближение →←": 'zoom in',
        "Отдаление ←→": 'zoom out',
        "Вверх ↑": 'up',
        "Вниз ↓": 'down',
        "Влево ←": 'left',
        "Вправо →": 'right',
        "По часовой стрелке ⟳": 'rotate cw',
        "Против часовой стрелки ⟲": 'rotate ccw',
    }
    motion = motion_by_label.get(motion, motion)

    # Without a timeout a stalled server would block this worker forever.
    request_timeout = 120

    data = {"prompt": f"{prompt}", "image": "null", "denoise": 0.75, "motion": motion}
    r = requests.post(f"{url_video_g}", json=data, timeout=request_timeout)
    task_id = f"{r.json()['task_id']}"

    while True:
        r2 = requests.post(f"{url_video_c}", json={"task_id": task_id}, timeout=request_timeout)
        time.sleep(3)
        try:
            status = r2.json()['status']
        except (ValueError, KeyError, TypeError):
            # The body is not a status document, so it is treated as the
            # finished video and saved to disk.
            try:
                n_im2 = f"{time.time()}"
                with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as file:
                    for chunk in r2.iter_content(chunk_size=1024):
                        if chunk:
                            file.write(chunk)
                return file.name
            except Exception as e:
                print(e)
                return None
        if status in ("QUEUED", "PROCESSING"):
            continue
        # Any other status means the task will not produce a video; stop
        # polling instead of looping indefinitely.
        print(f"flip_text1: unexpected task status: {status!r}")
        return None
def flip_text2(encoded_string, prompt, motion):
    """Animate an image with an optional text prompt and a camera motion.

    A Russian prompt is translated to English first. The image is sent
    base64-encoded to the endpoint named by env var ``url_video_g`` and the
    resulting task is polled through the endpoint named by ``url_video_c``.

    Args:
        encoded_string: Filesystem path of the source image (despite the
            name, the value is opened as a file).
        prompt: Text description; left unchanged if language detection or
            translation fails.
        motion: Camera motion, either one of the Russian UI labels or a
            value that is passed through unchanged.

    Returns:
        Path of a temporary ``.mp4`` file holding the response body, or
        ``None`` if the file could not be written or the task reports a
        status other than QUEUED/PROCESSING.

    Raises:
        OSError: If the image file cannot be read.
        Errors from submitting the task (network failure, missing
        ``task_id``) propagate to the caller.
    """
    url_video_g = os.getenv("url_video_g")
    url_video_c = os.getenv("url_video_c")

    try:
        if detect(prompt) == 'ru':
            prompt = GoogleTranslator(source='ru', target='en').translate(prompt)
        print(prompt)
    except Exception:
        # Detection/translation is best-effort; keep the prompt as given.
        pass

    # UI label -> value sent to the service; unknown values pass through.
    motion_by_label = {
        "Приближение →←": 'zoom in',
        "Отдаление ←→": 'zoom out',
        "Вверх ↑": 'up',
        "Вниз ↓": 'down',
        "Влево ←": 'left',
        "Вправо →": 'right',
        "По часовой стрелке ⟳": 'rotate cw',
        "Против часовой стрелки ⟲": 'rotate ccw',
    }
    motion = motion_by_label.get(motion, motion)

    with open(encoded_string, "rb") as image_file:
        # Decode to text properly; str(bytes) would leave a stray trailing
        # quote from the b'...' repr inside the payload.
        encoded_string2 = base64.b64encode(image_file.read()).decode("ascii")

    # Without a timeout a stalled server would block this worker forever.
    request_timeout = 120

    data = {"prompt": f"{prompt}", "image": f"{encoded_string2}", "denoise": 0.75, "motion": motion}
    r = requests.post(f"{url_video_g}", json=data, timeout=request_timeout)
    task_id = f"{r.json()['task_id']}"

    while True:
        r2 = requests.post(f"{url_video_c}", json={"task_id": task_id}, timeout=request_timeout)
        time.sleep(3)
        try:
            status = r2.json()['status']
        except (ValueError, KeyError, TypeError):
            # The body is not a status document, so it is treated as the
            # finished video and saved to disk.
            try:
                n_im2 = f"{time.time()}"
                with tempfile.NamedTemporaryFile(prefix=f'aaafff{n_im2}', suffix='.mp4', delete=False) as file:
                    for chunk in r2.iter_content(chunk_size=1024):
                        if chunk:
                            file.write(chunk)
                return file.name
            except Exception as e:
                print(e)
                return None
        if status in ("QUEUED", "PROCESSING"):
            continue
        # Any other status means the task will not produce a video; stop
        # polling instead of looping indefinitely.
        print(f"flip_text2: unexpected task status: {status!r}")
        return None
# Custom stylesheet for the Gradio page: styles the element with id
# "generate" (normal and hover states) and hides the page footer.
css = """
#generate {
    width: 100%;
    background: #e253dd !important;
    border: none;
    border-radius: 50px;
    outline: none !important;
    color: white;
}
#generate:hover {
    background: #de6bda !important;
    outline: none !important;
    color: #fff;
}
footer {visibility: hidden !important;}
"""
# Build the two-tab Gradio interface and start the app.
with gr.Blocks(css=css) as demo:
    # Tab 1: text prompt -> generated video.
    with gr.Tab("Сгенерировать видео"):
        with gr.Column():
            prompt = gr.Textbox(
                placeholder="Введите описание видео...",
                show_label=True,
                label='Описание:',
                lines=3,
            )
            # Camera-motion selector, currently disabled:
            # motion1 = gr.Dropdown(value="Приближение →←", interactive=True, show_label=True, label="Движение камеры:", choices=["Приближение →←", "Отдаление ←→", "Вверх ↑", "Вниз ↓", "Влево ←", "Вправо →", "По часовой стрелке ⟳", "Против часовой стрелки ⟲"])
            model = gr.Radio(
                interactive=True,
                value="Stable Video Diffusion",
                show_label=True,
                label="Модель нейросети:",
                choices=['Stable Video Diffusion'],
            )
        with gr.Column():
            text_button = gr.Button(
                "Сгенерировать видео",
                variant='primary',
                elem_id="generate",
            )
        with gr.Column():
            video_output = gr.Video(
                show_label=True,
                label='Результат:',
                type="file",
            )
        text_button.click(
            create_video,
            inputs=[prompt, model],
            outputs=video_output,
        )

    # Tab 2: uploaded image -> animated video.
    with gr.Tab("Анимировать изображение"):
        with gr.Column():
            prompt2 = gr.Image(
                show_label=True,
                interactive=True,
                type='filepath',
                label='Исходное изображение:',
            )
            # Optional description and camera-motion inputs, currently disabled:
            # prompt12 = gr.Textbox(placeholder="Введите описание видео...", show_label=True, label='Описание видео (опционально):', lines=3)
            # motion2 = gr.Dropdown(value="Приближение →←", interactive=True, show_label=True, label="Движение камеры:", choices=["Приближение →←", "Отдаление ←→", "Вверх ↑", "Вниз ↓", "Влево ←", "Вправо →", "По часовой стрелке ⟳", "Против часовой стрелки ⟲"])
            model2 = gr.Radio(
                interactive=True,
                value="Stable Video Diffusion",
                show_label=True,
                label="Модель нейросети:",
                choices=['Stable Video Diffusion'],
            )
        with gr.Column():
            text_button2 = gr.Button(
                "Анимировать изображение",
                variant='primary',
                elem_id="generate",
            )
        with gr.Column():
            video_output2 = gr.Video(
                show_label=True,
                label='Результат:',
                type="file",
            )
        text_button2.click(
            animate_img,
            inputs=[prompt2, model2],
            outputs=video_output2,
        )

# Enable the request queue, then serve the app.
demo.queue(concurrency_count=24)
demo.launch()