"""Small FastAPI service that uploads files through replicate.com.

Routes:
    GET  /                 -- serves the uploader page (HTML_CONTENT).
    POST /upload           -- uploads a file and returns a mirrored "/pp/..." URL.
    POST /shorten          -- stores a URL under a random 6-character code.
    GET  /s/{short_code}   -- looks up a short code.
    GET  /pp/{path}        -- streams a file back from replicate.delivery.
    GET  /embed            -- returns the embed page.
"""

import asyncio
import json
import mimetypes
import os
import secrets
import time
from typing import Dict
from urllib.parse import quote

import requests
from fastapi import FastAPI, File, UploadFile, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, FileResponse

app = FastAPI()

# Store shortened URLs
SHORTENED_URLS = {}

# File used to persist SHORTENED_URLS between restarts.
SHORTENED_URLS_FILE = "shortened_urls.json"

# Characters a short code is drawn from.
SHORT_CODE_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'

# Extensions uploaded under their own name; anything else is uploaded with
# DISGUISE_SUFFIX appended and the "image/png" content type.
SUPPORTED_EXTENSIONS = ('mp4', 'png', 'jpg', 'jpeg', 'gif', 'mp3', 'pdf', 'txt')
_SUPPORTED_SUFFIXES = tuple(f'.{ext}' for ext in SUPPORTED_EXTENSIONS)
DISGUISE_SUFFIX = '.png'

# (connect, read) timeout in seconds for every outgoing HTTP request, so a
# stalled upstream cannot hang a request forever.
REQUEST_TIMEOUT = (10, 60)

USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
REPLICATE_PAGE_URL = 'https://replicate.com/levelsio/neon-tokyo'

# Headers that describe a single connection and must not be relayed by a proxy.
_HOP_BY_HOP_HEADERS = frozenset({
    'connection',
    'keep-alive',
    'proxy-authenticate',
    'proxy-authorization',
    'te',
    'trailer',
    'transfer-encoding',
    'upgrade',
})

