ex510 commited on
Commit
dbfa985
·
verified ·
1 Parent(s): b610781

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +32 -21
main.py CHANGED
@@ -11,6 +11,9 @@ import httpx
11
  import os
12
  import collections
13
  import logging
 
 
 
14
 
15
  # تهيئة التسجيل (Logging) بدلاً من print
16
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -22,10 +25,10 @@ tokenizer: Any = None # قد لا يكون نوعه واضحًا دائمًا ل
22
  model_id = 'Qwen/Qwen3-Embedding-0.6B'
23
 
24
  # تم تعديل max_workers إلى 1 لضمان معالجة تسلسلية
25
- executor = ThreadPoolExecutor(max_workers=1)
26
 
27
  # تم تحديث الحد الأقصى للتوكنز
28
- MAX_TOKENS = 32000
29
 
30
  # --- إضافة عناصر جديدة لإدارة قائمة الانتظار ---
31
  request_queue = collections.deque() # قائمة انتظار لتخزين الطلبات
@@ -61,7 +64,7 @@ async def lifespan(app: FastAPI):
61
  logger.critical(f"Failed to load model or tokenizer {model_id}: {e}", exc_info=True)
62
  # رفع الاستثناء لضمان عدم بدء التطبيق إذا فشل تحميل النموذج
63
  raise
64
-
65
  yield
66
  # (Optional) Clean up resources at shutdown
67
  logger.info("Cleaning up resources...")
@@ -105,11 +108,11 @@ def chunk_and_embed(text: str) -> List[float]:
105
  raise RuntimeError("Model or tokenizer not loaded or initialized correctly.")
106
 
107
  tokens = tokenizer.encode(text, add_special_tokens=False)
108
-
109
  # If text is short, embed directly
110
  if len(tokens) <= MAX_TOKENS:
111
  return model.encode(text, normalize_embeddings=True).tolist()
112
-
113
  # Split into chunks
114
  chunks = []
115
  overlap = 50 # Overlap tokens - يمكنك تعديلها حسب الحاجة
@@ -117,26 +120,26 @@ def chunk_and_embed(text: str) -> List[float]:
117
  while start < len(tokens):
118
  end = start + MAX_TOKENS
119
  chunk_tokens = tokens[start:end]
120
-
121
  # التأكد من أننا لا نحاول فك ترميز قائمة توكنز فارغة
122
  if not chunk_tokens:
123
  break
124
-
125
  chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
126
  chunks.append(chunk_text)
127
-
128
  if end >= len(tokens): # إذا وصلنا إلى نهاية النص
129
  break
130
  start = end - overlap # للبدء من الجزء المتداخل التالي
131
-
132
  # Embed all chunks
133
  # Note: If this list comprehension causes memory issues for very long texts,
134
  # consider processing chunks in smaller batches or using a generator
135
  chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]
136
-
137
  # Pool embeddings (mean)
138
  final_embedding = np.mean(chunk_embeddings, axis=0).tolist()
139
-
140
  return final_embedding
141
 
142
  # --- دالة المعالج الجديدة ---
@@ -158,9 +161,9 @@ async def process_queue():
158
  logger.info("Embedding queue is empty. Stopping processor.")
159
  is_processing_queue = False # إعادة تعيين المؤشر
160
  break # الخروج من الحلقة عند فراغ قائمة الانتظار
161
-
162
  # استخراج العنصر الأول من قائمة الانتظار
163
- request_item = request_queue.popleft()
164
  text_to_embed = request_item["text"]
165
  request_id_for_webhook = request_item.get("request_id", "N/A")
166
 
@@ -168,8 +171,8 @@ async def process_queue():
168
  try:
169
  # استخدام asyncio.to_thread هو الأسلوب المفضل لـ Python 3.9+
170
  # بما أن max_workers=1 في executor، سيتم ضمان التسلسل
