File size: 7,077 Bytes
aecade4 8d55a91 aecade4 8d55a91 397572e 8d55a91 aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 397572e aecade4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
import os
import uuid
import requests
import threading
import time
from flask import Flask, request, jsonify, render_template, Response
from werkzeug.utils import secure_filename
from itertools import cycle
from dotenv import load_dotenv
# Load variables from a local .env file (if any) before reading configuration.
load_dotenv()

# WORKER_NODES is a comma-separated list of worker base URLs.
workers_env = os.getenv("WORKER_NODES", "")
if not workers_env:
    raise ValueError("WORKER_NODES environment variable is not set")

# Strip whitespace and trailing slashes so that f"{url}/endpoint" never yields
# a double slash, then drop entries that end up empty.
WORKER_URLS = [
    url
    for url in (part.strip().rstrip('/') for part in workers_env.split(','))
    if url
]
if not WORKER_URLS:
    # A value such as " , " is non-empty but contains no usable URL; cycling
    # over an empty list would raise StopIteration on the first request.
    raise ValueError("WORKER_NODES environment variable does not contain any worker URL")

# Endless round-robin iterator over the configured workers.
worker_pool = cycle(WORKER_URLS)

# Endpoint that receives {'url': ...} and answers with an "hf_url" field.
UPLOADER_API_URL = "https://hamed744-uploadfile.hf.space/upload"
# Bearer token sent to the uploader and to huggingface.co by the proxy route.
HF_TOKEN = os.getenv("HF_TOKEN")

app = Flask(__name__, template_folder='templates')

# In-memory job registry: internal job id -> {"status": ..., "result": ...}.
jobs = {}
# Guards every read/write of `jobs`, which is shared with the polling threads.
lock = threading.Lock()
def get_next_worker_url():
    """Return the next worker base URL from the shared round-robin pool."""
    selected_url = next(worker_pool)
    return selected_url
def get_permanent_link(job_id, temp_render_url, worker_url):
    """Ask the uploader service to persist a rendered video and record the outcome.

    The worker-relative ``temp_render_url`` is turned into an absolute URL,
    posted to ``UPLOADER_API_URL``, and the ``/proxy/...`` part of the returned
    ``hf_url`` is stored as the job result.

    Args:
        job_id: Key of the job entry in ``jobs`` to update.
        temp_render_url: Path of the rendered video as reported by the worker.
        worker_url: Base URL of the worker that produced the video.

    Returns:
        None. The job entry ends up with status ``"completed"`` and the proxy
        path as result, or status ``"error"`` and an explanatory message.
        No exception is propagated to the caller.
    """
    try:
        with lock:
            jobs[job_id]["status"] = "در حال دائمیسازی لینک..."
        if not HF_TOKEN:
            raise Exception("HF_TOKEN not found")
        if not temp_render_url:
            # Without this guard a missing result would be sent to the
            # uploader as "<worker_url>None".
            raise Exception("Worker reported completion without a result URL")

        # Join with exactly one slash regardless of how either side is written.
        video_full_url = f"{worker_url.rstrip('/')}/{temp_render_url.lstrip('/')}"
        payload = {'url': video_full_url}
        headers = {'Authorization': f'Bearer {HF_TOKEN}'}
        response = requests.post(UPLOADER_API_URL, json=payload, headers=headers, timeout=600)
        response.raise_for_status()
        data = response.json()

        uploader_proxy_url = data.get("hf_url")
        if not uploader_proxy_url:
            raise Exception("Invalid response from uploader")

        # Keep only what follows "/proxy/" so the link is served by this
        # application's own /proxy/ route.
        _, marker, path_part = uploader_proxy_url.partition('/proxy/')
        if not marker or not path_part:
            raise Exception("Uploader URL does not contain a /proxy/ path")
        final_proxy_url = f"/proxy/{path_part}"

        with lock:
            jobs[job_id]["status"] = "completed"
            jobs[job_id]["result"] = final_proxy_url
    except Exception as e:
        print(f"Error making link permanent for {job_id}: {e}")
        with lock:
            # Use .get so an unknown job id cannot raise again inside the
            # handler and kill the polling thread with an unhandled error.
            failed_job = jobs.get(job_id)
            if failed_job is not None:
                failed_job["status"] = "error"
                failed_job["result"] = f"Error making link permanent. Temp link: {temp_render_url}"
