import base64
import io
import logging
import os
import uuid

import numpy as np
import onnxruntime
import requests
from flask import Flask, request, jsonify, send_file
from PIL import Image
from werkzeug.exceptions import HTTPException
from werkzeug.utils import secure_filename

# Log at INFO level so model download/load progress shows up on the console.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# Upper bound on the size of an incoming request body: 16 MiB.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024

# Remote location of the ONNX weights and the local file they are stored in.
MODEL_URL = "https://huggingface.co/briaai/RMBG-1.4/resolve/main/onnx/model_fp16.onnx?download=true"
MODEL_PATH = "isnetis.onnx"

# Working directories for received uploads and generated results.
UPLOAD_FOLDER = "uploads"
RESULT_FOLDER = "results"

# Create both working directories at import time; a no-op when they exist.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULT_FOLDER, exist_ok=True)


class BackgroundRemover:
    """Remove image backgrounds with an ONNX segmentation model.

    Typical use: call :meth:`download_model`, then :meth:`load_model`, then
    :meth:`remove_background` once per image.
    """

    # (connect, read) timeouts in seconds for fetching the weights, so a
    # stalled connection cannot hang start-up forever.
    DOWNLOAD_TIMEOUT = (10, 60)
    # Width and height every image is resized to before inference.
    INPUT_SIZE = (1024, 1024)

    def __init__(self):
        # All three are filled in by load_model().
        self.session = None
        self.input_name = None
        self.output_name = None

    def download_model(self):
        """Download the model to ``MODEL_PATH`` unless the file already exists.

        The data is written to a ``.part`` file and renamed only after the
        transfer finished, so an interrupted download is never mistaken for
        a complete model on the next start.

        Returns:
            bool: True if the model file is available, False on failure.
        """
        if os.path.exists(MODEL_PATH):
            logger.info("✅ Model already exists, skipping download")
            return True

        partial_path = MODEL_PATH + ".part"
        try:
            logger.info("📥 Downloading model from HuggingFace...")
            with requests.get(MODEL_URL, stream=True,
                              timeout=self.DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()

                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0

                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size > 0:
                            progress = (downloaded / total_size) * 100
                            print(f"\r📥 Downloading: {progress:.1f}%", end="", flush=True)

            print()
            # Publish the finished file under its final name in one step.
            os.replace(partial_path, MODEL_PATH)
            logger.info("✅ Model downloaded successfully")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to download model: {e}")
            # Best-effort cleanup of the incomplete file.
            try:
                os.remove(partial_path)
            except OSError:
                pass
            return False

    def load_model(self):
        """Create the ONNX Runtime session for the file at ``MODEL_PATH``.

        CUDA is preferred when ONNX Runtime reports it as available; the CPU
        provider is always kept in the provider list.

        Returns:
            bool: True if the session was created, False otherwise.
        """
        try:
            logger.info("🔄 Loading ONNX model...")
            providers = ['CPUExecutionProvider']

            try:
                available_providers = onnxruntime.get_available_providers()
            except Exception:
                # Provider discovery failed: continue with CPU only.
                available_providers = []

            if 'CUDAExecutionProvider' in available_providers:
                providers.insert(0, 'CUDAExecutionProvider')
                logger.info("🚀 CUDA provider available")
            else:
                logger.info("💻 Using CPU provider")

            session = onnxruntime.InferenceSession(MODEL_PATH, providers=providers)
            input_name = session.get_inputs()[0].name
            output_name = session.get_outputs()[0].name

            # Assign together so a failure above never leaves a half-set state.
            self.session = session
            self.input_name = input_name
            self.output_name = output_name

            logger.info("✅ Model loaded successfully")
            logger.info(f"📊 Input: {self.input_name}, Output: {self.output_name}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to load model: {e}")
            return False

    @staticmethod
    def _to_model_input(pixels):
        """Turn an H x W x C pixel array into a float32 1 x C x H x W array in [0, 1]."""
        tensor = pixels.astype(np.float32) / 255.0
        tensor = np.transpose(tensor, (2, 0, 1))
        return np.expand_dims(tensor, axis=0)

    @staticmethod
    def _mask_to_uint8(mask):
        """Drop size-1 axes and scale a [0, 1] mask to uint8 values 0..255.

        Values outside [0, 1] are clipped first; casting them directly to
        uint8 would wrap around and corrupt the mask.
        """
        clipped = np.clip(np.squeeze(mask), 0.0, 1.0)
        return (clipped * 255).astype(np.uint8)

    def preprocess_image(self, image):
        """Convert a PIL image into the array passed to the model.

        Args:
            image: PIL image in any mode.

        Returns:
            tuple: ``(array, original_size)`` where ``array`` is the resized
            RGB image as float32 with shape ``(1, 3, 1024, 1024)`` and
            ``original_size`` is the input's ``(width, height)``.
        """
        original_size = image.size
        resized = image.convert('RGB').resize(self.INPUT_SIZE, Image.LANCZOS)
        # NOTE(review): the RMBG-1.4 reference preprocessing appears to also
        # subtract a mean of 0.5 after scaling to [0, 1] — confirm whether
        # that step should be applied here.
        return self._to_model_input(np.array(resized)), original_size

    def postprocess_mask(self, mask, original_size):
        """Convert the raw model output into an 8-bit mask of ``original_size``.

        Args:
            mask: Model output array; size-1 axes are removed.
            original_size: ``(width, height)`` to resize the mask to.

        Returns:
            PIL image in mode ``'L'``.
        """
        alpha = Image.fromarray(self._mask_to_uint8(mask), mode='L')
        return alpha.resize(original_size, Image.LANCZOS)

    def remove_background(self, image_path):
        """Return the image at ``image_path`` with the model mask as alpha.

        Args:
            image_path: Path of the image file to process.

        Returns:
            An RGBA PIL image, or None if the model is not loaded or any
            step fails (the error is logged).
        """
        if self.session is None:
            logger.error("❌ Failed to remove background: model is not loaded")
            return None

        try:
            logger.info(f"🖼️ Processing image: {image_path}")
            # The context manager closes the file handle; convert() returns a
            # separate, fully loaded image that stays usable afterwards.
            with Image.open(image_path) as source:
                preprocessed, original_size = self.preprocess_image(source)
                image = source.convert('RGBA')

            logger.info("🤖 Running AI inference...")
            mask = self.session.run([self.output_name], {self.input_name: preprocessed})[0]
            mask = self.postprocess_mask(mask, original_size)

            logger.info("✂️ Applying mask to remove background...")
            image.putalpha(mask)

            logger.info("✅ Background removed successfully")
            return image
        except Exception as e:
            logger.error(f"❌ Failed to remove background: {e}")
            return None


# Single shared instance used by the request handlers.
background_remover = BackgroundRemover()


@app.route('/', methods=['GET'])
def health_check():
    """Return a JSON status report: liveness, model state, and endpoints."""
    return jsonify({
        'status': '✅ Server is running',
        # False until load_model() has created the inference session.
        'model_loaded': background_remover.session is not None,
        'endpoints': {
            'health_check': '/ (GET)',
            'remove_background': '/remove-background (POST)',
        },
        'info': 'Background Remover API - Ready to use!',
    })


@app.route('/remove-background', methods=['POST'])
def remove_background_endpoint():
    """Remove the background of an uploaded image and return it as a PNG.

    Expects a multipart form upload in the ``file`` field. Responds with the
    PNG on success, a 400 JSON error for a missing or non-image upload, and
    a 500 JSON error when processing fails.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file uploaded'}), 400

        upload = request.files['file']
        # The filename may be None as well as empty.
        if not upload.filename:
            return jsonify({'error': 'No file selected'}), 400

        # content_type is None when the part carries no Content-Type header.
        if not (upload.content_type or '').startswith('image/'):
            return jsonify({'error': 'File must be an image'}), 400

        # secure_filename() returns '' when the name has no usable
        # characters (e.g. a fully non-ASCII name), so fall back to a
        # placeholder instead of saving to the folder path itself.
        filename = secure_filename(upload.filename) or 'upload'
        # A random prefix keeps concurrent uploads with equal names apart.
        unique_prefix = uuid.uuid4().hex
        filepath = os.path.join(UPLOAD_FOLDER, f"{unique_prefix}_{filename}")
        upload.save(filepath)

        logger.info(f"📁 File saved: {filepath}")

        try:
            result = background_remover.remove_background(filepath)
        finally:
            # The upload is only needed for processing; always delete it.
            os.remove(filepath)

        if result is None:
            return jsonify({'error': 'Failed to process image'}), 500

        result_filename = f"result_{unique_prefix}_{filename.rsplit('.', 1)[0]}.png"
        result_path = os.path.join(RESULT_FOLDER, result_filename)
        result.save(result_path, 'PNG')

        logger.info(f"✅ Result saved: {result_path}")

        return send_file(result_path, mimetype='image/png', as_attachment=False)

    except HTTPException:
        # Let framework errors (e.g. 413 for a body over MAX_CONTENT_LENGTH)
        # keep their own status code instead of becoming a generic 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error in remove_background_endpoint: {e}")
        return jsonify({'error': 'Internal server error'}), 500


def main():
    """Prepare the model and start the Flask server.

    Downloads the model if needed and loads it exactly once, then serves on
    the port given by the ``PORT`` environment variable (default 7860).

    Raises:
        SystemExit: With exit code 1 if the model cannot be downloaded or
            loaded.
    """
    print("=" * 60)
    print("🎯 Background Remover API Server")
    print("=" * 60)

    logger.info("🚀 Starting Background Remover Server...")

    if not background_remover.download_model():
        logger.error("❌ Failed to download model. Exiting...")
        raise SystemExit(1)

    if not background_remover.load_model():
        logger.error("❌ Failed to load model. Exiting...")
        raise SystemExit(1)

    port = int(os.environ.get('PORT', 7860))
    print(f'📡 Running on port: {port}')
    print(f'🌐 Local URL: http://localhost:{port}')
    print(f'🔍 Health check: http://localhost:{port}/')
    print(f'📱 API endpoint: http://localhost:{port}/remove-background')
    print('=' * 60)

    app.run(host='0.0.0.0', port=port, debug=False)


if __name__ == '__main__':
    main()