| | """ |
| | Music Service |
| | |
| | High-level service for music generation operations. |
| | Abstracts all API complexity from the UI layer. |
| | """ |
| |
|
| | from typing import Callable, Optional, List |
| | from dataclasses import dataclass, field |
| |
|
| | from ..api.client import StackNetClient, MediaAction |
| |
|
| |
|
| | @dataclass |
| | class MusicClip: |
| | """Generated music clip.""" |
| | title: str |
| | audio_url: str |
| | audio_path: Optional[str] = None |
| | duration: Optional[str] = None |
| | image_url: Optional[str] = None |
| | video_url: Optional[str] = None |
| | tags: List[str] = field(default_factory=list) |
| |
|
| |
|
| | @dataclass |
| | class StemResult: |
| | """Extracted audio stems.""" |
| | vocals_path: Optional[str] = None |
| | drums_path: Optional[str] = None |
| | bass_path: Optional[str] = None |
| | other_path: Optional[str] = None |
| |
|
| |
|
| | class MusicService: |
| | """ |
| | Service for music generation and manipulation. |
| | |
| | Provides clean interfaces for: |
| | - Text-to-music generation |
| | - Cover song creation |
| | - Stem extraction |
| | """ |
| |
|
| | def __init__(self, client: Optional[StackNetClient] = None): |
| | self.client = client or StackNetClient() |
| |
|
| | async def generate_music( |
| | self, |
| | prompt: str, |
| | title: Optional[str] = None, |
| | tags: Optional[str] = None, |
| | lyrics: Optional[str] = None, |
| | instrumental: bool = False, |
| | on_progress: Optional[Callable[[float, str], None]] = None |
| | ) -> List[MusicClip]: |
| | """ |
| | Generate original music from a text prompt. |
| | |
| | Args: |
| | prompt: Description of desired music |
| | title: Optional song title |
| | tags: Optional genre/style tags (comma-separated) |
| | lyrics: Optional lyrics (ignored if instrumental=True) |
| | instrumental: Generate instrumental only |
| | on_progress: Callback for progress updates |
| | |
| | Returns: |
| | List of generated MusicClip objects |
| | """ |
| | options = {} |
| | if tags: |
| | options["tags"] = tags |
| | if title: |
| | options["title"] = title |
| | if instrumental: |
| | options["make_instrumental"] = True |
| | if lyrics and not instrumental: |
| | options["lyrics"] = lyrics |
| |
|
| | result = await self.client.submit_media_task( |
| | action=MediaAction.GENERATE_MUSIC, |
| | prompt=prompt, |
| | options=options if options else None, |
| | on_progress=on_progress |
| | ) |
| |
|
| | if not result.success: |
| | raise Exception(result.error or "Music generation failed") |
| |
|
| | return self._parse_music_result(result.data) |
| |
|
| | async def create_cover( |
| | self, |
| | audio_url: str, |
| | style_prompt: str, |
| | title: Optional[str] = None, |
| | tags: Optional[str] = None, |
| | on_progress: Optional[Callable[[float, str], None]] = None |
| | ) -> List[MusicClip]: |
| | """ |
| | Create a cover version of audio. |
| | |
| | Args: |
| | audio_url: URL to source audio |
| | style_prompt: Style/voice direction for the cover |
| | title: Optional title for the cover |
| | tags: Optional genre/style tags |
| | on_progress: Progress callback |
| | |
| | Returns: |
| | List of generated cover clips |
| | """ |
| | options = {} |
| | if tags: |
| | options["tags"] = tags |
| | if title: |
| | options["title"] = title |
| |
|
| | result = await self.client.submit_media_task( |
| | action=MediaAction.CREATE_COVER, |
| | audio_url=audio_url, |
| | prompt=style_prompt, |
| | options=options if options else None, |
| | on_progress=on_progress |
| | ) |
| |
|
| | if not result.success: |
| | raise Exception(result.error or "Cover creation failed") |
| |
|
| | return self._parse_music_result(result.data) |
| |
|
| | async def extract_stems( |
| | self, |
| | audio_url: str, |
| | on_progress: Optional[Callable[[float, str], None]] = None |
| | ) -> StemResult: |
| | """ |
| | Extract stems (vocals, drums, bass, other) from audio. |
| | |
| | Args: |
| | audio_url: URL to source audio |
| | on_progress: Progress callback |
| | |
| | Returns: |
| | StemResult with paths to each stem |
| | """ |
| | result = await self.client.submit_media_task( |
| | action=MediaAction.EXTRACT_STEMS, |
| | audio_url=audio_url, |
| | on_progress=on_progress |
| | ) |
| |
|
| | if not result.success: |
| | raise Exception(result.error or "Stem extraction failed") |
| |
|
| | stems_data = result.data.get("stems", result.data) |
| |
|
| | stem_result = StemResult() |
| |
|
| | |
| | if stems_data.get("vocals"): |
| | stem_result.vocals_path = await self.client.download_file( |
| | stems_data["vocals"], "vocals.mp3" |
| | ) |
| | if stems_data.get("drums"): |
| | stem_result.drums_path = await self.client.download_file( |
| | stems_data["drums"], "drums.mp3" |
| | ) |
| | if stems_data.get("bass"): |
| | stem_result.bass_path = await self.client.download_file( |
| | stems_data["bass"], "bass.mp3" |
| | ) |
| | if stems_data.get("other"): |
| | stem_result.other_path = await self.client.download_file( |
| | stems_data["other"], "other.mp3" |
| | ) |
| |
|
| | return stem_result |
| |
|
| | def _parse_music_result(self, data: dict) -> List[MusicClip]: |
| | """Parse API response into MusicClip objects.""" |
| | clips = [] |
| |
|
| | |
| | raw_clips = data.get("clips", []) |
| |
|
| | |
| | if not raw_clips: |
| | if data.get("audio_url") or data.get("audioUrl"): |
| | raw_clips = [data] |
| | elif data.get("url"): |
| | raw_clips = [{"audio_url": data["url"], "title": data.get("title", "Generated")}] |
| |
|
| | for clip_data in raw_clips: |
| | audio_url = clip_data.get("audio_url") or clip_data.get("audioUrl") or clip_data.get("url") |
| | if audio_url: |
| | clips.append(MusicClip( |
| | title=clip_data.get("title", "Generated Music"), |
| | audio_url=audio_url, |
| | duration=clip_data.get("duration"), |
| | image_url=clip_data.get("image_url") or clip_data.get("imageUrl"), |
| | video_url=clip_data.get("video_url") or clip_data.get("videoUrl"), |
| | tags=clip_data.get("tags", []) |
| | )) |
| |
|
| | return clips |
| |
|
| | async def download_clip(self, clip: MusicClip) -> str: |
| | """Download a clip's audio to local file.""" |
| | if clip.audio_path: |
| | return clip.audio_path |
| |
|
| | filename = f"{clip.title.replace(' ', '_')[:30]}.mp3" |
| | clip.audio_path = await self.client.download_file(clip.audio_url, filename) |
| | return clip.audio_path |
| |
|
| | def cleanup(self): |
| | """Clean up temporary files.""" |
| | self.client.cleanup() |
| |
|