arthrod commited on
Commit
7a42892
·
verified ·
1 Parent(s): 66b4d35

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +270 -2
app.py CHANGED
@@ -30,10 +30,278 @@ import numpy.typing as npt
30
  from fastrtc import AsyncAudioVideoStreamHandler, ReplyOnPause, Stream, get_cloudflare_turn_credentials_async
31
  from google import genai
32
  from google.genai import types
33
- sys.path.append(os.path.join(os.path.dirname(__file__), 'src', 'gradio_screenrecorder', 'backend'))
34
- from .src.backend.gradio_screenrecorder import ScreenRecorder
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
 
 
 
36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
  # Environment variable for API key
38
  API_KEY = os.getenv("GEMINI_API_KEY", "")
39
 
 
30
  from fastrtc import AsyncAudioVideoStreamHandler, ReplyOnPause, Stream, get_cloudflare_turn_credentials_async
31
  from google import genai
32
  from google.genai import types
33
+ import gradio as gr
34
+ from gradio.components.base import Component
35
+ from gradio.data_classes import FileData, GradioModel
36
+ from typing import Optional, Literal, Any
37
+ import tempfile
38
+ import os
39
+ import json
40
+
41
+ class ScreenRecorderData(GradioModel):
42
+ video: Optional[FileData] = None
43
+ duration: Optional[float] = None
44
+ audio_enabled: bool = True
45
+ status: Literal["recording", "stopped", "error"] = "stopped"
46
+
47
+ class Config:
48
+ json_encoders = {
49
+ FileData: lambda v: v.model_dump() if v else None
50
+ }
51
+
52
+
53
+ class ScreenRecorder(Component):
54
+ """
55
+ Custom Gradio component for comprehensive screen recording functionality.
56
+ """
57
+
58
+ data_model = ScreenRecorderData
59
+
60
+ EVENTS = [
61
+ "record_start",
62
+ "record_stop",
63
+ "stream_update",
64
+ "change"
65
+ ]
66
+
67
+ def __init__(
68
+ self,
69
+ value=None,
70
+ audio_enabled: bool = True,
71
+ webcam_overlay: bool = False,
72
+ webcam_position: Literal["top-left", "top-right", "bottom-left", "bottom-right"] = "bottom-right",
73
+ recording_format: str = "webm",
74
+ max_duration: Optional[int] = None,
75
+ interactive: bool = True,
76
+ **kwargs
77
+ ):
78
+ self.audio_enabled = audio_enabled
79
+ self.webcam_overlay = webcam_overlay
80
+ self.webcam_position = webcam_position
81
+ self.recording_format = recording_format
82
+ self.max_duration = max_duration
83
+ self._status = "stopped"
84
+
85
+ super().__init__(
86
+ value=value,
87
+ interactive=interactive,
88
+ **kwargs
89
+ )
90
+
91
+ def example_payload(self) -> dict:
92
+ """
93
+ The example inputs for this component for API usage. Must be JSON-serializable.
94
+ """
95
+ return {
96
+ "video": {
97
+ "path": "https://sample-videos.com/zip/10/mp4/SampleVideo_360x240_1mb.mp4",
98
+ "orig_name": "example_recording.webm",
99
+ "size": 1024000
100
+ },
101
+ "duration": 30.5,
102
+ "audio_enabled": True,
103
+ "status": "stopped"
104
+ }
105
+
106
+ def example_value(self) -> ScreenRecorderData:
107
+ """
108
+ An example value for this component for the default app.
109
+ """
110
+ return ScreenRecorderData(
111
+ video=FileData(
112
+ path="https://sample-videos.com/zip/10/mp4/SampleVideo_360x240_1mb.mp4",
113
+ orig_name="example_recording.webm",
114
+ size=1024000
115
+ ),
116
+ duration=30.5,
117
+ audio_enabled=True,
118
+ status="stopped"
119
+ )
120
+
121
+ def flag(self, x, flag_dir: str = "") -> str:
122
+ """
123
+ Write the component's value to a format for flagging (CSV storage).
124
+ """
125
+ if x is None:
126
+ return ""
127
+
128
+ if isinstance(x, ScreenRecorderData) and x.video:
129
+ return f"Recording: {x.video.orig_name} ({x.duration}s) - Status: {x.status}"
130
+
131
+ if isinstance(x, dict) and "video" in x:
132
+ duration = x.get("duration", "unknown")
133
+ status = x.get("status", "unknown")
134
+ video_name = x["video"].get("orig_name", "unknown") if x["video"] else "none"
135
+ return f"Recording: {video_name} ({duration}s) - Status: {status}"
136
+
137
+ return str(x)
138
+
139
+ def preprocess(self, payload) -> Optional[ScreenRecorderData]:
140
+ """Process incoming recording data from frontend."""
141
+ if payload is None:
142
+ return None
143
 
