"""Gradio control panel that casts video to Chromecast TVs.

When ``pychromecast`` is not installed, or a TV slot has no Chromecast mapped
to it, the slot is driven in simulation mode: state is tracked locally and no
network calls are made.
"""

import asyncio
import json
import logging
import mimetypes
import threading
import time
from dataclasses import dataclass
from enum import Enum
from html import escape
from typing import Dict, List, Optional, Tuple
from urllib.parse import parse_qs, quote, urlparse

import gradio as gr

# For real Chromecast control. The import is optional: when it fails the
# application still starts and every device is simulated.
try:
    import pychromecast
    from pychromecast import Chromecast
    from pychromecast.controllers.media import MediaController
    from pychromecast.controllers.youtube import YouTubeController

    CHROMECAST_AVAILABLE = True
except ImportError:
    CHROMECAST_AVAILABLE = False

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DeviceStatus(Enum):
    """Playback/connection state shown for a TV slot."""

    DISCONNECTED = "disconnected"
    CONNECTED = "connected"
    PLAYING = "playing"
    PAUSED = "paused"
    ERROR = "error"


@dataclass
class TVDevice:
    """Represents a Chromecast TV device (real or simulated)."""

    id: int
    name: str
    # Quoted annotations: the pychromecast names do not exist in demo mode and
    # dataclass field annotations are evaluated when the class is created.
    chromecast: Optional["Chromecast"] = None
    status: DeviceStatus = DeviceStatus.DISCONNECTED
    current_content: str = "Esperando..."
    volume: int = 50
    is_muted: bool = False
    media_controller: Optional["MediaController"] = None


