File size: 6,016 Bytes
aecade4 f2841bb aecade4 8d55a91 aecade4 8d55a91 397572e 8d55a91 aecade4 397572e aecade4 f2841bb aecade4 f2841bb aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
import os
import uuid
import requests
import threading
import time
from flask import Flask, request, jsonify, render_template
from werkzeug.utils import secure_filename
from itertools import cycle
from dotenv import load_dotenv
load_dotenv()
# WORKER_NODES holds the worker base URLs as a comma-separated list.
workers_env = os.getenv("WORKER_NODES", "")
if not workers_env:
    raise ValueError("WORKER_NODES environment variable is not set")

# Trim whitespace around each entry and drop blank ones (e.g. "a,,b" or a
# trailing comma).
WORKER_URLS = [url.strip() for url in workers_env.split(',') if url.strip()]
if not WORKER_URLS:
    # A value such as "," or "   " is non-empty yet contains no URL. Without
    # this check cycle([]) would be created and the first next() call would
    # raise StopIteration inside a request handler instead of at startup.
    raise ValueError("WORKER_NODES environment variable contains no worker URLs")

# Endless round-robin iterator over the configured workers.
worker_pool = cycle(WORKER_URLS)
# Endpoint that get_permanent_link posts the rendered video's URL to.
UPLOADER_API_URL = "https://hamed744-uploadfile.hf.space/upload"

# Bearer token sent to the uploader endpoint; None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")

app = Flask(__name__, template_folder='templates')

# In-memory job registry: internal job id -> {"status": ..., "result": ...}.
jobs = dict()

# Every access to `jobs` in this module happens while holding this lock.
lock = threading.Lock()
def get_next_worker_url():
    """Return the next worker base URL taken from the module-level ``worker_pool`` iterator."""
    upcoming_url = next(worker_pool)
    return upcoming_url
def get_permanent_link(job_id, temp_render_url, worker_url):
    """Send the worker's temporary video URL to the uploader and store the returned link.

    The job's status is first set to a progress message. The temporary URL is
    formed by concatenating ``worker_url`` and ``temp_render_url`` and posted
    to ``UPLOADER_API_URL`` with the ``HF_TOKEN`` bearer token. On success the
    job becomes ``"completed"`` with the returned URL as its result; on any
    failure it becomes ``"error"`` with a message that contains the temporary
    link.

    Args:
        job_id: Key of the job entry in ``jobs``.
        temp_render_url: Value the worker reported as its result; appended
            to ``worker_url`` as-is.
        worker_url: Base URL of the worker that produced the video.
    """
    try:
        with lock:
            jobs[job_id]["status"] = "در حال دائمیسازی لینک..."

        if not HF_TOKEN:
            raise Exception("HF_TOKEN not found")

        upload_reply = requests.post(
            UPLOADER_API_URL,
            json={'url': f"{worker_url}{temp_render_url}"},
            headers={'Authorization': f'Bearer {HF_TOKEN}'},
            timeout=600,
        )
        upload_reply.raise_for_status()

        reply_body = upload_reply.json()
        # The uploader's answer is read from "hf_url" first, then from "url".
        permanent_url = reply_body.get("hf_url") or reply_body.get("url")
        if not permanent_url:
            raise Exception("Invalid response from uploader service")

        with lock:
            record = jobs[job_id]
            record["status"] = "completed"
            record["result"] = permanent_url
    except Exception as exc:
        print(f"Error making link permanent for {job_id}: {exc}")
        with lock:
            record = jobs[job_id]
            record["status"] = "error"
            record["result"] = f"Error making link permanent. Temp link: {temp_render_url}"
def poll_worker_service(job_id, render_job_id, worker_url):
    """Poll a worker until its render job ends, mirroring progress into ``jobs``.

    Intended to run in the background thread that ``submit_job`` starts. Every
    ``POLLING_INTERVAL`` seconds it posts to ``{worker_url}/check_job_status``
    and copies the reported status onto the job entry. The loop ends when:

    * the worker reports ``"completed"`` - the result is handed to
      ``get_permanent_link``, which sets the job's final status and result;
    * the worker reports ``"error"`` - status and result are stored together;
    * ``MAX_POLLING_ERRORS`` consecutive polls fail - the job is marked
      ``"error"`` with a connection-lost message.

    Args:
        job_id: Key of the job entry in ``jobs``.
        render_job_id: Job id assigned by the worker.
        worker_url: Base URL of the worker handling the job.
    """
    POLLING_INTERVAL = 15    # seconds between two status checks
    MAX_POLLING_ERRORS = 20  # consecutive failed polls tolerated before giving up
    error_count = 0

    while True:
        try:
            response = requests.post(
                f"{worker_url}/check_job_status",
                json={"job_id": render_job_id},
                timeout=45,
            )
            response.raise_for_status()
            data = response.json()
            if not isinstance(data, dict):
                # A JSON array/string/null has no .get(); count it as a failed
                # poll instead of letting AttributeError kill the thread.
                raise ValueError("worker returned a non-object JSON body")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError also covers an undecodable body on requests versions
            # whose JSON error is not a RequestException subclass.
            error_count += 1
            print(f"Connection error polling worker {worker_url} ({error_count}/{MAX_POLLING_ERRORS}): {e}")
            if error_count >= MAX_POLLING_ERRORS:
                with lock:
                    current_job = jobs.get(job_id)
                    if current_job:
                        current_job["status"] = "error"
                        current_job["result"] = f"Connection to worker ({worker_url}) lost."
                return
        else:
            # Reset only after the body parsed successfully; resetting before
            # parsing would let a worker that always answers 200 with garbage
            # keep the counter below the limit forever.
            error_count = 0
            worker_status = data.get("status")

            if worker_status == "completed":
                # Deliberately do NOT publish "completed" here: the job has no
                # result until get_permanent_link finishes, and a client must
                # never observe status "completed" with result None.
                get_permanent_link(job_id, data.get("result"), worker_url)
                return

            with lock:
                current_job = jobs.get(job_id)
                if current_job:
                    if current_job.get("status") != worker_status:
                        current_job["status"] = worker_status
                    if worker_status == "error":
                        # Written under the same lock as the status so the
                        # error state is never visible without its message.
                        current_job["result"] = data.get("result", "Unknown worker error")
            if worker_status == "error":
                return

        time.sleep(POLLING_INTERVAL)
