import gradio as gr
from gradio import ChatMessage
from langchain_core.messages import BaseMessage, HumanMessage
import time

from ml_pipeline_workflow import ml_app as workflow


def format_namespace(namespace):
    """Extract the graph name from a namespace tuple.

    The last namespace component looks like "<graph>:<task_id>"; only the
    graph name is kept. An empty namespace means the root graph.
    """
    return namespace[-1].split(":")[0] if len(namespace) > 0 else "root graph"


def handle_like(data: gr.LikeData):
    """Log like/dislike events coming from the chatbot component."""
    if data.liked:
        print(f"👍 Upvoted: {data.value}")
    else:
        print(f"👎 Downvoted: {data.value}")


def generate_response(message, history):
    """Stream nested "thought" messages for the ML pipeline workflow.

    Yields a growing list of ChatMessage objects while the LangGraph
    workflow streams node updates (including subgraph nodes). Agent
    subgraphs get a collapsible header message; individual node messages
    are attached to their agent header via ``parent_id`` metadata.

    Args:
        message: The user's chat input.
        history: Prior chat history (unused; required by ChatInterface).

    Yields:
        The accumulated list of ChatMessage objects after each update.
    """
    inputs = {
        "messages": [HumanMessage(content=message)],
    }
    response = []
    next_id = 0  # monotonically increasing message id (plain int — no closure needs a mutable cell)
    agent_header_ids = {}  # agent name -> header message id
    current_namespace = None  # last seen namespace, to detect agent changes
    # Initialize so the final-response block below is safe even if the
    # stream yields nothing (previously a NameError in that case).
    node_chunk = None

    for namespace, chunk in workflow.stream(
        inputs, stream_mode="updates", subgraphs=True
    ):
        for node_name, node_chunk in chunk.items():
            formatted_namespace = format_namespace(namespace)

            # Complete the previous non-header message FIRST (before the
            # namespace-change check), so its duration is recorded and its
            # parent header shows a completion line.
            if len(response) > 0 and response[-1].metadata.get("status") == "pending":
                # Only complete if it's not an agent header
                if response[-1].metadata.get("id") not in agent_header_ids.values():
                    prev_msg = response[-1]
                    prev_msg.metadata["status"] = "done"
                    if "start_time" in prev_msg.metadata:
                        prev_msg.metadata["duration"] = (
                            time.time() - prev_msg.metadata["start_time"]
                        )

                    # Update the parent agent header to show completion.
                    prev_parent_id = prev_msg.metadata.get("parent_id")
                    if prev_parent_id:
                        prev_title = prev_msg.metadata.get("title", "unknown")
                        for msg in response:
                            if msg.metadata.get("id") == prev_parent_id and msg.metadata.get("is_agent_header"):
                                # Replace the "working on" line; otherwise append.
                                if not msg.content or "🔄" in msg.content:
                                    msg.content = f"✓ {prev_title}"
                                else:
                                    msg.content += f"\n✓ {prev_title}"
                                break
                    yield response  # Yield to show completion

            # If the namespace changed, complete the previous agent header.
            if current_namespace and current_namespace != formatted_namespace:
                if current_namespace in agent_header_ids:
                    header_id = agent_header_ids[current_namespace]
                    for msg in response:
                        if msg.metadata.get("id") == header_id and msg.metadata.get("status") == "pending":
                            msg.metadata["status"] = "done"
                            if "start_time" in msg.metadata:
                                msg.metadata["duration"] = (
                                    time.time() - msg.metadata["start_time"]
                                )
                            yield response
                            break
            current_namespace = formatted_namespace

            # If this is a subgraph node, ensure a parent header exists.
            parent_id = None
            if formatted_namespace != "root graph":
                if formatted_namespace not in agent_header_ids:
                    # Create the agent header message first.
                    next_id += 1
                    header_id = next_id
                    agent_header_ids[formatted_namespace] = header_id

                    agent_emojis = {
                        "supervisor": "👔",
                        "data_extraction_expert": "📊",
                        "pretraining_expert": "🔬",
                        "finetuning_expert": "🎯",
                        "evaluation_expert": "📈",
                    }
                    emoji = agent_emojis.get(formatted_namespace, "🧠")

                    header_message = ChatMessage(
                        content="",
                        metadata={
                            "title": f"{emoji} {formatted_namespace}",
                            "id": header_id,
                            "status": "pending",
                            "start_time": time.time(),
                            "is_agent_header": True,  # Mark as agent header
                        },
                    )
                    response.append(header_message)
                    yield response
                parent_id = agent_header_ids[formatted_namespace]

            # Create the current node message.
            next_id += 1
            current_id = next_id

            agent_names = [
                "data_extraction_expert",
                "pretraining_expert",
                "finetuning_expert",
                "evaluation_expert",
            ]
            if formatted_namespace == "root graph":
                if node_name == "supervisor":
                    title = f"👔 {node_name}"
                elif node_name in agent_names:
                    # Skip - these are handled when their subgraph streams.
                    continue
                else:
                    title = f"🧠 {node_name}"
            else:
                # This node runs inside an agent subgraph.
                title = f"⚙️ {node_name}"

            node_message = ChatMessage(
                content="",
                metadata={
                    "title": title,
                    "id": current_id,
                    "status": "pending",
                    "start_time": time.time(),
                },
            )
            if parent_id:
                node_message.metadata["parent_id"] = parent_id
            response.append(node_message)
            yield response

            # Render the node's output: pretty-print messages, stringify the rest.
            out_str = []
            if isinstance(node_chunk, dict):
                for k, v in node_chunk.items():
                    if isinstance(v, BaseMessage):
                        out_str.append(v.pretty_repr())
                    elif isinstance(v, list):
                        for list_item in v:
                            if isinstance(list_item, BaseMessage):
                                out_str.append(list_item.pretty_repr())
                            else:
                                out_str.append(str(list_item))
                    else:
                        out_str.append(f"{k}:\n{v}")
            response[-1].content = "\n".join(out_str)

            # Update the parent agent header with the current activity.
            if parent_id:
                for msg in response:
                    if msg.metadata.get("id") == parent_id and msg.metadata.get("is_agent_header"):
                        activity_summary = f"🔄 Working on: {node_name}"
                        if len(out_str) > 0:
                            # Add a brief preview of the content.
                            preview = out_str[0][:100].replace('\n', ' ')
                            activity_summary += f"\n📝 {preview}..."
                        msg.content = activity_summary
                        break

            # Keep the node pending — it is completed when the next node
            # starts; yield once to show the current state (the original
            # yielded the identical state twice in a row).
            yield response

    # Complete any remaining pending messages (agent headers and sub nodes).
    for msg in response:
        if msg.metadata.get("status") == "pending":
            msg.metadata["status"] = "done"
            if "start_time" in msg.metadata:
                msg.metadata["duration"] = time.time() - msg.metadata["start_time"]

    # Add the final response (no metadata, so it renders as a regular message).
    if node_chunk and isinstance(node_chunk, dict) and 'messages' in node_chunk:
        final_message = node_chunk['messages'][-1].content
        response.append(ChatMessage(content=final_message))
    # Always yield once more so the "done" statuses set above are rendered
    # even when the last chunk carried no 'messages' key.
    yield response


