| import os |
| import av |
| import threading |
| import subprocess |
| import time |
| import streamlit as st |
| from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration |
| from huggingface_hub import HfApi |
|
|
| |
| |
| |
# Page-level Streamlit settings. The icon is the red-circle emoji; the source
# text carried it as mis-decoded bytes.
st.set_page_config(page_title="HF Cloud Streamer", page_icon="🔴", layout="wide")
|
|
| |
| |
| |
# Hugging Face dataset repository that receives the recording.
DATASET_ID = "Mafia2008/Vod"
# Public download prefix of the repository's "recordings" folder.
BASE_URL = f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/recordings"
# Local working directory for the HLS playlist and segments.
LOCAL_DIR = "/tmp/hls"
# Access token read from the environment; None when the variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")

# exist_ok=True already makes this a no-op for an existing directory, so no
# separate existence check is needed. If the path exists but is not a
# directory this now fails here instead of later, when files are written.
os.makedirs(LOCAL_DIR, exist_ok=True)
|
|
| |
# Hub client used for uploads; stays None when no token is configured, in
# which case the user is warned that nothing will be saved.
api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
if api is None:
    st.warning("⚠️ HF_TOKEN is missing in Space Settings! Recording will not be saved.")
|
|
| |
| |
| |
class HLSTranscoder:
    """Pipe raw BGR frames into an FFmpeg child process that writes an HLS stream.

    FFmpeg is told to read fixed-size ``bgr24`` raw video from its stdin,
    encode it with libx264, and write 4-second ``segment_NNN.ts`` files plus a
    ``stream.m3u8`` playlist into ``LOCAL_DIR``.

    Args:
        width: Frame width in pixels declared to FFmpeg.
        height: Frame height in pixels declared to FFmpeg.
        fps: Input frame rate declared to FFmpeg.
    """

    def __init__(self, width=640, height=480, fps=30):
        self.process = None
        self.width = width
        self.height = height
        self.fps = fps
        # bgr24 uses 3 bytes per pixel, so every frame must be exactly this long.
        self._frame_bytes = width * height * 3
        self._size_warning_shown = False

    def _build_command(self):
        """Return the FFmpeg argument list for the configured frame geometry."""
        return [
            "ffmpeg",
            "-f", "rawvideo",
            "-pixel_format", "bgr24",
            "-video_size", f"{self.width}x{self.height}",
            "-framerate", str(self.fps),
            "-i", "pipe:0",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-tune", "zerolatency",
            # One keyframe every two seconds of video (60 frames at 30 fps).
            "-g", str(self.fps * 2),
            "-sc_threshold", "0",
            "-f", "hls",
            "-hls_time", "4",
            "-hls_list_size", "0",
            "-hls_segment_filename", f"{LOCAL_DIR}/segment_%03d.ts",
            f"{LOCAL_DIR}/stream.m3u8",
        ]

    def start(self):
        """Launch FFmpeg unless a process handle is already held."""
        if self.process is not None:
            return
        self.process = subprocess.Popen(self._build_command(), stdin=subprocess.PIPE)
        print("🟢 FFmpeg Transcoder Started")

    def write_frame(self, frame):
        """Send one frame to FFmpeg.

        Args:
            frame: Object exposing ``tobytes()`` that yields one packed bgr24
                frame of the configured size.

        Frames are ignored while no process is running. A frame whose byte
        length differs from the configured size is dropped, because writing it
        would misalign every later frame in the raw stream. If the pipe can no
        longer be written, the failure is logged and the transcoder is stopped
        instead of failing silently on every later frame.
        """
        proc = self.process
        if proc is None or proc.stdin is None:
            return

        payload = frame.tobytes()
        if len(payload) != self._frame_bytes:
            if not self._size_warning_shown:
                print(
                    f"Dropping frame of {len(payload)} bytes; "
                    f"expected {self._frame_bytes} ({self.width}x{self.height} bgr24)"
                )
                self._size_warning_shown = True
            return

        try:
            proc.stdin.write(payload)
        except (OSError, ValueError) as exc:
            # OSError covers a broken pipe (FFmpeg exited); ValueError is
            # raised when the pipe object has already been closed.
            print(f"FFmpeg pipe write failed, stopping transcoder: {exc}")
            self.stop()

    def stop(self):
        """Close FFmpeg's stdin and reap the process; safe to call repeatedly."""
        proc, self.process = self.process, None
        if proc is None:
            return
        try:
            if proc.stdin is not None:
                proc.stdin.close()
        except OSError:
            # Flushing into an already-broken pipe; nothing left to deliver.
            pass
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()


# Transcoder instance used by the video processor.
# NOTE(review): Streamlit re-executes this script, so this line can run more
# than once per server process; confirm whether a single shared instance is
# intended.
engine = HLSTranscoder()
|
|
| |
| |
| |
def _file_signature(path):
    """Return ``(size, mtime_ns)`` of *path*, used to notice rewritten files."""
    info = os.stat(path)
    return info.st_size, info.st_mtime_ns


def _finished_segments(playlist_lines):
    """Return the ``.ts`` file names referenced by the playlist lines."""
    names = (os.path.basename(line.strip()) for line in playlist_lines)
    return [name for name in names if name.endswith(".ts")]


