# File size: 4,034 Bytes (file-viewer header captured with the source; not part of the program)
import asyncio
import json
import os
import re
from urllib.parse import unquote, urlsplit

import requests
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi
# FastAPI application object and the Jinja2 loader for the "templates" folder.
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Configuration from environment variables
HF_TOKEN = os.getenv("HF_TOKEN")  # None when the variable is unset
HF_REPO_ID = os.getenv("HF_REPO_ID", "factorstudios/movs")

# Local staging directory for files between download and upload.
DOWNLOAD_DIR = "downloads"
# exist_ok avoids the check-then-create race when several workers start at once.
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
def get_direct_link(page_url, timeout=30):
    """Resolve a downloadwella.com page URL to a direct download link.

    POSTs the form fields (``op=download2`` plus the file ID taken from
    ``page_url``) back to the page and scans the returned HTML for an
    ``href`` pointing at ``https://<host>.downloadwella.com/d/...``.

    Args:
        page_url: URL of the download page. The file ID is the first path
            segment after ``downloadwella.com/``, up to its first dot.
        timeout: Seconds passed to ``requests.post`` so a stalled server
            cannot hang the caller indefinitely.

    Returns:
        A ``(direct_link, error)`` tuple. On success ``error`` is ``None``;
        on failure ``direct_link`` is ``None`` and ``error`` is a message
        starting with ``"Error: "``.
    """
    # Stop at '?' and '#' as well as '/', so a query string or fragment
    # never becomes part of the file ID.
    match = re.search(r'downloadwella\.com/([^/?#]+)', page_url)
    if not match:
        return None, "Error: Could not extract file ID from URL."

    file_id = match.group(1).split('.')[0]
    if not file_id:
        # e.g. ".../.html" -- nothing usable to submit to the site.
        return None, "Error: Could not extract file ID from URL."

    data = {
        "op": "download2",
        "id": file_id,
        "rand": "",
        "referer": "",
        "method_free": "",
        "method_premium": "",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": page_url,
    }

    try:
        response = requests.post(
            page_url, data=data, headers=headers, timeout=timeout
        )
        response.raise_for_status()

        download_match = re.search(
            r'href="(https://[^"]+\.downloadwella\.com/d/[^"]+)"',
            response.text,
        )
        if download_match:
            return download_match.group(1), None

        if "captcha" in response.text.lower():
            return None, "Error: CAPTCHA required by the site. Cannot automate."
        return None, "Error: Could not find direct download link."
    except Exception as e:
        # Deliberately broad: every failure is reported through the returned
        # tuple rather than raised, so the caller can relay it as a message.
        return None, f"Error: {str(e)}"
def _filename_from_url(direct_link):
    """Return the percent-decoded last path segment of ``direct_link``."""
    # urlsplit drops the query string/fragment; basename runs after decoding
    # so an encoded '/' cannot smuggle in a directory component.
    return os.path.basename(unquote(urlsplit(direct_link).path))


def _download_to_file(direct_link, filepath, timeout=(10, 60)):
    """Stream ``direct_link`` into ``filepath`` in 1 MB chunks (blocking)."""
    with requests.get(direct_link, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(filepath, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                if chunk:
                    f.write(chunk)


async def process_transfer(url: str):
    """Download the file behind ``url`` and upload it to the Hugging Face repo.

    Async generator that yields newline-terminated JSON objects. Each object
    has a ``"message"`` key; failures additionally carry ``"error": True``
    and end the stream.

    Blocking network and disk work runs in the default executor so the event
    loop stays free to serve other requests while a transfer is in progress.
    The staged local file is removed on every exit path, not only on success.

    Args:
        url: Download page URL, passed to ``get_direct_link``.
    """
    loop = asyncio.get_running_loop()

    yield json.dumps({"message": f"Analyzing URL: {url}"}) + "\n"

    direct_link, error = await loop.run_in_executor(None, get_direct_link, url)
    if error:
        yield json.dumps({"message": error, "error": True}) + "\n"
        return

    filename = _filename_from_url(direct_link)
    if filename in ("", ".", ".."):
        yield json.dumps({
            "message": "Error: Could not determine a file name from the download link.",
            "error": True,
        }) + "\n"
        return
    filepath = os.path.join(DOWNLOAD_DIR, filename)

    yield json.dumps({"message": f"Direct link found. Downloading {filename}..."}) + "\n"

    try:
        # Download file
        await loop.run_in_executor(None, _download_to_file, direct_link, filepath)

        yield json.dumps({"message": "Download complete. Uploading to Hugging Face..."}) + "\n"

        # Upload to Hugging Face
        if not HF_TOKEN:
            yield json.dumps({"message": "Error: HF_TOKEN environment variable not set.", "error": True}) + "\n"
            return

        api = HfApi(token=HF_TOKEN)
        await loop.run_in_executor(
            None,
            lambda: api.upload_file(
                path_or_fileobj=filepath,
                path_in_repo=filename,
                repo_id=HF_REPO_ID,
                repo_type="dataset",
            ),
        )

        yield json.dumps({"message": f"Successfully uploaded {filename} to {HF_REPO_ID}!"}) + "\n"

        # Cleanup
        os.remove(filepath)
        yield json.dumps({"message": "Cleaned up local file."}) + "\n"

    except Exception as e:
        yield json.dumps({"message": f"Error during process: {str(e)}", "error": True}) + "\n"
    finally:
        # Best-effort removal for the paths that skip the cleanup step above
        # (missing token, failed download/upload, client disconnect). No yield
        # here: yielding while the generator is being closed is an error.
        if os.path.isfile(filepath):
            try:
                os.remove(filepath)
            except OSError:
                pass
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render ``index.html`` for the root path."""
    page = templates.TemplateResponse(request, "index.html")
    return page
@app.post("/process")
async def process(url: str = Form(...)):
    """Stream the output of ``process_transfer`` for the submitted form ``url``."""
    event_stream = process_transfer(url)
    return StreamingResponse(
        event_stream,
        media_type="application/x-ndjson",
    )
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Listen on all interfaces, port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )