Spaces:
Running
Running
| import logging | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Form, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| import base64 | |
| import os | |
| import time | |
| import jwt | |
| from pathlib import Path | |
| from typing import List | |
| import io | |
| import razorpay | |
| from razorpay.errors import SignatureVerificationError | |
| from supabase import create_client, Client | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from PIL import Image | |
| from io import BytesIO | |
| import google.generativeai as genai | |
| import tempfile | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize FastAPI app | |
| app = FastAPI(title="Gemini Image Editing API with Razorpay") | |
| # Enable CORS for frontend | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=[ | |
| "https://hivili.web.app", | |
| "https://hivili.com", | |
| "http://localhost:3000", | |
| "https://*.lovable.dev", | |
| "https://*.sandbox.lovable.dev", | |
| ], | |
| allow_origin_regex=r"https://.*\.lovable\.dev|https://.*\.sandbox\.lovable\.dev", | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # ===== API CONFIGURATION ===== | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| GEMINI_MODEL = "gemini-2.5-flash-image-preview" | |
| # Validate Gemini API key presence | |
| if not GEMINI_API_KEY: | |
| logger.error("GEMINI_API_KEY environment variable is not set") | |
| raise ValueError("GEMINI_API_KEY environment variable is required") | |
| # Configure Gemini API | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| # ===== RAZORPAY CONFIGURATION ===== | |
| RAZORPAY_KEY_ID = os.getenv("RAZORPAY_KEY_ID") | |
| RAZORPAY_KEY_SECRET = os.getenv("RAZORPAY_KEY_SECRET") | |
| razorpay_client = razorpay.Client(auth=(RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET)) if RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET else None | |
| # ===== SUPABASE CONFIGURATION ===== | |
| SUPABASE_URL = os.getenv("SUPABASE_URL") | |
| SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
| supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) if SUPABASE_URL and SUPABASE_KEY else None | |
| # Pydantic models for JSON input validation | |
| class CreateOrderRequest(BaseModel): | |
| amount: int | |
| class VerifyPaymentRequest(BaseModel): | |
| razorpay_order_id: str | |
| razorpay_payment_id: str | |
| razorpay_signature: str | |
| user_id: Optional[str] = None | |
| class GenerateImageRequest(BaseModel): | |
| prompt: str | |
| user_id: Optional[str] = None | |
| # ===== AUTHENTICATION ===== | |
| def generate_jwt_token(): | |
| """Generate JWT token for API authentication""" | |
| payload = { | |
| "iss": "gemini-image-editor", | |
| "exp": int(time.time()) + 1800, # 30 minutes expiration | |
| "nbf": int(time.time()) - 5 # Not before 5 seconds ago | |
| } | |
| return jwt.encode(payload, GEMINI_API_KEY, algorithm="HS256") | |
| # ===== IMAGE PROCESSING ===== | |
| def prepare_image_base64(image_content: bytes): | |
| """Convert image bytes to base64 without prefix""" | |
| try: | |
| return base64.b64encode(image_content).decode('utf-8') | |
| except Exception as e: | |
| logger.error(f"Image processing failed: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Image processing failed: {str(e)}") | |
| def validate_image(image_content: bytes): | |
| """Validate image meets API requirements""" | |
| try: | |
| size_mb = len(image_content) / (1024 * 1024) | |
| if size_mb > 10: | |
| raise HTTPException(status_code=400, detail="Image too large (max 10MB)") | |
| img = Image.open(BytesIO(image_content)) | |
| if img.format.lower() not in ['png', 'jpg', 'jpeg', 'webp']: | |
| raise HTTPException(status_code=400, detail="Unsupported image format. Use PNG, JPG, JPEG, or WEBP") | |
| return True, "" | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Image validation error: {str(e)}") | |
| def generate_gemini_image(images: List[Image.Image], prompt: str): | |
| """Generate or edit image using Gemini API""" | |
| try: | |
| contents = images + [prompt] | |
| response = genai.GenerativeModel(GEMINI_MODEL).generate_content(contents) | |
| text_response = "" | |
| image_path = None | |
| for part in response.candidates[0].content.parts: | |
| if part.text: | |
| text_response += part.text + "\n" | |
| elif hasattr(part, 'inline_data') and part.inline_data: | |
| with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp: | |
| temp_path = tmp.name | |
| generated_image = Image.open(BytesIO(part.inline_data.data)) | |
| generated_image.save(temp_path) | |
| image_path = temp_path | |
| logger.info(f"Generated image saved to: {temp_path} with prompt: {prompt}") | |
| if image_path: | |
| return image_path, "" | |
| else: | |
| return None, text_response.strip() | |
| except Exception as e: | |
| logger.error(f"Gemini API error: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Gemini API error: {str(e)}") | |
| # ===== RAZORPAY FUNCTIONS ===== | |
| def create_razorpay_order(amount: int): | |
| """Create a Razorpay order""" | |
| try: | |
| if not razorpay_client: | |
| raise HTTPException(status_code=500, detail="Razorpay configuration missing") | |
| if amount <= 0: | |
| raise ValueError("Amount must be a positive integer") | |
| order_data = { | |
| "amount": amount * 100, # Convert INR to paise | |
| "currency": "INR", | |
| "payment_capture": 1 # Auto-capture payment | |
| } | |
| order = razorpay_client.order.create(data=order_data) | |
| logger.info(f"Razorpay order created successfully: {order['id']}") | |
| return order | |
| except Exception as e: | |
| logger.error(f"Failed to create Razorpay order: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Failed to create order: {str(e)}") | |
| def verify_payment_signature(order_id: str, payment_id: str, signature: str): | |
| """Verify Razorpay payment signature""" | |
| try: | |
| if not razorpay_client: | |
| raise HTTPException(status_code=500, detail="Razorpay configuration missing") | |
| params_dict = { | |
| "razorpay_order_id": order_id, | |
| "razorpay_payment_id": payment_id, | |
| "razorpay_signature": signature | |
| } | |
| razorpay_client.utility.verify_payment_signature(params_dict) | |
| logger.info(f"Payment signature verified successfully for order: {order_id}") | |
| return True | |
| except SignatureVerificationError as e: | |
| logger.error(f"Payment signature verification failed: {str(e)}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error verifying payment signature: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Verification error: {str(e)}") | |
| # ===== MAIN PROCESSING ===== | |
| async def generate_image(images: List[bytes], prompt: str, user_id: Optional[str] = None): | |
| """Handle complete image generation workflow""" | |
| # Validate images | |
| for img_content in images: | |
| if img_content: | |
| validate_image(img_content) | |
| # Convert bytes to PIL Images | |
| pil_images = [] | |
| for img_content in images: | |
| try: | |
| img = Image.open(BytesIO(img_content)) | |
| if img.mode == "RGBA": | |
| img = img.convert("RGBA") | |
| pil_images.append(img) | |
| except Exception as e: | |
| logger.error(f"Image conversion failed: {str(e)}") | |
| raise HTTPException(status_code=400, detail=f"Image conversion failed: {str(e)}") | |
| if len(pil_images) < 1: | |
| raise HTTPException(status_code=400, detail="At least one image required") | |
| # Check user premium status if Supabase is configured | |
| if user_id and supabase: | |
| try: | |
| user_data = supabase.table("users").select("is_premium").eq("user_id", user_id).execute() | |
| if not user_data.data or not user_data.data[0].get("is_premium"): | |
| raise HTTPException(status_code=403, detail="Premium subscription required") | |
| except Exception as e: | |
| logger.error(f"Supabase user check failed: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"User verification failed: {str(e)}") | |
| # Generate image using Gemini API | |
| image_path, text_response = generate_gemini_image(pil_images, prompt) | |
| if image_path: | |
| return image_path | |
| else: | |
| raise HTTPException(status_code=500, detail=f"Image generation failed: {text_response or 'Unknown error'}") | |
| # ===== API ENDPOINTS ===== | |
| async def generate_image_endpoint( | |
| prompt: str = Form(...), | |
| images: List[UploadFile] = File(...), | |
| user_id: Optional[str] = Form(None) | |
| ): | |
| """Endpoint to generate or edit an image using Gemini API""" | |
| try: | |
| if len(images) < 1: | |
| raise HTTPException(status_code=400, detail="At least one image required") | |
| if len(images) > 4: | |
| raise HTTPException(status_code=400, detail="Maximum 4 images allowed") | |
| image_contents = [await image.read() for image in images] | |
| output_path = await generate_image(image_contents, prompt, user_id) | |
| return FileResponse( | |
| path=output_path, | |
| media_type="image/png", | |
| filename=f"gemini_output_{Path(output_path).stem}.png" | |
| ) | |
| except HTTPException as e: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error in /generate: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def create_order_endpoint( | |
| request: Request, | |
| amount: Optional[int] = Form(None), | |
| body: Optional[CreateOrderRequest] = None | |
| ): | |
| """Create a Razorpay order (supports form-data and JSON)""" | |
| logger.info("Received create order request") | |
| try: | |
| if not razorpay_client: | |
| raise HTTPException(status_code=500, detail="Razorpay configuration missing") | |
| # Handle JSON body if provided | |
| if body and body.amount: | |
| amount = body.amount | |
| elif not amount: | |
| try: | |
| json_body = await request.json() | |
| amount = json_body.get('amount') | |
| except: | |
| pass | |
| if not amount or amount <= 0: | |
| raise HTTPException(status_code=422, detail="Missing or invalid 'amount' parameter") | |
| logger.info(f"Creating order with amount: {amount}") | |
| order = create_razorpay_order(amount) | |
| response_data = { | |
| "id": order["id"], | |
| "amount": order["amount"], | |
| "currency": "INR", | |
| "key_id": RAZORPAY_KEY_ID | |
| } | |
| logger.info(f"Order created successfully: {order['id']}") | |
| return JSONResponse(content=response_data) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error creating order: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Failed to create order: {str(e)}") | |
| async def verify_payment_endpoint( | |
| request: Request, | |
| razorpay_order_id: Optional[str] = Form(None), | |
| razorpay_payment_id: Optional[str] = Form(None), | |
| razorpay_signature: Optional[str] = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| body: Optional[VerifyPaymentRequest] = None | |
| ): | |
| """Verify Razorpay payment signature (supports form-data and JSON)""" | |
| logger.info("Received payment verification request") | |
| try: | |
| # Handle JSON body if provided | |
| if body: | |
| razorpay_order_id = razorpay_order_id or body.razorpay_order_id | |
| razorpay_payment_id = razorpay_payment_id or body.razorpay_payment_id | |
| razorpay_signature = razorpay_signature or body.razorpay_signature | |
| user_id = user_id or body.user_id | |
| else: | |
| try: | |
| json_body = await request.json() | |
| razorpay_order_id = razorpay_order_id or json_body.get('razorpay_order_id') | |
| razorpay_payment_id = razorpay_payment_id or json_body.get('razorpay_payment_id') | |
| razorpay_signature = razorpay_signature or json_body.get('razorpay_signature') | |
| user_id = user_id or json_body.get('user_id') | |
| except: | |
| pass | |
| # Validate required fields | |
| if not all([razorpay_order_id, razorpay_payment_id, razorpay_signature]): | |
| missing_fields = [] | |
| if not razorpay_order_id: missing_fields.append("razorpay_order_id") | |
| if not razorpay_payment_id: missing_fields.append("razorpay_payment_id") | |
| if not razorpay_signature: missing_fields.append("razorpay_signature") | |
| logger.error(f"Missing required fields: {missing_fields}") | |
| raise HTTPException( | |
| status_code=422, | |
| detail=f"Missing required fields: {', '.join(missing_fields)}" | |
| ) | |
| logger.info(f"Verifying payment for order_id: {razorpay_order_id}") | |
| is_valid = verify_payment_signature(razorpay_order_id, razorpay_payment_id, razorpay_signature) | |
| if is_valid: | |
| if user_id and supabase: | |
| logger.info(f"Updating Supabase for user_id: {user_id}") | |
| try: | |
| supabase.table("users").update({"is_premium": True}).eq("user_id", user_id).execute() | |
| logger.info(f"Successfully updated premium status for user: {user_id}") | |
| except Exception as e: | |
| logger.error(f"Failed to update Supabase: {str(e)}") | |
| return JSONResponse(content={"success": True, "message": "Payment verified successfully"}) | |
| else: | |
| logger.warning(f"Payment verification failed for order: {razorpay_order_id}") | |
| return JSONResponse(content={"success": False, "message": "Payment verification failed"}, status_code=400) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error verifying payment: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Verification error: {str(e)}") | |
| async def index(): | |
| return { | |
| "status": "Gemini Image Editing API with Razorpay is running", | |
| "endpoints": { | |
| "generate": "POST /generate", | |
| "create_order": "POST /create-razorpay-order", | |
| "verify_payment": "POST /verify-razorpay-payment" | |
| } | |
| } | |
| async def health_check(): | |
| return { | |
| "status": "healthy", | |
| "razorpay_configured": bool(RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET), | |
| "supabase_configured": bool(SUPABASE_URL and SUPABASE_KEY), | |
| "gemini_configured": bool(GEMINI_API_KEY) | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |