File size: 10,856 Bytes
19f880d
 
 
 
 
 
 
7a6b6ef
19f880d
 
 
 
 
 
dac49af
19f880d
 
 
 
 
 
 
7a6b6ef
19f880d
 
 
 
7a6b6ef
19f880d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
948cb41
19f880d
 
 
 
 
 
 
0269fcb
 
 
02e3c97
 
 
19f880d
 
 
 
 
 
 
 
 
0269fcb
19f880d
7a6b6ef
19f880d
948cb41
19f880d
 
 
 
 
948cb41
 
 
 
 
 
19f880d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55790c5
19f880d
 
55790c5
 
 
19f880d
 
0cf7a89
55790c5
0cf7a89
55790c5
19f880d
 
55790c5
 
c47ff83
19f880d
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import gradio as gr
import os
import pickle
import requests
import time
import re
from huggingface_hub import hf_hub_download, list_repo_files
from huggingface_hub.errors import LocalEntryNotFoundError

# Absolute path to the app's asset directory; demo motions and videos live
# under <repo root>/src/app.
app_root = os.path.dirname(os.path.abspath(__file__))
app_root = os.path.join(app_root, "src/app")

# Hugging Face repository that hosts the model weights and demo assets.
repo_id = "Yzy00518/motionReFit"


# Whitelist of demo clips: maps a 6-digit clip ID to the human-readable action
# caption used when renaming downloaded files (see
# rename_files_with_six_digit_code). The IDs look like dataset indices
# (presumably HumanML3D-style — confirm against the motionReFit repo).
selected_videos = {
    '000021': 'walking',
    '000472': 'writing something',
    '001454': 'walking backward',
    '002093': 'cleaning the window',
    '002550': 'walking in a Zig-Zag pattern',
    '003111': 'dancing',
    '003712': 'kneeling down and crawling',
    '004163': 'flying like a bird',
    '004455': 'running in place',
    '004912': 'swimming',
    '005458': 'running forward',
    '005869': 'picking up something',
    '006662': 'falling down',
    '006979': 'punching',
    '007354': 'crawling',
    '007822': 'jumping on both sides',
    '008162': 'dancing like a robot',
    '009768': 'looking back',
    '010193': 'lifting something heavy',
    '013449': 'punching with fists',
    '013659': 'jumping jacks',
    '014920': 'walking in place',
    '015249': 'putting something on their face',
    '015729': 'cleaning the table',
}

def is_six_digit_filename(file_path):
    """Return True when the basename is exactly six digits plus an alphanumeric extension."""
    return re.fullmatch(r'\d{6}\.[a-zA-Z0-9]+', os.path.basename(file_path)) is not None

def extract_six_digit_code(file_path):
    """Return the first run of six consecutive digits in the basename, or None."""
    found = re.search(r'(\d{6})', os.path.basename(file_path))
    if found is None:
        return None
    return found.group(1)

def rename_files_with_six_digit_code(file_path):
    """Rename a 6-digit-coded download to its human-readable caption.

    Args:
        file_path: path to a file that may be named like ``000021.mp4``.

    Returns:
        * ``file_path`` unchanged when the basename is not a 6-digit filename,
        * the new path after a successful rename,
        * ``None`` when the 6-digit code is not in ``selected_videos``.

    Raises:
        ValueError: for a 6-digit file whose extension is neither .mp4 nor .pkl.
    """
    if not is_six_digit_filename(file_path):
        return file_path
    six_digit_code = extract_six_digit_code(file_path)
    if six_digit_code not in selected_videos:
        return None
    new_name = selected_videos[six_digit_code]
    # BUG FIX: the old code tested "'.mp4' in file_path", which also matched
    # ".mp4" occurring anywhere in the *directory* portion of the path and
    # could therefore give a .pkl file an .mp4 name. Decide strictly from the
    # basename's extension instead.
    ext = os.path.splitext(os.path.basename(file_path))[1]
    if ext not in ('.mp4', '.pkl'):
        raise ValueError(f"Invalid file extension: {file_path}")
    new_path = os.path.join(os.path.dirname(file_path), f"{new_name}{ext}")
    os.rename(file_path, new_path)
    print(f"Renamed: {file_path} -> {new_path}")
    return new_path


