|
|
import hmac
import json
import os
import time
import uuid
from collections import deque
from datetime import datetime, timedelta
from threading import Lock, Thread

import requests
from flask import Flask, request, jsonify
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
|
# Comma-separated list of worker base URLs, e.g. "http://w1:8000,http://w2:8000".
WORKER_URLS = os.getenv('WORKER_URLS', '').split(',')

# Shared secret for Bearer-token auth; an empty string disables auth entirely.
API_KEY = os.getenv('API_KEY', '')

# In-memory state (lost on restart): worker registry, FIFO job queue, job table.
workers = {}

job_queue = deque()

jobs = {}

# Single coarse lock guarding `workers`, `jobs` and `job_queue`.
lock = Lock()

# Round-robin cursor over the current idle-worker list, with its own lock so
# concurrent dispatcher threads advance it atomically.
worker_assignment_index = 0

worker_assignment_lock = Lock()

# NOTE(review): this constant is never read anywhere in this file; the
# maintenance loop sleeps 30s and uses a 90s heartbeat cutoff instead.
worker_health_check_interval = 60

# Seed the registry from the environment; additional workers may also
# self-register later via /worker/heartbeat.
for idx, url in enumerate(WORKER_URLS):
    if url.strip():
        worker_id = f"worker-{idx+1}"
        workers[worker_id] = {
            'url': url.strip(),
            'status': 'unknown',      # unknown until the first heartbeat arrives
            'last_heartbeat': None,
            'total_processed': 0,
            'current_job': None,
            'consecutive_failures': 0
        }
|
|
|
|
|
def verify_api_key():
    """Check the request's Bearer token against the configured API key.

    Returns:
        True when auth is disabled (no API_KEY configured) or when the
        Authorization header carries the correct ``Bearer <token>`` value;
        False otherwise.
    """
    if not API_KEY:
        # Auth is opt-in: an empty API_KEY means endpoints are open.
        return True

    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        token = auth_header[7:]
        # Constant-time comparison so the secret cannot be recovered by
        # timing how far a plain `==` gets before mismatching.
        return hmac.compare_digest(token, API_KEY)
    return False
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint.

    Reports orchestrator liveness plus a snapshot of worker states,
    queue depth and in-flight job count.
    """
    with lock:
        # Take the entire snapshot while holding the lock so the worker
        # counts, queue size and active-job count are mutually consistent.
        idle_workers = len([w for w in workers.values() if w['status'] == 'idle'])
        busy_workers = len([w for w in workers.values() if w['status'] == 'busy'])
        offline_workers = len([w for w in workers.values() if w['status'] == 'offline'])
        total_workers = len(workers)
        queue_size = len(job_queue)
        active_jobs = len([j for j in jobs.values() if j['status'] == 'processing'])

    return jsonify({
        'status': 'online',
        'timestamp': datetime.now().isoformat(),
        'workers': {
            'total': total_workers,
            'idle': idle_workers,
            'busy': busy_workers,
            'offline': offline_workers
        },
        'queue_size': queue_size,
        'active_jobs': active_jobs
    })
|
|
|
|
|
@app.route('/submit', methods=['POST'])
def submit_job():
    """Submit a new job.

    Expects a JSON body with ``unique_id``, ``service_type`` and
    ``image_data``. Queues the job, kicks the dispatcher in the background,
    and returns 202 with the queue position (200 if the job already exists,
    503 if no worker could possibly serve it).
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    # silent=True: return None instead of raising an HTML 400/415 error when
    # the body is missing or has the wrong content type, so clients always
    # get a JSON error response.
    data = request.get_json(silent=True)

    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    if 'service_type' not in data:
        return jsonify({'error': 'Missing service_type'}), 400

    if 'image_data' not in data:
        return jsonify({'error': 'Missing image_data'}), 400

    unique_id = data['unique_id']

    with lock:
        # Idempotent resubmission: report the existing job instead of
        # creating a duplicate.
        if unique_id in jobs:
            return jsonify({
                'message': 'Job already exists',
                'unique_id': unique_id,
                'status': jobs[unique_id]['status']
            }), 200

        # 'unknown' workers count as available: they were configured from
        # WORKER_URLS but have not sent a heartbeat yet.
        available_workers = [w for w in workers.values() if w['status'] in ['idle', 'busy', 'unknown']]
        if not available_workers:
            return jsonify({
                'error': 'No workers available',
                'message': 'All processing servers are offline. Please try again later.'
            }), 503

        jobs[unique_id] = {
            'status': 'queued',
            'service_type': data['service_type'],
            'image_data': data['image_data'],
            'result': None,
            'created_at': datetime.now(),
            'worker_id': None,
            'error': None,
            'retry_count': 0
        }

        job_queue.append(unique_id)
        # Read the position while still holding the lock so it reflects the
        # state we just created, not a later snapshot.
        queue_position = get_queue_position(unique_id)

    # Dispatch asynchronously so this request returns immediately.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({
        'message': 'Job submitted successfully',
        'unique_id': unique_id,
        'queue_position': queue_position
    }), 202