171
- embedding = await asyncio.to_thread(chunk_and_embed, text_to_embed)
172
-
173
  # إعداد الـ payload وإرساله للويب هوك
174
  if webhook_url:
175
  payload = {
@@ -177,10 +180,10 @@ async def process_queue():
177
  "embedding": embedding,
178
  "request_id": request_id_for_webhook
179
  }
180
- await send_to_webhook(webhook_url, payload)
181
  else:
182
  logger.warning(f"WEBHOOK_URL not set. Embedding result for (ID: {request_id_for_webhook}) will not be sent to a webhook.")
183
-
184
  logger.info(f"Finished processing item (ID: {request_id_for_webhook}).")
185
 
186
  except Exception as e:
@@ -188,14 +191,14 @@ async def process_queue():
188
  # هنا يمكنك إضافة منطق لإعادة المحاولة أو تسجيل الخطأ بشكل دائم
189
 
190
  # السماح بالتأجيل قليلًا لمنع حظر الـ event loop بالكامل إذ�� كانت المعالجة سريعة جدًا
191
- await asyncio.sleep(0.01)
192
 
193
  except Exception as e:
194
  logger.critical(f"CRITICAL ERROR in process_queue: {e}", exc_info=True)
195
  finally:
196
  async with queue_lock:
197
  # التأكد من إعادة تعيين المؤشر حتى لو حدث خطأ
198
- is_processing_queue = False
199
 
200
  @app.post("/embed/text", status_code=status.HTTP_202_ACCEPTED) # تغيير حالة الاستجابة إلى 202 Accepted
201
  async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
@@ -214,7 +217,7 @@ async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
214
  is_processing_queue = True # تعيين المؤشر لمنع بدء معالجات متعددة
215
  background_tasks.add_task(process_queue)
216
  logger.info("Started background queue processor.")
217
-
218
  # إرجاع استجابة سريعة للعميل لإعلامه بأن الطلب تم استلامه ومعالجته لاحقًا
219
  return {
220
  "success": True,
@@ -222,5 +225,13 @@ async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
222
  "request_id": request.request_id # إرجاع الـ ID للعميل
223
  }
224
 
225
- if __name__ == "__main__":
 
226
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
 
 
 
 
 
 
 
11
  import os
12
  import collections
13
  import logging
14
+ import nest_asyncio
15
+ import threading
16
+ import time
17
 
18
  # تهيئة التسجيل (Logging) بدلاً من print
19
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
25
  model_id = 'Qwen/Qwen3-Embedding-0.6B'
26
 
27
  # تم تعديل max_workers إلى 1 لضمان معالجة تسلسلية
28
+ executor = ThreadPoolExecutor(max_workers=1)
29
 
30
  # تم تحديث الحد الأقصى للتوكنز
31
+ MAX_TOKENS = 32000
32
 
33
  # --- إضافة عناصر جديدة لإدارة قائمة الانتظار ---
34
  request_queue = collections.deque() # قائمة انتظار لتخزين الطلبات
 
64
  logger.critical(f"Failed to load model or tokenizer {model_id}: {e}", exc_info=True)
65
  # رفع الاستثناء لضمان عدم بدء التطبيق إذا فشل تحميل النموذج
66
  raise
67
+
68
  yield
69
  # (Optional) Clean up resources at shutdown
70
  logger.info("Cleaning up resources...")
 
108
  raise RuntimeError("Model or tokenizer not loaded or initialized correctly.")
109
 
110
  tokens = tokenizer.encode(text, add_special_tokens=False)
111
+
112
  # If text is short, embed directly
113
  if len(tokens) <= MAX_TOKENS:
114
  return model.encode(text, normalize_embeddings=True).tolist()
115
+
116
  # Split into chunks
117
  chunks = []
118
  overlap = 50 # Overlap tokens - يمكنك تعديلها حسب الحاجة
 
120
  while start < len(tokens):
121
  end = start + MAX_TOKENS
122
  chunk_tokens = tokens[start:end]
