# MilanM's picture
# Update main.py
# db1d48b verified
import marimo

# Version marker written into the notebook file (presumably the marimo
# release that generated it).
__generated_with = "0.16.0"
# Notebook application object; every @app.cell / @app.function below
# registers itself on this instance.
app = marimo.App(width="medium")
with app.setup:
### Setup Cell
# --- Standard Python Libraries
import ast, base64, glob, io, json, mimetypes, os, re, tempfile, time, zipfile
from typing import Any, Dict, List, Optional, Union, Callable, Literal
from pathlib import Path
# --- Third Party Libraries
from dotenv import load_dotenv
from ibm_watsonx_ai import APIClient, Credentials
from ibm_watsonx_ai.foundation_models import ModelInference
from kafka import KafkaProducer, KafkaAdminClient
load_dotenv()
from PIL import Image
import marimo as mo
import pandas as pd
import pillow_heif
import mimetypes
import requests
import certifi
import base64
import uuid
import time
import json
import os
import io
@app.cell
def _():
    # Import the project-local helpers and the few-shot example message, and
    # expose the ones listed in the return tuple to the other cells.
    from base_variables import wx_regions
    from helper_functions.image_helper_functions import (
        create_data_url,
        convert_heic_to_jpg,
        create_multiple_image_previews_with_conversion,
        display_results_stack_with_data,
        process_multiple_images_with_display_data,
        process_multiple_images_with_examples,
    )
    # The sample message is re-exported under the shorter name
    # `example_message`.
    from samples.image_example_message import (
        image_example_message as example_message,
    )
    return (
        create_multiple_image_previews_with_conversion,
        display_results_stack_with_data,
        example_message,
        process_multiple_images_with_examples,
    )
@app.cell
def _():
    # Read the runtime configuration from environment variables. A variable
    # that is unset OR set to an empty string falls back to the given default.
    def _env(name, default=""):
        return os.environ.get(name) or default

    # Kafka connection settings.
    user = _env("KAFKA_USER")
    password = _env("KAFKA_PASSWORD")
    kafka_bootstrap_servers = _env("KAFKA_BOOTSTRAP_SERVERS")
    kafka_topic_filter = _env("KAFKA_TOPIC_PREFIX")

    # Default text for the instruction prompt.
    prompt_template = _env("EXTRACTION_PROMPT")

    # watsonx.ai connection settings.
    # NOTE(review): the default region is "EU" while the default URL points at
    # the us-south endpoint - confirm which one is intended.
    wx_creds = dict(
        api_key=_env("WX_APIKEY"),
        project_id=_env("WX_PROJECT_ID"),
        space_id=_env("WX_SPACE_ID"),
        region=_env("WX_REGION", "EU"),
        model_id=_env("CHAT_MODEL", "mistralai/mistral-medium-2505"),
        url=_env("WX_URL", "https://us-south.ml.cloud.ibm.com"),
    )
    return (
        kafka_bootstrap_servers,
        kafka_topic_filter,
        password,
        prompt_template,
        user,
        wx_creds,
    )
@app.cell
def _(kafka_bootstrap_servers, password, user, wx_creds):
    # Build the keyword arguments shared by the Kafka admin client and the
    # producer: SASL/PLAIN over TLS, verified against certifi's CA bundle.
    kafka_config = dict(
        bootstrap_servers=kafka_bootstrap_servers.split(","),
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username=user,
        sasl_plain_password=password,
        ssl_check_hostname=True,
        ssl_cafile=certifi.where(),
    )

    # Generation parameters used when CHAT_PARAMS is not provided.
    parameter_template = {
        "temperature": 0.7,
        "max_tokens": int(os.environ.get("MAX_OUTPUT_TOKENS") or 2048),
        "top_p": 1.0,
        "stop": ["</s>", "<|end_of_text|>"],
    }

    # CHAT_PARAMS, when set, is parsed as JSON and replaces the template.
    chat_params_env = os.getenv("CHAT_PARAMS")
    if chat_params_env:
        params = json.loads(chat_params_env)
    else:
        params = parameter_template

    # MAX_OUTPUT_TOKENS, when set, overrides max_tokens in either case.
    _max_tokens_override = os.getenv("MAX_OUTPUT_TOKENS")
    if _max_tokens_override:
        params["max_tokens"] = int(_max_tokens_override)

    chat_model_id = wx_creds["model_id"] or "mistralai/mistral-medium-2505"
    return chat_model_id, kafka_config, params