# Create the interface with Blocks so the like handler can be attached.
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(
        type="messages",
        show_copy_button=True,
        show_copy_all_button=True,
        show_share_button=True,
    )
    chatbot.like(handle_like, None, None)

    gr.ChatInterface(
        generate_response,
        type="messages",
        chatbot=chatbot,
        title="🔬 ML Pipeline Automation",
        description="데이터 추출, 사전학습, 파인튜닝, 평가 단계를 거치는 완전한 ML 파이프라인입니다. 각 전문가 에이전트의 작업 과정을 단계별로 확인할 수 있습니다.",
        examples=[
            # 전체 파이프라인 예제
            "user_events 테이블에서 2024-01-01부터 2024-12-31까지 이벤트 데이터를 추출하고, 모델을 사전학습한 후 5개 클래스 분류 모델을 학습하고 평가해줘",
            "transaction_data 테이블에서 최근 6개월 데이터로 이상거래 탐지 모델을 처음부터 끝까지 만들어줘",
            # 조합 예제 (2-3단계)
            "customer_logs 테이블에서 2024년 데이터를 추출하고 GPT-2 모델 사전학습까지만 해줘",
            "준비된 데이터(/data/corpus.txt)로 모델을 사전학습하고, 그 모델로 감성분석 3클래스 분류 파인튜닝까지 진행해줘",
            "사전학습된 모델(/models/pretrained/gpt2)로 spam detection 2클래스 분류 모델 학습하고 성능 평가해줘",
            "product_reviews 테이블에서 2024년 리뷰 데이터 추출하고 바로 별점 예측 5클래스 분류 모델 파인튜닝해줘",
            # 단일 기능 예제
            "준비된 데이터(/data/pretraining/corpus.txt)로 GPT-2 모델을 3 에포크 사전학습만 해줘",
            "사전학습된 모델(/models/pretrained/checkpoint)으로 감성분석 3클래스 분류 파인튜닝만 진행해줘",
            "학습된 모델(/models/finetuned/model)을 테스트 데이터(/data/test.jsonl)로 평가만 해줘",
        ],
        cache_examples=False,
    )

if __name__ == "__main__":
    demo.launch(ssr_mode=False)