Spaces:
Sleeping
Sleeping
| # Copyright (c) Alibaba, Inc. and its affiliates. | |
| import os | |
| import secrets | |
| import base64 | |
| import tempfile | |
| from http import HTTPStatus | |
| from pathlib import Path | |
| import gradio as gr | |
| import modelscope_studio.components.antd as antd | |
| import modelscope_studio.components.base as ms | |
| from PIL import Image | |
| from urllib3.exceptions import HTTPError | |
# Point the DashScope SDK at the international HTTP endpoint. This is set
# before `import dashscope` below — presumably because the SDK reads the
# variable at import time (TODO confirm before reordering these lines).
os.environ['DASHSCOPE_HTTP_BASE_URL'] = 'https://dashscope-intl.aliyuncs.com/api/v1'
# os.environ['DASHSCOPE_WEBSOCKET_BASE_URL'] = 'https://poc-dashscope.aliyuncs.com/api-ws/v1/inference'
import dashscope
from dashscope import MultiModalConversation
# The API key is read from the `API_KEY` environment variable; when the
# variable is unset, `os.environ.get` returns None and None is assigned.
API_KEY = os.environ.get('API_KEY')
dashscope.api_key = API_KEY
# True only when MODELSCOPE_ENVIRONMENT is exactly 'studio'; get_text() and the
# ConfigProvider locale use this flag to switch the UI to Chinese.
is_modelscope_studio = os.getenv('MODELSCOPE_ENVIRONMENT') == 'studio'
def get_text(text: str, cn_text: str):
    """Return the UI string matching the current deployment.

    Args:
        text: Default string.
        cn_text: Chinese string, used when `is_modelscope_studio` is true.

    Returns:
        `cn_text` when running in ModelScope Studio, otherwise `text`.
    """
    return cn_text if is_modelscope_studio else text
def resolve_image(filename):
    """Join `filename` onto the directory that contains this source file.

    Args:
        filename: Path of an asset, normally relative to this file.

    Returns:
        The joined path as a string (not normalised, so a leading ``./`` in
        `filename` is preserved).
    """
    app_dir = os.path.dirname(__file__)
    return os.path.join(app_dir, filename)
# Example cards shown in the UI: each entry pairs a prompt ("description")
# with the path of a bundled example image ("image").
DEMO_LIST = [
    dict(description=prompt, image=resolve_image(relative_path))
    for prompt, relative_path in (
        ("Evaluate the integral of the functions graphed using the formula for circles: ",
         "./examples/1.webp"),
        ("请解答这道题", "./examples/5.png"),
        ("图片中的滤液E是什么化学物质？", "./examples/3.png"),
        ("How many pelicans are there in the picture", "./examples/6.png"),
    )
]
def process_image(image, shouldConvert=False):
    """Write an uploaded PIL image to a new JPEG file and return its path.

    The file is created in ``GRADIO_TEMP_DIR`` when that variable is set,
    otherwise in ``<system temp dir>/gradio``; the directory is created on
    demand.

    Args:
        image: PIL image to persist.
        shouldConvert: Kept for backward compatibility. Images that carry
            transparency are always flattened onto a white background,
            because the target file is a JPEG and JPEG cannot store alpha.
            When true, an image without transparency is additionally
            converted to RGB.

    Returns:
        Path of the newly written ``tmp<random hex>.jpg`` file.
    """
    # Resolve (and create) the upload directory.
    upload_dir = os.environ.get("GRADIO_TEMP_DIR") or str(
        Path(tempfile.gettempdir()) / "gradio")
    os.makedirs(upload_dir, exist_ok=True)

    # Random, collision-resistant file name.
    filename = os.path.join(upload_dir, f"tmp{secrets.token_hex(20)}.jpg")

    # Saving RGBA/LA/P-with-transparency straight to JPEG raises OSError, so
    # composite such images onto white using their own alpha as the mask.
    has_alpha = "A" in image.getbands() or "transparency" in image.info
    if has_alpha:
        rgba = image.convert("RGBA")
        flattened = Image.new('RGB', size=rgba.size, color=(255, 255, 255))
        flattened.paste(rgba, (0, 0), mask=rgba)
        image = flattened
    elif shouldConvert or image.mode not in ("RGB", "L", "CMYK"):
        # Remaining modes that the JPEG writer does not accept (e.g. "I", "F").
        image = image.convert("RGB")

    image.save(filename)
    print("image filename = ", filename)
    return filename
def encode_image(image_path):
    """Return the contents of the file at `image_path` as Base64 text.

    Args:
        image_path: Path of the file to read (opened in binary mode).

    Returns:
        The Base64 encoding of the file's bytes, decoded to a `str`.
    """
    raw_bytes = Path(image_path).read_bytes()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def on_clear():
    """Build the updates that blank the question box and the image input(s).

    Returns:
        A dict mapping the `input` textarea and every component in the
        `input_image` tuple to ``gr.update(value=None)``.
    """
    targets = (input, *input_image)
    return {component: gr.update(value=None) for component in targets}
