"""Job orchestrator service.

Accepts image-processing jobs over HTTP, queues them, and dispatches them
to worker services using round-robin selection, with retry, result
collection, heartbeat tracking, and periodic health/cleanup maintenance.

All shared state (``workers``, ``jobs``, ``job_queue``) is guarded by a
single global ``lock``; round-robin bookkeeping has its own
``worker_assignment_lock`` (always acquired while ``lock`` is held, so
lock ordering is consistent).
"""
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import requests
import uuid
import json
from threading import Lock, Thread
from collections import deque
import os
import time

app = Flask(__name__)

# Configuration
WORKER_URLS = os.getenv('WORKER_URLS', '').split(',')
API_KEY = os.getenv('API_KEY', '')

# Data structures — every read/write goes through `lock`.
workers = {}         # worker_id -> state dict (url, status, heartbeat, ...)
job_queue = deque()  # FIFO of queued job unique_ids
jobs = {}            # unique_id -> job state dict
lock = Lock()

# Round-robin load balancing
worker_assignment_index = 0
worker_assignment_lock = Lock()

# Worker health monitoring
# NOTE(review): currently unused — periodic_maintenance() sleeps a fixed 30s.
worker_health_check_interval = 60  # seconds

# Initialize workers from environment (comma-separated WORKER_URLS).
for idx, url in enumerate(WORKER_URLS):
    if url.strip():
        worker_id = f"worker-{idx+1}"
        workers[worker_id] = {
            'url': url.strip(),
            'status': 'unknown',
            'last_heartbeat': None,
            'total_processed': 0,
            'current_job': None,
            'consecutive_failures': 0,  # track dispatch/heartbeat failures
        }


def verify_api_key():
    """Verify API key if configured.

    Returns True when no API_KEY is configured (auth disabled), or when the
    request carries a matching ``Authorization: Bearer <token>`` header.
    """
    if not API_KEY:
        return True
    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        token = auth_header[7:]
        return token == API_KEY
    return False


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint: summarizes worker states, queue, active jobs."""
    with lock:
        idle_workers = len([w for w in workers.values() if w['status'] == 'idle'])
        busy_workers = len([w for w in workers.values() if w['status'] == 'busy'])
        offline_workers = len([w for w in workers.values() if w['status'] == 'offline'])
        return jsonify({
            'status': 'online',
            # NOTE(review): naive local time — consider datetime.now(timezone.utc).
            'timestamp': datetime.now().isoformat(),
            'workers': {
                'total': len(workers),
                'idle': idle_workers,
                'busy': busy_workers,
                'offline': offline_workers
            },
            'queue_size': len(job_queue),
            'active_jobs': len([j for j in jobs.values() if j['status'] == 'processing'])
        })


@app.route('/submit', methods=['POST'])
def submit_job():
    """Submit a new job.

    Expects JSON with ``unique_id``, ``service_type`` and ``image_data``.
    Idempotent on ``unique_id``: resubmitting an existing job returns its
    current status (200) instead of enqueuing a duplicate. Returns 503 when
    every worker is offline, else 202 with the queue position.
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    data = request.json
    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400
    if 'service_type' not in data:
        return jsonify({'error': 'Missing service_type'}), 400
    if 'image_data' not in data:
        return jsonify({'error': 'Missing image_data'}), 400

    unique_id = data['unique_id']

    with lock:
        if unique_id in jobs:
            return jsonify({
                'message': 'Job already exists',
                'unique_id': unique_id,
                'status': jobs[unique_id]['status']
            }), 200

        # Reject immediately if no worker could ever take the job.
        available_workers = [w for w in workers.values()
                             if w['status'] in ['idle', 'busy', 'unknown']]
        if not available_workers:
            return jsonify({
                'error': 'No workers available',
                'message': 'All processing servers are offline. Please try again later.'
            }), 503

        jobs[unique_id] = {
            'status': 'queued',
            'service_type': data['service_type'],
            'image_data': data['image_data'],
            'result': None,
            'created_at': datetime.now(),
            'worker_id': None,
            'error': None,
            'retry_count': 0  # track retries
        }
        job_queue.append(unique_id)

    # Start assignment in background to avoid blocking the response.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({
        'message': 'Job submitted successfully',
        'unique_id': unique_id,
        'queue_position': get_queue_position(unique_id)
    }), 202


