from .firebase_config import firebase_bucket import base64 import os import tempfile from PIL import Image import io import aiofiles import asyncio from typing import List, Optional from datetime import datetime import pytz import asyncio import functools def upload_file_to_storage_sync(file_path, file_name): """ Synchronous function to upload a file to Firebase Storage. param: file_path: str - The path of the file on the local machine to be uploaded. file_name: str - The name of the file in Firebase Storage. return: str - The public URL of the uploaded file. """ blob = firebase_bucket.blob(file_name) blob.upload_from_filename(file_path) blob.make_public() return blob.public_url async def upload_file_to_storage(file_path: str, file_name: str) -> str: """ Asynchronous wrapper to upload a file to Firebase Storage using a thread pool. param: file_path: str - The path of the file on the local machine to be uploaded. file_name: str - The name of the file in Firebase Storage. return: str - The public URL of the uploaded file. """ loop = asyncio.get_event_loop() # Run the synchronous `upload_file_to_storage_sync` in a thread pool. public_url = await loop.run_in_executor( None, functools.partial(upload_file_to_storage_sync, file_path, file_name) ) return public_url def delete_file_from_storage(file_name): """ Delete a file from Firebase Storage param: file_name: str - The name of the file to be deleted return: bool - True if the file is deleted successfully, False if the file is not found """ try: blob = firebase_bucket.blob(file_name) blob.delete() return True except Exception as e: print("Error:", e) return False def list_all_files_in_storage(): """ View all files in Firebase Storage return: dict - Dictionary with keys are names and values are url of all files in Firebase Storage """ blobs = firebase_bucket.list_blobs() blob_dict = {blob.name: blob.public_url for blob in blobs} return blob_dict def download_file_from_storage(file_name, destination_path): """ Download a file from Firebase Storage param: file_name: str - The name of the file to be downloaded destination_path: str - The path to save the downloaded file return: bool - True if the file is downloaded successfully, False if the file is not found """ try: blob = firebase_bucket.blob(file_name) blob.download_to_filename(destination_path) print("da tai xun thanh cong") return True except Exception as e: print("Error:", e) return False async def upload_base64_image_to_storage( base64_image: str, file_name: str ) -> Optional[str]: """ Upload a base64 image to Firebase Storage asynchronously. Args: base64_image: str - The base64 encoded image file_name: str - The name of the file to be uploaded Returns: Optional[str] - The public URL of the uploaded file or None if failed """ try: # Run CPU-intensive operations in thread pool loop = asyncio.get_event_loop() # Decode base64 in thread pool image_data = await loop.run_in_executor( None, lambda: base64.b64decode(base64_image) ) # Open and process image in thread pool image = await loop.run_in_executor( None, lambda: Image.open(io.BytesIO(image_data)) ) # Create unique temp file path temp_file_path = os.path.join( tempfile.gettempdir(), f"{file_name}_{datetime.now().timestamp()}.jpg" ) # Save image in thread pool await loop.run_in_executor( None, lambda: image.save(temp_file_path, format="JPEG") ) try: # Upload to Firebase public_url = await upload_file_to_storage( temp_file_path, f"{file_name}.jpg" ) return public_url finally: # Clean up temp file in thread pool await loop.run_in_executor(None, os.remove, temp_file_path) except Exception as e: print(f"Error processing image {file_name}: {str(e)}") return None async def process_images(base64_images: List[str]) -> List[Optional[str]]: """ Process multiple base64 images concurrently. Args: base64_images: List[str] - List of base64 encoded images Returns: List[Optional[str]] - List of public URLs or None for failed uploads """ tasks = [] for idx, base64_image in enumerate(base64_images): timestamp = ( datetime.now(pytz.timezone("Asia/Ho_Chi_Minh")) .replace(tzinfo=None) .strftime("%Y-%m-%d_%H-%M-%S") ) file_name = f"image_{timestamp}_{idx}" tasks.append(upload_base64_image_to_storage(base64_image, file_name)) return await asyncio.gather(*tasks, return_exceptions=True)