Mafia2008 commited on
Commit
8a61671
Β·
verified Β·
1 Parent(s): daeadc6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +119 -105
app.py CHANGED
@@ -1,155 +1,169 @@
1
- import cv2
2
- import av
3
  import os
4
- import time
5
- import subprocess
6
  import threading
 
 
7
  import streamlit as st
8
  from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration
9
  from huggingface_hub import HfApi
10
 
11
- # --- CONFIGURATION ---
12
- DATASET_REPO = "YOUR_USERNAME/stream-recordings" # CHANGE THIS
 
 
 
 
 
 
 
 
 
13
  HF_TOKEN = os.environ.get("HF_TOKEN")
14
- HLS_DIR = "/tmp/hls"
15
 
16
- # Create HLS directory
17
- os.makedirs(HLS_DIR, exist_ok=True)
 
18
 
19
- # --- FFmpeg SINGLETON ---
20
- # We use a global object to hold the FFmpeg process so the stream persists
21
- class StreamEngine:
 
 
 
 
 
 
 
 
22
  def __init__(self):
23
  self.process = None
24
 
25
- def start_ffmpeg(self):
26
  if self.process is None:
27
  cmd = [
28
  "ffmpeg",
29
  "-f", "rawvideo",
30
  "-pixel_format", "bgr24",
31
- "-video_size", "640x480", # Match the input config below
32
  "-framerate", "30",
33
- "-i", "pipe:0", # Read from Python
34
  "-c:v", "libx264",
35
- "-preset", "ultrafast",
36
  "-tune", "zerolatency",
37
- "-g", "60", # Keyframe every 2s
38
  "-sc_threshold", "0",
39
  "-f", "hls",
40
- "-hls_time", "2",
41
- "-hls_list_size", "0", # Infinite DVR
42
- "-hls_segment_filename", f"{HLS_DIR}/segment_%03d.ts",
43
- f"{HLS_DIR}/stream.m3u8"
44
  ]
45
- # Open the pipe
46
  self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE)
47
- print("🟒 FFmpeg Started")
48
 
49
  def write_frame(self, frame):
50
  if self.process and self.process.stdin:
51
  try:
52
- # Convert frame to bytes and write to pipe
53
  self.process.stdin.write(frame.tobytes())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  except Exception as e:
55
- print(f"Error writing frame: {e}")
 
56
 
57
- engine = StreamEngine()
 
 
58
 
59
- # --- VIDEO PROCESSOR ---
60
- # This class runs for every frame coming from the webcam
61
- class BroadcastProcessor(VideoProcessorBase):
 
62
  def __init__(self):
63
- self.frame_count = 0
64
- # Start FFmpeg when the webcam starts
65
- engine.start_ffmpeg()
66
 
67
  def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
68
  img = frame.to_ndarray(format="bgr24")
69
-
70
- # Send raw image to FFmpeg
71
  engine.write_frame(img)
72
-
73
- # Return the frame so the streamer sees themselves
74
  return frame
75
 
76
- # --- BACKGROUND UPLOADER ---
77
- def sync_to_dataset():
78
- api = HfApi(token=HF_TOKEN)
79
- processed_files = set()
80
-
81
- while True:
82
- if os.path.exists(HLS_DIR):
83
- # Find all .ts files
84
- files = sorted([f for f in os.listdir(HLS_DIR) if f.endswith(".ts")])
85
-
86
- # We assume the last file is still being written to, so we skip it
87
- # unless we have many files.
88
- for f in files[:-1]:
89
- if f not in processed_files:
90
- try:
91
- api.upload_file(
92
- path_or_fileobj=os.path.join(HLS_DIR, f),
93
- path_in_repo=f"recordings/{f}",
94
- repo_id=DATASET_REPO,
95
- repo_type="dataset"
96
- )
97
- processed_files.add(f)
98
- print(f"βœ… Archived {f}")
99
- except Exception as e:
100
- print(f"Upload failed: {e}")
101
- time.sleep(5)
102
-
103
- # Start background sync only once
104
- if "sync_started" not in st.session_state:
105
- threading.Thread(target=sync_to_dataset, daemon=True).start()
106
- st.session_state["sync_started"] = True
107
-
108
- # --- UI LAYOUT ---
109
- st.title("πŸŽ₯ HF Cloud Streamer")
110
 