class ChromecastController:
    """Chromecast controller with device management and a simulated fallback."""

    # Actions accepted by control_device().
    _ACTIONS = ("play", "pause", "stop", "mute", "volume_up", "volume_down")
    # Volume change, in percentage points, applied by volume_up / volume_down.
    _VOLUME_STEP = 10

    def __init__(self):
        """Create the TV slots, discover Chromecasts if possible, load the playlist."""
        self.devices: Dict[int, TVDevice] = {}
        self.current_video_url: Optional[str] = None
        self.progress: float = 0
        self.playlist: List[Dict] = []
        self.discovered_casts: List["Chromecast"] = []
        self.is_playing: bool = False
        self.progress_thread: Optional[threading.Thread] = None
        self.stop_progress = False
        # Discovery browser returned by pychromecast; kept so it can be stopped
        # when discovery is run again instead of leaking one per call.
        self._browser = None

        # Initialize demo devices
        self._initialize_demo_devices()

        # Discover real devices if available
        if CHROMECAST_AVAILABLE:
            self._discover_devices()

        # Initialize default playlist
        self._initialize_playlist()

    def _initialize_demo_devices(self):
        """Create the fixed TV slots, numbered from 1."""
        demo_configs = [
            {"name": "TV - Sala Principal", "location": "Sala"},
            {"name": "TV - Barra", "location": "Bar"},
            {"name": "TV - Terraza", "location": "Terraza"},
        ]
        for device_id, config in enumerate(demo_configs, 1):
            self.devices[device_id] = TVDevice(id=device_id, name=config["name"])

    def _is_real(self, device: TVDevice) -> bool:
        """Return True when *device* is backed by an actual Chromecast."""
        return CHROMECAST_AVAILABLE and device.chromecast is not None

    def _discover_devices(self):
        """Discover Chromecasts on the network and map them onto the TV slots.

        Discovered devices are assigned to slots in ascending slot order; any
        extra devices beyond the number of slots are ignored. Errors are logged
        and never raised.
        """
        try:
            previous_browser = self._browser
            # Blocking call. get_chromecasts() returns every device found;
            # get_listed_chromecasts() only returns devices matching the
            # friendly names / UUIDs passed to it.
            self.discovered_casts, self._browser = pychromecast.get_chromecasts()
            logger.info("Found %d Chromecast devices", len(self.discovered_casts))
            self._stop_browser(previous_browser)
        except Exception as e:
            logger.error(f"Error discovering devices: {e}")
            return

        for device_id, cast in zip(sorted(self.devices), self.discovered_casts):
            device = self.devices[device_id]
            try:
                # Wait for the connection before reporting the slot as connected.
                cast.wait()
                device.chromecast = cast
                device.media_controller = cast.media_controller
                device.status = DeviceStatus.CONNECTED
                logger.info("Connected to %s", cast.name)
            except Exception as e:
                # One unreachable device must not prevent mapping the others.
                logger.error(f"Error discovering devices: {e}")

    @staticmethod
    def _stop_browser(browser) -> None:
        """Stop a previous discovery browser, logging (not raising) failures."""
        if browser is None:
            return
        try:
            browser.stop_discovery()
        except Exception as e:
            logger.warning("Could not stop previous discovery browser: %s", e)

    def _initialize_playlist(self):
        """Initialize default playlist."""
        self.playlist = [
            {
                "title": "Video Promocional - Restaurante",
                "source": "YouTube",
                "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
                "duration": "3:45",
                "thumbnail": "https://picsum.photos/seed/promo/60/40",
            },
            {
                "title": "Video Corporativo 2024",
                "source": "Vimeo",
                "url": "https://vimeo.com/123456789",
                "duration": "2:30",
                "thumbnail": "https://picsum.photos/seed/corp/60/40",
            },
            {
                "title": "Video Evento Especial",
                "source": "Directo",
                "url": "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4",
                "duration": "5:15",
                "thumbnail": "https://picsum.photos/seed/event/60/40",
            },
        ]

    def discover_new_devices(self) -> Tuple[str, str, str]:
        """Run discovery again and report how many slots have a Chromecast.

        Returns:
            ``(title, message, level)`` where level is "success" or "error".
        """
        if not CHROMECAST_AVAILABLE:
            return "❌ Error", "PyChromecast no está instalado", "error"

        try:
            self._discover_devices()
            count = sum(1 for device in self.devices.values() if device.chromecast)
            return "✅ Descubiertos", f"Se encontraron {count} dispositivos", "success"
        except Exception as e:
            return "❌ Error", f"Error al descubrir: {str(e)}", "error"

    def get_device_status(self, device_id: int) -> Dict:
        """Return a display-ready status dict for one slot.

        For real devices the cached state is refreshed from the Chromecast
        first; if that fails the slot is marked as ERROR. Returns an empty dict
        when *device_id* is unknown.
        """
        device = self.devices.get(device_id)
        if not device:
            return {}

        # Update real device status if available
        if self._is_real(device):
            try:
                media_status = device.chromecast.media_controller.status
                if media_status:
                    if media_status.player_is_playing:
                        device.status = DeviceStatus.PLAYING
                    elif media_status.player_is_paused:
                        device.status = DeviceStatus.PAUSED
                    else:
                        device.status = DeviceStatus.CONNECTED
                    device.current_content = media_status.title or "Sin título"

                cast_status = device.chromecast.status
                device.volume = int(cast_status.volume_level * 100)
                device.is_muted = cast_status.volume_muted
            except Exception as e:
                logger.error(f"Error updating device {device_id}: {e}")
                device.status = DeviceStatus.ERROR

        return {
            "name": device.name,
            "status": device.status.value,
            "connected": device.chromecast is not None,
            "playing": device.status == DeviceStatus.PLAYING,
            "content": device.current_content,
            "volume": device.volume,
            "muted": device.is_muted,
            "device_type": "Chromecast" if device.chromecast else "Simulado",
        }

    def get_all_devices_status(self) -> List[Dict]:
        """Return the status dict of every slot, ordered by slot id."""
        return [self.get_device_status(device_id) for device_id in sorted(self.devices)]

    @staticmethod
    def _parse_url(url: str):
        """Parse *url*, assuming https when the scheme is missing."""
        return urlparse(url if "://" in url else f"https://{url}")

    @classmethod
    def _is_youtube_url(cls, url: str) -> bool:
        """Return True for youtube.com and youtu.be links."""
        host = cls._parse_url(url).netloc.lower()
        return "youtube.com" in host or host.endswith("youtu.be")

    @classmethod
    def _extract_youtube_id(cls, url: str) -> Optional[str]:
        """Return the video id of a YouTube URL, or None if there is none.

        Handles ``watch?v=ID``, ``youtu.be/ID`` and ``/embed|shorts|live|v/ID``.
        """
        parsed = cls._parse_url(url)
        segments = [segment for segment in parsed.path.split("/") if segment]
        if parsed.netloc.lower().endswith("youtu.be"):
            return segments[0] if segments else None

        ids = parse_qs(parsed.query).get("v")
        if ids and ids[0]:
            return ids[0]
        if len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
            return segments[1]
        return None

    def _cast_video(self, device: TVDevice, url: str, video_title: str) -> None:
        """Start *url* on a real device; raises on failure."""
        if self._is_youtube_url(url):
            video_id = self._extract_youtube_id(url)
            if not video_id:
                raise ValueError("URL de YouTube sin identificador de video")
            yt_controller = YouTubeController()
            device.chromecast.register_handler(yt_controller)
            yt_controller.play_video(video_id)
            return

        # Generic media: guess the MIME type from the path, defaulting to MP4.
        content_type = mimetypes.guess_type(self._parse_url(url).path)[0] or "video/mp4"
        device.chromecast.media_controller.play_media(url, content_type, title=video_title)

    def load_video(self, url: str, title: str = "") -> Tuple[str, str, str]:
        """Load a video on every slot.

        Real devices receive a cast request; simulated slots only record the
        title. The simulated progress counter is reset.

        Returns:
            ``(title, message, level)``; level is "success" when at least one
            real device accepted the video, "error" when only failures
            occurred, and "info" when every slot is simulated.
        """
        if not url:
            return "❌ Error", "Por favor, introduce una URL válida", "error"

        self.current_video_url = url
        self.progress = 0
        video_title = title or self._extract_video_name(url)

        loaded_count = 0
        errors = []
        for device in self.devices.values():
            if self._is_real(device):
                try:
                    self._cast_video(device, url, video_title)
                    device.current_content = video_title
                    loaded_count += 1
                except Exception as e:
                    errors.append(f"{device.name}: {str(e)}")
            else:
                # Simulated device
                device.current_content = video_title
                device.status = DeviceStatus.CONNECTED
        self._refresh_playing_flag()

        if loaded_count > 0:
            return "✅ Cargado", f"Video cargado en {loaded_count} dispositivos", "success"
        if errors:
            return "❌ Error", f"Errores: {'; '.join(errors[:2])}", "error"
        return "⚠️ Simulado", "Video cargado (modo simulación)", "info"

    def _extract_video_name(self, url: str) -> str:
        """Return a generic display name based on where *url* points."""
        lowered = url.lower()
        if "youtube.com" in lowered or "youtu.be" in lowered:
            return "Video de YouTube"
        if "vimeo.com" in lowered:
            return "Video de Vimeo"
        if ".mp4" in lowered:
            return "Video Directo MP4"
        if "commondatastorage.googleapis.com" in lowered:
            return "Video de Google Storage"
        return "Video Online"

    def _refresh_playing_flag(self) -> None:
        """Derive ``is_playing`` (drives simulated progress) from slot states."""
        self.is_playing = any(
            device.status == DeviceStatus.PLAYING for device in self.devices.values()
        )

    def _apply_action(self, device: TVDevice, action: str) -> Tuple[str, str, str]:
        """Execute *action* on *device* and update its cached state.

        The Chromecast is only contacted for real devices; the local state
        update is identical for real and simulated slots. May raise whatever
        the Chromecast call raises.
        """
        cast = device.chromecast if self._is_real(device) else None

        if action == "play":
            if cast:
                cast.media_controller.play()
            device.status = DeviceStatus.PLAYING
            return "✅ Reproduciendo", f"{device.name} está reproduciendo", "success"

        if action == "pause":
            if cast:
                cast.media_controller.pause()
            device.status = DeviceStatus.PAUSED
            return "⏸️ Pausado", f"{device.name} pausado", "info"

        if action == "stop":
            if cast:
                cast.media_controller.stop()
            device.status = DeviceStatus.CONNECTED
            device.current_content = "Esperando..."
            return "⏹️ Detenido", f"{device.name} detenido", "info"

        if action == "mute":
            muted = not device.is_muted
            if cast:
                cast.set_volume_muted(muted)
            device.is_muted = muted
            state = "silenciado" if muted else "activado"
            return "🔇 Volumen", f"{device.name} {state}", "info"

        # Remaining actions are volume_up / volume_down.
        step = self._VOLUME_STEP if action == "volume_up" else -self._VOLUME_STEP
        new_volume = max(0, min(100, device.volume + step))
        if cast:
            cast.set_volume(new_volume / 100)
        device.volume = new_volume
        return "🔊 Volumen", f"{device.name}: {new_volume}%", "info"

    def control_device(self, device_id: int, action: str) -> Tuple[str, str, str]:
        """Run one action on one slot.

        Args:
            device_id: Slot number.
            action: One of play, pause, stop, mute, volume_up, volume_down.

        Returns:
            ``(title, message, level)``. Simulated slots always answer with the
            "Simulado" notice (level "info") when the action is accepted.
        """
        device = self.devices.get(device_id)
        if not device:
            return "❌ Error", f"Dispositivo {device_id} no encontrado", "error"
        if action not in self._ACTIONS:
            return "❌ Error", "Acción no reconocida", "error"
        if action == "play" and not self.current_video_url:
            return "❌ Error", "No hay video cargado", "error"

        try:
            result = self._apply_action(device, action)
        except Exception as e:
            return "❌ Error", f"Error en {device.name}: {str(e)}", "error"
        self._refresh_playing_flag()

        if not self._is_real(device):
            return "⚠️ Simulado", f"Acción '{action}' simulada en {device.name}", "info"
        return result

    def control_all_devices(self, action: str) -> Tuple[str, str, str]:
        """Run *action* on every slot.

        Returns:
            ``(title, message, level)``: "success" when at least one real
            device executed the action, "error" when every slot failed, and
            "info" when only simulated slots executed it.
        """
        real_ok = 0
        failures = []
        for device_id, device in self.devices.items():
            _title, message, status = self.control_device(device_id, action)
            if status == "error":
                failures.append(message)
            elif self._is_real(device):
                real_ok += 1

        if action == "stop":
            self.current_video_url = None
            self.progress = 0
            self.is_playing = False

        if real_ok > 0:
            return (
                f"✅ {action.title()}",
                f"Acción ejecutada en {real_ok} dispositivos",
                "success",
            )
        if failures and len(failures) == len(self.devices):
            # Slots usually fail for the same reason; show each message once.
            unique = list(dict.fromkeys(failures))
            return "❌ Error", f"Errores: {'; '.join(unique[:2])}", "error"
        return "⚠️ Simulado", "Acción ejecutada (modo simulación)", "info"

    def set_volume_all(self, volume: int) -> Tuple[str, str, str]:
        """Set the volume (0-100, clamped) on every slot."""
        # Sliders may deliver floats; the stored volume is an int percentage.
        volume = int(max(0, min(100, volume)))
        success_count = 0

        for device in self.devices.values():
            if self._is_real(device):
                try:
                    device.chromecast.set_volume(volume / 100)
                    device.volume = volume
                    success_count += 1
                except Exception as e:
                    logger.error(f"Error setting volume on {device.name}: {e}")
            else:
                device.volume = volume

        return "🔊 Volumen", f"Volumen ajustado a {volume}% ({success_count} reales)", "info"

    @staticmethod
    def _format_clock(seconds: int) -> str:
        """Format a number of seconds as MM:SS."""
        return f"{seconds // 60:02d}:{seconds % 60:02d}"

    def update_progress(self) -> Tuple[float, str, str]:
        """Advance and report playback progress.

        Returns:
            ``(percent, current_time, duration)`` with times as MM:SS. Values
            come from the first real device that reports a duration; otherwise
            a simulated counter over a fixed 03:45 duration is used.
        """
        if self.is_playing and self.progress < 100:
            self.progress = min(100.0, self.progress + 0.5)

        # Get real progress from first connected device
        for device in self.devices.values():
            if not self._is_real(device):
                continue
            try:
                status = device.chromecast.media_controller.status
                # current_time is legitimately 0 at the start of playback.
                if status and status.duration and status.current_time is not None:
                    real_progress = min(100.0, (status.current_time / status.duration) * 100)
                    return (
                        real_progress,
                        self._format_clock(int(status.current_time)),
                        self._format_clock(int(status.duration)),
                    )
            except Exception as e:
                logger.debug("Could not read progress from %s: %s", device.name, e)

        # Fallback to simulated progress: 100% corresponds to 225 s (03:45).
        current_seconds = int(self.progress * 2.25)
        return self.progress, self._format_clock(current_seconds), "03:45"

    def add_to_playlist(self, url: str, title: str = "") -> Tuple[str, str, str]:
        """Append a video to the playlist; the title defaults to a generic name."""
        url = (url or "").strip()
        if not url:
            return "❌ Error", "Introduce una URL primero", "error"

        if not title:
            title = self._extract_video_name(url)

        self.playlist.append(
            {
                "title": title,
                "source": "Custom",
                "url": url,
                "duration": "--:--",
                # The title becomes a URL path segment, so it must be quoted.
                "thumbnail": f"https://picsum.photos/seed/{quote(title, safe='')}/60/40",
            }
        )

        return "✅ Éxito", f"'{title}' agregado a la playlist", "success"

    def play_from_playlist(self, index: int) -> Tuple[str, str, str]:
        """Load the playlist entry at *index* (0-based) on all slots."""
        if 0 <= index < len(self.playlist):
            video = self.playlist[index]
            return self.load_video(video["url"], video["title"])
        return "❌ Error", "Video no encontrado en playlist", "error"

    def get_playlist(self) -> List[Dict]:
        """Return the current playlist (the live list, not a copy)."""
        return self.playlist


