""" Timeline Manager - Manages timeline-based audio generation and blending """ import numpy as np import torch import torchaudio from pathlib import Path from typing import Optional, Dict, List, Any import json import logging from datetime import datetime import matplotlib.pyplot as plt import matplotlib matplotlib.use('Agg') # Non-interactive backend logger = logging.getLogger(__name__) class TimelineManager: """Manages audio timeline with seamless blending.""" def __init__(self, config: Dict[str, Any]): """ Initialize timeline manager. Args: config: Configuration dictionary """ self.config = config self.sample_rate = config.get("sample_rate", 44100) self.timelines = {} # Store active timelines self.timeline_dir = Path(config.get("timeline_dir", "timelines")) self.timeline_dir.mkdir(exist_ok=True) def create_timeline(self) -> str: """ Create new timeline. Returns: Timeline ID """ timeline_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f") self.timelines[timeline_id] = { "id": timeline_id, "clips": [], "audio": None, "metadata": [], "created_at": datetime.now().isoformat() } logger.info(f"Created timeline: {timeline_id}") return timeline_id def add_clip( self, timeline_id: Optional[str], clip_path: str, metadata: Dict[str, Any] ) -> str: """ Add clip to timeline. Args: timeline_id: Timeline ID (creates new if None) clip_path: Path to audio clip metadata: Clip metadata Returns: Timeline ID """ try: # Create timeline if doesn't exist if timeline_id is None or timeline_id not in self.timelines: timeline_id = self.create_timeline() timeline = self.timelines[timeline_id] # Load clip clip_audio, sr = torchaudio.load(clip_path) # Resample if needed if sr != self.sample_rate: resampler = torchaudio.transforms.Resample(sr, self.sample_rate) clip_audio = resampler(clip_audio) # Convert to numpy clip_np = clip_audio.numpy() # Add to timeline if timeline["audio"] is None: # First clip timeline["audio"] = clip_np else: # Concatenate with existing audio timeline["audio"] = np.concatenate([timeline["audio"], clip_np], axis=1) # Store metadata clip_info = { "index": len(timeline["clips"]), "path": clip_path, "duration": clip_np.shape[1] / self.sample_rate, "start_time": self.get_duration(timeline_id) - (clip_np.shape[1] / self.sample_rate), "metadata": metadata } timeline["clips"].append(clip_info) logger.info(f"Added clip to timeline {timeline_id}: {clip_info['duration']:.2f}s") return timeline_id except Exception as e: logger.error(f"Failed to add clip: {e}") raise def get_context( self, timeline_id: Optional[str], context_length: int ) -> Optional[np.ndarray]: """ Get context audio from timeline. Args: timeline_id: Timeline ID context_length: Length in seconds to retrieve Returns: Context audio as numpy array or None """ if timeline_id is None or timeline_id not in self.timelines: return None timeline = self.timelines[timeline_id] if timeline["audio"] is None: return None # Calculate number of samples context_samples = int(context_length * self.sample_rate) # Get last N samples audio = timeline["audio"] if audio.shape[1] <= context_samples: return audio return audio[:, -context_samples:] def get_last_clip(self, timeline_id: Optional[str]) -> Optional[np.ndarray]: """Get the last clip from timeline.""" if timeline_id is None or timeline_id not in self.timelines: return None timeline = self.timelines[timeline_id] if not timeline["clips"]: return None last_clip = timeline["clips"][-1] audio, _ = torchaudio.load(last_clip["path"]) return audio.numpy() def export_timeline(self, timeline_id: str) -> str: """ Export full timeline audio. Args: timeline_id: Timeline ID Returns: Path to exported audio file """ if timeline_id not in self.timelines: raise ValueError(f"Timeline not found: {timeline_id}") timeline = self.timelines[timeline_id] if timeline["audio"] is None: raise ValueError("Timeline is empty") # Save to file output_path = self.timeline_dir / f"timeline_{timeline_id}.wav" audio_tensor = torch.from_numpy(timeline["audio"]).float() torchaudio.save( str(output_path), audio_tensor, self.sample_rate, encoding="PCM_S", bits_per_sample=16 ) logger.info(f"Exported timeline to {output_path}") return str(output_path) def visualize_timeline(self, timeline_id: str) -> str: """ Create visualization of timeline. Args: timeline_id: Timeline ID Returns: Path to visualization image """ if timeline_id not in self.timelines: raise ValueError(f"Timeline not found: {timeline_id}") timeline = self.timelines[timeline_id] if not timeline["clips"]: # Create empty visualization fig, ax = plt.subplots(figsize=(12, 4)) ax.text(0.5, 0.5, "No clips yet", ha='center', va='center') ax.set_xlim(0, 1) ax.set_ylim(0, 1) else: # Create timeline visualization fig, ax = plt.subplots(figsize=(12, 4)) total_duration = self.get_duration(timeline_id) # Draw each clip for clip in timeline["clips"]: start = clip["start_time"] duration = clip["duration"] # Draw clip rectangle rect = plt.Rectangle( (start, 0.3), duration, 0.4, facecolor='steelblue', edgecolor='black', linewidth=1 ) ax.add_patch(rect) # Add clip label ax.text( start + duration/2, 0.5, f"Clip {clip['index'] + 1}", ha='center', va='center', fontsize=8, color='white', weight='bold' ) ax.set_xlim(0, max(total_duration, 1)) ax.set_ylim(0, 1) ax.set_xlabel('Time (seconds)', fontsize=10) ax.set_title(f'Timeline: {len(timeline["clips"])} clips, {total_duration:.1f}s total', fontsize=12) ax.set_yticks([]) ax.grid(True, axis='x', alpha=0.3) # Save visualization viz_path = self.timeline_dir / f"timeline_{timeline_id}_viz.png" plt.tight_layout() plt.savefig(viz_path, dpi=100, bbox_inches='tight') plt.close() return str(viz_path) def get_duration(self, timeline_id: str) -> float: """Get total duration of timeline in seconds.""" if timeline_id not in self.timelines: return 0.0 timeline = self.timelines[timeline_id] if timeline["audio"] is None: return 0.0 return timeline["audio"].shape[1] / self.sample_rate def inpaint_region( self, timeline_id: str, start_time: float, end_time: float, new_prompt: str ) -> str: """ Inpaint specific region in timeline. Args: timeline_id: Timeline ID start_time: Start time in seconds end_time: End time in seconds new_prompt: Prompt for new content Returns: Path to updated timeline audio """ if timeline_id not in self.timelines: raise ValueError(f"Timeline not found: {timeline_id}") # This would integrate with ACE-Step engine for actual inpainting # For now, this is a placeholder logger.info(f"Inpainting {start_time:.1f}s-{end_time:.1f}s in timeline {timeline_id}") # Export current state return self.export_timeline(timeline_id) def delete_timeline(self, timeline_id: str): """Delete timeline and associated files.""" if timeline_id in self.timelines: del self.timelines[timeline_id] logger.info(f"Deleted timeline: {timeline_id}") def save_timeline_state(self, timeline_id: str): """Save timeline state to disk.""" if timeline_id not in self.timelines: return timeline = self.timelines[timeline_id] # Save metadata metadata_path = self.timeline_dir / f"timeline_{timeline_id}_metadata.json" metadata = { "id": timeline["id"], "clips": timeline["clips"], "created_at": timeline["created_at"], "duration": self.get_duration(timeline_id) } with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) # Export audio if timeline["audio"] is not None: self.export_timeline(timeline_id) logger.info(f"Saved timeline state: {timeline_id}") def load_timeline_state(self, timeline_id: str) -> bool: """Load timeline state from disk.""" metadata_path = self.timeline_dir / f"timeline_{timeline_id}_metadata.json" audio_path = self.timeline_dir / f"timeline_{timeline_id}.wav" if not metadata_path.exists(): return False try: # Load metadata with open(metadata_path, 'r') as f: metadata = json.load(f) # Load audio if exists audio = None if audio_path.exists(): audio_tensor, _ = torchaudio.load(str(audio_path)) audio = audio_tensor.numpy() # Restore timeline self.timelines[timeline_id] = { "id": timeline_id, "clips": metadata["clips"], "audio": audio, "metadata": [], "created_at": metadata["created_at"] } logger.info(f"Loaded timeline state: {timeline_id}") return True except Exception as e: logger.error(f"Failed to load timeline: {e}") return False