# Gemini
# feat: add detailed logging
# 01d5a5d
from typing import List, Dict, Optional, Union, Any
import json
from dataclasses import dataclass, field
from enum import Enum
from lpm_kernel.api.domains.trainprocess.progress_enum import Status
class TrainProgress:
    """Track per-step, per-stage and overall progress of the training pipeline.

    The whole state lives in ``self.data``, a plain dict shaped for direct
    JSON serialization::

        {
            "stages": [
                {
                    "name": <display name>,
                    "progress": <float, 0-100>,
                    "status": <str>,
                    "current_step": <step display name or None>,
                    "steps": [
                        {"name", "completed", "status", "have_output", "path"},
                        ...
                    ],
                },
                ...
            ],
            "overall_progress": <float, mean of the stage progress values>,
            "current_stage": <stage display name or None>,
            "status": <str>,
        }

    Two lookup tables reference the same dict objects as ``self.data``:

    * ``stage_map``: stage key -> stage dict
    * ``steps_map``: stage key -> {step key -> step dict}

    A key is the display name lower-cased with spaces replaced by
    underscores, e.g. ``"Model Download"`` -> ``"model_download"``.

    The status strings this class itself compares against are
    ``"pending"``, ``"in_progress"``, ``"completed"``, ``"failed"`` and
    ``"suspended"``.
    """

    # Pipeline layout: (stage display name, ((step display name, output path), ...)).
    # A step whose output path is None is reported with have_output=False.
    _STAGE_DEFINITIONS = (
        (
            "Downloading the Base Model",
            (
                ("Model Download", None),
            ),
        ),
        (
            "Activating the Memory Matrix",
            (
                ("List Documents", None),
                ("Generate Document Embeddings", None),
                ("Process Chunks", None),
                ("Chunk Embedding", None),
            ),
        ),
        (
            "Synthesize Your Life Narrative",
            (
                (
                    "Extract Dimensional Topics",
                    "resources/L2/data_pipeline/raw_data/topics.json",
                ),
                ("Generate Biography", "From database"),
                (
                    "Map Your Entity Network",
                    "resources/L1/graphrag_indexing_output/subjective/entities.parquet",
                ),
            ),
        ),
        (
            "Prepare Training Data for Deep Comprehension",
            (
                ("Decode Preference Patterns", "resources/L2/data/preference.json"),
                ("Reinforce Identity", "resources/L2/data/selfqa.json"),
                ("Augment Content Retention", "resources/L2/data/diversity.json"),
            ),
        ),
        (
            "Training to create Second Me",
            (
                ("Train", None),
                ("Merge Weights", None),
                ("Convert Model", None),
            ),
        ),
    )

    def __init__(self):
        """Create a tracker with every stage and step in the pending state."""
        self._initialize_state()

    @staticmethod
    def _to_key(display_name: str) -> str:
        """Return the lookup key for a stage or step display name."""
        return display_name.lower().replace(" ", "_")

    @staticmethod
    def _new_step(name: str, path: Optional[str]) -> Dict[str, Any]:
        """Build a pending step entry; key order matches the JSON output."""
        return {
            "name": name,
            "completed": False,
            "status": "pending",
            "have_output": path is not None,
            "path": path,
        }

    @classmethod
    def _new_stage(cls, name: str, steps) -> Dict[str, Any]:
        """Build a pending stage entry; key order matches the JSON output."""
        return {
            "name": name,
            "progress": 0.0,
            "status": "pending",
            "current_step": None,
            "steps": [cls._new_step(step_name, path) for step_name, path in steps],
        }

    def _initialize_state(self) -> None:
        """(Re)build ``data``, ``stage_map`` and ``steps_map`` from scratch."""
        self.data: Dict[str, Any] = {
            "stages": [
                self._new_stage(stage_name, steps)
                for stage_name, steps in self._STAGE_DEFINITIONS
            ],
            "overall_progress": 0.0,
            "current_stage": None,
            "status": "pending",
        }
        # Both maps hold references to the dicts inside self.data, so an
        # update made through a map is immediately visible in self.data.
        self.stage_map: Dict[str, Dict[str, Any]] = {
            self._to_key(stage["name"]): stage for stage in self.data["stages"]
        }
        self.steps_map: Dict[str, Dict[str, Dict[str, Any]]] = {
            stage_key: {self._to_key(step["name"]): step for step in stage["steps"]}
            for stage_key, stage in self.stage_map.items()
        }

    def update_progress(
        self,
        stage: str,
        step: str,
        currentStepStatus: Union["Status", str],
        stageProgress: Optional[float] = None,
    ):
        """Record a new status for one step and refresh all derived fields.

        Args:
            stage: Stage key (snake_case format).
            step: Step key (snake_case format).
            currentStepStatus: New status of the step, either a plain string
                or an enum member; any ``Enum`` member is stored as its
                ``.value``.
            stageProgress: Optional stage progress (0-100). When omitted the
                stage progress is the percentage of completed steps.

        Raises:
            KeyError: If ``stage`` or ``step`` is not a known key. Nothing is
                modified in that case.
        """
        # Resolve both lookups before mutating anything so an unknown key
        # cannot leave the tracker half-updated.
        stage_data = self.stage_map[stage]
        step_data = self.steps_map[stage][step]

        if isinstance(currentStepStatus, Enum):
            status_value = currentStepStatus.value
        else:
            status_value = currentStepStatus

        step_data["status"] = status_value
        step_data["completed"] = status_value == "completed"

        # Order matters: the stage status depends on the step flags, and the
        # overall fields depend on the stage fields.
        self._update_stage_progress(stage_data, stageProgress)
        self._update_stage_status(stage_data, step_data)
        self._update_overall_progress()
        self._update_overall_status()

    def _update_stage_progress(self, stage_data: Dict, stageProgress: Optional[float] = None):
        """Set the progress of a stage.

        Args:
            stage_data: Stage data dictionary.
            stageProgress: Optional progress value (0-100); stored as given.
                When None, progress is derived from the completed steps.
        """
        if stageProgress is not None:
            stage_data["progress"] = stageProgress
            return
        steps = stage_data["steps"]
        finished = sum(1 for entry in steps if entry["completed"])
        stage_data["progress"] = (finished / len(steps)) * 100.0

    def _update_stage_status(self, stage_data: Dict, step_data: Dict):
        """Derive a stage's status and current step, and the current stage.

        Precedence: all steps completed, then any step failed, then any step
        suspended, otherwise the stage is in progress.

        ``data["current_stage"]`` always holds a stage display name (or None
        once every stage is completed).

        Args:
            stage_data: Stage data dictionary.
            step_data: Data dictionary of the step that was just updated.
        """
        steps = stage_data["steps"]

        if all(entry["completed"] for entry in steps):
            stage_data["status"] = "completed"
            stage_data["current_step"] = None
            # Point at the first stage that still has work left. The display
            # name is used here, as in the other branches, so that consumers
            # see one consistent format.
            self.data["current_stage"] = next(
                (
                    candidate["name"]
                    for candidate in self.data["stages"]
                    if candidate["status"] != "completed"
                ),
                None,
            )
            return

        if any(entry["status"] == "failed" for entry in steps):
            stage_data["status"] = "failed"
        elif any(entry["status"] == "suspended" for entry in steps):
            stage_data["status"] = "suspended"
        else:
            stage_data["status"] = "in_progress"
        stage_data["current_step"] = step_data["name"]
        self.data["current_stage"] = stage_data["name"]

    def _update_overall_progress(self):
        """Set the overall progress to the mean of all stage progress values."""
        stages = self.data["stages"]
        self.data["overall_progress"] = sum(s["progress"] for s in stages) / len(stages)

    def _update_overall_status(self):
        """Derive the overall status from the stage statuses.

        Precedence: all completed, then any failed, then any suspended, then
        any in progress, otherwise pending.
        """
        stage_statuses = [s["status"] for s in self.data["stages"]]
        if all(value == "completed" for value in stage_statuses):
            self.data["status"] = "completed"
            return
        for candidate in ("failed", "suspended", "in_progress"):
            if candidate in stage_statuses:
                self.data["status"] = candidate
                return
        self.data["status"] = "pending"

    def to_dict(self) -> dict:
        """Return the progress data as a dictionary.

        The returned object is the live internal dict, not a copy.
        """
        return self.data

    def reset(self):
        """Reset all progress statuses to their initial pending state.

        ``data``, ``stage_map`` and ``steps_map`` are replaced by new
        objects; references obtained before the reset keep the old state.
        """
        self._initialize_state()