danicor commited on
Commit
f4c7a04
·
verified ·
1 Parent(s): a1dc719

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +450 -0
app.py ADDED
@@ -0,0 +1,450 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import requests
import uuid
import json
from threading import Lock, Thread
from collections import deque
import os
import time

app = Flask(__name__)

# Configuration (read once at import time).
# WORKER_URLS: comma-separated list of worker base URLs.
# API_KEY: optional Bearer token; empty string disables authentication.
WORKER_URLS = os.getenv('WORKER_URLS', '').split(',')
API_KEY = os.getenv('API_KEY', '')

# Shared in-memory state, all guarded by `lock`:
#   workers:   worker_id -> bookkeeping dict (url, status, heartbeat, ...)
#   job_queue: FIFO of unique_ids waiting for a worker
#   jobs:      unique_id -> job record (status, payload, result, error, ...)
workers = {}
job_queue = deque()
jobs = {}
lock = Lock()

# Round-robin load balancing: the index advances on every assignment and is
# taken modulo the current idle-worker list.
worker_assignment_index = 0
worker_assignment_lock = Lock()

# Intended interval (seconds) between worker health checks.
# NOTE(review): not referenced elsewhere in this file — the maintenance loop
# sleeps a hard-coded 30 s; confirm whether this constant is dead.
worker_health_check_interval = 60  # seconds

# Register workers from the environment; entries start as 'unknown' until
# their first heartbeat arrives.
for idx, url in enumerate(WORKER_URLS):
    if url.strip():
        worker_id = f"worker-{idx+1}"
        workers[worker_id] = {
            'url': url.strip(),
            'status': 'unknown',
            'last_heartbeat': None,
            'total_processed': 0,
            'current_job': None,
            'consecutive_failures': 0  # reset on heartbeat; workers with >= 3 are skipped
        }
42
+
43
def verify_api_key():
    """Validate the request's Bearer token against the configured API_KEY.

    Authentication is disabled (every request accepted) when API_KEY is
    empty/unset.
    """
    if not API_KEY:
        return True

    header = request.headers.get('Authorization', '')
    prefix = 'Bearer '
    if not header.startswith(prefix):
        return False
    return header[len(prefix):] == API_KEY
53
+
54
@app.route('/health', methods=['GET'])
def health_check():
    """Report orchestrator liveness plus worker and queue statistics."""
    with lock:
        # Single pass over the workers instead of three list-comprehension scans.
        counts = {'idle': 0, 'busy': 0, 'offline': 0}
        for worker in workers.values():
            if worker['status'] in counts:
                counts[worker['status']] += 1

        payload = {
            'status': 'online',
            'timestamp': datetime.now().isoformat(),
            'workers': {
                'total': len(workers),
                'idle': counts['idle'],
                'busy': counts['busy'],
                'offline': counts['offline']
            },
            'queue_size': len(job_queue),
            'active_jobs': sum(1 for j in jobs.values() if j['status'] == 'processing')
        }

    return jsonify(payload)
74
+
75
@app.route('/submit', methods=['POST'])
def submit_job():
    """Submit a new job for processing.

    Expects a JSON body with 'unique_id', 'service_type' and 'image_data'.
    Returns 202 with the queue position on success, 200 if the job already
    exists, 503 when every worker is offline, 400/401 on bad input or auth.
    """
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    # Fix: request.json raises a 415/400 for a missing or non-JSON body;
    # get_json(silent=True) returns None so malformed requests hit our
    # explicit 400 responses below instead.
    data = request.get_json(silent=True)

    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    if 'service_type' not in data:
        return jsonify({'error': 'Missing service_type'}), 400

    if 'image_data' not in data:
        return jsonify({'error': 'Missing image_data'}), 400

    unique_id = data['unique_id']

    with lock:
        # Idempotent resubmission: report the existing job instead of duplicating.
        if unique_id in jobs:
            return jsonify({
                'message': 'Job already exists',
                'unique_id': unique_id,
                'status': jobs[unique_id]['status']
            }), 200

        # Reject early if no worker could ever pick this up.
        available_workers = [w for w in workers.values() if w['status'] in ['idle', 'busy', 'unknown']]
        if not available_workers:
            return jsonify({
                'error': 'No workers available',
                'message': 'All processing servers are offline. Please try again later.'
            }), 503

        jobs[unique_id] = {
            'status': 'queued',
            'service_type': data['service_type'],
            'image_data': data['image_data'],
            'result': None,
            'created_at': datetime.now(),
            'worker_id': None,
            'error': None,
            'retry_count': 0  # bumped on each failed dispatch; 3 strikes -> failed
        }

        job_queue.append(unique_id)

        # Fix: compute the position while still holding the lock. The original
        # read it after releasing the lock AND after starting the assignment
        # thread, so the job could already be popped (position 0) or the
        # queue mutated mid-read.
        queue_position = get_queue_position(unique_id)

    # Dispatch in the background so this request returns immediately.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({
        'message': 'Job submitted successfully',
        'unique_id': unique_id,
        'queue_position': queue_position
    }), 202
