from fastapi import FastAPI, File, UploadFile, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, FileResponse import requests import time import asyncio from typing import Dict import os import mimetypes import secrets import json app = FastAPI() # Store shortened URLs SHORTENED_URLS = {} HTML_CONTENT = """
Redirecting...
""" return HTMLResponse(content=html_content) @app.get("/pp/{path:path}") async def handle_file_stream(path: str, request: Request): original_url = f'https://replicate.delivery/pbxt/{path}' if not path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.mp4', '.mp3', '.pdf', '.txt')): original_url += '.png' range_header = request.headers.get('Range') headers = {'Range': range_header} if range_header else {} response = requests.get(original_url, headers=headers, stream=True) def generate(): for chunk in response.iter_content(chunk_size=8192): yield chunk headers = dict(response.headers) headers['Access-Control-Allow-Origin'] = '*' headers['Content-Disposition'] = 'inline' if response.status_code == 206: headers['Content-Range'] = response.headers.get('Content-Range') original_extension = os.path.splitext(path)[1][1:] content_type, _ = mimetypes.guess_type(f"file.{original_extension}") if content_type: headers['Content-Type'] = content_type return StreamingResponse(generate(), status_code=response.status_code, headers=headers) @app.get("/embed") async def embed_video(url: str, thumbnail: str): html = f'''