from fastapi import FastAPI, File, UploadFile, Request, Form from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse from typing import List, Optional import requests import asyncio import uuid from datetime import datetime import mimetypes import httpx import io import zipfile import math app = FastAPI() DENO_API_URL = "https://dataerrr99.deno.dev" async def save_album_to_db(album_name: str, album_id: str, files: List[dict], is_private: bool = False): async with httpx.AsyncClient() as client: try: response = await client.post(DENO_API_URL, json={ "albumName": album_name, "albumLink": album_id, "files": files, "isPrivate": is_private, "createdAt": datetime.now().isoformat() }) return response.json() except Exception as e: print(f"Error saving to DB: {str(e)}") return None async def get_albums_from_db(): async with httpx.AsyncClient() as client: try: response = await client.get(DENO_API_URL) return response.json() except Exception as e: print(f"Error fetching from DB: {str(e)}") return {"data": []} async def get_album_from_db(album_id: str): async with httpx.AsyncClient() as client: try: response = await client.get(f"{DENO_API_URL}/{album_id}") return response.json() except Exception as e: print(f"Error fetching album from DB: {str(e)}") return None def get_file_type(filename): mime_type, _ = mimetypes.guess_type(filename) if mime_type: if mime_type.startswith('image/'): return 'image' elif mime_type.startswith('video/'): return 'video' elif mime_type.startswith('audio/'): return 'audio' return 'other' def is_allowed_file_type(filename): allowed_types = ['image', 'video', 'audio', 'application/zip'] mime_type, _ = mimetypes.guess_type(filename) return mime_type and any(mime_type.startswith(t) for t in allowed_types) @app.get("/", response_class=HTMLResponse) async def index(): return """ File Upload

bing bong

Search Albums

""" @app.post("/album/create") async def create_album( request: Request, album_name: str = Form(...), files: List[UploadFile] = File(...), is_private: Optional[bool] = Form(False) ): for file in files: if not is_allowed_file_type(file.filename): return HTMLResponse(content=f""" Error

Error: Invalid File Type

The file '{file.filename}' is not allowed. Please upload it using the Single File Upload.

← Back to Home """, status_code=400) album_id = str(uuid.uuid4()) album_files = [] cookies = await get_cookies() total_files = len(files) uploaded_files = 0 for file in files: file_content = await file.read() upload_result = await initiate_upload(cookies, file.filename, file.content_type) if upload_result and 'upload_url' in upload_result: upload_success = await retry_upload(upload_result['upload_url'], file_content, file.content_type) if upload_success: serving_path = upload_result['serving_url'].split('/pbxt/')[1] album_files.append({ 'filename': file.filename, 'path': serving_path, 'content_type': file.content_type, 'uploaded_at': datetime.now().isoformat() }) uploaded_files += 1 progress = math.floor((uploaded_files / total_files) * 100) print(f"Upload Progress: {progress}%") # Save to Deno KV database await save_album_to_db(album_name, album_id, album_files, is_private) base_url = str(request.base_url).rstrip('/') return RedirectResponse(url=f"{base_url}/album/{album_id}", status_code=303) @app.get("/album/{album_id}", response_class=HTMLResponse) async def view_album(album_id: str): album = await get_album_from_db(album_id) if not album or 'files' not in album: return "Album not found or invalid data", 404 file_list_html = "" for file in album['files']: file_url = f"/upload/{file['path']}" file_type = get_file_type(file['filename']) download_url = f"{file_url}?download=true" preview_html = "" if file_type == 'image': preview_html = f"""
{file['filename']}
""" elif file_type == 'video': preview_html = f"""
""" else: preview_html = f"""
""" file_list_html += f"""
{preview_html}
{file['filename']}
""" return f""" {album['albumName']}

{album['albumName']}

Download All as ZIP
{file_list_html}
""" @app.get("/album/{album_id}/download") async def download_album(album_id: str): album = await get_album_from_db(album_id) if not album or 'files' not in album: return {"error": "Album not found or invalid data"}, 404 zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for file in album['files']: response = requests.get(f"https://replicate.delivery/pbxt/{file['path']}") zip_file.writestr(file['filename'], response.content) zip_buffer.seek(0) return StreamingResponse( io.BytesIO(zip_buffer.getvalue()), media_type="application/zip", headers={ "Content-Type": "application/zip", "Content-Disposition": f"attachment; filename={album['albumName']}.zip" } ) @app.get("/search", response_class=HTMLResponse) async def search_albums(query: str): db_albums = await get_albums_from_db() # Exclude private albums from search results matching_albums = [ album for album in db_albums['data'] if query.lower() in album.get('albumName', '').lower() and not album.get('isPrivate', False) ] results_html = "" for album in matching_albums: results_html += f"""

{album['albumName']}

Created: {album['createdAt']}

Files: {len(album['files'])}

View Album
""" return f""" Search Results

Search Results

Found {len(matching_albums)} albums for "{query}"

{results_html}
← Back to Home """ @app.get("/upload/{path:path}") async def handle_stream(path: str, request: Request): original_url = f'https://replicate.delivery/pbxt/{path}' range_header = request.headers.get('Range') headers = {'Range': range_header} if range_header else {} response = requests.get(original_url, headers=headers, stream=True) def generate(): for chunk in response.iter_content(chunk_size=8192): yield chunk headers = dict(response.headers) headers['Access-Control-Allow-Origin'] = '*' return StreamingResponse(generate(), headers=headers, status_code=response.status_code) async def get_cookies(): try: response = requests.get('https://replicate.com/levelsio/neon-tokyo', headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) return dict(response.cookies) except Exception: return {} async def initiate_upload(cookies, filename, content_type): url = f"https://replicate.com/api/upload/{filename}?content_type={content_type}" headers = { 'X-CSRFToken': cookies['csrftoken'], 'Referer': 'https://replicate.com/levelsio/neon-tokyo', 'Origin': 'https://replicate.com' } response = requests.post(url, cookies=cookies, headers=headers) return response.json() async def retry_upload(upload_url, file_content, content_type, max_retries=5): delay = 1 for _ in range(max_retries): try: response = requests.put(upload_url, data=file_content, headers={'Content-Type': content_type}) if response.status_code == 200: return True except Exception: pass await asyncio.sleep(delay) delay *= 2 return False if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)