# langgraph-ui / app.py
# Author: Yoon-gu Hwang
# Commit 820a28c: removed the Basic Chatbot; only the ML Pipeline remains.
import gradio as gr
from gradio import ChatMessage
from langchain_core.messages import BaseMessage, HumanMessage
import time
from ml_pipeline_workflow import ml_app as workflow
def format_namespace(namespace):
    """Extract a graph name from a LangGraph namespace sequence.

    The last namespace entry is split on ``:`` and the leading part is
    returned; an empty namespace maps to the literal ``"root graph"``.
    """
    if not namespace:
        return "root graph"
    last_entry = namespace[-1]
    return last_entry.split(":")[0]
def handle_like(data: gr.LikeData):
    """Log a like/dislike event from the chatbot to stdout."""
    label = "👍 Upvoted" if data.liked else "👎 Downvoted"
    print(f"{label}: {data.value}")
def generate_response(message, history):
    """Stream the ML Pipeline workflow as nested "thought" chat messages.

    Runs the LangGraph workflow on *message* and yields the growing list of
    gradio ``ChatMessage`` objects after every update, so the UI renders each
    workflow node as a collapsible thought with pending/done status and
    duration, grouped under one header message per agent subgraph.

    Args:
        message: The user's prompt text.
        history: Prior chat history supplied by ``ChatInterface`` (unused;
            the workflow receives only the new message).

    Yields:
        list[ChatMessage]: Snapshot of all messages rendered so far.
    """
    inputs = {
        "messages": [HumanMessage(content=message)],
    }
    response = []
    message_id_counter = 0    # monotonically increasing ChatMessage id
    agent_header_ids = {}     # agent (namespace) name -> its header message id
    current_namespace = None  # previously seen namespace, to detect changes
    # Keep node_chunk bound even if the stream yields nothing: it is read
    # after the loop to build the final reply (NameError otherwise).
    node_chunk = None

    # Static display tables, hoisted out of the streaming loop.
    agent_emojis = {
        "supervisor": "👔",
        "data_extraction_expert": "📊",
        "pretraining_expert": "🔬",
        "finetuning_expert": "🎯",
        "evaluation_expert": "📈"
    }
    agent_names = ["data_extraction_expert", "pretraining_expert",
                   "finetuning_expert", "evaluation_expert"]

    for namespace, chunk in workflow.stream(
        inputs,
        stream_mode="updates",
        subgraphs=True
    ):
        for node_name, node_chunk in chunk.items():
            formatted_namespace = format_namespace(namespace)

            # Complete the previous non-header message FIRST (before the
            # namespace-change check below).
            if len(response) > 0 and response[-1].metadata.get("status") == "pending":
                # Only complete it if it is not an agent header.
                if response[-1].metadata.get("id") not in agent_header_ids.values():
                    prev_msg = response[-1]
                    prev_msg.metadata["status"] = "done"
                    if "start_time" in prev_msg.metadata:
                        prev_msg.metadata["duration"] = time.time() - prev_msg.metadata["start_time"]

                    # Reflect the completion in the parent agent header.
                    prev_parent_id = prev_msg.metadata.get("parent_id")
                    if prev_parent_id:
                        prev_title = prev_msg.metadata.get("title", "unknown")
                        for msg in response:
                            if msg.metadata.get("id") == prev_parent_id and msg.metadata.get("is_agent_header"):
                                # Replace a spinner line, otherwise append a log line.
                                if not msg.content or "🔄" in msg.content:
                                    msg.content = f"✓ {prev_title}"
                                else:
                                    msg.content += f"\n✓ {prev_title}"
                                break
                    yield response  # show the completion

            # When the namespace changes, complete the previous agent header.
            if current_namespace and current_namespace != formatted_namespace:
                if current_namespace in agent_header_ids:
                    header_id = agent_header_ids[current_namespace]
                    for msg in response:
                        if msg.metadata.get("id") == header_id and msg.metadata.get("status") == "pending":
                            msg.metadata["status"] = "done"
                            if "start_time" in msg.metadata:
                                msg.metadata["duration"] = time.time() - msg.metadata["start_time"]
                            yield response
                            break
            current_namespace = formatted_namespace

            # For subgraph nodes, make sure the agent header message exists.
            parent_id = None
            if formatted_namespace != "root graph":
                if formatted_namespace not in agent_header_ids:
                    message_id_counter += 1
                    header_id = message_id_counter
                    agent_header_ids[formatted_namespace] = header_id
                    emoji = agent_emojis.get(formatted_namespace, "🧠")
                    header_message = ChatMessage(
                        content="",
                        metadata={
                            "title": f"{emoji} {formatted_namespace}",
                            "id": header_id,
                            "status": "pending",
                            "start_time": time.time(),
                            "is_agent_header": True  # mark as agent header
                        }
                    )
                    response.append(header_message)
                    yield response
                parent_id = agent_header_ids[formatted_namespace]

            # Create the message for the current node.
            message_id_counter += 1
            current_id = message_id_counter

            # Build the title (root-level agent nodes are skipped: they are
            # rendered when their subgraph streams).
            if formatted_namespace == "root graph":
                if node_name == "supervisor":
                    title = f"👔 {node_name}"
                elif node_name in agent_names:
                    continue
                else:
                    title = f"🧠 {node_name}"
            else:
                # Node inside an agent subgraph.
                title = f"⚙️ {node_name}"

            node_message = ChatMessage(
                content="",
                metadata={
                    "title": title,
                    "id": current_id,
                    "status": "pending",
                    "start_time": time.time()
                }
            )
            if parent_id:
                node_message.metadata["parent_id"] = parent_id
            response.append(node_message)
            yield response

            # Render the node's payload into the node message.
            out_str = []
            if isinstance(node_chunk, dict):
                for k, v in node_chunk.items():
                    if isinstance(v, BaseMessage):
                        out_str.append(v.pretty_repr())
                    elif isinstance(v, list):
                        for list_item in v:
                            if isinstance(list_item, BaseMessage):
                                out_str.append(list_item.pretty_repr())
                            else:
                                out_str.append(str(list_item))
                    else:
                        out_str.append(f"{k}:\n{v}")
            response[-1].content = "\n".join(out_str)

            # Mirror the current activity into the parent agent header.
            if parent_id:
                for msg in response:
                    if msg.metadata.get("id") == parent_id and msg.metadata.get("is_agent_header"):
                        activity_summary = f"🔄 Working on: {node_name}"
                        if len(out_str) > 0:
                            # Brief preview of the first content line.
                            preview = out_str[0][:100].replace('\n', ' ')
                            activity_summary += f"\n📝 {preview}..."
                        msg.content = activity_summary
                        break
                yield response

            # Leave the node pending; it is completed when the next node
            # starts. Yield to show the current state.
            yield response

    # Complete any remaining pending messages (agent headers and sub nodes).
    for msg in response:
        if msg.metadata.get("status") == "pending":
            msg.metadata["status"] = "done"
            if "start_time" in msg.metadata:
                msg.metadata["duration"] = time.time() - msg.metadata["start_time"]

    # Final response without metadata so it displays as a regular message.
    # node_chunk deliberately carries over from the last loop iteration.
    if node_chunk and isinstance(node_chunk, dict) and 'messages' in node_chunk:
        final_message = node_chunk['messages'][-1].content
        response.append(ChatMessage(content=final_message))
    yield response
