| import os |
| import re |
| import json |
| import requests |
| import asyncio |
| from fastapi import FastAPI, Form, Request |
| from fastapi.responses import HTMLResponse, StreamingResponse |
| from fastapi.templating import Jinja2Templates |
| from huggingface_hub import HfApi |
|
|
| app = FastAPI() |
| templates = Jinja2Templates(directory="templates") |
|
|
| |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| HF_REPO_ID = os.getenv("HF_REPO_ID", "factorstudios/movs") |
| DOWNLOAD_DIR = "downloads" |
|
|
| if not os.path.exists(DOWNLOAD_DIR): |
| os.makedirs(DOWNLOAD_DIR) |
|
|
| def get_direct_link(page_url): |
| |
| match = re.search(r'downloadwella\.com/([^/]+)', page_url) |
| if not match: |
| return None, "Error: Could not extract file ID from URL." |
| |
| file_id = match.group(1).split('.')[0] |
| |
| data = { |
| "op": "download2", |
| "id": file_id, |
| "rand": "", |
| "referer": "", |
| "method_free": "", |
| "method_premium": "" |
| } |
|
|
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", |
| "Referer": page_url |
| } |
|
|
| try: |
| response = requests.post(page_url, data=data, headers=headers) |
| response.raise_for_status() |
| |
| download_match = re.search(r'href="(https://[^"]+\.downloadwella\.com/d/[^"]+)"', response.text) |
| |
| if download_match: |
| return download_match.group(1), None |
| else: |
| if "captcha" in response.text.lower(): |
| return None, "Error: CAPTCHA required by the site. Cannot automate." |
| return None, "Error: Could not find direct download link." |
| except Exception as e: |
| return None, f"Error: {str(e)}" |
|
|
| async def process_transfer(url: str): |
| yield json.dumps({"message": f"Analyzing URL: {url}"}) + "\n" |
| |
| direct_link, error = get_direct_link(url) |
| if error: |
| yield json.dumps({"message": error, "error": True}) + "\n" |
| return |
|
|
| filename = direct_link.split('/')[-1] |
| filepath = os.path.join(DOWNLOAD_DIR, filename) |
| |
| yield json.dumps({"message": f"Direct link found. Downloading {filename}..."}) + "\n" |
| |
| try: |
| |
| with requests.get(direct_link, stream=True) as r: |
| r.raise_for_status() |
| total_size = int(r.headers.get('content-length', 0)) |
| downloaded = 0 |
| with open(filepath, 'wb') as f: |
| for chunk in r.iter_content(chunk_size=1024*1024): |
| if chunk: |
| f.write(chunk) |
| downloaded += len(chunk) |
| |
| |
| yield json.dumps({"message": f"Download complete. Uploading to Hugging Face..."}) + "\n" |
| |
| |
| if not HF_TOKEN: |
| yield json.dumps({"message": "Error: HF_TOKEN environment variable not set.", "error": True}) + "\n" |
| return |
| |
| api = HfApi(token=HF_TOKEN) |
| api.upload_file( |
| path_or_fileobj=filepath, |
| path_in_repo=filename, |
| repo_id=HF_REPO_ID, |
| repo_type="dataset" |
| ) |
| |
| yield json.dumps({"message": f"Successfully uploaded {filename} to {HF_REPO_ID}!"}) + "\n" |
| |
| |
| os.remove(filepath) |
| yield json.dumps({"message": "Cleaned up local file."}) + "\n" |
| |
| except Exception as e: |
| yield json.dumps({"message": f"Error during process: {str(e)}", "error": True}) + "\n" |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def index(request: Request): |
| return templates.TemplateResponse(request, "index.html") |
|
|
| @app.post("/process") |
| async def process(url: str = Form(...)): |
| return StreamingResponse(process_transfer(url), media_type="application/x-ndjson") |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|