# emmd / cop.py
# Mazenbs's picture
# Update cop.py
# 56863c2 verified
# main.py
import os
import time
import re
import gc
import logging
from functools import lru_cache
from typing import List
import multiprocessing
import numpy as np
import psutil
import onnxruntime as ort
from transformers import AutoTokenizer
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
# =====================================================
# General settings (CPU – HuggingFace Spaces)
# =====================================================
import os

# Required settings: a missing variable raises KeyError at import time.
MODEL_PATH = os.environ["MODEL_PATH"]
TOKENIZER_PATH = os.environ["TOKENIZER_PATH"]

# Optional numeric settings, each with a built-in default.
MAX_TEXT_LENGTH = int(os.getenv("MAX_TEXT_LENGTH", 256))  # queries are short
CACHE_SIZE = int(os.getenv("CACHE_SIZE", 1024))
PORT = int(os.getenv("PORT", 7860))

# Keep logging to a minimum for speed: errors only.
logger = logging.getLogger("embedding-api")
logging.basicConfig(level=logging.ERROR)
# =====================================================
# ONNX Runtime acceleration on CPU
# =====================================================
# Size the thread pool from the CPUs this process is allowed to use.
# cpu_count() reports every CPU on the host, which oversubscribes threads
# when the process is pinned to a subset of cores (common in containers).
try:
    _usable_cpus = len(os.sched_getaffinity(0))
except AttributeError:  # sched_getaffinity does not exist on every platform
    _usable_cpus = multiprocessing.cpu_count()

# NUM_THREADS (optional) overrides the detected value, e.g. when a CPU quota
# is enforced in a way that the affinity mask does not reflect.
num_threads = max(1, int(os.environ.get("NUM_THREADS", _usable_cpus)))

# NOTE(review): these variables are set after onnxruntime/numpy have been
# imported — confirm they still take effect for the ORT build in use.
os.environ["OMP_NUM_THREADS"] = str(num_threads)
os.environ["OMP_WAIT_POLICY"] = "ACTIVE"

sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
sess_options.intra_op_num_threads = num_threads
sess_options.inter_op_num_threads = 1
sess_options.enable_cpu_mem_arena = True
sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
# Path is relative to the current working directory.
sess_options.optimized_model_filepath = "optimized_model.onnx"

# Single shared inference session, created once at import time.
session = ort.InferenceSession(
    MODEL_PATH,
    sess_options=sess_options,
    providers=[("CPUExecutionProvider", {})],
)
# =====================================================
# Load the tokenizer once
# =====================================================
# local_files_only=True: the tokenizer is read from TOKENIZER_PATH on disk.
tokenizer = AutoTokenizer.from_pretrained(
    TOKENIZER_PATH, use_fast=True, local_files_only=True
)
# =====================================================
# Arabic text normalization (cached)
# =====================================================
@lru_cache(maxsize=4096)
def normalize_arabic(text: str) -> str:
    """Return a normalized form of *text* for embedding lookup.

    The substitutions are applied in order: strip diacritics and tatweel,
    unify alef / alef-maqsura / hamza-carrier variants, turn a word-final
    teh marbuta into heh, replace punctuation with spaces, then collapse
    whitespace and trim both ends. Results are memoized (up to 4096 texts).
    """
    substitutions = (
        (r"[ًٌٍَُِّْـ]", ""),   # diacritics and tatweel are dropped
        (r"[إأآ]", "ا"),        # alef variants -> bare alef
        (r"ى", "ي"),            # alef maqsura -> yeh
        (r"ؤ", "و"),            # hamza on waw -> waw
        (r"ئ", "ي"),            # hamza on yeh -> yeh
        (r"ة\b", "ه"),          # teh marbuta at a word boundary -> heh
        (r"[^\w\s]", " "),      # anything that is not word/space -> space
        (r"\s+", " "),          # collapse runs of whitespace
    )
    cleaned = text
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
# =====================================================
# Convert text to an embedding (fast + cached)
# =====================================================
@lru_cache(maxsize=CACHE_SIZE)
def text_to_embedding(text: str) -> "np.ndarray | None":
    """Embed *text* as an L2-normalized float32 vector.

    Args:
        text: Raw query text. It is normalized with ``normalize_arabic`` and
            prefixed with ``"query: "`` before tokenization.

    Returns:
        A read-only 1-D float32 array, or ``None`` when *text* is empty or
        contains only whitespace.

    Results are memoized on the raw input string (up to ``CACHE_SIZE``
    entries), so repeated calls with the same text return the *same* array
    object. The array is therefore marked read-only: an in-place change by
    one caller would otherwise silently corrupt the cached value for all
    later callers.
    """
    if not text or not text.strip():
        return None

    text = normalize_arabic(text)
    inputs = tokenizer(
        f"query: {text}",
        return_tensors="np",
        truncation=True,
        max_length=64,  # enough for short queries
        padding="max_length",  # fixed shape = faster on CPU
        return_token_type_ids=False,
        return_attention_mask=True,
    )
    outputs = session.run(None, dict(inputs))
    # NOTE(review): presumably output index 1 is the pooled sentence
    # embedding — confirm against the exported model's output order.
    # astype() copies, so the in-place division below never touches ORT's buffer.
    vector = outputs[1][0].astype(np.float32)

    # L2 normalize (skipped for an all-zero vector to avoid dividing by zero)
    norm = np.linalg.norm(vector)
    if norm > 0.0:
        vector /= norm

    # Protect the cached object from accidental mutation by callers.
    vector.setflags(write=False)
    return vector
