| import os |
| import sys |
| import asyncio |
| import argparse |
| import hashlib |
| from typing import Optional |
| from fastapi import FastAPI, Request, HTTPException |
| from fastapi.responses import Response |
| from fastapi.staticfiles import StaticFiles |
| import additions.saves as saves |
| from additions.auth import BasicAuthMiddleware |
| from additions.cache import proxy_and_cache, get_local_file |
| from additions.packed import init_packed_archive, get_packed_file, is_initialized as packed_is_initialized |
|
|
| |
# Put the ``utils`` directory that sits next to this file first on the module
# search path.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'utils'))


# Command-line interface. ``--port`` falls back to the PORT environment
# variable and then to 7860.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--port",
    type=int,
    default=int(os.environ.get("PORT", 7860))
)
parser.add_argument("--custom_saves", action="store_true")
parser.add_argument("--login", type=str)
parser.add_argument("--password", type=str)
# nargs='?' + const: the bare flag selects the default folder name, while
# omitting the flag altogether leaves the value at None.
parser.add_argument("--vcsky_local", type=str, nargs='?', const='vcsky', default=None,
                    help="Serve vcsky from local directory instead of proxy. Optionally specify path (default: vcsky/)")
parser.add_argument("--vcbr_local", type=str, nargs='?', const='vcbr', default=None,
                    help="Serve vcbr from local directory instead of proxy. Optionally specify path (default: vcbr/)")
parser.add_argument("--vcsky_url", type=str, default="https://cdn.dos.zone/vcsky/", help="Custom vcsky proxy URL")
parser.add_argument("--vcbr_url", type=str, default="https://br.cdn.dos.zone/vcsky/", help="Custom vcbr proxy URL")
parser.add_argument("--vcsky_cache", action="store_true", help="Cache vcsky files locally. If files are not found in the local directory, they will be downloaded from the specified URL and saved to the local directory.")
parser.add_argument("--vcbr_cache", action="store_true", help="Cache vcbr files locally. If files are not found in the local directory, they will be downloaded from the specified URL and saved to the local directory.")
parser.add_argument("--packed", type=str, nargs='?', const='revcdos.bin', default=None,
                    help="Serve vcsky/ and vcbr/ from packed archive. Can be a local file path or URL. "
                         "If URL, downloads to local file if not present. If no value specified, uses 'revcdos.bin'. "
                         "Supports brotli passthrough.")
parser.add_argument("--unpacked", type=str, default=None,
                    help="Unpack archive to local folders and serve from there. Can be a local .bin file or URL. "
                         "Unpacks to unpacked/{md5_hash}/ and sets vcsky_local/vcbr_local automatically. "
                         "If already unpacked, uses existing files without re-unpacking. "
                         "If URL, streams and unpacks during download using downloader_brotli.")
parser.add_argument("--pack", type=str, default=None,
                    help="Pack a folder to {hash}.bin archive. Can be a folder path or MD5 hash from unpacked/. "
                         "Packs all subfolders (vcsky/, vcbr/, etc.) into a single archive. "
                         "After packing, uses the archive with --packed mode to serve files.")

if __name__ == "__main__":
    # Run as a script: unknown arguments are a usage error, exactly as before.
    args = parser.parse_args()
else:
    # Imported by another launcher (an ASGI server CLI, a test runner, ...):
    # sys.argv belongs to that program, so arguments this parser does not
    # know are ignored instead of terminating the process at import time.
    args, _unknown_args = parser.parse_known_args()
|
|
|
|
def _md5_hash(text: str) -> str:
    """Return the hexadecimal MD5 digest of ``text``.

    The string is encoded with ``str.encode()``'s default codec before it is
    hashed.
    """
    digest = hashlib.md5()
    digest.update(text.encode())
    return digest.hexdigest()
|
|
|
|
def _is_url(path: str) -> bool:
    """Return True when ``path`` starts with ``http://`` or ``https://``.

    The comparison is case-sensitive and only looks at the prefix.
    """
    return path.startswith(("http://", "https://"))
|
|
|
|
def _is_md5_hash(text: str) -> bool:
    """Return True when ``text`` is exactly 32 ASCII hexadecimal characters.

    Each character is checked individually. Parsing with ``int(text, 16)``
    is not sufficient: it also accepts a ``0x`` prefix, a sign, underscores,
    surrounding whitespace and non-ASCII digits, none of which can appear in
    an MD5 hex digest.
    """
    if len(text) != 32:
        return False
    return all(ch in "0123456789abcdefABCDEF" for ch in text)
