Spaces:
Running
Running
| import os | |
| import json | |
| import cv2 | |
| import boto3 | |
| import shutil | |
| import razorpay | |
| from fastapi import FastAPI, Request, BackgroundTasks, UploadFile, File, Form, HTTPException, Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.util import get_remote_address | |
| from slowapi.errors import RateLimitExceeded | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from typing import Optional | |
| from fastapi import Response | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
| # π€ YOUR AI TOOLS | |
| import main_photo | |
| import main_video | |
| import bg_remove | |
| import add_logo | |
| import enhance_photo | |
| import enhance_video | |
| import auto_detect | |
| import db | |
# Load environment variables before any configuration is read.
load_dotenv()

# Create the single FastAPI application instance.
app = FastAPI(title="VaniConnect AI Engine")

# Browser origins that are allowed to call this API.
_cors_origins = [
    "http://localhost:5173",
    "https://vaniconnect-studio.vercel.app",
    "https://clipeto.com",
    "https://www.clipeto.com",
]
_cors_options = {
    "allow_origins": _cors_origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Rate limiter keyed by get_remote_address, plus the handler that answers
# requests which exceed a limit.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Static downloads: the class must be defined before it is mounted.
class CORStaticFiles(StaticFiles):
    """Static file app that adds permissive CORS headers to the files it serves.

    Starlette's ``StaticFiles`` builds its responses in ``get_response``. The
    previous override targeted ``item_response``, which ``StaticFiles`` does
    not define, so it was never called and the headers were never added.
    """

    async def get_response(self, path: str, scope) -> Response:
        """Return the parent's response for *path* with CORS headers attached."""
        response = await super().get_response(path, scope)
        response.headers["Access-Control-Allow-Origin"] = "*"
        response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
        response.headers["Access-Control-Allow-Headers"] = "*"
        return response
# The folder has to exist before it can be mounted.
os.makedirs("downloads", exist_ok=True)
# Serve the folder at /downloads through the CORS-aware static handler.
_downloads_app = CORStaticFiles(directory="downloads")
app.mount("/downloads", _downloads_app, name="downloads")
# Cloudflare R2 connection settings, read from the environment.
r2_access_key = os.getenv('R2_ACCESS_KEY_ID')
r2_secret_key = os.getenv('R2_SECRET_ACCESS_KEY')
r2_endpoint = os.getenv('R2_ENDPOINT_URL')
bucket_name = os.getenv('R2_BUCKET_NAME')

# R2 is reached through the S3-compatible boto3 client.
_r2_client_options = {
    "endpoint_url": r2_endpoint,
    "aws_access_key_id": r2_access_key,
    "aws_secret_access_key": r2_secret_key,
    "region_name": 'auto',
}
s3 = boto3.client('s3', **_r2_client_options)
# Razorpay credentials come from the environment.
RAZORPAY_KEY_ID = os.getenv("RAZORPAY_KEY_ID")
RAZORPAY_KEY_SECRET = os.getenv("RAZORPAY_KEY_SECRET")
# One shared client is used by the order and verification handlers.
_razorpay_auth = (RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET)
rzp_client = razorpay.Client(auth=_razorpay_auth)
class OrderRequest(BaseModel):
    """Request body accepted by ``create_order``."""

    # Identifier of the user the order is created for; used in the receipt.
    user_id: str
class VerifyRequest(BaseModel):
    """Request body accepted by ``verify_payment``."""

    # The three values passed to Razorpay's signature verification.
    razorpay_order_id: str
    razorpay_payment_id: str
    razorpay_signature: str
    # Firestore ``users`` document that is marked as Pro after verification.
    user_id: str
# Firebase Admin setup: the service-account JSON is supplied via FIREBASE_KEY.
firebase_secret = os.environ.get("FIREBASE_KEY")
if not firebase_secret:
    # Fail with a clear message instead of the TypeError json.loads(None) raises.
    raise RuntimeError("FIREBASE_KEY environment variable is not set.")
cred = credentials.Certificate(json.loads(firebase_secret))
# Initialise only when no Firebase app exists yet, so a module reload does not
# try to initialise it a second time.
if not firebase_admin._apps:
    firebase_admin.initialize_app(cred)
firestore_db = firestore.client()
| # --------------------------------------------------------- | |
| # π ROUTES | |
| # --------------------------------------------------------- | |
def read_root():
    """Return the message that confirms the service is up."""
    payload = {"message": "β VaniConnect AI Engine is Live and Running!"}
    return payload
# Create the Razorpay order.
async def create_order(req: OrderRequest):
    """Create a Razorpay order for the Pro upgrade.

    Args:
        req: Request body; ``user_id`` is embedded in the order receipt.

    Returns:
        A dict with the Razorpay ``order_id``, the ``amount`` and ``currency``.

    Raises:
        HTTPException: 500 with the underlying error text if the order could
            not be created.
    """
    try:
        # Razorpay amounts are in the smallest currency unit:
        # 29900 paise = INR 299.
        order_amount = 29900
        razorpay_order = rzp_client.order.create({
            "amount": order_amount,
            "currency": "INR",
            "receipt": f"receipt_{req.user_id}",
            "payment_capture": "1"  # Auto-capture the payment
        })
        return {
            "order_id": razorpay_order['id'],
            "amount": order_amount,
            "currency": "INR"
        }
    except Exception as e:
        # Keep the original failure attached as the cause for debugging.
        raise HTTPException(status_code=500, detail=str(e)) from e
# Verify the payment and unlock Pro in Firestore.
async def verify_payment(req: VerifyRequest):
    """Verify a Razorpay payment signature and mark the user as Pro.

    Args:
        req: The Razorpay order id, payment id and signature, plus the
            ``user_id`` of the Firestore document to upgrade.

    Returns:
        A success dict once the signature has been verified.

    Raises:
        HTTPException: 400 if the signature does not verify.
    """
    signature_fields = {
        'razorpay_order_id': req.razorpay_order_id,
        'razorpay_payment_id': req.razorpay_payment_id,
        'razorpay_signature': req.razorpay_signature
    }
    try:
        rzp_client.utility.verify_payment_signature(signature_fields)
    except razorpay.errors.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Payment verification failed!")

    # The Firestore write is best-effort: a failure is only logged, because
    # the payment itself has already been verified.
    # NOTE(review): success is returned even if the user was not upgraded, and
    # user_id is taken from the request body rather than from the order.
    try:
        pro_flag = {"isProUser": True}
        # merge=True creates the document if it does not exist yet.
        firestore_db.collection('users').document(req.user_id).set(pro_flag, merge=True)
        print(f"β Successfully upgraded user {req.user_id} to PRO in Firestore!")
    except Exception as e:
        print(f"π₯ FIREBASE ERROR: {e}")

    return {"status": "success", "message": "Payment verified. Pro Unlocked!"}
| # π (Paste the rest of your AI tool routes down here like /api/enhance, etc.) π | |
# NOTE(review): this repeats the read_root defined earlier in the file; the
# later definition replaces the earlier name at module level.
def read_root():
    """Return the message that confirms the service is up."""
    payload = {"message": "β VaniConnect AI Engine is Live and Running!"}
    return payload
async def process_photo(
    request: Request,
    file: UploadFile = File(...),
    x: int = Form(0),
    y: int = Form(0),
    w: int = Form(0),
    h: int = Form(0),
    style: str = Form("Standard AI Inpaint"),
    mode: str = Form("manual"),
    user_id: str = Form(...)
):
    """Remove a watermark from an uploaded photo.

    Args:
        request: Incoming request (not read by this handler).
        file: The uploaded photo.
        x, y, w, h: Watermark box; replaced by the detected box in auto mode.
        style: Style passed through to the photo engine.
        mode: ``"auto"`` runs ``auto_detect.find_text_watermark`` to find the
            box; any other value uses the supplied box.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` when
        detection or processing fails.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)

    # Paywall: non-Pro users need at least one free credit.
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # The upload name is client-controlled: keep only its last path component
    # so it cannot point outside the downloads folder.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    input_filename = f"downloads/temp_{safe_name}"
    output_filename = f"downloads/clean_{safe_name}"

    try:
        with open(input_filename, "wb") as buffer:
            buffer.write(await file.read())
        print(f"βοΈ Removing photo watermark from {input_filename} in {mode.upper()} mode...")

        if mode == "auto":
            print("π€ Handing photo to EasyOCR to find the watermark...")
            box = auto_detect.find_text_watermark(input_filename)
            if not box:
                return {"error": "AI Auto could not find clear text in the image. Please use Manual Select."}
            x, y, w, h = box['x'], box['y'], box['w'], box['h']
            print(f"β AI found the text at: x={x}, y={y}, w={w}, h={h}")

        success = main_photo.remove_photo_watermark_web(
            input_path=input_filename,
            output_path=output_filename,
            x=x, y=y, w=w, h=h, style=style
        )
        if not success:
            return {"error": "Failed to process photo"}

        # Cloud upload is best-effort; the local copy in downloads/ remains.
        try:
            with open(output_filename, 'rb') as clean_file:
                s3.put_object(
                    Bucket=bucket_name,
                    Key=f'processed_photos/clean_{safe_name}',
                    Body=clean_file
                )
        except Exception as e:
            print(f"Cloudflare upload skipped: {e}")
    finally:
        # Remove the temporary input on every exit path, including the early
        # error returns above.
        try:
            os.remove(input_filename)
        except OSError as e:
            print(f"Cleanup issue: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
        print(f"πΈ Credit deducted! Remaining: {credits_left - 1}")
    return {"message": "Success!", "file_name": f"clean_{safe_name}"}
async def process_video(
    request: Request,
    file: UploadFile = File(...),
    x: int = Form(0),
    y: int = Form(0),
    w: int = Form(0),
    h: int = Form(0),
    mode: str = Form("manual"),
    user_id: str = Form(...)
):
    """Remove a watermark from an uploaded video.

    Args:
        request: Incoming request (not read by this handler).
        file: The uploaded video.
        x, y, w, h: Watermark box; replaced by the detected box in auto mode.
        mode: ``"auto"`` detects the box and rejects detections that look like
            a wide banner or central text; any other value uses the supplied box.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` when
        detection is rejected or processing fails.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # The upload name is client-controlled: keep only its last path component
    # so it cannot point outside the downloads folder.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    input_filename = f"downloads/temp_vid_{safe_name}"
    output_filename = f"downloads/clean_vid_{safe_name}"

    # Both modes use the same engine mode.
    backend_execution_mode = "fast"

    try:
        with open(input_filename, "wb") as buffer:
            buffer.write(await file.read())
        print(f"π¬ Incoming Frontend Request Mode: {mode.upper()}")

        if mode == "auto":
            print("π€ Handing video to EasyOCR to parse text layouts...")
            box = auto_detect.find_text_watermark(input_filename)
            if not box:
                print("β EasyOCR scan returned empty array fields.")
                return {"error": "AI Auto could not detect any explicit text watermarks in the corners. Please use Manual Mode to draw your selection box."}

            # Read the frame size so the detection can be judged against it.
            vid = cv2.VideoCapture(input_filename)
            try:
                vid_w = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
                vid_h = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
            finally:
                vid.release()

            temp_x, temp_y = box['x'], box['y']
            temp_w, temp_h = box['w'], box['h']
            print(f"π― Raw AI Detection Coordinates -> x: {temp_x}, y: {temp_y}, w: {temp_w}, h: {temp_h} (Canvas Size: {vid_w}x{vid_h})")

            # Guard 1: reject boxes wider than 40% or taller than 30% of the frame.
            if temp_w > (vid_w * 0.40) or temp_h > (vid_h * 0.30):
                print("β οΈ Intercepted: AI detected a structural headline banner instead of a corner logo.")
                return {"error": "AI Auto targeted the large news headline banner. To keep the banner text intact, please use 'Manual' mode and circle just the corner logo directly!"}

            # Guard 2: reject boxes whose top-left corner lies in the central
            # region of the frame.
            if (vid_h * 0.25) < temp_y < (vid_h * 0.75) and (vid_w * 0.20) < temp_x < (vid_w * 0.80):
                print("β οΈ Intercepted: AI detected text within the primary central video tracking zone.")
                return {"error": "AI Auto detected text elements in the center of the screen. Please switch to 'Manual' mode to specify your watermark area!"}

            x, y, w, h = temp_x, temp_y, temp_w, temp_h
            print(f"β Auto coordinates cleared security check: x={x}, y={y}, w={w}, h={h}")

        success = main_video.remove_watermark_pro(
            input_path=input_filename,
            output_path=output_filename,
            x=x, y=y, w=w, h=h,
            mode=backend_execution_mode
        )
        if not success:
            return {"error": "Video processing execution encountered a system error."}

        # Cloud upload is best-effort; the local copy in downloads/ remains.
        print("βοΈ Uploading clean asset matrix to Cloudflare R2...")
        try:
            with open(output_filename, 'rb') as clean_file:
                s3.put_object(
                    Bucket=bucket_name,
                    Key=f'processed_videos/clean_vid_{safe_name}',
                    Body=clean_file
                )
        except Exception as e:
            print(f"Cloudflare infrastructure bypass (Local Fallback Triggered): {e}")
    finally:
        # One cleanup for every exit path (replaces the scattered bare excepts).
        try:
            os.remove(input_filename)
        except OSError as e:
            print(f"File retention warning: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
    return {"message": "Success!", "file_name": f"clean_vid_{safe_name}"}
async def process_background(
    request: Request,
    file: UploadFile = File(...),
    bg_color: Optional[str] = Form(None),
    bg_image: Optional[UploadFile] = File(None),
    user_id: str = Form(...)
):
    """Remove the background from a photo and optionally apply a new one.

    Args:
        request: Incoming request (not read by this handler).
        file: The uploaded photo.
        bg_color: Optional colour passed to ``apply_pro_background`` as
            ``bg_color_hex``.
        bg_image: Optional replacement background image.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` if
        background removal fails. The result is the transparent PNG unless a
        replacement background was applied successfully, in which case it is
        the JPEG.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # Upload names are client-controlled: keep only the last path component so
    # the working files and the copy into downloads/ cannot escape their folder.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    base_name = os.path.splitext(safe_name)[0]
    input_filename = f"temp_bg_{safe_name}"
    transparent_filename = f"nobg_{base_name}.png"
    pro_output_filename = f"pro_bg_{base_name}.jpg"
    custom_bg_path = None

    try:
        with open(input_filename, "wb") as buffer:
            buffer.write(await file.read())

        print(f"π§½ Removing background from {input_filename}...")
        success = bg_remove.remove_background_web(input_path=input_filename, output_path=transparent_filename)
        if not success:
            return {"error": "Background removal failed"}

        file_to_upload = transparent_filename
        content_type = 'image/png'

        if bg_color or bg_image:
            if bg_image:
                bg_name = os.path.basename((bg_image.filename or "").replace("\\", "/")) or "background"
                custom_bg_path = f"temp_custom_bg_{bg_name}"
                with open(custom_bg_path, "wb") as buffer:
                    buffer.write(await bg_image.read())
            print("π¨ Applying Pro Background...")
            pro_success = bg_remove.apply_pro_background(
                transparent_filename, pro_output_filename,
                bg_color_hex=bg_color, bg_image_path=custom_bg_path
            )
            # Fall back to the transparent PNG if the background step fails.
            if pro_success:
                file_to_upload = pro_output_filename
                content_type = 'image/jpeg'

        # The copy in downloads/ is what the static mount serves.
        shutil.copy(file_to_upload, f"downloads/{file_to_upload}")

        # Cloud upload is best-effort, matching the other routes in this file.
        print(f"βοΈ Uploading {file_to_upload} to Cloudflare...")
        try:
            with open(file_to_upload, 'rb') as final_file:
                s3.put_object(
                    Bucket=bucket_name,
                    Key=f'processed_backgrounds/{file_to_upload}',
                    Body=final_file,
                    ContentType=content_type
                )
        except Exception as e:
            print(f"Cloudflare upload skipped: {e}")
    finally:
        # Remove every working file on all exit paths.
        for leftover in (input_filename, transparent_filename, pro_output_filename, custom_bg_path):
            if leftover and os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError as e:
                    print(f"Cleanup issue: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
        print(f"πΈ Credit deducted! Remaining: {credits_left - 1}")
    return {"message": "Success!", "file_name": file_to_upload}
async def process_add_logo(
    request: Request,
    video_file: UploadFile = File(...),
    logo_file: UploadFile = File(...),
    x: int = Form(...),
    y: int = Form(...),
    logo_w: int = Form(...),
    logo_h: int = Form(100),
    user_id: str = Form(...)
):
    """Stamp an uploaded logo onto an uploaded video.

    Args:
        request: Incoming request (not read by this handler).
        video_file: The video to stamp.
        logo_file: The logo image.
        x, y: Logo position passed to the logo engine.
        logo_w, logo_h: Logo size passed to the logo engine.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` if the
        logo engine reports failure. The output stays in downloads/.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # Upload names are client-controlled: keep only the last path component so
    # nothing can be written outside the downloads folder.
    video_name = os.path.basename((video_file.filename or "").replace("\\", "/")) or "upload"
    logo_name = os.path.basename((logo_file.filename or "").replace("\\", "/")) or "logo"
    input_video = f"downloads/temp_vid_{video_name}"
    input_logo = f"downloads/temp_logo_{logo_name}"
    output_video = f"downloads/watermarked_{video_name}"

    try:
        with open(input_video, "wb") as buffer:
            buffer.write(await video_file.read())
        with open(input_logo, "wb") as buffer:
            buffer.write(await logo_file.read())

        print(f"π Stamping logo onto {input_video}...")
        success = add_logo.add_user_controlled_logo(
            video_path=input_video,
            logo_path=input_logo,
            output_path=output_video,
            x=x, y=y, logo_w=logo_w, logo_h=logo_h
        )
        if not success:
            return {"error": "Failed to add logo to video"}

        # Cloud upload is best-effort; the local copy in downloads/ remains.
        print("βοΈ Uploading watermarked video to Cloudflare...")
        try:
            with open(output_video, 'rb') as clean_file:
                s3.put_object(
                    Bucket=bucket_name,
                    Key=f'processed_videos/watermarked_{video_name}',
                    Body=clean_file
                )
        except Exception as e:
            print(f"Cloudflare upload skipped (Local Mode): {e}")
    finally:
        # Remove each input independently so one failure does not skip the
        # other; the output video is kept for the frontend.
        for leftover in (input_video, input_logo):
            try:
                os.remove(leftover)
            except OSError as e:
                print(f"Cleanup issue: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
        print(f"πΈ Credit deducted! Remaining: {credits_left - 1}")
    return {"message": "Success!", "file_name": f"watermarked_{video_name}"}
async def process_enhance_photo(
    request: Request,
    file: UploadFile = File(...),
    factor: int = Form(4),
    face_restoration: str = Form("true"),
    color_correction: str = Form("false"),
    user_id: str = Form(...)
):
    """Enhance an uploaded photo.

    Args:
        request: Incoming request (not read by this handler).
        file: The uploaded photo.
        factor: Factor passed through to the enhancement engine.
        face_restoration: ``"true"`` enables face restoration.
        color_correction: ``"true"`` enables colour correction.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` if the
        engine reports failure. The output stays in downloads/.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # The upload name is client-controlled: keep only its last path component
    # so it cannot point outside the downloads folder.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    input_filename = f"downloads/temp_enhance_{safe_name}"
    output_filename = f"downloads/enhanced_{safe_name}"

    try:
        with open(input_filename, "wb") as buffer:
            buffer.write(await file.read())

        print(f"β¨ Enhancing photo {input_filename} | Factor: {factor}x | Face: {face_restoration} | Color: {color_correction}")
        success = enhance_photo.enhance_photo_web(
            input_path=input_filename,
            output_path=output_filename,
            factor=factor,
            face_restoration=(face_restoration == "true"),
            # Previously compared against "false", which inverted the flag and
            # switched correction on exactly when the client asked for it off.
            color_correction=(color_correction == "true")
        )
        if not success:
            return {"error": "Photo enhancement failed"}

        # Cloud upload is best-effort; the local copy in downloads/ remains.
        print("βοΈ Uploading enhanced photo to Cloudflare...")
        try:
            with open(output_filename, 'rb') as clean_file:
                s3.put_object(
                    Bucket=bucket_name,
                    Key=f'processed_enhancements/enhanced_{safe_name}',
                    Body=clean_file
                )
        except Exception as e:
            print(f"Cloudflare upload skipped (Local Mode): {e}")
    finally:
        # Remove the temporary input on every exit path; the output is kept
        # so the frontend can display it.
        try:
            os.remove(input_filename)
        except OSError as e:
            print(f"Cleanup issue: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
        print(f"πΈ Credit deducted! Remaining: {credits_left - 1}")
    return {"message": "Success!", "file_name": f"enhanced_{safe_name}"}
# NOTE(review): the original comment here described a limit of 5 per minute,
# but no rate-limit decorator is visible on this handler — confirm it is applied.
async def process_enhance_video(
    request: Request,
    file: UploadFile = File(...),
    resolution: str = Form("1080p FHD"),
    fps_60: str = Form("true"),
    denoise: str = Form("true"),
    user_id: str = Form(...)
):
    """Enhance an uploaded video.

    Args:
        request: Incoming request (not read by this handler).
        file: The uploaded video.
        resolution, fps_60, denoise: Accepted so the frontend form validates,
            but not passed to the engine.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` if the
        engine reports failure. The output stays in downloads/.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # The upload name is client-controlled: keep only its last path component
    # so it cannot point outside the downloads folder.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    input_filename = f"downloads/temp_enhvid_{safe_name}"
    output_filename = f"downloads/enhanced_{safe_name}"

    try:
        with open(input_filename, "wb") as buffer:
            buffer.write(await file.read())

        print(f"β‘ Enhancing video {input_filename} in Turbo Mode... (UI Settings Caught but using Turbo Override)")
        success = enhance_video.enhance_video_smartly(
            input_path=input_filename,
            output_path=output_filename
        )
        if not success:
            return {"error": "Video enhancement failed"}

        # Cloud upload is best-effort; the local copy in downloads/ remains.
        print("βοΈ Uploading enhanced video to Cloudflare...")
        try:
            with open(output_filename, 'rb') as clean_file:
                s3.put_object(
                    Bucket=bucket_name,
                    Key=f'processed_videos/enhanced_{safe_name}',
                    Body=clean_file
                )
        except Exception as e:
            print(f"Cloudflare upload skipped: {e}")
    finally:
        # Remove the temporary input on every exit path and report failures
        # instead of silently ignoring them; the output is kept.
        try:
            os.remove(input_filename)
        except OSError as e:
            print(f"Cleanup issue: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
        print(f"πΈ Credit deducted! Remaining: {credits_left - 1}")
    return {"message": "Success!", "file_name": f"enhanced_{safe_name}"}
# NOTE(review): the original comment here described a limit of 5 per minute,
# but no rate-limit decorator is visible on this handler — confirm it is applied.
async def process_clipcut(
    request: Request,
    file: UploadFile = File(...),
    start_time: str = Form("00:00:00"),
    end_time: str = Form("00:00:10"),
    text: str = Form("VaniConnect AI"),
    user_id: str = Form(...)
):
    """Trim an uploaded video and add a text overlay.

    Args:
        request: Incoming request (not read by this handler).
        file: The uploaded video.
        start_time, end_time: Trim bounds passed to the trimming helper.
        text: Overlay text.
        user_id: User whose Pro flag / free credits gate the request.

    Returns:
        ``{"message", "file_name"}`` on success, or ``{"error": ...}`` if
        trimming or the overlay step fails. The output stays in downloads/.

    Raises:
        HTTPException: 401 if no user profile is returned, 402 if a non-Pro
            user has no credits left.
    """
    # The file's import section has no import for trim_video, so the calls
    # below raised NameError. NOTE(review): assumes a project module named
    # trim_video exists alongside the other tool modules — confirm.
    import trim_video

    user_data = db.get_or_create_user(user_id)
    if not user_data:
        print(f"π¨ Blocked: User {user_id} not found in Firebase!")
        raise HTTPException(status_code=401, detail="User profile not found. Please log in securely.")

    is_pro = user_data.get("isProUser", False)
    credits_left = user_data.get("free_credits", 0)
    if not is_pro and credits_left <= 0:
        print(f"π¨ Blocked: User {user_id} is out of credits!")
        raise HTTPException(status_code=402, detail="PaywallTrigger: Daily limit reached. Upgrade to Pro.")

    # The upload name is client-controlled: keep only its last path component
    # so it cannot point outside the downloads folder.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    input_filename = f"downloads/temp_clip_{safe_name}"
    trimmed_filename = f"downloads/trimmed_{safe_name}"
    final_filename = f"downloads/final_clipcut_{safe_name}"

    try:
        with open(input_filename, "wb") as buffer:
            buffer.write(await file.read())

        print(f"βοΈ 1/2: Trimming video from {start_time} to {end_time}...")
        trim_success = trim_video.trim_video(
            input_path=input_filename,
            output_path=trimmed_filename,
            start_sec=start_time,
            end_sec=end_time
        )
        if not trim_success:
            return {"error": "Video trimming failed. Make sure Start Time is before End Time!"}

        print(f"βοΈ 2/2: Adding professional text overlay: '{text}'...")
        text_success = trim_video.add_professional_text(
            input_path=trimmed_filename,
            output_path=final_filename,
            text=text
        )
        if not text_success:
            return {"error": "Text overlay failed"}

        # Cloud upload is best-effort; the local copy in downloads/ remains.
        print("βοΈ Uploading ClipCut Pro video to Cloudflare...")
        try:
            with open(final_filename, 'rb') as clean_file:
                s3.put_object(
                    Bucket=bucket_name,
                    # Use the bare file name: the old key embedded the local
                    # "downloads/" directory, unlike every other route.
                    Key=f'processed_videos/final_clipcut_{safe_name}',
                    Body=clean_file
                )
        except Exception as e:
            print(f"Cloud upload skipped: {e}")
    finally:
        # Remove both intermediates on every exit path; the final file is kept.
        for leftover in (input_filename, trimmed_filename):
            if os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError as e:
                    print(f"Cleanup issue: {e}")

    # Charge a credit only after the job has succeeded.
    if not is_pro:
        db.deduct_credit(user_id)
        print(f"πΈ Credit deducted! Remaining: {credits_left - 1}")
    return {"message": "Success!", "file_name": f"final_clipcut_{safe_name}"}
def test_database():
    """Fetch (or create) a fixed test user and return it with a status message.

    NOTE(review): this returns the stored user data for a hard-coded address;
    confirm it is not exposed on a public route.
    """
    probe_user = db.get_or_create_user("ceo@vaniconnect.com")
    report = {
        "message": "Database is working perfectly!",
        "user_data": probe_user,
    }
    return report