MilanM commited on
Commit
a63e5e7
·
verified ·
1 Parent(s): ea21adb

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +127 -278
main.py CHANGED
@@ -9,13 +9,13 @@ with app.setup:
9
  import ast, base64, glob, io, json, mimetypes, os, re, tempfile, time, zipfile
10
  from typing import Any, Dict, List, Optional, Union, Callable, Literal
11
  from pathlib import Path
12
-
13
  # --- Third Party Libraries
14
  from dotenv import load_dotenv
15
  from ibm_watsonx_ai import APIClient, Credentials
16
  from ibm_watsonx_ai.foundation_models import ModelInference
17
- from kafka import KafkaProducer
18
-
19
  load_dotenv()
20
  from PIL import Image
21
  import marimo as mo
@@ -43,25 +43,24 @@ def _():
43
  process_multiple_images_with_display_data,
44
  process_multiple_images_with_examples,
45
  )
46
- from samples.image_example_message import image_example_message as example_message
 
 
 
47
  return (
48
- create_data_url,
49
- convert_heic_to_jpg,
50
  create_multiple_image_previews_with_conversion,
51
  display_results_stack_with_data,
52
- process_multiple_images_with_display_data,
53
- process_multiple_images_with_examples,
54
  example_message,
55
- wx_regions,
56
  )
57
 
58
 
59
  @app.cell
60
- def _(os):
61
  user = os.environ.get("KAFKA_USER") or ""
62
  password = os.environ.get("KAFKA_PASSWORD") or ""
63
  kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS") or ""
64
- kafka_topic = os.environ.get("KAFKA_TOPIC") or ""
65
  prompt_template = os.environ.get("EXTRACTION_PROMPT") or ""
66
  wx_creds = {
67
  "api_key": os.environ.get("WX_APIKEY") or "",
@@ -72,16 +71,17 @@ def _(os):
72
  "url": os.environ.get("WX_URL") or "https://us-south.ml.cloud.ibm.com",
73
  }
74
  return (
75
- wx_creds,
76
- prompt_template,
77
- kafka_bootstrap_servers,
78
  kafka_bootstrap_servers,
 
79
  password,
80
- user
 
 
81
  )
82
 
 
83
  @app.cell
