VideoCutter / iwara.py
GilbertClaus's picture
Iwara
7ab5786
import requests, hashlib, os
from others import *
api_url = 'https://api.iwara.tv'
file_url = 'https://files.iwara.tv'
class BearerAuth(requests.auth.AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers['Authorization'] = 'Bearer ' + self.token
return r
class ApiClient:
def __init__(self, email, password):
self.email = email
self.password = password
self.api_url = api_url
self.file_url = file_url
self.timeout = 30
self.download_timeout = 300
self.token = None
def login(self) -> requests.Response:
url = self.api_url + '/user/login'
json = {'email': self.email, 'password': self.password}
r = requests.post(url, json=json, timeout=self.timeout)
try:
self.token = r.json()['token']
print('API Login success')
except:
print('API Login failed')
return r
def get_videos(self, sort='date', rating='all', page=0, limit=32, subscribed=False) -> requests.Response:
url = self.api_url + '/videos'
params = {'sort': sort, 'rating': rating, 'page': page, 'limit': limit, 'subscribed': 'true' if subscribed else 'false'}
if self.token is None:
r = requests.get(url, params=params, timeout=self.timeout)
else:
r = requests.get(url, params=params, auth=BearerAuth(self.token), timeout=self.timeout)
return r
def get_video(self, video_id) -> requests.Response:
url = self.api_url + '/video/' + video_id
if self.token is None:
r = requests.get(url, timeout=self.timeout)
else:
r = requests.get(url, auth=BearerAuth(self.token), timeout=self.timeout)
return r
def download_video_thumbnail(self, video_id) -> str:
video = self.get_video(video_id).json()
file_id = video['file']['id']
thumbnail_id = video['thumbnail']
url = self.file_url + '/image/original/' + file_id + '/thumbnail-{:02d}.jpg'.format(thumbnail_id)
title = video['title']
user = video['user']
name = user['name']
username = user['username']
print("Name:", name)
print("Username:", username)
print("Title:", title)
judul = f"{name} - {title.replace(' / ', ' ').replace('/', ' ').replace('_', ' ').title().replace('’S', '’s').replace('Mmd','MMD').replace('Nikke','NIKKE').replace('Fate','FATE').replace('】','】 ').replace(' ', ' ')}"
folder_download = f"/home/user/app/Iwara/Thumbnail/{name}"
if not os.path.exists(folder_download):
os.makedirs(folder_download)
thumbnail_file_name = os.path.join(folder_download, judul + '.jpg')
if os.path.exists(thumbnail_file_name):
print(f"Video ID {video_id} thumbnail already downloaded, skipped downloading.")
return thumbnail_file_name
print(f"Downloading thumbnail for video ID: {video_id} ...")
with open(thumbnail_file_name, 'wb') as f:
response = requests.get(url, stream=True)
total_size = int(response.headers.get('Content-Length', 0))
progress_bar = tqdm(total=total_size, unit='B', unit_scale=True, ncols=80)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
progress_bar.update(len(chunk))
progress_bar.close()
return thumbnail_file_name
def download_video(self, video_id) -> str:
try:
video = self.get_video(video_id).json()
except Exception as e:
raise Exception(f"Failed to get video info for video ID: {video_id}, error: {e}")
url = video['fileUrl']
file_id = video['file']['id']
expires = url.split('/')[4].split('?')[1].split('&')[0].split('=')[1]
SHA_postfix = "_5nFp9kmbNnHdAFhaqMvt"
SHA_key = file_id + "_" + expires + SHA_postfix
hash = hashlib.sha1(SHA_key.encode('utf-8')).hexdigest()
headers = {"X-Version": hash}
resources = requests.get(url, headers=headers, auth=BearerAuth(self.token), timeout=self.timeout).json()
title = video['title']
user = video['user']
name = user['name']
username = user['username']
print("Name:", name)
print("Username:", username)
print("Title:", title)
judul = f"{name} - {title.replace(' / ', ' ').replace('/', ' ').replace('_', ' ').title().replace('’S', '’s').replace('Mmd','MMD').replace('Nikke','NIKKE').replace('Fate','FATE').replace('】','】 ').replace(' ', ' ')}"
folder_download = f"/home/user/app/Iwara/Video/{name}"
if not os.path.exists(folder_download):
os.makedirs(folder_download)
quality_priority = ['Source', '1080', '720', '540', '480', '360']
resources_by_quality = {quality: None for quality in quality_priority}
for resource in resources:
if resource['name'] in resources_by_quality:
resources_by_quality[resource['name']] = resource
download_resource = None
for quality in quality_priority:
if resources_by_quality[quality] is not None:
download_resource = resources_by_quality[quality]
break
if download_resource is None:
raise Exception("No video with acceptable quality found")
download_link = "https:" + download_resource['src']['download']
file_type = download_resource['type'].split('/')[1]
video_file_name = os.path.join(folder_download, judul + '.' + file_type)
if os.path.exists(video_file_name):
print(f"Video ID {video_id} already downloaded, skipped downloading.")
return video_file_name
print(f"Downloading video ID: {video_id} ...")
try:
with open(video_file_name, 'wb') as file:
response = requests.get(download_link, stream=True)
total_size = int(response.headers.get("Content-Length", 0))
progress_bar = tqdm(total=total_size, unit="B", unit_scale=True, ncols=80)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
file.write(chunk)
progress_bar.update(len(chunk))
progress_bar.close()
return video_file_name
except Exception as e:
if os.path.exists(video_file_name):
os.remove(video_file_name)
raise Exception(f"Failed to download video ID: {video_id}, error: {e}")
# def process_txt(txt_files, baris = 1):
# for txt_file in txt_files:
# file_path = os.path.join('/content', txt_file)
# with open(file_path, 'r') as f:
# links = f.read().splitlines()
# for link in links:
# if link.strip():
# print(f"Baris: {baris}")
# video_id = link.split("/")[4]
# client.download_video(video_id)
# baris += 1
# print()
# os.remove(file_path)
# -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
def iwara(video_url):
if 'client' not in globals():
client = ApiClient(email="AnimuHikari", password="Hikari123")
client.login()
video_id = video_url.split("/")[4]
video_file = client.download_video(video_id)
thumbnail_file = client.download_video_thumbnail(video_id)
# Mengkonversi video
video_file = convert_videos(720, video_file)
judul = os.path.splitext(os.path.basename(video_file))[0]
video_info = f"Judul: {judul}\n"
return video_file, judul, video_info, thumbnail_file