Spaces:
Paused
Paused
File size: 11,949 Bytes
f3f8bd0 840dd01 f3f8bd0 2100890 f3f8bd0 840dd01 2100890 840dd01 f459e61 840dd01 2100890 840dd01 f459e61 840dd01 a0e40dd 840dd01 f459e61 840dd01 a0e40dd 2100890 a0e40dd 2100890 f3f8bd0 2100890 f3f8bd0 2100890 840dd01 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f459e61 2100890 f3f8bd0 2100890 840dd01 f459e61 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 f459e61 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f3f8bd0 2100890 f459e61 2100890 f3f8bd0 2100890 f459e61 a0e40dd f459e61 a0e40dd f459e61 2100890 f3f8bd0 f459e61 2100890 f459e61 2100890 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
import re
import urllib.parse
from fastapi import FastAPI, Request, HTTPException, Response
import httpx
from bs4 import BeautifulSoup
# Single FastAPI application instance; every proxy endpoint below attaches to it.
app = FastAPI()
# ---------------------------------------------------------------------------
# Utility: Rewrite CSS URLs inside CSS text
# ---------------------------------------------------------------------------
def rewrite_css(css_text: str, target_url: str, proxy_prefix: str = "/?url=") -> str:
    """
    Rewrite every ``url(...)`` reference in a CSS document so the referenced
    resource is fetched through the proxy.

    Relative references are resolved against ``target_url`` before being
    proxied.  Three kinds of references are deliberately left untouched:

    * already-proxied URLs (they start with ``proxy_prefix``),
    * ``data:`` URIs (the payload is inline; there is nothing to fetch),
    * fragment-only references such as ``url(#blurFilter)`` used by SVG
      filters/gradients, which point inside the current document and would
      break if routed through the proxy.

    :param css_text: raw CSS source.
    :param target_url: absolute URL the CSS was served from; base for
        resolving relative references.
    :param proxy_prefix: prefix that routes a percent-quoted absolute URL
        through the proxy.
    :return: CSS text with rewritten ``url(...)`` references.
    """
    # Matches url('...'), url("..."), or url(...); the backreference keeps
    # the original quoting style intact in the replacement.
    pattern = re.compile(r'url\(\s*(?P<quote>["\']?)(?P<url>[^"\')]+)(?P=quote)\s*\)')

    def replace_url(match):
        original_url = match.group("url")
        # Leave already-proxied, inline (data:), and same-document (#...)
        # references alone.
        if original_url.startswith((proxy_prefix, "data:", "#")):
            return match.group(0)
        quote = match.group("quote")
        absolute = urllib.parse.urljoin(target_url, original_url)
        return f'url({quote}{proxy_prefix}{urllib.parse.quote(absolute)}{quote})'

    return pattern.sub(replace_url, css_text)
