|
|
""" |
|
|
Music Service |
|
|
|
|
|
High-level service for music generation operations. |
|
|
Abstracts all API complexity from the UI layer. |
|
|
""" |
|
|
|
|
|
from typing import Callable, Optional, List |
|
|
from dataclasses import dataclass, field |
|
|
|
|
|
from ..api.client import StackNetClient, MediaAction |
|
|
|
|
|
|
|
|
@dataclass
class MusicClip:
    """A single generated music clip.

    ``title`` and ``audio_url`` are required; every other field is optional
    and defaults to "not available".
    """

    # --- required ---
    title: str
    audio_url: str

    # --- optional ---
    # Local file path; filled in by MusicService.download_clip once the
    # audio has been downloaded.
    audio_path: Optional[str] = None
    # Duration as supplied by the API response (kept as a string).
    duration: Optional[str] = None
    # Optional artwork / video links attached to the clip.
    image_url: Optional[str] = None
    video_url: Optional[str] = None
    # A fresh list per instance, so clips never share one tag list.
    tags: List[str] = field(default_factory=list)
|
|
|
|
|
|
|
|
@dataclass
class StemResult:
    """Container for the stems extracted from one audio source.

    Each field holds the value returned by the client's ``download_file``
    for that stem (presumably a local file path), or ``None`` when the stem
    was not present in the extraction result.
    """

    vocals_path: Optional[str] = None  # vocal stem
    drums_path: Optional[str] = None   # drum stem
    bass_path: Optional[str] = None    # bass stem
    other_path: Optional[str] = None   # the response's "other" stem
