"""Job orchestrator: queues image-processing jobs and dispatches them to workers.

Clients submit jobs via ``/submit`` and poll ``/status/<unique_id>``.
Workers report liveness via ``/worker/heartbeat`` and deliver results via
``/worker/result``. Shared state lives in module-level containers guarded by
the single non-reentrant ``lock``.
"""
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import requests
import uuid
import json
import hmac
from threading import Lock, Timer
from collections import deque
import os

app = Flask(__name__)

# Configuration
WORKER_URLS = os.getenv('WORKER_URLS', '').split(',')
API_KEY = os.getenv('API_KEY', '')

# Seconds to wait for a worker to accept a dispatched job.
DISPATCH_TIMEOUT = 5

# Shared state, mutated from request threads. Access it only while holding
# ``lock``. The lock is NOT reentrant, so a function holding it must never
# call another function that acquires it (e.g. assign_jobs_to_workers).
workers = {}
job_queue = deque()
jobs = {}
lock = Lock()

# Initialize workers from environment (runs at import, before any request).
for idx, url in enumerate(WORKER_URLS):
    if url.strip():
        worker_id = f"worker-{idx+1}"
        workers[worker_id] = {
            'url': url.strip(),
            'status': 'unknown',
            'last_heartbeat': None,
            'total_processed': 0,
            'current_job': None
        }


def verify_api_key():
    """Return True if the current request carries the configured API key.

    Authentication is disabled (always True) when ``API_KEY`` is empty.
    Otherwise an ``Authorization: Bearer <key>`` header is required.
    """
    if not API_KEY:
        return True
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return False
    token = auth_header[len('Bearer '):]
    # Constant-time comparison so response timing does not leak the key.
    return hmac.compare_digest(token.encode(), API_KEY.encode())


@app.route('/health', methods=['GET'])
def health_check():
    """Report liveness plus worker, queue and active-job counts (no auth)."""
    # Snapshot under the lock so we never iterate ``jobs`` while another
    # request thread is inserting or deleting entries.
    with lock:
        worker_count = len(workers)
        queue_size = len(job_queue)
        active_jobs = sum(1 for j in jobs.values() if j['status'] == 'processing')
    return jsonify({
        'status': 'online',
        'timestamp': datetime.now().isoformat(),
        'workers': worker_count,
        'queue_size': queue_size,
        'active_jobs': active_jobs
    })


@app.route('/submit', methods=['POST'])
def submit_job():
    """Queue a new job and try to dispatch it immediately.

    Expects a JSON object with ``unique_id``, ``service_type`` and
    ``image_data``. Resubmitting an existing ``unique_id`` creates nothing
    and returns the current status with 200.

    Returns 202 with the job's 1-based ``queue_position`` (0 once it has
    already been dispatched), 400 on a missing field or non-object body,
    401 on a bad API key.
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    # silent=True: a missing/non-JSON body yields None and gets the 400 below
    # rather than Flask's own error page; the isinstance check also rejects
    # JSON scalars/arrays, which would otherwise crash on data[...].
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400
    if 'service_type' not in data:
        return jsonify({'error': 'Missing service_type'}), 400
    if 'image_data' not in data:
        return jsonify({'error': 'Missing image_data'}), 400

    unique_id = data['unique_id']
    with lock:
        if unique_id in jobs:
            return jsonify({
                'message': 'Job already exists',
                'unique_id': unique_id,
                'status': jobs[unique_id]['status']
            }), 200
        jobs[unique_id] = {
            'status': 'queued',
            'service_type': data['service_type'],
            'image_data': data['image_data'],
            'result': None,
            'created_at': datetime.now(),
            'worker_id': None,
            'error': None
        }
        job_queue.append(unique_id)

    # Outside the lock: assign_jobs_to_workers acquires it itself.
    assign_jobs_to_workers()

    with lock:
        queue_position = get_queue_position(unique_id)
    return jsonify({
        'message': 'Job submitted successfully',
        'unique_id': unique_id,
        'queue_position': queue_position
    }), 202


@app.route('/status/<unique_id>', methods=['GET'])
def check_status(unique_id):
    """Return the status of a job, plus its result or error when finished.

    Returns 404 if the job is unknown (never submitted or already cleaned
    up), 401 on a bad API key.
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        if unique_id not in jobs:
            return jsonify({'error': 'Job not found'}), 404

        job = jobs[unique_id]
        response = {
            'unique_id': unique_id,
            'status': job['status'],
            'created_at': job['created_at'].isoformat()
        }

        if job['status'] == 'completed':
            response['data'] = job['result']
            response['service_type'] = job['service_type']
        elif job['status'] == 'failed':
            # Prefer the worker's full error payload; fall back to the bare
            # message, then to a generic one, so clients always get both
            # 'error' and a 'data' dict containing 'error'.
            result = job.get('result')
            error_data = result if isinstance(result, dict) else {}
            if 'error' in error_data:
                response['error'] = error_data['error']
                response['data'] = error_data
            elif job.get('error'):
                response['error'] = job['error']
                response['data'] = {'error': job['error']}
            else:
                response['error'] = 'Unknown error'
                response['data'] = {'error': 'Unknown error'}
            print(f"[Orchestrator] Failed job {unique_id} - Error: {response['error']}")
        elif job['status'] == 'queued':
            response['queue_position'] = get_queue_position(unique_id)
        elif job['status'] == 'processing':
            response['worker_id'] = job['worker_id']

        return jsonify(response), 200