111
- tab1, tab2 = st.tabs(["πŸ”΄ Go Live", "πŸ“Ί Watch Stream"])
112
 
113
  with tab1:
114
- st.header("Broadcaster Control")
115
- st.info("Allow camera access. If it fails, check STUN/TURN settings.")
116
-
117
- # STUN/TURN CONFIG (CRITICAL FOR HF SPACES)
118
- rtc_config = RTCConfiguration(
119
- {"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]}
120
- # Add TURN servers here for production stability!
121
- )
122
-
123
  webrtc_streamer(
124
- key="broadcaster",
125
  mode=webrtc_streamer.WebRtcMode.SENDRECV,
126
  rtc_configuration=rtc_config,
127
- video_processor_factory=BroadcastProcessor,
128
  media_stream_constraints={"video": {"width": 640, "height": 480}, "audio": False},
129
  async_processing=True,
130
  )
131
 
132
  with tab2:
133
- st.header("Viewer (HLS/DVR)")
134
-
135
- # Since Streamlit can't easily serve static HLS files from /tmp,
136
- # we use a hack to render the HLS player pointing to the local file path
137
- # OR we assume this tab is for "Monitoring" the generated files.
138
-
139
- # In a real scenario, you need a separate "File Server" to serve the /tmp folder.
140
- # However, Streamlit creates a static mount.
141
- # A workaround for testing:
142
-
143
- st.markdown(
144
- """
145
- <p>To watch the stream, the HLS files are being generated in <code>/tmp/hls</code>.</p>
146
- <p>Because Streamlit blocks direct access to /tmp, the recording is being sent to your Dataset.</p>
147
- """,
148
- unsafe_allow_html=True
149
- )
150
-
151
- if st.button("List Generated Segments"):
152
- if os.path.exists(HLS_DIR):
153
- files = os.listdir(HLS_DIR)
154
- st.write(files)
155
-
 
 
 
1
  import os
2
+ import av
 
3
  import threading
4
+ import subprocess
5
+ import time
6
  import streamlit as st
7
  from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration
8
  from huggingface_hub import HfApi
9
 
10
+ # ==========================================
11
+ # 1. PAGE CONFIG (MUST BE FIRST!)
12
+ # ==========================================
13
+ st.set_page_config(page_title="HF Cloud Streamer", page_icon="πŸ”΄", layout="wide")
14
+
15
+ # ==========================================
16
+ # 2. CONFIGURATION & SETUP
17
+ # ==========================================
18
+ DATASET_ID = "Mafia2008/Vod"
19
+ BASE_URL = f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/recordings"
20
+ LOCAL_DIR = "/tmp/hls"
21
  HF_TOKEN = os.environ.get("HF_TOKEN")
 
22
 
23
+ # Ensure local directory exists
24
+ if not os.path.exists(LOCAL_DIR):
25
+ os.makedirs(LOCAL_DIR, exist_ok=True)
26
 
27
+ # Initialize API (Graceful failure if token missing)
28
+ api = None
29
+ if HF_TOKEN:
30
+ api = HfApi(token=HF_TOKEN)
31
+ else:
32
+ st.warning("⚠️ HF_TOKEN is missing in Space Settings! Recording will not be saved.")
33
+
34
+ # ==========================================
35
+ # 3. FFMPEG ENGINE (TRANSCODER)
36
+ # ==========================================
37
+ class HLSTranscoder:
38
  def __init__(self):
39
  self.process = None
40
 
41
+ def start(self):
42
  if self.process is None:
43
  cmd = [
44
  "ffmpeg",
45
  "-f", "rawvideo",
46
  "-pixel_format", "bgr24",
47
+ "-video_size", "640x480",
48
  "-framerate", "30",
49
+ "-i", "pipe:0",
50
  "-c:v", "libx264",
51
+ "-preset", "veryfast",
52
  "-tune", "zerolatency",
53
+ "-g", "60",
54
  "-sc_threshold", "0",
55
  "-f", "hls",
56
+ "-hls_time", "4",
57
+ "-hls_list_size", "0",
58
+ "-hls_segment_filename", f"{LOCAL_DIR}/segment_%03d.ts",
59
+ f"{LOCAL_DIR}/stream.m3u8"
60
  ]
 
61
  self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE)
62
+ print("🟒 FFmpeg Transcoder Started")
63
 
