Spaces:
Sleeping
Sleeping
File size: 4,417 Bytes
f871fed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
"""
Podcast service layer using API client.
This replaces direct httpx calls in the Streamlit pages.
"""
from typing import Any, Dict, List
from loguru import logger
from api.client import api_client
class PodcastAPIService:
"""Service layer for podcast operations using API client."""
def __init__(self):
logger.info("Using API client for podcast operations")
# Episode methods
def get_episodes(self) -> List[Dict[Any, Any]]:
"""Get all podcast episodes."""
result = api_client._make_request("GET", "/api/podcasts/episodes")
return result if isinstance(result, list) else [result]
def delete_episode(self, episode_id: str) -> bool:
"""Delete a podcast episode."""
try:
api_client._make_request("DELETE", f"/api/podcasts/episodes/{episode_id}")
return True
except Exception as e:
logger.error(f"Failed to delete episode: {e}")
return False
# Episode Profile methods
def get_episode_profiles(self) -> List[Dict]:
"""Get all episode profiles."""
return api_client.get_episode_profiles()
def create_episode_profile(self, profile_data: Dict) -> bool:
"""Create a new episode profile."""
try:
api_client.create_episode_profile(**profile_data)
return True
except Exception as e:
logger.error(f"Failed to create episode profile: {e}")
return False
def update_episode_profile(self, profile_id: str, profile_data: Dict) -> bool:
"""Update an episode profile."""
try:
api_client.update_episode_profile(profile_id, **profile_data)
return True
except Exception as e:
logger.error(f"Failed to update episode profile: {e}")
return False
def delete_episode_profile(self, profile_id: str) -> bool:
"""Delete an episode profile."""
try:
api_client.delete_episode_profile(profile_id)
return True
except Exception as e:
logger.error(f"Failed to delete episode profile: {e}")
return False
def duplicate_episode_profile(self, profile_id: str) -> bool:
"""Duplicate an episode profile."""
try:
api_client._make_request(
"POST", f"/api/episode-profiles/{profile_id}/duplicate"
)
return True
except Exception as e:
logger.error(f"Failed to duplicate episode profile: {e}")
return False
# Speaker Profile methods
def get_speaker_profiles(self) -> List[Dict[Any, Any]]:
"""Get all speaker profiles."""
result = api_client._make_request("GET", "/api/speaker-profiles")
return result if isinstance(result, list) else [result]
def create_speaker_profile(self, profile_data: Dict) -> bool:
"""Create a new speaker profile."""
try:
api_client._make_request("POST", "/api/speaker-profiles", json=profile_data)
return True
except Exception as e:
logger.error(f"Failed to create speaker profile: {e}")
return False
def update_speaker_profile(self, profile_id: str, profile_data: Dict) -> bool:
"""Update a speaker profile."""
try:
api_client._make_request(
"PUT", f"/api/speaker-profiles/{profile_id}", json=profile_data
)
return True
except Exception as e:
logger.error(f"Failed to update speaker profile: {e}")
return False
def delete_speaker_profile(self, profile_id: str) -> bool:
"""Delete a speaker profile."""
try:
api_client._make_request("DELETE", f"/api/speaker-profiles/{profile_id}")
return True
except Exception as e:
logger.error(f"Failed to delete speaker profile: {e}")
return False
def duplicate_speaker_profile(self, profile_id: str) -> bool:
"""Duplicate a speaker profile."""
try:
api_client._make_request(
"POST", f"/api/speaker-profiles/{profile_id}/duplicate"
)
return True
except Exception as e:
logger.error(f"Failed to duplicate speaker profile: {e}")
return False
# Global service instance
podcast_api_service = PodcastAPIService()
|