""" Processing metadata logging utility. Tracks and logs processing metadata for all workflows including timing, resource usage, and processing statistics. """ import json import logging import os import time from dataclasses import asdict, dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional import psutil logger = logging.getLogger(__name__) @dataclass class ProcessingMetadata: """ Metadata for a processing job. Tracks timing, resource usage, and processing statistics. """ # Job identification job_id: str workflow: str # 'separation', 'extraction', 'denoising' timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) # Input/Output input_files: list = field(default_factory=list) output_files: list = field(default_factory=list) # Timing (seconds) start_time: Optional[float] = None end_time: Optional[float] = None processing_time: Optional[float] = None # Resource usage peak_memory_mb: float = 0.0 avg_cpu_percent: float = 0.0 # Processing statistics (workflow-specific) statistics: Dict[str, Any] = field(default_factory=dict) # Configuration configuration: Dict[str, Any] = field(default_factory=dict) # Status status: str = "pending" # pending, running, completed, failed error_message: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert metadata to dictionary.""" return asdict(self) def to_json(self) -> str: """Convert metadata to JSON string.""" return json.dumps(self.to_dict(), indent=2) class MetadataLogger: """ Logger for processing metadata. Tracks timing, resource usage, and statistics for processing jobs. """ def __init__(self, output_dir: Optional[Path] = None): """ Initialize metadata logger. Args: output_dir: Directory to save metadata logs (default: ./metadata_logs) """ self.output_dir = output_dir or Path("./metadata_logs") self.output_dir.mkdir(parents=True, exist_ok=True) self.current_metadata: Optional[ProcessingMetadata] = None self.process = psutil.Process(os.getpid()) # Resource tracking self._start_memory = 0.0 self._cpu_samples = [] logger.debug(f"Metadata logger initialized (output: {self.output_dir})") def start_job( self, job_id: str, workflow: str, input_files: list, configuration: Dict[str, Any] ) -> ProcessingMetadata: """ Start tracking a new processing job. Args: job_id: Unique job identifier workflow: Workflow name ('separation', 'extraction', 'denoising') input_files: List of input file paths configuration: Job configuration parameters Returns: ProcessingMetadata object for this job """ self.current_metadata = ProcessingMetadata( job_id=job_id, workflow=workflow, input_files=[str(f) for f in input_files], configuration=configuration, start_time=time.time(), status="running", ) # Initialize resource tracking self._start_memory = self.process.memory_info().rss / 1024 / 1024 # MB self._cpu_samples = [] logger.info(f"Started tracking job: {job_id} ({workflow})") return self.current_metadata def update_progress(self, statistics: Dict[str, Any]): """ Update job statistics during processing. Args: statistics: Current processing statistics """ if self.current_metadata is None: logger.warning("No active job to update") return self.current_metadata.statistics.update(statistics) # Track resources current_memory = self.process.memory_info().rss / 1024 / 1024 # MB self.current_metadata.peak_memory_mb = max( self.current_metadata.peak_memory_mb, current_memory ) # Sample CPU usage try: cpu_percent = self.process.cpu_percent(interval=0.1) self._cpu_samples.append(cpu_percent) except Exception: pass def complete_job( self, output_files: list, final_statistics: Optional[Dict[str, Any]] = None ) -> ProcessingMetadata: """ Mark job as completed and finalize metadata. Args: output_files: List of output file paths final_statistics: Final processing statistics Returns: Completed ProcessingMetadata object """ if self.current_metadata is None: raise ValueError("No active job to complete") self.current_metadata.end_time = time.time() self.current_metadata.processing_time = ( self.current_metadata.end_time - self.current_metadata.start_time ) self.current_metadata.output_files = [str(f) for f in output_files] self.current_metadata.status = "completed" # Update final statistics if final_statistics: self.current_metadata.statistics.update(final_statistics) # Calculate average CPU usage if self._cpu_samples: self.current_metadata.avg_cpu_percent = sum(self._cpu_samples) / len(self._cpu_samples) # Save metadata self._save_metadata() logger.info( f"Completed job: {self.current_metadata.job_id} " f"(time: {self.current_metadata.processing_time:.2f}s, " f"memory: {self.current_metadata.peak_memory_mb:.2f}MB)" ) completed_metadata = self.current_metadata self.current_metadata = None return completed_metadata def fail_job(self, error_message: str) -> ProcessingMetadata: """ Mark job as failed. Args: error_message: Error description Returns: Failed ProcessingMetadata object """ if self.current_metadata is None: raise ValueError("No active job to fail") self.current_metadata.end_time = time.time() self.current_metadata.processing_time = ( self.current_metadata.end_time - self.current_metadata.start_time ) self.current_metadata.status = "failed" self.current_metadata.error_message = error_message # Save metadata self._save_metadata() logger.error(f"Failed job: {self.current_metadata.job_id} - {error_message}") failed_metadata = self.current_metadata self.current_metadata = None return failed_metadata def _save_metadata(self): """Save metadata to file.""" if self.current_metadata is None: return try: # Create filename from job ID and timestamp filename = f"{self.current_metadata.workflow}_{self.current_metadata.job_id}.json" filepath = self.output_dir / filename # Write metadata with open(filepath, "w") as f: f.write(self.current_metadata.to_json()) logger.debug(f"Saved metadata: {filepath}") except Exception as e: logger.error(f"Failed to save metadata: {e}") def get_job_history(self, workflow: Optional[str] = None) -> list: """ Get processing history for completed jobs. Args: workflow: Filter by workflow name (None = all workflows) Returns: List of ProcessingMetadata dictionaries """ history = [] try: for metadata_file in self.output_dir.glob("*.json"): # Filter by workflow if specified if workflow and not metadata_file.stem.startswith(workflow): continue with open(metadata_file) as f: metadata = json.load(f) history.append(metadata) # Sort by timestamp (newest first) history.sort(key=lambda x: x.get("timestamp", ""), reverse=True) except Exception as e: logger.error(f"Failed to load job history: {e}") return history def get_statistics_summary(self, workflow: str) -> Dict[str, Any]: """ Get aggregated statistics for a workflow. Args: workflow: Workflow name Returns: Dictionary with aggregated statistics """ history = self.get_job_history(workflow=workflow) if not history: return { "total_jobs": 0, "completed_jobs": 0, "failed_jobs": 0, } completed = [j for j in history if j["status"] == "completed"] failed = [j for j in history if j["status"] == "failed"] summary = { "total_jobs": len(history), "completed_jobs": len(completed), "failed_jobs": len(failed), "success_rate": len(completed) / len(history) if history else 0.0, } if completed: processing_times = [j["processing_time"] for j in completed if j.get("processing_time")] memory_usage = [j["peak_memory_mb"] for j in completed if j.get("peak_memory_mb")] if processing_times: summary["avg_processing_time"] = sum(processing_times) / len(processing_times) summary["min_processing_time"] = min(processing_times) summary["max_processing_time"] = max(processing_times) if memory_usage: summary["avg_memory_mb"] = sum(memory_usage) / len(memory_usage) summary["peak_memory_mb"] = max(memory_usage) return summary # Global metadata logger instance _global_logger: Optional[MetadataLogger] = None def get_metadata_logger(output_dir: Optional[Path] = None) -> MetadataLogger: """ Get global metadata logger instance. Args: output_dir: Directory to save metadata logs Returns: MetadataLogger instance """ global _global_logger if _global_logger is None: _global_logger = MetadataLogger(output_dir=output_dir) return _global_logger