Spaces:
Running
Running
| import vlc | |
| from PySide6.QtCore import QObject, Signal, Slot, QTimer | |
| from typing import Optional, Callable | |
| class VLCService(QObject): | |
| state_changed = Signal(str) # playing, paused, stopped, error | |
| position_changed = Signal(float) # 0.0 - 1.0 | |
| time_changed = Signal(int) # milliseconds | |
| duration_changed = Signal(int) # milliseconds | |
| def __init__(self): | |
| super().__init__() | |
| self.instance = vlc.Instance("--no-video-title-show") | |
| self.player = self.instance.media_player_new() | |
| self.media = None | |
| self._is_paused = False | |
| # Timer do aktualizacji pozycji | |
| self.timer = QTimer() | |
| self.timer.timeout.connect(self._update_position) | |
| self.timer.start(1000) | |
| self._error_callback: Optional[Callable] = None | |
| def play_stream(self, url: str): | |
| try: | |
| if self.media: | |
| self.media.release() | |
| self.media = self.instance.media_new(url) | |
| self.player.set_media(self.media) | |
| self.player.play() | |
| self._is_paused = False | |
| self.state_changed.emit("playing") | |
| except Exception as e: | |
| self.state_changed.emit(f"error: {str(e)}") | |
| def stop(self): | |
| self.player.stop() | |
| self.state_changed.emit("stopped") | |
| def pause(self): | |
| if self._is_paused: | |
| self.player.play() | |
| self.state_changed.emit("playing") | |
| else: | |
| self.player.pause() | |
| self.state_changed.emit("paused") | |
| self._is_paused = not self._is_paused | |
| def set_volume(self, volume: int): | |
| self.player.audio_set_volume(volume) | |
| def get_volume(self) -> int: | |
| return self.player.audio_get_volume() | |
| def seek(self, position: float): | |
| if self.player.is_seekable(): | |
| duration = self.player.get_length() | |
| if duration > 0: | |
| self.player.set_time(int(duration * position)) | |
| def set_fullscreen(self, fullscreen: bool): | |
| self.player.set_fullscreen(fullscreen) | |
| def _update_position(self): | |
| if self.player.is_playing(): | |
| duration = self.player.get_length() | |
| current = self.player.get_time() | |
| if duration > 0: | |
| self.position_changed.emit(current / duration) | |
| self.time_changed.emit(current) | |
| self.duration_changed.emit(duration) | |
| def cleanup(self): | |
| self.timer.stop() | |
| self.player.stop() | |
| if self.media: | |
| self.media.release() | |
| self.instance.release() |