"""Proxy endpoints: video streams, HLS playlists, images and embed resolution."""

import logging
import re
from urllib.parse import quote, urljoin, urlparse

import httpx
from fastapi import APIRouter, HTTPException, Query, Response, Request
from fastapi.responses import StreamingResponse

from scraper.extractors.engine import ExtractorEngine

try:
    from curl_cffi.requests import AsyncSession
    HAS_CURL_CFFI = True
except ImportError:
    HAS_CURL_CFFI = False

router = APIRouter(prefix="/proxy", tags=["proxy"])
logger = logging.getLogger("api.proxy")

# User-Agent sent by the video proxy.
_VIDEO_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
# Shorter User-Agent sent by the HLS and image proxies.
_BASIC_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

# Matches URI="..." attributes inside HLS tag lines (e.g. #EXT-X-KEY, #EXT-X-MAP).
_URI_ATTR_RE = re.compile(r'URI="([^"]+)"')


def _default_referer(url):
    """Return ``https://<host>/`` for the host of *url*, used when no referer is given."""
    return f"https://{urlparse(url).netloc}/"


def _proxied_url(base_url, target, referer):
    """Return the ``/proxy/video`` path that proxies *target* (resolved against *base_url*)."""
    abs_url = urljoin(base_url, target)
    # ':' and '/' stay readable; '?', '&' and '=' are escaped so the target's own
    # query string cannot leak into the proxy's query string.
    safe_url = quote(abs_url, safe=':/')
    safe_referer = quote(referer, safe='') if referer else ''
    return f'/proxy/video?url={safe_url}&referer={safe_referer}'


def _rewrite_playlist(content, base_url, referer):
    """Rewrite every URI in an HLS playlist so it is fetched through ``/proxy/video``.

    Non-comment lines are treated as URIs; on ``#`` tag lines only ``URI="..."``
    attributes are rewritten. Relative URIs are resolved against *base_url*.
    Line endings are normalised to ``\\n`` so a CRLF playlist does not leave a
    stray ``\\r`` inside the rewritten URLs.
    """
    def rewrite_attr(match):
        return f'URI="{_proxied_url(base_url, match.group(1), referer)}"'

    rewritten = []
    for raw_line in content.replace('\r\n', '\n').replace('\r', '\n').split('\n'):
        line = raw_line.strip()
        if not line:
            rewritten.append('')
        elif line.startswith('#'):
            rewritten.append(_URI_ATTR_RE.sub(rewrite_attr, line))
        else:
            rewritten.append(_proxied_url(base_url, line, referer))
    return '\n'.join(rewritten)