# Create interface with Blocks for like functionality
with gr.Blocks() as demo:
chatbot = gr.Chatbot(
type="messages",
show_copy_button=True,
show_copy_all_button=True,
show_share_button=True
)
chatbot.like(handle_like, None, None)
gr.ChatInterface(
generate_response,
type="messages",
chatbot=chatbot,
title="๐Ÿ”ฌ ML Pipeline Automation",
description="๋ฐ์ดํ„ฐ ์ถ”์ถœ, ์‚ฌ์ „ํ•™์Šต, ํŒŒ์ธํŠœ๋‹, ํ‰๊ฐ€ ๋‹จ๊ณ„๋ฅผ ๊ฑฐ์น˜๋Š” ์™„์ „ํ•œ ML ํŒŒ์ดํ”„๋ผ์ธ์ž…๋‹ˆ๋‹ค. ๊ฐ ์ „๋ฌธ๊ฐ€ ์—์ด์ „ํŠธ์˜ ์ž‘์—… ๊ณผ์ •์„ ๋‹จ๊ณ„๋ณ„๋กœ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.",
examples=[
# ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ ์˜ˆ์ œ
"user_events ํ…Œ์ด๋ธ”์—์„œ 2024-01-01๋ถ€ํ„ฐ 2024-12-31๊นŒ์ง€ ์ด๋ฒคํŠธ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜๊ณ , ๋ชจ๋ธ์„ ์‚ฌ์ „ํ•™์Šตํ•œ ํ›„ 5๊ฐœ ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ํ•™์Šตํ•˜๊ณ  ํ‰๊ฐ€ํ•ด์ค˜",
"transaction_data ํ…Œ์ด๋ธ”์—์„œ ์ตœ๊ทผ 6๊ฐœ์›” ๋ฐ์ดํ„ฐ๋กœ ์ด์ƒ๊ฑฐ๋ž˜ ํƒ์ง€ ๋ชจ๋ธ์„ ์ฒ˜์Œ๋ถ€ํ„ฐ ๋๊นŒ์ง€ ๋งŒ๋“ค์–ด์ค˜",
# ์กฐํ•ฉ ์˜ˆ์ œ (2-3๋‹จ๊ณ„)
"customer_logs ํ…Œ์ด๋ธ”์—์„œ 2024๋…„ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜๊ณ  GPT-2 ๋ชจ๋ธ ์‚ฌ์ „ํ•™์Šต๊นŒ์ง€๋งŒ ํ•ด์ค˜",
"์ค€๋น„๋œ ๋ฐ์ดํ„ฐ(/data/corpus.txt)๋กœ ๋ชจ๋ธ์„ ์‚ฌ์ „ํ•™์Šตํ•˜๊ณ , ๊ทธ ๋ชจ๋ธ๋กœ ๊ฐ์„ฑ๋ถ„์„ 3ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ํŒŒ์ธํŠœ๋‹๊นŒ์ง€ ์ง„ํ–‰ํ•ด์ค˜",
"์‚ฌ์ „ํ•™์Šต๋œ ๋ชจ๋ธ(/models/pretrained/gpt2)๋กœ spam detection 2ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ๋ชจ๋ธ ํ•™์Šตํ•˜๊ณ  ์„ฑ๋Šฅ ํ‰๊ฐ€ํ•ด์ค˜",
"product_reviews ํ…Œ์ด๋ธ”์—์„œ 2024๋…„ ๋ฆฌ๋ทฐ ๋ฐ์ดํ„ฐ ์ถ”์ถœํ•˜๊ณ  ๋ฐ”๋กœ ๋ณ„์  ์˜ˆ์ธก 5ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ๋ชจ๋ธ ํŒŒ์ธํŠœ๋‹ํ•ด์ค˜",
# ๋‹จ์ผ ๊ธฐ๋Šฅ ์˜ˆ์ œ
"์ค€๋น„๋œ ๋ฐ์ดํ„ฐ(/data/pretraining/corpus.txt)๋กœ GPT-2 ๋ชจ๋ธ์„ 3 ์—ํฌํฌ ์‚ฌ์ „ํ•™์Šต๋งŒ ํ•ด์ค˜",
"์‚ฌ์ „ํ•™์Šต๋œ ๋ชจ๋ธ(/models/pretrained/checkpoint)์œผ๋กœ ๊ฐ์„ฑ๋ถ„์„ 3ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ํŒŒ์ธํŠœ๋‹๋งŒ ์ง„ํ–‰ํ•ด์ค˜",
"ํ•™์Šต๋œ ๋ชจ๋ธ(/models/finetuned/model)์„ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ(/data/test.jsonl)๋กœ ํ‰๊ฐ€๋งŒ ํ•ด์ค˜"
],
cache_examples=False
)
if __name__ == "__main__":
demo.launch(ssr_mode=False)