@app.route('/')
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/api/submit_job', methods=['POST'])
def submit_job():
    """Forward an uploaded image/video pair to a worker and start polling it.

    Expects multipart fields ``image_file`` and ``video_file`` plus optional
    form fields ``motion`` and ``style``. Up to ``MAX_RETRIES`` workers are
    tried in the order given by ``get_next_worker_url``. On the first
    successful submission a job entry is created, a polling thread is started
    and the internal job id is returned.

    Returns:
        ``{"job_id": ...}`` on success; ``{"error": ...}`` with status 400 when
        a file is missing, or status 500 when every attempt failed.
    """
    if 'image_file' not in request.files or 'video_file' not in request.files:
        return jsonify({"error": "Image and video files are required"}), 400

    image_file = request.files['image_file']
    video_file = request.files['video_file']
    motion = request.form.get('motion')
    style = request.form.get('style')

    # Read the uploads once so the same bytes can be resent on every retry.
    image_bytes = image_file.read()
    video_bytes = video_file.read()

    # The payload is identical for every attempt, so build it a single time.
    # `or ''` keeps secure_filename from raising if a filename is missing.
    upload_files = {
        'image_file': (secure_filename(image_file.filename or ''), image_bytes, image_file.mimetype),
        'video_file': (secure_filename(video_file.filename or ''), video_bytes, video_file.mimetype),
    }
    form_fields = {'motion': motion, 'style': style}

    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        selected_worker_url = get_next_worker_url()
        print(f"Attempt {attempt + 1}/{MAX_RETRIES}: Sending to worker: {selected_worker_url}")
        try:
            response = requests.post(
                f"{selected_worker_url}/submit_new_job",
                files=upload_files,
                data=form_fields,
                timeout=600
            )
            response.raise_for_status()
            render_data = response.json()
            render_job_id = render_data.get("job_id")
            if not render_job_id:
                raise Exception("Invalid response from worker")

            internal_job_id = str(uuid.uuid4())
            with lock:
                jobs[internal_job_id] = {"status": "Sending to worker...", "result": None}
            thread = threading.Thread(
                target=poll_worker_service,
                args=(internal_job_id, render_job_id, selected_worker_url),
            )
            thread.start()
            return jsonify({"job_id": internal_job_id})
        except Exception as e:
            # Any failure (network, HTTP status, malformed reply) moves on to
            # the next worker.
            print(f"Error on attempt {attempt + 1} for worker {selected_worker_url}: {e}")
            # Pause only when another attempt follows; sleeping after the last
            # one would just delay the error response.
            if attempt < MAX_RETRIES - 1:
                time.sleep(2)

    final_error_message = ("<strong>Servers are busy!</strong><br>"
                           "Please try again in a few minutes.")
    return jsonify({"error": final_error_message}), 500
@app.route('/api/check_status', methods=['POST'])
def check_status():
    """Return the current status and result of a job.

    Expects a JSON object with a ``job_id`` key.

    Returns:
        ``{"status": ..., "result": ...}`` for the job, with status
        ``"not_found"`` and result ``None`` for an unknown id; or
        ``{"error": ...}`` with status 400 when no job id was supplied.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # request is answered with the 400 below rather than an unhandled error.
    data = request.get_json(silent=True)
    job_id = data.get('job_id') if isinstance(data, dict) else None
    if not job_id:
        return jsonify({"error": "Job ID is required"}), 400

    with lock:
        # Keys of `jobs` are strings; skipping the lookup for anything else
        # avoids a TypeError on unhashable values such as lists.
        job = jobs.get(job_id) if isinstance(job_id, str) else None
        if job is None:
            status, result = "not_found", None
        else:
            # Copy both fields under the lock so they belong to the same update.
            status, result = job["status"], job["result"]
    return jsonify({"status": status, "result": result})
if __name__ == '__main__':
    # Bind to all interfaces; the port is read from PORT, defaulting to 7860.
    listen_port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port)