# Custom CSS handed to gr.Blocks. It un-sets `overflow` on `.output-markdown`,
# the class given to the answer Markdown component — presumably so that the
# surrounding scrollable Div does the scrolling instead (TODO confirm).
css = """
.output-markdown {
overflow: unset !important;
}
"""
# Builds the whole UI (header, input column, examples, answer card, guided
# tour) and wires the event listeners.
# NOTE(review): this file reached review with its indentation stripped; the
# nesting below is reconstructed from the component semantics — confirm it
# against the original layout before merging.
with gr.Blocks(css=css) as demo:
    with ms.Application() as app:
        with antd.ConfigProvider(
                locale="zh_CN" if is_modelscope_studio else None,
                theme=dict(token=dict(colorPrimary="#a855f7"))):
            # Header: logo + title.
            with antd.Card(elem_style=dict(marginBottom=12),
                           styles=dict(body=dict(padding=4))):
                with antd.Flex(elem_style=dict(width="100%"),
                               justify="center",
                               align="center",
                               gap=14):
                    with ms.Div(elem_style=dict(flexShrink=0)):
                        antd.Image(resolve_image("./cutelogo.jpg"),
                                   preview=False,
                                   height=60,
                                   width=60)
                    with ms.Div():
                        antd.Typography.Title("QVQ-72B-Preview",
                                              elem_style=dict(margin=0,
                                                              fontSize=24),
                                              level=1)
            with ms.AutoLoading():
                with antd.Row(gutter=[8, 8], align="stretch"):
                    # Left column: inputs, action buttons and examples.
                    with antd.Col(xs=24, md=8):
                        with antd.Space(direction="vertical",
                                        elem_style=dict(width="100%")):
                            with antd.Space(direction="vertical",
                                            elem_style=dict(width="100%"),
                                            elem_id="input-container"):
                                with ms.Fragment():
                                    # The trailing comma makes `input_image` a
                                    # 1-tuple; the rest of the file relies on
                                    # that (`*input_image`, `input_image[0]`).
                                    input_image = gr.Image(type="pil",
                                                           label="Upload",
                                                           sources=["upload"]),
                                input = antd.Input.Textarea(
                                    placeholder=get_text(
                                        "Ask a question", "输入一个问题"),
                                    auto_size=dict(maxRows=6, minRows=2),
                                    allow_clear=True)
                            with antd.Flex(align="center",
                                           justify="space-between"):
                                antd.Typography.Text(get_text(
                                    "Warning: This model only supports single-turn dialogue.",
                                    "注：当前模型只支持单轮对话，如需中文回答，提示词加“用中文回答”"),
                                                     type="warning")
                                tour_btn = antd.Button(get_text(
                                    "Tour", "使用指引"),
                                                       variant="filled",
                                                       color="default")
                            with antd.Row(gutter=8):
                                with antd.Col(span=12):
                                    clear_btn = antd.Button(get_text(
                                        "Clear", "清除"),
                                                            block=True)
                                with antd.Col(span=12):
                                    submit_btn = antd.Button(
                                        get_text("Submit", "提交"),
                                        type="primary",
                                        block=True,
                                        elem_id="submit-btn")
                            antd.Divider(get_text("Examples", "示例"))
                            with antd.Flex(gap="small", wrap=True):

                                def bind_on_example(_item):
                                    """Return a click handler bound to one example.

                                    Taking the example as an argument avoids
                                    the late-binding closure pitfall in the
                                    loop below.
                                    """

                                    def on_example():
                                        return gr.update(
                                            value=_item['description']
                                        ), gr.update(value=_item['image'])

                                    return on_example

                                # One clickable card per example; clicking it
                                # fills the question box and the image input.
                                for item in DEMO_LIST:
                                    with antd.Card(
                                            hoverable=True,
                                            elem_style=dict(
                                                width="100%")) as example:
                                        if "description" in item:
                                            antd.Typography.Text(
                                                item["description"])
                                        if "image" in item:
                                            antd.Image(item["image"],
                                                       preview=False)
                                    example.click(
                                        fn=bind_on_example(item),
                                        outputs=[input, input_image[0]])
                    # Right column: streamed answer plus the Stop button.
                    with antd.Col(xs=24, md=16):
                        with antd.Card(title=get_text("Answer", "答案"),
                                       elem_style=dict(height="100%"),
                                       elem_id="output-container"):
                            with ms.Slot("extra"):
                                cancel_btn = antd.Button(get_text(
                                    "Stop", "停止"),
                                                         elem_id="cancel-btn",
                                                         block=True,
                                                         disabled=True)
                            with ms.Div(elem_style=dict(
                                    maxHeight=1600,
                                    display="flex",
                                    flexDirection="column-reverse",
                                    overflow="auto")):
                                output = gr.Markdown(
                                    show_copy_button=True,
                                    elem_classes="output-markdown",
                                    latex_delimiters=[{
                                        "left": '$$',
                                        "right": '$$',
                                        "display": True
                                    }, {
                                        "left": '$',
                                        "right": '$',
                                        "display": False,
                                    }, {
                                        "left": '\\(',
                                        "right": '\\)',
                                        "display": False,
                                    }, {
                                        "left": '\\[',
                                        "right": '\\]',
                                        "display": True
                                    }])
            # Guided tour; each step targets an element by its elem_id.
            with antd.Tour(open=False) as tour:
                antd.Tour.Step(
                    title=get_text("Step 1", "步骤 1"),
                    description=get_text("Upload image and enter text",
                                         "传入图片和文本"),
                    get_target=
                    "() => document.querySelector('#input-container')")
                antd.Tour.Step(
                    title=get_text("Step 2", "步骤 2"),
                    description=get_text("Click the submit button",
                                         "点击提交按钮"),
                    get_target="() => document.querySelector('#submit-btn')"
                )
                antd.Tour.Step(
                    title=get_text("Step 3", "步骤 3"),
                    description=get_text("Wait for the result", "等待结果返回"),
                    get_target=
                    "() => document.querySelector('#output-container')")
                antd.Tour.Step(
                    title=get_text("Tips", "提示"),
                    description=get_text("Click here to end output early",
                                         "点击这里提前结束输出"),
                    get_target="() => document.querySelector('#cancel-btn')"
                )

    # Open the tour on demand; close it when it finishes or is dismissed.
    tour_btn.click(fn=lambda: gr.update(open=True), outputs=[tour])
    gr.on([tour.finish, tour.close],
          fn=lambda: gr.update(open=False),
          outputs=[tour])

    def generate(image, query):
        """Send the image and/or question to the model and stream the answer.

        Args:
            image: PIL image from the upload component, or None.
            query: Question text; may be empty.

        Yields:
            Dicts of component updates: the Stop button is enabled first,
            then `output` receives the text of each streamed chunk, and
            finally the Stop button is disabled again.

        Raises:
            gr.Error: If both `image` and `query` are empty.
            HTTPError: If a streamed response does not have status 200.
        """
        if image is None and not query:
            raise gr.Error(
                get_text("Error: Input is empty", "错误：输入内容为空"))

        content = []
        if image is not None:
            image_file = process_image(image)
            try:
                base64_image = encode_image(image_file)
            finally:
                # Only the Base64 payload is sent, so the temp file would
                # otherwise pile up in the temp dir. Removal is best-effort.
                try:
                    os.remove(image_file)
                except OSError:
                    pass
            # process_image() writes a JPEG, so declare the matching MIME type.
            # min_pixels == max_pixels presumably pins the image's pixel
            # budget — confirm against the DashScope documentation.
            content.append({
                'image': f'data:image/jpeg;base64,{base64_image}',
                "min_pixels": 8294720,
                "max_pixels": 8294720,
            })
        if query:
            content.append({'text': query})
        print("image", image)
        print("query", query)
        messages = [
            {
                'role': 'user',
                'content': content
            },
        ]
        responses = MultiModalConversation.call(
            model='qwen2.5-vl-72b-instruct',
            messages=messages,
            stream=True,
        )
        yield {cancel_btn: gr.update(disabled=False)}
        try:
            for response in responses:
                if response.status_code != HTTPStatus.OK:
                    raise HTTPError(
                        f'response.code: {response.code}\nresponse.message: {response.message}'
                    )
                parts = response.output.choices[0].message.content
                # A chunk may be empty or carry no 'text' entry; skip those.
                text = parts[0].get('text') if parts else None
                if text:
                    print(text)
                    yield {output: text}
        except Exception:
            # Do not leave the Stop button enabled after a failed stream.
            # (Cancellation closes the generator with GeneratorExit, which is
            # not an Exception, so it never reaches this yield.)
            yield {cancel_btn: gr.update(disabled=True)}
            raise
        yield {cancel_btn: gr.update(disabled=True)}

    # Event wiring. `output_process` is kept so the Stop button can cancel it.
    output_process = submit_btn.click(fn=generate,
                                      inputs=[*input_image, input],
                                      outputs=[output, cancel_btn])
    clear_btn.click(fn=on_clear, outputs=[*input_image, input])
    cancel_btn.click(fn=None,
                     inputs=None,
                     outputs=None,
                     cancels=[output_process])
    cancel_btn.click(fn=lambda: gr.update(disabled=True),
                     inputs=None,
                     outputs=[cancel_btn])
# Configure the request queue (default concurrency limit of 50 for event
# listeners), then start the server with server-side rendering turned off.
demo.queue(default_concurrency_limit=50)
demo.launch(ssr_mode=False)