import base64
import copy
import json
import os
import re
import subprocess
import sys
import tempfile
import time
from argparse import ArgumentParser
from datetime import datetime
from pathlib import Path

import gradio as gr
import numpy as np
import tqdm
from openai import OpenAI
from PIL import Image


def encode_image(image_path):
    """Return the contents of the file at *image_path* as a base64 string.

    Args:
        image_path: Path of the file to read (opened in binary mode).

    Returns:
        The base64 encoding of the file's bytes, decoded as UTF-8 text.
    """
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def array_to_image_path(image_array):
    """Save an image array as a PNG in the working directory.

    Args:
        image_array: Array-like pixel data; it is cast with ``np.uint8``
            before being handed to ``PIL.Image.fromarray``.

    Returns:
        Absolute path of the PNG file that was written.

    Raises:
        ValueError: If *image_array* is ``None``.
    """
    if image_array is None:
        raise ValueError("No image provided. Please upload an image before submitting.")

    img = Image.fromarray(np.uint8(image_array))

    # A timestamp alone only has one-second resolution, so two images saved
    # within the same second would overwrite each other. mkstemp appends a
    # random component and creates the file atomically, which guarantees a
    # unique name while keeping the "image_<timestamp>" prefix.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    fd, full_path = tempfile.mkstemp(
        prefix=f"image_{timestamp}_",
        suffix=".png",
        dir=os.getcwd(),
    )
    os.close(fd)  # PIL reopens the path itself; do not leak the descriptor.

    img.save(full_path)
    return full_path


# Client for an OpenAI-compatible endpoint. The base URL is read from the
# (lower-case) environment variable "openai_api_base" and is None when unset.
openai_api_key = "EMPTY"
openai_api_base = os.environ.get("openai_api_base")

client = OpenAI(
    api_key=openai_api_key,
    base_url=openai_api_base,
)

REVISION = 'v0.0.1'

# NOTE(review): the tag delimiters were missing, which left a pattern that can
# only ever match the empty string. "<box>...</box>" is assumed from the
# constant's name -- confirm against the model's output format.
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"

# The embedded double quote and backslash are escaped so that the whole
# character set is part of the string (unescaped, the literal ended after the
# third character and the rest became a comment).
# NOTE(review): several characters appear twice and look width-normalized;
# confirm the intended (possibly full-width) characters against upstream.
PUNCTUATION = "!?。\"#$%&'()*+,-/:;<=>@[\\]^_`{|}~⦅⦆「」、、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."
def _get_args():
    """Parse the demo's command-line options.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()`` with the
        attributes ``revision``, ``cpu_only``, ``share``, ``inbrowser``,
        ``server_port`` and ``server_name``.
    """
    parser = ArgumentParser()
    parser.add_argument("--revision", type=str, default=REVISION)
    parser.add_argument("--cpu-only", action="store_true",
                        help="Run demo with CPU only")
    parser.add_argument("--share", action="store_true", default=False,
                        help="Create a publicly shareable link for the interface.")
    parser.add_argument("--inbrowser", action="store_true", default=False,
                        help="Automatically launch the interface in a new tab on the default browser.")
    parser.add_argument("--server-port", type=int, default=7860,
                        help="Demo server port.")
    parser.add_argument("--server-name", type=str, default="127.0.0.1",
                        help="Demo server name.")
    return parser.parse_args()


# Characters escaped on lines that sit inside a ``` fenced block so the
# chat widget shows them literally instead of interpreting them as
# Markdown/HTML. Applied in a single pass by str.translate.
_CODE_LINE_ESCAPES = str.maketrans({
    "`": r"\`",
    "<": "&lt;",
    ">": "&gt;",
    " ": "&nbsp;",
    "*": "&ast;",
    "_": "&lowbar;",
    "-": "&#45;",
    ".": "&#46;",
    "!": "&#33;",
    "(": "&#40;",
    ")": "&#41;",
    "$": "&#36;",
})


