Hug0endob committed on
Commit
124f8da
·
verified ·
1 Parent(s): 883b2ce

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +48 -61
app.py CHANGED
@@ -12,43 +12,41 @@ from mistralai import Mistral
12
 
13
  DEFAULT_KEY = os.getenv("MISTRAL_API_KEY")
14
 
15
- def get_mistral_client(alt_key: str = None):
16
- key = alt_key.strip() if alt_key and alt_key.strip() else DEFAULT_KEY
17
  return Mistral(api_key=key)
18
 
19
- def is_remote(src: str):
20
- return src.startswith("http://") or src.startswith("https://")
21
 
22
- def fetch_bytes(source: str):
23
- if is_remote(source):
24
- r = requests.get(source, timeout=30)
25
  r.raise_for_status()
26
  return r.content
27
- with open(source, "rb") as f:
28
  return f.read()
29
 
30
- def try_ffmpeg_extract_frame(media_path: str, out_path: str):
31
  ffmpeg = shutil.which("ffmpeg")
32
  if not ffmpeg:
33
  return False
34
- cmd = [
35
- ffmpeg, "-y", "-i", media_path, "-vf", "scale=-2:512", "-frames:v", "1", out_path
36
- ]
37
  try:
38
  subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=30)
39
  return os.path.exists(out_path)
40
  except Exception:
41
  return False
42
 
43
- def ezgif_convert_to_jpg_bytes(media_bytes: bytes, filename_hint: str = "input"):
44
- files = {"new-image": (filename_hint, media_bytes)}
45
  r = requests.post("https://s.ezgif.com/upload", files=files, timeout=60)
46
  r.raise_for_status()
47
  m = re.search(r'name="file" value="([^"]+)"', r.text)
48
  if not m:
49
  raise RuntimeError("ezgif upload failed")
50
- file_key = m.group(1)
51
- conv = requests.post("https://s.ezgif.com/gif-to-jpg", data={"file": file_key}, timeout=60)
52
  conv.raise_for_status()
53
  m2 = re.search(r'<img src="(https?://s.ezgif.com/tmp/[^"]+)"', conv.text) or re.search(r'<a href="(https?://s.ezgif.com/tmp/[^"]+)"', conv.text)
54
  if not m2:
@@ -58,7 +56,7 @@ def ezgif_convert_to_jpg_bytes(media_bytes: bytes, filename_hint: str = "input")
58
  r2.raise_for_status()
59
  return r2.content
60
 
61
- def convert_media_to_jpeg_bytes(media_bytes: bytes, filename_hint: str = "input") -> bytes:
62
  try:
63
  img = Image.open(BytesIO(media_bytes))
64
  if img.mode != "RGB":
@@ -71,17 +69,17 @@ def convert_media_to_jpeg_bytes(media_bytes: bytes, filename_hint: str = "input"
71
  return buf.getvalue()
72
  except Exception:
73
  with tempfile.TemporaryDirectory() as td:
74
- tmp_in = os.path.join(td, filename_hint)
75
- with open(tmp_in, "wb") as f:
76
  f.write(media_bytes)
77
- tmp_out = os.path.join(td, "frame.jpg")
78
- if try_ffmpeg_extract_frame(tmp_in, tmp_out) and os.path.exists(tmp_out):
79
- with open(tmp_out, "rb") as f:
80
  return f.read()
81
- return ezgif_convert_to_jpg_bytes(media_bytes, filename_hint=filename_hint)
82
 
83
- def image_bytes_to_base64_jpeg(image_bytes: bytes):
84
- return base64.b64encode(image_bytes).decode("utf-8")
85
 
86
  def build_prompt(use_auto: bool, custom: str):
87
  if custom and custom.strip():
@@ -92,15 +90,15 @@ def build_prompt(use_auto: bool, custom: str):
92
  "poses, facial expressions, clothing, background elements, small distinguishing details, mood, and composition.")
93
  return "Provide a detailed description of this image."
94
 
95
- def generate_description_stream(image_source: str, use_auto: bool, custom_prompt: str, alt_key: str):
96
  try:
97
- raw = fetch_bytes(image_source)
98
- jpg = convert_media_to_jpeg_bytes(raw, filename_hint=os.path.basename(image_source) or "input")
99
  except Exception as e:
100
  yield f"Error processing media: {e}"
101
  return
102
 
103
- b64 = image_bytes_to_base64_jpeg(jpg)
104
  prompt = build_prompt(use_auto, custom_prompt)
105
  model = "pixtral-12b-2409"
106
  messages = [{
@@ -112,7 +110,7 @@ def generate_description_stream(image_source: str, use_auto: bool, custom_prompt
112
  "stream": False
113
  }]
114
 
115
- client = get_mistral_client(alt_key)
116
  try:
117
  partial = ""
118
  for chunk in client.chat.stream(model=model, messages=messages):
@@ -127,44 +125,33 @@ with gr.Blocks() as demo:
127
 
128
  with gr.Row():
129
  with gr.Column(scale=1):
130
- url_input = gr.Textbox(label="Image URL or local path", placeholder="https://... or leave blank if uploading")
131
- upload = gr.File(label="Or upload image/video (gif/mp4 allowed)", file_types=None)
132
- preview = gr.Image(label="Preview", type="pil")
133
- submit = gr.Button("Submit")
134
- with gr.Column(scale=1):
135
  custom = gr.Textbox(label="Custom prompt (optional)", lines=4, placeholder="Leave blank to use auto prompt")
136
  auto_toggle = gr.Checkbox(label="Use detailed auto prompt", value=True)
137
- alt_key = gr.Textbox(label="Alternate Mistral API Key (optional)", type="password")
138
- out = gr.Textbox(label="Generated Detailed Description", lines=16)
139
-
140
- def prepare_source(url_val, upload_file):
141
- if upload_file:
142
- return upload_file.name
143
- if url_val and url_val.strip():
144
- return url_val.strip()
145
- return ""
146
-
147
- def update_preview_and_return_source(url_val, upload_file):
148
- source = prepare_source(url_val, upload_file)
149
- if not source:
150
  return None, ""
151
  try:
152
- if is_remote(source):
153
- r = requests.get(source, timeout=30)
154
- r.raise_for_status()
155
- return Image.open(BytesIO(r.content)).convert("RGB"), source
156
- return Image.open(source).convert("RGB"), source
157
  except Exception:
158
- return None, source
159
 
160
- def start_generation(source, use_auto, custom_p, alt_k):
161
- if not source:
162
- return "No image provided."
163
- # stream generator wrapper
164
- for chunk in generate_description_stream(source, use_auto, custom_p, alt_k):
165
  yield chunk
166
 
167
- submit.click(fn=update_preview_and_return_source, inputs=[url_input, upload], outputs=[preview, url_input])
168
- submit.click(fn=start_generation, inputs=[url_input, auto_toggle, custom, alt_key], outputs=[out])
 
169
 
170
  demo.launch()
 
12
 
13
  DEFAULT_KEY = os.getenv("MISTRAL_API_KEY")
14
 
15
+ def get_client(alt_key: str = None):
16
+ key = (alt_key or "").strip() or DEFAULT_KEY
17
  return Mistral(api_key=key)
18
 
19
+ def is_remote(s: str):
20
+ return s.startswith("http://") or s.startswith("https://")
21
 
22
+ def fetch_bytes(src: str):
23
+ if is_remote(src):
24
+ r = requests.get(src, timeout=30)
25
  r.raise_for_status()
26
  return r.content
27
+ with open(src, "rb") as f:
28
  return f.read()
29
 
30
+ def try_ffmpeg_extract_frame(in_path: str, out_path: str):
31
  ffmpeg = shutil.which("ffmpeg")
32
  if not ffmpeg:
33
  return False
34
+ cmd = [ffmpeg, "-y", "-i", in_path, "-vf", "scale=-2:512", "-frames:v", "1", out_path]
 
 
35
  try:
36
  subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=30)
37
  return os.path.exists(out_path)
38
  except Exception:
39
  return False
40
 
41
+ def ezgif_convert(media_bytes: bytes, filename: str = "input"):
42
+ files = {"new-image": (filename, media_bytes)}
43
  r = requests.post("https://s.ezgif.com/upload", files=files, timeout=60)
44
  r.raise_for_status()
45
  m = re.search(r'name="file" value="([^"]+)"', r.text)