# BUGFIX: route was '/status/' with no URL parameter, but the view function
# requires `unique_id` — every request raised a TypeError. The variable rule
# '<unique_id>' makes Flask supply the argument.
@app.route('/status/<unique_id>', methods=['GET'])
def check_status(unique_id):
    """Check job status.

    Response shape varies by state: completed jobs include ``data`` and
    ``service_type``; failed jobs include ``error`` (and ``data`` with the
    error payload); queued jobs include ``queue_position``; processing jobs
    include ``worker_id``.
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        if unique_id not in jobs:
            return jsonify({'error': 'Job not found'}), 404

        job = jobs[unique_id]
        response = {
            'unique_id': unique_id,
            'status': job['status'],
            'created_at': job['created_at'].isoformat()
        }

        if job['status'] == 'completed':
            response['data'] = job['result']
            response['service_type'] = job['service_type']
        elif job['status'] == 'failed':
            # Prefer a structured error from the worker result, then the
            # job-level error string, then a generic fallback.
            error_data = job.get('result', {}) if isinstance(job.get('result'), dict) else {}
            if 'error' in error_data:
                response['error'] = error_data['error']
                response['data'] = error_data
            elif job.get('error'):
                response['error'] = job['error']
                response['data'] = {'error': job['error']}
            else:
                response['error'] = 'Unknown error'
                response['data'] = {'error': 'Unknown error'}
            print(f"[Orchestrator] Failed job {unique_id} - Error: {response['error']}")
        elif job['status'] == 'queued':
            response['queue_position'] = get_queue_position(unique_id)
        elif job['status'] == 'processing':
            response['worker_id'] = job['worker_id']

        return jsonify(response), 200


@app.route('/workers/status', methods=['GET'])
def workers_status():
    """Get status of all workers."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        worker_list = []
        for worker_id, worker in workers.items():
            worker_list.append({
                'id': worker_id,
                'status': worker['status'],
                'total_processed': worker['total_processed'],
                'current_job': worker['current_job'],
                'last_heartbeat': worker['last_heartbeat'].isoformat() if worker['last_heartbeat'] else None,
                'consecutive_failures': worker.get('consecutive_failures', 0)
            })
        return jsonify({
            'workers': worker_list,
            'total_workers': len(workers),
            'active_workers': len([w for w in workers.values() if w['status'] in ['idle', 'busy']]),
            'queue_size': len(job_queue)
        }), 200


@app.route('/worker/heartbeat', methods=['POST'])
def worker_heartbeat():
    """Worker heartbeat endpoint.

    Registers unknown workers on first contact; otherwise refreshes status
    and heartbeat timestamp and resets the failure counter. Triggers a
    background assignment pass since a worker may have just become idle.
    NOTE(review): this endpoint is unauthenticated — presumably workers run
    on a trusted network; confirm.
    """
    data = request.json
    if not data or 'worker_id' not in data:
        return jsonify({'error': 'Missing worker_id'}), 400

    worker_id = data['worker_id']
    with lock:
        if worker_id not in workers:
            # Self-registration of a previously unknown worker.
            workers[worker_id] = {
                'url': data.get('url', ''),
                'status': 'idle',
                'last_heartbeat': datetime.now(),
                'total_processed': 0,
                'current_job': None,
                'consecutive_failures': 0
            }
        else:
            workers[worker_id]['status'] = data.get('status', 'idle')
            workers[worker_id]['last_heartbeat'] = datetime.now()
            # Reset on successful heartbeat.
            workers[worker_id]['consecutive_failures'] = 0
            if data.get('status') == 'idle' and workers[worker_id]['current_job']:
                workers[worker_id]['current_job'] = None

    # Trigger assignment after heartbeat.
    Thread(target=assign_jobs_to_workers, daemon=True).start()
    return jsonify({'message': 'Heartbeat received'}), 200