def download_files_from_huggingface(repo_id, repo_type, max_retries=10):
    """Mirror every file of a Hugging Face repo into the current working directory.

    Each file is retried up to ``max_retries`` times with a growing back-off:
    network/timeout errors wait (attempt+1)*40 s, unknown errors
    (attempt+1)*100 s. Download is best-effort: a file that still fails after
    all retries is reported and skipped so the app can start with whatever
    was fetched.

    Args:
        repo_id: Hugging Face repository id, e.g. "Yzy00518/motionReFit".
        repo_type: repository type passed through to the hub API (e.g. 'model').
        max_retries: attempts per file before giving up.
    """
    file_list = list_repo_files(repo_id=repo_id, repo_type=repo_type)
    for file in file_list:
        relative_path = os.path.dirname(file)
        local_path = os.path.join(os.getcwd(), relative_path)
        # BUG FIX: exist_ok avoids the check-then-create race and the crash
        # when several files share a directory created moments earlier.
        os.makedirs(local_path, exist_ok=True)

        for attempt in range(max_retries):
            try:
                # NOTE(review): local_dir_use_symlinks / resume_download are
                # deprecated in recent huggingface_hub releases — kept for
                # compatibility with the version this app pins; confirm.
                hf_hub_download(repo_id=repo_id,
                                filename=file,
                                local_dir=os.getcwd(),
                                local_dir_use_symlinks=False,
                                resume_download=True,)
                print(f"Successfully Download: {file}")
                break
            except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError, LocalEntryNotFoundError) as e:
                if attempt < max_retries - 1:
                    wait_time = (attempt + 1) * 40
                    print(f"Download failed, retrying in {wait_time} seconds")
                    time.sleep(wait_time)
                else:
                    # BUG FIX: the final failure used to be swallowed silently.
                    print(f"Giving up on {file} after {max_retries} attempts: {e}")
            except FileExistsError:
                print(f"{file} Exists")
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    wait_time = (attempt + 1) * 100
                    print(f"Unknown exception, retrying in {wait_time} seconds")
                    time.sleep(wait_time)
                else:
                    # BUG FIX: the final failure used to be swallowed silently.
                    print(f"Giving up on {file} after {max_retries} attempts: {e}")

download_files_from_huggingface(repo_id, 'model')

from src.app.setup_models import *
from src.app.pipeline import pipeline

def select_and_show(data_id):
    """Resolve the preview-video path for *data_id*, or None when it is missing."""
    candidate = os.path.join(app_root, f"app_base_motion_mp4/{data_id}.mp4")
    if os.path.exists(candidate):
        return candidate
    return None


from openai import OpenAI
def __translate(text_raw, temperature=1.5):
    """Ask the chat model to classify and normalize an editing instruction.

    Args:
        text_raw: the user's free-form editing instruction.
        temperature: sampling temperature forwarded to the API.

    Returns:
        The raw model output string, expected (but not guaranteed) to follow
        the "task#text" format described in the prompt; `translate` validates it.
    """
    MODEL_NAME = 'gpt-3.5-turbo-0125'
    MAX_TOKENS = 400

    # SECURITY FIX: an API key was previously hard-coded here and written into
    # os.environ. The key must be treated as leaked and revoked; the client now
    # reads OPENAI_API_KEY from the environment only.
    client = OpenAI(
        api_key=os.environ.get("OPENAI_API_KEY", "<your OpenAI API key if not set as env var>"),
        base_url='https://api.xty.app/v1'
    )

    content = [
        {
            "type": "text",
            "text": F"""
                    You will be given a sentence about a person's action or description about their motion. 
                    First, determine which of the following three tasks best fits the input:
                        regen (regeneration) (Use this option very often.) – Modify or generate a new version of the given action (e.g., 'a person is waving his hands' or 'a person kicks with his left foot').
                        style_transfer – Add an emotional or stylistic tone to the given action. For example, 'Replace a person with a proud look using their upper body'. In this task, the output should include one of these adjectives (angry, sad, proud, old, or sexy) and should ONLY change the action style, NOT the action itself.
                        adjustment – (Use this option very rarely.) Only choose this when the input clearly specifies explicit adjustments such as increasing or decreasing the motion amplitude or switching from clockwise to counterclockwise.
                    Next, simplify and normalize the input text by retaining only the parts that emphasize how the action changes. Remove any details about the person's appearance, gender, body type, scenery, or objects they hold.
                    Only for style_transfer tasks, format the output as:   Replace a person with a X look using their Y body. Here, X must be chosen from [angry, sad, proud, old, sexy] and Y is limited to either 'upper' or 'lower'.
                    For other tasks, the format should be 'The person is doing X with their Y'. Here, X is the action and Y is the body part[upper body, lower body, left arm, right arm, both arms] used to perform the action.
                    Finally, your output should follow the format:   task#text where 'task' is one of regen, style_transfer, or adjustment, and 'text' is the simplified and formatted result.
                    REMEMBER TO USE THE '#' SYMBOL TO SEPARATE THE TASK AND TEXT. For example, if the task is regen and the text is 'a person is waving his hands', the output should be: regen#a person is waving his hands with their both arms
                    REMEMBER YOU CAN ONLY CHOOSE ONE TASK. If the input contains multiple tasks, choose the one that best fits the input. If the input contains no tasks, choose regen.
                    """
        },
    ]
    content.append({
        "type": "text",
        "text": F"""The sentence is: {text_raw}"""
    })

    messages = [{"role": "user", "content": content}]
    params = {"model": MODEL_NAME, "messages": messages, "max_tokens": MAX_TOKENS, "temperature": temperature}
    result = client.chat.completions.create(**params)

    translated_text = result.choices[0].message.content
    print("Translated Text:", translated_text)

    return translated_text


