Penguin-VL / inference /interface /gradio_interface.py
lkeab's picture
Restore default Gradio SSR behavior
bca5918 verified
import os
import os.path as osp
import gradio as gr
# Markdown banner shown at the top of the page; passed to gr.Markdown() when the Blocks layout is built.
HEADER = """
# Penguin-VL Gradio Interface
Developed by [Penguin-VL](https://github.com/tencent-ailab/Penguin-VL) team at Tencent AI Lab.
Note: speed on ZeroGPU does not reflect real model speed and may be influenced by the shared environment. For stable and fast Gradio Space deployment and running, please visit [the local UI instructions](https://github.com/tencent-ailab/Penguin-VL?tab=readme-ov-file#-gradio-demo-local-ui). For usage examples and expected results, please refer to [here](https://github.com/tencent-ailab/Penguin-VL/blob/master/inference/notebooks/01_penguinvl_inference_recipes.public.ipynb).
Please login with your Hugging Face account first. We provide some example images and videos for easier trials.
"""
class PenguinVLQwen3GradioInterface(object):
    """Gradio Blocks chat front-end that talks to a model client.

    Uploaded images/videos and typed text are accumulated in the chatbot
    history. On "Generate" (or Enter in the text box) the history is converted
    into a conversation of ``system`` / ``user`` / ``assistant`` messages,
    sent to ``model_client.submit`` and the reply is written back into the
    chatbot, chunk by chunk when the client returns an iterable.
    """

    def __init__(
        self,
        model_client,
        example_dir=None,
        default_system_prompt="You are a helpful assistant developed by Tencent AI Lab PenguinVL team.",
        **server_kwargs,
    ):
        """Build the Blocks layout and wire its events.

        Args:
            model_client: Object exposing ``submit(request)``. ``request`` is
                a dict with ``"conversation"`` and ``"generation_config"``
                keys; the return value is either a complete string or an
                iterable of string chunks.
            example_dir: Optional directory whose image/video files are
                offered as clickable examples.
            default_system_prompt: Initial value of the "System Prompt" box
                and the fallback used when that box is submitted empty.
            **server_kwargs: Passed unchanged to ``Blocks.launch`` by
                :meth:`launch`.
        """
        self.model_client = model_client
        self.server_kwargs = server_kwargs
        self.default_system_prompt = (default_system_prompt or "").strip()
        self.image_formats = ("png", "jpg", "jpeg")
        self.video_formats = ("mp4", "mov")

        image_examples, video_examples = self._collect_examples(example_dir)

        with gr.Blocks() as self.interface:
            gr.Markdown(HEADER)
            with gr.Row():
                chatbot_kwargs = {"elem_id": "chatbot", "height": 710}
                try:
                    chatbot = gr.Chatbot(type="messages", **chatbot_kwargs)
                except TypeError:
                    # Gradio 6 uses OpenAI-style messages by default and removed the `type` arg.
                    chatbot = gr.Chatbot(**chatbot_kwargs)
                with gr.Column():
                    with gr.Tab(label="Input"):
                        with gr.Row():
                            input_video = gr.Video(sources=["upload"], label="Upload Video")
                            input_image = gr.Image(sources=["upload"], type="filepath", label="Upload Image")
                        if image_examples:
                            gr.Examples(image_examples, inputs=[input_image], label="Example Images")
                        if video_examples:
                            gr.Examples(video_examples, inputs=[input_video], label="Example Videos")
                        input_text = gr.Textbox(label="Input Text", placeholder="Type your message here and press enter to submit")
                        submit_button = gr.Button("Generate")
                    with gr.Tab(label="Configure"):
                        with gr.Accordion("Prompt Config", open=True):
                            system_prompt = gr.Textbox(
                                value=self.default_system_prompt,
                                label="System Prompt",
                                lines=4,
                                placeholder="Optional: system instruction prepended to each request",
                            )
                        with gr.Accordion("Generation Config", open=True):
                            do_sample = gr.Checkbox(value=True, label="Do Sample")
                            temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.1, label="Temperature")
                            top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.9, label="Top P")
                            max_new_tokens = gr.Slider(minimum=0, maximum=4096, value=1536, step=1, label="Max New Tokens")
                        with gr.Accordion("Video Config", open=True):
                            fps = gr.Slider(minimum=0.0, maximum=10.0, value=1, label="FPS")
                            max_frames = gr.Slider(minimum=0, maximum=256, value=180, step=1, label="Max Frames")

            # Uploading a file pushes it into the chat history and clears the
            # upload widget (the handlers return None for it).
            input_video.change(self._on_video_upload, [chatbot, input_video], [chatbot, input_video])
            input_image.change(self._on_image_upload, [chatbot, input_image], [chatbot, input_image])

            # Enter in the text box and the "Generate" button run the same
            # prediction with the same inputs/outputs.
            predict_inputs = [
                chatbot, input_text, system_prompt, do_sample, temperature, top_p, max_new_tokens,
                fps, max_frames,
            ]
            predict_outputs = [chatbot, input_text]
            input_text.submit(self._predict, predict_inputs, predict_outputs)
            submit_button.click(self._predict, predict_inputs, predict_outputs)

    def _media_kind(self, path):
        """Classify ``path`` by its file extension, ignoring case.

        Returns:
            ``"video"`` or ``"image"`` when the extension is listed in
            ``self.video_formats`` / ``self.image_formats``, else ``None``.
        """
        extension = osp.splitext(path)[1].lower().lstrip(".")
        if extension in self.video_formats:
            return "video"
        if extension in self.image_formats:
            return "image"
        return None

    def _collect_examples(self, example_dir):
        """List example media found directly inside ``example_dir``.

        Entries are visited in sorted order so the example galleries are
        deterministic, and are classified with :meth:`_media_kind` so that
        the same extensions are accepted here and for uploaded files.

        Returns:
            ``(image_examples, video_examples)``; each is a list of
            single-element ``[path]`` rows as expected by ``gr.Examples``.
            Both are empty when ``example_dir`` is ``None``.
        """
        image_examples, video_examples = [], []
        if example_dir is None:
            return image_examples, video_examples
        for file_name in sorted(os.listdir(example_dir)):
            kind = self._media_kind(file_name)
            if kind == "image":
                image_examples.append([osp.join(example_dir, file_name)])
            elif kind == "video":
                video_examples.append([osp.join(example_dir, file_name)])
        return image_examples, video_examples

    def _append_media_message(self, messages, media_path):
        """Append ``media_path`` as a user file message; return ``(messages, None)``.

        The trailing ``None`` resets the upload component bound as the second
        output of the upload events.
        """
        messages = messages or []
        if media_path is not None:
            messages.append({"role": "user", "content": {"path": media_path}})
        return messages, None

    def _on_video_upload(self, messages, video):
        """Handle a change of the video upload widget."""
        return self._append_media_message(messages, video)

    def _on_image_upload(self, messages, image):
        """Handle a change of the image upload widget."""
        return self._append_media_message(messages, image)

    def _on_text_submit(self, messages, text):
        """Append ``text`` as a user message and return ``(messages, "")``."""
        messages = messages or []
        messages.append({"role": "user", "content": text})
        return messages, ""

    def _extract_media_path(self, content):
        """Return the first truthy ``"path"`` value found in ``content``.

        Dict values and list/tuple items are searched recursively, depth
        first.

        Raises:
            ValueError: If ``content`` is a text part (``type == "text"`` with
                a string ``text``) or contains no path.
        """
        if isinstance(content, dict):
            if content.get("type") == "text" and isinstance(content.get("text"), str):
                raise ValueError(f"Text content is not media: {content}")
            media_path = content.get("path")
            if media_path:
                return media_path
            for value in content.values():
                try:
                    return self._extract_media_path(value)
                except ValueError:
                    continue
        if isinstance(content, (list, tuple)):
            for item in content:
                try:
                    return self._extract_media_path(item)
                except ValueError:
                    continue
        raise ValueError(f"Unsupported media content: {content}")

    def _extract_text_content(self, content):
        """Return the text carried by ``content``.

        Accepts a plain string, a dict with a string ``"text"`` value, or a
        list/tuple of such items (non-empty parts joined with newlines; items
        without text are skipped).

        Raises:
            ValueError: If no text can be extracted.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, dict):
            text = content.get("text")
            if isinstance(text, str):
                return text
        if isinstance(content, (list, tuple)):
            text_parts = []
            for item in content:
                try:
                    text_parts.append(self._extract_text_content(item))
                except ValueError:
                    continue
            if text_parts:
                return "\n".join(part for part in text_parts if part)
        raise ValueError(f"Unsupported text content: {content}")

    def _normalize_user_content(self, content, fps, max_frames):
        """Convert one chatbot user ``content`` into a list of content parts.

        Strings and dicts carrying text become ``{"type": "text", ...}``;
        other dicts are treated as media and become ``"image"`` or ``"video"``
        parts according to the file extension (videos also carry ``fps`` and
        ``max_frames``). Lists/tuples are flattened item by item.

        Raises:
            ValueError: For unsupported content or an unknown media extension.
        """
        if isinstance(content, str):
            return [{"type": "text", "text": content}]
        if isinstance(content, (list, tuple)):
            normalized_items = []
            for item in content:
                normalized_items.extend(self._normalize_user_content(item, fps, max_frames))
            return normalized_items
        if isinstance(content, dict):
            try:
                text = self._extract_text_content(content)
            except ValueError:
                pass  # Not a text part; fall through and treat it as media.
            else:
                return [{"type": "text", "text": text}]
            media_path = self._extract_media_path(content)
            kind = self._media_kind(media_path)
            if kind == "video":
                return [{"type": "video", "video": {"video_path": media_path, "fps": fps, "max_frames": max_frames}}]
            if kind == "image":
                return [{"type": "image", "image": {"image_path": media_path}}]
            raise ValueError(f"Unsupported media type: {media_path}")
        raise ValueError(f"Unsupported user content: {content}")

    def _predict(self, messages, input_text, system_prompt, do_sample, temperature, top_p, max_new_tokens,
                 fps, max_frames):
        """Generate the assistant reply for the current chat history.

        This is a generator: every yielded value is a ``(messages, "")`` pair
        matching the ``[chatbot, input_text]`` outputs, so the text box is
        cleared while the chatbot is updated.

        Args:
            messages: Chatbot history (list of ``{"role", "content"}`` dicts)
                or ``None``.
            input_text: Pending text from the input box; appended as a user
                message when non-empty.
            system_prompt: System prompt from the UI; the default prompt is
                used when it is empty.
            do_sample, temperature, top_p, max_new_tokens: Forwarded as the
                request's ``generation_config``.
            fps, max_frames: Attached to every video part of the request.

        Yields:
            ``(messages, "")`` once for a string response, or once per chunk
            for an iterable response. When the conversation does not end with
            a user turn, the unchanged history is yielded once and the model
            client is not called.
        """
        messages = list(messages or [])
        input_text = input_text or ""
        if input_text:
            messages.append({"role": "user", "content": input_text})

        new_messages = []
        active_system_prompt = (system_prompt or self.default_system_prompt).strip()
        if active_system_prompt:
            new_messages.append({
                "role": "system",
                "content": [{"type": "text", "text": active_system_prompt}],
            })

        # Consecutive user messages (uploads + text) are merged into a single
        # user turn; an assistant message closes the pending turn.
        contents = []
        for message in messages:
            if message["role"] == "assistant":
                if contents:
                    new_messages.append({"role": "user", "content": contents})
                    contents = []
                new_messages.append(message)
            elif message["role"] == "user":
                contents.extend(self._normalize_user_content(message["content"], fps, max_frames))
        if contents:
            new_messages.append({"role": "user", "content": contents})

        if not new_messages or new_messages[-1]["role"] != "user":
            # Nothing to answer. A bare `return messages` would be discarded
            # here because this function is a generator, so emit an explicit
            # two-output update instead.
            yield messages, ""
            return

        generation_config = {
            "do_sample": do_sample,
            "temperature": temperature,
            "top_p": top_p,
            "max_new_tokens": max_new_tokens,
        }
        response = self.model_client.submit({"conversation": new_messages, "generation_config": generation_config})

        if isinstance(response, str):
            messages.append({"role": "assistant", "content": response})
            yield messages, ""
            return

        # Streaming response: grow the last message in place, one chunk at a time.
        messages.append({"role": "assistant", "content": ""})
        for token in response:
            messages[-1]["content"] += token
            yield messages, ""

    def launch(self):
        """Start the Gradio server with the keyword arguments given at construction."""
        self.interface.launch(**self.server_kwargs)