@app.route('/worker/result', methods=['POST'])
def worker_result():
    """Worker submits job result.

    Marks the job completed or failed (normalizing the error payload into
    both ``result`` and ``error`` fields), frees the reporting worker, and
    kicks off the next assignment pass.
    """
    data = request.json
    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    unique_id = data['unique_id']
    worker_id = data.get('worker_id')
    result_status = data.get('status', 'failed')
    print(f"[Orchestrator] Received result for {unique_id} - Status: {result_status}")

    with lock:
        if unique_id not in jobs:
            print(f"[Orchestrator] Job {unique_id} not found")
            return jsonify({'error': 'Job not found'}), 404

        job = jobs[unique_id]
        if result_status == 'completed':
            job['status'] = 'completed'
            job['result'] = data.get('result', {})
            print(f"[Orchestrator] Job {unique_id} completed successfully")
        else:  # failed
            job['status'] = 'failed'
            result_data = data.get('result', {})
            if isinstance(result_data, dict):
                job['result'] = result_data
                job['error'] = result_data.get('error', 'Unknown error')
            elif isinstance(result_data, str):
                job['result'] = {'error': result_data}
                job['error'] = result_data
            else:
                error_msg = data.get('error', 'Unknown error')
                job['result'] = {'error': error_msg}
                job['error'] = error_msg
            print(f"[Orchestrator] Job {unique_id} failed - Error: {job['error']}")

        # Update worker bookkeeping.
        if worker_id and worker_id in workers:
            workers[worker_id]['status'] = 'idle'
            workers[worker_id]['current_job'] = None
            workers[worker_id]['total_processed'] += 1

    # Trigger next assignment.
    Thread(target=assign_jobs_to_workers, daemon=True).start()
    return jsonify({'message': 'Result received'}), 200


def assign_jobs_to_workers():
    """Assign pending jobs to idle workers with round-robin.

    Pops jobs off the queue while idle workers (with fewer than 3
    consecutive failures) exist, POSTs each job to the selected worker's
    ``/process`` endpoint, and retries up to 3 times on rejection/timeout/
    connection error before marking the job failed.

    NOTE(review): the HTTP POST (timeout=15s) is performed while holding the
    global `lock`, which stalls every other endpoint for the duration of the
    request. Restructuring to release the lock around the network call would
    change interleaving semantics, so it is only flagged here.
    """
    global worker_assignment_index

    with lock:
        while job_queue:
            # Workers eligible for assignment.
            idle_workers = [wid for wid, w in workers.items()
                            if w['status'] == 'idle' and w.get('consecutive_failures', 0) < 3]
            if not idle_workers:
                print("[Orchestrator] No idle workers available")
                break

            # Round-robin selection.
            with worker_assignment_lock:
                selected_worker = idle_workers[worker_assignment_index % len(idle_workers)]
                worker_assignment_index += 1

            unique_id = job_queue.popleft()
            if unique_id not in jobs:
                # Job was cleaned up while queued — skip.
                continue

            job = jobs[unique_id]
            job['status'] = 'processing'
            job['worker_id'] = selected_worker
            workers[selected_worker]['status'] = 'busy'
            workers[selected_worker]['current_job'] = unique_id

            try:
                worker_url = workers[selected_worker]['url']
                print(f"[Orchestrator] Assigning job {unique_id} to {selected_worker} (Round-Robin)")

                # Increased timeout; assignment itself runs in a background thread.
                response = requests.post(
                    f"{worker_url}/process",
                    json={
                        'unique_id': unique_id,
                        'service_type': job['service_type'],
                        'image_data': job['image_data']
                    },
                    timeout=15  # increased timeout
                )

                if response.status_code != 200:
                    print(f"[Orchestrator] Worker {selected_worker} rejected job {unique_id}")
                    job['status'] = 'queued'
                    job['worker_id'] = None
                    job['retry_count'] = job.get('retry_count', 0) + 1
                    # Retry logic: re-enqueue at the back up to 3 attempts.
                    if job['retry_count'] < 3:
                        job_queue.append(unique_id)
                    else:
                        job['status'] = 'failed'
                        job['error'] = 'Maximum retry attempts reached'
                    workers[selected_worker]['status'] = 'idle'
                    workers[selected_worker]['current_job'] = None
                    workers[selected_worker]['consecutive_failures'] += 1
                else:
                    workers[selected_worker]['consecutive_failures'] = 0

            except requests.exceptions.Timeout:
                print(f"[Orchestrator] Timeout sending job to worker {selected_worker}")
                job['status'] = 'queued'
                job['worker_id'] = None
                job['retry_count'] = job.get('retry_count', 0) + 1
                if job['retry_count'] < 3:
                    job_queue.appendleft(unique_id)  # Put back at front
                else:
                    job['status'] = 'failed'
                    job['error'] = 'Worker timeout after multiple retries'
                workers[selected_worker]['status'] = 'idle'
                workers[selected_worker]['current_job'] = None
                workers[selected_worker]['consecutive_failures'] += 1

            except Exception as e:
                print(f"[Orchestrator] Error sending job to worker {selected_worker}: {str(e)}")
                job['status'] = 'queued'
                job['worker_id'] = None
                job['retry_count'] = job.get('retry_count', 0) + 1
                if job['retry_count'] < 3:
                    job_queue.appendleft(unique_id)
                else:
                    job['status'] = 'failed'
                    job['error'] = f'Connection error: {str(e)}'
                # Connection-level failure: assume the worker is down.
                workers[selected_worker]['status'] = 'offline'
                workers[selected_worker]['current_job'] = None
                workers[selected_worker]['consecutive_failures'] += 1