64
  def write_frame(self, frame):
65
  if self.process and self.process.stdin:
66
  try:
 
67
  self.process.stdin.write(frame.tobytes())
68
+ except Exception:
69
+ pass
70
+
71
+ # Global engine
72
+ engine = HLSTranscoder()
73
+
74
+ # ==========================================
75
+ # 4. BACKGROUND UPLOADER
76
+ # ==========================================
77
+ def dataset_sync_worker():
78
+ uploaded_files = set()
79
+ while True:
80
+ # Only run if token exists
81
+ if api and os.path.exists(LOCAL_DIR):
82
+ try:
83
+ files = sorted(os.listdir(LOCAL_DIR))
84
+ ts_files = [f for f in files if f.endswith(".ts")]
85
+
86
+ # Upload segments
87
+ for f in ts_files:
88
+ if f not in uploaded_files:
89
+ if f == ts_files[-1] and len(ts_files) > 1: continue # Skip active file
90
+
91
+ file_path = os.path.join(LOCAL_DIR, f)
92
+ api.upload_file(
93
+ path_or_fileobj=file_path,
94
+ path_in_repo=f"recordings/{f}",
95
+ repo_id=DATASET_ID,
96
+ repo_type="dataset"
97
+ )
98
+ uploaded_files.add(f)
99
+ print(f"βœ… Uploaded {f}")
100
+
101
+ # Update Playlist
102
+ m3u8_path = os.path.join(LOCAL_DIR, "stream.m3u8")
103
+ if os.path.exists(m3u8_path) and len(uploaded_files) > 0:
104
+ with open(m3u8_path, "r") as file:
105
+ lines = file.readlines()
106
+
107
+ new_lines = []
108
+ for line in lines:
109
+ clean = line.strip()
110
+ if clean.endswith(".ts"):
111
+ new_lines.append(f"{BASE_URL}/{clean}\n")
112
+ else:
113
+ new_lines.append(line)
114
+
115
+ public_path = os.path.join(LOCAL_DIR, "public_stream.m3u8")
116
+ with open(public_path, "w") as file:
117
+ file.writelines(new_lines)
118
+
119
+ api.upload_file(
120
+ path_or_fileobj=public_path,
121
+ path_in_repo="recordings/stream.m3u8",
122
+ repo_id=DATASET_ID,
123
+ repo_type="dataset"
124
+ )
125
  except Exception as e:
126
+ print(f"Sync Error: {e}")
127
+ time.sleep(4)
128
 
129
+ if "sync_thread_active" not in st.session_state:
130
+ threading.Thread(target=dataset_sync_worker, daemon=True).start()
131
+ st.session_state["sync_thread_active"] = True
132
 
133
+ # ==========================================
134
+ # 5. WEBRTC PROCESSOR
135
+ # ==========================================
136
+ class VideoProcessor(VideoProcessorBase):
137
  def __init__(self):
138
+ engine.start()
 
 
139
 
140
  def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
141
  img = frame.to_ndarray(format="bgr24")
 
 
142
  engine.write_frame(img)
 
 
143
  return frame
144
 
145
+ # ==========================================
146
+ # 6. UI LAYOUT
147
+ # ==========================================
148
+ st.title("πŸ”΄ Live Streamer β†’ HF Dataset")
149
+ st.caption(f"Archiving to: **{DATASET_ID}**")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
+ tab1, tab2 = st.tabs(["πŸŽ₯ Broadcaster", "🌍 Viewer Link"])
152
 
153
  with tab1:
154
+ st.info("Allow camera access to start.")
155
+ rtc_config = RTCConfiguration({"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]})
 
 
 
 
 
 
 
156
  webrtc_streamer(
157
+ key="streamer",
158
  mode=webrtc_streamer.WebRtcMode.SENDRECV,
159
  rtc_configuration=rtc_config,
160
+ video_processor_factory=VideoProcessor,
161
  media_stream_constraints={"video": {"width": 640, "height": 480}, "audio": False},
162
  async_processing=True,
163
  )
164
 
165
  with tab2:
166
+ st.success("Stream is Live!")
167
+ st.code(f"{BASE_URL}/stream.m3u8", language="text")
168
+ st.markdown(f"[πŸ“‚ Dataset Files](https://huggingface.co/datasets/{DATASET_ID}/tree/main/recordings)")
169
+