|
|
|
|
|
@app.route('/status/<unique_id>', methods=['GET'])
def check_status(unique_id):
    """Return the current state of a job, with status-specific fields."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        job = jobs.get(unique_id)
        if job is None:
            return jsonify({'error': 'Job not found'}), 404

        status = job['status']
        response = {
            'unique_id': unique_id,
            'status': status,
            'created_at': job['created_at'].isoformat()
        }

        if status == 'completed':
            response['data'] = job['result']
            response['service_type'] = job['service_type']
        elif status == 'failed':
            # Prefer a structured error payload from the worker, then the
            # job-level error string, then a generic fallback.
            stored = job.get('result')
            error_data = stored if isinstance(stored, dict) else {}

            if 'error' in error_data:
                response['error'] = error_data['error']
                response['data'] = error_data
            elif job.get('error'):
                response['error'] = job['error']
                response['data'] = {'error': job['error']}
            else:
                response['error'] = 'Unknown error'
                response['data'] = {'error': 'Unknown error'}

            print(f"[Orchestrator] Failed job {unique_id} - Error: {response['error']}")
        elif status == 'queued':
            response['queue_position'] = get_queue_position(unique_id)
        elif status == 'processing':
            response['worker_id'] = job['worker_id']

        return jsonify(response), 200
|
|
|
|
|
@app.route('/workers/status', methods=['GET'])
def workers_status():
    """Return a JSON snapshot of every registered worker."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        # Serialize each worker record into a JSON-friendly dict.
        worker_list = [
            {
                'id': worker_id,
                'status': worker['status'],
                'total_processed': worker['total_processed'],
                'current_job': worker['current_job'],
                'last_heartbeat': worker['last_heartbeat'].isoformat() if worker['last_heartbeat'] else None,
                'consecutive_failures': worker.get('consecutive_failures', 0)
            }
            for worker_id, worker in workers.items()
        ]

        active_count = sum(1 for w in workers.values() if w['status'] in ['idle', 'busy'])

        return jsonify({
            'workers': worker_list,
            'total_workers': len(workers),
            'active_workers': active_count,
            'queue_size': len(job_queue)
        }), 200
|
|
|
|
|
@app.route('/worker/heartbeat', methods=['POST'])
def worker_heartbeat():
    """Worker heartbeat endpoint.

    Registers unknown workers on first contact, refreshes the status and
    last_heartbeat of known ones, then kicks the dispatcher.
    """
    # silent=True keeps the error response JSON even on a bad content type.
    data = request.get_json(silent=True)

    if not data or 'worker_id' not in data:
        return jsonify({'error': 'Missing worker_id'}), 400

    worker_id = data['worker_id']

    with lock:
        if worker_id not in workers:
            # Self-registration path for workers not listed in WORKER_URLS.
            # Honor the reported status (the update path below already does),
            # defaulting to idle when none is given.
            workers[worker_id] = {
                'url': data.get('url', ''),
                'status': data.get('status', 'idle'),
                'last_heartbeat': datetime.now(),
                'total_processed': 0,
                'current_job': None,
                'consecutive_failures': 0
            }
        else:
            workers[worker_id]['status'] = data.get('status', 'idle')
            workers[worker_id]['last_heartbeat'] = datetime.now()
            # A live heartbeat clears the failure streak.
            workers[worker_id]['consecutive_failures'] = 0

            # An idle report means any job we still track for this worker is
            # no longer running there.
            if data.get('status') == 'idle' and workers[worker_id]['current_job']:
                workers[worker_id]['current_job'] = None

    # A fresh heartbeat may have freed capacity — try to dispatch.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({'message': 'Heartbeat received'}), 200
