Hug0endob committed on
Commit
02d7acf
·
verified ·
1 Parent(s): 1a185c8

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +168 -83
app.py CHANGED
@@ -1,7 +1,6 @@
1
  #!/usr/bin/env python3
2
  # -*- coding: utf-8 -*-
3
 
4
- from __future__ import annotations
5
  import os, shutil, subprocess, tempfile, base64, json
6
  from io import BytesIO
7
  from typing import List, Tuple
@@ -9,7 +8,6 @@ import requests
9
  from PIL import Image, ImageFile, UnidentifiedImageError
10
  import gradio as gr
11
 
12
- # --- Config
13
  DEFAULT_KEY = os.getenv("MISTRAL_API_KEY", "")
14
  PIXTRAL_MODEL = "pixtral-12b-2409"
15
  VIDEO_MODEL = "voxtral-mini-latest"
@@ -75,8 +73,10 @@ def fetch_bytes(src: str, stream_threshold: int = STREAM_THRESHOLD, timeout: int
75
  try:
76
  with open(p, "wb") as fh:
77
  for chunk in r.iter_content(8192):
78
- if chunk: fh.write(chunk)
79
- with open(p, "rb") as fh: return fh.read()
 
 
80
  finally:
81
  try: os.remove(p)
82
  except Exception: pass
@@ -85,23 +85,30 @@ def fetch_bytes(src: str, stream_threshold: int = STREAM_THRESHOLD, timeout: int
85
  r = safe_get(src, timeout=timeout)
86
  return r.content
87
  else:
88
- with open(src, "rb") as f: return f.read()
 
89
 
90
  def save_bytes_to_temp(b: bytes, suffix: str) -> str:
91
- fd, path = tempfile.mkstemp(suffix=suffix); os.close(fd)
92
- with open(path, "wb") as f: f.write(b)
 
 
93
  return path
94
 
95
  def convert_to_jpeg_bytes(img_bytes: bytes, base_h: int = 480) -> bytes:
96
  img = Image.open(BytesIO(img_bytes))
97
  try:
98
- if getattr(img, "is_animated", False): img.seek(0)
99
- except Exception: pass
100
- if img.mode != "RGB": img = img.convert("RGB")
 
 
 
101
  h = base_h
102
  w = max(1, int(img.width * (h / img.height)))
103
  img = img.resize((w, h), Image.LANCZOS)
104
- buf = BytesIO(); img.save(buf, format="JPEG", quality=85)
 
105
  return buf.getvalue()
106
 
107
  def b64_bytes(b: bytes, mime: str = "image/jpeg") -> str:
@@ -109,20 +116,43 @@ def b64_bytes(b: bytes, mime: str = "image/jpeg") -> str:
109
 
110
  def extract_best_frames_bytes(media_path: str, sample_count: int = 5, timeout_extract: int = 15) -> List[bytes]:
111
  frames: List[bytes] = []
112
- if not FFMPEG_BIN or not os.path.exists(media_path): return frames
 
113
  timestamps = [0.5, 1.0, 2.0, 3.0, 4.0][:sample_count]
114
  for i, t in enumerate(timestamps):
115
- fd, tmp = tempfile.mkstemp(suffix=f"_{i}.jpg"); os.close(fd)
116
- cmd = [FFMPEG_BIN, "-nostdin", "-y", "-ss", str(t), "-i", media_path, "-frames:v", "1", "-q:v", "2", tmp]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  try:
118
- subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=timeout_extract)
 
 
 
 
 
119
  if os.path.exists(tmp) and os.path.getsize(tmp) > 0:
120
- with open(tmp, "rb") as f: frames.append(f.read())
 
121
  except Exception:
122
  pass
123
  finally:
124
- try: os.remove(tmp)
125
- except Exception: pass
 
 
126
  return frames
127
 
128
  def chat_complete(client, model: str, messages, timeout: int = 120) -> str:
@@ -132,14 +162,33 @@ def chat_complete(client, model: str, messages, timeout: int = 120) -> str:
132
  else:
133
  api_key = getattr(client, "api_key", "") or DEFAULT_KEY
134
  url = "https://api.mistral.ai/v1/chat/completions"
135
- headers = ({"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} if api_key else {"Content-Type": "application/json"})
136
- r = requests.post(url, json={"model": model, "messages": messages}, headers=headers, timeout=timeout)
137
- r.raise_for_status(); res = r.json()
 
 
 
 
 
 
 
 
 
 
138
  choices = getattr(res, "choices", None) or (res.get("choices") if isinstance(res, dict) else [])
139
- if not choices: return str(res)
 
140
  first = choices[0]
141
- msg = first.message if hasattr(first, "message") else (first.get("message") if isinstance(first, dict) else first)
142
- content = msg.get("content") if isinstance(msg, dict) else getattr(msg, "content", None)
 
 
 
 
 
 
 
 
143
  return content.strip() if isinstance(content, str) else str(content)
144
  except Exception as e:
145
  return f"Error during model call: {e}"
@@ -151,7 +200,8 @@ def upload_file_to_mistral(client, path: str, filename: str | None = None, purpo
151
  with open(path, "rb") as fh:
152
  res = client.files.upload(file={"file_name": fname, "content": fh}, purpose=purpose)
153
  fid = getattr(res, "id", None) or (res.get("id") if isinstance(res, dict) else None)
154
- if not fid: fid = res["data"][0]["id"]
 
155
  return fid
156
  except Exception:
157
  pass
@@ -159,56 +209,95 @@ def upload_file_to_mistral(client, path: str, filename: str | None = None, purpo
159
  url = "https://api.mistral.ai/v1/files"
160
  headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
161
  with open(path, "rb") as fh:
162
- files = {"file": (fname, fh)}; data = {"purpose": purpose}
163
- r = requests.post(url, headers=headers, files=files, data=data, timeout=timeout); r.raise_for_status(); jr = r.json()
 
 
 
164
  return jr.get("id") or jr.get("data", [{}])[0].get("id")
165
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
166
  def analyze_image_structured(client, img_bytes: bytes, prompt: str) -> str:
167
  jpeg = convert_to_jpeg_bytes(img_bytes, base_h=1024)
168
  data_url = b64_bytes(jpeg, mime="image/jpeg")
169
- messages = [{"role": "system", "content": SYSTEM_INSTRUCTION},
170
- {"role": "user", "content": [{"type": "text", "text": prompt}, {"type": "image_url", "image_url": data_url}]}]
 
 
 
 
 
 
 
 
171
  return chat_complete(client, PIXTRAL_MODEL, messages)
172
 
 
173
  def analyze_video_cohesive(client, video_path: str, prompt: str) -> str:
174
  try:
175
  file_id = upload_file_to_mistral(client, video_path, filename=os.path.basename(video_path))
176
- extra_msg = f"Uploaded video file id: {file_id}\n\nInstruction: Analyze the entire video and produce a single cohesive narrative describing consistent observations."
177
- messages = [{"role": "system", "content": SYSTEM_INSTRUCTION}, {"role": "user", "content": extra_msg + "\n\n" + prompt}]
 
 
 
 
 
 
178
  return chat_complete(client, VIDEO_MODEL, messages)
179
  except Exception:
180
  frames = extract_best_frames_bytes(video_path, sample_count=6)
181
- if not frames: return "Error: could not upload video and no frames could be extracted."
 
182
  image_entries = []
183
  for i, fb in enumerate(frames, start=1):
184
  try:
185
  j = convert_to_jpeg_bytes(fb, base_h=720)
186
- image_entries.append({"type": "image_url", "image_url": b64_bytes(j, mime="image/jpeg"), "meta": {"frame_index": i}})
 
 
 
 
 
 
187
  except Exception:
188
  continue
189
- content = [{"type": "text", "text": prompt + "\n\nPlease consolidate observations across these frames into a single cohesive narrative."}] + image_entries
190
- messages = [{"role": "system", "content": SYSTEM_INSTRUCTION}, {"role": "user", "content": content}]
 
 
 
 
 
191
  return chat_complete(client, PIXTRAL_MODEL, messages)
192
 
193
- def determine_media_type(src: str) -> Tuple[bool, bool]:
194
- is_image = False; is_video = False
195
- ext = ext_from_src(src)
196
- if ext in IMAGE_EXTS: is_image = True
197
- if ext in VIDEO_EXTS: is_video = True
198
- if is_remote(src):
199
- head = safe_head(src)
200
- if head:
201
- ctype = (head.headers.get("content-type") or "").lower()
202
- if ctype.startswith("image/"): is_image, is_video = True, False
203
- elif ctype.startswith("video/"): is_video, is_image = True, False
204
- return is_image, is_video
205
 
206
  def process_media(src: str, custom_prompt: str, api_key: str, progress=gr.Progress()) -> str:
207
  client = get_client(api_key)
208
  prompt = (custom_prompt or "").strip() or "Please provide a detailed visual review."
209
- if not src: return "No URL or path provided."
 
210
  progress(0.05, desc="Determining media type")
211
  is_image, is_video = determine_media_type(src)
 
212
  if is_image:
213
  try:
214
  raw = fetch_bytes(src)
@@ -221,6 +310,7 @@ def process_media(src: str, custom_prompt: str, api_key: str, progress=gr.Progre
221
  return "Error: provided file is not a valid image."
222
  except Exception as e:
223
  return f"Error analyzing image: {e}"
 
224
  if is_video:
225
  try:
226
  raw = fetch_bytes(src, timeout=120)
@@ -231,8 +321,12 @@ def process_media(src: str, custom_prompt: str, api_key: str, progress=gr.Progre
231
  progress(0.2, desc="Analyzing video")
232
  return analyze_video_cohesive(client, tmp_path, prompt)
233
  finally:
234
- try: os.remove(tmp_path)
235
- except Exception: pass
 
 
 
 
236
  try:
237
  raw = fetch_bytes(src)
238
  progress(0.2, desc="Treating as image")
@@ -240,11 +334,12 @@ def process_media(src: str, custom_prompt: str, api_key: str, progress=gr.Progre
240
  except Exception as e:
241
  return f"Unable to determine media type or fetch file: {e}"
242
 
243
- # --- Gradio UI
 
244
  css = ".preview_media img, .preview_media video { max-width: 100%; height: auto; border-radius:6px; }"
245
 
246
  def _btn_label_for_status(status: str) -> str:
247
- return {"idle": "Submit", "busy": "Processing…", "done": "Submit", "error": "Retry"}.get(status or "idle", "Submit")
248
 
249
  def create_demo():
250
  with gr.Blocks(title="Flux Multimodal", css=css) as demo:
@@ -252,44 +347,35 @@ def create_demo():
252
  with gr.Column(scale=1):
253
  preview_image = gr.Image(label="Preview Image", type="pil", elem_classes="preview_media", visible=False)
254
  preview_video = gr.Video(label="Preview Video", elem_classes="preview_media", visible=False)
255
- pip_button = gr.Button("Open Video in PiP", visible=False)
256
  with gr.Column(scale=2):
257
  url_input = gr.Textbox(label="Image / Video URL or local path", placeholder="https://... or /path/to/file", lines=1)
258
  with gr.Accordion("Prompt (optional)", open=False):
259
  custom_prompt = gr.Textbox(label="Prompt", lines=4, value="")
260
  with gr.Accordion("Mistral API Key (optional)", open=False):
261
  api_key = gr.Textbox(label="API Key", type="password", max_lines=1)
262
- submit_btn = gr.Button(_btn_label_for_status("idle"))
263
- clear_btn = gr.Button("Clear")
 
264
  output_md = gr.Markdown("")
265
  status_state = gr.State("idle")
266
 
267
- pip_html = gr.HTML("""<div id="pip-root" style="display:none"></div>
268
- <script>
269
- window.openPiP = (sel) => {
270
- try {
271
- const v = document.querySelector(sel);
272
- if (!v) return "no-video";
273
- if (v.requestPictureInPicture) { v.requestPictureInPicture(); return "opened"; }
274
- return "unsupported";
275
- } catch(e){ return "error:"+e; }
276
- };
277
- </script>""")
278
-
279
  def load_preview(url: str):
280
  empty_img = gr.update(value=None, visible=False)
281
  empty_vid = gr.update(value=None, visible=False)
282
- pip_vis = gr.update(visible=False)
283
- if not url: return empty_img, empty_vid, pip_vis
284
  if not is_remote(url) and os.path.exists(url):
285
  ext = ext_from_src(url)
286
- if ext in VIDEO_EXTS: return empty_img, gr.update(value=os.path.abspath(url), visible=True), gr.update(visible=True)
 
287
  if ext in IMAGE_EXTS:
288
  try:
289
  img = Image.open(url)
290
- if getattr(img, "is_animated", False): img.seek(0)
291
- return gr.update(value=img.convert("RGB"), visible=True), empty_vid, pip_vis
292
- except Exception: return empty_img, empty_vid, pip_vis
 
 
293
  head = safe_head(url)
294
  if head:
295
  ctype = (head.headers.get("content-type") or "").lower()
@@ -298,22 +384,19 @@ window.openPiP = (sel) => {
298
  try:
299
  r = safe_get(url, timeout=15)
300
  img = Image.open(BytesIO(r.content))
301
- if getattr(img, "is_animated", False): img.seek(0)
302
- return gr.update(value=img.convert("RGB"), visible=True), empty_vid, pip_vis
 
303
  except Exception:
304
- return empty_img, empty_vid, pip_vis
305
 
306
- url_input.change(fn=load_preview, inputs=[url_input], outputs=[preview_image, preview_video, pip_button])
 
307
 
308
  def clear_all():
309
  return "", gr.update(value=None, visible=False), gr.update(value=None, visible=False), "idle", gr.update(value=_btn_label_for_status("idle"))
310
  clear_btn.click(fn=clear_all, inputs=[], outputs=[url_input, preview_image, preview_video, status_state, submit_btn])
311
 
312
- def pip_click(_):
313
- js = "<script>setTimeout(()=>window.openPiP('video.preview_media'),50);</script>"
314
- return gr.HTML.update(value=js)
315
- pip_button.click(fn=pip_click, inputs=[url_input], outputs=[pip_html])
316
-
317
  def start_busy():
318
  s = "busy"
319
  return s, gr.update(value=_btn_label_for_status(s))
@@ -321,7 +404,9 @@ window.openPiP = (sel) => {
321
 
322
  def worker(url: str, prompt: str, key: str, progress=gr.Progress()):
323
  return process_media(url or "", prompt or "", key or "", progress=progress)
324
- submit_btn.click(fn=worker, inputs=[url_input, custom_prompt, api_key], outputs=[output_md], queue=True).then(
 
 
325
  fn=lambda res: ("error", "**Error:** no result returned.") if not res else
326
  ("error", f"**Error:** {res}") if isinstance(res, str) and res.lower().startswith("error") else ("done", res),
327
  inputs=[output_md],
 
1
  #!/usr/bin/env python3
2
  # -*- coding: utf-8 -*-
3
 
 
4
  import os, shutil, subprocess, tempfile, base64, json
5
  from io import BytesIO
6
  from typing import List, Tuple
 
8
  from PIL import Image, ImageFile, UnidentifiedImageError
9
  import gradio as gr
10
 
 
11
  DEFAULT_KEY = os.getenv("MISTRAL_API_KEY", "")
12
  PIXTRAL_MODEL = "pixtral-12b-2409"
13
  VIDEO_MODEL = "voxtral-mini-latest"
 
73
  try:
74
  with open(p, "wb") as fh:
75
  for chunk in r.iter_content(8192):
76
+ if chunk:
77
+ fh.write(chunk)
78
+ with open(p, "rb") as fh:
79
+ return fh.read()
80
  finally:
81
  try: os.remove(p)
82
  except Exception: pass
 
85
  r = safe_get(src, timeout=timeout)
86
  return r.content
87
  else:
88
+ with open(src, "rb") as f:
89
+ return f.read()
90
 
91
  def save_bytes_to_temp(b: bytes, suffix: str) -> str:
92
+ fd, path = tempfile.mkstemp(suffix=suffix)
93
+ os.close(fd)
94
+ with open(path, "wb") as f:
95
+ f.write(b)
96
  return path
97
 
98
  def convert_to_jpeg_bytes(img_bytes: bytes, base_h: int = 480) -> bytes:
99
  img = Image.open(BytesIO(img_bytes))
100
  try:
101
+ if getattr(img, "is_animated", False):
102
+ img.seek(0)
103
+ except Exception:
104
+ pass
105
+ if img.mode != "RGB":
106
+ img = img.convert("RGB")
107
  h = base_h
108
  w = max(1, int(img.width * (h / img.height)))
109
  img = img.resize((w, h), Image.LANCZOS)
110
+ buf = BytesIO()
111
+ img.save(buf, format="JPEG", quality=85)
112
  return buf.getvalue()
113
 
114
  def b64_bytes(b: bytes, mime: str = "image/jpeg") -> str:
 
116
 
117
  def extract_best_frames_bytes(media_path: str, sample_count: int = 5, timeout_extract: int = 15) -> List[bytes]:
118
  frames: List[bytes] = []
119
+ if not FFMPEG_BIN or not os.path.exists(media_path):
120
+ return frames
121
  timestamps = [0.5, 1.0, 2.0, 3.0, 4.0][:sample_count]
122
  for i, t in enumerate(timestamps):
123
+ fd, tmp = tempfile.mkstemp(suffix=f"_{i}.jpg")
124
+ os.close(fd)
125
+ cmd = [
126
+ FFMPEG_BIN,
127
+ "-nostdin",
128
+ "-y",
129
+ "-ss",
130
+ str(t),
131
+ "-i",
132
+ media_path,
133
+ "-frames:v",
134
+ "1",
135
+ "-q:v",
136
+ "2",
137
+ tmp,
138
+ ]
139
  try:
140
+ subprocess.run(
141
+ cmd,
142
+ stdout=subprocess.DEVNULL,
143
+ stderr=subprocess.DEVNULL,
144
+ timeout=timeout_extract,
145
+ )
146
  if os.path.exists(tmp) and os.path.getsize(tmp) > 0:
147
+ with open(tmp, "rb") as f:
148
+ frames.append(f.read())
149
  except Exception:
150
  pass
151
  finally:
152
+ try:
153
+ os.remove(tmp)
154
+ except Exception:
155
+ pass
156
  return frames
157
 
158
  def chat_complete(client, model: str, messages, timeout: int = 120) -> str:
 
162
  else:
163
  api_key = getattr(client, "api_key", "") or DEFAULT_KEY
164
  url = "https://api.mistral.ai/v1/chat/completions"
165
+ headers = (
166
+ {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
167
+ if api_key
168
+ else {"Content-Type": "application/json"}
169
+ )
170
+ r = requests.post(
171
+ url,
172
+ json={"model": model, "messages": messages},
173
+ headers=headers,
174
+ timeout=timeout,
175
+ )
176
+ r.raise_for_status()
177
+ res = r.json()
178
  choices = getattr(res, "choices", None) or (res.get("choices") if isinstance(res, dict) else [])
179
+ if not choices:
180
+ return str(res)
181
  first = choices[0]
182
+ msg = (
183
+ first.message
184
+ if hasattr(first, "message")
185
+ else (first.get("message") if isinstance(first, dict) else first)
186
+ )
187
+ content = (
188
+ msg.get("content")
189
+ if isinstance(msg, dict)
190
+ else getattr(msg, "content", None)
191
+ )
192
  return content.strip() if isinstance(content, str) else str(content)
193
  except Exception as e:
194
  return f"Error during model call: {e}"
 
200
  with open(path, "rb") as fh:
201
  res = client.files.upload(file={"file_name": fname, "content": fh}, purpose=purpose)
202
  fid = getattr(res, "id", None) or (res.get("id") if isinstance(res, dict) else None)
203
+ if not fid:
204
+ fid = res["data"][0]["id"]
205
  return fid
206
  except Exception:
207
  pass
 
209
  url = "https://api.mistral.ai/v1/files"
210
  headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
211
  with open(path, "rb") as fh:
212
+ files = {"file": (fname, fh)}
213
+ data = {"purpose": purpose}
214
+ r = requests.post(url, headers=headers, files=files, data=data, timeout=timeout)
215
+ r.raise_for_status()
216
+ jr = r.json()
217
  return jr.get("id") or jr.get("data", [{}])[0].get("id")
218
 
219
def determine_media_type(src: str) -> Tuple[bool, bool]:
    """Classify ``src`` as image and/or video.

    The file extension gives the initial guess. For remote sources a HEAD
    request is made, and an ``image/*`` or ``video/*`` Content-Type overrides
    the extension-based guess. Any other (or missing) Content-Type leaves the
    guess unchanged.

    Args:
        src: URL or local path of the media.

    Returns:
        ``(is_image, is_video)``. Both are False when the type could not be
        determined.
    """
    ext = ext_from_src(src)
    is_image = ext in IMAGE_EXTS
    is_video = ext in VIDEO_EXTS

    if is_remote(src):
        head = safe_head(src)
        if head:
            ctype = (head.headers.get("content-type") or "").lower()
            # The server-reported type wins over the extension.
            if ctype.startswith("image/"):
                is_image, is_video = True, False
            elif ctype.startswith("video/"):
                is_image, is_video = False, True
    return is_image, is_video
238
  def analyze_image_structured(client, img_bytes: bytes, prompt: str) -> str:
239
  jpeg = convert_to_jpeg_bytes(img_bytes, base_h=1024)
240
  data_url = b64_bytes(jpeg, mime="image/jpeg")
241
+ messages = [
242
+ {"role": "system", "content": SYSTEM_INSTRUCTION},
243
+ {
244
+ "role": "user",
245
+ "content": [
246
+ {"type": "text", "text": prompt},
247
+ {"type": "image_url", "image_url": data_url},
248
+ ],
249
+ },
250
+ ]
251
  return chat_complete(client, PIXTRAL_MODEL, messages)
252
 
253
+
254
  def analyze_video_cohesive(client, video_path: str, prompt: str) -> str:
255
  try:
256
  file_id = upload_file_to_mistral(client, video_path, filename=os.path.basename(video_path))
257
+ extra_msg = (
258
+ f"Uploaded video file id: {file_id}\n\n"
259
+ "Instruction: Analyze the entire video and produce a single cohesive narrative describing consistent observations."
260
+ )
261
+ messages = [
262
+ {"role": "system", "content": SYSTEM_INSTRUCTION},
263
+ {"role": "user", "content": extra_msg + "\n\n" + prompt},
264
+ ]
265
  return chat_complete(client, VIDEO_MODEL, messages)
266
  except Exception:
267
  frames = extract_best_frames_bytes(video_path, sample_count=6)
268
+ if not frames:
269
+ return "Error: could not upload video and no frames could be extracted."
270
  image_entries = []
271
  for i, fb in enumerate(frames, start=1):
272
  try:
273
  j = convert_to_jpeg_bytes(fb, base_h=720)
274
+ image_entries.append(
275
+ {
276
+ "type": "image_url",
277
+ "image_url": b64_bytes(j, mime="image/jpeg"),
278
+ "meta": {"frame_index": i},
279
+ }
280
+ )
281
  except Exception:
282
  continue
283
+ content = [
284
+ {"type": "text", "text": prompt + "\n\nPlease consolidate observations across these frames into a single cohesive narrative."}
285
+ ] + image_entries
286
+ messages = [
287
+ {"role": "system", "content": SYSTEM_INSTRUCTION},
288
+ {"role": "user", "content": content},
289
+ ]
290
  return chat_complete(client, PIXTRAL_MODEL, messages)
291
 
 
 
 
 
 
 
 
 
 
 
 
 
292
 
293
  def process_media(src: str, custom_prompt: str, api_key: str, progress=gr.Progress()) -> str:
294
  client = get_client(api_key)
295
  prompt = (custom_prompt or "").strip() or "Please provide a detailed visual review."
296
+ if not src:
297
+ return "No URL or path provided."
298
  progress(0.05, desc="Determining media type")
299
  is_image, is_video = determine_media_type(src)
300
+
301
  if is_image:
302
  try:
303
  raw = fetch_bytes(src)
 
310
  return "Error: provided file is not a valid image."
311
  except Exception as e:
312
  return f"Error analyzing image: {e}"
313
+
314
  if is_video:
315
  try:
316
  raw = fetch_bytes(src, timeout=120)
 
321
  progress(0.2, desc="Analyzing video")
322
  return analyze_video_cohesive(client, tmp_path, prompt)
323
  finally:
324
+ try:
325
+ os.remove(tmp_path)
326
+ except Exception:
327
+ pass
328
+
329
+ # Fallback: treat as image
330
  try:
331
  raw = fetch_bytes(src)
332
  progress(0.2, desc="Treating as image")
 
334
  except Exception as e:
335
  return f"Unable to determine media type or fetch file: {e}"
336
 
337
+
338
+ # ------------------- Gradio UI -------------------
339
  css = ".preview_media img, .preview_media video { max-width: 100%; height: auto; border-radius:6px; }"
340
 
341
  def _btn_label_for_status(status: str) -> str:
342
+ return {"idle": "Submit", "busy": "Processing…", "done": "Submit", "error": "Retry"}.get(status, "Submit")
343
 
344
  def create_demo():
345
  with gr.Blocks(title="Flux Multimodal", css=css) as demo:
 
347
  with gr.Column(scale=1):
348
  preview_image = gr.Image(label="Preview Image", type="pil", elem_classes="preview_media", visible=False)
349
  preview_video = gr.Video(label="Preview Video", elem_classes="preview_media", visible=False)
 
350
  with gr.Column(scale=2):
351
  url_input = gr.Textbox(label="Image / Video URL or local path", placeholder="https://... or /path/to/file", lines=1)
352
  with gr.Accordion("Prompt (optional)", open=False):
353
  custom_prompt = gr.Textbox(label="Prompt", lines=4, value="")
354
  with gr.Accordion("Mistral API Key (optional)", open=False):
355
  api_key = gr.Textbox(label="API Key", type="password", max_lines=1)
356
+ with gr.Row():
357
+ submit_btn = gr.Button(_btn_label_for_status("idle"))
358
+ clear_btn = gr.Button("Clear")
359
  output_md = gr.Markdown("")
360
  status_state = gr.State("idle")
361
 
 
 
 
 
 
 
 
 
 
 
 
 
362
  def load_preview(url: str):
363
  empty_img = gr.update(value=None, visible=False)
364
  empty_vid = gr.update(value=None, visible=False)
365
+ if not url:
366
+ return empty_img, empty_vid, gr.update(visible=False)
367
  if not is_remote(url) and os.path.exists(url):
368
  ext = ext_from_src(url)
369
+ if ext in VIDEO_EXTS:
370
+ return empty_img, gr.update(value=os.path.abspath(url), visible=True), gr.update(visible=True)
371
  if ext in IMAGE_EXTS:
372
  try:
373
  img = Image.open(url)
374
+ if getattr(img, "is_animated", False):
375
+ img.seek(0)
376
+ return gr.update(value=img.convert("RGB"), visible=True), empty_vid, gr.update(visible=False)
377
+ except Exception:
378
+ return empty_img, empty_vid, gr.update(visible=False)
379
  head = safe_head(url)
380
  if head:
381
  ctype = (head.headers.get("content-type") or "").lower()
 
384
  try:
385
  r = safe_get(url, timeout=15)
386
  img = Image.open(BytesIO(r.content))
387
+ if getattr(img, "is_animated", False):
388
+ img.seek(0)
389
+ return gr.update(value=img.convert("RGB"), visible=True), empty_vid, gr.update(visible=False)
390
  except Exception:
391
+ return empty_img, empty_vid, gr.update(visible=False)
392
 
393
+ url_input.change(fn=load_preview, inputs=[url_input],
394
+ outputs=[preview_image, preview_video, preview_video])
395
 
396
  def clear_all():
397
  return "", gr.update(value=None, visible=False), gr.update(value=None, visible=False), "idle", gr.update(value=_btn_label_for_status("idle"))
398
  clear_btn.click(fn=clear_all, inputs=[], outputs=[url_input, preview_image, preview_video, status_state, submit_btn])
399
 
 
 
 
 
 
400
  def start_busy():
401
  s = "busy"
402
  return s, gr.update(value=_btn_label_for_status(s))
 
404
 
405
  def worker(url: str, prompt: str, key: str, progress=gr.Progress()):
406
  return process_media(url or "", prompt or "", key or "", progress=progress)
407
+
408
+ submit_btn.click(fn=worker, inputs=[url_input, custom_prompt, api_key],
409
+ outputs=[output_md], queue=True).then(
410
  fn=lambda res: ("error", "**Error:** no result returned.") if not res else
411
  ("error", f"**Error:** {res}") if isinstance(res, str) and res.lower().startswith("error") else ("done", res),
412
  inputs=[output_md],