from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
from PIL import Image
import io
import torch
import logging
from typing import List, Optional, Tuple
import requests
import threading
from contextlib import asynccontextmanager
import queue

from logic import WatermarkRemover

# --- Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Global state ---
# Heavy-model wrapper; created in lifespan() and used only by the single
# worker thread, so no lock is required around it.
remover: Optional[WatermarkRemover] = None

# Maximum number of pending tasks before new submissions are rejected (429).
MAX_QUEUE_SIZE = 16
task_queue: "queue.Queue[Tuple[int, List[bytes], str, Optional[str]]]" = queue.Queue(
    maxsize=MAX_QUEUE_SIZE
)


def process_in_background(task_id: int,
                          image_contents: List[bytes],
                          callback_url: str,
                          webhook_secret: Optional[str]) -> None:
    """Process a batch of images and POST the results to ``callback_url``.

    Called only from the single worker thread, so no model lock is needed.
    Note: ``remover.run()`` lazily loads the heavy inpainting model if it is
    not loaded yet — this happens only for the first task after a cold start.

    Args:
        task_id: Identifier echoed back to the caller in the callback payload.
        image_contents: Raw bytes of each uploaded image.
        callback_url: URL that receives the multipart result POST.
        webhook_secret: Optional shared secret sent as ``X-Webhook-Secret``
            (an empty string is sent when absent, matching existing behavior).
    """
    global remover
    logger.info(f"[Task {task_id}] Worker picked up task. Processing {len(image_contents)} images.")
    status = "success"
    cleaned_image_data: List[bytes] = []
    try:
        for contents in image_contents:
            image = Image.open(io.BytesIO(contents)).convert("RGB")
            cleaned_image = remover.run(image)
            buf = io.BytesIO()
            cleaned_image.save(buf, format="JPEG", quality=90, optimize=True)
            cleaned_image_data.append(buf.getvalue())
        logger.info(f"[Task {task_id}] All images processed successfully.")
    except Exception as e:
        # Best-effort: report the failure via the callback instead of
        # letting the exception kill the worker's current iteration.
        logger.error(f"[Task {task_id}] Error during processing: {e}", exc_info=True)
        status = "error"

    # Send the callback regardless of outcome so the caller learns the status.
    headers = {"X-Webhook-Secret": webhook_secret if webhook_secret else ""}
    files = [
        ('images', (f'image_{i}.jpeg', img_bytes, 'image/jpeg'))
        for i, img_bytes in enumerate(cleaned_image_data)
    ]
    data_payload = {'task_id': str(task_id), 'status': status}
    try:
        logger.info(f"[Task {task_id}] Sending callback to {callback_url}")
        response = requests.post(callback_url, files=files, data=data_payload,
                                 headers=headers, timeout=600)
        response.raise_for_status()
        logger.info(f"[Task {task_id}] Callback sent successfully.")
    except requests.RequestException as e:
        logger.error(f"[Task {task_id}] Failed to send callback: {e}")


# --- Worker thread that drains the queue ---
def queue_worker() -> None:
    """Consume tasks from ``task_queue`` forever, strictly one at a time."""
    logger.info("Queue worker thread started.")
    while True:
        try:
            # Blocks until a task appears in the queue.
            task_id, image_contents, callback_url, webhook_secret = task_queue.get()
            process_in_background(task_id, image_contents, callback_url, webhook_secret)
            # Tell the queue the task is finished.
            task_queue.task_done()
        except Exception as e:
            # Log unexpected errors so the worker loop never dies silently.
            logger.exception(f"Critical error in queue_worker loop: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup/shutdown hook: 'soft' warm-up, then launch the worker thread."""
    global remover
    logger.info("Application startup... Performing 'soft' warm-up.")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    remover = WatermarkRemover(device=device)
    # Pre-load ONLY the lightweight detector model.
    # This needs minimal VRAM and guarantees a successful start; the heavy
    # inpainting model is loaded lazily on the first request.
    remover._load_detector()
    logger.info("Detector model pre-loaded successfully. Main model will be loaded on first request.")
    if torch.cuda.is_available():
        mem = torch.cuda.get_device_properties(0).total_memory / (1024**3)
        logger.info(f"Total GPU memory: {mem:.2f} GB")
        logger.info(f"Allocated after detector load: {torch.cuda.memory_allocated() / (1024**3):.2f} GB")
    worker_thread = threading.Thread(target=queue_worker, daemon=True, name="QueueWorker")
    worker_thread.start()
    logger.info("Queue worker has been started.")
    yield
    logger.info("Application shutdown.")


app = FastAPI(lifespan=lifespan)


@app.get("/")
def root():
    """Health/status endpoint: model-load flags plus queue depth."""
    # Flag showing whether the heavy inpainting model has been lazily loaded.
    inpainting_loaded = remover is not None and remover.inpainting_pipe is not None
    return {
        "message": "Simba AI Services is running",
        "detector_model_loaded": remover is not None and remover.detector is not None,
        "inpainting_model_loaded": inpainting_loaded,
        "tasks_in_queue": task_queue.qsize(),
        "queue_capacity": MAX_QUEUE_SIZE
    }


@app.post("/process_images/")
async def process_images_endpoint(
    images: List[UploadFile] = File(...),
    task_id: int = Form(...),
    callback_url: str = Form(...),
    webhook_secret: Optional[str] = Form(None)
):
    """Accept an image batch, enqueue it, and return 202 immediately.

    Returns 429 when the processing queue is full and 500 on unexpected
    errors while accepting the task. The result is delivered asynchronously
    via the provided ``callback_url``.
    """
    logger.info(f"Accepted task {task_id}. Images count: {len(images)}")
    # --- Put the task on the queue instead of spawning a thread per task ---
    try:
        # Read the uploads up front so the event loop is not blocked later
        # and no open file handles are stored in the queue.
        image_contents = [await image.read() for image in images]
        # Enqueue without blocking; raises queue.Full when at capacity.
        task_queue.put_nowait((int(task_id), image_contents, callback_url, webhook_secret))
        current_queue_size = task_queue.qsize()
        logger.info(f"Task {task_id} added to queue. Current queue size: {current_queue_size}")
        return JSONResponse(
            status_code=202,
            content={
                "message": "Task accepted and placed in queue.",
                "task_id": task_id,
                "position_in_queue": current_queue_size
            }
        )
    except queue.Full:
        logger.warning(f"Task {task_id} rejected because queue is full (size: {task_queue.qsize()})")
        return JSONResponse(
            status_code=429,  # Too Many Requests
            content={"message": "Server is busy, the processing queue is full. Please try again later."}
        )
    except Exception as e:
        logger.error(f"Failed to accept task {task_id}: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"message": "An internal error occurred while accepting the task."}
        )