|
|
|
|
|
@app.route('/worker/result', methods=['POST'])
def worker_result():
    """Worker submits a job result.

    Records the completed/failed outcome on the job, frees the reporting
    worker, and kicks the dispatcher for the next queued job.
    """
    # silent=True: malformed bodies yield None instead of an HTML error page.
    data = request.get_json(silent=True)

    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    unique_id = data['unique_id']
    worker_id = data.get('worker_id')
    # Anything other than an explicit 'completed' is treated as failure.
    result_status = data.get('status', 'failed')

    print(f"[Orchestrator] Received result for {unique_id} - Status: {result_status}")

    with lock:
        if unique_id not in jobs:
            print(f"[Orchestrator] Job {unique_id} not found")
            return jsonify({'error': 'Job not found'}), 404

        job = jobs[unique_id]

        if result_status == 'completed':
            job['status'] = 'completed'
            job['result'] = data.get('result', {})
            print(f"[Orchestrator] Job {unique_id} completed successfully")
        else:
            job['status'] = 'failed'

            # Normalize whatever error shape the worker sent into a dict
            # `result` plus a flat `error` string.
            result_data = data.get('result', {})

            if isinstance(result_data, dict):
                job['result'] = result_data
                job['error'] = result_data.get('error', 'Unknown error')
            elif isinstance(result_data, str):
                job['result'] = {'error': result_data}
                job['error'] = result_data
            else:
                error_msg = data.get('error', 'Unknown error')
                job['result'] = {'error': error_msg}
                job['error'] = error_msg

            print(f"[Orchestrator] Job {unique_id} failed - Error: {job['error']}")

        # Free the reporting worker and bump its lifetime counter.
        if worker_id and worker_id in workers:
            workers[worker_id]['status'] = 'idle'
            workers[worker_id]['current_job'] = None
            workers[worker_id]['total_processed'] += 1

    # Capacity was just freed — dispatch any queued jobs.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({'message': 'Result received'}), 200
|
|
|
|
|
def assign_jobs_to_workers():
    """Assign pending jobs to idle workers with Round-Robin"""
    global worker_assignment_index

    # NOTE(review): the HTTP POST below runs while `lock` is held, so one
    # slow worker (up to the 15s timeout) blocks every endpoint that needs
    # the lock. Consider releasing the lock around the network call.
    with lock:
        # Drain the queue until it is empty or no eligible worker remains.
        while job_queue:

            # Eligible = idle and not on a failure streak (>=3 excludes it).
            idle_workers = [wid for wid, w in workers.items()
                            if w['status'] == 'idle' and w.get('consecutive_failures', 0) < 3]

            if not idle_workers:
                print("[Orchestrator] No idle workers available")
                break

            # Round-robin selection; the cursor has its own lock because
            # several dispatcher threads may run this function concurrently.
            with worker_assignment_lock:
                selected_worker = idle_workers[worker_assignment_index % len(idle_workers)]
                worker_assignment_index += 1

            unique_id = job_queue.popleft()

            # The job may have been cleaned up while it sat in the queue.
            if unique_id not in jobs:
                continue

            job = jobs[unique_id]

            # Mark both sides of the assignment before attempting delivery.
            job['status'] = 'processing'
            job['worker_id'] = selected_worker

            workers[selected_worker]['status'] = 'busy'
            workers[selected_worker]['current_job'] = unique_id

            try:
                worker_url = workers[selected_worker]['url']
                print(f"[Orchestrator] Assigning job {unique_id} to {selected_worker} (Round-Robin)")

                response = requests.post(
                    f"{worker_url}/process",
                    json={
                        'unique_id': unique_id,
                        'service_type': job['service_type'],
                        'image_data': job['image_data']
                    },
                    timeout=15
                )

                if response.status_code != 200:
                    # Worker refused the job: undo the assignment and requeue
                    # at the TAIL, up to 3 attempts total.
                    print(f"[Orchestrator] Worker {selected_worker} rejected job {unique_id}")
                    job['status'] = 'queued'
                    job['worker_id'] = None
                    job['retry_count'] = job.get('retry_count', 0) + 1

                    if job['retry_count'] < 3:
                        job_queue.append(unique_id)
                    else:
                        job['status'] = 'failed'
                        job['error'] = 'Maximum retry attempts reached'

                    workers[selected_worker]['status'] = 'idle'
                    workers[selected_worker]['current_job'] = None
                    workers[selected_worker]['consecutive_failures'] += 1
                else:
                    # Accepted: reset the failure streak; the worker reports
                    # the outcome later via /worker/result.
                    workers[selected_worker]['consecutive_failures'] = 0

            except requests.exceptions.Timeout:
                # NOTE(review): timeouts/connection errors requeue at the
                # HEAD (appendleft) while rejections requeue at the tail —
                # confirm this asymmetry is intentional.
                print(f"[Orchestrator] Timeout sending job to worker {selected_worker}")
                job['status'] = 'queued'
                job['worker_id'] = None
                job['retry_count'] = job.get('retry_count', 0) + 1

                if job['retry_count'] < 3:
                    job_queue.appendleft(unique_id)
                else:
                    job['status'] = 'failed'
                    job['error'] = 'Worker timeout after multiple retries'

                # A timeout leaves the worker idle (it may still be healthy).
                workers[selected_worker]['status'] = 'idle'
                workers[selected_worker]['current_job'] = None
                workers[selected_worker]['consecutive_failures'] += 1

            except Exception as e:
                print(f"[Orchestrator] Error sending job to worker {selected_worker}: {str(e)}")
                job['status'] = 'queued'
                job['worker_id'] = None
                job['retry_count'] = job.get('retry_count', 0) + 1

                if job['retry_count'] < 3:
                    job_queue.appendleft(unique_id)
                else:
                    job['status'] = 'failed'
                    job['error'] = f'Connection error: {str(e)}'

                # A connection-level failure marks the worker offline, unlike
                # the timeout/rejection paths above.
                workers[selected_worker]['status'] = 'offline'
                workers[selected_worker]['current_job'] = None
                workers[selected_worker]['consecutive_failures'] += 1
