"""Job dispatcher service.

Accepts image-processing jobs over HTTP, keeps them in an in-memory FIFO
queue and forwards each one to an idle worker via ``POST <worker>/process``.
Workers report liveness through ``/worker/heartbeat`` and deliver results
through ``/worker/result``.

All state lives in module-level containers guarded by ``lock``; nothing is
persisted, so a restart loses every queued and finished job.
"""
from collections import deque
from datetime import datetime, timedelta
import hmac
import json
import os
from threading import Lock, Timer
import uuid

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

# Configuration
WORKER_URLS = os.getenv('WORKER_URLS', '').split(',')  # Comma-separated worker URLs
API_KEY = os.getenv('API_KEY', '')  # Optional API key for security

# Data structures
# {worker_id: {url, status, last_heartbeat, total_processed, current_job}}
workers = {}
# Queue of pending job ids (FIFO)
job_queue = deque()
# {job_id: {status, service_type, image_data, result, created_at, worker_id, error}}
jobs = {}
# Guards workers, job_queue and jobs. NOT reentrant: never call a function
# that acquires it while already holding it.
lock = Lock()

# Initialize workers from environment
for idx, url in enumerate(WORKER_URLS):
    if url.strip():
        worker_id = f"worker-{idx+1}"
        workers[worker_id] = {
            'url': url.strip(),
            'status': 'unknown',
            'last_heartbeat': None,
            'total_processed': 0,
            'current_job': None,
        }


def _json_body():
    """Return the request's JSON body when it is a JSON object, else None.

    ``silent=True`` makes a missing/incorrect content type or malformed JSON
    yield ``None`` instead of aborting the request, so each handler can answer
    with its own "Missing ..." 400 response.
    """
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


def verify_api_key():
    """Verify API key if configured.

    Returns:
        True when no API key is configured, or when the request carries
        ``Authorization: Bearer <API_KEY>``; False otherwise.
    """
    if not API_KEY:
        return True

    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return False

    token = auth_header[7:]
    # Constant-time comparison so response timing does not leak the key.
    return hmac.compare_digest(token.encode('utf-8'), API_KEY.encode('utf-8'))


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint: report worker count, queue size and active jobs."""
    # Take the lock: iterating ``jobs`` while another request inserts or
    # deletes entries raises "dictionary changed size during iteration".
    with lock:
        payload = {
            'status': 'online',
            'timestamp': datetime.now().isoformat(),
            'workers': len(workers),
            'queue_size': len(job_queue),
            'active_jobs': sum(
                1 for j in jobs.values() if j['status'] == 'processing'
            ),
        }
    return jsonify(payload)


@app.route('/submit', methods=['POST'])
def submit_job():
    """Submit a new job.

    Expects a JSON object with ``unique_id``, ``service_type`` and
    ``image_data``. Responds 202 for a newly queued job, 200 if a job with
    the same ``unique_id`` already exists, 400 on a missing field and 401 on
    a bad API key.
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    data = _json_body()
    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400
    if 'service_type' not in data:
        return jsonify({'error': 'Missing service_type'}), 400
    if 'image_data' not in data:
        return jsonify({'error': 'Missing image_data'}), 400

    unique_id = data['unique_id']

    with lock:
        # Check if job already exists
        if unique_id in jobs:
            return jsonify({
                'message': 'Job already exists',
                'unique_id': unique_id,
                'status': jobs[unique_id]['status'],
            }), 200

        # Create job
        jobs[unique_id] = {
            'status': 'queued',
            'service_type': data['service_type'],
            'image_data': data['image_data'],
            'result': None,
            'created_at': datetime.now(),
            'worker_id': None,
            'error': None,
        }

        # Add to queue
        job_queue.append(unique_id)

    # Try to assign to worker immediately. Must run after the lock is
    # released: assign_jobs_to_workers() acquires it itself.
    assign_jobs_to_workers()

    with lock:
        # 0 means the job has already left the queue (dispatched to a worker).
        queue_position = get_queue_position(unique_id)

    return jsonify({
        'message': 'Job submitted successfully',
        'unique_id': unique_id,
        'queue_position': queue_position,
    }), 202