HTML_CONTENT = """ File Uploader

File Uploader

or drag and drop file here/paste image

All file types are supported
×

Upload History

×
"""


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the uploader page."""
    return HTML_CONTENT


@app.post("/upload")
async def handle_upload(file: UploadFile = File(...)):
    """Upload *file* via replicate.com and return its mirrored URL.

    Returns a JSON body ``{"url": ..., "originalExtension": ...}`` on success,
    or ``{"error": ...}`` with status 400/500 on failure.
    """
    if not file.filename:
        return JSONResponse(content={"error": "No file selected."}, status_code=400)

    cookies = await get_cookies()
    if 'csrftoken' not in cookies or 'sessionid' not in cookies:
        return JSONResponse(content={"error": "Failed"}, status_code=500)

    original_extension = os.path.splitext(file.filename)[1][1:]
    is_supported = original_extension.lower() in SUPPORTED_EXTENSIONS
    if is_supported:
        temp_filename = file.filename
        # The client may omit the part's content type; fall back to a guess so
        # the upstream request never carries the literal string "None".
        content_type = (
            file.content_type
            or mimetypes.guess_type(file.filename)[0]
            or 'application/octet-stream'
        )
    else:
        temp_filename = f"{file.filename}{DISGUISE_SUFFIX}"
        content_type = "image/png"

    try:
        upload_result = await initiate_upload(cookies, temp_filename, content_type)
    except Exception:
        # initiate_upload already printed the cause; answer with a JSON error
        # instead of letting the exception escape the handler.
        upload_result = None

    if (
        not isinstance(upload_result, dict)
        or 'upload_url' not in upload_result
        or not isinstance(upload_result.get('serving_url'), str)
    ):
        return JSONResponse(content={"error": "Failed to upload"}, status_code=500)

    # Validate the serving URL before spending time on the upload itself.
    url_parts = upload_result['serving_url'].split('/pbxt/', 1)
    if len(url_parts) != 2:
        return JSONResponse(content={"error": "Failed to upload"}, status_code=500)

    file_content = await file.read()
    upload_success = await retry_upload(upload_result['upload_url'], file_content, content_type)
    if not upload_success:
        return JSONResponse(content={"error": "FAILED GOD MAN AFTER alot of attempts"}, status_code=500)

    mirrored_url = f"/pp/{url_parts[1]}"
    if not is_supported and mirrored_url.endswith(DISGUISE_SUFFIX):
        # Drop only the suffix appended above; a ".png" occurring elsewhere in
        # the name belongs to the user's filename and must be kept.
        mirrored_url = mirrored_url[:-len(DISGUISE_SUFFIX)]

    return JSONResponse(content={"url": mirrored_url, "originalExtension": original_extension})


def _load_shortened_urls() -> None:
    """Merge persisted short URLs into SHORTENED_URLS (best effort).

    Entries already in memory win over entries read from the file.
    """
    try:
        with open(SHORTENED_URLS_FILE, "r") as f:
            persisted = json.load(f)
    except FileNotFoundError:
        return
    except (OSError, ValueError) as e:
        print(f'Error loading shortened URLs: {e}')
        return

    if isinstance(persisted, dict):
        for code, url in persisted.items():
            SHORTENED_URLS.setdefault(code, url)


def _save_shortened_urls() -> None:
    """Write SHORTENED_URLS to disk (best effort; failure is not critical)."""
    try:
        with open(SHORTENED_URLS_FILE, "w") as f:
            json.dump(SHORTENED_URLS, f)
    except (OSError, TypeError, ValueError) as e:
        print(f'Error saving shortened URLs: {e}')


@app.post("/shorten")
async def shorten_url(request: Request):
    """Store the posted ``url`` under a new short code.

    Expects a JSON object with a ``url`` key and returns
    ``{"shortUrl": "/s/<code>"}``, or a 400 error when no URL is provided.
    """
    try:
        data = await request.json()
    except ValueError:
        data = None
    original_url = data.get("url") if isinstance(data, dict) else None

    if not original_url:
        return JSONResponse(content={"error": "No URL provided"}, status_code=400)

    # Pull in previously persisted links first; otherwise the write below
    # would replace the file with only the links created since startup.
    _load_shortened_urls()

    # Generate a short code
    short_code = ''.join(secrets.choice(SHORT_CODE_ALPHABET) for _ in range(6))

    # Store the mapping
    SHORTENED_URLS[short_code] = original_url

    # Save to file for persistence
    _save_shortened_urls()

    return JSONResponse(content={"shortUrl": f"/s/{short_code}"})


@app.get("/s/{short_code}")
async def redirect_short_url(short_code: str):
    """Return the page for *short_code*, or a 404 JSON error if unknown."""
    original_url = SHORTENED_URLS.get(short_code)
    if not original_url:
        # Not in memory (e.g. after a restart): consult the persisted file.
        _load_shortened_urls()
        original_url = SHORTENED_URLS.get(short_code)

    if not original_url:
        return JSONResponse(content={"error": "Short URL not found"}, status_code=404)

    # HTML for redirecting
    # NOTE(review): the template below does not reference original_url, so as
    # written the page performs no redirect -- confirm the markup is intact.
    # If the URL is interpolated into HTML it must be escaped first.
    html_content = f"""

Redirecting...

