| from flask import Flask, request, jsonify |
| import requests |
| import re |
| import os |
| import base64 |
| import logging |
| from typing import Dict, Optional |
|
|
| app = Flask(__name__) |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| class MirrorScraper: |
| def __init__(self): |
| self.base_url = "https://cdn.youtubeunblocked.live" |
| self.target_url = "https://adsha.re/4242" |
| self.headers = { |
| "User-Agent": "Mozilla/5.0 (Linux; Android 14; SM-X205 Build/UP1A.231005.007) AppleWebKit/537.36", |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| } |
| |
| def extract_csrf_from_homepage(self) -> Optional[str]: |
| """Extract fresh CSRF token from the homepage""" |
| homepage_url = f"{self.base_url}/" |
| |
| logger.info("π Extracting CSRF token from homepage...") |
| try: |
| response = requests.get(homepage_url, headers=self.headers, timeout=10) |
| |
| if response.status_code == 200: |
| csrf_match = re.search(r'name="csrf"[^>]*value="([^"]+)"', response.text) |
| if csrf_match: |
| csrf_token = csrf_match.group(1) |
| logger.info("β
CSRF token extracted from homepage") |
| return csrf_token |
| else: |
| logger.error("β CSRF token not found in homepage") |
| return None |
| else: |
| logger.error(f"β Failed to get homepage. Status: {response.status_code}") |
| return None |
| except Exception as e: |
| logger.error(f"β Error extracting CSRF: {str(e)}") |
| return None |
|
|
| def get_server_data_and_csrf(self) -> tuple: |
| """POST to servers page and extract server ID and new CSRF token""" |
| servers_url = f"{self.base_url}/servers" |
| |
| |
| homepage_csrf = self.extract_csrf_from_homepage() |
| if not homepage_csrf: |
| return None, None |
| |
| data = { |
| "url": self.target_url, |
| "csrf": homepage_csrf |
| } |
| |
| headers = { |
| **self.headers, |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Origin": self.base_url, |
| "Referer": self.base_url + "/", |
| } |
| |
| logger.info("π Getting server data and new CSRF token...") |
| try: |
| response = requests.post(servers_url, data=data, headers=headers, timeout=10) |
| |
| if response.status_code == 200: |
| |
| csrf_match = re.search(r'data-csrf=""([^&]+)""', response.text) |
| if csrf_match: |
| new_csrf = csrf_match.group(1) |
| logger.info("β
New CSRF token extracted") |
| else: |
| logger.warning("β οΈ New CSRF token not found, using fallback") |
| new_csrf = "RXFOUlV0ZjFxdHg5L1VGd1ZrazRKVFRDUGdNQ1ZhUG01N2l3ZitXTFFMUU81Z2tUb1JXY3JoOXJveWtaSHA2clNBRHZhSFNIN1N0Rk8rak13c3dJNFBEcEcwa3VjYndCMVo5emEyd0ZUQ2RzK0U3RCtNR3dYcGVVSkxTamx6bzl4M2orNkN0Q2JwLzZHU0F6NHkwSTlRPT0=" |
| |
| |
| server_id = "161" |
| |
| return server_id, new_csrf |
| else: |
| logger.error(f"β Failed to get servers page. Status: {response.status_code}") |
| return None, None |
| except Exception as e: |
| logger.error(f"β Error getting server data: {str(e)}") |
| return None, None |
|
|
| def get_complete_url(self) -> Dict[str, str]: |
| """Get complete URL using the correct flow""" |
| try: |
| |
| server_id, new_csrf = self.get_server_data_and_csrf() |
| if not server_id or not new_csrf: |
| return {"success": False, "error": "Cannot proceed without server data"} |
| |
| |
| url = f"{self.base_url}/requests?fso=" |
| |
| data = { |
| "url": self.target_url, |
| "proxyServerId": server_id, |
| "csrf": new_csrf, |
| "demo": "0", |
| "frontOrigin": self.base_url |
| } |
| |
| headers = { |
| **self.headers, |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Origin": self.base_url, |
| "Referer": self.base_url + "/servers", |
| } |
| |
| logger.info("π Getting complete URL from mirror site...") |
| response = requests.post(url, data=data, headers=headers, allow_redirects=False, timeout=10) |
| |
| logger.info(f"π Status: {response.status_code}") |
| |
| if response.status_code == 302 and 'location' in response.headers: |
| complete_url = response.headers['location'] |
| logger.info("β
Complete URL obtained") |
| |
| return { |
| "success": True, |
| "url": complete_url, |
| "message": "URL successfully generated" |
| } |
| else: |
| error_msg = f"Failed. Status: {response.status_code}" |
| if 'location' not in response.headers: |
| error_msg += " - No location header in response" |
| logger.error(f"β {error_msg}") |
| return {"success": False, "error": error_msg} |
| |
| except Exception as e: |
| logger.error(f"β Exception occurred: {str(e)}") |
| return {"success": False, "error": f"Internal server error: {str(e)}"} |
|
|
| |
| scraper = MirrorScraper() |
|
|
| @app.route('/') |
| def home(): |
| return jsonify({ |
| "message": "Mirror URL Generator API - huijio/urltemplate", |
| "status": "active", |
| "version": "1.0", |
| "endpoints": { |
| "/generate": "GET - Generate a new URL", |
| "/health": "GET - Health check", |
| "/docs": "GET - API documentation" |
| }, |
| "example_usage": "GET https://huijio-urltemplate.hf.space/generate" |
| }) |
|
|
| @app.route('/health') |
| def health(): |
| return jsonify({"status": "healthy", "service": "mirror-url-generator"}) |
|
|
| @app.route('/docs') |
| def docs(): |
| return jsonify({ |
| "api_documentation": { |
| "base_url": "https://huijio-urltemplate.hf.space", |
| "endpoints": { |
| "/generate": { |
| "method": "GET", |
| "description": "Generate a new proxy URL", |
| "response": { |
| "success": "boolean", |
| "url": "string (the generated URL)", |
| "message": "string" |
| } |
| }, |
| "/health": { |
| "method": "GET", |
| "description": "Health check endpoint", |
| "response": {"status": "string"} |
| } |
| } |
| } |
| }) |
|
|
| @app.route('/generate', methods=['GET']) |
| def generate_url(): |
| """API endpoint to generate the complete URL""" |
| try: |
| result = scraper.get_complete_url() |
| return jsonify(result) |
| except Exception as e: |
| return jsonify({ |
| "success": False, |
| "error": f"Internal server error: {str(e)}" |
| }), 500 |
|
|
| if __name__ == '__main__': |
| app.run(host='0.0.0.0', port=7860, debug=False) |