| |
| """ |
| GeeTest4 Solver - Pure FastAPI Version v1.2.0 |
| - Updated with optimal thresholds from testing. |
| - Fixed YOLOv8 ONNX output transpose bug. |
| """ |
|
|
| import os |
| import base64 |
| import io |
| import logging |
| import random |
| import yaml |
| from typing import Tuple, List, Dict, Union |
| from fastapi import FastAPI, HTTPException |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel |
| import uvicorn |
| import numpy as np |
| from PIL import Image |
| import cv2 |
|
|
| try: |
| import onnxruntime as ort |
| ONNX_AVAILABLE = True |
| except ImportError: |
| ONNX_AVAILABLE = False |
|
|
|
|
| |
| |
| |
| SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) |
| MODEL_PATH = os.path.join(SCRIPT_DIR, "best_model.onnx") |
| YAML_PATH = os.path.join(SCRIPT_DIR, "data.yaml") |
| API_KEY = os.getenv("GEETEST4_API_KEY", "ADMINCKV005") |
|
|
| |
| CONFIDENCE_THRESHOLD = 0.70 |
| NMS_IOU_THRESHOLD = 0.0 |
|
|
| |
| model_session = None |
| CLASS_NAMES = [] |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
| |
| class PredictRequest(BaseModel): |
| data: List[str] |
| BoundingBox = Dict[str, int] |
|
|
| def verify_api_key(api_key: str) -> bool: |
| return api_key == API_KEY |
|
|
| def preprocess_for_onnx(image: np.ndarray, input_size: int = 640): |
| height, width, _ = image.shape |
| r = min(input_size / width, input_size / height) |
| new_width, new_height = int(width * r), int(height * r) |
| resized_image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LINEAR) |
| d_w, d_h = (input_size - new_width) // 2, (input_size - new_height) // 2 |
| padded_image = np.full((input_size, input_size, 3), 114, dtype=np.uint8) |
| padded_image[d_h:new_height + d_h, d_w:new_width + d_w, :] = resized_image |
| input_tensor = (padded_image.astype(np.float32) / 255.0).transpose(2, 0, 1) |
| return np.expand_dims(input_tensor, axis=0), r, d_w, d_h |
|
|
| def process_image_onnx(image_np: np.ndarray) -> Tuple[int, float, Union[BoundingBox, None]]: |
| """Memproses gambar dengan model ONNX.""" |
| try: |
| input_tensor, ratio, dw, dh = preprocess_for_onnx(image_np) |
| |
| |
| outputs = model_session.run(None, {model_session.get_inputs()[0].name: input_tensor}) |
| |
| |
| |
| |
| raw_predictions = outputs[0][0].T |
|
|
| |
| scores = raw_predictions[:, 4] |
| valid_indices = scores > CONFIDENCE_THRESHOLD |
| |
| boxes_raw = raw_predictions[valid_indices, :4] |
| scores = scores[valid_indices] |
| |
| if len(boxes_raw) == 0: |
| return 0, 0.0, None |
| |
| |
| x1 = boxes_raw[:, 0] - boxes_raw[:, 2] / 2 |
| y1 = boxes_raw[:, 1] - boxes_raw[:, 3] / 2 |
| x2 = boxes_raw[:, 0] + boxes_raw[:, 2] / 2 |
| y2 = boxes_raw[:, 1] + boxes_raw[:, 3] / 2 |
| boxes_for_nms = np.column_stack((x1, y1, x2, y2)).astype(np.float32) |
|
|
| |
| indices = cv2.dnn.NMSBoxes(boxes_for_nms, scores, CONFIDENCE_THRESHOLD, NMS_IOU_THRESHOLD) |
| |
| if len(indices) == 0: |
| return 0, 0.0, None |
|
|
| |
| indices = indices.flatten() |
| best_idx = indices[np.argmax(scores[indices])] |
| |
| best_box_coords = boxes_for_nms[best_idx] |
| best_score = scores[best_idx] |
| |
| |
| x1_orig = int((best_box_coords[0] - dw) / ratio) |
| y1_orig = int((best_box_coords[1] - dh) / ratio) |
| x2_orig = int((best_box_coords[2] - dw) / ratio) |
| y2_orig = int((best_box_coords[3] - dh) / ratio) |
| |
| center_x = (x1_orig + x2_orig) // 2 |
| bbox = {'x': x1_orig, 'y': y1_orig, 'w': x2_orig - x1_orig, 'h': y2_orig - y1_orig} |
| |
| return center_x, float(best_score), bbox |
| |
| except Exception as e: |
| logger.error(f"Error dalam pemrosesan ONNX: {e}") |
| return 0, 0.0, None |
|
|
| def load_model(): |
| """Memuat model ONNX.""" |
| global model_session, CLASS_NAMES |
| try: |
| if os.path.exists(YAML_PATH): |
| with open(YAML_PATH, "r", encoding="utf-8") as f: |
| CLASS_NAMES = yaml.safe_load(f).get('names', ['Target']) |
| else: |
| CLASS_NAMES = ['Target'] |
| |
| if ONNX_AVAILABLE and os.path.exists(MODEL_PATH): |
| model_session = ort.InferenceSession(MODEL_PATH, providers=['CPUExecutionProvider']) |
| logger.info("β
Model ONNX berhasil dimuat.") |
| else: |
| model_session = None |
| logger.critical("β GAGAL: Model ONNX tidak ditemukan atau onnxruntime tidak terinstal.") |
| |
| except Exception as e: |
| logger.error(f"FATAL: Gagal memuat model: {e}") |
| model_session = None |
|
|
| def base64_to_numpy(base64_string: str) -> np.ndarray: |
| try: |
| if base64_string.startswith('data:image'): |
| base64_string = base64_string.split(',')[1] |
| image_data = base64.b64decode(base64_string) |
| return np.array(Image.open(io.BytesIO(image_data)).convert('RGB')) |
| except Exception as e: |
| logger.error(f"Error saat konversi base64: {e}") |
| raise ValueError("Data gambar tidak valid") |
|
|
| def solve_geetest4_api(background_image: str, api_key: str): |
| """Fungsi endpoint API utama.""" |
| try: |
| if not verify_api_key(api_key): |
| return ["β Kunci API tidak valid", 0, 0.0, None] |
| |
| image_np = base64_to_numpy(background_image) |
| |
| if model_session is not None: |
| target_x, confidence, bbox = process_image_onnx(image_np) |
| model_type = "ONNX" |
| else: |
| return ["β Model tidak dimuat", 0, 0.0, None] |
| |
| if target_x > 0 and bbox is not None: |
| return [f"β
Sukses! Target di x={target_x} (Model: {model_type})", target_x, confidence, bbox] |
| else: |
| return [f"β οΈ Tidak ada target terdeteksi dengan threshold saat ini.", 0, 0.0, None] |
| |
| except Exception as e: |
| logger.error(f"Error API: {e}") |
| return [f"β οΈ Error server, menggunakan posisi fallback", 200, 0.6, None] |
|
|
| |
| load_model() |
|
|
| |
| app = FastAPI(title="GeeTest4 Solver API", version="1.2.0", docs_url=None, redoc_url=None) |
|
|
| @app.get("/") |
| async def root(): |
| raise HTTPException(status_code=404, detail="Not Found") |
|
|
| @app.post("/api/predict") |
| async def predict(request: PredictRequest): |
| """Endpoint utama untuk prediksi.""" |
| try: |
| if len(request.data) < 2: |
| raise HTTPException(status_code=400, detail="Format request tidak valid") |
| |
| background_image, api_key = request.data[0], request.data[1] |
| result = solve_geetest4_api(background_image, api_key) |
| return {"data": result} |
| |
| except Exception as e: |
| logger.error(f"API Error: {e}") |
| return JSONResponse(status_code=500, content={"data": ["β Error internal server", 0, 0.0, None]}) |
|
|
| @app.get("/health") |
| async def health_check(): |
| return {"status": "healthy", "model_loaded": model_session is not None} |
|
|
| |
| if __name__ == "__main__": |
| logger.info("π Memulai Server FastAPI GeeTest4...") |
| uvicorn.run( |
| "__main__:app", |
| host="0.0.0.0", |
| port=int(os.getenv("PORT", 7860)), |
| log_level="info" |
| ) |