# Initialize controller
controller = ChromecastController()

# status value -> (accent color, label, icon) used by the device cards.
_STATUS_PRESENTATION = {
    "connected": ("#16a34a", "Conectado", "✅"),
    "playing": ("#d97706", "Reproduciendo", "🎬"),
    "paused": ("#2563eb", "Pausado", "⏸️"),
    "disconnected": ("#dc2626", "Desconectado", "❌"),
    "error": ("#dc2626", "Error", "⚠️"),
}
_UNKNOWN_STATUS = ("#6b7280", "Desconocido", "📺")


def update_devices_display():
    """Render one HTML status card per TV slot."""
    cards = []
    for status in controller.get_all_devices_status():
        color, status_text, icon = _STATUS_PRESENTATION.get(status["status"], _UNKNOWN_STATUS)
        muted_mark = " 🔇" if status["muted"] else ""
        # Names and titles can come from devices or users: escape them.
        cards.append(
            f"""
<div class="status-card" style="border-left: 4px solid {color}; padding: 12px; margin-bottom: 8px; border-radius: 8px; background: #ffffff;">
  <div style="font-weight: 600;">{icon} {escape(str(status['name']))}</div>
  <div>
    <span style="color: {color}; font-weight: 600;">{status_text}</span>
    <span style="color: #6b7280;">{escape(str(status['device_type']))}</span>
  </div>
  <div>Reproduciendo: {escape(str(status['content']))}</div>
  <div>Volumen: {status['volume']}%{muted_mark}</div>
</div>
"""
        )
    return "".join(cards)


