| import marimo |
|
|
| __generated_with = "0.16.0" |
| app = marimo.App(width="medium") |
|
|
with app.setup:
    # Shared setup block: imports and environment loading live here so every
    # marimo cell in the app can reference them.
    # NOTE: the original block imported mimetypes, base64, time, json, os and
    # io twice; the duplicates were removed and imports grouped per PEP 8
    # (stdlib, then third-party).
    import ast
    import base64
    import glob
    import io
    import json
    import mimetypes
    import os
    import re
    import tempfile
    import time
    import uuid
    import zipfile
    from pathlib import Path
    from typing import Any, Callable, Dict, List, Literal, Optional, Union

    import certifi
    import marimo as mo
    import pandas as pd
    import pillow_heif
    import requests
    from dotenv import load_dotenv
    from ibm_watsonx_ai import APIClient, Credentials
    from ibm_watsonx_ai.foundation_models import ModelInference
    from kafka import KafkaAdminClient, KafkaProducer
    from PIL import Image

    # Populate os.environ from a local .env file before any cell reads config.
    load_dotenv()
|
|
|
@app.cell
def _():
    # Project-local helpers. In marimo only the names returned by a cell are
    # visible to other cells; the original also imported wx_regions,
    # create_data_url, convert_heic_to_jpg and
    # process_multiple_images_with_display_data without returning them, so
    # those dead imports were dropped.
    from helper_functions.image_helper_functions import (
        create_multiple_image_previews_with_conversion,
        display_results_stack_with_data,
        process_multiple_images_with_examples,
    )
    from samples.image_example_message import (
        image_example_message as example_message,
    )

    return (
        create_multiple_image_previews_with_conversion,
        display_results_stack_with_data,
        example_message,
        process_multiple_images_with_examples,
    )
|
|
|
|
@app.cell
def _():
    def _env(name, fallback=""):
        # `or` (not a .get default) so an env var set to "" also falls back.
        return os.environ.get(name) or fallback

    # Kafka connection settings from the environment.
    user = _env("KAFKA_USER")
    password = _env("KAFKA_PASSWORD")
    kafka_bootstrap_servers = _env("KAFKA_BOOTSTRAP_SERVERS")
    kafka_topic_filter = _env("KAFKA_TOPIC_PREFIX")

    # Default instruction prompt for the extraction model.
    prompt_template = _env("EXTRACTION_PROMPT")

    # watsonx.ai credentials and model selection, with sensible defaults.
    wx_creds = {
        "api_key": _env("WX_APIKEY"),
        "project_id": _env("WX_PROJECT_ID"),
        "space_id": _env("WX_SPACE_ID"),
        "region": _env("WX_REGION", "EU"),
        "model_id": _env("CHAT_MODEL", "mistralai/mistral-medium-2505"),
        "url": _env("WX_URL", "https://us-south.ml.cloud.ibm.com"),
    }
    return (
        kafka_bootstrap_servers,
        kafka_topic_filter,
        password,
        prompt_template,
        user,
        wx_creds,
    )
|
|
|
|
@app.cell
def _(kafka_bootstrap_servers, password, user, wx_creds):
    # Kafka client settings: SASL/PLAIN over TLS, verified against the
    # certifi CA bundle.
    kafka_config = {
        "bootstrap_servers": kafka_bootstrap_servers.split(","),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": user,
        "sasl_plain_password": password,
        "ssl_check_hostname": True,
        "ssl_cafile": certifi.where(),
    }

    # Default chat parameters (redundant float()/int wraps on literals removed).
    parameter_template = {
        "temperature": 0.7,
        "max_tokens": int(os.environ.get("MAX_OUTPUT_TOKENS") or 2048),
        "top_p": 1.0,
        "stop": ["</s>", "<|end_of_text|>"],
    }

    # CHAT_PARAMS (a JSON object) replaces the template wholesale;
    # MAX_OUTPUT_TOKENS then overrides max_tokens in either case.
    chat_params_env = os.getenv("CHAT_PARAMS")
    params = json.loads(chat_params_env) if chat_params_env else parameter_template
    if os.getenv("MAX_OUTPUT_TOKENS"):
        params["max_tokens"] = int(os.getenv("MAX_OUTPUT_TOKENS"))

    # wx_creds["model_id"] is already defaulted upstream, so the original
    # `or "mistralai/mistral-medium-2505"` fallback here was dead code.
    chat_model_id = wx_creds["model_id"]
    return chat_model_id, kafka_config, params
|
|
|
|
@app.cell
def _(wx_creds):
    # Scope the watsonx.ai client to a project, a space, or neither,
    # depending on which identifier was configured (project wins).
    wx_credentials = Credentials(url=wx_creds["url"], api_key=wx_creds["api_key"])
    if wx_creds["project_id"]:
        client = APIClient(
            credentials=wx_credentials, project_id=wx_creds["project_id"]
        )
    elif wx_creds["space_id"]:
        client = APIClient(
            credentials=wx_credentials, space_id=wx_creds["space_id"]
        )
    else:
        client = APIClient(credentials=wx_credentials)
    return (client,)
