| import os, tempfile, io, math, time, threading |
| import numpy as np |
| import cv2 |
| import gradio as gr |
| from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageEnhance |
|
|
| |
# --- Hugging Face Inference setup (optional) ---
# The token may live in either env var; when absent (or login fails) hf_client
# stays None and the app falls back to the local Ken Burns renderer only.
hf_token = (os.environ.get("HF_TOKEN","") or os.environ.get("HF_KEY","")).strip()
hf_client = None  # None => skip all hosted models in get_video()
if hf_token:
    try:
        from huggingface_hub import login, InferenceClient
        login(token=hf_token); hf_client = InferenceClient(token=hf_token)
        print("✅ HF ready")
    except Exception as e: print(f"⚠️ HF: {e}")
|
|
| |
# Hosted image-to-video models, tried in this order by get_video().
# The sentinel id "__local__" selects the built-in Ken Burns renderer, so the
# chain is guaranteed to terminate with a playable video even with no token.
HF_MODELS = [
    {"id": "Lightricks/LTX-2", "name": "LTX-2 ⚡"},
    {"id": "Wan-AI/Wan2.2-I2V-A14B", "name": "Wan 2.2"},
    {"id": "stabilityai/stable-video-diffusion-img2vid-xt", "name": "SVD-XT"},
    {"id": "KlingTeam/LivePortrait", "name": "Kling LivePortrait"},
    {"id": "Lightricks/LTX-Video", "name": "LTX-Video"},
    {"id": "__local__", "name": "Ken Burns ✨"},
]
|
|
def pil_to_bytes(img):
    """Encode a PIL image as JPEG (quality 92) and return the raw bytes."""
    buffer = io.BytesIO()
    img.save(buffer, format="JPEG", quality=92)
    return buffer.getvalue()
|
|
def run_timeout(fn, sec, *a, **kw):
    """Run fn(*a, **kw) on a daemon thread, giving up after `sec` seconds.

    Returns fn's result; returns None when the call times out or raises
    (the error message is logged, never propagated).
    """
    result = [None]
    failure = [None]

    def worker():
        try:
            result[0] = fn(*a, **kw)
        except Exception as exc:
            failure[0] = str(exc)

    th = threading.Thread(target=worker, daemon=True)
    th.start()
    th.join(timeout=sec)
    if th.is_alive():
        # Thread is abandoned (daemon), so a hung backend can't block shutdown.
        print(" ⏱ timeout")
        return None
    if failure[0]:
        print(f" ❌ {failure[0][:80]}")
    return result[0]
|
|
def try_hf(model_id, pil, prompt):
    """Request an MP4 from one hosted HF image-to-video model.

    Returns the video bytes, or None when no client is configured or the
    call fails for any reason (errors are logged, never raised).
    """
    if not hf_client:
        return None
    try:
        resp = hf_client.image_to_video(
            image=pil_to_bytes(pil), model=model_id, prompt=prompt
        )
        # Some client versions return a stream, others raw bytes.
        return resp.read() if hasattr(resp, "read") else resp
    except Exception as exc:
        print(f" ❌ {model_id}: {exc}")
        return None
|
|
def get_video(pil, prompt, cb=None):
    """Try each model in HF_MODELS in order; return (video_path, model_name).

    Hosted models get 50 s each via run_timeout; the "__local__" entry (and
    the final fallback) is the always-available Ken Burns renderer, so this
    function never returns None.

    cb, when given, receives one progress string per attempted model.
    """
    for m in HF_MODELS:
        mid, mname = m["id"], m["name"]
        if cb:
            cb(f"⏳ Trying: {mname}")
        if mid == "__local__":
            return ken_burns(pil), mname
        data = run_timeout(try_hf, 50, mid, pil, prompt)
        if data:
            # Persist the returned MP4 bytes; the context manager closes the
            # handle (the original leaked it), so the path is safe to hand
            # to ffmpeg / Gradio immediately.
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as t:
                t.write(data)
            return t.name, mname
        time.sleep(1)  # brief pause before trying the next provider
    return ken_burns(pil), "Ken Burns"
|
|
|
|
| |
| |
| |
def ease(t):
    """Smoothstep easing (3t^2 - 2t^3), with t clamped to [0, 1]."""
    t = min(1.0, max(0.0, t))
    return t * t * (3 - 2 * t)
def ease_cubic(t):
    """Ease-in-out cubic, with t clamped to [0, 1]."""
    t = min(1.0, max(0.0, t))
    if t < 0.5:
        return 4 * t * t * t
    return 1 - math.pow(-2 * t + 2, 3) / 2
