danicor commited on
Commit
8cfed6d
·
verified ·
1 Parent(s): 42c099e

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +348 -0
app.py ADDED
@@ -0,0 +1,348 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from flask import Flask, request, jsonify
2
+ from datetime import datetime, timedelta
3
+ import requests
4
+ import uuid
5
+ import json
6
+ from threading import Lock
7
+ from collections import deque
8
+ import os
9
+
10
+ app = Flask(__name__)
11
+
12
+ # Configuration
13
+ WORKER_URLS = os.getenv('WORKER_URLS', '').split(',')
14
+ API_KEY = os.getenv('API_KEY', '')
15
+
16
+ # Data structures
17
+ workers = {}
18
+ job_queue = deque()
19
+ jobs = {}
20
+ lock = Lock()
21
+
22
+ # Initialize workers from environment
23
+ for idx, url in enumerate(WORKER_URLS):
24
+ if url.strip():
25
+ worker_id = f"worker-{idx+1}"
26
+ workers[worker_id] = {
27
+ 'url': url.strip(),
28
+ 'status': 'unknown',
29
+ 'last_heartbeat': None,
30
+ 'total_processed': 0,
31
+ 'current_job': None
32
+ }
33
+
34
+ def verify_api_key():
35
+ """Verify API key if configured"""
36
+ if not API_KEY:
37
+ return True
38
+
39
+ auth_header = request.headers.get('Authorization', '')
40
+ if auth_header.startswith('Bearer '):
41
+ token = auth_header[7:]
42
+ return token == API_KEY
43
+ return False
44
+
45
+ @app.route('/health', methods=['GET'])
46
+ def health_check():
47
+ """Health check endpoint"""
48
+ return jsonify({
49
+ 'status': 'online',
50
+ 'timestamp': datetime.now().isoformat(),
51
+ 'workers': len(workers),
52
+ 'queue_size': len(job_queue),
53
+ 'active_jobs': len([j for j in jobs.values() if j['status'] == 'processing'])
54
+ })
55
+
56
+ @app.route('/submit', methods=['POST'])
57
+ def submit_job():
58
+ """Submit a new job"""
59
+ if not verify_api_key():
60
+ return jsonify({'error': 'Unauthorized'}), 401
61
+
62
+ data = request.json
63
+
64
+ if not data or 'unique_id' not in data:
65
+ return jsonify({'error': 'Missing unique_id'}), 400
66
+
67
+ if 'service_type' not in data:
68
+ return jsonify({'error': 'Missing service_type'}), 400
69
+
70
+ if 'image_data' not in data:
71
+ return jsonify({'error': 'Missing image_data'}), 400
72
+
73
+ unique_id = data['unique_id']
74
+
75
+ with lock:
76
+ if unique_id in jobs:
77
+ return jsonify({
78
+ 'message': 'Job already exists',
79
+ 'unique_id': unique_id,
80
+ 'status': jobs[unique_id]['status']
81
+ }), 200
82
+
83
+ jobs[unique_id] = {
84
+ 'status': 'queued',
85
+ 'service_type': data['service_type'],
86
+ 'image_data': data['image_data'],
87
+ 'result': None,
88
+ 'created_at': datetime.now(),
89
+ 'worker_id': None,
90
+ 'error': None
91
+ }
92
+
93
+ job_queue.append(unique_id)
94
+
95
+ assign_jobs_to_workers()
96
+
97
+ return jsonify({
98
+ 'message': 'Job submitted successfully',
99
+ 'unique_id': unique_id,
100
+ 'queue_position': get_queue_position(unique_id)
101
+ }), 202
102
+
103
+ @app.route('/status/<unique_id>', methods=['GET'])
104
+ def check_status(unique_id):
105
+ """Check job status"""
106
+ if not verify_api_key():
107
+ return jsonify({'error': 'Unauthorized'}), 401
108
+
109
+ with lock:
110
+ if unique_id not in jobs:
111
+ return jsonify({'error': 'Job not found'}), 404
112
+
113
+ job = jobs[unique_id]
114
+
115
+ response = {
116
+ 'unique_id': unique_id,
117
+ 'status': job['status'],
118
+ 'created_at': job['created_at'].isoformat()
119
+ }
120
+
121
+ if job['status'] == 'completed':
122
+ response['data'] = job['result']
123
+ response['service_type'] = job['service_type']
124
+
125
+ elif job['status'] == 'failed':
126
+ # ✅ FIX: Return complete error data structure
127
+ error_data = job.get('result', {}) if isinstance(job.get('result'), dict) else {}
128
+
129
+ if 'error' in error_data:
130
+ response['error'] = error_data['error']
131
+ response['data'] = error_data
132
+ elif job.get('error'):
133
+ response['error'] = job['error']
134
+ response['data'] = {'error': job['error']}
135
+ else:
136
+ response['error'] = 'Unknown error'
137
+ response['data'] = {'error': 'Unknown error'}
138
+
139
+ print(f"[Orchestrator] Failed job {unique_id} - Error: {response['error']}")
140
+
141
+ elif job['status'] == 'queued':
142
+ response['queue_position'] = get_queue_position(unique_id)
143
+
144
+ elif job['status'] == 'processing':
145
+ response['worker_id'] = job['worker_id']
146
+
147
+ return jsonify(response), 200
148
+
149
+ @app.route('/workers/status', methods=['GET'])
150
+ def workers_status():
151
+ """Get status of all workers"""
152
+ if not verify_api_key():
153
+ return jsonify({'error': 'Unauthorized'}), 401
154
+
155
+ with lock:
156
+ worker_list = []
157
+ for worker_id, worker in workers.items():
158
+ worker_list.append({
159
+ 'id': worker_id,
160
+ 'status': worker['status'],
161
+ 'total_processed': worker['total_processed'],
162
+ 'current_job': worker['current_job'],
163
+ 'last_heartbeat': worker['last_heartbeat'].isoformat() if worker['last_heartbeat'] else None
164
+ })
165
+
166
+ return jsonify({
167
+ 'workers': worker_list,
168
+ 'total_workers': len(workers),
169
+ 'active_workers': len([w for w in workers.values() if w['status'] == 'idle' or w['status'] == 'busy']),
170
+ 'queue_size': len(job_queue)
171
+ }), 200
172
+
173
+ @app.route('/worker/heartbeat', methods=['POST'])
174
+ def worker_heartbeat():
175
+ """Worker heartbeat endpoint"""
176
+ data = request.json
177
+
178
+ if not data or 'worker_id' not in data:
179
+ return jsonify({'error': 'Missing worker_id'}), 400
180
+
181
+ worker_id = data['worker_id']
182
+
183
+ with lock:
184
+ if worker_id not in workers:
185
+ workers[worker_id] = {
186
+ 'url': data.get('url', ''),
187
+ 'status': 'idle',
188
+ 'last_heartbeat': datetime.now(),
189
+ 'total_processed': 0,
190
+ 'current_job': None
191
+ }
192
+ else:
193
+ workers[worker_id]['status'] = data.get('status', 'idle')
194
+ workers[worker_id]['last_heartbeat'] = datetime.now()
195
+
196
+ if data.get('status') == 'idle' and workers[worker_id]['current_job']:
197
+ workers[worker_id]['current_job'] = None
198
+
199
+ assign_jobs_to_workers()
200
+
201
+ return jsonify({'message': 'Heartbeat received'}), 200
202
+
203
+ @app.route('/worker/result', methods=['POST'])
204
+ def worker_result():
205
+ """Worker submits job result"""
206
+ data = request.json
207
+
208
+ if not data or 'unique_id' not in data:
209
+ return jsonify({'error': 'Missing unique_id'}), 400
210
+
211
+ unique_id = data['unique_id']
212
+ worker_id = data.get('worker_id')
213
+ result_status = data.get('status', 'failed')
214
+
215
+ print(f"[Orchestrator] Received result for {unique_id} - Status: {result_status}")
216
+
217
+ with lock:
218
+ if unique_id not in jobs:
219
+ print(f"[Orchestrator] Job {unique_id} not found")
220
+ return jsonify({'error': 'Job not found'}), 404
221
+
222
+ job = jobs[unique_id]
223
+
224
+ # ✅ FIX: Store complete result structure
225
+ if result_status == 'completed':
226
+ job['status'] = 'completed'
227
+ job['result'] = data.get('result', {})
228
+ print(f"[Orchestrator] Job {unique_id} completed successfully")
229
+
230
+ else: # failed
231
+ job['status'] = 'failed'
232
+
233
+ # Store the complete error structure
234
+ result_data = data.get('result', {})
235
+
236
+ if isinstance(result_data, dict):
237
+ job['result'] = result_data
238
+ job['error'] = result_data.get('error', 'Unknown error')
239
+ elif isinstance(result_data, str):
240
+ job['result'] = {'error': result_data}
241
+ job['error'] = result_data
242
+ else:
243
+ error_msg = data.get('error', 'Unknown error')
244
+ job['result'] = {'error': error_msg}
245
+ job['error'] = error_msg
246
+
247
+ print(f"[Orchestrator] Job {unique_id} failed - Error: {job['error']}")
248
+
249
+ # Update worker
250
+ if worker_id and worker_id in workers:
251
+ workers[worker_id]['status'] = 'idle'
252
+ workers[worker_id]['current_job'] = None
253
+ workers[worker_id]['total_processed'] += 1
254
+
255
+ assign_jobs_to_workers()
256
+
257
+ return jsonify({'message': 'Result received'}), 200
258
+
259
+ def assign_jobs_to_workers():
260
+ """Assign pending jobs to idle workers"""
261
+ with lock:
262
+ while job_queue:
263
+ idle_worker = None
264
+ for worker_id, worker in workers.items():
265
+ if worker['status'] == 'idle':
266
+ idle_worker = worker_id
267
+ break
268
+
269
+ if not idle_worker:
270
+ break
271
+
272
+ unique_id = job_queue.popleft()
273
+
274
+ if unique_id not in jobs:
275
+ continue
276
+
277
+ job = jobs[unique_id]
278
+
279
+ job['status'] = 'processing'
280
+ job['worker_id'] = idle_worker
281
+
282
+ workers[idle_worker]['status'] = 'busy'
283
+ workers[idle_worker]['current_job'] = unique_id
284
+
285
+ try:
286
+ worker_url = workers[idle_worker]['url']
287
+ print(f"[Orchestrator] Assigning job {unique_id} to {idle_worker}")
288
+
289
+ response = requests.post(
290
+ f"{worker_url}/process",
291
+ json={
292
+ 'unique_id': unique_id,
293
+ 'service_type': job['service_type'],
294
+ 'image_data': job['image_data']
295
+ },
296
+ timeout=5
297
+ )
298
+
299
+ if response.status_code != 200:
300
+ print(f"[Orchestrator] Worker {idle_worker} rejected job {unique_id}")
301
+ job['status'] = 'queued'
302
+ job['worker_id'] = None
303
+ job_queue.append(unique_id)
304
+ workers[idle_worker]['status'] = 'idle'
305
+ workers[idle_worker]['current_job'] = None
306
+
307
+ except Exception as e:
308
+ print(f"[Orchestrator] Error sending job to worker {idle_worker}: {str(e)}")
309
+ job['status'] = 'queued'
310
+ job['worker_id'] = None
311
+ job_queue.append(unique_id)
312
+ workers[idle_worker]['status'] = 'offline'
313
+ workers[idle_worker]['current_job'] = None
314
+
315
+ def get_queue_position(unique_id):
316
+ """Get position of job in queue"""
317
+ try:
318
+ return list(job_queue).index(unique_id) + 1
319
+ except ValueError:
320
+ return 0
321
+
322
+ def cleanup_old_jobs():
323
+ """Clean up jobs older than 24 hours"""
324
+ with lock:
325
+ cutoff_time = datetime.now() - timedelta(hours=24)
326
+ jobs_to_delete = []
327
+
328
+ for unique_id, job in jobs.items():
329
+ if job['created_at'] < cutoff_time:
330
+ jobs_to_delete.append(unique_id)
331
+
332
+ for unique_id in jobs_to_delete:
333
+ del jobs[unique_id]
334
+ print(f"[Orchestrator] Cleaned up old job: {unique_id}")
335
+
336
+ from threading import Timer
337
+
338
+ def periodic_cleanup():
339
+ cleanup_old_jobs()
340
+ Timer(3600, periodic_cleanup).start()
341
+
342
+ periodic_cleanup()
343
+
344
+ if __name__ == '__main__':
345
+ port = int(os.getenv('PORT', 7860))
346
+ print(f"[Orchestrator] Starting on port {port}")
347
+ print(f"[Orchestrator] Workers configured: {len([w for w in WORKER_URLS if w.strip()])}")
348
+ app.run(host='0.0.0.0', port=port)