# The URL variable is required: the view takes ``unique_id`` as an argument,
# so a bare '/status/' rule would fail with TypeError on every request.
@app.route('/status/<unique_id>', methods=['GET'])
def check_status(unique_id):
    """Check job status.

    The response always carries ``unique_id``, ``status`` and ``created_at``
    plus one status-specific field: ``data`` (completed), ``error`` (failed),
    ``queue_position`` (queued) or ``worker_id`` (processing).
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        job = jobs.get(unique_id)
        if job is None:
            return jsonify({'error': 'Job not found'}), 404

        status = job['status']
        response = {
            'unique_id': unique_id,
            'status': status,
            'created_at': job['created_at'].isoformat(),
        }

        if status == 'completed':
            response['data'] = job['result']
        elif status == 'failed':
            response['error'] = job.get('error', 'Unknown error')
        elif status == 'queued':
            response['queue_position'] = get_queue_position(unique_id)
        elif status == 'processing':
            response['worker_id'] = job['worker_id']

    return jsonify(response), 200


@app.route('/workers/status', methods=['GET'])
def workers_status():
    """Get status of all workers."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        worker_list = [
            {
                'id': wid,
                'status': info['status'],
                'total_processed': info['total_processed'],
                'current_job': info['current_job'],
                'last_heartbeat': (
                    info['last_heartbeat'].isoformat()
                    if info['last_heartbeat'] else None
                ),
            }
            for wid, info in workers.items()
        ]
        payload = {
            'workers': worker_list,
            'total_workers': len(workers),
            # "Active" = workers whose last known state is idle or busy.
            'active_workers': sum(
                1 for info in workers.values()
                if info['status'] in ('idle', 'busy')
            ),
            'queue_size': len(job_queue),
        }

    return jsonify(payload), 200


@app.route('/worker/heartbeat', methods=['POST'])
def worker_heartbeat():
    """Worker heartbeat endpoint.

    Registers unknown workers (as idle) and refreshes status and timestamp
    for known ones, then tries to hand out queued jobs.
    """
    # NOTE(review): this endpoint does not call verify_api_key(); confirm
    # whether workers are expected to authenticate.
    data = _json_body()
    if not data or 'worker_id' not in data:
        return jsonify({'error': 'Missing worker_id'}), 400

    worker_id = data['worker_id']

    with lock:
        worker = workers.get(worker_id)
        if worker is None:
            # Register new worker
            workers[worker_id] = {
                'url': data.get('url', ''),
                'status': 'idle',
                'last_heartbeat': datetime.now(),
                'total_processed': 0,
                'current_job': None,
            }
        else:
            # Update existing worker
            worker['status'] = data.get('status', 'idle')
            worker['last_heartbeat'] = datetime.now()

            if data.get('status') == 'idle' and worker['current_job']:
                # Worker finished job
                worker['current_job'] = None

    # Try to assign jobs (outside the lock; the callee acquires it itself).
    assign_jobs_to_workers()

    return jsonify({'message': 'Heartbeat received'}), 200


@app.route('/worker/result', methods=['POST'])
def worker_result():
    """Worker submits job result.

    A ``status`` of ``'completed'`` stores ``result``; any other status marks
    the job failed and stores ``error``. The reporting worker, if known, is
    set back to idle and its processed counter is incremented.
    """
    # NOTE(review): this endpoint does not call verify_api_key(); confirm
    # whether workers are expected to authenticate.
    data = _json_body()
    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    unique_id = data['unique_id']
    worker_id = data.get('worker_id')

    with lock:
        job = jobs.get(unique_id)
        if job is None:
            return jsonify({'error': 'Job not found'}), 404

        if data.get('status') == 'completed':
            job['status'] = 'completed'
            job['result'] = data.get('result')
        else:
            job['status'] = 'failed'
            job['error'] = data.get('error', 'Unknown error')

        # Update worker
        if worker_id and worker_id in workers:
            worker = workers[worker_id]
            worker['status'] = 'idle'
            worker['current_job'] = None
            worker['total_processed'] += 1

    # Try to assign next job (outside the lock; the callee acquires it).
    assign_jobs_to_workers()

    return jsonify({'message': 'Result received'}), 200