# ---------------------------------------------------------------------------
# Injected JavaScript for Dynamic Interception & Real-Time Updates
# ---------------------------------------------------------------------------
# This script intercepts history changes, fetch, XHR, and anchor clicks,
# and uses MutationObserver to rewrite new elements. It also handles SVG
# attributes (including xlink:href) for icons and logos.
# JavaScript payload injected into every proxied HTML page.  It is a single
# opaque string served verbatim to the browser, so its contents are kept
# byte-for-byte untouched here.  What it does client-side:
#   * wraps history.pushState so SPA navigations stay behind /?url=
#   * wraps window.fetch and XMLHttpRequest.prototype.open to proxy AJAX calls
#   * intercepts <a> clicks (honoring a data-no-proxy opt-out attribute) and
#     redirects them through the proxy
#   * installs a MutationObserver that re-proxies dynamically added elements:
#     href/src/action/srcset/xlink:href attributes and inline style url()s
# NOTE(review): the anchor guard tests `target.href.indexOf('/?url=') === 0`,
# but an anchor's `href` property is an absolute URL (includes the origin), so
# already-proxied links may be re-proxied / double-encoded — confirm before
# relying on that check.
# NOTE(review): the XHR wrapper forces async=true and drops any user/password
# arguments passed to open() — presumably intentional for this proxy; verify.
INJECTED_JS = """
<script>
(function() {
// Intercept history.pushState to route dynamic navigations.
const originalPushState = history.pushState;
history.pushState = function(state, title, url) {
if (url) {
const proxiedUrl = '/?url=' + encodeURIComponent(url);
return originalPushState.call(history, state, title, proxiedUrl);
}
return originalPushState.call(history, state, title, url);
};
// Intercept fetch() calls.
const originalFetch = window.fetch;
window.fetch = function(input, init) {
let url;
if (typeof input === 'string') {
url = input;
} else if (input && input.url) {
url = input.url;
} else {
return originalFetch(input, init);
}
const proxiedUrl = '/?url=' + encodeURIComponent(url);
if (typeof input === 'object') {
input = new Request(proxiedUrl, input);
} else {
input = proxiedUrl;
}
return originalFetch(input, init);
};
// Intercept XMLHttpRequest.open().
const originalOpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function(method, url) {
const proxiedUrl = '/?url=' + encodeURIComponent(url);
return originalOpen.apply(this, [method, proxiedUrl, true]);
};
// Intercept anchor clicks to ensure navigation goes through the proxy.
document.addEventListener('click', function(event) {
const target = event.target.closest('a');
if (target && target.href) {
if (target.getAttribute('data-no-proxy') || target.href.indexOf('/?url=') === 0) {
return;
}
event.preventDefault();
window.location.href = '/?url=' + encodeURIComponent(target.href);
}
});
// Use MutationObserver to catch and rewrite dynamically added elements.
const observer = new MutationObserver(function(mutations) {
mutations.forEach(function(mutation) {
mutation.addedNodes.forEach(function(node) {
if (node.nodeType === Node.ELEMENT_NODE) {
reProxyElement(node);
}
});
});
});
observer.observe(document.body, { childList: true, subtree: true });
// Rewrites URL attributes for an element and its children.
function reProxyElement(element) {
const urlAttrs = ['href', 'src', 'action', 'srcset', 'xlink:href'];
urlAttrs.forEach(function(attr) {
if (element.hasAttribute(attr)) {
const value = element.getAttribute(attr);
if (value && !value.startsWith('/?url=') &&
!value.startsWith('javascript:') && !value.startsWith('mailto:')) {
element.setAttribute(attr, '/?url=' + encodeURIComponent(value));
}
}
});
// Also rewrite inline style attribute.
if (element.hasAttribute('style')) {
let styleVal = element.getAttribute('style');
// Simple client-side rewriting: prepend proxy to URLs.
styleVal = styleVal.replace(/url\\((['"]?)(.*?)\\1\\)/g, function(match, quote, url) {
if (url.startsWith('/?url=') || url.startsWith('data:')) return match;
return "url(" + quote + "/?url=" + encodeURIComponent(url) + quote + ")";
});
element.setAttribute('style', styleVal);
}
Array.from(element.children).forEach(child => reProxyElement(child));
}
})();
</script>
"""
# ---------------------------------------------------------------------------
# Helper Function: Rewrite URLs for All Relevant Attributes in HTML
# ---------------------------------------------------------------------------
def _rewrite_srcset(value: str, target_url: str, proxy_prefix: str) -> str:
    """Proxy each candidate URL of a ``srcset`` value, keeping descriptors.

    ``srcset`` is a comma-separated list of "URL [descriptor]" candidates
    (e.g. ``a.jpg 1x, b.jpg 2x``); each URL must be rewritten individually.
    """
    candidates = []
    for candidate in value.split(","):
        candidate = candidate.strip()
        if not candidate:
            continue
        # Split URL from its optional width/density descriptor.
        pieces = candidate.split(None, 1)
        absolute = urllib.parse.urljoin(target_url, pieces[0])
        proxied = proxy_prefix + urllib.parse.quote(absolute)
        candidates.append(proxied if len(pieces) == 1 else f"{proxied} {pieces[1]}")
    return ", ".join(candidates)


