ex510 committed on
Commit
fcc9024
·
verified ·
1 Parent(s): 81a215d

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +52 -167
main.py CHANGED
@@ -1,77 +1,35 @@
1
- from fastapi import FastAPI, HTTPException, BackgroundTasks, status
2
  from pydantic import BaseModel, Field
3
  from sentence_transformers import SentenceTransformer
4
  import uvicorn
5
  import asyncio
6
  from concurrent.futures import ThreadPoolExecutor
7
- from typing import List, Dict, Any, Optional
8
  import numpy as np
9
  from contextlib import asynccontextmanager
10
  import httpx
11
  import os
12
- import collections
13
- import logging
14
- import nest_asyncio
15
- import threading
16
- import time
17
-
18
- # تهيئة التسجيل (Logging) بدلاً من print
19
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
20
- logger = logging.getLogger(__name__)
21
 
22
  # Globals
23
- model: Optional[SentenceTransformer] = None
24
- tokenizer: Any = None # قد لا يكون نوعه واضحًا دائمًا لجميع النماذج
25
  model_id = 'Qwen/Qwen3-Embedding-0.6B'
26
-
27
- # تم تعديل max_workers إلى 1 لضمان معالجة تسلسلية
28
- executor = ThreadPoolExecutor(max_workers=1)
29
-
30
- # تم تحديث الحد الأقصى للتوكنز
31
  MAX_TOKENS = 32000
32
 
33
- # --- إضافة عناصر جديدة لإدارة قائمة الانتظار ---
34
- request_queue = collections.deque() # قائمة انتظار لتخزين الطلبات
35
- queue_lock = asyncio.Lock() # قفل لضمان الوصول الآمن لقائمة الانتظار
36
- is_processing_queue = False # مؤشر لمعرفة ما إذا كان المعالج يعمل حاليًا
37
- # --------------------------------------------------
38
-
39
  @asynccontextmanager
40
  async def lifespan(app: FastAPI):
41
  # Load the model and tokenizer at startup
42
  global model, tokenizer
43
- logger.info(f"Loading model: {model_id}...")
44
- try:
45
- model = SentenceTransformer(model_id)
46
- # محاولة الوصول إلى الـ tokenizer
47
- if hasattr(model, 'tokenizer') and model.tokenizer is not None:
48
- tokenizer = model.tokenizer
49
- else:
50
- # إذا لم يكن متاحًا مباشرة، حاول تحميله بشكل منفصل باستخدام Hugging Face transformers
51
- try:
52
- from transformers import AutoTokenizer
53
- tokenizer = AutoTokenizer.from_pretrained(model_id)
54
- logger.warning(f"Model {model_id} did not have a direct 'tokenizer' attribute. Loaded separately using AutoTokenizer.")
55
- except ImportError:
56
- logger.error("transformers library not found. Could not load tokenizer separately.")
57
- raise # يجب أن يكون tokenizer موجودًا
58
- except Exception as e:
59
- logger.error(f"Failed to load tokenizer separately for {model_id}: {e}", exc_info=True)
60
- raise
61
-
62
- logger.info("Model loaded successfully")
63
- except Exception as e:
64
- logger.critical(f"Failed to load model or tokenizer {model_id}: {e}", exc_info=True)
65
- # رفع الاستثناء لضمان عدم بدء التطبيق إذا فشل تحميل النموذج
66
- raise
67
-
68
  yield
69
  # (Optional) Clean up resources at shutdown
70
- logger.info("Cleaning up resources...")
71
  model = None
72
  tokenizer = None
73
- executor.shutdown(wait=True) # إغلاق الـ executor بشكل صحيح
74
- logger.info("Resources cleaned up.")
75
 
