"""
Music Service

High-level service for music generation operations.
Abstracts all API complexity from the UI layer.
"""

import re
from typing import Any, Callable, Dict, Optional, List
from dataclasses import dataclass, field

from ..api.client import StackNetClient, MediaAction

# Signature shared by every progress callback the service accepts.
ProgressCallback = Callable[[float, str], None]

# Stem names looked up in the extraction response, in download order.
_STEM_NAMES = ("vocals", "drums", "bass", "other")

# Path separators, characters reserved on common filesystems, and control
# characters. Titles come from the API response, so they must not be able to
# steer where a download is written.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')

# Filename stem used when a clip title is empty after sanitizing.
_FALLBACK_FILENAME_STEM = "clip"

# Maximum length of the filename stem (extension excluded).
_MAX_FILENAME_STEM = 30


class MusicServiceError(Exception):
    """Raised when a submitted media task reports failure.

    Subclasses ``Exception`` so existing ``except Exception`` handlers
    keep working.
    """


@dataclass
class MusicClip:
    """Generated music clip.

    Attributes:
        title: Clip title; never empty when built by ``MusicService``.
        audio_url: URL of the audio as reported in the task response.
        audio_path: Value returned by the client's ``download_file`` once
            ``MusicService.download_clip`` has run; ``None`` before that.
        duration: Duration value from the response, passed through as-is.
        image_url: Optional image URL from the response.
        video_url: Optional video URL from the response.
        tags: Tags from the response, normalized to a list of strings.
    """
    title: str
    audio_url: str
    audio_path: Optional[str] = None
    duration: Optional[str] = None
    image_url: Optional[str] = None
    video_url: Optional[str] = None
    tags: List[str] = field(default_factory=list)


@dataclass
class StemResult:
    """Extracted audio stems.

    Each field holds the value returned by the client's ``download_file``
    for that stem, or ``None`` when the response had no URL for it.
    """
    vocals_path: Optional[str] = None
    drums_path: Optional[str] = None
    bass_path: Optional[str] = None
    other_path: Optional[str] = None