def poll_worker_service(job_id, render_job_id, worker_url):
    """Poll a worker until the render job finishes, mirroring progress into ``jobs``.

    Runs in a background thread started by ``submit_job``. Every
    ``POLLING_INTERVAL`` seconds the worker's ``/check_job_status`` endpoint is
    queried. Intermediate statuses are copied to the job entry. On
    ``"completed"`` the result is handed to ``get_permanent_link``, and on
    ``"error"`` the worker's message is stored. After ``MAX_POLLING_ERRORS``
    consecutive failed polls the job is marked as failed.

    Args:
        job_id: Key of the job entry in ``jobs``.
        render_job_id: Job identifier issued by the worker.
        worker_url: Base URL of the worker handling the job.
    """
    POLLING_INTERVAL = 15
    MAX_POLLING_ERRORS = 20
    error_count = 0
    while True:
        try:
            response = requests.post(
                f"{worker_url}/check_job_status",
                json={"job_id": render_job_id},
                timeout=45,
            )
            response.raise_for_status()
            data = response.json()
            if not isinstance(data, dict):
                raise ValueError("Worker returned a non-object JSON payload")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers malformed JSON bodies, which would otherwise
            # either kill this thread or never count towards the error limit.
            error_count += 1
            print(f"Connection error polling worker {worker_url} ({error_count}/{MAX_POLLING_ERRORS}): {e}")
            if error_count >= MAX_POLLING_ERRORS:
                with lock:
                    current_job = jobs.get(job_id)
                    if current_job:
                        current_job["status"] = "error"
                        current_job["result"] = f"Connection to worker ({worker_url}) lost."
                return
        else:
            # Only a fully valid poll resets the consecutive-error counter.
            error_count = 0
            worker_status = data.get("status")

            if worker_status == "completed":
                # Do NOT publish "completed" here: the result is not available
                # until get_permanent_link has stored it, and a client polling
                # in between would see "completed" with a null result.
                get_permanent_link(job_id, data.get("result"), worker_url)
                return

            with lock:
                current_job = jobs.get(job_id)
                if current_job and worker_status is not None:
                    current_job["status"] = worker_status
                    if worker_status == "error":
                        current_job["result"] = data.get("result", "Unknown worker error")
            if worker_status == "error":
                return
        time.sleep(POLLING_INTERVAL)
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/api/submit_job', methods=['POST'])
def submit_job():
    """Forward an uploaded image/video pair to a worker and start tracking the job.

    Expects multipart form data with ``image_file`` and ``video_file`` parts and
    optional ``motion`` / ``style`` fields. Up to ``MAX_RETRIES`` workers are
    tried in round-robin order. Once a worker accepts the job, an internal job
    id is registered and a polling thread is started.

    Returns:
        ``{"job_id": ...}`` on success, a 400 JSON error when a file is
        missing, or a 500 JSON error when every attempt failed.
    """
    if 'image_file' not in request.files or 'video_file' not in request.files:
        return jsonify({"error": "Image and video files are required"}), 400
    image_file = request.files['image_file']
    video_file = request.files['video_file']
    # A form submitted without choosing a file still contains the part, but
    # with an empty filename; treat that the same as a missing file.
    if not image_file.filename or not video_file.filename:
        return jsonify({"error": "Image and video files are required"}), 400

    motion = request.form.get('motion')
    style = request.form.get('style')
    # Read once into memory so the same payload can be re-sent on retry.
    image_bytes = image_file.read()
    video_bytes = video_file.read()
    upload_parts = {
        'image_file': (secure_filename(image_file.filename), image_bytes, image_file.mimetype),
        'video_file': (secure_filename(video_file.filename), video_bytes, video_file.mimetype),
    }
    form_fields = {'motion': motion, 'style': style}

    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        selected_worker_url = get_next_worker_url()
        print(f"Attempt {attempt + 1}/{MAX_RETRIES}: Sending to worker: {selected_worker_url}")
        try:
            response = requests.post(
                f"{selected_worker_url}/submit_new_job",
                files=upload_parts,
                data=form_fields,
                timeout=600
            )
            response.raise_for_status()
            render_data = response.json()
            render_job_id = render_data.get("job_id")
            if not render_job_id:
                raise Exception("Invalid response from worker")
        except Exception as e:
            print(f"Error on attempt {attempt + 1} for worker {selected_worker_url}: {e}")
            # Back off before the next worker, but not after the final attempt.
            if attempt < MAX_RETRIES - 1:
                time.sleep(2)
            continue

        # The worker accepted the job. The bookkeeping below is kept outside
        # the retry `try` so a local failure cannot resubmit the same job to
        # another worker.
        internal_job_id = str(uuid.uuid4())
        with lock:
            jobs[internal_job_id] = {"status": "Sending to worker...", "result": None}
        thread = threading.Thread(
            target=poll_worker_service,
            args=(internal_job_id, render_job_id, selected_worker_url),
        )
        thread.start()
        return jsonify({"job_id": internal_job_id})

    final_error_message = ("<strong>Servers are busy!</strong><br>"
                           "Please try again in a few minutes.")
    return jsonify({"error": final_error_message}), 500
