|
|
import requests |
|
|
import random |
|
|
import time |
|
|
import threading |
|
|
from flask import Flask, request, Response, jsonify |
|
|
|
|
|
PROXY_LIST_URL = "https://proxies.typegpt.net/ips.txt" |
|
|
proxies_cache = [] |
|
|
last_refresh = 0 |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
def fetch_proxies(): |
|
|
try: |
|
|
resp = requests.get(PROXY_LIST_URL, timeout=10) |
|
|
resp.raise_for_status() |
|
|
proxies = [line.strip() for line in resp.text.splitlines() if line.strip()] |
|
|
return proxies |
|
|
except Exception as e: |
|
|
print(f"[ERROR] Failed to fetch proxies: {e}") |
|
|
return [] |
|
|
|
|
|
def get_random_proxy(proxies): |
|
|
if not proxies: |
|
|
return None |
|
|
return random.choice(proxies) |
|
|
|
|
|
def refresh_proxies_loop(): |
|
|
global proxies_cache, last_refresh |
|
|
while True: |
|
|
if time.time() - last_refresh > 300 or not proxies_cache: |
|
|
proxies_cache = fetch_proxies() |
|
|
last_refresh = time.time() |
|
|
print(f"[INFO] Refreshed {len(proxies_cache)} proxies.") |
|
|
time.sleep(60) |
|
|
|
|
|
@app.route("/health", methods=["GET"]) |
|
|
def health(): |
|
|
return "Healthy", 200 |
|
|
|
|
|
@app.route("/<path:target_url>", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]) |
|
|
def proxy_request(target_url): |
|
|
if not target_url.startswith("http"): |
|
|
target_url = "http://" + target_url |
|
|
|
|
|
proxy = get_random_proxy(proxies_cache) |
|
|
if not proxy: |
|
|
return jsonify({"error": "No proxies available"}), 500 |
|
|
|
|
|
proxies = {"http": proxy, "https": proxy} |
|
|
|
|
|
try: |
|
|
print(f"[INFO] Forwarding {request.method} to {target_url} via {proxy}") |
|
|
|
|
|
|
|
|
resp = requests.request( |
|
|
method=request.method, |
|
|
url=target_url, |
|
|
headers={k: v for k, v in request.headers if k.lower() != "host"}, |
|
|
data=request.get_data(), |
|
|
cookies=request.cookies, |
|
|
params=request.args, |
|
|
proxies=proxies, |
|
|
timeout=30, |
|
|
allow_redirects=False |
|
|
) |
|
|
|
|
|
|
|
|
headers = dict(resp.headers) |
|
|
headers["X-Proxy-Used"] = proxy |
|
|
|
|
|
return Response( |
|
|
resp.content, |
|
|
status=resp.status_code, |
|
|
headers=headers, |
|
|
) |
|
|
except Exception as e: |
|
|
return jsonify({"error": "Proxy failed", "proxy": proxy, "details": str(e)}), 502 |
|
|
|
|
|
def main(): |
|
|
t = threading.Thread(target=refresh_proxies_loop, daemon=True) |
|
|
t.start() |
|
|
app.run(host="0.0.0.0", port=5000) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|