def ease_expo(t):
    """Ease-out exponential; exactly 1.0 at t >= 1 (no clamp below 0)."""
    if t < 1:
        return 1 - math.pow(2, -10 * t)
    return 1.0
def ease_bounce(t):
    """Ease-out bounce (Penner): four parabolic arcs over [0, 1]."""
    n1, d1 = 7.5625, 2.75
    if t < 1 / d1:
        return n1 * t * t
    if t < 2 / d1:
        t -= 1.5 / d1
        return n1 * t * t + 0.75
    if t < 2.5 / d1:
        t -= 2.25 / d1
        return n1 * t * t + 0.9375
    t -= 2.625 / d1
    return n1 * t * t + 0.984375
|
|
def ken_burns(pil, duration_sec=6, fps=30, style="premium"):
    """Local fallback: render a 720x1280 Ken Burns-style clip from one image.

    The image is letterboxed over a blurred, darkened copy of itself on a
    padded canvas; per frame, a 4-segment zoom/pan path is sampled, then a
    vignette, a style-dependent color grade, film grain, letterbox bars and
    fade in/out are applied.

    Returns the path of the written .mp4 (mp4v codec, no audio track).
    """
    # Output geometry: 9:16 portrait; the base canvas carries `pad` px of
    # margin on every side so pans can travel beyond the visible frame.
    TW,TH=720,1280
    pad=60; BW,BH=TW+pad*2,TH+pad*2
    total=duration_sec*fps  # total frame count

    # Fit the source inside 720x1280 while preserving its aspect ratio.
    img=pil.convert("RGB"); sw,sh=img.size
    scale=TH/sh; nw=int(sw*scale); nh=TH
    if nw>TW: scale=TW/sw; nw=TW; nh=int(sh*scale)
    img_resized=img.resize((nw,nh),Image.LANCZOS)
    # Background: stretched, heavily blurred, half-brightness copy.
    bg=img.resize((TW,TH),Image.LANCZOS)
    bg=bg.filter(ImageFilter.GaussianBlur(radius=20))
    bg_arr=np.array(ImageEnhance.Brightness(bg).enhance(0.5))
    canvas=Image.fromarray(bg_arr)
    # Center the sharp image over the blurred background, then sharpen and
    # slightly boost contrast/saturation before upsampling to the padded size.
    px=(TW-nw)//2; py=(TH-nh)//2
    canvas.paste(img_resized,(px,py))
    canvas=canvas.filter(ImageFilter.UnsharpMask(radius=0.8,percent=110,threshold=2))
    canvas=ImageEnhance.Contrast(canvas).enhance(1.05)
    canvas=ImageEnhance.Color(canvas).enhance(1.08)
    base=np.array(canvas.resize((BW,BH),Image.LANCZOS))

    # Radial vignette mask: 1.0 in the center, darkening past ~85% radius.
    Y,X=np.ogrid[:TH,:TW]
    dist=np.sqrt(((X-TW/2)/(TW/2))**2+((Y-TH/2)/(TH/2))**2)
    vmask=np.clip(1.-0.22*np.maximum(dist-0.85,0)**2,0,1).astype(np.float32)

    # Camera path segments: (t_start, t_end, zoom_start, zoom_end,
    #                        pan_x_start, pan_x_end, pan_y_start, pan_y_end),
    # with t as a 0..1 fraction of the clip and pans in canvas pixels.
    SEG=[
        (0.00,0.30, 1.00,1.04, 0, -int(pad*.40), 0, -int(pad*.40)),
        (0.30,0.60, 1.04,1.06, -int(pad*.30), int(pad*.30), -int(pad*.40),-int(pad*.70)),
        (0.60,0.80, 1.06,1.04, int(pad*.30), int(pad*.50), -int(pad*.70),-int(pad*.40)),
        (0.80,1.00, 1.04,1.00, int(pad*.50), 0, -int(pad*.40), 0),
    ]

    tmp=tempfile.NamedTemporaryFile(suffix=".mp4",delete=False)
    writer=cv2.VideoWriter(tmp.name,cv2.VideoWriter_fourcc(*"mp4v"),fps,(TW,TH))

    for i in range(total):
        tg=i/max(total-1,1)  # normalized clip time in [0, 1]
        # Interpolate zoom/pan from the active segment with cubic easing.
        zoom=pan_x=pan_y=None
        for t0,t1,z0,z1,px0,px1,py0,py1 in SEG:
            if t0<=tg<=t1:
                te=ease_cubic((tg-t0)/(t1-t0))
                zoom=z0+(z1-z0)*te; pan_x=int(px0+(px1-px0)*te); pan_y=int(py0+(py1-py0)*te); break
        if zoom is None: zoom,pan_x,pan_y=1.,0,0

        # Crop window centered on the panned canvas center, clamped to the
        # canvas (extreme pans may therefore slightly change aspect ratio).
        cw,ch=int(TW/zoom),int(TH/zoom)
        ox,oy=BW//2+pan_x,BH//2+pan_y
        x1,y1=max(0,ox-cw//2),max(0,oy-ch//2)
        x2,y2=min(BW,x1+cw),min(BH,y1+ch)
        if (x2-x1)<10 or (y2-y1)<10: x1,y1,x2,y2=0,0,TW,TH  # degenerate crop: top-left fallback

        frame=cv2.resize(base[y1:y2,x1:x2],(TW,TH),interpolation=cv2.INTER_LINEAR)

        # Per-style color grade in normalized float space.
        f=frame.astype(np.float32)/255.
        if style=="premium":
            # Warm lift on the red channel, slight blue boost.
            f[:,:,0]=np.clip(f[:,:,0]*1.03+.01,0,1)
            f[:,:,2]=np.clip(f[:,:,2]*1.02,0,1)
        elif style=="energetic":
            # +20% saturation around luma, plus a small overall gain.
            gray=0.299*f[:,:,0:1]+0.587*f[:,:,1:2]+0.114*f[:,:,2:3]
            f=np.clip(gray+1.2*(f-gray),0,1); f=np.clip(f*1.04,0,1)
        elif style=="fun":
            f[:,:,0]=np.clip(f[:,:,0]*1.05,0,1)
            f[:,:,1]=np.clip(f[:,:,1]*1.03,0,1)
        frame=np.clip(f*255,0,255).astype(np.uint8)

        # Apply the vignette.
        frame=np.clip(frame.astype(np.float32)*vmask[:,:,None],0,255).astype(np.uint8)

        # Film grain: per-frame gaussian noise, sigma = 3.
        frame=np.clip(frame.astype(np.float32)+np.random.normal(0,3,frame.shape),0,255).astype(np.uint8)

        # Cinematic letterbox bars, 36 px top and bottom.
        frame[:36,:]=0; frame[-36:,:]=0

        # Quick fade-in over the first 2% and fade-out over the last 5%.
        if tg<0.02: alpha=ease_expo(tg/0.02)
        elif tg>0.95: alpha=ease(1-(tg-0.95)/0.05)
        else: alpha=1.
        if alpha<1.: frame=np.clip(frame.astype(np.float32)*alpha,0,255).astype(np.uint8)

        writer.write(cv2.cvtColor(frame,cv2.COLOR_RGB2BGR))
    writer.release()
    return tmp.name
|
|
|
|
| |
| |
| |
def add_captions_ffmpeg(video_path, caption, duration_sec, style):
    """Burn animated captions + hashtag tag + shop-now CTA using ffmpeg drawtext.

    Returns the path of the captioned copy, or `video_path` unchanged when
    ffmpeg is missing or the render fails (best-effort, never raises).
    """
    import re
    import subprocess

    def clean(t):
        # drawtext treats : ' \ % etc. specially; keep only safe characters.
        return re.sub(r"[^A-Za-z0-9 !.,-]", "", t).strip()

    # Split the caption roughly in half into two sequential lines.
    words = caption.strip().split()
    mid = max(1, len(words) // 2)
    line1 = clean(" ".join(words[:mid]))
    line2 = clean(" ".join(words[mid:])) if len(words) > 1 else line1

    # Per-style caption color (hex RGB, no leading '#').
    colors = {"premium": "FFD232", "energetic": "3CC8FF", "fun": "FF78C8"}
    col = colors.get(style, "FFFFFF")
    out = video_path.replace(".mp4", "_cap.mp4")

    # First bold system font found wins; empty string = ffmpeg's default font.
    font_paths = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
    ]
    font = ""
    for p in font_paths:
        if os.path.exists(p):
            font = f":fontfile='{p}'"
            break

    def dt(text, start, end, y, size=42, color=None, box_alpha="0.60"):
        """One drawtext filter: centered boxed text, 0.4 s fade in/out."""
        c = color or col
        fd = 0.4
        return (
            f"drawtext=text='{text}'{font}"
            f":fontsize={size}:fontcolor=#{c}"
            f":x=(w-text_w)/2:y={y}"
            f":box=1:boxcolor=black@{box_alpha}:boxborderw=14"
            f":enable='between(t,{start},{end})'"
            f":alpha='if(lt(t,{start+fd}),(t-{start})/{fd},if(gt(t,{end-fd}),({end}-t)/{fd},1))'"
        )

    # Second caption / CTA end just before the clip does, capped at 6.5 s.
    end2 = min(duration_sec - 0.2, 6.5)

    cap1 = dt(line1, 1.0, 3.5, "h-190")
    cap2 = dt(line2, 3.8, end2, "h-190")

    cta_colors = {"premium": "FF9900", "energetic": "FF4444", "fun": "AA44FF"}
    cta = dt("Shop Now >", 4.5, end2, "h-130", size=32,
             color=cta_colors.get(style, "FF9900"), box_alpha="0.70")

    tag = dt("#NewCollection", 0.5, 3.0, "60", size=28, color="FFFFFF", box_alpha="0.40")

    vf = ",".join([cap1, cap2, cta, tag])

    # List-form subprocess call: no shell, so paths and the filter string need
    # no quoting (the previous os.system string broke on quotes/spaces and
    # was shell-injection-prone for unusual paths).
    try:
        ret = subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-vf", vf,
             "-c:a", "copy", "-loglevel", "error", out],
            check=False,
        ).returncode
    except OSError:  # ffmpeg binary not installed
        ret = 1
    return out if (ret == 0 and os.path.exists(out)) else video_path