84
- def _(certifi, kafka_bootstrap_servers, os, password, user, wx_creds):
85
  kafka_config = {
86
  "bootstrap_servers": kafka_bootstrap_servers.split(","),
87
  "security_protocol": "SASL_SSL",
@@ -107,20 +107,24 @@ def _(certifi, kafka_bootstrap_servers, os, password, user, wx_creds):
107
  chat_model_id = wx_creds["model_id"] or "mistralai/mistral-medium-2505"
108
  return chat_model_id, kafka_config, params
109
 
 
110
  @app.cell
111
- def _(APIClient, Credentials, wx_creds):
112
  wx_credentials = Credentials(url=wx_creds["url"], api_key=wx_creds["api_key"])
113
  client = (
114
  APIClient(credentials=wx_credentials, project_id=wx_creds["project_id"])
115
  if wx_creds["project_id"]
116
- else APIClient(credentials=wx_credentials, space_id=wx_creds["space_id"])
117
- if wx_creds["space_id"]
118
- else APIClient(credentials=wx_credentials)
 
 
119
  )
120
  return (client,)
121
 
 
122
  @app.cell
123
- def _(ModelInference, chat_model_id, client, params):
124
  chat_model = ModelInference(
125
  api_client=client, model_id=chat_model_id, params=params
126
  )
@@ -128,7 +132,50 @@ def _(ModelInference, chat_model_id, client, params):
128
 
129
 
130
  @app.cell
131
- def _(kafka_config, KafkaProducer):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  kafka_producer = KafkaProducer(
133
  **kafka_config,
134
  value_serializer=lambda x: x.encode("utf-8") if isinstance(x, str) else x,
@@ -137,7 +184,7 @@ def _(kafka_config, KafkaProducer):
137
 
138
 
139
  @app.cell
140
- def _(mo):
141
  image_upload = mo.ui.file(
142
  kind="area",
143
  filetypes=[".png", ".jpg", ".jpeg", ".tiff", ".heic"],
@@ -148,7 +195,7 @@ def _(mo):
148
 
149
 
150
  @app.cell
151
- def _(image_upload, mo):
152
  if image_upload.name():
153
  name_printout = mo.md(f"**{image_upload.name()}**")
154
  else:
@@ -157,23 +204,26 @@ def _(image_upload, mo):
157
  image_uploader = mo.vstack(
158
  [image_upload, name_printout], justify="space-around", align="center"
159
  )
160
- return
161
 
162
 
163
  @app.cell
164
- def _(mo, prompt_template):
165
- prompt_editor = mo.md("""
 
166
  #### **Provide your instruction prompt here by editing the template:**
167
 
168
  {editor}
169
 
170
- """).batch(
 
171
  editor=mo.ui.code_editor(
172
  value=prompt_template, language="markdown", min_height=200
173
  )
174
  )
175
  return (prompt_editor,)
176
 
 
177
  @app.function
178
  def check_state(variable, if_present=False, if_not_present=True):
179
  return if_not_present if not variable else if_present
@@ -188,12 +238,13 @@ def _(image_upload):
188
 
189
 
190
  @app.cell
191
- def _(button_disabled, mo):
192
  extract_text_button = mo.ui.run_button(
193
  label="Extract Text from Images", disabled=button_disabled
194
  )
195
  return (extract_text_button,)
196
 
 
197
  @app.cell
198
  def _(prompt_editor, prompt_template):
199
  instruction_prompt = (
@@ -201,23 +252,33 @@ def _(prompt_editor, prompt_template):
201
  )
202
  return (instruction_prompt,)
203
 
 
204
  @app.cell
205
- def _(mo, image_uploader, extract_text_button):
 
 
 
 
 
 
 
 
206
  mo.vstack(
207
- [image_uploader, extract_text_button],
208
  align="center",
209
  gap=2,
210
  )
211
  return
212
 
 
213
  @app.cell
214
- def _(mo, multiple_image_previews, results_df):
215
  extract_stack = mo.vstack(
216
  [multiple_image_previews, results_df],
217
  align="center",
218
  gap=2,
219
  )
220
- return
221
 
222
 
223
  @app.cell
@@ -227,6 +288,7 @@ def _(create_multiple_image_previews_with_conversion, image_upload):
227
  )
228
  return (multiple_image_previews,)
229
 
 
230
  @app.cell
231
  def _(
232
  chat_model,
@@ -253,9 +315,9 @@ def _(
253
  else:
254
  results_df = display_files = None
255
  results_ready = False
256
-
257
  return display_files, results_df, results_ready
258
 
 
259
  @app.cell
260
  def _(display_files, display_results_stack_with_data, results_df):
261
  review_stack = (
@@ -263,16 +325,11 @@ def _(display_files, display_results_stack_with_data, results_df):
263
  if results_df is not None
264
  else None
265
  )
266
- return
 
267
 
268
  @app.cell
269
- def _(
270
- kafka_producer,
271
- kafka_topic,
272
- results_df,
273
- results_ready,
274
- send_results_to_kafka,
275
- ):
276
  send_kafka_events = (
277
  send_results_to_kafka(
278
  kafka_producer, kafka_topic, results_df, column_to_send="model_response"
@@ -282,13 +339,12 @@ def _(
282
  )
283
  return
284
 
 
285
  @app.cell
286
- def _(pillow_heif):
287
  pillow_heif.register_heif_opener()
288
  return
289
 
290
- # --- --- --- --- --- --- ---
291
-
292
 
293
  @app.cell
294
  def _(extract_stack):
@@ -299,245 +355,38 @@ def _(extract_stack):
299
  return
300
 
301
 
302
-
303
  @app.cell
304
  def _(review_stack):
305
- ui_accordion_section_2 = mo.accordion(
306
- {"**Review Outputs Tab**": review_stack}
307
- )
308
  ui_accordion_section_2
309
  return
310
 
311
 
312
- @app.cell
313
- def _(time):
314
- def send_results_to_kafka(
315
- kafka_producer,
316
- kafka_topic,
317
- results_df,
318
- exclude_value="No Text Detected",
319
- column_to_send="model_response",
320
- sleep_time=0.2,
321
- ):
322
- """
323
- Send DataFrame results to Kafka topic, excluding specified values.
324
-
325
- Args:
326
- kafka_producer: Kafka producer instance
327
- kafka_topic: Kafka topic name
328
- results_df: DataFrame containing results
329
- exclude_value: Value to exclude from sending (default: "No Text Detected")
330
- column_to_send: Column name to send (default: "model_response")
331
- sleep_time: Time to sleep between sends in seconds (default: 0.2)
332
- """
333
- for _, row in results_df.iterrows():
334
- value = row[column_to_send]
335
- if value != exclude_value:
336
- kafka_producer.send(topic=kafka_topic, value=str(value))
337
- time.sleep(sleep_time)
338
- return (send_results_to_kafka,)
339
-
340
- # --- --- --- --- --- --- --- In case you want to use the standard watsonx.ai client instantiation form i use in other notebooks
341
-
342
- # @app.cell
343
- # def _(wx_creds, wx_regions):
344
- # client_instantiation_form = (
345
- # mo.md(
346
- # """
347
- # ###**watsonx.ai credentials:**
348
-
349
- # {wx_region}
350
-
351
- # {wx_api_key}
352
-
353
- # {project_id}
354
-
355
- # {space_id}
356
- # """
357
- # )
358
- # .batch(
359
- # wx_region=mo.ui.dropdown(
360
- # wx_regions,
361
- # label="Select your watsonx.ai region:",
362
- # value=wx_creds["region"],
363
- # searchable=True,
364
- # ),
365
- # wx_api_key=mo.ui.text(
366
- # placeholder="Add your IBM Cloud api-key...",
367
- # label="IBM Cloud Api-key:",
368
- # kind="password",
369
- # value=wx_creds["api_key"],
370
- # ),
371
- # project_id=mo.ui.text(
372
- # placeholder="Add your watsonx.ai project_id...",
373
- # label="Project_ID:",
374
- # kind="text",
375
- # value=wx_creds["project_id"],
376
- # ),
377
- # space_id=mo.ui.text(
378
- # placeholder="Add your watsonx.ai space_id...",
379
- # label="Space_ID:",
380
- # kind="text",
381
- # value=wx_creds["space_id"],
382
- # ),
383
- # )
384
- # .form(show_clear_button=True, bordered=False)
385
- # )
386
- # return (client_instantiation_form,)
387
-
388
- # @app.cell
389
- # def _(client_instantiation_form):
390
- # client_setup = client_instantiation_form.value or None
391
-
392
- # ### Extract Credential Variables:
393
- # if client_setup:
394
- # wx_url = client_setup["wx_region"] if client_setup["wx_region"] else "EU"
395
- # wx_api_key = (
396
- # client_setup["wx_api_key"].strip()
397
- # if client_setup["wx_api_key"]
398
- # else None
399
- # )
400
- # os.environ["WATSONX_APIKEY"] = wx_api_key or ""
401
-
402
- # project_id = (
403
- # client_setup["project_id"].strip()
404
- # if client_setup["project_id"]
405
- # else None
406
- # )
407
- # space_id = (
408
- # client_setup["space_id"].strip() if client_setup["space_id"] else None
409
- # )
410
- # else:
411
- # os.environ["WATSONX_APIKEY"] = ""
412
- # project_id = space_id = wx_api_key = wx_url = None
413
- # return client_setup, project_id, space_id, wx_api_key, wx_url
414
-
415
- # @app.cell
416
- # def _(client_setup, project_id, space_id, wx_api_key, wx_url):
417
- # ### Instantiate the watsonx.ai client
418
- # if client_setup:
419
- # try:
420
- # wx_credentials = Credentials(url=wx_url, api_key=wx_api_key)
421
- # project_client = (
422
- # APIClient(credentials=wx_credentials, project_id=project_id)
423
- # if project_id
424
- # else None
425
- # )
426
- # deployment_client = (
427
- # APIClient(credentials=wx_credentials, space_id=space_id)
428
- # if space_id
429
- # else None
430
- # )
431
- # instantiation_success = True
432
- # instantiation_error = None
433
- # except Exception as e:
434
- # instantiation_success = False
435
- # instantiation_error = str(e)
436
- # wx_credentials = project_client = deployment_client = None
437
- # else:
438
- # wx_credentials = project_client = deployment_client = None
439
- # instantiation_success = None
440
- # instantiation_error = None
441
- # return (
442
- # deployment_client,
443
- # instantiation_error,
444
- # instantiation_success,
445
- # project_client,
446
- # )
447
-
448
-
449
- # @app.cell
450
- # def _(client_callout_kind, client_instantiation_form, client_status):
451
- # client_callout = mo.callout(client_status, kind=client_callout_kind)
452
-
453
- # client_section = mo.hstack(
454
- # [client_instantiation_form, client_callout],
455
- # align="center",
456
- # justify="space-around",
457
- # )
458
- # return (client_section,)
459
-
460
- # @app.cell
461
- # def _(
462
- # client_key,
463
- # client_options,
464
- # client_selector,
465
- # client_setup,
466
- # get_key_by_value,
467
- # instantiation_error,
468
- # instantiation_success,
469
- # wrap_with_spaces,
470
- # ):
471
- # active_client_name = (
472
- # get_key_by_value(client_options, client_key)
473
- # if client_key
474
- # else "No Client" or "Project Client"
475
- # )
476
-
477
- # if client_setup:
478
- # if instantiation_success:
479
- # client_status = mo.md(
480
- # f"### ✅ Client Instantiation Successful ✅\n\n"
481
- # f"{client_selector}\n\n"
482
- # f"**Active Client:**{wrap_with_spaces(active_client_name, prefix_spaces=5)}"
483
- # )
484
- # client_callout_kind = "success"
485
- # else:
486
- # client_status = mo.md(
487
- # f"### ❌ Client Instantiation Failed\n**Error:** {instantiation_error}\n\nCheck your region selection and credentials"
488
- # )
489
- # client_callout_kind = "danger"
490
- # else:
491
- # client_status = mo.md(
492
- # f"### Client Instantiation Status will turn Green When Ready\n\n"
493
- # f"{client_selector}\n\n"
494
- # f"**Active Client:**{wrap_with_spaces(active_client_name, prefix_spaces=5)}"
495
- # )
496
- # client_callout_kind = "neutral"
497
- # return client_callout_kind, client_status
498
-
499
-
500
- # @app.cell
501
- # def _(deployment_client, project_client):
502
- # if project_client is not None and deployment_client is not None:
503
- # client_options = {
504
- # "Project Client": project_client,
505
- # "Deployment Client": deployment_client,
506
- # }
507
-
508
- # elif project_client is not None:
509
- # client_options = {"Project Client": project_client}
510
-
511
- # elif deployment_client is not None:
512
- # client_options = {"Deployment Client": deployment_client}
513
-
514
- # else:
515
- # client_options = {"No Client": "Instantiate a Client"}
516
-
517
- # default_client = next(iter(client_options))
518
- # client_selector = mo.ui.dropdown(
519
- # client_options, value=default_client, label="**Switch your active client:**"
520
- # )
521
- # return client_options, client_selector
522
-
523
-
524
- # @app.cell
525
- # def _(client_selector):
526
- # client_key = client_selector.value
527
- # if client_key == "Instantiate a Client":
528
- # client = None
529
- # else:
530
- # client = client_key
531
- # return client, client_key
532
-
533
-
534
- # @app.cell
535
- # def _(client_section):
536
- # ui_accordion_section_1 = mo.accordion(
537
- # {"Section 1: **watsonx.ai Credentials**": client_section}
538
- # )
539
- # ui_accordion_section_1
540
- # return
541
 
542
 
543
  if __name__ == "__main__":
 
9
  import ast, base64, glob, io, json, mimetypes, os, re, tempfile, time, zipfile
10
  from typing import Any, Dict, List, Optional, Union, Callable, Literal
11
  from pathlib import Path
12
+
13
  # --- Third Party Libraries
14
  from dotenv import load_dotenv
15
  from ibm_watsonx_ai import APIClient, Credentials
16
  from ibm_watsonx_ai.foundation_models import ModelInference
17
+ from kafka import KafkaProducer, KafkaAdminClient
18
+
19
  load_dotenv()
20
  from PIL import Image
21
  import marimo as mo
 
43
  process_multiple_images_with_display_data,
44
  process_multiple_images_with_examples,
45
  )
46
+ from samples.image_example_message import (
47
+ image_example_message as example_message,
48
+ )
49
+
50
  return (
 
 
51
  create_multiple_image_previews_with_conversion,
52
  display_results_stack_with_data,
 
 
53
  example_message,
54
+ process_multiple_images_with_examples,
55
  )
56
 
57
 
58
  @app.cell
59
+ def _():
60
  user = os.environ.get("KAFKA_USER") or ""
61
  password = os.environ.get("KAFKA_PASSWORD") or ""
62
  kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS") or ""
63
+ kafka_topic_filter = os.environ.get("KAFKA_TOPIC_PREFIX") or ""
64
  prompt_template = os.environ.get("EXTRACTION_PROMPT") or ""
65
  wx_creds = {
66
  "api_key": os.environ.get("WX_APIKEY") or "",
 
71
  "url": os.environ.get("WX_URL") or "https://us-south.ml.cloud.ibm.com",
72
  }
73
  return (
 
 
 
74
  kafka_bootstrap_servers,
75
+ kafka_topic_filter,
76
  password,
77
+ prompt_template,
78
+ user,
79
+ wx_creds,
80
  )
81
 
82
+
83
  @app.cell
84
+ def _(kafka_bootstrap_servers, password, user, wx_creds):
85
  kafka_config = {
86
  "bootstrap_servers": kafka_bootstrap_servers.split(","),
87
  "security_protocol": "SASL_SSL",
 
107
  chat_model_id = wx_creds["model_id"] or "mistralai/mistral-medium-2505"
108
  return chat_model_id, kafka_config, params
109
 
110
+
111
  @app.cell
112
+ def _(wx_creds):
113
  wx_credentials = Credentials(url=wx_creds["url"], api_key=wx_creds["api_key"])
114
  client = (
115
  APIClient(credentials=wx_credentials, project_id=wx_creds["project_id"])
116
  if wx_creds["project_id"]
117
+ else (
118
+ APIClient(credentials=wx_credentials, space_id=wx_creds["space_id"])
119
+ if wx_creds["space_id"]
120
+ else APIClient(credentials=wx_credentials)
121
+ )
122
  )
123
  return (client,)
124
 
125
+
126
  @app.cell
127
+ def _(chat_model_id, client, params):
128
  chat_model = ModelInference(
129
  api_client=client, model_id=chat_model_id, params=params
130
  )
 
132
 
133
 
134
  @app.cell
135
+ def _(kafka_config):
136
+ kafka_admin = KafkaAdminClient(**kafka_config)
137
+ kafka_topics = kafka_admin.describe_topics()
138
+ return (kafka_topics,)
139
+
140
+
141
+ @app.cell
142
+ def _(kafka_topic_filter, kafka_topics):
143
+ topic_names = (
144
+ get_topic_names(kafka_topics, kafka_topic_filter)
145
+ if kafka_topics
146
+ else ["placeholder_topic"]
147
+ )
148
+ return (topic_names,)
149
+
150
+
151
+ @app.cell
152
+ def _(topic_names):
153
+ kafka_topic_selector = mo.ui.dropdown(
154
+ topic_names,
155
+ label="**Select the Target Topic:**",
156
+ searchable=True,
157
+ allow_select_none=False,
158
+ value=topic_names[0],
159
+ )
160
+ return (kafka_topic_selector,)
161
+
162
+
163
+ @app.cell
164
+ def _(kafka_topic_selector):
165
+ kafka_topic = kafka_topic_selector.value
166
+ return (kafka_topic,)
167
+
168
+
169
+ @app.function
170
+ def get_topic_names(kafka_topics, filter_word=None):
171
+ topics = [topic["topic"] for topic in kafka_topics]
172
+ if filter_word:
173
+ topics = [t for t in topics if filter_word in t]
174
+ return topics
175
+
176
+
177
+ @app.cell
178
+ def _(kafka_config):
179
  kafka_producer = KafkaProducer(
180
  **kafka_config,
181
  value_serializer=lambda x: x.encode("utf-8") if isinstance(x, str) else x,
 
184
 
185
 
186
  @app.cell
187
+ def _():
188
  image_upload = mo.ui.file(
189
  kind="area",
190
  filetypes=[".png", ".jpg", ".jpeg", ".tiff", ".heic"],
 
195
 
196
 
197
  @app.cell
198
+ def _(image_upload):
199
  if image_upload.name():
200
  name_printout = mo.md(f"**{image_upload.name()}**")
201
  else:
 
204
  image_uploader = mo.vstack(
205
  [image_upload, name_printout], justify="space-around", align="center"
206
  )
207
+ return (image_uploader,)
208
 
209
 
210
  @app.cell
211
+ def _(prompt_template):
212
+ prompt_editor = mo.md(
213
+ """
214
  #### **Provide your instruction prompt here by editing the template:**
215
 
216
  {editor}
217
 
218
+ """
219
+ ).batch(
220
  editor=mo.ui.code_editor(
221
  value=prompt_template, language="markdown", min_height=200
222
  )
223
  )
224
  return (prompt_editor,)
225
 
226
+
227
  @app.function
228
  def check_state(variable, if_present=False, if_not_present=True):
229
  return if_not_present if not variable else if_present
 
238
 
239
 
240
  @app.cell
241
+ def _(button_disabled):
242
  extract_text_button = mo.ui.run_button(
243
  label="Extract Text from Images", disabled=button_disabled
244
  )
245
  return (extract_text_button,)
246
 
247
+
248
  @app.cell
249
  def _(prompt_editor, prompt_template):
250
  instruction_prompt = (
 
252
  )
253
  return (instruction_prompt,)
254
 
255
+
256
  @app.cell
257
+ def _():
258
+ title = mo.md(
259
+ """### **GhostEyes:** watsonx.ai Based Image to Mural Sticky Note Converter"""
260
+ )
261
+ return (title,)
262
+
263
+
264
+ @app.cell
265
+ def _(extract_text_button, image_uploader, kafka_topic_selector, title):
266
  mo.vstack(
267
+ [title, image_uploader, kafka_topic_selector, extract_text_button],
268
  align="center",
269
  gap=2,
270
  )
271
  return
272
 
273
+
274
  @app.cell
275
+ def _(multiple_image_previews, results_df):
276
  extract_stack = mo.vstack(
277
  [multiple_image_previews, results_df],
278
  align="center",
279
  gap=2,
280
  )
281
+ return (extract_stack,)
282
 
283
 
284
  @app.cell
 
288
  )
289
  return (multiple_image_previews,)
290
 
291
+
292
  @app.cell
293
  def _(
294
  chat_model,
 
315
  else:
316
  results_df = display_files = None
317
  results_ready = False
 
318
  return display_files, results_df, results_ready
319
 
320
+
321
  @app.cell
322
  def _(display_files, display_results_stack_with_data, results_df):
323
  review_stack = (
 
325
  if results_df is not None
326
  else None
327
  )
328
+ return (review_stack,)
329
+
330
 
331
  @app.cell
332
+ def _(kafka_producer, kafka_topic, results_df, results_ready):
 
 
 
 
 
 
333
  send_kafka_events = (
334
  send_results_to_kafka(
335
  kafka_producer, kafka_topic, results_df, column_to_send="model_response"
 
339
  )
340
  return
341
 
342
+
343
  @app.cell
344
+ def _():
345
  pillow_heif.register_heif_opener()
346
  return
347
 
 
 
348
 
349
  @app.cell
350
  def _(extract_stack):
 
355
  return
356
 
357
 
 
358
  @app.cell
359
  def _(review_stack):
360
+ ui_accordion_section_2 = mo.accordion({"**Review Outputs Tab**": review_stack})
 
 
361
  ui_accordion_section_2
362
  return
363
 
364
 
365
+ @app.function
366
+ def send_results_to_kafka(
367
+ kafka_producer,
368
+ kafka_topic,
369
+ results_df,
370
+ exclude_value="No Text Detected",
371
+ column_to_send="model_response",
372
+ sleep_time=0.2,
373
+ ):
374
+ """
375
+ Send DataFrame results to Kafka topic, excluding specified values.
376
+
377
+ Args:
378
+ kafka_producer: Kafka producer instance
379
+ kafka_topic: Kafka topic name
380
+ results_df: DataFrame containing results
381
+ exclude_value: Value to exclude from sending (default: "No Text Detected")
382
+ column_to_send: Column name to send (default: "model_response")
383
+ sleep_time: Time to sleep between sends in seconds (default: 0.2)
384
+ """
385
+ for _, row in results_df.iterrows():
386
+ value = row[column_to_send]
387
+ if value != exclude_value:
388
+ kafka_producer.send(topic=kafka_topic, value=str(value))
389
+ time.sleep(sleep_time)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
390
 
391
 
392
  if __name__ == "__main__":