Spaces:
Sleeping
Sleeping
| """ | |
| Progress Tracker - Tracks knowledge base initialization progress | |
| """ | |
| import asyncio | |
| from collections.abc import Callable | |
| from datetime import datetime | |
| from enum import Enum | |
| import json | |
| from pathlib import Path | |
| import logging | |
| # Use unified logging system | |
| from deeptutor.logging import get_logger | |
| _logger = get_logger("KnowledgeInit") | |
| def _get_logger(): | |
| return _logger | |
| class ProgressStage(Enum): | |
| """Initialization stage""" | |
| INITIALIZING = "initializing" # Initializing | |
| PROCESSING_DOCUMENTS = "processing_documents" # Processing documents | |
| PROCESSING_FILE = "processing_file" # Processing single file | |
| EXTRACTING_ITEMS = "extracting_items" # Extracting numbered items | |
| COMPLETED = "completed" # Completed | |
| ERROR = "error" # Error | |
| class ProgressTracker: | |
| """Progress tracker""" | |
| def __init__(self, kb_name: str, base_dir: Path): | |
| self.kb_name = kb_name | |
| self.base_dir = base_dir | |
| self.kb_dir = base_dir / kb_name | |
| self.progress_file = self.kb_dir / ".progress.json" | |
| self._callbacks: list = [] # Support multiple callbacks | |
| self.task_id: str | None = None # Task ID (for log identification) | |
| def set_callback(self, callback: Callable[[dict], None]): | |
| """Set progress callback function (can be called multiple times to add multiple callbacks)""" | |
| if callback not in self._callbacks: | |
| self._callbacks.append(callback) | |
| def remove_callback(self, callback: Callable[[dict], None]): | |
| """Remove progress callback function""" | |
| if callback in self._callbacks: | |
| self._callbacks.remove(callback) | |
| def _notify(self, progress: dict): | |
| """Notify progress update (call all callbacks)""" | |
| from deeptutor.runtime.mode import is_server | |
| if is_server(): | |
| try: | |
| from deeptutor.api.utils.progress_broadcaster import ProgressBroadcaster | |
| broadcaster = ProgressBroadcaster.get_instance() | |
| try: | |
| asyncio.get_running_loop() | |
| asyncio.create_task(broadcaster.broadcast(self.kb_name, progress)) | |
| except RuntimeError: | |
| try: | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| asyncio.create_task(broadcaster.broadcast(self.kb_name, progress)) | |
| except RuntimeError: | |
| pass | |
| except (ImportError, Exception): | |
| pass | |
| for callback in self._callbacks: | |
| try: | |
| callback(progress) | |
| except Exception as e: | |
| _get_logger().debug("Progress callback error: %s", e) | |
| def _save_progress(self, progress: dict): | |
| """Save progress to kb_config.json and local .progress.json file""" | |
| # Save to kb_config.json (centralized config) | |
| try: | |
| from deeptutor.knowledge.manager import KnowledgeBaseManager | |
| manager = KnowledgeBaseManager(base_dir=str(self.base_dir)) | |
| # Determine status based on stage | |
| stage = progress.get("stage", "") | |
| if stage == "completed": | |
| status = "ready" | |
| elif stage == "error": | |
| status = "error" | |
| elif stage in [ | |
| "initializing", | |
| "processing_documents", | |
| "processing_file", | |
| "extracting_items", | |
| ]: | |
| status = "processing" | |
| else: | |
| status = "initializing" | |
| # Update kb_config.json with status and progress | |
| manager.update_kb_status( | |
| name=self.kb_name, | |
| status=status, | |
| progress={ | |
| "stage": progress.get("stage"), | |
| "message": progress.get("message"), | |
| "percent": progress.get("progress_percent", 0), | |
| "current": progress.get("current", 0), | |
| "total": progress.get("total", 0), | |
| "file_name": progress.get("file_name"), | |
| "error": progress.get("error"), | |
| "timestamp": progress.get("timestamp"), | |
| "task_id": progress.get("task_id"), | |
| }, | |
| ) | |
| except Exception as e: | |
| _get_logger().warning("Failed to save progress to kb_config.json: %s", e) | |
| def update( | |
| self, | |
| stage: ProgressStage, | |
| message: str = "", | |
| current: int = 0, | |
| total: int = 0, | |
| file_name: str = "", | |
| error: str | None = None, | |
| ): | |
| """Update progress""" | |
| progress = { | |
| "kb_name": self.kb_name, | |
| "task_id": self.task_id, | |
| "stage": stage.value, | |
| "message": message, | |
| "current": current, | |
| "total": total, | |
| "file_name": file_name, | |
| "progress_percent": int(current / total * 100) if total > 0 else 0, | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| if error: | |
| progress["error"] = error | |
| progress["stage"] = ProgressStage.ERROR.value | |
| # Output to logger (terminal and log file) | |
| try: | |
| logger = _get_logger() | |
| prefix = f"[{self.task_id}]" if self.task_id else "" | |
| if total > 0: | |
| percent = progress["progress_percent"] | |
| progress_msg = f"{prefix} {message} ({current}/{total}, {percent}%)" | |
| if file_name: | |
| progress_msg += f" - File: {file_name}" | |
| else: | |
| progress_msg = f"{prefix} {message}" | |
| if file_name: | |
| progress_msg += f" - File: {file_name}" | |
| if error: | |
| logger.error(f"{progress_msg} - Error: {error}") | |
| else: | |
| logger.progress(progress_msg) | |
| except Exception: | |
| # If unified logging fails unexpectedly, use stdlib logger as fallback. | |
| fallback_logger = logging.getLogger("deeptutor.ProgressTracker") | |
| prefix = f"[{self.task_id}]" if self.task_id else "" | |
| fallback_logger.warning( | |
| "%s [ProgressTracker] %s (%s/%s)", | |
| prefix, | |
| message, | |
| current, | |
| total if total > 0 else "?", | |
| ) | |
| if error: | |
| fallback_logger.error("%s [ProgressTracker] Error: %s", prefix, error) | |
| self._save_progress(progress) | |
| self._notify(progress) | |
| def get_progress(self) -> dict | None: | |
| """Get current progress""" | |
| if not self.progress_file.exists(): | |
| return None | |
| try: | |
| with open(self.progress_file, encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception as e: | |
| _get_logger().debug(f"Failed to read progress file for '{self.kb_name}': {e}") | |
| return None | |
| def clear(self): | |
| """Clear progress file""" | |
| if self.progress_file.exists(): | |
| try: | |
| self.progress_file.unlink() | |
| except Exception as e: | |
| _get_logger().debug(f"Failed to clear progress file for '{self.kb_name}': {e}") | |