131
+
132
@app.route('/status/<unique_id>', methods=['GET'])
def check_status(unique_id):
    """Return a job's current state, with result data or error details."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        job = jobs.get(unique_id)
        if job is None:
            return jsonify({'error': 'Job not found'}), 404

        status = job['status']
        response = {
            'unique_id': unique_id,
            'status': status,
            'created_at': job['created_at'].isoformat()
        }

        if status == 'completed':
            response['data'] = job['result']
            response['service_type'] = job['service_type']

        elif status == 'failed':
            # Prefer a structured error from the worker result, then the
            # job-level error string, then a generic fallback.
            raw_result = job.get('result')
            error_data = raw_result if isinstance(raw_result, dict) else {}

            if 'error' in error_data:
                response['error'] = error_data['error']
                response['data'] = error_data
            elif job.get('error'):
                response['error'] = job['error']
                response['data'] = {'error': job['error']}
            else:
                response['error'] = 'Unknown error'
                response['data'] = {'error': 'Unknown error'}

            print(f"[Orchestrator] Failed job {unique_id} - Error: {response['error']}")

        elif status == 'queued':
            response['queue_position'] = get_queue_position(unique_id)

        elif status == 'processing':
            response['worker_id'] = job['worker_id']

        return jsonify(response), 200
176
+
177
@app.route('/workers/status', methods=['GET'])
def workers_status():
    """List every registered worker with its status and counters."""
    if not verify_api_key():
        return jsonify({'error': 'Unauthorized'}), 401

    with lock:
        worker_list = [
            {
                'id': wid,
                'status': info['status'],
                'total_processed': info['total_processed'],
                'current_job': info['current_job'],
                'last_heartbeat': info['last_heartbeat'].isoformat() if info['last_heartbeat'] else None,
                'consecutive_failures': info.get('consecutive_failures', 0)
            }
            for wid, info in workers.items()
        ]

        active = sum(1 for w in workers.values() if w['status'] in ['idle', 'busy'])

        return jsonify({
            'workers': worker_list,
            'total_workers': len(workers),
            'active_workers': active,
            'queue_size': len(job_queue)
        }), 200
201
+
202
@app.route('/worker/heartbeat', methods=['POST'])
def worker_heartbeat():
    """Receive a worker's periodic heartbeat and refresh its record.

    Unknown worker IDs are auto-registered; known ones get their status,
    heartbeat timestamp and failure streak updated.
    """
    data = request.json

    if not data or 'worker_id' not in data:
        return jsonify({'error': 'Missing worker_id'}), 400

    worker_id = data['worker_id']
    now = datetime.now()

    with lock:
        worker = workers.get(worker_id)
        if worker is None:
            # First contact from this worker: register it as idle.
            workers[worker_id] = {
                'url': data.get('url', ''),
                'status': 'idle',
                'last_heartbeat': now,
                'total_processed': 0,
                'current_job': None,
                'consecutive_failures': 0
            }
        else:
            worker['status'] = data.get('status', 'idle')
            worker['last_heartbeat'] = now
            worker['consecutive_failures'] = 0  # a live heartbeat clears the failure streak

            # An idle report means any previously tracked job is done/abandoned.
            if data.get('status') == 'idle' and worker['current_job']:
                worker['current_job'] = None

    # A heartbeat may have freed a worker, so try to hand out queued jobs.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({'message': 'Heartbeat received'}), 200
234
+
235
@app.route('/worker/result', methods=['POST'])
def worker_result():
    """Receive a finished (or failed) job result from a worker.

    Updates the job record, frees the reporting worker, and triggers the
    next assignment round.
    """
    # Fix: request.json raises for a missing/non-JSON body; get_json(silent=True)
    # returns None so malformed requests hit our explicit 400 below.
    data = request.get_json(silent=True)

    if not data or 'unique_id' not in data:
        return jsonify({'error': 'Missing unique_id'}), 400

    unique_id = data['unique_id']
    worker_id = data.get('worker_id')
    result_status = data.get('status', 'failed')

    print(f"[Orchestrator] Received result for {unique_id} - Status: {result_status}")

    with lock:
        if unique_id not in jobs:
            print(f"[Orchestrator] Job {unique_id} not found")
            return jsonify({'error': 'Job not found'}), 404

        job = jobs[unique_id]

        # Fix: ignore duplicate/late reports. Once a job is terminal, a second
        # delivery (e.g. a worker retrying its callback) must not overwrite
        # the recorded outcome.
        if job['status'] in ('completed', 'failed'):
            print(f"[Orchestrator] Ignoring duplicate result for {unique_id}")

        elif result_status == 'completed':
            job['status'] = 'completed'
            job['result'] = data.get('result', {})
            print(f"[Orchestrator] Job {unique_id} completed successfully")

        else:  # failed
            job['status'] = 'failed'

            # Normalize the error payload: dict, plain string, or absent.
            result_data = data.get('result', {})
            if isinstance(result_data, dict):
                job['result'] = result_data
                job['error'] = result_data.get('error', 'Unknown error')
            elif isinstance(result_data, str):
                job['result'] = {'error': result_data}
                job['error'] = result_data
            else:
                error_msg = data.get('error', 'Unknown error')
                job['result'] = {'error': error_msg}
                job['error'] = error_msg

            print(f"[Orchestrator] Job {unique_id} failed - Error: {job['error']}")

        # Free the reporting worker regardless of job outcome.
        if worker_id and worker_id in workers:
            workers[worker_id]['status'] = 'idle'
            workers[worker_id]['current_job'] = None
            workers[worker_id]['total_processed'] += 1

    # The worker just went idle — try to assign the next queued job.
    Thread(target=assign_jobs_to_workers, daemon=True).start()

    return jsonify({'message': 'Result received'}), 200
289
+
290
def assign_jobs_to_workers():
    """Assign queued jobs to idle workers using round-robin selection.

    Fix over the original: the worker HTTP dispatch is performed with the
    global `lock` RELEASED. The original held `lock` across a
    requests.post(..., timeout=15), which froze every endpoint (health,
    submit, status, heartbeat) for up to 15 s per dispatch. Each iteration
    now runs in three phases: claim job+worker under the lock, call the
    worker without the lock, then record the outcome under the lock.
    """
    global worker_assignment_index

    while True:
        # --- Phase 1: claim a job and a worker (lock held) ---------------
        with lock:
            if not job_queue:
                break

            # Workers with a failure streak >= 3 are skipped until a
            # successful heartbeat resets their counter.
            idle_workers = [wid for wid, w in workers.items()
                            if w['status'] == 'idle' and w.get('consecutive_failures', 0) < 3]
            if not idle_workers:
                print("[Orchestrator] No idle workers available")
                break

            # Round-robin selection over the current idle list.
            with worker_assignment_lock:
                selected_worker = idle_workers[worker_assignment_index % len(idle_workers)]
                worker_assignment_index += 1

            unique_id = job_queue.popleft()
            if unique_id not in jobs:
                continue  # job record was cleaned up; drop the stale queue entry

            job = jobs[unique_id]
            job['status'] = 'processing'
            job['worker_id'] = selected_worker
            workers[selected_worker]['status'] = 'busy'
            workers[selected_worker]['current_job'] = unique_id

            worker_url = workers[selected_worker]['url']
            payload = {
                'unique_id': unique_id,
                'service_type': job['service_type'],
                'image_data': job['image_data']
            }

        # --- Phase 2: blocking HTTP dispatch (lock released) -------------
        print(f"[Orchestrator] Assigning job {unique_id} to {selected_worker} (Round-Robin)")
        accepted = False
        timed_out = False
        conn_error = None
        try:
            response = requests.post(f"{worker_url}/process", json=payload, timeout=15)
            accepted = response.status_code == 200
        except requests.exceptions.Timeout:
            timed_out = True
            print(f"[Orchestrator] Timeout sending job to worker {selected_worker}")
        except Exception as e:
            conn_error = str(e)
            print(f"[Orchestrator] Error sending job to worker {selected_worker}: {conn_error}")

        # --- Phase 3: record the outcome (lock held) ----------------------
        with lock:
            worker = workers.get(selected_worker)

            if accepted:
                if worker is not None:
                    worker['consecutive_failures'] = 0
                continue

            if not timed_out and conn_error is None:
                # Worker answered but refused the job (non-200 status).
                print(f"[Orchestrator] Worker {selected_worker} rejected job {unique_id}")

            job = jobs.get(unique_id)
            if job is not None:
                job['status'] = 'queued'
                job['worker_id'] = None
                job['retry_count'] = job.get('retry_count', 0) + 1

                if job['retry_count'] < 3:
                    if timed_out or conn_error is not None:
                        # Transport failure: retry soon, preserve queue ordering.
                        job_queue.appendleft(unique_id)
                    else:
                        # Explicit rejection: send to the back of the queue.
                        job_queue.append(unique_id)
                else:
                    job['status'] = 'failed'
                    if timed_out:
                        job['error'] = 'Worker timeout after multiple retries'
                    elif conn_error is not None:
                        job['error'] = f'Connection error: {conn_error}'
                    else:
                        job['error'] = 'Maximum retry attempts reached'

            if worker is not None:
                # A hard connection error marks the worker offline; a timeout
                # or rejection only frees it, matching the original behavior.
                worker['status'] = 'offline' if conn_error is not None else 'idle'
                worker['current_job'] = None
                worker['consecutive_failures'] += 1
387
+
388
def get_queue_position(unique_id):
    """Return the job's 1-based position in the queue, or 0 if not queued."""
    for position, queued_id in enumerate(job_queue, start=1):
        if queued_id == unique_id:
            return position
    return 0