|
|
|
|
@app.cell
def _(chat_model_id, client, params):
    # Inference handle for the configured chat model, sharing the API client.
    chat_model = ModelInference(
        model_id=chat_model_id,
        api_client=client,
        params=params,
    )
    return (chat_model,)
|
|
|
|
@app.cell
def _(kafka_config):
    # Fetch topic metadata once at startup. The original never closed the
    # admin client, holding a broker connection for the app's lifetime;
    # close it as soon as the metadata has been read.
    kafka_admin = KafkaAdminClient(**kafka_config)
    try:
        kafka_topics = kafka_admin.describe_topics()
    finally:
        kafka_admin.close()
    return (kafka_topics,)
|
|
|
|
@app.cell
def _(kafka_topic_filter, kafka_topics):
    # Fall back to a placeholder entry when no topic metadata is available,
    # so the dropdown cell below always has something to select.
    if kafka_topics:
        topic_names = get_topic_names(kafka_topics, kafka_topic_filter)
    else:
        topic_names = ["placeholder_topic"]
    return (topic_names,)
|
|
|
|
@app.cell
def _(topic_names):
    # Searchable topic picker; pre-selecting the first topic guarantees the
    # dropdown always carries a value.
    kafka_topic_selector = mo.ui.dropdown(
        options=topic_names,
        value=topic_names[0],
        label="**Select the Target Topic:**",
        searchable=True,
        allow_select_none=False,
    )
    return (kafka_topic_selector,)
|
|
|
|
@app.cell
def _(kafka_topic_selector):
    # Currently selected topic name; this cell re-runs whenever the dropdown
    # value changes, keeping downstream producers in sync.
    kafka_topic = kafka_topic_selector.value
    return (kafka_topic,)
|
|
|
|
| @app.function |
def get_topic_names(kafka_topics, filter_word=None):
    """Extract topic names from Kafka topic metadata.

    Args:
        kafka_topics: Iterable of topic-metadata dicts, each with a
            ``"topic"`` key.
        filter_word: Optional substring; when truthy, only names containing
            it are kept.

    Returns:
        List of topic name strings.
    """
    names = [entry["topic"] for entry in kafka_topics]
    if not filter_word:
        return names
    return [name for name in names if filter_word in name]
|
|
|
|
@app.cell
def _(kafka_config):
    def _serialize(value):
        # Strings are UTF-8 encoded; anything else (e.g. bytes) passes through.
        return value.encode("utf-8") if isinstance(value, str) else value

    kafka_producer = KafkaProducer(**kafka_config, value_serializer=_serialize)
    return (kafka_producer,)
|
|
|
|
@app.cell
def _():
    # Drag-and-drop area accepting one or more raster images, HEIC included.
    image_upload = mo.ui.file(
        label="Upload an image file (jpeg, png, tiff, heic)",
        kind="area",
        multiple=True,
        filetypes=[".png", ".jpg", ".jpeg", ".tiff", ".heic"],
    )
    return (image_upload,)
|
|
|
|
@app.cell
def _(image_upload):
    # Show the uploaded file name (or a placeholder) under the upload area.
    if image_upload.name():
        name_printout = mo.md(f"**{image_upload.name()}**")
    else:
        # The original used an f-string with no placeholders (ruff F541).
        name_printout = mo.md("No File Uploaded")

    image_uploader = mo.vstack(
        [image_upload, name_printout], justify="space-around", align="center"
    )
    return (image_uploader,)
|
|
|
|
@app.cell
def _(prompt_template):
    # Code editor pre-filled with the env-provided template, embedded into a
    # markdown scaffold via .batch() under the key "editor".
    _editor = mo.ui.code_editor(
        value=prompt_template, language="markdown", min_height=200
    )
    prompt_editor = mo.md(
        """
    #### **Provide your instruction prompt here by editing the template:**

    {editor}

    """
    ).batch(editor=_editor)
    return (prompt_editor,)
|
|
|
|
| @app.function |
def check_state(variable, if_present=False, if_not_present=True):
    """Map a value's truthiness onto one of two results.

    Returns ``if_present`` when ``variable`` is truthy, ``if_not_present``
    otherwise. The defaults make this a "disabled while empty" helper for
    UI elements.
    """
    return if_present if variable else if_not_present
|
|
|
|
@app.cell
def _(image_upload):
    # The extract button stays disabled until at least one file is uploaded.
    button_disabled = check_state(
        variable=image_upload.value,
        if_present=False,
        if_not_present=True,
    )
    return (button_disabled,)
|
|
|
|
@app.cell
def _(button_disabled):
    # Explicit run button so extraction only happens on demand.
    extract_text_button = mo.ui.run_button(
        disabled=button_disabled,
        label="Extract Text from Images",
    )
    return (extract_text_button,)
|
|
|
|
@app.cell
def _(prompt_editor, prompt_template):
    # Prefer the edited prompt; fall back to the raw template when the
    # editor holds nothing.
    if prompt_editor.value:
        instruction_prompt = str(prompt_editor.value)
    else:
        instruction_prompt = str(prompt_template)
    return (instruction_prompt,)