|
|
|
|
|
|
|
|
class MusicService:
    """
    Service for music generation and manipulation.

    Provides clean interfaces for:
    - Text-to-music generation
    - Cover song creation
    - Stem extraction

    Every remote operation is delegated to the ``StackNetClient`` held in
    ``self.client``.
    """

    # Stem names looked up in the extraction response, in download order.
    _STEM_NAMES = ("vocals", "drums", "bass", "other")

    # Path separators and characters commonly rejected by filesystems.
    _UNSAFE_FILENAME_CHARS = frozenset('/\\:*?"<>|')

    # Number of title characters kept in a downloaded clip's filename.
    _MAX_FILENAME_STEM = 30

    def __init__(self, client: Optional[StackNetClient] = None):
        """
        Create the service.

        Args:
            client: Client to use; a default ``StackNetClient()`` is created
                when omitted.
        """
        # Compare against None so a supplied client is never discarded just
        # because it happens to evaluate as falsy.
        self.client = client if client is not None else StackNetClient()

    async def generate_music(
        self,
        prompt: str,
        title: Optional[str] = None,
        tags: Optional[str] = None,
        lyrics: Optional[str] = None,
        instrumental: bool = False,
        on_progress: Optional[Callable[[float, str], None]] = None
    ) -> List[MusicClip]:
        """
        Generate original music from a text prompt.

        Args:
            prompt: Description of desired music
            title: Optional song title
            tags: Optional genre/style tags (comma-separated)
            lyrics: Optional lyrics (ignored if instrumental=True)
            instrumental: Generate instrumental only
            on_progress: Callback for progress updates (passed to the client)

        Returns:
            List of generated MusicClip objects

        Raises:
            Exception: If the client reports the task as unsuccessful.
        """
        options = self._base_options(tags, title)
        if instrumental:
            options["make_instrumental"] = True
        elif lyrics:
            # Lyrics are only forwarded for non-instrumental requests.
            options["lyrics"] = lyrics

        result = await self.client.submit_media_task(
            action=MediaAction.GENERATE_MUSIC,
            prompt=prompt,
            options=options or None,  # send None rather than an empty dict
            on_progress=on_progress,
        )

        if not result.success:
            raise Exception(result.error or "Music generation failed")

        return self._parse_music_result(result.data)

    async def create_cover(
        self,
        audio_url: str,
        style_prompt: str,
        title: Optional[str] = None,
        tags: Optional[str] = None,
        on_progress: Optional[Callable[[float, str], None]] = None
    ) -> List[MusicClip]:
        """
        Create a cover version of audio.

        Args:
            audio_url: URL to source audio
            style_prompt: Style/voice direction for the cover
            title: Optional title for the cover
            tags: Optional genre/style tags
            on_progress: Progress callback (passed to the client)

        Returns:
            List of generated cover clips

        Raises:
            Exception: If the client reports the task as unsuccessful.
        """
        options = self._base_options(tags, title)

        result = await self.client.submit_media_task(
            action=MediaAction.CREATE_COVER,
            audio_url=audio_url,
            prompt=style_prompt,
            options=options or None,  # send None rather than an empty dict
            on_progress=on_progress,
        )

        if not result.success:
            raise Exception(result.error or "Cover creation failed")

        return self._parse_music_result(result.data)

    async def extract_stems(
        self,
        audio_url: str,
        on_progress: Optional[Callable[[float, str], None]] = None
    ) -> StemResult:
        """
        Extract stems (vocals, drums, bass, other) from audio.

        Args:
            audio_url: URL to source audio
            on_progress: Progress callback (passed to the client)

        Returns:
            StemResult with paths to each stem; a stem missing from the
            response keeps its ``None`` default.

        Raises:
            Exception: If the client reports the task as unsuccessful.
        """
        result = await self.client.submit_media_task(
            action=MediaAction.EXTRACT_STEMS,
            audio_url=audio_url,
            on_progress=on_progress,
        )

        if not result.success:
            raise Exception(result.error or "Stem extraction failed")

        # A successful task may still carry no payload, or a null "stems"
        # entry; fall back to the top-level payload in both cases.
        payload = result.data if isinstance(result.data, dict) else {}
        stems_data = payload.get("stems")
        if not isinstance(stems_data, dict):
            stems_data = payload

        stem_result = StemResult()
        # Download sequentially, in the fixed order of _STEM_NAMES.
        for stem_name in self._STEM_NAMES:
            stem_url = stems_data.get(stem_name)
            if not stem_url:
                continue
            local_path = await self.client.download_file(
                stem_url, f"{stem_name}.mp3"
            )
            setattr(stem_result, f"{stem_name}_path", local_path)

        return stem_result

    def _parse_music_result(self, data: Optional[dict]) -> List[MusicClip]:
        """
        Parse API response into MusicClip objects.

        Accepted shapes: a dict with a ``"clips"`` list, a dict that is
        itself a single clip (``audio_url`` / ``audioUrl``), or a dict with a
        bare ``url``. Entries without any audio URL are skipped.

        Args:
            data: Response payload; ``None`` or a non-dict yields ``[]``.

        Returns:
            List of MusicClip objects (possibly empty).
        """
        if not isinstance(data, dict):
            return []

        raw_clips = data.get("clips") or []

        # No clip list: treat the payload itself as one clip if it carries
        # an audio URL under any of the known keys.
        if not raw_clips:
            if data.get("audio_url") or data.get("audioUrl"):
                raw_clips = [data]
            elif data.get("url"):
                raw_clips = [{
                    "audio_url": data["url"],
                    "title": data.get("title") or "Generated",
                }]

        clips = []
        for clip_data in raw_clips:
            if not isinstance(clip_data, dict):
                continue  # malformed entry; nothing to extract
            audio_url = (
                clip_data.get("audio_url")
                or clip_data.get("audioUrl")
                or clip_data.get("url")
            )
            if not audio_url:
                continue
            clips.append(MusicClip(
                # `or` also covers an explicit null title in the response.
                title=clip_data.get("title") or "Generated Music",
                audio_url=audio_url,
                duration=clip_data.get("duration"),
                image_url=clip_data.get("image_url") or clip_data.get("imageUrl"),
                video_url=clip_data.get("video_url") or clip_data.get("videoUrl"),
                tags=self._normalize_tags(clip_data.get("tags")),
            ))

        return clips

    async def download_clip(self, clip: MusicClip) -> str:
        """
        Download a clip's audio to local file.

        The result is cached on ``clip.audio_path``; a clip that already has
        a path is returned as-is without another download.

        Args:
            clip: Clip whose ``audio_url`` should be fetched.

        Returns:
            The value returned by the client's ``download_file``
            (also stored on ``clip.audio_path``).
        """
        if clip.audio_path:
            return clip.audio_path

        # The title comes from the API response, so strip anything that
        # could escape the download directory or be an invalid filename.
        filename = f"{self._safe_filename_stem(clip.title)}.mp3"
        clip.audio_path = await self.client.download_file(clip.audio_url, filename)
        return clip.audio_path

    def cleanup(self):
        """Clean up temporary files (delegates to the client's cleanup)."""
        self.client.cleanup()

    @staticmethod
    def _base_options(tags: Optional[str], title: Optional[str]) -> dict:
        """Build the options shared by generation and cover requests."""
        options = {}
        if tags:
            options["tags"] = tags
        if title:
            options["title"] = title
        return options

    @staticmethod
    def _normalize_tags(raw_tags) -> List[str]:
        """Coerce a response ``tags`` value into a list.

        A comma-separated string is split; a list/tuple is copied; anything
        else (including ``None``) becomes an empty list.
        """
        if isinstance(raw_tags, str):
            return [part.strip() for part in raw_tags.split(",") if part.strip()]
        if isinstance(raw_tags, (list, tuple)):
            return list(raw_tags)
        return []

    @classmethod
    def _safe_filename_stem(cls, title) -> str:
        """Turn a clip title into a filesystem-safe filename stem.

        Whitespace, path separators, reserved and non-printable characters
        become ``_``; the result is truncated, stripped of leading/trailing
        dots, and falls back to ``"clip"`` when nothing usable remains.
        """
        text = "" if title is None else str(title)
        cleaned = "".join(
            "_"
            if ch.isspace() or ch in cls._UNSAFE_FILENAME_CHARS or not ch.isprintable()
            else ch
            for ch in text
        )
        stem = cleaned[:cls._MAX_FILENAME_STEM].strip(".")
        return stem or "clip"
|
|
|