"""

    return HTMLResponse(content=html_content)


@app.get("/pp/{path:path}")
async def handle_file_stream(path: str, request: Request):
    """Stream ``https://replicate.delivery/pbxt/{path}`` back to the client.

    Paths without a supported extension get DISGUISE_SUFFIX appended before
    fetching. A ``Range`` request header is passed through to the upstream.
    """
    original_url = f'https://replicate.delivery/pbxt/{path}'
    if not path.lower().endswith(_SUPPORTED_SUFFIXES):
        original_url += DISGUISE_SUFFIX

    range_header = request.headers.get('Range')
    upstream_headers = {'Range': range_header} if range_header else {}

    try:
        # requests is blocking; run it off the event loop.
        response = await run_in_threadpool(
            requests.get,
            original_url,
            headers=upstream_headers,
            stream=True,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as e:
        print(f'Error fetching file: {e}')
        return JSONResponse(content={"error": "Failed to fetch file"}, status_code=502)

    def generate():
        try:
            for chunk in response.iter_content(chunk_size=8192):
                yield chunk
        finally:
            # Release the upstream connection once streaming ends or aborts.
            response.close()

    # iter_content() yields the *decoded* body, so an upstream Content-Encoding
    # (and the Content-Length of the encoded bytes) would be wrong if relayed.
    body_was_encoded = 'Content-Encoding' in response.headers
    headers = {}
    for name, value in response.headers.items():
        lowered = name.lower()
        if lowered in _HOP_BY_HOP_HEADERS:
            continue
        if body_was_encoded and lowered in ('content-encoding', 'content-length'):
            continue
        # Lower-case keys so the overrides below replace upstream values
        # instead of adding a second header that differs only in case.
        headers[lowered] = value

    headers['access-control-allow-origin'] = '*'
    headers['content-disposition'] = 'inline'

    if response.status_code == 206:
        content_range = response.headers.get('Content-Range')
        if content_range is not None:
            headers['content-range'] = content_range

    original_extension = os.path.splitext(path)[1][1:]
    content_type, _ = mimetypes.guess_type(f"file.{original_extension}")
    if content_type:
        headers['content-type'] = content_type

    return StreamingResponse(generate(), status_code=response.status_code, headers=headers)


@app.get("/embed")
async def embed_video(url: str, thumbnail: str):
    """Return the embed page for *url* / *thumbnail*."""
    # NOTE(review): the template below uses neither `url` nor `thumbnail` --
    # confirm the markup is intact. If they are interpolated into HTML they
    # must be escaped first (both come straight from the query string).
    html = f''' '''
    return HTMLResponse(content=html)


async def get_cookies() -> Dict[str, str]:
    """Fetch the replicate.com page and return its cookies ({} on failure)."""
    try:
        response = await run_in_threadpool(
            requests.get,
            REPLICATE_PAGE_URL,
            headers={'User-Agent': USER_AGENT},
            timeout=REQUEST_TIMEOUT,
        )
        return dict(response.cookies)
    except Exception as e:
        print(f'Error fetching the page: {e}')
        return {}


async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
    """Request an upload slot for *filename* and return the decoded JSON reply.

    Raises:
        Exception: any error from the request or from JSON decoding is
            printed and re-raised.
    """
    # Escape the filename so characters such as "?", "#", "/" or spaces cannot
    # change the meaning of the URL; the content type goes through `params`
    # so it is query-encoded as well.
    url = f"https://replicate.com/api/upload/{quote(filename, safe='')}"
    try:
        response = await run_in_threadpool(
            requests.post,
            url,
            params={'content_type': content_type},
            cookies=cookies,
            headers={
                'X-CSRFToken': cookies.get('csrftoken'),
                'User-Agent': USER_AGENT,
                'Referer': REPLICATE_PAGE_URL,
                'Origin': 'https://replicate.com',
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'identity',
                'Sec-Fetch-Dest': 'empty',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Site': 'same-origin',
                'Sec-GPC': '1',
                'Priority': 'u=1, i',
            },
            timeout=REQUEST_TIMEOUT,
        )
        return response.json()
    except Exception as e:
        print(f'Error initiating upload: {e}')
        raise


async def upload_file(upload_url: str, file_content: bytes, content_type: str) -> bool:
    """PUT *file_content* to *upload_url*; return True only on HTTP 200."""
    try:
        response = await run_in_threadpool(
            requests.put,
            upload_url,
            data=file_content,
            headers={'Content-Type': content_type},
            timeout=REQUEST_TIMEOUT,
        )
        return response.status_code == 200
    except Exception as e:
        print(f'Error uploading file: {e}')
        return False


async def retry_upload(upload_url: str, file_content: bytes, content_type: str, max_retries: int = 5, delay: int = 1) -> bool:
    """Upload with exponential backoff.

    Makes one initial attempt plus up to *max_retries* retries, sleeping
    *delay* seconds before the first retry and doubling it (capped at 60)
    before each later one.

    Returns:
        True as soon as an attempt succeeds, False once all attempts failed.
    """
    total_attempts = max(0, max_retries) + 1
    for attempt in range(1, total_attempts + 1):
        try:
            if await upload_file(upload_url, file_content, content_type):
                return True
            print("Upload failed. Retrying...")
        except Exception as e:
            print(f"Error during upload: {e}")

        if attempt < total_attempts:
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)

    return False