@app.route('/workers/status', methods=['GET'])
def workers_status():
    """List every known worker with its status and counters."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        worker_list = []
        for worker_id, worker in workers.items():
            worker_list.append({
                'id': worker_id,
                'status': worker['status'],
                'total_processed': worker['total_processed'],
                'current_job': worker['current_job'],
                'last_heartbeat': worker['last_heartbeat'].isoformat() if worker['last_heartbeat'] else None
            })

        return jsonify({
            'workers': worker_list,
            'total_workers': len(workers),
            'active_workers': len([w for w in workers.values() if w['status'] == 'idle' or w['status'] == 'busy']),
            'queue_size': len(job_queue)
        }), 200


@app.route('/worker/heartbeat', methods=['POST'])
def worker_heartbeat():
    """Record a worker heartbeat, registering unknown workers on the fly.

    NOTE(review): this endpoint is unauthenticated and accepts an arbitrary
    ``url`` that the orchestrator will later POST job payloads to. Consider
    requiring verify_api_key() once workers send the key.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'worker_id' not in data:
        return jsonify({'error': 'Missing worker_id'}), 400

    worker_id = data['worker_id']
    with lock:
        worker = workers.get(worker_id)
        if worker is None:
            workers[worker_id] = {
                'url': data.get('url', ''),
                'status': 'idle',
                'last_heartbeat': datetime.now(),
                'total_processed': 0,
                'current_job': None
            }
        else:
            worker['status'] = data.get('status', 'idle')
            worker['last_heartbeat'] = datetime.now()
            if data.get('status') == 'idle' and worker['current_job']:
                # NOTE(review): the job this worker held stays 'processing'
                # and is not re-queued here; confirm that is intended.
                worker['current_job'] = None

    # Outside the lock: assign_jobs_to_workers acquires it itself.
    assign_jobs_to_workers()
    return jsonify({'message': 'Heartbeat received'}), 200


def _failure_details(data):
    """Return ``(result, error_message)`` for a failed-job report *data*.

    A dict ``result`` is stored as-is; its message comes from its own
    ``error`` key, else the top-level ``error``. A string ``result`` is the
    message itself. Anything else (including a missing ``result``) falls
    back to the top-level ``error`` field.
    """
    result_data = data.get('result')
    if isinstance(result_data, dict):
        return result_data, result_data.get('error', data.get('error', 'Unknown error'))
    if isinstance(result_data, str):
        return {'error': result_data}, result_data
    error_msg = data.get('error', 'Unknown error')
    return {'error': error_msg}, error_msg


def _release_worker(worker_id, unique_id):
    """Credit *worker_id* with a finished job and free it. Caller holds lock.

    The worker is marked idle only if it is not busy with a *different* job,
    so a late result for an earlier job cannot double-book it.
    """
    worker = workers.get(worker_id) if worker_id else None
    if worker is None:
        return
    worker['total_processed'] += 1
    if worker['current_job'] in (None, unique_id):
        worker['status'] = 'idle'
        worker['current_job'] = None


