# app.py β€” VidGen (Veo) β€” FINAL + Quota Fallback + Floating Donate # - Dropdown model sesuai doc: ['veo-2.0-generate-001','veo-3.0-fast-generate-001','veo-3.0-generate-001'] # - Quota/rate-limit fallback notification # - Floating donate buttons (right-bottom): Saweria & Ko-fi import os, re, time, uuid, json, base64, requests from typing import Optional, List, Dict, Any, Tuple from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed import streamlit as st # ---------- APP & ENV ---------- st.set_page_config(page_title="VidGen β€” Veo", page_icon="🎬", layout="wide") os.environ.setdefault("HOME", "/home/user") os.environ.setdefault("STREAMLIT_SERVER_MAX_UPLOAD_SIZE", "1024") OUTPUT_DIR = "/home/user/outputs" os.makedirs(OUTPUT_DIR, exist_ok=True) # ---------- STYLE ---------- st.markdown(""" """, unsafe_allow_html=True) # ---------- MODELS & DEFAULTS ---------- MODELS_UI = [ "veo-2.0-generate-001", "veo-3.0-fast-generate-001", "veo-3.0-generate-001", ] MODEL_MAP = { "veo-2.0-generate-001": "veo-2.0-generate-001", "veo-3.0-fast-generate-001": "veo-3.0-fast-generate-001", "veo-3.0-generate-001": "veo-3.0-generate-001", } ASPECTS = ["16:9", "9:16"] NEG_DEF = ( "no text overlays, no subtitles, no watermarks, no copyright logos, no UI elements, " "no cartoon or comic effects, no unrealistic body proportions, no extra limbs, " "no deformed anatomy, no blurry or out-of-focus areas, no distorted hands or faces, " "no glitches, no pixelation, no low resolution artifacts, no compression noise, " "no color banding, no oversaturation, no unnatural lighting, no artificial shadows, " "no grainy textures, no harsh contrast, no underexposed or overexposed areas, " "no camera shake, no motion blur, no jitter, no poor audio quality, no lip-sync issues, " "no robotic or unnatural movements, no floating objects, no duplicate subjects, " "no text artifacts, no borders, no frames, no vignette, no film grain, " "no depth-map errors, no watermark-like textures, no flickering, no chromatic aberration" ) # ---------- FAST DOWNLOADER ---------- DL_MAX_THREADS = int(os.getenv("VIDGEN_DL_THREADS", "12")) DL_PART_SIZE = int(os.getenv("VIDGEN_DL_PARTSIZE", str(8*1024*1024))) # 8MB DL_CHUNK_SIZE = int(os.getenv("VIDGEN_DL_CHUNK", str(4*1024*1024))) # 4MB DL_TIMEOUT = (10, 600) SESSION = requests.Session() SESSION.mount("https://", requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2)) SESSION.mount("http://", requests.adapters.HTTPAdapter(pool_connections=64, pool_maxsize=64, max_retries=2)) def _autotune(total_bytes:int)->Tuple[int,int,int]: threads = min(DL_MAX_THREADS, 12) part = max(DL_PART_SIZE, 8*1024*1024) chunk = max(DL_CHUNK_SIZE, 4*1024*1024) if total_bytes >= 512*1024*1024: threads = min(DL_MAX_THREADS,16); part=max(part,16*1024*1024); chunk=max(chunk,8*1024*1024) elif total_bytes >= 256*1024*1024: threads = min(DL_MAX_THREADS,14); part=max(part,12*1024*1024); chunk=max(chunk,6*1024*1024) elif total_bytes >= 128*1024*1024: threads = min(DL_MAX_THREADS,12); part=max(part,10*1024*1024); chunk=max(chunk,4*1024*1024) return threads, part, chunk def _probe_len_range(url:str, headers:dict)->Tuple[int,bool]: try: rh=SESSION.head(url, headers=headers, timeout=DL_TIMEOUT, allow_redirects=True) if rh.status_code==200: total=int(rh.headers.get("Content-Length","0") or 0) return total, ("bytes" in rh.headers.get("Accept-Ranges","").lower()) except requests.RequestException: pass try: r=SESSION.get(url, headers={**headers,"Range":"bytes=0-0"}, timeout=DL_TIMEOUT) if r.status_code in (200,206): cr=r.headers.get("Content-Range","") if "/" in cr: return int(cr.split("/")[-1]), True return int(r.headers.get("Content-Length","0") or 0), (r.status_code==206) except requests.RequestException: pass return 0, False def _range_job(url:str, s:int, e:int, headers:dict)->bytes: h=dict(headers); h["Range"]=f"bytes={s}-{e}" r=SESSION.get(url, headers=h, timeout=DL_TIMEOUT) r.raise_for_status() return r.content def fast_download(uri:str, dst:str, prog, api_key:str, start_frac:float=0.90): headers={"x-goog-api-key": api_key or "", "X-Goog-Api-Key": api_key or ""} total,range_ok = _probe_len_range(uri, headers) t_threads, t_part, t_chunk = _autotune(total if total>0 else DL_PART_SIZE*4) if total>0 and range_ok and total>=t_part*2: parts=[(i, min(i+t_part-1, total-1)) for i in range(0,total,t_part)] done=0; last=time.time() with open(dst,"wb") as f: f.truncate(total) with ThreadPoolExecutor(max_workers=min(t_threads,len(parts))) as ex: futs={ex.submit(_range_job,uri,s,e,headers):(s,e) for s,e in parts} for fut in as_completed(futs): s,e=futs[fut]; data=fut.result() with open(dst,"r+b") as f: f.seek(s); f.write(data) done += (e-s+1) now=time.time() if now-last>0.2: prog.progress(min(1.0, start_frac + (done/total)*(1-start_frac))); last=now prog.progress(1.0); return with SESSION.get(uri, headers=headers, stream=True, timeout=DL_TIMEOUT) as r: r.raise_for_status() clen=int(r.headers.get("Content-Length","0") or 0) read=0; last=time.time() with open(dst,"wb") as f: for chunk in r.iter_content(t_chunk): if not chunk: continue f.write(chunk) if clen: read+=len(chunk); now=time.time() if now-last>0.2: prog.progress(min(1.0, start_frac + (read/clen)*(1-start_frac))); last=now prog.progress(1.0) # ---------- MEDIA UTILS ---------- def _ffprobe_path(): try: import imageio_ffmpeg; return imageio_ffmpeg.get_ffprobe_exe() except Exception: return None def get_video_size(path:str)->Tuple[int,int]: fp=_ffprobe_path() if not fp: return (0,0) import subprocess, json, shlex out=subprocess.check_output(shlex.split(f'"{fp}" -v error -select_streams v:0 -show_entries stream=width,height -of json "{path}"')) d=json.loads(out.decode()); s=d.get("streams",[{}])[0] return int(s.get("width",0)), int(s.get("height",0)) def has_audio_stream(path:str)->Optional[bool]: fp=_ffprobe_path() if not fp: return None import subprocess, json, shlex out=subprocess.check_output(shlex.split(f'"{fp}" -v error -select_streams a -show_entries stream=index -of json "{path}"')) return bool(json.loads(out.decode()).get("streams")) def mute_video(src:str, dst:str): import imageio_ffmpeg, subprocess, shlex, os ff=imageio_ffmpeg.get_ffmpeg_exe() cmd=f'"{ff}" -y -i "{src}" -map 0:v:0 -c:v copy -map -0:a -map -0:s -dn -movflags +faststart "{dst}"' try: subprocess.run(shlex.split(cmd), check=True) try: if has_audio_stream(dst) is False: return except Exception: return except Exception: pass tmp=dst+".tmp.mp4" cmd=f'"{ff}" -y -i "{src}" -an -c:v libx264 -preset veryfast -crf 18 -movflags +faststart "{tmp}"' subprocess.run(shlex.split(cmd), check=True) os.replace(tmp,dst) # ---------- HELPERS ---------- def _b64(file)->Tuple[str,str,str]: data=file.read() mime="image/jpeg" if file.type in ("image/jpeg","image/jpg") else "image/png" return base64.b64encode(data).decode(), mime, file.name def slug_from_prompt(p:str,max_words:int=7)->str: if not p: return "video" words=re.findall(r"[A-Za-z0-9\-]+", p)[:max_words] title=re.sub(r"[^A-Za-z0-9\-\s_]+","", " ".join(words)).strip() title=re.sub(r"\s+"," ",title) return title or "video" def out_path(prompt:str)->str: base=slug_from_prompt(prompt,7); uniq=str(uuid.uuid4().int % 100).zfill(2) path=os.path.join(OUTPUT_DIR, f"veo_{base}_{uniq}.mp4") if os.path.exists(path): path=os.path.join(OUTPUT_DIR, f"veo_{base}_{uniq}_{uuid.uuid4().hex[:2]}.mp4") return path def parse_prompt_json(raw:str)->List[Dict[str,Any]]: data=json.loads(raw) if not isinstance(data,list): raise ValueError("JSON harus berupa array.") for i,x in enumerate(data,1): if not isinstance(x,dict): raise ValueError(f"Item ke-{i} bukan objek.") if (x.get("mode","text")=="text") and ("prompt" not in x): raise ValueError(f"Item ke-{i} tidak punya field 'prompt'.") return data def is_quota_error(exc: Exception) -> bool: s = (str(exc) or "").lower() return ("resource_exhausted" in s) or ("rate limit" in s) or ("quota" in s) or ("429" in s) def show_quota_fallback(container, model_id: str): with container: st.error("🚦 API limit/quota tercapai (rate-limited).") st.markdown( f"- Coba **model** lain atau jalankan ulang nanti (model sekarang: `{model_id}`).\n" f"- Kurangi batch / jalankan satu per satu.\n" f"- Pastikan **billing & kuota** akun Google AI aktif.\n" f"- Jika suka dengan proyek ini, boleh dukung pengembangan di tombol donasi kanan bawah πŸ™‚", ) # ---------- CORE GENERATOR ---------- def generate_one_with_ui( *, api_key: str, model_ui: str, mode: str, # "text"|"image" prompt: str, negative_prompt: Optional[str], aspect: str, want_resolution: str, # "720p"|"1080p" person_generation: Optional[str], img_b64: Optional[str], img_mime: Optional[str], enable_audio: bool, ui_box: st.container, tz_name: str = "Asia/Jakarta", ) -> Dict[str,Any]: from google import genai from google.genai import types from google.genai.errors import ClientError model_id = MODEL_MAP.get(model_ui, model_ui) header = ui_box.container() with header: st.markdown("### 🎬 Video Pair") st.markdown( f'Model: {model_ui}' f'Aspect: {aspect}' f'Request: {want_resolution}' f'Durasi: 8s', unsafe_allow_html=True, ) client = genai.Client(api_key=api_key) def build_cfg(): # 9:16 -> 720p; 16:9 -> 720p/1080p res = "720p" if aspect=="9:16" else ("1080p" if want_resolution=="1080p" else "720p") cfg_kwargs = dict( aspect_ratio=aspect, resolution=res, number_of_videos=1, negative_prompt=(negative_prompt or None), ) if person_generation not in (None, "", "auto"): cfg_kwargs["person_generation"] = person_generation return types.GenerateVideosConfig(**cfg_kwargs) def run_call(_cfg): if mode == "text": return client.models.generate_videos(model=model_id, prompt=prompt, config=_cfg) else: from google.genai import types as t img = t.Image(image_bytes=base64.b64decode(img_b64), mime_type=img_mime) return client.models.generate_videos(model=model_id, prompt=prompt or "", image=img, config=_cfg) def _start_and_download(_cfg): prog = ui_box.progress(0.02) note = ui_box.info(f"⏳ Menghasilkan video… (Google Veo)") t0 = time.time() # --- panggilan API + polling dengan guard quota --- try: op = run_call(_cfg) except ClientError as ce: if is_quota_error(ce): note.empty(); show_quota_fallback(ui_box, model_id); return {"error":"quota"} raise except Exception as e: if is_quota_error(e): note.empty(); show_quota_fallback(ui_box, model_id); return {"error":"quota"} raise ticks = 0 while True: try: if op.done: break time.sleep(6); ticks += 1 prog.progress(min(0.90, 0.02 + ticks*0.06)) note.info("⏳ Polling status dari server…") op = client.operations.get(op) except ClientError as ce: if is_quota_error(ce): note.empty(); show_quota_fallback(ui_box, model_id); return {"error":"quota"} raise except Exception as e: if is_quota_error(e): note.empty(); show_quota_fallback(ui_box, model_id); return {"error":"quota"} raise prog.progress(0.90); note.info("πŸ“₯ Mengunduh hasil…") resp = getattr(op, "response", None) or getattr(op, "result", None) vids = getattr(resp, "generated_videos", None) if not resp or not vids: note.empty(); ui_box.error("API tidak mengembalikan video.") return None video = vids[0] tmp = os.path.join(OUTPUT_DIR, f"tmp_{uuid.uuid4().hex}.mp4") final = out_path(prompt or "video") uri = getattr(video.video, "uri", None) or getattr(video, "uri", None) if uri: try: fast_download(uri, tmp, prog, api_key or "") except Exception as e: note.warning(f"Gagal parallel download, fallback: {e}") try: client.files.download(file=video.video) try: video.video.save(tmp); prog.progress(1.0) except Exception: content=getattr(video.video,"bytes",None) if not content: raise RuntimeError("video kosong") with open(tmp,"wb") as f: f.write(content) prog.progress(1.0) except Exception as e2: note.empty(); ui_box.error(f"Gagal unduh: {e2}") return None else: try: client.files.download(file=video.video) try: video.video.save(tmp); prog.progress(1.0) except Exception: content=getattr(video.video,"bytes",None) if not content: raise RuntimeError("video kosong") with open(tmp,"wb") as f: f.write(content) prog.progress(1.0) except Exception as e: note.empty(); ui_box.error(f"Gagal unduh: {e}") return None if not enable_audio: try: mute_video(tmp, final); os.remove(tmp) except Exception as e: os.replace(tmp, final); ui_box.warning(f"Gagal mute: {e}") else: os.replace(tmp, final) from zoneinfo import ZoneInfo try: tz = ZoneInfo(tz_name or "Asia/Jakarta") except Exception: tz = ZoneInfo("UTC") finished = datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S %Z") elapsed = int(time.time() - t0) w,h = get_video_size(final) prog.progress(1.0); note.empty() with ui_box: st.success("Selesai βœ…") st.markdown( f'⏱️ {finished}' f'πŸ•’ {elapsed}s' f'Dim: {w}Γ—{h or "?"}' f'Provider: Google AI', unsafe_allow_html=True, ) st.video(final) with open(final,"rb") as fh: st.download_button("⬇️ Download Video", fh, file_name=os.path.basename(final), use_container_width=True) return {"path": final, "h": h, "res_used": _cfg.resolution} try: cfg = build_cfg() res = _start_and_download(cfg) if res and "path" in res: final_res = res.get("res_used", cfg.resolution if hasattr(cfg, "resolution") else "720p") return {"path": res["path"], "meta":{ "mode": ("image" if img_b64 else "text"), "aspect_ratio": aspect, "resolution": final_res }} return res or {"error":"failed"} except ClientError as ce: if is_quota_error(ce): show_quota_fallback(ui_box, model_id) return {"error":"quota"} ui_box.error(f"Gagal generate (API): {ce}") return {"error":"client_error"} except Exception as e: if is_quota_error(e): show_quota_fallback(ui_box, model_id) return {"error":"quota"} ui_box.error(f"Gagal generate: {e}") return {"error":"failed"} # ---------- SIDEBAR ---------- with st.sidebar: st.header("Credentials") api_key = st.text_input("Google API Key", type="password", help="Untuk Google AI (GenAI).") st.markdown('
', unsafe_allow_html=True) st.header("Setting") model_ui = st.selectbox("Model", MODELS_UI, index=1) # default: veo-3.0-fast-generate-001 mode_ui = st.radio("Mode", ["Text → Video","Image → Video"]) images_info: List[Tuple[str,str,str]] = [] if mode_ui == "Image → Video": uploaded = st.file_uploader("Upload gambar (PNG/JPG) — batch", type=["png","jpg","jpeg"], accept_multiple_files=True) sort = st.checkbox("Urutkan file A→Z", value=True) if uploaded: files = sorted(uploaded, key=lambda f:f.name.lower()) if sort else list(uploaded) st.caption("Urutan pairing:") for i,f in enumerate(files,1): st.write(f"{i}. {f.name}") for f in files: images_info.append(_b64(f)) want_resolution = st.selectbox("Target kualitas", ["720p","1080p"], index=0) aspect = st.selectbox("Aspect Ratio", ASPECTS, index=0) if aspect=="9:16" and want_resolution=="1080p": st.info("1080p hanya tersedia untuk 16:9. App akan menggunakan 720p untuk 9:16.") negative_prompt = st.text_area("Negative Prompt (editable)", value=NEG_DEF, height=120) person_generation = st.selectbox("Person Generation", ["auto","allow_adult","allow_all"], index=0) audio_on = st.checkbox("Enable audio (unmute)", value=True) num_results= st.number_input("Jumlah hasil (per prompt)", 1, 5, 1) tz_name = st.selectbox("Zona waktu timestamp", ["Asia/Jakarta","Asia/Makassar","Asia/Jayapura","UTC"], index=0) st.markdown('
', unsafe_allow_html=True) c1,c2 = st.columns(2) run_btn = c1.button("Generate Video", use_container_width=True, type="primary") clr_btn = c2.button("Clear Hasil", use_container_width=True) # ---------- PROMPT AREA ---------- st.title("🎬 Video Generator β€” Google Veo (API Key Only)") st.caption("Dropdown model sesuai dokumen resmi. 1080p hanya untuk 16:9; 9:16 selalu 720p.") with st.container(): st.subheader("Prompt") prompt_raw_val = st.text_area("Masukkan Prompt", placeholder="Multi Prompt (enter 1x) atau JSON array", height=160, key="prompt_main") json_mode = st.checkbox("Prompt JSON", value=False, key="json_mode_main") # ---------- STATE ---------- for k,v in [("results",[]),("jobs",[]),("job_idx",0),("running",False),("api_key_val","")]: if k not in st.session_state: st.session_state[k]=v if clr_btn: st.session_state.update({"results":[],"jobs":[],"job_idx":0,"running":False}); st.rerun() # ---------- BUILD JOBS ---------- def build_jobs_text(lines:List[str])->List[Dict[str,Any]]: return [{ "prompt":p, "negative":negative_prompt, "aspect":aspect, "resolution":want_resolution, "person":person_generation, "mode":"text", "image":None, "audio":audio_on } for p in lines] def build_jobs_image(lines:List[str], infos:List[Tuple[str,str,str]])->List[Dict[str,Any]]: n=min(len(lines),len(infos)); jobs=[] for i in range(n): b64,mime,_=infos[i] jobs.append({ "prompt":lines[i], "negative":negative_prompt, "aspect":aspect, "resolution":want_resolution, "person":person_generation, "mode":"image", "image":{"b64":b64,"mime":mime}, "audio":audio_on }) return jobs # ---------- RUN ---------- if run_btn: if not api_key: st.error("Masukkan Google API Key.") else: items=None if json_mode: try: items = parse_prompt_json(prompt_raw_val) except Exception as e: st.error(f"JSON salah: {e}") jobs=[] if items is None: lines=[p.strip() for p in (prompt_raw_val or "").splitlines() if p.strip()] if not lines: st.error("Masukkan minimal satu prompt.") else: if mode_ui=="Text β†’ Video": jobs = build_jobs_text(lines) else: if not images_info: st.error("Mode Image β†’ Video butuh gambar.") else: st.caption("Pairing prompt ↔ gambar:") for i in range(min(len(lines),len(images_info))): st.write(f"- Prompt {i+1} ↔ {images_info[i][2]}") jobs = build_jobs_image(lines, images_info) else: for it in items: jmode=(it.get("mode") or ("image" if it.get("image") else "text")).lower() b64=None; mime=None if jmode=="image": img=it.get("image") if isinstance(img,dict) and img.get("b64"): b64=img["b64"]; mime=img.get("mime","image/png") elif isinstance(img,str) and img.startswith(("http://","https://")): try: r=SESSION.get(img,timeout=DL_TIMEOUT); r.raise_for_status() b64=base64.b64encode(r.content).decode() mime=r.headers.get("Content-Type","image/png") except Exception as e: st.error(f"Galat ambil image URL: {e}") elif isinstance(img,str) and img.startswith("data:image/"): try: header,data=img.split(",",1); mime=header[5:header.find(";")]; b64=data except Exception: b64=None jobs.append({ "prompt": it.get("prompt",""), "negative": it.get("negative_prompt", NEG_DEF), "aspect": it.get("aspect_ratio", aspect), "resolution": it.get("resolution", want_resolution), "person": it.get("person_generation","auto"), "mode": jmode, "image": ({"b64":b64,"mime":mime} if (jmode=="image" and b64) else None), "audio": bool(it.get("audio", True)) }) expanded=[] for j in jobs: for _ in range(int(num_results)): expanded.append(j.copy()) if expanded: st.session_state.update({ "results":[], "jobs":expanded, "job_idx":0, "running":True, "api_key_val": api_key }); st.rerun() # ---------- SHOW RESULTS ---------- if st.session_state["results"]: st.markdown("## Hasil Selesai") for i,item in enumerate(st.session_state["results"],1): st.markdown( f'
' f'