@app.cell
def _(wx_creds):
    # Create the watsonx.ai API client. A project id takes precedence over a
    # space id; with neither set the client is created from credentials only.
    wx_credentials = Credentials(url=wx_creds["url"], api_key=wx_creds["api_key"])
    if wx_creds["project_id"]:
        client = APIClient(
            credentials=wx_credentials, project_id=wx_creds["project_id"]
        )
    elif wx_creds["space_id"]:
        client = APIClient(credentials=wx_credentials, space_id=wx_creds["space_id"])
    else:
        client = APIClient(credentials=wx_credentials)
    return (client,)
@app.cell
def _(chat_model_id, client, params):
    # Bind the selected model id and generation parameters to the API client;
    # the resulting object is what the extraction cell passes to the
    # image-processing helper.
    chat_model = ModelInference(
        api_client=client, model_id=chat_model_id, params=params
    )
    return (chat_model,)
@app.cell
def _(kafka_config):
    # Fetch the topic descriptions visible to the configured Kafka user.
    # The admin client is needed only for this single request, so it is closed
    # as soon as the call returns (or raises) instead of leaving its broker
    # connections open for the lifetime of the notebook.
    kafka_admin = KafkaAdminClient(**kafka_config)
    try:
        kafka_topics = kafka_admin.describe_topics()
    finally:
        kafka_admin.close()
    return (kafka_topics,)
@app.cell
def _(kafka_topic_filter, kafka_topics):
    # Reduce the topic descriptions to the names offered in the dropdown.
    topic_names = (
        get_topic_names(kafka_topics, kafka_topic_filter) if kafka_topics else []
    )
    # Fall back to the placeholder both when no topics were returned and when
    # the filter removed every topic: the dropdown cell preselects
    # topic_names[0] and would raise IndexError on an empty list.
    if not topic_names:
        topic_names = ["placeholder_topic"]
    return (topic_names,)
@app.cell
def _(topic_names):
    # Searchable dropdown for the target topic; the first name is preselected
    # and an empty selection is not allowed.
    _default_topic = topic_names[0]
    kafka_topic_selector = mo.ui.dropdown(
        topic_names,
        value=_default_topic,
        allow_select_none=False,
        searchable=True,
        label="**Select the Target Topic:**",
    )
    return (kafka_topic_selector,)
@app.cell
def _(kafka_topic_selector):
    # Expose the dropdown's current selection as the topic to publish to.
    kafka_topic = kafka_topic_selector.value
    return (kafka_topic,)
@app.function
def get_topic_names(kafka_topics, filter_word=None):
    """
    Extract topic names from a sequence of topic descriptions.

    Args:
        kafka_topics: Iterable of mappings, each with a "topic" entry.
        filter_word: Optional substring; when truthy, only names containing
            it are kept. When falsy, every name is returned.

    Returns:
        list: Topic names in their original order.
    """
    return [
        entry["topic"]
        for entry in kafka_topics
        if not filter_word or filter_word in entry["topic"]
    ]
@app.cell
def _(kafka_config):
    # Producer used to publish the extracted text.
    def _to_bytes(payload):
        # str payloads are encoded as UTF-8; anything else is passed through
        # unchanged.
        if isinstance(payload, str):
            return payload.encode("utf-8")
        return payload

    kafka_producer = KafkaProducer(value_serializer=_to_bytes, **kafka_config)
    return (kafka_producer,)
@app.cell
def _():
    # Drag-and-drop upload area that accepts several image files at once.
    _accepted_suffixes = [".png", ".jpg", ".jpeg", ".tiff", ".heic"]
    image_upload = mo.ui.file(
        filetypes=_accepted_suffixes,
        multiple=True,
        kind="area",
        label="Upload an image file (jpeg, png, tiff, heic)",
    )
    return (image_upload,)