def rewrite_urls(soup, target_url, proxy_prefix="/?url="):
    """
    Rewrite URL-bearing attributes of every element in the parsed HTML so the
    resources load through the proxy.

    Handles ``href``, ``src``, ``action``, ``srcset`` and ``xlink:href``
    (``srcset`` is parsed candidate-by-candidate rather than treated as a
    single URL), plus inline ``style`` attributes containing CSS ``url()``
    references.  Values that are already proxied, or that use ``mailto:``,
    ``javascript:``, ``data:`` or fragment-only (``#...``) references, are
    left untouched.

    :param soup: BeautifulSoup document (mutated in place).
    :param target_url: absolute URL of the page; base for relative URLs.
    :param proxy_prefix: prefix that routes a quoted URL through the proxy.
    :return: the same ``soup`` object, for chaining.
    """
    url_attrs = ('href', 'src', 'action', 'srcset', 'xlink:href')
    skip_prefixes = (proxy_prefix, "mailto:", "javascript:", "data:", "#")
    for element in soup.find_all(True):
        # Iterate over a snapshot of the keys since we mutate the attrs.
        for attr in list(element.attrs):
            if attr not in url_attrs:
                continue
            orig_value = element.get(attr)
            # BS4 returns a list for multi-valued attributes; normalize.
            if isinstance(orig_value, list):
                orig_value = " ".join(orig_value)
            if not orig_value or orig_value.startswith(skip_prefixes):
                continue
            if attr == "srcset":
                element[attr] = _rewrite_srcset(orig_value, target_url, proxy_prefix)
            else:
                new_url = urllib.parse.urljoin(target_url, orig_value)
                element[attr] = proxy_prefix + urllib.parse.quote(new_url)
        # Rewrite inline style attributes containing CSS url() patterns.
        if element.has_attr("style"):
            element["style"] = rewrite_css(element["style"], target_url, proxy_prefix)
    return soup
# ---------------------------------------------------------------------------
# Core Function: Fetch and Rewrite the Target Resource
# ---------------------------------------------------------------------------
async def fetch_and_rewrite(target_url: str) -> Response:
    """
    Fetch ``target_url`` and adapt the response for proxied browsing.

    * HTML: removes CSP ``<meta>`` tags (they would block the injected
      script), installs a ``<base>`` tag, injects the dynamic-interception
      JavaScript, and rewrites URL-bearing attributes.
    * CSS: rewrites ``url(...)`` references through the proxy.
    * Anything else (JS, images, fonts, ...) is passed through unchanged.

    :param target_url: absolute URL to fetch.
    :return: a FastAPI ``Response`` mirroring the upstream status code.
    :raises HTTPException: 502 when the upstream request fails at the
        transport level (DNS, connect, timeout, ...).
    """
    try:
        # follow_redirects: many sites 30x to a canonical URL; without it the
        # browser would receive a bare redirect page.  The timeout keeps a
        # dead upstream from hanging the worker indefinitely.
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            resp = await client.get(target_url)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Upstream request failed: {exc}") from exc

    # Resolve relative URLs against the *final* URL (post-redirect), not the
    # one originally requested.
    final_url = str(resp.url)
    content_type = resp.headers.get("Content-Type", "")

    # Process CSS files to rewrite url() references.
    if "text/css" in content_type:
        new_css = rewrite_css(resp.text, final_url)
        return Response(content=new_css, media_type="text/css", status_code=resp.status_code)

    # Process HTML content.
    if "text/html" in content_type:
        soup = BeautifulSoup(resp.text, "html.parser")
        # Remove CSP meta tags that would block our injected script.
        for meta in soup.find_all("meta", attrs={"http-equiv": "Content-Security-Policy"}):
            meta.decompose()
        # Insert (or replace) a <base> tag so relative URLs resolve properly.
        parsed_target = urllib.parse.urlparse(final_url)
        base_href = f"{parsed_target.scheme}://{parsed_target.netloc}"
        if soup.head:
            for base in soup.head.find_all("base"):
                base.decompose()
            soup.head.insert(0, soup.new_tag("base", href=base_href))
        else:
            head_tag = soup.new_tag("head")
            head_tag.insert(0, soup.new_tag("base", href=base_href))
            soup.insert(0, head_tag)
        # Inject the dynamic-interception JavaScript as early as possible.
        injected = BeautifulSoup(INJECTED_JS, "html.parser")
        if soup.body:
            soup.body.insert(0, injected)
        else:
            soup.insert(0, injected)
        # Rewrite URLs (SVG icons, srcset, CSS backgrounds in style attrs, ...).
        soup = rewrite_urls(soup, final_url)
        return Response(content=str(soup), media_type="text/html", status_code=resp.status_code)

    # Other content types (JS, images, etc.): pass through unchanged.
    return Response(content=resp.content, media_type=content_type, status_code=resp.status_code)
# ---------------------------------------------------------------------------
# Simple Proxy Endpoint: /proxy_full
# ---------------------------------------------------------------------------
@app.get("/proxy_full")
async def proxy_full(url: str):
    """
    One-shot proxy: fetch ``url`` and rewrite its HTML/CSS so resource URLs
    route through the proxy.  Unlike the catch-all endpoint, this injects no
    client-side script and sets no cookie.

    :param url: absolute URL to fetch (required query parameter).
    :raises HTTPException: 400 when ``url`` is empty, 502 when the upstream
        request fails at the transport level.
    """
    if not url:
        raise HTTPException(status_code=400, detail="Missing 'url' query parameter")
    try:
        # Follow redirects and bound the wait like fetch_and_rewrite does.
        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            resp = await client.get(url)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Upstream request failed: {exc}") from exc
    content_type = resp.headers.get("Content-Type", "")
    if "text/html" in content_type:
        soup = rewrite_urls(BeautifulSoup(resp.text, "html.parser"), url)
        # Propagate the upstream status (previously silently reset to 200).
        return Response(content=str(soup), media_type="text/html", status_code=resp.status_code)
    if "text/css" in content_type:
        new_css = rewrite_css(resp.text, url)
        return Response(content=new_css, media_type="text/css", status_code=resp.status_code)
    return Response(content=resp.content, media_type=content_type, status_code=resp.status_code)
# ---------------------------------------------------------------------------
# Catch-All Dynamic Proxy Endpoint
# ---------------------------------------------------------------------------
@app.get("/{full_path:path}")
async def catch_all(full_path: str, request: Request):
    """
    Catch-all endpoint for dynamic proxying.

    The target is taken from the ``url`` query parameter when present;
    otherwise it is reconstructed from the ``target_base`` cookie plus the
    request path and query string.  The fetched resource is processed by
    ``fetch_and_rewrite`` (JS injection, URL rewriting), and the target's
    base URL is stored in a cookie for subsequent relative requests.
    """
    target_url = None
    if "url" in request.query_params:
        target_url = request.query_params["url"]
    else:
        target_base = request.cookies.get("target_base")
        if not target_base:
            return Response("No target URL provided.", status_code=400)
        # Rebuild the original URL from the remembered base + this request.
        target_url = urllib.parse.urljoin(target_base, full_path)
        query_string = request.url.query
        if query_string:
            target_url = f"{target_url}?{query_string}"

    response = await fetch_and_rewrite(target_url)

    # Remember the target's base URL so bare relative requests still resolve.
    parsed = urllib.parse.urlparse(target_url)
    response.set_cookie("target_base", f"{parsed.scheme}://{parsed.netloc}")
    return response
# ---------------------------------------------------------------------------
# Run the Application on Port 7860
# ---------------------------------------------------------------------------
# Entry point for running the proxy directly (e.g. `python thisfile.py`):
# serves the FastAPI app on all interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
|