123
+
124
  # التأكد من أننا لا نحاول فك ترميز قائمة توكنز فارغة
125
  if not chunk_tokens:
126
  break
127
+
128
  chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
129
  chunks.append(chunk_text)
130
+
131
  if end >= len(tokens): # إذا وصلنا إلى نهاية النص
132
  break
133
  start = end - overlap # للبدء من الجزء المتداخل التالي
134
+
135
  # Embed all chunks
136
  # Note: If this list comprehension causes memory issues for very long texts,
137
  # consider processing chunks in smaller batches or using a generator
138
  chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]
139
+
140
  # Pool embeddings (mean)
141
  final_embedding = np.mean(chunk_embeddings, axis=0).tolist()
142
+
143
  return final_embedding
144
 
145
  # --- دالة المعالج الجديدة ---
 
161
  logger.info("Embedding queue is empty. Stopping processor.")
162
  is_processing_queue = False # إعادة تعيين المؤشر
163
  break # الخروج من الحلقة عند فراغ قائمة الانتظار
164
+
165
  # استخراج العنصر الأول من قائمة الانتظار
166
+ request_item = request_queue.popleft()
167
  text_to_embed = request_item["text"]
168
  request_id_for_webhook = request_item.get("request_id", "N/A")
169
 
 
171
  try:
172
  # استخدام asyncio.to_thread هو الأسلوب المفضل لـ Python 3.9+
173
  # بما أن max_workers=1 في executor، سيتم ضمان التسلسل
174
+ embedding = await asyncio.to_thread(chunk_and_embed, text_to_embed)
175
+
176
  # إعداد الـ payload وإرساله للويب هوك
177
  if webhook_url:
178
  payload = {
 
180
  "embedding": embedding,
181
  "request_id": request_id_for_webhook
182
  }
183
+ await send_to_webhook(webhook_url, payload)
184
  else:
185
  logger.warning(f"WEBHOOK_URL not set. Embedding result for (ID: {request_id_for_webhook}) will not be sent to a webhook.")
186
+
187
  logger.info(f"Finished processing item (ID: {request_id_for_webhook}).")
188
 
189
  except Exception as e:
 
191
  # هنا يمكنك إضافة منطق لإعادة المحاولة أو تسجيل الخطأ بشكل دائم
192
 
193
  # السماح بالتأجيل قليلًا لمنع حظر الـ event loop بالكامل إذ�� كانت المعالجة سريعة جدًا
194
+ await asyncio.sleep(0.01)
195
 
196
  except Exception as e:
197
  logger.critical(f"CRITICAL ERROR in process_queue: {e}", exc_info=True)
198
  finally:
199
  async with queue_lock:
200
  # التأكد من إعادة تعيين المؤشر حتى لو حدث خطأ
201
+ is_processing_queue = False
202
 
203
  @app.post("/embed/text", status_code=status.HTTP_202_ACCEPTED) # تغيير حالة الاستجابة إلى 202 Accepted
204
  async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
 
217
  is_processing_queue = True # تعيين المؤشر لمنع بدء معالجات متعددة
218
  background_tasks.add_task(process_queue)
219
  logger.info("Started background queue processor.")
220
+
221
  # إرجاع استجابة سريعة للعميل لإعلامه بأن الطلب تم استلامه ومعالجته لاحقًا
222
  return {
223
  "success": True,
 
225
  "request_id": request.request_id # إرجاع الـ ID للعميل
226
  }
227
 
228
+ def run_uvicorn():
229
+ nest_asyncio.apply()
230
  uvicorn.run(app, host="0.0.0.0", port=7860)
231
+
232
+ # Start Uvicorn in a new thread
233
+ uvicorn_thread = threading.Thread(target=run_uvicorn)
234
+ uvicorn_thread.start()
235
+
236
+ # Optional: Add a small delay to allow Uvicorn to start up
237
+ time.sleep(1)