@app.cell
def _(image_upload):
    # Show the upload widget with the names of the uploaded files beneath it.
    # The widget accepts multiple files, so every uploaded name is listed
    # rather than only the first one.
    _uploaded_names = [_file.name for _file in (image_upload.value or [])]
    if _uploaded_names:
        name_printout = mo.md(f"**{', '.join(_uploaded_names)}**")
    else:
        name_printout = mo.md("No File Uploaded")
    image_uploader = mo.vstack(
        [image_upload, name_printout], justify="space-around", align="center"
    )
    return (image_uploader,)
@app.cell
def _(prompt_template):
    # Markdown block with an embedded code editor, pre-filled with the prompt
    # template, under the placeholder name "editor".
    _markdown = mo.md(
        "\n"
        "#### **Provide your instruction prompt here by editing the template:**\n"
        "{editor}\n"
    )
    _editor = mo.ui.code_editor(
        value=prompt_template, language="markdown", min_height=200
    )
    prompt_editor = _markdown.batch(editor=_editor)
    return (prompt_editor,)
@app.function
def check_state(variable, if_present=False, if_not_present=True):
    """
    Pick one of two values depending on the truthiness of `variable`.

    Args:
        variable: Value whose truthiness is tested.
        if_present: Returned when `variable` is truthy (default: False).
        if_not_present: Returned when `variable` is falsy (default: True).

    Returns:
        `if_present` or `if_not_present`.
    """
    if variable:
        return if_present
    return if_not_present
@app.cell
def _(image_upload):
    # The extract button stays disabled until at least one file is uploaded.
    button_disabled = not image_upload.value
    return (button_disabled,)
@app.cell
def _(button_disabled):
    # Button that triggers the extraction; whether it is clickable is decided
    # by `button_disabled`.
    extract_text_button = mo.ui.run_button(
        label="Extract Text from Images", disabled=button_disabled
    )
    return (extract_text_button,)
@app.cell
def _(prompt_editor, prompt_template):
    # Resolve the instruction prompt: the editor's text when it is non-empty,
    # otherwise the template from the environment.
    # `prompt_editor` is built with `.batch(editor=...)`, so its value is a
    # mapping keyed by "editor"; unwrap it so the prompt is the edited text
    # itself rather than the string form of the whole mapping (which is always
    # truthy and would also defeat the fallback below).
    _editor_state = prompt_editor.value
    if isinstance(_editor_state, dict):
        _editor_state = _editor_state.get("editor")
    instruction_prompt = (
        str(_editor_state) if _editor_state else str(prompt_template)
    )
    return (instruction_prompt,)
@app.cell
def _(extract_text_button, image_uploader, kafka_topic_selector):
    # Page header followed by the main controls, stacked vertically and
    # centred. The stack is the cell's last expression, i.e. its output.
    title = mo.md(
        """### **GhostEyes:** watsonx.ai Based Image to Mural Sticky Note Converter
Licensed under apache 2.0, users hold full accountability for any use or modification of the code.
This asset is meant to support IBMers, IBM Partners, Clients in developing understanding of how to better utilize various watsonx.ai features.
Created by Milan Mrdenovic [milan.mrdenovic@ibm.com] for IBM Ecosystem Client Engineering, NCEE *version 1.1 - 10.10.2025*
"""
    )
    _page_items = [title, image_uploader, kafka_topic_selector, extract_text_button]
    mo.vstack(_page_items, gap=2, align="center")
    return
@app.cell
def _(multiple_image_previews, results_df):
    # Stack the image previews above the results table; this stack is the
    # content of the "Preview Selected Images" accordion.
    extract_stack = mo.vstack(
        [multiple_image_previews, results_df],
        align="center",
        gap=2,
    )
    return (extract_stack,)
@app.cell
def _(create_multiple_image_previews_with_conversion, image_upload):
    # Build previews for the uploaded files via the project helper
    # (presumably converting formats such as HEIC for display, going by the
    # helper's name - confirm in helper_functions).
    multiple_image_previews = create_multiple_image_previews_with_conversion(
        image_upload
    )
    return (multiple_image_previews,)