def translate(text_raw):
    """Translate a free-form instruction into a validated (task, text) pair.

    Calls ``__translate`` up to 6 times, lowering the sampling temperature by
    0.2 each attempt, until the model output matches "task#text" with a
    recognised task.

    Returns:
        (task, text) on success, (None, None) when every attempt fails.
    """
    tasks = ('regen', 'style_transfer', 'adjustment')
    separator = '#'
    temperature = 1.5  # FIX: local was misspelled "tempreture"
    for _ in range(6):
        result = __translate(text_raw, temperature)
        if separator in result:
            # BUG FIX: split once from the left — the old split('#')[1]
            # silently dropped everything after a second '#' occurring inside
            # the translated text itself.
            task, text = result.split(separator, 1)
            if task in tasks:
                print("=========GPT-3.5 Turbo successfully translated the text.===========")
                print("Task:", task)
                print("Text:", text)
                print("===================================================================")
                return task, text

        temperature -= 0.2
    return None, None
    

def inference_warpper(data_id, change_prompt):
    """Run the full editing pipeline for one base motion.

    Loads the pickled source motion for *data_id*, translates *change_prompt*
    into a (task, text) pair via the LLM, picks the matching model pair, and
    runs the generation pipeline. Returns None when translation fails.
    """
    model_keys = {
        'regen': ('regen', 'regen_disc'),
        'style_transfer': ('style_transfer', 'style_transfer_disc'),
        'adjustment': ('adjustment', 'adjustment_disc'),
    }

    def select_model(task):
        # Map the task name to its generator/discriminator model pair.
        if task not in model_keys:
            raise ValueError(f"Invalid task: {task}")
        main_key, disc_key = model_keys[task]
        return {'model': models[main_key], 'disc_model': models[disc_key]}

    with open(os.path.join(app_root, f"app_base_motion/{data_id}.pkl"), 'rb') as f:
        motion = pickle.load(f)

    task, normalized_text = translate(change_prompt)
    if task is None:
        print("GPT-3.5 Turbo failed to translate the text.")
        return None

    payload = {
        'source': motion,
        'text': normalized_text,
        'prog_ind': None,
        'All_one_model': True,
        'model_type': task,
    }
    return pipeline(payload, select_model(task), device, diffuser, SEQLEN=16, smplx_pth='deps/smplx')

def get_all_videos():
    """Return the sorted basenames (without extension) of every demo .mp4."""
    video_dir = os.path.join(app_root, "app_base_motion_mp4")
    mp4_names = sorted(name for name in os.listdir(video_dir) if name.endswith('.mp4'))
    return [os.path.splitext(name)[0] for name in mp4_names]

def bar_length(data_id):
    """Placeholder: every clip is treated as 4 units long, whatever *data_id* is."""
    FIXED_LENGTH = 4
    return FIXED_LENGTH

# Gradio UI: a four-step flow — pick a motion, preview it, type an editing
# instruction, run inference.
with gr.Blocks() as demo:
    gr.Markdown("### Edit a motion with your own instruction")
    all_videos = get_all_videos()

    data_id_input = gr.Dropdown(
            label=f"Step 1: Select the motion to edit({len(all_videos)} in total)",
            choices=all_videos,
            multiselect=False,
            allow_custom_value=True,
        )
    show_video_button = gr.Button("Step 2: Display the selected motion")
    video_output = gr.Video(label="Selected Motion")

    change_prompt_textbox = gr.Textbox(visible=True, label="Step 3: Write your editing instruction here")
    # FIX: label read "Step4:" — normalised spacing to match the other steps.
    inference_button = gr.Button("Step 4: Start inference")
    output_file = gr.Video(label="Edited Motion (It takes 40 sec to render. The video is truncated to 6sec due to the constraint of computational resources)")

    show_video_button.click(
        fn=select_and_show,
        inputs=[data_id_input],
        outputs=[video_output]
    )

    inference_button.click(
        fn=inference_warpper,
        inputs=[data_id_input, change_prompt_textbox],
        outputs=[output_file]
    )

demo.launch()