image-retrieval / src /firebase /firebase_provider.py
ABAO77's picture
Upload 25 files
0e33a02 verified
from .firebase_config import firebase_bucket
import base64
import os
import tempfile
from PIL import Image
import io
import aiofiles
import asyncio
from typing import List, Optional
from datetime import datetime
import pytz
import asyncio
import functools
def upload_file_to_storage_sync(file_path, file_name):
"""
Synchronous function to upload a file to Firebase Storage.
param:
file_path: str - The path of the file on the local machine to be uploaded.
file_name: str - The name of the file in Firebase Storage.
return:
str - The public URL of the uploaded file.
"""
blob = firebase_bucket.blob(file_name)
blob.upload_from_filename(file_path)
blob.make_public()
return blob.public_url
async def upload_file_to_storage(file_path: str, file_name: str) -> str:
"""
Asynchronous wrapper to upload a file to Firebase Storage using a thread pool.
param:
file_path: str - The path of the file on the local machine to be uploaded.
file_name: str - The name of the file in Firebase Storage.
return:
str - The public URL of the uploaded file.
"""
loop = asyncio.get_event_loop()
# Run the synchronous `upload_file_to_storage_sync` in a thread pool.
public_url = await loop.run_in_executor(
None, functools.partial(upload_file_to_storage_sync, file_path, file_name)
)
return public_url
def delete_file_from_storage(file_name):
"""
Delete a file from Firebase Storage
param:
file_name: str - The name of the file to be deleted
return:
bool - True if the file is deleted successfully, False if the file is not found
"""
try:
blob = firebase_bucket.blob(file_name)
blob.delete()
return True
except Exception as e:
print("Error:", e)
return False
def list_all_files_in_storage():
"""
View all files in Firebase Storage
return:
dict - Dictionary with keys are names and values are url of all files in Firebase Storage
"""
blobs = firebase_bucket.list_blobs()
blob_dict = {blob.name: blob.public_url for blob in blobs}
return blob_dict
def download_file_from_storage(file_name, destination_path):
"""
Download a file from Firebase Storage
param:
file_name: str - The name of the file to be downloaded
destination_path: str - The path to save the downloaded file
return:
bool - True if the file is downloaded successfully, False if the file is not found
"""
try:
blob = firebase_bucket.blob(file_name)
blob.download_to_filename(destination_path)
print("da tai xun thanh cong")
return True
except Exception as e:
print("Error:", e)
return False
async def upload_base64_image_to_storage(
base64_image: str, file_name: str
) -> Optional[str]:
"""
Upload a base64 image to Firebase Storage asynchronously.
Args:
base64_image: str - The base64 encoded image
file_name: str - The name of the file to be uploaded
Returns:
Optional[str] - The public URL of the uploaded file or None if failed
"""
try:
# Run CPU-intensive operations in thread pool
loop = asyncio.get_event_loop()
# Decode base64 in thread pool
image_data = await loop.run_in_executor(
None, lambda: base64.b64decode(base64_image)
)
# Open and process image in thread pool
image = await loop.run_in_executor(
None, lambda: Image.open(io.BytesIO(image_data))
)
# Create unique temp file path
temp_file_path = os.path.join(
tempfile.gettempdir(), f"{file_name}_{datetime.now().timestamp()}.jpg"
)
# Save image in thread pool
await loop.run_in_executor(
None, lambda: image.save(temp_file_path, format="JPEG")
)
try:
# Upload to Firebase
public_url = await upload_file_to_storage(
temp_file_path, f"{file_name}.jpg"
)
return public_url
finally:
# Clean up temp file in thread pool
await loop.run_in_executor(None, os.remove, temp_file_path)
except Exception as e:
print(f"Error processing image {file_name}: {str(e)}")
return None
async def process_images(base64_images: List[str]) -> List[Optional[str]]:
"""
Process multiple base64 images concurrently.
Args:
base64_images: List[str] - List of base64 encoded images
Returns:
List[Optional[str]] - List of public URLs or None for failed uploads
"""
tasks = []
for idx, base64_image in enumerate(base64_images):
timestamp = (
datetime.now(pytz.timezone("Asia/Ho_Chi_Minh"))
.replace(tzinfo=None)
.strftime("%Y-%m-%d_%H-%M-%S")
)
file_name = f"image_{timestamp}_{idx}"
tasks.append(upload_base64_image_to_storage(base64_image, file_name))
return await asyncio.gather(*tasks, return_exceptions=True)