@router.get("/video")
async def proxy_video(url: str = Query(...), referer: str = Query(None)):
    """Stream the resource at *url*, sending a Referer header upstream.

    The Referer is *referer* when given, otherwise ``https://<host of url>/``.
    URLs whose path contains ``.m3u8`` are handed to :func:`proxy_hls`.

    Raises:
        HTTPException: 400 when *url* is empty; the upstream status code when
            the upstream answers with >= 400; 500 when the upstream request
            cannot be made.
    """
    if not url:
        raise HTTPException(status_code=400, detail="Missing URL")

    # If it's an HLS playlist, use the HLS proxy for better compatibility
    if ".m3u8" in url.split('?')[0]:
        return await proxy_hls(url, referer)

    # The upstream stream is opened *before* the response is built so that an
    # upstream failure becomes a real HTTP error instead of a 200 whose body is
    # an error string. NOTE: TLS verification is disabled (verify=False).
    client = None
    try:
        headers = {
            "User-Agent": _VIDEO_USER_AGENT,
            "Referer": referer if referer else _default_referer(url),
        }
        client = httpx.AsyncClient(verify=False, follow_redirects=True, timeout=60.0)
        upstream = await client.send(
            client.build_request("GET", url, headers=headers), stream=True
        )
    except Exception as exc:
        if client is not None:
            await client.aclose()
        logger.error("Video proxy error: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    if upstream.status_code >= 400:
        status = upstream.status_code
        await upstream.aclose()
        await client.aclose()
        raise HTTPException(status_code=status, detail=f"Error: {status}")

    async def stream_video():
        # The response and client are closed once streaming ends or is aborted.
        try:
            async for chunk in upstream.aiter_bytes(chunk_size=1024 * 64):
                yield chunk
        finally:
            await upstream.aclose()
            await client.aclose()

    return StreamingResponse(stream_video(), media_type="video/mp4")


@router.get("/hls")
async def proxy_hls(url: str = Query(...), referer: str = Query(None)):
    """Fetch an HLS playlist and rewrite its URIs to go through ``/proxy/video``.

    Relative URIs are resolved against the final playlist URL (after redirects).

    Raises:
        HTTPException: the upstream status code when the playlist fetch does
            not return 200; 500 on any other failure.
    """
    try:
        headers = {
            "User-Agent": _BASIC_USER_AGENT,
            "Referer": referer if referer else _default_referer(url),
        }
        # NOTE: TLS verification is disabled (verify=False).
        async with httpx.AsyncClient(verify=False, follow_redirects=True, timeout=15.0) as client:
            resp = await client.get(url, headers=headers)
            if resp.status_code != 200:
                raise HTTPException(status_code=resp.status_code, detail="Failed to fetch HLS playlist")
            playlist = _rewrite_playlist(resp.text, str(resp.url), referer)
            return Response(content=playlist, media_type="application/x-mpegURL")
    except HTTPException:
        # Keep the upstream status instead of collapsing it into a generic 500.
        raise
    except Exception as exc:
        logger.error("HLS proxy error: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@router.get("/image")
async def proxy_image(url: str = Query(...)):
    """Fetch the image at *url* and return its bytes with the upstream content type.

    When curl_cffi is installed it is tried first (impersonating ``chrome124``);
    if that attempt fails or does not return 200, plain httpx is used.

    Returns:
        The image on success, an empty 404 response when no attempt returned
        200, or an empty 500 response on an unexpected error.

    Raises:
        HTTPException: 400 when *url* is empty.
    """
    if not url:
        raise HTTPException(status_code=400, detail="Missing URL")

    try:
        headers = {
            "User-Agent": _BASIC_USER_AGENT,
            "Referer": "https://asd.pics/",
        }

        if HAS_CURL_CFFI:
            try:
                async with AsyncSession(impersonate="chrome124", verify=False) as session:
                    resp = await session.get(url, headers=headers, timeout=20)
                    if resp.status_code == 200:
                        return Response(
                            content=resp.content,
                            media_type=resp.headers.get("content-type", "image/jpeg"),
                        )
            except Exception as exc:
                # A curl_cffi failure must not prevent the httpx fallback below.
                logger.warning("Image proxy curl_cffi attempt failed, falling back to httpx: %s", exc)

        async with httpx.AsyncClient(verify=False, follow_redirects=True, timeout=20.0) as client:
            resp = await client.get(url, headers=headers)
            if resp.status_code == 200:
                return Response(
                    content=resp.content,
                    media_type=resp.headers.get("content-type", "image/jpeg"),
                )

        return Response(status_code=404)
    except Exception as exc:
        logger.error("Image proxy error: %s", exc)
        return Response(status_code=500)


@router.get("/resolve")
async def resolve_url(url: str = Query(...)):
    """Resolve an embed URL through ``ExtractorEngine.extract``.

    Returns:
        ``{"success": True, "url", "type", "headers"}`` when the extractor
        returns a result (``type`` defaults to ``"hls"``, ``headers`` to ``{}``),
        otherwise ``{"success": False, "message": ...}``.

    Raises:
        HTTPException: 400 when *url* is empty.
    """
    if not url:
        raise HTTPException(status_code=400, detail="Missing URL")

    try:
        result = await ExtractorEngine.extract(url)
        if not result:
            return {"success": False, "message": "Could not resolve stream"}
        return {
            "success": True,
            "url": result['url'],
            "type": result.get('type', 'hls'),
            "headers": result.get('headers', {}),
        }
    except Exception as exc:
        logger.error("Resolution error for %s: %s", url, exc)
        return {"success": False, "message": str(exc)}