ex510 committed on
Commit
b610781
·
verified ·
1 Parent(s): 2f03c22

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +153 -46
main.py CHANGED
@@ -1,35 +1,74 @@
1
- from fastapi import FastAPI, HTTPException, BackgroundTasks
2
  from pydantic import BaseModel, Field
3
  from sentence_transformers import SentenceTransformer
4
  import uvicorn
5
  import asyncio
6
  from concurrent.futures import ThreadPoolExecutor
7
- from typing import List
8
  import numpy as np
9
  from contextlib import asynccontextmanager
10
  import httpx
11
  import os
 
 
 
 
 
 
12
 
13
  # Globals
14
- model = None
15
- tokenizer = None
16
  model_id = 'Qwen/Qwen3-Embedding-0.6B'
17
- executor = ThreadPoolExecutor(max_workers=4)
18
- MAX_TOKENS = 32000
 
 
 
 
 
 
 
 
 
 
19
 
20
  @asynccontextmanager
21
  async def lifespan(app: FastAPI):
22
  # Load the model and tokenizer at startup
23
  global model, tokenizer
24
- print(f"Loading model: {model_id}...")
25
- model = SentenceTransformer(model_id)
26
- tokenizer = model.tokenizer
27
- print("Model loaded successfully")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  yield
29
  # (Optional) Clean up resources at shutdown
30
- print("Cleaning up resources...")
31
  model = None
32
  tokenizer = None
 
 
33
 