def update_playlist_display():
    """Render the playlist as HTML rows (thumbnail, title, source and duration)."""
    rows = []
    for video in controller.get_playlist():
        rows.append(
            f"""
<div class="playlist-item" style="display: flex; align-items: center; gap: 10px; padding: 8px; margin-bottom: 6px; border-radius: 8px; background: #ffffff;">
  <img src="{escape(str(video['thumbnail']), quote=True)}" alt="Thumbnail" width="60" height="40">
  <div>
    <div style="font-weight: 600;">{escape(str(video['title']))}</div>
    <div style="color: #6b7280; font-size: 0.85em;">{escape(str(video['source']))} • {escape(str(video['duration']))}</div>
  </div>
</div>
"""
        )
    return "".join(rows)


def load_video_url(url):
    """Load video from URL; returns (title, message, devices_html, url)."""
    url = (url or "").strip()
    if not url:
        return "❌ Error", "Introduce una URL válida", update_devices_display(), ""

    title, message, _status = controller.load_video(url)
    return title, message, update_devices_display(), url


def discover_devices():
    """Discover new devices; returns (title, message, devices_html)."""
    title, message, _status = controller.discover_new_devices()
    return title, message, update_devices_display()


def control_individual_device(device_id, action):
    """Control individual device; returns (title, message, devices_html)."""
    title, message, _status = controller.control_device(device_id, action)
    return title, message, update_devices_display()