|
|
|
|
|
def get_queue_position(unique_id):
    """Return the 1-based queue position of *unique_id*, or 0 if absent."""
    for position, queued_id in enumerate(job_queue, start=1):
        if queued_id == unique_id:
            return position
    return 0
|
|
|
|
|
def cleanup_old_jobs():
    """Drop every job record created more than 24 hours ago."""
    with lock:
        cutoff_time = datetime.now() - timedelta(hours=24)

        # Collect the stale ids first — `jobs` cannot be mutated while
        # iterating over it.
        jobs_to_delete = [uid for uid, job in jobs.items()
                          if job['created_at'] < cutoff_time]

        for uid in jobs_to_delete:
            del jobs[uid]
            print(f"[Orchestrator] Cleaned up old job: {uid}")
|
|
|
|
|
def check_worker_health():
    """Check worker health and mark offline if no heartbeat"""
    with lock:
        # A worker is considered dead after 90s without a heartbeat.
        cutoff_time = datetime.now() - timedelta(seconds=90)

        for worker_id, worker in workers.items():
            # Workers that never sent a heartbeat (last_heartbeat is None,
            # i.e. still 'unknown') are deliberately left untouched here.
            if worker['last_heartbeat'] and worker['last_heartbeat'] < cutoff_time:
                # Act only on the transition into offline so the requeue
                # below happens once per outage, not on every sweep.
                if worker['status'] not in ['offline', 'unknown']:
                    print(f"[Orchestrator] Worker {worker_id} marked offline (no heartbeat)")
                    worker['status'] = 'offline'
                    worker['consecutive_failures'] += 1

                    # Rescue the in-flight job: push it back to the FRONT of
                    # the queue so it is retried before newer submissions.
                    if worker['current_job']:
                        job_id = worker['current_job']
                        if job_id in jobs and jobs[job_id]['status'] == 'processing':
                            jobs[job_id]['status'] = 'queued'
                            jobs[job_id]['worker_id'] = None
                            job_queue.appendleft(job_id)
                            print(f"[Orchestrator] Returned job {job_id} to queue")

                        worker['current_job'] = None
|
|
|
|
|
def periodic_maintenance():
    """Run worker health checks and stale-job cleanup every 30 seconds."""
    interval_seconds = 30
    while True:
        try:
            time.sleep(interval_seconds)
            # Health check first, then cleanup — same order every cycle.
            for task in (check_worker_health, cleanup_old_jobs):
                task()
        except Exception as e:
            # Never let one bad cycle kill the maintenance loop.
            print(f"[Orchestrator] Maintenance error: {e}")
|
|
|
|
|
|
|
|
# Start background maintenance (health checks + job cleanup); daemon=True so
# the thread dies with the process instead of blocking shutdown.
maintenance_thread = Thread(target=periodic_maintenance, daemon=True)

maintenance_thread.start()


if __name__ == '__main__':
    # Default port 7860 — presumably the hosting platform's convention;
    # TODO confirm. Overridable via the PORT environment variable.
    port = int(os.getenv('PORT', 7860))
    print(f"[Orchestrator] Starting on port {port}")
    print(f"[Orchestrator] Workers configured: {len([w for w in WORKER_URLS if w.strip()])}")
    # threaded=True lets Flask's built-in server handle requests concurrently.
    app.run(host='0.0.0.0', port=port, threaded=True)