"""Helpers for uploading, listing, downloading and deleting Firebase Storage files."""

import asyncio
import base64
import functools
import io
import os
import tempfile
from datetime import datetime
from typing import Dict, List, Optional
from urllib.parse import unquote, urlparse

import pytz
from PIL import Image

from src.utils.image_utils import base64_to_image

from .firebase_config import firebase_bucket

# Image modes that commonly reach this module and that Pillow's JPEG encoder
# refuses to write (alpha channel or palette based).
_JPEG_INCOMPATIBLE_MODES = ("RGBA", "LA", "P", "PA")

# Data-URI MIME subtypes mapped to the Pillow format name used when saving.
# Anything not listed here falls back to JPEG.
_MIME_SUBTYPE_TO_FORMAT = {"png": "PNG", "webp": "WEBP"}


def upload_file_to_storage_sync(file_path, file_name):
    """
    Upload a local file to Firebase Storage (blocking) and make it public.

    Args:
        file_path: str - The path of the file on the local machine to be uploaded.
        file_name: str - The name of the file in Firebase Storage.

    Returns:
        str - The public URL of the uploaded file.
    """
    blob = firebase_bucket.blob(file_name)
    blob.upload_from_filename(file_path)
    blob.make_public()
    return blob.public_url


async def upload_file_to_storage(file_path: str, file_name: str) -> str:
    """
    Upload a local file to Firebase Storage without blocking the event loop.

    The blocking upload is delegated to ``upload_file_to_storage_sync`` and
    executed in the event loop's default executor.

    Args:
        file_path: str - The path of the file on the local machine to be uploaded.
        file_name: str - The name of the file in Firebase Storage.

    Returns:
        str - The public URL of the uploaded file.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, upload_file_to_storage_sync, file_path, file_name
    )


def delete_file_from_storage(file_name):
    """
    Delete a file from Firebase Storage.

    Args:
        file_name: str - The name of the file to be deleted.

    Returns:
        bool - True if the file is deleted successfully, False if the
        deletion raised any error (for example the file is not found).
    """
    try:
        blob = firebase_bucket.blob(file_name)
        blob.delete()
        return True
    except Exception as e:
        print("Error:", e)
        return False


def _blob_name_from_url(public_url: str) -> str:
    """
    Extract the storage object name from a file URL.

    Handles ``https://storage.googleapis.com/BUCKET_NAME/OBJECT_NAME`` (the
    object name may contain ``/`` and percent-escapes) and the Firebase
    download form ``.../v0/b/BUCKET_NAME/o/ENCODED_OBJECT_NAME?...``.
    Falls back to the last path segment when neither layout matches.
    """
    raw_path = urlparse(public_url).path  # drops any ?query / #fragment

    # Firebase download URL: everything after "/o/" is the encoded object name.
    if raw_path.startswith("/v0/b/") and "/o/" in raw_path:
        return unquote(raw_path.split("/o/", 1)[1])

    # Public URL: strip the leading "/<bucket>/" so nested names keep their
    # folder prefix. NOTE(review): assumes firebase_bucket exposes ``name``
    # like google.cloud.storage.Bucket does; falls through if it does not.
    bucket_name = getattr(firebase_bucket, "name", None)
    if bucket_name:
        prefix = f"/{bucket_name}/"
        if raw_path.startswith(prefix):
            return unquote(raw_path[len(prefix):])

    return unquote(raw_path.rsplit("/", 1)[-1])


def delete_file_by_url(public_url):
    """
    Delete a file from Firebase Storage using its public URL.

    Args:
        public_url: str - The public URL of the file to be deleted.

    Returns:
        bool - True if the file is deleted successfully, False if the URL
        could not be resolved to a file name or the deletion failed.
    """
    try:
        file_name = _blob_name_from_url(public_url)
        if not file_name:
            print(f"Error deleting file by URL: no file name in {public_url!r}")
            return False
        return delete_file_from_storage(file_name)
    except Exception as e:
        print(f"Error deleting file by URL: {e}")
        return False


def list_all_files_in_storage() -> Dict[str, str]:
    """
    List all files in Firebase Storage.

    Returns:
        dict - Maps each file name to its public URL.
    """
    return {blob.name: blob.public_url for blob in firebase_bucket.list_blobs()}


def download_file_from_storage(file_name, destination_path):
    """
    Download a file from Firebase Storage.

    Args:
        file_name: str - The name of the file to be downloaded.
        destination_path: str - The path to save the downloaded file.

    Returns:
        bool - True if the file is downloaded successfully, False if the
        download raised any error (for example the file is not found).
    """
    try:
        blob = firebase_bucket.blob(file_name)
        blob.download_to_filename(destination_path)
        print("da tai xun thanh cong")
        return True
    except Exception as e:
        print("Error:", e)
        return False


async def upload_base64_image_to_storage(
    base64_image: str, file_name: str, format: str = "JPEG"
) -> Optional[str]:
    """
    Upload a base64 image to Firebase Storage asynchronously.

    The image is decoded, written to a temporary file in ``format`` and then
    uploaded as ``"<file_name>.<format lowercased>"``. If anything fails and
    ``format`` is not JPEG, the whole operation is retried once as JPEG.

    Args:
        base64_image: str - The base64 encoded image.
        file_name: str - The name of the file to be uploaded (without extension).
        format: str - The format to save the image in (JPEG, PNG, etc.).

    Returns:
        Optional[str] - The public URL of the uploaded file or None if failed.
    """
    extension = format.lower()
    try:
        # NOTE(review): base64_to_image is assumed to return a PIL Image.
        image = base64_to_image(base64_image)

        # JPEG has no alpha/palette support; without this conversion the
        # save (and therefore the JPEG fallback below) fails for such images.
        if (
            format.upper() == "JPEG"
            and getattr(image, "mode", None) in _JPEG_INCOMPATIBLE_MODES
        ):
            image = image.convert("RGB")

        # mkstemp yields a unique path that does not depend on file_name, so
        # names containing "/" cannot point into a missing temp subdirectory.
        fd, temp_file_path = tempfile.mkstemp(suffix=f".{extension}")
        os.close(fd)
        try:
            image.save(temp_file_path, format=format)
            return await upload_file_to_storage(
                temp_file_path, f"{file_name}.{extension}"
            )
        finally:
            # Remove the temp file whether saving/uploading succeeded or not.
            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)
    except Exception as e:
        print(f"Error processing image {file_name}: {str(e)}")
        # If format is not JPEG, try again with JPEG.
        if format.upper() != "JPEG":
            return await upload_base64_image_to_storage(
                base64_image, file_name, format="JPEG"
            )
        return None


def _detect_image_format(base64_image) -> str:
    """Return the Pillow format implied by a data-URI header, else "JPEG"."""
    if not isinstance(base64_image, str):
        # Let the upload step report the bad entry instead of failing here.
        return "JPEG"
    # The MIME header is short; inspecting a prefix avoids scanning the payload.
    header = base64_image[:64].partition(";")[0]
    if "data:image/" not in header:
        return "JPEG"
    subtype = header.split("/", 1)[1].strip().lower()
    return _MIME_SUBTYPE_TO_FORMAT.get(subtype, "JPEG")


async def process_images(base64_images: List[str]) -> List[Optional[str]]:
    """
    Process multiple base64 images concurrently.

    Each image is uploaded as ``image_<timestamp>_<index>`` where the
    timestamp is the current Asia/Ho_Chi_Minh wall-clock time.

    Args:
        base64_images: List[str] - List of base64 encoded images.

    Returns:
        List[Optional[str]] - Public URLs in input order, with None for
        every image whose upload failed.
    """
    tz = pytz.timezone("Asia/Ho_Chi_Minh")
    tasks = []
    for idx, base64_image in enumerate(base64_images):
        timestamp = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
        file_name = f"image_{timestamp}_{idx}"
        tasks.append(
            upload_base64_image_to_storage(
                base64_image, file_name, format=_detect_image_format(base64_image)
            )
        )

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # return_exceptions=True can place exception objects in the result list;
    # normalise them to None so the documented return type holds.
    urls: List[Optional[str]] = []
    for idx, result in enumerate(results):
        if isinstance(result, BaseException):
            print(f"Error processing image at index {idx}: {result}")
            urls.append(None)
        else:
            urls.append(result)
    return urls