Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from transformers import AutoModelForCausalLM, AutoProcessor, GenerationConfig | |
| from PIL import Image | |
| import torch | |
| import spaces | |
| import json | |
| import re | |
| from langdetect import detect, LangDetectException | |
| from googletrans import Translator | |
# Load the processor and model.
# Both objects come from the same checkpoint and are loaded with the same
# options, so the name and keyword arguments are defined once and shared.
_MOLMO_CHECKPOINT = 'allenai/Molmo-7B-D-0924'
_MOLMO_LOAD_OPTIONS = {
    'trust_remote_code': True,
    'torch_dtype': 'auto',
    'device_map': 'auto',
}

processor = AutoProcessor.from_pretrained(_MOLMO_CHECKPOINT, **_MOLMO_LOAD_OPTIONS)
model = AutoModelForCausalLM.from_pretrained(_MOLMO_CHECKPOINT, **_MOLMO_LOAD_OPTIONS)
| import json | |
def wrap_json_in_markdown(text):
    """
    Wraps every JSON object or array embedded in the text in a Markdown code block.

    The text is scanned left to right. At each '{' or '[' the standard JSON
    decoder is asked to parse a value starting at that position. When it
    succeeds, the parsed value is re-serialised with an indent of 4 and
    emitted inside a fenced json code block; scanning resumes right after
    the parsed value. When it fails, the bracket is treated as ordinary
    text and scanning continues with the next character.

    Letting the decoder find the end of the value (instead of counting
    brackets by hand) means brackets that appear inside JSON string values,
    and stray unbalanced brackets earlier in the text, no longer prevent
    valid JSON from being recognised.

    Args:
        text (str): Free-form text that may contain JSON fragments.
    Returns:
        str: The text with each valid JSON object/array replaced by a
        pretty-printed Markdown code block; all other text is unchanged.
    """
    decoder = json.JSONDecoder()
    pieces = []
    copied_upto = 0  # Index up to which `text` has already been emitted
    pos = 0
    length = len(text)
    while pos < length:
        if text[pos] not in '{[':
            pos += 1
            continue
        try:
            parsed, end = decoder.raw_decode(text, pos)
        except (json.JSONDecodeError, RecursionError):
            # Not valid JSON starting here (or nested too deeply to parse);
            # keep the bracket as plain text and continue searching.
            pos += 1
            continue
        pieces.append(text[copied_upto:pos])  # Text before the JSON
        pieces.append(f"\n```json\n{json.dumps(parsed, indent=4)}\n```\n")
        pos = copied_upto = end
    pieces.append(text[copied_upto:])  # Any remaining text
    return ''.join(pieces)
def decode_unicode_sequences(unicode_seq):
    """
    Decodes a sequence of Unicode escape sequences (e.g., \\u4F60\\u597D) to actual characters.

    A high-surrogate escape immediately followed by a low-surrogate escape
    (the UTF-16 pair form JSON uses for characters outside the Basic
    Multilingual Plane, e.g. \\ud83d\\ude00) is combined into the single
    character it encodes, rather than two unpaired surrogate code points
    that cannot be encoded as UTF-8. An unpaired surrogate escape is still
    decoded to its lone code point, as before.

    Args:
        unicode_seq (str): A string containing Unicode escape sequences.
    Returns:
        str: The decoded Unicode string.
    """
    # First alternative: a surrogate pair (high D800-DBFF, low DC00-DFFF).
    # Second alternative: any single \uXXXX escape.
    escape_pattern = re.compile(
        r'\\u([dD][89abAB][0-9a-fA-F]{2})\\u([dD][c-fC-F][0-9a-fA-F]{2})'
        r'|\\u([0-9a-fA-F]{4})'
    )

    def replace_match(match):
        high, low, single = match.groups()
        if single is not None:
            return chr(int(single, 16))
        # Standard UTF-16 surrogate-pair to code-point conversion.
        code_point = (
            0x10000
            + ((int(high, 16) - 0xD800) << 10)
            + (int(low, 16) - 0xDC00)
        )
        return chr(code_point)

    return escape_pattern.sub(replace_match, unicode_seq)
def is_mandarin(text):
    """
    Reports whether language detection classifies the text as Chinese.

    Args:
        text (str): The text to check.
    Returns:
        bool: True if the detected language code is 'zh-cn', 'zh-tw' or
        'zh'; False otherwise, including when detection raises
        LangDetectException.
    """
    try:
        detected = detect(text)
    except LangDetectException:
        return False
    return detected in ('zh-cn', 'zh-tw', 'zh')