# =====================================================
# API models
# =====================================================
# Request body for the /query endpoint.
class TextRequest(BaseModel):
    # Text to embed: at least 1 and at most MAX_TEXT_LENGTH characters.
    text: str = Field(..., min_length=1, max_length=MAX_TEXT_LENGTH)
# Response body returned by the /query endpoint.
class EmbeddingResponse(BaseModel):
    # Embedding vector as a plain list of floats.
    embedding: List[float]
    # Number of components in `embedding`.
    dimension: int
    # Time spent handling the request, in seconds.
    processing_time: float
# Response body returned by the /health endpoint.
class HealthResponse(BaseModel):
    # Service status string.
    status: str
    # System memory usage formatted as a percentage string.
    memory_usage: str
    # Available system memory in GiB.
    memory_available_gb: float
    # Seconds elapsed since the application started.
    uptime: float
# =====================================================
# Create the application
# =====================================================
app = FastAPI(version="4.0.0", title="Fast Arabic Embedding API (CPU Optimized)")
# =====================================================
# Endpoints
# =====================================================
@app.get("/")
def root():
    """Return a small index pointing at the docs and health endpoints."""
    index = {"status": "ok", "docs": "/docs", "health": "/health"}
    return index
@app.get("/health", response_model=HealthResponse)
def health():
    """Report system memory usage and the time elapsed since startup."""
    vm = psutil.virtual_memory()
    # start_time is stored on app.state by the startup handler.
    elapsed = time.time() - app.state.start_time
    # Bytes -> GiB, rounded to two decimals.
    available_gb = round(vm.available / (1024 ** 3), 2)
    return HealthResponse(
        status="healthy",
        memory_usage=f"{vm.percent}%",
        memory_available_gb=available_gb,
        uptime=elapsed,
    )
@app.post("/query", response_model=EmbeddingResponse)
def query_endpoint(request: TextRequest):
    """Embed the request text and return the vector with timing information.

    Raises:
        HTTPException: status 400 when no embedding could be produced
            (``text_to_embedding`` returned ``None``).
    """
    t0 = time.perf_counter()
    embedding = text_to_embedding(request.text)

    if embedding is None:
        # The message reads "embedding creation failed".
        raise HTTPException(400, "فشل إنشاء embedding")

    values = embedding.tolist()
    size = embedding.shape[0]
    elapsed = round(time.perf_counter() - t0, 6)
    return EmbeddingResponse(
        embedding=values,
        dimension=size,
        processing_time=elapsed,
    )
# =====================================================
# Startup / shutdown
# =====================================================
@app.on_event("startup")
def startup() -> None:
    """Record the start time and warm up the embedding pipeline."""
    # Read back by the /health endpoint to compute uptime.
    app.state.start_time = time.time()

    # Warm-up: very important for making the first real request fast.
    text_to_embedding("warm up")

    # Emitted at ERROR level because logging is configured for errors only.
    logger.error("🚀 Embedding API started")
@app.on_event("shutdown")
def shutdown() -> None:
    """Run a garbage-collection pass and log that the API has stopped."""
    gc.collect()

    # Emitted at ERROR level because logging is configured for errors only.
    logger.error("🛑 Embedding API stopped")
# =====================================================
# Run the server
# =====================================================
if __name__ == "__main__":
    import uvicorn

    # Pass the application object rather than the import string "main:app".
    # The string only resolves when this file is named main.py, and it makes
    # uvicorn import the module a second time, repeating the module-level
    # model and tokenizer loading. An app object is accepted because a single
    # worker is used and reload is off.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=PORT,
        workers=1,  # HuggingFace Spaces = a single worker
        access_log=False,
    )