|
|
|
|
| |
| |
| |
def make_bgm(duration_sec, out_path, style="premium"):
    """Synthesize a simple style-dependent backing track as mono 16-bit WAV.

    The track layers a swept-sine kick, an amplitude-modulated bass, three
    slowly pulsing melody tones and noise-burst hi-hats, with a 0.5 s fade
    at each end, then writes it to `out_path` at 44.1 kHz.
    """
    import wave
    sr = 44100
    n = int(sr * duration_sec)
    t = np.linspace(0, duration_sec, n, endpoint=False)
    bpm = {"premium": 88, "energetic": 126, "fun": 104}.get(style, 88)
    beat = 60. / bpm

    # Kick: decaying sine with a fast downward pitch sweep on every beat.
    kick = np.zeros(n, np.float32)
    for i in range(int(duration_sec / beat) + 2):
        start = int(i * beat * sr)
        if start >= n:
            break
        length = min(int(sr * .10), n - start)
        env = np.exp(-20 * np.arange(length) / sr)
        kick[start:start + length] += env * np.sin(
            2 * math.pi * 55 * np.exp(-25 * np.arange(length) / sr) * np.arange(length) / sr
        ) * 0.55

    # Bass: one low sine, amplitude-modulated once per four beats.
    bass_f = {"premium": 55, "energetic": 80, "fun": 65}.get(style, 55)
    bass = np.sin(2 * math.pi * bass_f * t) * 0.10 * (0.5 + 0.5 * np.sin(2 * math.pi * (bpm / 60 / 4) * t))

    # Melody: three chord tones, each with a phase-shifted slow envelope.
    mf = {"premium": [261, 329, 392], "energetic": [330, 415, 494], "fun": [392, 494, 587]}.get(style, [261, 329, 392])
    mel = np.zeros(n, np.float32)
    for j, f in enumerate(mf):
        env = np.clip(0.5 + 0.5 * np.sin(2 * math.pi * 1.5 * t - j * 2.1), 0, 1)
        mel += np.sin(2 * math.pi * f * t) * env * 0.045

    # Hi-hats: short decaying noise bursts every half beat.
    hat = np.zeros(n, np.float32)
    half_beat = beat / 2
    for i in range(int(duration_sec / half_beat) + 2):
        start = int(i * half_beat * sr)
        if start >= n:
            break
        length = min(int(sr * .03), n - start)
        hat[start:start + length] += np.random.randn(length) * np.exp(-80 * np.arange(length) / sr) * 0.06

    # Mix everything down, clip, and apply the half-second fades.
    mix = np.clip((kick + bass + mel + hat) * 0.18, -1, 1)
    fade = int(sr * .5)
    mix[:fade] *= np.linspace(0, 1, fade)
    mix[-fade:] *= np.linspace(1, 0, fade)

    with wave.open(out_path, "w") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sr)
        wf.writeframes((mix * 32767).astype(np.int16).tobytes())