34
  app = FastAPI(
35
  title="Text Embedding API (Qwen/Qwen3-Embedding-0.6B)",
@@ -38,20 +77,23 @@ app = FastAPI(
38
 
39
  class TextRequest(BaseModel):
40
  text: str = Field(..., min_length=1, description="Text to embed")
 
41
  request_id: str | None = Field(None, description="Optional unique identifier for the request")
42
 
43
-
44
-
45
-
46
- async def send_to_webhook(url: str, data: dict):
47
  """Sends data to a webhook URL asynchronously."""
 
48
  try:
49
  async with httpx.AsyncClient() as client:
50
  response = await client.post(url, json=data)
51
  response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
52
- print(f"Successfully sent data to webhook: {url}")
53
  except httpx.RequestError as e:
54
- print(f"Error sending data to webhook {url}: {e}")
 
 
 
 
55
 
56
  @app.get("/")
57
  def home():
@@ -59,6 +101,9 @@ def home():
59
 
60
  def chunk_and_embed(text: str) -> List[float]:
61
  """Split text into chunks if too long, then pool embeddings"""
 
 
 
62
  tokens = tokenizer.encode(text, add_special_tokens=False)
63
 
64
  # If text is short, embed directly
@@ -67,19 +112,26 @@ def chunk_and_embed(text: str) -> List[float]:
67
 
68
  # Split into chunks
69
  chunks = []
70
- overlap = 50
71
  start = 0
72
  while start < len(tokens):
73
  end = start + MAX_TOKENS
74
  chunk_tokens = tokens[start:end]
 
 
 
 
 
75
  chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
76
  chunks.append(chunk_text)
77
 
78
- if end >= len(tokens):
79
  break
80
- start = end - overlap
81
 
82
  # Embed all chunks
 
 
83
  chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]
84
 
85
  # Pool embeddings (mean)
@@ -87,33 +139,88 @@ def chunk_and_embed(text: str) -> List[float]:
87
 
88
  return final_embedding
89
 
90
- @app.post("/embed/text")
91
- async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
 
 
 
 
 
 
 
 
 
 
92
  try:
93
- loop = asyncio.get_event_loop()
94
- embedding = await loop.run_in_executor(
95
- executor,
96
- lambda: chunk_and_embed(request.text)
97
- )
98
-
99
- # Check for webhook URL and add the background task
100
- webhook_url = os.environ.get("WEBHOOK_URL")
101
- if webhook_url:
102
- payload = {
103
- "text": request.text,
104
- "embedding": embedding,
105
- "request_id": request.request_id
106
- }
107
- background_tasks.add_task(send_to_webhook, webhook_url, payload)
108
-
109
- return {
110
- "success": True,
111
- "model": model_id,
112
- "dimension": len(embedding),
113
- "embedding": embedding
114
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
  except Exception as e:
116
- raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
  if __name__ == "__main__":
119
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
+ from fastapi import FastAPI, HTTPException, BackgroundTasks, status
2
  from pydantic import BaseModel, Field
3
  from sentence_transformers import SentenceTransformer
4
  import uvicorn
5
  import asyncio
6
  from concurrent.futures import ThreadPoolExecutor
7
+ from typing import List, Dict, Any, Optional
8
  import numpy as np
9
  from contextlib import asynccontextmanager
10
  import httpx
11
  import os
12
+ import collections
13
+ import logging
14
+
15
# Configure logging instead of print()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Globals
model: Optional[SentenceTransformer] = None  # loaded in lifespan() at startup
tokenizer: Any = None  # tokenizer type may vary between models, so left untyped
model_id = 'Qwen/Qwen3-Embedding-0.6B'

# max_workers set to 1 to guarantee strictly sequential processing
# NOTE(review): the visible code paths use asyncio.to_thread rather than this
# executor (it is only shut down in lifespan) — confirm it is still needed.
executor = ThreadPoolExecutor(max_workers=1)

# Maximum tokens per chunk before text is split in chunk_and_embed()
MAX_TOKENS = 32000

# --- Queue-management primitives ---
request_queue = collections.deque()  # pending embedding requests (dicts with "text"/"request_id")
queue_lock = asyncio.Lock()          # guards request_queue and is_processing_queue
is_processing_queue = False          # True while the background processor is running
# --------------------------------------------------
35
 
36
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: load the embedding model (and its tokenizer)
    at startup; release them and shut the executor down at shutdown.

    Raises on any load failure so the application does not start with a
    half-initialized model.
    """
    # Load the model and tokenizer at startup
    global model, tokenizer
    logger.info(f"Loading model: {model_id}...")
    try:
        model = SentenceTransformer(model_id)
        # Prefer the tokenizer already attached to the SentenceTransformer.
        if hasattr(model, 'tokenizer') and model.tokenizer is not None:
            tokenizer = model.tokenizer
        else:
            # Otherwise fall back to loading it separately via Hugging Face transformers.
            try:
                from transformers import AutoTokenizer
                tokenizer = AutoTokenizer.from_pretrained(model_id)
                logger.warning(f"Model {model_id} did not have a direct 'tokenizer' attribute. Loaded separately using AutoTokenizer.")
            except ImportError:
                logger.error("transformers library not found. Could not load tokenizer separately.")
                raise  # a tokenizer is mandatory for chunk_and_embed()
            except Exception as e:
                logger.error(f"Failed to load tokenizer separately for {model_id}: {e}", exc_info=True)
                raise

        logger.info("Model loaded successfully")
    except Exception as e:
        logger.critical(f"Failed to load model or tokenizer {model_id}: {e}", exc_info=True)
        # Re-raise so the app refuses to start when model loading fails.
        raise

    yield
    # (Optional) Clean up resources at shutdown
    logger.info("Cleaning up resources...")
    model = None
    tokenizer = None
    executor.shutdown(wait=True)  # close the executor cleanly
    logger.info("Resources cleaned up.")
72
 
73
  app = FastAPI(
74
  title="Text Embedding API (Qwen/Qwen3-Embedding-0.6B)",
 
77
 
78
class TextRequest(BaseModel):
    """Request body for POST /embed/text."""
    # Text to embed; must be non-empty.
    text: str = Field(..., min_length=1, description="Text to embed")
    # Optional correlation ID, echoed back in the response and webhook payload.
    request_id: str | None = Field(None, description="Optional unique identifier for the request")
82
 
83
async def send_to_webhook(url: str, data: Dict[str, Any]):
    """Deliver *data* as a JSON POST to the webhook at *url*.

    Delivery is best-effort: every failure mode (transport error, non-2xx
    status, anything unexpected) is logged and swallowed so a bad webhook
    can never crash the queue processor.
    """
    request_id = data.get("request_id", "N/A")
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=data)
            # Turn 4xx/5xx into HTTPStatusError so it lands in the handler below.
            response.raise_for_status()
    except httpx.RequestError as e:
        logger.error(f"Error sending data to webhook (ID: {request_id}) {url}: {e}", exc_info=True)
    except httpx.HTTPStatusError as e:
        logger.error(f"Webhook HTTP error (ID: {request_id}) {url} - Status: {e.response.status_code}, Response: {e.response.text}", exc_info=True)
    except Exception as e:
        logger.error(f"An unexpected error occurred in send_to_webhook (ID: {request_id}): {e}", exc_info=True)
    else:
        logger.info(f"Successfully sent data to webhook (ID: {request_id}): {url}")
97
 
98
  @app.get("/")
99
  def home():
 
101
 
102
  def chunk_and_embed(text: str) -> List[float]:
103
  """Split text into chunks if too long, then pool embeddings"""
104
+ if not tokenizer or not model:
105
+ raise RuntimeError("Model or tokenizer not loaded or initialized correctly.")
106
+
107
  tokens = tokenizer.encode(text, add_special_tokens=False)
108
 
109
  # If text is short, embed directly
 
112
 
113
  # Split into chunks
114
  chunks = []
115
+ overlap = 50 # Overlap tokens - يمكنك تعديلها حسب الحاجة
116
  start = 0
117
  while start < len(tokens):
118
  end = start + MAX_TOKENS
119
  chunk_tokens = tokens[start:end]
120
+
121
+ # التأكد من أننا لا نحاول فك ترميز قائمة توكنز فارغة
122
+ if not chunk_tokens:
123
+ break
124
+
125
  chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
126
  chunks.append(chunk_text)
127
 
128
+ if end >= len(tokens): # إذا وصلنا إلى نهاية النص
129
  break
130
+ start = end - overlap # للبدء من الجزء المتداخل التالي
131
 
132
  # Embed all chunks
133
+ # Note: If this list comprehension causes memory issues for very long texts,
134
+ # consider processing chunks in smaller batches or using a generator
135
  chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]
136
 
137
  # Pool embeddings (mean)
 
139
 
140
  return final_embedding
141
 
142
# --- Background queue-processor coroutine ---
async def process_queue():
    """Drain request_queue one item at a time (single-worker mode),
    embedding each text and posting the result to WEBHOOK_URL if set.
    """
    global is_processing_queue
    webhook_url = os.environ.get("WEBHOOK_URL")

    async with queue_lock:
        # If the queue is empty, or a processor is already running, do nothing.
        # NOTE(review): this guard assumes the is_processing_queue flag is
        # claimed only here. If a scheduler pre-sets the flag before adding
        # this task, the early return below fires and the queue is never
        # drained — confirm the flag is claimed in exactly one place.
        if not request_queue or is_processing_queue:
            return
        is_processing_queue = True  # mark the processor as running

    logger.info("Starting to process embedding queue (single worker mode)...")
    try:
        while True:
            async with queue_lock:
                if not request_queue:
                    logger.info("Embedding queue is empty. Stopping processor.")
                    is_processing_queue = False  # release the claim
                    break  # leave the loop once the queue is empty

                # Take the oldest item off the queue while holding the lock.
                request_item = request_queue.popleft()
                text_to_embed = request_item["text"]
                request_id_for_webhook = request_item.get("request_id", "N/A")

            logger.info(f"Processing item from queue (ID: {request_id_for_webhook})...")
            try:
                # asyncio.to_thread keeps the event loop responsive while the
                # (CPU/GPU-bound) embedding runs in a worker thread.
                embedding = await asyncio.to_thread(chunk_and_embed, text_to_embed)

                # Build the payload and deliver it to the webhook, if configured.
                if webhook_url:
                    payload = {
                        "text": text_to_embed,
                        "embedding": embedding,
                        "request_id": request_id_for_webhook
                    }
                    await send_to_webhook(webhook_url, payload)
                else:
                    logger.warning(f"WEBHOOK_URL not set. Embedding result for (ID: {request_id_for_webhook}) will not be sent to a webhook.")

                logger.info(f"Finished processing item (ID: {request_id_for_webhook}).")

            except Exception as e:
                # Per-item failures are logged but do not stop the processor.
                logger.error(f"Error processing embedding for item (ID: {request_id_for_webhook}) in queue: {e}", exc_info=True)
                # Retry / dead-letter logic could be added here.

            # Yield briefly so fast items cannot starve the event loop.
            await asyncio.sleep(0.01)

    except Exception as e:
        logger.critical(f"CRITICAL ERROR in process_queue: {e}", exc_info=True)
    finally:
        async with queue_lock:
            # Always release the claim, even after an unexpected error.
            is_processing_queue = False
199
+
200
@app.post("/embed/text", status_code=status.HTTP_202_ACCEPTED)  # 202: accepted for async processing
async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
    """Enqueue an embedding request and return 202 Accepted immediately.

    The embedding itself is computed later by the single background queue
    processor (process_queue); the result is delivered to WEBHOOK_URL (if
    configured), not in this response.

    Returns a small acknowledgement dict echoing the caller's request_id.
    """
    request_data: Dict[str, Any] = {"text": request.text}
    if request.request_id:
        request_data["request_id"] = request.request_id

    async with queue_lock:
        request_queue.append(request_data)  # enqueue under the lock
        logger.info(f"Request (ID: {request.request_id or 'N/A'}) added to queue. Queue size: {len(request_queue)}")

    # BUGFIX: always schedule a processor task and let process_queue claim
    # the is_processing_queue flag itself. The previous code set the flag to
    # True here *before* scheduling, which made process_queue's
    # "already running" guard bail out immediately — the flag was never
    # reset, so no queued request was ever processed. Duplicate tasks are
    # harmless: process_queue exits at once when another processor holds
    # the claim.
    background_tasks.add_task(process_queue)
    logger.info("Started background queue processor.")

    # Fast acknowledgement so the client knows the request was accepted.
    return {
        "success": True,
        "message": "Request received and added to queue for processing.",
        "request_id": request.request_id
    }
224
 
225
if __name__ == "__main__":
    # Run the API directly with uvicorn when executed as a script.
    uvicorn.run(app, host="0.0.0.0", port=7860)