Spaces:
Runtime error
Runtime error
| from .firebase_config import firebase_bucket | |
| import base64 | |
| import os | |
| import tempfile | |
| from PIL import Image | |
| import io | |
| import aiofiles | |
| import asyncio | |
| from typing import List, Optional | |
| from datetime import datetime | |
| import pytz | |
| import asyncio | |
| import functools | |
def upload_file_to_storage_sync(file_path, file_name):
    """
    Blocking upload of a local file to the Firebase Storage bucket.
    param:
        file_path: str - Location of the file on the local machine.
        file_name: str - Name the blob will have inside the bucket.
    return:
        str - The ``public_url`` of the blob, read after it was made public.
    """
    uploaded = firebase_bucket.blob(file_name)
    uploaded.upload_from_filename(file_path)
    # Make the blob public before handing its URL back to the caller.
    uploaded.make_public()
    return uploaded.public_url
async def upload_file_to_storage(file_path: str, file_name: str) -> str:
    """
    Asynchronous wrapper that uploads a file to Firebase Storage from a worker thread.
    param:
        file_path: str - The path of the file on the local machine to be uploaded.
        file_name: str - The name of the file in Firebase Storage.
    return:
        str - The public URL of the uploaded file, as returned by
              `upload_file_to_storage_sync`.
    """
    # Inside a coroutine a loop is always running; `get_running_loop()` is the
    # documented way to obtain it (preferred over `get_event_loop()`).
    loop = asyncio.get_running_loop()
    # `None` selects the loop's default executor, so the blocking upload does
    # not stall the event loop. Positional arguments are forwarded as-is.
    return await loop.run_in_executor(
        None, upload_file_to_storage_sync, file_path, file_name
    )
def delete_file_from_storage(file_name):
    """
    Remove a blob from the Firebase Storage bucket.
    param:
        file_name: str - Name of the blob to delete.
    return:
        bool - True when the delete call completes; False when any exception
               is raised (the exception is printed, not re-raised).
    """
    try:
        firebase_bucket.blob(file_name).delete()
    except Exception as e:
        print("Error:", e)
        return False
    return True
def list_all_files_in_storage():
    """
    Enumerate every blob in the Firebase Storage bucket.
    return:
        dict - Maps each blob's name to that blob's ``public_url``.
    """
    url_by_name = {}
    for entry in firebase_bucket.list_blobs():
        url_by_name[entry.name] = entry.public_url
    return url_by_name
def download_file_from_storage(file_name, destination_path):
    """
    Fetch a blob from the Firebase Storage bucket into a local file.
    param:
        file_name: str - Name of the blob to download.
        destination_path: str - Local path the blob's content is written to.
    return:
        bool - True when the download (and the success message) completes;
               False when any exception is raised (it is printed, not re-raised).
    """
    try:
        source = firebase_bucket.blob(file_name)
        source.download_to_filename(destination_path)
        # Success message; presumably unaccented Vietnamese for
        # "downloaded successfully" - TODO confirm wording with the author.
        print("da tai xun thanh cong")
    except Exception as e:
        print("Error:", e)
        return False
    return True
# Image modes that are handed to the JPEG writer unchanged. Any other mode
# (e.g. RGBA or P, common for PNG input) is converted to RGB first, because
# saving such an image as JPEG raises an error.
_JPEG_WRITABLE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")


def _remove_file_quietly(path: str) -> None:
    """Delete `path`; report (but do not raise) if the removal fails."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # Nothing left to clean up.
        pass
    except OSError as e:
        print(f"Error removing temporary file {path}: {str(e)}")


def _write_base64_image_to_temp_jpeg(base64_image: str) -> str:
    """Decode `base64_image` and save it as a JPEG temp file; return its path.

    Blocking helper meant to run in an executor. If anything fails after the
    temporary file was created, the file is removed before re-raising.
    """
    image_data = base64.b64decode(base64_image)
    # mkstemp yields a unique path inside the temp directory that does not
    # depend on the caller-supplied blob name (which may contain '/').
    fd, temp_file_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    try:
        with Image.open(io.BytesIO(image_data)) as source_image:
            if source_image.mode in _JPEG_WRITABLE_MODES:
                jpeg_ready = source_image
            else:
                # JPEG has no alpha channel or palette; the conversion
                # discards transparency information.
                jpeg_ready = source_image.convert("RGB")
            jpeg_ready.save(temp_file_path, format="JPEG")
    except Exception:
        _remove_file_quietly(temp_file_path)
        raise
    return temp_file_path


async def upload_base64_image_to_storage(
    base64_image: str, file_name: str
) -> Optional[str]:
    """
    Upload a base64 image to Firebase Storage asynchronously.

    The image is decoded, re-encoded as JPEG into a temporary file, uploaded
    under the name ``f"{file_name}.jpg"`` and the temporary file is removed
    afterwards, whether or not the upload succeeded.

    Args:
        base64_image: str - The base64 encoded image
        file_name: str - The name of the file to be uploaded (without extension)
    Returns:
        Optional[str] - The public URL of the uploaded file, or None if
        decoding, conversion or upload raised an exception (the error is
        printed, not re-raised)
    """
    try:
        loop = asyncio.get_running_loop()
        # Decode + re-encode in the default executor so the event loop is
        # not blocked by image processing and file I/O.
        temp_file_path = await loop.run_in_executor(
            None, _write_base64_image_to_temp_jpeg, base64_image
        )
        try:
            # Upload to Firebase
            return await upload_file_to_storage(temp_file_path, f"{file_name}.jpg")
        finally:
            # A cleanup problem must not turn a successful upload into a
            # failure, hence the non-raising removal helper.
            await loop.run_in_executor(None, _remove_file_quietly, temp_file_path)
    except Exception as e:
        print(f"Error processing image {file_name}: {str(e)}")
        return None
async def process_images(base64_images: List[str]) -> List[Optional[str]]:
    """
    Upload a batch of base64 images concurrently.

    Each image is uploaded under the name ``image_<timestamp>_<index>``, where
    the timestamp is the current Asia/Ho_Chi_Minh wall-clock time formatted as
    ``%Y-%m-%d_%H-%M-%S`` and the index is the image's position in the input.

    Args:
        base64_images: List[str] - List of base64 encoded images
    Returns:
        List[Optional[str]] - One entry per input image, in input order: the
        value produced by `upload_base64_image_to_storage`. Because the uploads
        are gathered with ``return_exceptions=True``, an exception raised by an
        upload is returned as that entry instead of being propagated.
    """

    def _name_for(position: int) -> str:
        # The clock is read once per image, at the moment its upload is queued.
        local_now = datetime.now(pytz.timezone("Asia/Ho_Chi_Minh"))
        stamp = local_now.replace(tzinfo=None).strftime("%Y-%m-%d_%H-%M-%S")
        return f"image_{stamp}_{position}"

    uploads = [
        upload_base64_image_to_storage(encoded, _name_for(position))
        for position, encoded in enumerate(base64_images)
    ]
    return await asyncio.gather(*uploads, return_exceptions=True)