76
  app = FastAPI(
77
  title="Text Embedding API (Qwen/Qwen3-Embedding-0.6B)",
@@ -80,23 +38,20 @@ app = FastAPI(
80
 
81
  class TextRequest(BaseModel):
82
  text: str = Field(..., min_length=1, description="Text to embed")
83
- # إضافة حقل request_id اختياري
84
  request_id: str | None = Field(None, description="Optional unique identifier for the request")
85
 
86
- async def send_to_webhook(url: str, data: Dict[str, Any]):
 
 
 
87
  """Sends data to a webhook URL asynchronously."""
88
- request_id = data.get("request_id", "N/A")
89
  try:
90
  async with httpx.AsyncClient() as client:
91
  response = await client.post(url, json=data)
92
  response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
93
- logger.info(f"Successfully sent data to webhook (ID: {request_id}): {url}")
94
  except httpx.RequestError as e:
95
- logger.error(f"Error sending data to webhook (ID: {request_id}) {url}: {e}", exc_info=True)
96
- except httpx.HTTPStatusError as e:
97
- logger.error(f"Webhook HTTP error (ID: {request_id}) {url} - Status: {e.response.status_code}, Response: {e.response.text}", exc_info=True)
98
- except Exception as e:
99
- logger.error(f"An unexpected error occurred in send_to_webhook (ID: {request_id}): {e}", exc_info=True)
100
 
101
  @app.get("/")
102
  def home():
@@ -104,131 +59,61 @@ def home():
104
 
105
  def chunk_and_embed(text: str) -> List[float]:
106
  """Split text into chunks if too long, then pool embeddings"""
107
- if not tokenizer or not model:
108
- raise RuntimeError("Model or tokenizer not loaded or initialized correctly.")
109
-
110
  tokens = tokenizer.encode(text, add_special_tokens=False)
111
-
112
  # If text is short, embed directly
113
  if len(tokens) <= MAX_TOKENS:
114
  return model.encode(text, normalize_embeddings=True).tolist()
115
-
116
  # Split into chunks
117
  chunks = []
118
- overlap = 50 # Overlap tokens - يمكنك تعديلها حسب الحاجة
119
  start = 0
120
  while start < len(tokens):
121
  end = start + MAX_TOKENS
122
  chunk_tokens = tokens[start:end]
123
-
124
- # التأكد من أننا لا نحاول فك ترميز قائمة توكنز فارغة
125
- if not chunk_tokens:
126
- break
127
-
128
  chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
129
  chunks.append(chunk_text)
130
-
131
- if end >= len(tokens): # إذا وصلنا إلى نهاية النص
132
  break
133
- start = end - overlap # للبدء من الجزء المتداخل التالي
134
-
135
  # Embed all chunks
136
- # Note: If this list comprehension causes memory issues for very long texts,
137
- # consider processing chunks in smaller batches or using a generator
138
  chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]
139
-
140
  # Pool embeddings (mean)
141
  final_embedding = np.mean(chunk_embeddings, axis=0).tolist()
142
-
143
  return final_embedding
144
 
145
- # --- دالة المعالج الجديدة ---
146
- async def process_queue():
147
- global is_processing_queue
148
- webhook_url = os.environ.get("WEBHOOK_URL")
149
-
150
- async with queue_lock:
151
- # إذا لم تكن هناك عناصر في قائمة الانتظار، أو كان المعالج يعمل بالفعل، لا تفعل شيئًا
152
- if not request_queue or is_processing_queue:
153
- return
154
- is_processing_queue = True # تعيين المؤشر إلى True للإشارة إلى أن المعالج يعمل
155
-
156
- logger.info("Starting to process embedding queue (single worker mode)...")
157
  try:
158
- while True:
159
- async with queue_lock:
160
- if not request_queue:
161
- logger.info("Embedding queue is empty. Stopping processor.")
162
- is_processing_queue = False # إعادة تعيين المؤشر
163
- break # الخروج من الحلقة عند فراغ قائمة الانتظار
164
-
165
- # استخراج العنصر الأول من قائمة الانتظار
166
- request_item = request_queue.popleft()
167
- text_to_embed = request_item["text"]
168
- request_id_for_webhook = request_item.get("request_id", "N/A")
169
-
170
- logger.info(f"Processing item from queue (ID: {request_id_for_webhook})... ")
171
- try:
172
- embedding = await asyncio.to_thread(chunk_and_embed, text_to_embed)
173
- logger.info(f"Embedding successful for item (ID: {request_id_for_webhook}). Checking webhook configuration...")
174
-
175
- if webhook_url:
176
- payload = {
177
- "text": text_to_embed,
178
- "embedding": embedding,
179
- "request_id": request_id_for_webhook
180
- }
181
- await send_to_webhook(webhook_url, payload)
182
- else:
183
- logger.warning(f"WEBHOOK_URL not set. Embedding result for (ID: {request_id_for_webhook}) will not be sent to a webhook.")
184
-
185
- logger.info(f"Finished processing item (ID: {request_id_for_webhook}).")
186
-
187
- except Exception as e:
188
- logger.error(f"Error during chunk_and_embed for item (ID: {request_id_for_webhook}) in queue: {e}", exc_info=True)
189
-
190
- # السماح بالتأجيل قليلًا لمنع حظر الـ event loop بالكامل إذا كانت المعالجة سريعة جدًا
191
- await asyncio.sleep(0.01)
192
-
193
  except Exception as e:
194
- logger.critical(f"CRITICAL ERROR in process_queue: {e}", exc_info=True)
195
- finally:
196
- async with queue_lock:
197
- # التأكد من إعادة تعيين المؤشر حتى لو حدث خطأ
198
- is_processing_queue = False
199
-
200
- @app.post("/embed/text", status_code=status.HTTP_202_ACCEPTED) # تغيير حالة الاستجابة إلى 202 Accepted
201
- async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
202
- global is_processing_queue
203
-
204
- request_data = {"text": request.text}
205
- if request.request_id:
206
- request_data["request_id"] = request.request_id
207
-
208
- async with queue_lock:
209
- request_queue.append(request_data) # إضافة الطلب إلى قائمة الانتظار
210
- logger.info(f"Request (ID: {request.request_id or 'N/A'}) added to queue. Queue size: {len(request_queue)}")
211
-
212
- # إذا لم يكن هناك معالج يعمل حاليًا، ابدأ واحدًا في الخلفية
213
- if not is_processing_queue:
214
- is_processing_queue = True # تعيين المؤشر لمنع بدء معالجات متعددة
215
- background_tasks.add_task(process_queue)
216
- logger.info("Started background queue processor.")
217
-
218
- # إرجاع استجابة سريعة للعميل لإعلامه بأن الطلب تم استلامه ومعالجته لاحقًا
219
- return {
220
- "success": True,
221
- "message": "Request received and added to queue for processing.",
222
- "request_id": request.request_id # إرجاع الـ ID للعميل
223
- }
224
-
225
- def run_uvicorn():
226
- nest_asyncio.apply()
227
- uvicorn.run(app, host="0.0.0.0", port=7860)
228
-
229
- # Start Uvicorn in a new thread
230
- uvicorn_thread = threading.Thread(target=run_uvicorn)
231
- uvicorn_thread.start()
232
 
233
- # Optional: Add a small delay to allow Uvicorn to start up
234
- time.sleep(1)
 
1
+ from fastapi import FastAPI, HTTPException, BackgroundTasks
2
  from pydantic import BaseModel, Field
3
  from sentence_transformers import SentenceTransformer
4
  import uvicorn
5
  import asyncio
6
  from concurrent.futures import ThreadPoolExecutor
7
+ from typing import List
8
  import numpy as np
9
  from contextlib import asynccontextmanager
10
  import httpx
11
  import os
 
 
 
 
 
 
 
 
 
12
 
13
  # Globals
14
+ model = None
15
+ tokenizer = None
16
  model_id = 'Qwen/Qwen3-Embedding-0.6B'
17
+ executor = ThreadPoolExecutor(max_workers=4)
 
 
 
 
18
  MAX_TOKENS = 32000
19
 
 
 
 
 
 
 
20
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    Loads the SentenceTransformer model (and grabs its tokenizer) once at
    startup, and releases resources at shutdown.
    """
    # Load the model and tokenizer at startup
    global model, tokenizer
    print(f"Loading model: {model_id}...")
    model = SentenceTransformer(model_id)
    tokenizer = model.tokenizer
    print("Model loaded successfully")
    yield
    # (Optional) Clean up resources at shutdown
    print("Cleaning up resources...")
    model = None
    tokenizer = None
    # Shut down the worker pool so threads do not outlive the app
    # (the module-level executor was otherwise never closed).
    executor.shutdown(wait=True)
 
 
33
 
34
  app = FastAPI(
35
  title="Text Embedding API (Qwen/Qwen3-Embedding-0.6B)",
 
38
 
39
class TextRequest(BaseModel):
    """Request body for the /embed/text endpoint."""
    # Text to embed; must be non-empty.
    text: str = Field(..., min_length=1, description="Text to embed")
    # Optional caller-supplied ID, echoed back in the webhook payload.
    request_id: str | None = Field(None, description="Optional unique identifier for the request")
42
 
43
+
44
+
45
+
46
async def send_to_webhook(url: str, data: dict):
    """Sends data to a webhook URL asynchronously.

    Args:
        url: Destination webhook endpoint.
        data: JSON-serializable payload to POST.

    Errors are logged, never raised: this runs as a background task and
    nothing upstream can handle an exception from it.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=data)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            print(f"Successfully sent data to webhook: {url}")
    except httpx.HTTPStatusError as e:
        # BUG FIX: raise_for_status() raises HTTPStatusError, which is NOT a
        # subclass of RequestError -- without this handler any 4xx/5xx webhook
        # response escaped the background task unhandled.
        print(f"Webhook HTTP error {url} - status: {e.response.status_code}")
    except httpx.RequestError as e:
        print(f"Error sending data to webhook {url}: {e}")
 
 
 
 
