Spaces:
Sleeping
Sleeping
| from .firebase_config import firebase_bucket | |
| import base64 | |
| import os | |
| import tempfile | |
| from PIL import Image | |
| import io | |
| import asyncio | |
| from typing import List, Optional | |
| from datetime import datetime | |
| import pytz | |
| from src.utils.image_utils import base64_to_image | |
| import asyncio | |
| import functools | |
| def upload_file_to_storage_sync(file_path, file_name): | |
| """ | |
| Synchronous function to upload a file to Firebase Storage. | |
| param: | |
| file_path: str - The path of the file on the local machine to be uploaded. | |
| file_name: str - The name of the file in Firebase Storage. | |
| return: | |
| str - The public URL of the uploaded file. | |
| """ | |
| blob = firebase_bucket.blob(file_name) | |
| blob.upload_from_filename(file_path) | |
| blob.make_public() | |
| return blob.public_url | |
| async def upload_file_to_storage(file_path: str, file_name: str) -> str: | |
| """ | |
| Asynchronous wrapper to upload a file to Firebase Storage using a thread pool. | |
| Args: | |
| file_path: str - The path of the file on the local machine to be uploaded. | |
| file_name: str - The name of the file in Firebase Storage. | |
| Returns: | |
| str - The public URL of the uploaded file. | |
| """ | |
| loop = asyncio.get_event_loop() | |
| # Run the synchronous upload in a thread pool | |
| def upload_sync(): | |
| blob = firebase_bucket.blob(file_name) | |
| blob.upload_from_filename(file_path) | |
| blob.make_public() | |
| return blob.public_url | |
| public_url = await loop.run_in_executor(None, upload_sync) | |
| return public_url | |
| def delete_file_from_storage(file_name): | |
| """ | |
| Delete a file from Firebase Storage | |
| param: | |
| file_name: str - The name of the file to be deleted | |
| return: | |
| bool - True if the file is deleted successfully, False if the file is not found | |
| """ | |
| try: | |
| blob = firebase_bucket.blob(file_name) | |
| blob.delete() | |
| return True | |
| except Exception as e: | |
| print("Error:", e) | |
| return False | |
| def delete_file_by_url(public_url): | |
| """ | |
| Delete a file from Firebase Storage using its public URL | |
| param: | |
| public_url: str - The public URL of the file to be deleted | |
| return: | |
| bool - True if the file is deleted successfully, False if the file is not found | |
| """ | |
| try: | |
| # Extract the file name from the public URL | |
| # URL format is typically: https://storage.googleapis.com/BUCKET_NAME/FILE_NAME | |
| file_name = public_url.split("/")[-1] | |
| # Delete the file using the extracted name | |
| return delete_file_from_storage(file_name) | |
| except Exception as e: | |
| print(f"Error deleting file by URL: {e}") | |
| return False | |
| def list_all_files_in_storage(): | |
| """ | |
| View all files in Firebase Storage | |
| return: | |
| dict - Dictionary with keys are names and values are url of all files in Firebase Storage | |
| """ | |
| blobs = firebase_bucket.list_blobs() | |
| blob_dict = {blob.name: blob.public_url for blob in blobs} | |
| return blob_dict | |
| def download_file_from_storage(file_name, destination_path): | |
| """ | |
| Download a file from Firebase Storage | |
| param: | |
| file_name: str - The name of the file to be downloaded | |
| destination_path: str - The path to save the downloaded file | |
| return: | |
| bool - True if the file is downloaded successfully, False if the file is not found | |
| """ | |
| try: | |
| blob = firebase_bucket.blob(file_name) | |
| blob.download_to_filename(destination_path) | |
| print("da tai xun thanh cong") | |
| return True | |
| except Exception as e: | |
| print("Error:", e) | |
| return False | |
| async def upload_base64_image_to_storage( | |
| base64_image: str, file_name: str, format: str = "JPEG" | |
| ) -> Optional[str]: | |
| """ | |
| Upload a base64 image to Firebase Storage asynchronously. | |
| Args: | |
| base64_image: str - The base64 encoded image | |
| file_name: str - The name of the file to be uploaded | |
| format: str - The format to save the image in (JPEG, PNG, etc.) | |
| Returns: | |
| Optional[str] - The public URL of the uploaded file or None if failed | |
| """ | |
| try: | |
| # Convert base64 to PIL Image | |
| image = base64_to_image(base64_image) | |
| # Create unique temp file path with appropriate extension | |
| temp_file_path = os.path.join( | |
| tempfile.gettempdir(), | |
| f"{file_name}_{datetime.now().timestamp()}.{format.lower()}", | |
| ) | |
| # Save image in the specified format | |
| image.save(temp_file_path, format=format) | |
| try: | |
| # Upload to Firebase | |
| public_url = await upload_file_to_storage( | |
| temp_file_path, f"{file_name}.{format.lower()}" | |
| ) | |
| return public_url | |
| finally: | |
| # Clean up temp file | |
| if os.path.exists(temp_file_path): | |
| os.remove(temp_file_path) | |
| except Exception as e: | |
| print(f"Error processing image {file_name}: {str(e)}") | |
| # If format is not JPEG, try again with JPEG | |
| if format.upper() != "JPEG": | |
| return await upload_base64_image_to_storage( | |
| base64_image, file_name, format="JPEG" | |
| ) | |
| return None | |
| async def process_images(base64_images: List[str]) -> List[Optional[str]]: | |
| """ | |
| Process multiple base64 images concurrently. | |
| Args: | |
| base64_images: List[str] - List of base64 encoded images | |
| Returns: | |
| List[Optional[str]] - List of public URLs or None for failed uploads | |
| """ | |
| tasks = [] | |
| for idx, base64_image in enumerate(base64_images): | |
| timestamp = ( | |
| datetime.now(pytz.timezone("Asia/Ho_Chi_Minh")) | |
| .replace(tzinfo=None) | |
| .strftime("%Y-%m-%d_%H-%M-%S") | |
| ) | |
| file_name = f"image_{timestamp}_{idx}" | |
| # Determine format from base64 header or default to JPEG | |
| format = "JPEG" | |
| if "data:image/" in base64_image: | |
| mime_type = base64_image.split(";")[0].split("/")[1] | |
| if mime_type == "png": | |
| format = "PNG" | |
| elif mime_type == "webp": | |
| format = "WEBP" | |
| tasks.append( | |
| upload_base64_image_to_storage(base64_image, file_name, format=format) | |
| ) | |
| return await asyncio.gather(*tasks, return_exceptions=True) | |