@app.cell
def _(
    chat_model,
    example_message,
    extract_text_button,
    image_upload,
    instruction_prompt,
    process_multiple_images_with_examples,
):
    # Run the extraction only when the button was pressed and every input
    # (uploaded files, prompt, example message, model) is available.
    results_ready = bool(
        extract_text_button.value
        and image_upload.value
        and instruction_prompt
        and example_message
        and chat_model
    )
    if not results_ready:
        # Nothing to process yet: downstream cells see None placeholders.
        results_df = None
        display_files = None
    else:
        results_df, display_files = process_multiple_images_with_examples(
            chat_model=chat_model,
            image_uploader=image_upload,
            instruction_prompt=instruction_prompt,
            example_message=example_message,
        )
    return display_files, results_df, results_ready
@app.cell
def _(display_files, display_results_stack_with_data, results_df):
    # Build the review view once results exist; stays None before the first
    # extraction.
    if results_df is None:
        review_stack = None
    else:
        review_stack = display_results_stack_with_data(results_df, display_files)
    return (review_stack,)
@app.cell
def _(kafka_producer, kafka_topic, results_df, results_ready):
    # Publish the model responses to the selected topic as soon as an
    # extraction has produced results.
    # NOTE(review): this cell also takes kafka_topic as an input, so a topic
    # change while results_ready is still True presumably publishes the same
    # results again to the new topic - confirm this is intended.
    send_kafka_events = (
        send_results_to_kafka(
            kafka_producer, kafka_topic, results_df, column_to_send="model_response"
        )
        if results_ready
        else None
    )
    return
@app.cell
def _():
    # Register pillow_heif's HEIF opener (presumably so the .heic files the
    # uploader accepts can be opened through PIL - confirm in the helpers).
    pillow_heif.register_heif_opener()
    return
@app.cell
def _(extract_stack):
    # Collapsible section holding the image previews; the accordion is the
    # cell's output.
    _sections = {"**Preview Selected Images Tab**": extract_stack}
    ui_accordion_section_1 = mo.accordion(_sections)
    ui_accordion_section_1
    return
@app.cell
def _(review_stack):
    # Collapsible section holding the review view; the accordion is the
    # cell's output.
    _sections = {"**Review Outputs Tab**": review_stack}
    ui_accordion_section_2 = mo.accordion(_sections)
    ui_accordion_section_2
    return
@app.function
def send_results_to_kafka(
    kafka_producer,
    kafka_topic,
    results_df,
    exclude_value="No Text Detected",
    column_to_send="model_response",
    sleep_time=0.2,
):
    """
    Send DataFrame results to a Kafka topic, excluding specified values.

    Each value of `column_to_send` is converted with `str()` and sent as one
    message. Values equal to `exclude_value` and missing (`None`) values are
    skipped. The producer is flushed before returning so buffered messages
    are handed to the broker rather than left pending.

    Args:
        kafka_producer: Kafka producer instance
        kafka_topic: Kafka topic name
        results_df: DataFrame containing results (None or empty sends nothing)
        exclude_value: Value to exclude from sending (default: "No Text Detected")
        column_to_send: Column name to send (default: "model_response")
        sleep_time: Time to sleep between sends in seconds (default: 0.2)

    Returns:
        int: Number of messages handed to the producer.
    """
    # Nothing to publish; also avoids a column lookup on an empty frame.
    if results_df is None or results_df.empty:
        return 0

    sent_count = 0
    for value in results_df[column_to_send]:
        # Skip the "nothing found" marker and missing responses; the latter
        # would otherwise go out as the literal text "None".
        if value is None or value == exclude_value:
            continue
        # Pause between consecutive messages only, not after the last one.
        if sent_count:
            time.sleep(sleep_time)
        kafka_producer.send(topic=kafka_topic, value=str(value))
        sent_count += 1

    # send() only enqueues; flush so delivery is attempted before returning.
    if sent_count:
        kafka_producer.flush()
    return sent_count
# Run the notebook app when this file is executed directly as a script.
if __name__ == "__main__":
    app.run()