from fastapi import FastAPI, File, UploadFile, Request, Form from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse from typing import List, Optional import requests import asyncio import uuid from datetime import datetime import mimetypes import httpx import io import zipfile import math app = FastAPI() DENO_API_URL = "https://dataerrr99.deno.dev" async def save_album_to_db(album_name: str, album_id: str, files: List[dict], is_private: bool = False): async with httpx.AsyncClient() as client: try: response = await client.post(DENO_API_URL, json={ "albumName": album_name, "albumLink": album_id, "files": files, "isPrivate": is_private, "createdAt": datetime.now().isoformat() }) return response.json() except Exception as e: print(f"Error saving to DB: {str(e)}") return None async def get_albums_from_db(): async with httpx.AsyncClient() as client: try: response = await client.get(DENO_API_URL) return response.json() except Exception as e: print(f"Error fetching from DB: {str(e)}") return {"data": []} async def get_album_from_db(album_id: str): async with httpx.AsyncClient() as client: try: response = await client.get(f"{DENO_API_URL}/{album_id}") return response.json() except Exception as e: print(f"Error fetching album from DB: {str(e)}") return None def get_file_type(filename): mime_type, _ = mimetypes.guess_type(filename) if mime_type: if mime_type.startswith('image/'): return 'image' elif mime_type.startswith('video/'): return 'video' elif mime_type.startswith('audio/'): return 'audio' return 'other' def is_allowed_file_type(filename): allowed_types = ['image', 'video', 'audio', 'application/zip'] mime_type, _ = mimetypes.guess_type(filename) return mime_type and any(mime_type.startswith(t) for t in allowed_types) @app.get("/", response_class=HTMLResponse) async def index(): return """
The file '{file.filename}' is not allowed. Please upload it using the Single File Upload.
← Back to Home """, status_code=400) album_id = str(uuid.uuid4()) album_files = [] cookies = await get_cookies() total_files = len(files) uploaded_files = 0 for file in files: file_content = await file.read() upload_result = await initiate_upload(cookies, file.filename, file.content_type) if upload_result and 'upload_url' in upload_result: upload_success = await retry_upload(upload_result['upload_url'], file_content, file.content_type) if upload_success: serving_path = upload_result['serving_url'].split('/pbxt/')[1] album_files.append({ 'filename': file.filename, 'path': serving_path, 'content_type': file.content_type, 'uploaded_at': datetime.now().isoformat() }) uploaded_files += 1 progress = math.floor((uploaded_files / total_files) * 100) print(f"Upload Progress: {progress}%") # Save to Deno KV database await save_album_to_db(album_name, album_id, album_files, is_private) base_url = str(request.base_url).rstrip('/') return RedirectResponse(url=f"{base_url}/album/{album_id}", status_code=303) @app.get("/album/{album_id}", response_class=HTMLResponse) async def view_album(album_id: str): album = await get_album_from_db(album_id) if not album or 'files' not in album: return "Album not found or invalid data", 404 file_list_html = "" for file in album['files']: file_url = f"/upload/{file['path']}" file_type = get_file_type(file['filename']) download_url = f"{file_url}?download=true" preview_html = "" if file_type == 'image': preview_html = f"""Found {len(matching_albums)} albums for "{query}"