def _parse_text(text):
    """Convert plain/Markdown-fenced text into an HTML fragment.

    Empty lines are dropped. A line containing a ``` fence opens (odd
    occurrence) or closes (even occurrence) a ``<pre><code>`` element; the
    text after the last backtick of an opening fence becomes the
    ``language-...`` class. Every line after the first is prefixed with
    ``<br>``, and lines inside an open fence are entity-escaped.

    Args:
        text: The raw text to convert.

    Returns:
        The converted lines joined into one string without separators.
    """
    lines = [line for line in text.split("\n") if line != ""]
    fence_count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            fence_count += 1
            if fence_count % 2 == 1:
                language = line.split("`")[-1]
                lines[i] = f'<pre><code class="language-{language}">'
            else:
                lines[i] = "<br></code></pre>"
        elif i > 0:
            # An odd fence count means we are currently inside a code block.
            if fence_count % 2 == 1:
                line = line.translate(_CODE_LINE_ESCAPES)
            lines[i] = "<br>" + line
    return "".join(lines)


def _remove_image_special(text):
    """Strip grounding markup from model output before it is displayed.

    ``<ref>``/``</ref>`` tags are removed (their inner text is kept) and
    every ``<box>...</box>`` span is deleted; an unterminated ``<box>`` is
    deleted up to the end of its line.

    NOTE(review): the tag names were missing from the literals, which made
    this function a no-op; they are restored on the assumption that the
    model emits Qwen-VL style ``<ref>``/``<box>`` markup -- confirm.

    Args:
        text: Model output text.

    Returns:
        The text without the markup described above.
    """
    text = text.replace('<ref>', '').replace('</ref>', '')
    return re.sub(r'<box>.*?(</box>|$)', '', text)


def is_video_file(filename):
    """Return True if *filename* ends with a known video extension.

    The comparison is case-insensitive.
    """
    video_extensions = ('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.mpeg')
    return filename.lower().endswith(video_extensions)


def _history_to_messages(task_history):
    """Build the chat-completion ``messages`` list from the task history.

    Each history entry is a ``(query, answer)`` pair. A query that is a
    tuple/list is an uploaded file whose path is its first element; such
    entries are collected and attached to the next text query so that one
    user message carries the files followed by the text. Every text query
    is followed by an assistant message holding its answer; the assistant
    message of the last turn is dropped because that is the turn to be
    answered.

    Args:
        task_history: Sequence of ``(query, answer)`` pairs.

    Returns:
        A list of message dicts with ``role`` and ``content`` keys.
    """
    messages = []
    content = []
    for q, a in task_history:
        if isinstance(q, (tuple, list)):
            if is_video_file(q[0]):
                content.append({'video': f'file://{q[0]}'})
            else:
                content.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encode_image(q[0])}"
                    },
                })
        else:
            content.append({"type": "text", 'text': q})
            messages.append({'role': 'user', 'content': content})
            messages.append({'role': 'assistant', 'content': [{"type": "text", 'text': a}]})
            content = []
    if messages:
        # Remove the placeholder answer of the pending turn.
        messages.pop()
    return messages