|
|
def add_audio(video_path, caption, duration_sec, style):
    """Mux synthesized BGM (plus a gTTS voiceover when available) into the video.

    Returns the path of the new file, or `video_path` unchanged when muxing
    failed (best-effort, never raises).
    """
    import subprocess

    bgm = video_path.replace(".mp4", "_bgm.wav")
    final = video_path.replace(".mp4", "_final.mp4")
    make_bgm(duration_sec, bgm, style)

    # Best effort: layer a gTTS voiceover (95%) over the music (20%); any
    # failure — missing gtts package, no network, no ffmpeg — falls back to
    # the BGM-only track. (The old unused tts_wav local was dropped.)
    audio = bgm
    try:
        from gtts import gTTS
        tts_mp3 = video_path.replace(".mp4", "_tts.mp3")
        gTTS(text=caption[:200], lang="en", slow=False).save(tts_mp3)
        mixed = video_path.replace(".mp4", "_mix.wav")
        # List-form subprocess: no shell quoting issues with odd paths.
        subprocess.run(
            ["ffmpeg", "-y", "-i", bgm, "-i", tts_mp3,
             "-filter_complex",
             "[0]volume=0.20[a];[1]volume=0.95[b];[a][b]amix=inputs=2:duration=first",
             "-t", str(duration_sec), mixed, "-loglevel", "error"],
            check=False,
        )
        if os.path.exists(mixed):
            audio = mixed
    except Exception as e:
        print(f" TTS skip: {e}")

    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-i", audio,
             "-c:v", "copy", "-c:a", "aac", "-b:a", "128k", "-shortest",
             final, "-loglevel", "error"],
            check=False,
        )
    except OSError:  # ffmpeg binary not installed — keep the silent video
        pass
    return final if os.path.exists(final) else video_path
