"""
Processing metadata logging utility.

Tracks and logs processing metadata for all workflows including timing,
resource usage, and processing statistics.
"""
import json
import logging
import os
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

import psutil

logger = logging.getLogger(__name__)
@dataclass
class ProcessingMetadata:
    """
    Metadata for a processing job.

    Tracks timing, resource usage, and processing statistics.

    Note: the ``@dataclass`` decorator is required — the field declarations
    below use ``field(default_factory=...)`` and ``to_dict`` relies on
    :func:`dataclasses.asdict`, both of which only work on dataclasses.
    """

    # Job identification
    job_id: str
    workflow: str  # 'separation', 'extraction', 'denoising'
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    # Input/Output file paths (stored as strings)
    input_files: list = field(default_factory=list)
    output_files: list = field(default_factory=list)

    # Timing (seconds, epoch values from time.time())
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    processing_time: Optional[float] = None

    # Resource usage
    peak_memory_mb: float = 0.0
    avg_cpu_percent: float = 0.0

    # Processing statistics (workflow-specific)
    statistics: Dict[str, Any] = field(default_factory=dict)

    # Configuration the job ran with
    configuration: Dict[str, Any] = field(default_factory=dict)

    # Status
    status: str = "pending"  # pending, running, completed, failed
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert metadata to a plain dictionary (recursive, via asdict)."""
        return asdict(self)

    def to_json(self) -> str:
        """Convert metadata to a pretty-printed JSON string."""
        return json.dumps(self.to_dict(), indent=2)
class MetadataLogger:
    """
    Logger for processing metadata.

    Tracks timing, resource usage, and statistics for processing jobs, and
    persists one JSON file per job under ``output_dir``.
    """

    def __init__(self, output_dir: Optional[Path] = None):
        """
        Initialize metadata logger.

        Args:
            output_dir: Directory to save metadata logs (default: ./metadata_logs)
        """
        self.output_dir = output_dir or Path("./metadata_logs")
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.current_metadata: Optional[ProcessingMetadata] = None
        self.process = psutil.Process(os.getpid())

        # Resource tracking state for the currently active job
        self._start_memory = 0.0
        self._cpu_samples: list = []

        logger.debug("Metadata logger initialized (output: %s)", self.output_dir)

    def start_job(
        self, job_id: str, workflow: str, input_files: list, configuration: Dict[str, Any]
    ) -> ProcessingMetadata:
        """
        Start tracking a new processing job.

        Args:
            job_id: Unique job identifier
            workflow: Workflow name ('separation', 'extraction', 'denoising')
            input_files: List of input file paths
            configuration: Job configuration parameters

        Returns:
            ProcessingMetadata object for this job
        """
        self.current_metadata = ProcessingMetadata(
            job_id=job_id,
            workflow=workflow,
            input_files=[str(f) for f in input_files],
            configuration=configuration,
            start_time=time.time(),
            status="running",
        )

        # Reset resource tracking for the new job
        self._start_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self._cpu_samples = []

        logger.info("Started tracking job: %s (%s)", job_id, workflow)
        return self.current_metadata

    def update_progress(self, statistics: Dict[str, Any]):
        """
        Update job statistics during processing.

        Also samples memory and CPU so peak memory and average CPU usage can
        be reported at completion.

        Args:
            statistics: Current processing statistics (merged into existing)
        """
        if self.current_metadata is None:
            logger.warning("No active job to update")
            return

        self.current_metadata.statistics.update(statistics)

        # Track peak resident memory across all progress updates
        current_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self.current_metadata.peak_memory_mb = max(
            self.current_metadata.peak_memory_mb, current_memory
        )

        # Sample CPU usage. NOTE(review): interval=0.1 blocks the caller for
        # ~100 ms per update; kept to preserve existing averaging behavior.
        try:
            self._cpu_samples.append(self.process.cpu_percent(interval=0.1))
        except Exception as e:
            # Best-effort sampling: resource probing must never break processing,
            # but silently discarding the error hides real problems — log it.
            logger.debug("CPU sampling failed: %s", e)

    def complete_job(
        self, output_files: list, final_statistics: Optional[Dict[str, Any]] = None
    ) -> ProcessingMetadata:
        """
        Mark job as completed and finalize metadata.

        Args:
            output_files: List of output file paths
            final_statistics: Final processing statistics

        Returns:
            Completed ProcessingMetadata object

        Raises:
            ValueError: If no job is currently being tracked.
        """
        if self.current_metadata is None:
            raise ValueError("No active job to complete")

        self.current_metadata.end_time = time.time()
        # start_job always sets start_time, but guard against a manually
        # constructed ProcessingMetadata without one.
        if self.current_metadata.start_time is not None:
            self.current_metadata.processing_time = (
                self.current_metadata.end_time - self.current_metadata.start_time
            )
        self.current_metadata.output_files = [str(f) for f in output_files]
        self.current_metadata.status = "completed"

        # Merge final statistics over any progress updates
        if final_statistics:
            self.current_metadata.statistics.update(final_statistics)

        # Calculate average CPU usage from the progress samples
        if self._cpu_samples:
            self.current_metadata.avg_cpu_percent = sum(self._cpu_samples) / len(self._cpu_samples)

        # Save metadata
        self._save_metadata()

        elapsed = self.current_metadata.processing_time
        logger.info(
            "Completed job: %s (time: %.2fs, memory: %.2fMB)",
            self.current_metadata.job_id,
            elapsed if elapsed is not None else 0.0,
            self.current_metadata.peak_memory_mb,
        )

        completed_metadata = self.current_metadata
        self.current_metadata = None
        return completed_metadata

    def fail_job(self, error_message: str) -> ProcessingMetadata:
        """
        Mark job as failed.

        Args:
            error_message: Error description

        Returns:
            Failed ProcessingMetadata object

        Raises:
            ValueError: If no job is currently being tracked.
        """
        if self.current_metadata is None:
            raise ValueError("No active job to fail")

        self.current_metadata.end_time = time.time()
        if self.current_metadata.start_time is not None:
            self.current_metadata.processing_time = (
                self.current_metadata.end_time - self.current_metadata.start_time
            )
        self.current_metadata.status = "failed"
        self.current_metadata.error_message = error_message

        # Save metadata
        self._save_metadata()

        logger.error("Failed job: %s - %s", self.current_metadata.job_id, error_message)

        failed_metadata = self.current_metadata
        self.current_metadata = None
        return failed_metadata

    def _save_metadata(self):
        """
        Persist the current job's metadata as JSON in ``output_dir``.

        Best-effort: failures are logged, never raised, so metadata I/O
        problems cannot break the processing pipeline itself.
        """
        if self.current_metadata is None:
            return

        try:
            # Filename is "<workflow>_<job_id>.json"; a re-run with the same
            # job_id intentionally overwrites the previous record.
            filename = f"{self.current_metadata.workflow}_{self.current_metadata.job_id}.json"
            filepath = self.output_dir / filename

            # Write metadata
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(self.current_metadata.to_json())

            logger.debug("Saved metadata: %s", filepath)
        except Exception as e:
            logger.error("Failed to save metadata: %s", e)

    def get_job_history(self, workflow: Optional[str] = None) -> list:
        """
        Get processing history for completed jobs.

        Args:
            workflow: Filter by workflow name (None = all workflows)

        Returns:
            List of ProcessingMetadata dictionaries, newest first.
        """
        history = []

        try:
            for metadata_file in self.output_dir.glob("*.json"):
                # Filenames are "<workflow>_<job_id>.json"; match on the full
                # "<workflow>_" prefix so a workflow name that is a prefix of
                # another does not leak through the filter.
                if workflow and not metadata_file.name.startswith(f"{workflow}_"):
                    continue

                try:
                    with open(metadata_file, encoding="utf-8") as f:
                        history.append(json.load(f))
                except (OSError, json.JSONDecodeError) as e:
                    # One unreadable/corrupt file must not discard the rest.
                    logger.warning("Skipping unreadable metadata file %s: %s", metadata_file, e)

            # Sort by timestamp (newest first)
            history.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
        except Exception as e:
            logger.error("Failed to load job history: %s", e)

        return history

    def get_statistics_summary(self, workflow: str) -> Dict[str, Any]:
        """
        Get aggregated statistics for a workflow.

        Args:
            workflow: Workflow name

        Returns:
            Dictionary with aggregated job counts, success rate, and (when
            completed jobs exist) timing and memory aggregates.
        """
        history = self.get_job_history(workflow=workflow)

        if not history:
            return {
                "total_jobs": 0,
                "completed_jobs": 0,
                "failed_jobs": 0,
            }

        completed = [j for j in history if j["status"] == "completed"]
        failed = [j for j in history if j["status"] == "failed"]

        summary = {
            "total_jobs": len(history),
            "completed_jobs": len(completed),
            "failed_jobs": len(failed),
            "success_rate": len(completed) / len(history) if history else 0.0,
        }

        if completed:
            # Falsy values (None or 0) are filtered out before aggregating
            processing_times = [j["processing_time"] for j in completed if j.get("processing_time")]
            memory_usage = [j["peak_memory_mb"] for j in completed if j.get("peak_memory_mb")]

            if processing_times:
                summary["avg_processing_time"] = sum(processing_times) / len(processing_times)
                summary["min_processing_time"] = min(processing_times)
                summary["max_processing_time"] = max(processing_times)

            if memory_usage:
                summary["avg_memory_mb"] = sum(memory_usage) / len(memory_usage)
                summary["peak_memory_mb"] = max(memory_usage)

        return summary
# Global metadata logger instance
_global_logger: Optional[MetadataLogger] = None


def get_metadata_logger(output_dir: Optional[Path] = None) -> MetadataLogger:
    """
    Get the global metadata logger instance.

    The logger is created lazily on first call; subsequent calls return the
    same instance and ignore *output_dir*.

    Args:
        output_dir: Directory to save metadata logs (first call only)

    Returns:
        MetadataLogger instance
    """
    global _global_logger
    if _global_logger is not None:
        return _global_logger
    _global_logger = MetadataLogger(output_dir=output_dir)
    return _global_logger