ex510 committed on
Commit
81a215d
·
verified ·
1 Parent(s): 1e5ffc1

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +167 -52
main.py CHANGED
@@ -1,35 +1,77 @@
1
- from fastapi import FastAPI, HTTPException, BackgroundTasks
2
  from pydantic import BaseModel, Field
3
  from sentence_transformers import SentenceTransformer
4
  import uvicorn
5
  import asyncio
6
  from concurrent.futures import ThreadPoolExecutor
7
- from typing import List
8
  import numpy as np
9
  from contextlib import asynccontextmanager
10
  import httpx
11
  import os
 
 
 
 
 
 
 
 
 
12
 
13
  # Globals
14
- model = None
15
- tokenizer = None
16
  model_id = 'Qwen/Qwen3-Embedding-0.6B'
17
- executor = ThreadPoolExecutor(max_workers=4)
 
 
 
 
18
  MAX_TOKENS = 32000
19
 
 
 
 
 
 
 
20
  @asynccontextmanager
21
  async def lifespan(app: FastAPI):
22
  # Load the model and tokenizer at startup
23
  global model, tokenizer
24
- print(f"Loading model: {model_id}...")
25
- model = SentenceTransformer(model_id)
26
- tokenizer = model.tokenizer
27
- print("Model loaded successfully")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
  yield
29
  # (Optional) Clean up resources at shutdown
30
- print("Cleaning up resources...")
31
  model = None
32
  tokenizer = None
 
 
33
 