🎬 Video Pair {i}

' f'{item["meta"]["mode"]} β€’ {item["meta"]["aspect_ratio"]} β€’ {item["meta"]["resolution"]}' f'
', unsafe_allow_html=True ) st.video(item["path"]) with open(item["path"],"rb") as fh: st.download_button(f"⬇️ Download Video (hasil {i})", fh, file_name=os.path.basename(item["path"]), use_container_width=True, key=f"dl-persist-{i}") # ---------- WORK LOOP ---------- if st.session_state["running"]: jobs=st.session_state["jobs"]; idx=st.session_state["job_idx"] if idx>=len(jobs): st.session_state["running"]=False; st.success("Semua job selesai βœ…") else: j=jobs[idx] st.markdown(f"### Memproses job {idx+1}/{len(jobs)}") box=st.container() res = generate_one_with_ui( api_key=st.session_state["api_key_val"], model_ui=model_ui, mode=j["mode"], prompt=j["prompt"], negative_prompt=j["negative"], aspect=j["aspect"], want_resolution=j["resolution"], person_generation=j["person"], img_b64=(j["image"]["b64"] if j.get("image") else None), img_mime=(j["image"]["mime"] if j.get("image") else None), enable_audio=j["audio"], ui_box=box, tz_name=tz_name, ) if isinstance(res, dict) and "path" in res: st.session_state["results"].append({ "path": res["path"], "meta": {"mode": j["mode"], "aspect_ratio": j["aspect"], "resolution": res["meta"]["resolution"]} }) st.session_state["job_idx"]=idx+1; st.rerun() # ---------- FLOATING DONATE ---------- st.markdown(""" """, unsafe_allow_html=True)