def _public_playlist(playlist_lines):
    """Return the playlist lines with each segment entry made an absolute URL."""
    rewritten = []
    for line in playlist_lines:
        entry = line.strip()
        if entry.endswith(".ts"):
            rewritten.append(f"{BASE_URL}/{entry}\n")
        else:
            rewritten.append(line)
    return rewritten


def _sync_once(uploaded_files, last_playlist):
    """Run one upload pass and return the playlist text now on the hub.

    Args:
        uploaded_files: Maps a segment name to the file signature it had when
            uploaded; updated in place.
        last_playlist: Playlist text uploaded by a previous pass, or None.
    """
    m3u8_path = os.path.join(LOCAL_DIR, "stream.m3u8")
    if not os.path.exists(m3u8_path):
        return last_playlist
    with open(m3u8_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    # Only segments listed in the playlist are uploaded: the encoder adds a
    # segment there once it is finished, so a file still being written is
    # never sent, however the directory listing happens to sort.
    for name in _finished_segments(lines):
        file_path = os.path.join(LOCAL_DIR, name)
        signature = _file_signature(file_path)
        # A changed signature means the file was rewritten since the last
        # upload (for example by a restarted encoder) and must be sent again.
        if uploaded_files.get(name) == signature:
            continue
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=f"recordings/{name}",
            repo_id=DATASET_ID,
            repo_type="dataset",
        )
        uploaded_files[name] = signature
        print(f"✅ Uploaded {name}")

    if not uploaded_files:
        return last_playlist

    playlist_text = "".join(_public_playlist(lines))
    if playlist_text == last_playlist:
        # Nothing new to publish; skip a redundant upload.
        return last_playlist

    public_path = os.path.join(LOCAL_DIR, "public_stream.m3u8")
    with open(public_path, "w", encoding="utf-8") as file:
        file.write(playlist_text)

    api.upload_file(
        path_or_fileobj=public_path,
        path_in_repo="recordings/stream.m3u8",
        repo_id=DATASET_ID,
        repo_type="dataset",
    )
    return playlist_text


def dataset_sync_worker():
    """Upload finished HLS segments and a public playlist every four seconds.

    Runs forever and is meant to be the target of a daemon thread. Each pass
    reads ``stream.m3u8`` in ``LOCAL_DIR``, uploads every listed segment that
    has not been uploaded in its current form to ``recordings/`` in the
    dataset, and then uploads a copy of the playlist whose segment entries
    point at ``BASE_URL``. Any error in a pass is printed and the loop carries
    on with the next pass. Nothing is uploaded while ``api`` is unset.
    """
    uploaded_files = {}
    last_playlist = None
    while True:
        if api and os.path.exists(LOCAL_DIR):
            try:
                last_playlist = _sync_once(uploaded_files, last_playlist)
            except Exception as e:
                # Worker-loop boundary: report and retry on the next pass.
                print(f"Sync Error: {e}")
        time.sleep(4)
|
|
# Start the uploader once per server process. st.session_state is scoped to a
# single browser session, so using it as the guard would launch another
# endless uploader thread for every visitor; a named thread found through
# threading.enumerate() is visible from every session instead.
SYNC_THREAD_NAME = "hf-dataset-sync"
if not any(t.name == SYNC_THREAD_NAME for t in threading.enumerate()):
    threading.Thread(
        target=dataset_sync_worker,
        name=SYNC_THREAD_NAME,
        daemon=True,
    ).start()
# The session flag is still set so anything reading it keeps working.
st.session_state["sync_thread_active"] = True
|
|
| |
| |
| |
class VideoProcessor(VideoProcessorBase):
    """Video processor that forwards each received frame to the transcoder."""

    def __init__(self):
        # Make sure the transcoder is running before frames arrive.
        engine.start()

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        """Hand the frame to the transcoder as a bgr24 array and return it unchanged."""
        engine.write_frame(frame.to_ndarray(format="bgr24"))
        return frame
|
|
| |
| |
| |
# NOTE(review): the emoji and the arrow below were restored from mis-decoded
# text. The arrow and the "Viewer Link" icon had lost their distinguishing
# bytes, so those two glyphs are a best guess; confirm against the original.
st.title("🔴 Live Streamer → HF Dataset")
st.caption(f"Archiving to: **{DATASET_ID}**")

tab1, tab2 = st.tabs(["🎥 Broadcaster", "🔗 Viewer Link"])
|
|
with tab1:
    # WebRtcMode is an enum exported by the streamlit_webrtc package; it is not
    # an attribute of the webrtc_streamer function, so looking it up there
    # raises AttributeError.
    from streamlit_webrtc import WebRtcMode

    st.info("Allow camera access to start.")
    rtc_config = RTCConfiguration(
        {"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]}
    )
    webrtc_streamer(
        key="streamer",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration=rtc_config,
        video_processor_factory=VideoProcessor,
        # Ask the browser for the frame size the transcoder is configured for.
        media_stream_constraints={"video": {"width": 640, "height": 480}, "audio": False},
        async_processing=True,
    )
|
|
with tab2:
    st.success("Stream is Live!")
    # Public playlist URL that viewers open in an HLS-capable player.
    st.code(f"{BASE_URL}/stream.m3u8", language="text")
    # NOTE(review): the folder emoji was restored from mis-decoded text whose
    # distinguishing bytes were lost; confirm the intended glyph.
    st.markdown(f"[📂 Dataset Files](https://huggingface.co/datasets/{DATASET_ID}/tree/main/recordings)")
| |