import json
import os
import shutil
import threading
import uuid
import zipfile

import cv2
from flask import Flask, request, send_file, render_template_string, jsonify
from PIL import Image
from rembg import remove, new_session
|
|
# WSGI application object that all route decorators below register on.
app = Flask(__name__)

# Single rembg session for the "u2net" model, created once at import time and
# passed to every remove() call instead of building a session per frame.
session = new_session("u2net")

# Root directory for per-job workspaces and finished zip archives.
# Kept absolute on purpose: Flask's send_file() resolves *relative* paths
# against app.root_path rather than the current working directory, so a
# relative "jobs" would be created in one place (cwd) and served from another
# whenever the server is launched from a different directory.
JOBS_DIR = os.path.abspath("jobs")
os.makedirs(JOBS_DIR, exist_ok=True)
|
|
# Single-page upload UI, rendered through render_template_string().
# NOTE: the text is parsed by Jinja, so it must not contain the Jinja
# delimiters (double curly braces, curly-brace + percent, curly-brace + hash).
# JavaScript template literals such as ${jobId} are safe.
HTML = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Xlnk Video Processor</title>
    <style>
        body { background: #0b0f19; color: white; font-family: sans-serif; text-align: center; padding: 20px; }
        .container { max-width: 500px; margin: auto; border: 1px solid #334155; padding: 30px; border-radius: 10px; background: #111827; }
        .progress-box { display: none; margin-top: 20px; }
        #progressBar { width: 100%; background: #374151; border-radius: 5px; height: 20px; overflow: hidden; }
        #progressFill { width: 0%; height: 100%; background: #3b82f6; transition: width 0.3s; }
        .btn { background: #3b82f6; color: white; border: none; padding: 12px 24px; border-radius: 5px; cursor: pointer; }
    </style>
</head>
<body>
    <div class="container">
        <h2>Video BG Remover</h2>
        <form id="uploadForm">
            <input type="file" id="videoFile" accept="video/*" required><br><br>
            <button type="submit" class="btn" id="btn">Upload &amp; Process</button>
        </form>

        <div class="progress-box" id="pbox">
            <p id="statusTxt">Uploading...</p>
            <div id="progressBar"><div id="progressFill"></div></div>
        </div>
    </div>

    <script>
        const form = document.getElementById('uploadForm');
        const btn = document.getElementById('btn');
        const pbox = document.getElementById('pbox');
        const statusTxt = document.getElementById('statusTxt');
        const progressFill = document.getElementById('progressFill');

        const POLL_MS = 2000;
        // Consecutive failed/unknown polls tolerated before giving up.
        const MAX_MISSES = 5;

        // Show an error and let the user try again.
        function showError(message) {
            statusTxt.innerText = message;
            btn.style.display = '';
        }

        form.onsubmit = async (e) => {
            e.preventDefault();
            btn.style.display = 'none';
            pbox.style.display = 'block';
            statusTxt.innerText = 'Uploading...';
            progressFill.style.width = '0%';

            const formData = new FormData();
            formData.append('video', document.getElementById('videoFile').files[0]);

            // Start the process
            let jobId = "";
            try {
                const response = await fetch('/upload', { method: 'POST', body: formData });
                if (!response.ok) {
                    throw new Error('Upload failed (HTTP ' + response.status + ')');
                }
                const data = await response.json();
                if (!data.job_id) {
                    throw new Error('Upload failed: no job id returned');
                }
                jobId = data.job_id;
            } catch (err) {
                showError(err.message);
                return;
            }

            // Poll for progress. Each poll schedules the next one only after it
            // has finished, so slow responses can never overlap.
            let misses = 0;
            const poll = async () => {
                let status = null;
                try {
                    const res = await fetch(`/status/${jobId}`);
                    status = await res.json();
                } catch (err) {
                    status = null;
                }

                if (status === null || status.status === "not_found") {
                    misses += 1;
                    if (misses >= MAX_MISSES) {
                        showError('Lost track of the job. Please try again.');
                        return;
                    }
                    setTimeout(poll, POLL_MS);
                    return;
                }
                misses = 0;

                if (status.percent) {
                    progressFill.style.width = status.percent + '%';
                    statusTxt.innerText = `Processing: ${status.current}/${status.total} frames`;
                }

                if (status.status === "completed") {
                    progressFill.style.width = '100%';
                    statusTxt.innerText = 'Done. Downloading...';
                    btn.style.display = '';
                    window.location.href = `/download/${jobId}`;
                    return;
                }

                if (status.status === "failed") {
                    showError('Processing failed' + (status.error ? ': ' + status.error : '.'));
                    return;
                }

                setTimeout(poll, POLL_MS);
            };
            setTimeout(poll, POLL_MS);
        };
    </script>
</body>
</html>
"""
|
|
@app.route("/")
def index():
    """Serve the upload page by rendering the module-level HTML template."""
    page = render_template_string(HTML)
    return page
|
|
@app.route("/upload", methods=["POST"])
def upload():
    """Accept a video upload, create a job workspace and start processing.

    The uploaded file is expected in the multipart field ``video``. A fresh
    UUID4 job id is generated, the file is stored as ``input.mp4`` inside
    ``JOBS_DIR/<job_id>/`` and an initial ``status.json`` is written.

    Processing runs in a background thread so that this request returns the
    job id immediately; the client can then poll ``/status/<job_id>`` while
    frames are being processed.

    Returns:
        JSON ``{"job_id": <id>}`` on success, or JSON ``{"error": ...}`` with
        HTTP 400 when no file was supplied.
    """
    file = request.files.get('video')
    # Browsers submit an empty filename when the field exists but no file
    # was chosen, so treat that the same as a missing part.
    if file is None or not file.filename:
        return jsonify({"error": "no video file provided"}), 400

    job_id = str(uuid.uuid4())
    path = os.path.join(JOBS_DIR, job_id)
    os.makedirs(os.path.join(path, "frames"), exist_ok=True)

    video_path = os.path.join(path, "input.mp4")
    file.save(video_path)

    # Seed the status file before the worker starts so the very first poll
    # already finds a valid record.
    with open(os.path.join(path, "status.json"), "w") as f:
        json.dump({"status": "processing", "current": 0, "total": 0, "percent": 0}, f)

    # Run the heavy work off the request thread; calling process_video()
    # inline would block the response until every frame is done, making the
    # progress polling pointless. Daemon thread: does not keep the process
    # alive on shutdown.
    worker = threading.Thread(
        target=process_video,
        args=(job_id, video_path, path),
        name=f"job-{job_id}",
        daemon=True,
    )
    worker.start()
    return jsonify({"job_id": job_id})
|
|
def _write_status(workspace, payload):
    """Atomically replace ``status.json`` in *workspace* with *payload*."""
    status_path = os.path.join(workspace, "status.json")
    tmp_path = status_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f)
    # os.replace swaps the file in one step, so a concurrent reader never
    # sees a truncated / half-written JSON document.
    os.replace(tmp_path, status_path)


def process_video(job_id, video_path, workspace):
    """Remove the background from every frame of a video and zip the result.

    Frames are read with OpenCV, converted from BGR to RGB, passed through
    rembg's ``remove`` using the module-level ``session`` and saved as
    ``f_NNNN.png`` in ``<workspace>/frames``. Progress is written to
    ``<workspace>/status.json`` every 5 frames. When all frames are done the
    PNGs are archived to ``JOBS_DIR/<job_id>.zip`` and the status becomes
    ``{"status": "completed"}``.

    Args:
        job_id: Identifier of the job; used for the zip file name.
        video_path: Path of the input video file.
        workspace: Job directory containing ``frames/`` and ``status.json``.

    Raises:
        Exception: Any error raised while decoding, processing or archiving
            is re-raised after the status file has been set to ``"failed"``.
    """
    frames_dir = os.path.join(workspace, "frames")
    try:
        cap = cv2.VideoCapture(video_path)
        try:
            # The reported frame count can be 0 or negative when the
            # container does not expose it; clamp so it is safe to divide by.
            total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)

            count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                output = remove(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), session=session)
                Image.fromarray(output).save(os.path.join(frames_dir, f"f_{count:04d}.png"))
                count += 1

                if count % 5 == 0 or count == total_frames:
                    if total_frames > 0:
                        # Clamp: the reported total may be lower than the
                        # number of frames actually decoded.
                        percent = min(100, int((count / total_frames) * 100))
                    else:
                        percent = 0
                    _write_status(workspace, {
                        "status": "processing",
                        "current": count,
                        "total": total_frames,
                        "percent": percent,
                    })
        finally:
            # Release the capture even when a frame fails to process.
            cap.release()

        zip_path = os.path.join(JOBS_DIR, f"{job_id}.zip")
        with zipfile.ZipFile(zip_path, 'w') as z:
            # Add frames by index rather than via os.listdir() so the archive
            # order is the playback order on every platform.
            for index in range(count):
                name = f"f_{index:04d}.png"
                z.write(os.path.join(frames_dir, name), name)

        _write_status(workspace, {"status": "completed"})
    except Exception as exc:
        # Record the failure so pollers do not see "processing" forever,
        # then propagate the original error unchanged.
        try:
            _write_status(workspace, {
                "status": "failed",
                "error": f"{type(exc).__name__}: {exc}",
            })
        except OSError:
            pass  # best effort: never mask the original exception
        raise
|
|
@app.route("/status/<job_id>")
def status(job_id):
    """Return the contents of a job's ``status.json`` as JSON.

    Args:
        job_id: Job identifier taken from the URL.

    Returns:
        The parsed status record, or ``{"status": "not_found"}`` when the id
        is not a canonical UUID string, the status file does not exist, or it
        cannot be read/parsed.
    """
    # Job ids are produced by str(uuid.uuid4()); rejecting anything else
    # keeps values such as ".." out of the filesystem path built below.
    try:
        is_valid = str(uuid.UUID(job_id)) == job_id
    except ValueError:
        is_valid = False
    if not is_valid:
        return jsonify({"status": "not_found"})

    try:
        with open(os.path.join(JOBS_DIR, job_id, "status.json"), "r") as f:
            payload = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers
        # json.JSONDecodeError (e.g. the file is being rewritten right now).
        return jsonify({"status": "not_found"})
    return jsonify(payload)
|
|
@app.route("/download/<job_id>")
def download(job_id):
    """Send the finished zip archive of a job as an attachment.

    Args:
        job_id: Job identifier taken from the URL.

    Returns:
        The file ``JOBS_DIR/<job_id>.zip`` as a download, or JSON
        ``{"status": "not_found"}`` with HTTP 404 when the id is not a
        canonical UUID string or the archive does not exist (yet).
    """
    # Only canonical UUID strings are accepted so the id cannot be used to
    # reach files outside JOBS_DIR.
    try:
        is_valid = str(uuid.UUID(job_id)) == job_id
    except ValueError:
        is_valid = False

    # Absolute path: Flask's send_file() resolves relative paths against
    # app.root_path, which is not necessarily the working directory in which
    # the archive was written.
    zip_path = os.path.abspath(os.path.join(JOBS_DIR, f"{job_id}.zip"))
    if not is_valid or not os.path.isfile(zip_path):
        return jsonify({"status": "not_found"}), 404
    return send_file(zip_path, as_attachment=True)
|
|
if __name__ == "__main__":
    # Script entry point: serve on all network interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")