| | from fastapi import FastAPI, Query |
| | from fastapi.responses import StreamingResponse, PlainTextResponse, Response |
| | import httpx |
| | from urllib.parse import quote, unquote |
| | import base64 |
| |
|
| | app = FastAPI() |
| |
|
| | async def universal_proxy(url: str): |
| | try: |
| | async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: |
| | r = await client.get(url) |
| | content_type = r.headers.get("content-type", "") |
| | original_text = r.text if "text" in content_type or url.endswith(".m3u8") else None |
| |
|
| | |
| | if "application/vnd.apple.mpegurl" in content_type or url.endswith(".m3u8"): |
| | base_url = url.rsplit("/", 1)[0] + "/" |
| | lines = original_text.splitlines() |
| | new_lines = [] |
| |
|
| | for line in lines: |
| | line = line.strip() |
| | if line.startswith("#EXT-X-KEY") and "URI=" in line: |
| | key_url = line.split('URI="')[1].split('"')[0] |
| | full_key_url = key_url if key_url.startswith("http") else base_url + key_url |
| | encoded_key = quote(base64.urlsafe_b64encode(full_key_url.encode()).decode()) |
| | line = line.replace(key_url, f"/proxy_safe?encoded={encoded_key}") |
| | new_lines.append(line) |
| | elif line and not line.startswith("#"): |
| | full_url = line if line.startswith("http") else base_url + line |
| | encoded_url = quote(base64.urlsafe_b64encode(full_url.encode()).decode()) |
| | new_lines.append(f"/proxy_safe?encoded={encoded_url}") |
| | else: |
| | new_lines.append(line) |
| |
|
| | return PlainTextResponse("\n".join(new_lines), media_type="application/vnd.apple.mpegurl") |
| |
|
| | |
| | elif any(ext in url.lower() for ext in [".ts", ".key", ".jpeg", ".jpg", ".avif"]): |
| | async def stream(): |
| | async with client.stream("GET", url) as resp: |
| | async for chunk in resp.aiter_bytes(): |
| | yield chunk |
| | return StreamingResponse(stream(), media_type=content_type) |
| |
|
| | |
| | return Response(content=r.content, media_type=content_type) |
| |
|
| | except Exception as e: |
| | return PlainTextResponse(f"Hata: {str(e)}", status_code=500) |
| |
|
| | @app.get("/proxy_safe") |
| | async def proxy_safe(encoded: str): |
| | try: |
| | decoded_url = base64.urlsafe_b64decode(encoded.encode()).decode() |
| | return await universal_proxy(decoded_url) |
| | except Exception as e: |
| | return PlainTextResponse(f"Base64 çözümleme hatası: {str(e)}", status_code=400) |
| |
|
| |
|
| |
|
| | @app.get("/proxy/ts") |
| | async def proxy_ts( |
| | url: str, |
| | h_User_Agent: str = Query(None), |
| | h_Referer: str = Query(None) |
| | ): |
| | headers = {} |
| | if h_User_Agent: |
| | headers["User-Agent"] = h_User_Agent |
| | if h_Referer: |
| | headers["Referer"] = h_Referer |
| |
|
| | client = httpx.AsyncClient(timeout=30.0, headers=headers, follow_redirects=True) |
| | |
| |
|
| | async def stream(): |
| | async with client.stream("GET", url) as resp: |
| | |
| | resp.raise_for_status() |
| | async for chunk in resp.aiter_bytes(): |
| | yield chunk |
| | await client.aclose() |
| |
|
| | |
| | |
| |
|
| | return StreamingResponse(stream(), media_type="application/octet-stream") |
| |
|
| |
|
| |
|