Spaces:
Running on L40S
Running on L40S
| import os | |
| import os.path as osp | |
| import gradio as gr | |
# Markdown banner shown at the top of the page; it is handed to gr.Markdown
# when the interface is built.
HEADER = """
# Penguin-VL Gradio Interface
Developed by [Penguin-VL](https://github.com/tencent-ailab/Penguin-VL) team at Tencent AI Lab.
Note: speed on ZeroGPU does not reflect real model speed and may be influenced by the shared environment. For stable and fast Gradio Space deployment and running, please visit [the local UI instructions](https://github.com/tencent-ailab/Penguin-VL?tab=readme-ov-file#-gradio-demo-local-ui). For usage examples and expected results, please refer to [here](https://github.com/tencent-ailab/Penguin-VL/blob/master/inference/notebooks/01_penguinvl_inference_recipes.public.ipynb).
Please login with your Hugging Face account first. We provide some example images and videos for easier trials.
"""
class PenguinVLQwen3GradioInterface(object):
    """Gradio chat front-end that forwards multimodal conversations to a model client.

    The chat history is kept in the OpenAI-style "messages" format used by
    ``gr.Chatbot``. On every request the history is converted into the
    conversation structure built by ``_build_conversation`` and passed to
    ``model_client.submit``; the reply (a string or an iterable of string
    chunks) is appended to the history as the assistant turn.
    """

    def __init__(
        self,
        model_client,
        example_dir=None,
        default_system_prompt="You are a helpful assistant developed by Tencent AI Lab PenguinVL team.",
        **server_kwargs
    ):
        """Build the Gradio Blocks layout and wire up the event handlers.

        Args:
            model_client: Object exposing ``submit(payload)``; it is called with a
                dict holding ``"conversation"`` and ``"generation_config"``.
            example_dir: Optional directory whose image/video files are offered
                as clickable examples.
            default_system_prompt: System prompt used when the UI field is empty.
            **server_kwargs: Forwarded verbatim to ``Blocks.launch`` by ``launch``.
        """
        self.model_client = model_client
        self.server_kwargs = server_kwargs
        self.default_system_prompt = (default_system_prompt or "").strip()

        # Lower-case extensions (without the dot) accepted as images / videos.
        self.image_formats = ("png", "jpg", "jpeg")
        self.video_formats = ("mp4", "mov")

        image_examples, video_examples = self._collect_examples(example_dir)

        with gr.Blocks() as self.interface:
            gr.Markdown(HEADER)
            with gr.Row():
                chatbot_kwargs = {"elem_id": "chatbot", "height": 710}
                try:
                    chatbot = gr.Chatbot(type="messages", **chatbot_kwargs)
                except TypeError:
                    # Gradio 6 uses OpenAI-style messages by default and removed the `type` arg.
                    chatbot = gr.Chatbot(**chatbot_kwargs)

                with gr.Column():
                    with gr.Tab(label="Input"):
                        with gr.Row():
                            input_video = gr.Video(sources=["upload"], label="Upload Video")
                            input_image = gr.Image(sources=["upload"], type="filepath", label="Upload Image")

                        if image_examples:
                            gr.Examples(image_examples, inputs=[input_image], label="Example Images")
                        if video_examples:
                            gr.Examples(video_examples, inputs=[input_video], label="Example Videos")

                        input_text = gr.Textbox(label="Input Text", placeholder="Type your message here and press enter to submit")
                        submit_button = gr.Button("Generate")

                    with gr.Tab(label="Configure"):
                        with gr.Accordion("Prompt Config", open=True):
                            system_prompt = gr.Textbox(
                                value=self.default_system_prompt,
                                label="System Prompt",
                                lines=4,
                                placeholder="Optional: system instruction prepended to each request",
                            )
                        with gr.Accordion("Generation Config", open=True):
                            do_sample = gr.Checkbox(value=True, label="Do Sample")
                            temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.1, label="Temperature")
                            top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.9, label="Top P")
                            max_new_tokens = gr.Slider(minimum=0, maximum=4096, value=1536, step=1, label="Max New Tokens")
                        with gr.Accordion("Video Config", open=True):
                            fps = gr.Slider(minimum=0.0, maximum=10.0, value=1, label="FPS")
                            max_frames = gr.Slider(minimum=0, maximum=256, value=180, step=1, label="Max Frames")

            # Uploads are pushed into the chat history and the upload widget is
            # reset (second output is None) so the same widget can be reused.
            input_video.change(self._on_video_upload, [chatbot, input_video], [chatbot, input_video])
            input_image.change(self._on_image_upload, [chatbot, input_image], [chatbot, input_image])

            # Pressing enter in the textbox and clicking "Generate" run the same handler.
            predict_inputs = [
                chatbot, input_text, system_prompt, do_sample, temperature, top_p, max_new_tokens,
                fps, max_frames,
            ]
            predict_outputs = [chatbot, input_text]
            input_text.submit(self._predict, predict_inputs, predict_outputs)
            submit_button.click(self._predict, predict_inputs, predict_outputs)

    def _media_kind(self, path):
        """Return ``"video"``, ``"image"`` or ``None`` based on the file extension of *path*.

        The comparison is case-insensitive and uses the real extension, so
        ``photo.PNG`` is an image while ``notapng`` is not.
        """
        extension = osp.splitext(path)[1].lower().lstrip(".")
        if extension in self.video_formats:
            return "video"
        if extension in self.image_formats:
            return "image"
        return None

    def _collect_examples(self, example_dir):
        """Return ``(image_examples, video_examples)`` found directly in *example_dir*.

        Each example is a one-element list, the row format expected by
        ``gr.Examples``. Entries are sorted by name so the example order does
        not depend on the order in which the OS lists the directory.
        """
        image_examples, video_examples = [], []
        if example_dir is None:
            return image_examples, video_examples
        for file_name in sorted(os.listdir(example_dir)):
            example_file = osp.join(example_dir, file_name)
            kind = self._media_kind(example_file)
            if kind == "image":
                image_examples.append([example_file])
            elif kind == "video":
                video_examples.append([example_file])
        return image_examples, video_examples

    def _append_media(self, messages, media_path):
        """Append an uploaded file to the history as a user turn.

        Returns ``(messages, None)``; the ``None`` clears the upload widget.
        Files with an unsupported extension are rejected with a UI warning
        instead of being stored, because a stored unsupported file would make
        every later ``_predict`` call raise while it stays in the history.
        """
        messages = messages or []
        if media_path is None:
            return messages, None
        if self._media_kind(media_path) is None:
            supported = ", ".join(self.image_formats + self.video_formats)
            gr.Warning(f"Unsupported file type: {osp.basename(str(media_path))}. Supported formats: {supported}")
            return messages, None
        messages.append({"role": "user", "content": {"path": media_path}})
        return messages, None

    def _on_video_upload(self, messages, video):
        """Handle a change of the video widget; see ``_append_media``."""
        return self._append_media(messages, video)

    def _on_image_upload(self, messages, image):
        """Handle a change of the image widget; see ``_append_media``."""
        return self._append_media(messages, image)

    def _on_text_submit(self, messages, text):
        """Append *text* as a user turn and return ``(messages, "")`` to clear the textbox."""
        messages = messages or []
        messages.append({"role": "user", "content": text})
        return messages, ""

    def _extract_media_path(self, content):
        """Return the first truthy ``"path"`` value found in a nested message content.

        Dict values and list/tuple items are searched depth-first.

        Raises:
            ValueError: If *content* is a text item or contains no path.
        """
        if isinstance(content, dict):
            if content.get("type") == "text" and isinstance(content.get("text"), str):
                raise ValueError(f"Text content is not media: {content}")
            media_path = content.get("path")
            if media_path:
                return media_path
            children = content.values()
        elif isinstance(content, (list, tuple)):
            children = content
        else:
            children = ()

        for child in children:
            try:
                return self._extract_media_path(child)
            except ValueError:
                continue
        raise ValueError(f"Unsupported media content: {content}")

    def _extract_text_content(self, content):
        """Return the text carried by a message content.

        Accepts a plain string, a dict with a string ``"text"`` value, or a
        list/tuple of such items (non-empty parts are joined with newlines).

        Raises:
            ValueError: If no text can be extracted.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, dict):
            text = content.get("text")
            if isinstance(text, str):
                return text
        elif isinstance(content, (list, tuple)):
            text_parts = []
            for item in content:
                try:
                    text_parts.append(self._extract_text_content(item))
                except ValueError:
                    continue
            if text_parts:
                return "\n".join(part for part in text_parts if part)
        raise ValueError(f"Unsupported text content: {content}")

    def _normalize_user_content(self, content, fps, max_frames):
        """Convert one user message content into a flat list of model content items.

        Items are ``{"type": "text", ...}``, ``{"type": "image", ...}`` or
        ``{"type": "video", ...}``; video items carry *fps* and *max_frames*.

        Raises:
            ValueError: If the content is neither text nor a supported media file.
        """
        if isinstance(content, str):
            return [{"type": "text", "text": content}]

        if isinstance(content, (list, tuple)):
            normalized_items = []
            for item in content:
                normalized_items.extend(self._normalize_user_content(item, fps, max_frames))
            return normalized_items

        if isinstance(content, dict):
            # A dict is treated as text first; only if that fails is it
            # interpreted as a media reference.
            try:
                text = self._extract_text_content(content)
            except ValueError:
                pass
            else:
                return [{"type": "text", "text": text}]

            media_path = self._extract_media_path(content)
            kind = self._media_kind(media_path)
            if kind == "video":
                return [{"type": "video", "video": {"video_path": media_path, "fps": fps, "max_frames": max_frames}}]
            if kind == "image":
                return [{"type": "image", "image": {"image_path": media_path}}]
            raise ValueError(f"Unsupported media type: {media_path}")

        raise ValueError(f"Unsupported user content: {content}")

    def _build_conversation(self, messages, system_prompt, fps, max_frames):
        """Translate the chat history into the conversation sent to the model client.

        Consecutive user messages are merged into a single user turn,
        assistant messages are passed through unchanged, and messages with any
        other role are ignored. A system turn is prepended when *system_prompt*
        (or, if that is empty, the default prompt) is non-blank.
        """
        conversation = []
        active_system_prompt = (system_prompt or self.default_system_prompt).strip()
        if active_system_prompt:
            conversation.append({
                "role": "system",
                "content": [{"type": "text", "text": active_system_prompt}],
            })

        pending_user_items = []
        for message in messages:
            role = message["role"]
            if role == "assistant":
                if pending_user_items:
                    conversation.append({"role": "user", "content": pending_user_items})
                    pending_user_items = []
                conversation.append(message)
            elif role == "user":
                pending_user_items.extend(self._normalize_user_content(message["content"], fps, max_frames))
        if pending_user_items:
            conversation.append({"role": "user", "content": pending_user_items})
        return conversation

    def _predict(self, messages, input_text, system_prompt, do_sample, temperature, top_p, max_new_tokens,
                 fps, max_frames):
        """Run one chat turn and stream ``(messages, "")`` updates for the UI.

        This is a generator: every yielded tuple updates the chatbot and
        clears the textbox. If the history does not end with a user turn there
        is nothing to answer, and the unchanged state is yielded once.
        """
        messages = list(messages or [])
        input_text = input_text or ""
        if input_text:
            messages.append({"role": "user", "content": input_text})

        conversation = self._build_conversation(messages, system_prompt, fps, max_frames)
        if not conversation or conversation[-1]["role"] != "user":
            # A bare `return value` inside a generator is discarded by the
            # caller, so the unchanged state has to be yielded explicitly.
            yield messages, input_text
            return

        generation_config = {
            # Sampling with temperature 0 is rejected by Hugging Face-style
            # generation; it is equivalent to greedy decoding, so request that.
            "do_sample": bool(do_sample) and temperature is not None and temperature > 0,
            "temperature": temperature,
            "top_p": top_p,
            "max_new_tokens": max_new_tokens,
        }

        response = self.model_client.submit({"conversation": conversation, "generation_config": generation_config})
        if isinstance(response, str):
            messages.append({"role": "assistant", "content": response})
            yield messages, ""
            return

        # Otherwise the response is iterated chunk by chunk and the growing
        # assistant message is re-emitted after each chunk.
        messages.append({"role": "assistant", "content": ""})
        for token in response:
            messages[-1]["content"] += token
            yield messages, ""

    def launch(self):
        """Start the Gradio server with the keyword arguments given to the constructor."""
        self.interface.launch(**self.server_kwargs)