Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Timeline Manager - Manages timeline-based audio generation and blending | |
| """ | |
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Any

import matplotlib

# Select the non-interactive backend BEFORE pyplot is imported: older
# matplotlib releases ignore a backend switch made after pyplot has been
# loaded, and this module only ever renders figures to files.
matplotlib.use('Agg')

import matplotlib.pyplot as plt  # noqa: E402  (must follow matplotlib.use)
import numpy as np  # noqa: E402
import torch  # noqa: E402
import torchaudio  # noqa: E402

# Module-level logger shared by every TimelineManager method.
logger = logging.getLogger(__name__)
class TimelineManager:
    """Manage in-memory audio timelines built by appending clips.

    Every timeline is a dict stored in ``self.timelines`` under its ID with
    the keys ``id``, ``clips`` (list of per-clip info dicts), ``audio``
    (numpy array whose axis 1 is time, at ``self.sample_rate``, or ``None``
    while the timeline is empty), ``metadata`` and ``created_at``.

    Clips are concatenated back to back along the time axis; no crossfade
    is applied by this class.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize timeline manager.

        Args:
            config: Configuration dictionary. Reads ``sample_rate``
                (default 44100) and ``timeline_dir`` (default "timelines").
        """
        self.config = config
        self.sample_rate = config.get("sample_rate", 44100)
        self.timelines = {}  # timeline_id -> timeline dict
        self.timeline_dir = Path(config.get("timeline_dir", "timelines"))
        # parents=True so a nested output directory does not fail on first use.
        self.timeline_dir.mkdir(parents=True, exist_ok=True)

    def _load_audio(self, path: str) -> np.ndarray:
        """Load an audio file and return it as numpy at ``self.sample_rate``."""
        waveform, source_rate = torchaudio.load(path)
        if source_rate != self.sample_rate:
            resampler = torchaudio.transforms.Resample(source_rate, self.sample_rate)
            waveform = resampler(waveform)
        return waveform.numpy()

    @staticmethod
    def _match_channels(audio: np.ndarray, channels: int) -> np.ndarray:
        """Return ``audio`` with ``channels`` rows so it can be concatenated.

        A single-channel input is duplicated; a multi-channel input is
        averaged when one channel is requested. Any other mismatch has no
        obvious mapping and raises ``ValueError``.
        """
        current = audio.shape[0]
        if current == channels:
            return audio
        if current == 1:
            return np.repeat(audio, channels, axis=0)
        if channels == 1:
            mixed = audio.mean(axis=0, keepdims=True)
            return mixed.astype(audio.dtype, copy=False)
        raise ValueError(
            f"Cannot match clip with {current} channels "
            f"to timeline with {channels} channels"
        )

    def create_timeline(self) -> str:
        """
        Create new, empty timeline.

        Returns:
            Timeline ID (timestamp based; a numeric suffix is appended only
            if that timestamp is already in use).
        """
        now = datetime.now()
        base_id = now.strftime("%Y%m%d_%H%M%S_%f")
        timeline_id = base_id
        # Two calls within one clock tick would otherwise share an ID and the
        # second would silently replace the first timeline.
        suffix = 1
        while timeline_id in self.timelines:
            timeline_id = f"{base_id}_{suffix}"
            suffix += 1

        self.timelines[timeline_id] = {
            "id": timeline_id,
            "clips": [],
            "audio": None,
            "metadata": [],
            "created_at": now.isoformat(),
        }
        logger.info("Created timeline: %s", timeline_id)
        return timeline_id

    def add_clip(
        self,
        timeline_id: Optional[str],
        clip_path: str,
        metadata: Dict[str, Any]
    ) -> str:
        """
        Append a clip to the end of a timeline.

        Args:
            timeline_id: Timeline ID. A new timeline is created when this is
                None or not a known ID, so callers must use the returned ID.
            clip_path: Path to audio clip
            metadata: Clip metadata, stored unchanged in the clip record

        Returns:
            ID of the timeline the clip was added to

        Raises:
            Exception: Any load, resample or channel-matching error is
                logged and re-raised.
        """
        try:
            # Load first: a bad file must not leave an empty timeline behind.
            clip_np = self._load_audio(clip_path)

            if timeline_id is None or timeline_id not in self.timelines:
                timeline_id = self.create_timeline()
            timeline = self.timelines[timeline_id]

            existing = timeline["audio"]
            if existing is None:
                # First clip defines the timeline's channel layout.
                start_samples = 0
                timeline["audio"] = clip_np
            else:
                clip_np = self._match_channels(clip_np, existing.shape[0])
                start_samples = existing.shape[1]
                timeline["audio"] = np.concatenate([existing, clip_np], axis=1)

            # Times are derived from sample counts to avoid float drift.
            clip_info = {
                "index": len(timeline["clips"]),
                "path": clip_path,
                "duration": clip_np.shape[1] / self.sample_rate,
                "start_time": start_samples / self.sample_rate,
                "metadata": metadata,
            }
            timeline["clips"].append(clip_info)
            logger.info(
                "Added clip to timeline %s: %.2fs",
                timeline_id,
                clip_info["duration"],
            )
            return timeline_id
        except Exception as e:
            logger.error("Failed to add clip: %s", e)
            raise

    def get_context(
        self,
        timeline_id: Optional[str],
        context_length: float
    ) -> Optional[np.ndarray]:
        """
        Get the trailing audio of a timeline for use as context.

        Args:
            timeline_id: Timeline ID
            context_length: Length in seconds to retrieve

        Returns:
            The last ``context_length`` seconds of audio (the whole timeline
            if it is shorter), or None when the timeline is unknown or empty
            or the requested length is not positive. The array is not
            copied, so it shares memory with the stored timeline audio.
        """
        if timeline_id is None or timeline_id not in self.timelines:
            return None
        audio = self.timelines[timeline_id]["audio"]
        if audio is None:
            return None

        context_samples = int(context_length * self.sample_rate)
        # Guard explicitly: audio[:, -0:] would return the ENTIRE timeline
        # and a negative count would slice from the front.
        if context_samples <= 0:
            return None
        if audio.shape[1] <= context_samples:
            return audio
        return audio[:, -context_samples:]

    def get_last_clip(self, timeline_id: Optional[str]) -> Optional[np.ndarray]:
        """
        Reload the most recently added clip from its source file.

        Returns:
            Clip audio resampled to ``self.sample_rate``, or None when the
            timeline is unknown or has no clips.
        """
        if timeline_id is None or timeline_id not in self.timelines:
            return None
        clips = self.timelines[timeline_id]["clips"]
        if not clips:
            return None
        return self._load_audio(clips[-1]["path"])

    def export_timeline(self, timeline_id: str) -> str:
        """
        Export full timeline audio as a 16-bit PCM WAV file.

        Args:
            timeline_id: Timeline ID

        Returns:
            Path to exported audio file

        Raises:
            ValueError: If the timeline is unknown or has no audio.
        """
        if timeline_id not in self.timelines:
            raise ValueError(f"Timeline not found: {timeline_id}")
        timeline = self.timelines[timeline_id]
        if timeline["audio"] is None:
            raise ValueError("Timeline is empty")

        output_path = self.timeline_dir / f"timeline_{timeline_id}.wav"
        audio_tensor = torch.from_numpy(timeline["audio"]).float()
        torchaudio.save(
            str(output_path),
            audio_tensor,
            self.sample_rate,
            encoding="PCM_S",
            bits_per_sample=16
        )
        logger.info("Exported timeline to %s", output_path)
        return str(output_path)

    def visualize_timeline(self, timeline_id: str) -> str:
        """
        Render the timeline's clips as labelled boxes on a time axis.

        Args:
            timeline_id: Timeline ID

        Returns:
            Path to visualization image (PNG)

        Raises:
            ValueError: If the timeline is unknown.
        """
        if timeline_id not in self.timelines:
            raise ValueError(f"Timeline not found: {timeline_id}")
        clips = self.timelines[timeline_id]["clips"]

        fig, ax = plt.subplots(figsize=(12, 4))
        try:
            if not clips:
                # Placeholder image for a timeline without clips.
                ax.text(0.5, 0.5, "No clips yet", ha='center', va='center')
                ax.set_xlim(0, 1)
                ax.set_ylim(0, 1)
            else:
                total_duration = self.get_duration(timeline_id)
                for clip in clips:
                    start = clip["start_time"]
                    duration = clip["duration"]
                    rect = plt.Rectangle(
                        (start, 0.3),
                        duration,
                        0.4,
                        facecolor='steelblue',
                        edgecolor='black',
                        linewidth=1
                    )
                    ax.add_patch(rect)
                    ax.text(
                        start + duration / 2,
                        0.5,
                        f"Clip {clip['index'] + 1}",
                        ha='center',
                        va='center',
                        fontsize=8,
                        color='white',
                        weight='bold'
                    )
                ax.set_xlim(0, max(total_duration, 1))
                ax.set_ylim(0, 1)
                ax.set_xlabel('Time (seconds)', fontsize=10)
                ax.set_title(
                    f'Timeline: {len(clips)} clips, {total_duration:.1f}s total',
                    fontsize=12
                )
                ax.set_yticks([])
                ax.grid(True, axis='x', alpha=0.3)

            viz_path = self.timeline_dir / f"timeline_{timeline_id}_viz.png"
            # Operate on this figure explicitly rather than on pyplot's
            # global "current figure".
            fig.tight_layout()
            fig.savefig(viz_path, dpi=100, bbox_inches='tight')
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
        return str(viz_path)

    def get_duration(self, timeline_id: str) -> float:
        """Get total duration of timeline in seconds (0.0 if unknown or empty)."""
        timeline = self.timelines.get(timeline_id)
        if timeline is None or timeline["audio"] is None:
            return 0.0
        return timeline["audio"].shape[1] / self.sample_rate

    def inpaint_region(
        self,
        timeline_id: str,
        start_time: float,
        end_time: float,
        new_prompt: str
    ) -> str:
        """
        Inpaint specific region in timeline (placeholder).

        No audio is modified yet: the request is logged and the current
        timeline is exported unchanged. ``new_prompt`` is not used.

        Args:
            timeline_id: Timeline ID
            start_time: Start time in seconds
            end_time: End time in seconds
            new_prompt: Prompt for new content

        Returns:
            Path to the exported timeline audio

        Raises:
            ValueError: If the timeline is unknown or empty.
        """
        if timeline_id not in self.timelines:
            raise ValueError(f"Timeline not found: {timeline_id}")
        # This would integrate with ACE-Step engine for actual inpainting.
        logger.info(
            "Inpainting %.1fs-%.1fs in timeline %s",
            start_time,
            end_time,
            timeline_id,
        )
        return self.export_timeline(timeline_id)

    def delete_timeline(self, timeline_id: str, *, delete_files: bool = False) -> None:
        """
        Remove a timeline from memory.

        Args:
            timeline_id: Timeline ID; unknown IDs are ignored.
            delete_files: When True, also remove the exported WAV, the
                visualization PNG and the metadata JSON written for this ID
                in ``timeline_dir``. Files are kept by default.
        """
        if timeline_id in self.timelines:
            del self.timelines[timeline_id]
            logger.info("Deleted timeline: %s", timeline_id)

        if delete_files:
            for suffix in (".wav", "_viz.png", "_metadata.json"):
                artifact = self.timeline_dir / f"timeline_{timeline_id}{suffix}"
                try:
                    artifact.unlink()
                except FileNotFoundError:
                    pass  # never written, nothing to remove

    def save_timeline_state(self, timeline_id: str) -> None:
        """
        Save timeline metadata (JSON) and audio (WAV) to ``timeline_dir``.

        Does nothing when the timeline is unknown.
        """
        if timeline_id not in self.timelines:
            return
        timeline = self.timelines[timeline_id]

        metadata_path = self.timeline_dir / f"timeline_{timeline_id}_metadata.json"
        state = {
            "id": timeline["id"],
            "clips": timeline["clips"],
            "created_at": timeline["created_at"],
            "duration": self.get_duration(timeline_id),
        }
        # Serialize before opening the file so a non-serializable clip
        # metadata value cannot leave a truncated JSON file on disk.
        payload = json.dumps(state, indent=2)
        with open(metadata_path, 'w', encoding="utf-8") as f:
            f.write(payload)

        if timeline["audio"] is not None:
            self.export_timeline(timeline_id)
        logger.info("Saved timeline state: %s", timeline_id)

    def load_timeline_state(self, timeline_id: str) -> bool:
        """
        Load timeline state from ``timeline_dir`` into memory.

        Returns:
            True on success; False when no metadata file exists or loading
            fails (the error is logged).
        """
        metadata_path = self.timeline_dir / f"timeline_{timeline_id}_metadata.json"
        audio_path = self.timeline_dir / f"timeline_{timeline_id}.wav"
        if not metadata_path.exists():
            return False

        try:
            with open(metadata_path, 'r', encoding="utf-8") as f:
                metadata = json.load(f)

            # The audio file is optional; it is resampled on load so the
            # stored array always matches self.sample_rate.
            audio = None
            if audio_path.exists():
                audio = self._load_audio(str(audio_path))

            self.timelines[timeline_id] = {
                "id": timeline_id,
                "clips": metadata["clips"],
                "audio": audio,
                "metadata": [],
                "created_at": metadata["created_at"],
            }
            logger.info("Loaded timeline state: %s", timeline_id)
            return True
        except Exception as e:
            logger.error("Failed to load timeline: %s", e)
            return False