class MusicService:
    """
    Service for music generation and manipulation.

    Provides clean interfaces for:
    - Text-to-music generation
    - Cover song creation
    - Stem extraction
    """

    def __init__(self, client: Optional[StackNetClient] = None):
        """
        Args:
            client: Client to use; a new ``StackNetClient`` is created
                when omitted.
        """
        self.client = client or StackNetClient()

    async def generate_music(
        self,
        prompt: str,
        title: Optional[str] = None,
        tags: Optional[str] = None,
        lyrics: Optional[str] = None,
        instrumental: bool = False,
        on_progress: Optional[ProgressCallback] = None
    ) -> List[MusicClip]:
        """
        Generate original music from a text prompt.

        Args:
            prompt: Description of desired music
            title: Optional song title
            tags: Optional genre/style tags (comma-separated)
            lyrics: Optional lyrics (ignored if instrumental=True)
            instrumental: Generate instrumental only
            on_progress: Callback for progress updates

        Returns:
            List of generated MusicClip objects

        Raises:
            MusicServiceError: If the task reports failure.
        """
        options: Dict[str, Any] = {}
        if tags:
            options["tags"] = tags
        if title:
            options["title"] = title
        if instrumental:
            options["make_instrumental"] = True
        elif lyrics:
            # Lyrics are only sent for non-instrumental requests.
            options["lyrics"] = lyrics

        data = await self._run_task(
            "Music generation failed",
            action=MediaAction.GENERATE_MUSIC,
            prompt=prompt,
            options=options or None,
            on_progress=on_progress,
        )
        return self._parse_music_result(data)

    async def create_cover(
        self,
        audio_url: str,
        style_prompt: str,
        title: Optional[str] = None,
        tags: Optional[str] = None,
        on_progress: Optional[ProgressCallback] = None
    ) -> List[MusicClip]:
        """
        Create a cover version of audio.

        Args:
            audio_url: URL to source audio
            style_prompt: Style/voice direction for the cover
            title: Optional title for the cover
            tags: Optional genre/style tags
            on_progress: Progress callback

        Returns:
            List of generated cover clips

        Raises:
            MusicServiceError: If the task reports failure.
        """
        options: Dict[str, Any] = {}
        if tags:
            options["tags"] = tags
        if title:
            options["title"] = title

        data = await self._run_task(
            "Cover creation failed",
            action=MediaAction.CREATE_COVER,
            audio_url=audio_url,
            prompt=style_prompt,
            options=options or None,
            on_progress=on_progress,
        )
        return self._parse_music_result(data)

    async def extract_stems(
        self,
        audio_url: str,
        on_progress: Optional[ProgressCallback] = None
    ) -> StemResult:
        """
        Extract stems (vocals, drums, bass, other) from audio.

        Args:
            audio_url: URL to source audio
            on_progress: Progress callback

        Returns:
            StemResult with paths to each stem

        Raises:
            MusicServiceError: If the task reports failure.
        """
        data = await self._run_task(
            "Stem extraction failed",
            action=MediaAction.EXTRACT_STEMS,
            audio_url=audio_url,
            on_progress=on_progress,
        )

        # Stem URLs may sit under a "stems" key or directly on the payload.
        stems_data = data.get("stems", data) or {}

        stem_result = StemResult()
        # Download each stem if URL provided
        for stem_name in _STEM_NAMES:
            stem_url = stems_data.get(stem_name)
            if not stem_url:
                continue
            local_path = await self.client.download_file(
                stem_url, f"{stem_name}.mp3"
            )
            setattr(stem_result, f"{stem_name}_path", local_path)

        return stem_result

    async def _run_task(self, failure_message: str, **task_kwargs: Any) -> dict:
        """Submit a media task and return its payload, raising on failure.

        Args:
            failure_message: Error text used when the result carries no
                error of its own.
            **task_kwargs: Forwarded unchanged to ``submit_media_task``.

        Returns:
            The result's ``data``, or an empty dict when it is falsy.

        Raises:
            MusicServiceError: If ``result.success`` is falsy.
        """
        result = await self.client.submit_media_task(**task_kwargs)
        if not result.success:
            raise MusicServiceError(result.error or failure_message)
        return result.data or {}

    def _parse_music_result(self, data: Optional[dict]) -> List[MusicClip]:
        """Parse API response into MusicClip objects.

        Accepts either a ``clips`` list or a single clip described directly
        on the payload (``audio_url`` / ``audioUrl`` / ``url``). Entries
        without an audio URL are skipped.

        Args:
            data: Task payload; ``None`` is treated as empty.

        Returns:
            Parsed clips, possibly empty.
        """
        data = data or {}

        # Handle various response formats
        raw_clips = data.get("clips") or []

        # If no clips array, treat the data itself as a single clip
        if not raw_clips:
            if data.get("audio_url") or data.get("audioUrl"):
                raw_clips = [data]
            elif data.get("url"):
                raw_clips = [{
                    "audio_url": data["url"],
                    "title": data.get("title") or "Generated",
                }]

        clips: List[MusicClip] = []
        for clip_data in raw_clips:
            if not isinstance(clip_data, dict):
                continue

            audio_url = (
                clip_data.get("audio_url")
                or clip_data.get("audioUrl")
                or clip_data.get("url")
            )
            if not audio_url:
                continue

            clips.append(MusicClip(
                # `or` (not a .get default) so an explicit null/empty title
                # also falls back; download_clip relies on a string title.
                title=clip_data.get("title") or "Generated Music",
                audio_url=audio_url,
                duration=clip_data.get("duration"),
                image_url=clip_data.get("image_url") or clip_data.get("imageUrl"),
                video_url=clip_data.get("video_url") or clip_data.get("videoUrl"),
                tags=self._normalize_tags(clip_data.get("tags")),
            ))

        return clips

    @staticmethod
    def _normalize_tags(raw_tags: Any) -> List[str]:
        """Coerce a response ``tags`` value into a list of strings.

        A string is split on commas (blank pieces dropped); a list, tuple
        or set is converted element-wise; any other truthy value becomes a
        one-element list; a falsy value becomes an empty list.
        """
        if not raw_tags:
            return []
        if isinstance(raw_tags, str):
            return [piece.strip() for piece in raw_tags.split(",") if piece.strip()]
        if isinstance(raw_tags, (list, tuple, set)):
            return [str(tag) for tag in raw_tags]
        return [str(raw_tags)]

    @staticmethod
    def _clip_filename(title: Optional[str]) -> str:
        """Build a safe ``.mp3`` filename from a clip title.

        Spaces and unsafe characters become underscores and the stem is
        capped at 30 characters. Titles containing none of those characters
        produce the same name as before the sanitizing was added.
        """
        stem = _UNSAFE_FILENAME_CHARS.sub("_", (title or "").replace(" ", "_"))
        stem = stem[:_MAX_FILENAME_STEM]
        # A stem made only of dots/underscores carries no usable name.
        if not stem.strip("._"):
            stem = _FALLBACK_FILENAME_STEM
        return f"{stem}.mp3"

    async def download_clip(self, clip: MusicClip) -> str:
        """Download a clip's audio to local file.

        The result is cached on ``clip.audio_path``; a clip that already
        has a path is returned without downloading again.

        NOTE(review): clips sharing a title map to the same filename;
        whether that overwrites an earlier download depends on the
        client's ``download_file`` - confirm.

        Args:
            clip: Clip to download; its ``audio_path`` is updated in place.

        Returns:
            The value stored in ``clip.audio_path``.
        """
        if clip.audio_path:
            return clip.audio_path

        clip.audio_path = await self.client.download_file(
            clip.audio_url, self._clip_filename(clip.title)
        )
        return clip.audio_path

    def cleanup(self):
        """Clean up temporary files."""
        self.client.cleanup()