Spaces:
Running
Running
| import logging | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Form, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| import google.generativeai as genai | |
| import base64 | |
| import os | |
| from pathlib import Path | |
| from typing import List | |
| import io | |
| from PIL import Image | |
| import razorpay | |
| from razorpay.errors import SignatureVerificationError | |
| from supabase import create_client, Client | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import time | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize FastAPI app | |
| app = FastAPI(title="Gemini Image Generator API with Razorpay") | |
| # Enable CORS for the frontend | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=[ | |
| "https://hivili.web.app", | |
| "http://localhost:3000", | |
| "https://*.lovable.dev", | |
| "https://*.sandbox.lovable.dev", | |
| ], | |
| allow_origin_regex=r"https://.*\.lovable\.dev|https://.*\.sandbox\.lovable\.dev", | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # ===== API CONFIGURATION ===== | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| if not GEMINI_API_KEY: | |
| logger.error("GEMINI_API_KEY is not set") | |
| raise HTTPException(status_code=500, detail="GEMINI_API_KEY is not set") | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| MODEL_NAME = "gemini-1.5-flash" # Use a valid model (verify in Google's documentation) | |
| # ===== RAZORPAY CONFIGURATION ===== | |
| RAZORPAY_KEY_ID = os.getenv("RAZORPAY_KEY_ID") | |
| RAZORPAY_KEY_SECRET = os.getenv("RAZORPAY_KEY_SECRET") | |
| razorpay_client = razorpay.Client(auth=(RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET)) if RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET else None | |
| # ===== SUPABASE CONFIGURATION ===== | |
| SUPABASE_URL = os.getenv("SUPABASE_URL") | |
| SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
| supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) if SUPABASE_URL and SUPABASE_KEY else None | |
| # Pydantic models for JSON input validation | |
| class CreateOrderRequest(BaseModel): | |
| amount: int | |
| class VerifyPaymentRequest(BaseModel): | |
| razorpay_order_id: str | |
| razorpay_payment_id: str | |
| razorpay_signature: str | |
| user_id: Optional[str] = None | |
| # ===== IMAGE PROCESSING ===== | |
| def validate_image(image_content: bytes): | |
| """Validate image meets API requirements""" | |
| try: | |
| size_mb = len(image_content) / (1024 * 1024) | |
| if size_mb > 20: | |
| raise HTTPException(status_code=400, detail="Image too large (max 20MB)") | |
| img = Image.open(io.BytesIO(image_content)) | |
| if img.format not in ["PNG", "JPEG"]: | |
| raise HTTPException(status_code=400, detail="Only PNG or JPEG images are supported") | |
| logger.info(f"Validated image: format={img.format}, size={size_mb:.2f}MB") | |
| return True, img.format.lower() | |
| except Exception as e: | |
| logger.error(f"Image validation error: {str(e)}") | |
| raise HTTPException(status_code=400, detail=f"Image validation error: {str(e)}") | |
| # ===== API FUNCTIONS ===== | |
| def create_multi_image_task(subject_images: List[bytes], prompt: str): | |
| """Create image generation task with Gemini API (up to two images)""" | |
| try: | |
| model = genai.GenerativeModel(MODEL_NAME) | |
| parts = [] | |
| for img_content in subject_images: | |
| _, img_format = validate_image(img_content) | |
| parts.append({ | |
| "inline_data": { | |
| "data": base64.b64encode(img_content).decode('utf-8'), | |
| "mime_type": f"image/{img_format}" | |
| } | |
| }) | |
| enhanced_prompt = f"A photorealistic composition combining elements from the provided images: {prompt}. Ensure the scene is cohesive, with soft, natural lighting and a balanced aspect ratio of 16:9." | |
| parts.append({"text": enhanced_prompt}) | |
| logger.info(f"Sending request to Gemini API with prompt: {prompt}") | |
| response = model.generate_content( | |
| parts, | |
| generation_config={"response_mime_type": "image/png"}, | |
| safety_settings=[ | |
| {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"} | |
| ] | |
| ) | |
| logger.info(f"API response: {response}") | |
| if not response.candidates or not response.candidates[0].content: | |
| raise HTTPException(status_code=500, detail="No valid content returned from API") | |
| return response | |
| except Exception as e: | |
| logger.error(f"API request failed: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"API Error: {str(e)}") | |
| async def generate_image(subject_images: List[bytes], prompt: str): | |
| """Handle complete image generation workflow""" | |
| if len(subject_images) != 2: | |
| raise HTTPException(status_code=400, detail="Exactly two images are required") | |
| for img_content in subject_images: | |
| validate_image(img_content) | |
| response = create_multi_image_task(subject_images, prompt) | |
| try: | |
| candidate = response.candidates[0] | |
| parts = candidate.content.parts | |
| logger.info(f"Response parts: {parts}") | |
| image_data = None | |
| for part in parts: | |
| if hasattr(part, 'inline_data') and part.inline_data.data: | |
| image_data = part.inline_data.data | |
| break | |
| elif hasattr(part, 'text'): | |
| logger.info(f"Text part found: {part.text}") | |
| if not image_data: | |
| raise HTTPException(status_code=500, detail="No image data found in API response") | |
| output_dir = Path("/tmp") | |
| output_dir.mkdir(exist_ok=True) | |
| output_path = output_dir / f"gemini_output_{int(time.time())}.png" | |
| with open(output_path, "wb") as f: | |
| f.write(image_data) | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Failed to process API response: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Failed to process result: {str(e)}") | |
| # ===== API ENDPOINTS ===== | |
| async def generate_image_endpoint( | |
| prompt: str = Form(...), | |
| images: List[UploadFile] = File(...) | |
| ): | |
| """Endpoint to generate an image from exactly two input images and a prompt""" | |
| try: | |
| if len(images) != 2: | |
| raise HTTPException(status_code=400, detail="Exactly two images are required") | |
| image_contents = [await image.read() for image in images] | |
| output_path = await generate_image(image_contents, prompt) | |
| return FileResponse( | |
| path=output_path, | |
| media_type="image/png", | |
| filename=f"gemini_output_{Path(output_path).stem}.png" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in /generate: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def create_order_endpoint( | |
| request: Request, | |
| amount: Optional[int] = Form(None), | |
| body: Optional[CreateOrderRequest] = None | |
| ): | |
| """Create a Razorpay order (supports form-data and JSON)""" | |
| logger.info("Received create order request") | |
| if not RAZORPAY_KEY_ID or not RAZORPAY_KEY_SECRET: | |
| logger.error("Razorpay configuration missing") | |
| raise HTTPException(status_code=500, detail="Razorpay configuration missing") | |
| try: | |
| if body and body.amount: | |
| amount = body.amount | |
| elif not amount: | |
| try: | |
| json_body = await request.json() | |
| amount = json_body.get('amount') | |
| except: | |
| pass | |
| if not amount or amount <= 0: | |
| raise HTTPException(status_code=422, detail="Missing or invalid 'amount' parameter") | |
| logger.info(f"Creating order with amount: {amount}") | |
| order = create_razorpay_order(amount) | |
| response_data = { | |
| "id": order["id"], | |
| "amount": order["amount"], | |
| "currency": order["currency"], | |
| "key_id": RAZORPAY_KEY_ID | |
| } | |
| logger.info(f"Order created successfully: {order['id']}") | |
| return JSONResponse(content=response_data) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error creating order: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Failed to create order: {str(e)}") | |
| async def verify_payment_endpoint( | |
| request: Request, | |
| razorpay_order_id: Optional[str] = Form(None), | |
| razorpay_payment_id: Optional[str] = Form(None), | |
| razorpay_signature: Optional[str] = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| body: Optional[VerifyPaymentRequest] = None | |
| ): | |
| """Verify Razorpay payment signature (supports form-data and JSON)""" | |
| logger.info("Received payment verification request") | |
| try: | |
| if body: | |
| razorpay_order_id = razorpay_order_id or body.razorpay_order_id | |
| razorpay_payment_id = razorpay_payment_id or body.razorpay_payment_id | |
| razorpay_signature = razorpay_signature or body.razorpay_signature | |
| user_id = user_id or body.user_id | |
| else: | |
| try: | |
| json_body = await request.json() | |
| razorpay_order_id = razorpay_order_id or json_body.get('razorpay_order_id') | |
| razorpay_payment_id = razorpay_payment_id or json_body.get('razorpay_payment_id') | |
| razorpay_signature = razorpay_signature or json_body.get('razorpay_signature') | |
| user_id = user_id or json_body.get('user_id') | |
| except: | |
| pass | |
| if not all([razorpay_order_id, razorpay_payment_id, razorpay_signature]): | |
| missing_fields = [] | |
| if not razorpay_order_id: | |
| missing_fields.append("razorpay_order_id") | |
| if not razorpay_payment_id: | |
| missing_fields.append("razorpay_payment_id") | |
| if not razorpay_signature: | |
| missing_fields.append("razorpay_signature") | |
| logger.error(f"Missing required fields: {missing_fields}") | |
| raise HTTPException( | |
| status_code=422, | |
| detail=f"Missing required fields: {', '.join(missing_fields)}" | |
| ) | |
| logger.info(f"Verifying payment for order_id: {razorpay_order_id}") | |
| is_valid = verify_payment_signature(razorpay_order_id, razorpay_payment_id, razorpay_signature) | |
| if is_valid: | |
| if user_id and supabase: | |
| logger.info(f"Updating Supabase for user_id: {user_id}") | |
| try: | |
| supabase.table("users").update({"is_premium": True}).eq("user_id", user_id).execute() | |
| logger.info(f"Successfully updated premium status for user: {user_id}") | |
| except Exception as e: | |
| logger.error(f"Failed to update Supabase: {str(e)}") | |
| return JSONResponse(content={"success": True, "message": "Payment verified successfully"}) | |
| else: | |
| logger.warning(f"Payment verification failed for order: {razorpay_order_id}") | |
| return JSONResponse(content={"success": False, "message": "Payment verification failed"}, status_code=400) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error verifying payment: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Verification error: {str(e)}") | |
| async def index(): | |
| return { | |
| "status": "Gemini Image Generator API with Razorpay is running", | |
| "endpoints": { | |
| "generate": "POST /generate", | |
| "create_order": "POST /create-razorpay-order", | |
| "verify_payment": "POST /verify-razorpay-payment" | |
| } | |
| } | |
| async def health_check(): | |
| return { | |
| "status": "healthy", | |
| "razorpay_configured": bool(RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET), | |
| "supabase_configured": bool(SUPABASE_URL and SUPABASE_KEY) | |
| } | |
| def create_razorpay_order(amount: int): | |
| """Create a Razorpay order""" | |
| try: | |
| if amount <= 0: | |
| raise ValueError("Amount must be a positive integer") | |
| order_data = { | |
| "amount": amount * 100, # Convert INR to paise | |
| "currency": "INR", | |
| "payment_capture": 1 # Auto-capture payment | |
| } | |
| order = razorpay_client.order.create(data=order_data) | |
| logger.info(f"Razorpay order created successfully: {order['id']}") | |
| return order | |
| except Exception as e: | |
| logger.error(f"Failed to create Razorpay order: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Failed to create order: {str(e)}") | |
| def verify_payment_signature(order_id: str, payment_id: str, signature: str): | |
| """Verify Razorpay payment signature""" | |
| try: | |
| params_dict = { | |
| "razorpay_order_id": order_id, | |
| "razorpay_payment_id": payment_id, | |
| "razorpay_signature": signature | |
| } | |
| razorpay_client.utility.verify_payment_signature(params_dict) | |
| logger.info(f"Payment signature verified successfully for order: {order_id}") | |
| return True | |
| except SignatureVerificationError as e: | |
| logger.error(f"Payment signature verification failed: {str(e)}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error verifying payment signature: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Verification error: {str(e)}") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |