from typing import List, Dict, Optional, Union, Any import json from dataclasses import dataclass, field from enum import Enum from lpm_kernel.api.domains.trainprocess.progress_enum import Status class TrainProgress: def __init__(self): # Define the complete data structure directly in the format matching the desired JSON output self.data = { "stages": [ { "name": "Downloading the Base Model", "progress": 0.0, "status": "pending", "current_step": None, "steps": [ { "name": "Model Download", "completed": False, "status": "pending", "have_output": False, "path": None } ] }, { "name": "Activating the Memory Matrix", "progress": 0.0, "status": "pending", "current_step": None, "steps": [ { "name": "List Documents", "completed": False, "status": "pending", "have_output": False, "path": None }, { "name": "Generate Document Embeddings", "completed": False, "status": "pending", "have_output": False, "path": None }, { "name": "Process Chunks", "completed": False, "status": "pending", "have_output": False, "path": None }, { "name": "Chunk Embedding", "completed": False, "status": "pending", "have_output": False, "path": None } ] }, { "name": "Synthesize Your Life Narrative", "progress": 0.0, "status": "pending", "current_step": None, "steps": [ { "name": "Extract Dimensional Topics", "completed": False, "status": "pending", "have_output": True, "path": "resources/L2/data_pipeline/raw_data/topics.json" }, { "name": "Generate Biography", "completed": False, "status": "pending", "have_output": True, "path": "From database" }, { "name": "Map Your Entity Network", "completed": False, "status": "pending", "have_output": True, "path": "resources/L1/graphrag_indexing_output/subjective/entities.parquet" } ] }, { "name": "Prepare Training Data for Deep Comprehension", "progress": 0.0, "status": "pending", "current_step": None, "steps": [ { "name": "Decode Preference Patterns", "completed": False, "status": "pending", "have_output": True, "path": "resources/L2/data/preference.json" }, { "name": "Reinforce Identity", "completed": False, "status": "pending", "have_output": True, "path": "resources/L2/data/selfqa.json" }, { "name": "Augment Content Retention", "completed": False, "status": "pending", "have_output": True, "path": "resources/L2/data/diversity.json" } ] }, { "name": "Training to create Second Me", "progress": 0.0, "status": "pending", "current_step": None, "steps": [ { "name": "Train", "completed": False, "status": "pending", "have_output": False, "path": None }, { "name": "Merge Weights", "completed": False, "status": "pending", "have_output": False, "path": None }, { "name": "Convert Model", "completed": False, "status": "pending", "have_output": False, "path": None } ] } ], "overall_progress": 0.0, "current_stage": None, "status": "pending" } # Create stage name to stage data mapping self.stage_map = {} for stage in self.data["stages"]: stage_name = stage["name"].lower().replace(" ", "_") self.stage_map[stage_name] = stage # Create step name to step data mapping for each stage self.steps_map = {} for stage_name, stage in self.stage_map.items(): self.steps_map[stage_name] = {} for step in stage["steps"]: step_name = step["name"].lower().replace(" ", "_") self.steps_map[stage_name][step_name] = step def update_progress(self, stage: str, step: str, currentStepStatus: Union[Status, str], stageProgress: Optional[float] = None): """Update progress status Args: stage: Stage key (snake_case format) step: Step key (snake_case format) currentStepStatus: Status (enum or string) stageProgress: Optional progress value (0-100) """ stage_data = self.stage_map[stage] status_value = currentStepStatus.value if isinstance(currentStepStatus, Status) else currentStepStatus step_data = self.steps_map[stage][step] # Update step status step_data["status"] = status_value step_data["completed"] = status_value == "completed" # Update stage progress self._update_stage_progress(stage_data, stageProgress) # Update stage status and current step self._update_stage_status(stage_data, step_data) # Update overall progress self._update_overall_progress() # Update overall status self._update_overall_status() def _update_stage_progress(self, stage_data: Dict, stageProgress: Optional[float] = None): """Update the progress of a stage Args: stage_data: Stage data dictionary stageProgress: Optional progress value (0-100) """ if stageProgress is not None: stage_data["progress"] = stageProgress else: completed_steps = sum(1 for s in stage_data["steps"] if s["completed"]) total_steps = len(stage_data["steps"]) stage_data["progress"] = (completed_steps / total_steps) * 100.0 def _update_stage_status(self, stage_data: Dict, step_data: Dict): """Update the status and current step of a stage Args: stage_data: Stage data dictionary step_data: Step data dictionary """ if all(step["completed"] for step in stage_data["steps"]): stage_data["status"] = "completed" stage_data["current_step"] = None next_stage = None for stage_name, stage_info in self.stage_map.items(): if stage_info["status"] != "completed": next_stage = stage_name break self.data["current_stage"] = next_stage elif any(step["status"] == "failed" for step in stage_data["steps"]): stage_data["status"] = "failed" stage_data["current_step"] = step_data["name"] self.data["current_stage"] = stage_data["name"] elif any(step["status"] == "suspended" for step in stage_data["steps"]): stage_data["status"] = "suspended" stage_data["current_step"] = step_data["name"] self.data["current_stage"] = stage_data["name"] else: stage_data["status"] = "in_progress" stage_data["current_step"] = step_data["name"] self.data["current_stage"] = stage_data["name"] def _update_overall_progress(self): """Update the overall progress based on all stages""" completed_progress = sum(s["progress"] for s in self.data["stages"]) self.data["overall_progress"] = completed_progress / len(self.data["stages"]) def _update_overall_status(self): """Update the overall status based on all stages""" if all(s["status"] == "completed" for s in self.data["stages"]): self.data["status"] = "completed" elif any(s["status"] == "failed" for s in self.data["stages"]): self.data["status"] = "failed" elif any(s["status"] == "suspended" for s in self.data["stages"]): self.data["status"] = "suspended" elif any(s["status"] == "in_progress" for s in self.data["stages"]): self.data["status"] = "in_progress" else: self.data["status"] = "pending" def to_dict(self) -> dict: """Convert progress status to dictionary format""" return self.data def reset(self): """Reset all progress statuses""" self.__init__()