@app.route('/api/check_status', methods=['POST'])
def check_status():
    """Report the current status and result of a previously submitted job.

    Expects a JSON object body containing ``job_id``.

    Returns:
        ``{"status": ..., "result": ...}`` for the job, with status
        ``"not_found"`` for unknown ids, or a 400 JSON error when the body is
        not a JSON object or carries no usable ``job_id``.
    """
    # silent=True yields None instead of aborting on a missing or invalid JSON
    # body, so every malformed request gets the same 400 response below.
    data = request.get_json(silent=True)
    job_id = data.get('job_id') if isinstance(data, dict) else None
    # Job ids are UUID strings; anything else cannot match a stored job, and
    # unhashable values would make the dict lookup raise.
    if not job_id or not isinstance(job_id, str):
        return jsonify({"error": "Job ID is required"}), 400

    with lock:
        job = jobs.get(job_id)
        # Snapshot both fields while holding the lock so status and result
        # always belong to the same state of the job.
        if job is None:
            status, result = "not_found", None
        else:
            status, result = job["status"], job["result"]
    return jsonify({"status": status, "result": result})
@app.route('/proxy/<user>/<repo>/<path:file_path>')
def file_proxy(user, repo, file_path):
    """Stream a file from a Hugging Face dataset repository to the client.

    The file is fetched from
    ``https://huggingface.co/datasets/<user>/<repo>/resolve/main/<file_path>``
    using ``HF_TOKEN`` and relayed in 8 KiB chunks.

    Args:
        user: Owner segment of the dataset repository id.
        repo: Name segment of the dataset repository id.
        file_path: Path of the file inside the repository.

    Returns:
        A streaming response with the upstream content type, ``404`` when the
        upstream file is missing, the upstream status code for other HTTP
        errors, or ``500`` for any other failure or a missing token.
    """
    from urllib.parse import quote

    if not HF_TOKEN:
        return "HF_TOKEN not set", 500

    # NOTE(review): any user/repo/path is accepted, so this route can read
    # every dataset file the token has access to. Confirm this is intended.
    # The route values arrive URL-decoded, so re-encode them. Otherwise
    # characters such as '?' or '#' in a name would truncate the upstream path.
    repo_id = f"{quote(user, safe='')}/{quote(repo, safe='')}"
    file_url = f"https://huggingface.co/datasets/{repo_id}/resolve/main/{quote(file_path, safe='/')}"
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    try:
        hf_response = requests.get(file_url, headers=headers, stream=True, timeout=30)
        hf_response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        upstream_status = e.response.status_code
        # With stream=True the connection stays open until it is closed.
        e.response.close()
        if upstream_status == 404:
            return "File not found", 404
        return f"Hugging Face Server Error: {upstream_status}", upstream_status
    except Exception as e:
        print(f"Proxy error: {e}")
        return "Error fetching file", 500

    proxied = Response(
        hf_response.iter_content(chunk_size=8192),
        content_type=hf_response.headers.get('Content-Type', 'application/octet-stream'),
    )
    # Release the upstream connection when the response is finished or the
    # client disconnects early.
    proxied.call_on_close(hf_response.close)
    return proxied
if __name__ == '__main__':
    # Listen on all interfaces; the PORT environment variable overrides the
    # default port 7860.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))