|
|
|
|
def _get_unpacked_dir(source: str) -> str:
    """Return the ``unpacked/<hash>`` directory path that belongs to ``source``.

    When ``source`` already is an MD5 hash it is used as the directory name
    (lower-cased); otherwise the directory name is the MD5 hash of the
    ``source`` string itself.
    """
    dir_name = source.lower() if _is_md5_hash(source) else _md5_hash(source)
    return os.path.join("unpacked", dir_name)
|
|
|
|
def _check_unpacked_exists(unpacked_dir: str) -> bool:
    """Report whether ``unpacked_dir`` already holds unpacked content.

    Returns True as soon as a ``vcsky`` or ``vcbr`` sub-directory is found
    that contains at least one file at any depth. Returns False when
    ``unpacked_dir`` is not a directory or neither sub-directory holds a file.
    """
    if not os.path.isdir(unpacked_dir):
        return False

    for name in ("vcsky", "vcbr"):
        candidate = os.path.join(unpacked_dir, name)
        if not os.path.isdir(candidate):
            continue
        # os.walk is lazy, so any() stops at the first directory with files.
        if any(filenames for _root, _dirs, filenames in os.walk(candidate)):
            return True

    return False
|
|
|
|
async def _unpack_from_url(url: str, output_dir: str) -> bool:
    """Download the archive at ``url`` and unpack it into ``output_dir``.

    The work is delegated to
    ``utils.downloader_brotli.download_and_unpack_async``.

    Returns:
        True on success. Any exception (including a failed import of the
        downloader) is printed and reported as False instead of being raised.
    """
    try:
        # Imported inside the try block so an import failure is handled like
        # any other unpacking error.
        from utils.downloader_brotli import download_and_unpack_async
        print(f"Streaming and unpacking from URL: {url}")
        print(f"Output directory: {output_dir}")
        await download_and_unpack_async(url, output_dir)
    except Exception as exc:
        print(f"Error unpacking from URL: {exc}")
        return False
    else:
        return True
|
|
|
|
async def _unpack_from_file(file_path: str, output_dir: str) -> bool:
    """Unpack the local archive ``file_path`` into ``output_dir``.

    The work is delegated to ``utils.packer_brotli.unpack_file``, which is
    run through the event loop's default executor rather than being called
    directly from the coroutine.

    Returns:
        True on success. Any exception (including a failed import of the
        packer) is printed and reported as False instead of being raised.
    """
    try:
        # Imported inside the try block so an import failure is handled like
        # any other unpacking error.
        from utils.packer_brotli import unpack_file
        print(f"Unpacking local file: {file_path}")
        print(f"Output directory: {output_dir}")

        # get_running_loop() is the documented way to obtain the loop from
        # inside a coroutine; passing None selects the default executor.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, unpack_file, file_path, output_dir)
        return True
    except Exception as e:
        print(f"Error unpacking file: {e}")
        return False
|
|
|
|
def pack_source(source: str) -> Optional[str]:
    """Pack the sub-folders of a directory into a single ``{hash}.bin`` archive.

    ``source`` is either an MD5 hash, in which case ``unpacked/{hash}/`` is
    packed and the hash names the archive, or a folder path, in which case
    the archive is named after the MD5 hash of the folder's base name.

    Every non-hidden sub-directory (vcsky/, vcbr/, ...) is packed in sorted
    order: the first one creates the archive through ``pack_folder()`` and
    the others are appended with ``add_folder()``.

    Args:
        source: Folder path or MD5 hash.

    Returns:
        The archive file name (a relative path such as "abc123...def.bin"),
        or None when the folder is missing or has no sub-directories.
    """
    from utils.packer_brotli import pack_folder, add_folder

    if _is_md5_hash(source):
        output_hash = source.lower()
        folder_path = os.path.join("unpacked", output_hash)
    else:
        # Drop trailing separators so basename() yields the folder name.
        folder_path = source.rstrip('/\\')
        output_hash = _md5_hash(os.path.basename(folder_path))

    if not os.path.isdir(folder_path):
        print(f"Error: Folder not found: {folder_path}")
        return None

    output_file = f"{output_hash}.bin"

    # Hidden entries (leading dot) and plain files are not packed.
    subdirs = [
        entry for entry in os.listdir(folder_path)
        if not entry.startswith('.') and os.path.isdir(os.path.join(folder_path, entry))
    ]
    subdirs.sort()

    if not subdirs:
        print(f"Error: No subdirectories found in {folder_path}")
        return None

    print(f"Packing {len(subdirs)} subfolders from {folder_path} to {output_file}")
    print(f"Subfolders: {', '.join(subdirs)}")
    print()

    first_name, *remaining_names = subdirs

    print(f"=== Creating archive from {first_name} ===")
    pack_folder(os.path.join(folder_path, first_name), output_file)

    for name in remaining_names:
        print(f"\n=== Adding {name} ===")
        add_folder(output_file, os.path.join(folder_path, name))

    final_size = os.path.getsize(output_file)
    print(f"\n=== Packing complete ===")
    print(f"Output: {output_file} ({final_size:,} bytes)")

    return output_file
