Spaces:
Running
Running
| from flask import Flask, request, send_file, jsonify | |
| import os | |
| import subprocess | |
| import uuid | |
| import shutil | |
| import zipfile | |
| import math | |
# Directory (relative to the working directory) where uploads and
# intermediate outputs are written; created at import time if missing.
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# The Flask application object used by the rest of this module.
app = Flask(__name__)
# Helper that reads a media file's duration via ffprobe.
def get_video_duration(input_path):
    """Return the duration of ``input_path`` in seconds, or ``0.0`` on failure.

    Runs ``ffprobe`` asking only for the container's ``format=duration``
    value and parses its standard output as a float.

    Args:
        input_path: Path of the media file to probe.

    Returns:
        The duration as a float. ``0.0`` is returned when ffprobe cannot be
        started or when its output is not a number (for example an
        unreadable file or an ``N/A`` duration).
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        input_path,
    ]
    try:
        result = subprocess.run(
            probe_cmd,
            stdout=subprocess.PIPE,
            # Keep diagnostics out of stdout: a warning mixed into the
            # captured text would make the float parse fail even though a
            # valid duration was printed.
            stderr=subprocess.DEVNULL,
        )
        return float(result.stdout.decode("utf-8", "replace").strip())
    except (OSError, ValueError):
        # OSError: ffprobe missing or not executable.
        # ValueError: empty or non-numeric output.
        return 0.0
# 🎨 UNIVERSAL CONVERTER ENGINE (every conversion path goes through this)
def convert_segment(input_path, output_path, conf, start_time=0):
    """Encode a clip of ``input_path`` as a looping WebP at ``output_path``.

    The frame is fitted into a ``s`` x ``s`` square: a blurred, cropped copy
    of the video fills the background and the aspect-preserving scaled video
    is overlaid in the centre.

    Args:
        input_path: Source media file handed to ffmpeg.
        output_path: Destination file; overwritten if it exists (``-y``).
        conf: Mapping with the keys ``'t'`` (clip length passed to ``-t``),
            ``'fps'`` (output frame rate), ``'q'`` (value for ``-q:v``) and
            ``'s'`` (side length of the square canvas in pixels).
        start_time: Offset passed to ``-ss`` before the input; used to pick
            a segment when building a pack.

    Returns:
        ``True`` when ffmpeg exits successfully, ``False`` when it exits
        with an error or cannot be started at all.

    Raises:
        KeyError: If ``conf`` lacks one of the required keys.
    """
    duration = conf['t']
    fps = conf['fps']
    quality = conf['q']
    scale_size = conf['s']

    # Blur-background filter, built for whichever square size was requested
    # (512px, 384px, 320px, ...).
    filter_complex = (
        f"split[bg][fg];"
        f"[bg]scale={scale_size}:{scale_size}:force_original_aspect_ratio=increase,crop={scale_size}:{scale_size},boxblur=20:10[bg_blurred];"
        f"[fg]scale={scale_size}:{scale_size}:force_original_aspect_ratio=decrease[fg_scaled];"
        f"[bg_blurred][fg_scaled]overlay=(W-w)/2:(H-h)/2"
    )

    command = [
        "ffmpeg", "-y",
        "-ss", str(start_time),        # where to start (needed for packs)
        "-i", input_path,
        "-vf", f"{filter_complex},fps={fps}",
        "-vcodec", "libwebp",
        "-lossless", "0",
        "-compression_level", "6",     # highest compression effort
        "-preset", "default",
        "-q:v", str(quality),
        "-loop", "0",
        "-an",
        "-t", str(duration),           # how long a clip to cut
        output_path,
    ]

    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or unlaunchable ffmpeg binary, so callers
        # always get the documented boolean instead of an exception.
        print(f"FFmpeg Error: {e}")
        return False
    return True
def home():
    """Return the plain-text banner reporting that the engine is running.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered with the Flask app.
    """
    banner = "🔥 Cipher_MD Smart Engine (Guaranteed < 1MB) Running!"
    return banner
| # ================================================================ | |
| # 1. SINGLE STICKER ROUTE (SMART LOOP ENABLED ✅) | |
| # ================================================================ | |
def make_single_sticker():
    """Convert one uploaded file into a single WebP sticker.

    Reads the upload from ``request.files['file']``. Files whose extension
    is a known video/GIF type are encoded as an animated WebP, retrying with
    progressively smaller settings until the result is under 980,000 bytes.
    Anything else is treated as a still image and padded onto a 512x512
    canvas.

    Returns:
        The WebP bytes (``image/webp``) on success; a JSON error with status
        400 when no file was uploaded, or status 500 when no acceptable
        sticker could be produced.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered with the Flask app.
    """
    import io  # local import: the file's import header does not provide io

    if 'file' not in request.files:
        return jsonify({"error": "No file uploaded"}), 400

    file = request.files['file']

    # The client-supplied filename is untrusted. Keep only an alphanumeric
    # extension so path separators or ".." can never reach the saved path,
    # and tolerate a missing filename.
    raw_ext = os.path.splitext(file.filename or "")[1].lstrip('.')
    ext = "".join(ch for ch in raw_ext if ch.isalnum()) or "bin"

    input_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}.{ext}")
    output_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}.webp")

    try:
        file.save(input_path)

        is_animated = ext.lower() in ('mp4', 'gif', 'mov', 'avi', 'mkv', 'webm')
        success = False

        if is_animated:
            original_duration = get_video_duration(input_path)
            # 0.0 means the duration could not be probed; asking ffmpeg for
            # a zero-length clip would yield nothing, so fall back to the
            # 10-second cap and let ffmpeg stop at end of input.
            if original_duration > 0:
                target_duration = min(original_duration, 10.0)
            else:
                target_duration = 10.0

            # 🧠 SMART LOOP (priority list): try each setting in turn until
            # one yields a file under the size limit.
            attempts = [
                {"s": 512, "q": 50, "fps": 15, "t": target_duration},            # Attempt 1
                {"s": 384, "q": 45, "fps": 12, "t": target_duration},            # Attempt 2 (Resize)
                {"s": 320, "q": 40, "fps": 12, "t": min(target_duration, 7.0)},  # Attempt 3 (Time Cut)
                {"s": 256, "q": 30, "fps": 10, "t": min(target_duration, 5.0)},  # Emergency (Low Quality)
            ]

            for i, conf in enumerate(attempts):
                print(f"🔄 Single Attempt {i+1}: Size={conf['s']}px...")
                if not convert_segment(input_path, output_path, conf, start_time=0):
                    continue
                size = os.path.getsize(output_path)
                print(f"📦 Size: {size/1024:.2f} KB")
                if size < 980000:  # 980KB safe limit
                    success = True
                    break
        else:
            # Static image: fit inside 512x512 and pad with transparency.
            cmd = ["ffmpeg", "-y", "-i", input_path, "-vf", "scale='if(gt(iw,ih),512,-1)':'if(gt(iw,ih),-1,512)',scale=512:512:force_original_aspect_ratio=decrease,pad=512:512:(ow-iw)/2:(oh-ih)/2:color=#00000000", "-vcodec", "libwebp", "-lossless", "0", "-q:v", "80", output_path]
            try:
                subprocess.run(cmd, check=True)
                success = True
            except (subprocess.CalledProcessError, OSError) as e:
                print(f"FFmpeg Error: {e}")
                success = False

        if success and os.path.exists(output_path):
            # Load the sticker into memory so the temporary file can be
            # deleted before the response is streamed.
            with open(output_path, "rb") as fh:
                payload = io.BytesIO(fh.read())
            return send_file(payload, mimetype='image/webp')
        return jsonify({"error": "Failed to generate valid sticker"}), 500
    finally:
        # Remove both temporary files on every exit path.
        for leftover in (input_path, output_path):
            try:
                os.remove(leftover)
            except OSError:
                # Already gone or never created; nothing to clean up.
                pass
| # ================================================================ | |
| # 2. HEAVY PACK ROUTE (SMART LOOP FOR EVERY SEGMENT ✅) | |
| # ================================================================ | |
def make_pack():
    """Split an uploaded video into 10-second WebP stickers and return a zip.

    Reads the upload from ``request.files['file']``, rejects videos longer
    than 130 seconds, then converts each 10-second segment (segments shorter
    than one second are skipped). Every segment is retried with smaller
    settings until it is under 980,000 bytes; if none qualifies, the last
    file that was produced is kept so the pack is not missing a segment.

    Returns:
        A zip attachment named ``<request id>.zip`` containing ``1.webp``,
        ``2.webp``, ... on success; a JSON error with status 400 when no
        file was uploaded, the duration cannot be read, or the video is too
        long; status 500 when no sticker could be produced.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered with the Flask app.
    """
    import io  # local import: the file's import header does not provide io

    if 'file' not in request.files:
        return jsonify({"error": "No file uploaded"}), 400

    file = request.files['file']
    request_id = str(uuid.uuid4())
    pack_folder = os.path.join(UPLOAD_FOLDER, request_id)
    os.makedirs(pack_folder, exist_ok=True)

    try:
        input_path = os.path.join(pack_folder, "video.mp4")
        file.save(input_path)

        duration = get_video_duration(input_path)
        if duration <= 0:
            # The probe reports failure as 0.0; without a duration there are
            # no segments, so report the problem instead of an empty zip.
            return jsonify({"error": "Could not read video duration"}), 400
        # NOTE(review): the threshold is 130 s although the message says
        # 2 minutes; presumably a tolerance margin, confirm.
        if duration > 130:
            return jsonify({"error": "Video too long (Max 2 mins)"}), 400

        sticker_files = []
        num_segments = math.ceil(duration / 10)

        # 🔄 Process every segment inside a loop.
        for i in range(num_segments):
            start_time = i * 10
            output_full_path = os.path.join(pack_folder, f"{i+1}.webp")

            segment_duration = min(10, duration - start_time)
            if segment_duration < 1:
                continue

            # 🧠 PACK SMART LOOP: shrink each sticker until it is under the
            # size limit.
            attempts = [
                {"s": 512, "q": 50, "fps": 15, "t": segment_duration},
                {"s": 384, "q": 45, "fps": 12, "t": segment_duration},
                {"s": 320, "q": 40, "fps": 10, "t": segment_duration},
                {"s": 256, "q": 30, "fps": 10, "t": segment_duration},
            ]

            converted = False
            for conf in attempts:
                print(f"📦 Pack Segment {i+1}: Trying Size={conf['s']}...")
                if not convert_segment(input_path, output_full_path, conf, start_time=start_time):
                    continue
                if os.path.getsize(output_full_path) < 980000:
                    sticker_files.append(output_full_path)
                    converted = True
                    break  # success: move on to the next segment

            # If nothing fit, still take the last output so the segment is
            # not dropped from the pack.
            if not converted and os.path.exists(output_full_path):
                sticker_files.append(output_full_path)

        if not sticker_files:
            return jsonify({"error": "Failed to generate any stickers"}), 500

        # Build the zip in memory so no archive is left behind on disk.
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zipf:
            for sticker in sticker_files:
                zipf.write(sticker, arcname=os.path.basename(sticker))
        zip_buffer.seek(0)

        return send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f"{request_id}.zip",
        )
    finally:
        # Cleanup runs on every exit path, including unexpected exceptions.
        shutil.rmtree(pack_folder, ignore_errors=True)
if __name__ == "__main__":
    # When executed as a script, serve on every interface at port 7860.
    server_options = {"host": "0.0.0.0", "port": 7860}
    app.run(**server_options)