# testingproxy123 / main.py — dynamic URL-rewriting web proxy (FastAPI)
# Source: Hugging Face Space file page by triflix (commit f3f8bd0, verified)
import re
import urllib.parse
from fastapi import FastAPI, Request, HTTPException, Response
import httpx
from bs4 import BeautifulSoup
# Single FastAPI application instance; all proxy routes below attach to it.
app = FastAPI()
# ---------------------------------------------------------------------------
# Utility: Rewrite CSS URLs inside CSS text
# ---------------------------------------------------------------------------
def rewrite_css(css_text: str, target_url: str, proxy_prefix: str = "/?url=") -> str:
    """
    Rewrite every ``url(...)`` reference in a CSS document so the referenced
    resource is fetched through the proxy.

    Args:
        css_text: Raw CSS source.
        target_url: Absolute URL the CSS came from; relative references are
            resolved against it.
        proxy_prefix: Prefix that routes a URL through the proxy endpoint.

    Returns:
        The CSS text with all rewritable url() references proxied.
    """
    # Matches: url('...'), url("..."), or url(...). The backreference keeps
    # the opening and closing quote characters consistent.
    pattern = re.compile(r'url\(\s*(?P<quote>["\']?)(?P<url>[^"\')]+)(?P=quote)\s*\)')

    def replace_url(match: "re.Match") -> str:
        # Strip whitespace: for an unquoted `url( foo )` the greedy character
        # class captures the trailing spaces as well, which would otherwise be
        # percent-encoded into the proxied URL and break the reference.
        original_url = match.group("url").strip()
        # Leave data: URIs and already-proxied references untouched.
        if original_url.startswith(proxy_prefix) or original_url.startswith("data:"):
            return match.group(0)
        new_url = urllib.parse.urljoin(target_url, original_url)
        quote = match.group("quote")
        return f'url({quote}{proxy_prefix}{urllib.parse.quote(new_url)}{quote})'

    return pattern.sub(replace_url, css_text)