def _claim_next_assignment(skip_workers):
    """Pair the next queued job with an idle worker. Caller must hold ``lock``.

    Marks the job as processing and the worker as busy before returning, so
    concurrent dispatch passes never pick the same job or worker.

    Args:
        skip_workers: ids of workers that must not be chosen in this pass.

    Returns:
        ``(worker_id, worker_url, unique_id, payload)``, or ``None`` when
        there is no eligible idle worker or no pending job.
    """
    idle_worker = next(
        (
            wid for wid, info in workers.items()
            if info['status'] == 'idle' and wid not in skip_workers
        ),
        None,
    )
    if idle_worker is None:
        return None

    while job_queue:
        unique_id = job_queue.popleft()
        job = jobs.get(unique_id)
        if job is None:
            # Job was removed (e.g. by cleanup) while still queued.
            continue

        job['status'] = 'processing'
        job['worker_id'] = idle_worker
        workers[idle_worker]['status'] = 'busy'
        workers[idle_worker]['current_job'] = unique_id

        # Snapshot everything the HTTP call needs while the lock is held.
        payload = {
            'unique_id': unique_id,
            'service_type': job['service_type'],
            'image_data': job['image_data'],
        }
        return idle_worker, workers[idle_worker]['url'], unique_id, payload

    return None


def _release_assignment(worker_id, unique_id, worker_status):
    """Undo a claim after a failed dispatch. Caller must hold ``lock``.

    Re-queues the job and gives the worker ``worker_status``, but only where
    the state still reflects this claim: a result or heartbeat may have
    arrived while the HTTP request was in flight.
    """
    job = jobs.get(unique_id)
    if (
        job is not None
        and job['status'] == 'processing'
        and job['worker_id'] == worker_id
    ):
        job['status'] = 'queued'
        job['worker_id'] = None
        job_queue.append(unique_id)

    worker = workers.get(worker_id)
    if worker is not None and worker['current_job'] == unique_id:
        worker['status'] = worker_status
        worker['current_job'] = None


def assign_jobs_to_workers():
    """Assign pending jobs to idle workers.

    Acquires ``lock`` internally, so callers must NOT hold it. The lock is
    released while the job is POSTed to the worker, so a slow or unreachable
    worker cannot stall every other endpoint for the request timeout.

    A worker that answers non-200 goes back to idle, but is skipped for the
    rest of this pass; otherwise the same job/worker pair would be retried
    in an endless loop. An unreachable worker is marked offline.
    """
    refused = set()  # workers that rejected a job during this pass

    while True:
        with lock:
            claim = _claim_next_assignment(refused)
        if claim is None:
            return  # No idle workers available, or nothing left to assign

        worker_id, worker_url, unique_id, payload = claim

        # Send job to worker (lock released during the network call)
        try:
            response = requests.post(
                f"{worker_url}/process",
                json=payload,
                timeout=5,
            )
        except Exception as e:
            # Worker unreachable. Deliberately broad: any failure here must
            # re-queue the job instead of leaving it stuck in 'processing'.
            print(f"Error sending job to worker {worker_id}: {str(e)}")
            with lock:
                _release_assignment(worker_id, unique_id, 'offline')
            refused.add(worker_id)
            continue

        if response.status_code != 200:
            # Worker failed to accept job
            with lock:
                _release_assignment(worker_id, unique_id, 'idle')
            refused.add(worker_id)


def get_queue_position(unique_id):
    """Get position of job in queue.

    Callers are expected to hold ``lock``.

    Returns:
        The 1-based position of ``unique_id`` in the pending queue, or 0 if
        it is not queued.
    """
    try:
        return job_queue.index(unique_id) + 1
    except ValueError:
        return 0


def cleanup_old_jobs():
    """Clean up jobs older than 24 hours (by creation time, any status)."""
    with lock:
        cutoff_time = datetime.now() - timedelta(hours=24)
        expired = [
            unique_id for unique_id, job in jobs.items()
            if job['created_at'] < cutoff_time
        ]
        for unique_id in expired:
            del jobs[unique_id]


# Cleanup task (run periodically)
def periodic_cleanup():
    """Run one cleanup pass and schedule the next one an hour later."""
    cleanup_old_jobs()
    timer = Timer(3600, periodic_cleanup)  # Run every hour
    # Daemon thread: a pending timer must not keep the process alive on exit.
    timer.daemon = True
    timer.start()


periodic_cleanup()

if __name__ == '__main__':
    port = int(os.getenv('PORT', 7860))
    app.run(host='0.0.0.0', port=port)