34
  app = FastAPI(
35
  title="Text Embedding API (Qwen/Qwen3-Embedding-0.6B)",
@@ -38,20 +80,23 @@ app = FastAPI(
38
 
39
  class TextRequest(BaseModel):
40
  text: str = Field(..., min_length=1, description="Text to embed")
 
41
  request_id: str | None = Field(None, description="Optional unique identifier for the request")
42
 
43
-
44
-
45
-
46
- async def send_to_webhook(url: str, data: dict):
47
  """Sends data to a webhook URL asynchronously."""
 
48
  try:
49
  async with httpx.AsyncClient() as client:
50
  response = await client.post(url, json=data)
51
  response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
52
- print(f"Successfully sent data to webhook: {url}")
53
  except httpx.RequestError as e:
54
- print(f"Error sending data to webhook {url}: {e}")
 
 
 
 
55
 
56
  @app.get("/")
57
  def home():
@@ -59,61 +104,131 @@ def home():
59
 
60
  def chunk_and_embed(text: str) -> List[float]:
61
  """Split text into chunks if too long, then pool embeddings"""
 
 
 
62
  tokens = tokenizer.encode(text, add_special_tokens=False)
63
-
64
  # If text is short, embed directly
65
  if len(tokens) <= MAX_TOKENS:
66
  return model.encode(text, normalize_embeddings=True).tolist()
67
-
68
  # Split into chunks
69
  chunks = []
70
- overlap = 50
71
  start = 0
72
  while start < len(tokens):
73
  end = start + MAX_TOKENS
74
  chunk_tokens = tokens[start:end]
 
 
 
 
 
75
  chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
76
  chunks.append(chunk_text)
77
-
78
- if end >= len(tokens):
79
  break
80
- start = end - overlap
81
-
82
  # Embed all chunks
 
 
83
  chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]
84
-
85
  # Pool embeddings (mean)
86
  final_embedding = np.mean(chunk_embeddings, axis=0).tolist()
87
-
88
  return final_embedding
89
 
90
- @app.post("/embed/text")
91
- async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
 
 
 
 
 
 
 
 
 
 
92
  try:
93
- loop = asyncio.get_event_loop()
94
- embedding = await loop.run_in_executor(
95
- executor,
96
- lambda: chunk_and_embed(request.text)
97
- )
98
-
99
- # Check for webhook URL and add the background task
100
- webhook_url = os.environ.get("WEBHOOK_URL")
101
- if webhook_url:
102
- payload = {
103
- "text": request.text,
104
- "embedding": embedding,
105
- "request_id": request.request_id
106
- }
107
- background_tasks.add_task(send_to_webhook, webhook_url, payload)
108
-
109
- return {
110
- "success": True,
111
- "model": model_id,
112
- "dimension": len(embedding),
113
- "embedding": embedding
114
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
115
  except Exception as e:
116
- raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
- if __name__ == "__main__":
119
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
+ from fastapi import FastAPI, HTTPException, BackgroundTasks, status
2
  from pydantic import BaseModel, Field
3
  from sentence_transformers import SentenceTransformer
4
  import uvicorn
5
  import asyncio
6
  from concurrent.futures import ThreadPoolExecutor
7
+ from typing import List, Dict, Any, Optional
8
  import numpy as np
9
  from contextlib import asynccontextmanager
10
  import httpx
11
  import os
12
+ import collections
13
+ import logging
14
+ import nest_asyncio
15
+ import threading
16
+ import time
17
+
18
# Configure logging instead of print
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Globals
# model/tokenizer are populated by the lifespan handler at startup.
model: Optional[SentenceTransformer] = None
tokenizer: Any = None  # its exact type may not be obvious for every model
model_id = 'Qwen/Qwen3-Embedding-0.6B'

# max_workers set to 1 so embedding jobs are processed serially
# NOTE(review): process_queue calls asyncio.to_thread, which runs on the
# event loop's default executor, not this one — confirm this executor is
# actually used before relying on the single-worker guarantee.
executor = ThreadPoolExecutor(max_workers=1)

# Maximum number of tokens embedded in a single chunk
MAX_TOKENS = 32000

# --- New objects for managing the request queue ---
request_queue = collections.deque()  # queue holding pending embed requests
queue_lock = asyncio.Lock()  # lock guarding access to the queue
is_processing_queue = False  # flag: is the background processor currently running?
# --------------------------------------------------
38
+
39
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    Startup: load the SentenceTransformer model and its tokenizer into the
    module-level globals. Shutdown: release them and stop the thread pool.
    Raises on any load failure so the app refuses to start half-initialized.
    """
    # Load the model and tokenizer at startup
    global model, tokenizer
    logger.info(f"Loading model: {model_id}...")
    try:
        model = SentenceTransformer(model_id)
        # Prefer the tokenizer attached to the SentenceTransformer itself
        if hasattr(model, 'tokenizer') and model.tokenizer is not None:
            tokenizer = model.tokenizer
        else:
            # Fallback: load the tokenizer separately via Hugging Face transformers
            try:
                from transformers import AutoTokenizer
                tokenizer = AutoTokenizer.from_pretrained(model_id)
                logger.warning(f"Model {model_id} did not have a direct 'tokenizer' attribute. Loaded separately using AutoTokenizer.")
            except ImportError:
                logger.error("transformers library not found. Could not load tokenizer separately.")
                raise  # a tokenizer is required for chunking
            except Exception as e:
                logger.error(f"Failed to load tokenizer separately for {model_id}: {e}", exc_info=True)
                raise

        logger.info("Model loaded successfully")
    except Exception as e:
        logger.critical(f"Failed to load model or tokenizer {model_id}: {e}", exc_info=True)
        # Re-raise so the application does not start if model loading fails
        raise

    yield
    # (Optional) Clean up resources at shutdown
    logger.info("Cleaning up resources...")
    model = None
    tokenizer = None
    executor.shutdown(wait=True)  # drain and close the worker pool cleanly
    logger.info("Resources cleaned up.")
75
 
76
  app = FastAPI(
77
  title="Text Embedding API (Qwen/Qwen3-Embedding-0.6B)",
 
80
 
81
  class TextRequest(BaseModel):
82
  text: str = Field(..., min_length=1, description="Text to embed")
83
+ # إضافة حقل request_id اختياري
84
  request_id: str | None = Field(None, description="Optional unique identifier for the request")
85
 
86
async def send_to_webhook(url: str, data: Dict[str, Any]):
    """POST *data* as JSON to the webhook at *url*, logging the outcome.

    Every failure mode is caught and logged — a webhook delivery problem
    must never propagate into the caller.
    """
    rid = data.get("request_id", "N/A")
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(url, json=data)
            # Surface 4xx/5xx responses as exceptions
            resp.raise_for_status()
            logger.info(f"Successfully sent data to webhook (ID: {rid}): {url}")
    except httpx.RequestError as exc:
        logger.error(f"Error sending data to webhook (ID: {rid}) {url}: {exc}", exc_info=True)
    except httpx.HTTPStatusError as exc:
        logger.error(f"Webhook HTTP error (ID: {rid}) {url} - Status: {exc.response.status_code}, Response: {exc.response.text}", exc_info=True)
    except Exception as exc:
        logger.error(f"An unexpected error occurred in send_to_webhook (ID: {rid}): {exc}", exc_info=True)
100
 
101
  @app.get("/")
102
  def home():
 
104
 
105
def chunk_and_embed(text: str) -> List[float]:
    """Embed *text*; split into overlapping chunks and mean-pool if too long.

    Short inputs (<= MAX_TOKENS tokens) are embedded directly with
    normalize_embeddings=True.  Longer inputs are split into MAX_TOKENS-token
    chunks with a 50-token overlap, each chunk embedded, and the embeddings
    mean-pooled.

    Bug fix: the pooled embedding is now re-normalized.  The mean of unit
    vectors is generally NOT unit length, so previously long texts returned
    embeddings on a different scale than short ones, skewing cosine/dot
    similarity comparisons.

    Returns:
        The embedding as a plain list of floats (unit norm).

    Raises:
        RuntimeError: if the global model/tokenizer have not been loaded.
    """
    if not tokenizer or not model:
        raise RuntimeError("Model or tokenizer not loaded or initialized correctly.")

    tokens = tokenizer.encode(text, add_special_tokens=False)

    # If text is short, embed directly
    if len(tokens) <= MAX_TOKENS:
        return model.encode(text, normalize_embeddings=True).tolist()

    # Split into overlapping chunks
    chunks = []
    overlap = 50  # tokens shared between consecutive chunks to preserve context
    start = 0
    while start < len(tokens):
        end = start + MAX_TOKENS
        chunk_tokens = tokens[start:end]

        # Never try to decode an empty token list
        if not chunk_tokens:
            break

        chunks.append(tokenizer.decode(chunk_tokens, skip_special_tokens=True))

        if end >= len(tokens):  # reached the end of the text
            break
        start = end - overlap  # step back by the overlap for the next chunk

    # Embed all chunks (each normalized individually).
    # Note: if this causes memory pressure for very long texts, consider
    # processing chunks in smaller batches.
    chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]

    # Mean-pool, then re-normalize so the result matches the unit-norm
    # contract of the short-text path.
    pooled = np.mean(chunk_embeddings, axis=0)
    norm = np.linalg.norm(pooled)
    if norm > 0:
        pooled = pooled / norm
    return pooled.tolist()
144
 
145
# --- background queue processor ---
async def process_queue():
    """Drain request_queue one item at a time, embedding each text and
    optionally POSTing the result to WEBHOOK_URL.

    Bug fix: the previous version returned immediately when
    is_processing_queue was already True — but the /embed/text endpoint sets
    that flag *before* scheduling this task, so the processor always bailed
    out before its try/finally, the queue was never drained, and the stuck
    flag prevented any future processor from being scheduled.  The flag is
    now simply asserted here and always cleared in the finally block.

    Bug fix: embedding now runs on the dedicated single-worker executor so
    jobs are strictly serialized; asyncio.to_thread uses the event loop's
    default executor and ignored max_workers=1.
    """
    global is_processing_queue
    webhook_url = os.environ.get("WEBHOOK_URL")

    async with queue_lock:
        # Nothing queued — make sure the flag is clear and exit.
        if not request_queue:
            is_processing_queue = False
            return
        is_processing_queue = True  # mark the processor as running

    logger.info("Starting to process embedding queue (single worker mode)...")
    try:
        while True:
            async with queue_lock:
                if not request_queue:
                    logger.info("Embedding queue is empty. Stopping processor.")
                    is_processing_queue = False
                    break  # exit the loop once the queue is empty

                # Pop the oldest request while holding the lock
                request_item = request_queue.popleft()

            text_to_embed = request_item["text"]
            request_id_for_webhook = request_item.get("request_id", "N/A")

            logger.info(f"Processing item from queue (ID: {request_id_for_webhook})... ")
            try:
                # Run on the dedicated single-worker executor (see docstring).
                loop = asyncio.get_running_loop()
                embedding = await loop.run_in_executor(executor, chunk_and_embed, text_to_embed)
                logger.info(f"Embedding successful for item (ID: {request_id_for_webhook}). Checking webhook configuration...")

                if webhook_url:
                    payload = {
                        "text": text_to_embed,
                        "embedding": embedding,
                        "request_id": request_id_for_webhook
                    }
                    await send_to_webhook(webhook_url, payload)
                else:
                    logger.warning(f"WEBHOOK_URL not set. Embedding result for (ID: {request_id_for_webhook}) will not be sent to a webhook.")

                logger.info(f"Finished processing item (ID: {request_id_for_webhook}).")

            except Exception as e:
                logger.error(f"Error during chunk_and_embed for item (ID: {request_id_for_webhook}) in queue: {e}", exc_info=True)

            # Yield briefly so very fast processing cannot starve the event loop
            await asyncio.sleep(0.01)

    except Exception as e:
        logger.critical(f"CRITICAL ERROR in process_queue: {e}", exc_info=True)
    finally:
        async with queue_lock:
            # Always clear the flag, even if an error escaped the loop
            is_processing_queue = False
199
+
200
@app.post("/embed/text", status_code=status.HTTP_202_ACCEPTED)  # 202: accepted for async processing
async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
    """Enqueue a text for embedding and return immediately (202 Accepted).

    Bug fix: the previous version set is_processing_queue = True *before*
    scheduling process_queue, but process_queue refuses to run while that
    flag is set — so the processor bailed out instantly and the queue was
    never drained.  The flag is now owned exclusively by process_queue;
    this endpoint only reads it to decide whether a processor needs to be
    scheduled.
    """
    request_data = {"text": request.text}
    if request.request_id:
        request_data["request_id"] = request.request_id

    async with queue_lock:
        request_queue.append(request_data)  # enqueue the request
        logger.info(f"Request (ID: {request.request_id or 'N/A'}) added to queue. Queue size: {len(request_queue)}")

        # If no processor is currently running, start one in the background.
        # process_queue sets and clears is_processing_queue itself.
        if not is_processing_queue:
            background_tasks.add_task(process_queue)
            logger.info("Started background queue processor.")

    # Respond right away so the client knows the request was accepted
    return {
        "success": True,
        "message": "Request received and added to queue for processing.",
        "request_id": request.request_id  # echo the ID back to the client
    }
224
+
225
def run_uvicorn():
    """Run the ASGI server on 0.0.0.0:7860.

    nest_asyncio.apply() patches the event loop so uvicorn.run() can be
    called from an environment that may already have a running loop
    (e.g. a notebook-style host).
    """
    nest_asyncio.apply()
    uvicorn.run(app, host="0.0.0.0", port=7860)

# Start Uvicorn in a new thread
# NOTE(review): the server is started at import time, with no
# `if __name__ == "__main__":` guard — presumably required by the hosting
# environment; confirm before importing this module from other code.
uvicorn_thread = threading.Thread(target=run_uvicorn)
uvicorn_thread.start()

# Optional: Add a small delay to allow Uvicorn to start up
time.sleep(1)