File size: 6,016 Bytes
aecade4
 
 
 
 
f2841bb
aecade4
 
8d55a91
 
 
aecade4
8d55a91
 
397572e
8d55a91
 
aecade4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
397572e
aecade4
 
 
 
 
 
 
 
 
f2841bb
 
aecade4
 
 
f2841bb
aecade4
 
397572e
aecade4
 
397572e
aecade4
 
 
397572e
aecade4
 
397572e
aecade4
 
 
397572e
aecade4
 
 
 
 
 
 
 
 
397572e
aecade4
 
 
 
397572e
 
aecade4
 
397572e
aecade4
 
 
 
 
397572e
 
aecade4
 
 
 
 
 
 
 
 
 
397572e
aecade4
 
 
 
 
 
 
 
 
 
 
 
397572e
aecade4
 
 
 
 
 
 
 
 
 
 
397572e
aecade4
 
397572e
aecade4
 
 
 
 
397572e
aecade4
 
397572e
 
aecade4
 
 
 
 
 
397572e
aecade4
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os
import uuid
import requests
import threading
import time
from flask import Flask, request, jsonify, render_template
from werkzeug.utils import secure_filename
from itertools import cycle
from dotenv import load_dotenv

load_dotenv()

workers_env = os.getenv("WORKER_NODES", "")
if not workers_env:
    raise ValueError("WORKER_NODES environment variable is not set")

WORKER_URLS = [url.strip() for url in workers_env.split(',') if url.strip()]
worker_pool = cycle(WORKER_URLS)

UPLOADER_API_URL = "https://hamed744-uploadfile.hf.space/upload"
HF_TOKEN = os.getenv("HF_TOKEN")

app = Flask(__name__, template_folder='templates')

jobs = {}
lock = threading.Lock()

def get_next_worker_url():
    return next(worker_pool)

def get_permanent_link(job_id, temp_render_url, worker_url):
    try:
        with lock: jobs[job_id]["status"] = "در حال دائمی‌سازی لینک..."
        if not HF_TOKEN: raise Exception("HF_TOKEN not found")
        
        video_full_url = f"{worker_url}{temp_render_url}"
        
        payload = {'url': video_full_url}
        headers = {'Authorization': f'Bearer {HF_TOKEN}'}
        response = requests.post(UPLOADER_API_URL, json=payload, headers=headers, timeout=600)
        response.raise_for_status()
        data = response.json()
        
        final_url = data.get("hf_url") or data.get("url")
        if not final_url: raise Exception("Invalid response from uploader service")
        
        with lock:
            jobs[job_id]["status"] = "completed"
            jobs[job_id]["result"] = final_url
            
    except Exception as e:
        print(f"Error making link permanent for {job_id}: {e}")
        with lock:
            jobs[job_id]["status"] = "error"
            jobs[job_id]["result"] = f"Error making link permanent. Temp link: {temp_render_url}"

def poll_worker_service(job_id, render_job_id, worker_url):
    POLLING_INTERVAL = 15
    MAX_POLLING_ERRORS = 20
    error_count = 0

    while True:
        try:
            response = requests.post(f"{worker_url}/check_job_status", json={"job_id": render_job_id}, timeout=45)
            response.raise_for_status()
            error_count = 0

            data = response.json()
            with lock:
                current_job = jobs.get(job_id)
                if current_job and current_job.get("status") != data.get("status"):
                    current_job["status"] = data.get("status")
            
            if data.get("status") == "completed":
                get_permanent_link(job_id, data.get("result"), worker_url)
                return
            elif data.get("status") == "error":
                with lock: 
                    current_job = jobs.get(job_id)
                    if current_job:
                        current_job["result"] = data.get("result", "Unknown worker error")
                return
        except requests.exceptions.RequestException as e:
            error_count += 1
            print(f"Connection error polling worker {worker_url} ({error_count}/{MAX_POLLING_ERRORS}): {e}")
            if error_count >= MAX_POLLING_ERRORS:
                with lock:
                    current_job = jobs.get(job_id)
                    if current_job:
                        current_job["status"] = "error"
                        current_job["result"] = f"Connection to worker ({worker_url}) lost."
                return
        
        time.sleep(POLLING_INTERVAL)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/api/submit_job', methods=['POST'])
def submit_job():
    if 'image_file' not in request.files or 'video_file' not in request.files:
        return jsonify({"error": "Image and video files are required"}), 400

    image_file = request.files['image_file']
    video_file = request.files['video_file']
    motion = request.form.get('motion')
    style = request.form.get('style')

    image_bytes = image_file.read()
    video_bytes = video_file.read()
    
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        selected_worker_url = get_next_worker_url()
        print(f"Attempt {attempt + 1}/{MAX_RETRIES}: Sending to worker: {selected_worker_url}")
        try:
            response = requests.post(
                f"{selected_worker_url}/submit_new_job",
                files={'image_file': (secure_filename(image_file.filename), image_bytes, image_file.mimetype),
                       'video_file': (secure_filename(video_file.filename), video_bytes, video_file.mimetype)},
                data={'motion': motion, 'style': style},
                timeout=600
            )
            response.raise_for_status()
            render_data = response.json()
            render_job_id = render_data.get("job_id")
            if not render_job_id: raise Exception("Invalid response from worker")
            
            internal_job_id = str(uuid.uuid4())
            with lock: jobs[internal_job_id] = {"status": "Sending to worker...", "result": None}
            
            thread = threading.Thread(target=poll_worker_service, args=(internal_job_id, render_job_id, selected_worker_url))
            thread.start()
            return jsonify({"job_id": internal_job_id})
        except Exception as e:
            print(f"Error on attempt {attempt + 1} for worker {selected_worker_url}: {e}")
            time.sleep(2)

    final_error_message = ("<strong>Servers are busy!</strong><br>"
                         "Please try again in a few minutes.")
    return jsonify({"error": final_error_message}), 500

@app.route('/api/check_status', methods=['POST'])
def check_status():
    data = request.get_json()
    job_id = data.get('job_id')
    if not job_id: return jsonify({"error": "Job ID is required"}), 400
    with lock: job = jobs.get(job_id, {"status": "not_found", "result": None})
    return jsonify({"status": job["status"], "result": job["result"]})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))