diff --git "a/api/main.py" "b/api/main.py" --- "a/api/main.py" +++ "b/api/main.py" @@ -1,3395 +1,3 @@ -# import os -# import uuid -# import shutil -# import re -# from datetime import datetime, timedelta, date -# from io import BytesIO -# from typing import Dict, List, Optional,Any -# import numpy as np -# from fastapi import ( -# FastAPI, -# UploadFile, -# File, -# HTTPException, -# Depends, -# Header, -# Request, -# Form, -# ) -# from fastapi.responses import FileResponse, JSONResponse -# from pydantic import BaseModel -# from PIL import Image, UnidentifiedImageError -# import cv2 -# import logging -# from gridfs import GridFS -# from gridfs.errors import NoFile - -# from bson import ObjectId -# from pymongo import MongoClient -# import time - -# # Load environment variables from .env if present -# try: -# from dotenv import load_dotenv - -# load_dotenv() -# except Exception: -# pass - -# logging.basicConfig(level=logging.INFO) -# log = logging.getLogger("api") - -# from src.core import process_inpaint - -# # Directories (use writable space on HF Spaces) -# BASE_DIR = os.environ.get("DATA_DIR", "/data") -# if not os.path.isdir(BASE_DIR): -# # Fallback to /tmp if /data not available -# BASE_DIR = "/tmp" - -# UPLOAD_DIR = os.path.join(BASE_DIR, "uploads") -# OUTPUT_DIR = os.path.join(BASE_DIR, "outputs") - -# os.makedirs(UPLOAD_DIR, exist_ok=True) -# os.makedirs(OUTPUT_DIR, exist_ok=True) - -# # Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open -# ENV_TOKEN = os.environ.get("API_TOKEN") - -# app = FastAPI(title="Photo Object Removal API", version="1.0.0") - -# # In-memory stores -# file_store: Dict[str, Dict[str, str]] = {} -# logs: List[Dict[str, str]] = [] - -# MONGO_URI = os.environ.get("MONGO_URI") or os.environ.get("MONGODB_URI") -# mongo_client = None -# mongo_db = None -# mongo_logs = None -# grid_fs = None - -# def _cleanup_gridfs_collections(): -# """Remove GridFS image_files and image_chunks collections.""" -# if mongo_db is not None: -# try: -# # Drop the GridFS collections -# if "image_files" in mongo_db.list_collection_names(): -# mongo_db["image_files"].drop() -# log.info("Dropped image_files collection") -# if "image_chunks" in mongo_db.list_collection_names(): -# mongo_db["image_chunks"].drop() -# log.info("Dropped image_chunks collection") -# # Also drop default GridFS collections (fs.files and fs.chunks) -# if "fs.files" in mongo_db.list_collection_names(): -# mongo_db["fs.files"].drop() -# log.info("Dropped fs.files collection") -# if "fs.chunks" in mongo_db.list_collection_names(): -# mongo_db["fs.chunks"].drop() -# log.info("Dropped fs.chunks collection") -# except Exception as err: -# log.error("Failed to cleanup GridFS collections: %s", err, exc_info=True) - - -# # Track last cleanup time to avoid running every request -# _last_cleanup_time = None - - -# def _cleanup_old_gridfs_files(minutes: int = 10): -# """ -# Cleanup old files from GridFS that are older than specified minutes. -# Runs at most once per interval (default 10 minutes) to avoid repeating on -# every request. -# """ -# global _last_cleanup_time - -# if mongo_db is None or grid_fs is None: -# return - -# now = datetime.utcnow() - -# # Only run cleanup once per interval -# if _last_cleanup_time is not None: -# time_diff = (now - _last_cleanup_time).total_seconds() -# if time_diff < (minutes * 60): # Less than interval elapsed -# return - -# try: -# # Set the current cleanup time -# _last_cleanup_time = now -# cutoff_time = now - timedelta(minutes=minutes) - -# # Find and delete old files from GridFS -# files_collection = mongo_db["fs.files"] -# chunks_collection = mongo_db["fs.chunks"] - -# # Query for old files (uploaded_at or uploadDate before cutoff) -# old_files = files_collection.find({ -# "$or": [ -# {"uploadDate": {"$lt": cutoff_time}}, -# {"metadata.uploaded_at": {"$lt": cutoff_time}} -# ] -# }) - -# deleted_count = 0 -# for file_doc in old_files: -# file_id = file_doc["_id"] -# try: -# # Delete file from GridFS (also deletes associated chunks) -# grid_fs.delete(file_id) -# deleted_count += 1 -# except Exception as err: -# log.warning("Failed to delete GridFS file %s: %s", file_id, err) - -# if deleted_count > 0: -# log.info("Cleanup: Deleted %d old GridFS files (older than %d minutes)", deleted_count, minutes) - -# except Exception as err: -# log.error("Failed to cleanup old GridFS files: %s", err, exc_info=True) - - -# if MONGO_URI: -# try: -# mongo_client = MongoClient(MONGO_URI) -# # Try to get database from connection string first -# try: -# mongo_db = mongo_client.get_default_database() -# log.info("Using database from connection string: %s", mongo_db.name) -# except Exception as db_err: -# mongo_db = None -# log.warning("Could not extract database from connection string: %s", db_err) - -# # Fallback to 'object_remover' if no database in connection string -# if mongo_db is None: -# mongo_db = mongo_client["object_remover"] -# log.info("Using default database: object_remover") - -# mongo_logs = mongo_db["api_logs"] -# grid_fs = GridFS(mongo_db) -# log.info("MongoDB connection initialized successfully - Database: %s, Collection: %s", mongo_db.name, mongo_logs.name) -# except Exception as err: -# log.error("Failed to initialize MongoDB connection: %s", err, exc_info=True) -# log.warning("GridFS operations will be disabled. Set MONGO_URI or MONGODB_URI environment variable.") -# else: -# log.warning("MONGO_URI not set. GridFS operations will be disabled. Upload endpoints will not work.") - -# ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN") -# DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984" -# admin_media_clicks = None - -# # Collage-maker MongoDB configuration -# COLLAGE_MAKER_MONGO_URI = os.environ.get("MONGODB_COLLAGE_MAKER") -# COLLAGE_MAKER_DB_NAME = os.environ.get("MONGODB_COLLAGE_MAKER_DB_NAME", "collage-maker") -# COLLAGE_MAKER_ADMIN_DB_NAME = os.environ.get("MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME", "adminPanel") -# collage_maker_client = None -# collage_maker_db = None -# collage_maker_admin_db = None -# collage_maker_media_clicks = None -# collage_maker_categories = None - -# # AI-Enhancer MongoDB configuration -# AI_ENHANCER_MONGO_URI = os.environ.get("MONGODB_AI_ENHANCER") -# AI_ENHANCER_DB_NAME = os.environ.get("MONGODB_AI_ENHANCER_DB_NAME", "ai-enhancer") -# AI_ENHANCER_ADMIN_DB_NAME = os.environ.get("MONGODB_AI_ENHANCER_ADMIN_DB_NAME", "test") -# ai_enhancer_client = None -# ai_enhancer_db = None -# ai_enhancer_admin_db = None -# ai_enhancer_media_clicks = None - - -# def get_collage_maker_client() -> Optional[MongoClient]: -# """Get collage-maker MongoDB client.""" -# global collage_maker_client -# if collage_maker_client is None and COLLAGE_MAKER_MONGO_URI: -# try: -# collage_maker_client = MongoClient(COLLAGE_MAKER_MONGO_URI) -# log.info("Collage-maker MongoDB client initialized") -# except Exception as err: -# log.error("Failed to initialize collage-maker MongoDB client: %s", err) -# collage_maker_client = None -# return collage_maker_client - - -# def get_collage_maker_database() -> Optional[Any]: -# """Get collage-maker database instance.""" -# global collage_maker_db -# client = get_collage_maker_client() -# if client is None: -# return None -# if collage_maker_db is None: -# try: -# collage_maker_db = client[COLLAGE_MAKER_DB_NAME] -# log.info("Collage-maker database initialized: %s", COLLAGE_MAKER_DB_NAME) -# except Exception as err: -# log.error("Failed to get collage-maker database: %s", err) -# collage_maker_db = None -# return collage_maker_db - - -# def _init_collage_maker_mongo() -> None: -# """Initialize collage-maker MongoDB connections.""" -# global collage_maker_admin_db, collage_maker_media_clicks, collage_maker_categories -# client = get_collage_maker_client() -# if client is None: -# log.info("Collage-maker Mongo URI not provided; collage-maker features disabled") -# return -# try: -# collage_maker_admin_db = client[COLLAGE_MAKER_ADMIN_DB_NAME] -# collage_maker_media_clicks = collage_maker_admin_db["media_clicks"] -# collage_maker_categories = collage_maker_admin_db["categories"] -# log.info( -# "Collage-maker admin initialized: db=%s, media_clicks=%s, categories=%s", -# COLLAGE_MAKER_ADMIN_DB_NAME, -# collage_maker_media_clicks.name, -# collage_maker_categories.name, -# ) -# except Exception as err: -# log.error("Failed to init collage-maker admin Mongo: %s", err) -# collage_maker_admin_db = None -# collage_maker_media_clicks = None -# collage_maker_categories = None - - -# _init_collage_maker_mongo() - - -# def get_ai_enhancer_client() -> Optional[MongoClient]: -# """Get AI-Enhancer MongoDB client.""" -# global ai_enhancer_client -# if ai_enhancer_client is None and AI_ENHANCER_MONGO_URI: -# try: -# ai_enhancer_client = MongoClient(AI_ENHANCER_MONGO_URI) -# log.info("AI-Enhancer MongoDB client initialized") -# except Exception as err: -# log.error("Failed to initialize AI-Enhancer MongoDB client: %s", err) -# ai_enhancer_client = None -# return ai_enhancer_client - - -# def get_ai_enhancer_database() -> Optional[Any]: -# """Get AI-Enhancer database instance.""" -# global ai_enhancer_db -# client = get_ai_enhancer_client() -# if client is None: -# return None -# if ai_enhancer_db is None: -# try: -# ai_enhancer_db = client[AI_ENHANCER_DB_NAME] -# log.info("AI-Enhancer database initialized: %s", AI_ENHANCER_DB_NAME) -# except Exception as err: -# log.error("Failed to get AI-Enhancer database: %s", err) -# ai_enhancer_db = None -# return ai_enhancer_db - - -# def _init_ai_enhancer_mongo() -> None: -# """Initialize AI-Enhancer MongoDB connections.""" -# global ai_enhancer_admin_db, ai_enhancer_media_clicks -# client = get_ai_enhancer_client() -# if client is None: -# log.info("AI-Enhancer Mongo URI not provided; AI-Enhancer features disabled") -# return -# try: -# ai_enhancer_admin_db = client[AI_ENHANCER_ADMIN_DB_NAME] -# ai_enhancer_media_clicks = ai_enhancer_admin_db["media_clicks"] -# log.info( -# "AI-Enhancer admin initialized: db=%s, media_clicks=%s", -# AI_ENHANCER_ADMIN_DB_NAME, -# ai_enhancer_media_clicks.name, -# ) -# except Exception as err: -# log.error("Failed to init AI-Enhancer admin Mongo: %s", err) -# ai_enhancer_admin_db = None -# ai_enhancer_media_clicks = None - - -# _init_ai_enhancer_mongo() - - -# def get_category_id_from_collage_maker() -> Optional[str]: -# """Query category ID from collage-maker categories collection.""" -# if collage_maker_categories is None: -# log.warning("Collage-maker categories collection not initialized") -# return None -# try: -# # Query the categories collection - you may need to adjust the query based on your schema -# # This assumes there's a default category or we get the first one -# category_doc = collage_maker_categories.find_one() -# if category_doc: -# category_id = str(category_doc.get("_id", "")) -# log.info("Found category ID from collage-maker: %s", category_id) -# return category_id -# else: -# log.warning("No categories found in collage-maker collection") -# return None -# except Exception as err: -# log.error("Failed to query collage-maker categories: %s", err) -# return None - - -# def _init_admin_mongo() -> None: -# global admin_media_clicks -# if not ADMIN_MONGO_URI: -# log.info("Admin Mongo URI not provided; media click logging disabled") -# return -# try: -# admin_client = MongoClient(ADMIN_MONGO_URI) -# # get_default_database() extracts database from connection string (e.g., /adminPanel) -# try: -# admin_db = admin_client.get_default_database() -# except Exception as db_err: -# admin_db = None -# log.warning("Admin Mongo URI has no default DB; error=%s", db_err) -# if admin_db is None: -# # Fallback to provided default for this app -# admin_db = admin_client["object_remover"] -# log.warning("No database in connection string, defaulting to 'object_remover'") - -# admin_media_clicks = admin_db["media_clicks"] -# log.info( -# "Admin media click logging initialized: db=%s collection=%s", -# admin_db.name, -# admin_media_clicks.name, -# ) -# try: -# admin_media_clicks.drop_index("user_id_1_header_1_media_id_1") -# log.info("Dropped legacy index user_id_1_header_1_media_id_1") -# except Exception as idx_err: -# # Index drop failure is non-critical (often permission issue) -# if "Unauthorized" not in str(idx_err): -# log.info("Skipping legacy index drop: %s", idx_err) -# except Exception as err: -# log.error("Failed to init admin Mongo client: %s", err) -# admin_media_clicks = None - - -# _init_admin_mongo() - - -# def _admin_logging_status() -> Dict[str, object]: -# if admin_media_clicks is None: -# return { -# "enabled": False, -# "db": None, -# "collection": None, -# } -# return { -# "enabled": True, -# "db": admin_media_clicks.database.name, -# "collection": admin_media_clicks.name, -# } - - -# def _save_upload_to_gridfs(upload: UploadFile, file_type: str) -> str: -# """Store an uploaded file into GridFS and return its ObjectId string.""" -# if grid_fs is None: -# raise HTTPException( -# status_code=503, -# detail="MongoDB/GridFS not configured. Set MONGO_URI or MONGODB_URI environment variable." -# ) -# data = upload.file.read() -# if not data: -# raise HTTPException(status_code=400, detail=f"{file_type} file is empty") -# oid = grid_fs.put( -# data, -# filename=upload.filename or f"{file_type}.bin", -# contentType=upload.content_type, -# metadata={"type": file_type}, -# ) -# return str(oid) - - -# def _read_gridfs_bytes(file_id: str, expected_type: str) -> bytes: -# """Fetch raw bytes from GridFS and validate the stored type metadata.""" -# if grid_fs is None: -# raise HTTPException( -# status_code=503, -# detail="MongoDB/GridFS not configured. Set MONGO_URI or MONGODB_URI environment variable." -# ) -# try: -# oid = ObjectId(file_id) -# except Exception: -# raise HTTPException(status_code=404, detail=f"{expected_type}_id invalid") - -# try: -# grid_out = grid_fs.get(oid) -# except NoFile: -# raise HTTPException(status_code=404, detail=f"{expected_type}_id not found") - -# meta = grid_out.metadata or {} -# stored_type = meta.get("type") -# if stored_type and stored_type != expected_type: -# raise HTTPException(status_code=404, detail=f"{expected_type}_id not found") - -# return grid_out.read() - - -# def _load_rgba_image_from_gridfs(file_id: str, expected_type: str) -> Image.Image: -# """Load an image from GridFS and convert to RGBA.""" -# data = _read_gridfs_bytes(file_id, expected_type) -# try: -# img = Image.open(BytesIO(data)) -# except UnidentifiedImageError: -# raise HTTPException(status_code=422, detail=f"{expected_type} is not a valid image") -# return img.convert("RGBA") - - -# def _build_ai_edit_daily_count( -# existing: Optional[List[Dict[str, object]]], -# today: date, -# ) -> List[Dict[str, object]]: -# """ -# Build / extend the ai_edit_daily_count array with the following rules: - -# - Case A (no existing data): return [{date: today, count: 1}] -# - Case B (today already recorded): return list unchanged -# - Case C (gap in days): fill missing days with count=0 and append today with count=1 - -# Additionally, the returned list is capped to the most recent 32 entries. - -# The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. -# """ - -# def _to_date_only(value: object) -> date: -# if isinstance(value, datetime): -# return value.date() -# if isinstance(value, date): -# return value -# # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime -# try: -# text = str(value) -# if len(text) == 10: -# return datetime.strptime(text, "%Y-%m-%d").date() -# return datetime.fromisoformat(text).date() -# except Exception: -# # If parsing fails, just treat as today to avoid crashing -# return today - -# # Case A: first ever use (no array yet) -# if not existing: -# return [ -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ] - -# # Work on a shallow copy so we don't mutate original in-place -# result: List[Dict[str, object]] = list(existing) - -# last_entry = result[-1] if result else None -# if not last_entry or "date" not in last_entry: -# # If structure is unexpected, re-initialize safely -# return [ -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ] - -# last_date = _to_date_only(last_entry["date"]) - -# # If somehow the last stored date is in the future, do nothing to avoid corrupting history -# if last_date > today: -# return result - -# # Case B: today's date already present as the last entry → unchanged -# if last_date == today: -# return result - -# # Case C: there is a gap, fill missing days with count=0 and append today with count=1 -# cursor = last_date + timedelta(days=1) -# while cursor < today: -# result.append( -# { -# "date": datetime(cursor.year, cursor.month, cursor.day), -# "count": 0, -# } -# ) -# cursor += timedelta(days=1) - -# # Finally add today's presence indicator -# result.append( -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ) - -# # Sort by date ascending (older dates first) to guarantee stable ordering: -# # [oldest, ..., newest] -# try: -# result.sort(key=lambda entry: _to_date_only(entry.get("date"))) -# except Exception: -# # If anything goes wrong during sort, fall back to current ordering -# pass - -# # Enforce 32-entry limit (keep the most recent 32 days) -# if len(result) > 32: -# result = result[-32:] - -# return result - -# def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None: -# if not ENV_TOKEN: -# return -# if authorization is None or not authorization.lower().startswith("bearer "): -# raise HTTPException(status_code=401, detail="Unauthorized") -# token = authorization.split(" ", 1)[1] -# if token != ENV_TOKEN: -# raise HTTPException(status_code=403, detail="Forbidden") - - -# class InpaintRequest(BaseModel): -# image_id: str -# mask_id: str -# invert_mask: bool = True # True => selected/painted area is removed -# passthrough: bool = False # If True, return the original image unchanged -# prompt: Optional[str] = None # Optional: describe what to remove -# user_id: Optional[str] = None -# category_id: Optional[str] = None -# appname: Optional[str] = None # Optional: app name (e.g., "collage-maker") - - -# class SimpleRemoveRequest(BaseModel): -# image_id: str # Image with pink/magenta segments to remove - - -# def _coerce_object_id(value: Optional[str]) -> ObjectId: -# if value is None: -# return ObjectId() -# value_str = str(value).strip() -# if re.fullmatch(r"[0-9a-fA-F]{24}", value_str): -# return ObjectId(value_str) -# if value_str.isdigit(): -# hex_str = format(int(value_str), "x") -# if len(hex_str) > 24: -# hex_str = hex_str[-24:] -# hex_str = hex_str.rjust(24, "0") -# return ObjectId(hex_str) -# return ObjectId() - - -# def _coerce_category_id(category_id: Optional[str]) -> ObjectId: -# raw = category_id or DEFAULT_CATEGORY_ID -# raw_str = str(raw).strip() -# if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str): -# return ObjectId(raw_str) -# return _coerce_object_id(raw_str) - - -# def log_media_click(user_id: Optional[str], category_id: Optional[str], appname: Optional[str] = None) -> None: -# """Log to admin media_clicks collection only if user_id is provided. - -# If appname='collage-maker', logs to collage-maker MongoDB instead of regular admin MongoDB. -# If appname='AI-Enhancer' (case-insensitive), logs to AI-Enhancer MongoDB. -# """ -# # Determine which media_clicks collection to use -# target_media_clicks = None -# appname_lower = appname.lower() if appname else None - -# if appname_lower == "collage-maker": -# target_media_clicks = collage_maker_media_clicks -# if target_media_clicks is None: -# log.warning("Collage-maker media_clicks not initialized, skipping log") -# return -# elif appname_lower == "ai-enhancer": -# target_media_clicks = ai_enhancer_media_clicks -# if target_media_clicks is None: -# log.warning("AI-Enhancer media_clicks not initialized, skipping log") -# return -# else: -# target_media_clicks = admin_media_clicks -# if target_media_clicks is None: -# return - -# # Only log if user_id is provided (not None/empty) -# if not user_id or not user_id.strip(): -# return -# try: -# user_obj = _coerce_object_id(user_id) -# category_obj = _coerce_category_id(category_id) -# now = datetime.utcnow() -# today = now.date() - -# doc = target_media_clicks.find_one({"userId": user_obj}) -# if doc: -# existing_daily = doc.get("ai_edit_daily_count") -# updated_daily = _build_ai_edit_daily_count(existing_daily, today) -# categories = doc.get("categories") or [] -# if any(cat.get("categoryId") == category_obj for cat in categories): -# # Category exists: increment click_count and ai_edit_complete, update dates -# target_media_clicks.update_one( -# {"_id": doc["_id"], "categories.categoryId": category_obj}, -# { -# "$inc": { -# "categories.$.click_count": 1, -# "ai_edit_complete": 1, # $inc handles missing fields (backward compatible) -# }, -# "$set": { -# "categories.$.lastClickedAt": now, -# "updatedAt": now, -# "ai_edit_last_date": now, -# "ai_edit_daily_count": updated_daily, -# }, -# }, -# ) -# else: -# # New category to existing document: push category, increment ai_edit_complete -# target_media_clicks.update_one( -# {"_id": doc["_id"]}, -# { -# "$push": { -# "categories": { -# "categoryId": category_obj, -# "click_count": 1, -# "lastClickedAt": now, -# } -# }, -# "$inc": {"ai_edit_complete": 1}, # $inc handles missing fields -# "$set": { -# "updatedAt": now, -# "ai_edit_last_date": now, -# "ai_edit_daily_count": updated_daily, -# }, -# }, -# ) -# else: -# # New user: create document with default ai_edit_complete=0, then increment to 1 -# daily_for_new = _build_ai_edit_daily_count(None, today) -# target_media_clicks.update_one( -# {"userId": user_obj}, -# { -# "$setOnInsert": { -# "userId": user_obj, -# "categories": [ -# { -# "categoryId": category_obj, -# "click_count": 1, -# "lastClickedAt": now, -# } -# ], -# "createdAt": now, -# "ai_edit_daily_count": daily_for_new, -# }, -# "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use -# "$set": { -# "updatedAt": now, -# "ai_edit_last_date": now, -# }, -# }, -# upsert=True, -# ) -# except Exception as err: -# err_str = str(err) -# appname_lower = appname.lower() if appname else None -# if appname_lower == "collage-maker": -# target_name = "collage-maker" -# elif appname_lower == "ai-enhancer": -# target_name = "AI-Enhancer" -# else: -# target_name = "admin" -# if "Unauthorized" in err_str or "not authorized" in err_str.lower(): -# log.warning( -# "%s media click logging failed (permissions): user lacks read/write on db=%s collection=%s. " -# "Check MongoDB user permissions.", -# target_name, -# target_media_clicks.database.name, -# target_media_clicks.name, -# ) -# else: -# log.warning("%s media click logging failed: %s", target_name, err) - - -# @app.get("/") -# def root() -> Dict[str, Any]: -# return { -# "success": True, -# "message": "Object Remover API", -# "data": { -# "version": "1.0.0", -# "product_name": "Beauty Camera - GlowCam AI Studio", -# "released_by": "LogicGo Infotech" -# } -# } - - - -# @app.get("/health") -# def health() -> Dict[str, str]: -# return {"status": "healthy"} - - -# @app.get("/logging-status") -# def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]: -# """Helper endpoint to verify admin media logging wiring (no secrets exposed).""" -# return _admin_logging_status() - - -# @app.get("/mongo-status") -# def mongo_status(_: None = Depends(bearer_auth)) -> Dict[str, object]: -# """Check MongoDB connection status and verify data storage.""" -# status = { -# "mongo_configured": MONGO_URI is not None, -# "mongo_connected": mongo_client is not None, -# "database": mongo_db.name if mongo_db else None, -# "collection": mongo_logs.name if mongo_logs else None, -# "admin_logging": _admin_logging_status(), -# } - -# # Try to count documents in api_logs collection -# if mongo_logs is not None: -# try: -# count = mongo_logs.count_documents({}) -# status["api_logs_count"] = count -# # Get latest 5 documents -# latest_docs = list(mongo_logs.find().sort("timestamp", -1).limit(5)) -# status["recent_logs"] = [] -# for doc in latest_docs: -# doc_dict = { -# "_id": str(doc.get("_id")), -# "output_id": doc.get("output_id"), -# "status": doc.get("status"), -# "timestamp": doc.get("timestamp").isoformat() if isinstance(doc.get("timestamp"), datetime) else str(doc.get("timestamp")), -# } -# if "input_image_id" in doc: -# doc_dict["input_image_id"] = doc.get("input_image_id") -# if "input_mask_id" in doc: -# doc_dict["input_mask_id"] = doc.get("input_mask_id") -# if "error" in doc: -# doc_dict["error"] = doc.get("error") -# status["recent_logs"].append(doc_dict) - -# # Get latest document for backward compatibility -# if latest_docs: -# latest = latest_docs[0] -# status["latest_log"] = { -# "_id": str(latest.get("_id")), -# "output_id": latest.get("output_id"), -# "status": latest.get("status"), -# "timestamp": latest.get("timestamp").isoformat() if isinstance(latest.get("timestamp"), datetime) else str(latest.get("timestamp")), -# } -# except Exception as err: -# status["api_logs_error"] = str(err) -# log.error("Error querying MongoDB: %s", err, exc_info=True) - -# return status - - -# @app.post("/cleanup-gridfs") -# def cleanup_gridfs_collections(_: None = Depends(bearer_auth)) -> Dict[str, str]: -# """Clean up and remove GridFS image_files and image_chunks collections.""" -# if mongo_db is None: -# raise HTTPException( -# status_code=503, -# detail="MongoDB not configured. Set MONGO_URI or MONGODB_URI environment variable." -# ) - -# _cleanup_gridfs_collections() -# return {"status": "success", "message": "GridFS image_files and image_chunks collections cleaned up"} - - -# @app.post("/upload-image") -# def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: -# file_id = _save_upload_to_gridfs(image, "image") -# logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()}) -# return {"id": file_id, "filename": image.filename} - - -# @app.post("/upload-mask") -# def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: -# file_id = _save_upload_to_gridfs(mask, "mask") -# logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()}) -# return {"id": file_id, "filename": mask.filename} - - -# def _compress_image(image_path: str, output_path: str, quality: int = 85) -> None: -# """ -# Compress an image to reduce file size. -# Converts to JPEG format with specified quality to achieve smaller file size. -# """ -# img = Image.open(image_path) -# # Convert RGBA to RGB if needed (JPEG doesn't support alpha) -# if img.mode == "RGBA": -# rgb_img = Image.new("RGB", img.size, (255, 255, 255)) -# rgb_img.paste(img, mask=img.split()[3]) # Use alpha channel as mask -# img = rgb_img -# elif img.mode != "RGB": -# img = img.convert("RGB") - -# # Save as JPEG with quality setting for compression -# img.save(output_path, "JPEG", quality=quality, optimize=True) - - -# def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray: -# """ -# Convert mask image to RGBA format (black/white mask). -# Standard convention: white (255) = area to remove, black (0) = area to keep -# Returns RGBA with white in RGB channels where removal is needed, alpha=255 -# """ -# if img.mode != "RGBA": -# # For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep -# gray = img.convert("L") -# arr = np.array(gray) -# # Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep -# mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8) - -# rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8) -# rgba[:, :, 0] = mask_bw # R -# rgba[:, :, 1] = mask_bw # G -# rgba[:, :, 2] = mask_bw # B -# rgba[:, :, 3] = 255 # Fully opaque -# log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# # For RGBA: check if alpha channel is meaningful -# arr = np.array(img) -# alpha = arr[:, :, 3] -# rgb = arr[:, :, :3] - -# # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values -# if alpha.mean() > 200: -# # Use RGB to determine mask: white/bright in RGB = remove -# gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY) -# # Also detect magenta specifically -# magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255 -# mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta) - -# rgba = arr.copy() -# rgba[:, :, 0] = mask_bw # R -# rgba[:, :, 1] = mask_bw # G -# rgba[:, :, 2] = mask_bw # B -# rgba[:, :, 3] = 255 # Fully opaque -# log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# # Alpha channel encodes the mask - convert to RGB-based -# # Transparent areas (alpha < 128) = remove, Opaque areas = keep -# mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8) -# rgba = arr.copy() -# rgba[:, :, 0] = mask_bw -# rgba[:, :, 1] = mask_bw -# rgba[:, :, 2] = mask_bw -# rgba[:, :, 3] = 255 -# log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# @app.post("/inpaint") -# def inpaint(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# start_time = time.time() -# status = "success" -# error_msg = None -# output_name = None -# compressed_url = None - -# try: -# # Cleanup old GridFS files (runs at most once every 10 minutes) -# _cleanup_old_gridfs_files() - -# # Handle appname="collage-maker": get category_id from collage-maker if not provided -# category_id = req.category_id -# if req.appname == "collage-maker" and not category_id: -# category_id = get_category_id_from_collage_maker() -# if category_id: -# log.info("Using category_id from collage-maker: %s", category_id) - -# img_rgba = _load_rgba_image_from_gridfs(req.image_id, "image") -# mask_img = _load_rgba_image_from_gridfs(req.mask_id, "mask") -# mask_rgba = _load_rgba_mask_from_image(mask_img) - -# if req.passthrough: -# result = np.array(img_rgba.convert("RGB")) -# else: -# result = process_inpaint( -# np.array(img_rgba), -# mask_rgba, -# invert_mask=req.invert_mask, -# prompt=req.prompt, -# ) - -# output_name = f"output_{uuid.uuid4().hex}.png" -# output_path = os.path.join(OUTPUT_DIR, output_name) - -# Image.fromarray(result).save( -# output_path, "PNG", optimize=False, compress_level=1 -# ) - -# # Create compressed version -# compressed_name = f"compressed_{output_name.replace('.png', '.jpg')}" -# compressed_path = os.path.join(OUTPUT_DIR, compressed_name) -# try: -# _compress_image(output_path, compressed_path, quality=85) -# compressed_url = str(request.url_for("download_file", filename=compressed_name)) -# except Exception as compress_err: -# log.warning("Failed to create compressed image: %s", compress_err) -# compressed_url = None - -# log_media_click(req.user_id, category_id, req.appname) -# response = {"result": output_name} -# if compressed_url: -# response["Compressed_Image_URL"] = compressed_url -# return response - -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise - -# finally: -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 - -# log_doc = { -# "input_image_id": req.image_id, -# "input_mask_id": req.mask_id, -# "output_id": output_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms -# } - -# # Store appname in api_logs if provided -# if req.appname: -# log_doc["appname"] = req.appname - -# if error_msg: -# log_doc["error"] = error_msg - -# if mongo_logs is not None: -# try: -# log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name) -# log.debug("Log document: %s", log_doc) -# result = mongo_logs.insert_one(log_doc) -# log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s", -# result.inserted_id, output_name, status, mongo_logs.database.name, mongo_logs.name) - -# # Verify the insert by reading it back -# try: -# verify_doc = mongo_logs.find_one({"_id": result.inserted_id}) -# if verify_doc: -# log.info("Verified: Document exists in MongoDB after insert") -# else: -# log.error("WARNING: Document not found after insert! _id=%s", result.inserted_id) -# except Exception as verify_err: -# log.warning("Could not verify insert: %s", verify_err) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True) -# else: -# log.warning("MongoDB not configured, skipping log insert") - -# # @app.post("/inpaint") -# # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# # raise HTTPException(status_code=404, detail="image_id not found") -# # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# # raise HTTPException(status_code=404, detail="mask_id not found") - -# # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA -# # mask_rgba = _load_rgba_mask_from_image(mask_img) - -# # # Debug: check mask before processing -# # white_pixels = int((mask_rgba[:,:,0] > 128).sum()) -# # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}") - -# # if req.passthrough: -# # result = np.array(img_rgba.convert("RGB")) -# # else: -# # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) -# # result_name = f"output_{uuid.uuid4().hex}.png" -# # result_path = os.path.join(OUTPUT_DIR, result_name) -# # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()}) -# # return {"result": result_name} - - -# @app.post("/inpaint-url") -# def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# """Same as /inpaint but returns a JSON with a public download URL instead of image bytes.""" -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# # Cleanup old GridFS files (runs at most once every 10 minutes) -# _cleanup_old_gridfs_files() - -# # Handle appname="collage-maker": get category_id from collage-maker if not provided -# category_id = req.category_id -# if req.appname == "collage-maker" and not category_id: -# category_id = get_category_id_from_collage_maker() -# if category_id: -# log.info("Using category_id from collage-maker: %s", category_id) - -# img_rgba = _load_rgba_image_from_gridfs(req.image_id, "image") -# mask_img = _load_rgba_image_from_gridfs(req.mask_id, "mask") # may be RGB/gray/RGBA -# mask_rgba = _load_rgba_mask_from_image(mask_img) - -# if req.passthrough: -# result = np.array(img_rgba.convert("RGB")) -# else: -# result = process_inpaint( -# np.array(img_rgba), -# mask_rgba, -# invert_mask=req.invert_mask, -# prompt=req.prompt, -# ) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url = str(request.url_for("download_file", filename=result_name)) -# logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()}) -# log_media_click(req.user_id, category_id, req.appname) -# return {"result": result_name, "url": url} -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "input_image_id": req.image_id, -# "input_mask_id": req.mask_id, -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# # Store appname in api_logs if provided -# if req.appname: -# log_doc["appname"] = req.appname -# if error_msg: -# log_doc["error"] = error_msg -# if mongo_logs is not None: -# try: -# log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name) -# result = mongo_logs.insert_one(log_doc) -# log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s", -# result.inserted_id, output_name, status, mongo_logs.database.name, mongo_logs.name) - -# # Verify the insert by reading it back -# try: -# verify_doc = mongo_logs.find_one({"_id": result.inserted_id}) -# if verify_doc: -# log.info("Verified: Document exists in MongoDB after insert") -# else: -# log.error("WARNING: Document not found after insert! _id=%s", result.inserted_id) -# except Exception as verify_err: -# log.warning("Could not verify insert: %s", verify_err) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True) -# else: -# log.warning("MongoDB not configured, skipping log insert") - - -# @app.post("/inpaint-multipart") -# def inpaint_multipart( -# image: UploadFile = File(...), -# mask: UploadFile = File(...), -# request: Request = None, -# invert_mask: bool = True, -# mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original) -# passthrough: bool = False, -# prompt: Optional[str] = Form(None), -# user_id: Optional[str] = Form(None), -# category_id: Optional[str] = Form(None), -# appname: Optional[str] = Form(None), -# _: None = Depends(bearer_auth), -# ) -> Dict[str, str]: -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# # Cleanup old GridFS files (runs at most once every 10 minutes) -# _cleanup_old_gridfs_files() - -# # Handle appname="collage-maker": get category_id from collage-maker if not provided -# final_category_id = category_id -# if appname == "collage-maker" and not final_category_id: -# final_category_id = get_category_id_from_collage_maker() -# if final_category_id: -# log.info("Using category_id from collage-maker: %s", final_category_id) - -# # Load in-memory -# img = Image.open(image.file).convert("RGBA") -# m = Image.open(mask.file).convert("RGBA") - -# if passthrough: -# # Just echo the input image, ignore mask -# result = np.array(img.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} -# if url: -# entry["url"] = url -# logs.append(entry) -# resp: Dict[str, str] = {"result": result_name} -# if url: -# resp["url"] = url -# log_media_click(user_id, final_category_id, appname) -# return resp - -# if mask_is_painted: -# # Auto-detect pink/magenta paint and convert to black/white mask -# # White pixels = areas to remove, Black pixels = areas to keep -# log.info("Auto-detecting pink/magenta paint from uploaded image...") - -# m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB) - -# # Detect pink/magenta using fixed RGB bounds (same as /remove-pink) -# lower = np.array([150, 0, 100], dtype=np.uint8) -# upper = np.array([255, 120, 255], dtype=np.uint8) -# magenta_detected = ( -# (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) & -# (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) & -# (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2]) -# ).astype(np.uint8) * 255 - -# # Method 2: Also check if original image was provided to find differences -# if img is not None: -# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) -# if img_rgb.shape == m_rgb.shape: -# diff = cv2.absdiff(img_rgb, m_rgb) -# gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY) -# # Any significant difference (>50) could be paint -# diff_mask = (gray_diff > 50).astype(np.uint8) * 255 -# # Combine with magenta detection -# binmask = cv2.bitwise_or(magenta_detected, diff_mask) -# else: -# binmask = magenta_detected -# else: -# # No original image provided, use magenta detection only -# binmask = magenta_detected - -# # Clean up the mask: remove noise and fill small holes -# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) -# # Close small gaps in the mask -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) -# # Remove small noise -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) - -# nonzero = int((binmask > 0).sum()) -# log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero) - -# # If very few pixels detected, assume the user may already be providing a BW mask -# # and proceed without forcing strict detection - -# if nonzero < 50: -# log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.") -# result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) -# return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"} - -# # Create binary mask: Pink pixels → white (255), Everything else → black (0) -# # Encode in RGBA format for process_inpaint -# # process_inpaint does: mask = 255 - mask[:,:,3] -# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) -# # alpha=255 (opaque/keep) → becomes 0 (black/keep) -# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) -# mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization) -# mask_rgba[:, :, 1] = binmask # G: white where pink -# mask_rgba[:, :, 2] = binmask # B: white where pink -# # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha -# mask_rgba[:, :, 3] = 255 - binmask - -# log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)", -# nonzero, binmask.shape[0] * binmask.shape[1] - nonzero) -# else: -# mask_rgba = _load_rgba_mask_from_image(m) - -# # When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly -# actual_invert = invert_mask # Use default True for painted masks -# log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted) - -# result = process_inpaint( -# np.array(img), -# mask_rgba, -# invert_mask=actual_invert, -# prompt=prompt, -# ) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} -# if url: -# entry["url"] = url -# logs.append(entry) -# resp: Dict[str, str] = {"result": result_name} -# if url: -# resp["url"] = url -# log_media_click(user_id, final_category_id, appname) -# return resp -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "endpoint": "inpaint-multipart", -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# # Store appname in api_logs if provided -# if appname: -# log_doc["appname"] = appname -# if error_msg: -# log_doc["error"] = error_msg -# if mongo_logs is not None: -# try: -# log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name) -# result = mongo_logs.insert_one(log_doc) -# log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s", -# result.inserted_id, output_name, status, mongo_logs.database.name, mongo_logs.name) - -# # Verify the insert by reading it back -# try: -# verify_doc = mongo_logs.find_one({"_id": result.inserted_id}) -# if verify_doc: -# log.info("Verified: Document exists in MongoDB after insert") -# else: -# log.error("WARNING: Document not found after insert! _id=%s", result.inserted_id) -# except Exception as verify_err: -# log.warning("Could not verify insert: %s", verify_err) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True) -# else: -# log.warning("MongoDB not configured, skipping log insert") - - -# @app.post("/remove-pink") -# def remove_pink_segments( -# image: UploadFile = File(...), -# request: Request = None, -# user_id: Optional[str] = Form(None), -# category_id: Optional[str] = Form(None), -# appname: Optional[str] = Form(None), -# _: None = Depends(bearer_auth), -# ) -> Dict[str, str]: -# """ -# Simple endpoint: upload an image with pink/magenta segments to remove. -# - Pink/Magenta segments → automatically removed (white in mask) -# - Everything else → automatically kept (black in mask) -# Just paint pink/magenta on areas you want to remove, upload the image, and it works! -# """ -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# # Cleanup old GridFS files (runs at most once every 10 minutes) -# _cleanup_old_gridfs_files() - -# # Handle appname="collage-maker": get category_id from collage-maker if not provided -# final_category_id = category_id -# if appname == "collage-maker" and not final_category_id: -# final_category_id = get_category_id_from_collage_maker() -# if final_category_id: -# log.info("Using category_id from collage-maker: %s", final_category_id) - -# log.info(f"Simple remove-pink: processing image {image.filename}") - -# # Load the image (with pink paint on it) -# img = Image.open(image.file).convert("RGBA") -# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) - -# # Auto-detect pink/magenta segments to remove -# # Pink/Magenta → white in mask (remove) -# # Everything else (natural image colors, including dark areas) → black in mask (keep) - -# # Detect pink/magenta using fixed RGB bounds per requested logic -# lower = np.array([150, 0, 100], dtype=np.uint8) -# upper = np.array([255, 120, 255], dtype=np.uint8) -# binmask = ( -# (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) & -# (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) & -# (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2]) -# ).astype(np.uint8) * 255 - -# # Clean up the pink mask -# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) - -# nonzero = int((binmask > 0).sum()) -# total_pixels = binmask.shape[0] * binmask.shape[1] -# log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove") - -# # Debug: log bounds used -# log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]") - -# if nonzero < 50: -# log.error("No pink segments detected! Returning original image.") -# result = np.array(img.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) -# return { -# "result": result_name, -# "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)." -# } - -# # Create binary mask: Pink pixels → white (255), Everything else → black (0) -# # Encode in RGBA format that process_inpaint expects -# # process_inpaint does: mask = 255 - mask[:,:,3] -# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) -# # alpha=255 (opaque/keep) → becomes 0 (black/keep) -# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) -# # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization -# mask_rgba[:, :, 0] = binmask # R: white where pink -# mask_rgba[:, :, 1] = binmask # G: white where pink -# mask_rgba[:, :, 2] = binmask # B: white where pink -# # Alpha: 0 (transparent) where pink → will become white after 255-alpha -# # 255 (opaque) everywhere else → will become black after 255-alpha -# mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255 - -# # Verify mask encoding -# alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum()) -# alpha_255_count = int((mask_rgba[:,:,3] == 255).sum()) -# total_pixels = binmask.shape[0] * binmask.shape[1] -# log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)") -# log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)") - -# # IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting! -# # Remove pink from the original image before processing -# # Create a clean version: where pink was detected, keep original image colors -# img_clean = np.array(img.convert("RGBA")) -# # Where pink is detected, we want to inpaint, so we can leave it (or blend it out) -# # Actually, the model will inpaint over those areas, so we can pass the original -# # But for better results, we might want to remove the pink overlay first - -# # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal -# log.info(f"Starting inpainting process...") -# result = process_inpaint(img_clean, mask_rgba, invert_mask=True) -# log.info(f"Inpainting complete, result shape: {result.shape}") -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# logs.append({ -# "result": result_name, -# "filename": image.filename, -# "pink_pixels": nonzero, -# "timestamp": datetime.utcnow().isoformat() -# }) - -# resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)} -# if url: -# resp["url"] = url -# log_media_click(user_id, final_category_id, appname) -# return resp -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "endpoint": "remove-pink", -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# # Store appname in api_logs if provided -# if appname: -# log_doc["appname"] = appname -# if error_msg: -# log_doc["error"] = error_msg -# if mongo_logs is not None: -# try: -# log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name) -# result = mongo_logs.insert_one(log_doc) -# log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s", -# result.inserted_id, output_name, status, mongo_logs.database.name, mongo_logs.name) - -# # Verify the insert by reading it back -# try: -# verify_doc = mongo_logs.find_one({"_id": result.inserted_id}) -# if verify_doc: -# log.info("Verified: Document exists in MongoDB after insert") -# else: -# log.error("WARNING: Document not found after insert! _id=%s", result.inserted_id) -# except Exception as verify_err: -# log.warning("Could not verify insert: %s", verify_err) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True) -# else: -# log.warning("MongoDB not configured, skipping log insert") - - -# @app.get("/download/{filename}") -# def download_file(filename: str): -# path = os.path.join(OUTPUT_DIR, filename) -# if not os.path.isfile(path): -# raise HTTPException(status_code=404, detail="file not found") -# return FileResponse(path) - - -# @app.get("/result/{filename}") -# def view_result(filename: str): -# """View result image directly in browser (same as download but with proper content-type for viewing)""" -# path = os.path.join(OUTPUT_DIR, filename) -# if not os.path.isfile(path): -# raise HTTPException(status_code=404, detail="file not found") -# return FileResponse(path, media_type="image/png") - - -# @app.get("/logs") -# def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse: -# return JSONResponse(content=logs) -# import os -# import uuid -# import shutil -# import re -# from datetime import datetime, timedelta, date -# from typing import Dict, List, Optional - -# import numpy as np -# from fastapi import ( -# FastAPI, -# UploadFile, -# File, -# HTTPException, -# Depends, -# Header, -# Request, -# Form, -# ) -# from fastapi.responses import FileResponse, JSONResponse -# from pydantic import BaseModel -# from PIL import Image -# import cv2 -# import logging - -# from bson import ObjectId -# from pymongo import MongoClient -# import time - -# logging.basicConfig(level=logging.INFO) -# log = logging.getLogger("api") - -# from src.core import process_inpaint - -# # Directories (use writable space on HF Spaces) -# BASE_DIR = os.environ.get("DATA_DIR", "/data") -# if not os.path.isdir(BASE_DIR): -# # Fallback to /tmp if /data not available -# BASE_DIR = "/tmp" - -# UPLOAD_DIR = os.path.join(BASE_DIR, "uploads") -# OUTPUT_DIR = os.path.join(BASE_DIR, "outputs") - -# os.makedirs(UPLOAD_DIR, exist_ok=True) -# os.makedirs(OUTPUT_DIR, exist_ok=True) - -# # Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open -# ENV_TOKEN = os.environ.get("API_TOKEN") - -# app = FastAPI(title="Photo Object Removal API", version="1.0.0") - -# # In-memory stores -# file_store: Dict[str, Dict[str, str]] = {} -# logs: List[Dict[str, str]] = [] - -# MONGO_URI = "mongodb+srv://harilogicgo_db_user:pdnh6UCMsWvuTCoi@kiddoimages.k2a4nuv.mongodb.net/?appName=KiddoImages" -# mongo_client = MongoClient(MONGO_URI) -# mongo_db = mongo_client["object_remover"] -# mongo_logs = mongo_db["api_logs"] - -# ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN") -# DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984" -# admin_media_clicks = None - - -# def _init_admin_mongo() -> None: -# global admin_media_clicks -# if not ADMIN_MONGO_URI: -# log.info("Admin Mongo URI not provided; media click logging disabled") -# return -# try: -# admin_client = MongoClient(ADMIN_MONGO_URI) -# # get_default_database() extracts database from connection string (e.g., /adminPanel) -# admin_db = admin_client.get_default_database() -# if admin_db is None: -# # Fallback if no database in URI -# admin_db = admin_client["admin"] -# log.warning("No database in connection string, defaulting to 'admin'") - -# admin_media_clicks = admin_db["media_clicks"] -# log.info( -# "Admin media click logging initialized: db=%s collection=%s", -# admin_db.name, -# admin_media_clicks.name, -# ) -# try: -# admin_media_clicks.drop_index("user_id_1_header_1_media_id_1") -# log.info("Dropped legacy index user_id_1_header_1_media_id_1") -# except Exception as idx_err: -# # Index drop failure is non-critical (often permission issue) -# if "Unauthorized" not in str(idx_err): -# log.info("Skipping legacy index drop: %s", idx_err) -# except Exception as err: -# log.error("Failed to init admin Mongo client: %s", err) -# admin_media_clicks = None - - -# _init_admin_mongo() - - -# def _admin_logging_status() -> Dict[str, object]: -# if admin_media_clicks is None: -# return { -# "enabled": False, -# "db": None, -# "collection": None, -# } -# return { -# "enabled": True, -# "db": admin_media_clicks.database.name, -# "collection": admin_media_clicks.name, -# } - - -# def _build_ai_edit_daily_count( -# existing: Optional[List[Dict[str, object]]], -# today: date, -# ) -> List[Dict[str, object]]: -# """ -# Build / extend the ai_edit_daily_count array with the following rules: - -# - Case A (no existing data): return [{date: today, count: 1}] -# - Case B (today already recorded): return list unchanged -# - Case C (gap in days): fill missing days with count=0 and append today with count=1 - -# Additionally, the returned list is capped to the most recent 32 entries. - -# The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. -# """ - -# def _to_date_only(value: object) -> date: -# if isinstance(value, datetime): -# return value.date() -# if isinstance(value, date): -# return value -# # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime -# try: -# text = str(value) -# if len(text) == 10: -# return datetime.strptime(text, "%Y-%m-%d").date() -# return datetime.fromisoformat(text).date() -# except Exception: -# # If parsing fails, just treat as today to avoid crashing -# return today - -# # Case A: first ever use (no array yet) -# if not existing: -# return [ -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ] - -# # Work on a shallow copy so we don't mutate original in-place -# result: List[Dict[str, object]] = list(existing) - -# last_entry = result[-1] if result else None -# if not last_entry or "date" not in last_entry: -# # If structure is unexpected, re-initialize safely -# return [ -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ] - -# last_date = _to_date_only(last_entry["date"]) - -# # If somehow the last stored date is in the future, do nothing to avoid corrupting history -# if last_date > today: -# return result - -# # Case B: today's date already present as the last entry → unchanged -# if last_date == today: -# return result - -# # Case C: there is a gap, fill missing days with count=0 and append today with count=1 -# cursor = last_date + timedelta(days=1) -# while cursor < today: -# result.append( -# { -# "date": datetime(cursor.year, cursor.month, cursor.day), -# "count": 0, -# } -# ) -# cursor += timedelta(days=1) - -# # Finally add today's presence indicator -# result.append( -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ) - -# # Sort by date ascending (older dates first) to guarantee stable ordering: -# # [oldest, ..., newest] -# try: -# result.sort(key=lambda entry: _to_date_only(entry.get("date"))) -# except Exception: -# # If anything goes wrong during sort, fall back to current ordering -# pass - -# # Enforce 32-entry limit (keep the most recent 32 days) -# if len(result) > 32: -# result = result[-32:] - -# return result - -# def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None: -# if not ENV_TOKEN: -# return -# if authorization is None or not authorization.lower().startswith("bearer "): -# raise HTTPException(status_code=401, detail="Unauthorized") -# token = authorization.split(" ", 1)[1] -# if token != ENV_TOKEN: -# raise HTTPException(status_code=403, detail="Forbidden") - - -# class InpaintRequest(BaseModel): -# image_id: str -# mask_id: str -# invert_mask: bool = True # True => selected/painted area is removed -# passthrough: bool = False # If True, return the original image unchanged -# user_id: Optional[str] = None -# category_id: Optional[str] = None - - -# class SimpleRemoveRequest(BaseModel): -# image_id: str # Image with pink/magenta segments to remove - - -# def _coerce_object_id(value: Optional[str]) -> ObjectId: -# if value is None: -# return ObjectId() -# value_str = str(value).strip() -# if re.fullmatch(r"[0-9a-fA-F]{24}", value_str): -# return ObjectId(value_str) -# if value_str.isdigit(): -# hex_str = format(int(value_str), "x") -# if len(hex_str) > 24: -# hex_str = hex_str[-24:] -# hex_str = hex_str.rjust(24, "0") -# return ObjectId(hex_str) -# return ObjectId() - - -# def _coerce_category_id(category_id: Optional[str]) -> ObjectId: -# raw = category_id or DEFAULT_CATEGORY_ID -# raw_str = str(raw).strip() -# if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str): -# return ObjectId(raw_str) -# return _coerce_object_id(raw_str) - - -# def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None: -# """Log to admin media_clicks collection only if user_id is provided.""" -# if admin_media_clicks is None: -# return -# # Only log if user_id is provided (not None/empty) -# if not user_id or not user_id.strip(): -# return -# try: -# user_obj = _coerce_object_id(user_id) -# category_obj = _coerce_category_id(category_id) -# now = datetime.utcnow() -# today = now.date() - -# doc = admin_media_clicks.find_one({"userId": user_obj}) -# if doc: -# existing_daily = doc.get("ai_edit_daily_count") -# updated_daily = _build_ai_edit_daily_count(existing_daily, today) -# categories = doc.get("categories") or [] -# if any(cat.get("categoryId") == category_obj for cat in categories): -# # Category exists: increment click_count and ai_edit_complete, update dates -# admin_media_clicks.update_one( -# {"_id": doc["_id"], "categories.categoryId": category_obj}, -# { -# "$inc": { -# "categories.$.click_count": 1, -# "ai_edit_complete": 1, # $inc handles missing fields (backward compatible) -# }, -# "$set": { -# "categories.$.lastClickedAt": now, -# "updatedAt": now, -# "ai_edit_last_date": now, -# "ai_edit_daily_count": updated_daily, -# }, -# }, -# ) -# else: -# # New category to existing document: push category, increment ai_edit_complete -# admin_media_clicks.update_one( -# {"_id": doc["_id"]}, -# { -# "$push": { -# "categories": { -# "categoryId": category_obj, -# "click_count": 1, -# "lastClickedAt": now, -# } -# }, -# "$inc": {"ai_edit_complete": 1}, # $inc handles missing fields -# "$set": { -# "updatedAt": now, -# "ai_edit_last_date": now, -# "ai_edit_daily_count": updated_daily, -# }, -# }, -# ) -# else: -# # New user: create document with default ai_edit_complete=0, then increment to 1 -# daily_for_new = _build_ai_edit_daily_count(None, today) -# admin_media_clicks.update_one( -# {"userId": user_obj}, -# { -# "$setOnInsert": { -# "userId": user_obj, -# "categories": [ -# { -# "categoryId": category_obj, -# "click_count": 1, -# "lastClickedAt": now, -# } -# ], -# "createdAt": now, -# "updatedAt": now, -# "ai_edit_daily_count": daily_for_new, -# }, -# "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use -# "$set": { -# "updatedAt": now, -# "ai_edit_last_date": now, -# }, -# }, -# upsert=True, -# ) -# except Exception as err: -# err_str = str(err) -# if "Unauthorized" in err_str or "not authorized" in err_str.lower(): -# log.warning( -# "Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. " -# "Check MongoDB user permissions.", -# admin_media_clicks.database.name, -# admin_media_clicks.name, -# ) -# else: -# log.warning("Admin media click logging failed: %s", err) - - -# @app.get("/") -# def root() -> Dict[str, object]: -# return { -# "name": "Photo Object Removal API", -# "status": "ok", -# "endpoints": { -# "GET /health": "health check", -# "POST /upload-image": "form-data: image=file", -# "POST /upload-mask": "form-data: mask=file", -# "POST /inpaint": "JSON: {image_id, mask_id}", -# "POST /inpaint-multipart": "form-data: image=file, mask=file", -# "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)", -# "GET /download/{filename}": "download result image", -# "GET /result/{filename}": "view result image in browser", -# "GET /logs": "recent uploads/results", -# }, -# "auth": "set API_TOKEN env var to require Authorization: Bearer (except /health)", -# } - - -# @app.get("/health") -# def health() -> Dict[str, str]: -# return {"status": "healthy"} - - -# @app.get("/logging-status") -# def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]: -# """Helper endpoint to verify admin media logging wiring (no secrets exposed).""" -# return _admin_logging_status() - - -# @app.post("/upload-image") -# def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: -# ext = os.path.splitext(image.filename)[1] or ".png" -# file_id = str(uuid.uuid4()) -# stored_name = f"{file_id}{ext}" -# stored_path = os.path.join(UPLOAD_DIR, stored_name) -# with open(stored_path, "wb") as f: -# shutil.copyfileobj(image.file, f) -# file_store[file_id] = { -# "type": "image", -# "filename": image.filename, -# "stored_name": stored_name, -# "path": stored_path, -# "timestamp": datetime.utcnow().isoformat(), -# } -# logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()}) -# return {"id": file_id, "filename": image.filename} - - -# @app.post("/upload-mask") -# def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: -# ext = os.path.splitext(mask.filename)[1] or ".png" -# file_id = str(uuid.uuid4()) -# stored_name = f"{file_id}{ext}" -# stored_path = os.path.join(UPLOAD_DIR, stored_name) -# with open(stored_path, "wb") as f: -# shutil.copyfileobj(mask.file, f) -# file_store[file_id] = { -# "type": "mask", -# "filename": mask.filename, -# "stored_name": stored_name, -# "path": stored_path, -# "timestamp": datetime.utcnow().isoformat(), -# } -# logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()}) -# return {"id": file_id, "filename": mask.filename} - - -# def _load_rgba_image(path: str) -> Image.Image: -# img = Image.open(path) -# return img.convert("RGBA") - - -# def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray: -# """ -# Convert mask image to RGBA format (black/white mask). -# Standard convention: white (255) = area to remove, black (0) = area to keep -# Returns RGBA with white in RGB channels where removal is needed, alpha=255 -# """ -# if img.mode != "RGBA": -# # For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep -# gray = img.convert("L") -# arr = np.array(gray) -# # Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep -# mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8) - -# rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8) -# rgba[:, :, 0] = mask_bw # R -# rgba[:, :, 1] = mask_bw # G -# rgba[:, :, 2] = mask_bw # B -# rgba[:, :, 3] = 255 # Fully opaque -# log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# # For RGBA: check if alpha channel is meaningful -# arr = np.array(img) -# alpha = arr[:, :, 3] -# rgb = arr[:, :, :3] - -# # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values -# if alpha.mean() > 200: -# # Use RGB to determine mask: white/bright in RGB = remove -# gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY) -# # Also detect magenta specifically -# magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255 -# mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta) - -# rgba = arr.copy() -# rgba[:, :, 0] = mask_bw # R -# rgba[:, :, 1] = mask_bw # G -# rgba[:, :, 2] = mask_bw # B -# rgba[:, :, 3] = 255 # Fully opaque -# log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# # Alpha channel encodes the mask - convert to RGB-based -# # Transparent areas (alpha < 128) = remove, Opaque areas = keep -# mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8) -# rgba = arr.copy() -# rgba[:, :, 0] = mask_bw -# rgba[:, :, 1] = mask_bw -# rgba[:, :, 2] = mask_bw -# rgba[:, :, 3] = 255 -# log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# @app.post("/inpaint") -# def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# start_time = time.time() -# status = "success" -# error_msg = None -# output_name = None - -# try: -# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# raise HTTPException(status_code=404, detail="image_id not found") - -# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# raise HTTPException(status_code=404, detail="mask_id not found") - -# img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# mask_img = Image.open(file_store[req.mask_id]["path"]) -# mask_rgba = _load_rgba_mask_from_image(mask_img) - -# if req.passthrough: -# result = np.array(img_rgba.convert("RGB")) -# else: -# result = process_inpaint( -# np.array(img_rgba), -# mask_rgba, -# invert_mask=req.invert_mask -# ) - -# output_name = f"output_{uuid.uuid4().hex}.png" -# output_path = os.path.join(OUTPUT_DIR, output_name) - -# Image.fromarray(result).save( -# output_path, "PNG", optimize=False, compress_level=1 -# ) - -# log_media_click(req.user_id, req.category_id) -# return {"result": output_name} - -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise - -# finally: -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 - -# log_doc = { -# "input_image_id": req.image_id, -# "input_mask_id": req.mask_id, -# "output_id": output_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms -# } - -# if error_msg: -# log_doc["error"] = error_msg - -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error(f"Mongo log insert failed: {mongo_err}") - -# # @app.post("/inpaint") -# # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# # raise HTTPException(status_code=404, detail="image_id not found") -# # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# # raise HTTPException(status_code=404, detail="mask_id not found") - -# # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA -# # mask_rgba = _load_rgba_mask_from_image(mask_img) - -# # # Debug: check mask before processing -# # white_pixels = int((mask_rgba[:,:,0] > 128).sum()) -# # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}") - -# # if req.passthrough: -# # result = np.array(img_rgba.convert("RGB")) -# # else: -# # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) -# # result_name = f"output_{uuid.uuid4().hex}.png" -# # result_path = os.path.join(OUTPUT_DIR, result_name) -# # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()}) -# # return {"result": result_name} - - -# @app.post("/inpaint-url") -# def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# """Same as /inpaint but returns a JSON with a public download URL instead of image bytes.""" -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# raise HTTPException(status_code=404, detail="image_id not found") -# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# raise HTTPException(status_code=404, detail="mask_id not found") - -# img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA -# mask_rgba = _load_rgba_mask_from_image(mask_img) - -# if req.passthrough: -# result = np.array(img_rgba.convert("RGB")) -# else: -# result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url = str(request.url_for("download_file", filename=result_name)) -# logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()}) -# log_media_click(req.user_id, req.category_id) -# return {"result": result_name, "url": url} -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "input_image_id": req.image_id, -# "input_mask_id": req.mask_id, -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# if error_msg: -# log_doc["error"] = error_msg -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s", mongo_err) - - -# @app.post("/inpaint-multipart") -# def inpaint_multipart( -# image: UploadFile = File(...), -# mask: UploadFile = File(...), -# request: Request = None, -# invert_mask: bool = True, -# mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original) -# passthrough: bool = False, -# user_id: Optional[str] = Form(None), -# category_id: Optional[str] = Form(None), -# _: None = Depends(bearer_auth), -# ) -> Dict[str, str]: -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# # Load in-memory -# img = Image.open(image.file).convert("RGBA") -# m = Image.open(mask.file).convert("RGBA") - -# if passthrough: -# # Just echo the input image, ignore mask -# result = np.array(img.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} -# if url: -# entry["url"] = url -# logs.append(entry) -# resp: Dict[str, str] = {"result": result_name} -# if url: -# resp["url"] = url -# log_media_click(user_id, category_id) -# return resp - -# if mask_is_painted: -# # Auto-detect pink/magenta paint and convert to black/white mask -# # White pixels = areas to remove, Black pixels = areas to keep -# log.info("Auto-detecting pink/magenta paint from uploaded image...") - -# m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB) - -# # Detect pink/magenta using fixed RGB bounds (same as /remove-pink) -# lower = np.array([150, 0, 100], dtype=np.uint8) -# upper = np.array([255, 120, 255], dtype=np.uint8) -# magenta_detected = ( -# (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) & -# (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) & -# (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2]) -# ).astype(np.uint8) * 255 - -# # Method 2: Also check if original image was provided to find differences -# if img is not None: -# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) -# if img_rgb.shape == m_rgb.shape: -# diff = cv2.absdiff(img_rgb, m_rgb) -# gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY) -# # Any significant difference (>50) could be paint -# diff_mask = (gray_diff > 50).astype(np.uint8) * 255 -# # Combine with magenta detection -# binmask = cv2.bitwise_or(magenta_detected, diff_mask) -# else: -# binmask = magenta_detected -# else: -# # No original image provided, use magenta detection only -# binmask = magenta_detected - -# # Clean up the mask: remove noise and fill small holes -# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) -# # Close small gaps in the mask -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) -# # Remove small noise -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) - -# nonzero = int((binmask > 0).sum()) -# log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero) - -# # If very few pixels detected, assume the user may already be providing a BW mask -# # and proceed without forcing strict detection - -# if nonzero < 50: -# log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.") -# result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) -# return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"} - -# # Create binary mask: Pink pixels → white (255), Everything else → black (0) -# # Encode in RGBA format for process_inpaint -# # process_inpaint does: mask = 255 - mask[:,:,3] -# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) -# # alpha=255 (opaque/keep) → becomes 0 (black/keep) -# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) -# mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization) -# mask_rgba[:, :, 1] = binmask # G: white where pink -# mask_rgba[:, :, 2] = binmask # B: white where pink -# # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha -# mask_rgba[:, :, 3] = 255 - binmask - -# log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)", -# nonzero, binmask.shape[0] * binmask.shape[1] - nonzero) -# else: -# mask_rgba = _load_rgba_mask_from_image(m) - -# # When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly -# actual_invert = invert_mask # Use default True for painted masks -# log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted) - -# result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} -# if url: -# entry["url"] = url -# logs.append(entry) -# resp: Dict[str, str] = {"result": result_name} -# if url: -# resp["url"] = url -# log_media_click(user_id, category_id) -# return resp -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "endpoint": "inpaint-multipart", -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# if error_msg: -# log_doc["error"] = error_msg -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s", mongo_err) - - -# @app.post("/remove-pink") -# def remove_pink_segments( -# image: UploadFile = File(...), -# request: Request = None, -# user_id: Optional[str] = Form(None), -# category_id: Optional[str] = Form(None), -# _: None = Depends(bearer_auth), -# ) -> Dict[str, str]: -# """ -# Simple endpoint: upload an image with pink/magenta segments to remove. -# - Pink/Magenta segments → automatically removed (white in mask) -# - Everything else → automatically kept (black in mask) -# Just paint pink/magenta on areas you want to remove, upload the image, and it works! -# """ -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# log.info(f"Simple remove-pink: processing image {image.filename}") - -# # Load the image (with pink paint on it) -# img = Image.open(image.file).convert("RGBA") -# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) - -# # Auto-detect pink/magenta segments to remove -# # Pink/Magenta → white in mask (remove) -# # Everything else (natural image colors, including dark areas) → black in mask (keep) - -# # Detect pink/magenta using fixed RGB bounds per requested logic -# lower = np.array([150, 0, 100], dtype=np.uint8) -# upper = np.array([255, 120, 255], dtype=np.uint8) -# binmask = ( -# (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) & -# (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) & -# (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2]) -# ).astype(np.uint8) * 255 - -# # Clean up the pink mask -# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) - -# nonzero = int((binmask > 0).sum()) -# total_pixels = binmask.shape[0] * binmask.shape[1] -# log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove") - -# # Debug: log bounds used -# log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]") - -# if nonzero < 50: -# log.error("No pink segments detected! Returning original image.") -# result = np.array(img.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) -# return { -# "result": result_name, -# "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)." -# } - -# # Create binary mask: Pink pixels → white (255), Everything else → black (0) -# # Encode in RGBA format that process_inpaint expects -# # process_inpaint does: mask = 255 - mask[:,:,3] -# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) -# # alpha=255 (opaque/keep) → becomes 0 (black/keep) -# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) -# # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization -# mask_rgba[:, :, 0] = binmask # R: white where pink -# mask_rgba[:, :, 1] = binmask # G: white where pink -# mask_rgba[:, :, 2] = binmask # B: white where pink -# # Alpha: 0 (transparent) where pink → will become white after 255-alpha -# # 255 (opaque) everywhere else → will become black after 255-alpha -# mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255 - -# # Verify mask encoding -# alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum()) -# alpha_255_count = int((mask_rgba[:,:,3] == 255).sum()) -# total_pixels = binmask.shape[0] * binmask.shape[1] -# log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)") -# log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)") - -# # IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting! -# # Remove pink from the original image before processing -# # Create a clean version: where pink was detected, keep original image colors -# img_clean = np.array(img.convert("RGBA")) -# # Where pink is detected, we want to inpaint, so we can leave it (or blend it out) -# # Actually, the model will inpaint over those areas, so we can pass the original -# # But for better results, we might want to remove the pink overlay first - -# # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal -# log.info(f"Starting inpainting process...") -# result = process_inpaint(img_clean, mask_rgba, invert_mask=True) -# log.info(f"Inpainting complete, result shape: {result.shape}") -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# logs.append({ -# "result": result_name, -# "filename": image.filename, -# "pink_pixels": nonzero, -# "timestamp": datetime.utcnow().isoformat() -# }) - -# resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)} -# if url: -# resp["url"] = url -# log_media_click(user_id, category_id) -# return resp -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "endpoint": "remove-pink", -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# if error_msg: -# log_doc["error"] = error_msg -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s", mongo_err) - - -# @app.get("/download/{filename}") -# def download_file(filename: str): -# path = os.path.join(OUTPUT_DIR, filename) -# if not os.path.isfile(path): -# raise HTTPException(status_code=404, detail="file not found") -# return FileResponse(path) - - -# @app.get("/result/{filename}") -# def view_result(filename: str): -# """View result image directly in browser (same as download but with proper content-type for viewing)""" -# path = os.path.join(OUTPUT_DIR, filename) -# if not os.path.isfile(path): -# raise HTTPException(status_code=404, detail="file not found") -# return FileResponse(path, media_type="image/png") - - -# @app.get("/logs") -# def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse: -# return JSONResponse(content=logs) - - - - - -# import os -# import uuid -# import shutil -# import re -# from datetime import datetime, timedelta, date -# from typing import Dict, List, Optional - -# import numpy as np -# from fastapi import ( -# FastAPI, -# UploadFile, -# File, -# HTTPException, -# Depends, -# Header, -# Request, -# Form, -# ) -# from fastapi.responses import FileResponse, JSONResponse -# from pydantic import BaseModel -# from PIL import Image -# import cv2 -# import logging - -# from bson import ObjectId -# from pymongo import MongoClient -# import time - -# logging.basicConfig(level=logging.INFO) -# log = logging.getLogger("api") - -# from src.core import process_inpaint - -# # Directories (use writable space on HF Spaces) -# BASE_DIR = os.environ.get("DATA_DIR", "/data") -# if not os.path.isdir(BASE_DIR): -# # Fallback to /tmp if /data not available -# BASE_DIR = "/tmp" - -# UPLOAD_DIR = os.path.join(BASE_DIR, "uploads") -# OUTPUT_DIR = os.path.join(BASE_DIR, "outputs") - -# os.makedirs(UPLOAD_DIR, exist_ok=True) -# os.makedirs(OUTPUT_DIR, exist_ok=True) - -# # Optional Bearer token: set env API_TOKEN to require auth; if not set, endpoints are open -# ENV_TOKEN = os.environ.get("API_TOKEN") - -# app = FastAPI(title="Photo Object Removal API", version="1.0.0") - -# # In-memory stores -# file_store: Dict[str, Dict[str, str]] = {} -# logs: List[Dict[str, str]] = [] - -# MONGO_URI = "mongodb+srv://harilogicgo_db_user:pdnh6UCMsWvuTCoi@kiddoimages.k2a4nuv.mongodb.net/?appName=KiddoImages" -# mongo_client = MongoClient(MONGO_URI) -# mongo_db = mongo_client["object_remover"] -# mongo_logs = mongo_db["api_logs"] - -# ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN") -# DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984" -# admin_media_clicks = None - - -# def _init_admin_mongo() -> None: -# global admin_media_clicks -# if not ADMIN_MONGO_URI: -# log.info("Admin Mongo URI not provided; media click logging disabled") -# return -# try: -# admin_client = MongoClient(ADMIN_MONGO_URI) -# # get_default_database() extracts database from connection string (e.g., /adminPanel) -# admin_db = admin_client.get_default_database() -# if admin_db is None: -# # Fallback if no database in URI -# admin_db = admin_client["admin"] -# log.warning("No database in connection string, defaulting to 'admin'") - -# admin_media_clicks = admin_db["media_clicks"] -# log.info( -# "Admin media click logging initialized: db=%s collection=%s", -# admin_db.name, -# admin_media_clicks.name, -# ) -# try: -# admin_media_clicks.drop_index("user_id_1_header_1_media_id_1") -# log.info("Dropped legacy index user_id_1_header_1_media_id_1") -# except Exception as idx_err: -# # Index drop failure is non-critical (often permission issue) -# if "Unauthorized" not in str(idx_err): -# log.info("Skipping legacy index drop: %s", idx_err) -# except Exception as err: -# log.error("Failed to init admin Mongo client: %s", err) -# admin_media_clicks = None - - -# _init_admin_mongo() - - -# def _admin_logging_status() -> Dict[str, object]: -# if admin_media_clicks is None: -# return { -# "enabled": False, -# "db": None, -# "collection": None, -# } -# return { -# "enabled": True, -# "db": admin_media_clicks.database.name, -# "collection": admin_media_clicks.name, -# } - - -# def _build_ai_edit_daily_count( -# existing: Optional[List[Dict[str, object]]], -# today: date, -# ) -> List[Dict[str, object]]: -# """ -# Build / extend the ai_edit_daily_count array with the following rules: - -# - Case A (no existing data): return [{date: today, count: 1}] -# - Case B (today already recorded): return list unchanged -# - Case C (gap in days): fill missing days with count=0 and append today with count=1 - -# Additionally, the returned list is capped to the most recent 32 entries. - -# The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. -# """ - -# def _to_date_only(value: object) -> date: -# if isinstance(value, datetime): -# return value.date() -# if isinstance(value, date): -# return value -# # Fallback: try parsing ISO string "YYYY-MM-DD" or full datetime -# try: -# text = str(value) -# if len(text) == 10: -# return datetime.strptime(text, "%Y-%m-%d").date() -# return datetime.fromisoformat(text).date() -# except Exception: -# # If parsing fails, just treat as today to avoid crashing -# return today - -# # Case A: first ever use (no array yet) -# if not existing: -# return [ -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ] - -# # Work on a shallow copy so we don't mutate original in-place -# result: List[Dict[str, object]] = list(existing) - -# last_entry = result[-1] if result else None -# if not last_entry or "date" not in last_entry: -# # If structure is unexpected, re-initialize safely -# return [ -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ] - -# last_date = _to_date_only(last_entry["date"]) - -# # If somehow the last stored date is in the future, do nothing to avoid corrupting history -# if last_date > today: -# return result - -# # Case B: today's date already present as the last entry → unchanged -# if last_date == today: -# return result - -# # Case C: there is a gap, fill missing days with count=0 and append today with count=1 -# cursor = last_date + timedelta(days=1) -# while cursor < today: -# result.append( -# { -# "date": datetime(cursor.year, cursor.month, cursor.day), -# "count": 0, -# } -# ) -# cursor += timedelta(days=1) - -# # Finally add today's presence indicator -# result.append( -# { -# "date": datetime(today.year, today.month, today.day), -# "count": 1, -# } -# ) - -# # Sort by date ascending (older dates first) to guarantee stable ordering: -# # [oldest, ..., newest] -# try: -# result.sort(key=lambda entry: _to_date_only(entry.get("date"))) -# except Exception: -# # If anything goes wrong during sort, fall back to current ordering -# pass - -# # Enforce 32-entry limit (keep the most recent 32 days) -# if len(result) > 32: -# result = result[-32:] - -# return result - -# def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None: -# if not ENV_TOKEN: -# return -# if authorization is None or not authorization.lower().startswith("bearer "): -# raise HTTPException(status_code=401, detail="Unauthorized") -# token = authorization.split(" ", 1)[1] -# if token != ENV_TOKEN: -# raise HTTPException(status_code=403, detail="Forbidden") - - -# class InpaintRequest(BaseModel): -# image_id: str -# mask_id: str -# invert_mask: bool = True # True => selected/painted area is removed -# passthrough: bool = False # If True, return the original image unchanged -# user_id: Optional[str] = None -# category_id: Optional[str] = None - - -# class SimpleRemoveRequest(BaseModel): -# image_id: str # Image with pink/magenta segments to remove - - -# def _coerce_object_id(value: Optional[str]) -> ObjectId: -# if value is None: -# return ObjectId() -# value_str = str(value).strip() -# if re.fullmatch(r"[0-9a-fA-F]{24}", value_str): -# return ObjectId(value_str) -# if value_str.isdigit(): -# hex_str = format(int(value_str), "x") -# if len(hex_str) > 24: -# hex_str = hex_str[-24:] -# hex_str = hex_str.rjust(24, "0") -# return ObjectId(hex_str) -# return ObjectId() - - -# def _coerce_category_id(category_id: Optional[str]) -> ObjectId: -# raw = category_id or DEFAULT_CATEGORY_ID -# raw_str = str(raw).strip() -# if re.fullmatch(r"[0-9a-fA-F]{24}", raw_str): -# return ObjectId(raw_str) -# return _coerce_object_id(raw_str) - - -# def log_media_click(user_id: Optional[str], category_id: Optional[str]) -> None: -# """Log to admin media_clicks collection only if user_id is provided.""" -# if admin_media_clicks is None: -# return -# # Only log if user_id is provided (not None/empty) -# if not user_id or not user_id.strip(): -# return -# try: -# user_obj = _coerce_object_id(user_id) -# category_obj = _coerce_category_id(category_id) -# now = datetime.utcnow() -# today = now.date() - -# doc = admin_media_clicks.find_one({"userId": user_obj}) -# if doc: -# existing_daily = doc.get("ai_edit_daily_count") -# updated_daily = _build_ai_edit_daily_count(existing_daily, today) -# categories = doc.get("categories") or [] -# if any(cat.get("categoryId") == category_obj for cat in categories): -# # Category exists: increment click_count and ai_edit_complete, update dates -# admin_media_clicks.update_one( -# {"_id": doc["_id"], "categories.categoryId": category_obj}, -# { -# "$inc": { -# "categories.$.click_count": 1, -# "ai_edit_complete": 1, # $inc handles missing fields (backward compatible) -# }, -# "$set": { -# "categories.$.lastClickedAt": now, -# "updatedAt": now, -# "ai_edit_last_date": now, -# "ai_edit_daily_count": updated_daily, -# }, -# }, -# ) -# else: -# # New category to existing document: push category, increment ai_edit_complete -# admin_media_clicks.update_one( -# {"_id": doc["_id"]}, -# { -# "$push": { -# "categories": { -# "categoryId": category_obj, -# "click_count": 1, -# "lastClickedAt": now, -# } -# }, -# "$inc": {"ai_edit_complete": 1}, # $inc handles missing fields -# "$set": { -# "updatedAt": now, -# "ai_edit_last_date": now, -# "ai_edit_daily_count": updated_daily, -# }, -# }, -# ) -# else: -# # New user: create document with default ai_edit_complete=0, then increment to 1 -# daily_for_new = _build_ai_edit_daily_count(None, today) -# admin_media_clicks.update_one( -# {"userId": user_obj}, -# { -# "$setOnInsert": { -# "userId": user_obj, -# "categories": [ -# { -# "categoryId": category_obj, -# "click_count": 1, -# "lastClickedAt": now, -# } -# ], -# "createdAt": now, -# "updatedAt": now, -# "ai_edit_daily_count": daily_for_new, -# }, -# "$inc": {"ai_edit_complete": 1}, # Increment to 1 on first use -# "$set": { -# "updatedAt": now, -# "ai_edit_last_date": now, -# }, -# }, -# upsert=True, -# ) -# except Exception as err: -# err_str = str(err) -# if "Unauthorized" in err_str or "not authorized" in err_str.lower(): -# log.warning( -# "Admin media click logging failed (permissions): user lacks read/write on db=%s collection=%s. " -# "Check MongoDB user permissions.", -# admin_media_clicks.database.name, -# admin_media_clicks.name, -# ) -# else: -# log.warning("Admin media click logging failed: %s", err) - - -# @app.get("/") -# def root() -> Dict[str, object]: -# return { -# "name": "Photo Object Removal API", -# "status": "ok", -# "endpoints": { -# "GET /health": "health check", -# "POST /upload-image": "form-data: image=file", -# "POST /upload-mask": "form-data: mask=file", -# "POST /inpaint": "JSON: {image_id, mask_id}", -# "POST /inpaint-multipart": "form-data: image=file, mask=file", -# "POST /remove-pink": "form-data: image=file (auto-detects pink segments and removes them)", -# "GET /download/{filename}": "download result image", -# "GET /result/{filename}": "view result image in browser", -# "GET /logs": "recent uploads/results", -# }, -# "auth": "set API_TOKEN env var to require Authorization: Bearer (except /health)", -# } - - -# @app.get("/health") -# def health() -> Dict[str, str]: -# return {"status": "healthy"} - - -# @app.get("/logging-status") -# def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]: -# """Helper endpoint to verify admin media logging wiring (no secrets exposed).""" -# return _admin_logging_status() - - -# @app.post("/upload-image") -# def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: -# ext = os.path.splitext(image.filename)[1] or ".png" -# file_id = str(uuid.uuid4()) -# stored_name = f"{file_id}{ext}" -# stored_path = os.path.join(UPLOAD_DIR, stored_name) -# with open(stored_path, "wb") as f: -# shutil.copyfileobj(image.file, f) -# file_store[file_id] = { -# "type": "image", -# "filename": image.filename, -# "stored_name": stored_name, -# "path": stored_path, -# "timestamp": datetime.utcnow().isoformat(), -# } -# logs.append({"id": file_id, "filename": image.filename, "type": "image", "timestamp": datetime.utcnow().isoformat()}) -# return {"id": file_id, "filename": image.filename} - - -# @app.post("/upload-mask") -# def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]: -# ext = os.path.splitext(mask.filename)[1] or ".png" -# file_id = str(uuid.uuid4()) -# stored_name = f"{file_id}{ext}" -# stored_path = os.path.join(UPLOAD_DIR, stored_name) -# with open(stored_path, "wb") as f: -# shutil.copyfileobj(mask.file, f) -# file_store[file_id] = { -# "type": "mask", -# "filename": mask.filename, -# "stored_name": stored_name, -# "path": stored_path, -# "timestamp": datetime.utcnow().isoformat(), -# } -# logs.append({"id": file_id, "filename": mask.filename, "type": "mask", "timestamp": datetime.utcnow().isoformat()}) -# return {"id": file_id, "filename": mask.filename} - - -# def _load_rgba_image(path: str) -> Image.Image: -# img = Image.open(path) -# return img.convert("RGBA") - - -# def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray: -# """ -# Convert mask image to RGBA format (black/white mask). -# Standard convention: white (255) = area to remove, black (0) = area to keep -# Returns RGBA with white in RGB channels where removal is needed, alpha=255 -# """ -# if img.mode != "RGBA": -# # For RGB/Grayscale masks: white (value>128) = remove, black (value<=128) = keep -# gray = img.convert("L") -# arr = np.array(gray) -# # Create proper black/white mask: white pixels (>128) = remove, black (<=128) = keep -# mask_bw = np.where(arr > 128, 255, 0).astype(np.uint8) - -# rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8) -# rgba[:, :, 0] = mask_bw # R -# rgba[:, :, 1] = mask_bw # G -# rgba[:, :, 2] = mask_bw # B -# rgba[:, :, 3] = 255 # Fully opaque -# log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# # For RGBA: check if alpha channel is meaningful -# arr = np.array(img) -# alpha = arr[:, :, 3] -# rgb = arr[:, :, :3] - -# # If alpha is mostly opaque everywhere (mean > 200), treat RGB channels as mask values -# if alpha.mean() > 200: -# # Use RGB to determine mask: white/bright in RGB = remove -# gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY) -# # Also detect magenta specifically -# magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255 -# mask_bw = np.maximum(np.where(gray > 128, 255, 0).astype(np.uint8), magenta) - -# rgba = arr.copy() -# rgba[:, :, 0] = mask_bw # R -# rgba[:, :, 1] = mask_bw # G -# rgba[:, :, 2] = mask_bw # B -# rgba[:, :, 3] = 255 # Fully opaque -# log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# # Alpha channel encodes the mask - convert to RGB-based -# # Transparent areas (alpha < 128) = remove, Opaque areas = keep -# mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8) -# rgba = arr.copy() -# rgba[:, :, 0] = mask_bw -# rgba[:, :, 1] = mask_bw -# rgba[:, :, 2] = mask_bw -# rgba[:, :, 3] = 255 -# log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)") -# return rgba - -# @app.post("/inpaint") -# def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# start_time = time.time() -# status = "success" -# error_msg = None -# output_name = None - -# try: -# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# raise HTTPException(status_code=404, detail="image_id not found") - -# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# raise HTTPException(status_code=404, detail="mask_id not found") - -# img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# mask_img = Image.open(file_store[req.mask_id]["path"]) -# mask_rgba = _load_rgba_mask_from_image(mask_img) - -# if req.passthrough: -# result = np.array(img_rgba.convert("RGB")) -# else: -# result = process_inpaint( -# np.array(img_rgba), -# mask_rgba, -# invert_mask=req.invert_mask -# ) - -# output_name = f"output_{uuid.uuid4().hex}.png" -# output_path = os.path.join(OUTPUT_DIR, output_name) - -# Image.fromarray(result).save( -# output_path, "PNG", optimize=False, compress_level=1 -# ) - -# log_media_click(req.user_id, req.category_id) -# return {"result": output_name} - -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise - -# finally: -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 - -# log_doc = { -# "input_image_id": req.image_id, -# "input_mask_id": req.mask_id, -# "output_id": output_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms -# } - -# if error_msg: -# log_doc["error"] = error_msg - -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error(f"Mongo log insert failed: {mongo_err}") - -# # @app.post("/inpaint") -# # def inpaint(req: InpaintRequest, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# # if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# # raise HTTPException(status_code=404, detail="image_id not found") -# # if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# # raise HTTPException(status_code=404, detail="mask_id not found") - -# # img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# # mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA -# # mask_rgba = _load_rgba_mask_from_image(mask_img) - -# # # Debug: check mask before processing -# # white_pixels = int((mask_rgba[:,:,0] > 128).sum()) -# # log.info(f"Inpaint request: mask has {white_pixels} white pixels, invert_mask={req.invert_mask}") - -# # if req.passthrough: -# # result = np.array(img_rgba.convert("RGB")) -# # else: -# # result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) -# # result_name = f"output_{uuid.uuid4().hex}.png" -# # result_path = os.path.join(OUTPUT_DIR, result_name) -# # Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# # logs.append({"result": result_name, "timestamp": datetime.utcnow().isoformat()}) -# # return {"result": result_name} - - -# @app.post("/inpaint-url") -# def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]: -# """Same as /inpaint but returns a JSON with a public download URL instead of image bytes.""" -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# if req.image_id not in file_store or file_store[req.image_id]["type"] != "image": -# raise HTTPException(status_code=404, detail="image_id not found") -# if req.mask_id not in file_store or file_store[req.mask_id]["type"] != "mask": -# raise HTTPException(status_code=404, detail="mask_id not found") - -# img_rgba = _load_rgba_image(file_store[req.image_id]["path"]) -# mask_img = Image.open(file_store[req.mask_id]["path"]) # may be RGB/gray/RGBA -# mask_rgba = _load_rgba_mask_from_image(mask_img) - -# if req.passthrough: -# result = np.array(img_rgba.convert("RGB")) -# else: -# result = process_inpaint(np.array(img_rgba), mask_rgba, invert_mask=req.invert_mask) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url = str(request.url_for("download_file", filename=result_name)) -# logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()}) -# log_media_click(req.user_id, req.category_id) -# return {"result": result_name, "url": url} -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "input_image_id": req.image_id, -# "input_mask_id": req.mask_id, -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# if error_msg: -# log_doc["error"] = error_msg -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s", mongo_err) - - -# @app.post("/inpaint-multipart") -# def inpaint_multipart( -# image: UploadFile = File(...), -# mask: UploadFile = File(...), -# request: Request = None, -# invert_mask: bool = True, -# mask_is_painted: bool = False, # if True, mask file is the painted-on image (e.g., black strokes on original) -# passthrough: bool = False, -# user_id: Optional[str] = Form(None), -# category_id: Optional[str] = Form(None), -# _: None = Depends(bearer_auth), -# ) -> Dict[str, str]: -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# # Load in-memory -# img = Image.open(image.file).convert("RGBA") -# m = Image.open(mask.file).convert("RGBA") - -# if passthrough: -# # Just echo the input image, ignore mask -# result = np.array(img.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} -# if url: -# entry["url"] = url -# logs.append(entry) -# resp: Dict[str, str] = {"result": result_name} -# if url: -# resp["url"] = url -# log_media_click(user_id, category_id) -# return resp - -# if mask_is_painted: -# # Auto-detect pink/magenta paint and convert to black/white mask -# # White pixels = areas to remove, Black pixels = areas to keep -# log.info("Auto-detecting pink/magenta paint from uploaded image...") - -# m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB) - -# # Detect pink/magenta using fixed RGB bounds (same as /remove-pink) -# lower = np.array([150, 0, 100], dtype=np.uint8) -# upper = np.array([255, 120, 255], dtype=np.uint8) -# magenta_detected = ( -# (m_rgb[:, :, 0] >= lower[0]) & (m_rgb[:, :, 0] <= upper[0]) & -# (m_rgb[:, :, 1] >= lower[1]) & (m_rgb[:, :, 1] <= upper[1]) & -# (m_rgb[:, :, 2] >= lower[2]) & (m_rgb[:, :, 2] <= upper[2]) -# ).astype(np.uint8) * 255 - -# # Method 2: Also check if original image was provided to find differences -# if img is not None: -# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) -# if img_rgb.shape == m_rgb.shape: -# diff = cv2.absdiff(img_rgb, m_rgb) -# gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY) -# # Any significant difference (>50) could be paint -# diff_mask = (gray_diff > 50).astype(np.uint8) * 255 -# # Combine with magenta detection -# binmask = cv2.bitwise_or(magenta_detected, diff_mask) -# else: -# binmask = magenta_detected -# else: -# # No original image provided, use magenta detection only -# binmask = magenta_detected - -# # Clean up the mask: remove noise and fill small holes -# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) -# # Close small gaps in the mask -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) -# # Remove small noise -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) - -# nonzero = int((binmask > 0).sum()) -# log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero) - -# # If very few pixels detected, assume the user may already be providing a BW mask -# # and proceed without forcing strict detection - -# if nonzero < 50: -# log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.") -# result = np.array(img.convert("RGB")) if img else np.array(m.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) -# return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"} - -# # Create binary mask: Pink pixels → white (255), Everything else → black (0) -# # Encode in RGBA format for process_inpaint -# # process_inpaint does: mask = 255 - mask[:,:,3] -# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) -# # alpha=255 (opaque/keep) → becomes 0 (black/keep) -# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) -# mask_rgba[:, :, 0] = binmask # R: white where pink (for visualization) -# mask_rgba[:, :, 1] = binmask # G: white where pink -# mask_rgba[:, :, 2] = binmask # B: white where pink -# # Alpha: invert so pink areas get alpha=0 → will become white after 255-alpha -# mask_rgba[:, :, 3] = 255 - binmask - -# log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)", -# nonzero, binmask.shape[0] * binmask.shape[1] - nonzero) -# else: -# mask_rgba = _load_rgba_mask_from_image(m) - -# # When mask_is_painted=true, we encode pink as alpha=0, so process_inpaint's default invert_mask=True works correctly -# actual_invert = invert_mask # Use default True for painted masks -# log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted) - -# result = process_inpaint(np.array(img), mask_rgba, invert_mask=actual_invert) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# entry: Dict[str, str] = {"result": result_name, "timestamp": datetime.utcnow().isoformat()} -# if url: -# entry["url"] = url -# logs.append(entry) -# resp: Dict[str, str] = {"result": result_name} -# if url: -# resp["url"] = url -# log_media_click(user_id, category_id) -# return resp -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "endpoint": "inpaint-multipart", -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# if error_msg: -# log_doc["error"] = error_msg -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s", mongo_err) - - -# @app.post("/remove-pink") -# def remove_pink_segments( -# image: UploadFile = File(...), -# request: Request = None, -# user_id: Optional[str] = Form(None), -# category_id: Optional[str] = Form(None), -# _: None = Depends(bearer_auth), -# ) -> Dict[str, str]: -# """ -# Simple endpoint: upload an image with pink/magenta segments to remove. -# - Pink/Magenta segments → automatically removed (white in mask) -# - Everything else → automatically kept (black in mask) -# Just paint pink/magenta on areas you want to remove, upload the image, and it works! -# """ -# start_time = time.time() -# status = "success" -# error_msg = None -# result_name = None - -# try: -# log.info(f"Simple remove-pink: processing image {image.filename}") - -# # Load the image (with pink paint on it) -# img = Image.open(image.file).convert("RGBA") -# img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB) - -# # Auto-detect pink/magenta segments to remove -# # Pink/Magenta → white in mask (remove) -# # Everything else (natural image colors, including dark areas) → black in mask (keep) - -# # Detect pink/magenta using fixed RGB bounds per requested logic -# lower = np.array([150, 0, 100], dtype=np.uint8) -# upper = np.array([255, 120, 255], dtype=np.uint8) -# binmask = ( -# (img_rgb[:, :, 0] >= lower[0]) & (img_rgb[:, :, 0] <= upper[0]) & -# (img_rgb[:, :, 1] >= lower[1]) & (img_rgb[:, :, 1] <= upper[1]) & -# (img_rgb[:, :, 2] >= lower[2]) & (img_rgb[:, :, 2] <= upper[2]) -# ).astype(np.uint8) * 255 - -# # Clean up the pink mask -# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2) -# binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1) - -# nonzero = int((binmask > 0).sum()) -# total_pixels = binmask.shape[0] * binmask.shape[1] -# log.info(f"Detected {nonzero} pink pixels ({100*nonzero/total_pixels:.2f}% of image) to remove") - -# # Debug: log bounds used -# log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]") - -# if nonzero < 50: -# log.error("No pink segments detected! Returning original image.") -# result = np.array(img.convert("RGB")) -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) -# return { -# "result": result_name, -# "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)." -# } - -# # Create binary mask: Pink pixels → white (255), Everything else → black (0) -# # Encode in RGBA format that process_inpaint expects -# # process_inpaint does: mask = 255 - mask[:,:,3] -# # So: alpha=0 (transparent/pink) → becomes 255 (white/remove) -# # alpha=255 (opaque/keep) → becomes 0 (black/keep) -# mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8) -# # RGB channels don't matter for process_inpaint, but set them to white where pink for visualization -# mask_rgba[:, :, 0] = binmask # R: white where pink -# mask_rgba[:, :, 1] = binmask # G: white where pink -# mask_rgba[:, :, 2] = binmask # B: white where pink -# # Alpha: 0 (transparent) where pink → will become white after 255-alpha -# # 255 (opaque) everywhere else → will become black after 255-alpha -# mask_rgba[:, :, 3] = 255 - binmask # Invert: pink areas get alpha=0, rest get alpha=255 - -# # Verify mask encoding -# alpha_zero_count = int((mask_rgba[:,:,3] == 0).sum()) -# alpha_255_count = int((mask_rgba[:,:,3] == 255).sum()) -# total_pixels = binmask.shape[0] * binmask.shape[1] -# log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)") -# log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)") - -# # IMPORTANT: We need to use the ORIGINAL image WITHOUT pink paint for inpainting! -# # Remove pink from the original image before processing -# # Create a clean version: where pink was detected, keep original image colors -# img_clean = np.array(img.convert("RGBA")) -# # Where pink is detected, we want to inpaint, so we can leave it (or blend it out) -# # Actually, the model will inpaint over those areas, so we can pass the original -# # But for better results, we might want to remove the pink overlay first - -# # Process with invert_mask=True (default) because process_inpaint expects alpha=0 for removal -# log.info(f"Starting inpainting process...") -# result = process_inpaint(img_clean, mask_rgba, invert_mask=True) -# log.info(f"Inpainting complete, result shape: {result.shape}") -# result_name = f"output_{uuid.uuid4().hex}.png" -# result_path = os.path.join(OUTPUT_DIR, result_name) -# Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1) - -# url: Optional[str] = None -# try: -# if request is not None: -# url = str(request.url_for("download_file", filename=result_name)) -# except Exception: -# url = None - -# logs.append({ -# "result": result_name, -# "filename": image.filename, -# "pink_pixels": nonzero, -# "timestamp": datetime.utcnow().isoformat() -# }) - -# resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)} -# if url: -# resp["url"] = url -# log_media_click(user_id, category_id) -# return resp -# except Exception as e: -# status = "fail" -# error_msg = str(e) -# raise -# finally: -# # Always log to regular MongoDB (mandatory) -# end_time = time.time() -# response_time_ms = (end_time - start_time) * 1000 -# log_doc = { -# "endpoint": "remove-pink", -# "output_id": result_name, -# "status": status, -# "timestamp": datetime.utcnow(), -# "ts": int(time.time()), -# "response_time_ms": response_time_ms, -# } -# if error_msg: -# log_doc["error"] = error_msg -# try: -# mongo_logs.insert_one(log_doc) -# except Exception as mongo_err: -# log.error("Mongo log insert failed: %s", mongo_err) - - -# @app.get("/download/{filename}") -# def download_file(filename: str): -# path = os.path.join(OUTPUT_DIR, filename) -# if not os.path.isfile(path): -# raise HTTPException(status_code=404, detail="file not found") -# return FileResponse(path) - - -# @app.get("/result/{filename}") -# def view_result(filename: str): -# """View result image directly in browser (same as download but with proper content-type for viewing)""" -# path = os.path.join(OUTPUT_DIR, filename) -# if not os.path.isfile(path): -# raise HTTPException(status_code=404, detail="file not found") -# return FileResponse(path, media_type="image/png") - - -# @app.get("/logs") -# def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse: -# return JSONResponse(content=logs) - - - - import os import uuid import shutil @@ -4247,7 +855,8 @@ def inpaint(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth compressed_path = os.path.join(OUTPUT_DIR, compressed_name) try: _compress_image(output_path, compressed_path, quality=85) - compressed_url = str(request.url_for("download_file", filename=compressed_name)) + # compressed_url = str(request.url_for("download_file", filename=compressed_name)) + compressed_url = str(request.url_for("download_file", filename=compressed_name).replace("http://", "https://")) except Exception as compress_err: log.warning("Failed to create compressed image: %s", compress_err) compressed_url = None