import re
import urllib.parse

from fastapi import FastAPI, Request, HTTPException, Response
import httpx
from bs4 import BeautifulSoup

app = FastAPI()
# ---------------------------------------------------------------------------
# Utility: Rewrite CSS URLs inside CSS text
# ---------------------------------------------------------------------------
def rewrite_css(css_text: str, target_url: str, proxy_prefix="/?url=") -> str:
    """
    Finds all url(...) references in CSS and rewrites them so that
    the resources are loaded via the proxy.
    """
    # Matches: url('...'), url("..."), or url(...)
    pattern = re.compile(r'url\(\s*(?P<quote>["\']?)(?P<url>[^"\')]+)(?P=quote)\s*\)')

    def replace_url(match):
        original_url = match.group("url")
        # Skip URLs that are already proxied or that are data: URIs.
        if original_url.startswith(proxy_prefix) or original_url.startswith("data:"):
            return match.group(0)
        # Resolve relative references against the stylesheet's own URL, then proxy them.
        new_url = urllib.parse.urljoin(target_url, original_url)
        return f'url({match.group("quote")}{proxy_prefix}{urllib.parse.quote(new_url)}{match.group("quote")})'

    return pattern.sub(replace_url, css_text)
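

# Illustrative example only (example.com stands in for whatever stylesheet was
# actually requested; it is not used anywhere else in this app):
#   rewrite_css("body { background: url('/img/bg.png'); }",
#               "https://example.com/css/site.css")
#   -> "body { background: url('/?url=https%3A//example.com/img/bg.png'); }"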
# ---------------------------------------------------------------------------
# Injected JavaScript for Dynamic Interception & Real-Time Updates
# ---------------------------------------------------------------------------
# This script intercepts history changes, fetch, XHR, and anchor clicks,
# and uses a MutationObserver to rewrite dynamically added elements. It also
# handles SVG attributes (including xlink:href) for icons and logos.
INJECTED_JS = """
<script>
(function() {
    // Intercept history.pushState to route dynamic navigations.
    const originalPushState = history.pushState;
    history.pushState = function(state, title, url) {
        if (url) {
            // Resolve relative URLs against the injected <base> tag before proxying.
            const absoluteUrl = new URL(url, document.baseURI).href;
            const proxiedUrl = '/?url=' + encodeURIComponent(absoluteUrl);
            return originalPushState.call(history, state, title, proxiedUrl);
        }
        return originalPushState.call(history, state, title, url);
    };

    // Intercept fetch() calls.
    const originalFetch = window.fetch;
    window.fetch = function(input, init) {
        let url;
        if (typeof input === 'string') {
            url = input;
        } else if (input && input.url) {
            url = input.url;
        } else {
            return originalFetch(input, init);
        }
        // Resolve relative URLs against the injected <base> tag so the proxy
        // always receives an absolute target URL.
        const absoluteUrl = new URL(url, document.baseURI).href;
        const proxiedUrl = '/?url=' + encodeURIComponent(absoluteUrl);
        if (typeof input === 'object') {
            input = new Request(proxiedUrl, input);
        } else {
            input = proxiedUrl;
        }
        return originalFetch(input, init);
    };

    // Intercept XMLHttpRequest.open().
    const originalOpen = XMLHttpRequest.prototype.open;
    XMLHttpRequest.prototype.open = function(method, url) {
        const absoluteUrl = new URL(url, document.baseURI).href;
        const proxiedUrl = '/?url=' + encodeURIComponent(absoluteUrl);
        return originalOpen.apply(this, [method, proxiedUrl, true]);
    };

    // Intercept anchor clicks to ensure navigation goes through the proxy.
    document.addEventListener('click', function(event) {
        const target = event.target.closest('a');
        if (!target || !target.href || target.getAttribute('data-no-proxy')) {
            return;
        }
        const rawHref = target.getAttribute('href') || '';
        if (rawHref.indexOf('/?url=') === 0) {
            // Already proxied by the server: keep it on the proxy host instead of
            // letting the injected <base> tag resolve it against the target origin.
            event.preventDefault();
            window.location.href = window.location.origin + rawHref;
            return;
        }
        event.preventDefault();
        // target.href is already resolved to an absolute URL by the browser.
        window.location.href = '/?url=' + encodeURIComponent(target.href);
    });

    // Use a MutationObserver to catch and rewrite dynamically added elements.
    const observer = new MutationObserver(function(mutations) {
        mutations.forEach(function(mutation) {
            mutation.addedNodes.forEach(function(node) {
                if (node.nodeType === Node.ELEMENT_NODE) {
                    reProxyElement(node);
                }
            });
        });
    });
    // The script may run before <body> exists, so guard against a null observe target.
    if (document.body) {
        observer.observe(document.body, { childList: true, subtree: true });
    } else {
        document.addEventListener('DOMContentLoaded', function() {
            observer.observe(document.body, { childList: true, subtree: true });
        });
    }

    // Rewrites URL attributes for an element and its children.
    function reProxyElement(element) {
        const urlAttrs = ['href', 'src', 'action', 'srcset', 'xlink:href'];
        urlAttrs.forEach(function(attr) {
            if (element.hasAttribute(attr)) {
                const value = element.getAttribute(attr);
                if (value && !value.startsWith('/?url=') &&
                    !value.startsWith('javascript:') && !value.startsWith('mailto:')) {
                    element.setAttribute(attr, '/?url=' + encodeURIComponent(value));
                }
            }
        });
        // Also rewrite CSS url() references inside the inline style attribute.
        if (element.hasAttribute('style')) {
            let styleVal = element.getAttribute('style');
            styleVal = styleVal.replace(/url\\((['"]?)(.*?)\\1\\)/g, function(match, quote, url) {
                if (url.startsWith('/?url=') || url.startsWith('data:')) return match;
                return "url(" + quote + "/?url=" + encodeURIComponent(url) + quote + ")";
            });
            element.setAttribute('style', styleVal);
        }
        Array.from(element.children).forEach(child => reProxyElement(child));
    }
})();
</script>
"""
# ---------------------------------------------------------------------------
# Helper Function: Rewrite URLs for All Relevant Attributes in HTML
# ---------------------------------------------------------------------------
def rewrite_urls(soup, target_url, proxy_prefix="/?url="):
    """
    Iterates over all elements in the parsed HTML and rewrites URL-like attributes.
    Supports attributes such as: href, src, action, srcset, xlink:href.
    Also rewrites inline style attributes that contain CSS url() references.
    """
    url_attrs = ['href', 'src', 'action', 'srcset', 'xlink:href']
    for element in soup.find_all(True):
        # Rewrite attributes that hold URLs.
        for attr in list(element.attrs):
            if attr in url_attrs:
                orig_value = element.get(attr)
                if not orig_value or orig_value.startswith(proxy_prefix) or orig_value.startswith(("mailto:", "javascript:")):
                    continue
                new_url = urllib.parse.urljoin(target_url, orig_value)
                element[attr] = proxy_prefix + urllib.parse.quote(new_url)
        # Rewrite inline style attributes containing CSS url() patterns.
        if element.has_attr("style"):
            original_style = element.get("style")
            element["style"] = rewrite_css(original_style, target_url, proxy_prefix)
    return soup
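

# Illustrative example (example.com stands in for whatever page was requested):
#   an <img src="/logo.png"> in a page fetched from https://example.com/ becomes
#   <img src="/?url=https%3A//example.com/logo.png">, so the browser requests the
#   image through the proxy instead of contacting the origin directly.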
# ---------------------------------------------------------------------------
# Core Function: Fetch and Rewrite the Target Resource
# ---------------------------------------------------------------------------
async def fetch_and_rewrite(target_url: str) -> Response:
    """
    Fetches the target URL and processes the response.
    - If the content is HTML, removes conflicting CSP tags, inserts a <base> tag,
      injects the dynamic JavaScript, and rewrites URL attributes.
    - If the content is CSS, rewrites any url() references so that the resources are proxied.
    - Other content types are returned as-is.
    """
    # Follow redirects so the rewritten content reflects the final URL instead of
    # passing an origin-pointing Location header back to the browser.
    async with httpx.AsyncClient(follow_redirects=True) as client:
        resp = await client.get(target_url)
    content_type = resp.headers.get("Content-Type", "")

    # Process CSS files to rewrite url() references.
    if "text/css" in content_type:
        new_css = rewrite_css(resp.text, target_url)
        return Response(content=new_css, media_type="text/css", status_code=resp.status_code)

    # Process HTML content.
    if "text/html" in content_type:
        soup = BeautifulSoup(resp.text, "html.parser")

        # Remove any Content Security Policy meta tags that might block our scripts.
        for meta in soup.find_all("meta", attrs={"http-equiv": "Content-Security-Policy"}):
            meta.decompose()

        # Insert (or replace) a <base> tag so relative URLs resolve against the target origin.
        parsed_target = urllib.parse.urlparse(target_url)
        base_href = f"{parsed_target.scheme}://{parsed_target.netloc}"
        if soup.head:
            for base in soup.head.find_all("base"):
                base.decompose()
            base_tag = soup.new_tag("base", href=base_href)
            soup.head.insert(0, base_tag)
        else:
            head_tag = soup.new_tag("head")
            base_tag = soup.new_tag("base", href=base_href)
            head_tag.insert(0, base_tag)
            soup.insert(0, head_tag)

        # Inject the dynamic JavaScript for real-time interception.
        if soup.body:
            soup.body.insert(0, BeautifulSoup(INJECTED_JS, "html.parser"))
        else:
            soup.insert(0, BeautifulSoup(INJECTED_JS, "html.parser"))

        # Rewrite URLs (including SVG icons, CSS backgrounds in style attributes, etc.).
        soup = rewrite_urls(soup, target_url)
        return Response(content=str(soup), media_type="text/html", status_code=resp.status_code)

    # For other content types (JS, images, etc.) return the response directly.
    return Response(content=resp.content, media_type=content_type, status_code=resp.status_code)
# ---------------------------------------------------------------------------
# Simple Proxy Endpoint: /proxy_full
# ---------------------------------------------------------------------------
@app.get("/proxy_full")
async def proxy_full(url: str):
    """
    A simple proxy endpoint that fetches the given URL and rewrites its HTML/CSS
    so that all resource URLs route via the proxy.
    """
    if not url:
        raise HTTPException(status_code=400, detail="Missing 'url' query parameter")
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
    content_type = resp.headers.get("Content-Type", "")
    if "text/html" in content_type:
        soup = BeautifulSoup(resp.text, "html.parser")
        soup = rewrite_urls(soup, url)
        return Response(content=str(soup), media_type="text/html")
    elif "text/css" in content_type:
        new_css = rewrite_css(resp.text, url)
        return Response(content=new_css, media_type="text/css")
    return Response(content=resp.content, media_type=content_type, status_code=resp.status_code)
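

# Example request against a local run (hypothetical host and the port used below):
#   curl 'http://localhost:7860/proxy_full?url=https://example.com/'
# returns example.com's HTML with href/src/action/style URLs rewritten to /?url=... links.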
# ---------------------------------------------------------------------------
# Catch-All Dynamic Proxy Endpoint
# ---------------------------------------------------------------------------
@app.get("/{full_path:path}")
async def catch_all(full_path: str, request: Request):
    """
    Catch-all endpoint for dynamic proxying.
    Determines the target URL via the 'url' query parameter or a stored cookie,
    then processes the resource (HTML or CSS) to inject the dynamic JS and rewrite URLs.
    """
    query_params = dict(request.query_params)
    if "url" in query_params:
        target_url = query_params["url"]
    else:
        target_base = request.cookies.get("target_base")
        if not target_base:
            return Response("No target URL provided.", status_code=400)
        qs = request.url.query
        target_url = urllib.parse.urljoin(target_base, full_path)
        if qs:
            target_url += "?" + qs
    response = await fetch_and_rewrite(target_url)
    # Store the target's base URL in a cookie for subsequent relative requests.
    parsed_target = urllib.parse.urlparse(target_url)
    base_url = f"{parsed_target.scheme}://{parsed_target.netloc}"
    response.set_cookie("target_base", base_url)
    return response
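

# Typical flow against a local run (hypothetical host; example.com is illustrative):
#   1. The browser opens http://localhost:7860/?url=https://example.com/ and receives
#      the rewritten page; the target_base cookie is set to https://example.com.
#   2. A later request for a bare path such as /favicon.ico (one the rewriter did not
#      catch) is joined with the cookie to https://example.com/favicon.ico and proxied.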
# ---------------------------------------------------------------------------
# Run the Application on Port 7860
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)