@app.route('/worker/result', methods=['POST'])
def worker_result():
    """Accept a job result from a worker and free that worker.

    ``status`` other than 'completed' (or missing) marks the job failed.
    Returns 404 if the job is unknown; the reporting worker is freed either
    way, since it is no longer working on that job.

    NOTE(review): unauthenticated, like the heartbeat endpoint.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    unique_id = data['unique_id']
    worker_id = data.get('worker_id')
    result_status = data.get('status', 'failed')

    print(f"[Orchestrator] Received result for {unique_id} - Status: {result_status}")

    with lock:
        _release_worker(worker_id, unique_id)
        job = jobs.get(unique_id)
        if job is None:
            print(f"[Orchestrator] Job {unique_id} not found")
        elif result_status == 'completed':
            job['status'] = 'completed'
            job['result'] = data.get('result', {})
            print(f"[Orchestrator] Job {unique_id} completed successfully")
        else:
            job['status'] = 'failed'
            job['result'], job['error'] = _failure_details(data)
            print(f"[Orchestrator] Job {unique_id} failed - Error: {job['error']}")

    # Outside the lock: assign_jobs_to_workers acquires it itself.
    assign_jobs_to_workers()

    if job is None:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify({'message': 'Result received'}), 200


def _claim_next_job(tried):
    """Pair the next queued job with an idle worker. Caller must hold lock.

    Workers in *tried* are skipped. On success both are marked in-flight and
    ``(unique_id, worker_id, worker_url, payload)`` is returned; otherwise
    None (no eligible idle worker, or no dispatchable job left).
    """
    worker_id = next(
        (wid for wid, w in workers.items() if w['status'] == 'idle' and wid not in tried),
        None,
    )
    if worker_id is None:
        return None

    while job_queue:
        unique_id = job_queue.popleft()
        job = jobs.get(unique_id)
        # Skip ids removed by cleanup, and jobs that already finished (e.g. a
        # worker delivered a result after its dispatch timed out and the job
        # had been re-queued).
        if job is None or job['status'] != 'queued':
            continue

        job['status'] = 'processing'
        job['worker_id'] = worker_id
        worker = workers[worker_id]
        worker['status'] = 'busy'
        worker['current_job'] = unique_id
        payload = {
            'unique_id': unique_id,
            'service_type': job['service_type'],
            'image_data': job['image_data']
        }
        return unique_id, worker_id, worker['url'], payload
    return None


def _undo_assignment(unique_id, worker_id, worker_status):
    """Roll back a dispatch the worker did not accept. Caller must hold lock.

    The lock was released during the HTTP call, so state is re-checked. The
    job is re-queued only if it is still assigned to this worker. The worker
    is set to *worker_status* only if it still holds this job.
    """
    job = jobs.get(unique_id)
    if job is not None and job['status'] == 'processing' and job['worker_id'] == worker_id:
        job['status'] = 'queued'
        job['worker_id'] = None
        job_queue.append(unique_id)

    worker = workers.get(worker_id)
    if worker is not None and worker['current_job'] == unique_id:
        worker['status'] = worker_status
        worker['current_job'] = None


def assign_jobs_to_workers():
    """Dispatch queued jobs to idle workers until either runs out.

    Must be called WITHOUT holding ``lock``. Each job/worker pair is claimed
    under the lock, but the HTTP hand-off runs outside it so a slow worker
    cannot stall every other endpoint. A worker that rejects or errors is not
    retried within the same pass; otherwise a worker that keeps rejecting
    would be retried in an endless loop.
    """
    tried = set()
    while True:
        with lock:
            claim = _claim_next_job(tried)
        if claim is None:
            return

        unique_id, worker_id, worker_url, payload = claim
        tried.add(worker_id)
        print(f"[Orchestrator] Assigning job {unique_id} to {worker_id}")
        try:
            response = requests.post(
                f"{worker_url}/process",
                json=payload,
                timeout=DISPATCH_TIMEOUT
            )
        except Exception as e:
            # Broad on purpose: any failure here must roll the claim back,
            # or the job and worker would stay stuck in-flight forever.
            print(f"[Orchestrator] Error sending job to worker {worker_id}: {str(e)}")
            new_worker_status = 'offline'
        else:
            if response.status_code == 200:
                continue
            print(f"[Orchestrator] Worker {worker_id} rejected job {unique_id}")
            new_worker_status = 'idle'

        with lock:
            _undo_assignment(unique_id, worker_id, new_worker_status)


def get_queue_position(unique_id):
    """Return the 1-based position of *unique_id* in the queue, or 0 if absent.

    Caller must hold ``lock`` so the queue is not mutated mid-scan.
    """
    try:
        return job_queue.index(unique_id) + 1
    except ValueError:
        return 0


def cleanup_old_jobs():
    """Delete jobs created more than 24 hours ago, whatever their status.

    Their ids are also purged from the queue so that queue positions stay
    accurate.
    """
    cutoff_time = datetime.now() - timedelta(hours=24)
    with lock:
        jobs_to_delete = [uid for uid, job in jobs.items() if job['created_at'] < cutoff_time]
        for unique_id in jobs_to_delete:
            del jobs[unique_id]
            print(f"[Orchestrator] Cleaned up old job: {unique_id}")
        if jobs_to_delete:
            stale = set(jobs_to_delete)
            remaining = [uid for uid in job_queue if uid not in stale]
            job_queue.clear()
            job_queue.extend(remaining)


def periodic_cleanup():
    """Run cleanup_old_jobs now, then re-arm itself to run again in an hour."""
    try:
        cleanup_old_jobs()
    finally:
        # Re-arm even if cleanup raised, so one bad run does not stop it for good.
        timer = Timer(3600, periodic_cleanup)
        # Daemon: a pending non-daemon Timer keeps the interpreter alive
        # after the server stops.
        timer.daemon = True
        timer.start()


# Started at import time (not under __main__) so cleanup also runs when the
# app is served by an external WSGI server that imports this module.
periodic_cleanup()

if __name__ == '__main__':
    port = int(os.getenv('PORT', 7860))
    print(f"[Orchestrator] Starting on port {port}")
    print(f"[Orchestrator] Workers configured: {len([w for w in WORKER_URLS if w.strip()])}")
    app.run(host='0.0.0.0', port=port)