|
|
|
|
async def setup_unpacked(source: str) -> tuple:
    """Prepare unpacked mode: unpack the archive if needed and locate its folders.

    Args:
        source: Local archive path, URL of a packed archive, or the MD5 hash
            of an already unpacked folder under ``unpacked/``.

    Returns:
        Tuple ``(vcsky_local_path, vcbr_local_path)``. Either element is None
        when that folder was not found; ``(None, None)`` is returned when the
        source is missing or unpacking failed.
    """
    unpacked_dir = _get_unpacked_dir(source)

    if _check_unpacked_exists(unpacked_dir):
        print(f"Using existing unpacked directory: {unpacked_dir}")
    elif _is_md5_hash(source):
        # A bare hash can only refer to an existing folder; there is no
        # archive to unpack from.
        print(f"Error: Unpacked folder not found for hash: {source}")
        print(f"Expected directory: {unpacked_dir}")
        return None, None
    else:
        source_is_url = _is_url(source)

        # Validate a local archive before creating anything on disk, so a
        # mistyped path does not leave an empty unpacked/<hash>/ behind.
        if not source_is_url and not os.path.isfile(source):
            print(f"Error: Archive file not found: {source}")
            return None, None

        print(f"Unpacking to: {unpacked_dir}")
        os.makedirs(unpacked_dir, exist_ok=True)

        if source_is_url:
            success = await _unpack_from_url(source, unpacked_dir)
        else:
            success = await _unpack_from_file(source, unpacked_dir)

        if not success:
            print(f"Failed to unpack from: {source}")
            return None, None

    vcsky_path = None
    vcbr_path = None

    vcsky_candidate = os.path.join(unpacked_dir, "vcsky")
    if os.path.isdir(vcsky_candidate):
        vcsky_path = vcsky_candidate
        print(f"  vcsky: {vcsky_path}")

    vcbr_candidate = os.path.join(unpacked_dir, "vcbr")
    if os.path.isdir(vcbr_candidate):
        vcbr_path = vcbr_candidate
        print(f"  vcbr: {vcbr_path}")

    if not vcsky_path and not vcbr_path:
        print(f"Warning: No vcsky or vcbr folders found in {unpacked_dir}")

        # Fallback: look one level deeper, for archives that wrap their
        # content in a single top-level folder. Entries are visited in sorted
        # order so the outcome does not depend on os.listdir() ordering; when
        # several wrappers match, the last one in that order is used.
        for item in sorted(os.listdir(unpacked_dir)):
            item_path = os.path.join(unpacked_dir, item)
            if os.path.isdir(item_path):
                vcsky_sub = os.path.join(item_path, "vcsky")
                vcbr_sub = os.path.join(item_path, "vcbr")
                if os.path.isdir(vcsky_sub):
                    vcsky_path = vcsky_sub
                if os.path.isdir(vcbr_sub):
                    vcbr_path = vcbr_sub

    return vcsky_path, vcbr_path
|
|
|
|
# ASGI application; middleware and routes are attached to it below.
app = FastAPI()


# Authentication middleware is installed only when BOTH credentials are given.
if args.login and args.password:
    app.add_middleware(BasicAuthMiddleware, username=args.login, password=args.password)
elif args.login or args.password:
    # Only one of the two was supplied: say so instead of silently starting
    # an unprotected server.
    print("Warning: --login and --password must both be set to enable authentication; "
          "starting WITHOUT authentication.")


