import os
import os.path as osp

import gradio as gr


HEADER = """
# Penguin-VL Gradio Interface

Developed by [Penguin-VL](https://github.com/tencent-ailab/Penguin-VL) team at Tencent AI Lab.

Note: speed on ZeroGPU does not reflect real model speed and may be influenced by the shared environment.
For stable and fast Gradio Space deployment and running, please visit [the local UI instructions](https://github.com/tencent-ailab/Penguin-VL?tab=readme-ov-file#-gradio-demo-local-ui).

For usage examples and expected results, please refer to [here](https://github.com/tencent-ailab/Penguin-VL/blob/master/inference/notebooks/01_penguinvl_inference_recipes.public.ipynb).

Please login with your Hugging Face account first. We provide some example images and videos for easier trials.
"""


class PenguinVLQwen3GradioInterface(object):
    """Gradio chat UI that forwards multimodal conversations to a model client.

    The interface collects text, image and video inputs into a chatbot
    history, converts that history into the request structure built in
    ``_predict`` and streams the reply back into the chatbot.
    """

    def __init__(
        self,
        model_client,
        example_dir=None,
        default_system_prompt="You are a helpful assistant developed by Tencent AI Lab PenguinVL team.",
        **server_kwargs,
    ):
        """Build the Gradio Blocks layout and wire up its events.

        Args:
            model_client: Object exposing ``submit(request)``; its return value
                is either a complete string or an iterable of text chunks
                (see ``_predict``).
            example_dir: Optional directory whose image/video files are
                offered as clickable examples.
            default_system_prompt: Initial value of the system prompt box and
                the fallback used when that box is left empty.
            **server_kwargs: Passed unchanged to ``gr.Blocks.launch``.
        """
        self.model_client = model_client
        self.server_kwargs = server_kwargs
        self.default_system_prompt = (default_system_prompt or "").strip()

        # Extensions are stored without the leading dot, lower-case.
        self.image_formats = ("png", "jpg", "jpeg")
        self.video_formats = ("mp4", "mov")

        image_examples, video_examples = self._collect_examples(example_dir)

        with gr.Blocks() as self.interface:
            gr.Markdown(HEADER)
            with gr.Row():
                chatbot_kwargs = {"elem_id": "chatbot", "height": 710}
                try:
                    chatbot = gr.Chatbot(type="messages", **chatbot_kwargs)
                except TypeError:
                    # Gradio 6 uses OpenAI-style messages by default and removed the `type` arg.
                    chatbot = gr.Chatbot(**chatbot_kwargs)

                with gr.Column():
                    with gr.Tab(label="Input"):
                        with gr.Row():
                            input_video = gr.Video(sources=["upload"], label="Upload Video")
                            input_image = gr.Image(sources=["upload"], type="filepath", label="Upload Image")

                        if image_examples:
                            gr.Examples(image_examples, inputs=[input_image], label="Example Images")
                        if video_examples:
                            gr.Examples(video_examples, inputs=[input_video], label="Example Videos")

                        input_text = gr.Textbox(
                            label="Input Text",
                            placeholder="Type your message here and press enter to submit",
                        )
                        submit_button = gr.Button("Generate")

                    with gr.Tab(label="Configure"):
                        with gr.Accordion("Prompt Config", open=True):
                            system_prompt = gr.Textbox(
                                value=self.default_system_prompt,
                                label="System Prompt",
                                lines=4,
                                placeholder="Optional: system instruction prepended to each request",
                            )
                        with gr.Accordion("Generation Config", open=True):
                            do_sample = gr.Checkbox(value=True, label="Do Sample")
                            temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.1, label="Temperature")
                            top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.9, label="Top P")
                            max_new_tokens = gr.Slider(
                                minimum=0, maximum=4096, value=1536, step=1, label="Max New Tokens"
                            )
                        with gr.Accordion("Video Config", open=True):
                            fps = gr.Slider(minimum=0.0, maximum=10.0, value=1, label="FPS")
                            max_frames = gr.Slider(minimum=0, maximum=256, value=180, step=1, label="Max Frames")

            # An upload is moved into the chat history and the upload widget
            # is reset (the handlers return None for it).
            input_video.change(self._on_video_upload, [chatbot, input_video], [chatbot, input_video])
            input_image.change(self._on_image_upload, [chatbot, input_image], [chatbot, input_image])

            # Pressing enter and clicking "Generate" trigger the same call.
            predict_inputs = [
                chatbot,
                input_text,
                system_prompt,
                do_sample,
                temperature,
                top_p,
                max_new_tokens,
                fps,
                max_frames,
            ]
            predict_outputs = [chatbot, input_text]
            input_text.submit(self._predict, predict_inputs, predict_outputs)
            submit_button.click(self._predict, predict_inputs, predict_outputs)

    @staticmethod
    def _media_extension(path):
        """Return the lower-cased extension of ``path`` without the leading dot."""
        return osp.splitext(path)[1].lower().lstrip(".")

    def _collect_examples(self, example_dir):
        """Split the files in ``example_dir`` into image and video example rows.

        Returns:
            ``(image_examples, video_examples)``; each entry is a one-element
            list ``[path]`` as passed to ``gr.Examples``. Both lists are empty
            when ``example_dir`` is ``None``.
        """
        image_examples, video_examples = [], []
        if example_dir is None:
            return image_examples, video_examples

        # Sorted so the example order does not depend on os.listdir ordering.
        for file_name in sorted(os.listdir(example_dir)):
            example_file = osp.join(example_dir, file_name)
            if not osp.isfile(example_file):
                continue
            extension = self._media_extension(example_file)
            if extension in self.image_formats:
                image_examples.append([example_file])
            elif extension in self.video_formats:
                video_examples.append([example_file])
        return image_examples, video_examples

    def _on_video_upload(self, messages, video):
        """Append an uploaded video to the history and clear the upload widget.

        Returns:
            ``(messages, None)``; ``None`` is the new value of the video input.
        """
        messages = messages or []
        if video is not None:
            # File-style content: a dict carrying the local path of the upload.
            messages.append({"role": "user", "content": {"path": video}})
        return messages, None

    def _on_image_upload(self, messages, image):
        """Append an uploaded image to the history and clear the upload widget.

        Returns:
            ``(messages, None)``; ``None`` is the new value of the image input.
        """
        messages = messages or []
        if image is not None:
            # File-style content: a dict carrying the local path of the upload.
            messages.append({"role": "user", "content": {"path": image}})
        return messages, None

    def _on_text_submit(self, messages, text):
        """Append ``text`` as a user message and return ``(messages, "")``.

        Not wired to any event in ``__init__``; ``_predict`` appends the
        submitted text itself.
        """
        messages = messages or []
        messages.append({"role": "user", "content": text})
        return messages, ""

    def _extract_media_path(self, content):
        """Return the first truthy ``"path"`` value found inside ``content``.

        Dicts are checked for a ``"path"`` key and then searched through their
        values; lists and tuples are searched item by item, depth first.

        Raises:
            ValueError: If ``content`` is a text part or contains no path.
        """
        if isinstance(content, dict):
            if content.get("type") == "text" and isinstance(content.get("text"), str):
                raise ValueError(f"Text content is not media: {content}")

            media_path = content.get("path")
            if media_path:
                return media_path

            # The path may be nested, e.g. under a "file" key.
            for value in content.values():
                try:
                    return self._extract_media_path(value)
                except ValueError:
                    continue

        if isinstance(content, (list, tuple)):
            for item in content:
                try:
                    return self._extract_media_path(item)
                except ValueError:
                    continue

        raise ValueError(f"Unsupported media content: {content}")

    def _extract_text_content(self, content):
        """Return the text carried by ``content``.

        A string is returned as is, a dict yields its string ``"text"`` value,
        and a list/tuple yields the non-empty texts of its items joined by
        newlines (items without text are skipped).

        Raises:
            ValueError: If no text can be found in ``content``.
        """
        if isinstance(content, str):
            return content

        if isinstance(content, dict):
            text = content.get("text")
            if isinstance(text, str):
                return text

        if isinstance(content, (list, tuple)):
            text_parts = []
            for item in content:
                try:
                    text_parts.append(self._extract_text_content(item))
                except ValueError:
                    continue
            if text_parts:
                return "\n".join(part for part in text_parts if part)

        raise ValueError(f"Unsupported text content: {content}")

    def _normalize_user_content(self, content, fps, max_frames):
        """Convert one chatbot user content value into a list of request parts.

        Args:
            content: A string, a dict (text part or file reference) or a
                list/tuple of such values.
            fps: Sampling rate stored on every video part.
            max_frames: Frame limit stored on every video part.

        Returns:
            A list of ``{"type": "text" | "image" | "video", ...}`` dicts.

        Raises:
            ValueError: If the content, or the extension of a referenced file,
                is not supported.
        """
        if isinstance(content, str):
            return [{"type": "text", "text": content}]

        if isinstance(content, (list, tuple)):
            normalized_items = []
            for item in content:
                normalized_items.extend(self._normalize_user_content(item, fps, max_frames))
            return normalized_items

        if isinstance(content, dict):
            # Text takes precedence; only dicts without text are treated as media.
            try:
                text = self._extract_text_content(content)
            except ValueError:
                pass
            else:
                return [{"type": "text", "text": text}]

            media_path = self._extract_media_path(content)
            media_ext = self._media_extension(media_path)
            if media_ext in self.video_formats:
                return [{"type": "video", "video": {"video_path": media_path, "fps": fps, "max_frames": max_frames}}]
            if media_ext in self.image_formats:
                return [{"type": "image", "image": {"image_path": media_path}}]
            raise ValueError(f"Unsupported media type: {media_path}")

        raise ValueError(f"Unsupported user content: {content}")

    def _predict(self, messages, input_text, system_prompt, do_sample, temperature, top_p, max_new_tokens, fps,
                 max_frames):
        """Send the conversation to the model client and stream the reply.

        This is a generator: every yielded value is a ``(messages, "")`` pair
        holding the updated chat history and the cleared text box value.

        Args:
            messages: Current chatbot history (list of role/content dicts).
            input_text: Text typed by the user; appended as a user message
                when non-empty.
            system_prompt: System prompt from the UI; when empty the default
                system prompt is used instead.
            do_sample, temperature, top_p, max_new_tokens: Forwarded in the
                request's ``generation_config``.
            fps, max_frames: Attached to every video part of the request.

        Raises:
            ValueError: If the history contains unsupported user content.
        """
        messages = list(messages or [])
        input_text = input_text or ""
        if input_text:
            messages.append({"role": "user", "content": input_text})

        new_messages = []
        active_system_prompt = (system_prompt or self.default_system_prompt).strip()
        if active_system_prompt:
            new_messages.append({
                "role": "system",
                "content": [{"type": "text", "text": active_system_prompt}],
            })

        # Consecutive user messages (uploads followed by text) are merged into
        # a single user turn; an assistant message closes the pending turn.
        contents = []
        for message in messages:
            if message["role"] == "assistant":
                if contents:
                    new_messages.append({"role": "user", "content": contents})
                    contents = []
                new_messages.append(message)
            elif message["role"] == "user":
                contents.extend(self._normalize_user_content(message["content"], fps, max_frames))
        if contents:
            new_messages.append({"role": "user", "content": contents})

        if not new_messages or new_messages[-1]["role"] != "user":
            # No pending user turn, so there is nothing to generate. A value
            # passed to `return` inside a generator is never delivered to the
            # consumer, so the unchanged state is yielded explicitly.
            # input_text is necessarily empty on this path.
            yield messages, ""
            return

        generation_config = {
            "do_sample": do_sample,
            "temperature": temperature,
            "top_p": top_p,
            "max_new_tokens": max_new_tokens,
        }
        response = self.model_client.submit({"conversation": new_messages, "generation_config": generation_config})

        # A plain string is a complete, non-streamed reply.
        if isinstance(response, str):
            messages.append({"role": "assistant", "content": response})
            yield messages, ""
            return

        # Otherwise grow the assistant message chunk by chunk and yield after
        # every chunk so the UI shows the partial reply.
        messages.append({"role": "assistant", "content": ""})
        for token in response:
            messages[-1]["content"] += token
            yield messages, ""

    def launch(self):
        """Launch the Gradio app with the keyword arguments given at construction."""
        self.interface.launch(**self.server_kwargs)