# streamtape / main.py
# (Hugging Face Space file header — bigbossmonster, "Update main.py", commit 70dc04e verified)
import os
import requests
import shutil
import time
import re
import asyncio
from fastapi import FastAPI, Form, Request, BackgroundTasks
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, StreamingResponse
from huggingface_hub import HfApi
app = FastAPI()
# Jinja2 templates rendered by the "/" and "/start_batch" endpoints.
templates = Jinja2Templates(directory="templates")
# --- CONFIGURATION ---
# Streamtape API credentials, read from the environment (empty string if unset).
API_LOGIN = os.getenv("STREAMTAPE_LOGIN", "")
API_KEY = os.getenv("STREAMTAPE_KEY", "")
# Local scratch directory where files are downloaded before upload/cleanup.
DOWNLOAD_DIR = "/data"
# Global Log Buffer
# In-memory rolling log (capped around 100 entries) streamed via /stream-logs.
log_buffer = []
async def log(message: str):
    """Timestamp *message*, echo it to stdout, and append it to the rolling buffer."""
    entry = f"[{time.strftime('%H:%M:%S')}] {message}"
    print(entry)
    log_buffer.append(entry)
    # Keep only the most recent 100 entries so the buffer stays bounded.
    while len(log_buffer) > 100:
        log_buffer.pop(0)
def extract_file_id(url: str) -> str:
    """Extract the Streamtape file ID from a URL.

    Handles both the viewer (``/v/<id>/...``) and embed (``/e/<id>/...``)
    URL forms, and strips any trailing query string or fragment. If the
    input contains neither marker, it is assumed to already be a bare ID
    and is returned stripped of surrounding whitespace.
    """
    url = url.strip()
    for marker in ("/v/", "/e/"):
        if marker in url:
            candidate = url.split(marker, 1)[1].split("/", 1)[0]
            # Drop query/fragment, e.g. ".../v/abc?dl=1" -> "abc".
            return re.split(r"[?#]", candidate, maxsplit=1)[0]
    return url