# ---------------------------------------------------------------------------
# Injected JavaScript for Dynamic Interception & Real-Time Updates
# ---------------------------------------------------------------------------
# This script intercepts history changes, fetch, XHR, and anchor clicks,
# and uses MutationObserver to rewrite new elements. It also handles SVG
# attributes (including xlink:href) for icons and logos.
# Served verbatim to the browser; the JS below rewrites client-side requests
# (history API, fetch, XHR, clicks, dynamically-added DOM nodes) to '/?url=...'.
INJECTED_JS: str = """
<script>
(function() {
// Intercept history.pushState to route dynamic navigations.
const originalPushState = history.pushState;
history.pushState = function(state, title, url) {
if (url) {
const proxiedUrl = '/?url=' + encodeURIComponent(url);
return originalPushState.call(history, state, title, proxiedUrl);
}
return originalPushState.call(history, state, title, url);
};
// Intercept fetch() calls.
const originalFetch = window.fetch;
window.fetch = function(input, init) {
let url;
if (typeof input === 'string') {
url = input;
} else if (input && input.url) {
url = input.url;
} else {
return originalFetch(input, init);
}
const proxiedUrl = '/?url=' + encodeURIComponent(url);
if (typeof input === 'object') {
input = new Request(proxiedUrl, input);
} else {
input = proxiedUrl;
}
return originalFetch(input, init);
};
// Intercept XMLHttpRequest.open().
const originalOpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function(method, url) {
const proxiedUrl = '/?url=' + encodeURIComponent(url);
return originalOpen.apply(this, [method, proxiedUrl, true]);
};
// Intercept anchor clicks to ensure navigation goes through the proxy.
document.addEventListener('click', function(event) {
const target = event.target.closest('a');
if (target && target.href) {
if (target.getAttribute('data-no-proxy') || target.href.indexOf('/?url=') === 0) {
return;
}
event.preventDefault();
window.location.href = '/?url=' + encodeURIComponent(target.href);
}
});
// Use MutationObserver to catch and rewrite dynamically added elements.
const observer = new MutationObserver(function(mutations) {
mutations.forEach(function(mutation) {
mutation.addedNodes.forEach(function(node) {
if (node.nodeType === Node.ELEMENT_NODE) {
reProxyElement(node);
}
});
});
});
observer.observe(document.body, { childList: true, subtree: true });
// Rewrites URL attributes for an element and its children.
function reProxyElement(element) {
const urlAttrs = ['href', 'src', 'action', 'srcset', 'xlink:href'];
urlAttrs.forEach(function(attr) {
if (element.hasAttribute(attr)) {
const value = element.getAttribute(attr);
if (value && !value.startsWith('/?url=') &&
!value.startsWith('javascript:') && !value.startsWith('mailto:')) {
element.setAttribute(attr, '/?url=' + encodeURIComponent(value));
}
}
});
// Also rewrite inline style attribute.
if (element.hasAttribute('style')) {
let styleVal = element.getAttribute('style');
// Simple client-side rewriting: prepend proxy to URLs.
styleVal = styleVal.replace(/url\\((['"]?)(.*?)\\1\\)/g, function(match, quote, url) {
if (url.startsWith('/?url=') || url.startsWith('data:')) return match;
return "url(" + quote + "/?url=" + encodeURIComponent(url) + quote + ")";
});
element.setAttribute('style', styleVal);
}
Array.from(element.children).forEach(child => reProxyElement(child));
}
})();
</script>
"""
# ---------------------------------------------------------------------------
# Helper Function: Rewrite URLs for All Relevant Attributes in HTML
# ---------------------------------------------------------------------------
def rewrite_urls(soup, target_url, proxy_prefix="/?url="):
    """
    Rewrite URL-bearing attributes on every element so resources load via the
    proxy. Handles href, src, action, srcset and xlink:href, plus CSS url()
    references inside inline ``style`` attributes.

    Args:
        soup: Parsed BeautifulSoup document (mutated in place).
        target_url: Absolute URL the document was fetched from.
        proxy_prefix: Prefix that routes a URL through the proxy endpoint.

    Returns:
        The same soup object, for chaining.
    """
    url_attrs = ('href', 'src', 'action', 'srcset', 'xlink:href')

    def _proxy_one(value):
        # Resolve a single URL against the page URL and route it via the proxy.
        absolute = urllib.parse.urljoin(target_url, value)
        return proxy_prefix + urllib.parse.quote(absolute)

    for element in soup.find_all(True):
        # Never rewrite <base>: it anchors relative URL resolution in the
        # browser; pointing it at the proxy endpoint would corrupt every
        # relative URL on the page.
        if element.name == "base":
            continue
        for attr in list(element.attrs):
            if attr not in url_attrs:
                continue
            orig_value = element.get(attr)
            # Skip empties, already-proxied values, non-fetchable schemes,
            # and fragment-only anchors (rewriting '#x' would break in-page links).
            if (not orig_value
                    or orig_value.startswith(proxy_prefix)
                    or orig_value.startswith(("mailto:", "javascript:", "data:", "#"))):
                continue
            if attr == "srcset":
                # srcset is a comma-separated list of "URL [descriptor]"
                # candidates; each URL must be rewritten individually rather
                # than treating the whole list as one URL.
                rewritten = []
                for candidate in orig_value.split(","):
                    parts = candidate.strip().split(None, 1)
                    if not parts:
                        continue
                    parts[0] = _proxy_one(parts[0])
                    rewritten.append(" ".join(parts))
                element[attr] = ", ".join(rewritten)
            else:
                element[attr] = _proxy_one(orig_value)
        # Inline style attributes may carry CSS url() references.
        if element.has_attr("style"):
            element["style"] = rewrite_css(element.get("style"), target_url, proxy_prefix)
    return soup