def control_all_devices_action(action):
    """Control all devices; returns (title, message, devices_html)."""
    title, message, _status = controller.control_all_devices(action)
    return title, message, update_devices_display()


def change_volume_all(volume):
    """Change volume for all devices; also returns the percentage label."""
    volume = int(max(0, min(100, volume)))
    title, message, _status = controller.set_volume_all(volume)
    return title, message, update_devices_display(), f"{volume}%"


def update_playback_progress():
    """Update playback progress; returns (percent, current_time, duration)."""
    return controller.update_progress()


def add_video_to_playlist(url, title):
    """Add video to playlist; returns (title, message, playlist_html)."""
    result_title, message, _status = controller.add_to_playlist(url, title)
    return result_title, message, update_playlist_display()


def play_playlist_item(index):
    """Play item from playlist; returns (title, message, devices_html)."""
    title, message, _status = controller.play_from_playlist(index)
    return title, message, update_devices_display()


def load_sample_video(sample_type):
    """Load one of the built-in sample videos ("youtube", "vimeo", "direct")."""
    samples = {
        "youtube": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        "vimeo": "https://vimeo.com/123456789",
        "direct": "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4",
    }
    return load_video_url(samples.get(sample_type, ""))


def _bind(fn, *args):
    """Return a zero-argument callback that calls ``fn(*args)``.

    Binding through a factory (rather than a lambda inside a loop) fixes the
    arguments at creation time.
    """

    def callback():
        return fn(*args)

    return callback


