Spaces:
Paused
Paused
File size: 7,998 Bytes
4691839 72aa956 4691839 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4f1b288 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4f1b288 e07bd3d 4691839 e07bd3d 4691839 e07bd3d 4691839 e07bd3d a12586d 7ab5786 e07bd3d 7ab5786 e07bd3d 4925567 4f1b288 4925567 4f1b288 4925567 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | import requests, hashlib, os
from others import *
api_url = 'https://api.iwara.tv'
file_url = 'https://files.iwara.tv'
class BearerAuth(requests.auth.AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers['Authorization'] = 'Bearer ' + self.token
return r
class ApiClient:
def __init__(self, email, password):
self.email = email
self.password = password
self.api_url = api_url
self.file_url = file_url
self.timeout = 30
self.download_timeout = 300
self.token = None
def login(self) -> requests.Response:
url = self.api_url + '/user/login'
json = {'email': self.email, 'password': self.password}
r = requests.post(url, json=json, timeout=self.timeout)
try:
self.token = r.json()['token']
print('API Login success')
except:
print('API Login failed')
return r
def get_videos(self, sort='date', rating='all', page=0, limit=32, subscribed=False) -> requests.Response:
url = self.api_url + '/videos'
params = {'sort': sort, 'rating': rating, 'page': page, 'limit': limit, 'subscribed': 'true' if subscribed else 'false'}
if self.token is None:
r = requests.get(url, params=params, timeout=self.timeout)
else:
r = requests.get(url, params=params, auth=BearerAuth(self.token), timeout=self.timeout)
return r
def get_video(self, video_id) -> requests.Response:
url = self.api_url + '/video/' + video_id
if self.token is None:
r = requests.get(url, timeout=self.timeout)
else:
r = requests.get(url, auth=BearerAuth(self.token), timeout=self.timeout)
return r
def download_video_thumbnail(self, video_id) -> str:
video = self.get_video(video_id).json()
file_id = video['file']['id']
thumbnail_id = video['thumbnail']
url = self.file_url + '/image/original/' + file_id + '/thumbnail-{:02d}.jpg'.format(thumbnail_id)
title = video['title']
user = video['user']
name = user['name']
username = user['username']
print("Name:", name)
print("Username:", username)
print("Title:", title)
judul = f"{name} - {title.replace(' / ', ' ').replace('/', ' ').replace('_', ' ').title().replace('’S', '’s').replace('Mmd','MMD').replace('Nikke','NIKKE').replace('Fate','FATE').replace('】','】 ').replace(' ', ' ')}"
folder_download = f"/home/user/app/Iwara/Thumbnail/{name}"
if not os.path.exists(folder_download):
os.makedirs(folder_download)
thumbnail_file_name = os.path.join(folder_download, judul + '.jpg')
if os.path.exists(thumbnail_file_name):
print(f"Video ID {video_id} thumbnail already downloaded, skipped downloading.")
return thumbnail_file_name
print(f"Downloading thumbnail for video ID: {video_id} ...")
with open(thumbnail_file_name, 'wb') as f:
response = requests.get(url, stream=True)
total_size = int(response.headers.get('Content-Length', 0))
progress_bar = tqdm(total=total_size, unit='B', unit_scale=True, ncols=80)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
progress_bar.update(len(chunk))
progress_bar.close()
return thumbnail_file_name
def download_video(self, video_id) -> str:
try:
video = self.get_video(video_id).json()
except Exception as e:
raise Exception(f"Failed to get video info for video ID: {video_id}, error: {e}")
url = video['fileUrl']
file_id = video['file']['id']
expires = url.split('/')[4].split('?')[1].split('&')[0].split('=')[1]
SHA_postfix = "_5nFp9kmbNnHdAFhaqMvt"
SHA_key = file_id + "_" + expires + SHA_postfix
hash = hashlib.sha1(SHA_key.encode('utf-8')).hexdigest()
headers = {"X-Version": hash}
resources = requests.get(url, headers=headers, auth=BearerAuth(self.token), timeout=self.timeout).json()
title = video['title']
user = video['user']
name = user['name']
username = user['username']
print("Name:", name)
print("Username:", username)
print("Title:", title)
judul = f"{name} - {title.replace(' / ', ' ').replace('/', ' ').replace('_', ' ').title().replace('’S', '’s').replace('Mmd','MMD').replace('Nikke','NIKKE').replace('Fate','FATE').replace('】','】 ').replace(' ', ' ')}"
folder_download = f"/home/user/app/Iwara/Video/{name}"
if not os.path.exists(folder_download):
os.makedirs(folder_download)
quality_priority = ['Source', '1080', '720', '540', '480', '360']
resources_by_quality = {quality: None for quality in quality_priority}
for resource in resources:
if resource['name'] in resources_by_quality:
resources_by_quality[resource['name']] = resource
download_resource = None
for quality in quality_priority:
if resources_by_quality[quality] is not None:
download_resource = resources_by_quality[quality]
break
if download_resource is None:
raise Exception("No video with acceptable quality found")
download_link = "https:" + download_resource['src']['download']
file_type = download_resource['type'].split('/')[1]
video_file_name = os.path.join(folder_download, judul + '.' + file_type)
if os.path.exists(video_file_name):
print(f"Video ID {video_id} already downloaded, skipped downloading.")
return video_file_name
print(f"Downloading video ID: {video_id} ...")
try:
with open(video_file_name, 'wb') as file:
response = requests.get(download_link, stream=True)
total_size = int(response.headers.get("Content-Length", 0))
progress_bar = tqdm(total=total_size, unit="B", unit_scale=True, ncols=80)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
file.write(chunk)
progress_bar.update(len(chunk))
progress_bar.close()
return video_file_name
except Exception as e:
if os.path.exists(video_file_name):
os.remove(video_file_name)
raise Exception(f"Failed to download video ID: {video_id}, error: {e}")
# def process_txt(txt_files, baris = 1):
# for txt_file in txt_files:
# file_path = os.path.join('/content', txt_file)
# with open(file_path, 'r') as f:
# links = f.read().splitlines()
# for link in links:
# if link.strip():
# print(f"Baris: {baris}")
# video_id = link.split("/")[4]
# client.download_video(video_id)
# baris += 1
# print()
# os.remove(file_path)
# -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
def iwara(video_url):
if 'client' not in globals():
client = ApiClient(email="AnimuHikari", password="Hikari123")
client.login()
video_id = video_url.split("/")[4]
video_file = client.download_video(video_id)
thumbnail_file = client.download_video_thumbnail(video_id)
# Mengkonversi video
video_file = convert_videos(720, video_file)
judul = os.path.splitext(os.path.basename(video_file))[0]
video_info = f"Judul: {judul}\n"
return video_file, judul, video_info, thumbnail_file |