import os import logging from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse, RedirectResponse from fastapi.middleware.cors import CORSMiddleware import httpx from urllib.parse import urlparse, unquote from PIL import Image import io import re app = FastAPI( title="Universal Image Proxy", description="Proxy de imagens para qualquer aplicação com tratamento robusto de erros", version="2.1.0" ) # Configuração de logs logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("image-proxy") # CORS completo para acesso universal app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["GET"], allow_headers=["*"], expose_headers=["*"] ) # Configurações otimizadas para Hugging Face Spaces TIMEOUT = 10.0 MAX_SIZE = 3 * 1024 * 1024 # 3MB MAX_DIMENSION = 2000 MAX_REDIRECTS = 5 # Lista de domínios problemáticos que precisam de tratamento especial PROBLEMATIC_DOMAINS = [ "static.wikia.nocookie.net", "tmsimg.com", "espncdn.com" ] def clean_url(url: str) -> str: """Remove caracteres problemáticos e decodifica URL""" # Decodificar URL codificada cleaned = unquote(url) # Remover parâmetros de cache para alguns domínios if any(domain in cleaned for domain in PROBLEMATIC_DOMAINS): cleaned = re.sub(r'(\?|&)(cb|revision|__cf_chl_[^=]+)=[^&]+', '', cleaned) cleaned = re.sub(r'\?.*', '', cleaned) return cleaned @app.get("/") async def home(): """Página inicial com instruções simplificadas""" return { "service": "Universal Image Proxy", "status": "active", "endpoint": "/proxy-image/?url=URL_DA_IMAGEM", "parameters": { "w": "Largura desejada (opcional)", "h": "Altura desejada (opcional)", "q": "Qualidade (1-100, padrão 85)" }, "example": "/proxy-image/?url=https://example.com/photo.jpg&w=300&q=85" } @app.get("/proxy-image/") async def proxy_image( request: Request, url: str, w: int = None, h: int = None, q: int = 85 ): """Endpoint principal do proxy de imagens com tratamento robusto de erros""" try: # Limpar e validar URL cleaned_url = clean_url(url) parsed = urlparse(cleaned_url) if not parsed.scheme or not parsed.netloc or parsed.scheme not in ("http", "https"): raise HTTPException(400, "URL inválida") # Headers para evitar bloqueio headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "image/*,*/*;q=0.8", "Referer": f"{parsed.scheme}://{parsed.netloc}/" } # Buscar a imagem async with httpx.AsyncClient(follow_redirects=True, max_redirects=MAX_REDIRECTS) as client: response = await client.get( cleaned_url, headers=headers, timeout=TIMEOUT ) # Tratamento de erros HTTP if response.status_code != 200: error_msg = f"Origem retornou erro {response.status_code}" logger.warning(f"{error_msg} para: {cleaned_url}") raise HTTPException(response.status_code, error_msg) content_type = response.headers.get("Content-Type", "") if not content_type.startswith("image/"): error_msg = f"Tipo de conteúdo inválido: {content_type}" logger.warning(f"{error_msg} - URL: {cleaned_url}") raise HTTPException(400, error_msg) # Verificar tamanho if len(response.content) > MAX_SIZE: error_msg = f"Imagem muito grande ({len(response.content)} bytes)" logger.warning(f"{error_msg} - URL: {cleaned_url}") raise HTTPException(400, error_msg) # Processar imagem img_bytes = response.content content_type = response.headers.get("Content-Type", "image/jpeg") # Aplicar transformações se solicitado if w or h or q != 85: try: with Image.open(io.BytesIO(img_bytes)) as img: # Limitar dimensões máximas if w and w > MAX_DIMENSION: w = MAX_DIMENSION if h and h > MAX_DIMENSION: h = MAX_DIMENSION # Preservar proporções if w and not h: h = int(w * img.height / img.width) elif h and not w: w = int(h * img.width / img.height) # Redimensionar se necessário if w or h: img = img.resize((w or img.width, h or img.height)) # Converter para JPEG (exceto PNGs com transparência) output_format = "JPEG" if img.format == "PNG" and img.mode == "RGBA": output_format = "PNG" output = io.BytesIO() img.save(output, format=output_format, quality=q, optimize=True) img_bytes = output.getvalue() content_type = f"image/{output_format.lower()}" except Exception as img_error: logger.error(f"Erro no processamento: {str(img_error)} - URL: {cleaned_url}") # Manter a imagem original se falhar no processamento # Headers de resposta headers = { "Cache-Control": "public, max-age=86400", "Access-Control-Allow-Origin": "*", "X-Image-Proxy": "processed" if (w or h or q != 85) else "original", "X-Original-URL": cleaned_url } return StreamingResponse( io.BytesIO(img_bytes), media_type=content_type, headers=headers ) except httpx.HTTPStatusError as e: logger.error(f"Erro HTTP {e.response.status_code} para: {cleaned_url}") raise HTTPException(e.response.status_code, f"Erro na origem: {e.response.status_code}") except httpx.TimeoutException: logger.error(f"Timeout para: {cleaned_url}") raise HTTPException(504, "Tempo de requisição excedido") except httpx.RequestError as e: logger.error(f"Erro de conexão: {str(e)} - URL: {cleaned_url}") raise HTTPException(502, f"Erro de conexão: {str(e)}") except HTTPException: # Re-lançar exceções HTTP já tratadas raise except Exception as e: logger.exception(f"Erro inesperado: {str(e)} - URL: {cleaned_url}") raise HTTPException(500, f"Erro interno: {str(e)}") @app.get("/favicon.ico") async def favicon(): return RedirectResponse( "https://www.stremio.com/favicon.ico", status_code=301, headers={"Cache-Control": "public, max-age=31536000"} ) # Função para compatibilidade com Hugging Face Spaces def get_app(): return app