55
 
56
  @app.get("/")
57
  def home():
 
59
 
60
def chunk_and_embed(text: str) -> List[float]:
    """Split text into overlapping token chunks if too long, then pool embeddings.

    Args:
        text: Input text to embed.

    Returns:
        A single embedding vector (list of floats). Texts longer than
        MAX_TOKENS are split into overlapping chunks whose embeddings are
        mean-pooled.

    Raises:
        RuntimeError: if the model/tokenizer have not been loaded yet.
    """
    if model is None or tokenizer is None:
        # Guard against being called before lifespan() finished loading.
        raise RuntimeError("Model or tokenizer not loaded")

    tokens = tokenizer.encode(text, add_special_tokens=False)

    # If text is short, embed directly
    if len(tokens) <= MAX_TOKENS:
        return model.encode(text, normalize_embeddings=True).tolist()

    # Split into chunks. The overlap keeps some context across chunk
    # boundaries; clamp it below MAX_TOKENS so `start` always advances
    # (an overlap >= chunk size would loop forever).
    overlap = min(50, MAX_TOKENS // 2)
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + MAX_TOKENS
        chunk_text = tokenizer.decode(tokens[start:end], skip_special_tokens=True)
        chunks.append(chunk_text)
        if end >= len(tokens):
            break
        start = end - overlap

    # Embed all chunks
    chunk_embeddings = [model.encode(chunk, normalize_embeddings=True) for chunk in chunks]

    # Pool embeddings (mean)
    return np.mean(chunk_embeddings, axis=0).tolist()
89
 
90
@app.post("/embed/text")
async def embed_text(request: TextRequest, background_tasks: BackgroundTasks):
    """Embed the request text and return the vector.

    If the WEBHOOK_URL environment variable is set, the result is also POSTed
    to that URL as a background task after the response is sent.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    try:
        # Run the CPU-bound embedding off the event loop thread.
        # get_event_loop() is deprecated inside coroutines; use the running loop.
        loop = asyncio.get_running_loop()
        embedding = await loop.run_in_executor(executor, chunk_and_embed, request.text)

        # Check for webhook URL and add the background task
        webhook_url = os.environ.get("WEBHOOK_URL")
        if webhook_url:
            payload = {
                "text": request.text,
                "embedding": embedding,
                "request_id": request.request_id,
            }
            background_tasks.add_task(send_to_webhook, webhook_url, payload)

        return {
            "success": True,
            "model": model_id,
            "dimension": len(embedding),
            "embedding": embedding,
        }
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
if __name__ == "__main__":
    # Launch the API with Uvicorn when run as a script, on all interfaces,
    # port 7860 (presumably the port the hosting platform expects -- confirm).
    uvicorn.run(app, host="0.0.0.0", port=7860)