|
|
|
|
| |
| |
| |
def generate(image, caption, style, add_aud, add_cap, progress=gr.Progress()):
    """Gradio click handler: image -> video, then optional captions and audio.

    Returns (video_path_or_None, status_text). NOTE(review): gr.Progress()
    as a default argument is the Gradio-documented injection pattern, not a
    mutable-default bug.
    """
    if image is None: return None,"⚠️ Upload an image!"
    pil=image if isinstance(image,Image.Image) else Image.fromarray(image)
    cap=caption.strip() or "Premium Quality. Shop Now."
    prompt=f"cinematic product ad, {cap}, smooth motion, dramatic lighting"
    lines=[]
    # Each log line also advances the progress bar by 10%, capped at 80%.
    def log(msg): lines.append(msg); progress(min(.1+len(lines)*.10,.80),desc=msg)

    progress(.05,desc="🚀 Starting...")
    video_path, model_used = get_video(pil, prompt, cb=log)
    dur=6  # matches ken_burns' default duration — TODO confirm hosted clips match

    if add_cap:
        log("🎬 Adding captions...")
        video_path=add_captions_ffmpeg(video_path, cap, dur, style.lower())

    if add_aud:
        log("🎵 Adding music + voice...")
        video_path=add_audio(video_path, cap, dur, style.lower())

    progress(1.0,desc="✅ Done!")
    return video_path, "\n".join(lines)+f"\n\n✅ Used: {model_used}"
|
|
|
|
| |
# --- Gradio UI: inputs on the left, rendered reel + log on the right ---
css="#title{text-align:center;font-size:2.3rem;font-weight:900}#sub{text-align:center;color:#888;margin-bottom:1.5rem}"
with gr.Blocks(css=css,theme=gr.themes.Soft(primary_hue="violet")) as demo:
    gr.Markdown("# 🎬 AI Reel Generator",elem_id="title")
    gr.Markdown("Image → AI video + captions + music",elem_id="sub")
    with gr.Row():
        with gr.Column(scale=1):
            # Inputs: image, caption text, visual style, feature toggles.
            img_in =gr.Image(label="📸 Upload Image",type="pil",height=300)
            cap_in =gr.Textbox(label="✍️ Caption",value="Step into style. Own the moment.",lines=2)
            sty_dd =gr.Dropdown(["Premium","Energetic","Fun"],value="Premium",label="🎨 Style")
            with gr.Row():
                aud_cb=gr.Checkbox(label="🎵 Music + Voice",value=True)
                cap_cb=gr.Checkbox(label="🎬 Captions", value=True)
            gen_btn=gr.Button("🚀 Generate Reel",variant="primary",size="lg")
            gr.Markdown("**🔗 Chain:** LTX-2 ⚡ → Wan 2.2 → SVD-XT → Kling → LTX-Video → Ken Burns ✨")
        with gr.Column(scale=1):
            # Outputs: the rendered reel and a text log of which model was used.
            vid_out=gr.Video(label="🎥 Reel",height=500)
            log_out=gr.Textbox(label="📝 Log",lines=6,interactive=False)
    gen_btn.click(fn=generate,inputs=[img_in,cap_in,sty_dd,aud_cb,cap_cb],outputs=[vid_out,log_out])


if __name__=="__main__":
    demo.launch()