medium / main.py
MyanmarSwe's picture
Update main.py
34b6af5 verified
import os
import re
import json
import time
import httpx
import random
import asyncio
import urllib.parse
import mimetypes
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from fake_useragent import UserAgent
import uvicorn
# Google Auth Libraries
from google.oauth2 import service_account
import google.auth.transport.requests
app = FastAPI()
# --- Configurations ---
ACCESS_KEY = os.getenv("ACCESS_KEY", "0000")
SERVICE_ACCOUNT_JSON_STR = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON")
ua = UserAgent(fallback='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
# MediaFire Cache
MEDIAFIRE_CACHE = {}
CACHE_TTL = 1800
MAX_CONCURRENT_REQUESTS = 30
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
client = httpx.AsyncClient(
timeout=httpx.Timeout(30.0, read=None),
follow_redirects=True,
limits=httpx.Limits(max_connections=500, max_keepalive_connections=100)
)
@app.get("/")
async def index():
return {"status": "Online"}
def get_all_service_accounts():
if not SERVICE_ACCOUNT_JSON_STR: return []
try:
data = json.loads(SERVICE_ACCOUNT_JSON_STR)
return data if isinstance(data, list) else [data]
except: return []
async def get_token_for_account(cred_dict):
try:
scopes = ['https://www.googleapis.com/auth/drive.readonly']
creds = service_account.Credentials.from_service_account_info(cred_dict, scopes=scopes)
loop = asyncio.get_event_loop()
auth_req = google.auth.transport.requests.Request()
await loop.run_in_executor(None, creds.refresh, auth_req)
return creds.token
except: return None
def get_google_file_id(url):
fid = re.search(r'/(?:d|file/d|open\?id=)/([a-zA-Z0-9_-]+)', url)
return fid.group(1) if fid else None
def get_clean_filename(url):
decoded_url = urllib.parse.unquote(url)
name = decoded_url.split('/')[-1].split('?')[0]
return name if (name and '.' in name) else "video.mp4"
async def scrape_mediafire(url):
try:
headers = {
'User-Agent': ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Referer': 'https://www.mediafire.com/'
}
async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=15.0) as temp_client:
r = await temp_client.get(url)
if r.status_code != 200: return None
# Direct link search
target = None
match = re.search(r'https?://download[^\s"\']+mediafire\.com/[^\s"\']+', r.text)
if match:
target = match.group(0).strip().replace("'", "").replace('"', '')
if not target:
soup = BeautifulSoup(r.text, 'html.parser')
btn = soup.find('a', {'id': 'downloadButton'}) or soup.find('a', {'aria-label': re.compile(r'Download', re.I)})
if btn: target = btn.get('href')
# --- URL Fix Section ---
if target:
# α€‘α€€α€šα€Ία // နဲ့စရင် https: α€‘α€Šα€·α€Ία€™α€šα€Ί
if target.startswith("//"):
target = f"https:{target}"
# α€‘α€€α€šα€Ία /file/... α€œα€­α€―α€· Relative path α€”α€²α€·α€œα€¬α€›α€„α€Ί MediaFire domain α€‘α€Šα€·α€Ία€•α€±α€Έα€™α€šα€Ί
elif target.startswith("/"):
target = f"https://www.mediafire.com{target}"
return target
except: pass
return None
@app.get("/download")
async def download_proxy(request: Request, url: str, key: str = None):
if key != ACCESS_KEY:
raise HTTPException(status_code=403, detail="Access Denied")
clean_url = urllib.parse.unquote(url)
filename = get_clean_filename(clean_url)
range_header = request.headers.get('range')
current_time = time.time()
# --- MediaFire Section ---
if "mediafire.com" in clean_url:
async with semaphore:
cached = MEDIAFIRE_CACHE.get(clean_url)
target_link = cached['link'] if (cached and (current_time - cached['time']) < CACHE_TTL) else None
if not target_link:
target_link = await scrape_mediafire(clean_url)
if target_link:
MEDIAFIRE_CACHE[clean_url] = {'link': target_link, 'time': current_time}
if not target_link:
raise HTTPException(status_code=404, detail="Direct link not found.")
try:
return await stream_file(target_link, range_header, filename, referer=clean_url)
except HTTPException as e:
if e.status_code == 415:
if clean_url in MEDIAFIRE_CACHE: del MEDIAFIRE_CACHE[clean_url]
new_link = await scrape_mediafire(clean_url)
if new_link: return await stream_file(new_link, range_header, filename, referer=clean_url)
raise e
except Exception:
raise HTTPException(status_code=500, detail="Streaming process error.")
# --- Google Drive Section ---
elif "drive.google.com" in clean_url:
file_id = get_google_file_id(clean_url)
if not file_id: raise HTTPException(status_code=400, detail="Invalid GD Link")
accounts = get_all_service_accounts()
random.shuffle(accounts)
for account in accounts:
token = await get_token_for_account(account)
if not token: continue
api_link = f"https://www.googleapis.com/drive/v3/files/{file_id}?alt=media"
headers = {"Authorization": f"Bearer {token}"}
if range_header: headers['Range'] = range_header
try:
req = client.build_request("GET", api_link, headers=headers)
r = await client.send(req, stream=True)
if r.status_code in [200, 206]:
return await process_response(r, filename)
await r.aclose()
except: continue
public_url = f"https://drive.google.com/uc?export=download&id={file_id}"
return await stream_file(public_url, range_header, filename)
else:
return await stream_file(clean_url, range_header, filename)
async def stream_file(target_url, range_header, filename, referer=None):
# Ensure full URL for httpx
if not target_url.startswith("http"):
raise HTTPException(status_code=500, detail=f"Malformed URL: {target_url}")
headers = {'User-Agent': ua.random}
if range_header: headers['Range'] = range_header
if referer: headers['Referer'] = referer
req = client.build_request("GET", target_url, headers=headers)
r = await client.send(req, stream=True)
if "text/html" in r.headers.get("Content-Type", "").lower() and r.status_code == 200:
await r.aclose()
raise HTTPException(status_code=415, detail="Blocked by MediaFire.")
return await process_response(r, filename)
async def process_response(r, filename):
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type or 'application' in mime_type:
mime_type = 'video/mp4' if filename.lower().endswith('.mp4') else 'video/x-matroska'
res_headers = {
'Content-Type': mime_type,
'Accept-Ranges': 'bytes',
'Content-Disposition': f'inline; filename="{urllib.parse.quote(filename)}"',
'Cache-Control': 'no-cache'
}
if 'content-length' in r.headers: res_headers['Content-Length'] = r.headers['content-length']
if 'content-range' in r.headers: res_headers['Content-Range'] = r.headers['content-range']
async def stream_generator():
try:
async for chunk in r.aiter_bytes(chunk_size=131072):
yield chunk
finally:
await r.aclose()
return StreamingResponse(
stream_generator(),
status_code=r.status_code,
headers=res_headers,
media_type=mime_type
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)