# The saves router is exposed only when --custom_saves is passed.
if args.custom_saves:
    app.include_router(saves.router)


# Base URLs taken from --vcsky_url / --vcbr_url.
VCSKY_BASE_URL = args.vcsky_url
VCBR_BASE_URL = args.vcbr_url


# Local content directories taken from --vcsky_local / --vcbr_local;
# None when the corresponding flag was not given.
VCSKY_LOCAL_PATH = args.vcsky_local
VCBR_LOCAL_PATH = args.vcbr_local
|
|
|
|
def request_to_url(request: Request, path: str, base_url: str):
    """Build the target URL for ``path`` under ``base_url``.

    ``base_url`` and ``path`` are concatenated as-is (no separator is
    inserted), and the incoming request's query string, when it has one, is
    appended after a ``?``.
    """
    target = f"{base_url}{path}"

    raw_query = request.url.query
    if not raw_query:
        return target

    query_string = str(raw_query)
    if not query_string:
        return target

    return f"{target}?{query_string}"
|
|
|
|
| |
def _join_inside(base_dir: str, rel_path: str) -> Optional[str]:
    """Join ``rel_path`` onto ``base_dir`` unless the result escapes ``base_dir``.

    Returns the joined path, exactly as ``os.path.join`` produces it, when
    its normalized absolute form lies inside ``base_dir``; otherwise None.
    This rejects ``..`` traversal and absolute ``rel_path`` values, which
    ``os.path.join`` would otherwise let replace the base directory.
    """
    joined = os.path.join(base_dir, rel_path)
    root = os.path.abspath(base_dir)
    target = os.path.abspath(joined)
    try:
        inside = os.path.commonpath([root, target]) == root
    except ValueError:
        # Raised e.g. for paths on different drives on Windows.
        inside = False
    return joined if inside else None


async def _serve_asset(request: Request, path: str, *, prefix: str,
                       local_dir: Optional[str], local_only: bool,
                       base_url: str, use_cache: bool):
    """Shared implementation of the /vcsky and /vcbr handlers.

    Sources are tried in this order:
      1. the packed archive, when --packed is set and the archive is initialized;
      2. the local directory ``local_dir``, when one is configured;
      3. the upstream ``base_url``, with or without local caching.

    Args:
        request: Incoming request, passed through to the helpers.
        path: Path captured from the URL after the ``/{prefix}/`` segment.
        prefix: ``"vcsky"`` or ``"vcbr"``; names the folder inside the packed
            archive and the local cache directory.
        local_dir: Directory to serve local files from, or None.
        local_only: When true, a miss in the sources above is a 404 instead
            of falling through to the upstream proxy.
        base_url: Upstream base URL to proxy to.
        use_cache: Whether proxied responses are cached under ``prefix``.

    Raises:
        HTTPException: 404 when ``local_only`` is set and nothing was found,
            or when ``path`` would resolve outside the local or cache directory.
    """
    if args.packed and packed_is_initialized():
        if response := await get_packed_file(f"{prefix}/{path}", request):
            return response

    if local_dir:
        # ``path`` comes straight from the request URL; never let it point
        # outside the configured directory.
        local_path = _join_inside(local_dir, path)
        if local_path is None:
            raise HTTPException(status_code=404, detail="File not found")
        if response := get_local_file(local_path, request):
            return response

    if local_only:
        raise HTTPException(status_code=404, detail="File not found")

    url = request_to_url(request, path, base_url)
    if use_cache:
        # Same containment rule for the cache location on disk.
        cache_path = _join_inside(prefix, path)
        if cache_path is None:
            raise HTTPException(status_code=404, detail="File not found")
        return await proxy_and_cache(request, url, cache_path)
    return await proxy_and_cache(request, url, disable_cache=True)


@app.api_route("/vcsky/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def vc_sky_proxy(request: Request, path: str):
    """Serve ``/vcsky/<path>`` from the packed archive, local folder or upstream."""
    return await _serve_asset(
        request,
        path,
        prefix="vcsky",
        # Read at request time: init_server() may have replaced the value.
        local_dir=VCSKY_LOCAL_PATH,
        local_only=bool(args.vcsky_local is not None or args.unpacked),
        base_url=VCSKY_BASE_URL,
        use_cache=args.vcsky_cache,
    )


@app.api_route("/vcbr/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def vc_br_proxy(request: Request, path: str):
    """Serve ``/vcbr/<path>`` from the packed archive, local folder or upstream."""
    return await _serve_asset(
        request,
        path,
        prefix="vcbr",
        # Read at request time: init_server() may have replaced the value.
        local_dir=VCBR_LOCAL_PATH,
        local_only=bool(args.vcbr_local is not None or args.unpacked),
        base_url=VCBR_BASE_URL,
        use_cache=args.vcbr_cache,
    )
|
|
|
|
@app.get("/")
async def read_index():
    """Serve ``dist/index.html`` with the custom-saves flag baked in.

    The page's query-string check for ``custom_saves`` is replaced by a
    constant comparison that reflects the ``--custom_saves`` command-line
    flag, and the response carries the Cross-Origin-Opener-Policy and
    Cross-Origin-Embedder-Policy headers. Responds with 404 when the file
    does not exist.
    """
    index_path = "dist/index.html"
    if not os.path.exists(index_path):
        return Response("index.html not found", status_code=404)

    with open(index_path, "r", encoding="utf-8") as index_file:
        html = index_file.read()

    flag = "1" if args.custom_saves else "0"
    html = html.replace(
        'new URLSearchParams(window.location.search).get("custom_saves") === "1"',
        f'"{flag}" === "1"',
    )

    isolation_headers = {
        "Cross-Origin-Opener-Policy": "same-origin",
        "Cross-Origin-Embedder-Policy": "require-corp",
    }
    return Response(html, media_type="text/html", headers=isolation_headers)
|
|
# Serve the contents of dist/ as static files under the site root.
app.mount(path="/", app=StaticFiles(directory="dist"), name="root")
|
|
|
|
async def init_server():
    """Run the asynchronous part of server start-up.

    With ``--unpacked`` the archive is prepared through ``setup_unpacked()``
    and the module-level local paths are pointed at the folders it found.
    With ``--packed`` the packed archive is initialized; a failure there is
    only reported with a warning.
    """
    global VCSKY_LOCAL_PATH, VCBR_LOCAL_PATH

    if args.unpacked:
        unpacked_vcsky, unpacked_vcbr = await setup_unpacked(args.unpacked)
        # A folder that was not found leaves the current setting untouched.
        VCSKY_LOCAL_PATH = unpacked_vcsky or VCSKY_LOCAL_PATH
        VCBR_LOCAL_PATH = unpacked_vcbr or VCBR_LOCAL_PATH

    if not args.packed:
        return

    if await init_packed_archive(args.packed) is None:
        print(f"Warning: Failed to initialize packed archive from: {args.packed}")
|
|
|
|
def start_server(app=app, host="0.0.0.0", port=args.port):
    """Initialize archive modes if requested, then run the app under uvicorn.

    Args:
        app: ASGI application to serve (defaults to the module-level ``app``).
        host: Interface to bind to.
        port: Port to listen on (defaults to the parsed ``--port`` value,
            captured when this function is defined).
    """
    import uvicorn

    # init_server() only has work to do in packed / unpacked mode.
    needs_async_init = bool(args.packed or args.unpacked)
    if needs_async_init:
        asyncio.run(init_server())

    uvicorn.run(app, host=host, port=port)
|
|
|
|
if __name__ == "__main__":
    # --pack: build the archive first, then serve it as if --packed was given.
    if args.pack:
        print(f"Pack mode: {args.pack}")
        packed_file = pack_source(args.pack)
        if not packed_file:
            print("Packing failed, exiting.")
            sys.exit(1)
        print(f"\nUsing packed archive: {packed_file}")
        args.packed = packed_file

    print(f"Starting server on http://localhost:{args.port}")

    # Print a one-time summary of where content will be served from.
    if args.unpacked:
        print(f"unpacked mode: {args.unpacked}")
    elif args.packed:
        print(f"packed: {args.packed}")
    else:
        for label, local_dir, remote_url in (
            ("vcsky", args.vcsky_local, VCSKY_BASE_URL),
            ("vcbr", args.vcbr_local, VCBR_BASE_URL),
        ):
            mode = 'local' if local_dir else 'proxy'
            print(f"{label}: {mode} ({local_dir or remote_url})")

    start_server()
|
|