144
+ if isinstance(payload, dict):
145
+ if payload.get("status") == "error": # Early exit for errors from frontend
146
+ raise gr.Error(f"Recording failed on frontend: {payload.get('error', 'Unknown error')}")
147
 
148
+ # If 'video' field is a string, assume it's JSON and parse it.
149
+ if "video" in payload and isinstance(payload["video"], str):
150
+ try:
151
+ video_json_string = payload["video"]
152
+ if video_json_string.strip().startswith("{") and video_json_string.strip().endswith("}"):
153
+ payload["video"] = json.loads(video_json_string)
154
+ # If it's a string but not our expected JSON (e.g. 'null', or empty string, or simple path)
155
+ # json.loads would fail or Pydantic validation later will catch it if structure is wrong.
156
+ # For 'null' string, json.loads results in None for payload["video"].
157
+ elif video_json_string.lower() == 'null':
158
+ payload["video"] = None
159
+ else:
160
+ # This case implies a string that isn't a JSON object or 'null',
161
+ # e.g. a direct file path string, which FileData might not directly accept
162
+ # if it expects a dict. Pydantic will raise error later if type is incompatible.
163
+ gr.Warning(f"Video data is a string but not a recognized JSON object or 'null': {video_json_string[:100]}")
164
+ # To be safe, if it's not a JSON object string, we might want to error or handle specifically
165
+ # For now, let Pydantic try to handle it or fail.
166
+
167
+ except json.JSONDecodeError:
168
+ raise gr.Error(f"Invalid JSON for video data: {payload['video'][:100]}")
169
+
170
+ # --- Validations from here ---
171
+ video_data = payload.get("video") # Use .get() for safety, as 'video' might be absent or None
172
+
173
+ if video_data is not None: # Only validate video_data if it exists
174
+ if not isinstance(video_data, dict):
175
+ # This can happen if payload["video"] was a string like "some_path.webm" and not parsed to dict
176
+ # Or if it was parsed to something unexpected.
177
+ raise gr.Error(f"Video data is not a dictionary after processing: {type(video_data)}. Value: {str(video_data)[:100]}")
178
+
179
+ if video_data.get("size", 0) == 0:
180
+ gr.Warning("Received recording with zero size. This might be an empty recording or an issue with data capture.")
181
+ # Depending on requirements, could raise gr.Error here.
182
+
183
+ max_size = 500 * 1024 * 1024 # 500MB
184
+ if video_data.get("size", 0) > max_size:
185
+ raise gr.Error(f"Recording file too large ({video_data.get('size', 0)} bytes). Maximum allowed: {max_size} bytes.")
186
+ # If video_data is None (e.g. 'video': null was sent, or 'video' key missing),
187
+ # ScreenRecorderData will have video=None, which is allowed by Optional[FileData].
188
+
189
+ duration = payload.get("duration", 0)
190
+ if duration <= 0 and video_data is not None: # Only warn about duration if there's video data
191
+ gr.Warning("Recording duration is 0 or invalid. The recording might be corrupted.")
192
+
193
+ try:
194
+ return ScreenRecorderData(**payload)
195
+ except Exception as e: # Catch Pydantic validation errors or other issues during model instantiation
196
+ # Log the payload for easier debugging if there's a Pydantic error
197
+ # Be careful with logging sensitive data in production.
198
+ # print(f"Error creating ScreenRecorderData. Payload: {payload}")
199
+ raise gr.Error(f"Error creating ScreenRecorderData from payload: {e}")
200
+
201
+ elif isinstance(payload, ScreenRecorderData): # If it's already the correct type
202
+ return payload
203
+
204
+ gr.Warning(f"Unexpected payload format: {type(payload)}. Payload: {str(payload)[:200]}")
205
+ return None
206
+
207
+ # def postprocess(self, value) -> Optional[dict]:
208
+ # """Process outgoing data to frontend."""
209
+ # if value is None:
210
+ # return {"status": "stopped"} # Ensure valid empty state
211
+
212
+ # try:
213
+ # if isinstance(value, ScreenRecorderData):
214
+ # return value.model_dump()
215
+ # elif isinstance(value, dict):
216
+ # return value
217
+ # return None
218
+ # except Exception as e:
219
+ # return {"status": "error", "error": str(e)}
220
+
221
+
222
+ def postprocess(self, value) -> Optional[dict]:
223
+ """Process outgoing data to frontend."""
224
+ print(f'value in postprocess: {value}')
225
+ if value is None:
226
+ return None
227
+
228
+ try:
229
+ # If it's already a dict, return as is
230
+ if isinstance(value, dict):
231
+ return value
232
+
233
+ # If it's a ScreenRecorderData object, convert to dict
234
+ if hasattr(value, 'model_dump'):
235
+ return value.model_dump()
236
+
237
+ # Handle string values
238
+ if isinstance(value, str):
239
+ return {"video": {"path": value}}
240
+
241
+ return None
242
+
243
+ except Exception as e:
244
+ print(f'Error in postprocess: {e}')
245
+ return None
246
+
247
+
248
+ # try:
249
+ # if isinstance(value, ScreenRecorderData):
250
+ # # Ensure video data exists before sending
251
+ # if not value.video:
252
+ # return {"status": "error", "error": "No video recorded"}
253
+
254
+ # return {
255
+ # "video": value.video,
256
+ # "duration": value.duration,
257
+ # "audio_enabled": value.audio_enabled,
258
+ # "status": value.status
259
+ # }
260
+
261
+ # # Handle raw dict format from frontend
262
+ # if isinstance(value, dict):
263
+ # return {
264
+ # "video": FileData(**value.get("video", {})),
265
+ # "duration": value.get("duration"),
266
+ # "audio_enabled": value.get("audio_enabled", True),
267
+ # "status": value.get("status", "stopped")
268
+ # }
269
+
270
+ # except Exception as e:
271
+ # return {"status": "error", "error": str(e)}
272
+
273
+ # return {"status": "stopped"}
274
+
275
+ def as_example(self, input_data):
276
+ """Handle example data display."""
277
+ if input_data is None:
278
+ return None
279
+
280
+ if isinstance(input_data, (ScreenRecorderData, dict)):
281
+ return input_data
282
+
283
+ # Convert simple video path to proper format
284
+ if isinstance(input_data, str):
285
+ return {
286
+ "video": {
287
+ "path": input_data,
288
+ "orig_name": os.path.basename(input_data),
289
+ "size": 0
290
+ },
291
+ "duration": None,
292
+ "audio_enabled": self.audio_enabled,
293
+ "status": "stopped"
294
+ }
295
+
296
+ return input_data
297
+
298
+ def update_status(self, status: Literal["recording", "stopped", "error"]):
299
+ """Update the internal status of the recorder."""
300
+ self._status = status
301
+
302
+ def get_status(self) -> str:
303
+ """Get the current status of the recorder."""
304
+ return self._status
305
  # Environment variable for API key
306
  API_KEY = os.getenv("GEMINI_API_KEY", "")
307