# Create CSS for better styling
custom_css = """
.container { max-width: 1400px; margin: 0 auto; }
.status-card { transition: all 0.3s ease; }
.status-card:hover { transform: translateY(-2px); box-shadow: 0 4px 12px rgba(0,0,0,0.15); }
.device-button { transition: all 0.2s ease; }
.device-button:hover { transform: scale(1.05); }
.playlist-item { transition: all 0.2s ease; }
.progress-bar { height: 8px; }
.gradio-container { background: #f8fafc; }
"""

# Create Gradio interface (css is passed to launch(), not to the Blocks constructor)
with gr.Blocks(title="Control Multimedia Chromecast - Real", theme=gr.themes.Soft()) as demo:
    # f-string so the mode badge is evaluated instead of shown as raw braces.
    gr.HTML(
        f"""
<div style="text-align: center; padding: 16px;">
  <h1>📺 Control Multimedia Chromecast - Hostelería</h1>
  <p>Sistema de control real para dispositivos Chromecast con descubrimiento automático</p>
  <p>
    <span>{'🟢 Real' if CHROMECAST_AVAILABLE else '🟡 Modo Demo'}</span>
    <span>Python + PyChromecast</span>
  </p>
  <p>Built with anycoder</p>
</div>
"""
    )

    with gr.Row():
        with gr.Column(scale=2):
            # Device Discovery Section
            with gr.Group():
                gr.HTML("<h3>🔍 Descubrimiento de Dispositivos</h3>")
                with gr.Row():
                    discover_btn = gr.Button("🔍 Descubrir Chromecasts", variant="primary")
                    refresh_status_btn = gr.Button("🔄 Actualizar Estado", variant="secondary")

            # Quick Actions
            gr.HTML("<h3>⚡ Acciones Rápidas</h3>")
            with gr.Row():
                play_all_btn = gr.Button("▶️ Reproducir Todo", variant="primary", size="sm")
                pause_all_btn = gr.Button("⏸️ Pausar Todo", variant="secondary", size="sm")
                stop_all_btn = gr.Button("⏹️ Detener Todo", variant="stop", size="sm")
                mute_all_btn = gr.Button("🔇 Silenciar Todo", variant="secondary", size="sm")

            # Device Status Display
            gr.HTML("<h3>📺 Estado de los Dispositivos</h3>")
            devices_display = gr.HTML(update_devices_display())

            # Video Input Section
            gr.HTML("<h3>🎬 Control de Reproducción</h3>")
            with gr.Row():
                video_url = gr.Textbox(
                    label="URL del Video",
                    placeholder="https://www.youtube.com/watch?v=...",
                    scale=3,
                )
                load_btn = gr.Button("📥 Cargar Video", variant="primary", scale=1)

            with gr.Row():
                youtube_btn = gr.Button("📺 YouTube Sample", size="sm")
                vimeo_btn = gr.Button("🎥 Vimeo Sample", size="sm")
                direct_btn = gr.Button("🔗 Video Directo", size="sm")

            # Progress Bar
            gr.HTML("<h3>📊 Progreso de Reproducción</h3>")
            progress = gr.Slider(0, 100, value=0, label="Progreso", interactive=True)
            with gr.Row():
                current_time = gr.Textbox("00:00", label="Tiempo Actual", interactive=False, scale=1)
                duration_time = gr.Textbox("03:45", label="Duración Total", interactive=False, scale=1)

            # Media Controls
            # NOTE: the rewind/forward buttons have no handler attached.
            gr.HTML("<h3>🎮 Controles de Multimedia</h3>")
            with gr.Row():
                rewind_btn = gr.Button("⏪ -10s", size="sm")
                play_btn = gr.Button("▶️ Reproducir", variant="primary", size="sm")
                pause_btn = gr.Button("⏸️ Pausar", variant="secondary", size="sm")
                stop_btn = gr.Button("⏹️ Detener", variant="stop", size="sm")
                forward_btn = gr.Button("⏩ +10s", size="sm")

            # Volume Control
            gr.HTML("<h3>🔊 Control de Volumen Maestro</h3>")
            with gr.Row():
                volume_slider = gr.Slider(0, 100, value=50, label="Volumen", scale=2)
                volume_percentage = gr.Textbox("50%", label="Porcentaje", interactive=False, scale=1)

        with gr.Column(scale=1):
            # Individual Device Controls
            gr.HTML("<h3>🎛️ Controles Individuales</h3>")

            # Per-device buttons live in their own dict so they cannot shadow
            # the global play/pause/stop buttons defined above.
            device_buttons = {}
            for device_id in sorted(controller.devices):
                with gr.Group():
                    device_name = f"TV {device_id}"
                    gr.HTML(f"<h4>📺 {device_name}</h4>")
                    with gr.Row():
                        device_buttons[device_id] = {
                            "play": gr.Button("▶️", size="sm", variant="primary", elem_classes=["device-button"]),
                            "pause": gr.Button("⏸️", size="sm", elem_classes=["device-button"]),
                            "stop": gr.Button("⏹️", size="sm", variant="stop", elem_classes=["device-button"]),
                            "mute": gr.Button("🔇", size="sm", elem_classes=["device-button"]),
                            "volume_up": gr.Button("🔊", size="sm", elem_classes=["device-button"]),
                            "volume_down": gr.Button("🔉", size="sm", elem_classes=["device-button"]),
                        }

            # Playlist Section
            # NOTE: the clear-playlist button has no handler attached.
            gr.HTML("<h3>📋 Playlist de Videos</h3>")
            playlist_display = gr.HTML(update_playlist_display())

            with gr.Group():
                playlist_url = gr.Textbox(label="URL para Playlist", placeholder="Agregar video a playlist")
                playlist_title = gr.Textbox(label="Título (opcional)")
                with gr.Row():
                    add_playlist_btn = gr.Button("➕ Agregar", variant="primary")
                    clear_playlist_btn = gr.Button("🗑️ Limpiar", variant="secondary")

    # Status Display
    with gr.Row():
        status_title = gr.Textbox("✅ Sistema Listo", label="Estado", interactive=False, scale=1)
        status_message = gr.Textbox("Esperando acción", label="Mensaje", interactive=False, scale=2)

    # Output groups shared by most handlers.
    action_outputs = [status_title, status_message, devices_display]
    load_outputs = [status_title, status_message, devices_display, video_url]

    # Event Handlers
    load_btn.click(load_video_url, inputs=[video_url], outputs=load_outputs)

    # Device discovery
    discover_btn.click(discover_devices, outputs=action_outputs)
    refresh_status_btn.click(update_devices_display, outputs=[devices_display])

    # Sample videos
    youtube_btn.click(fn=_bind(load_sample_video, "youtube"), outputs=load_outputs)
    vimeo_btn.click(fn=_bind(load_sample_video, "vimeo"), outputs=load_outputs)
    direct_btn.click(fn=_bind(load_sample_video, "direct"), outputs=load_outputs)

    # Quick actions
    play_all_btn.click(fn=_bind(control_all_devices_action, "play"), outputs=action_outputs)
    pause_all_btn.click(fn=_bind(control_all_devices_action, "pause"), outputs=action_outputs)
    stop_all_btn.click(fn=_bind(control_all_devices_action, "stop"), outputs=action_outputs)
    mute_all_btn.click(fn=_bind(control_all_devices_action, "mute"), outputs=action_outputs)

    # Media controls (act on every device)
    play_btn.click(fn=_bind(control_all_devices_action, "play"), outputs=action_outputs)
    pause_btn.click(fn=_bind(control_all_devices_action, "pause"), outputs=action_outputs)
    stop_btn.click(fn=_bind(control_all_devices_action, "stop"), outputs=action_outputs)

    # Volume control
    volume_slider.change(
        change_volume_all,
        inputs=[volume_slider],
        outputs=[status_title, status_message, devices_display, volume_percentage],
    )

    # Individual TV controls
    for device_id, buttons in device_buttons.items():
        for action, button in buttons.items():
            button.click(
                fn=_bind(control_individual_device, device_id, action),
                outputs=action_outputs,
            )

    # Playlist controls
    add_playlist_btn.click(
        add_video_to_playlist,
        inputs=[playlist_url, playlist_title],
        outputs=[status_title, status_message, playlist_display],
    )

    # Timer for progress update
    timer = gr.Timer(1.0)
    timer.tick(update_playback_progress, outputs=[progress, current_time, duration_time])

    # Footer with information
    gr.HTML(
        """
<div style="text-align: center; padding: 12px; color: #6b7280;">
  <p>
    <span>Motor: Python 3.9+</span>
    <span>Librería: PyChromecast</span>
    <span>Protocolo: Cast v2</span>
  </p>
  <p>Built with anycoder</p>
</div>
"""
    )


if __name__ == "__main__":
    # Display installation info if PyChromecast is not available
    if not CHROMECAST_AVAILABLE:
        print("\n" + "=" * 60)
        print("⚠️ ADVERTENCIA: PyChromecast no está instalado")
        print("Para control real de Chromecasts, instala:")
        print("pip install pychromecast")
        print("=" * 60 + "\n")

    print("🚀 Iniciando Control Multimedia Chromecast...")
    print(f"📊 Modo: {'Real (con Chromecasts)' if CHROMECAST_AVAILABLE else 'Demo/Simulación'}")

    # css is passed to launch() rather than to the Blocks constructor.
    demo.launch(
        theme=gr.themes.Soft(),
        css=custom_css,
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
    )