def _launch_demo(args):
    """Build the Gradio interface and start serving it.

    Args:
        args: Parsed command-line options; only ``args.share`` is currently
            forwarded to ``launch()``.
    """

    def predict(_chatbot, task_history):
        """Stream the model's answer for the last turn into the chatbot.

        Yields the updated chatbot list after each received chunk and once
        more when the answer is complete; the finished answer is also
        written back into the last ``task_history`` entry.
        """
        print("chatbot", _chatbot)
        print("history",task_history)
        chat_query = _chatbot[-1][0]
        query = task_history[-1][0]
        if len(chat_query) == 0:
            _chatbot.pop()
            task_history.pop()
            # This function is a generator: a bare ``return value`` would
            # emit nothing, so the trimmed chatbot has to be yielded.
            yield _chatbot
            return
        print("User: " + _parse_text(query))

        messages = _history_to_messages(task_history)
        responses = client.chat.completions.create(
            model="Qwen2_5VL",
            messages=messages,
            extra_body={},
            extra_headers={
                "apikey": "empty"
            },
            stream=True,
            temperature=0,
            top_p=1.0,
        )

        response_text = []
        for chunk in responses:
            # Streamed chunks may carry no choices or a None/empty delta
            # (e.g. role-only or final chunks); iterating None would raise.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if not delta:
                continue
            response_text.append(delta)
            _chatbot[-1] = (_parse_text(chat_query),
                            _remove_image_special(''.join(response_text)))
            yield _chatbot

        answer = ''.join(response_text)
        _chatbot[-1] = (_parse_text(chat_query), _remove_image_special(answer))
        full_response = _parse_text(answer)
        task_history[-1] = (query, full_response)
        print("Qwen2.5-VL-Chat: " + _parse_text(full_response))
        yield _chatbot

    def regenerate(_chatbot, task_history):
        """Discard the last answer and stream a new one for the same query."""
        # Generator function: early exits must yield the unchanged chatbot.
        if not task_history:
            yield _chatbot
            return
        item = task_history[-1]
        if item[1] is None:
            yield _chatbot
            return
        task_history[-1] = (item[0], None)
        chatbot_item = _chatbot.pop(-1)
        if chatbot_item[0] is None:
            _chatbot[-1] = (_chatbot[-1][0], None)
        else:
            _chatbot.append((chatbot_item[0], None))
        yield from predict(_chatbot, task_history)

    def add_text(history, task_history, text):
        """Append a text turn to the displayed and the raw history.

        Returns exactly the two values wired as outputs of the submit
        click below; the textbox itself is cleared by ``reset_user_input``.
        """
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [(_parse_text(text), None)]
        task_history = task_history + [(text, None)]
        return history, task_history

    def add_file(history, task_history, file):
        """Append an uploaded file (as a 1-tuple of its path) to both histories."""
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [((file.name,), None)]
        task_history = task_history + [((file.name,), None)]
        return history, task_history

    def reset_user_input():
        """Clear the input textbox."""
        return gr.update(value="")

    def reset_state(task_history):
        """Empty the stored history and return fresh chatbot/history values."""
        task_history.clear()
        return [], []

    with gr.Blocks() as demo:
        gr.Markdown("\nInfinity-7B Demo\n")

        chatbot = gr.Chatbot(label='Infinity-7B', elem_classes="control-height", height=500)
        query = gr.Textbox(lines=2, label='Input')
        task_history = gr.State([])

        with gr.Row():
            addfile_btn = gr.UploadButton("📁 Upload (上传文件)", file_types=["image", "video"])
            submit_btn = gr.Button("🚀 Submit (发送)")
            regen_btn = gr.Button("🤔️ Regenerate (重试)")
            empty_bin = gr.Button("🧹 Clear History (清除历史)")

        submit_btn.click(add_text, [chatbot, task_history, query], [chatbot, task_history]).then(
            predict, [chatbot, task_history], [chatbot], show_progress=True
        )
        submit_btn.click(reset_user_input, [], [query])
        empty_bin.click(reset_state, [task_history], [chatbot, task_history], show_progress=True)
        regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True)
        addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True)

    # NOTE(review): --inbrowser, --server-port and --server-name are parsed
    # but deliberately left commented out here, so Gradio's own defaults /
    # environment settings apply -- confirm this is intended.
    demo.queue(default_concurrency_limit=40).launch(
        share=args.share,
        # inbrowser=args.inbrowser,
        # server_port=args.server_port,
        # server_name=args.server_name,
    )


def main():
    """Entry point: parse the command line and launch the demo."""
    args = _get_args()
    _launch_demo(args)


if __name__ == '__main__':
    main()