|
|
|
|
@app.cell
def _(extract_text_button, image_uploader, kafka_topic_selector):
    # Landing section: title/licensing blurb stacked above the upload area,
    # topic picker and run button. The bare mo.vstack expression at the end
    # is what marimo renders for this cell.
    title = mo.md(
        """### **GhostEyes:** watsonx.ai Based Image to Mural Sticky Note Converter

    Licensed under apache 2.0, users hold full accountability for any use or modification of the code.

    This asset is meant to support IBMers, IBM Partners, Clients in developing understanding of how to better utilize various watsonx.ai features.

    Created by Milan Mrdenovic [milan.mrdenovic@ibm.com] for IBM Ecosystem Client Engineering, NCEE *version 1.1 - 10.10.2025*
    """
    )
    mo.vstack(
        [title, image_uploader, kafka_topic_selector, extract_text_button],
        align="center",
        gap=2,
    )
    return
|
|
|
|
@app.cell
def _(multiple_image_previews, results_df):
    # Previews on top, extraction results underneath.
    extract_stack = mo.vstack(
        [multiple_image_previews, results_df], align="center", gap=2
    )
    return (extract_stack,)
|
|
|
|
@app.cell
def _(create_multiple_image_previews_with_conversion, image_upload):
    # Render previews for every uploaded file; the helper presumably also
    # converts HEIC for display -- see
    # helper_functions.image_helper_functions for the actual contract.
    multiple_image_previews = create_multiple_image_previews_with_conversion(
        image_upload
    )
    return (multiple_image_previews,)
|
|
|
|
@app.cell
def _(
    chat_model,
    example_message,
    extract_text_button,
    image_upload,
    instruction_prompt,
    process_multiple_images_with_examples,
):
    # Start from empty results, then run extraction only when the button was
    # pressed and every required input is available.
    results_df = display_files = None
    results_ready = False
    _inputs_ready = (
        image_upload.value
        and instruction_prompt
        and example_message
        and chat_model
    )
    if extract_text_button.value and _inputs_ready:
        results_df, display_files = process_multiple_images_with_examples(
            instruction_prompt=instruction_prompt,
            image_uploader=image_upload,
            chat_model=chat_model,
            example_message=example_message,
        )
        results_ready = True
    return display_files, results_df, results_ready
|
|
|
|
@app.cell
def _(display_files, display_results_stack_with_data, results_df):
    # Build the review panel only once extraction has produced results.
    if results_df is None:
        review_stack = None
    else:
        review_stack = display_results_stack_with_data(results_df, display_files)
    return (review_stack,)
|
|
|
|
@app.cell
def _(kafka_producer, kafka_topic, results_df, results_ready):
    # Publish the model responses to the selected topic once results exist.
    # (`== True` replaced with a plain truthiness test; results_ready is
    # always a bool, so behavior is unchanged.)
    send_kafka_events = (
        send_results_to_kafka(
            kafka_producer, kafka_topic, results_df, column_to_send="model_response"
        )
        if results_ready
        else None
    )
    return
|
|
|
|
@app.cell
def _():
    # Register pillow-heif with PIL so Image.open can decode .heic uploads.
    # NOTE(review): this cell has no inputs/outputs, so marimo gives no
    # ordering guarantee relative to cells that decode images -- confirm the
    # image helpers do not depend on this having run first.
    pillow_heif.register_heif_opener()
    return
|
|
|
|
@app.cell
def _(extract_stack):
    # Collapsible wrapper around the preview stack; the bare expression at
    # the end is what marimo renders.
    _sections = {"**Preview Selected Images Tab**": extract_stack}
    ui_accordion_section_1 = mo.accordion(_sections)
    ui_accordion_section_1
    return
|
|
|
|
@app.cell
def _(review_stack):
    # Collapsible wrapper around the output-review stack.
    _sections = {"**Review Outputs Tab**": review_stack}
    ui_accordion_section_2 = mo.accordion(_sections)
    ui_accordion_section_2
    return
|
|
|
|
| @app.function |
def send_results_to_kafka(
    kafka_producer,
    kafka_topic,
    results_df,
    exclude_value="No Text Detected",
    column_to_send="model_response",
    sleep_time=0.2,
):
    """
    Send DataFrame results to a Kafka topic, excluding specified values.

    Args:
        kafka_producer: Kafka producer instance (must provide send/flush).
        kafka_topic: Kafka topic name.
        results_df: DataFrame containing results.
        exclude_value: Value to exclude from sending (default: "No Text Detected").
        column_to_send: Column name to send (default: "model_response").
        sleep_time: Time to sleep between sends in seconds (default: 0.2).
    """
    # Iterate the single column directly instead of iterrows(); same values,
    # same order, without materializing a Series per row.
    for value in results_df[column_to_send]:
        if value != exclude_value:
            kafka_producer.send(topic=kafka_topic, value=str(value))
            # Throttle only actual sends (the original slept on excluded
            # rows too, contradicting its own docstring).
            time.sleep(sleep_time)
    # KafkaProducer.send is asynchronous; flush so every queued message is
    # actually delivered before the caller moves on.
    kafka_producer.flush()
|
|
|
|
| if __name__ == "__main__": |
| app.run() |
|
|