import os
import requests
from flask import Flask, Response, request, stream_with_context # session از ایمپورتها حذف شد
from urllib.parse import urlparse, urljoin, urlunparse
import re
import random
import logging
import json
app = Flask(__name__)
# هیچ secret_key یا تنظیمات مربوط به سشن اضافه نشده است
SECRET_NAME_FOR_TARGET_URL = "TARGET_HF_SPACE_URL"
TARGET_URL_FROM_SECRET = os.environ.get(SECRET_NAME_FOR_TARGET_URL)
REQUEST_TIMEOUT = 45
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
app.logger.setLevel(logging.CRITICAL + 1) # این تنظیم از کد اصلی شما حفظ شده
print(f"APP_STARTUP: TARGET_URL_FROM_SECRET: {'SET' if TARGET_URL_FROM_SECRET else 'NOT SET'}")
COMMON_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
]
SIMPLE_TEXT_REPLACEMENTS = {
"ttttttt": "ادیت",
"Chat with Aya": "چت🤖",
"Visualize with Aya": "عکس",
"Speak with Aya": "",
"Model:": "مدل:",
"Developed by:": "توسعهدهنده:",
"License:": "مجوز:",
"Input Prompt": "فرمان ورودی",
"Input Image": "تصویر ورودی",
"Drop Image Here": "تصویر را اینجا رها کنید",
"- or -": "- یا -",
"Click to Upload": "برای آپلود کلیک کنید",
"Ask anything in our 23 languages ...": "به هر یک از ۲۳ زبان سوال خود را بپرسید ...",
"Connecting Our World": "اتصال دنیای ما",
# "متن اصلی دیگری که میخواهید جایگزین شود": "متن جدید شما",
}
GRADIO_HIDE_CSS_STANDARD = """
"""
def replace_text_server_side(html_content_string, replacements):
processed_content = html_content_string
if replacements:
for old_text, new_text in replacements.items():
if isinstance(old_text, str) and isinstance(new_text, str):
try:
# استفاده از re.sub برای جایگزینی با نادیده گرفتن بزرگی/کوچکی حروف
# re.escape برای مدیریت کاراکترهای خاص در old_text
processed_content = re.sub(re.escape(old_text), new_text, processed_content, flags=re.IGNORECASE)
except re.error as e:
app.logger.error(f"Regex error during server-side text replacement for '{old_text}': {e}")
else:
app.logger.warning(f"Invalid type for replacement: key='{old_text}', value='{new_text}'")
return processed_content
def process_html_content_server_side_minimal(html_string):
processed_html = html_string
patterns = [
re.compile(r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:footer|meta-footer|built-with|gradio-footer)[^"\']*["\'][^>]*>.*?Built with Gradio.*?<\/[^>]+>', re.I | re.S),
re.compile(r'', re.I | re.S),
re.compile(r']*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?Built with Gradio.*?', re.I | re.S),
re.compile(r'
]*>.*?Built with Gradio.*?gradio\.app.*?
', re.I | re.S),
re.compile(r']*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?', re.I | re.S),
re.compile(r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:settings|options)[^"\']*["\'][^>]*>.*?Settings.*?<\/[^>]+>', re.I | re.S),
re.compile(r'<(?:button|a)\b[^>]*>.*?Settings.*?(?:button|a)>', re.I | re.S),
]
for p in patterns: processed_html = p.sub('', processed_html)
return processed_html
@app.route('/', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
@app.route('/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
def proxy_request_handler(path):
# --- START: بخش اضافه شده برای بررسی Referer ---
allowed_referer_domains = ["aisada.ir", "www.aisada.ir"]
# اگر درخواست از نوع OPTIONS (معمولا برای CORS preflight) است، اجازه عبور میدهیم.
# این درخواستها نباید Referer داشته باشند و مسدود کردن آنها مشکل ایجاد میکند.
if request.method == 'OPTIONS':
pass # اجازه میدهیم درخواست به بخش اصلی کد پراکسی برسد
# فقط درخواستهای GET که به نظر میرسد برای بارگذاری یک صفحه HTML هستند، Referer را بررسی میکنیم.
# مسیر خالی (روت)، یا مسیری که به / ختم میشود، یا مسیری که به .html یا .htm ختم میشود.
elif request.method == 'GET' and \
(path == '' or \
path.endswith('/') or \
path.endswith('.html') or \
path.endswith('.htm')):
referer_header = request.headers.get("Referer")
is_allowed = False
if referer_header:
try:
parsed_referer = urlparse(referer_header)
referer_host = parsed_referer.hostname
if referer_host and referer_host.lower() in allowed_referer_domains:
is_allowed = True
except ValueError:
# اگر Referer قابل تجزیه نباشد، is_allowed همچنان False خواهد بود
pass
if not is_allowed:
# با توجه به تنظیمات لاگ شما، این پیامها در کنسول دیده نخواهند شد مگر اینکه سطح لاگ را تغییر دهید.
# app.logger.warning(f"Access Denied (403) for path '{path}'. Referer: '{referer_header}'. IP: {request.remote_addr}")
return "Error This app is broken. .", 403
# برای سایر انواع درخواست (POST, PUT, ...) یا درخواستهای GET که به نظر نمیرسد
# صفحه HTML باشند (مانند CSS, JS, تصاویر)، فعلا از بررسی Referer صرفنظر میکنیم.
# فرض بر این است که اگر صفحه اصلی با Referer معتبر بارگذاری شده، درخواستهای داخلی نیز مجاز هستند.
# --- END: بخش اضافه شده برای بررسی Referer ---
if not TARGET_URL_FROM_SECRET:
return "Proxy Configuration Error: Target URL secret is not configured.", 500
base_target_url = TARGET_URL_FROM_SECRET.rstrip('/')
target_full_url = urljoin(base_target_url + "/", path)
if request.query_string: target_full_url += "?" + request.query_string.decode()
try:
parsed_target_url_for_host = urlparse(TARGET_URL_FROM_SECRET)
target_hostname = parsed_target_url_for_host.hostname
user_agent_to_send = random.choice(COMMON_USER_AGENTS)
excluded_incoming_headers = [
'host', 'cookie', 'connection', 'upgrade-insecure-requests',
'if-none-match', 'if-modified-since', 'referer', 'x-hf-space-host',
'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'accept-encoding'
]
forward_headers = {key: value for key, value in request.headers.items() if key.lower() not in excluded_incoming_headers}
forward_headers['User-Agent'] = user_agent_to_send
forward_headers['Host'] = target_hostname
if request.environ.get('HTTP_X_FORWARDED_FOR'): forward_headers['X-Forwarded-For'] = request.environ.get('HTTP_X_FORWARDED_FOR') + ', ' + request.remote_addr
else: forward_headers['X-Forwarded-For'] = request.remote_addr
forward_headers['X-Forwarded-Proto'] = request.scheme
forward_headers['X-Forwarded-Host'] = request.host
request_body = request.get_data() if request.method not in ['GET', 'HEAD', 'OPTIONS'] else None
with requests.Session() as s:
target_response = s.request(
method=request.method, url=target_full_url, headers=forward_headers,
data=request_body, stream=True, timeout=REQUEST_TIMEOUT, allow_redirects=False
)
if 300 <= target_response.status_code < 400 and 'Location' in target_response.headers:
location_header_val = target_response.headers['Location']
parsed_location = urlparse(location_header_val)
rewritten_location = location_header_val
if parsed_location.scheme and parsed_location.netloc == target_hostname:
rewritten_location = parsed_location.path
if parsed_location.query: rewritten_location += "?" + parsed_location.query
if not rewritten_location.startswith('/') and rewritten_location and not urlparse(rewritten_location).scheme:
rewritten_location = '/' + rewritten_location
final_redirect_headers = {'Location': rewritten_location if rewritten_location else "/"}
for k, v in target_response.headers.items():
if k.lower() in ['set-cookie', 'cache-control', 'expires', 'pragma', 'vary']: final_redirect_headers[k] = v
return Response(response=None, status=target_response.status_code, headers=final_redirect_headers)
if target_response.status_code >= 400:
error_headers = {k:v for k,v in target_response.headers.items() if k.lower() not in ['content-encoding', 'transfer-encoding', 'content-length', 'link']}
return Response(target_response.content, status=target_response.status_code, headers=error_headers)
content_type = target_response.headers.get('Content-Type', 'application/octet-stream').lower()
excluded_resp_headers = [
'content-encoding', 'transfer-encoding', 'connection', 'keep-alive',
'x-frame-options', 'strict-transport-security', 'public-key-pins',
'content-length', 'server', 'x-powered-by', 'date', 'alt-svc', 'link'
]
final_response_headers = {k: v for k, v in target_response.headers.items() if k.lower() not in excluded_resp_headers}
final_response_headers['Content-Type'] = content_type
final_response_headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
final_response_headers['Pragma'] = 'no-cache'
final_response_headers['Expires'] = '0'
final_response_headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
def generate_response_content_stream_inner():
is_html = 'text/html' in content_type
css_injected = False
js_injected = False
html_buffer = b''
observer_script = ""
if is_html and SIMPLE_TEXT_REPLACEMENTS:
replacements_json_str = json.dumps(SIMPLE_TEXT_REPLACEMENTS)
js_code_template = """"""
observer_script = js_code_template.replace("{json_payload}", replacements_json_str)
char_enc_match = None
if is_html:
char_enc_match = re.search(r'charset=([\w-]+)', content_type, re.I)
proc_enc = char_enc_match.group(1) if char_enc_match else 'utf-8'
try:
for chunk in target_response.iter_content(chunk_size=8192, decode_unicode=False):
if not chunk: continue
if is_html:
html_buffer += chunk
if len(html_buffer) < 4096 and not (b"" in html_buffer or b"