| """ |
| Timeline Manager - Manages timeline-based audio generation and blending |
| """ |
|
|
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Any

import numpy as np
import torch
import torchaudio

import matplotlib

# Select the non-interactive Agg backend BEFORE importing pyplot so the
# module works in headless environments (no display server required).
matplotlib.use('Agg')
import matplotlib.pyplot as plt


logger = logging.getLogger(__name__)
|
|
|
|
class TimelineManager:
    """Manages an in-memory audio timeline built by appending clips.

    Clips are loaded with torchaudio, resampled to a common sample rate,
    and concatenated along the time axis (note: despite the class summary,
    the current implementation concatenates without crossfade blending).
    The manager can export the assembled audio to WAV, render a simple
    matplotlib visualization, and persist / restore timeline state
    (metadata JSON + WAV) under ``timeline_dir``.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize timeline manager.

        Args:
            config: Configuration dictionary. Recognized keys:
                - "sample_rate": target sample rate in Hz (default 44100)
                - "timeline_dir": directory for exported/persisted files
                  (default "timelines")
        """
        self.config = config
        self.sample_rate = config.get("sample_rate", 44100)
        # timeline_id -> {"id", "clips", "audio", "metadata", "created_at"};
        # "audio" is a (channels, samples) numpy array or None while empty.
        self.timelines: Dict[str, Dict[str, Any]] = {}
        self.timeline_dir = Path(config.get("timeline_dir", "timelines"))
        # parents=True so a nested configured path does not raise FileNotFoundError.
        self.timeline_dir.mkdir(parents=True, exist_ok=True)

    def create_timeline(self) -> str:
        """
        Create new timeline.

        Returns:
            Timeline ID (timestamp-based, unique down to microseconds).
        """
        timeline_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")

        self.timelines[timeline_id] = {
            "id": timeline_id,
            "clips": [],
            "audio": None,
            "metadata": [],
            "created_at": datetime.now().isoformat()
        }

        logger.info(f"Created timeline: {timeline_id}")
        return timeline_id

    def add_clip(
        self,
        timeline_id: Optional[str],
        clip_path: str,
        metadata: Dict[str, Any]
    ) -> str:
        """
        Add clip to timeline.

        The clip is loaded from disk, resampled to the manager's sample
        rate if needed, and appended to the timeline's audio buffer.

        Args:
            timeline_id: Timeline ID (creates new if None or unknown)
            clip_path: Path to audio clip
            metadata: Clip metadata (stored verbatim with the clip entry;
                must be JSON-serializable for save_timeline_state to work)

        Returns:
            Timeline ID

        Raises:
            ValueError: If the clip's channel count does not match the
                existing timeline audio.
            Exception: Re-raises any load/resample failure after logging.
        """
        try:
            # Transparently create a timeline for unknown/missing IDs.
            if timeline_id is None or timeline_id not in self.timelines:
                timeline_id = self.create_timeline()

            timeline = self.timelines[timeline_id]

            clip_audio, sr = torchaudio.load(clip_path)

            # Normalize everything on the timeline to one sample rate.
            if sr != self.sample_rate:
                resampler = torchaudio.transforms.Resample(sr, self.sample_rate)
                clip_audio = resampler(clip_audio)

            clip_np = clip_audio.numpy()
            clip_duration = clip_np.shape[1] / self.sample_rate
            # Duration before appending == start offset of this clip.
            start_time = self.get_duration(timeline_id)

            if timeline["audio"] is None:
                timeline["audio"] = clip_np
            else:
                # Fail with a clear message instead of an opaque numpy error
                # (np.concatenate would raise ValueError here anyway).
                if timeline["audio"].shape[0] != clip_np.shape[0]:
                    raise ValueError(
                        f"Channel mismatch: timeline has "
                        f"{timeline['audio'].shape[0]} channel(s), "
                        f"clip has {clip_np.shape[0]}"
                    )
                # Append along the time axis (axis=1: (channels, samples)).
                timeline["audio"] = np.concatenate(
                    [timeline["audio"], clip_np], axis=1
                )

            clip_info = {
                "index": len(timeline["clips"]),
                "path": clip_path,
                "duration": clip_duration,
                "start_time": start_time,
                "metadata": metadata
            }
            timeline["clips"].append(clip_info)

            logger.info(f"Added clip to timeline {timeline_id}: {clip_info['duration']:.2f}s")

            return timeline_id

        except Exception as e:
            logger.error(f"Failed to add clip: {e}")
            raise

    def get_context(
        self,
        timeline_id: Optional[str],
        context_length: float
    ) -> Optional[np.ndarray]:
        """
        Get context audio from the tail of the timeline.

        Args:
            timeline_id: Timeline ID
            context_length: Length in seconds to retrieve

        Returns:
            The last ``context_length`` seconds of timeline audio as a
            (channels, samples) numpy array; the full buffer if shorter;
            None if the timeline is unknown or empty. NOTE: this is a view
            into (or the actual) timeline buffer, not a copy.
        """
        if timeline_id is None or timeline_id not in self.timelines:
            return None

        timeline = self.timelines[timeline_id]

        if timeline["audio"] is None:
            return None

        context_samples = int(context_length * self.sample_rate)

        audio = timeline["audio"]
        if audio.shape[1] <= context_samples:
            return audio

        return audio[:, -context_samples:]

    def get_last_clip(self, timeline_id: Optional[str]) -> Optional[np.ndarray]:
        """Get the last clip's audio (reloaded from disk) or None."""
        if timeline_id is None or timeline_id not in self.timelines:
            return None

        timeline = self.timelines[timeline_id]
        if not timeline["clips"]:
            return None

        # Re-read from the original file; NOTE this is at the file's native
        # sample rate, not resampled like the timeline buffer.
        last_clip = timeline["clips"][-1]
        audio, _ = torchaudio.load(last_clip["path"])
        return audio.numpy()

    def export_timeline(self, timeline_id: str) -> str:
        """
        Export full timeline audio as 16-bit PCM WAV.

        Args:
            timeline_id: Timeline ID

        Returns:
            Path to exported audio file

        Raises:
            ValueError: If the timeline is unknown or empty.
        """
        if timeline_id not in self.timelines:
            raise ValueError(f"Timeline not found: {timeline_id}")

        timeline = self.timelines[timeline_id]

        if timeline["audio"] is None:
            raise ValueError("Timeline is empty")

        output_path = self.timeline_dir / f"timeline_{timeline_id}.wav"

        audio_tensor = torch.from_numpy(timeline["audio"]).float()
        torchaudio.save(
            str(output_path),
            audio_tensor,
            self.sample_rate,
            encoding="PCM_S",
            bits_per_sample=16
        )

        logger.info(f"Exported timeline to {output_path}")
        return str(output_path)

    def visualize_timeline(self, timeline_id: str) -> str:
        """
        Create a PNG visualization of the timeline (one box per clip).

        Args:
            timeline_id: Timeline ID

        Returns:
            Path to visualization image

        Raises:
            ValueError: If the timeline is unknown.
        """
        if timeline_id not in self.timelines:
            raise ValueError(f"Timeline not found: {timeline_id}")

        timeline = self.timelines[timeline_id]
        # Compute once, before branching, so the title/axes below are always
        # well-defined even for an empty timeline.
        total_duration = self.get_duration(timeline_id)

        fig, ax = plt.subplots(figsize=(12, 4))

        if not timeline["clips"]:
            ax.text(0.5, 0.5, "No clips yet", ha='center', va='center')
        else:
            for clip in timeline["clips"]:
                start = clip["start_time"]
                duration = clip["duration"]

                # One rectangle per clip, laid out on the time axis.
                rect = plt.Rectangle(
                    (start, 0.3),
                    duration,
                    0.4,
                    facecolor='steelblue',
                    edgecolor='black',
                    linewidth=1
                )
                ax.add_patch(rect)

                ax.text(
                    start + duration/2,
                    0.5,
                    f"Clip {clip['index'] + 1}",
                    ha='center',
                    va='center',
                    fontsize=8,
                    color='white',
                    weight='bold'
                )

        ax.set_xlim(0, max(total_duration, 1))
        ax.set_ylim(0, 1)
        ax.set_xlabel('Time (seconds)', fontsize=10)
        ax.set_title(f'Timeline: {len(timeline["clips"])} clips, {total_duration:.1f}s total', fontsize=12)
        ax.set_yticks([])
        ax.grid(True, axis='x', alpha=0.3)

        viz_path = self.timeline_dir / f"timeline_{timeline_id}_viz.png"
        fig.tight_layout()
        fig.savefig(viz_path, dpi=100, bbox_inches='tight')
        # Close this specific figure (not the implicit current one) to avoid
        # leaking figures if callers use pyplot concurrently.
        plt.close(fig)

        return str(viz_path)

    def get_duration(self, timeline_id: str) -> float:
        """Get total duration of timeline in seconds (0.0 if unknown/empty)."""
        if timeline_id not in self.timelines:
            return 0.0

        timeline = self.timelines[timeline_id]
        if timeline["audio"] is None:
            return 0.0

        return timeline["audio"].shape[1] / self.sample_rate

    def inpaint_region(
        self,
        timeline_id: str,
        start_time: float,
        end_time: float,
        new_prompt: str
    ) -> str:
        """
        Inpaint specific region in timeline.

        TODO: actual inpainting is not implemented yet; this currently only
        logs the request and re-exports the timeline unchanged.

        Args:
            timeline_id: Timeline ID
            start_time: Start time in seconds
            end_time: End time in seconds
            new_prompt: Prompt for new content (currently unused)

        Returns:
            Path to updated timeline audio

        Raises:
            ValueError: If the timeline is unknown or empty.
        """
        if timeline_id not in self.timelines:
            raise ValueError(f"Timeline not found: {timeline_id}")

        logger.info(f"Inpainting {start_time:.1f}s-{end_time:.1f}s in timeline {timeline_id}")

        return self.export_timeline(timeline_id)

    def delete_timeline(self, timeline_id: str):
        """Delete timeline and associated files."""
        if timeline_id in self.timelines:
            del self.timelines[timeline_id]
            # Also remove any artifacts previously written for this timeline
            # (the docstring promises file cleanup; missing files are fine).
            for artifact in (
                self.timeline_dir / f"timeline_{timeline_id}.wav",
                self.timeline_dir / f"timeline_{timeline_id}_viz.png",
                self.timeline_dir / f"timeline_{timeline_id}_metadata.json",
            ):
                artifact.unlink(missing_ok=True)
            logger.info(f"Deleted timeline: {timeline_id}")

    def save_timeline_state(self, timeline_id: str):
        """Save timeline state (metadata JSON + WAV audio) to disk.

        No-op if the timeline is unknown. Clip metadata must be
        JSON-serializable.
        """
        if timeline_id not in self.timelines:
            return

        timeline = self.timelines[timeline_id]

        metadata_path = self.timeline_dir / f"timeline_{timeline_id}_metadata.json"
        metadata = {
            "id": timeline["id"],
            "clips": timeline["clips"],
            "created_at": timeline["created_at"],
            "duration": self.get_duration(timeline_id)
        }

        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Audio is persisted separately as WAV (only if non-empty).
        if timeline["audio"] is not None:
            self.export_timeline(timeline_id)

        logger.info(f"Saved timeline state: {timeline_id}")

    def load_timeline_state(self, timeline_id: str) -> bool:
        """Load timeline state from disk.

        Returns:
            True on success, False if no saved metadata exists or loading
            failed (failures are logged, not raised).
        """
        metadata_path = self.timeline_dir / f"timeline_{timeline_id}_metadata.json"
        audio_path = self.timeline_dir / f"timeline_{timeline_id}.wav"

        if not metadata_path.exists():
            return False

        try:
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)

            # Audio is optional: metadata alone still restores clip info.
            audio = None
            if audio_path.exists():
                audio_tensor, _ = torchaudio.load(str(audio_path))
                audio = audio_tensor.numpy()

            self.timelines[timeline_id] = {
                "id": timeline_id,
                "clips": metadata["clips"],
                "audio": audio,
                "metadata": [],
                "created_at": metadata["created_at"]
            }

            logger.info(f"Loaded timeline state: {timeline_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to load timeline: {e}")
            return False
|
|