def get_download_info(file_id: str):
    """
    Resolve a Streamtape file ID into a direct download link.

    Implements the two-step API flow (dlticket -> dl), honouring the API's
    rate-limit "wait" responses and retrying transient failures.

    Returns a tuple: (direct_url, filename)

    Raises:
        Exception: if the API reports an unrecoverable error or all
            retries are exhausted.
    """
    base_url = "https://api.streamtape.com/file/dlticket"
    params = {'file': file_id, 'login': API_LOGIN, 'key': API_KEY}
    max_retries = 5
    for attempt in range(max_retries):
        try:
            print(f"Requesting ticket for {file_id} (Attempt {attempt+1})...")
            # Explicit timeout so a hung connection cannot stall the batch forever.
            response = requests.get(base_url, params=params, timeout=30).json()
            status = response.get('status')
            msg = response.get('msg', '')
            result = response.get('result', {})
            # --- CASE 1: SUCCESS (200 OK) ---
            if status == 200:
                # Sub-case A: Direct URL provided immediately
                if result.get('url'):
                    # Name usually provided in result as well
                    name = result.get('name', f"{file_id}.mp4")
                    return result['url'], name
                # Sub-case B: Ticket provided
                elif result.get('ticket'):
                    ticket = result['ticket']
                    wait_time = result.get('wait_time', 5)
                    print(f"🎟️ Ticket found. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    # Step 2: Get Final Link & Name
                    dl_url = "https://api.streamtape.com/file/dl"
                    dl_params = {'file': file_id, 'ticket': ticket}
                    dl_resp = requests.get(dl_url, params=dl_params, timeout=30).json()
                    if dl_resp.get('status') == 200:
                        final_url = dl_resp['result']['url']
                        final_name = dl_resp['result']['name']
                        return final_url, final_name
                    else:
                        raise Exception(f"Step 2 Error: {dl_resp.get('msg')}")
                else:
                    raise Exception("API returned 200 but missing URL/Ticket.")
            # --- CASE 2: WAIT (403) ---
            elif status == 403 and "wait" in msg.lower():
                # The message embeds the number of seconds to wait.
                wait_match = re.search(r'(\d+)', msg)
                wait_seconds = int(wait_match.group(1)) if wait_match else 5
                print(f"Rate Limit: Sleeping {wait_seconds}s...")
                time.sleep(wait_seconds + 2)  # small safety margin
                continue
            else:
                raise Exception(f"API Error: {msg}")
        except Exception as e:
            print(f"Error: {e}")
            # Bare `raise` preserves the original traceback on the final attempt.
            if attempt == max_retries - 1: raise
            time.sleep(2)
    raise Exception("Max retries exceeded")
async def batch_processor(urls: list, hf_token: str, hf_repo: str, repo_type: str):
    """
    Processes a list of URLs sequentially:
    Download -> Upload -> Delete -> Next

    Args:
        urls: Streamtape URLs (or bare file IDs), one per entry.
        hf_token: Hugging Face token; the upload step is skipped if falsy.
        hf_repo: Target repo id; the upload step is skipped if falsy.
        repo_type: Passed straight to HfApi.upload_file (default caller sends "dataset").
    """
    await log(f"πŸ“¦ Batch started with {len(urls)} links.")
    # Browser-like headers for the CDN download request.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/115.0.0.0 Safari/537.36",
        "Referer": "https://streamtape.com/",
        "Connection": "keep-alive"
    }
    for index, url in enumerate(urls):
        if not url.strip(): continue
        file_id = extract_file_id(url)
        await log(f"▢️ Processing ({index+1}/{len(urls)}): ID {file_id}")
        local_path = None
        try:
            # 1. Get Link & Name (blocking API calls run in a worker thread)
            direct_url, filename = await asyncio.to_thread(get_download_info, file_id)
            # BUGFIX: report the detected filename instead of the literal "(unknown)".
            await log(f" πŸ”Ή Name Detected: {filename}")
            local_path = os.path.join(DOWNLOAD_DIR, filename)
            # 2. Download
            await log(f" ⬇️ Downloading...")
            def download_file():
                with requests.Session() as s:
                    s.headers.update(headers)
                    with s.get(direct_url, stream=True, timeout=45) as r:
                        r.raise_for_status()
                        with open(local_path, 'wb') as f:
                            shutil.copyfileobj(r.raw, f)
            await asyncio.to_thread(download_file)
            await log(f" βœ… Downloaded to storage.")
            # 3. Upload to HF
            if hf_token and hf_repo:
                await log(f" ⬆️ Uploading to HF ({repo_type})...")
                def upload_file():
                    api = HfApi(token=hf_token)
                    api.upload_file(
                        path_or_fileobj=local_path,
                        path_in_repo=filename,
                        repo_id=hf_repo,
                        repo_type=repo_type
                    )
                await asyncio.to_thread(upload_file)
                await log(f" πŸŽ‰ Upload Complete!")
            # 4. DELETE LOCAL FILE (Cleanup)
            if os.path.exists(local_path):
                os.remove(local_path)
                await log(f" πŸ—‘οΈ Local file deleted to free space.")
        except Exception as e:
            # Best-effort batch: log the failure and continue with the next URL.
            await log(f" ❌ Failed: {str(e)}")
            # Even if failed, try to cleanup partial files
            if local_path and os.path.exists(local_path):
                os.remove(local_path)
    await log("🏁 Batch Processing Completed.")
# --- API ENDPOINTS ---
@app.get("/stream-logs")
async def stream_logs(request: Request):
    """Server-Sent Events endpoint that pushes new log-buffer lines to the client."""
    async def event_generator():
        # Index of the next buffer entry to deliver; poll once per second.
        sent = 0
        while not await request.is_disconnected():
            pending = log_buffer[sent:]
            sent += len(pending)
            for line in pending:
                yield f"data: {line}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the index page with whatever files currently sit in DOWNLOAD_DIR."""
    stored_files = []
    if os.path.exists(DOWNLOAD_DIR):
        stored_files = os.listdir(DOWNLOAD_DIR)
    return templates.TemplateResponse("index.html", {"request": request, "files": stored_files})
@app.post("/start_batch")
async def start_batch(
    request: Request,
    background_tasks: BackgroundTasks,
    urls: str = Form(...),
    hf_token: str = Form(None),
    hf_repo: str = Form(None),
    repo_type: str = Form("dataset")
):
    """Parse the submitted URL list and launch the batch download as a background task."""
    # Split text area by newlines
    url_list = [line.strip() for line in urls.splitlines() if line.strip()]
    # Guard against a missing download dir (consistent with read_root) so the
    # response page cannot 500 before the first download creates it.
    files = os.listdir(DOWNLOAD_DIR) if os.path.exists(DOWNLOAD_DIR) else []
    if not url_list:
        return templates.TemplateResponse("index.html", {
            "request": request,
            "files": files,
            "message": "❌ No URLs provided."
        })
    # Start background process
    background_tasks.add_task(batch_processor, url_list, hf_token, hf_repo, repo_type)
    return templates.TemplateResponse("index.html", {
        "request": request,
        "files": files,
        "message": f"πŸš€ Batch started for {len(url_list)} files."
    })