langgraph-ui / ml_pipeline_workflow.py
Yoon-gu Hwang
프로젝트 구조 평탄화 및 불필요한 파일 정리
5dc0c74
import time
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph_supervisor import create_supervisor
# Runs before the chat model is built -- presumably so that settings kept in
# a .env file (e.g. API credentials) are in the environment; verify.
load_dotenv()
# One chat model instance, passed to every agent and to the supervisor below.
model = ChatOpenAI(model="gpt-4o")
def extract_events_from_rdb(
    table_name: str,
    start_date: str,
    end_date: str,
    event_types: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Simulate extracting event records from an RDB table into a text file.

    Stand-in implementation: it waits half a second and returns a fixed
    report. No database is queried and no file is written; only the output
    path depends on the arguments.

    Args:
        table_name: Name of the RDB table; becomes part of the output path.
        start_date: Start date (YYYY-MM-DD format); part of the output path.
        end_date: End date (YYYY-MM-DD format); part of the output path.
        event_types: Optional list of event types to filter on. ``None``
            (the default) may be passed explicitly. The filter is not
            applied to the canned statistics yet.

    Returns:
        A dictionary with the status, the number of extracted records, the
        output file path, the total size in MB, the per-event-type record
        counts and the processing time in seconds.
    """
    time.sleep(0.5)  # stand-in for the latency of a real extraction
    return {
        "status": "success",
        "records_extracted": 125847,
        "output_file": f"/data/events/{table_name}_{start_date}_{end_date}.txt",
        "total_size_mb": 482.3,
        # The four counts add up to ``records_extracted``.
        "event_type_distribution": {
            "user_action": 45230,
            "system_event": 32145,
            "error_log": 18472,
            "transaction": 30000,
        },
        "processing_time_seconds": 12.5,
    }
def prepare_pretraining_data(
    input_file: str,
    tokenizer: str = "gpt2",
    max_length: int = 512,
    min_length: int = 50,
) -> Dict[str, Any]:
    """Simulate tokenizing and formatting text data for pretraining.

    Stand-in implementation: it waits half a second and hands back a fixed
    summary. None of the arguments change the returned values.

    Args:
        input_file: Path of the input text file.
        tokenizer: Tokenizer to use.
        max_length: Maximum sequence length.
        min_length: Minimum sequence length.

    Returns:
        A dictionary with the statistics of the prepared data.
    """
    time.sleep(0.5)  # stand-in for the latency of real preprocessing
    summary: Dict[str, Any] = dict(
        status="success",
        output_file="/data/pretraining/tokenized_data.bin",
        total_sequences=89234,
        total_tokens=45623890,
        avg_sequence_length=511.2,
        vocab_size=50257,
        processing_time_seconds=34.2,
    )
    return summary
def pretrain_model(
    data_file: str,
    model_architecture: str = "gpt2-small",
    num_epochs: int = 3,
    batch_size: int = 32,
    learning_rate: float = 5e-5,
) -> Dict[str, Any]:
    """Simulate pretraining a language model on the prepared data.

    Stand-in implementation: it waits half a second and returns a fixed
    training report. None of the arguments change the returned values.

    Args:
        data_file: Path of the tokenized data file.
        model_architecture: Model architecture to use.
        num_epochs: Number of training epochs.
        batch_size: Training batch size.
        learning_rate: Learning rate.

    Returns:
        A dictionary with the training metrics and the model path.
    """
    time.sleep(0.5)  # stand-in for the latency of a real training run

    # Canned loss per epoch, first epoch first; the last one is also the
    # reported final loss.
    epoch_losses = (3.245, 2.789, 2.341)
    loss_by_epoch = {
        f"epoch_{number}_loss": loss
        for number, loss in enumerate(epoch_losses, start=1)
    }

    report: Dict[str, Any] = {
        "status": "success",
        "model_path": "/models/pretrained/model_checkpoint_epoch3",
        "final_loss": epoch_losses[-1],
        "perplexity": 10.39,
        "training_time_hours": 4.5,
        "total_steps": 8340,
        "best_checkpoint": "checkpoint-7800",
        "gpu_hours": 36.0,
        "metrics": loss_by_epoch,
    }
    return report
def create_finetuning_data(
    source_data: str,
    task_type: str = "classification",
    num_classes: int = 5,
    train_ratio: float = 0.8,
    augmentation: bool = True,
) -> Dict[str, Any]:
    """Simulate building a fine-tuning dataset for a classification task.

    Stand-in implementation: it waits half a second and returns a fixed
    dataset summary. Only ``augmentation`` is reflected in the result; the
    file paths, sample counts and class counts are canned.

    Args:
        source_data: Path of the source data.
        task_type: Task type (classification, regression, ...).
        num_classes: Number of classes.
        train_ratio: Fraction of the data used for training.
        augmentation: Whether data augmentation is applied.

    Returns:
        A dictionary with the dataset statistics and the train/val/test file
        paths. ``augmentation_applied`` echoes the ``augmentation`` argument.
    """
    time.sleep(0.5)  # stand-in for the latency of real dataset creation
    return {
        "status": "success",
        "train_file": "/data/finetuning/train.jsonl",
        "val_file": "/data/finetuning/val.jsonl",
        "test_file": "/data/finetuning/test.jsonl",
        "train_samples": 12456,
        "val_samples": 3114,
        "test_samples": 3114,
        # The five counts add up to ``train_samples``.
        "class_distribution": {
            "class_0": 2489,
            "class_1": 3201,
            "class_2": 2845,
            "class_3": 2134,
            "class_4": 1787,
        },
        # Report what the caller asked for instead of a hard-coded True, so
        # a run requested without augmentation is not described as augmented.
        "augmentation_applied": augmentation,
        "processing_time_seconds": 8.3,
    }
def train_classification_model(
    pretrained_model: str,
    train_data: str,
    val_data: str,
    num_classes: int = 5,
    num_epochs: int = 10,
    batch_size: int = 16,
    learning_rate: float = 2e-5,
) -> Dict[str, Any]:
    """Simulate training a classification model on the fine-tuning data.

    Stand-in implementation: it waits half a second and returns a fixed
    training report. None of the arguments change the returned values.

    Args:
        pretrained_model: Path of the pretrained model.
        train_data: Path of the training data.
        val_data: Path of the validation data.
        num_classes: Number of classes.
        num_epochs: Number of training epochs.
        batch_size: Batch size.
        learning_rate: Learning rate.

    Returns:
        A dictionary with the training results and the model path.
    """
    time.sleep(0.5)  # stand-in for the latency of a real fine-tuning run

    # Canned snapshots: (epoch label, train loss, validation loss,
    # validation accuracy). The last one supplies the headline numbers.
    snapshots = (
        ("epoch_1", 0.892, 0.845, 0.712),
        ("epoch_5", 0.345, 0.389, 0.887),
        ("epoch_8", 0.234, 0.312, 0.923),
    )
    _, last_train_loss, last_val_loss, last_val_acc = snapshots[-1]

    history = {
        label: {"train_loss": train_loss, "val_loss": val_loss, "val_acc": val_acc}
        for label, train_loss, val_loss, val_acc in snapshots
    }

    outcome: Dict[str, Any] = {
        "status": "success",
        "model_path": "/models/finetuned/classification_model",
        "best_checkpoint": "checkpoint-epoch8",
        "final_train_loss": last_train_loss,
        "final_val_loss": last_val_loss,
        "best_val_accuracy": last_val_acc,
        "training_time_hours": 1.2,
        "total_steps": 7785,
        "early_stopping_epoch": 8,
        "metrics_per_epoch": history,
    }
    return outcome
def evaluate_model(
    model_path: str,
    test_data: str,
    metrics: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Simulate a comprehensive evaluation of a trained model on test data.

    Stand-in implementation: it waits half a second and returns a fixed
    evaluation report. None of the arguments change the returned values.

    Args:
        model_path: Path of the trained model.
        test_data: Path of the test data.
        metrics: Optional list of metric names to compute. ``None`` (the
            default) may be passed explicitly. The list is not consulted
            yet: the report always carries accuracy, precision, recall and
            F1 figures.

    Returns:
        A dictionary with overall, macro-averaged, weighted and per-class
        metrics, a 5x5 confusion matrix and the inference time in ms.
    """
    time.sleep(0.5)  # stand-in for the latency of a real evaluation
    return {
        "status": "success",
        "test_samples": 3114,
        "overall_accuracy": 0.918,
        "macro_precision": 0.912,
        "macro_recall": 0.908,
        "macro_f1": 0.910,
        "weighted_precision": 0.916,
        "weighted_recall": 0.918,
        "weighted_f1": 0.917,
        "per_class_metrics": {
            "class_0": {"precision": 0.935, "recall": 0.921, "f1": 0.928, "support": 623},
            "class_1": {"precision": 0.948, "recall": 0.952, "f1": 0.950, "support": 640},
            "class_2": {"precision": 0.899, "recall": 0.887, "f1": 0.893, "support": 569},
            "class_3": {"precision": 0.887, "recall": 0.901, "f1": 0.894, "support": 427},
            "class_4": {"precision": 0.891, "recall": 0.879, "f1": 0.885, "support": 357},
        },
        # Each row sums to the "support" of the class at the same position.
        "confusion_matrix": [
            [574, 12, 18, 10, 9],
            [8, 609, 11, 7, 5],
            [15, 9, 505, 28, 12],
            [11, 8, 22, 385, 1],
            [14, 6, 18, 5, 314],
        ],
        "inference_time_ms": 1247.5,
    }
# Data-extraction specialist: an agent whose only tool is
# extract_events_from_rdb.
# NOTE(review): the prompt literals are Korean text that looks mis-encoded in
# this file; they are kept byte-for-byte -- confirm the file's encoding.
_data_extraction_prompt = (
    "당신은 SQLκ³Ό RDB μž‘μ—…μ— νŠΉν™”λœ 데이터 μΆ”μΆœ μ „λ¬Έκ°€μž…λ‹ˆλ‹€. "
    "λ°μ΄ν„°λ² μ΄μŠ€ ν…Œμ΄λΈ”μ—μ„œ 이벀트 λ ˆμ½”λ“œλ₯Ό μΆ”μΆœν•˜κ³  ν…μŠ€νŠΈ ν˜•μ‹μœΌλ‘œ λ³€ν™˜ν•˜λŠ” 역할을 ν•©λ‹ˆλ‹€. "
    "ν…Œμ΄λΈ” 이름, λ‚ μ§œ λ²”μœ„, 이벀트 νƒ€μž…μ— λŒ€ν•œ λͺ…ν™•ν•œ 정보λ₯Ό μ œκ³΅ν•΄μ•Ό ν•©λ‹ˆλ‹€. "
    "λ ˆμ½”λ“œ μˆ˜μ™€ 파일 크기λ₯Ό ν¬ν•¨ν•œ μΆ”μΆœ 톡계λ₯Ό λ³΄κ³ ν•˜μ„Έμš”."
)
data_extraction_agent = create_react_agent(
    name="data_extraction_expert",
    tools=[extract_events_from_rdb],
    model=model,
    prompt=_data_extraction_prompt,
)
# Pretraining specialist: an agent with two tools, prepare_pretraining_data
# and pretrain_model.
# NOTE(review): the prompt literals are Korean text that looks mis-encoded in
# this file; they are kept byte-for-byte -- confirm the file's encoding.
_pretraining_prompt = (
    "당신은 μ–Έμ–΄λͺ¨λΈ μ‚¬μ „ν•™μŠ΅ μ „λ¬Έκ°€μž…λ‹ˆλ‹€. "
    "ν† ν°ν™”λœ 데이터λ₯Ό μ€€λΉ„ν•˜κ³  λͺ¨λΈμ„ μ²˜μŒλΆ€ν„° ν•™μŠ΅μ‹œν‚€λŠ” μ±…μž„μ„ λ§‘κ³  μžˆμŠ΅λ‹ˆλ‹€. "
    "Loss와 Perplexity 같은 ν•™μŠ΅ μ§€ν‘œλ₯Ό λͺ¨λ‹ˆν„°λ§ν•˜μ„Έμš”. "
    "데이터 μ€€λΉ„ 및 λͺ¨λΈ ν•™μŠ΅ μ§„ν–‰ 상황에 λŒ€ν•œ μžμ„Έν•œ 톡계λ₯Ό λ³΄κ³ ν•˜μ„Έμš”. "
    "ν•œ λ²ˆμ— ν•˜λ‚˜μ˜ λ„κ΅¬λ§Œ μ‚¬μš©ν•˜μ„Έμš”."
)
pretraining_agent = create_react_agent(
    name="pretraining_expert",
    tools=[prepare_pretraining_data, pretrain_model],
    model=model,
    prompt=_pretraining_prompt,
)
# Fine-tuning specialist: an agent with two tools, create_finetuning_data and
# train_classification_model.
# NOTE(review): the prompt literals are Korean text that looks mis-encoded in
# this file; they are kept byte-for-byte -- confirm the file's encoding.
_finetuning_prompt = (
    "당신은 λΆ„λ₯˜ μž‘μ—…μ— νŠΉν™”λœ νŒŒμΈνŠœλ‹ μ „λ¬Έκ°€μž…λ‹ˆλ‹€. "
    "κ³ ν’ˆμ§ˆμ˜ νŒŒμΈνŠœλ‹ 데이터셋을 λ§Œλ“€κ³  λΆ„λ₯˜ λͺ¨λΈμ„ ν•™μŠ΅μ‹œν‚€λŠ” 역할을 ν•©λ‹ˆλ‹€. "
    "μ μ ˆν•œ 데이터 λΆ„ν• κ³Ό 클래슀 뢄포λ₯Ό 보μž₯ν•˜μ„Έμš”. "
    "νŒŒμΈνŠœλ‹ κ³Όμ • μ „λ°˜μ— 걸쳐 ν•™μŠ΅ 및 검증 μ§€ν‘œλ₯Ό λͺ¨λ‹ˆν„°λ§ν•˜μ„Έμš”. "
    "ν•œ λ²ˆμ— ν•˜λ‚˜μ˜ λ„κ΅¬λ§Œ μ‚¬μš©ν•˜μ„Έμš”."
)
finetuning_agent = create_react_agent(
    name="finetuning_expert",
    tools=[create_finetuning_data, train_classification_model],
    model=model,
    prompt=_finetuning_prompt,
)
# Evaluation specialist: an agent whose only tool is evaluate_model.
# NOTE(review): the prompt literals are Korean text that looks mis-encoded in
# this file; they are kept byte-for-byte -- confirm the file's encoding.
_evaluation_prompt = (
    "당신은 λΆ„λ₯˜ μ§€ν‘œμ— νŠΉν™”λœ λͺ¨λΈ 평가 μ „λ¬Έκ°€μž…λ‹ˆλ‹€. "
    "Precision, Recall, F1-score, Accuracyλ₯Ό μ‚¬μš©ν•˜μ—¬ ν•™μŠ΅λœ λͺ¨λΈμ„ μ² μ €νžˆ ν‰κ°€ν•˜λŠ” 역할을 ν•©λ‹ˆλ‹€. "
    "ν΄λž˜μŠ€λ³„ μ„ΈλΆ€ μ§€ν‘œμ™€ 전체 μ„±λŠ₯ 톡계λ₯Ό μ œκ³΅ν•˜μ„Έμš”. "
    "Confusion matrixλ₯Ό λΆ„μ„ν•˜κ³  κ°œμ„ μ΄ ν•„μš”ν•œ μ˜μ—­μ„ νŒŒμ•…ν•˜μ„Έμš”."
)
evaluation_agent = create_react_agent(
    name="evaluation_expert",
    tools=[evaluate_model],
    model=model,
    prompt=_evaluation_prompt,
)
# Supervisor over the four specialist agents. Its prompt lists each agent by
# the same name the agent was created with.
# NOTE(review): the prompt literals are Korean text that looks mis-encoded in
# this file; they are kept byte-for-byte -- confirm the file's encoding.
_specialist_agents = [
    data_extraction_agent,
    pretraining_agent,
    finetuning_agent,
    evaluation_agent,
]
_supervisor_prompt = (
    "당신은 ML νŒŒμ΄ν”„λΌμΈ κ°λ…μžμž…λ‹ˆλ‹€. "
    "μ‚¬μš©μžμ˜ μš”μ²­μ„ μ΄ν•΄ν•˜κ³  λͺ©ν‘œλ₯Ό λ‹¬μ„±ν•˜κΈ° μœ„ν•΄ ν•„μš”ν•œ μ „λ¬Έκ°€λ§Œ μ„ νƒν•˜μ„Έμš”.\n\n"
    "μ‚¬μš© κ°€λŠ₯ν•œ μ „λ¬Έκ°€:\n"
    "- data_extraction_expert: RDBμ—μ„œ 이벀트 데이터 μΆ”μΆœ\n"
    "- pretraining_expert: 데이터 μ€€λΉ„ 및 μ–Έμ–΄λͺ¨λΈ μ‚¬μ „ν•™μŠ΅\n"
    "- finetuning_expert: νŒŒμΈνŠœλ‹ 데이터 생성 및 λΆ„λ₯˜ λͺ¨λΈ ν•™μŠ΅\n"
    "- evaluation_expert: λͺ¨λΈ 평가 (Precision, Recall, F1 λ“±)\n\n"
    "μ‚¬μš©μžκ°€ μš”μ²­ν•œ μž‘μ—…λ§Œ μˆ˜ν–‰ν•˜κ³ , μš”μ²­ν•˜μ§€ μ•Šμ€ μΆ”κ°€ μž‘μ—…μ€ μ§„ν–‰ν•˜μ§€ λ§ˆμ„Έμš”."
)
workflow = create_supervisor(
    _specialist_agents,
    model=model,
    prompt=_supervisor_prompt,
)
# Compile the supervisor workflow once at import time; ml_app holds the result.
ml_app = workflow.compile()