# ---------------------------------------------------------------------------
# Core Function: Fetch and Rewrite the Target Resource
# ---------------------------------------------------------------------------
async def fetch_and_rewrite(target_url: str) -> Response:
    """
    Fetch ``target_url`` and adapt the response for serving through the proxy.

    - HTML: strips CSP <meta> tags, rewrites URL attributes, inserts a <base>
      tag, and injects the dynamic-interception script.
    - CSS: rewrites url() references so the resources are proxied.
    - Anything else is passed through unchanged.

    Args:
        target_url: Absolute URL of the upstream resource.

    Returns:
        A FastAPI Response mirroring the upstream status code.
    """
    # httpx does NOT follow redirects by default; without follow_redirects
    # a 301/302 target would be served as the bare redirect stub.
    async with httpx.AsyncClient(follow_redirects=True, timeout=20.0) as client:
        resp = await client.get(target_url)
    # Resolve relative references against the URL actually served
    # (it may differ from target_url after redirects).
    effective_url = str(resp.url)
    content_type = resp.headers.get("Content-Type", "")

    # Process CSS files to rewrite url() references.
    if "text/css" in content_type:
        new_css = rewrite_css(resp.text, effective_url)
        return Response(content=new_css, media_type="text/css", status_code=resp.status_code)

    # Process HTML content.
    if "text/html" in content_type:
        soup = BeautifulSoup(resp.text, "html.parser")
        # Remove CSP meta tags that could block the injected script.
        for meta in soup.find_all("meta", attrs={"http-equiv": "Content-Security-Policy"}):
            meta.decompose()
        # Rewrite static URLs BEFORE inserting the <base> tag so the base
        # href itself is never routed through the proxy.
        soup = rewrite_urls(soup, effective_url)
        # Insert (or replace) a <base> tag so any remaining relative URLs
        # resolve against the real origin in the browser.
        parsed_target = urllib.parse.urlparse(effective_url)
        base_href = f"{parsed_target.scheme}://{parsed_target.netloc}"
        if soup.head:
            for base in soup.head.find_all("base"):
                base.decompose()
            soup.head.insert(0, soup.new_tag("base", href=base_href))
        else:
            head_tag = soup.new_tag("head")
            head_tag.insert(0, soup.new_tag("base", href=base_href))
            soup.insert(0, head_tag)
        # Inject the client-side script for real-time interception.
        injection = BeautifulSoup(INJECTED_JS, "html.parser")
        if soup.body:
            soup.body.insert(0, injection)
        else:
            soup.insert(0, injection)
        return Response(content=str(soup), media_type="text/html", status_code=resp.status_code)

    # Other content types (JS, images, fonts, ...) pass through unmodified.
    return Response(content=resp.content, media_type=content_type, status_code=resp.status_code)
# ---------------------------------------------------------------------------
# Simple Proxy Endpoint: /proxy_full
# ---------------------------------------------------------------------------
@app.get("/proxy_full")
async def proxy_full(url: str):
    """
    Simple one-shot proxy endpoint: fetch the given URL and rewrite its
    HTML/CSS so all resource URLs route back through the proxy.

    Raises:
        HTTPException: 400 when the ``url`` query parameter is empty.
    """
    if not url:
        raise HTTPException(status_code=400, detail="Missing 'url' query parameter")
    # Follow redirects so a 301/302 target returns its final document
    # (httpx does not follow them by default).
    async with httpx.AsyncClient(follow_redirects=True, timeout=20.0) as client:
        resp = await client.get(url)
    # Rewrite against the final URL in case redirects changed the base.
    effective_url = str(resp.url)
    content_type = resp.headers.get("Content-Type", "")
    if "text/html" in content_type:
        soup = rewrite_urls(BeautifulSoup(resp.text, "html.parser"), effective_url)
        # Mirror the upstream status code instead of always answering 200.
        return Response(content=str(soup), media_type="text/html", status_code=resp.status_code)
    if "text/css" in content_type:
        new_css = rewrite_css(resp.text, effective_url)
        return Response(content=new_css, media_type="text/css", status_code=resp.status_code)
    return Response(content=resp.content, media_type=content_type, status_code=resp.status_code)
# ---------------------------------------------------------------------------
# Catch-All Dynamic Proxy Endpoint
# ---------------------------------------------------------------------------
@app.get("/{full_path:path}")
async def catch_all(full_path: str, request: Request):
    """
    Catch-all proxy endpoint for dynamic requests.

    The target is taken from the ``url`` query parameter when present;
    otherwise it is reconstructed from the ``target_base`` cookie plus the
    request path and query string. The fetched resource is processed
    (HTML/CSS rewriting, JS injection) and the target's origin is stored in
    a cookie for follow-up requests that omit ``?url=``.
    """
    explicit_url = dict(request.query_params).get("url")
    if explicit_url is not None:
        target_url = explicit_url
    else:
        stored_base = request.cookies.get("target_base")
        if not stored_base:
            return Response("No target URL provided.", status_code=400)
        # Rebuild the upstream URL from the remembered origin + this request.
        target_url = urllib.parse.urljoin(stored_base, full_path)
        raw_query = request.url.query
        if raw_query:
            target_url = f"{target_url}?{raw_query}"
    proxied_response = await fetch_and_rewrite(target_url)
    # Remember the target's origin so subsequent relative requests resolve.
    origin = urllib.parse.urlparse(target_url)
    proxied_response.set_cookie("target_base", f"{origin.scheme}://{origin.netloc}")
    return proxied_response
# ---------------------------------------------------------------------------
# Run the Application on Port 7860
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import uvicorn
    # 0.0.0.0 exposes the server on all interfaces; 7860 is the port
    # conventionally used by Hugging Face Spaces.
    uvicorn.run(app, host="0.0.0.0", port=7860)