terab / app.py
lelafav502's picture
Update app.py
a12b1c8 verified
import requests
import random
import string
import os
import json
import re
import gradio as gr
import tempfile
import shutil
MAIN_FILE_PATH = ""
def read_tokens_from_remote_file(url="https://temp-gmail.site/Tera/token.txt"):
tokens = {}
try:
# Download the token.txt file from the remote server
response = requests.get(url)
if response.status_code == 200:
# Split the content into lines and process them
for line in response.text.splitlines():
if '=' in line:
key, value = line.strip().split("=", 1)
tokens[key] = value
else:
print(f"Failed to retrieve token file. Status code: {response.status_code}")
except Exception as e:
print(f"Error reading token file from {url}: {e}")
return tokens
def handle_folder(uk, shareid, fs_id, jsToken, dp_logid, path):
# Prepare the URL and parameters for listing the contents of the folder
list_url = "https://www.terabox.app/api/list"
print(f"path response: {path}")
params = {
"app_id": "250528",
"web": "1",
"channel": "dubox",
"clienttype": "0",
"jsToken": jsToken,
"dp-logid": dp_logid,
"order": "name",
"desc": "0",
"dir": path, # Update this directory path as needed
"num": "100",
"page": "1",
"showempty": "0",
}
headers = {
"Host": "www.terabox.app",
"Cookie": "browserid=G0hDsCquBPA7SSZhsP9nY5hYCixA2ctDf-JeDzQTHuMFgCPZT1LrK7Gcp_4=; lang=en; TSID=GXcraYNHdclr0xNa4BMqyzpGHfqrRMQG; __bid_n=1948a3d5438a9a3b484207; _ga=GA1.1.1889078386.1737486066; _gcl_au=1.1.472633227.1737486142.1.1.472633227.1737486142; _fbp=fb.1.1737486141922.941960413240922412; _ga_K6JMPYL99R=GS1.1.1737486141.1.1.1737486243.0.0.0; _ga_RSNVN63CM3=GS1.1.1737486142.1.1.1737486243.20.0.0; ndus=Y2nBGixteHuiHGQ3H-ej0llprWUxc7t-lJpvCMfD; csrfToken=ya4Ks9cIFSQ5V-JSQ4zGXGYi; ndut_fmt=11FD31F142893EB8AF1586CD4735802CA414231F2F54D5010CE32D345FA1248E; ab_sr=1.0.1_NWYxODczYWFkODZkYmFiYzY3ZDkyMmI0OTgzZjI2ZGI1ZWZjZDdkYzcwNTlmNDI5NjdkMTcyMTQ5YmQwMTA3OWFmZDEzYjQ3OGYyMGIyOGNkYmUxMTA0MDA2YTczZWQzNjliMTU5NzY4MTI3YjM3ZjM3N2JhNGEwMGQ0Njc0OTliMWFlYjEyZjNkNWJhMWM3ZTJiMDQzYjRlMjY1ZWQ1NQ==; _ga_06ZNKL8C2E=GS1.1.1737635772.2.1.1737635785.47.0.0",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Sec-Ch-Ua": "\"Not A(Brand\";v=\"8\", \"Chromium\";v=\"132\", \"Google Chrome\";v=\"132\"",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-Ch-Ua-Mobile": "?0",
"Origin": "https://www.terabox.app",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": f"https://www.terabox.app/sharing/link?surl=Wre_RELLyel86eQ0-S_Eow&download=0&fid={fs_id}&shareid={shareid}&from={uk}",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
"Priority": "u=1, i",
"Connection": "keep-alive",
}
response = requests.get(list_url, headers=headers, params=params)
print(f"first response: {response.json()}")
if response.status_code == 200:
files = response.json().get("list", [])
final_response = []
print(f"files response: {files}")
saved_paths = []
for file in files:
if file["isdir"] == 0: # Only process files (not directories)
file_path = file["path"]
print(file_path)
saved_path = download_m3u8_file(file_path)
saved_paths.append(saved_path)
print(f"saved_path in handel {saved_path}")
if saved_path:
metadata = {
"category": "1",
"fs_id": file["fs_id"], # Direct fs_id from the response
"isdir": "0", # It's a file, not a directory
"local_ctime": "", # Current time for creation
"local_mtime": "", # Current time for modification
"md5": "", # Simulating the md5 for now
"path": file_path,
"dlink": "",
"fastdlink": "",
"fdlink": "",
"play_forbid": "0",
"server_ctime": "",
"server_filename": file["server_filename"],
"server_mtime": "",
"size": file["size"],
"thumbs": file["thumbs"], # Direct thumbs data from the API response
"emd5": "", # MD5 of file path
"share_id": "shareid",
"uk": "uk",
"timstamp": "",
"jsToken": jsToken,
}
final_response.append({
"metadata": metadata,
"streamingUrl": {
"streaming_url": saved_path # Path to the saved file
}
})
print(f"final_response in handel {final_response}")
return final_response, saved_paths
else:
print(f"Failed to list folder contents. Status code: {response.status_code}")
return []
# Define the transfer_file function
def transfer_file(uk, shareid, fs_id):
global MAIN_FILE_PATH
tokens = read_tokens_from_remote_file("https://temp-gmail.site/Tera/token.txt")
jsToken = tokens.get("jsToken", "")
dp_logid = tokens.get("dp-logid", "")
url = "https://www.terabox.app/share/transfer"
params = {
"app_id": "250528",
"web": "1",
"channel": "dubox",
"clienttype": "0",
"jsToken": jsToken,
"dp-logid": dp_logid,
"ondup": "newcopy",
"async": "1",
"scene": "purchased_list",
"bdstoken": "",
"shareid": shareid,
"from": uk,
}
headers = {
"Host": "www.terabox.app",
"Cookie": "browserid=G0hDsCquBPA7SSZhsP9nY5hYCixA2ctDf-JeDzQTHuMFgCPZT1LrK7Gcp_4=; lang=en; TSID=GXcraYNHdclr0xNa4BMqyzpGHfqrRMQG; __bid_n=1948a3d5438a9a3b484207; _ga=GA1.1.1889078386.1737486066; _gcl_au=1.1.472633227.1737486142; _fbp=fb.1.1737486141922.941960413240922412; _ga_K6JMPYL99R=GS1.1.1737486141.1.1.1737486243.0.0.0; _ga_RSNVN63CM3=GS1.1.1737486142.1.1.1737486243.20.0.0; ndus=Y2nBGixteHuiHGQ3H-ej0llprWUxc7t-lJpvCMfD; csrfToken=ya4Ks9cIFSQ5V-JSQ4zGXGYi; ndut_fmt=11FD31F142893EB8AF1586CD4735802CA414231F2F54D5010CE32D345FA1248E; ab_sr=1.0.1_NWYxODczYWFkODZkYmFiYzY3ZDkyMmI0OTgzZjI2ZGI1ZWZjZDdkYzcwNTlmNDI5NjdkMTcyMTQ5YmQwMTA3OWFmZDEzYjQ3OGYyMGIyOGNkYmUxMTA0MDA2YTczZWQzNjliMTU5NzY4MTI3YjM3ZjM3N2JhNGEwMGQ0Njc0OTliMWFlYjEyZjNkNWJhMWM3ZTJiMDQzYjRlMjY1ZWQ1NQ==; _ga_06ZNKL8C2E=GS1.1.1737635772.2.1.1737635785.47.0.0",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Sec-Ch-Ua": "\"Not A(Brand\";v=\"8\", \"Chromium\";v=\"132\", \"Google Chrome\";v=\"132\"",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-Ch-Ua-Mobile": "?0",
"Origin": "https://www.terabox.app",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": f"https://www.terabox.app/sharing/link?surl=Wre_RELLyel86eQ0-S_Eow&download=0&fid={fs_id}&shareid={shareid}&from={uk}",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
"Priority": "u=1, i",
"Connection": "keep-alive",
}
data = {
"fsidlist": f'["{fs_id}"]',
"path": "/",
}
response = requests.post(url, headers=headers, params=params, data=data)
if response.status_code == 200:
print("Request successful!")
print(response.json())
item = response.json().get('extra', {}).get('list', [{}])[0]
is_dir = int(item.get('meta', {}).get('isdir', "0")) # Convert to int
print(f"Type of is_dir: {type(is_dir)}")
print(f"is_dir value: {is_dir}")
paths = item.get('to', '')
if is_dir == 0: # It's a file
path = item.get('to', '')
MAIN_FILE_PATH = path
print(f"File transfer path: {path}")
file_path = download_m3u8_file(path)
print(f"singla file path: {file_path}")
return file_path
elif is_dir == 1: # It's a folder
print("Item is a folder, calling new function for folder...")
handle_path = handle_folder(uk, shareid, fs_id, jsToken, dp_logid, paths)
print(f"handle_path in transfer {handle_path}")
return handle_path
else:
print(f"Unknown type: {is_dir}. Cannot determine if it's a file or folder.")
else:
print(f"Request failed with status code {response.status_code}")
print(response.text)
def get_share_info(myurl):
url = myurl
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.6778.140 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Host': 'www.terabox.com',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
fs_id = data['list'][0]['fs_id']
shareid = data['shareid']
uk = data['uk']
path = data['list'][0]['path']
category = data['list'][0]['category']
size = data['list'][0]['size']
dlink = data['list'][0]['dlink']
thumbs = data['list'][0].get('thumbs', {})
isdir = int(data['list'][0]['isdir'])
print(f"isdir {isdir}")
file_path = transfer_file(uk, shareid, fs_id)
print(f"share fun file_path {file_path}")
print("all ok")
print(f"dir value {isdir}")
if isdir == 0: # File case
print("is file 0")
formatted_response = {
"metadata": {
"category": category,
"fs_id": fs_id,
"isdir": "0",
"local_ctime": "1735200453",
"local_mtime": "1735200433",
"md5": "b2fd54d5cc01f5e22e0c18e60d4e4c98",
"path": path, # Using actual path instead of MAIN_FILE_PATH
"dlink": "",
"fastdlink" : "",
"fdlink" : "",
"play_forbid": "0",
"server_ctime": "1735200453",
"server_filename": path.split('/')[-1],
"server_mtime": "1736239992",
"size": size,
"thumbs": thumbs,
"emd5": "cd22b0853r56993a0c6d09ffa7a7d509",
"share_id": shareid,
"uk": uk,
"timstamp": int(data.get('timestamp', 0)),
"jsToken": ""
}
}
formatted_response["streamingUrl"] = {"streaming_url": file_path}
return formatted_response, file_path
elif isdir == 1: # Directory case
print("is folder 1")
print(f"most final {file_path}")
return file_path, "ymg"
else:
return {"error": f"Failed to fetch the data. Status code: {response.status_code}"}, None
def download_m3u8_file(path):
m3u8_url = f"https://www.1024terabox.com/api/streaming?path={path}&app_id=250528&clienttype=0&type=M3U8_FLV_264_480&vip=0"
print(m3u8_url)
cookies = {
'browserid': 'RC7Iabr17gjK8jnNdwr1g9hU__4SHnZ_Njqywo42OSky86gpvByzbraNxV8=',
'lang': 'en',
'ndus': 'Y-X3AgxteHuiyU_6vzb8dhUsXIgrYMIQIJbVVArW',
'csrfToken': 'x1PLq8LjUOKLOSiZDQTJi5aQ',
}
headers = {
"Host": "www.terabox.app",
"Cookie": "browserid=G0hDsCquBPA7SSZhsP9nY5hYCixA2ctDf-JeDzQTHuMFgCPZT1LrK7Gcp_4=; lang=en; TSID=GXcraYNHdclr0xNa4BMqyzpGHfqrRMQG; __bid_n=1948a3d5438a9a3b484207; _ga=GA1.1.1889078386.1737486066; _gcl_au=1.1.472633227.1737486142; _fbp=fb.1.1737486141922.941960413240922412; _ga_K6JMPYL99R=GS1.1.1737486141.1.1.1737486243.0.0.0; _ga_RSNVN63CM3=GS1.1.1737486142.1.1.1737486243.20.0.0; ndus=Y2nBGixteHuiHGQ3H-ej0llprWUxc7t-lJpvCMfD; csrfToken=ya4Ks9cIFSQ5V-JSQ4zGXGYi; ndut_fmt=11FD31F142893EB8AF1586CD4735802CA414231F2F54D5010CE32D345FA1248E; ab_sr=1.0.1_NWYxODczYWFkODZkYmFiYzY3ZDkyMmI0OTgzZjI2ZGI1ZWZjZDdkYzcwNTlmNDI5NjdkMTcyMTQ5YmQwMTA3OWFmZDEzYjQ3OGYyMGIyOGNkYmUxMTA0MDA2YTczZWQzNjliMTU5NzY4MTI3YjM3ZjM3N2JhNGEwMGQ0Njc0OTliMWFlYjEyZjNkNWJhMWM3ZTJiMDQzYjRlMjY1ZWQ1NQ==; _ga_06ZNKL8C2E=GS1.1.1737635772.2.1.1737635785.47.0.0",
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Sec-Ch-Ua": "\"Not A(Brand\";v=\"8\", \"Chromium\";v=\"132\", \"Google Chrome\";v=\"132\"",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-Ch-Ua-Mobile": "?0",
"Origin": "https://www.terabox.app",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
"Priority": "u=1, i",
"Connection": "keep-alive",
}
response = requests.get(m3u8_url, headers=headers)
if response.status_code == 200:
with tempfile.NamedTemporaryFile(delete=False, suffix=".m3u8", dir="/tmp") as tmp_file:
tmp_path = tmp_file.name # Temporary file path
# Write the response content to the temporary file
with open(tmp_path, 'wb') as file:
file.write(response.content)
print(f"File saved to: {tmp_path}")
return tmp_path
else:
print(f"Failed to download m3u8 file. Status code: {response.status_code}")
print(response.text)
def generate_random_filename(length=10):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def gradio_interface(myurl):
if myurl:
# Extract the part of the URL after 's/'
match = re.search(r's/([^/]+)', myurl)
if match:
short_url_part = match.group(1) # This is the value after 's/'
api_url = f"https://www.terabox.com/api/shorturlinfo?app_id=250528&shorturl={short_url_part}&root=1"
response, file_path = get_share_info(api_url) # Assume get_share_info returns response and file path
if file_path == "ymg":
# If file_path is 'ymg', return only the response
print(f"gradio interface file path : {file_path}")
print(f"gradio interface response : {response}")
return response
else:
# If file_path is not 'ymg', return both response and file path
print(f"gradio interface file path : {file_path}")
print(f"gradio interface response : {response}")
return response, file_path
else:
# If the URL doesn't match the expected pattern
return {"error": "Invalid URL format"}, None
else:
# If URL is missing
return {"error": "URL is missing"}, None
# def gradio_interface(myurl):
# if myurl:
# match = re.search(r's/([^/]+)', myurl)
# if match:
# short_url_part = match.group(1) # This is the value after 's/'
# api_url = f"https://www.terabox.com/api/shorturlinfo?app_id=250528&shorturl={short_url_part}&root=1"
# response, file_path = get_share_info(api_url)
# if file_path:
# # Return the JSON response and the file path as a tuple
# print(f"gradio interface file path : {file_path}")
# print(f"gradio interface response : {response}")
# return response
# else:
# return {"error": "File download failed"}, None
# else:
# return {"error": "Invalid URL format"}, None
# else:
# return {"error": "URL is missing"}, None
# Create Gradio interface
iface = gr.Interface(
fn=gradio_interface,
inputs=gr.Textbox(label="Enter URL", placeholder="Enter the share URL here..."),
outputs=["json", gr.File()],
title="Share Info Retrieval"
)
# iface = gr.Interface(
# fn=gradio_interface,
# inputs=gr.Textbox(label="Enter URL", placeholder="Enter the share URL here..."),
# outputs=["json"],
# title="Share Info Retrieval"
# )
if __name__ == '__main__':
iface.launch(debug=True)