import os import requests from flask import Flask, Response, request, stream_with_context # session از ایمپورت‌ها حذف شد from urllib.parse import urlparse, urljoin, urlunparse import re import random import logging import json app = Flask(__name__) # هیچ secret_key یا تنظیمات مربوط به سشن اضافه نشده است SECRET_NAME_FOR_TARGET_URL = "TARGET_HF_SPACE_URL" TARGET_URL_FROM_SECRET = os.environ.get(SECRET_NAME_FOR_TARGET_URL) REQUEST_TIMEOUT = 45 log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) app.logger.setLevel(logging.CRITICAL + 1) # این تنظیم از کد اصلی شما حفظ شده print(f"APP_STARTUP: TARGET_URL_FROM_SECRET: {'SET' if TARGET_URL_FROM_SECRET else 'NOT SET'}") COMMON_USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15", ] SIMPLE_TEXT_REPLACEMENTS = { "ttttttt": "ادیت", "Chat with Aya": "چت🤖", "Visualize with Aya": "عکس", "Speak with Aya": "", "Model:": "مدل:", "Developed by:": "توسعه‌دهنده:", "License:": "مجوز:", "Input Prompt": "فرمان ورودی", "Input Image": "تصویر ورودی", "Drop Image Here": "تصویر را اینجا رها کنید", "- or -": "- یا -", "Click to Upload": "برای آپلود کلیک کنید", "Ask anything in our 23 languages ...": "به هر یک از ۲۳ زبان سوال خود را بپرسید ...", "Connecting Our World": "اتصال دنیای ما", # "متن اصلی دیگری که می‌خواهید جایگزین شود": "متن جدید شما", } GRADIO_HIDE_CSS_STANDARD = """ """ def replace_text_server_side(html_content_string, replacements): processed_content = html_content_string if replacements: for old_text, new_text in replacements.items(): if isinstance(old_text, str) and isinstance(new_text, str): try: # استفاده از re.sub برای جایگزینی با نادیده گرفتن بزرگی/کوچکی حروف # re.escape برای مدیریت کاراکترهای خاص در old_text processed_content = re.sub(re.escape(old_text), new_text, processed_content, flags=re.IGNORECASE) except re.error as e: app.logger.error(f"Regex error during server-side text replacement for '{old_text}': {e}") else: app.logger.warning(f"Invalid type for replacement: key='{old_text}', value='{new_text}'") return processed_content def process_html_content_server_side_minimal(html_string): processed_html = html_string patterns = [ re.compile(r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:footer|meta-footer|built-with|gradio-footer)[^"\']*["\'][^>]*>.*?Built with Gradio.*?<\/[^>]+>', re.I | re.S), re.compile(r']*>.*?Built with Gradio.*?', re.I | re.S), re.compile(r']*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?Built with Gradio.*?', re.I | re.S), re.compile(r']*>.*?Built with Gradio.*?gradio\.app.*?', re.I | re.S), re.compile(r']*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?', re.I | re.S), re.compile(r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:settings|options)[^"\']*["\'][^>]*>.*?Settings.*?<\/[^>]+>', re.I | re.S), re.compile(r'<(?:button|a)\b[^>]*>.*?Settings.*?', re.I | re.S), ] for p in patterns: processed_html = p.sub('', processed_html) return processed_html @app.route('/', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD']) @app.route('/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD']) def proxy_request_handler(path): # --- START: بخش اضافه شده برای بررسی Referer --- allowed_referer_domains = ["aisada.ir", "www.aisada.ir"] # اگر درخواست از نوع OPTIONS (معمولا برای CORS preflight) است، اجازه عبور می‌دهیم. # این درخواست‌ها نباید Referer داشته باشند و مسدود کردن آنها مشکل ایجاد می‌کند. if request.method == 'OPTIONS': pass # اجازه می‌دهیم درخواست به بخش اصلی کد پراکسی برسد # فقط درخواست‌های GET که به نظر می‌رسد برای بارگذاری یک صفحه HTML هستند، Referer را بررسی می‌کنیم. # مسیر خالی (روت)، یا مسیری که به / ختم می‌شود، یا مسیری که به .html یا .htm ختم می‌شود. elif request.method == 'GET' and \ (path == '' or \ path.endswith('/') or \ path.endswith('.html') or \ path.endswith('.htm')): referer_header = request.headers.get("Referer") is_allowed = False if referer_header: try: parsed_referer = urlparse(referer_header) referer_host = parsed_referer.hostname if referer_host and referer_host.lower() in allowed_referer_domains: is_allowed = True except ValueError: # اگر Referer قابل تجزیه نباشد، is_allowed همچنان False خواهد بود pass if not is_allowed: # با توجه به تنظیمات لاگ شما، این پیام‌ها در کنسول دیده نخواهند شد مگر اینکه سطح لاگ را تغییر دهید. # app.logger.warning(f"Access Denied (403) for path '{path}'. Referer: '{referer_header}'. IP: {request.remote_addr}") return "Error This app is broken. .", 403 # برای سایر انواع درخواست (POST, PUT, ...) یا درخواست‌های GET که به نظر نمی‌رسد # صفحه HTML باشند (مانند CSS, JS, تصاویر)، فعلا از بررسی Referer صرف‌نظر می‌کنیم. # فرض بر این است که اگر صفحه اصلی با Referer معتبر بارگذاری شده، درخواست‌های داخلی نیز مجاز هستند. # --- END: بخش اضافه شده برای بررسی Referer --- if not TARGET_URL_FROM_SECRET: return "Proxy Configuration Error: Target URL secret is not configured.", 500 base_target_url = TARGET_URL_FROM_SECRET.rstrip('/') target_full_url = urljoin(base_target_url + "/", path) if request.query_string: target_full_url += "?" + request.query_string.decode() try: parsed_target_url_for_host = urlparse(TARGET_URL_FROM_SECRET) target_hostname = parsed_target_url_for_host.hostname user_agent_to_send = random.choice(COMMON_USER_AGENTS) excluded_incoming_headers = [ 'host', 'cookie', 'connection', 'upgrade-insecure-requests', 'if-none-match', 'if-modified-since', 'referer', 'x-hf-space-host', 'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'accept-encoding' ] forward_headers = {key: value for key, value in request.headers.items() if key.lower() not in excluded_incoming_headers} forward_headers['User-Agent'] = user_agent_to_send forward_headers['Host'] = target_hostname if request.environ.get('HTTP_X_FORWARDED_FOR'): forward_headers['X-Forwarded-For'] = request.environ.get('HTTP_X_FORWARDED_FOR') + ', ' + request.remote_addr else: forward_headers['X-Forwarded-For'] = request.remote_addr forward_headers['X-Forwarded-Proto'] = request.scheme forward_headers['X-Forwarded-Host'] = request.host request_body = request.get_data() if request.method not in ['GET', 'HEAD', 'OPTIONS'] else None with requests.Session() as s: target_response = s.request( method=request.method, url=target_full_url, headers=forward_headers, data=request_body, stream=True, timeout=REQUEST_TIMEOUT, allow_redirects=False ) if 300 <= target_response.status_code < 400 and 'Location' in target_response.headers: location_header_val = target_response.headers['Location'] parsed_location = urlparse(location_header_val) rewritten_location = location_header_val if parsed_location.scheme and parsed_location.netloc == target_hostname: rewritten_location = parsed_location.path if parsed_location.query: rewritten_location += "?" + parsed_location.query if not rewritten_location.startswith('/') and rewritten_location and not urlparse(rewritten_location).scheme: rewritten_location = '/' + rewritten_location final_redirect_headers = {'Location': rewritten_location if rewritten_location else "/"} for k, v in target_response.headers.items(): if k.lower() in ['set-cookie', 'cache-control', 'expires', 'pragma', 'vary']: final_redirect_headers[k] = v return Response(response=None, status=target_response.status_code, headers=final_redirect_headers) if target_response.status_code >= 400: error_headers = {k:v for k,v in target_response.headers.items() if k.lower() not in ['content-encoding', 'transfer-encoding', 'content-length', 'link']} return Response(target_response.content, status=target_response.status_code, headers=error_headers) content_type = target_response.headers.get('Content-Type', 'application/octet-stream').lower() excluded_resp_headers = [ 'content-encoding', 'transfer-encoding', 'connection', 'keep-alive', 'x-frame-options', 'strict-transport-security', 'public-key-pins', 'content-length', 'server', 'x-powered-by', 'date', 'alt-svc', 'link' ] final_response_headers = {k: v for k, v in target_response.headers.items() if k.lower() not in excluded_resp_headers} final_response_headers['Content-Type'] = content_type final_response_headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' final_response_headers['Pragma'] = 'no-cache' final_response_headers['Expires'] = '0' final_response_headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' def generate_response_content_stream_inner(): is_html = 'text/html' in content_type css_injected = False js_injected = False html_buffer = b'' observer_script = "" if is_html and SIMPLE_TEXT_REPLACEMENTS: replacements_json_str = json.dumps(SIMPLE_TEXT_REPLACEMENTS) js_code_template = """""" observer_script = js_code_template.replace("{json_payload}", replacements_json_str) char_enc_match = None if is_html: char_enc_match = re.search(r'charset=([\w-]+)', content_type, re.I) proc_enc = char_enc_match.group(1) if char_enc_match else 'utf-8' try: for chunk in target_response.iter_content(chunk_size=8192, decode_unicode=False): if not chunk: continue if is_html: html_buffer += chunk if len(html_buffer) < 4096 and not (b"" in html_buffer or b"" in html_buffer or b"" in html_buffer): continue try: str_buffer = html_buffer.decode(proc_enc, 'replace') except UnicodeDecodeError: str_buffer = html_buffer.decode('latin-1', 'replace') processed_str = str_buffer if SIMPLE_TEXT_REPLACEMENTS: processed_str = replace_text_server_side(processed_str, SIMPLE_TEXT_REPLACEMENTS) if not css_injected: head_m = re.search(r'(]*>)', processed_str, re.I) if head_m: processed_str = processed_str[:head_m.end(1)] + GRADIO_HIDE_CSS_STANDARD + processed_str[head_m.end(1):] css_injected = True processed_str = process_html_content_server_side_minimal(processed_str) if observer_script and not js_injected: body_end_m = re.search(r'()', processed_str, re.I) if body_end_m: processed_str = processed_str[:body_end_m.start(1)] + observer_script + processed_str[body_end_m.start(1):] js_injected = True yield processed_str.encode(proc_enc, 'replace') html_buffer = b'' else: yield chunk if html_buffer and is_html: try: str_buffer = html_buffer.decode(proc_enc, 'replace') except UnicodeDecodeError: str_buffer = html_buffer.decode('latin-1', 'replace') final_proc_str = str_buffer if SIMPLE_TEXT_REPLACEMENTS: final_proc_str = replace_text_server_side(final_proc_str, SIMPLE_TEXT_REPLACEMENTS) if not css_injected: head_m = re.search(r'(]*>)', final_proc_str, re.I) payload = GRADIO_HIDE_CSS_STANDARD if head_m: final_proc_str = final_proc_str[:head_m.end(1)] + payload + final_proc_str[head_m.end(1):] else: final_proc_str = payload + final_proc_str final_proc_str = process_html_content_server_side_minimal(final_proc_str) if observer_script and not js_injected and not re.search(r'