46
  if not m:
47
  raise RuntimeError("ezgif upload failed")
48
+ key = m.group(1)
49
+ conv = requests.post("https://s.ezgif.com/gif-to-jpg", data={"file": key}, timeout=60)
50
  conv.raise_for_status()
51
  m2 = re.search(r'<img src="(https?://s.ezgif.com/tmp/[^"]+)"', conv.text) or re.search(r'<a href="(https?://s.ezgif.com/tmp/[^"]+)"', conv.text)
52
  if not m2:
 
56
  r2.raise_for_status()
57
  return r2.content
58
 
59
+ def convert_to_jpeg_bytes(media_bytes: bytes, filename_hint: str = "input"):
60
  try:
61
  img = Image.open(BytesIO(media_bytes))
62
  if img.mode != "RGB":
 
69
  return buf.getvalue()
70
  except Exception:
71
  with tempfile.TemporaryDirectory() as td:
72
+ in_path = os.path.join(td, filename_hint)
73
+ with open(in_path, "wb") as f:
74
  f.write(media_bytes)
75
+ out_path = os.path.join(td, "frame.jpg")
76
+ if try_ffmpeg_extract_frame(in_path, out_path) and os.path.exists(out_path):
77
+ with open(out_path, "rb") as f:
78
  return f.read()
79
+ return ezgif_convert(media_bytes, filename_hint)
80
 
81
+ def to_b64_jpeg(img_bytes: bytes):
82
+ return base64.b64encode(img_bytes).decode("utf-8")
83
 
84
  def build_prompt(use_auto: bool, custom: str):
85
  if custom and custom.strip():
 
90
  "poses, facial expressions, clothing, background elements, small distinguishing details, mood, and composition.")
91
  return "Provide a detailed description of this image."
92
 
93
+ def generate_stream(image_src: str, use_auto: bool, custom_prompt: str, alt_key: str):
94
  try:
95
+ raw = fetch_bytes(image_src)
96
+ jpg = convert_to_jpeg_bytes(raw, filename_hint=os.path.basename(image_src) or "input")
97
  except Exception as e:
98
  yield f"Error processing media: {e}"
99
  return
100
 
101
+ b64 = to_b64_jpeg(jpg)
102
  prompt = build_prompt(use_auto, custom_prompt)
103
  model = "pixtral-12b-2409"
104
  messages = [{
 
110
  "stream": False
111
  }]
112
 
113
+ client = get_client(alt_key)
114
  try:
115
  partial = ""
116
  for chunk in client.chat.stream(model=model, messages=messages):
 
125
 
126
  with gr.Row():
127
  with gr.Column(scale=1):
128
+ alt_key = gr.Textbox(label="Alternate Mistral API Key (optional)", type="password")
129
+ url_input = gr.Textbox(label="Image URL", placeholder="https://...", value="")
 
 
 
130
  custom = gr.Textbox(label="Custom prompt (optional)", lines=4, placeholder="Leave blank to use auto prompt")
131
  auto_toggle = gr.Checkbox(label="Use detailed auto prompt", value=True)
132
+ with gr.Column(scale=1):
133
+ preview = gr.Image(label="Preview", type="pil")
134
+ out = gr.Textbox(label="Generated Detailed Description", lines=20)
135
+
136
+ def load_preview_and_return_url(url):
137
+ if not url:
 
 
 
 
 
 
 
138
  return None, ""
139
  try:
140
+ r = requests.get(url, timeout=30)
141
+ r.raise_for_status()
142
+ img = Image.open(BytesIO(r.content)).convert("RGB")
143
+ return img, url
 
144
  except Exception:
145
+ return None, url
146
 
147
+ def start_gen(url, use_auto, custom_p, alt_k):
148
+ if not url:
149
+ return "No URL provided."
150
+ for chunk in generate_stream(url, use_auto, custom_p, alt_k):
 
151
  yield chunk
152
 
153
+ url_input.change(fn=load_preview_and_return_url, inputs=[url_input], outputs=[preview, url_input])
154
+ # Generate when Submit (press Enter in URL or add separate button if desired)
155
+ url_input.submit(fn=start_gen, inputs=[url_input, auto_toggle, custom, alt_key], outputs=[out])
156
 
157
  demo.launch()