danicor commited on
Commit
a77bb67
·
verified ·
1 Parent(s): c5f0e16

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +319 -0
app.py ADDED
@@ -0,0 +1,319 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from flask import Flask, request, jsonify
2
+ from datetime import datetime, timedelta
3
+ import requests
4
+ import uuid
5
+ import json
6
+ from threading import Lock
7
+ from collections import deque
8
+ import os
9
+
10
+ app = Flask(__name__)
11
+
12
+ # Configuration
13
+ WORKER_URLS = os.getenv('WORKER_URLS', '').split(',') # Comma-separated worker URLs
14
+ API_KEY = os.getenv('API_KEY', '') # Optional API key for security
15
+
16
+ # Data structures
17
+ workers = {} # {worker_id: {url, status, last_heartbeat, total_processed}}
18
+ job_queue = deque() # Queue of pending jobs
19
+ jobs = {} # {job_id: {status, result, created_at, worker_id}}
20
+ lock = Lock()
21
+
22
+ # Initialize workers from environment
23
+ for idx, url in enumerate(WORKER_URLS):
24
+ if url.strip():
25
+ worker_id = f"worker-{idx+1}"
26
+ workers[worker_id] = {
27
+ 'url': url.strip(),
28
+ 'status': 'unknown',
29
+ 'last_heartbeat': None,
30
+ 'total_processed': 0,
31
+ 'current_job': None
32
+ }
33
+
34
+ def verify_api_key():
35
+ """Verify API key if configured"""
36
+ if not API_KEY:
37
+ return True
38
+
39
+ auth_header = request.headers.get('Authorization', '')
40
+ if auth_header.startswith('Bearer '):
41
+ token = auth_header[7:]
42
+ return token == API_KEY
43
+ return False
44
+
45
+ @app.route('/health', methods=['GET'])
46
+ def health_check():
47
+ """Health check endpoint"""
48
+ return jsonify({
49
+ 'status': 'online',
50
+ 'timestamp': datetime.now().isoformat(),
51
+ 'workers': len(workers),
52
+ 'queue_size': len(job_queue),
53
+ 'active_jobs': len([j for j in jobs.values() if j['status'] == 'processing'])
54
+ })
55
+
56
+ @app.route('/submit', methods=['POST'])
57
+ def submit_job():
58
+ """Submit a new job"""
59
+ if not verify_api_key():
60
+ return jsonify({'error': 'Unauthorized'}), 401
61
+
62
+ data = request.json
63
+
64
+ if not data or 'unique_id' not in data:
65
+ return jsonify({'error': 'Missing unique_id'}), 400
66
+
67
+ if 'service_type' not in data:
68
+ return jsonify({'error': 'Missing service_type'}), 400
69
+
70
+ if 'image_data' not in data:
71
+ return jsonify({'error': 'Missing image_data'}), 400
72
+
73
+ unique_id = data['unique_id']
74
+
75
+ with lock:
76
+ # Check if job already exists
77
+ if unique_id in jobs:
78
+ return jsonify({
79
+ 'message': 'Job already exists',
80
+ 'unique_id': unique_id,
81
+ 'status': jobs[unique_id]['status']
82
+ }), 200
83
+
84
+ # Create job
85
+ jobs[unique_id] = {
86
+ 'status': 'queued',
87
+ 'service_type': data['service_type'],
88
+ 'image_data': data['image_data'],
89
+ 'result': None,
90
+ 'created_at': datetime.now(),
91
+ 'worker_id': None,
92
+ 'error': None
93
+ }
94
+
95
+ # Add to queue
96
+ job_queue.append(unique_id)
97
+
98
+ # Try to assign to worker immediately
99
+ assign_jobs_to_workers()
100
+
101
+ return jsonify({
102
+ 'message': 'Job submitted successfully',
103
+ 'unique_id': unique_id,
104
+ 'queue_position': get_queue_position(unique_id)
105
+ }), 202
106
+
107
+ @app.route('/status/<unique_id>', methods=['GET'])
108
+ def check_status(unique_id):
109
+ """Check job status"""
110
+ if not verify_api_key():
111
+ return jsonify({'error': 'Unauthorized'}), 401
112
+
113
+ with lock:
114
+ if unique_id not in jobs:
115
+ return jsonify({'error': 'Job not found'}), 404
116
+
117
+ job = jobs[unique_id]
118
+
119
+ response = {
120
+ 'unique_id': unique_id,
121
+ 'status': job['status'],
122
+ 'created_at': job['created_at'].isoformat()
123
+ }
124
+
125
+ if job['status'] == 'completed':
126
+ response['data'] = job['result']
127
+ elif job['status'] == 'failed':
128
+ response['error'] = job.get('error', 'Unknown error')
129
+ elif job['status'] == 'queued':
130
+ response['queue_position'] = get_queue_position(unique_id)
131
+ elif job['status'] == 'processing':
132
+ response['worker_id'] = job['worker_id']
133
+
134
+ return jsonify(response), 200
135
+
136
+ @app.route('/workers/status', methods=['GET'])
137
+ def workers_status():
138
+ """Get status of all workers"""
139
+ if not verify_api_key():
140
+ return jsonify({'error': 'Unauthorized'}), 401
141
+
142
+ with lock:
143
+ worker_list = []
144
+ for worker_id, worker in workers.items():
145
+ worker_list.append({
146
+ 'id': worker_id,
147
+ 'status': worker['status'],
148
+ 'total_processed': worker['total_processed'],
149
+ 'current_job': worker['current_job'],
150
+ 'last_heartbeat': worker['last_heartbeat'].isoformat() if worker['last_heartbeat'] else None
151
+ })
152
+
153
+ return jsonify({
154
+ 'workers': worker_list,
155
+ 'total_workers': len(workers),
156
+ 'active_workers': len([w for w in workers.values() if w['status'] == 'idle' or w['status'] == 'busy']),
157
+ 'queue_size': len(job_queue)
158
+ }), 200
159
+
160
+ @app.route('/worker/heartbeat', methods=['POST'])
161
+ def worker_heartbeat():
162
+ """Worker heartbeat endpoint"""
163
+ data = request.json
164
+
165
+ if not data or 'worker_id' not in data:
166
+ return jsonify({'error': 'Missing worker_id'}), 400
167
+
168
+ worker_id = data['worker_id']
169
+
170
+ with lock:
171
+ if worker_id not in workers:
172
+ # Register new worker
173
+ workers[worker_id] = {
174
+ 'url': data.get('url', ''),
175
+ 'status': 'idle',
176
+ 'last_heartbeat': datetime.now(),
177
+ 'total_processed': 0,
178
+ 'current_job': None
179
+ }
180
+ else:
181
+ # Update existing worker
182
+ workers[worker_id]['status'] = data.get('status', 'idle')
183
+ workers[worker_id]['last_heartbeat'] = datetime.now()
184
+
185
+ if data.get('status') == 'idle' and workers[worker_id]['current_job']:
186
+ # Worker finished job
187
+ workers[worker_id]['current_job'] = None
188
+
189
+ # Try to assign jobs
190
+ assign_jobs_to_workers()
191
+
192
+ return jsonify({'message': 'Heartbeat received'}), 200
193
+
194
+ @app.route('/worker/result', methods=['POST'])
195
+ def worker_result():
196
+ """Worker submits job result"""
197
+ data = request.json
198
+
199
+ if not data or 'unique_id' not in data:
200
+ return jsonify({'error': 'Missing unique_id'}), 400
201
+
202
+ unique_id = data['unique_id']
203
+ worker_id = data.get('worker_id')
204
+
205
+ with lock:
206
+ if unique_id not in jobs:
207
+ return jsonify({'error': 'Job not found'}), 404
208
+
209
+ job = jobs[unique_id]
210
+
211
+ if data.get('status') == 'completed':
212
+ job['status'] = 'completed'
213
+ job['result'] = data.get('result')
214
+ else:
215
+ job['status'] = 'failed'
216
+ job['error'] = data.get('error', 'Unknown error')
217
+
218
+ # Update worker
219
+ if worker_id and worker_id in workers:
220
+ workers[worker_id]['status'] = 'idle'
221
+ workers[worker_id]['current_job'] = None
222
+ workers[worker_id]['total_processed'] += 1
223
+
224
+ # Try to assign next job
225
+ assign_jobs_to_workers()
226
+
227
+ return jsonify({'message': 'Result received'}), 200
228
+
229
+ def assign_jobs_to_workers():
230
+ """Assign pending jobs to idle workers"""
231
+ with lock:
232
+ while job_queue:
233
+ # Find idle worker
234
+ idle_worker = None
235
+ for worker_id, worker in workers.items():
236
+ if worker['status'] == 'idle':
237
+ idle_worker = worker_id
238
+ break
239
+
240
+ if not idle_worker:
241
+ break # No idle workers available
242
+
243
+ # Get job from queue
244
+ unique_id = job_queue.popleft()
245
+
246
+ if unique_id not in jobs:
247
+ continue
248
+
249
+ job = jobs[unique_id]
250
+
251
+ # Assign job to worker
252
+ job['status'] = 'processing'
253
+ job['worker_id'] = idle_worker
254
+
255
+ workers[idle_worker]['status'] = 'busy'
256
+ workers[idle_worker]['current_job'] = unique_id
257
+
258
+ # Send job to worker
259
+ try:
260
+ worker_url = workers[idle_worker]['url']
261
+ response = requests.post(
262
+ f"{worker_url}/process",
263
+ json={
264
+ 'unique_id': unique_id,
265
+ 'service_type': job['service_type'],
266
+ 'image_data': job['image_data']
267
+ },
268
+ timeout=5
269
+ )
270
+
271
+ if response.status_code != 200:
272
+ # Worker failed to accept job
273
+ job['status'] = 'queued'
274
+ job['worker_id'] = None
275
+ job_queue.append(unique_id)
276
+ workers[idle_worker]['status'] = 'idle'
277
+ workers[idle_worker]['current_job'] = None
278
+
279
+ except Exception as e:
280
+ # Worker unreachable
281
+ print(f"Error sending job to worker {idle_worker}: {str(e)}")
282
+ job['status'] = 'queued'
283
+ job['worker_id'] = None
284
+ job_queue.append(unique_id)
285
+ workers[idle_worker]['status'] = 'offline'
286
+ workers[idle_worker]['current_job'] = None
287
+
288
+ def get_queue_position(unique_id):
289
+ """Get position of job in queue"""
290
+ try:
291
+ return list(job_queue).index(unique_id) + 1
292
+ except ValueError:
293
+ return 0
294
+
295
+ def cleanup_old_jobs():
296
+ """Clean up jobs older than 24 hours"""
297
+ with lock:
298
+ cutoff_time = datetime.now() - timedelta(hours=24)
299
+ jobs_to_delete = []
300
+
301
+ for unique_id, job in jobs.items():
302
+ if job['created_at'] < cutoff_time:
303
+ jobs_to_delete.append(unique_id)
304
+
305
+ for unique_id in jobs_to_delete:
306
+ del jobs[unique_id]
307
+
308
+ # Cleanup task (run periodically)
309
+ from threading import Timer
310
+
311
+ def periodic_cleanup():
312
+ cleanup_old_jobs()
313
+ Timer(3600, periodic_cleanup).start() # Run every hour
314
+
315
+ periodic_cleanup()
316
+
317
+ if __name__ == '__main__':
318
+ port = int(os.getenv('PORT', 7860))
319
+ app.run(host='0.0.0.0', port=port)