394
+
395
def cleanup_old_jobs():
    """Drop job records older than 24 hours to bound memory use.

    Stale queue entries for deleted jobs are tolerated: the assignment loop
    skips unique_ids that no longer exist in `jobs`.
    """
    with lock:
        cutoff_time = datetime.now() - timedelta(hours=24)

        # Collect first, then delete — never mutate a dict while iterating it.
        expired = [uid for uid, job in jobs.items() if job['created_at'] < cutoff_time]

        for uid in expired:
            del jobs[uid]
            print(f"[Orchestrator] Cleaned up old job: {uid}")
408
+
409
def check_worker_health():
    """Mark workers offline after 90 s of heartbeat silence and requeue their jobs."""
    with lock:
        cutoff_time = datetime.now() - timedelta(seconds=90)

        for worker_id, worker in workers.items():
            last_seen = worker['last_heartbeat']
            # Workers that never sent a heartbeat, or reported recently, are skipped.
            if not last_seen or last_seen >= cutoff_time:
                continue
            # Already offline/unknown workers need no transition.
            if worker['status'] in ['offline', 'unknown']:
                continue

            print(f"[Orchestrator] Worker {worker_id} marked offline (no heartbeat)")
            worker['status'] = 'offline'
            worker['consecutive_failures'] += 1

            # If the dead worker was mid-job, put that job back at the front
            # of the queue so another worker picks it up promptly.
            job_id = worker['current_job']
            if job_id:
                if job_id in jobs and jobs[job_id]['status'] == 'processing':
                    jobs[job_id]['status'] = 'queued'
                    jobs[job_id]['worker_id'] = None
                    job_queue.appendleft(job_id)
                    print(f"[Orchestrator] Returned job {job_id} to queue")

                worker['current_job'] = None
431
+
432
def periodic_maintenance():
    """Background loop: every 30 s, run worker health checks and job cleanup.

    Runs forever in a daemon thread. Any exception is logged and swallowed
    so a single failing pass never kills the maintenance thread.
    """
    while True:
        try:
            time.sleep(30)
            check_worker_health()
            cleanup_old_jobs()
        except Exception as e:
            print(f"[Orchestrator] Maintenance error: {e}")
441
+
442
# Start the maintenance thread at import time (daemon=True so it never
# blocks interpreter shutdown). Runs even under a WSGI server, where the
# __main__ block below is skipped.
maintenance_thread = Thread(target=periodic_maintenance, daemon=True)
maintenance_thread.start()

if __name__ == '__main__':
    # Default port 7860 unless overridden via the PORT environment variable.
    port = int(os.getenv('PORT', 7860))
    print(f"[Orchestrator] Starting on port {port}")
    print(f"[Orchestrator] Workers configured: {len([w for w in WORKER_URLS if w.strip()])}")
    # threaded=True lets the built-in server handle concurrent requests.
    # NOTE(review): Flask's dev server is not meant for production — confirm
    # the deployment target fronts this with a real WSGI server.
    app.run(host='0.0.0.0', port=port, threaded=True)