def translate_to_english(text, translator):
    """
    Translates the given text from Chinese ('zh-cn') to English.

    This is best-effort: any exception raised while translating is printed
    and the input text is returned unchanged.

    Args:
        text (str): The Mandarin text to translate.
        translator (Translator): Object whose translate(text, src=..., dest=...)
            method returns a result exposing a .text attribute.
    Returns:
        str: The translated English text, or the original text on failure.
    """
    try:
        result = translator.translate(text, src='zh-cn', dest='en')
        english = result.text
    except Exception as err:
        print(f"Translation error: {err}")
        # Fall back to the untranslated input.
        return text
    return english
def process_text_for_mandarin_unicode(input_string):
    """
    Replaces runs of \\uXXXX escapes that decode to Mandarin with an English translation.

    Each run of consecutive \\uXXXX escape sequences is decoded. If the
    decoded word is detected as Mandarin it is replaced by
    "<translation> (<decoded word>)"; otherwise the escape run is left in
    the text exactly as it was.

    Args:
        input_string (str): The original string containing Unicode escape sequences.
    Returns:
        str: The processed string with translations where applicable.
    """
    # One translator instance is shared by every run in this string.
    translator = Translator()
    escape_run = re.compile(r'(?:\\u[0-9a-fA-F]{4})+')

    pieces = []
    cursor = 0  # End of the last escape run already handled
    for found in escape_run.finditer(input_string):
        raw_escapes = found.group(0)
        word = decode_unicode_sequences(raw_escapes)
        if is_mandarin(word):
            english = translate_to_english(word, translator)
            replacement = f"{english} ({word})"
        else:
            # Not Mandarin: keep the original escape sequence untouched.
            replacement = raw_escapes
        pieces.append(input_string[cursor:found.start()])
        pieces.append(replacement)
        cursor = found.end()
    pieces.append(input_string[cursor:])
    return ''.join(pieces)
def process_image_and_text(image, text):
    """
    Runs the model on one image/prompt pair and returns the post-processed reply.

    Args:
        image: Array accepted by PIL's Image.fromarray (the UI supplies the
            image component's numpy output).
        text (str): The prompt to send along with the image.
    Returns:
        str: The generated text, with embedded JSON wrapped in Markdown code
        blocks and Mandarin Unicode escape runs translated.
    """
    pil_image = Image.fromarray(image)
    features = processor.process(images=[pil_image], text=text)

    # Put every tensor on the model's device and add a leading batch
    # dimension so the batch has size 1.
    batch = {}
    for name, tensor in features.items():
        batch[name] = tensor.to(model.device).unsqueeze(0)

    generation_config = GenerationConfig(max_new_tokens=1024, stop_strings="<|endoftext|>")
    output = model.generate_from_batch(
        batch,
        generation_config,
        tokenizer=processor.tokenizer,
    )

    # The output starts with the prompt tokens; keep only what was generated.
    prompt_length = batch['input_ids'].size(1)
    new_tokens = output[0, prompt_length:]
    reply = processor.tokenizer.decode(new_tokens, skip_special_tokens=True)

    return process_text_for_mandarin_unicode(wrap_json_in_markdown(reply))
def chatbot(image, text, history):
    """
    Handles one chat turn and returns the message list to display.

    The history uses the role/content dictionary format, matching the
    Chatbot component declared with type="messages".

    Args:
        image: The uploaded image, or None when nothing has been uploaded.
        text (str): The user's question about the image.
        history (list): Prior messages as {"role": ..., "content": ...} dicts.
            It is extended in place when a model response is produced.
    Returns:
        list: The messages to show. When no image is present this is a new
        list containing the history plus a prompt to upload an image; the
        stored history itself is left unmodified in that case.
    """
    if image is None:
        # Must be a role/content dict: a tuple entry is not valid for a
        # Chatbot in "messages" mode. Built as a new list so this notice
        # is not saved into the conversation history.
        notice = {"role": "assistant", "content": "Please upload an image first."}
        return history + [notice]
    response = process_image_and_text(image, text)
    history.append({"role": "user", "content": text})
    history.append({"role": "assistant", "content": response})
    return history
# Define the Gradio interface: an image upload next to the chat window, with
# a text box and a button that both submit the question.
with gr.Blocks() as demo:
    gr.Markdown("# Image Chatbot with Molmo-7B-D-0924")
    with gr.Row():
        image_input = gr.Image(type="numpy")
        chatbot_output = gr.Chatbot(type="messages")
    text_input = gr.Textbox(placeholder="Ask a question about the image...")
    submit_button = gr.Button("Submit")
    state = gr.State([])

    # Clicking the button and pressing Enter in the text box run the same
    # handler with the same inputs and outputs.
    for register_event in (submit_button.click, text_input.submit):
        register_event(
            chatbot,
            inputs=[image_input, text_input, state],
            outputs=[chatbot_output],
        )

demo.launch()