def get_queue_position(unique_id):
    """Get 1-based position of job in queue; 0 when not queued."""
    try:
        return list(job_queue).index(unique_id) + 1
    except ValueError:
        return 0


def cleanup_old_jobs():
    """Clean up jobs older than 24 hours."""
    with lock:
        cutoff_time = datetime.now() - timedelta(hours=24)
        jobs_to_delete = []
        for unique_id, job in jobs.items():
            if job['created_at'] < cutoff_time:
                jobs_to_delete.append(unique_id)
        for unique_id in jobs_to_delete:
            del jobs[unique_id]
            print(f"[Orchestrator] Cleaned up old job: {unique_id}")


def check_worker_health():
    """Check worker health and mark offline if no heartbeat for 90 seconds.

    Requeues (at the front) any job the newly-offline worker was processing.
    Workers that have never sent a heartbeat are left untouched.
    """
    with lock:
        cutoff_time = datetime.now() - timedelta(seconds=90)  # 90 seconds timeout
        for worker_id, worker in workers.items():
            if worker['last_heartbeat'] and worker['last_heartbeat'] < cutoff_time:
                if worker['status'] not in ['offline', 'unknown']:
                    print(f"[Orchestrator] Worker {worker_id} marked offline (no heartbeat)")
                    worker['status'] = 'offline'
                    worker['consecutive_failures'] += 1
                    # Return job to queue if worker was processing one.
                    if worker['current_job']:
                        job_id = worker['current_job']
                        if job_id in jobs and jobs[job_id]['status'] == 'processing':
                            jobs[job_id]['status'] = 'queued'
                            jobs[job_id]['worker_id'] = None
                            job_queue.appendleft(job_id)
                            print(f"[Orchestrator] Returned job {job_id} to queue")
                        worker['current_job'] = None


def periodic_maintenance():
    """Periodic cleanup and health checks (runs every 30 seconds)."""
    while True:
        try:
            time.sleep(30)
            check_worker_health()
            cleanup_old_jobs()
        except Exception as e:
            # Best-effort loop: log and keep the maintenance thread alive.
            print(f"[Orchestrator] Maintenance error: {e}")


# Start maintenance thread
maintenance_thread = Thread(target=periodic_maintenance, daemon=True)
maintenance_thread.start()

if __name__ == '__main__':
    port = int(os.getenv('PORT', 7860))
    print(f"[Orchestrator] Starting on port {port}")
    print(f"[Orchestrator] Workers configured: {len([w for w in WORKER_URLS if w.strip()])}")
    app.run(host='0.0.0.0', port=port, threaded=True)